Sep 29, 2022
9 min read

A Deep Dive into Transaction Coordinators in Apache Pulsar

Penghui Li
Engineering Lead of Messaging Team, StreamNative
Bo Cong
Apache Pulsar

In a previous blog, we introduced the core components of Pulsar transactions, the transaction API, and the transaction data flow. In the subsequent blogs, we will introduce the details of each component, covering the transaction coordinator, transaction buffer, and pending ack state.

This blog gives you a comprehensive understanding of the transaction coordinator, including its design logic and transaction logs. It is a core component of Pulsar transactions and guarantees their integrity.

What is the transaction coordinator?

The transaction coordinator (TC) manages the entire lifecycle of transactions and makes sure they function as expected. The transaction coordinator handles transaction timeouts and ensures that the transaction is aborted after a timeout.

Additionally, the transaction coordinator guarantees the durability of transactions. It records all metadata changes of a transaction and interacts with the topic owner broker to complete the transaction.

Transaction ID and transaction coordinator ID

In Pulsar, each transaction is identified with a 128-bit transaction ID (TxnID). The highest 16 bits are reserved for the transaction coordinator ID and the remaining bits are used by the TC to generate monotonically increasing numbers.

Figure 1. Transaction ID

A Pulsar cluster can have multiple transaction coordinators. You can use their IDs to identify different transaction coordinators.

The transaction coordinator is responsible for generating transaction IDs. The generated IDs are persisted in the transaction log, which is an internal component of the transaction coordinator.
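
To make the ID layout more concrete, here is a minimal Java sketch of the bit split described above: 16 bits for the TC ID and the remaining 112 bits for a per-coordinator sequence. The class `TxnIdSketch`, its methods, and the `firstSequence` parameter are hypothetical names for this illustration, and Pulsar's own classes may represent the ID differently.

```java
import java.math.BigInteger;
import java.util.concurrent.atomic.AtomicLong;

/** Illustrative only: packs a 16-bit TC ID and a 112-bit sequence into 128 bits. */
final class TxnIdSketch {
    static final int SEQUENCE_BITS = 112; // 128 bits minus the 16-bit TC ID
    static final BigInteger SEQUENCE_MASK =
            BigInteger.ONE.shiftLeft(SEQUENCE_BITS).subtract(BigInteger.ONE);

    private TxnIdSketch() {}

    /** Places the TC ID in the highest 16 bits and the sequence in the rest. */
    static BigInteger encode(int tcId, long sequence) {
        if (tcId < 0 || tcId > 0xFFFF) {
            throw new IllegalArgumentException("TC ID must fit in 16 bits: " + tcId);
        }
        if (sequence < 0) {
            throw new IllegalArgumentException("sequence must be non-negative: " + sequence);
        }
        return BigInteger.valueOf(tcId).shiftLeft(SEQUENCE_BITS).or(BigInteger.valueOf(sequence));
    }

    static int tcIdOf(BigInteger txnId) {
        return txnId.shiftRight(SEQUENCE_BITS).intValueExact();
    }

    static BigInteger sequenceOf(BigInteger txnId) {
        return txnId.and(SEQUENCE_MASK);
    }

    /** Hands out monotonically increasing IDs for a single coordinator. */
    static final class Generator {
        private final int tcId;
        private final AtomicLong nextSequence;

        // Assumption: after a restart, firstSequence would be recovered from the transaction log.
        Generator(int tcId, long firstSequence) {
            this.tcId = tcId;
            this.nextSequence = new AtomicLong(firstSequence);
        }

        BigInteger next() {
            return encode(tcId, nextSequence.getAndIncrement());
        }
    }
}
```

With this layout, IDs generated by the same coordinator compare in the order they were created, and the owning coordinator can be read back from any ID with `tcIdOf`.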

Transaction metadata

After a new transaction opens, the client will publish, consume, and acknowledge messages on topics/partitions with this transaction. During the process, the TC needs to know which topics the client has interacted with (for example, by publishing or acknowledging messages). This tells the TC which brokers it needs to talk to when completing the transaction.

On the client side, after a new topic/partition joins the transaction, the client will send a transaction metadata change request to the TC to add the newly created partition or acked partition. The TC then persists the metadata change into the transaction log, which guarantees that the transaction can be recovered in case of a failure.

The transaction metadata contains:

  • Transaction ID
  • Transaction status (for example, OPEN, COMMITTING, and COMMITTED)
  • Created partitions
  • Acknowledged partitions and subscriptions
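
As a rough illustration, the metadata listed above could be modeled as a small in-memory object. This is a sketch under assumptions, not Pulsar's actual class: `TxnMetaSketch`, its method names, and the `partition@subscription` string encoding are all hypothetical.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

/** Illustrative in-memory view of one transaction's metadata. */
final class TxnMetaSketch {
    enum Status { OPEN, COMMITTING, COMMITTED, ABORTING, ABORTED }

    private final String txnId;
    private Status status = Status.OPEN;
    // Partitions the client published to (listed above as "created partitions").
    private final Set<String> producedPartitions = new LinkedHashSet<>();
    // Acknowledged partitions, each paired with its subscription as "partition@subscription".
    private final Set<String> ackedPartitions = new LinkedHashSet<>();

    TxnMetaSketch(String txnId) {
        this.txnId = txnId;
    }

    void addProducedPartition(String partition) {
        requireOpen();
        producedPartitions.add(partition);
    }

    void addAckedPartition(String partition, String subscription) {
        requireOpen();
        ackedPartitions.add(partition + "@" + subscription);
    }

    void setStatus(Status newStatus) {
        this.status = newStatus;
    }

    // In this sketch, partitions can only join a transaction while it is still open.
    private void requireOpen() {
        if (status != Status.OPEN) {
            throw new IllegalStateException(txnId + " is " + status);
        }
    }

    String txnId() { return txnId; }
    Status status() { return status; }
    Set<String> producedPartitions() { return Collections.unmodifiableSet(producedPartitions); }
    Set<String> ackedPartitions() { return Collections.unmodifiableSet(ackedPartitions); }
}
```

Using sets means that a repeated metadata change request for the same partition leaves the metadata unchanged.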

Complete a transaction

The client will commit a transaction if everything goes well in the transaction or abort it if any errors occur. At this point, the transaction is coming to an end. The TC will change the transaction state to COMMITTING or ABORTING and then interact with the owner brokers of the related topics to ensure the transaction ends successfully. After the transaction is completed, the TC changes the transaction state to COMMITTED or ABORTED. Since the TC retries until the transaction is completed, the complete operation must be reentrant: receiving the same commit or abort request more than once must have the same effect as receiving it once.
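
The state changes and the retry behavior can be sketched as follows. This is a simplified model under assumptions: `EndTxnSketch` and the `PartitionOwner` interface are hypothetical stand-ins for the TC and the owner brokers, and a single call to `end` makes one pass over the owners, leaving it to the caller to retry.

```java
import java.util.List;

/** Illustrative end-of-transaction flow with caller-driven retries. */
final class EndTxnSketch {
    enum Status { OPEN, COMMITTING, COMMITTED, ABORTING, ABORTED }

    /** Hypothetical stand-in for the owner broker of a partition. */
    interface PartitionOwner {
        // Must be safe to call more than once for the same transaction.
        void endTxn(String txnId, boolean commit) throws Exception;
    }

    private final String txnId;
    private final List<PartitionOwner> owners;
    private Status status = Status.OPEN;

    EndTxnSketch(String txnId, List<PartitionOwner> owners) {
        this.txnId = txnId;
        this.owners = owners;
    }

    /** One attempt to finish the transaction; call again while the result is COMMITTING or ABORTING. */
    Status end(boolean commit) {
        Status inProgress = commit ? Status.COMMITTING : Status.ABORTING;
        Status done = commit ? Status.COMMITTED : Status.ABORTED;
        if (status == done) {
            return status; // already finished, so a repeated request is a no-op
        }
        if (status != Status.OPEN && status != inProgress) {
            throw new IllegalStateException(txnId + " is already " + status);
        }
        status = inProgress;
        for (PartitionOwner owner : owners) {
            try {
                owner.endTxn(txnId, commit);
            } catch (Exception e) {
                return status; // stay in the in-progress state so the caller can retry
            }
        }
        status = done;
        return status;
    }
}
```

Note that a retry sends the request again to every owner, including those that already handled it. That is exactly why the operation on the broker side needs to be reentrant.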

Transaction log

Add

The transaction log topic stores the metadata changes of a transaction instead of the actual messages in the transaction. The messages are stored in topic partitions. A transaction can be in various states such as OPEN, COMMITTING, and COMMITTED. It is the state and associated metadata that are stored in the transaction log.

Figure 2. Transaction log - Add

Essentially, the transaction log is a Pulsar system topic in the pulsar/system namespace. Each TC has an independent transaction log.

Delete

When a transaction operation is added to the log, the storage position of the operation is returned. The positions of all log entries for a transaction are kept in memory. When the transaction status changes to COMMITTED or ABORTED, the transaction's lifecycle has completed. Therefore, the transaction's metadata can be deleted from the log based on the log positions stored in memory.

Figure 3. Transaction log - Delete
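
The add and delete paths can be illustrated with a small in-memory model. In this sketch, a sorted map stands in for the log topic, and `TxnLogSketch` with its `append` and `deleteTxn` methods are hypothetical names, not Pulsar APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative transaction log: append returns a position, delete uses remembered positions. */
final class TxnLogSketch {
    // position -> entry; stands in for the coordinator's log topic
    private final TreeMap<Long, String> log = new TreeMap<>();
    // txnId -> positions of all entries written for that transaction (memory only)
    private final Map<String, List<Long>> positionsByTxn = new HashMap<>();
    private long nextPosition = 0;

    /** Appends one metadata change and returns the position where it was stored. */
    long append(String txnId, String change) {
        long position = nextPosition++;
        log.put(position, txnId + " " + change);
        positionsByTxn.computeIfAbsent(txnId, k -> new ArrayList<>()).add(position);
        return position;
    }

    /** Deletes every entry of a finished transaction; returns how many entries were removed. */
    int deleteTxn(String txnId) {
        List<Long> positions = positionsByTxn.remove(txnId);
        if (positions == null) {
            return 0;
        }
        for (Long position : positions) {
            log.remove(position);
        }
        return positions.size();
    }

    int size() {
        return log.size();
    }
}
```

A caller would invoke `deleteTxn` once it has appended the COMMITTED or ABORTED entry for that transaction.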

Recover

When the broker goes down, or the coordinator restarts due to the load balancing strategy, the coordinator needs to restore the metadata information of the transaction.

Keep in mind that if the status of a transaction is COMMITTED or ABORTED, you can delete the log directly. If the transaction is in the COMMITTING or ABORTING state, the coordinator first needs to finish the commit or abort operation in the transaction buffer (TB) and the transaction pending ack (TP), and then change the transaction status to COMMITTED or ABORTED. After that, you can delete the log of this transaction.

Figure 4. Transaction log - Recover

As shown in Figure 4, the commit operation for Txn-0 still needs to be performed in TB and TP. Txn-1 has ended, so the log entries for Txn-1 can be deleted. Txn-2 is still in the OPEN state.
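
The recovery rules can be sketched as a replay over the log in which the last recorded status of each transaction decides what to do next. `TxnRecoverySketch`, `LogEntry`, and the `Action` names are hypothetical and only serve this illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative recovery: replay the log, then decide one action per transaction. */
final class TxnRecoverySketch {
    enum Status { OPEN, COMMITTING, COMMITTED, ABORTING, ABORTED }
    enum Action { KEEP_OPEN, RESUME_COMMIT, RESUME_ABORT, DELETE_LOG }

    static final class LogEntry {
        final String txnId;
        final Status status;

        LogEntry(String txnId, Status status) {
            this.txnId = txnId;
            this.status = status;
        }
    }

    private TxnRecoverySketch() {}

    static Map<String, Action> recover(List<LogEntry> entriesInLogOrder) {
        // Later entries overwrite earlier ones, so the map ends with the latest status.
        Map<String, Status> latest = new LinkedHashMap<>();
        for (LogEntry entry : entriesInLogOrder) {
            latest.put(entry.txnId, entry.status);
        }
        Map<String, Action> actions = new LinkedHashMap<>();
        for (Map.Entry<String, Status> e : latest.entrySet()) {
            actions.put(e.getKey(), actionFor(e.getValue()));
        }
        return actions;
    }

    static Action actionFor(Status status) {
        switch (status) {
            case COMMITTED:
            case ABORTED:
                return Action.DELETE_LOG;    // lifecycle finished, log entries can go
            case COMMITTING:
                return Action.RESUME_COMMIT; // finish the commit in TB and TP
            case ABORTING:
                return Action.RESUME_ABORT;  // finish the abort in TB and TP
            default:
                return Action.KEEP_OPEN;     // the client may still use the transaction
        }
    }
}
```

For a log that matches Figure 4, this would resume the commit for Txn-0, delete the log entries of Txn-1, and keep Txn-2 open.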

Transaction timeout

In Pulsar, each transaction can have its own unique timeout. If the client does not commit or abort the transaction within the timeout period, the TC will abort the transaction. In this case, the client will not be allowed to commit or abort this transaction again.
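
A minimal sketch of this behavior, assuming a tracker that stores one deadline per open transaction. `TxnTimeoutSketch` is a hypothetical name, and the current time is passed in explicitly so that the example stays deterministic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Illustrative per-transaction timeout tracking with an explicit clock. */
final class TxnTimeoutSketch {
    // open transaction -> absolute deadline in milliseconds
    private final Map<String, Long> deadlines = new HashMap<>();

    void open(String txnId, long nowMillis, long timeoutMillis) {
        deadlines.put(txnId, nowMillis + timeoutMillis);
    }

    /** Aborts every open transaction whose deadline has passed and returns their IDs. */
    List<String> expire(long nowMillis) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = deadlines.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() <= nowMillis) {
                expired.add(entry.getKey());
                it.remove();
            }
        }
        return expired;
    }

    /** Client-initiated commit or abort; returns false if the transaction is no longer open. */
    boolean end(String txnId, long nowMillis) {
        expire(nowMillis);
        return deadlines.remove(txnId) != null;
    }
}
```

Once a transaction has been expired, a later `end` call for it returns `false`, which mirrors the rule that the client can no longer commit or abort it.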

Low watermark

As transactions continue to be created and ended, the TC always knows the smallest ongoing transaction. This means all the transactions with a lower transaction ID have already ended. We call it the “transaction coordinator low watermark”, which can be used to optimize data clean-up.

Figure 5. Transaction Coordinator low watermark

As shown in Figure 5, before Txn-0 is committed, the low watermark is -1. After Txn-1 is aborted and Txn-0 is committed, the low watermark changes to Txn-1. After Txn-2 is committed, the low watermark changes to Txn-2.

The low watermark is sent to the owner broker of the original topic when a transaction completes. Because TB and TP do not know the status of every transaction, a message or ack request they receive may belong to a transaction that has already ended. The low watermark lets TB and TP clean up these stale transactions: if a transaction ID is less than the low watermark, the transaction is aborted directly in TB or TP.
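
The progression in Figure 5 can be reproduced with a small tracker that follows the smallest transaction that is still open. This is a sketch under assumptions: `LowWatermarkSketch` is a hypothetical name, and only the sequence part of the transaction ID within a single TC is modeled.

```java
import java.util.TreeSet;

/** Illustrative low watermark for one coordinator, using only the sequence part of the ID. */
final class LowWatermarkSketch {
    private final TreeSet<Long> open = new TreeSet<>();
    private long nextSequence = 0;
    private long lowWatermark = -1; // -1 until the earliest transaction has ended

    /** Opens a new transaction and returns its sequence number. */
    long begin() {
        long sequence = nextSequence++;
        open.add(sequence);
        return sequence;
    }

    /** Ends a transaction (commit or abort) and returns the updated low watermark. */
    long end(long sequence) {
        if (!open.remove(sequence)) {
            throw new IllegalArgumentException("not an open transaction: " + sequence);
        }
        // Everything below the smallest open transaction has ended. If nothing is
        // open, every transaction created so far has ended.
        lowWatermark = open.isEmpty() ? nextSequence - 1 : open.first() - 1;
        return lowWatermark;
    }

    long lowWatermark() {
        return lowWatermark;
    }
}
```

Replaying Figure 5 with three transactions: ending Txn-1 first leaves the watermark at -1 because Txn-0 is still open, ending Txn-0 moves it to 1, and ending Txn-2 moves it to 2.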

Transaction coordinator assignment

The transaction coordinator is designed as a separate module running inside a Pulsar broker. A broker can run multiple transaction coordinators. By design, they could run outside of brokers as well, but the current implementation only allows them to run within brokers.

By default, a Pulsar cluster has 16 transaction coordinators. You can change it through the --initial-num-transaction-coordinators option when initializing the Pulsar cluster metadata.

Note that transaction coordinators will be available for scale-up through the Pulsar Admin API in 2.11.0.

How to assign transaction coordinators to brokers

Pulsar uses the existing topic ownership mechanism to assign transaction coordinators. Each TC has a “virtual” topic transaction_coordinator_assign_{TCID} for the assignment. For example, if the topic transaction_coordinator_assign_1 is assigned to broker A, it means TC-1 will start on broker A.

Figure 6. Transaction Coordinator assignment

The client finds the broker address through the topic lookup mechanism. If the client wants to commit a transaction with transaction ID (1:10), the client will first find the owner broker of the topic transaction_coordinator_assign_1 and then send the transaction commit command to that broker directly. The client does not issue a lookup request for each transaction operation. Instead, it keeps a cache and only redoes the lookup after the ownership of the TC topic changes.
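
A rough sketch of the client-side cache described above. `TcLookupSketch` and the `topicLookup` function are hypothetical; the function stands in for Pulsar's real topic lookup, and the topic name follows the transaction_coordinator_assign_{TCID} pattern used in this blog.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Illustrative client-side cache from TC ID to the broker that owns its assignment topic. */
final class TcLookupSketch {
    static final String ASSIGN_TOPIC_PREFIX = "transaction_coordinator_assign_";

    // Hypothetical lookup: assignment topic name -> broker address.
    private final Function<String, String> topicLookup;
    private final Map<Integer, String> brokerByTc = new ConcurrentHashMap<>();

    TcLookupSketch(Function<String, String> topicLookup) {
        this.topicLookup = topicLookup;
    }

    static String assignTopic(int tcId) {
        return ASSIGN_TOPIC_PREFIX + tcId;
    }

    /** Returns the cached broker for this TC, doing a lookup only on a cache miss. */
    String brokerFor(int tcId) {
        return brokerByTc.computeIfAbsent(tcId, id -> topicLookup.apply(assignTopic(id)));
    }

    /** Drops the cached entry so that the next request triggers a fresh lookup. */
    void onOwnershipChanged(int tcId) {
        brokerByTc.remove(tcId);
    }
}
```

For the transaction (1:10), the client would call `brokerFor(1)` and send the commit command to the returned broker. Later operations on transactions of TC-1 reuse the cached address until `onOwnershipChanged(1)` is called.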

Summary

This blog explains the concept of transaction coordinators and how Pulsar assigns them to brokers. It provides details about the transaction log, which stores all the transaction metadata changes for transaction durability. It also introduces transaction timeout and low watermark. The latter is a key metric that can be used for data clean-up.

The blog is focused on the transaction coordinator itself, not on the client and transaction coordinator interaction. If you are interested in the complete transaction process, see this blog about transaction details in Pulsar.

In future blogs, we will talk more about other transaction components, such as the transaction buffer.

Penghui Li
Penghui Li is passionate about helping organizations to architect and implement messaging services. Prior to StreamNative, Penghui was a Software Engineer at Zhaopin.com, where he was the leading Pulsar advocate and helped the company adopt and implement the technology. He is an Apache Pulsar Committer and PMC member. Penghui lives in Beijing, China.
Bo Cong
Bo Cong is an Apache Pulsar committer and a software engineer at StreamNative.
