
Streaming Data Pipelines with Pulsar IO

November 10, 2021

Building modern data infrastructure is hard. Organizations today must manage large volumes of heterogeneous data that is generated and delivered around the clock. Given the quantity and velocity of this data and the different needs it must serve, there is no one-size-fits-all solution. Instead, organizations must be able to move data between different systems in order to store, process, and serve it.

Historically, organizations have used a number of different tools to move data, such as Apache Kafka for streaming workloads and RabbitMQ for messaging workloads. Today, organizations are streamlining this process with Apache Pulsar.

Apache Pulsar is a cloud-native, distributed messaging and streaming platform. Designed to serve modern data needs, Pulsar supports flexible messaging semantics, tiered storage, multi-tenancy, and geo-replication. The Pulsar project has experienced rapid community growth, ecosystem expansion, and global adoption since it became a top-level Apache Software Foundation project in 2018. With Pulsar as the backbone of their data infrastructure, companies are able to move data in a fast and scalable way. In this blog post, we will look at how you can easily move data into and out of Pulsar with Pulsar IO.

1. What is Pulsar IO?

Pulsar IO is a complete toolkit for creating, deploying, and managing Pulsar connectors that integrate with external systems like key/value stores, distributed file systems, search indexes, databases, data warehouses, other messaging systems, and more. Since Pulsar IO is built on top of Pulsar’s serverless computing layer, known as Pulsar Functions, writing a Pulsar IO connector is as simple as writing a Pulsar Function.

With Pulsar IO, you can easily move data in and out of Pulsar by either using existing Pulsar connectors or writing your own custom connectors. Benefits of leveraging Pulsar IO include:

  • There are many existing Pulsar IO connectors for external systems, such as Apache Kafka, Cassandra, and Aerospike. Using these connectors helps reduce time to production, since all the necessary pieces for creating the integrations are already in place. Developers just need to provide configuration (like connection URLs and credentials) to run the connectors.
  • Pulsar IO comes with a managed runtime that takes care of execution, scheduling, scaling, and fault tolerance. Developers can focus on configuration and business logic.
  • By using the provided interfaces, you can reduce the boilerplate code in producing and consuming applications.
  • You can easily scale out - when you need more instances to handle incoming traffic - by changing one simple configuration value. If you use the Kubernetes runtime, elastic scaling on traffic demand comes out of the box.
  • Pulsar IO helps you leverage schemas: by specifying a schema type on your data models, you ensure that schema enforcement is in place. JSON, Avro, and Protobuf are supported.
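For example, running an existing sink connector typically requires nothing more than a small configuration file. The sketch below is illustrative, loosely modeled on the Cassandra sink example from the Pulsar documentation; treat the exact keys and values as assumptions to be checked against your connector and Pulsar version:

```yaml
# Illustrative sink configuration file (values are placeholders).
tenant: "public"
namespace: "default"
name: "cassandra-test-sink"
inputs: ["persistent://public/default/cassandra-sink-topic"]
parallelism: 1
configs:
  roots: "localhost:9042"
  keyspace: "pulsar_test_keyspace"
  columnFamily: "pulsar_test_table"
  keyname: "key"
  columnName: "col"
```

Everything above `configs` is understood by the Pulsar IO runtime itself; the `configs` map is passed straight through to the connector's `open()` method.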

2. Pulsar IO Runtime

As Pulsar IO is built on top of Pulsar Functions, it has the same runtime options. When deploying Pulsar IO connectors, you have the following options:

  • Thread: Runs inside the same JVM as the worker. (Normally used for testing and local runs; not recommended for production deployments.)
  • Process: Runs in a separate process; you can use multiple workers to scale out across multiple nodes.
  • Kubernetes: Runs as a pod inside your Kubernetes cluster, with the worker coordinating with Kubernetes. This lets you leverage all the benefits a cloud-native environment has to offer, such as easy scale-out.

3. Pulsar IO Interfaces

As already mentioned, Pulsar IO reduces the boilerplate code required for producing and consuming applications. It does so by providing base interfaces that abstract away the boilerplate code and allow us to focus on the business logic.

Pulsar IO provides base interfaces for Sources and Sinks. Source connectors allow you to bring data into Pulsar from external systems, while Sink connectors can be used to move data out of Pulsar and into an external system such as a database.

There are also specialized types of Source connectors. A Push Source makes it easy to implement integrations that need to push data, such as a change data capture source that pushes each change into Pulsar as soon as it is received. A Batch Source supports batch-oriented ingestion, where work is discovered, split into tasks, and read task by task.

The Source Interface

public interface Source<T> extends AutoCloseable {

    /**
     * Open connector with configuration.
     *
     * @param config initialization config
     * @param sourceContext environment where the source connector is running
     * @throws Exception IO type exceptions when opening a connector
     */
    void open(final Map<String, Object> config, SourceContext sourceContext) throws Exception;

    /**
     * Reads the next message from source.
     * If source does not have any new messages, this call should block.
     * @return next message from source.  The return result should never be null
     * @throws Exception
     */
    Record<T> read() throws Exception;
}
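To make this concrete, here is a minimal, self-contained sketch of a custom source. The `Record`, `SourceContext`, and `Source` types are re-declared as stand-ins (matching the interface above) so the example compiles without Pulsar on the classpath; a real connector would import `org.apache.pulsar.io.core.Source` and `org.apache.pulsar.functions.api.Record` instead, and the counter logic is invented purely for illustration:

```java
import java.util.Map;

// Stand-ins for the Pulsar IO types so this sketch compiles on its own.
interface Record<T> { T getValue(); }
interface SourceContext { }
interface Source<T> extends AutoCloseable {
    void open(Map<String, Object> config, SourceContext ctx) throws Exception;
    Record<T> read() throws Exception;
}

// A toy source that emits an incrementing counter up to a configured maximum.
public class CounterSource implements Source<Long> {
    private long max;
    private long current = 0;

    @Override
    public void open(Map<String, Object> config, SourceContext ctx) {
        // Pull connector settings out of the supplied configuration map.
        max = ((Number) config.getOrDefault("max", Long.MAX_VALUE)).longValue();
    }

    @Override
    public Record<Long> read() throws Exception {
        if (current >= max) {
            // No more data: read() must block rather than return null.
            Thread.sleep(Long.MAX_VALUE);
        }
        long value = current++;
        return () -> value; // Record<Long> via lambda
    }

    @Override
    public void close() { }
}
```

The framework calls `read()` in a loop and publishes each returned record to the connector's output topic, which is why the contract insists on blocking instead of returning null.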

The Batch Source Interface

public interface BatchSource<T> extends AutoCloseable {

    /**
     * Open connector with configuration.
     *
     * @param config config that's supplied for source
     * @param context environment where the source connector is running
     * @throws Exception IO type exceptions when opening a connector
     */
    void open(final Map<String, Object> config, SourceContext context) throws Exception;

    /**
     * Discovery phase of a connector.  This phase will only be run on one instance, i.e. instance 0, of the connector.
     * Implementations use the taskEater consumer to output serialized representation of tasks as they are discovered.
     *
     * @param taskEater function to notify the framework about the new task received.
     * @throws Exception during discover
     */
    void discover(Consumer<byte[]> taskEater) throws Exception;

    /**
     * Called when a new task appears for this connector instance.
     *
     * @param task the serialized representation of the task
     */
    void prepare(byte[] task) throws Exception;

    /**
     * Read data and return a record
     * Return null if no more records are present for this task
     * @return a record
     */
    Record<T> readNext() throws Exception;
}
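To make the discover/prepare/readNext lifecycle concrete, here is a minimal, self-contained sketch. The Pulsar types are again re-declared as stand-ins so the example compiles without Pulsar on the classpath, and the partition names and record contents are invented for illustration:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.function.Consumer;

// Stand-ins for the Pulsar IO types so this sketch compiles on its own.
interface Record<T> { T getValue(); }
interface SourceContext { }

// Toy batch source: discover() emits one serialized task per "partition",
// prepare() receives a task, and readNext() drains that task's records.
public class PartitionBatchSource implements AutoCloseable {
    private final Deque<String> pending = new ArrayDeque<>();

    public void open(Map<String, Object> config, SourceContext ctx) { }

    // Runs only on instance 0: announce each unit of work as a serialized task.
    public void discover(Consumer<byte[]> taskEater) {
        for (String partition : new String[] {"p0", "p1"}) {
            taskEater.accept(partition.getBytes(StandardCharsets.UTF_8));
        }
    }

    // Called on whichever instance the framework assigns the task to.
    public void prepare(byte[] task) {
        String partition = new String(task, StandardCharsets.UTF_8);
        // Pretend each partition holds two records.
        pending.add(partition + "-record-0");
        pending.add(partition + "-record-1");
    }

    // Returns null once the current task is exhausted.
    public Record<String> readNext() {
        String value = pending.poll();
        if (value == null) {
            return null;
        }
        return () -> value;
    }

    @Override
    public void close() { }
}
```

Note the division of labor: one instance discovers and serializes the work, while any instance can be handed a task and read it to completion.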

The Sink Interface

public interface Sink<T> extends AutoCloseable {
    /**
     * Open connector with configuration.
     *
     * @param config initialization config
     * @param sinkContext environment where the sink connector is running
     * @throws Exception IO type exceptions when opening a connector
     */
    void open(final Map<String, Object> config, SinkContext sinkContext) throws Exception;

    /**
     * Write a message to Sink.
     *
     * @param record record to write to sink
     * @throws Exception
     */
    void write(Record<T> record) throws Exception;
}
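A sink implementation follows the same pattern. Below is a minimal, self-contained sketch, again with the Pulsar types re-declared as stand-ins so it compiles without Pulsar on the classpath; a real connector would implement `org.apache.pulsar.io.core.Sink` and write to an external system using connection details from the configuration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Stand-ins for the Pulsar IO types so this sketch compiles on its own.
interface Record<T> { T getValue(); }
interface SinkContext { }
interface Sink<T> extends AutoCloseable {
    void open(Map<String, Object> config, SinkContext ctx) throws Exception;
    void write(Record<T> record) throws Exception;
}

// A toy sink that "writes" records to an in-memory list; a real sink would
// write each record to a database, index, or other external system.
public class InMemorySink implements Sink<String> {
    final List<String> written = new ArrayList<>();

    @Override
    public void open(Map<String, Object> config, SinkContext ctx) {
        // A real sink would open its client connection here using the config.
    }

    @Override
    public void write(Record<String> record) {
        written.add(record.getValue());
    }

    @Override
    public void close() { }
}
```

The framework consumes from the connector's input topics and invokes `write()` once per record, so the sink only has to translate each record into a call against the target system.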

4. Conclusion

Apache Pulsar is able to serve as the backbone of modern data infrastructure. It enables organizations to move data in a fast and scalable way. Pulsar IO is a connector framework that gives developers all the necessary tools to create, deploy, and manage Pulsar connectors that integrate with different systems. It allows developers to focus on the application logic by abstracting away all the boilerplate code.

5. Further Reading

If you are interested in learning more and building your own connectors, take a look at the following resources:

© StreamNative, Inc. 2021. Apache, Apache Pulsar, Apache BookKeeper, Apache Flink, and associated open source project names are trademarks of the Apache Software Foundation.