Kafka Connect sink connector for writing data from Kafka to Pulsar.
The following KCQL is supported:
INSERT INTO <pulsar-topic> SELECT FIELD, ... FROM <kafka-topic> [WITHKEY(FIELD, ...)] [BATCH=100]
Examples:
# Select all fields
INSERT INTO persistent://lenses/standalone/connect/kafka-topic SELECT * FROM kafka_topic

# Select individual fields
INSERT INTO persistent://lenses/standalone/connect/kafka-topic SELECT id, product_name FROM kafka_topic
The connector writes JSON messages to Pulsar topics.
The connector keys Pulsar messages using fields from the Kafka message value, as named in the WITHKEY clause.
The connector supports compression on Pulsar messages using the WITHCOMPRESSION clause. The available values are:
The BATCH clause controls the batching of writes to Pulsar.
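As a rough illustration of how the optional clauses combine, the sketch below builds a single KCQL statement that keys messages on one field and sets a batch size, then prints it as the connect.pulsar.kcql property. The key field id, the orders topic names, and the batch size of 100 are assumptions borrowed from the syntax line and the orders example on this page, not recommended values.

```shell
# Illustrative only: the field `id`, the topic names, and BATCH=100 are
# assumptions taken from the syntax line and the orders example on this page.
KCQL='INSERT INTO persistent://lenses/standalone/connect/orders SELECT * FROM orders WITHKEY(id) BATCH=100'

# Print the statement as the connector property that would carry it.
printf 'connect.pulsar.kcql=%s\n' "$KCQL"
```

The clause order follows the syntax line above (WITHKEY before BATCH).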
The connector supports partitioning in Pulsar using the WITHPARTITIONER clause. The available values are:
This sink supports the following Kafka payloads:
See connect payloads for more information.
The connector supports error policies.
export CONNECTOR=pulsar
docker-compose up -d pulsar
In the Pulsar container, start the consumer and wait for messages to arrive:
docker exec \
  -ti pulsar \
  bin/pulsar-client \
  consume \
  persistent://lenses/standalone/connect/orders \
  --subscription-name lenses
If you are using Lenses, log in to Lenses and navigate to the connectors page, select Pulsar as the sink, and paste the following:
name=pulsar
connector.class=com.datamountaineer.streamreactor.connect.pulsar.sink.PulsarSinkConnector
tasks.max=1
topics=orders
connect.pulsar.kcql=INSERT INTO persistent://lenses/standalone/connect/orders SELECT * FROM orders
connect.pulsar.hosts=pulsar://pulsar:6650
To start the connector using the command line, log into the lenses-box container:
docker exec -ti lenses-box /bin/bash
and create a connector.properties file containing the properties above.
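One way to create that file from the shell is a heredoc, sketched below. The property values are copied from the configuration shown above; the use of cat with a heredoc, and the final line count, are just one convenient approach rather than a required step.

```shell
# Write the connector configuration to connector.properties.
# The quoted 'EOF' delimiter stops the shell from expanding anything in the body.
cat > connector.properties <<'EOF'
name=pulsar
connector.class=com.datamountaineer.streamreactor.connect.pulsar.sink.PulsarSinkConnector
tasks.max=1
topics=orders
connect.pulsar.kcql=INSERT INTO persistent://lenses/standalone/connect/orders SELECT * FROM orders
connect.pulsar.hosts=pulsar://pulsar:6650
EOF

# Count the lines written as a quick sanity check (one property per line).
wc -l < connector.properties
```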
Create the connector with connect-cli:
connect-cli create pulsar < connector.properties
Wait for the connector to start and check it’s running:
connect-cli status pulsar
In the lenses-box container, start the Kafka producer shell:
kafka-avro-console-producer \
  --broker-list localhost:9092 --topic orders \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"created","type":"string"},{"name":"product","type":"string"},{"name":"price","type":"double"}, {"name":"qty", "type":"int"}]}'
The console is now waiting for your input. Enter the following:
{ "id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2, "qty": 100 }
The data will arrive in your Pulsar container console.
Bring down the stack:
docker-compose down