Note: StreamNative now offers a unified approach to managing Pulsar clusters on Kubernetes systems, transitioning from two distinct versions of operators—Pulsar Operators (Basic Version) and StreamNative Operator (Advanced Version)—to a single, consolidated operator, StreamNative Operator, effective from the start of 2024. As part of this change, we will cease the release of new versions of Pulsar Operators, with future updates and enhancements being exclusively available through the StreamNative Operator, accessible only via StreamNative's paid services.
In this Part 3 blog, I will demonstrate how to containerize Pulsar client applications (producer and consumer) using Dockerfiles in VS Code. With Dockerfiles, we can build the container image in the local Docker daemon, test the image using docker run, tag the image and push it to the Docker registry. This is probably the most common approach for the cloud-native build process.
Preparation
In this demo, I used a Python venv to control the Python version. The following snippet shows how I created the Python environment and installed the client library before opening the folder using code .
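A minimal sketch of that setup, assuming a Python 3.9 virtual environment named .py39 (the same name activated later in this post) and the client version pinned in requirements.txt:

# Create and activate a Python 3.9 virtual environment
python3.9 -m venv .py39
source .py39/bin/activate

# Install the Pulsar Python client library
pip install pulsar-client==2.9.2

# Open the project folder in VS Code
code .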
My initial VS Code interface looks like this. I also opened a terminal window to run kubectl or docker build in the same interface.
In Part 1, I exposed the proxy Service as a load balancer (external IP). This way, I can connect to the broker on Kubernetes directly from my home network. I will use that same Pulsar cluster on Kubernetes for this demo.
Let’s get started with the Python producer and consumer.
Create a Python client
1. Use ⇧⌘P to bring up the VS Code Command Palette, type “new,” and select “File: New Folder.” Add a folder called “producer.”
2. You can use the icon in the project explorer to create a new Python file, like test_producer.py.
3. Before you create a client (either a producer or a consumer), you need to create a topic first. You can use either pulsar-admin or the admin REST API to create topics. I used kubectl exec in the VS Code terminal window to run the pulsar-admin command in the broker container, as sketched below.
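The broker pod and namespace names here (my-broker-0 in sn-platform) are taken from my Part 1 cluster, as seen in the connection logs later in this post; adjust them for your cluster.

# Create the topic by running pulsar-admin inside the broker container
kubectl exec -n sn-platform my-broker-0 -- \
  bin/pulsar-admin topics create persistent://public/default/my-topic1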
5. Now we are ready to write some Python code. Copy the following snippet into test_producer.py and save the file.
import pulsar

# Connect to the Pulsar proxy exposed by the Kubernetes load balancer
client = pulsar.Client('pulsar://10.0.0.36:6650')

producer = client.create_producer(
    'persistent://public/default/my-topic1',
    block_if_queue_full=True,
    batching_enabled=True,
    batching_max_publish_delay_ms=10)

def producer_callback(res, msg_id):
    print(f"message published {msg_id}")

# Publish 1000 messages asynchronously
i = 0
while i < 1000:
    producer.send_async(
        f"Hello-{i}".encode('utf-8'),
        producer_callback)
    i += 1

# Wait for pending messages to be delivered before shutting down
producer.flush()
client.close()
6. In the terminal, you can run the producer code like this. Note that my laptop can access the Pulsar proxy IP directly.
source .py39/bin/activate
python producer/test_producer.py
2022-05-15 21:49:46.917 INFO [0x104948580] ClientConnection:182 | [ -> pulsar://10.0.0.36:6650] Create ClientConnection, timeout=10000
2022-05-15 21:49:46.918 INFO [0x104948580] ConnectionPool:96 | Created connection for pulsar://10.0.0.36:6650
2022-05-15 21:49:46.942 INFO [0x16b81b000] ClientConnection:368 | [10.0.0.7:59912 -> 10.0.0.36:6650] Connected to broker
2022-05-15 21:49:46.979 INFO [0x16b81b000] HandlerBase:64 | [persistent://public/default/my-topic1, ] Getting connection from pool
2022-05-15 21:49:46.986 INFO [0x16b81b000] ClientConnection:182 | [ -> pulsar://10.0.0.36:6650] Create ClientConnection, timeout=10000
2022-05-15 21:49:46.986 INFO [0x16b81b000] ConnectionPool:96 | Created connection for pulsar://my-broker-0.my-broker-headless.sn-platform.svc.cluster.local:6650
2022-05-15 21:49:46.993 INFO [0x16b81b000] ClientConnection:370 | [10.0.0.7:59913 -> 10.0.0.36:6650] Connected to broker through proxy. Logical broker: pulsar://my-broker-0.my-broker-headless.sn-platform.svc.cluster.local:6650
2022-05-15 21:49:47.040 INFO [0x16b81b000] ProducerImpl:188 | [persistent://public/default/my-topic1, ] Created producer on broker [10.0.0.7:59913 -> 10.0.0.36:6650]
7. The code returned without an error, but how do we know it published 1000 messages to the topic? Let's use pulsar-admin to check the topic stats, as shown below. Alternatively, you can use the admin REST API on port 8080.
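A sketch of the stats command, using the same broker pod and namespace as before:

# Check the topic stats from the broker container
kubectl exec -n sn-platform my-broker-0 -- \
  bin/pulsar-admin topics stats persistent://public/default/my-topic1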
8. From the output, you can see 1000 msgIn on the topic. With the client code ready, we can build the image and turn this producer into a Kubernetes Deployment in the next steps.
Containerize and deploy the producer application
1. Use the command palette to create a Dockerfile.
2. You can find the Dockerfile and requirements.txt created in your project folder. I moved these files into the producer folder and modified their content to match that folder, because I wanted to create two separate images, one for the producer and one for the consumer. Don't forget to put the Python dependency (pulsar-client==2.9.2) in the requirements.txt file; a Dockerfile sketch is shown below.
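The generated files vary with the Docker extension version; a minimal Dockerfile sketch for the producer image, assuming a Python 3.9 base image, could look like this:

# producer/Dockerfile (minimal sketch, not the exact generated file)
FROM python:3.9-slim

WORKDIR /app

# Install the Pulsar client dependency
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the producer code and run it
COPY test_producer.py .
CMD ["python", "test_producer.py"]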
3. Now, we are ready to build the Docker image. Run the commands below to build it. Note that if you are using an M1 Mac, you need to specify the image platform to match your Kubernetes worker OS (mine is Ubuntu). Also, remember to log in to your Docker Hub account; in my case, it is yuwsung1.
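The image name and tag below (yuwsung1/pulsar-producer:v1) are illustrative, and the amd64 target platform is an assumption based on my worker nodes; the overall flow of logging in, building, testing locally, and pushing looks like this:

# Log in to Docker Hub
docker login

# Build the producer image for the worker node platform
docker build --platform linux/amd64 -t yuwsung1/pulsar-producer:v1 ./producer

# Optionally test the image locally before pushing
docker run --rm yuwsung1/pulsar-producer:v1

# Push the image to Docker Hub
docker push yuwsung1/pulsar-producer:v1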
5. From the output above, you can see that the containerized producer published another 1000 messages on the topic.
However, this image is not reusable: the URL, topic name, and other producer properties are hard-coded in the Python code. Therefore, we need to move those properties into a Kubernetes ConfigMap and have a Deployment expose the ConfigMap as container environment variables. Then, in the Python code, we can import the os module and read the environment variables in place of the hard-coded values.
Use a ConfigMap to manage producer properties
1. We can change the producer code to read those environment variables:
import os
import pulsar

# Read the connection settings from environment variables
# (injected from the ConfigMap by the Deployment)
pulsar_url = os.environ.get('PULSAR_URL')
topic = os.environ.get('PULSAR_TOPIC')

client = pulsar.Client(pulsar_url)

producer = client.create_producer(
    topic,
    block_if_queue_full=True,
    batching_enabled=True,
    batching_max_publish_delay_ms=10)

def producer_callback(res, msg_id):
    print(f"message published {msg_id}")

# Publish 1000 messages asynchronously
i = 0
while i < 1000:
    producer.send_async(
        f"Hello-{i}".encode('utf-8'),
        producer_callback)
    i += 1

# Wait for pending messages to be delivered before shutting down
producer.flush()
client.close()
2. To test the code locally, I exported PULSAR_URL and PULSAR_TOPIC in my local environment:
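The values match the ones hard-coded in the first version of the producer:

# Use the same proxy IP and topic as before
export PULSAR_URL=pulsar://10.0.0.36:6650
export PULSAR_TOPIC=persistent://public/default/my-topic1
python producer/test_producer.py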
4. Note that the v2 image needs PULSAR_URL and PULSAR_TOPIC injected into the producer Pod. The ConfigMap and Deployment manifests I used are along the lines of the sketch below.
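A minimal sketch of those manifests, assuming a hypothetical ConfigMap named producer-config, a Deployment named pulsar-producer, and the v2 image tag yuwsung1/pulsar-producer:v2:

apiVersion: v1
kind: ConfigMap
metadata:
  name: producer-config
data:
  # Inside the cluster you could also point PULSAR_URL at the proxy Service DNS name
  PULSAR_URL: pulsar://10.0.0.36:6650
  PULSAR_TOPIC: persistent://public/default/my-topic1
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: pulsar-producer
spec:
  replicas: 1
  selector:
    matchLabels:
      app: pulsar-producer
  template:
    metadata:
      labels:
        app: pulsar-producer
    spec:
      containers:
      - name: producer
        image: yuwsung1/pulsar-producer:v2
        # Expose every key in the ConfigMap as a container environment variable
        envFrom:
        - configMapRef:
            name: producer-config

Apply the manifests with kubectl apply -f, then check the producer Pod logs and the topic stats again.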
6. From the above output, you can see that the new containerized producer picked up the Pulsar URL and topic name from the ConfigMap and produced another 1000 messages (4000 in total) to the topic.
Now we can follow the same steps to create a consumer container image and the corresponding Deployment. I will skip those steps in this tutorial, but you can find the consumer code in my GitHub repo; a minimal sketch follows.
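For reference, a minimal consumer along the same lines, reading the same PULSAR_URL and PULSAR_TOPIC environment variables and using a hypothetical subscription name my-sub:

import os
import pulsar

# Connection settings injected from the ConfigMap, as with the producer
pulsar_url = os.environ.get('PULSAR_URL')
topic = os.environ.get('PULSAR_TOPIC')

client = pulsar.Client(pulsar_url)
consumer = client.subscribe(topic, subscription_name='my-sub')

# Receive and acknowledge messages until the Pod is stopped
while True:
    msg = consumer.receive()
    print(f"received {msg.data().decode('utf-8')}")
    consumer.acknowledge(msg)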
In the next blog, we will discuss using kpack, a Kubernetes-native image builder based on Cloud Native Buildpacks, together with ArgoCD to auto-build the container images.
More on Apache Pulsar
Pulsar has become one of the most active Apache projects over the past few years, with a vibrant community driving innovation and improvements to the project. Check out the following resources to learn more about Pulsar.
Pulsar Virtual Summit Europe 2023 will take place on Tuesday, May 23rd, 2023! See this blog to learn more and submit your session!
Spin up a Pulsar cluster in minutes with StreamNative Cloud. StreamNative Cloud provides a simple, fast, and cost-effective way to run Pulsar in the public cloud.
YuWei Sung is a Solutions Engineer at StreamNative. His career portfolio includes urban planning/geospatial analysis, data mining/machine learning, and distributed data systems. He has been in the field (presales, postsales, and a bit of support) for about a decade (EMC, Dell, Pivotal, VMware, and Yugabyte).