MQTT
Kafka Connect sink connector for writing data from Kafka to MQTT.
KCQL support
The following KCQL is supported:
INSERT INTO <mqtt-topic>
SELECT FIELD, ...
FROM <kafka-topic>
Examples:
-- Insert into /landoop/demo all fields from kafka_topicA
INSERT INTO /landoop/demo SELECT * FROM kafka_topicA
-- Insert into an MQTT topic resolved dynamically from a field of the message
INSERT INTO `$field` SELECT * FROM control.boxes.test WITHTARGET = field
Concepts
Dynamic targets
The connector can write dynamically to MQTT topics determined by a field in the Kafka message value. Use the
WITHTARGET clause to name the field and specify $field
as the target in the KCQL statement.
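The resolution can be illustrated with a small sketch. The function name and message shape below are hypothetical, for illustration only, and are not part of the connector's code:

```python
import json

def resolve_target(kcql_target: str, target_field: str, message_value: dict) -> str:
    """Resolve the MQTT topic for one record, mimicking WITHTARGET semantics.

    If the KCQL target is the `$field` placeholder, the topic is taken from
    the named field of the Kafka message value; otherwise the target is a
    literal MQTT topic.
    """
    if kcql_target == "$field":
        return str(message_value[target_field])
    return kcql_target

# A record whose `field` value names the destination MQTT topic.
record = json.loads('{"field": "/control/boxes/box1", "temperature": 21.5}')

print(resolve_target("$field", "field", record))         # dynamic: /control/boxes/box1
print(resolve_target("/landoop/demo", "field", record))  # literal: /landoop/demo
```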
Kafka payload support
This sink supports the following Kafka payloads:
- Schema.Struct and Struct (Avro)
- Schema.Struct and JSON
- No Schema and JSON
See connect payloads for more information.
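For example, with the JsonConverter and schemas enabled ("Schema.Struct and JSON"), each record on the Kafka topic carries a schema/payload envelope; the orders-style record below is a hypothetical illustration:

```json
{
  "schema": {
    "type": "struct",
    "name": "orders",
    "optional": false,
    "fields": [
      {"field": "id", "type": "int32"},
      {"field": "product", "type": "string"}
    ]
  },
  "payload": {"id": 1, "product": "OP-DAX-P-20150201-95.7"}
}
```

With schemas disabled ("No Schema and JSON"), only the bare payload object is present on the topic.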
Error policies
The connector supports Error policies.
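A sketch of how the three policies (NOOP, THROW, RETRY) typically behave. The names mirror the connect.mqtt.error.policy options, but this is an illustrative model, not the connector's code:

```python
class RetryRequested(Exception):
    """Signals the Connect framework to redeliver the record (RETRY policy)."""

def handle_error(policy: str, error: Exception, retries_left: int) -> str:
    """Apply an error policy to a failed write and report the outcome."""
    if policy == "NOOP":
        return "swallowed"            # error ignored, record dropped
    if policy == "THROW":
        raise error                   # propagate, task fails
    if policy == "RETRY":
        if retries_left > 0:
            raise RetryRequested()    # framework retries the record later
        raise error                   # retries exhausted, task fails
    raise ValueError(f"unknown policy: {policy}")

print(handle_error("NOOP", RuntimeError("broker down"), 0))  # swallowed
```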
Quickstart
Launch the stack
- Copy the docker-compose file.
- Bring up the stack.
export CONNECTOR=mqtt
docker-compose up -d mqtt
Prepare the target system
No preparation is needed on the MQTT side: the broker delivers to topics on demand as messages are published.
Start the connector
If you are using Lenses, log into Lenses and navigate to the connectors page, select MQTT as the sink and paste the following:
name=mqtt
connector.class=com.datamountaineer.streamreactor.connect.mqtt.sink.MqttSinkConnector
tasks.max=1
topics=orders
connect.mqtt.hosts=tcp://mqtt:1883
connect.mqtt.clean=true
connect.mqtt.timeout=1000
connect.mqtt.keep.alive=1000
connect.mqtt.service.quality=1
connect.mqtt.client.id=dm_sink_id
connect.mqtt.kcql=INSERT INTO /lenses/orders SELECT * FROM orders
To start the connector without using Lenses, log into the fastdata container:
docker exec -ti fastdata /bin/bash
and create a connector.properties file containing the properties above.
Create the connector with the connect-cli:
connect-cli create mqtt < connector.properties
Wait for the connector to start and check it's running:
connect-cli status mqtt
Inserting test data
In the fastdata container, start the Kafka Avro console producer:
kafka-avro-console-producer \
--broker-list localhost:9092 --topic orders \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"created","type":"string"},{"name":"product","type":"string"},{"name":"price","type":"double"}, {"name":"qty", "type":"int"}]}'
The console is now waiting for your input; enter the following:
{"id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2, "qty":100}
Check for data in MQTT
docker exec -ti mqtt mosquitto_sub -t /lenses/orders
Clean up
Bring down the stack:
docker-compose down
Options
Name | Description | Type | Default Value |
---|---|---|---|
connect.mqtt.hosts | Contains the MQTT connection endpoints. | string | |
connect.mqtt.username | Contains the MQTT connection user name. | string | |
connect.mqtt.password | Contains the MQTT connection password. | password | |
connect.mqtt.service.quality | Specifies the MQTT quality of service (QoS) level. | int | |
connect.mqtt.timeout | Provides the time interval, in milliseconds, to establish the MQTT connection. | int | 3000 |
connect.mqtt.clean | Sets the MQTT clean session flag. | boolean | true |
connect.mqtt.keep.alive | The keep-alive interval ensures the connection is still open and both broker and client remain connected. It is the longest period of time the broker and client can go without exchanging a message. | int | 5000 |
connect.mqtt.client.id | Contains the MQTT session client id. | string | |
connect.mqtt.error.policy | Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message, with the number of retries set by connect.mqtt.max.retries. The error is logged automatically. | string | THROW |
connect.mqtt.retry.interval | The time in milliseconds between retries. | int | 60000 |
connect.mqtt.max.retries | The maximum number of times to try the write again. | int | 20 |
connect.mqtt.retained.messages | Specifies the Mqtt retained flag. | boolean | false |
connect.mqtt.converter.throw.on.error | If set to false, the conversion exception is swallowed and processing carries on, BUT the message is lost! If set to true, the exception is thrown. | boolean | false |
connect.converter.avro.schemas | If the AvroConverter is used, you need to provide an Avro schema to be able to read and translate the raw bytes to an Avro record. The format is $MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE for a source converter, or $KAFKA_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE for a sink converter. | string | |
connect.mqtt.kcql | Contains the Kafka Connect Query Language describing the source Kafka topics and the target MQTT topics. | string | |
connect.progress.enabled | Enables the output for how many records have been processed | boolean | false |
connect.mqtt.ssl.ca.cert | Provides the path to the CA certificate file to use with the MQTT connection. | string | |
connect.mqtt.ssl.cert | Provides the path to the certificate file to use with the MQTT connection. | string | |
connect.mqtt.ssl.key | Provides the path to the certificate private key file to use with the MQTT connection. | string | |