Use Apache Pulsar as Streaming Table with 8 Lines of Code

Yijie Shen
August 28, 2019

Why redesign the Pulsar Flink connector

In our previous post, we presented Apache Pulsar as a cloud-native messaging system that is designed for streaming performance and scalability, analyzed the integration status with Apache Flink 1.6, and looked forward to possible future integrations.

Apache Flink recently released version 1.9.0 with many notable features, so we reconsidered the previous integration and decided to redesign it from the ground up. In the rework, we followed these two principles:

In the following sections, we present how to use the new Pulsar Flink connector and how it is designed.

Register a Pulsar table with a minimum number of taps

For messages sent to Pulsar, we know everything about them:

Therefore, users should not have to worry about these storage details and can concentrate on their business logic. A Pulsar streaming table can be composed instantly:

    
    
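```scala
import java.util.Properties

// Assumes `tableEnv` is an existing Flink StreamTableEnvironment and that the
// connector's `Pulsar` table descriptor is imported.
val prop = new Properties()
prop.setProperty("service.url", "pulsar://....")
prop.setProperty("admin.url", "http://....")
prop.setProperty("topicsPattern", "topic-*")
tableEnv
  .connect(new Pulsar().properties(prop))
  .inAppendMode()
  .registerTableSource("table1")
```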

From then on, you can print the schema of table1, select the desired fields, and build analyses on top of table1. Behind the scenes, the connector does several tedious jobs for you:

The figure below provides some implementation details of a source task:

Components for a Pulsar source task

All source tasks share the same logic for distributing partitions among tasks. Therefore, each discoverer can independently determine whether it is responsible for a newly discovered partition and, if so, start a reader for it.
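The post does not say which assignment rule the tasks share. As a minimal sketch, assuming a hypothetical rule that hashes each partition name modulo the source parallelism, every discoverer can evaluate the same pure function and reach the same answer without coordinating with the others. `PartitionAssignment`, `ownerOf`, and `isResponsible` are illustrative names, not the connector's API.

```scala
object PartitionAssignment {
  // Hypothetical rule: map a partition name to a subtask index in [0, parallelism).
  // Math.floorMod keeps the result non-negative even when hashCode is negative.
  def ownerOf(partition: String, parallelism: Int): Int =
    Math.floorMod(partition.hashCode, parallelism)

  // Each discoverer runs this check locally for every newly discovered partition.
  def isResponsible(partition: String, subtaskIndex: Int, parallelism: Int): Boolean =
    ownerOf(partition, parallelism) == subtaskIndex

  def main(args: Array[String]): Unit = {
    val parallelism = 3
    val discovered = Seq(
      "persistent://public/default/topic-a-partition-0",
      "persistent://public/default/topic-a-partition-1",
      "persistent://public/default/topic-b-partition-0"
    )
    // Every subtask filters the same list and keeps only the partitions it owns.
    for (subtask <- 0 until parallelism) {
      val mine = discovered.filter(p => isResponsible(p, subtask, parallelism))
      println(s"subtask $subtask starts readers for: ${mine.mkString(", ")}")
    }
  }
}
```

Because the rule depends only on the partition name and the parallelism, each newly created partition is picked up by exactly one task.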

Exactly-once Pulsar source

Pulsar is built upon the log abstraction: messages in each topic/partition are durably stored in order and can be uniquely identified by a message ID. It’s replayable by nature.

Therefore, whenever a snapshot is requested from a source task for a checkpoint, the task asks each reader thread for its current reading position and adds each (topic-name, message ID) pair to the state.

When recovering from a failure, the reader threads seek to the snapshotted message IDs and re-consume all messages after them.
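The snapshot-and-replay cycle can be illustrated with a toy model in plain Scala. This is a sketch, not the connector's code: it assumes message IDs are ordered `Long` positions within a topic (real Pulsar `MessageId`s are opaque client objects), and `Log`, `Reader`, `snapshot`, and `restore` are hypothetical names.

```scala
object ExactlyOnceSourceSketch {
  // Assumption: message IDs are modeled as ordered Long positions per topic.
  type MessageId = Long

  // A replayable, ordered log standing in for one Pulsar topic/partition.
  final case class Log(messages: Vector[String]) {
    // Returns (id, payload) pairs whose id is strictly greater than `after`.
    def readAfter(after: MessageId): Vector[(MessageId, String)] =
      messages.zipWithIndex.collect { case (m, i) if i.toLong > after => (i.toLong, m) }
  }

  // One reader thread: remembers the ID of the last message it emitted (-1 = none).
  final class Reader(val topic: String, var lastEmitted: MessageId = -1L) {
    def poll(log: Log, max: Int): Vector[String] = {
      val batch = log.readAfter(lastEmitted).take(max)
      batch.lastOption.foreach { case (id, _) => lastEmitted = id }
      batch.map(_._2)
    }
  }

  // Snapshot: collect a (topic-name, message ID) pair from every reader.
  def snapshot(readers: Seq[Reader]): Map[String, MessageId] =
    readers.map(r => r.topic -> r.lastEmitted).toMap

  // Recovery: recreate readers positioned at the snapshotted IDs, so they
  // re-consume every message after those IDs.
  def restore(state: Map[String, MessageId]): Seq[Reader] =
    state.toSeq.map { case (topic, id) => new Reader(topic, id) }
}
```

For example, a reader that emitted `a` and `b` before a checkpoint snapshots position 1; if the job fails after it has also emitted `c` and `d`, the restored reader emits exactly `c` and `d` again.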

Keep messages alive until a checkpoint is finished

By default, Pulsar brokers immediately delete all messages that have been acknowledged by a consumer.

However, we cannot acknowledge messages in the reader threads immediately: the dataflow graph may fail, and we would then need to replay the sources from a message ID.

In Flink, a finished checkpoint means that all records the source emitted before the checkpoint have gone through the streaming dataflow and updated the corresponding operator states, which have also been snapshotted.

From then on, Flink doesn’t need to replay these messages anymore, and Pulsar can safely delete them to save space.

We use a durable cursor for each topic/partition to keep un-checkpointed messages alive. Whenever a checkpoint is finished, we move the cursor to the checkpointed message IDs. As shown in the figure below, the durable cursor moves forward once the checkpoint completes.

Checkpoint
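The cursor life cycle can be sketched as follows, again assuming message IDs are ordered `Long` positions. `Cursor.markDeleted` stands in for the position up to which the broker may reclaim messages, and `CheckpointTracker` with its `onSnapshot` and `onCheckpointComplete` methods is a hypothetical helper, not part of the Pulsar or Flink APIs.

```scala
import scala.collection.mutable

object DurableCursorSketch {
  // Assumption: message IDs are modeled as ordered Long positions per topic.
  type MessageId = Long

  // Stand-in for a durable cursor: messages with ID <= markDeleted are no longer
  // needed for replay and may be deleted by the broker.
  final class Cursor { var markDeleted: MessageId = -1L }

  final class CheckpointTracker(topics: Seq[String]) {
    val cursors: Map[String, Cursor] = topics.map(t => t -> new Cursor).toMap
    // Positions snapshotted for checkpoints that have not completed yet.
    private val pending = mutable.Map.empty[Long, Map[String, MessageId]]

    // Called when a snapshot is taken: remember the positions, but keep the
    // cursors where they are so the messages stay alive.
    def onSnapshot(checkpointId: Long, positions: Map[String, MessageId]): Unit =
      pending.update(checkpointId, positions)

    // Called when the checkpoint is reported complete: advance each cursor to the
    // checkpointed ID, then drop this checkpoint and any older pending ones.
    def onCheckpointComplete(checkpointId: Long): Unit =
      pending.get(checkpointId).foreach { positions =>
        positions.foreach { case (topic, id) =>
          val cursor = cursors(topic)
          cursor.markDeleted = math.max(cursor.markDeleted, id)
        }
        pending --= pending.keys.filter(_ <= checkpointId).toList
      }
  }
}
```

Positions are recorded at snapshot time but applied to the cursor only when the checkpoint completes, so a checkpoint that never completes never releases messages that might still need to be replayed.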

At-least-once Pulsar sink

When you send a message to Pulsar using sendAsync, the message is buffered in a pendingMessages queue and you get a CompletableFuture handle. You can register a callback on the handle to be notified once the send completes. Another Pulsar producer API, flush, immediately sends all messages buffered in the client and waits until all of them have been successfully persisted.

We use these two APIs in our Pulsar sink implementation to guarantee at-least-once semantics. For each record the sink receives, we send it to Pulsar with sendAsync and maintain a counter, pendingRecords, of records that have not yet been persisted.

On each checkpoint, we call flush() manually and wait for message acknowledgments from Pulsar brokers. The checkpoint is considered complete when we get all acknowledgments and pendingRecords decreases to 0, and the checkpoint is regarded as a failure if an exception occurs while persisting messages.

By default, a failing checkpoint in Flink causes an exception that results in an application restart; therefore, messages are guaranteed to be persisted at least once.
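The sink logic above can be modeled with `java.util.concurrent.CompletableFuture`. In this sketch, `FakeProducer` is a hypothetical in-memory stand-in that mimics the buffering behavior of Pulsar's `sendAsync` and `flush`; `Sink.invoke` and `Sink.snapshotState` only loosely mirror the corresponding Flink sink hooks, and the busy-wait loop is a simplification.

```scala
import java.util.concurrent.{CompletableFuture, ConcurrentLinkedQueue}
import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
import java.util.function.BiConsumer

object AtLeastOnceSinkSketch {
  // Hypothetical stand-in for a Pulsar producer: sendAsync only buffers the
  // record; flush() "persists" every buffered record and completes its future.
  final class FakeProducer(failOn: String => Boolean) {
    private val buffered = new ConcurrentLinkedQueue[(String, CompletableFuture[java.lang.Long])]()
    private val nextId = new AtomicLong(0L)

    def sendAsync(value: String): CompletableFuture[java.lang.Long] = {
      val future = new CompletableFuture[java.lang.Long]()
      buffered.add((value, future))
      future
    }

    def flush(): Unit = {
      var entry = buffered.poll()
      while (entry != null) {
        val (value, future) = entry
        if (failOn(value)) future.completeExceptionally(new RuntimeException(s"failed to persist $value"))
        else future.complete(java.lang.Long.valueOf(nextId.getAndIncrement()))
        entry = buffered.poll()
      }
    }
  }

  final class Sink(producer: FakeProducer) {
    // Number of records sent but not yet acknowledged as persisted.
    val pendingRecords = new AtomicLong(0L)
    // First persistence error seen so far (null if none).
    private val failure = new AtomicReference[Throwable](null)

    def invoke(value: String): Unit = {
      pendingRecords.incrementAndGet()
      producer.sendAsync(value).whenComplete(new BiConsumer[java.lang.Long, Throwable] {
        override def accept(id: java.lang.Long, err: Throwable): Unit = {
          if (err != null) failure.compareAndSet(null, err)
          pendingRecords.decrementAndGet()
        }
      })
    }

    // On checkpoint: flush, wait until every record is acknowledged, and fail the
    // checkpoint by throwing if any send failed.
    def snapshotState(): Unit = {
      producer.flush()
      while (pendingRecords.get() > 0) Thread.sleep(1)
      val err = failure.get()
      if (err != null) throw new RuntimeException("checkpoint failed: a record was not persisted", err)
    }
  }
}
```

Throwing from `snapshotState` is what turns a persistence error into a failed checkpoint, which in turn triggers the application restart described above.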

Future directions

We have the following plans in the pipeline.

Unified source API for both batch and streaming execution

FLIP-27 was recently brought up again as the Flink community began preparing features for 1.10. We will follow its progress and make our new connector compatible with both batch and streaming execution.

Pulsar as a state backend

Since Pulsar has a layered architecture (Streams and Segmented Streams, powered by Apache BookKeeper), it is natural to use Pulsar as a storage layer for Flink state.

Scale-out source parallelism

Currently, source parallelism is capped at the number of topic partitions. In the upcoming Pulsar 2.5.0, key-shared subscription and sticky consumers will enable us to scale out source parallelism while maintaining exactly-once semantics.

End-to-end exactly-once

One of the vital features in Pulsar 2.5.0 is transaction support. Once transactional produce is enabled, we can achieve end-to-end exactly-once semantics with a Flink two-phase commit sink.

More information
