Use Pulsar catalog

As with a DBMS, Flink lets you treat a data source as a catalog and manipulate its data with SQL.

Flink always looks up tables, views, and UDFs in the current catalog and database. To use the Pulsar catalog and treat Pulsar topics as Flink tables, switch to `pulsarcatalog`, which is defined in `./conf/sql-client-defaults.yaml`. This environment file defines the default behavior of the Flink SQL client when it starts.

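Table API:

```java
// Assumes an existing TableEnvironment named tableEnv with pulsarcatalog registered.
tableEnv.useCatalog("pulsarcatalog");
tableEnv.useDatabase("public/default");
// Reads topic0 as a table. scan() is deprecated since Flink 1.10;
// newer Flink versions use tableEnv.from("topic0") instead.
tableEnv.scan("topic0");
```

SQL client:

```
Flink SQL> USE CATALOG pulsarcatalog;
Flink SQL> USE `public/default`;
Flink SQL> SELECT * FROM topic0;
```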
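The lookup rule can be illustrated without Flink or the connector on the classpath. The following is a minimal, hypothetical Java sketch: the class `CatalogLookupSketch` and its methods are not part of any real API. It only models how an unqualified table name is qualified with the current catalog and database, and how such a path could map to a Pulsar topic, assuming the database is `tenant/namespace`, the table name is the topic's local name, and the topic is persistent (`persistent://tenant/namespace/topic`).

```java
import java.util.List;

// Hypothetical sketch, not the connector's code: models Flink's rule that an
// unqualified table name is looked up in the current catalog and database.
final class CatalogLookupSketch {
    private String currentCatalog;
    private String currentDatabase;

    CatalogLookupSketch(String catalog, String database) {
        this.currentCatalog = catalog;
        this.currentDatabase = database;
    }

    // Counterparts of tableEnv.useCatalog(...) and tableEnv.useDatabase(...).
    void useCatalog(String catalog) { this.currentCatalog = catalog; }
    void useDatabase(String database) { this.currentDatabase = database; }

    // Qualifies an unqualified table name as [catalog, database, table].
    List<String> resolve(String table) {
        return List.of(currentCatalog, currentDatabase, table);
    }

    // Assumption: in the Pulsar catalog a database is "tenant/namespace" and the
    // table name is the topic's local name; persistent topics are assumed.
    static String toPulsarTopic(String database, String table) {
        String[] parts = database.split("/", -1);
        if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
            throw new IllegalArgumentException("expected tenant/namespace, got: " + database);
        }
        return "persistent://" + database + "/" + table;
    }

    public static void main(String[] args) {
        // Flink starts in its built-in catalog and database.
        CatalogLookupSketch env = new CatalogLookupSketch("default_catalog", "default_database");
        env.useCatalog("pulsarcatalog");
        env.useDatabase("public/default");

        List<String> path = env.resolve("topic0");
        System.out.println(path);                                    // [pulsarcatalog, public/default, topic0]
        System.out.println(toPulsarTopic(path.get(1), path.get(2))); // persistent://public/default/topic0
    }
}
```

Without the two `use...` calls, `topic0` would be resolved against Flink's built-in `default_catalog` and `default_database`, which is why switching catalogs comes first.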

After the SQL client starts, you can override its default behavior with the SET command. The following options are optional in the environment file.

| Option | Value | Default | Description |
|---|---|---|---|
| `default-database` | The default database name. | `public/default` | When you use the Pulsar catalog, a Pulsar topic is treated as a Flink table, so a database corresponds to a Pulsar tenant/namespace. The database is the base path for looking up or creating tables. |
| `start-mode` | `"earliest"` (streaming and batch queries) or `"latest"` (streaming queries only) | `"latest"` | Controls where a table starts reading data. |
| `table-default-partitions` | The default number of partitions for a table created with the Table API. | `5` | A table in the Pulsar catalog is a Pulsar topic, so when you create a table in the Pulsar catalog, this option sets the number of partitions of the topic that is created. |
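The options above, together with the SET override, can be pictured as layered maps. The following Java sketch is hypothetical and independent of the connector: the class name, the precedence order (built-in default, then environment file, then SET), and the `start-mode` check are assumptions drawn from the table, not the connector's actual implementation. The value `tn/ns` is an illustrative placeholder.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: layers the Pulsar catalog options from the table.
// Assumed precedence: built-in default < environment file < SET command.
final class CatalogOptionsSketch {
    static final Map<String, String> DEFAULTS = Map.of(
            "default-database", "public/default",
            "start-mode", "latest",
            "table-default-partitions", "5");

    // Later layers overwrite earlier ones key by key.
    static Map<String, String> effective(Map<String, String> envFile, Map<String, String> setOverrides) {
        Map<String, String> result = new HashMap<>(DEFAULTS);
        result.putAll(envFile);
        result.putAll(setOverrides);
        return result;
    }

    // Per the table: "earliest" works for streaming and batch queries,
    // "latest" only for streaming queries.
    static void checkStartMode(String mode, boolean batchQuery) {
        switch (mode) {
            case "earliest":
                return;
            case "latest":
                if (batchQuery) {
                    throw new IllegalArgumentException("start-mode 'latest' is for streaming queries only");
                }
                return;
            default:
                throw new IllegalArgumentException("unknown start-mode: " + mode);
        }
    }

    public static void main(String[] args) {
        Map<String, String> envFile = Map.of("default-database", "tn/ns"); // placeholder value
        Map<String, String> set = Map.of("start-mode", "earliest");
        Map<String, String> opts = effective(envFile, set);

        System.out.println(opts.get("default-database"));         // tn/ns
        System.out.println(opts.get("start-mode"));               // earliest
        System.out.println(opts.get("table-default-partitions")); // 5
        checkStartMode(opts.get("start-mode"), true);              // accepted for a batch query
    }
}
```

Any option left unset in both the environment file and SET falls back to the default listed in the table, as `table-default-partitions` does here.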