Jun 11, 2020
8 min read

How to trace Pulsar messages with OpenTracing and Jaeger

Penghui Li
Engineering Lead of Messaging Team, StreamNative

OpenTracing is an open distributed tracing standard for applications and OSS packages. Many tracing backend services support OpenTracing APIs, such as Jaeger, Zipkin and SkyWalking.

This blog guides you through every step of how to trace Pulsar messages by Jaeger through OpenTracing API.

Prerequisite

Before getting started, make sure you have installed JDK 8, Maven 3, and Pulsar (cluster or standalone). If you do not have an available Pulsar, follow the instructions to install one.

Step 1: start a Jaeger backend

  1. Start a Jaeger backend in Docker.
docker run -d -p 6831:6831/udp -p 16686:16686 jaegertracing/all-in-one:latest

If you have successfully started Jaeger, you can open the Jaeger UI website successfully.

Tip : If you do not have a Jager Docker environment, you can download the binaries or build from source.
  1. Visit http://localhost:16686 to open the Jaeger UI website without a username or password.
jager ui interface

Step 2: add maven dependencies

This step uses OpenTracing Pulsar Client, which is integrated with the Pulsar Client and OpenTracing APIs based on Pulsar Client Interceptors, to trace Pulsar messages. Developed by StreamNative, the OpenTracing Pulsar Client acts as a monitoring tool in the StreamNatvie Hub.

Add Jaeger client dependency to connect to Jaeger backend.

<dependency>
 <groupId>org.apache.pulsar</groupId>
 <artifactId>pulsar-client</artifactId>
 <version>2.5.1</version>
</dependency>

<dependency>
 <groupId>io.streamnative</groupId>
 <artifactId>opentracing-pulsar-client</artifactId>
 <version>0.1.0</version>
</dependency>

<dependency>
  <groupId>io.jaegertracing</groupId>
  <artifactId>jaeger-client</artifactId>
  <version>1.2.0</version>
</dependency>

Step 3: use OpenTracing Pulsar Client

For easier understanding, this blog takes a usage scenario as an example. Suppose that you have three jobs and two topics. Job-1 publishes messages to the topic-A and Job-2 consumes messages from the topic-A. When Job-2 receives a message from topic-A, Job-2 sends a message to the topic-B, and then Job-3 consumes messages from topic-B. So there are two topics, two producers and two consumers in this scenario.

According to the scenario described previously, you need to start three applications to finish this job.

  • Job-1: publish messages to topic-A
  • Job-2: consume messages from topic-A and publish messages to topic-B
  • Job-3: consume messages from topic-B

Job-1

This example shows how to publish messages to topic-A in Java.

Configuration.SamplerConfiguration samplerConfig = Configuration.SamplerConfiguration.fromEnv().withType("const").withParam(1);
Configuration.ReporterConfiguration reporterConfig = Configuration.ReporterConfiguration.fromEnv().withLogSpans(true);
Configuration configuration = new Configuration("Job-1").withSampler(samplerConfig).withReporter(reporterConfig);

Tracer tracer = configuration.getTracer();
GlobalTracer.registerIfAbsent(tracer);

PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .build();

Producer<String> producerA = client.newProducer(Schema.STRING)
        .topic("topic-A")
        .intercept(new TracingProducerInterceptor())
        .create();

for (int i = 0; i < 10; i++) {
    producerA.newMessage().value(String.format("[%d] Hello", i)).send();
}

Job-2

This example shows how to consume messages from topic-A and publish messages to topic-B in Java.

Configuration.SamplerConfiguration samplerConfig = Configuration.SamplerConfiguration.fromEnv().withType("const").withParam(1);
Configuration.ReporterConfiguration reporterConfig = Configuration.ReporterConfiguration.fromEnv().withLogSpans(true);
Configuration configuration = new Configuration("Job-2").withSampler(samplerConfig).withReporter(reporterConfig);

Tracer tracer = configuration.getTracer();
GlobalTracer.registerIfAbsent(tracer);

PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .build();

Consumer<String> consumer = client.newConsumer(Schema.STRING)
        .topic("topic-A")
        .subscriptionName("open-tracing")
        .subscriptionType(SubscriptionType.Shared)
        .intercept(new TracingConsumerInterceptor<>())
        .subscribe();

Producer<String> producerB = client.newProducer(Schema.STRING)
        .topic("topic-B")
        .intercept(new TracingProducerInterceptor())
        .create();

while (true) {
    Message<String> received = consumer.receive();
    SpanContext context = TracingPulsarUtils.extractSpanContext(received, tracer);
    TypedMessageBuilder<String> messageBuilder = producerB.newMessage();
    messageBuilder.value(received.getValue() + " Pulsar and OpenTracing!");
    // Inject parent span context
    tracer.inject(context, Format.Builtin.TEXT_MAP, new TypeMessageBuilderInjectAdapter(messageBuilder));
    messageBuilder.send();
    consumer.acknowledge(received);
}

Job-3

This example shows how to consume messages from topic-B in Java.

Configuration.SamplerConfiguration samplerConfig = Configuration.SamplerConfiguration.fromEnv().withType("const").withParam(1);
Configuration.ReporterConfiguration reporterConfig = Configuration.ReporterConfiguration.fromEnv().withLogSpans(true);
Configuration configuration = new Configuration("Job-3").withSampler(samplerConfig).withReporter(reporterConfig);

Tracer tracer = configuration.getTracer();
GlobalTracer.registerIfAbsent(tracer);

PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .build();

Consumer<String> consumer = client.newConsumer(Schema.STRING)
        .topic("topic-B")
        .subscriptionName("open-tracing")
        .subscriptionType(SubscriptionType.Shared)
        .intercept(new TracingConsumerInterceptor<>())
        .subscribe();

while (true) {
    Message<String> received = consumer.receive();
    System.out.println(received.getValue());
    consumer.acknowledge(received);
}

Now, you can run Job-3, Job-2 and Job-1 one by one. You can see the Job-3 receives logs in the console as below:

[0] Hello Pulsar and OpenTracing!
[1] Hello Pulsar and OpenTracing!
...
[9] Hello Pulsar and OpenTracing!

Congratulations, your jobs work well. Now you can open the Jaeger UI again and there are ten traces in the Jaeger.

jager ui interface search

You can click a job name to view the details of a trace.

jager ui interface Job 1

The span name is formatted as To__<topic-name> and From__<topic-name>__<subscription_name>, which makes it easy to tell whether it is a producer or a consumer.

Summary

As you can see, OpenTracing Pulsar Client integrates Pulsar client and OpenTracing to trace Pulsar messages easily. If you are using Pulsar and OpenTracing in your application, do not hesitate to try it out!

Additionally, I also wrote a tech blog for How to Use Apache SkyWalking to Trace Apache Pulsar Messages. For the complete content, see here.

Penghui Li
Penghui Li is passionate about helping organizations to architect and implement messaging services. Prior to StreamNative, Penghui was a Software Engineer at Zhaopin.com, where he was the leading Pulsar advocate and helped the company adopt and implement the technology. He is an Apache Pulsar Committer and PMC member. Penghui lives in Beijing, China.

Related articles

Oct 30, 2024
10 min

Announcing the Ursa Engine Public Preview for StreamNative BYOC Clusters

Oct 30, 2024
15 min

Introducing Universal Linking: Revolutionizing Data Replication and Interoperability Across Data Streaming Systems

Newsletter

Our strategies and tactics delivered right to your inbox

Thank you! Your submission has been received!
Oops! Something went wrong while submitting the form.
Pulsar Tutorials