A Deep Dive into Transaction Coordinators in Apache Pulsar
In a previous blog, we introduced the core components of Pulsar transactions, the transaction API, and the transaction data flow. In subsequent blogs, we will cover each component in detail: the transaction coordinator, the transaction buffer, and the pending ack state.
This blog gives you a comprehensive understanding of the transaction coordinator, including its design logic and transaction log. The transaction coordinator is a core component of Pulsar transactions and guarantees their integrity.
What is the transaction coordinator?
The transaction coordinator (TC) manages the entire lifecycle of transactions, from the moment a transaction opens until it is committed or aborted. It also handles transaction timeouts, making sure that a transaction is aborted once its timeout expires.
Additionally, the transaction coordinator guarantees the durability of transactions. It records all metadata changes of a transaction and interacts with the topic owner broker to complete the transaction.
Transaction ID and transaction coordinator ID
In Pulsar, each transaction is identified by a 128-bit transaction ID (TxnID). The highest 16 bits are reserved for the transaction coordinator ID, and the TC uses the remaining bits to generate monotonically increasing numbers.
A Pulsar cluster can have multiple transaction coordinators, each identified by its transaction coordinator ID.
The transaction coordinator is responsible for generating transaction IDs. The generated IDs are persisted in the transaction log, which is an internal component of the transaction coordinator.
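To make the layout concrete, here is a minimal Python sketch of packing and unpacking a TxnID and generating IDs for a single coordinator. The names `TxnID`, `TxnIDGenerator`, and `next_id` are hypothetical, not Pulsar's API. The sketch assumes exactly the 16-bit/112-bit split described above, and the starting sequence number is passed in by hand instead of being recovered from a real transaction log.

```python
from dataclasses import dataclass
import itertools

TC_ID_BITS = 16                  # highest 16 bits: transaction coordinator ID
SEQ_BITS = 128 - TC_ID_BITS      # remaining 112 bits: per-TC sequence number
SEQ_MASK = (1 << SEQ_BITS) - 1


@dataclass(frozen=True, order=True)
class TxnID:
    tc_id: int
    seq: int

    def to_int(self) -> int:
        """Pack into one 128-bit integer: TC ID on top, sequence below."""
        return (self.tc_id << SEQ_BITS) | self.seq

    @staticmethod
    def from_int(value: int) -> "TxnID":
        """Split a 128-bit integer back into (TC ID, sequence)."""
        return TxnID(value >> SEQ_BITS, value & SEQ_MASK)


class TxnIDGenerator:
    """Hypothetical generator owned by one TC; not a Pulsar class."""

    def __init__(self, tc_id: int, start: int = 0):
        if not 0 <= tc_id < (1 << TC_ID_BITS):
            raise ValueError("TC ID must fit in 16 bits")
        self.tc_id = tc_id
        # `start` stands in for the next sequence number, which a real TC
        # would derive from what it persisted in its transaction log.
        self._counter = itertools.count(start)

    def next_id(self) -> TxnID:
        seq = next(self._counter)
        if seq > SEQ_MASK:
            raise OverflowError("sequence exhausted its 112 bits")
        return TxnID(self.tc_id, seq)


gen = TxnIDGenerator(tc_id=1, start=10)
first = gen.next_id()
print(first, hex(first.to_int()))
```

Because the TC ID occupies the top bits, IDs issued by different coordinators never collide, and the ID itself tells a client which coordinator owns the transaction.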
Transaction metadata
After a new transaction opens, the client publishes, consumes, and acknowledges messages on topics/partitions within this transaction. During this process, the TC needs to know which topics the client has interacted with (for example, by publishing or acknowledging messages), because this tells the TC which brokers it needs to talk to when completing the transaction.
On the client side, when a new topic/partition joins the transaction, the client sends a transaction metadata change request to the TC to add the partition it publishes to or acknowledges messages on. The TC then persists the metadata change into the transaction log, which guarantees that the transaction can be recovered in case of a failure.
The transaction metadata contains:
- Transaction ID
- Transaction status (for example, OPEN, COMMITTING, and COMMITTED)
- Produced partitions (partitions the transaction has published messages to)
- Acknowledged partitions and subscriptions
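The following Python sketch models the metadata record listed above and the add-partition requests that update it. `TxnMetadataStore`, its methods, and the tuple-shaped log entries are hypothetical; a plain list stands in for the transaction log. Persisting each change before applying it in memory is our assumption about ordering, not a detail taken from Pulsar's code.

```python
from dataclasses import dataclass, field
from enum import Enum


class TxnStatus(Enum):
    OPEN = "OPEN"
    COMMITTING = "COMMITTING"
    COMMITTED = "COMMITTED"
    ABORTING = "ABORTING"
    ABORTED = "ABORTED"


@dataclass
class TxnMeta:
    txn_id: tuple                                # (tc_id, seq)
    status: TxnStatus = TxnStatus.OPEN
    produced: set = field(default_factory=set)   # partitions published to
    acked: set = field(default_factory=set)      # (partition, subscription)


class TxnMetadataStore:
    """Hypothetical TC-side store; a Python list stands in for the log."""

    def __init__(self):
        self.txns = {}
        self.log = []

    def _persist(self, *entry):
        # Assumption: write the change to the log before applying it in
        # memory, so recovery never misses state the TC acted on.
        self.log.append(entry)

    def _open_meta(self, txn_id):
        meta = self.txns[txn_id]
        if meta.status is not TxnStatus.OPEN:
            raise ValueError(f"transaction {txn_id} is not open")
        return meta

    def open(self, txn_id):
        self._persist("OPEN", txn_id)
        self.txns[txn_id] = TxnMeta(txn_id)

    def add_produced_partition(self, txn_id, partition):
        meta = self._open_meta(txn_id)
        if partition not in meta.produced:       # only log real changes
            self._persist("ADD_PARTITION", txn_id, partition)
            meta.produced.add(partition)

    def add_acked_partition(self, txn_id, partition, subscription):
        meta = self._open_meta(txn_id)
        if (partition, subscription) not in meta.acked:
            self._persist("ADD_SUBSCRIPTION", txn_id, partition, subscription)
            meta.acked.add((partition, subscription))

    def participants(self, txn_id):
        """Partitions whose owner brokers the TC contacts on completion."""
        meta = self.txns[txn_id]
        return meta.produced | {p for p, _ in meta.acked}
```

`participants()` is what the TC would consult at completion time to decide which owner brokers to contact.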
Complete a transaction
The client commits a transaction if everything goes well, or aborts it if any error occurs. At this point, the transaction is coming to an end. The TC changes the transaction state to COMMITTING or ABORTING and then interacts with the owner brokers of the related topics to make sure the transaction ends successfully. After the transaction is completed, the TC changes the transaction state to COMMITTED or ABORTED. Because the TC retries completion when it fails, the complete operation must be reentrant: running it again must have the same effect as running it once.
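A minimal Python sketch of the completion flow and its reentrancy follows. `TxnCompleter`, `end`, and the `send` callback are hypothetical; `send` stands in for the TC-to-broker request and is assumed to return `True` once the owner broker confirms. A retry skips participants that already confirmed, and a call after the final state is reached is a no-op.

```python
from enum import Enum


class TxnStatus(Enum):
    OPEN = "OPEN"
    COMMITTING = "COMMITTING"
    COMMITTED = "COMMITTED"
    ABORTING = "ABORTING"
    ABORTED = "ABORTED"


class TxnCompleter:
    """Hypothetical completion logic for one transaction on the TC."""

    def __init__(self, participants):
        self.status = TxnStatus.OPEN
        self.participants = list(participants)
        self.confirmed = set()   # participants whose broker confirmed the end

    def end(self, commit, send):
        """Drive the transaction to COMMITTED/ABORTED; safe to call repeatedly.

        `send(partition, commit)` is a stand-in for the TC-to-broker request
        and returns True once that partition's owner broker confirms.
        """
        middle = TxnStatus.COMMITTING if commit else TxnStatus.ABORTING
        final = TxnStatus.COMMITTED if commit else TxnStatus.ABORTED
        if self.status is final:
            return self.status            # already done: a retry is a no-op
        if self.status is TxnStatus.OPEN:
            self.status = middle          # a real TC would log this change first
        elif self.status is not middle:
            raise RuntimeError(f"cannot end: transaction is {self.status.value}")
        for p in self.participants:
            # Skip partitions that already confirmed; only retry the rest.
            if p not in self.confirmed and send(p, commit):
                self.confirmed.add(p)
        if self.confirmed == set(self.participants):
            self.status = final
        return self.status
```

The TC-side bookkeeping is only half of reentrancy: a confirmation can be lost after the broker has already applied the commit or abort, so the broker-side handling must also tolerate receiving the same request twice.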
Transaction log
Add
The transaction log topic stores the metadata changes of a transaction, not the actual messages in the transaction; the messages themselves are stored in the topic partitions. A transaction can be in various states, such as OPEN, COMMITTING, and COMMITTED, and it is this state and the associated metadata that the transaction log stores.
Essentially, the transaction log is a Pulsar system topic in the pulsar/system namespace. Each TC has an independent transaction log.
Delete
When a transaction operation is added to the log, the log returns the position where the operation is stored. The TC keeps the positions of all log entries for each transaction in memory. When the transaction status changes to COMMITTED or ABORTED, the transaction's lifecycle has completed, so the TC can delete the transaction's metadata from the log using the positions stored in memory.
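The add and delete steps can be sketched in Python as follows. `TransactionLog` and its methods are hypothetical, and a dictionary keyed by integer position stands in for the system topic. The sketch only illustrates remembering each transaction's positions in memory and deleting by position once the transaction reaches COMMITTED or ABORTED.

```python
from collections import defaultdict


class TransactionLog:
    """Hypothetical in-memory stand-in for a TC's transaction log topic."""

    def __init__(self):
        self.entries = {}                   # position -> (txn_id, operation)
        self.positions = defaultdict(list)  # txn_id -> positions, kept in memory
        self._next_pos = 0

    def append(self, txn_id, operation):
        """Append an operation and return the position it was stored at."""
        pos = self._next_pos
        self._next_pos += 1
        self.entries[pos] = (txn_id, operation)
        self.positions[txn_id].append(pos)
        return pos

    def update_status(self, txn_id, status):
        self.append(txn_id, ("STATUS", status))
        if status in ("COMMITTED", "ABORTED"):
            # Lifecycle finished: drop every entry of this transaction using
            # the positions remembered in memory.
            for pos in self.positions.pop(txn_id):
                del self.entries[pos]
```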
Recover
When a broker goes down, or a coordinator is restarted because of the load balancing strategy, the coordinator needs to restore the transaction metadata from its transaction log.
During recovery, if the status of a transaction is COMMITTED or ABORTED, its log can be deleted directly. If the transaction is in the COMMITTING or ABORTING state, the TC first needs to complete the commit or abort operation in the transaction buffer (TB) and the transaction pending ack (TP), and then change the transaction status to COMMITTED or ABORTED. After that, the log of this transaction can be deleted.
As shown in Figure 4, Txn-0 still needs to complete its commit operation in TB and TP. Txn-1 has ended, so the log about Txn-1 can be deleted. Txn-2 is in the OPEN state, so it is restored as an ongoing transaction.
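The recovery rules above can be sketched as a single replay pass. This Python sketch is illustrative only: it assumes each log entry is a simple `(txn_id, status)` pair, and `recover` and the `complete` callback (standing in for the commit or abort in TB and TP) are hypothetical names.

```python
def recover(entries, complete):
    """Rebuild TC state by replaying (txn_id, status) log entries in order.

    `complete(txn_id, commit)` is a hypothetical callback that finishes the
    commit or abort in the transaction buffer (TB) and pending ack (TP).
    Returns (ongoing transaction IDs, transaction IDs whose log can be deleted).
    """
    latest = {}
    for txn_id, status in entries:
        latest[txn_id] = status              # later entries override earlier ones

    ongoing, deletable = set(), []
    for txn_id, status in latest.items():
        if status in ("COMMITTED", "ABORTED"):
            deletable.append(txn_id)         # already finished
        elif status in ("COMMITTING", "ABORTING"):
            complete(txn_id, status == "COMMITTING")
            # The status would now be moved to COMMITTED/ABORTED.
            deletable.append(txn_id)
        else:
            ongoing.add(txn_id)              # OPEN: restore as in-flight
    return ongoing, deletable
```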
Transaction timeout
In Pulsar, each transaction can have its own timeout. If the client does not commit or abort the transaction within the timeout period, the TC aborts it. After that, the client is no longer allowed to commit or abort the transaction.
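Here is a minimal Python sketch of per-transaction timeouts. `TxnTimeouts`, `open`, `end`, and `tick` are hypothetical, time is passed in explicitly as a number, and keeping deadlines in a min-heap is our own choice for the sketch, not necessarily how Pulsar schedules timeouts.

```python
import heapq


class TxnTimeouts:
    """Hypothetical timeout tracker; a min-heap of deadlines is our choice."""

    def __init__(self):
        self._deadlines = []  # (deadline, txn_id), earliest deadline first
        self.status = {}      # txn_id -> "OPEN" | "COMMITTED" | "ABORTED"

    def open(self, txn_id, now, timeout):
        # Each transaction carries its own timeout.
        self.status[txn_id] = "OPEN"
        heapq.heappush(self._deadlines, (now + timeout, txn_id))

    def end(self, txn_id, commit):
        """Client commit/abort; returns False if the txn is no longer open."""
        if self.status.get(txn_id) != "OPEN":
            return False
        self.status[txn_id] = "COMMITTED" if commit else "ABORTED"
        return True

    def tick(self, now):
        """Abort every still-open transaction whose deadline has passed."""
        aborted = []
        while self._deadlines and self._deadlines[0][0] <= now:
            _, txn_id = heapq.heappop(self._deadlines)
            if self.status[txn_id] == "OPEN":
                self.status[txn_id] = "ABORTED"
                aborted.append(txn_id)
        return aborted
```

Once `tick` has aborted a transaction, a later `end` call for it returns `False`, mirroring the rule that the client can no longer commit or abort it.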
Low watermark
As transactions are continuously created and ended, the TC always knows the smallest ongoing transaction, which means that all transactions with a lower transaction ID have already ended. We call the highest transaction ID in this fully ended range the “transaction coordinator low watermark”; it can be used to optimize data clean-up.
As shown in Figure 5, before Txn-0 is committed, the low watermark is -1. After Txn-1 is aborted and Txn-0 is committed, the low watermark changes to Txn-1. After Txn-2 is committed, the low watermark changes to Txn-2.
When a transaction completes, the low watermark is carried to the owner brokers of the topics involved. TB and TP do not know the status of every transaction, so a message or ack request they receive may belong to a transaction that has already ended. The low watermark lets TB and TP clean up such useless transactions: if a transaction ID is less than or equal to the low watermark, the transaction can be aborted directly in TB or TP.
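The following Python sketch computes the low watermark as in the Figure 5 example and shows the check TB or TP would apply. `LowWatermark` and `should_abort` are hypothetical names, sequence numbers are plain integers within a single coordinator, and the rule that any ID less than or equal to the watermark belongs to an ended transaction is inferred from the Figure 5 example.

```python
class LowWatermark:
    """Hypothetical low-watermark tracker for one coordinator."""

    def __init__(self):
        self.ongoing = set()   # sequence numbers of open transactions
        self.next_seq = 0      # next sequence number to hand out

    def open(self):
        seq = self.next_seq
        self.next_seq += 1
        self.ongoing.add(seq)
        return seq

    def end(self, seq):
        # Called when a transaction is committed or aborted.
        self.ongoing.discard(seq)

    def value(self):
        """Highest ID such that it and every lower ID have ended (-1 if none)."""
        if self.ongoing:
            return min(self.ongoing) - 1
        return self.next_seq - 1


def should_abort(seq, low_watermark):
    """TB/TP check: a request for an ID at or below the watermark is stale."""
    return seq <= low_watermark
```

Replaying the Figure 5 sequence (open Txn-0, Txn-1, Txn-2; abort Txn-1; commit Txn-0; commit Txn-2) moves the value from -1 to 1 to 2. Taking `min` over a set is linear in the number of ongoing transactions; a sorted structure would make this cheaper when many transactions are in flight.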
Transaction coordinator assignment
The transaction coordinator is designed as a separate module running inside a Pulsar broker, and a broker can run multiple transaction coordinators. The design also allows coordinators to run outside of brokers, but the current implementation only supports running them within brokers.
By default, a Pulsar cluster has 16 transaction coordinators. You can change it through the --initial-num-transaction-coordinators option when initializing the Pulsar cluster metadata.
Note that scaling up the number of transaction coordinators through the Pulsar Admin API will be available in Pulsar 2.11.0.
How to assign transaction coordinators to brokers
Pulsar uses the existing topic ownership mechanism to assign transaction coordinators. Each TC has a “virtual” topic, transaction_coordinator_assign_{TCID}, for the assignment. For example, if the topic transaction_coordinator_assign_1 is assigned to broker A, TC-1 starts on broker A.
The client finds the broker address through the topic lookup mechanism. If the client wants to commit a transaction with transaction ID (1:10), it first finds the owner broker of the topic transaction_coordinator_assign_1 and then sends the transaction commit command to that broker directly. The client does not send a lookup request for every transaction operation. Instead, it caches the result and redoes the lookup only after ownership of the TC topic changes.
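The client-side lookup and cache can be sketched in Python as below. `TcBrokerCache`, `broker_for`, `invalidate`, and the injected `lookup` function are hypothetical stand-ins for the client's topic lookup; only the `transaction_coordinator_assign_{TCID}` naming comes from the text above. The sketch assumes the client learns when TC topic ownership changes and then calls `invalidate`.

```python
TC_TOPIC_PREFIX = "transaction_coordinator_assign_"


def tc_topic(tc_id):
    """Name of the virtual topic whose owner broker hosts TC `tc_id`."""
    return f"{TC_TOPIC_PREFIX}{tc_id}"


class TcBrokerCache:
    """Hypothetical client-side cache from TC ID to owner broker address."""

    def __init__(self, lookup):
        self._lookup = lookup  # stand-in for topic lookup: topic -> broker
        self._cache = {}

    def broker_for(self, txn_id):
        tc_id = txn_id[0]      # TxnID written as (tc_id, seq), e.g. (1, 10)
        if tc_id not in self._cache:
            self._cache[tc_id] = self._lookup(tc_topic(tc_id))
        return self._cache[tc_id]

    def invalidate(self, tc_id):
        # Called when ownership of the TC topic changes; the next request
        # for this TC triggers a fresh lookup.
        self._cache.pop(tc_id, None)
```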
Summary
This blog explains the concept of transaction coordinators and how Pulsar assigns them to brokers. It provides details about the transaction log, which stores all the transaction metadata changes for transaction durability. It also introduces transaction timeout and low watermark. The latter is a key metric that can be used for data clean-up.
The blog is focused on the transaction coordinator itself, not on the client and transaction coordinator interaction. If you are interested in the complete transaction process, see this blog about transaction details in Pulsar.
In future blogs, we will talk more about other transaction components, such as the transaction buffer.