A Kafka Connect source connector to read events from MQTT and push them to Kafka.
The following KCQL is supported:
INSERT INTO <your-kafka-topic> SELECT * FROM <your-mqtt-topic> [WITHCONVERTER=`myclass`]
Selection of fields from the MQTT message is not supported.
Examples:
-- Insert mode, select all fields from topicA
-- and write to topic topic with converter myclass
INSERT INTO topic SELECT * FROM /mqttTopicA [WITHCONVERTER=myclass]

-- wildcard
INSERT INTO topic SELECT * FROM /mqttTopicA/+/sensors [WITHCONVERTER=`myclass`]
To facilitate scenarios such as retaining the latest value for a given device identifier, or supporting Kafka Streams joins without having to re-map the topic data, the connector supports WITHKEY in the KCQL syntax.
Multiple key fields are supported using a delimiter:
// `[` enclosed by `]` denotes optional values
WITHKEY(field1 [, field2.A , field3]) [KEYDELIMITER='.']
The resulting Kafka record key is the string concatenation of the values of the specified fields. Optionally, the delimiter placed between the values can be set via the KEYDELIMITER keyword.
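The key construction described here can be illustrated with a minimal Python sketch. It is not the connector's implementation: the helper names `lookup` and `build_key`, the sample payload, and the way values are rendered as strings are all assumptions made for illustration.

```python
import json
from typing import Any, Dict, List


def lookup(record: Dict[str, Any], path: str) -> Any:
    """Walk a dotted path such as 'field2.A' through nested dictionaries."""
    value: Any = record
    for part in path.split("."):
        value = value[part]
    return value


def build_key(record: Dict[str, Any], fields: List[str], delimiter: str) -> str:
    """Join the string form of each selected field with the delimiter.

    Hypothetical helper: mirrors WITHKEY(...) KEYDELIMITER=... conceptually only.
    """
    return delimiter.join(str(lookup(record, field)) for field in fields)


# Sample payload invented for this sketch.
payload = json.loads('{"deviceId": 1, "region": "EMEA", "meta": {"site": "A7"}}')

# Roughly corresponds to WITHKEY(deviceId, meta.site) KEYDELIMITER='-'
print(build_key(payload, ["deviceId", "meta.site"], "-"))  # prints 1-A7
```

Keying records by a stable identifier such as the device id is what allows a compacted Kafka topic to retain only the latest value per device.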
The connector supports both wildcard and shared subscriptions but the KCQL command must be placed inside single quotes.
-- wildcard
INSERT INTO kafkaTopic1 SELECT * FROM /mqttTopicA/+/sensors WITHCONVERTER=`myclass`
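To make the wildcard semantics concrete, the following sketch shows how an MQTT broker decides whether a published topic matches a subscription filter such as `/mqttTopicA/+/sensors`. It follows the general MQTT rules (`+` matches exactly one level, `#` matches all remaining levels) and is not taken from the connector's code. The function names are hypothetical, and the `$share/<group>/<filter>` form is the standard MQTT shared-subscription syntax, assumed here rather than stated in this page.

```python
def strip_share_prefix(topic_filter: str) -> str:
    """Drop a leading '$share/<group>/' so only the real filter remains."""
    parts = topic_filter.split("/", 2)
    if len(parts) == 3 and parts[0] == "$share":
        return parts[2]
    return topic_filter


def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if a topic name matches an MQTT subscription filter.

    Simplified: does not apply the special rules for topics starting with '$'.
    """
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: valid only as the last level of the filter.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    # Without '#', both sides must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


print(topic_matches("/mqttTopicA/+/sensors", "/mqttTopicA/device1/sensors"))
print(topic_matches("/mqttTopicA/+/sensors", "/mqttTopicA/device1/room2/sensors"))
```

The first call matches because `+` stands in for the single level `device1`; the second does not, because `+` never spans more than one level.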
The connector supports converters to handle different message payload formats in the source topic. See source record converters.
export CONNECTOR=mqtt
docker-compose up -d mqtt
Run mosquitto_pub inside the mqtt container to publish a retained test message:
docker exec \
  -ti mqtt \
  mosquitto_pub \
  -m "{\"deviceId\":1,\"value\":31.1,\"region\":\"EMEA\",\"timestamp\":1482236627236}" \
  -d -r -t /mjson
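Hand-escaping the JSON payload is error-prone. As an optional alternative, the sketch below builds the same payload with Python's `json` module and prints a safely quoted form of the command. The field values are copied from the command above; the `reading` and `command` variable names are chosen only for this example.

```python
import json
import shlex

# Same test reading as in the mosquitto_pub command.
reading = {"deviceId": 1, "value": 31.1, "region": "EMEA", "timestamp": 1482236627236}

# Compact separators reproduce the payload without extra whitespace.
payload = json.dumps(reading, separators=(",", ":"))

command = [
    "docker", "exec", "-ti", "mqtt",
    "mosquitto_pub", "-m", payload, "-d", "-r", "-t", "/mjson",
]

# shlex.join quotes each argument so the line can be pasted into a shell.
print(shlex.join(command))
```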
If you are using Lenses, log in to Lenses, navigate to the connectors page, select MQTT as the source and paste the following:
name=mqtt-source
connector.class=com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector
tasks.max=1
connect.mqtt.kcql=INSERT INTO mqtt SELECT * FROM /mjson WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter`
connect.mqtt.client.id=dm_source_id
connect.mqtt.hosts=tcp://mqtt:1883
connect.mqtt.service.quality=1
To start the connector using the command line, log into the lenses-box container:
docker exec -ti lenses-box /bin/sh
and create a connector.properties file containing the properties above.
Create the connector with the connect-cli:
connect-cli create mqtt < connector.properties
Wait for the connector to start and check it’s running:
connect-cli status mqtt
Check the records in Lenses or via the console:
kafka-avro-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic mqtt \
  --from-beginning
Bring down the stack:
docker-compose down