MQTT Source
Download connector MQTT Connector for Kafka 2.1.0
Blog: MQTT. Kafka. InfluxDB. SQL. IoT Harmony.
A connector to read events from MQTT and push them to Kafka. The connector subscribes to the specified topics and streams the records to Kafka.
Prerequisites
- Apache Kafka 2.x or above
- Kafka Connect 2.x or above
- An MQTT server
- Java 1.8
Features
- Pluggable converters for MQTT payloads
- Out-of-the-box converters for JSON, AVRO and binary payloads
- KCQL routing queries: MQTT topic to Kafka topic mapping and field selection
- Wildcard MQTT topic subscriptions
- Multi-server MQTT connections
KCQL Support
INSERT INTO kafka_topic SELECT * FROM /mqtt_source_topic [WITHCONVERTER=`myclass`]
Tip
You can specify multiple KCQL statements separated by ; to have the connector route multiple MQTT topics to Kafka.
The MQTT source supports KCQL, the Kafka Connect Query Language. The following KCQL support is available:
- Selection of MQTT source topics
- Selection of Kafka target topics
- Selection of MQTT message converters.
Example:
-- Insert mode, select all fields from /mqttTopicA and write to topic kafkaTopic1 with converter myclass
INSERT INTO kafkaTopic1 SELECT * FROM /mqttTopicA WITHCONVERTER=`myclass`
-- wildcard: + matches exactly one topic level
INSERT INTO kafkaTopic1 SELECT * FROM /mqttTopicA/+/sensors WITHCONVERTER=`myclass`
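To illustrate how a KCQL statement routes an MQTT topic to a Kafka topic, here is a minimal Scala sketch that handles only the simple `INSERT INTO ... SELECT * FROM ... WITHCONVERTER=...` shape used on this page. It is not the real KCQL parser: `KcqlRoutingSketch` and `Route` are hypothetical names, and clauses such as WITHKEY are not supported. It splits statements on `;` and falls back to the BytesConverter, which is documented in the Converters section as the default, when no WITHCONVERTER is given.

```scala
// Hypothetical, simplified parser for the KCQL shape used on this page:
//   INSERT INTO <kafkaTopic> SELECT * FROM <mqttTopic> [WITHCONVERTER=`<class>`]
// It is NOT the real KCQL grammar; it only illustrates topic routing.
object KcqlRoutingSketch {

  final case class Route(kafkaTopic: String, mqttTopic: String, converter: String)

  // The BytesConverter is documented as the default when WITHCONVERTER is omitted.
  val DefaultConverter =
    "com.datamountaineer.streamreactor.connect.converters.source.BytesConverter"

  // Group 1: Kafka topic, group 2: MQTT topic, group 3 (optional): converter class.
  private val Statement =
    """(?i)\s*INSERT\s+INTO\s+(\S+)\s+SELECT\s+\*\s+FROM\s+(\S+)(?:\s+WITHCONVERTER\s*=\s*`?([^`\s]+)`?)?\s*""".r

  // Multiple statements are separated by ';'.
  def parse(kcql: String): List[Route] =
    kcql.split(';').toList.map(_.trim).filter(_.nonEmpty).map {
      case Statement(kafka, mqtt, conv) =>
        // An unmatched optional group is null, so fall back to the default converter.
        Route(kafka, mqtt, Option(conv).getOrElse(DefaultConverter))
      case other =>
        throw new IllegalArgumentException(s"Unsupported KCQL: $other")
    }
}
```

For example, parsing the connect.mqtt.kcql value from the quickstart below yields a single route from /mjson to kjson using the JsonSimpleConverter.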
Note
Wildcard MQTT subscriptions are supported but require the same converter to be used for all matching topics.
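Wildcard subscriptions follow the MQTT topic-filter rules: + matches exactly one topic level, and #, which must be the last level, matches all remaining levels (including none). The sketch below is our own illustration of those rules, not code from the connector or from an MQTT client library; `MqttTopicFilter` is a hypothetical name, and the special rule that wildcards do not match topics starting with $ is ignored.

```scala
// Illustrative MQTT topic-filter matching:
// '+' matches exactly one level, a trailing '#' matches the remaining levels (even none).
object MqttTopicFilter {
  def matches(filter: String, topic: String): Boolean = {
    // A limit of -1 keeps empty levels, e.g. the one before a leading '/'.
    def levels(s: String): List[String] = s.split("/", -1).toList

    def go(f: List[String], t: List[String]): Boolean = (f, t) match {
      case ("#" :: Nil, _)      => true                    // multi-level wildcard
      case ("+" :: fs, _ :: ts) => go(fs, ts)              // single-level wildcard
      case (fl :: fs, tl :: ts) => fl == tl && go(fs, ts)  // literal level must be equal
      case (Nil, Nil)           => true                    // both exhausted together
      case _                    => false
    }

    go(levels(filter), levels(topic))
  }
}
```

With the wildcard example above, /mqttTopicA/device1/sensors matches /mqttTopicA/+/sensors, but /mqttTopicA/a/b/sensors does not, because + spans exactly one level.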
Converters
We provide four converters out of the box, but you can plug in your own (see Provide your own Converter below). The converter is set per KCQL statement in connect.mqtt.kcql using the WITHCONVERTER keyword.
AvroConverter
com.datamountaineer.streamreactor.connect.converters.source.AvroConverter
The payload is an AVRO message. In this case you need to provide the path to the AVRO schema file (via connect.converter.avro.schemas) so the message can be decoded.
JsonSimpleConverter
com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter
The payload is a JSON message. This converter parses the JSON and creates an AVRO record from it, which is then sent to Kafka.
JsonConverterWithSchemaEvolution
An experimental converter for converting JSON messages to AVRO. The resulting AVRO schema remains fully compatible as new fields are added to the evolving JSON payload.
BytesConverter
com.datamountaineer.streamreactor.connect.converters.source.BytesConverter
This is the default implementation. The payload is taken as is, an array of bytes, and sent to Kafka as an AVRO record with Schema.BYTES. You don't have to specify a converter in the KCQL statement to get this one.
Payload key
To support scenarios such as retaining the latest value for a given device identifier, or Kafka Streams joins without having to re-key the topic data, the connector supports WITHKEY in the KCQL syntax.
This controls the content of the Kafka message key. The general syntax is:
// `[` enclosed by `]` denotes optional values
WITHKEY(field1 [, field2.A , field3]) [KEYDELIMITER='.']
With the syntax above, the resulting Kafka record key is the string concatenation of the values of the specified fields. For example, given field1='value1', field2.A='value2' and field3='value3', the key is value1.value2.value3. The delimiter can optionally be set via the KEYDELIMITER keyword.
INSERT INTO sensor_data
SELECT * FROM /sensor_data
WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter`
WITHKEY(id)
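A minimal sketch of the key-building rule described above, assuming the payload is available as nested Scala Maps (a simplification of our own; it is not how the connector represents records). `WithKeySketch` and `buildKey` are hypothetical names, and returning an error for a missing field is our own choice rather than documented connector behaviour.

```scala
// Hypothetical illustration of WITHKEY(field1, field2.A, field3) [KEYDELIMITER='.'].
object WithKeySketch {

  // Resolve a dotted path such as "field2.A" against nested maps.
  def lookup(record: Map[String, Any], path: String): Option[Any] =
    path.split('.').foldLeft(Option[Any](record)) {
      case (Some(m: Map[_, _]), part) => m.asInstanceOf[Map[String, Any]].get(part)
      case _                          => None // path goes through a non-map value
    }

  // Concatenate the field values with the delimiter ('.' as in the example above).
  def buildKey(record: Map[String, Any], fields: Seq[String], delimiter: String = "."): Either[String, String] = {
    val values = fields.map(f => f -> lookup(record, f))
    values.collectFirst { case (f, None) => f } match {
      case Some(missing) => Left(s"field '$missing' not found in payload")
      case None          => Right(values.map(_._2.get.toString).mkString(delimiter))
    }
  }
}
```

With field1='value1', field2.A='value2' and field3='value3', `buildKey` returns value1.value2.value3, matching the description above.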
Lenses QuickStart
The easiest way to try this out is with Lenses Box, the pre-configured Docker image that comes with this connector pre-installed. Go to Connectors -> New Connector -> Source -> MQTT and paste your configuration.
MQTT Setup
For testing, we will use a simple application that spins up an MQTT server using Moquette. Download and unzip this.
Once you have unpacked the archive, start the server:
➜ bin/mqtt-server
You should see the following output:
log4j:WARN No appenders could be found for logger (io.moquette.server.Server).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Starting mqtt service on port 11883
Hit Enter to start publishing messages on topic: /mjson and /mavro.
The server has started but no records have been published yet. It will start publishing messages when you hit enter. More on this later once we start the source.
Installing the Connector
In production, Connect should be run in distributed mode:
- Install and configure a Kafka Connect cluster.
- Create a folder on each server called plugins/lib.
- Copy the required connector jars from the Stream Reactor download into the above folder.
- Edit connect-avro-distributed.properties in the etc/schema-registry folder and uncomment the plugin.path option. Set it to the root directory (i.e. plugins) into which you deployed the Stream Reactor connector jars.
- Start Connect:
bin/connect-distributed etc/schema-registry/connect-avro-distributed.properties
Connect workers are long-running processes, so set up an init.d or systemd service accordingly.
Source Connector QuickStart
Start Kafka Connect in distributed mode (see install). In this mode a REST endpoint on port 8083 is exposed to accept connector configurations.
We developed a Command Line Interface to make interacting with the Connect REST API easier. The CLI can be found in the Stream Reactor download under the bin folder. Alternatively, the jar can be pulled from our GitHub releases page.
Starting the Connector
Download and install Stream Reactor, following the instructions here if you haven't already done so. All paths in the quickstart are relative to the location where you installed Stream Reactor.
Once Connect has started, we can use the kafka-connect-tools CLI to post our distributed properties file for MQTT. For the CLI to work, including when using the Docker images, you have to set the following environment variable to point to the Kafka Connect REST API.
export KAFKA_CONNECT_REST="http://myserver:myport"
➜ bin/connect-cli create mqtt-source < conf/source.kcql/mqtt-source.properties
name=mqtt-source
connector.class=com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector
tasks.max=1
connect.mqtt.clean=true
connect.mqtt.timeout=1000
connect.mqtt.kcql=INSERT INTO kjson SELECT * FROM /mjson WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter`
connect.mqtt.keep.alive=1000
connect.mqtt.client.id=dm_source_id
connect.mqtt.converter.throw.on.error=true
connect.mqtt.hosts=tcp://127.0.0.1:11883
connect.mqtt.service.quality=1
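If you prefer not to use the CLI, the same configuration can be submitted directly to the Kafka Connect REST API with POST /connectors, whose body is a JSON object with a name and a config map. The Scala sketch below converts a properties file such as the one above into that body. `ConnectorPayload` is a hypothetical helper, it assumes Scala 2.13+ or Scala 3 (for scala.jdk.CollectionConverters), and it uses a minimal hand-written JSON escaper instead of a JSON library.

```scala
import java.io.StringReader
import java.util.Properties
import scala.jdk.CollectionConverters._ // Scala 2.13+ / Scala 3

// Hypothetical helper: build the JSON body for POST /connectors from properties text.
object ConnectorPayload {

  // Minimal JSON string escaping (quotes, backslashes and control characters).
  private def jsonString(s: String): String = {
    val sb = new StringBuilder("\"")
    s.foreach {
      case '"'          => sb.append("\\\"")
      case '\\'         => sb.append("\\\\")
      case '\n'         => sb.append("\\n")
      case '\r'         => sb.append("\\r")
      case '\t'         => sb.append("\\t")
      case c if c < ' ' => sb.append("\\").append("u").append(f"${c.toInt}%04x")
      case c            => sb.append(c)
    }
    sb.append('"').toString
  }

  def fromProperties(text: String): String = {
    val props = new Properties()
    props.load(new StringReader(text)) // key ends at the first '=', so the KCQL value keeps its '='
    val all = props.stringPropertyNames().asScala.toList.sorted.map(k => k -> props.getProperty(k))
    val name = all.collectFirst { case ("name", v) => v }
      .getOrElse(throw new IllegalArgumentException("missing 'name' property"))
    val config = all.map { case (k, v) => s"${jsonString(k)}:${jsonString(v)}" }.mkString(",")
    s"""{"name":${jsonString(name)},"config":{$config}}"""
  }
}
```

The resulting string can then be posted to $KAFKA_CONNECT_REST/connectors with a Content-Type: application/json header, for example with curl.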
We can use the CLI to check that the connector is up; you should also be able to see this in the logs.
#check for running connectors with the CLI
➜ bin/connect-cli ps
mqtt-source
[2016-12-20 16:51:08,058] INFO
__ __
/ / ____ _____ ____/ /___ ____ ____
/ / / __ `/ __ \/ __ / __ \/ __ \/ __ \
/ /___/ /_/ / / / / /_/ / /_/ / /_/ / /_/ /
/_____/\__,_/_/ /_/\__,_/\____/\____/ .___/
/_/
| \/ | __ _| |_| |_ / ___| ___ _ _ _ __ ___ ___
| |\/| |/ _` | __| __| \___ \ / _ \| | | | '__/ __/ _ \
| | | | (_| | |_| |_ ___) | (_) | |_| | | | (_| __/
|_| |_|\__, |\__|\__| |____/ \___/ \__,_|_| \___\___| by Stefan Bocutiu
|_|
(com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceTask:37)
Test Records
Go to the mqtt-server application you downloaded and unzipped and, if it is not already running, start it:
./bin/mqtt-server
Then hit Enter to publish the following records to the MQTT topics /mjson and /mavro:
TemperatureMeasure(1, 31.1, "EMEA", System.currentTimeMillis())
TemperatureMeasure(2, 30.91, "EMEA", System.currentTimeMillis())
TemperatureMeasure(3, 30.991, "EMEA", System.currentTimeMillis())
TemperatureMeasure(4, 31.061, "EMEA", System.currentTimeMillis())
TemperatureMeasure(101, 27.001, "AMER", System.currentTimeMillis())
TemperatureMeasure(102, 38.001, "AMER", System.currentTimeMillis())
TemperatureMeasure(103, 26.991, "AMER", System.currentTimeMillis())
TemperatureMeasure(104, 34.17, "AMER", System.currentTimeMillis())
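The records above are written as constructor calls. Assuming the /mjson payload is plain JSON using the field names that appear in the kjson consumer output below (deviceId, value, region, timestamp), a hypothetical Scala model of one record and its JSON form could look like this. It is a sketch for orientation only, not the mqtt-server application's actual code.

```scala
// Hypothetical model of the sample record; field names taken from the kjson output.
final case class TemperatureMeasure(deviceId: Int, value: Double, region: String, timestamp: Long) {
  // Minimal JSON rendering; region values in the sample ("EMEA", "AMER") need no escaping.
  def toJson: String =
    s"""{"deviceId":$deviceId,"value":$value,"region":"$region","timestamp":$timestamp}"""
}
```

For instance, TemperatureMeasure(1, 31.1, "EMEA", 1482236627236L).toJson produces the first line of the kjson output shown below.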
Check for records in Kafka
Check for records in the kjson topic with the console consumer (the MQTT payload was a JSON document, which we translated into a Kafka Connect Struct):
➜ bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic kjson --from-beginning
You should see the following output:
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
{"deviceId":1,"value":31.1,"region":"EMEA","timestamp":1482236627236}
{"deviceId":2,"value":30.91,"region":"EMEA","timestamp":1482236627236}
{"deviceId":3,"value":30.991,"region":"EMEA","timestamp":1482236627236}
{"deviceId":4,"value":31.061,"region":"EMEA","timestamp":1482236627236}
{"deviceId":101,"value":27.001,"region":"AMER","timestamp":1482236627236}
{"deviceId":102,"value":38.001,"region":"AMER","timestamp":1482236627236}
{"deviceId":103,"value":26.991,"region":"AMER","timestamp":1482236627236}
{"deviceId":104,"value":34.17,"region":"AMER","timestamp":1482236627236}
Check for records in the kavro topic with the console consumer (the MQTT payload was an AVRO record, which we translated into a Kafka Connect Struct):
➜ bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic kavro --from-beginning
You should see the following output:
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
{"deviceId":1,"value":31.1,"region":"EMEA","timestamp":1482236627236}
{"deviceId":2,"value":30.91,"region":"EMEA","timestamp":1482236627236}
{"deviceId":3,"value":30.991,"region":"EMEA","timestamp":1482236627236}
{"deviceId":4,"value":31.061,"region":"EMEA","timestamp":1482236627236}
{"deviceId":101,"value":27.001,"region":"AMER","timestamp":1482236627236}
{"deviceId":102,"value":38.001,"region":"AMER","timestamp":1482236627236}
{"deviceId":103,"value":26.991,"region":"AMER","timestamp":1482236627236}
{"deviceId":104,"value":34.17,"region":"AMER","timestamp":1482236627236}
Features
The MQTT source allows you to plug in your own converter. For example, if you receive Protobuf data, you write your own converter that knows how to convert Protobuf into a SourceRecord, and then set it with WITHCONVERTER= in the KCQL statement.
Configurations
The Kafka Connect framework requires the following in addition to any connector-specific configurations:
Config | Description | Type | Value |
---|---|---|---|
name | Name of the connector | string | |
tasks.max | The number of tasks to scale output | int | 1 |
connector.class | Name of the connector class | string | com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector |
Connector Configurations
Config | Description | Type |
---|---|---|
connect.mqtt.kcql | Kafka Connect query language expression. Allows for expressive MQTT topic to Kafka topic routing. Currently, there is no support for filtering the fields from the incoming payload. | string |
connect.mqtt.hosts | Specifies the MQTT connection endpoints. Example: tcp://broker.datamountaineer.com:1883 | string |
Optional Configurations
Config | Description | Type |
---|---|---|
connect.mqtt.service.quality | The Quality of Service (QoS) level is an agreement between the sender and receiver of a message regarding the delivery guarantees for that message. There are 3 QoS levels in MQTT: at most once (0), at least once (1) and exactly once (2). Default: 1 | int |
connect.mqtt.username | The MQTT connection username | string |
connect.mqtt.password | The MQTT connection password | string |
connect.mqtt.client.id | The client connection identifier. If it is not provided, the framework will generate one. | string |
connect.mqtt.timeout | The timeout to wait for the broker connection to be established. Default: 3000 (ms) | int |
connect.mqtt.clean | The clean session flag tells the broker whether the client wants to establish a persistent session. With a persistent session (the flag is false), the broker stores all subscriptions for the client and all messages it misses while subscribed with QoS 1 or 2. If clean session is set to true, the broker stores nothing for the client and also purges all information from any previous persistent session. Default: true | boolean |
connect.mqtt.keep.alive | The keep-alive functionality ensures that the connection is still open and that the broker and client are still connected to one another. The client specifies a time interval in seconds and communicates it to the broker when the connection is established. The interval is the longest period of time that the broker and the client can endure without sending a message. Default: 5000 | int |
connect.mqtt.ssl.ca.cert | Path to the CA certificate file to use with the MQTT connection | string |
connect.mqtt.ssl.cert | Path to the certificate file to use with the MQTT connection | string |
connect.mqtt.ssl.key | Path to the certificate private key file | string |
connect.mqtt.converter.throw.on.error | If set to false, a conversion exception is swallowed and processing carries on, but the message is lost. If set to true, the exception is thrown. Default: false | boolean |
connect.converter.avro.schemas | If the AvroConverter is used, you need to provide an AVRO schema to be able to read and translate the raw bytes into an AVRO record. The format is $MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE | string |
connect.progress.enabled | Enables output of how many records have been processed | boolean |
connect.mqtt.log.message | Logs messages received from MQTT | |
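The optional settings above can be resolved together with their documented defaults. The following Scala sketch is hypothetical (`MqttOptionalSettings` is not a connector class): it takes the connector properties as a Map, applies the defaults listed in the table and rejects a QoS level outside 0 to 2. Values are kept in the units stated in the table.

```scala
// Hypothetical resolution of the optional MQTT settings with the table's defaults.
object MqttOptionalSettings {

  final case class Settings(qos: Int, timeoutMs: Int, cleanSession: Boolean, keepAlive: Int, throwOnError: Boolean)

  def resolve(props: Map[String, String]): Settings = {
    val qos = props.get("connect.mqtt.service.quality").map(_.trim.toInt).getOrElse(1)
    // MQTT defines only QoS 0 (at most once), 1 (at least once) and 2 (exactly once).
    require(qos >= 0 && qos <= 2, s"QoS must be 0, 1 or 2 but was $qos")
    Settings(
      qos = qos,
      timeoutMs = props.get("connect.mqtt.timeout").map(_.trim.toInt).getOrElse(3000),
      cleanSession = props.get("connect.mqtt.clean").map(_.trim.toBoolean).getOrElse(true),
      keepAlive = props.get("connect.mqtt.keep.alive").map(_.trim.toInt).getOrElse(5000),
      throwOnError = props.get("connect.mqtt.converter.throw.on.error").map(_.trim.toBoolean).getOrElse(false)
    )
  }
}
```

Resolving an empty map gives QoS 1, a 3000 ms timeout, a clean session, a keep-alive of 5000 and throw.on.error disabled.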
Example
name=mqtt-source
connector.class=com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector
tasks.max=1
connect.mqtt.clean=true
connect.mqtt.timeout=1000
connect.mqtt.keep.alive=1000
connect.mqtt.client.id=dm_source_id
connect.mqtt.converter.throw.on.error=true
connect.mqtt.hosts=tcp://127.0.0.1:11883
connect.mqtt.service.quality=1
connect.mqtt.kcql=INSERT INTO kjson SELECT * FROM /mjson WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter`;INSERT INTO kavro SELECT * FROM /mavro WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.AvroConverter`
connect.converter.avro.schemas=/mavro=$PATH_TO/temperaturemeasure.avro
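Because the AvroConverter cannot decode a message without a schema, it can be worth checking that every MQTT topic routed through it has an entry in connect.converter.avro.schemas. The sketch below is hypothetical (`AvroSchemaMapping` is not part of the connector): it parses a single $MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE entry by splitting on the first =, and lists the AvroConverter topics that have no schema. The separator for multiple entries is not covered here, so the sketch deliberately handles one entry at a time.

```scala
// Hypothetical validation of connect.converter.avro.schemas entries.
object AvroSchemaMapping {

  val AvroConverterClass =
    "com.datamountaineer.streamreactor.connect.converters.source.AvroConverter"

  // Split one "$MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE" entry on its first '='.
  def parseEntry(entry: String): (String, String) = entry.split("=", 2) match {
    case Array(topic, path) if topic.trim.nonEmpty && path.trim.nonEmpty => topic.trim -> path.trim
    case _ => throw new IllegalArgumentException(s"Expected TOPIC=PATH but got '$entry'")
  }

  // MQTT topics routed through the AvroConverter that have no schema file mapped.
  def missingSchemas(converterByTopic: Map[String, String], schemas: Map[String, String]): Set[String] =
    converterByTopic.collect {
      case (topic, conv) if conv == AvroConverterClass && !schemas.contains(topic) => topic
    }.toSet
}
```

For the example above, parsing /mavro=$PATH_TO/temperaturemeasure.avro gives the pair (/mavro, $PATH_TO/temperaturemeasure.avro), so /mavro has a schema and nothing is reported missing.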
Provide your own Converter
You can always provide your own logic for converting the raw MQTT message bytes into an AVRO record. For example, if your messages arrive in Protobuf format, you can deserialize each message based on its schema and create the AVRO record. To do so, create a new project and add our dependency:
Gradle:
implementation "com.datamountaineer:kafka-connect-common:0.7.1"
Maven:
<dependency>
<groupId>com.datamountaineer</groupId>
<artifactId>kafka-connect-common</artifactId>
<version>0.7.1</version>
</dependency>
Then implement com.datamountaineer.streamreactor.connect.converters.source.Converter.
Here is our BytesConverter class code:
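import java.util.Collections

import com.datamountaineer.streamreactor.connect.converters.MsgKey
import com.datamountaineer.streamreactor.connect.converters.source.Converter
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.source.SourceRecord

class BytesConverter extends Converter {
  override def convert(kafkaTopic: String, sourceTopic: String, messageId: String, bytes: Array[Byte]): SourceRecord = {
    new SourceRecord(
      Collections.singletonMap(Converter.TopicKey, sourceTopic), // source partition: the MQTT topic
      null,                                                        // no source offset is tracked
      kafkaTopic,                                                  // target Kafka topic from the KCQL statement
      MsgKey.schema,                                               // key schema: MQTT topic + message id
      MsgKey.getStruct(sourceTopic, messageId),
      Schema.BYTES_SCHEMA,                                         // value: the raw payload bytes
      bytes)
  }
}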
All our implementations send a MsgKey object as the Kafka message key. It contains the MQTT source topic and the MQTT message id.
Kubernetes
Helm charts are provided in our repo; add the repo to your Helm instance and install. We recommend using Landscaper to manage Helm values, since each connector instance typically has its own deployment.
Add the Helm charts to your Helm instance:
helm repo add landoop https://lensesio.github.io/kafka-helm-charts/
Troubleshooting
Please review the FAQs and join our Slack channel.