Pulsar


A Kafka Connect source connector to write events from Apache Pulsar to Apache Kafka.

KCQL support 

The following KCQL is supported:

INSERT INTO kafka_topic
SELECT FIELDS, ...
FROM pulsar_topic
[BATCH = N]
[WITHSUBSCRIPTION=[SHARED|EXCLUSIVE|FAILOVER]]
[WITHCONVERTER=`myclass`]

Selection of fields from the Pulsar message is not supported; use SELECT *, as in the examples below.

Examples:

-- shared subscription and batching
INSERT INTO kafka-topicA
SELECT *
FROM persistent://lenses/standalone/connect/kafka-topic
BATCH = 100
WITHSUBSCRIPTION=SHARED


-- default (FAILOVER) subscription with a custom converter
INSERT INTO kafka-topicA
SELECT *
FROM persistent://lenses/standalone/connect/kafka-topic
WITHCONVERTER=`myclass`
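The optional clauses combine into one statement, which is what ends up in connect.pulsar.kcql. The helper below is a hypothetical sketch (it is not part of the connector or any Lenses tool) that assembles such a statement and rejects unknown subscription types. It assumes the clause order used in the first example above (BATCH, then WITHSUBSCRIPTION, then WITHCONVERTER); this guide does not say whether other orders are accepted.

```python
# Hypothetical helper, not part of the connector: builds a single-line KCQL
# statement for the Pulsar source. Clause order mirrors the examples above.
from typing import Optional

SUBSCRIPTION_TYPES = {"SHARED", "EXCLUSIVE", "FAILOVER"}


def build_kcql(kafka_topic: str,
               pulsar_topic: str,
               batch: Optional[int] = None,
               subscription: Optional[str] = None,
               converter: Optional[str] = None) -> str:
    """Return a KCQL statement; optional clauses are omitted when None."""
    # Field selection is not supported, so the projection is always '*'.
    parts = [f"INSERT INTO {kafka_topic}", "SELECT *", f"FROM {pulsar_topic}"]
    if batch is not None:
        if batch <= 0:
            raise ValueError("batch must be a positive integer")
        parts.append(f"BATCH = {batch}")
    if subscription is not None:
        sub = subscription.upper()
        if sub not in SUBSCRIPTION_TYPES:
            raise ValueError(f"unknown subscription type: {subscription}")
        parts.append(f"WITHSUBSCRIPTION={sub}")
    if converter is not None:
        # The converter class is wrapped in backticks, as in the examples.
        parts.append(f"WITHCONVERTER=`{converter}`")
    return " ".join(parts)


print(build_kcql("kafka-topicA",
                 "persistent://lenses/standalone/connect/kafka-topic",
                 batch=100, subscription="shared"))
# INSERT INTO kafka-topicA SELECT * FROM persistent://lenses/standalone/connect/kafka-topic BATCH = 100 WITHSUBSCRIPTION=SHARED
```

Leaving subscription unset produces no WITHSUBSCRIPTION clause, so the connector falls back to its FAILOVER default.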

Concepts 

Subscription type 

The connector supports the following Pulsar subscription types, set with the WITHSUBSCRIPTION clause:

  • SHARED
  • EXCLUSIVE
  • FAILOVER (default)
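The practical difference between the three types is which consumer attached to the subscription receives each message. The following is a deliberately simplified in-memory model, not the Pulsar client API: it ignores partitions, acknowledgements and redelivery, and it treats the first FAILOVER consumer as the active one, whereas the real choice is made by the broker. The consumer names are hypothetical.

```python
# Simplified model of how Pulsar subscription types hand out messages to
# consumers sharing one subscription. Illustration only; not the Pulsar API.
from itertools import cycle
from typing import Dict, List


def dispatch(sub_type: str, consumers: List[str],
             messages: List[str]) -> Dict[str, List[str]]:
    """Return the messages each consumer receives under sub_type."""
    received: Dict[str, List[str]] = {c: [] for c in consumers}
    if not consumers:
        return received
    if sub_type == "EXCLUSIVE":
        # Only one consumer may attach; a second one is rejected.
        if len(consumers) > 1:
            raise RuntimeError("EXCLUSIVE subscription already has a consumer")
        received[consumers[0]].extend(messages)
    elif sub_type == "FAILOVER":
        # Several consumers may attach but only one is active. Simplification:
        # the first consumer is active; the others stand by and get nothing.
        received[consumers[0]].extend(messages)
    elif sub_type == "SHARED":
        # Messages are spread across all attached consumers (round-robin here).
        for consumer, msg in zip(cycle(consumers), messages):
            received[consumer].append(msg)
    else:
        raise ValueError(f"unknown subscription type: {sub_type}")
    return received


print(dispatch("SHARED", ["consumer-a", "consumer-b"], ["m1", "m2", "m3"]))
# {'consumer-a': ['m1', 'm3'], 'consumer-b': ['m2']}
print(dispatch("FAILOVER", ["consumer-a", "consumer-b"], ["m1", "m2", "m3"]))
# {'consumer-a': ['m1', 'm2', 'm3'], 'consumer-b': []}
```

The model makes the trade-off visible: SHARED spreads the load, but no single consumer sees the full stream in order, while EXCLUSIVE and FAILOVER route every message to one consumer.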

Message Converters 

The connector supports converters to handle different message payload formats in the source topic. See source record converters.

Quickstart 

Launch the stack 


  1. Copy the docker-compose file.
  2. Bring up the stack.
export CONNECTOR=pulsar
docker-compose up -d pulsar

Inserting test data 

Log into the pulsar container:

docker exec -ti pulsar /bin/bash

Run the following command to generate messages:

bin/pulsar-client produce \
    persistent://lenses/standalone/connect/kafka-topic \
    --messages 'hello-pulsar'

Start the connector 

If you are using Lenses, log into Lenses, navigate to the connectors page, select Pulsar as the source, and paste the following:

name=pulsar
connector.class=com.datamountaineer.streamreactor.connect.pulsar.source.PulsarSourceConnector
tasks.max=1
connect.pulsar.kcql=INSERT INTO pulsar SELECT * FROM persistent://lenses/standalone/connect/kafka-topic BATCH = 10
connect.pulsar.hosts=pulsar://pulsar:6650
connect.progress.enabled=true

To start the connector using the command line, log into the lenses-box container:


docker exec -ti lenses-box /bin/bash

and create a connector.properties file containing the properties above.

Create the connector with the connect-cli:

connect-cli create pulsar < connector.properties

Wait for the connector to start and check it’s running:

connect-cli status pulsar
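As an alternative to connect-cli, the same connector.properties file can be turned into the JSON body of the Kafka Connect REST API's POST /connectors endpoint. This is a minimal sketch under stated assumptions: the worker's REST URL is not given in this guide (Kafka Connect listens on port 8083 by default), the file uses only plain key=value lines (no line continuations or ':' separators), and parse_properties, connector_payload and create_connector are hypothetical helpers. Note that the parser splits on the first '=' only, because the KCQL value itself contains '='.

```python
# Sketch: submit connector.properties to the Kafka Connect REST API.
# Helper names are hypothetical; the worker URL must be supplied by you.
import json
import urllib.request
from typing import Dict


def parse_properties(text: str) -> Dict[str, str]:
    """Parse simple key=value lines, skipping blanks and # or ! comments."""
    props: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        # Split on the FIRST '=' only: the KCQL value contains '=' too.
        key, sep, value = line.partition("=")
        if not sep:
            raise ValueError(f"not a key=value line: {raw!r}")
        props[key.strip()] = value.strip()
    return props


def connector_payload(props: Dict[str, str]) -> dict:
    """Build the {"name": ..., "config": {...}} body for POST /connectors."""
    config = dict(props)
    name = config.pop("name")
    return {"name": name, "config": config}


def create_connector(connect_url: str, payload: dict) -> int:
    """POST the payload to <connect_url>/connectors; return the HTTP status."""
    req = urllib.request.Request(
        f"{connect_url.rstrip('/')}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return resp.status
```

Usage would be to read the file, call connector_payload(parse_properties(text)), and pass the result to create_connector with your worker's URL, for example "http://localhost:8083" if the worker runs locally on the default port.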

Check for records in Kafka 

Check the records in Lenses or via the console:

kafka-avro-console-consumer \
    --bootstrap-server localhost:9092 \
    --topic pulsar \
    --from-beginning

Clean up 

Bring down the stack:

docker-compose down

Options 

Name | Description | Type | Default Value
connect.pulsar.hosts | The Pulsar connection endpoints, for example pulsar://pulsar:6650. | string |
connect.pulsar.error.policy | Specifies the action to take if an error occurs while inserting the data. Three options are available: NOOP (the error is swallowed), THROW (the error is allowed to propagate) and RETRY (the exception causes the Connect framework to retry the message; the number of retries is set by connect.pulsar.max.retries). The error is logged automatically. | string | THROW
connect.pulsar.retry.interval | The time in milliseconds between retries. | int | 60000
connect.pulsar.max.retries | The maximum number of times to retry the write. | int | 20
connect.pulsar.converter.throw.on.error | If set to false, conversion exceptions are swallowed and processing continues, but the message is lost. If set to true, the exception is thrown. | boolean | false
connect.converter.avro.schemas | Required when the AvroConverter is used: the Avro schema needed to read the raw bytes and translate them into an Avro record. The format is $PULSAR_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE. | string |
connect.pulsar.kcql | The Kafka Connect Query Language statement describing the flow from Apache Pulsar to Apache Kafka topics. | string |
connect.pulsar.polling.timeout | The timeout, in milliseconds, for polling incoming messages. Connect writes to Kafka when either this timeout or connect.pulsar.batch.size is reached, whichever comes first. | int | 1000
connect.pulsar.batch.size | The number of records Connect waits for before writing to Kafka. The connector returns earlier if connect.pulsar.polling.timeout is reached first. | int | 100
connect.progress.enabled | Enables output of how many records have been processed. | boolean | false
connect.pulsar.tls.ca.cert | The path to the CA certificate file to use with the Pulsar connection. | string |
connect.pulsar.tls.cert | The path to the certificate file to use with the Pulsar connection. | string |
connect.pulsar.tls.key | The path to the private key file of the certificate used with the Pulsar connection. | string |
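connect.pulsar.batch.size and connect.pulsar.polling.timeout together decide when a batch is handed to Kafka: whichever limit is hit first ends the poll. The sketch below models only that rule with Python's standard queue module standing in for the Pulsar consumer. It is an illustration under that assumption, not the connector's implementation, and poll_batch is a hypothetical name.

```python
# Simplified model of "batch size OR polling timeout, whichever comes first".
# A queue.Queue stands in for the Pulsar consumer; illustration only.
import queue
import time
from typing import List


def poll_batch(source: "queue.Queue[str]", batch_size: int = 100,
               polling_timeout_ms: int = 1000) -> List[str]:
    """Collect up to batch_size messages, waiting at most polling_timeout_ms."""
    batch: List[str] = []
    deadline = time.monotonic() + polling_timeout_ms / 1000.0
    while len(batch) < batch_size:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break  # timeout reached first: return what has been collected
        try:
            batch.append(source.get(timeout=remaining))
        except queue.Empty:
            break  # nothing more arrived before the deadline
    return batch


q: "queue.Queue[str]" = queue.Queue()
for i in range(250):
    q.put(f"msg-{i}")
print(len(poll_batch(q)))  # 100: the batch size is reached before the timeout
```

With the defaults (100 records, 1000 ms), a busy topic yields full batches immediately, while a quiet topic yields a smaller batch, or an empty one, roughly once per second.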
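The three connect.pulsar.error.policy values can be read as a small decision rule. This hypothetical sketch follows the option descriptions in the table: every error is logged, NOOP swallows it, THROW propagates it, and RETRY raises a retriable error until connect.pulsar.max.retries is used up. The RetriableException class is a Python stand-in for Kafka Connect's org.apache.kafka.connect.errors.RetriableException, and propagating the original error once retries run out is an assumption, not something this guide states.

```python
# Hypothetical sketch of the connect.pulsar.error.policy options.
# RetriableException stands in for Kafka Connect's Java class of that name.
import logging

log = logging.getLogger("pulsar-error-policy-sketch")


class RetriableException(Exception):
    """Asks the framework to retry the failed operation later."""


class ErrorHandler:
    def __init__(self, policy: str = "THROW", max_retries: int = 20) -> None:
        self.policy = policy.upper()      # connect.pulsar.error.policy
        self.retries_left = max_retries   # connect.pulsar.max.retries

    def handle(self, error: Exception) -> None:
        log.error("error while inserting data: %s", error)  # always logged
        if self.policy == "NOOP":
            return                        # swallow the error and carry on
        if self.policy == "THROW":
            raise error                   # let the error propagate
        if self.policy == "RETRY":
            if self.retries_left <= 0:
                raise error               # assumption: propagate once exhausted
            self.retries_left -= 1
            # Connect retries the operation; the wait between attempts is
            # connect.pulsar.retry.interval milliseconds.
            raise RetriableException(str(error)) from error
        raise ValueError(f"unknown error policy: {self.policy}")
```

For example, ErrorHandler("RETRY", max_retries=1) raises RetriableException on the first failure and re-raises the original error on the second.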
--
Last modified: November 18, 2024