
Configure Pulsar Spark connector

You can set the following configurations for the Pulsar Spark connector (both source and sink).

Pulsar source configuration

For Pulsar source, you can set the following configurations for both batch and streaming queries.

Configuration Value Required / Optional Default Query type Description
topic
    Value: A topic name string
    Required / Optional: Required
    Default: N/A
    Query type: Streaming and batch queries
    Description: The topic to be consumed. Only one of the topic, topics, or topicsPattern options can be specified for a Pulsar source.

topics
    Value: A comma-separated list of topics
    Required / Optional: Required
    Default: N/A
    Query type: Streaming and batch queries
    Description: The topic list to be consumed. Only one of the topic, topics, or topicsPattern options can be specified for a Pulsar source.

topicsPattern
    Value: A Java regex string
    Required / Optional: Required
    Default: N/A
    Query type: Streaming and batch queries
    Description: The pattern used to subscribe to topic(s). Only one of the topic, topics, or topicsPattern options can be specified for a Pulsar source.

service.url
    Value: A service URL of your Pulsar cluster
    Required / Optional: Required
    Default: N/A
    Query type: Streaming and batch queries
    Description: Pulsar serviceUrl configuration.

admin.url
    Value: A service HTTP URL of your Pulsar cluster
    Required / Optional: Required
    Default: N/A
    Query type: Streaming and batch queries
    Description: Pulsar serviceHttpUrl configuration.
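
The source options above can be collected into a plain dictionary before they are handed to Spark. The following is a minimal Python sketch, not the connector's own code. The helper name pulsar_source_options is hypothetical, the URLs are placeholder values for a local cluster, and the commented PySpark call assumes the connector is registered under the format name "pulsar", which this page does not state. The helper only enforces the rule that exactly one of topic, topics, or topicsPattern is set.

```python
from typing import Dict, Optional


def pulsar_source_options(
    service_url: str,
    admin_url: str,
    topic: Optional[str] = None,
    topics: Optional[str] = None,
    topics_pattern: Optional[str] = None,
) -> Dict[str, str]:
    """Build the option map for a Pulsar source (hypothetical helper)."""
    selectors = {"topic": topic, "topics": topics, "topicsPattern": topics_pattern}
    chosen = {name: value for name, value in selectors.items() if value is not None}
    # Only one of topic, topics, or topicsPattern may be specified.
    if len(chosen) != 1:
        raise ValueError(
            "specify exactly one of topic, topics, or topicsPattern; got "
            + (", ".join(sorted(chosen)) or "none")
        )
    options = {"service.url": service_url, "admin.url": admin_url}
    options.update(chosen)
    return options


# Placeholder URLs for a local cluster (assumed, not taken from this page).
source_options = pulsar_source_options(
    "pulsar://localhost:6650",
    "http://localhost:8080",
    topics="topic-1,topic-5",
)

# With a SparkSession named `spark` and the connector on the classpath, the
# options could be applied like this (format name "pulsar" is an assumption):
# df = spark.read.format("pulsar").options(**source_options).load()
```

Failing early in the helper keeps a misconfigured topic selection from surfacing only when the query starts.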

startingOffsets
    Value: One of the following.
        "earliest" (streaming and batch queries)
        "latest" (streaming query)
        A JSON string, for example:
        """ {"topic-1":[8,11,16,101,24,1,32,1],"topic-5":[8,15,16,105,24,5,32,5]} """
    Required / Optional: Optional
    Default: "earliest" (batch query), "latest" (streaming query)
    Query type: Streaming and batch queries
    Description: Controls where a reader starts reading data.
        "earliest": when the reader has no valid offset, it reads all the data in the partition, starting from the very beginning.
        "latest": when the reader has no valid offset, it reads only the records written after the reader starts running.
        A JSON string: specifies a starting offset for each topic. You can use org.apache.spark.sql.pulsar.JsonUtils.topicOffsets(Map[String, MessageId]) to convert message offsets to a JSON string.
    Note:
        For a batch query, "latest" is not allowed, whether it is specified implicitly or by using MessageId.latest ([8,-1,-1,-1,-1,-1,-1,-1,-1,127,16,-1,-1,-1,-1,-1,-1,-1,-1,127]) in JSON.
        For a streaming query, "latest" applies only when a new query is started; a resumed query always picks up from where it left off. Partitions discovered during a query start at "earliest".
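
The defaults and the batch restriction described for startingOffsets can be written down as a small check. This is a minimal Python sketch under stated assumptions: the helper name resolve_starting_offsets and the query type labels "batch" and "streaming" are hypothetical, and the byte arrays are copied from the example and the note above. In Scala, JsonUtils.topicOffsets produces the JSON string from real MessageId values; json.dumps is used here only to serialize hand-written byte lists for illustration.

```python
import json
from typing import Dict, List, Optional, Union

# Serialized form of MessageId.latest as quoted in the note above.
MESSAGE_ID_LATEST = [8] + [-1] * 8 + [127] + [16] + [-1] * 8 + [127]

Offsets = Union[str, Dict[str, List[int]]]


def resolve_starting_offsets(query_type: str, offsets: Optional[Offsets] = None) -> str:
    """Return the startingOffsets value for a "batch" or "streaming" query."""
    if query_type not in ("batch", "streaming"):
        raise ValueError(f"unknown query type: {query_type!r}")
    if offsets is None:
        # Defaults from the table: "earliest" for batch, "latest" for streaming.
        return "earliest" if query_type == "batch" else "latest"
    if isinstance(offsets, str):
        if offsets not in ("earliest", "latest"):
            raise ValueError(f"unsupported keyword: {offsets!r}")
        if query_type == "batch" and offsets == "latest":
            raise ValueError('"latest" is not allowed for a batch query')
        return offsets
    # Per-topic offsets: reject MessageId.latest for batch queries.
    if query_type == "batch" and MESSAGE_ID_LATEST in offsets.values():
        raise ValueError("MessageId.latest is not allowed for a batch query")
    # Compact separators give the same shape as the example JSON string.
    return json.dumps(offsets, separators=(",", ":"))


starting = resolve_starting_offsets(
    "streaming",
    {"topic-1": [8, 11, 16, 101, 24, 1, 32, 1], "topic-5": [8, 15, 16, 105, 24, 5, 32, 5]},
)
```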

endingOffsets
    Value: One of the following.
        "latest" (batch query)
        A JSON string, for example:
        {"topic-1":[8,12,16,102,24,2,32,2],"topic-5":[8,16,16,106,24,6,32,6]}
    Required / Optional: Optional
    Default: "latest"
    Query type: Batch query
    Description: Controls where a reader stops reading data.
        "latest": the reader stops reading data at the latest record.
        A JSON string: specifies an ending offset for each topic.
    Note:
        MessageId.earliest ([8,-1,-1,-1,-1,-1,-1,-1,-1,-1,1,16,-1,-1,-1,-1,-1,-1,-1,-1,-1,1]) is not allowed.
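
The endingOffsets rule can be checked in the same spirit. The sketch below is illustrative Python, assuming a hypothetical helper named ending_offsets_option; the byte arrays are copied from the example and the note above, and the helper only rejects the serialized MessageId.earliest value and falls back to the documented default.

```python
import json
from typing import Dict, List, Optional

# Serialized form of MessageId.earliest as quoted in the note above.
MESSAGE_ID_EARLIEST = [8] + [-1] * 9 + [1] + [16] + [-1] * 9 + [1]


def ending_offsets_option(offsets: Optional[Dict[str, List[int]]] = None) -> str:
    """Return the endingOffsets value for a batch query (hypothetical helper)."""
    if offsets is None:
        return "latest"  # default from the table
    for topic, message_id in offsets.items():
        if message_id == MESSAGE_ID_EARLIEST:
            raise ValueError(
                f"MessageId.earliest is not allowed as the ending offset of {topic}"
            )
    return json.dumps(offsets, separators=(",", ":"))


ending = ending_offsets_option(
    {"topic-1": [8, 12, 16, 102, 24, 2, 32, 2], "topic-5": [8, 16, 16, 106, 24, 6, 32, 6]}
)
```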

pulsar.client.*
    Value: Corresponding setting values
    Required / Optional: Optional
    Default: N/A
    Query type: Streaming and batch queries
    Description: One Pulsar client instance is shared among all threads within one executor. Client-related settings can be set here, prefixed by pulsar.client.

pulsar.reader.*
    Value: Corresponding setting values
    Required / Optional: Optional
    Default: N/A
    Query type: Streaming and batch queries
    Description: Topic data in Pulsar is read using the Pulsar Reader API, so you can set reader-related settings by specifying pulsar.reader.*.
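
Prefixed settings are ordinary option keys, so they can be generated from a plain mapping. The following Python sketch assumes a hypothetical helper named with_prefix. The setting names operationTimeoutMs and receiverQueueSize are illustrative assumptions and are not listed on this page; check the connector and Pulsar client versions you use for the exact spellings they accept.

```python
from typing import Any, Dict, Mapping


def with_prefix(prefix: str, settings: Mapping[str, Any]) -> Dict[str, str]:
    """Prefix each setting name, for example with pulsar.client or pulsar.reader."""
    return {f"{prefix}.{name}": str(value) for name, value in settings.items()}


extra_options: Dict[str, str] = {}
# Setting names below are assumptions used only for illustration.
extra_options.update(with_prefix("pulsar.client", {"operationTimeoutMs": 30000}))
extra_options.update(with_prefix("pulsar.reader", {"receiverQueueSize": 1000}))
```

The resulting dictionary can be merged with the required source options before they are passed to Spark.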

Pulsar sink configuration

For Pulsar sink, you can set the following configurations for both batch and streaming queries.

Configuration Value Required / Optional Default Query type Description
service.url
    Value: A service URL of your Pulsar cluster
    Required / Optional: Required
    Default: N/A
    Query type: Streaming and batch queries
    Description: Pulsar serviceUrl configuration.

admin.url
    Value: A service HTTP URL of your Pulsar cluster
    Required / Optional: Required
    Default: N/A
    Query type: Streaming and batch queries
    Description: Pulsar serviceHttpUrl configuration.

topic
    Value: The topic to write data to
    Required / Optional: Optional
    Default: N/A
    Query type: Streaming and batch queries
    Description: If specified, each record is written to the same topic, the one named by this parameter. Otherwise, you should construct a __topic column for each record to be used for message routing.
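
The routing rule for the topic option can be modeled on plain dictionaries. This Python sketch illustrates the rule as described above and is not the connector's implementation; the helper name target_topic and the sample records are hypothetical.

```python
from typing import Any, Dict, Optional


def target_topic(record: Dict[str, Any], topic_option: Optional[str] = None) -> str:
    """Pick the destination topic for one record."""
    # When the topic option is set, every record goes to that topic.
    if topic_option is not None:
        return topic_option
    # Otherwise each record must carry its own __topic value.
    topic = record.get("__topic")
    if not topic:
        raise ValueError("record has no __topic column and no topic option is set")
    return topic


records = [
    {"__topic": "topic-1", "value": "a"},
    {"__topic": "topic-5", "value": "b"},
]
routed_by_column = [target_topic(r) for r in records]
routed_by_option = [target_topic(r, "topic-1") for r in records]
```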

failOnDataLoss
    Value: true or false
    Required / Optional: Optional
    Default: true
    Query type: Streaming query
    Description: Controls whether to fail a query when data is lost (for example, topics are deleted, or messages are deleted because of retention policy). Data-loss detection can raise false alarms; set this option to false if it does not behave as you expect. A batch query always fails if it fails to read any data from the provided offsets due to data loss.

pulsar.client.*
    Value: Corresponding setting values
    Required / Optional: Optional
    Default: N/A
    Query type: Streaming and batch queries
    Description: One Pulsar client instance is shared among all threads within one executor. Client-related settings can be set here, prefixed by pulsar.client.

pulsar.producer.*
    Value: Corresponding setting values
    Required / Optional: Optional
    Default: N/A
    Query type: Streaming and batch queries
    Description: Topic data in Pulsar is written using the Pulsar Producer API, so you can set producer-related settings by specifying pulsar.producer.*.
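
Putting the sink options together might look like the following Python sketch. The helper name pulsar_sink_options is hypothetical, the URLs are placeholders, the producer setting sendTimeoutMs is an assumed name used only for illustration, and the commented PySpark call assumes the connector is registered under the format name "pulsar".

```python
from typing import Any, Dict, Mapping, Optional


def pulsar_sink_options(
    service_url: str,
    admin_url: str,
    topic: Optional[str] = None,
    producer_settings: Optional[Mapping[str, Any]] = None,
) -> Dict[str, str]:
    """Build the option map for a Pulsar sink (hypothetical helper)."""
    options = {"service.url": service_url, "admin.url": admin_url}
    # topic is optional; without it, records need a __topic column.
    if topic is not None:
        options["topic"] = topic
    for name, value in (producer_settings or {}).items():
        options[f"pulsar.producer.{name}"] = str(value)
    return options


sink_options = pulsar_sink_options(
    "pulsar://localhost:6650",  # placeholder service URL
    "http://localhost:8080",  # placeholder admin URL
    topic="topic-1",
    producer_settings={"sendTimeoutMs": 30000},  # assumed setting name
)

# With a DataFrame named `df`, the options could be applied like this
# (format name "pulsar" is an assumption):
# df.write.format("pulsar").options(**sink_options).save()
```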