Announcing AMQP-on-Pulsar: bring native AMQP protocol support to Apache Pulsar
Note:
The AoP plugin supports only the 0-9-1 protocol with basic produce and consume functionalities, and does not include advanced features such as transactions. It is available as an open-source plugin and is only offered as a private preview feature in the Private Cloud distribution. It is not available on StreamNative Cloud. Please use it with caution.
We are excited to announce that StreamNative and ChinaMobile are open-sourcing "AMQP on Pulsar" (AoP). AoP brings the native AMQP protocol support to Apache Pulsar by introducing an AMQP protocol handler on Pulsar brokers. Similar to KoP, AoP is also an implementation of the pluggable protocol handler. By adding the AoP protocol handler in your existing Pulsar cluster, you can migrate your existing RabbitMQ applications and services to Pulsar without modifying the code. This enables RabbitMQ applications to leverage Pulsar’s powerful features, such as infinite event stream retention with Apache BookKeeper and tiered storage.
What is Apache Pulsar?
Apache Pulsar is an event streaming platform designed from the ground up to be cloud-native- deploying a multi-layer and segment-centric architecture. The architecture separates serving and storage into different layers, making the system container-friendly. The cloud-native architecture provides scalability, availability, and resiliency and enables companies to expand their offerings with real-time data-enabled solutions. Pulsar has gained wide adoption since it was open-sourced in 2016 and was designated an Apache Top-Level project in 2018.
The Need For AoP
Pulsar provides a unified messaging model for both queueing and streaming workloads. Pulsar implemented its own protobuf-based binary protocol to provide high performance and low latency. This choice of protobuf makes it convenient to implement Pulsar clients and the project already supports Java, Go, Python, and C++ languages alongside third-party clients provided by the community. However, existing applications written using other messaging protocols have to be rewritten to adopt Pulsar’s new unified messaging protocol.
To address this, the Pulsar community developed applications to facilitate the migration to Pulsar from other messaging systems. For example, Pulsar provides the RabbitMQ Source connector and RabbitMQ Sink connector to get through the data transfer between Pulsar and RabbitMQ. Yet, there was still a strong demand from those looking to switch from other AMQP applications to Pulsar.
StreamNative and ChinaMobile collaboration
StreamNative was receiving a lot of inbound requests for help migrating from other messaging systems to Pulsar and recognized the need to support other messaging protocols (such as AMQP and Kafka) natively on Pulsar. StreamNative began working on introducing a general protocol handler framework in Pulsar that would allow developers using other messaging protocols to use Pulsar.
ChinaMobile is the Gold Member of OpenStack Foundation and has the largest OpenStack cluster deployment practice in the world. RabbitMQ is the default integration of the message middleware in OpenStack, and ChinaMobile has encountered great challenges in the deployment and maintenance of RabbitMQ. In the OpenStack system, RabbitMQ, as an RPC communication component, has a large number of messages flowing in and out. During the operation process, there is often a backlog of messages. This will cause memory exceptions, and processes will often be stuck due to memory exceptions. On the other hand, RabbitMQ's mirrored queue is used in order to ensure high availability of data. When a node runs into an abnormal state, the entire cluster is unavailable regularly. Moreover, RabbitMQ's programming language erlang is obscure and difficult to troubleshoot. In summary, considering the instability of RabbitMQ cluster, the difficulty of operation and maintenance, and the difficulty of troubleshooting, ChinaMobile intends to develop a middleware product that can replace RabbitMQ.
At the same time, there are many customers in the ChinaMobile's public cloud who need to use AMQP message queues. However, RabbitMQ does not meet the condition for cloud access. Therefore, ChinaMobile's middleware team begins to investigate the self-developed technical route of AMQP message queue. By comparing Qpid, RocketMQ and Pulsar, ChinaMobile is attracted by Pulsar's unique architecture which decouples data serving and data storage into separate layers. After investigating Pulsar for a period of time, ChinaMobile finds that StreamNative has had KoP open-sourced, which makes ChinaMobile believe that it is feasible to develop AMQP based on Pulsar. ChinaMobile and StreamNative begin to cooperate on development of AMQP based on Pulsar.
Implementations
AoP architecture overview
AoP is implemented as a pluggable protocol handler that can support native AMQP wire protocol on Pulsar by leveraging Pulsar features such as Pulsar topics, cursors etc.
The diagram below illustrates a Pulsar cluster with the AoP protocol handler. Both the AMQP Proxy and AMQP protocol handler can run along with Pulsar brokers. Now, AoP is based on AMQP 0.9.1 wire protocol and we are considering adding support for AMQP 1.0 wire protocol.
AoP basic concepts
AMQP 0.9.1 introduces some basic concepts such as Exchange, Queue, and Router. It’s quite different from the Pulsar model. So it needs to find an approach that can leverage Pulsar’s existing features and associate them together. The following figure illustrates the message flow in AoP and discusses the details about the message persistence, message routing, and message delivery.
- When a producer sends a message to the AmqpExchange, the AmqpExchange persists the message into a Pulsar topic (original message topic).
- The replicator in the AmqpExchange replicates messages to the message routers.
- The message router decides whether to route this message to the AmqpQueue. If yes, the AmqpQueue persists the message ID into a Pulsar topic (index message topic).
- The AmqpQueue delivers the messages to the consumer.
AmqpExchange
An AmqpExchange has an original message topic for maintaining messages produced by the AMQP producer. And it has a replicator for replicating messages to the AMQP queues. The replicator is backed by a Pulsar durable cursor, which can ensure that the messages can be replicated to the queue and not be lost.
AmqpMessageRouter
The AmqpMessageRouter maintains the message routing type and the routing rules from an AmqpExchange to an AmqpQueue. The routing type and the routing rules also persist into the Pulsar storage layer. So we can recover the message router even if the broker is restarted.
AmqpQueue
An AmqpQueue has an index message Topic that stores IndexMessages that are routed to this queue. The IndexMessage consists of a message ID of the original message and the exchange name where the message comes from. When the AmqpQueue delivers messages to the consumers, it will read the original message data by the IndexMessage and dispatch the original message data to the consumers.
Vhost assignment
In AoP, an AMQP Vhost can be served by a Pulsar broker and a Pulsar broker can serve multiple Vhosts. So adding more Vhosts and brokers can achieve horizontal expansion. This allows you to set up a larger AoP cluster with many Vhosts.
A Vhost in AoP is backed by a Pulsar namespace with a single bundle. If a broker crashes, other brokers can take over the Vhosts that are maintained by this crashed broker. Also the Vhosts can leverage the broker load balance mechanism. The broker can relocate Vhost with a high workload to an idle broker. The following figure illustrates the Vhost assignment.
Proxy
The AoP Proxy is for finding the owner broker responsible for the Vhost when the client connects to the AMQP server and transfers data between the client and the owner broker. As described in the above section, the target Vhost is served by a broker in the cluster. This can be achieved by the topic lookup mechanism in Pulsar. This is why a Vhost can only be backed by a namespace with a single bundle. If the namespace has multiple bundles, we cannot find the owner broker by the Vhost name.
The following figure illustrates the AoP Proxy service workflow.
- The AMQP client creates a connection with the AoP Proxy.
- The Proxy service sends a lookup request to Pulsar cluster to find out the owner broker URL of the Vhost along with the connection.
- The Pulsar cluster returns the owner broker URL to the AoP Proxy.
- The AoP Proxy creates a connection to the owner broker and starts to transfer data between the AMQP client and the owner Broker.
Currently, the AoP Proxy service works with the Pulsar Broker together. Users could choose whether to start the Proxy service by the configuration(amqpProxyEnable).
Try it out
AoP is open sourced under Apache License V2 in https://github.com/streamnative/aop, which is available in the StreamNative Hub. You can download the AoP protocol handler through this link. For details about how to install and use the AoP protocol handler, see here. In future, the AoP protocol handler will be embedded in the StreamNative Platform Version 1.1. You can also download the StreamNative platform to try out all the features of AoP. If you already have a Pulsar cluster running and would like to enable AMQP protocol support on it, you can follow the instructions to install the AoP protocol handler to your existing Pulsar cluster.
Here is more information on AoP's code and document. We are looking forward to your issues, and PRs. You can also join #aop channel in Pulsar Slack to discuss all things about AMQP-on-Pulsar.
Thanks
The AoP project was originally initiated by StreamNative. The ChinaMobile development team played a very important role in the development process. Many thanks to Zongtang Hu, Shaojie Wang and Hao Zhang from ChinaMobile for their contributions to this project!
Have something to say about this article? Share it with us on Twitter or contact us.
Newsletter
Our strategies and tactics delivered right to your inbox