In our previous post, we presented Apache Pulsar as a cloud-native messaging system that is designed for streaming performance and scalability, analyzed the integration status with Apache Flink 1.6, and looked forward to possible future integrations.
Recently, as Apache Flink just released version 1.9.0 with many notable features, we’ve reconsidered the previous integration and decided to redesign it from the ground up. In the rework, we followed these two principles:
Regard Table API as first-class citizens, ease its use without compromising expressiveness.
Resilient to failures with exactly-once source and at-least-once sink.
In the next sections, we would present the use and the design of the new Pulsar Flink connector.
For messages sent to Pulsar, we know everything about them:
The schema of the data, whether it is a primitive type or a record with multiple fields.
The storage format of messages in Pulsar, whether it’s AVRO, JSON or Protobuf.
Your desired metadata, such as event time and publish time.
Therefore, users are supposed to concern less on these storage details and concentrate more on their business logic. A Pulsar streaming table could be instantly composed:
val prop = new Properties() prop.setProperty("service.url", "pulsar://....") prop.setProperty("admin.urrl", "http://....") prop.setProperty("topicsPattern", "topic-*") tableEnv .connect(new Pulsar().properties(prop)) .inAppendMode() .registerTableSource("table1")
From now on, you could print the schema of the table1, select desired fields, and build analysis based on the table1. Behind the scenes, we do several tedious works for you:
Find all matching topics currently available and keep an eye on any changes while the streaming job is running.
Fetch schemas for each topic, make sure they all share one same schema; otherwise, it is meaningless to go on with analytics.
Build a scheme-specific deserializer on the initializing phase for each read thread. The deserializer knows the format of messages and converts to one Flink Row for each message.
The figure below provides some implementation details of a source task:
Each reader is a thread and responsible for one topic partition in Pulsar.
For each source task, a topic/partition discoverer thread is scheduled periodically to check newly added topics/partitions.
All source tasks share the same logic on distributing partition among tasks. Therefore, each discoverer can identify whether it is responsible for one newly come partition and start a reader accordingly.
Pulsar is built upon the log abstraction: messages in each topic/partition are durably stored in order and can be uniquely identified by a message ID. It’s replayable by nature.
Therefore, whenever a snapshot for a checkpoint is requested for a source task, the task checks all reader threads with its reading position and adds each (topic-name, message ID) pair to the state.
When recovering from failure, the reader threads seek the snapshotted message ID and re-consume all messages after it.
By default, Pulsar brokers immediately delete all messages that have been acknowledged by a consumer.
However, we cannot ack messages in reader thread immediately since the dataflow graph would fail, and we need to replay sources by message ID.
In Flink, when a checkpoint is finished, it means all records that the source had emitted before the checkpoint went through the streaming dataflow and updated the corresponding operator states which are also snapshotted.
From then on, Flink doesn’t need to replay these messages anymore, and Pulsar can safely delete them to save space.
We use a durable cursor to keep un-checkpointed messages alive for each topic/partition. Whenever a checkpoint is finished, we move the cursor to checkpointed message IDs. As shown in the figure below, the durable cursor is moved forward once the checkpoint is completed.
When you send a message to Pulsar using
sendAsync, your message will be buffered in a
pendingMessages queue, and you will get a CompletableFuture handle. You can register a callback with the handle and get notified once the sending is complete. Another Pulsar producer API
flush sends all messages buffered in the client directly and wait until all messages have been successfully persisted.
We use these two APIs in our Pulsar sink implementation to guarantee its at-least-once semantic. For each record we receive in the sink, we send it to Pulsar with
sendAsync and maintain a count
pendingRecords that has not been persistent.
On each checkpoint, we call
flush() manually and wait for message acknowledgments from Pulsar brokers. The checkpoint is considered complete when we get all acknowledgments and
pendingRecords decreases to 0, and the checkpoint is regarded as a failure if an exception occurs while persisting messages.
By default, a failing checkpoint in Flink causes an exception that results in an application restart; therefore, messages are guaranteed to be persisted at least once.
We have the following plans under our belts.
FLIP-27 is brought up again recently since Flink community starts to prepare its 1.10 features. We would stay tuned to its status and bring our new connector to batch/streaming compatible.
Since Pulsar has a layered architecture (Streams and Segmented Streams, powered by Apache Bookkeeper), it becomes natural to use Pulsar as a storage layer and store Flink state.
Currently, source parallelism has an upper limit to the number of topic partitions. For the upcoming Pulsar 2.5.0, key-shared subscription and sticky consumer enables us to scale-out source parallelism while maintaining the semantics of exact-once.
One of the vital features in Pulsar 2.5.0 is transaction support. Once transactional produce is enabled, we could achieve end-to-end exactly-once with Flink two-phase commit sink.