Jun 14, 2021

Exactly-Once Semantics with Transactions in Pulsar

Penghui Li
Engineering Lead of Messaging Team, StreamNative

We have hit an exciting milestone for the Apache Pulsar community: exactly-once semantics. As part of the Pulsar 2.8 release, we have evolved exactly-once semantics from guaranteed message deduplication on a single topic to atomic produce and acknowledgement across multiple topics via the Transaction API. In this post, I’ll explain what this means, how we got here, and how the transaction features in Pulsar simplify exactly-once semantics for building messaging and streaming applications.

Before diving into the transaction features, let’s get started with an overview of messaging semantics.

What is exactly-once semantics?

In any distributed system, the machines that form the system can always fail independently of one another. In Apache Pulsar, an individual broker or bookie can crash, or a network failure can happen while the producer is producing a message to a topic. Depending on how the producer handles such a failure, the application can get one of three different semantics.

At-least-once Semantics

If the producer receives an acknowledgement (ACK) from the Pulsar broker, it means that the message has been written to the Pulsar topic. However, if a producer times out on receiving an acknowledgement or receives an error from the Pulsar broker, it might retry sending the message to the Pulsar topic. If the broker had failed right before it sent the ACK but after the message was successfully written to the Pulsar topic, this reattempt leads to the message being written twice and delivered more than once to the consumers.

At-most-once Semantics

If the producer does not retry when it times out on receiving an acknowledgement or receives an error, the message might end up not being written to the Pulsar topic and not being delivered to the consumers. In some cases, to avoid the possibility of duplication, we accept that some messages will not be written.

Exactly-once Semantics

Exactly-once semantics guarantees that even if a producer retries sending a message multiple times, the message is written to the Pulsar topic exactly once. Exactly-once semantics is the most desirable guarantee, but also one that is not well understood. It requires coordination between the messaging system itself and the application producing and consuming the messages. For example, if your application consumes and acknowledges a message successfully and then rewinds the subscription to a previous message ID, it will receive all the messages from that message ID to the latest one all over again.

Challenges in supporting exactly-once semantics

Supporting exactly-once delivery semantics in messaging systems presents some challenges. To describe them, I’ll start with a simple example.

Suppose there is a producer that sends a message “Hello StreamNative” to a Pulsar topic called “Greetings”. Further suppose a consumer on the other end receives messages from the topic and prints them. In a happy path where there are no failures, this works well, and the message “Hello StreamNative” is written to the “Greetings” topic only once. The consumer receives the message, processes it, and acknowledges it to indicate that it has completed its processing. The consumer will not receive the message again, even if the consumer application crashes and restarts.

However, at scale, failure scenarios can happen all the time.

A bookie can fail

Pulsar stores messages in BookKeeper. BookKeeper is a highly available, durable log storage service in which data written to a ledger (a segment of a Pulsar topic) is persisted and replicated to n bookies. As a result, BookKeeper can tolerate n-1 bookie failures, meaning that a ledger is available as long as at least one of the bookies holding it is available. Inspired by Zab/Paxos, BookKeeper’s replication protocol guarantees that once data has been successfully written to a quorum of bookies, it is permanently stored and will be replicated to all bookies within the same ensemble.

A broker can fail or the producer-to-broker connection can fail

Durability in Pulsar depends on the producer receiving an ACK from the Pulsar broker. Failure to receive that ACK does not necessarily mean that the produce request itself failed. The broker can crash after writing a message but before it sends an ACK back to the producer. It can also crash before even writing the message to the topic. Since there is no way for the producer to know the nature of the failure, it is forced to assume that the message was not written successfully and to retry it. In some cases, the same message is duplicated in the Pulsar topic, causing the consumers to receive it more than once.
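To make the ambiguity concrete, here is a minimal sketch in plain Java. It is a toy model and not Pulsar code: `LostAckDemo`, `FlakyTopic`, `Fault`, and the two send helpers are hypothetical names invented for this illustration. The simulated broker fails once, either before or after storing the message, and the producer sees the same missing ACK in both cases.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a lost ACK. Not Pulsar code; every name here is hypothetical. */
final class LostAckDemo {

    /** Where the simulated broker crashes while handling the first request. */
    enum Fault { NONE, BEFORE_WRITE, AFTER_WRITE }

    /** In-memory stand-in for a topic served by a broker that fails once. */
    static final class FlakyTopic {
        final List<String> log = new ArrayList<>();
        private Fault nextFault;

        FlakyTopic(Fault firstFault) {
            this.nextFault = firstFault;
        }

        /** Returns true if the producer received an ACK, false if it timed out. */
        boolean publish(String message) {
            Fault fault = nextFault;
            nextFault = Fault.NONE;            // only the first request is affected
            if (fault == Fault.BEFORE_WRITE) {
                return false;                  // crashed before the write: nothing stored
            }
            log.add(message);
            return fault != Fault.AFTER_WRITE; // crashed after the write: stored, ACK lost
        }
    }

    /** At-least-once: keep retrying until an ACK arrives. */
    static void sendWithRetry(FlakyTopic topic, String message, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (topic.publish(message)) {
                return;
            }
        }
        throw new IllegalStateException("no ACK after " + maxAttempts + " attempts");
    }

    /** At-most-once: send a single time and ignore a missing ACK. */
    static void sendWithoutRetry(FlakyTopic topic, String message) {
        topic.publish(message);
    }

    public static void main(String[] args) {
        for (Fault fault : new Fault[] {Fault.BEFORE_WRITE, Fault.AFTER_WRITE}) {
            FlakyTopic retried = new FlakyTopic(fault);
            sendWithRetry(retried, "Hello StreamNative", 3);
            FlakyTopic notRetried = new FlakyTopic(fault);
            sendWithoutRetry(notRetried, "Hello StreamNative");
            System.out.println(fault + ": retry stores " + retried.log.size()
                    + " copies, no retry stores " + notRetried.log.size());
        }
    }
}
```

In this model, retrying is correct when the broker failed before the write but stores two copies when it failed after the write, while not retrying is correct in the second case but loses the message in the first. Neither policy is right for both failures, which is why the broker itself has to help.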

The Pulsar client can fail

Exactly-once delivery must account for client failures as well. However, it is hard to tell whether a client has actually failed or is just temporarily partitioned from the Pulsar brokers or undergoing an application pause. Being able to distinguish between a permanent failure and a soft one is important. The Pulsar broker should discard messages sent by a zombie producer, and the same applies to a zombie consumer. Once a new client instance has started, it must be able to recover from whatever state the failed client left behind and begin processing from a safe point.

The Pulsar community has completed support for exactly-once semantics in steps. We first introduced the Idempotent Producer to support exactly-once semantics on a single topic in the Pulsar 1.20.0-incubating release, and then completed the vision by introducing the Transaction API to provide atomicity across multiple topics in the recent 2.8.0 release.

Idempotent producer: exactly-once semantics on a single topic

We started the journey of supporting exactly-once semantics in Pulsar by introducing the Idempotent Producer in the 1.20.0-incubating release.

What does Idempotent Producer mean? An idempotent operation can be performed once or many times without causing a different result. If Guaranteed Message Deduplication is enabled at the cluster level or the namespace level and a producer is configured to be an Idempotent Producer, its produce requests are idempotent. If an error causes the producer to retry, a message that the producer sends multiple times is guaranteed to be written to the Pulsar topic only once.

To turn on this feature and get exactly-once semantics per partition - meaning no duplicates, no data loss, and in-order semantics - configure the following:

  • Enable message deduplication for all namespaces/topics at the cluster level, or for a specific namespace at the namespace policy level, or for a specific topic at the topic policy level
  • Specify a name for the producer and set the send timeout to 0

How does this feature work? Under the hood, it works in a way very similar to TCP: each message produced to Pulsar contains a sequence ID that the Pulsar broker uses to discard duplicate messages. However, unlike TCP, which provides guarantees only within a transient connection, this sequence ID is persisted to the Pulsar topic along with the message, and the Pulsar broker keeps track of the last received sequence ID. So even if the Pulsar broker fails, any broker that takes over ownership of the topic will also know whether a message is a duplicate. The mechanism adds negligible performance overhead compared with a non-idempotent producer.
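The idea can be sketched in a few lines of plain Java. This is a toy model under simplifying assumptions, not the broker's actual implementation: `SequenceIdDedupDemo`, `DedupTopic`, and `Entry` are hypothetical names, and broker failover is modeled simply by rebuilding the last-seen sequence IDs from the entries already stored in the log.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of sequence-ID deduplication. Not Pulsar code; all names are hypothetical. */
final class SequenceIdDedupDemo {

    /** A stored message together with the producer name and sequence ID it arrived with. */
    static final class Entry {
        final String producerName;
        final long sequenceId;
        final String payload;

        Entry(String producerName, long sequenceId, String payload) {
            this.producerName = producerName;
            this.sequenceId = sequenceId;
            this.payload = payload;
        }
    }

    static final class DedupTopic {
        final List<Entry> log;
        // Highest sequence ID accepted so far, per producer name.
        private final Map<String, Long> lastSequenceId = new HashMap<>();

        DedupTopic() {
            this(new ArrayList<>());
        }

        /** Models a broker taking over the topic: state is rebuilt from the persisted log. */
        DedupTopic(List<Entry> persistedLog) {
            this.log = persistedLog;
            for (Entry entry : persistedLog) {
                lastSequenceId.merge(entry.producerName, entry.sequenceId, Long::max);
            }
        }

        /** Returns true if the message was appended, false if it was dropped as a duplicate. */
        boolean publish(String producerName, long sequenceId, String payload) {
            long last = lastSequenceId.getOrDefault(producerName, -1L);
            if (sequenceId <= last) {
                return false; // already stored: the retry can be ACKed without writing again
            }
            log.add(new Entry(producerName, sequenceId, payload));
            lastSequenceId.put(producerName, sequenceId);
            return true;
        }
    }

    public static void main(String[] args) {
        DedupTopic topic = new DedupTopic();
        topic.publish("greeter", 0, "Hello StreamNative");
        topic.publish("greeter", 0, "Hello StreamNative");        // retry: dropped

        DedupTopic afterFailover = new DedupTopic(topic.log);      // new owner of the topic
        afterFailover.publish("greeter", 0, "Hello StreamNative"); // retry: still dropped
        afterFailover.publish("greeter", 1, "Hello again");
        System.out.println(afterFailover.log.size() + " messages stored");
    }
}
```

The sketch also suggests why the producer needs a stable name: the sequence IDs are tracked per producer name, so a restarted producer can only be recognized if it reuses the same one.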

You can try out this feature in any Pulsar version newer than 1.20.0-incubating by following the tutorial here.

While powerful, the Idempotent Producer solves only a narrow set of the challenges of exactly-once semantics. There are still many others it doesn’t resolve. For example, there is no atomicity when a producer attempts to produce messages to multiple topics. A publish error can occur when the broker serving one of the topics crashes. If the producer doesn’t retry publishing the message, some messages are persisted once and others are lost. If the producer retries, some messages are persisted multiple times.

On the consumer side, message acknowledgement was a best-effort operation. ACKs can be lost because the consumer has no way of knowing whether the broker received them and does not resend them. This results in consumers receiving duplicate messages.

Transactions: atomic writes and acknowledgments across multiple topics

To address the remaining challenges described above, we’ve strengthened Pulsar’s delivery semantics by introducing the Pulsar Transaction API, which supports atomic writes and acknowledgments across multiple topics. It allows a producer to send a batch of messages to multiple topics such that either all messages in the batch are eventually visible to any consumer or none are ever visible to consumers. It also lets you acknowledge messages consumed from multiple topics in the same transaction as the messages you produce, thereby enabling end-to-end exactly-once semantics.

Here is an example code snippet that demonstrates the use of the Transaction API:

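```java
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.transaction.Transaction;

// Transactions must be enabled when the client is built.
PulsarClient pulsarClient = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .enableTransaction(true)
        .build();
// Open a transaction that times out after one minute.
Transaction txn = pulsarClient
        .newTransaction()
        .withTransactionTimeout(1, TimeUnit.MINUTES)
        .build()
        .get();
// producer and consumer (Producer<byte[]>, Consumer<byte[]>) are assumed to exist already.
// Produce a message as part of the transaction.
producer.newMessage(txn).value("Hello Pulsar Transaction".getBytes()).send();
// Acknowledge a received message in the same transaction.
Message<byte[]> message = consumer.receive();
consumer.acknowledgeAsync(message.getMessageId(), txn).get();
// Commit: the produced message and the acknowledgement take effect together.
txn.commit().get();
```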

The code example above shows how you can use the new producer API with the Transaction API to send messages atomically to a set of topics, and the new consumer API to acknowledge the processed messages in the same transaction.

It is worth noting that:

  • A Pulsar topic might have some messages that are part of a transaction while others are not.
  • A Pulsar client can have multiple concurrent transactions outstanding. This design is fundamentally different from the transaction implementations in older messaging systems, and results in much higher throughput.
  • The current Pulsar Transaction API supports only the READ_COMMITTED isolation level. The consumer can read only messages that are not part of a transaction and messages that are part of a committed transaction. Messages produced in an aborted transaction are not delivered to any consumer.
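The visibility rules in the list above can be illustrated with a small in-memory model. This is a sketch in plain Java under simplifying assumptions, not how the broker is implemented: `ReadCommittedDemo`, `Cluster`, `Entry`, and `TxnState` are hypothetical names, and the model covers only which messages a consumer may see, not ordering, storage, or transaction timeouts.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of READ_COMMITTED visibility. Not Pulsar code; all names are hypothetical. */
final class ReadCommittedDemo {

    enum TxnState { OPEN, COMMITTED, ABORTED }

    /** A stored message; txnId is null for non-transactional messages. */
    static final class Entry {
        final Long txnId;
        final String payload;

        Entry(Long txnId, String payload) {
            this.txnId = txnId;
            this.payload = payload;
        }
    }

    static final class Cluster {
        private final Map<String, List<Entry>> topics = new HashMap<>();
        private final Map<Long, TxnState> txns = new HashMap<>();
        private long nextTxnId = 0;

        long begin() {
            long txnId = nextTxnId++;
            txns.put(txnId, TxnState.OPEN);
            return txnId;
        }

        /** Non-transactional publish: visible immediately. */
        void publish(String topic, String payload) {
            append(topic, null, payload);
        }

        /** Transactional publish: stored now, visible only after commit. */
        void publish(String topic, long txnId, String payload) {
            requireOpen(txnId);
            append(topic, txnId, payload);
        }

        void commit(long txnId) {
            requireOpen(txnId);
            txns.put(txnId, TxnState.COMMITTED);
        }

        void abort(long txnId) {
            requireOpen(txnId);
            txns.put(txnId, TxnState.ABORTED);
        }

        /** READ_COMMITTED: non-transactional messages plus those of committed transactions. */
        List<String> read(String topic) {
            List<String> visible = new ArrayList<>();
            for (Entry entry : topics.getOrDefault(topic, Collections.emptyList())) {
                if (entry.txnId == null || txns.get(entry.txnId) == TxnState.COMMITTED) {
                    visible.add(entry.payload);
                }
            }
            return visible;
        }

        private void append(String topic, Long txnId, String payload) {
            topics.computeIfAbsent(topic, t -> new ArrayList<>()).add(new Entry(txnId, payload));
        }

        private void requireOpen(long txnId) {
            if (txns.get(txnId) != TxnState.OPEN) {
                throw new IllegalStateException("transaction " + txnId + " is not open");
            }
        }
    }

    public static void main(String[] args) {
        Cluster cluster = new Cluster();
        long txn = cluster.begin();
        cluster.publish("orders", txn, "order-1");
        cluster.publish("payments", txn, "payment-1");
        cluster.publish("orders", "plain");
        System.out.println(cluster.read("orders")); // only the non-transactional message
        cluster.commit(txn);
        System.out.println(cluster.read("orders") + " " + cluster.read("payments"));
    }
}
```

Before the commit, a reader of either topic sees none of the transactional messages; after it, the messages on both topics become visible together. Calling `abort` instead would leave them hidden permanently.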

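To use the Transaction API, the only setting you need in the Pulsar client is enableTransaction(true) on the client builder, as in the snippet above. Transactions must also be enabled on the brokers.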

End-to-end exactly-once stream processing made simple: a Pulsar+Flink Example

Exactly-once stream processing is now possible through the Pulsar Transaction API.

One of the most critical questions for a stream processing system is, “Does my stream processing application get the right answer, even if one of the instances crashes in the middle of processing?” The key, when recovering a failed instance, is to resume processing in exactly the same state as before the crash.

Stream processing on Apache Pulsar is a read-process-write operation on Pulsar topics. A source operator that runs a Pulsar consumer reads messages from one or more Pulsar topics, processing operators transform the messages or modify the state they maintain, and a sink operator that runs a Pulsar producer writes the resulting messages to another Pulsar topic. Exactly-once stream processing is simply the ability to execute a read-process-write operation exactly once. In this context, “getting the right answer” means not missing any input messages from the source operator and not producing any duplicates to the sink operator. This is the behavior users expect from an exactly-once stream processor.

Let’s take the Pulsar and Flink integration as an example.

Prior to Pulsar 2.8.0, the Pulsar and Flink integration supported only an exactly-once source connector and an at-least-once sink connector. That means that if you wanted to use Flink to build streaming applications with Apache Pulsar, the highest end-to-end processing guarantee you could get was at-least-once: the resulting messages from these streaming applications could be written multiple times to the output topic in Pulsar.

With the introduction of Pulsar transactions in 2.8.0, the Pulsar-Flink sink connector can easily be enhanced to support exactly-once semantics. Because Flink uses a two-phase commit protocol to ensure end-to-end exactly-once semantics, we can implement the designated TwoPhaseCommitSinkFunction and hook the Flink sink message lifecycle up to the Pulsar Transaction API. When the Pulsar-Flink sink connector calls beginTransaction, it starts a Pulsar transaction and obtains the transaction ID. All subsequent messages written to the sink connector are associated with this transaction ID. They are flushed to Pulsar when the connector calls preCommit. The Pulsar transaction is then committed or aborted when the connector calls recoverAndCommit or recoverAndAbort, respectively. The integration is very straightforward: the connector just has to persist the transaction ID together with the Flink checkpoints so that the transaction ID can be retrieved later for commit or abort.
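The lifecycle described above can be sketched without any Flink or Pulsar dependencies. In the toy model below, the `Sink` methods borrow the hook names mentioned in the text (beginTransaction, preCommit, recoverAndCommit, recoverAndAbort), but `TwoPhaseCommitSinkDemo`, `FakePulsar`, and `Sink` are hypothetical stand-ins written for this illustration, and their signatures are simplified rather than taken from either project.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a two-phase commit sink. Not Flink or Pulsar code; all names are hypothetical. */
final class TwoPhaseCommitSinkDemo {

    /** Stand-in for the Pulsar side: holds transactional messages until commit. */
    static final class FakePulsar {
        final List<String> visible = new ArrayList<>();
        private final Map<Long, List<String>> pending = new HashMap<>();
        private long nextTxnId = 0;

        long newTransaction() {
            long txnId = nextTxnId++;
            pending.put(txnId, new ArrayList<>());
            return txnId;
        }

        void send(long txnId, String message) {
            pending.get(txnId).add(message);
        }

        /** Idempotent: committing an already finished transaction does nothing. */
        void commit(long txnId) {
            List<String> messages = pending.remove(txnId);
            if (messages != null) {
                visible.addAll(messages);
            }
        }

        void abort(long txnId) {
            pending.remove(txnId);
        }
    }

    /** Stand-in for the sink connector; holds only in-memory state that is lost on a crash. */
    static final class Sink {
        private final FakePulsar pulsar;
        private final Map<Long, List<String>> buffered = new HashMap<>();

        Sink(FakePulsar pulsar) {
            this.pulsar = pulsar;
        }

        long beginTransaction() {
            long txnId = pulsar.newTransaction();
            buffered.put(txnId, new ArrayList<>());
            return txnId;
        }

        /** Associates an outgoing message with the current transaction. */
        void invoke(long txnId, String value) {
            buffered.get(txnId).add(value);
        }

        /** Phase one: flush everything written under this transaction to Pulsar. */
        void preCommit(long txnId) {
            for (String value : buffered.remove(txnId)) {
                pulsar.send(txnId, value);
            }
        }

        /** Phase two after a restart: finish a transaction whose ID came from a checkpoint. */
        void recoverAndCommit(long txnId) {
            pulsar.commit(txnId);
        }

        void recoverAndAbort(long txnId) {
            pulsar.abort(txnId);
        }
    }

    public static void main(String[] args) {
        FakePulsar pulsar = new FakePulsar();
        Sink sink = new Sink(pulsar);
        long txnId = sink.beginTransaction();
        sink.invoke(txnId, "result-1");
        sink.invoke(txnId, "result-2");
        sink.preCommit(txnId);
        long checkpointedTxnId = txnId;            // persisted with the checkpoint

        Sink restarted = new Sink(pulsar);         // the original sink instance crashed
        restarted.recoverAndCommit(checkpointedTxnId);
        restarted.recoverAndCommit(checkpointedTxnId); // a repeated call changes nothing
        System.out.println(pulsar.visible);
    }
}
```

The point of the sketch is that the restarted sink needs nothing but the checkpointed transaction ID to finish the job, and that a repeated commit is harmless because the stand-in treats it as a no-op.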

Based on idempotency and atomicity provided by Pulsar Transactions and the globally consistent checkpoint algorithm offered by Apache Flink, the streaming applications built on Pulsar and Flink can easily achieve end-to-end exactly-once semantics.

Where to go from here

Exactly-once semantics via Transaction API is now supported in StreamNative Cloud as well as in StreamNative Platform v1.0 and later. If you’d like to understand the exactly-once guarantees in more detail, I’d recommend checking out PIP-31 for the transaction feature. If you’d like to dive deeper into the detailed design, this design document is worth reading.

This post focuses primarily on the user-facing guarantees provided by the Transaction API introduced in Apache Pulsar 2.8.0, and on how you can use the feature. In our next post, we will go into more detail about the API and its design.

If you want to put the new Transaction API to practical use, check out StreamNative Cloud or download StreamNative Platform 1.0 to create your own applications with Pulsar Java clients.

My colleagues Sijie Guo and Addison Higham gave a presentation, “Exactly-Once Made Easy: Transactional Messaging in Apache Pulsar”, at the Pulsar Virtual Summit North America 2021.

Credits

An amazing team of Pulsar committers and contributors worked for over a year to bring this awesome exactly-once work to Pulsar. Thanks to everyone who has been involved in developing this feature: Penghui Li, Ran Gao, Bo Cong, Addison Higham, Jia Zhai, Yong Zhang, Xiaolong Ran, Matteo Merli, and Sijie Guo.

Penghui Li
Penghui Li is passionate about helping organizations to architect and implement messaging services. Prior to StreamNative, Penghui was a Software Engineer at Zhaopin.com, where he was the leading Pulsar advocate and helped the company adopt and implement the technology. He is an Apache Pulsar Committer and PMC member. Penghui lives in Beijing, China.
