Announcing Spring for Apache Pulsar

We are excited to announce the first milestone release of Spring for Apache Pulsar. This integration enables you to leverage the power of Apache Pulsar straight from your Spring applications.

Let's take a look at the benefits of using Apache Pulsar with Spring before jumping into an example application.

Why use Apache Pulsar with Spring

Spring is the world's most popular Java framework, helping developers create production-ready applications quickly, safely, and easily. It is a flexible framework that offers both out-of-the-box defaults to increase development efficiency and customization for any requirements that arise. This makes it a strong candidate for building your cloud-native applications.

Apache Pulsar is a cloud-native streaming and messaging platform that enables organizations to build scalable, reliable applications in elastic cloud environments. It combines the best features of traditional messaging and pub-sub systems. In Pulsar’s multi-layer architecture, each layer is scalable, distributed, and decoupled from the other layers. The separation of compute and storage allows you to scale both independently.

Together, Pulsar and Spring allow you to easily build data applications that are scalable, robust, and quick to develop. Integrating Pulsar with Spring microservices further enables seamless interoperation with services written in other languages. Spring for Apache Pulsar offers a toolkit to interface with Pulsar. From Templates to Listeners and Autoconfiguration, all the Spring concepts you love can now be used with Pulsar! Fitting Pulsar into your existing architecture is especially easy if you are already using the Spring for Apache Kafka or Spring AMQP integrations. Spring for Apache Pulsar adopts the same concepts, so you will feel right at home.

How to use Spring for Apache Pulsar

We are going to build an example application that will consume signup data to alert the customer success team about new clients. Spring will run our application and provide configuration while Pulsar is used as a messaging bus to route our data.

Illustration: how to use Spring for Apache Pulsar

The full source code for our example is available in this GitHub repository.

Check out this demo video to see Spring for Apache Pulsar in action:

Prerequisites

We are using Maven and Java 17 to run our application. To start using Spring for Apache Pulsar, we first need to add it as a dependency to our Spring project. Because this is a milestone release, the Spring Milestones repository must be declared as well.

<properties>
    <spring-pulsar.version>0.1.0-M1</spring-pulsar.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.pulsar</groupId>
        <artifactId>spring-pulsar-spring-boot-starter</artifactId>
        <version>${spring-pulsar.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>
<repositories>
    <repository>
        <id>spring-milestones</id>
        <name>Spring Milestones</name>
        <url>https://repo.spring.io/milestone</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
</repositories>
<pluginRepositories>
    <pluginRepository>
        <id>spring-milestones</id>
        <name>Spring Milestones</name>
        <url>https://repo.spring.io/milestone</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </pluginRepository>
</pluginRepositories>

To compile our application we can run mvn clean package. The application can be run using mvn spring-boot:run.

We also need a Pulsar cluster for the application to run against. We can use a local standalone Pulsar cluster, or use StreamNative Cloud to provide one for us.

Connecting to Pulsar

We can now configure our application to connect to Pulsar using Spring configuration. Let's add the following to our src/main/resources/application.yml.

<script>
spring:
  pulsar:
    client:
      service-url: pulsar+ssl://free.o-j8r1u.snio.cloud:6651
      auth-plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
      authentication:
        private-key: file:///Users/user/Downloads/o-j8r1u-free.json
        audience: urn:sn:pulsar:o-j8r1u:free
        issuer-url: https://auth.streamnative.cloud/
    producer:
      batching-enabled: false
      topic-name: persistent://public/default/signups-topic
<script> 

This configuration lets us connect to StreamNative Cloud using OAuth2 authentication. To retrieve your authentication credentials, please follow the StreamNative Cloud documentation. The producer section also pre-configures the Pulsar producer: it disables message batching and sets the default topic name.

Producing data

We can now start to send messages to our cluster. For this example we will generate fake signup data and continuously write it to our signup topic. Using Spring for Apache Pulsar, we can send messages by simply adding a PulsarTemplate to our application code.

<script>
@EnableScheduling
@SpringBootApplication
public class SignupApplication {
    private static final Logger log = LoggerFactory.getLogger(SignupApplication.class);
    
    @Autowired private PulsarTemplate<Signup> signupTemplate;
    
    @Autowired private PulsarTemplate<Customer> customerTemplate;
    
    @Scheduled(initialDelay = 5000, fixedRate = 5000)
    void publishSignupData() throws PulsarClientException {
        Signup signup = signupGenerator.generate();
        signupTemplate.setSchema(JSONSchema.of(Signup.class));
        signupTemplate.send(signup);
    }
   …
}
<script> 

The code above creates a scheduled task that generates a fake signup and sends it as a message to the default topic we configured in our application.yml. Note how easy it is to configure the PulsarTemplate to send the message with a JSON schema.
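The snippet above calls signupGenerator.generate() without showing the generator or the Signup type. Here is a minimal sketch of what they might look like, assuming Java records (the listener code in this post reads fields through record-style accessors such as signup.firstName()). The SignupGenerator class, the FREE tier, the companyName and phoneNumber fields on Signup, and all sample values are hypothetical; only the accessors firstName, lastName, companyEmail, and signupTier and the ENTERPRISE tier come from this post.

```java
import java.util.List;
import java.util.Locale;
import java.util.Random;

// Hypothetical tiers; only ENTERPRISE is named in this post.
enum SignupTier { FREE, ENTERPRISE }

// Hypothetical payload shape; companyName and phoneNumber are assumptions.
record Signup(String firstName, String lastName, String companyName,
              String companyEmail, String phoneNumber, SignupTier signupTier) {}

// Hypothetical generator that assembles a fake signup from sample values.
class SignupGenerator {
    private static final List<String> FIRST_NAMES = List.of("Ada", "Grace", "Linus");
    private static final List<String> LAST_NAMES = List.of("Lovelace", "Hopper", "Torvalds");
    private static final List<String> COMPANIES = List.of("Example Corp", "Sample Ltd");

    private final Random random;

    SignupGenerator(Random random) {
        this.random = random;
    }

    Signup generate() {
        String first = pick(FIRST_NAMES);
        String last = pick(LAST_NAMES);
        String company = pick(COMPANIES);
        String email = (first + "." + last + "@example.com").toLowerCase(Locale.ROOT);
        // Two random digits appended to a fixed, fictional prefix.
        String phone = String.format("555-01%02d", random.nextInt(100));
        SignupTier[] tiers = SignupTier.values();
        SignupTier tier = tiers[random.nextInt(tiers.length)];
        return new Signup(first, last, company, email, phone, tier);
    }

    private String pick(List<String> values) {
        return values.get(random.nextInt(values.size()));
    }
}
```

Passing the Random in through the constructor keeps the generator deterministic in tests: seeding it with a fixed value yields the same sequence of signups on every run.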

Consuming data

To derive value from our data, we now want to filter the incoming signups. For any signup that uses the ENTERPRISE tier, we will create a new Customer and publish a message on our customer-success topic. To consume the signup topic, we need to add a PulsarListener method to our SignupApplication class.

<script>
@PulsarListener(
    subscriptionName = "signup-consumer",
    topics = "signups-topic",
    schemaType = SchemaType.JSON)
void filterSignups(Signup signup) throws PulsarClientException {
    log.info(
        "{} {} ({}) just signed up for {} tier",
        signup.firstName(),
        signup.lastName(),
        signup.companyEmail(),
        signup.signupTier());

    if (signup.signupTier() == SignupTier.ENTERPRISE) {
        Customer customer = Customer.from(signup);
        customerTemplate.setSchema(JSONSchema.of(Customer.class));
        customerTemplate.send("customer-success", customer);
    }
}
<script> 

Behind the scenes, the PulsarListener annotation configures a Pulsar consumer to read from the specified topic(s) with the given schema. In our filterSignups method, we are using the second PulsarTemplate we added before. This time, we don't want to send messages to the default topic, so we pass in 'customer-success' as the topic name to write to.
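Note that the listener subscribes to the short name signups-topic, while application.yml uses the full name persistent://public/default/signups-topic. Both refer to the same topic, because Pulsar expands short topic names using the persistent domain, the public tenant, and the default namespace. The following sketch illustrates that expansion rule in plain Java. It is not the Pulsar client's actual code, and the TopicNames class and the acme/sales/signups name are hypothetical.

```java
// Illustrative sketch of Pulsar's short topic name expansion; not the client's real implementation.
final class TopicNames {
    private static final String DEFAULT_DOMAIN = "persistent";
    private static final String DEFAULT_TENANT = "public";
    private static final String DEFAULT_NAMESPACE = "default";

    private TopicNames() {}

    static String toFullName(String topic) {
        if (topic.contains("://")) {
            return topic; // already fully qualified
        }
        String[] parts = topic.split("/");
        if (parts.length == 1) {
            // "<topic>" uses the default tenant and namespace
            return DEFAULT_DOMAIN + "://" + DEFAULT_TENANT + "/" + DEFAULT_NAMESPACE + "/" + parts[0];
        }
        if (parts.length == 3) {
            // "<tenant>/<namespace>/<topic>" only needs the domain prefix
            return DEFAULT_DOMAIN + "://" + topic;
        }
        throw new IllegalArgumentException("Invalid short topic name: " + topic);
    }
}
```

With this rule, TopicNames.toFullName("signups-topic") and the topic-name value from application.yml resolve to the same string, which is why the producer and the listener meet on one topic. The same applies to customer-success.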

Finally, our customer success team can now receive an alert about any new enterprise clients. To do so, they simply need to consume the 'customer-success' topic with the Customer schema.

<script>
@PulsarListener(
    subscriptionName = "customer-consumer",
    topics = "customer-success",
    schemaType = SchemaType.JSON)
void alertCustomerSuccess(Customer customer) {
    log.info(
        "## Start the onboarding for {} - {} {} ({}) - {} ##",
        customer.companyName(),
        customer.firstName(),
        customer.lastName(),
        customer.phoneNumber(),
        customer.companyEmail());
}
<script> 
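The listeners above rely on a Customer type and a Customer.from(signup) factory that this post does not show. Below is a minimal sketch of how they might be written, assuming Java records whose components match the accessors used in the log statements. The Signup fields companyName and phoneNumber, the FREE tier, and the SignupFilter helper are hypothetical. Signup and SignupTier are declared here only so the sketch compiles on its own.

```java
import java.util.Optional;

// Hypothetical tiers; only ENTERPRISE is named in this post.
enum SignupTier { FREE, ENTERPRISE }

// Hypothetical payload shape; companyName and phoneNumber are assumptions.
record Signup(String firstName, String lastName, String companyName,
              String companyEmail, String phoneNumber, SignupTier signupTier) {}

// Components mirror the accessors used by alertCustomerSuccess.
record Customer(String companyName, String firstName, String lastName,
                String phoneNumber, String companyEmail) {

    // Copies the contact details of a signup into a new customer.
    static Customer from(Signup signup) {
        return new Customer(
            signup.companyName(),
            signup.firstName(),
            signup.lastName(),
            signup.phoneNumber(),
            signup.companyEmail());
    }
}

// Hypothetical helper that isolates the tier check from the messaging code.
final class SignupFilter {
    private SignupFilter() {}

    // Returns a customer only for ENTERPRISE signups, otherwise an empty Optional.
    static Optional<Customer> enterpriseCustomer(Signup signup) {
        return signup.signupTier() == SignupTier.ENTERPRISE
            ? Optional.of(Customer.from(signup))
            : Optional.empty();
    }
}
```

Keeping the tier check in a plain method like this is one way to unit test the filtering rule without a running Pulsar cluster; the listener would then only need to send the customer when the Optional is present.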

Advanced features

Spring for Apache Pulsar offers many more advanced features. As an example, let's use a ProducerInterceptor to log our outgoing messages at debug level.

First, we create a Spring configuration class that adds a ProducerInterceptor bean. Our ProducerInterceptor implementation only needs to log message information upon acknowledgment by the broker.

<script>
@Configuration(proxyBeanMethods = false)
class SignupConfiguration {

  @Bean
  ProducerInterceptor loggingInterceptor() {
    return new LoggingInterceptor();
  }

  static class LoggingInterceptor implements ProducerInterceptor {
    private static final Logger log = LoggerFactory.getLogger(LoggingInterceptor.class);

    @Override
    public void close() {
      // no-op
    }

    @Override
    public boolean eligible(Message message) {
      return true;
    }

    @Override
    public Message beforeSend(Producer producer, Message message) {
      return message;
    }

    @Override
    public void onSendAcknowledgement(
        Producer producer, Message message, MessageId msgId, Throwable exception) {
      log.debug("MessageId: {}, Value: {}", message.getMessageId(), message.getValue());
    }
  }
}
<script> 

Thanks to Spring's auto-configuration, our application will automatically configure our LoggingInterceptor to intercept messages on all Pulsar producers.

The only thing missing to see our interceptor in action is setting the log level to debug in the application.yml.

<script>
logging:
  level:
    io.streamnative.example: debug
<script> 

Conclusion

In this blog post, we explored how to use Spring for Apache Pulsar to quickly build a sample application. The Spring for Apache Pulsar integration offers many more features, such as subscription types, batching, and manual acknowledgment. For advanced applications that also need to read from or write to external systems, we can add Pulsar IO connectors. If you are interested in learning more about Pulsar IO connectors, please visit StreamNative Hub.

Alexander Preuss
Alexander Preuß is an Ecosystem Engineer at StreamNative. He has been working as a Software Engineer on distributed systems as well as a data engineering consultant for enterprise customers. He is currently based in Germany.
