Kafka Connect sink connector for writing data from Kafka to MQTT.
The following KCQL is supported:
INSERT INTO <mqtt-topic> SELECT FIELD, ... FROM <kafka-topic>
Examples:
-- Insert into /landoop/demo all fields from kafka_topicA
INSERT INTO /landoop/demo SELECT * FROM kafka_topicA
-- Insert all fields into the MQTT topic named by the value of field
INSERT INTO `$field` SELECT * FROM control.boxes.test WITHTARGET = field
The connector can write dynamically to MQTT topics determined by a field in the Kafka message value. To do this, use the WITHTARGET clause and specify $field as the target in the KCQL statement.
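The idea behind the dynamic target can be sketched outside the connector. The snippet below is an illustration only, not the connector's implementation: it takes a hypothetical JSON record value (the record contents and the topic `/landoop/boxes/box1` are assumptions made up for this example) and extracts the value of `field`, which is the MQTT topic a statement with `WITHTARGET = field` would be expected to write to.

```shell
# Hypothetical Kafka record value; "field" carries the MQTT topic to write to.
record='{"field":"/landoop/boxes/box1","temperature":21.5}'

# Pull out the string value of "field" with sed (illustration only;
# the connector resolves the target internally).
target=$(printf '%s' "$record" \
  | sed -n 's/.*"field"[[:space:]]*:[[:space:]]*"\([^"]*\)".*/\1/p')

echo "$target"
```

Records carrying different values in `field` would, under this reading, end up on different MQTT topics from a single KCQL statement.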
This sink supports the following Kafka payloads:
See connect payloads for more information.
The connector supports Error policies.
export CONNECTOR=mqtt
docker-compose up -d mqtt
Log in to the MongoDB container and create the container:
docker exec -ti mongo mongo
Create a database by inserting a dummy record:
If you are using Lenses, log in to Lenses, navigate to the connectors page, select MQTT as the sink, and paste the following:
name=mqtt
connector.class=com.datamountaineer.streamreactor.connect.mqtt.sink.MqttSinkConnector
tasks.max=1
topics=orders
connect.mqtt.hosts=tcp://mqtt:1883
connect.mqtt.clean=true
connect.mqtt.timeout=1000
connect.mqtt.keep.alive=1000
connect.mqtt.service.quality=1
connect.mqtt.client.id=dm_sink_id
connect.mqtt.kcql=INSERT INTO /lenses/orders SELECT * FROM orders
To start the connector using the command line, log into the lenses-box container:
docker exec -ti lenses-box /bin/bash
and create a connector.properties file containing the properties above.
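One way to create the file from the shell is a here-document. This is a minimal sketch that writes the sink properties from this page into connector.properties in the current directory; the quoted `'EOF'` delimiter keeps the shell from expanding anything inside the block.

```shell
# Write the MQTT sink configuration to connector.properties.
# The quoted 'EOF' delimiter disables variable and glob expansion in the body.
cat > connector.properties <<'EOF'
name=mqtt
connector.class=com.datamountaineer.streamreactor.connect.mqtt.sink.MqttSinkConnector
tasks.max=1
topics=orders
connect.mqtt.hosts=tcp://mqtt:1883
connect.mqtt.clean=true
connect.mqtt.timeout=1000
connect.mqtt.keep.alive=1000
connect.mqtt.service.quality=1
connect.mqtt.client.id=dm_sink_id
connect.mqtt.kcql=INSERT INTO /lenses/orders SELECT * FROM orders
EOF
```

Running `cat connector.properties` afterwards is a quick way to confirm the file holds one property per line.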
Create the connector with connect-cli:
connect-cli create mqtt < connector.properties
Wait for the connector to start and check it’s running:
connect-cli status mqtt
In the lenses-box container, start the Kafka producer shell:
kafka-avro-console-producer \
  --broker-list localhost:9092 --topic orders \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"created","type":"string"},{"name":"product","type":"string"},{"name":"price","type":"double"}, {"name":"qty", "type":"int"}]}'
The console is now waiting for your input. Enter the following:
{ "id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2, "qty": 100 }
docker exec -ti mqtt mosquitto_sub -t /lenses/orders
Bring down the stack:
docker-compose down