JMS
Kafka Connect sink connector for writing data from Kafka to JMS.
Requires a JMS driver for your JMS distribution - the connector does not package any client JARs.
KCQL support
The following KCQL is supported:
INSERT INTO <jms-destination>
SELECT FIELD, ...
FROM <your-kafka-topic>
[WITHFORMAT AVRO|JSON|MAP|OBJECT]
WITHTYPE TOPIC|QUEUE
Examples:
-- Select all fields from topicA and write to jmsA queue
INSERT INTO jmsA SELECT * FROM topicA WITHTYPE QUEUE
-- Select 3 fields and rename from topicB and write
-- to jmsB topic as JSON in a TextMessage
INSERT INTO jmsB SELECT x AS a, y, z FROM topicB WITHFORMAT JSON WITHTYPE TOPIC
Concepts
JMS Topics and Queues
The sink can write to either topics or queues, specified by the WITHTYPE clause.
JMS Payload
When a message is sent to a JMS target it can be one of the following:
- JSON - Send a TextMessage
- AVRO - Send a BytesMessage
- MAP - Send a MapMessage
- OBJECT - Send an ObjectMessage
This is set by the WITHFORMAT keyword.
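For example, to deliver each record as a MapMessage to a queue (the destination and topic names here are placeholders):

```
INSERT INTO jmsMapQueue SELECT * FROM topicC WITHFORMAT MAP WITHTYPE QUEUE
```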
Kafka payload support
This sink supports the following Kafka payloads:
- Schema.Struct and Struct (Avro)
- Schema.Struct and JSON
- No Schema and JSON
See connect payloads for more information.
Error policies
The connector supports error policies.
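For instance, to have failed writes retried by the Connect framework rather than thrown, the error policy options can be combined as follows (the interval and retry count shown are the defaults):

```
connect.jms.error.policy=RETRY
connect.jms.retry.interval=60000
connect.jms.max.retries=20
```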
Quickstart
Launch the stack
- Copy the docker-compose file.
- Bring up the stack.
export CONNECTOR=jms
docker-compose up -d activemq
Start the connector
If you are using Lenses, log in to Lenses and navigate to the connectors page, select JMS as the sink and paste the following:
name=jms
connector.class=com.datamountaineer.streamreactor.connect.jms.sink.JMSSinkConnector
tasks.max=1
topics=orders
connect.jms.url=tcp://activemq:61616
connect.jms.initial.context.factory=org.apache.activemq.jndi.ActiveMQInitialContextFactory
connect.jms.connection.factory=ConnectionFactory
connect.jms.kcql=INSERT INTO orders SELECT * FROM orders WITHTYPE QUEUE WITHFORMAT JSON
To start the connector without using Lenses, log into the fastdata container:
docker exec -ti fastdata /bin/bash
and create a connector.properties file containing the properties above.
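For example, the file can be created directly inside the container with a heredoc (the property values are the ones shown above):

```shell
# Write the connector configuration shown above to connector.properties
cat > connector.properties <<'EOF'
name=jms
connector.class=com.datamountaineer.streamreactor.connect.jms.sink.JMSSinkConnector
tasks.max=1
topics=orders
connect.jms.url=tcp://activemq:61616
connect.jms.initial.context.factory=org.apache.activemq.jndi.ActiveMQInitialContextFactory
connect.jms.connection.factory=ConnectionFactory
connect.jms.kcql=INSERT INTO orders SELECT * FROM orders WITHTYPE QUEUE WITHFORMAT JSON
EOF
```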
Create the connector with the connect-cli:
connect-cli create jms < connector.properties
Wait for the connector to start and check it's running:
connect-cli status jms
Inserting test data
In the fastdata container, start the Kafka Avro console producer:
kafka-avro-console-producer \
--broker-list localhost:9092 --topic orders \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"created","type":"string"},{"name":"product","type":"string"},{"name":"price","type":"double"}, {"name":"qty", "type":"int"}]}'
The console is now waiting for input; enter the following record:
{"id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2, "qty":100}
Check for data in JMS
docker exec -ti activemq bin/activemq consumer --destination queue://orders
Clean up
Bring down the stack:
docker-compose down
Options
Name | Description | Type | Default Value |
---|---|---|---|
connect.jms.url | Provides the JMS broker url | string | |
connect.jms.initial.context.factory | Initial Context Factory, e.g: org.apache.activemq.jndi.ActiveMQInitialContextFactory | string | |
connect.jms.connection.factory | Provides the full class name of the ConnectionFactory to use, e.g. org.apache.activemq.ActiveMQConnectionFactory | string | ConnectionFactory |
connect.jms.kcql | The KCQL expression describing the field selection and target JMS destinations | string | |
connect.jms.subscription.name | The subscription name to use when subscribing to a topic; specifying this makes a durable subscription for topics | string | |
connect.jms.password | Provides the password for the JMS connection | password | |
connect.jms.username | Provides the user for the JMS connection | string | |
connect.jms.error.policy | Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message, with the number of retries based on connect.jms.max.retries. The error will be logged automatically | string | THROW |
connect.jms.retry.interval | The time in milliseconds between retries. | int | 60000 |
connect.jms.max.retries | The maximum number of times to try the write again. | int | 20 |
connect.jms.destination.selector | Selector to use for destination lookup. Either CDI or JNDI. | string | CDI |
connect.jms.initial.context.extra.params | List (comma separated) of extra properties as key/value pairs with a colon delimiter to supply to the initial context e.g. SOLACE_JMS_VPN:my_solace_vp | list | [] |
connect.jms.batch.size | The number of records to poll for on the target JMS destination in each Connect poll. | int | 100 |
connect.jms.polling.timeout | Provides the timeout to poll incoming messages | long | 1000 |
connect.jms.source.default.converter | Contains a canonical class name for the default converter of raw JMS message bytes to a SourceRecord. Overrides can still be applied per destination via connect.jms.source.converters, e.g. com.datamountaineer.streamreactor.connect.source.converters.AvroConverter | string | |
connect.jms.converter.throw.on.error | If set to false, the conversion exception is swallowed and processing carries on, but the message is lost; if set to true, the exception is thrown. Default is false | boolean | false |
connect.converter.avro.schemas | If the AvroConverter is used you need to provide an Avro schema to be able to read and translate the raw bytes to an Avro record. The format is $MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE | string | |
connect.jms.headers | Contains a collection of static JMS headers included in every SinkRecord. The format is connect.jms.headers="$MQTT_TOPIC=rmq.jms.message.type:TextMessage,rmq.jms.message.priority:2;$MQTT_TOPIC2=rmq.jms.message.type:JSONMessage" | string | |
connect.progress.enabled | Enables logging of how many records have been processed | boolean | false |
connect.jms.evict.interval.minutes | Removes uncommitted messages from the internal cache. Each JMS message is linked to the Kafka record to be published; failure to publish a record to Kafka means the JMS message will not be acknowledged | int | 10 |
connect.jms.evict.threshold.minutes | The number of minutes after which an uncommitted entry becomes evictable from the connector cache. | int | 10 |
connect.jms.scale.type | How the connector's task parallelization is decided. Available values are kcql and default. If kcql is provided, parallelization is based on the number of KCQL statements written; otherwise it is driven by the connector's tasks.max | string | kcql |
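As a sketch of the kcql scale type: assuming multiple KCQL statements are separated by semicolons, a configuration like the following would parallelize based on the two statements rather than on tasks.max alone:

```
connect.jms.scale.type=kcql
connect.jms.kcql=INSERT INTO jmsA SELECT * FROM topicA WITHTYPE QUEUE;INSERT INTO jmsB SELECT * FROM topicB WITHTYPE TOPIC
```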