Jan 26, 2023
5 min read

Announcing the Hudi Sink Connector for Apache Pulsar

Yong Zhang
Software Engineer, StreamNative

We’re excited to announce the general availability of the Hudi Sink connector for Apache Pulsar. The connector enables seamless integration between Apache Hudi and Apache Pulsar, expanding the Apache Pulsar ecosystem. The Hudi + Pulsar connector offers a convenient, efficient, and flexible approach to moving data from Pulsar to Hudi without requiring user code.

For more information on why lakehouse technologies are growing in popularity, check out this blog.

See the Hudi Sink connector in action during the Pulsar Summit SF 2022 talk Unlocking the Power of Lakehouse Architectures with Pulsar and Hudi from Addison Higham (Chief Architect, StreamNative) and Alexey Kudinkin (Founding Engineer, Onehouse).

What is the Hudi Sink connector?

The Hudi Sink connector is a Pulsar IO connector that pulls data from Apache Pulsar topics and persists data to Hudi tables.

Figure 1. Hudi sink (diagram of how Pulsar connects to Hudi)

Why develop the Hudi Sink connector?

In the last 5 years, the rise of streaming data and the need for lower data latency have pushed data lakes to their limits. As a result, lakehouse technologies such as Apache Hudi have seen rapid adoption. Apache Pulsar, a distributed, open-source pub-sub messaging and streaming platform for real-time workloads, is a natural fit for lakehouse architectures. Integrating Apache Pulsar with lakehouse technologies streamlines data lifecycle management and data analysis.

StreamNative built the Hudi Sink connector to give Hudi users a direct way to channel the flow of messages from Pulsar into Hudi tables, while avoiding the connectivity problems that can arise from intrinsic differences¹ between systems or from privacy requirements.

The connector addresses these problems by integrating fully with Pulsar, including its serverless functions, per-message processing, and event-stream processing. It presents a low-code solution with out-of-the-box capabilities such as multi-tenant connectivity, geo-replication, protocols for connecting directly to end-user mobile or IoT clients, and more.

What are the benefits of using the Hudi Sink connector?

The integration between Hudi and Apache Pulsar provides three key benefits:

  • Simplicity: Quickly move data from Apache Pulsar to Hudi without any user code.
  • Efficiency: Spend less time configuring the data layer, leaving you more time to extract the maximum business value from real-time data.
  • Scalability: Run the connector in different modes (standalone or distributed) and scale it by adjusting its parallelism, allowing you to build reactive data pipelines that meet business and operational needs in real time (see the sketch after this list).
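
For example, scaling out is largely a matter of raising the sink's parallelism, which controls how many instances of the connector Pulsar runs. A minimal sketch of the relevant fragment of the sink configuration (the full file appears in the walkthrough below; the value of 4 is only an illustration):

{
    "name": "hudi-sink",
    "parallelism": 4
}

In a distributed (cluster) deployment, Pulsar spreads these instances across its function workers.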

How do I get started with the Hudi Sink connector?

The following example shows how to configure and run the connector in a standalone Pulsar service.

Prerequisites

First, you need to prepare these components, which are used throughout the example:

  • The Apache Pulsar 2.10.1 binary distribution
  • The pulsar-io-lakehouse-2.10.1.1.nar connector package
  • Python 3

Then initialize the pyspark environment.

1. Create a virtualenv with python3.

python3 -m venv .hudi-pyspark && source .hudi-pyspark/bin/activate

2. Download pyspark.

pip install pyspark==3.2.1 && export PYSPARK_PYTHON=$(which python3)

Apache Pulsar provides a Pulsar IO feature to run the connector. Follow the steps below to quickly get the connector up and running.

Configure the sink connector with the local filesystem

1. Decompress the Pulsar package and go to its root directory.

tar -xvf apache-pulsar-2.10.1-bin.tar.gz && cd apache-pulsar-2.10.1

2. Start the Pulsar service with the daemon client tool.

bin/pulsar-daemon start standalone
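
Optionally, you can verify the standalone service is up before moving on. One quick check, assuming the default standalone settings:

bin/pulsar-admin brokers healthcheck

If the service is healthy, the command exits successfully (typically printing ok).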

3. Create a directory for storing table data.

mkdir hudi-sink

4. Create the sink configuration file hudi-sink.json. Note that you need to update archive and hoodie.base.path to the correct paths for your environment.

{
    "tenant": "public",
    "namespace": "default",
    "name": "hudi-sink",
    "inputs": [
        "test-hudi-pulsar"
    ],
    "archive": "/path/to/pulsar-io-lakehouse-2.10.1.1.nar",
    "parallelism": 1,
    "processingGuarantees": "EFFECTIVELY_ONCE",
    "configs": {
        "type": "hudi",
        "hoodie.table.name": "hudi-connector-test",
        "hoodie.table.type": "COPY_ON_WRITE",
        "hoodie.base.path": "file:///path/to/hudi-sink",
        "hoodie.datasource.write.recordkey.field": "id",
        "hoodie.datasource.write.partitionpath.field": "id",
        "maxRecordsPerCommit": "10"
    }
}
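
Before submitting the sink, it can help to confirm the file is valid JSON; a quick check using Python's built-in json.tool module:

python3 -m json.tool hudi-sink.json

If the file parses cleanly, the command pretty-prints it; otherwise it reports the position of the syntax error.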

5. Submit the Hudi sink with pulsar-admin.

bin/pulsar-admin sinks create --sink-config-file ${PWD}/hudi-sink.json

6. Check the sink status to confirm it is running.

bin/pulsar-admin sinks status --name hudi-sink

The expected output:

{
  "numInstances" : 1,
  "numRunning" : 1,
  "instances" : [ {
    "instanceId" : 0,
    "status" : {
      "running" : true,
      "error" : "",
      "numRestarts" : 0,
      "numReadFromPulsar" : 0,
      "numSystemExceptions" : 0,
      "latestSystemExceptions" : [ ],
      "numSinkExceptions" : 0,
      "latestSinkExceptions" : [ ],
      "numWrittenToSink" : 0,
      "lastReceivedTime" : 0,
      "workerId" : "c-standalone-fw-localhost-8080"
    }
  } ]
}

If numRunning shows 1 and running shows true, the sink has started successfully.

7. Produce 10 messages to the topic test-hudi-pulsar. Because maxRecordsPerCommit is set to 10 in the sink configuration, this triggers a flush of the records to the table hudi-connector-test.

for i in {1..10}; do
  bin/pulsar-client produce \
    -vs 'json:{"type":"record","name":"data","fields":[{"name":"id","type":"int"}]}' \
    -m "{\"id\":$i}" \
    test-hudi-pulsar
done

8. Check the sink status to confirm the messages were consumed.

bin/pulsar-admin sinks status --name hudi-sink

The expected output:

{
  "numInstances" : 1,
  "numRunning" : 1,
  "instances" : [ {
    "instanceId" : 0,
    "status" : {
      "running" : true,
      "error" : "",
      "numRestarts" : 0,
      "numReadFromPulsar" : 10,
      "numSystemExceptions" : 0,
      "latestSystemExceptions" : [ ],
      "numSinkExceptions" : 0,
      "latestSinkExceptions" : [ ],
      "numWrittenToSink" : 10,
      "lastReceivedTime" : 1657637475669,
      "workerId" : "c-standalone-fw-localhost-8080"
    }
  } ]
}

numReadFromPulsar and numWrittenToSink both show 10, which means the messages were written to the sink.

9. Start pyspark with Hudi.

pyspark \
    --packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.11.1 \
    --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
    --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
    --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

10. Execute the following code in the pyspark shell.

# Load the Hudi table from its base path and query it through a temporary view
tablename = "hudi-connector-test"
basepath = "file:///path/to/hudi-sink"
tripsSnapshotDF = spark.read.format("hudi").load(basepath)
tripsSnapshotDF.createOrReplaceTempView("pulsar")
spark.sql("select id from pulsar").show()

This displays the content of the hudi-connector-test table, which was produced from the Pulsar topic test-hudi-pulsar.

+---+
| id|
+---+
| 10|
|  9|
|  1|
|  7|
|  6|
|  5|
|  3|
|  8|
|  4|
|  2|
+---+
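
As a further check, Hudi adds metadata columns such as _hoodie_commit_time and _hoodie_record_key to every table, so you can also confirm when and under which keys the connector committed the records. A small follow-up query in the same pyspark session (the view name pulsar comes from the step above):

spark.sql("select _hoodie_commit_time, _hoodie_record_key, id from pulsar order by id").show()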

How can I get involved?

The Hudi Sink connector is a major step in the journey of integrating lakehouse systems into the Pulsar ecosystem. To get involved with the Hudi Sink connector for Apache Pulsar, check out the following featured resources:

  • Try out the Hudi Sink connector. To get started, download the connector and refer to the ReadMe that walks you through the whole process.
  • Make a contribution. The Hudi Sink connector is a community-driven project whose source code is hosted in the StreamNative GitHub repository. If you have any feature requests or bug reports, do not hesitate to share your feedback and ideas and submit a pull request.
  • Contact us. Feel free to create an issue on GitHub, send emails to the Pulsar mailing list, or message us on Twitter to get answers from Pulsar experts.
  • Pulsar Summit Europe 2023 is taking place virtually on May 23rd. Engage with the community by submitting a CFP or becoming a community sponsor (no fee required).

1. Intrinsic differences exist between platforms that have no notion of schema and those with sophisticated schema capabilities, because there is no simple way to translate between them. These platform differences range from traditional messaging systems like Amazon SQS to multi-level hierarchical Avro schemas written to a data lake. Distinctions also exist between platforms relying on different data representations, such as Pandas DataFrames and simple messages.

Yong Zhang
Yong Zhang is an Apache Pulsar committer. He works as a software engineer at StreamNative.
