Sep 29, 2022

Announcing the Flink-Pulsar SQL Connector

Yufei Zhang

We are happy to announce that the Flink-Pulsar SQL Connector has been released and is available for download and use. The Flink-Pulsar SQL Connector supports querying data from and writing data to Pulsar topics with simple Flink SQL queries. With this connector, you can easily create Flink + Pulsar pipelines without writing Java or Scala. Read this blog to learn about the benefits and features of this connector and how to use it.

What is the Flink-Pulsar SQL Connector?

The Flink community provides the SQL and Table API to express Flink jobs using SQL queries. The Flink-Pulsar SQL Connector allows Flink SQL to read from and write to Pulsar topics with simple “SELECT FROM” and “INSERT INTO” statements.

Note: The Flink-Pulsar SQL Connector is implemented based on the Pulsar DataStream Connector and inherits most of the DataStream Connector’s configurations.

What are the benefits of using the Flink-Pulsar SQL Connector?

The Flink-Pulsar SQL Connector provides three key benefits:

  • Ease of Use: This connector allows you to explore real-time data in Pulsar by submitting Flink jobs as SQL queries, without writing and deploying Java code. With native tables, you can query Pulsar topics directly in SQL without writing CREATE TABLE statements.
  • Scalability: The Flink-Pulsar SQL Connector inherits high scalability from the underlying DataStream Connector, which is designed to be scalable by using the newest source and sink APIs.
  • Flexibility: The Flink-Pulsar SQL Connector gives you the flexibility to subscribe to a topic pattern before a topic matching that pattern is even created. The connector is able to discover newly added topics during runtime without restarting the job.

Features of the Flink-Pulsar SQL Connector

  • Define columns from message metadata: The Flink-Pulsar SQL Connector allows you to map the metadata of a Pulsar message, such as event_time, producer_name, and publish_time, to Flink table columns. This can be useful when defining a watermark based on time-attribute metadata or enriching the Flink record with topic names.
  • Dynamic topic discovery: Similar to the DataStream Connector, the Flink-Pulsar SQL Connector allows you to define topic patterns, and you can add new data by adding new topics while the Flink SQL job is running. This is useful because when a source topic needs to be scaled up, you don’t need to restart the Flink job.
  • Avro and JSON format: The Flink-Pulsar SQL Connector supports Flink’s JSON and Avro formats to read corresponding binary data stored in Pulsar topics. It can also automatically derive the Flink table schema when reading from a Pulsar topic with JSON or Avro schema. Read this document to learn more about this feature.
  • PulsarCatalog: PulsarCatalog allows you to use a Pulsar cluster as metadata storage for Flink tables. It supports defining two types of tables: explicit and native. A native table allows you to read from a Pulsar topic without creating the Flink table explicitly, hence the name “native”. Read this document for a detailed description of explicit and native tables.
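
To make the metadata mapping more concrete, here is a minimal sketch of a table that exposes message metadata as columns and derives its watermark from one of them. It assumes the table is created inside a PulsarCatalog (so no service URLs are set on the table), reuses the sample topic from the example later in this post, and assumes that the metadata keys publish_time and producer_name have the types shown. The table name user_with_meta is hypothetical. Check the connector documentation for the exact metadata keys and types in your version.

```sql
-- Sketch only: the metadata keys and their types are assumptions based on
-- the metadata names mentioned above; user_with_meta is a hypothetical name.
CREATE TABLE user_with_meta (
    name STRING,
    age INT,
    -- Read-only metadata is declared VIRTUAL so it is skipped on INSERT.
    `publish_time` TIMESTAMP_LTZ(3) METADATA VIRTUAL,
    `producer_name` STRING METADATA VIRTUAL,
    -- Watermark defined on the metadata time attribute.
    WATERMARK FOR `publish_time` AS `publish_time` - INTERVAL '5' SECOND
) WITH (
    'connector' = 'pulsar',
    'topics' = 'persistent://sample/flink/user',
    'format' = 'json'
);
```

Declaring the metadata columns as VIRTUAL keeps the table usable as a sink, because Flink then excludes those columns from the schema expected by INSERT INTO.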

Get started with the Flink-Pulsar SQL Connector


  • For Table API programs, add the Flink-Pulsar SQL Connector to your dependencies.
  • For Flink SQL queries with SQL Client, download the SQL jar and add it to the classpath when starting the SQL client. For example: “./bin/ --jar flink-sql-connector-pulsar-”

  • Maven (group ID, artifact ID): io.streamnative.connectors, flink-sql-connector-pulsar
  • SQL Jar: JAR

How to use the Flink-Pulsar SQL Connector

The sample code below demonstrates how to use PulsarCatalog and the Flink-Pulsar SQL Connector. You can find all the code below in the flink-example and sql-examples repositories.

  1. Create PulsarCatalog.
-- The catalog name (pulsar here) is your choice.
CREATE CATALOG pulsar
  WITH (
    'type' = 'pulsar-catalog',
    'catalog-admin-url' = 'http://pulsar:8080',
    'catalog-service-url' = 'pulsar://pulsar:6650'
  );
  2. Create an explicit table with watermark strategies.
CREATE TABLE sql_user (
    name STRING,
    age INT,
    income DOUBLE,
    single BOOLEAN,
    -- createTime is an epoch timestamp in milliseconds.
    createTime BIGINT,
    row_time AS CAST(TO_TIMESTAMP(FROM_UNIXTIME(createTime / 1000), 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP(3)),
    WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'pulsar',
  'topics' = 'persistent://sample/flink/user',
  'format' = 'json'
);
  3. Run a window query for the table.
SELECT single,
    TUMBLE_START(row_time, INTERVAL '10' SECOND) AS sStart,
    SUM(age) AS age_sum
FROM `sql_user`
GROUP BY TUMBLE(row_time, INTERVAL '10' SECOND), single;
  4. Write into the same table.
-- The last value is createTime in epoch milliseconds, matching the row_time definition.
INSERT INTO `sql_user` VALUES ('user 1', 11, 25000.0, true, 1656831003000);
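
As a side note, the grouped window query above can also be written with Flink's windowing table-valued function (TVF) syntax, available in Flink 1.13 and later. The following is a sketch that assumes the sql_user table defined above exists. The window_start and window_end columns are produced by the TUMBLE function itself, not by the connector.

```sql
-- Sketch: the same 10-second tumbling aggregation using the TUMBLE window TVF.
SELECT single,
    window_start AS sStart,
    SUM(age) AS age_sum
FROM TABLE(
    TUMBLE(TABLE sql_user, DESCRIPTOR(row_time), INTERVAL '10' SECOND))
GROUP BY window_start, window_end, single;
```

Both forms compute one sum of age per 10-second window and per value of single; the TVF form is the one newer Flink releases recommend for new queries.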

So far, we have covered how to create and query an explicit table. Next, we can query the native table that PulsarCatalog maps from the topic persistent://sample/flink/user directly, without defining it first.

  1. Read 10 records from the native table named user.
SELECT * FROM `user` LIMIT 10;
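
For this query to resolve the table name, the session has to point at the catalog and at the database that holds the native table. The sketch below assumes that the catalog was registered under the name pulsar and that PulsarCatalog exposes each Pulsar tenant and namespace pair as a database named `tenant/namespace`. Both are assumptions to verify against the connector documentation for your version.

```sql
-- Sketch only: the catalog name and the database naming scheme are assumptions.
USE CATALOG pulsar;

-- List the databases the catalog exposes, then switch to the one
-- expected to hold the topic persistent://sample/flink/user.
SHOW DATABASES;
USE `sample/flink`;

-- The native table should appear here without any CREATE TABLE statement.
SHOW TABLES;
SELECT * FROM `user` LIMIT 10;
```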

For more information, refer to the Flink-Pulsar SQL Connector documentation.

What’s next?

Currently, the Flink-Pulsar SQL Connector does not support defining a PRIMARY KEY, so it cannot support Change Data Capture (CDC) formats that rely on upsert or delete operations. We plan to improve the connector to support upsert mode for CDC scenarios.

Get involved

The Flink-Pulsar SQL Connector is a community-driven initiative. To get involved with the Flink-Pulsar SQL Connector, check out the following resources:

  • Try out the Flink-Pulsar SQL Connector: Download the connector and read the documentation to learn more about it.
  • Make a contribution: If you have any feature requests or bug reports, do not hesitate to share your feedback and ideas by submitting a pull request.
  • Contact us: Feel free to create an issue on GitHub, send an email to the Pulsar mailing list, or message us on Twitter to get answers from Pulsar experts.
