Pulsar


Kafka Connect sink connector for writing data from Kafka to Pulsar.

KCQL support 

The following KCQL is supported:

INSERT
INTO <pulsar-topic>
SELECT FIELD, ...
FROM <kafka-topic>
[WITHKEY(FIELD, ...)]
[BATCH=100]

Examples:

# Select all fields
INSERT INTO persistent://lenses/standalone/connect/kafka-topic SELECT * FROM kafka_topic
# Select individual fields
INSERT INTO persistent://lenses/standalone/connect/kafka-topic SELECT id, product_name FROM kafka_topic

Concepts 

The connector writes JSON messages to Pulsar topics.

Keyed Messages 

The connector can key Pulsar messages by the value of one or more fields in the Kafka message, selected with the WITHKEY clause.
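
As an illustration of the WITHKEY clause, the sketch below builds a KCQL statement that keys each Pulsar message by the `id` field. The topic names and the `id` field are reused from the examples above; the variable name KEYED_KCQL is only illustrative.

```shell
# Key each Pulsar message by the `id` field of the Kafka record value.
# Topic names and the field name are illustrative, taken from the examples above.
KEYED_KCQL='INSERT INTO persistent://lenses/standalone/connect/kafka-topic SELECT * FROM kafka_topic WITHKEY(id)'

# Print the line as it would appear in a connector properties file.
echo "connect.pulsar.kcql=${KEYED_KCQL}"
```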

Compression 

The connector supports compression on Pulsar messages using the WITHCOMPRESSION clause. The available values are:

  • ZLIB
  • LZ4

Batching 

The BATCH clause controls the batching of writes to Pulsar.

Partitioning 

The connector supports partitioning in Pulsar using the WITHPARTITIONER clause. The available values are:

  • SINGLEPARTITION
  • ROUNDROBINPARTITION
  • CUSTOMPARTITION
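
The batching, compression and partitioning clauses can be combined in one statement. The sketch below is a hedged example: the `BATCH=100` form comes from the grammar above, while the `WITHCOMPRESSION = ...` and `WITHPARTITIONER = ...` spellings and the clause order are assumptions, since the grammar shown on this page does not spell them out. Check them against the KCQL reference for your connector version.

```shell
# Combine batching, compression and partitioning in one KCQL statement.
# ASSUMPTION: the "= VALUE" spelling and the clause order are not confirmed by this page.
TUNED_KCQL='INSERT INTO persistent://lenses/standalone/connect/kafka-topic SELECT * FROM kafka_topic BATCH=100 WITHCOMPRESSION = ZLIB WITHPARTITIONER = ROUNDROBINPARTITION'

# Print the line as it would appear in a connector properties file.
echo "connect.pulsar.kcql=${TUNED_KCQL}"
```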

Kafka payload support 

This sink supports the following Kafka payloads:

  • Schema.Struct and Struct (Avro)
  • Schema.Struct and JSON
  • No Schema and JSON

See connect payloads for more information.

Error policies 

The connector supports Error policies.
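
As a sketch of how an error policy might be configured, the snippet below writes retry settings to a properties fragment. The property names are those listed in the Options table; the values and the file name retry.properties are illustrative assumptions.

```shell
# Write illustrative retry settings to a properties fragment.
# Property names come from the Options table; the values are examples only.
cat > retry.properties <<'EOF'
connect.pulsar.error.policy=RETRY
connect.pulsar.max.retries=5
connect.pulsar.retry.interval=10000
EOF
```

With these values a failed write would be retried up to 5 times, 10 seconds apart, before the error is allowed to propagate.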

Quickstart 

Launch the stack 


  1. Copy the docker-compose file.
  2. Bring up the stack.
export CONNECTOR=pulsar
docker-compose up -d pulsar

Watch for messages in Pulsar 

In the Pulsar container, start the consumer and wait for messages to arrive:

docker exec \
    -ti pulsar \
    bin/pulsar-client \
    consume \
    persistent://lenses/standalone/connect/orders \
    --subscription-name lenses

Start the connector 

If you are using Lenses, log in to Lenses, navigate to the connectors page, select Pulsar as the sink and paste the following:

name=pulsar
connector.class=com.datamountaineer.streamreactor.connect.pulsar.sink.PulsarSinkConnector
tasks.max=1
topics=orders
connect.pulsar.kcql=INSERT INTO persistent://lenses/standalone/connect/orders SELECT * FROM orders
connect.pulsar.hosts=pulsar://pulsar:6650

To start the connector using the command line, log into the lenses-box container:


docker exec -ti lenses-box /bin/bash

and create a connector.properties file containing the properties above.
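
One way to create the file from the shell is a here-document. This is a minimal sketch that writes exactly the properties listed above:

```shell
# Write the connector configuration shown above to connector.properties.
# The quoted 'EOF' delimiter prevents the shell from expanding anything in the body.
cat > connector.properties <<'EOF'
name=pulsar
connector.class=com.datamountaineer.streamreactor.connect.pulsar.sink.PulsarSinkConnector
tasks.max=1
topics=orders
connect.pulsar.kcql=INSERT INTO persistent://lenses/standalone/connect/orders SELECT * FROM orders
connect.pulsar.hosts=pulsar://pulsar:6650
EOF
```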

Create the connector with the connect-cli:

connect-cli create pulsar < connector.properties

Wait for the connector to start and check it’s running:

connect-cli status pulsar

Inserting test data 

In the lenses-box container, start the Kafka Avro console producer:


kafka-avro-console-producer \
 --broker-list localhost:9092 --topic orders \
 --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"created","type":"string"},{"name":"product","type":"string"},{"name":"price","type":"double"}, {"name":"qty", "type":"int"}]}'

The console is now waiting for your input. Enter the following:


{
  "id": 1,
  "created": "2016-05-06 13:53:00",
  "product": "OP-DAX-P-20150201-95.7",
  "price": 94.2,
  "qty": 100
}
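
The record above is pretty-printed for readability. The console producer reads its input line by line, so a record is normally supplied on a single line. A minimal sketch follows, assuming a hypothetical file name order.json and a hypothetical SCHEMA variable holding the value.schema string from the command above:

```shell
# Same order record as above, compacted onto a single line.
ORDER='{"id":1,"created":"2016-05-06 13:53:00","product":"OP-DAX-P-20150201-95.7","price":94.2,"qty":100}'

# Save it so it can be redirected into the producer instead of typed interactively.
printf '%s\n' "$ORDER" > order.json

# Inside the lenses-box container (SCHEMA is a hypothetical variable, not defined here):
# kafka-avro-console-producer --broker-list localhost:9092 --topic orders \
#   --property value.schema="$SCHEMA" < order.json
```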

Check for data in Pulsar 

The data will arrive in the console of the Pulsar consumer started earlier.

Clean up 

Bring down the stack:

docker-compose down

Options 

| Name | Description | Type | Default Value |
|---|---|---|---|
| connect.pulsar.hosts | Contains the Pulsar connection end points. | string | |
| connect.pulsar.error.policy | Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message. The number of retries is based on connect.pulsar.max.retries. The error will be logged automatically. | string | THROW |
| connect.pulsar.retry.interval | The time in milliseconds between retries. | int | 60000 |
| connect.pulsar.max.retries | The maximum number of times to try the write again. | int | 20 |
| connect.pulsar.kcql | Contains the Kafka Connect Query Language describing the flow from Apache Kafka topics to Apache Pulsar. | string | |
| connect.progress.enabled | Enables the output for how many records have been processed. | boolean | false |
| connect.pulsar.tls.ca.cert | Provides the path to the CA certificate file to use with the Pulsar connection. | string | |
| connect.pulsar.tls.cert | Provides the path to the certificate file to use with the Pulsar connection. | string | |
| connect.pulsar.tls.key | Provides the path to the certificate private key file. | string | |
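
As a sketch of how the TLS options might be set, the snippet below writes them to a properties fragment. The property names come from the table above; the file paths and the file name tls.properties are hypothetical placeholders, and any further broker-side TLS setup is outside the scope of this example.

```shell
# Write illustrative TLS settings to a properties fragment.
# ASSUMPTION: the /etc/pulsar/certs/... paths are placeholders, not real defaults.
cat > tls.properties <<'EOF'
connect.pulsar.tls.ca.cert=/etc/pulsar/certs/ca.cert.pem
connect.pulsar.tls.cert=/etc/pulsar/certs/client.cert.pem
connect.pulsar.tls.key=/etc/pulsar/certs/client.key.pem
EOF
```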
--
Last modified: November 18, 2024