Aug 28, 2019
4 min

Use Apache Pulsar as Streaming Table with 8 Lines of Code

Yijie Shen

Why redesign the Pulsar Flink connector

In our previous post, we presented Apache Pulsar as a cloud-native messaging system that is designed for streaming performance and scalability, analyzed the integration status with Apache Flink 1.6, and looked forward to possible future integrations.

Recently, as Apache Flink just released version 1.9.0 with many notable features, we've reconsidered the previous integration and decided to redesign it from the ground up. In the rework, we followed these two principles:

  • Regard the Table API as a first-class citizen and make it easy to use without compromising expressiveness.
  • Be resilient to failures, with an exactly-once source and an at-least-once sink.

In the following sections, we present the usage and design of the new Pulsar Flink connector.

Register a Pulsar table with a minimal number of keystrokes

We already know everything about the messages sent to Pulsar:

  • The schema of the data, whether it is a primitive type or a record with multiple fields.
  • The storage format of messages in Pulsar, whether it's AVRO, JSON or Protobuf.
  • Your desired metadata, such as event time and publish time.

Therefore, users should be able to worry less about these storage details and concentrate more on their business logic. A Pulsar streaming table can be composed instantly:

    import java.util.Properties

    val prop = new Properties()
    prop.setProperty("service.url", "pulsar://....")
    prop.setProperty("admin.url", "http://....")
    prop.setProperty("topicsPattern", "topic-*")

    tableEnv
      .connect(new Pulsar().properties(prop))
      .inAppendMode()
      .registerTableSource("table1")

From now on, you can print the schema of table1, select the fields you need, and build analytics on top of it (see the sketch after the list below). Behind the scenes, we do several tedious tasks for you:

  • Find all matching topics currently available and keep an eye on any changes while the streaming job is running.
  • Fetch the schema for each topic and make sure all topics share the same schema; otherwise, it is meaningless to continue with the analysis.
  • Build a schema-specific deserializer during the initialization phase of each reader thread. The deserializer knows the format of the messages and converts each message into a Flink Row.
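
For example, once table1 is registered, you can inspect and query it directly with Flink's Table API. Below is a minimal sketch; the field names 'word and 'eventTime are hypothetical and depend on the schema of your topics:

    import org.apache.flink.table.api.scala._   // enables the 'symbol expression syntax

    val table1 = tableEnv.scan("table1")
    table1.printSchema()                         // inspect the fields derived from the topic schema

    // Select and filter on hypothetical fields; use the names printed above.
    val filtered = table1
      .select('word, 'eventTime)
      .filter('word === "pulsar")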

The figure below provides some implementation details of a source task:

  • Each reader is a thread responsible for one topic partition in Pulsar.
  • For each source task, a topic/partition discoverer thread is scheduled periodically to check newly added topics/partitions.
  • All source tasks share the same logic for distributing partitions among tasks. Therefore, each discoverer can decide whether it is responsible for a newly discovered partition and start a reader accordingly (a simplified sketch of this rule follows the list).
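
The exact distribution logic lives in the connector, but conceptually it can be as simple as a deterministic hash of the partition name modulo the source parallelism. A simplified, hypothetical sketch:

    // Every task evaluates the same rule, so exactly one task ends up owning a
    // newly discovered partition and starts a reader thread for it.
    def ownedByThisTask(partitionName: String,
                        taskIndex: Int,
                        parallelism: Int): Boolean =
      math.abs(partitionName.hashCode) % parallelism == taskIndex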

Exactly-once Pulsar source

Pulsar is built upon the log abstraction: messages in each topic/partition are durably stored in order and can be uniquely identified by a message ID. It's replayable by nature.

Therefore, whenever a checkpoint snapshot is requested for a source task, the task asks each reader thread for its current reading position and adds the (topic-name, message ID) pairs to the state.

When recovering from a failure, the reader threads seek to the snapshotted message IDs and re-consume all messages after them.
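
Conceptually, the snapshotted state is just a map from topic/partition to the last message ID read. A rough sketch, using hypothetical helper types (the real connector stores this in Flink's operator state):

    import org.apache.pulsar.client.api.MessageId

    // Hypothetical handle describing one reader thread's position.
    final case class ReaderPosition(topicPartition: String, currentMessageId: MessageId)

    // On checkpoint: collect a (topic-partition, message ID) pair from every reader.
    def snapshotPositions(readers: Seq[ReaderPosition]): Map[String, MessageId] =
      readers.map(r => r.topicPartition -> r.currentMessageId).toMap

    // On recovery, each reader seeks back to its snapshotted position, e.g. with
    // org.apache.pulsar.client.api.Reader#seek(MessageId), and re-reads from there.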

Keep messages alive until a checkpoint is finished

By default, Pulsar brokers immediately delete all messages that have been acknowledged by a consumer.

However, we cannot acknowledge messages in the reader threads immediately, because the dataflow graph could fail and we would then need to replay the source from the snapshotted message IDs.

In Flink, when a checkpoint is finished, it means all records that the source emitted before the checkpoint have gone through the streaming dataflow and updated the corresponding operator states, which are also snapshotted.

From then on, Flink doesn't need to replay these messages anymore, and Pulsar can safely delete them to save space.

We use a durable cursor to keep un-checkpointed messages alive for each topic/partition. Whenever a checkpoint is finished, we move the cursor to the checkpointed message ID. As shown in the figure below, the durable cursor is moved forward once the checkpoint is completed.
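
One way to picture the mechanism is a durable subscription whose cursor is only advanced, via a cumulative acknowledgement, after a checkpoint completes; everything behind the cursor stays retained by the broker. The sketch below is illustrative and uses the plain Pulsar consumer API; the actual connector may manage the cursor differently:

    import org.apache.pulsar.client.api.{MessageId, PulsarClient, SubscriptionType}

    val client = PulsarClient.builder()
      .serviceUrl("pulsar://localhost:6650")       // placeholder service URL
      .build()

    // The durable subscription pins un-checkpointed messages on the broker.
    val cursorHolder = client.newConsumer()
      .topic("topic-1")                            // hypothetical topic
      .subscriptionName("flink-durable-cursor")    // hypothetical subscription name
      .subscriptionType(SubscriptionType.Failover)
      .subscribe()

    // Called when Flink signals that a checkpoint is complete: everything up to
    // and including checkpointedId may now be deleted by the broker.
    def onCheckpointComplete(checkpointedId: MessageId): Unit =
      cursorHolder.acknowledgeCumulative(checkpointedId)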

Checkpoint illustration

At-least-once Pulsar sink

When you send a message to Pulsar using sendAsync, the message is buffered in a pendingMessages queue and you get a CompletableFuture handle. You can register a callback with the handle and get notified once the send is complete. Another Pulsar producer API, flush, sends all messages buffered in the client immediately and waits until all of them have been successfully persisted.

We use these two APIs in our Pulsar sink implementation to guarantee its at-least-once semantics. For each record we receive in the sink, we send it to Pulsar with sendAsync and maintain a counter, pendingRecords, of the records that have not yet been persisted.

On each checkpoint, we call flush() manually and wait for message acknowledgments from the Pulsar brokers. The checkpoint is considered complete only when all acknowledgments arrive and pendingRecords drops to 0; it is regarded as failed if an exception occurs while persisting messages.

By default, a failing checkpoint in Flink causes an exception that results in an application restart; therefore, messages are guaranteed to be persisted at least once.
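
Putting these pieces together, the core of the sink looks roughly like the sketch below (Flink plumbing, serialization, and error handling are omitted; all names are illustrative):

    import java.util.concurrent.atomic.AtomicLong
    import java.util.function.BiConsumer

    import org.apache.pulsar.client.api.{MessageId, Producer, PulsarClient, Schema}

    class PulsarSinkSketch(serviceUrl: String, topic: String) {

      private val client: PulsarClient =
        PulsarClient.builder().serviceUrl(serviceUrl).build()
      private val producer: Producer[String] =
        client.newProducer(Schema.STRING).topic(topic).create()

      // Records sent with sendAsync but not yet acknowledged by the brokers.
      private val pendingRecords = new AtomicLong(0)
      @volatile private var failure: Throwable = _

      // Called for every incoming record.
      def invoke(value: String): Unit = {
        pendingRecords.incrementAndGet()
        producer.sendAsync(value).whenComplete(new BiConsumer[MessageId, Throwable] {
          override def accept(id: MessageId, error: Throwable): Unit = {
            if (error != null) failure = error       // remember the failure for the next checkpoint
            else pendingRecords.decrementAndGet()    // one more record safely persisted
          }
        })
      }

      // Called when a checkpoint is triggered: flush everything still buffered and
      // fail the checkpoint if any send failed or is still outstanding.
      def snapshotState(): Unit = {
        producer.flush()
        if (failure != null) throw failure
        require(pendingRecords.get() == 0, "not all records were persisted before the checkpoint")
      }
    }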

Future directions

We have the following plans on our roadmap.

Unified source API for both batch and streaming execution

FLIP-27 has been brought up again recently as the Flink community starts to prepare features for 1.10. We will stay tuned to its status and make our new connector compatible with both batch and streaming execution.

Pulsar as a state backend

Since Pulsar has a layered architecture (Streams and Segmented Streams, powered by Apache BookKeeper), it is natural to use Pulsar as a storage layer and store Flink state in it.

Scale-out source parallelism

Currently, source parallelism is limited by the number of topic partitions. With the key-shared subscription and sticky consumer coming in Pulsar 2.5.0, we will be able to scale out source parallelism beyond the number of partitions while maintaining exactly-once semantics.

End-to-end exactly-once

One of the vital features coming in Pulsar 2.5.0 is transaction support. Once transactional produce is available, we can achieve end-to-end exactly-once semantics with Flink's two-phase commit sink.

Yijie Shen
Yijie Shen works on the Apache Pulsar ecosystem at StreamNative, driving development and strengthening the cooperation between Apache Pulsar and other thriving open-source projects such as Apache Flink and Apache Spark. His main areas of expertise are query execution optimization in big data analysis systems, data storage, and query optimization for stream storage systems.
