Read data from Pulsar (source)

Prerequisite

Before you use the Pulsar Spark connector, set it up with one of the following methods.

  • Client library: you can use all features of the Pulsar Spark connector (Java and Scala).

  • CLI: you can use all features of the Pulsar Spark connector in interactive mode (Scala).

Client library

As with any Spark application, use spark-submit to launch your application.

pulsar-spark-connector_{{SCALA_BINARY_VERSION}} and its dependencies can be directly added to spark-submit using --packages.

Example

$ ./bin/spark-submit \
  --packages io.streamnative.connectors:pulsar-spark-connector_{{SCALA_BINARY_VERSION}}:{{PULSAR_SPARK_VERSION}} \
  --repositories https://dl.bintray.com/streamnative/maven \
  ...

CLI

For experimenting on spark-shell (or pyspark for Python), you can also use --packages to add pulsar-spark-connector_{{SCALA_BINARY_VERSION}} and its dependencies directly.

Example

$ ./bin/spark-shell \
  --packages io.streamnative.connectors:pulsar-spark-connector_{{SCALA_BINARY_VERSION}}:{{PULSAR_SPARK_VERSION}} \
  --repositories https://dl.bintray.com/streamnative/maven \
  ...

When locating an artifact or library, the --packages option checks the following repositories in order:

  1. Local Maven repository

  2. Maven Central repository

  3. Other repositories specified by --repositories

The format for the coordinates should be groupId:artifactId:version.
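
As a rough illustration of that format, the sketch below assembles the coordinate passed to --packages from its three parts and splits it back again. The object name ConnectorCoordinate and the version numbers used in the usage note are placeholders invented for this example, not recommended values; substitute the Scala binary version of your Spark build and the connector release you actually use.

```scala
// A minimal sketch: build and parse a groupId:artifactId:version coordinate.
// ConnectorCoordinate is a hypothetical helper, not part of Spark or the connector.
object ConnectorCoordinate {
  val GroupId = "io.streamnative.connectors"

  // The artifact ID carries the Scala binary version as a suffix.
  def artifactId(scalaBinaryVersion: String): String =
    s"pulsar-spark-connector_$scalaBinaryVersion"

  // Join the three parts with colons, as --packages expects.
  def coordinate(scalaBinaryVersion: String, connectorVersion: String): String =
    Seq(GroupId, artifactId(scalaBinaryVersion), connectorVersion).mkString(":")

  // Split a coordinate into (groupId, artifactId, version); None if it is malformed.
  def parse(coordinate: String): Option[(String, String, String)] =
    coordinate.split(":", -1) match {
      case Array(g, a, v) if g.nonEmpty && a.nonEmpty && v.nonEmpty => Some((g, a, v))
      case _                                                        => None
    }
}
```

With the placeholder versions 2.11 and 2.4.5, ConnectorCoordinate.coordinate("2.11", "2.4.5") returns io.streamnative.connectors:pulsar-spark-connector_2.11:2.4.5.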

Tip

For more information about submitting applications with external dependencies, see application submission guide.

Read data from Pulsar (source)

The Pulsar Spark connector allows Spark to read data from Pulsar.

Create a Pulsar source for streaming queries

The following examples subscribe to one topic, multiple topics, and a topic pattern.

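// Needed for .as[...] in a compiled application (spark-shell imports it automatically)
import spark.implicits._

// Subscribe to 1 topic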
val df = spark
  .readStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("admin.url", "http://localhost:8080")
  .option("topic", "topic1")
  .load()
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to multiple topics
val df = spark
  .readStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("admin.url", "http://localhost:8080")
  .option("topics", "topic1,topic2")
  .load()
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to a pattern
val df = spark
  .readStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("admin.url", "http://localhost:8080")
  .option("topicsPattern", "topic.*")
  .load()
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
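
The three examples differ only in which topic option they set. One way to make that explicit is to build the option map separately and hand it to the reader. The sketch below is plain Scala with no Spark dependency; TopicSelection, its three cases, and PulsarSourceOptions.sourceOptions are hypothetical names introduced for this illustration and are not part of the connector. It also assumes that each source sets exactly one of topic, topics, and topicsPattern, as the examples above do.

```scala
// Hypothetical helper types, not part of the connector API.
sealed trait TopicSelection
final case class SingleTopic(name: String) extends TopicSelection
final case class MultipleTopics(names: Seq[String]) extends TopicSelection
final case class TopicsPattern(regex: String) extends TopicSelection

object PulsarSourceOptions {
  // Build the option map used by the examples above: two URLs plus one topic option.
  def sourceOptions(
      serviceUrl: String,
      adminUrl: String,
      selection: TopicSelection): Map[String, String] = {
    val topicOption = selection match {
      case SingleTopic(name)     => "topic" -> name
      case MultipleTopics(names) => "topics" -> names.mkString(",")
      case TopicsPattern(regex)  => "topicsPattern" -> regex
    }
    Map("service.url" -> serviceUrl, "admin.url" -> adminUrl) + topicOption
  }
}
```

The resulting map can be passed to the reader in one call, for example spark.readStream.format("pulsar").options(opts).load(), because Spark's DataStreamReader accepts a Map[String, String] through options.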

Tip

For more information on how to use other language bindings for Spark Structured Streaming, see structured streaming programming guide.

Create a Pulsar source for batch queries

If you have a use case that is better suited for batch processing, you can create a Dataset/DataFrame for a defined range of offsets.

The following examples subscribe to one topic, multiple topics, and a pattern.

// Subscribe to 1 topic; defaults to the earliest and latest offsets
val df = spark
  .read
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("admin.url", "http://localhost:8080")
  .option("topic", "topic1")
  .load()
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to multiple topics, specifying explicit Pulsar offsets
import org.apache.spark.sql.pulsar.JsonUtils._
val startingOffsets = topicOffsets(Map("topic1" -> messageId1, "topic2" -> messageId2))
val endingOffsets = topicOffsets(...)
val df = spark
  .read
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("admin.url", "http://localhost:8080")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", startingOffsets)
  .option("endingOffsets", endingOffsets)
  .load()
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to a pattern, at the earliest and latest offsets
val df = spark
  .read
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("admin.url", "http://localhost:8080")
  .option("topicsPattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

Schema of Pulsar source

The Pulsar Spark connector source works with all Pulsar topics:

  • For topics without a schema or with a primitive schema in Pulsar, the message payload is loaded into a value column whose type corresponds to the Pulsar schema.

  • For topics with an Avro or JSON schema, the field names and field types are kept in the result rows.

For both kinds of topics, each row in the source also has the following metadata fields.

Column          Type
__key           Binary
__topic         String
__messageId     Binary
__publishTime   Timestamp
__eventTime     Timestamp
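
If you want typed access to these fields, the Spark SQL types in the table map onto JVM types in the usual way: Binary to Array[Byte], String to String, and Timestamp to java.sql.Timestamp. The case class below is a sketch of that mapping. The name PulsarMetadata and the helper keyAsString are hypothetical, and decoding the key as UTF-8 is an assumption about how the producer wrote it.

```scala
import java.nio.charset.StandardCharsets
import java.sql.Timestamp

// Hypothetical container mirroring the metadata columns; nullable columns may hold null.
final case class PulsarMetadata(
    __key: Array[Byte],
    __topic: String,
    __messageId: Array[Byte],
    __publishTime: Timestamp,
    __eventTime: Timestamp) {

  // Decode the binary key as UTF-8 text; None when the message has no key.
  def keyAsString: Option[String] =
    Option(__key).map(bytes => new String(bytes, StandardCharsets.UTF_8))
}
```

In a Spark application with import spark.implicits._ in scope, a projection such as df.select("__key", "__topic", "__messageId", "__publishTime", "__eventTime").as[PulsarMetadata] should then give a typed Dataset of these rows.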

Example 1

The following is the schema (as a DataFrame) of a Pulsar topic with Schema.DOUBLE.

root
|-- value: double (nullable = false)
|-- __key: binary (nullable = true)
|-- __topic: string (nullable = true)
|-- __messageId: binary (nullable = true)
|-- __publishTime: timestamp (nullable = true)
|-- __eventTime: timestamp (nullable = true)

Example 2

When a Pulsar topic with the Avro schema s (defined below) is loaded into Spark, the resulting DataFrame/Dataset has the following schema.

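import org.apache.pulsar.client.api.Schema

case class Foo(i: Int, f: Float, bar: Bar)
case class Bar(b: Boolean, s: String)
val s = Schema.AVRO(classOf[Foo]) // classOf[Foo], not Foo.getClass (the companion's class)
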
root
 |-- i: integer (nullable = false)
 |-- f: float (nullable = false)
 |-- bar: struct (nullable = true)
 |    |-- b: boolean (nullable = false)
 |    |-- s: string (nullable = true)
 |-- __key: binary (nullable = true)
 |-- __topic: string (nullable = true)
 |-- __messageId: binary (nullable = true)
 |-- __publishTime: timestamp (nullable = true)
 |-- __eventTime: timestamp (nullable = true)