Blog
10 min read

Integrating Apache Pulsar with BigQuery

One common data engineering task is offloading data into your company’s data lake. You may also want to transform and enrich that data during the ingestion process to prepare it for analysis. This blog post will show you how to integrate Apache Pulsar with Google BigQuery to extract meaningful insights.

Let’s assume that you have files stored in an external file system and you want to ingest the contents into your data lake. You will need to build a data pipeline to ingest, transform, and offload the data like the one in the figure below.

illustration ingesting and transforming data

The data pipeline leverages Apache Pulsar as the message bus and performs simple transformations on the data before storing it in a more readable format such as JSON. You can then offload these JSON records to your data lake and transform them into a more queryable format such as parquet using the Apache Pulsar Cloud Storage Sink connector.

Building the Data Pipeline

Now that you have an idea of what you are trying to accomplish, let’s walk through the steps required to implement this pipeline.

Ingesting the Data

First, you need to read the data from the file system and send it to a Pulsar topic. The code snippet below creates a producer that writes <String> messages inside the raw event topic.

<script>
// 1. Load input data file.
List events = IngestionUtils.loadEventData();

// 2. Instantiate Pulsar Client.
PulsarClient pulsarClient = ClientUtils.initPulsarClient(Optional.empty());

// 3. Create a Pulsar Producer.
Producer eventProducer = pulsarClient.newProducer(Schema.STRING)
                .topic(AppConfig.RAW_EVENTS_TOPIC)
                .producerName("raw-events-producer")
                .blockIfQueueFull(true)
                .create();

// 4. Send some messages.
for (String event: events) {
       eventProducer.newMessage()
                 .value(event)
                 .sendAsync()
                 .whenComplete(callback);
   }
<script> 

The Pulsar topic includes the file contents.

Transforming the Data

Second, you must complete the following steps to transform the data.

  • Read the <String> messages from the raw events topic.
  • Parse the messages as an Event object.
  • Write the messages into a downstream parsed events topic in JSON format.
  1. Read the messages by using a Pulsar Function with the following signature Function<String, Event>.
<script>
public class EventParserFunc implements Function {
    private static Logger logger;

    @Override
    public Event process(String input, Context context) throws Exception {
        if (logger == null) {
            logger = context.getLogger();
        }
        logger.info("Received input: " + input);
        Event event = IngestionUtils.lineToEvent(input);
        logger.info("Parsed event: " + event);
        return event;
    }
}<script> 
  1. Deploy the functions using the following configuration file.
<script>
className: io.streamnative.functions.EventParserFunc
tenant: public
namespace: default
name: "event_parser_func"
inputs:
  - "persistent://public/default/raw_events"
output: "persistent://public/default/parsed_events"
parallelism: 1
logTopic: "persistent://public/default/parsed_events_logs"
autoAck: true
cleanupSubscription: true
subName: "parsed_events_sub"
<script> 

The important part here is the className, which is the path for the parsing function, the input topic name, and the output topic name.

  1. Run the mvn clean package command to package the code and generate a jar file.
  2. Deploy the Pulsar Function on the cluster.
<script>
bin/pulsar-admin functions create \
 --function-config-file config/parser_func_config.yaml \
 --jar myjars/examples.jar
<script> 

The –function-config-file points to the configuration file and the –jar option specifies the path for the jar file.

You have successfully deployed the Pulsar Function that will transform your messages.

Offload the Data to Google Cloud Storage

The third step is to deploy the Cloud Sink Connector that will listen to the parsed events topic and store the incoming messages into Google Cloud Storage in the Avro format.

  1. Deploy the connector by providing a configuration file like the one below.
<script>
tenant: "public"
namespace: "default"
name: "gcs-sink"
inputs:
  - "persistent://public/default/parsed_events"
parallelism: 1

configs:
  provider: "google-cloud-storage"
  gcsServiceAccountKeyFileContent: >
    {
      "type": "service_account",
      "project_id": "",
      "private_key_id": "",
      "private_key": "",
      "client_email": "",
      "client_id": "",
      "auth_uri": "",
      "token_uri": "",
      "auth_provider_x509_cert_url": "",
      "client_x509_cert_url": ""
    }

  bucket: "eventsbucket311"
  region: "us-west1"
  endpoint: "https://storage.googleapis.com/"
  formatType: "parquet"
  partitionerType: "time"
  timePartitionPattern: "yyyy-MM-dd"
  timePartitionDuration: "1d"
  batchSize: 10000
  batchTimeMs: 60000
<script> 

The configs section includes the different configurations you want to tune for setting up the connector. You can find all the available configuration options in the connector repository.

Let’s walk through the example code above.

First, specify the connection credentials as part of the configuration. (You can also pass a file.) Next, specify the formatType (parquet) and the partitionerType (time) to partition based on the date. Typically in a streaming pipeline, you don’t want to produce too many small files because they will slow down your queries if the data gets too large. In this example use case, a new file is created every 10,000 messages.

  1. Deploy the connector on your cluster by running the following command:
<script>
bin/pulsar-admin sink create \
--sink-config-file config/gcs_sink.yaml \
--name gcs-sink --archive connectors/pulsar-io-cloud-storage-2.8.1.30.nar
<script> 

The –sink-config-file provides the path to the configuration file, -name specifies the connector name, and the last line specifies the .nar file location.

With the Pulsar function and connector up and running, you are ready to execute the producer code and generate some messages. The sample data file contains 49,999 lines (50,000 including the header).

  1. Run the producer and then navigate to the Google Cloud Storage console where you can verify that you ingested new files and all of your records are accounted for.

Querying on Google Cloud

In Google Cloud Storage, you should see a new folder with the tenant name you specified when you created the topic inside Pulsar.

Querying on Google Cloud interface

In the previous section, the example code uses the public tenant so your folder structure should be public -> default -> topic name -> date.

Before you can go into BigQuery and start querying your data, you’ll need to set up a dataset and then create a new table based on the parquet file you have on Google Cloud Storage.

Previously, you specified you wanted a batch size of 10,000 records and you sent 49,999 messages. You should see 5 parquet files.

Now, you can go into BigQuery and create tables based on your data and start executing queries to validate that everything is in place.

  1. Create a dataset.
Querying on Google Cloud interface
  1. Create a table.
Querying on Google Cloud interface
  1. Verify that all your data is in place and ready for analysis jobs by running Select *.
Querying on Google Cloud interface

Querying on Google Cloud interface

Congratulations, you have successfully integrated Apache Pulsar with BigQuery!

Resources

Ioannis Polyzos
Ioannis is a Solutions Engineer at StreamNative with a focus on data intensive systems. He helps companies modernize their data systems by leveraging Big Data technologies. He is passionate about fast & scalable data pipelines, streaming data flows and Machine Learning systems.

Newsletter

Our strategies and tactics delivered right to your inbox

Thank you! Your submission has been received!
Oops! Something went wrong while submitting the form.