Write data to Pulsar (sink)

Prerequisite

You can use one of the following methods to work with the Pulsar Flink connector. In each case, you need to configure the connector before using it.

  • Client library: you can use all features of the Pulsar Flink connector (Java and Scala).

  • Scala REPL: you can use all features of the Pulsar Flink connector in interactive mode (Scala).

  • SQL client: you can use some features of the Pulsar Flink connector, since the SQL client is a beta feature of Flink (Flink SQL).

Client library

As with any Flink application, use ./bin/flink run to compile and launch your application.

Note

The path must include a protocol (for example, file://) and must be accessible on all nodes.

Example

$ ./bin/flink run
  -c com.example.entry.point.ClassName file://path/to/jars/your_jar.jar
  ...

Scala REPL

To experiment with the interactive Scala shell bin/start-scala-shell.sh, you can use --addclasspath to add pulsar-flink-connector_{{SCALA_BINARY_VERSION}}-{{PULSAR_FLINK_VERSION}}.jar directly.

Example

$ ./bin/start-scala-shell.sh remote <hostname> <portnumber>
  --addclasspath pulsar-flink-connector_{{SCALA_BINARY_VERSION}}-{{PULSAR_FLINK_VERSION}}.jar

For the full reference, see the Flink Scala REPL documentation.

SQL Client

To work with the SQL Client (beta) and write SQL queries to manipulate data in Pulsar, you can use --jar to add pulsar-flink-connector_{{SCALA_BINARY_VERSION}}-{{PULSAR_FLINK_VERSION}}.jar directly.

Example

$ ./bin/sql-client.sh embedded --jar pulsar-flink-connector_{{SCALA_BINARY_VERSION}}-{{PULSAR_FLINK_VERSION}}.jar

By default, the SQL Client reads its configuration from the environment file ./conf/sql-client-defaults.yaml. To use the Pulsar catalog in the SQL Client and have it registered automatically at startup, add the Pulsar catalog to the catalogs section in this YAML file:

catalogs:
  - name: pulsarcatalog
    type: pulsar
    default-database: tn/ns
    service-url: "pulsar://localhost:6650"
    admin-url: "http://localhost:8080"

Overview

The Pulsar Flink connector allows Flink to write data to Pulsar.

Flink’s Pulsar sink is called FlinkPulsarSink for POJO classes and FlinkPulsarRowSink for the Flink Row type.

Example

FlinkPulsarSink<Person> sink = new FlinkPulsarSink<>(
  serviceUrl,
  adminUrl,
  Optional.of(topic),      // mandatory target topic, or `Optional.empty()` to sink to a different topic per record
  props,
  TopicKeyExtractor.NULL,  // replace this to extract a key or topic for each record
  Person.class);

stream.addSink(sink);

Designate output topic

As demonstrated in the previous example, when writing data to Pulsar using FlinkPulsarSink, you have two ways to specify the output topic for each record:

Mandatory topic for all records. By specifying a topic in the third argument of the constructor (for example, Optional.of(topic)), all records generated by the Flink job are written to the same topic.

Define a topic/key extractor and get the target topic for each record separately. By implementing the TopicKeyExtractor interface, you can define how to extract a topic from a record and route the message to that topic, as in the sketch below.
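
For example, the following is a minimal sketch of a per-record topic router. The Person accessors, the tenant/namespace, and the topic naming scheme are hypothetical, and it assumes that TopicKeyExtractor<T> exposes the two methods getTopic(T) and serializeKey(T):

import java.nio.charset.StandardCharsets;

public class PersonTopicKeyExtractor implements TopicKeyExtractor<Person> {

  @Override
  public String getTopic(Person person) {
    // Route each record to a per-country topic (hypothetical naming scheme).
    return "persistent://tn/ns/persons-" + person.getCountry();
  }

  @Override
  public byte[] serializeKey(Person person) {
    // Use the person's name as the Pulsar message key.
    return person.getName().getBytes(StandardCharsets.UTF_8);
  }
}

Pass an instance of it as the fifth constructor argument in place of TopicKeyExtractor.NULL, together with Optional.empty() as the topic argument.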

Pulsar sink and fault tolerance

With Flink's checkpointing enabled, the FlinkPulsarSink and FlinkPulsarRowSink can provide at-least-once delivery guarantees.

Besides enabling Flink's checkpointing, you should also configure the setter setFlushOnCheckpoint(boolean) appropriately.

By default, setFlushOnCheckpoint(boolean) is set to true. With this enabled, Flink's checkpoints wait for any in-flight records at the time of the checkpoint to be acknowledged by Pulsar before the checkpoint succeeds. This ensures that all records before the checkpoint have been written to Pulsar.

Note

setFlushOnCheckpoint(boolean) must be enabled for the at-least-once guarantee.
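
For example, a minimal sketch that puts both pieces together; the checkpoint interval is arbitrary, and the sink arguments are the placeholders from the earlier example:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Enable Flink's checkpointing, for example every 30 seconds.
env.enableCheckpointing(30_000);

FlinkPulsarSink<Person> sink = new FlinkPulsarSink<>(
  serviceUrl,
  adminUrl,
  Optional.of(topic),
  props,
  TopicKeyExtractor.NULL,
  Person.class);

// Wait for in-flight records to be acknowledged by Pulsar on each
// checkpoint; true is already the default.
sink.setFlushOnCheckpoint(true);

stream.addSink(sink);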

Limitation

Currently, the Pulsar Flink connector provides at-least-once semantics. Consequently, when writing either streaming queries or batch queries to Pulsar, some records may be duplicated.

A possible way to remove duplicates is to introduce a primary (unique) key into the records and use it to deduplicate when reading the written data, as in the sketch below.
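
For instance, the following is a minimal, hypothetical sketch of read-side deduplication in Flink: it keys the stream by a unique id and uses keyed state to emit only the first occurrence of each key. The Person::getId accessor is an assumption:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class Deduplicator extends RichFlatMapFunction<Person, Person> {

  private transient ValueState<Boolean> seen;

  @Override
  public void open(Configuration parameters) {
    seen = getRuntimeContext().getState(
      new ValueStateDescriptor<>("seen", Boolean.class));
  }

  @Override
  public void flatMap(Person person, Collector<Person> out) throws Exception {
    if (seen.value() == null) {
      seen.update(true);    // mark this key as seen
      out.collect(person);  // emit the first occurrence only
    }                       // later duplicates are dropped
  }
}

// Usage: key the stream read from Pulsar by the unique id, then deduplicate.
// stream.keyBy(Person::getId).flatMap(new Deduplicator());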