As with a DBMS, Flink lets you treat a data source as a catalog and manipulate its data with SQL.
Flink always searches for tables, views, and UDFs in the current catalog and database. To use the Pulsar catalog and treat topics in Pulsar as tables in Flink, use the pulsarcatalog catalog defined in ./conf/sql-client-defaults.yaml, the file that defines the default behavior of the Flink SQL client after it starts.
    tableEnv.useCatalog("pulsarcatalog")
    tableEnv.useDatabase("public/default")
    tableEnv.scan("topic0")
    Flink SQL> USE CATALOG pulsarcatalog;
    Flink SQL> USE `public/default`;
    Flink SQL> SELECT * FROM topic0;
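To make the lookup in the examples above concrete: with the database set to public/default, the table topic0 corresponds to a Pulsar topic in tenant public and namespace default. The following is a minimal sketch of that mapping, not the connector's implementation. `TopicNameSketch` and `toTopicName` are hypothetical names introduced for illustration, and the sketch assumes the topic is persistent, so it uses Pulsar's `persistent://tenant/namespace/topic` naming.

```java
public class TopicNameSketch {
    // Hypothetical helper (not part of the connector): maps a catalog database
    // of the form "tenant/namespace" and a table name to a fully qualified
    // Pulsar topic name.
    // Assumption: the topic is persistent, so the "persistent://" scheme applies.
    public static String toTopicName(String database, String table) {
        String[] parts = database.split("/");
        if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
            throw new IllegalArgumentException(
                "database must look like tenant/namespace: " + database);
        }
        return "persistent://" + parts[0] + "/" + parts[1] + "/" + table;
    }

    public static void main(String[] args) {
        // prints persistent://public/default/topic0
        System.out.println(toTopicName("public/default", "topic0"));
    }
}
```

The helper rejects database names that do not have exactly two non-empty segments, since a Pulsar namespace is always addressed together with its tenant.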
After starting the Flink SQL client, you can change its default behavior by using the SET command to override the following configurations, all of which are optional in the environment file.
| Option | Value | Default | Description |
|---|---|---|---|
| | The default database name. | public/default | A topic in Pulsar is treated as a table in Flink when using Pulsar catalog, therefore, |
| | "earliest" (streaming and batch queries), "latest" (streaming query) | "latest" | Controls where a table reads data from. |
| | The default number of partitions when a table is created in Table API. | 5 | A table in Pulsar catalog is a topic in Pulsar, when creating table in Pulsar catalog, |
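The row that controls where a table reads data from pairs each value with the query types it applies to: "earliest" for streaming and batch queries, "latest" for streaming queries only. The sketch below expresses that rule as a small check. `StartupModeSketch`, `QueryType`, and `isSupported` are hypothetical names used only for illustration; the connector's own validation may look different.

```java
import java.util.Locale;

public class StartupModeSketch {
    // Hypothetical enum for illustration; the connector's own types may differ.
    public enum QueryType { STREAMING, BATCH }

    // Returns true when the start position is usable for the given query type,
    // following the table: "earliest" applies to streaming and batch queries,
    // "latest" applies to streaming queries only.
    public static boolean isSupported(String startupMode, QueryType queryType) {
        switch (startupMode.toLowerCase(Locale.ROOT)) {
            case "earliest":
                return true;
            case "latest":
                return queryType == QueryType.STREAMING;
            default:
                throw new IllegalArgumentException(
                    "unknown startup mode: " + startupMode);
        }
    }

    public static void main(String[] args) {
        System.out.println(isSupported("earliest", QueryType.BATCH));
        System.out.println(isSupported("latest", QueryType.BATCH));
    }
}
```

Under these assumptions, a batch query combined with "latest" is reported as unsupported, which matches the value column of the table.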