Kafka Connect sink connector for writing data from Kafka to JMS.
JMS driver for your JMS distribution - The connector does not package an client jars
The following KCQL is supported:
INSERT INTO <jms-destination> SELECT FIELD, ... FROM <your-kafka-topic> [WITHFORMAT AVRO|JSON|MAP|OBJECT] WITHTYPE TOPIC|QUEUE
Examples:
-- Select all fields from topicA and write to jmsA queue INSERT INTO jmsA SELECT * FROM topicA WITHTYPE QUEUE -- Select 3 fields and rename from topicB and write -- to jmsB topic as JSON in a TextMessage INSERT INTO jmsB SELECT x AS a, y, z FROM topicB WITHFORMAT JSON WITHTYPE TOPIC
The sink can write to either topics or queues, specified by the WITHTYPE clause.
When a message is sent to a JMS target it can be one of the following:
This is set by the WITHFORMAT keyword.
This sink supports the following Kafka payloads:
See connect payloads for more information.
The connector supports Error polices.
export CONNECTOR=jms docker-compose up -d activemq
If you are using Lenses, login into Lenses and navigate to the connectors page, select JMS as the sink and paste the following:
name=jms connector.class=com.datamountaineer.streamreactor.connect.jms.sink.JMSSinkConnector tasks.max=1 topics=orders connect.jms.url=tcp://activemq:61616 connect.jms.initial.context.factory=org.apache.activemq.jndi.ActiveMQInitialContextFactory connect.jms.connection.factory=ConnectionFactory connect.jms.kcql=INSERT INTO orders SELECT * FROM orders WITHTYPE QUEUE WITHFORMAT JSON
To start the connector using the command line, log into the lenses-box container:
docker exec -ti lenses-box /bin/bash
and create a connector.properties file containing the properties above.
Create the connector, with the connect-cli:
connect-cli create jms-sink < connector.properties
Wait for the connector to start and check it’s running:
connect-cli status jms
In the lenses-box container start the kafka producer shell:
kafka-avro-console-producer \ --broker-list localhost:9092 --topic orders \ --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"created","type":"string"},{"name":"product","type":"string"},{"name":"price","type":"double"}, {"name":"qty", "type":"int"}]}'
the console is now waiting for your input, enter the following:
{ "id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2, "qty": 100 }
docker exec -ti activemq bin/activemq consumer --destination queue://orders
Bring down the stack:
docker-compose down
On this page