
600K Topics Per Cluster: Stability Optimization of Apache Pulsar at Tencent Cloud

October 20, 2022

Background

Tencent Cloud is a secure, reliable, and high-performance cloud computing service provided by Tencent, one of the largest Internet companies in China and beyond. With a worldwide network of data centers, Tencent Cloud is committed to offering industry-leading solutions that integrate its cloud computing, big data, artificial intelligence, Internet of Things, security, and other advanced technologies to support the digital transformation of enterprises around the world. Tencent Cloud has 70 availability zones in 26 geographical regions, serving millions of customers from more than 100 countries and regions.

Currently, a Pulsar cluster at Tencent Cloud can serve around 600,000 topics in production with cost controlled at a relatively low level for different use cases. In this blog post, I will share some of our practices of optimizing Apache Pulsar for better stability and performance over the past year.

How to avoid acknowledgment holes

Different from other messaging systems, Pulsar supports both individual acknowledgments and cumulative acknowledgments (the latter is similar to Kafka offsets). Although individual message acknowledgments provide solutions to some online business scenarios, they also lead to acknowledgment holes.

Acknowledgment holes refer to the gaps between ranges, which result in fragmented acknowledgments. They are very common when you use shared subscriptions or choose to acknowledge messages individually. Pulsar uses an abstraction called individuallyDeletedMessages to track fragmented acknowledgments in the form of ranges (intervals). Essentially, this attribute is a collection of open and closed intervals. A square bracket means the message has been processed while a parenthesis indicates an acknowledgment hole.

In Figure 1, for example, in the first interval (5:1226..5:1280], 5 is the Ledger ID and 1226 and 1280 are the Entry IDs. As the interval is left-open and right-closed, it means 5:1280 is acknowledged and that 5:1226 is not.

Figure 1
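
To make the interval notation concrete, here is a minimal Java sketch that parses a range such as (5:1226..5:1280] and checks whether a position falls inside it. The names AckRange and Position are illustrative only and are not Pulsar's internal types.

```java
final class AckRange {
    /** A ledgerId:entryId pair, ordered by ledger ID first and entry ID second. */
    static final class Position implements Comparable<Position> {
        final long ledgerId;
        final long entryId;

        Position(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        static Position parse(String text) {
            String[] parts = text.split(":");
            return new Position(Long.parseLong(parts[0]), Long.parseLong(parts[1]));
        }

        @Override
        public int compareTo(Position other) {
            int byLedger = Long.compare(ledgerId, other.ledgerId);
            return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
        }
    }

    final Position lowerExclusive;
    final Position upperInclusive;

    AckRange(Position lowerExclusive, Position upperInclusive) {
        this.lowerExclusive = lowerExclusive;
        this.upperInclusive = upperInclusive;
    }

    /** Parses the left-open, right-closed form "(L:E..L:E]". */
    static AckRange parse(String text) {
        String body = text.trim();
        if (!body.startsWith("(") || !body.endsWith("]")) {
            throw new IllegalArgumentException("Expected a range like (5:1226..5:1280]: " + text);
        }
        String[] ends = body.substring(1, body.length() - 1).split("\\.\\.");
        return new AckRange(Position.parse(ends[0]), Position.parse(ends[1]));
    }

    /** True when the position is acknowledged by this range. */
    boolean contains(Position p) {
        return p.compareTo(lowerExclusive) > 0 && p.compareTo(upperInclusive) <= 0;
    }
}
```

With the range from Figure 1, contains returns false for 5:1226 (the open end) and true for 5:1280 (the closed end).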

There are many factors that can cause fragmented acknowledgments, such as the broker’s failure to process messages. In the early versions of Pulsar, there were no returns for acknowledgments, so we couldn’t ensure the acknowledgment request was correctly handled. In Apache Pulsar 2.8.0 and later versions, AckResponse was introduced for transaction messages to support returns. Another major cause is the client’s failure to call acknowledgments for some reason, which is very common in production.

To avoid acknowledgment holes, here are two solutions that we tried at Tencent Cloud for your reference.

First, carefully configure the backlog size. In Pulsar, an entry can hold either a single message or a batch of messages. Because Pulsar parses batch messages on the consumer side instead of the broker side, the broker does not know exactly how many messages a batch contains. In practice, this makes it rather difficult to calculate the backlog size precisely.

Second, create a broker compensatory mechanism for unacknowledged messages. As individuallyDeletedMessages contains information on unacknowledged messages, we can let the broker redeliver them to the client to fill the gaps.

Before I explain the details of the second solution, let’s take a look at the different stages in which messages can be in a topic. In Figure 2, a producer publishes messages on a topic, which are then received by a consumer. Messages in different states are marked in three colors.

Figure 2

  • Red: The latest messages sent to the topic.
  • Gray: The messages sent to the topic but not consumed by the consumer.
  • Blue: The messages already consumed and acknowledged.

Pulsar allows you to configure backlog policies to manage unacknowledged messages when the backlog size is exceeded.

  • producer_exception: The broker disconnects from the client by throwing an exception. This tells the producer to stop sending new messages. It is the major policy that we are using in production at Tencent Cloud.
  • producer_request_hold: The broker holds and does not persist the producer's request payload. The producer will stop sending new messages.
  • consumer_backlog_eviction: The broker discards the oldest unacknowledged messages in the backlog to make sure the consumer can receive new messages. As messages are lost in this way, we haven’t used the policy in production.

So, how does Pulsar define the backlog size? In Figure 3, all messages in the stream have been consumed but not all of them have been acknowledged by the consumer.

Figure 3

The backlog size refers to the range from markDeletePosition to the position just before readPosition. In this example, M2 and M4 are acknowledgment holes, which may result from broker request failures or user code errors. After the size is exceeded, the producer is disconnected (for example, if you use the producer_exception policy) so that no new messages are sent.

As I mentioned above, the attribute individuallyDeletedMessages contains a collection of ranges, which tell you which messages are acknowledged and which are not. The message right before the first interval in individuallyDeletedMessages is marked by markDeletePosition. To implement the broker compensatory mechanism, we use an Executor Service to read markDeletePosition and poll individuallyDeletedMessages at a fixed interval (for example, every 3 or 5 minutes). At each interval, it redelivers the unacknowledged messages to the consumer so that they can be acknowledged. This prevents our backlog policy from being triggered frequently because of acknowledgment holes.

Figure 4
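
The mechanism above can be sketched roughly as follows. This is a simplified illustration, not our broker code: it assumes a single ledger so that positions are plain entry IDs, and the names AckHoleCompensator, findHoles, and start, as well as the redelivery callback, are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

final class AckHoleCompensator {
    /** Acknowledged entries (lowerExclusive, upperInclusive] within one ledger. */
    static final class AckedRange {
        final long lowerExclusive;
        final long upperInclusive;

        AckedRange(long lowerExclusive, long upperInclusive) {
            this.lowerExclusive = lowerExclusive;
            this.upperInclusive = upperInclusive;
        }
    }

    /**
     * Lists the entry IDs after markDeleteEntry that no range covers.
     * The ranges are assumed to be sorted and non-overlapping.
     */
    static List<Long> findHoles(long markDeleteEntry, List<AckedRange> ranges) {
        List<Long> holes = new ArrayList<>();
        long lastAcked = markDeleteEntry;
        for (AckedRange range : ranges) {
            for (long entry = lastAcked + 1; entry <= range.lowerExclusive; entry++) {
                holes.add(entry);
            }
            lastAcked = range.upperInclusive;
        }
        return holes;
    }

    /** Polls for holes at a fixed interval and hands them to a redelivery callback. */
    static ScheduledFuture<?> start(ScheduledExecutorService scheduler,
                                    Supplier<List<Long>> holeSupplier,
                                    Consumer<List<Long>> redeliver,
                                    long period, TimeUnit unit) {
        return scheduler.scheduleWithFixedDelay(() -> {
            try {
                List<Long> holes = holeSupplier.get();
                if (!holes.isEmpty()) {
                    redeliver.accept(holes);
                }
            } catch (RuntimeException e) {
                // An uncaught exception would cancel all future runs, so log and continue.
                System.err.println("Ack hole scan failed: " + e);
            }
        }, period, period, unit);
    }
}
```

For the stream in Figure 3, if M1 is the mark-delete position and M3 and M5 are acknowledged individually, findHoles(1, ranges) with the ranges (2, 3] and (4, 5] yields entries 2 and 4.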

Message lifecycle: What you may not know

Before I dive deep into the message lifecycle in Pulsar, let’s briefly talk about some basic concepts involved - TTL, backlog, and retention policies.

  • TTL: The time that messages can stay unacknowledged before the broker acknowledges them automatically. Note that by default, Pulsar stores all unacknowledged messages indefinitely.
  • Backlog: A collection of unacknowledged messages for a particular subscription. You can consider it as a gap between the messages sent by the producer and the messages received by the consumer.
  • Retention: How long acknowledged messages stay in storage (bookies).

Here is a very interesting question: If we set TTL and retention policies at the same time, how is the lifecycle of a message calculated? Let’s answer this question by analyzing the following code snippet.

void updateCursor(ManagedCursorImpl cursor, PositionImpl newPosition) {
    Pair<PositionImpl, PositionImpl> pair = cursors.cursorUpdated(cursor, newPosition);
    if (pair == null) {
        // Cursor has been removed in the meantime
        trimConsumedLedgersInBackground();
        return;
    }

    PositionImpl previousSlowestReader = pair.getLeft();
    PositionImpl currentSlowestReader = pair.getRight();

    if (previousSlowestReader.compareTo(currentSlowestReader) == 0) {
        // The slowest consumer has not changed position. Nothing to do right now
        return;
    }

    // Only trigger a trimming when switching to the next Ledger
    if (previousSlowestReader.getLedgerId() != newPosition.getLedgerId()) {
        trimConsumedLedgersInBackground();
    }
}

As we can see from the code above, Pulsar uses TTL policies to check messages on a regular basis (every 5 minutes by default). If a message should expire, the cursor is updated.

As for retention, you can set a time or size limit to retain acknowledged messages. Based on the timestamp information in the metadata, you can know when a ledger was created. Once the threshold is reached for a ledger, the ledger is ready for deletion. The final if statement in the snippet compares the ledger ID of the previous slowest reader with the ledger ID of newPosition. If the slowest reader has moved to a different ledger, trimConsumedLedgersInBackground() is called, which is the core implementation logic of retention.

We can draw a conclusion for message lifecycle calculation:

  • If TTL < retention, the message lifecycle = TTL + retention
  • If TTL ≥ retention, the message lifecycle = TTL
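
As a worked example of the rule above, the following small Java helper applies it to two durations. The class and method names are illustrative, and both inputs are assumed to be finite.

```java
import java.time.Duration;

final class MessageLifecycle {
    /** Applies the rule stated above: TTL + retention when TTL < retention, otherwise TTL. */
    static Duration estimate(Duration ttl, Duration retention) {
        return ttl.compareTo(retention) < 0 ? ttl.plus(retention) : ttl;
    }
}
```

With a TTL of 5 days and a retention of 15 days, the estimate is 20 days. With a TTL of 15 days and a retention of 5 days, it is 15 days.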

You may be wondering why ledger deletion is triggered after a ledger is rolled over. This is because ledgers are the smallest unit for deletion and we cannot delete individual messages within a ledger. If a ledger is not rolled over, the retention policy will not be triggered.

Delayed messages and TTL

Pulsar’s delayed message delivery capability allows messages to be processed at a later time rather than consumed immediately. In one of our use cases, a user sent hundreds of thousands of delayed messages with the delay period set to 10 days. At the same time, the user configured a TTL policy of 5 days. Five days later, all the delayed messages seemed to be missing. In fact, they had expired because of the TTL policy.

From the two source code snippets below, we can see that the core logic of TTL implementation for message expiry depends on the function cursor.asyncFindNewestMatching. isEntryExpired is concerned with the publishedTime of an entry, which is stored as part of its metadata. We can use cursor.asyncFindNewestMatching to obtain the publishedTime of the entry. As the implementation does not take the delay attributes of a message into account, the TTL policy makes messages expire if the TTL is shorter than the delay period. On the user side, it looks as if the delayed messages are missing.

public boolean expireMessages(int messageTTLInSeconds) {
    if (expirationCheckInProgressUpdater.compareAndSet(this, FALSE, TRUE)) {
        log.info("[{}][{}] Starting message expiry check, ttl= {} seconds", topicName, subName,
                messageTTLInSeconds);

        // Find the newest entry whose timestamp is older than the TTL
        cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> {
            try {
                long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer());
                return MessageImpl.isEntryExpired(messageTTLInSeconds, entryTimestamp);
            } catch (Exception e) {
                log.error("[{}][{}] Error deserializing message for expiry check", topicName, subName, e);
            } finally {
                entry.release();
            }
            return false;
        }, this, null);
        return true;
    }
    // ... (the rest of the method is omitted from this excerpt)
}

TTL implementation for message expiry

public static boolean isEntryExpired(int messageTTLInSeconds, long entryTimestamp) {
    return messageTTLInSeconds != 0
            && (System.currentTimeMillis() >
            entryTimestamp + TimeUnit.SECONDS.toMillis(messageTTLInSeconds));
}

isEntryExpired

To solve this problem, our team submitted a pull request to the Pulsar community. We think it might be a good idea to check both the published time and the delay time for entries. The following is the revised logic:

  • If the published time is reached and the delayed time is greater than the TTL, delayed messages should not expire even if the TTL time is exceeded.
  • The isEntryExpired function will include both the TTL time and the delay time. No matter what delay policy you choose (deliverAfter or deliverAt), the delay time refers to a specific timestamp to deliver messages.

Note that we need to obtain the published time and the delay time for an entry at the same time. Otherwise, the entry objects may be different.
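
One possible reading of the revised logic is sketched below. It is not the code from the pull request: the four-argument signature, the deliverAtMillis parameter (0 meaning "not delayed"), and the explicit nowMillis argument are assumptions made for illustration. The idea is that an entry cannot expire before its delivery timestamp.

```java
import java.util.concurrent.TimeUnit;

final class DelayAwareExpiry {
    /**
     * @param deliverAtMillis absolute delivery timestamp, or 0 when the message is not delayed
     * @param nowMillis       current time, passed in so the check is easy to test
     */
    static boolean isEntryExpired(int ttlSeconds, long publishTimeMillis,
                                  long deliverAtMillis, long nowMillis) {
        if (ttlSeconds == 0) {
            return false; // TTL disabled, as in the original check
        }
        long ttlDeadline = publishTimeMillis + TimeUnit.SECONDS.toMillis(ttlSeconds);
        // A delayed message stays alive at least until it becomes deliverable.
        long deadline = Math.max(ttlDeadline, deliverAtMillis);
        return nowMillis > deadline;
    }
}
```

In the use case above (TTL of 5 days, delay of 10 days), this check returns false on day 6 and true only after day 10.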

For more information, see this pull request.

Admin API optimization

We found the following problems related to Pulsar’s Admin API:

  • The Pulsar Web service (Admin Rest API) has some synchronous implementations in asynchronous calls, thus blocking the service itself. When the Pulsar Web thread is blocked, the only solution is to restart the corresponding broker to restore the task.
  • Pulsar’s HTTP lookup service is an important part of data streams in Pulsar (binary lookup is the other option). As the HTTP lookup service uses the Web port, once the service is blocked by the synchronous calls mentioned above, the reads and writes of data streams are also blocked.
  • Low Web service performance. This is mainly caused by the misuse of CompletableFuture. Callbacks registered with thenApply or thenCompose run synchronously on the thread that completes the future (or on the calling thread if the future is already complete), so a slow callback blocks that thread.
  • In higher versions of Pulsar, an abstraction Metadata Store implemented through a thread pool was introduced for the Web service. It may put extra pressure on ZooKeeper if there are too many Web service requests at the same time. As a result, the performance of the entire cluster can be impacted (for example, read and write latency).
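
The CompletableFuture behavior mentioned in the list above can be observed with a small, self-contained Java sketch. It is not Pulsar code; the class name and the "callback-pool" thread name are made up for the demonstration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CallbackThreadDemo {
    /** Returns the thread names that ran the thenApply and thenApplyAsync callbacks. */
    static String[] callbackThreads() {
        ExecutorService pool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "callback-pool"));
        try {
            CompletableFuture<String> source = new CompletableFuture<>();
            CompletableFuture<String> inline =
                    source.thenApply(v -> Thread.currentThread().getName());
            CompletableFuture<String> offloaded =
                    source.thenApplyAsync(v -> Thread.currentThread().getName(), pool);

            // The thenApply callback runs right here, on the completing thread.
            source.complete("done");
            return new String[] {inline.join(), offloaded.join()};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] names = callbackThreads();
        System.out.println("thenApply ran on:      " + names[0]);
        System.out.println("thenApplyAsync ran on: " + names[1]);
    }
}
```

If the completing thread is a Web thread, any slow work inside thenApply occupies it. Moving that work to a dedicated executor with the Async variants is one way to keep the Web thread free.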

We have taken the following measures for the above issues.

  • Monitored the Web service to locate the weak points so that we can make them completely asynchronous.
  • Added timeout logic for the Pulsar Web thread. If a request does not complete within a preset threshold (for example, 30 seconds), a timeout response is returned.
  • Separated the Metadata Store thread pool abstraction.
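
The timeout measure can be approximated with the standard library alone. The sketch below is an illustration rather than our broker patch: RequestTimeout, withTimeout, and the response string are hypothetical, and it relies on CompletableFuture.copy and orTimeout, which require Java 9 or later.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class RequestTimeout {
    static final String TIMEOUT_RESPONSE = "request timed out";

    /** Returns the work's result, or TIMEOUT_RESPONSE if it does not finish in time. */
    static CompletableFuture<String> withTimeout(CompletableFuture<String> work,
                                                 long timeout, TimeUnit unit) {
        return work.copy()                 // leave the caller's future untouched
                .orTimeout(timeout, unit)  // fail the copy with TimeoutException at the deadline
                .exceptionally(error -> {
                    Throwable cause = (error instanceof CompletionException
                            && error.getCause() != null) ? error.getCause() : error;
                    if (cause instanceof TimeoutException) {
                        return TIMEOUT_RESPONSE;
                    }
                    throw new CompletionException(cause); // propagate other failures
                });
    }
}
```

A handler would wrap its asynchronous result, for example withTimeout(result, 30, TimeUnit.SECONDS), before writing the response.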

Zk-node leaks

Figure 5 displays the number of znodes in a cluster and the data stored on them in one of our use cases. The cluster had a large number of znodes, while in fact, we did not have too many topics in use. As we cleaned up the dirty data in the cluster, the znode number and the data size both decreased. The leak was quite serious as almost 80% of data was removed.

Figure 5

The reason for the leak is that when you create a topic, ZooKeeper stores the topic metadata in six different directories as shown below. These directories may contain tons of topic data, such as tenants, namespaces, persistence, and specific policies.

zk-path                      All topics are included
/admin/local-policies        Yes
/admin/partitioned-topics    Yes
/admin/policies              Yes
/namespace                   Yes
/managed-ledgers             Yes
/loadbalance/bundle-data     Yes

The Pulsar community merged some code to fix the leak issue in Pulsar 2.8+. If you are using earlier versions, you might have some dirty data in your cluster. To clean up the data, we proposed the following solution.

  1. Get a topic list through the ZooKeeper client (You can use it to read these paths and form topic names in a set format).
  2. Use pulsar-admin to check whether these topics exist in the cluster. If they do not exist, the associated data must be dirty and should be deleted.
  3. Keep in mind that you need to back up the data before the clean-up so that you can recover topics in case of any unexpected deletion.
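
Steps 1 and 2 amount to a set difference. The following Java sketch shows the idea on plain strings, with no ZooKeeper or pulsar-admin calls. The znode layout /managed-ledgers/{tenant}/{namespace}/persistent/{topic} is an assumption that you should verify against your own cluster. The sketch also ignores any encoding of topic names, and DirtyTopicFinder is a hypothetical helper.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

final class DirtyTopicFinder {
    static final String PREFIX = "/managed-ledgers/";

    /** Converts "/managed-ledgers/t/ns/persistent/topic" to "persistent://t/ns/topic". */
    static String toTopicName(String znodePath) {
        if (!znodePath.startsWith(PREFIX)) {
            throw new IllegalArgumentException("Unexpected path: " + znodePath);
        }
        String[] parts = znodePath.substring(PREFIX.length()).split("/");
        if (parts.length != 4) {
            throw new IllegalArgumentException("Unexpected path: " + znodePath);
        }
        return parts[2] + "://" + parts[0] + "/" + parts[1] + "/" + parts[3];
    }

    /** Returns the topics that have a znode but are not reported as existing topics. */
    static Set<String> findDirtyTopics(List<String> znodePaths, Set<String> liveTopics) {
        Set<String> dirty = new TreeSet<>();
        for (String path : znodePaths) {
            String topic = toTopicName(path);
            if (!liveTopics.contains(topic)) {
                dirty.add(topic);
            }
        }
        return dirty;
    }
}
```

The returned set is only a list of candidates. Per step 3, back up the corresponding znodes before deleting anything.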

Bookie ledger leaks

In production, all our retention policies are no more than 15 days. Even if we add the TTL period (for example, also 15 days), the maximum message lifecycle should be 30 days. However, we found that some ledgers which were created 2 years ago still existed and could not be deleted (We are using an internal monitoring service that checks all ledger files on a regular basis).

One possible reason for orphan ledgers could be the bookie CLI commands. For example, when we use some CLI commands to check the status of a cluster, it may create a ledger on the bookie. However, the retention policy is not applicable to such ledgers.

To delete orphan ledgers, you can try the following ways:

  1. Obtain the metadata of the ledger. Each ledger has its own LedgerInfo, which stores its metadata, such as the creation time and the bookies that store the ledger data. If the ledger metadata are already missing, you can delete their corresponding ledgers directly.
  2. As a Pulsar topic represents a sequence of ledgers, you can check whether a ledger still exists in the ledger list of the topic. If it does not exist, you can delete it.

When you try to delete orphan ledgers, you need to:

  • Pay special attention to the schema, which is mapped to a ledger in Pulsar. The schema ledger is stored on the bookie and the information about the schema itself is stored in ZooKeeper. If you delete the schema by accident, you need to delete the schema information on the broker first and try to recreate it from the producer side.
  • Back up your data before you delete it.
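
The checks above can be combined into a simple candidate filter. This Java sketch works on in-memory data only. How you collect ledger creation times and the set of referenced ledger IDs is up to your tooling, and OrphanLedgerFinder is a hypothetical name. The referenced set is assumed to include schema ledgers, for the reason given above.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class OrphanLedgerFinder {
    /**
     * Returns ledgers that no topic or schema references and that are older than
     * the maximum expected message lifecycle (for example, 30 days).
     */
    static Set<Long> findCandidates(Map<Long, Instant> ledgerCreationTimes,
                                    Set<Long> referencedLedgers,
                                    Duration maxLifecycle,
                                    Instant now) {
        Instant cutoff = now.minus(maxLifecycle);
        Set<Long> candidates = new TreeSet<>();
        for (Map.Entry<Long, Instant> entry : ledgerCreationTimes.entrySet()) {
            boolean referenced = referencedLedgers.contains(entry.getKey());
            boolean oldEnough = entry.getValue().isBefore(cutoff);
            if (!referenced && oldEnough) {
                candidates.add(entry.getKey());
            }
        }
        return candidates;
    }
}
```

The age threshold is a safety margin: a recently created ledger that is not yet in any topic's ledger list is left alone.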

For more information about how to deal with orphan ledgers, see A Deep Dive into the Topic Data Lifecycle in Apache Pulsar.

Cache optimization

Pulsar uses caches at different levels. Topics have their own caches on the broker side. Write caches and read caches in BookKeeper are allocated based on JVM direct memory (25% of direct memory). For hot data, generally, these caches can be hit and there is no need to read the actual data.

Figure 6 shows some cache metrics we observed in one of our production cases. There was a sharp decrease of read cache size, which led to the sudden increase of read cache misses. As a result, the reads on bookies saw a peak at 16:15 with the latency increasing to nearly 5 seconds. In fact, we noticed that this sudden peak happened periodically.

Figure 6

Let’s take a look at the following two source code snippets to analyze the reason for the above scenario.

try {
    // We need to check all the segments, starting from the current
    // backward to minimize the
    // checks for recently inserted entries
    int size = cacheSegments.size();
    for (int i = 0; i < size; i++) {
        int segmentIdx = (currentSegmentIdx + (size - i)) % size;

Iterate messages

try {
    int offset = currentSegmentOffset.getAndAdd(entrySize);
    if (offset + entrySize > segmentSize) {
        // Rollover to next segment
        currentSegmentIdx = (currentSegmentIdx + 1) % cacheSegments.size();
        currentSegmentOffset.set(alignedSize);
        cacheIndexes.get(currentSegmentIdx).clear();
        offset = 0;
    }

offset + entrySize vs segmentSize

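The first snippet uses a for loop to iterate over the cache segments when looking up an entry. In the second one, if the sum of offset and entrySize is larger than segmentSize, the cache rolls over to the next segment and clears that segment's index, which drops every entry cached there in one step. This explains the sudden decrease of read cache size. After that point, the cache is filled again as new entries are read.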

Currently, we are using the LRU policy (OHC) to avoid sudden cache fluctuations. This is the result after our optimization:

Figure 7
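
To illustrate why an LRU policy avoids the sudden drop, here is a minimal on-heap LRU cache in Java built on LinkedHashMap. It only demonstrates the eviction policy. OHC is an off-heap cache with its own API, which this sketch does not use, and LruReadCache is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class LruReadCache<K, V> extends LinkedHashMap<K, V> {
    private static final long serialVersionUID = 1L;
    private final int maxEntries;

    LruReadCache(int maxEntries) {
        super(16, 0.75f, true); // accessOrder = true keeps entries in LRU order
        this.maxEntries = maxEntries;
    }

    /** Evicts one entry, the least recently used, each time the limit is exceeded. */
    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxEntries;
    }
}
```

Unlike the segment rollover shown earlier, which discards a whole segment at once, this policy removes one cold entry per insertion, so the cache size stays flat and hot entries survive.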

Summary

In this blog, we shared our experience of using and optimizing Apache Pulsar at Tencent Cloud for better performance and stability. Going forward, the Tencent Cloud team will continue to be an active player in the Pulsar community and work with other community members in the following aspects.

  • Retry policies within the client timeout period. We are thinking about creating an internal mechanism featuring multiple retries (send requests) to avoid message delivery failures.
  • Broker and bookie OOM optimization. Brokers may run out of memory when you have too many fragmented acknowledgments. Bookie OOM can be caused by different factors. For example, if one of the bookies in an ensemble has slow returns (Write Quorum = 3, Ack Quorum = 2), the direct memory can never be released.
  • Bookie AutoRecovery optimization. AutoRecovery can be deployed separately or on the same machines where bookies are running. When you deploy them together, the AutoRecovery process can’t be restarted if you have a ZooKeeper session timeout. This is because there is no retry logic between AutoRecovery and ZooKeeper. Hence, we want to add an internal retry mechanism for AutoRecovery.

More on Apache Pulsar

Pulsar has become one of the most active Apache projects over the past few years, with a vibrant community driving innovation and improvements to the project. Check out the following resources to learn more about Pulsar.
