5.0
Pulsar
A Kafka Connect source connector to write events from Apache Pulsar to Apache Kafka.
KCQL support
The following KCQL is supported:
INSERT INTO kafka_topic
SELECT FIELDS, ...
FROM pulsar_topic
[WITHSUBSCRIPTION=[SHARED|EXCLUSIVE|FAILOVER]]
[WITHCONVERTER=`myclass`]
Selection of fields from the Pulsar message is not supported; use SELECT * as in the examples below.
Examples:
-- shared subscription and batching
INSERT INTO kafka-topicA
SELECT *
FROM persistent://lenses/standalone/connect/kafka-topic
BATCH = 100
WITHSUBSCRIPTION=SHARED
-- default failover
INSERT INTO kafka-topicA
SELECT *
FROM persistent://lenses/standalone/connect/kafka-topic
WITHCONVERTER=`myclass`
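
The statements above can also be assembled programmatically. The following is a minimal sketch, not part of the connector: `build_kcql` and its parameters are hypothetical names introduced here, and the clause order simply mirrors the examples above (BATCH appears in the examples, so it is included as an optional clause).

```python
from typing import Optional

# Subscription types listed in this document for the WITHSUBSCRIPTION clause.
SUBSCRIPTION_TYPES = ("SHARED", "EXCLUSIVE", "FAILOVER")


def build_kcql(
    kafka_topic: str,
    pulsar_topic: str,
    subscription: Optional[str] = None,
    converter: Optional[str] = None,
    batch: Optional[int] = None,
) -> str:
    """Build a single-line KCQL statement (hypothetical helper)."""
    # Field selection is not supported, so the projection is always '*'.
    parts = [f"INSERT INTO {kafka_topic}", "SELECT *", f"FROM {pulsar_topic}"]
    if batch is not None:
        if batch <= 0:
            raise ValueError("batch must be a positive integer")
        parts.append(f"BATCH = {batch}")
    if subscription is not None:
        sub = subscription.upper()
        if sub not in SUBSCRIPTION_TYPES:
            raise ValueError(f"unsupported subscription type: {subscription}")
        parts.append(f"WITHSUBSCRIPTION={sub}")
    if converter is not None:
        # The converter class name is wrapped in backticks, as in the examples.
        parts.append(f"WITHCONVERTER=`{converter}`")
    return " ".join(parts)
```

For example, `build_kcql("kafka-topicA", "persistent://lenses/standalone/connect/kafka-topic", subscription="shared", batch=100)` yields the first example statement on a single line.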
Concepts
Subscription Type
The connector supports the following Pulsar subscription types set using the WITHSUBSCRIPTION clause:
- SHARED
- EXCLUSIVE
- FAILOVER (default)
Message Converters
The connector supports converters to handle different message payload formats in the source topic. See source record converters.
Quickstart
Launch the stack
- Copy the docker-compose file.
- Bring up the stack.
export CONNECTOR=pulsar
docker-compose up -d pulsar
Inserting test data
Log in to the Pulsar container:
docker exec -ti pulsar /bin/bash
Run the following command to generate messages:
bin/pulsar-client produce \
persistent://lenses/standalone/connect/kafka-topic \
--messages 'hello-pulsar'
Start the connector
If you are using Lenses, log in to Lenses, navigate to the connectors page, select Pulsar as the source and paste the following:
name=pulsar
connector.class=com.datamountaineer.streamreactor.connect.pulsar.source.PulsarSourceConnector
tasks.max=1
connect.pulsar.kcql=INSERT INTO pulsar SELECT * FROM persistent://lenses/standalone/connect/kafka-topic BATCH = 10
connect.pulsar.hosts=pulsar://pulsar:6650
connect.progress.enabled=true
To start the connector without using Lenses, log into the fastdatadev container:
docker exec -ti fastdata /bin/bash
and create a connector.properties file containing the properties above.
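
If you prefer to generate the file from a script, the sketch below shows one way to do it. It is an illustration only: `to_properties`, `parse_properties` and `write_properties` are hypothetical helpers, not part of the connector or the connect-cli. Note that the KCQL value itself contains `=` (in `BATCH = 10`), so a parser must split each line on the first `=` only.

```python
from pathlib import Path
from typing import Dict

# The quickstart properties from this document.
CONNECTOR_CONFIG: Dict[str, str] = {
    "name": "pulsar",
    "connector.class": "com.datamountaineer.streamreactor.connect.pulsar.source.PulsarSourceConnector",
    "tasks.max": "1",
    "connect.pulsar.kcql": (
        "INSERT INTO pulsar SELECT * "
        "FROM persistent://lenses/standalone/connect/kafka-topic BATCH = 10"
    ),
    "connect.pulsar.hosts": "pulsar://pulsar:6650",
    "connect.progress.enabled": "true",
}


def to_properties(config: Dict[str, str]) -> str:
    """Render a config mapping as key=value lines."""
    return "".join(f"{key}={value}\n" for key, value in config.items())


def parse_properties(text: str) -> Dict[str, str]:
    """Parse key=value lines, splitting on the first '=' only."""
    result: Dict[str, str] = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        key, _, value = line.partition("=")
        result[key.strip()] = value.strip()
    return result


def write_properties(path: str, config: Dict[str, str]) -> None:
    """Write the rendered properties to the given path."""
    Path(path).write_text(to_properties(config), encoding="utf-8")
```

Calling `write_properties("connector.properties", CONNECTOR_CONFIG)` produces the file used in the next step.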
Create the connector with the connect-cli:
connect-cli create pulsar < connector.properties
Wait for the connector to start and check that it is running:
connect-cli status pulsar
Check for records in Kafka
Check the records in Lenses or via the console:
kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--topic pulsar \
--from-beginning
Clean up
Bring down the stack:
docker-compose down
Options
Name | Description | Type | Default Value |
---|---|---|---|
connect.pulsar.hosts | Contains the Pulsar connection end points. | string | |
connect.pulsar.error.policy | Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message. The number of retries is configurable, and the error is logged automatically. | string | THROW |
connect.pulsar.retry.interval | The time in milliseconds between retries. | int | 60000 |
connect.pulsar.max.retries | The maximum number of times to try the write again. | int | 20 |
connect.pulsar.converter.throw.on.error | If set to false, the conversion exception is swallowed and processing continues, but the message is lost. If set to true, the exception is thrown. | boolean | false |
connect.converter.avro.schemas | If the AvroConverter is used, you need to provide an Avro schema to be able to read and translate the raw bytes to an Avro record. The format is $MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE | string | |
connect.pulsar.kcql | Contains the Kafka Connect Query Language describing the flow from Apache Pulsar to Apache Kafka topics | string | |
connect.pulsar.polling.timeout | Provides the timeout, in milliseconds, to poll incoming messages. Connect will write to Kafka when this timeout or connect.pulsar.batch.size is reached, whichever comes first. | int | 1000 |
connect.pulsar.batch.size | The number of records Connect will wait for before writing to Kafka. The connector will return earlier if connect.pulsar.polling.timeout is reached first. | int | 100 |
connect.progress.enabled | Enables the output for how many records have been processed | boolean | false |
connect.pulsar.tls.ca.cert | Provides the path to the CA certificate file to use with the Pulsar connection | string | |
connect.pulsar.tls.cert | Provides the path to the certificate file to use with the Pulsar connection | string | |
connect.pulsar.tls.key | Provides the path to the certificate private key file. | string | |
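
The interaction between connect.pulsar.batch.size and connect.pulsar.polling.timeout described in the table can be illustrated with a small sketch. This is not the connector's implementation; `poll_batch`, `receive` and `clock` are hypothetical names used only to show the "whichever comes first" rule, with defaults taken from the table.

```python
import time
from typing import Callable, List, Optional


def poll_batch(
    receive: Callable[[float], Optional[bytes]],
    batch_size: int = 100,
    polling_timeout_ms: int = 1000,
    clock: Callable[[], float] = time.monotonic,
) -> List[bytes]:
    """Collect messages until batch_size or the timeout is reached.

    `receive(timeout_seconds)` is a hypothetical callable that returns the
    next message, or None if nothing arrived within the given time.
    """
    deadline = clock() + polling_timeout_ms / 1000.0
    batch: List[bytes] = []
    while len(batch) < batch_size:
        remaining = deadline - clock()
        if remaining <= 0:
            break  # polling timeout reached first
        message = receive(remaining)
        if message is None:
            break  # nothing arrived before the deadline
        batch.append(message)
    return batch  # full batch, partial batch, or empty list
```

With a busy topic the batch fills and is returned immediately; with a quiet topic a partial (possibly empty) batch is returned once the timeout elapses.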