In previous blogs, we introduced the basic concept of Pulsar transactions as well as the design logic of the transaction coordinator. In this blog, we will take a closer look at another core component of Pulsar transactions, namely the transaction buffer.
What is the transaction buffer?
After you send messages to a topic partition with a transaction, the messages are stored in the transaction buffer (TB) of that partition. The transaction buffer guarantees that consumers read only committed messages. Messages sent in a transaction are not visible to consumers until the transaction is committed. If the transaction is aborted, consumers never receive those messages.
How does the transaction buffer work?
In Pulsar, all messages are immutable. You cannot individually delete or update a message that has been sent. Instead, you can only write special entries, such as commit and abort marks, that record the status of messages already sent. Pulsar therefore implements transactions by restricting what consumers are allowed to read.
When committing or aborting a transaction, a commit mark or abort mark will be appended to the topic ledger. The mark is not a real message and is not available to the client. It is only used to identify whether the transaction has been committed or aborted. These marks are stored in BookKeeper as shown below.
Transaction marks are mainly used for transaction buffer recovery. The transaction buffer reads entries from the topic’s managed ledger. When it detects a commit mark or an abort mark in the ledger, it removes the transaction from ongoingTxns and updates maxReadPosition accordingly (I will explain these two attributes later).
If the detected entry is an abort mark, the transaction buffer also records the transaction in aborts (a map) so that it can filter out aborted messages when dispatching messages.
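To make the replay logic above more concrete, here is a minimal sketch in plain Java. It is not Pulsar's actual code: the class MarkReplaySketch, the Entry type, and the use of a plain long as a transaction ID are all assumptions made for illustration, and the maxReadPosition update is left out to keep the focus on how marks move transactions between ongoingTxns and aborts.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of how marks are handled during recovery.
class MarkReplaySketch {
    enum Kind { TXN_MESSAGE, COMMIT_MARK, ABORT_MARK }

    // One entry read back from the topic's managed ledger (illustrative type).
    static final class Entry {
        final Kind kind;
        final long txnId;
        Entry(Kind kind, long txnId) { this.kind = kind; this.txnId = txnId; }
    }

    final Set<Long> ongoingTxns = new HashSet<>(); // outcome not yet known
    final Set<Long> aborts = new HashSet<>();      // known to be aborted

    void replay(List<Entry> entries) {
        for (Entry e : entries) {
            switch (e.kind) {
                case TXN_MESSAGE:
                    // A transactional message: the transaction is still open.
                    ongoingTxns.add(e.txnId);
                    break;
                case COMMIT_MARK:
                    // Committed: the transaction is no longer ongoing.
                    ongoingTxns.remove(e.txnId);
                    break;
                case ABORT_MARK:
                    // Aborted: remember it so its messages can be filtered out.
                    ongoingTxns.remove(e.txnId);
                    aborts.add(e.txnId);
                    break;
            }
        }
    }

    public static void main(String[] args) {
        MarkReplaySketch tb = new MarkReplaySketch();
        tb.replay(Arrays.asList(
            new Entry(Kind.TXN_MESSAGE, 1L),
            new Entry(Kind.TXN_MESSAGE, 2L),
            new Entry(Kind.COMMIT_MARK, 1L),
            new Entry(Kind.ABORT_MARK, 2L),
            new Entry(Kind.TXN_MESSAGE, 3L)));
        System.out.println("ongoing=" + tb.ongoingTxns + " aborts=" + tb.aborts);
    }
}
```

In this run, transaction 1 is committed, transaction 2 ends up in aborts, and transaction 3 stays in ongoingTxns because no mark has been seen for it yet.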
maxReadPosition and aborted transactions
In order to improve message reading efficiency (especially for catch-up reads), we introduced maxReadPosition. It is mainly used for cumulative acknowledgments. Only messages before this position can be sent to consumers. With this abstraction, the broker does not need to cache the position of messages whose status is unknown.
Pulsar uses a map (aborts) to store all aborted transactions. When you send messages to consumers, you can use the map to check if the transaction messages have been aborted. After the ledger recorded by the transaction is deleted from the managed ledger, the transaction can be removed from aborts.
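The two checks described above (stay within maxReadPosition, skip aborted transactions) can be sketched as a simple filter. This is an illustrative model rather than the broker's real dispatch path: DispatchFilterSketch and Message are hypothetical names, positions are plain long values, and maxReadPosition is assumed to be inclusive.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of filtering messages before dispatching them.
class DispatchFilterSketch {
    static final class Message {
        final long position;
        final Long txnId; // null for a normal (non-transactional) message
        Message(long position, Long txnId) { this.position = position; this.txnId = txnId; }
    }

    // Assumption: maxReadPosition is the last position that may be dispatched.
    static List<Message> dispatchable(List<Message> read, long maxReadPosition, Set<Long> aborts) {
        List<Message> out = new ArrayList<>();
        for (Message m : read) {
            boolean readable = m.position <= maxReadPosition;
            boolean aborted = m.txnId != null && aborts.contains(m.txnId);
            if (readable && !aborted) {
                out.add(m);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Message> read = Arrays.asList(
            new Message(0, null), // normal message
            new Message(1, 7L),   // belongs to aborted txn 7
            new Message(2, 8L),   // belongs to committed txn 8
            new Message(3, 9L),   // belongs to ongoing txn 9
            new Message(4, null)  // normal message published after txn 9 started
        );
        Set<Long> aborts = new HashSet<>(Arrays.asList(7L));
        for (Message m : dispatchable(read, 2, aborts)) {
            System.out.println("dispatch position " + m.position);
        }
    }
}
```

Note that the normal message at position 4 is held back as well, because it sits behind the first message of a transaction whose outcome is still unknown.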
In this implementation, all the messages are appended to the topic. They are dispatched in published order instead of committed order. Because consumers can only read messages before maxReadPosition, a long-running transaction holds back the messages published after it, which can increase end-to-end latency.
Pulsar also maintains a map of all ongoing transactions, which it uses to update maxReadPosition. If the transaction buffer has ongoing transactions, maxReadPosition is the position immediately before the first message of the earliest ongoing transaction (that is, the first ongoing transaction position - 1). If there is no ongoing transaction, maxReadPosition follows the position of the most recently published normal message.
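The rule for maxReadPosition can be worked through with a small model. The sketch below makes several simplifying assumptions: positions are consecutive long values rather than ledger and entry IDs, the class and method names are hypothetical, and when no transaction is ongoing maxReadPosition simply follows the last appended entry (including marks).

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of how maxReadPosition follows ongoing transactions.
class MaxReadPositionSketch {
    // txnId -> position of the first message written by that transaction
    private final Map<Long, Long> ongoingTxns = new HashMap<>();
    private long lastAppended = -1; // -1 means nothing has been appended yet

    void appendNormal(long position) {
        lastAppended = position;
    }

    void appendTxnMessage(long txnId, long position) {
        ongoingTxns.putIfAbsent(txnId, position); // keep only the first position
        lastAppended = position;
    }

    // Called when a commit or abort mark for txnId is appended at position.
    void appendEndMark(long txnId, long position) {
        ongoingTxns.remove(txnId);
        lastAppended = position;
    }

    long maxReadPosition() {
        if (ongoingTxns.isEmpty()) {
            return lastAppended;
        }
        // Position just before the earliest ongoing transaction's first message.
        return Collections.min(ongoingTxns.values()) - 1;
    }

    public static void main(String[] args) {
        MaxReadPositionSketch tb = new MaxReadPositionSketch();
        tb.appendNormal(0);
        tb.appendTxnMessage(100L, 1); // txn 100 starts at position 1
        tb.appendNormal(2);
        tb.appendTxnMessage(200L, 3); // txn 200 starts at position 3
        System.out.println(tb.maxReadPosition()); // 0: blocked by txn 100
        tb.appendEndMark(100L, 4);
        System.out.println(tb.maxReadPosition()); // 2: now blocked by txn 200
        tb.appendEndMark(200L, 5);
        System.out.println(tb.maxReadPosition()); // 5: no ongoing transactions
    }
}
```

The trace shows why a slow transaction increases latency: the normal message at position 2 only becomes readable once transaction 100 has ended.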
Apply lowWaterMark from the transaction coordinator
The transaction coordinator stores the metadata information of lowWaterMark. It indicates that the transactions before the lowWaterMark have either been committed or aborted.
The transaction buffer obtains the lowWaterMark whenever a transaction is committed or aborted and stores it locally. It uses the lowWaterMark to reject messages that belong to transactions that have already ended. Nevertheless, this is only a best-effort guarantee. Therefore, after each commit or abort operation, the transaction buffer checks the lowWaterMark again to see whether ongoingTxns still contains an ended transaction. If it does, that transaction is aborted.
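Here is a rough sketch of how the lowWaterMark could be applied on both paths. It rests on assumptions that are not stated in this post: transaction IDs are plain, increasing long values issued by a single coordinator, a transaction counts as ended when its ID is less than or equal to the lowWaterMark, and all names (LowWaterMarkSketch, append, endTxn) are hypothetical.

```java
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of applying the lowWaterMark in the transaction buffer.
class LowWaterMarkSketch {
    final TreeSet<Long> ongoingTxns = new TreeSet<>();
    final Set<Long> aborts = new HashSet<>();
    long lowWaterMark = -1; // nothing known to be ended yet

    // Returns false if the message is rejected because its transaction has ended.
    boolean append(long txnId) {
        if (txnId <= lowWaterMark) {
            return false;
        }
        ongoingTxns.add(txnId);
        return true;
    }

    // Handles a commit or abort request that carries the coordinator's lowWaterMark.
    void endTxn(long txnId, boolean commit, long newLowWaterMark) {
        ongoingTxns.remove(txnId);
        if (!commit) {
            aborts.add(txnId);
        }
        lowWaterMark = Math.max(lowWaterMark, newLowWaterMark);

        // Second check: abort any transaction that is still marked as ongoing
        // here although the coordinator already considers it ended.
        Iterator<Long> it = ongoingTxns.headSet(lowWaterMark, true).iterator();
        while (it.hasNext()) {
            long stale = it.next();
            it.remove();
            aborts.add(stale);
        }
    }

    public static void main(String[] args) {
        LowWaterMarkSketch tb = new LowWaterMarkSketch();
        tb.append(5L);
        tb.append(6L);
        tb.append(9L);
        tb.endTxn(6L, true, 5L); // commit txn 6; coordinator reports lowWaterMark 5
        System.out.println("ongoing=" + tb.ongoingTxns + " aborts=" + tb.aborts);
        System.out.println("append for txn 4 accepted: " + tb.append(4L));
    }
}
```

In the example, transaction 5 is still in ongoingTxns when the lowWaterMark reaches 5, so the second check moves it into aborts, and a late message for transaction 4 is rejected on arrival.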
Transaction buffer snapshot
To recover maxReadPosition and the aborted transactions, the transaction buffer periodically persists a snapshot of them. The snapshot is stored in the system topic __transaction_buffer_snapshot. Each namespace has one such system topic, which stores the snapshots of all transaction buffers in that namespace.
When the transaction buffer recovers maxReadPosition and aborted transactions, it reads their corresponding snapshot from the system topic. It then reads entries in the topic’s managed ledger to recover messages beginning from the recovered maxReadPosition. This way, it does not need to perform the recovery from the beginning.
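The recovery flow above can be sketched as "load the snapshot, then replay only what comes after it". This is a simplified model under stated assumptions: Snapshot and Entry are hypothetical types, positions are plain long values, and the ledger is an in-memory list that is scanned and skipped, whereas a real broker would start reading from the recovered position instead.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of recovering transaction buffer state from a snapshot.
class SnapshotRecoverySketch {
    enum Kind { NORMAL, TXN_MESSAGE, COMMIT_MARK, ABORT_MARK }

    static final class Entry {
        final long position;
        final Kind kind;
        final long txnId; // ignored for NORMAL entries
        Entry(long position, Kind kind, long txnId) {
            this.position = position; this.kind = kind; this.txnId = txnId;
        }
    }

    static final class Snapshot {
        final long maxReadPosition;
        final Set<Long> aborts;
        Snapshot(long maxReadPosition, Set<Long> aborts) {
            this.maxReadPosition = maxReadPosition; this.aborts = aborts;
        }
    }

    final Map<Long, Long> ongoingTxns = new HashMap<>(); // txnId -> first position
    final Set<Long> aborts = new HashSet<>();
    long maxReadPosition;
    int replayed; // number of entries actually replayed

    void recover(Snapshot snapshot, List<Entry> ledger) {
        // Step 1: restore the state captured by the snapshot.
        maxReadPosition = snapshot.maxReadPosition;
        aborts.addAll(snapshot.aborts);

        // Step 2: replay only entries written after the snapshot's maxReadPosition.
        for (Entry e : ledger) {
            if (e.position <= snapshot.maxReadPosition) {
                continue; // already reflected in the snapshot
            }
            replayed++;
            switch (e.kind) {
                case TXN_MESSAGE:
                    ongoingTxns.putIfAbsent(e.txnId, e.position);
                    break;
                case COMMIT_MARK:
                    ongoingTxns.remove(e.txnId);
                    break;
                case ABORT_MARK:
                    ongoingTxns.remove(e.txnId);
                    aborts.add(e.txnId);
                    break;
                default:
                    break; // NORMAL entries do not change transaction state
            }
            maxReadPosition = ongoingTxns.isEmpty()
                ? e.position
                : Collections.min(ongoingTxns.values()) - 1;
        }
    }
}
```

Starting from the snapshot is safe in this model because every entry at or before the snapshot's maxReadPosition belongs either to a normal message or to a transaction that has already ended, and the aborted ones are carried over in the snapshot itself.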
The transaction buffer represents a key component for Pulsar transactions. When new messages arrive on a topic with a transaction, the broker will move them to the transaction buffer. The messages in the buffered state will not be available to consumers until the transaction is committed.