The messaging team at BIGO found themselves overwhelmed by the massive amounts of data when using Kafka.
BIGO started to learn Apache Pulsar and ran some tests with it, after which they believed Pulsar could be a solution to their challenges.
Pulsar now serves as a key component in the real-time messaging architecture of BIGO.
The hardware cost was reduced by 50% after the messaging team started to use Pulsar.
Founded in 2014, BIGO Technology (BIGO) is one of the fastest-growing technology companies based in Singapore. Powered by its advanced technologies in audio and video processing, artificial intelligence, and CDN, BIGO boasts a diverse portfolio of social media products, such as BIGO LIVE and Likee, with over 400 million users across 150 countries and regions.
Initially, BIGO used Kafka as the data backbone of its streaming platform. As business continued to expand, the traffic that the platform had to serve increased drastically. This called for better stability and greater real-time data processing capability of the platform, which were extremely important for BIGO’s downstream business, such as online recommendations, real-time data analysis, and real-time data warehouse.
When using Kafka clusters, the messaging team at BIGO found themselves overwhelmed by the massive amounts of data. As they put more human resources in place to maintain multiple Kafka clusters, the cost kept going up. At that time, the team faced the following problems:
Decreased performance in the following cases.
When scaling a cluster and balancing partitions, the team must copy a large amount of data.
Catch-up reads could easily lead to PageCache pollution.
As the number of partitions increased, sequential reads and writes became less effective.
Kafka clusters used KMM (Kafka Mirror Maker) for data synchronization across regions, which could not meet the team’s expectations for performance and stability.
Skyrocketing costs. As Kafka clusters scaled out, the cost rose sharply, requiring more cluster operators and maintainers. At BIGO, it took a maintainer half a day to add a machine to a Kafka cluster and rebalance partitions, while removing a server might need an entire day.
Additional human intervention. The team needed to make manual adjustments when a Kafka broker had a disk failure or recorded a high usage rate.
Data loss risks. When replicas were out of sync (not in the ISR), data could be lost once a broker failed.
“If we continue using Kafka, we need to put more people into operations and maintenance with increasing costs,” said Zhanpeng Wu, Staff Engineer at BIGO. “Our growing business also means higher demands for system reliability, stability, scalability, and latency. And to fill these needs, we start to think about redevelopment based on Kafka while seeking solutions to our problems in the community.”
Why Apache Pulsar
In November 2019, the team began to evaluate major messaging and streaming tools against their needs. At that time, they learned of Apache Pulsar, a next-generation cloud-native distributed messaging and streaming platform.
“Apache Pulsar combines messaging, streaming, storage, and lightweight functions together,” Wu added. “More importantly, Pulsar’s unique architecture of storage and computing separation could be a perfect solution to our pain points in scalability when using Kafka.”
In addition to its unique design, Pulsar also features multi-tenancy, persistent storage, geo-replication, strong consistency, high throughput, and many more built-in abilities. Specifically, Wu’s team explored the following capabilities of Apache Pulsar:
Great scalability. A Pulsar cluster can be seamlessly scaled out to include thousands of nodes.
High throughput. Supports millions of messages (Pub/Sub) per second.
Low latency. Provides low latency of less than 5 ms even during traffic bursts.
Data persistency. Provides durable storage based on Apache BookKeeper, which allows users to customize their replica writing strategy.
Read-write separation. In BookKeeper, reads and writes are separated, which gives full play to sequential I/O. This is more friendly to HDDs. Furthermore, there is no limitation on the number of topics that a bookie can support.
After the team saw first hand the power and reliability of Apache Pulsar, they commenced further exploring Pulsar by running a series of tests in December 2019. “We were using HDDs at that time and no SSDs. At the beginning, we had some performance issues in these tests,” Wu said. “Fortunately, with the help of StreamNative, we brought the performance and stability of brokers and BookKeeper to another level.” Founded by the original creators of Apache Pulsar and Apache BookKeeper, StreamNative builds a cloud-native event streaming platform that enables enterprises to easily access data as real-time event streams.
“After 3 to 4 months of testing and optimization, we were fully confident that Pulsar could help us make a visible difference in the places where Kafka fell short. Therefore, we put Pulsar into production in May 2020,” Wu added.
Apache Pulsar at BIGO: Pub/Sub model
BIGO uses Pulsar mainly for the pub/sub model. At the front end, the team leverages Baina (a data collection service implemented through C++), Kafka Mirror Maker, Flink, as well as other clients written in different languages (for example, Java and Python) to produce messages on Pulsar topics. At the back end, Flink, Flink SQL, and other clients serve as consumers.
Downstream business modules include real-time warehouse, ETL, data analysis, and recommendations. Most of them depend on Flink consuming data from Pulsar topics, and then they process the data based on their own business logic. Finally, the data are written to third-party storage services, such as Hive, ClickHouse, and Redis.
Real-time Messaging Architecture for ETL
The team has built a real-time messaging architecture based on Pulsar and Flink to support a key ETL use case at BIGO. This scenario contains up to thousands of Pulsar topics, with each one having its independent schema. Each topic has a table in HDFS, and each table may have different fields. Data in these topics need to be processed for field conversion and eventually written to HDFS.
“In this use case, the tricky part is that we have too many topics. We cannot afford the maintenance costs if we run a Flink job for each topic,” said Wu. “In the end, we choose to use readers to subscribe to all topics and write data to HDFS after schema processing.”
At first, the traffic sent to different topics was not balanced. Some task managers might need to handle high volumes of data while others received only a small portion of the traffic. For a high-volume topic, the task manager responsible for the reader process of the topic could be overwhelmed. As a result, tasks might get stuck on some machines, slowing down Flink streams. “To solve this problem, we group topics according to their traffic,” said Wu. “For example, we put low-traffic topics into the same group. By contrast, a topic that serves a high traffic volume is in its own group. This makes sure resources are separated with traffic balanced for task managers. By now, we have been running this ETL flow stably for nearly 2 years.”
What Pulsar Has Brought to BIGO
At BIGO, Pulsar has been running stably for over 2 years since its production adoption. With Pulsar’s distinctive features, including high throughput, low latency, and strong reliability, BIGO has seen a tremendous boost in its ability to process data at a low cost.
Here are some of the key results:
Pulsar processes tens of billions of messages daily with a traffic flow of 2 to 3 GB per second.
Pulsar has reduced the hardware cost by nearly 50%.
Hundreds of Pulsar broker and bookie processes are deployed on tens of physical machines (the bookie and broker are deployed on the same node).
The ETL business has over 10,000 topics, with 3 partitions per topic and 3 replicas for ledger storage.
“We have migrated our ETL business from Kafka to Pulsar and will be gradually doing the same for other modules that consume data from Kafka clusters, such as Flink and ClickHouse,” said Wu. “The tiered storage design of Pulsar makes it possible to support millions of topics, which are essential to our ETL use case.”
Deeper Integration with Pulsar
Over the past few years, BIGO has been working to optimize Pulsar and BookKeeper for the better. This has laid the groundwork to further integrate Pulsar within BIGO and promote it in the community. “We have done a lot of work on both Pulsar and BookKeeper,” said Wu. “Broker load balancing and monitoring, read and write performance, disk IO performance, and Pulsar’s integration with Flink and Flink SQL are all very important to us.”
As BIGO looks to put Pulsar in more scenarios within the organization, it is poised to help the community improve Pulsar and reap further benefits from Pulsar in the following ways.
Continue to be an active contributor to the Pulsar community by developing more features for Pulsar (for example, topic policy support) and optimizing both Pulsar and BookKeeper (for example, BookKeeper IO protocol stack).
Migrate more traffic to Pulsar. This contains two major tasks - migrating existing business modules that are using Kafka to Pulsar and running new business directly with Pulsar.
Adopt Kafka on Pulsar (KoP) to ensure a smooth migration experience. “We still have some Flink-based jobs that consume data from existing Kafka clusters. We hope to integrate KoP with them to streamline the migration process,” said Wu.