
Write data to Pulsar (sink)

Prerequisite

You can use either of the following methods to work with the Pulsar Spark connector. In both cases, you need to configure the connector before using it.

  • Client library: you can use all features of the Pulsar Spark connector (Java and Scala).

  • CLI: you can use all features of the Pulsar Spark connector in interactive mode (Scala).

Client library

As with any Spark application, spark-submit is used to launch your application.

pulsar-spark-connector_{{SCALA_BINARY_VERSION}} and its dependencies can be added directly to spark-submit using the --packages option.

Example

$ ./bin/spark-submit \
  --packages io.streamnative.connectors:pulsar-spark-connector_{{SCALA_BINARY_VERSION}}:{{PULSAR_SPARK_VERSION}} \
  --repositories https://dl.bintray.com/streamnative/maven \
  ...

CLI

When experimenting with spark-shell (or pyspark for Python), you can also use the --packages option to add pulsar-spark-connector_{{SCALA_BINARY_VERSION}} and its dependencies directly.

Example

$ ./bin/spark-shell \
  --packages io.streamnative.connectors:pulsar-spark-connector_{{SCALA_BINARY_VERSION}}:{{PULSAR_SPARK_VERSION}} \
  --repositories https://dl.bintray.com/streamnative/maven \
  ...

When locating an artifact or library, the --packages option searches the following repositories in order:

  1. Local Maven repository

  2. Maven Central repository

  3. Other repositories specified by --repositories

The format for the coordinates should be groupId:artifactId:version.

Tip

For more information about submitting applications with external dependencies, see the application submission guide.

Write data to Pulsar (sink)

Overview

The Pulsar Spark connector allows Spark to write data to Pulsar.

The DataFrame written to Pulsar can have an arbitrary schema. Each record in the DataFrame is transformed into one message sent to Pulsar, and the fields of the DataFrame are divided into two groups:

  • The __key and __eventTime fields are encoded as metadata of the Pulsar message.

  • All other fields are grouped, encoded using Avro, and put in value().

Example

producer.newMessage().key(__key).value(avro_encoded_fields).eventTime(__eventTime)
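
As a rough sketch of this mapping, the snippet below builds a DataFrame whose __key and __eventTime columns become message metadata, while the remaining columns are Avro-encoded into the message value. The payload columns (name, price) and the topic name are placeholders chosen for illustration only.

import java.sql.Timestamp

// In spark-shell the SparkSession is available as `spark`;
// the implicits enable Seq(...).toDF(...).
import spark.implicits._

// Hypothetical input data; `name` and `price` stand in for your own payload fields.
val df = Seq(
  ("k1", Timestamp.valueOf("2020-01-01 00:00:00"), "item-a", 1.0),
  ("k2", Timestamp.valueOf("2020-01-01 00:00:01"), "item-b", 2.5)
).toDF("__key", "__eventTime", "name", "price")

// __key and __eventTime become message metadata;
// name and price are grouped and Avro-encoded into the message value.
df.write
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("admin.url", "http://localhost:8080")
  .option("topic", "topic1")
  .save()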

Create a Pulsar sink for streaming queries

Example 1

This example writes key-value data from a DataFrame to a single Pulsar topic specified in an option.

val ds = df
  .selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("admin.url", "http://localhost:8080")
  .option("topic", "topic1")
  .start()

Example 2

This example writes key-value data from a DataFrame to Pulsar using a topic specified in the data.

val ds = df
  .selectExpr("__topic", "CAST(__key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("admin.url", "http://localhost:8080")
  .start()
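
For streaming queries, start() returns a StreamingQuery that you typically block on with awaitTermination(). Depending on your deployment, Spark may also require a checkpoint directory for the query; the sketch below shows both, with a placeholder checkpoint path.

val query = df
  .selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("admin.url", "http://localhost:8080")
  .option("topic", "topic1")
  // Standard Structured Streaming option; the path is a placeholder.
  .option("checkpointLocation", "/tmp/pulsar-sink-checkpoint")
  .start()

// Block until the streaming query is stopped or fails.
query.awaitTermination()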

Create a Pulsar sink for batch queries

Example 1

This example writes key-value data from a DataFrame to a single Pulsar topic specified in an option.

df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("admin.url", "http://localhost:8080")
  .option("topic", "topic1")
  .save()

Example 2

This example writes key-value data from a DataFrame to Pulsar using a topic specified in the data.

df.selectExpr("__topic", "CAST(__key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("admin.url", "http://localhost:8080")
  .save()
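
When the topic is taken from the data rather than from an option, each row needs a __topic column. The sketch below derives one by routing rows on a hypothetical category column; the column and topic names are placeholders.

import org.apache.spark.sql.functions._

// Route each row to a topic based on a (hypothetical) category column.
val routed = df.withColumn(
  "__topic",
  when(col("category") === "orders", lit("persistent://public/default/orders"))
    .otherwise(lit("persistent://public/default/misc"))
)

routed
  .selectExpr("__topic", "CAST(__key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("admin.url", "http://localhost:8080")
  .save()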

Limitation

Currently, the Pulsar Spark connector provides at-least-once semantics. Consequently, when writing either streaming queries or batch queries to Pulsar, some records may be duplicated.

A possible way to remove duplicates is to introduce a primary (unique) key in each record and use it to deduplicate when reading the written data back.
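
A minimal sketch of that approach, assuming each record was written with a unique uuid field (the column name is an assumption, and the connection options mirror the read examples):

// Read the written data back and deduplicate on the unique key.
// The `uuid` column is hypothetical; use whatever unique key your records carry.
val deduped = spark.read
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("admin.url", "http://localhost:8080")
  .option("topic", "topic1")
  .load()
  .dropDuplicates("uuid")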