A Kafka Connect JMS source connector to subscribe to messages on JMS queues and topics and write them to a Kafka topic.
The following KCQL is supported:
INSERT INTO kafka_topic SELECT * FROM jms_destination WITHTYPE [TOPIC|QUEUE] [WITHCONVERTER=`myclass`]
Selection of fields from the JMS message is not supported.
Examples:
-- Select from a JMS queue and write to a Kafka topic INSERT INTO topicA SELECT * FROM jms_queue WITHTYPE QUEUE -- Select from a JMS topic and write to a Kafka topic with a json converter INSERT INTO topicA SELECT * FROM jms_queue WITHTYPE TOPIC WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.AvroConverter`
The connector uses the standard JMS proctocols and has been tested against ActiveMQ.
The connector allows for the JMS inital.context.factory and connection.factory to be set according to your JMS provider. The appropriate implementation jars must be added to the CLASSPATH of the connect workers or placed in the plugin.path of the connector.
Each JMS message is committed only when it has been written to Kafka. If a failure happens when writing to Kafka, i.e. the message is too large, then that JMS message will not be acknowledged. It will stay in the queue so it can be actioned upon.
The schema of the messages is fixed and can found Data Types unless a converter is used.
The connector support both TOPICS and QUEUES, controlled by the WITHTYPE KCQL clause.
The connector supports converters to handle different messages payload format in the source topic or queue. See source record converters.
If no converter is provide the JMS message is converter to a Kafka Struct representation. See Data Types.
export CONNECTOR=jms docker-compose up -d activemq
Login into the activemq container:
docker exec -ti activemq /bin/bash
Run the following command to generate messages:
bin/activemq producer --destination queue://jms-queue --message "hello Lenses!"
If you are using Lenses, login into Lenses and navigate to the connectors page, select JMS as the source and paste the following:
name=jms-source connector.class=com.datamountaineer.streamreactor.connect.jms.source.JMSSourceConnector tasks.max=1 connect.jms.kcql=INSERT INTO jms SELECT * FROM jms-queue WITHTYPE QUEUE connect.jms.initial.context.factory=org.apache.activemq.jndi.ActiveMQInitialContextFactory connect.jms.url=tcp://activemq:61616 connect.jms.connection.factory=ConnectionFactory
To start the connector using the command line, log into the lenses-box container:
docker exec -ti lenses-box /bin/bash
and create a connector.properties file containing the properties above.
Create the connector, with the connect-cli:
connect-cli create jms < connector.properties
Wait for the connector to start and check it’s running:
connect-cli status jms
Check the records in Lenses or with via the console:
kafka-avro-console-consumer \ --bootstrap-server localhost:9092 \ --topic jms \ --from-beginning
Bring down the stack:
docker-compose down
On this page