Aug 30, 2022
5 min read

Announcing the Flink-Pulsar Sink Connector

Yufei Zhang

We are excited to announce that the Flink-Pulsar Sink Connector has been released in Flink 1.15 and is available for download and use. Read this blog to learn about the new features that enable flexible topic routing, delayed message delivery, exactly-once delivery, and flexible schema support.

What is the Flink-Pulsar Sink Connector?

The Flink-Pulsar Sink Connector is part of the Flink-Pulsar DataStream Connector. It implements Flink’s new SinkV2 API and allows you to write Flink job results back to Pulsar topics seamlessly. This sink connector, when used with the Flink-Pulsar Source Connector, enables you to define an end-to-end, exactly-once streaming pipeline.
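To make the end-to-end picture concrete, here is a minimal sketch of such a pipeline, pairing a Pulsar source with a Pulsar sink. It assumes the Flink 1.15 PulsarSource and PulsarSink builder APIs; the service URLs, topics, and subscription name are illustrative, and exactly-once delivery additionally requires transactions to be enabled on the Pulsar broker.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.pulsar.sink.PulsarSink;
import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
import org.apache.flink.connector.pulsar.source.PulsarSource;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.pulsar.client.api.SubscriptionType;

public class EndToEndPipeline {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Pulsar transactions are committed when checkpoints complete.
        env.enableCheckpointing(10_000);

        // Read from an input topic.
        PulsarSource<String> source = PulsarSource.<String>builder()
            .setServiceUrl("pulsar://localhost:6650")
            .setAdminUrl("http://localhost:8080")
            .setTopics("persistent://sample/flink/input")
            .setStartCursor(StartCursor.earliest())
            .setSubscriptionName("flink-pipeline")
            .setSubscriptionType(SubscriptionType.Exclusive)
            .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema()))
            .build();

        // Write results back to an output topic with exactly-once delivery.
        PulsarSink<String> sink = PulsarSink.<String>builder()
            .setServiceUrl("pulsar://localhost:6650")
            .setAdminUrl("http://localhost:8080")
            .setTopics("persistent://sample/flink/output")
            .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new SimpleStringSchema()))
            .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
            .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar Source")
            .sinkTo(sink);

        env.execute("End-to-End Exactly-Once Pipeline");
    }
}

With checkpointing enabled, the source's consumed positions and the sink's transactional writes are committed together on checkpoints, which is what gives the pipeline its exactly-once semantics.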

New features of the Flink-Pulsar Sink Connector

The Flink-Pulsar Sink Connector provides many useful new features:

  • Flexible topic routing strategies: The Flink-Pulsar Sink Connector is not restricted to the Pulsar client's built-in routing strategies. It allows you to provide a list of topics, partitions, or topic patterns as the destination. Routing can be based on a message key hash or on a custom routing strategy, and you can even decide on the target topic at runtime, for example by retrieving it from the message body. These options cover even the most complex routing use cases.
    For example, suppose you want to send processed results such as metric data to multiple downstream topics. Without the topic pattern mode in the Sink Connector, you would need to build a pipeline with multiple sinks. With the topic pattern mode, you specify a regex pattern and all records are sent to the topics matching that pattern. When new target topics appear, you do not need to stop the pipeline to reconfigure it; simply add the topics and the Sink Connector discovers them automatically at runtime.
    In more complicated use cases where individual records should be sent to different topics determined by field values, you can implement a custom routing strategy, as shown in the first sketch after this list.
  • Delayed message delivery: You can designate a time interval so that downstream consumers only receive the messages produced by the Flink-Pulsar Sink Connector after this interval has elapsed, as shown in the second sketch after this list.
  • Exactly-once delivery guarantee: The Flink-Pulsar Sink Connector builds on Pulsar transactions to provide an exactly-once delivery guarantee.
  • Flexible schema choice: A schema serializes Flink records to bytes before they are sent to Pulsar. You can choose a Pulsar schema, in which case the underlying Pulsar producer handles the serialization. You can instead use a Flink schema, in which case records are serialized inside Flink and sent to the Pulsar producer as raw bytes. You can also implement your own serialization logic for a specific use case. The first two options are sketched further below.
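To illustrate the custom routing strategy mentioned above, here is a minimal sketch of a custom router. It assumes the connector's TopicRouter interface; the keyword rule and topic names are purely illustrative.

import java.util.List;

import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;

// A minimal sketch: inspect each record and choose its destination topic
// from the record content rather than from a fixed routing rule.
public class KeywordTopicRouter implements TopicRouter<String> {

    @Override
    public String route(String in, String key, List<String> partitions, PulsarSinkContext context) {
        // Records mentioning "error" go to a dedicated topic; everything
        // else goes to the default events topic.
        return in.contains("error")
            ? "persistent://sample/flink/errors"
            : "persistent://sample/flink/events";
    }
}

You would then register the router on the sink builder with setTopicRouter(new KeywordTopicRouter()).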
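Similarly, here is a minimal sketch of delayed delivery, assuming the MessageDelayer helper and the setDelaySchedule() builder method described in the connector documentation; the URLs and topic are illustrative.

import java.time.Duration;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.pulsar.sink.PulsarSink;
import org.apache.flink.connector.pulsar.sink.writer.delayer.MessageDelayer;

import static org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema.flinkSchema;

// A minimal sketch: every record becomes visible to downstream consumers
// one minute after it is produced.
PulsarSink<String> delayedSink = PulsarSink.<String>builder()
    .setServiceUrl("pulsar://localhost:6650")
    .setAdminUrl("http://localhost:8080")
    .setTopics("persistent://sample/flink/delayed")
    .setSerializationSchema(flinkSchema(new SimpleStringSchema()))
    .setDelaySchedule(MessageDelayer.fixed(Duration.ofMinutes(1)))
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .build();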

The Flink-Pulsar Sink Connector allows you to customize runtime behavior while also providing out-of-the-box implementations, such as the record serializer, message router, and message delayer. For complete details, please refer to the Flink-Pulsar Sink Connector documentation.
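To make the schema choice concrete, here is a minimal sketch of the two built-in serialization routes, assuming the flinkSchema() and pulsarSchema() factory methods on the connector's PulsarSerializationSchema; builder stands for a PulsarSink builder like the one in the full example below.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.pulsar.client.api.Schema;

import static org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema.flinkSchema;
import static org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema.pulsarSchema;

// Option 1: a Flink schema. Records are serialized to bytes inside Flink
// and handed to the Pulsar producer as raw bytes.
builder.setSerializationSchema(flinkSchema(new SimpleStringSchema()));

// Option 2: a Pulsar schema. The underlying Pulsar producer performs the
// serialization using Pulsar's own schema machinery.
builder.setSerializationSchema(pulsarSchema(Schema.STRING));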

Using the Flink-Pulsar Sink Connector

To use the Flink-Pulsar Sink Connector, add the following dependency to your Flink application:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-pulsar</artifactId>
    <version>1.16-SNAPSHOT</version>
</dependency>

The following sample code illustrates how to use a data generator and the Flink-Pulsar Sink Connector to write data into Pulsar topics.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.pulsar.sink.PulsarSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import static org.apache.flink.configuration.Configuration.fromMap;
import static org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema.flinkSchema;

public class SimpleSink {

    public static void main(String[] args) throws Exception {
        // Load application configs. ApplicationConfigs, loadConfig(),
        // createEnvironment(), InfiniteSourceFunction, and FakerGenerator are
        // helpers from the streamnative/flink-example repository.
        ApplicationConfigs configs = loadConfig();

        // Create the execution environment.
        StreamExecutionEnvironment env = createEnvironment(configs);

        // Create a fake source that emits generated string records.
        InfiniteSourceFunction<String> sourceFunction =
            new InfiniteSourceFunction<>(new FakerGenerator(), 20000);
        DataStreamSource<String> source = env.addSource(sourceFunction);

        // Create the Pulsar sink with exactly-once delivery.
        PulsarSink<String> sink = PulsarSink.<String>builder()
            .setServiceUrl(configs.serviceUrl())
            .setAdminUrl(configs.adminUrl())
            .setTopics("persistent://sample/flink/simple-string")
            .setProducerName("flink-sink-%s")
            .setSerializationSchema(flinkSchema(new SimpleStringSchema()))
            .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
            .setConfig(fromMap(configs.sinkConfigs()))
            .build();

        source.sinkTo(sink);

        env.execute("Simple Pulsar Sink");
    }
}

The documentation is available on the Flink documentation page. For complete examples and demo projects, we have created the streamnative/flink-example repository, which contains detailed demo projects using the Flink-Pulsar Sink Connector as well as DataStream Source Connector and SQL Connector examples. Follow the instructions in the repository README to get up and running quickly with the examples.

If you are using the legacy Pulsar-Flink Connector, be aware that it and the Flink-Pulsar Sink Connector introduced in this blog are two different implementations, and the legacy connector will be deprecated soon. We recommend that you start using the Flink-Pulsar Sink Connector.

Get involved

The Flink-Pulsar Sink Connector is being actively maintained and will continue to evolve, providing a better Flink and Pulsar integration experience with the help of the community. To get involved with the Flink-Pulsar Connector, check out the streamnative/flink-example repository and the Flink-Pulsar Sink Connector documentation mentioned above.
