JMS Source
Download connector JMS Connector for Kafka 2.1.0
The JMS Source connector allows you to subscribe to messages on JMS queues and topics and write them to a Kafka topic. Each JMS message is acknowledged only once it has been written to Kafka. If writing to Kafka fails (for example, because the message is too large), that JMS message is not acknowledged; it stays on the queue so that it can be acted upon.
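The acknowledge-after-write behaviour can be pictured with the minimal Scala sketch below. It is an illustration, not the connector's code: PendingMessage, PendingAcks and onRecordCommitted are hypothetical names, and in a real Kafka Connect task the "record written" signal would presumably come from a hook such as SourceTask.commitRecord.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a received JMS message: only its id and an
// acknowledge callback matter for this sketch.
final case class PendingMessage(messageId: String, acknowledge: () => Unit)

// Hypothetical tracker (not the connector's class) for JMS messages whose
// Kafka records have not yet been confirmed as written.
final class PendingAcks {
  private val pending = mutable.LinkedHashMap.empty[String, PendingMessage]

  // Remember a message once its record has been handed to Kafka Connect.
  def track(msg: PendingMessage): Unit = pending.update(msg.messageId, msg)

  // Record written to Kafka: acknowledge the JMS message and forget it.
  // Returns false if the id was not being tracked.
  def onRecordCommitted(messageId: String): Boolean =
    pending.remove(messageId) match {
      case Some(msg) =>
        msg.acknowledge()
        true
      case None => false
    }

  // Messages whose write never succeeded are never acknowledged here,
  // so the broker keeps them on the queue.
  def unacknowledged: Set[String] = pending.keySet.toSet
}
```

A failed write simply never reaches onRecordCommitted, so the message remains unacknowledged on the broker.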
Prerequisites
- Apache Kafka 2.x or above
- Kafka Connect 2.x or above
- A JMS broker (for example, ActiveMQ)
- Java 1.8
Features
- Pluggable converters for JMS payloads. If no converter is specified, an Avro message representing the JMS message is created, with the message payload stored as a byte array in its payload field.
- Out-of-the-box converters for JSON, Avro and binary payloads
- KCQL routing: mapping of JMS destinations to Kafka topics
- Extra connection properties for specialized connections such as SOLACE_VPN
KCQL Support
INSERT INTO kafka_topic SELECT * FROM jms_destination WITHTYPE { TOPIC|QUEUE } [WITHCONVERTER=`myclass`]
Tip
You can specify multiple KCQL statements separated by a semicolon (;) to have the connector route multiple JMS destinations to Kafka topics.
The JMS source supports KCQL, the Kafka Connect Query Language. The following KCQL support is available:
- Selection of JMS topic or queue
- Selecting converter types for JMS message payloads.
Example:
-- Select from a JMS queue and write to a Kafka topic
INSERT INTO topicA SELECT * FROM jms_queue WITHTYPE QUEUE
-- Select from a JMS topic and write to a Kafka topic with an Avro converter
INSERT INTO topicA SELECT * FROM jms_topic WITHTYPE TOPIC WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.AvroConverter`
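As the tip above notes, several statements can share one connect.jms.kcql value, separated by semicolons. The Scala sketch below shows only that splitting step; KcqlStatements is a hypothetical helper, not the connector's own KCQL parser, and it assumes no statement contains a literal semicolon.

```scala
// Hypothetical helper: splits a multi-statement connect.jms.kcql value on ';'
// and drops empty fragments (e.g. from a trailing semicolon).
object KcqlStatements {
  def split(kcql: String): List[String] =
    kcql.split(";").iterator.map(_.trim).filter(_.nonEmpty).toList
}
```

For example, the two statements above joined with a semicolon split back into a two-element list.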
JMS Destination Type
JMS destinations can be topics or queues; set the type with the WITHTYPE keyword.
Converters
We provide four converters out of the box, but you can plug in your own (see an example here). A converter is set per statement in connect.jms.kcql using the WITHCONVERTER keyword.
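To make the WITHTYPE and WITHCONVERTER clauses concrete, here is a small Scala sketch that renders statements following the KCQL syntax given earlier. KcqlBuilder and DestinationType are hypothetical names introduced for illustration only; they are not part of the connector.

```scala
// Hypothetical model of the two values accepted by WITHTYPE.
sealed trait DestinationType { def keyword: String }
object DestinationType {
  case object Queue extends DestinationType { val keyword = "QUEUE" }
  case object Topic extends DestinationType { val keyword = "TOPIC" }
}

// Hypothetical helper rendering:
// INSERT INTO kafka_topic SELECT * FROM jms_destination WITHTYPE {TOPIC|QUEUE} [WITHCONVERTER=`myclass`]
object KcqlBuilder {
  def statement(kafkaTopic: String,
                jmsDestination: String,
                destinationType: DestinationType,
                converterClass: Option[String] = None): String = {
    val base = s"INSERT INTO $kafkaTopic SELECT * FROM $jmsDestination WITHTYPE ${destinationType.keyword}"
    // WITHCONVERTER is optional; omit it to get the default converter.
    converterClass.fold(base)(cls => s"$base WITHCONVERTER=`$cls`")
  }

  // Joins several statements into a single connect.jms.kcql value.
  def combine(statements: Seq[String]): String = statements.mkString(";")
}
```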
AvroConverter
com.datamountaineer.streamreactor.connect.converters.source.AvroConverter
The payload is an Avro message. In this case you need to provide the path to the Avro schema file (see connect.converter.avro.schemas) so that the payload can be decoded.
JsonSimpleConverter
com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter
The payload is a JSON message. This converter parses the JSON and creates an Avro record from it, which is then sent to Kafka.
JsonConverterWithSchemaEvolution
An experimental converter for translating JSON messages to Avro. The resulting Avro schema stays fully compatible as new fields are added to the evolving JSON payload.
BytesConverter
com.datamountaineer.streamreactor.connect.converters.source.BytesConverter
This is the default implementation. The payload is taken as is, as an array of bytes, and sent to Kafka as an Avro record with Schema.BYTES. You don’t have to specify a converter for a source to get this one.
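The defaulting rule can be sketched in Scala as a lookup on the statement text: use the class named in WITHCONVERTER when present, otherwise fall back to BytesConverter. ConverterResolver is a hypothetical helper, and the case-insensitive regex is an assumption about how KCQL keywords are matched; the connector's real parser is not shown here.

```scala
// Hypothetical sketch: picks the converter class for one KCQL statement.
object ConverterResolver {
  val DefaultConverter =
    "com.datamountaineer.streamreactor.connect.converters.source.BytesConverter"

  // Captures the class name between the backticks of WITHCONVERTER=`...`.
  // Case-insensitive matching is an assumption made for this sketch.
  private val WithConverter = """(?i)WITHCONVERTER\s*=\s*`([^`]+)`""".r

  def converterFor(statement: String): String =
    WithConverter.findFirstMatchIn(statement).map(_.group(1)).getOrElse(DefaultConverter)
}
```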
Lenses QuickStart
The easiest way to try this out is with Lenses Box, the pre-configured Docker image that comes with this connector pre-installed. Go to Connectors → New Connector → Source → JMS and paste your configuration.
ActiveMQ Setup
For ActiveMQ, follow http://activemq.apache.org/getting-started.html for setup instructions. You will also need the ActiveMQ client jar; add it to the plugin.path of each Connect worker.
Installing the Connector
In production, Connect should be run in distributed mode:
- Install and configure a Kafka Connect cluster.
- Create a folder called plugins/lib on each server.
- Copy the required connector jars from the Stream Reactor download into that folder.
- Edit connect-avro-distributed.properties in the etc/schema-registry folder and uncomment the plugin.path option. Set it to the root directory of the folder created in step 2 (i.e. plugins).
- Start Connect:
bin/connect-distributed etc/schema-registry/connect-avro-distributed.properties
Connect workers are long-running processes, so set up an init.d or systemd service for them.
Source Connector QuickStart
Start Kafka Connect in distributed mode (see Installing the Connector). In this mode a REST endpoint on port 8083 is exposed to accept connector configurations.
We developed a command line interface (CLI) to make interacting with the Connect REST API easier. The CLI can be found in the bin folder of the Stream Reactor download. Alternatively, the jar can be pulled from our GitHub releases page.
Starting the Connector (Distributed)
Download and install Stream Reactor. Follow the instructions here if you haven’t already done so. All paths in the quickstart are relative to the location where you installed Stream Reactor.
Once Connect has started, use the kafka-connect-tools CLI to post the distributed properties file for JMS. For the CLI to work, including when using the Docker images, set the following environment variable to point to the Kafka Connect REST API.
export KAFKA_CONNECT_REST="http://myserver:myport"
➜ bin/connect-cli create jms-source < conf/jms-source.properties
name=jms-source
connector.class=com.datamountaineer.streamreactor.connect.jms.source.JMSSourceConnector
tasks.max=1
connect.jms.kcql=INSERT INTO topic SELECT * FROM jms-queue
connect.jms.initial.context.factory=org.apache.activemq.jndi.ActiveMQInitialContextFactory
connect.jms.url=tcp://localhost:61616
connect.jms.connection.factory=ConnectionFactory
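If you would rather call the REST API directly than use the CLI, the properties above have to be wrapped in the JSON body that Kafka Connect's POST /connectors endpoint expects, of the form {"name": ..., "config": {...}}. The Scala sketch below shows that conversion. ConnectorPayload is a hypothetical helper (it is not part of connect-cli), and its escaping only handles quotes and backslashes.

```scala
import java.io.StringReader
import java.util.Properties
import scala.jdk.CollectionConverters._

// Hypothetical helper: turns a connector .properties file into the JSON body
// for Kafka Connect's POST /connectors endpoint. Not part of connect-cli.
object ConnectorPayload {
  // Minimal JSON string quoting: escapes backslashes and double quotes only.
  private def quote(s: String): String =
    "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\""

  def fromProperties(text: String): String = {
    val props = new Properties()
    props.load(new StringReader(text))
    val name = Option(props.getProperty("name"))
      .getOrElse(throw new IllegalArgumentException("missing 'name' property"))
    // Every property, including name, goes into "config"; keys are sorted
    // only to make the output deterministic.
    val config = props.stringPropertyNames().asScala.toList.sorted
      .map(k => s"${quote(k)}:${quote(props.getProperty(k))}")
      .mkString(",")
    s"""{"name":${quote(name)},"config":{$config}}"""
  }
}
```

The resulting string can be sent to $KAFKA_CONNECT_REST/connectors as an HTTP POST with Content-Type: application/json.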
We can use the CLI to check that the connector is up, but you should also be able to see this in the logs.
#check for running connectors with the CLI
➜ bin/connect-cli ps
jms-source
name=jms-source
connector.class=com.datamountaineer.streamreactor.connect.jms.source.JMSSourceConnector
tasks.max=1
connect.jms.kcql=INSERT INTO topic SELECT * FROM jms-queue
connect.jms.initial.context.factory=org.apache.activemq.jndi.ActiveMQInitialContextFactory
connect.jms.url=tcp://localhost:61616
connect.jms.connection.factory=ConnectionFactory
INFO [startup banner: "Landoop JMS Source, By Andrew Stevenson" rendered as ASCII art] (com.datamountaineer.streamreactor.connect.jms.source.JMSSourceTask:22)
Test Records
Now we need to send some records to the ActiveMQ broker for the source connector to pick up. We can do this with the ActiveMQ command line producer. In the bin folder of your ActiveMQ installation, run the following to insert 1000 messages into a queue called jms-queue.
activemq producer --destination queue://jms-queue --message "hello DataMountaineer"
We should immediately see the records coming through the source connector and into our Kafka topic:
bin/kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--topic topic \
--from-beginning
{"message_timestamp":{"long":1490799748984},"correlation_id":null,"redelivered":{"boolean":false},"reply_to":null,"destination":{"string":"queue://jms-queue"},"message_id":{"string":"ID:Andrews-MacBook-Pro.local-49870-1490799747943-1:1:1:1:997"},"mode":{"int":2},"type":null,"priority":{"int":4},"bytes_payload":{"bytes":"hello"},"properties":null}
{"message_timestamp":{"long":1490799748985},"correlation_id":null,"redelivered":{"boolean":false},"reply_to":null,"destination":{"string":"queue://jms-queue"},"message_id":{"string":"ID:Andrews-MacBook-Pro.local-49870-1490799747943-1:1:1:1:998"},"mode":{"int":2},"type":null,"priority":{"int":4},"bytes_payload":{"bytes":"hello"},"properties":null}
{"message_timestamp":{"long":1490799748986},"correlation_id":null,"redelivered":{"boolean":false},"reply_to":null,"destination":{"string":"queue://jms-queue"},"message_id":{"string":"ID:Andrews-MacBook-Pro.local-49870-1490799747943-1:1:1:1:999"},"mode":{"int":2},"type":null,"priority":{"int":4},"bytes_payload":{"bytes":"hello"},"properties":null}
{"message_timestamp":{"long":1490799748987},"correlation_id":null,"redelivered":{"boolean":false},"reply_to":null,"destination":{"string":"queue://jms-queue"},"message_id":{"string":"ID:Andrews-MacBook-Pro.local-49870-1490799747943-1:1:1:1:1000"},"mode":{"int":2},"type":null,"priority":{"int":4},"bytes_payload":{"bytes":"hello"},"properties":null}
Configurations
The Kafka Connect framework requires the following in addition to any connector-specific configurations:
Config | Description | Type | Value |
---|---|---|---|
name | Name of the connector | string | |
tasks.max | The maximum number of tasks to run | int | 1 |
connector.class | Name of the connector class | string | com.datamountaineer.streamreactor.connect.jms.source.JMSSourceConnector |
Connector Configurations
Config | Description | Type |
---|---|---|
connect.jms.url | The JMS broker URL | string |
connect.jms.username | The user for the JMS connection | string |
connect.jms.password | The password for the JMS connection | string |
connect.jms.initial.context.factory | Initial context factory, e.g. org.apache.activemq.jndi.ActiveMQInitialContextFactory | string |
connect.jms.connection.factory | The ConnectionFactory implementation to use | string |
connect.jms.destination.selector | Selector to use for destination lookup: either CDI or JNDI (default) | string |
connect.jms.kcql | KCQL expression describing field selection and routes. The KCQL expression also sets the JMS destination type (TOPIC or QUEUE) via the WITHTYPE keyword and, optionally, the converter via the WITHCONVERTER keyword. If no converter is specified, the connector defaults to the BytesConverter, which sends an Avro message over Kafka using Schema.BYTES. | string |
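With the default JNDI destination selector, these settings correspond to a standard JNDI lookup. The Scala sketch below assumes that mapping rather than reproducing the connector's code: it builds an InitialContext environment from connect.jms.initial.context.factory and connect.jms.url, merges in the comma-separated key=value pairs accepted by the optional connect.jms.initial.context.extra.params setting, and looks up the name given in connect.jms.connection.factory. JndiSetup is a hypothetical name. Creating the actual JMS connection with connect.jms.username and connect.jms.password would need the javax.jms API on the classpath, so that step is left out.

```scala
import java.util.Hashtable
import javax.naming.{Context, InitialContext}

// Hypothetical sketch of a JNDI-based ConnectionFactory lookup.
object JndiSetup {
  // Parses "k1=v1,k2=v2" (the extra.params format) into a map.
  def parseExtraParams(raw: String): Map[String, String] =
    raw.split(",").iterator.map(_.trim).filter(_.nonEmpty).map { pair =>
      pair.split("=", 2) match {
        case Array(k, v) => k.trim -> v.trim
        case _ => throw new IllegalArgumentException(s"Expected key=value but got '$pair'")
      }
    }.toMap

  // Builds the environment passed to InitialContext.
  def environment(initialContextFactory: String,
                  url: String,
                  extraParams: String = ""): Hashtable[String, String] = {
    val env = new Hashtable[String, String]()
    env.put(Context.INITIAL_CONTEXT_FACTORY, initialContextFactory)
    env.put(Context.PROVIDER_URL, url)
    parseExtraParams(extraParams).foreach { case (k, v) => env.put(k, v) }
    env
  }

  // Looks up the factory by JNDI name (e.g. "ConnectionFactory").
  // Requires the broker's client jar, here ActiveMQ, on the classpath.
  def lookup(env: Hashtable[String, String], connectionFactoryName: String): AnyRef = {
    val ctx = new InitialContext(env)
    try ctx.lookup(connectionFactoryName)
    finally ctx.close()
  }
}
```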
Optional configurations
Config | Description | Type |
---|---|---|
connect.jms.initial.context.extra.params | Comma-separated list of extra properties, as key=value pairs, to supply to the initial context, e.g. SOLACE_JMS_VPN=my_solace_vp | string |
connect.converter.avro.schemas | If the AvroConverter is used, you need to provide an Avro schema to read and translate the raw bytes into an Avro record. The format is $JMS_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE | string |
connect.jms.batch.size | The batch size to take from the JMS destination on each poll of Kafka Connect. Default is 100 | int |
connect.progress.enabled | Enables output of how many records have been processed | boolean |
connect.jms.evict.interval.minutes | How often, in minutes, uncommitted messages are removed from the internal cache. Each JMS message is linked to the Kafka record to be published; failure to publish a record to Kafka means the JMS message is not acknowledged. | int |
connect.jms.evict.threshold.minutes | The number of minutes after which an uncommitted entry becomes evictable from the connector cache. | int |
connect.jms.scale.type | Controls how the connector scales. If set to 'kcql', it scales based on the KCQL statements; otherwise it respects the tasks.max value. | string |
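The two eviction settings can be pictured as a time-stamped cache of records that Kafka has not yet confirmed. The Scala sketch below is a hypothetical model, not the connector's implementation: UncommittedCache is an illustrative name, times are plain milliseconds, the periodic scheduling implied by connect.jms.evict.interval.minutes is left to the caller, and it assumes eviction only forgets an entry (the JMS message itself stays unacknowledged).

```scala
import scala.collection.mutable

// Hypothetical model of connect.jms.evict.threshold.minutes: entries left
// uncommitted for longer than the threshold are dropped on each eviction pass.
final class UncommittedCache(thresholdMillis: Long) {
  // messageId -> time (ms) at which the record was handed to Kafka Connect
  private val entries = mutable.Map.empty[String, Long]

  def add(messageId: String, nowMillis: Long): Unit = entries.update(messageId, nowMillis)

  // The record was written to Kafka, so the entry is no longer needed.
  def commit(messageId: String): Unit = entries.remove(messageId)

  // One eviction pass (the caller would run this every evict.interval):
  // removes and returns the ids older than the threshold.
  def evict(nowMillis: Long): Set[String] = {
    val expired = entries.iterator
      .collect { case (id, addedAt) if nowMillis - addedAt > thresholdMillis => id }
      .toSet
    expired.foreach(entries.remove)
    expired
  }

  def size: Int = entries.size
}
```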
Provide your own Converter
You can always provide your own logic for converting the JMS message to an Avro record. If your messages arrive in Protobuf format, for example, you can deserialize each message based on its schema and create the Avro record. All you have to do is create a new project and add our dependency:
Gradle:
implementation "com.datamountaineer:kafka-connect-common:0.7.1"
Maven:
<dependency>
<groupId>com.datamountaineer</groupId>
<artifactId>kafka-connect-common</artifactId>
<version>0.7.1</version>
</dependency>
Then all you have to do is implement com.datamountaineer.streamreactor.connect.converters.source.Converter.
Here is our BytesConverter class code:
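import java.util.Collections

import com.datamountaineer.streamreactor.connect.converters.MsgKey
import com.datamountaineer.streamreactor.connect.converters.source.Converter
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.source.SourceRecord

// Default converter: forwards the raw JMS payload bytes unchanged.
class BytesConverter extends Converter {
  override def convert(kafkaTopic: String, sourceTopic: String, messageId: String, bytes: Array[Byte]): SourceRecord = {
    new SourceRecord(
      Collections.singletonMap(Converter.TopicKey, sourceTopic), // source partition: the JMS destination
      null,                                      // no source offset
      kafkaTopic,
      MsgKey.schema,                             // key schema
      MsgKey.getStruct(sourceTopic, messageId),  // key: destination + JMS message id
      Schema.BYTES_SCHEMA,                       // value schema
      bytes)                                     // value: payload as-is
  }
}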
Schema Evolution
Not applicable.
Kubernetes
Helm charts are provided in our repo; add the repo to your Helm instance and install. We recommend using Landscaper to manage Helm values, since each connector instance typically has its own deployment.
Add the Helm charts to your Helm instance:
helm repo add landoop https://lensesio.github.io/kafka-helm-charts/
Troubleshooting
Please review the FAQs and join our Slack channel.