Read data from Pulsar (source)


You can use the Pulsar Flink connector in one of the following ways. In each case, you need to configure the connector before using it.

  • Client library: all features of the Pulsar Flink connector are available (Java and Scala).

  • Scala REPL: all features of the Pulsar Flink connector are available in interactive mode (Scala).

  • SQL client: only some features of the Pulsar Flink connector are available, since the SQL Client is a beta feature of Flink (Flink SQL).

Client library

As with any Flink application, ./bin/flink run is used to compile and launch your application.


The path must include a protocol (for example, file://) and be accessible on all nodes.


$ ./bin/flink run
  -c com.example.entry.point.ClassName file://path/to/jars/your_jar.jar

Scala REPL

For experimenting on the interactive Scala shell bin/start-scala-shell.sh, you can use --addclasspath to add pulsar-flink-connector_{{SCALA_BINARY_VERSION}}-{{PULSAR_FLINK_VERSION}}.jar directly.


$ ./bin/start-scala-shell.sh remote <hostname> <portnumber>
  --addclasspath pulsar-flink-connector_{{SCALA_BINARY_VERSION}}-{{PULSAR_FLINK_VERSION}}.jar

For the full reference of the Scala REPL, see the Flink Scala REPL documentation.

SQL Client

To try out the SQL Client Beta and write SQL queries that manipulate data in Pulsar, you can use --jar to add pulsar-flink-connector_{{SCALA_BINARY_VERSION}}-{{PULSAR_FLINK_VERSION}}.jar directly.


$ ./bin/sql-client.sh embedded --jar pulsar-flink-connector_{{SCALA_BINARY_VERSION}}-{{PULSAR_FLINK_VERSION}}.jar

By default, the SQL Client reads its configuration from the environment file ./conf/sql-client-defaults.yaml. To use the Pulsar catalog in the SQL Client and have it registered automatically at startup, add the Pulsar catalog to the catalogs section of this YAML file:

catalogs:
  - name: pulsarcatalog
    type: pulsar
    default-database: tn/ns
    service-url: "pulsar://localhost:6650"
    admin-url: "http://localhost:8080"
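Once the catalog is registered, you can select it from a SQL Client session. The statements below are an illustrative sketch; the tables listed correspond to Pulsar topics in the configured default database.

```sql
-- Switch to the Pulsar catalog registered in sql-client-defaults.yaml;
-- Pulsar topics in the default database are then visible as tables.
USE CATALOG pulsarcatalog;
SHOW TABLES;
```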

Read data from Pulsar (source)


The Pulsar Flink connector allows Flink to read data from Pulsar.

Flink's Pulsar consumer is called FlinkPulsarSource<T>, or FlinkPulsarRowSource when the data schema should be inferred automatically. It provides access to one or more Pulsar topics.

The consumer’s constructor accepts the following arguments:

  • The service URL and admin URL for the Pulsar instance to connect to.

  • A DeserializationSchema for deserializing the data from Pulsar when using FlinkPulsarSource.

  • Properties for the Pulsar Source.

    One of the following properties is required to specify the topic(s) to consume:

    • topic: a single topic name

    • topics: a comma-separated list of topic names

    • topicsPattern: a Java regex string used to match topic names


The following example shows how to construct the consumer:

StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();

String serviceUrl = "pulsar://localhost:6650";
String adminUrl = "http://localhost:8080";

Properties props = new Properties();
props.setProperty("topic", "test-source-topic");
FlinkPulsarSource<String> source = new FlinkPulsarSource<>(serviceUrl, adminUrl, new SimpleStringSchema(), props);

DataStream<String> stream = see.addSource(source);

// chain operations on the DataStream of String and sink the output


Extract structured data from a Pulsar message

You can use the following method to extract structured data from a Pulsar message.


When FlinkPulsarSource<T> is used, it needs to know how to turn the binary data in Pulsar into Java/Scala objects. The DeserializationSchema allows users to specify such a schema. The T deserialize(byte[] message) method gets called for each Pulsar message, passing the value from Pulsar.

It is usually helpful to start from the AbstractDeserializationSchema, which takes care of describing the produced Java or Scala type to Flink's type system. Users that implement a vanilla DeserializationSchema need to implement the getProducedType(...) method themselves.
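As a minimal sketch (the class name is illustrative, not part of the connector), a schema that decodes each message payload as a UTF-8 string could be built on AbstractDeserializationSchema like this:

```java
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

// Illustrative schema: decodes each Pulsar message payload as a UTF-8 string.
// AbstractDeserializationSchema describes the produced type (String) to Flink's
// type system, so getProducedType() does not need to be implemented here.
class Utf8StringSchema extends AbstractDeserializationSchema<String> {

    private static final long serialVersionUID = 1L;

    @Override
    public String deserialize(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }
}
```

An instance of this class can then be passed as the DeserializationSchema argument of the FlinkPulsarSource constructor.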

For convenience, the connector provides the following DeserializationSchema implementations:

  • JsonDeser: if the topic uses JSONSchema in Pulsar, you can use JsonDeser.of(POJO_CLASS_NAME.class) as the DeserializationSchema.

  • AvroDeser: if the topic uses AVROSchema in Pulsar, you can use AvroDeser.of(POJO_CLASS_NAME.class) as the DeserializationSchema.

Schema for FlinkPulsarRowSource

FlinkPulsarRowSource can be used for reading topics with arbitrary schema:

  • For topics without a schema or with a primitive schema in Pulsar, the message payload is loaded into a value column whose type corresponds to the Pulsar schema.

  • For topics with Avro or JSON schema, their field names and field types are kept in the result rows.

In both cases, each row in the source has the following metadata fields.

Column          Type
__key           Bytes
__topic         String
__messageId     Bytes
__publishTime   Timestamp
__eventTime     Timestamp

Example 1

A Pulsar topic with Schema.DOUBLE is converted to a Flink table with the following schema:

|-- value: DOUBLE
|-- __key: BYTES
|-- __topic: STRING
|-- __messageId: BYTES
|-- __publishTime: TIMESTAMP(3)
|-- __eventTime: TIMESTAMP(3)

Example 2

A Pulsar topic with the following AVRO schema is converted to a Flink table with the schema shown below it.

public static class Foo {
    public int i;
    public float f;
    public Bar bar;
}

public static class Bar {
    public boolean b;
    public String s;
}

Schema s = Schema.AVRO(Foo.class);
|-- i: INT
|-- f: FLOAT
|-- bar: ROW<`b` BOOLEAN, `s` STRING>
|-- __key: BYTES
|-- __topic: STRING
|-- __messageId: BYTES
|-- __publishTime: TIMESTAMP(3)
|-- __eventTime: TIMESTAMP(3)

Pulsar source start position configuration

The Flink Pulsar source allows configuring how the start position for Pulsar partitions is determined.


final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

FlinkPulsarSource<String> myConsumer = new FlinkPulsarSource<>(...);
myConsumer.setStartFromEarliest();     // start from the earliest record possible
myConsumer.setStartFromLatest();       // start from the latest record (the default behaviour)

DataStream<String> stream = env.addSource(myConsumer);

Both FlinkPulsarSource and FlinkPulsarRowSource have the explicit configuration methods above for start position.

You can also specify the exact offsets the source should start from for each partition.


Map<String, MessageId> offset = new HashMap<>();
offset.put("topic1-partition-0", mid1);
offset.put("topic1-partition-1", mid2);
offset.put("topic1-partition-2", mid3);

myConsumer.setStartFromSpecificOffsets(offset);


The example above configures the consumer to start from the specified message IDs for partitions 0, 1, and 2 of topic1. The offset value should be the next record that the consumer reads for each partition.


  • If the consumer needs to read a partition that does not have a specified offset in the provided offsets map, it falls back to the default start position behaviour (for example, setStartFromLatest()) for that particular partition.
  • These start position configuration methods do not affect the start position when the job is automatically restored from a failure or manually restored using a savepoint. On restore, the start position of each Pulsar partition is determined by the offsets stored in the savepoint or checkpoint. For more information about the checkpoint to enable fault tolerance for the consumer, see Pulsar source and fault tolerance.

Pulsar source and fault tolerance

With Flink's checkpointing enabled, the Flink Pulsar source consumes records from a topic and periodically checkpoints its Pulsar offsets, together with the state of other operations, in a consistent manner. In case of a job failure, Flink restores the streaming program to the state of the latest checkpoint and re-consumes the records from Pulsar, starting from the offsets that are stored in the checkpoint.

The interval at which checkpoints are drawn therefore defines how far back the program may have to go, at most, in case of a failure.

To use the Pulsar source with fault tolerance, enable checkpointing of the topology in the execution environment:


final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5000 msecs

Also note that Flink can only restart the topology if enough processing slots are available to do so. Therefore, if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards. Flink on YARN supports automatic restart of lost YARN containers.

Topic and partition discovery of Pulsar source

The Flink Pulsar source supports discovering dynamically created Pulsar partitions, and consumes them with exactly-once guarantees. All partitions discovered after the initial retrieval of partition metadata (that is, when the job starts running) are consumed from the earliest possible offset.

By default, partition discovery is disabled. To enable it, set a non-negative value for partitionDiscoveryIntervalMillis in the provided properties config, representing the discovery interval in milliseconds.
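For example, assuming the property name described above, the following sketch enables discovery every 30 seconds (the topic name is illustrative):

```java
import java.util.Properties;

Properties props = new Properties();
props.setProperty("topic", "test-source-topic"); // illustrative topic name
// A non-negative value enables dynamic partition discovery; here the source
// checks for newly created partitions every 30 seconds.
props.setProperty("partitionDiscoveryIntervalMillis", "30000");
```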

Pulsar source and timestamp extraction/watermark emission

In many scenarios, the timestamp of a record is embedded (explicitly or implicitly) in the record itself.

In addition, the user may want to emit watermarks either periodically, or in an irregular fashion, for example, based on special records in the Pulsar stream that contain the current event-time watermark. For these cases, the Flink Pulsar Source allows the specification of an AssignerWithPeriodicWatermarks or an AssignerWithPunctuatedWatermarks.

Internally, an instance of the assigner is executed per Pulsar partition. When such an assigner is specified, for each record read from Pulsar, the extractTimestamp(T element, long previousElementTimestamp) is called to assign a timestamp to the record and the Watermark getCurrentWatermark() (for periodic) or the Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp) (for punctuated) is called to determine if a new watermark should be emitted and with which timestamp.
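As an illustrative sketch (the record type and its field names are assumptions, not part of the connector), a periodic assigner that tolerates 3 seconds of out-of-orderness could look like this:

```java
import java.io.Serializable;

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Illustrative record type; assumes the event timestamp is embedded in the record.
class SensorReading implements Serializable {
    long timestampMillis;
    double value;
}

// Periodic assigner: tracks the highest timestamp seen so far and emits a
// watermark that trails it by a fixed out-of-orderness bound (3 seconds here).
class BoundedLatenessAssigner implements AssignerWithPeriodicWatermarks<SensorReading> {

    private static final long MAX_OUT_OF_ORDERNESS = 3000L; // 3 seconds

    private long currentMaxTimestamp = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS;

    @Override
    public long extractTimestamp(SensorReading element, long previousElementTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, element.timestampMillis);
        return element.timestampMillis;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // The watermark lags the maximum seen timestamp by the allowed lateness.
        return new Watermark(currentMaxTimestamp - MAX_OUT_OF_ORDERNESS);
    }
}
```

The assigner is then attached with stream.assignTimestampsAndWatermarks(new BoundedLatenessAssigner()) on the DataStream returned by addSource.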