MQTT Source

Download connector: MQTT Connector for Kafka 1.1, MQTT Connector for Kafka 1.0

Blog: MQTT. Kafka. InfluxDB. SQL. IoT Harmony.

A connector to read events from MQTT and push them to Kafka. The connector subscribes to the specified topics and streams the records to Kafka.

Prerequisites

  • Apache Kafka 0.11.x or above
  • Kafka Connect 0.11.x or above
  • MQTT server
  • Java 1.8

Features

  1. Pluggable converters for MQTT payloads
  2. Out of the box converters for JSON/AVRO and Binary
  3. KCQL routing queries: topic to topic mapping and field selection
  4. Wildcard MQTT topic subscriptions

KCQL Support

INSERT INTO kafka_topic SELECT * FROM /mqtt_source_topic [WITHCONVERTER=`myclass`]

Tip

You can specify multiple KCQL statements separated by ; to have the connector read from multiple MQTT topics.
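As a rough illustration of the tip above, the sketch below splits a semicolon-separated KCQL string into individual statements. The object and method names are hypothetical, and the connector's own KCQL parser is not shown in this document, so this only illustrates the separator convention. It assumes no statement contains a literal ; character.

```scala
// Hypothetical helper, not part of the connector: splits a
// connect.mqtt.kcql value into its individual statements.
object KcqlStatements {
  def split(kcql: String): Seq[String] =
    kcql
      .split(';')           // statements are separated by ';'
      .map(_.trim)          // tolerate whitespace around the separator
      .filter(_.nonEmpty)   // ignore a trailing ';'
      .toSeq
}
```

For example, KcqlStatements.split("INSERT INTO kjson SELECT * FROM /mjson;INSERT INTO kavro SELECT * FROM /mavro") yields two statements, one per MQTT topic.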

The MQTT source supports KCQL, the Kafka Connect Query Language. The following KCQL support is available:

  1. Selection of MQTT source topics
  2. Selection of Kafka target topics
  3. Selection of MQTT message converters.

Example:

-- Insert mode, select all fields from topicA and write to topic kafkaTopic1 with converter myclass
INSERT INTO kafkaTopic1 SELECT * FROM /mqttTopicA [WITHCONVERTER=`myclass`]

-- wildcard
INSERT INTO kafkaTopic1 SELECT * FROM /mqttTopicA/+/sensors [WITHCONVERTER=`myclass`]

Note

Wildcard MQTT subscriptions are supported but require the same converter to be used for all.
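To make the wildcard behaviour concrete, here is a minimal sketch of MQTT topic filter matching: + matches exactly one topic level and a trailing # matches any remaining levels. The matching itself is done by the MQTT broker, not by the connector, so the object below is purely illustrative and its name is hypothetical. It ignores the special handling of topics starting with $.

```scala
// Illustrative only: shows which MQTT topics a wildcard filter selects.
object MqttTopicMatcher {
  def matches(filter: String, topic: String): Boolean = {
    @annotation.tailrec
    def loop(f: List[String], t: List[String]): Boolean = (f, t) match {
      case ("#" :: Nil, _)                  => true          // multi-level wildcard, must be last
      case ("+" :: fs, _ :: ts)             => loop(fs, ts)  // single-level wildcard
      case (fh :: fs, th :: ts) if fh == th => loop(fs, ts)  // literal level must match exactly
      case (Nil, Nil)                       => true          // both fully consumed
      case _                                => false
    }
    // A limit of -1 keeps empty levels, e.g. the one before a leading '/'.
    loop(filter.split("/", -1).toList, topic.split("/", -1).toList)
  }
}
```

With the filter /mqttTopicA/+/sensors, the topic /mqttTopicA/device1/sensors matches, while /mqttTopicA/device1/room2/sensors does not, because + covers a single level only.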

Converters

We provide four converters out of the box, but you can plug in your own (see an example here). The converter is set in the connect.mqtt.kcql statement using the WITHCONVERTER keyword.

AvroConverter

com.datamountaineer.streamreactor.connect.converters.source.AvroConverter

The payload is an AVRO message. In this case you need to provide a path for the AVRO schema file to be able to decode it.

JsonSimpleConverter

com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter

The payload is a JSON message. This converter will parse the JSON and create an AVRO record for it which will be sent over to Kafka.

JsonConverterWithSchemaEvolution

An experimental converter for converting JSON messages to AVRO. The resulting AVRO schema stays fully compatible, because new fields are added to it as the JSON payload evolves.

BytesConverter

com.datamountaineer.streamreactor.connect.converters.source.BytesConverter

This is the default implementation. The payload is taken as is, as an array of bytes, and sent to Kafka as an AVRO record with Schema.BYTES. You don’t have to provide a mapping for the source to get this converter.

Payload key

To facilitate scenarios like retaining the latest value for a given device identifier, or supporting Kafka Streams joins without having to re-map the topic data, the connector supports WITHKEY in the KCQL syntax.

This is how the content of the Kafka message key can be controlled. The general syntax is:

// `[` enclosed by `]` denotes optional values
WITHKEY(field1 [, field2.A , field3]) [KEYDELIMITER='.']

With the above, the resulting Kafka record key is the string concatenation of the values of the specified fields. For example, if field1='value1', field2.A='value2' and field3='value3', the key content will be value1.value2.value3. Optionally, the delimiter can be set via the KEYDELIMITER keyword.

INSERT INTO sensor_data
SELECT * FROM /sensor_data
WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter`
WITHKEY(id)
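The key construction described above can be sketched as follows. This is not the connector's implementation: the object name, the use of nested Map values to stand in for a decoded payload, and the choice to skip missing fields are all assumptions made for illustration.

```scala
// Hypothetical sketch of WITHKEY / KEYDELIMITER semantics.
object KeySketch {
  // Resolve a dotted path such as "field2.A" against nested maps.
  def lookup(record: Map[String, Any], path: String): Option[Any] =
    path.split('.').toList.foldLeft(Option[Any](record)) {
      case (Some(m: Map[_, _]), segment) =>
        m.asInstanceOf[Map[String, Any]].get(segment)
      case _ => None // the path goes deeper than the data does
    }

  // Concatenate the selected field values using the delimiter.
  // Assumption: fields that cannot be resolved are skipped.
  def buildKey(record: Map[String, Any], fields: Seq[String], delimiter: String = "."): String =
    fields.flatMap(lookup(record, _)).map(_.toString).mkString(delimiter)
}
```

For a record with field1=value1, field2.A=value2 and field3=value3, KeySketch.buildKey(record, Seq("field1", "field2.A", "field3")) produces value1.value2.value3, and passing "-" as the delimiter produces value1-value2-value3.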

Lenses QuickStart

The easiest way to try this out is with Lenses Box, the pre-configured Docker image that comes with this connector pre-installed. Go to Connectors –> New Connector –> Source –> MQTT and paste your configuration.


MQTT Setup

For testing, we will use a simple application spinning up an MQTT server using Moquette. Download and unzip this.

Once you have unpacked the archive, start the server.

➜  bin/mqtt-server

You should see the following outcome:

log4j:WARN No appenders could be found for logger (io.moquette.server.Server).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Starting mqtt service on port 11883
Hit Enter to start publishing messages on topic: /mjson and /mavro.

The server has started but no records have been published yet. More on this later once we start the source.

Installing the Connector

In production, Connect should be run in distributed mode.

  1. Install and configure a Kafka Connect cluster
  2. Create a folder on each server called plugins/lib
  3. Copy into the above folder the required connector jars from the stream reactor download
  4. Edit connect-avro-distributed.properties in the etc/schema-registry folder and uncomment the plugin.path option. Set it to the root directory, i.e. the plugins folder created in step 2, where you deployed the Stream Reactor connector jars.
  5. Start Connect: bin/connect-distributed etc/schema-registry/connect-avro-distributed.properties

Connect workers are long-running processes, so set up an init.d or systemd service accordingly.

Source Connector QuickStart

Start Kafka Connect in distributed mode (see install). In this mode a REST endpoint on port 8083 is exposed to accept connector configurations. We developed a command line interface (CLI) to make interacting with the Connect REST API easier. The CLI can be found in the Stream Reactor download under the bin folder. Alternatively, the jar can be pulled from our GitHub releases page.

Starting the Connector

Download, and install Stream Reactor. Follow the instructions here if you haven’t already done so. All paths in the quickstart are based on the location you installed the Stream Reactor.

Once Connect has started, we can use the kafka-connect-tools CLI to post our distributed properties file for MQTT. For the CLI to work, including when using the Docker images, you have to set the following environment variable to point to the Kafka Connect REST API.

export KAFKA_CONNECT_REST="http://myserver:myport"
➜  bin/connect-cli create mqtt-source < conf/source.kcql/mqtt-source.properties

name=mqtt-source
connector.class=com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector
tasks.max=1
connect.mqtt.clean=true
connect.mqtt.timeout=1000
connect.mqtt.kcql=INSERT INTO kjson SELECT * FROM /mjson WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter`
connect.mqtt.keep.alive=1000
connect.mqtt.client.id=dm_source_id
connect.mqtt.converter.throw.on.error=true
connect.mqtt.hosts=tcp://127.0.0.1:11883
connect.mqtt.service.quality=1

We can use the CLI to check if the connector is up but you should be able to see this in logs as well.

#check for running connectors with the CLI
➜ bin/connect-cli ps
mqtt-source
[2016-12-20 16:51:08,058] INFO
    __                    __
   / /   ____ _____  ____/ /___  ____  ____
  / /   / __ `/ __ \/ __  / __ \/ __ \/ __ \
 / /___/ /_/ / / / / /_/ / /_/ / /_/ / /_/ /
/_____/\__,_/_/ /_/\__,_/\____/\____/ .___/
                                   /_/
|  \/  | __ _| |_| |_  / ___|  ___  _   _ _ __ ___ ___
| |\/| |/ _` | __| __| \___ \ / _ \| | | | '__/ __/ _ \
| |  | | (_| | |_| |_   ___) | (_) | |_| | | | (_|  __/
|_|  |_|\__, |\__|\__| |____/ \___/ \__,_|_|  \___\___| by Stefan Bocutiu
           |_|
 (com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceTask:37)

Test Records

Go to the mqtt-server application you downloaded and unzipped and execute:

./bin/mqtt-server

This will put the following records into the MQTT topic:

TemperatureMeasure(1, 31.1, "EMEA", System.currentTimeMillis())
TemperatureMeasure(2, 30.91, "EMEA", System.currentTimeMillis())
TemperatureMeasure(3, 30.991, "EMEA", System.currentTimeMillis())
TemperatureMeasure(4, 31.061, "EMEA", System.currentTimeMillis())
TemperatureMeasure(101, 27.001, "AMER", System.currentTimeMillis())
TemperatureMeasure(102, 38.001, "AMER", System.currentTimeMillis())
TemperatureMeasure(103, 26.991, "AMER", System.currentTimeMillis())
TemperatureMeasure(104, 34.17, "AMER", System.currentTimeMillis())
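The constructor calls above suggest a Scala case class. The sketch below is an assumption about its shape, with field names and types inferred from the JSON shown in the consumer output later in this quickstart (deviceId, value, region, timestamp). The toJson method is hypothetical and does no string escaping; the test application's real serialisation code is not shown in this document.

```scala
// Assumed shape of the test record; field names follow the consumer output.
final case class TemperatureMeasure(deviceId: Int, value: Double, region: String, timestamp: Long) {
  // Hypothetical helper: renders the record as a flat JSON document,
  // the kind of payload published to the /mjson topic.
  def toJson: String =
    s"""{"deviceId":$deviceId,"value":$value,"region":"$region","timestamp":$timestamp}"""
}

object TemperatureMeasureExample {
  // Two of the sample readings, stamped with the current time.
  def samples(): Seq[TemperatureMeasure] = Seq(
    TemperatureMeasure(1, 31.1, "EMEA", System.currentTimeMillis()),
    TemperatureMeasure(101, 27.001, "AMER", System.currentTimeMillis())
  )
}
```

Calling TemperatureMeasure(1, 31.1, "EMEA", 1482236627236L).toJson gives a document with the same fields as the first record in the kjson output.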

Check for records in Kafka

Check for records in the kjson topic with the console consumer (the MQTT payload was a JSON document, which we translated into a Kafka Connect Struct).

➜  bin/kafka-avro-console-consumer --zookeeper localhost:2181 --topic kjson --from-beginning

You should see the following output

SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
{"deviceId":1,"value":31.1,"region":"EMEA","timestamp":1482236627236}
{"deviceId":2,"value":30.91,"region":"EMEA","timestamp":1482236627236}
{"deviceId":3,"value":30.991,"region":"EMEA","timestamp":1482236627236}
{"deviceId":4,"value":31.061,"region":"EMEA","timestamp":1482236627236}
{"deviceId":101,"value":27.001,"region":"AMER","timestamp":1482236627236}
{"deviceId":102,"value":38.001,"region":"AMER","timestamp":1482236627236}
{"deviceId":103,"value":26.991,"region":"AMER","timestamp":1482236627236}
{"deviceId":104,"value":34.17,"region":"AMER","timestamp":1482236627236}

Check for records in the kavro topic with the console consumer (the MQTT payload was an AVRO record, which we translated into a Kafka Connect Struct).

➜  bin/kafka-avro-console-consumer --zookeeper localhost:2181 --topic kavro --from-beginning

You should see the following output

SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
{"deviceId":1,"value":31.1,"region":"EMEA","timestamp":1482236627236}
{"deviceId":2,"value":30.91,"region":"EMEA","timestamp":1482236627236}
{"deviceId":3,"value":30.991,"region":"EMEA","timestamp":1482236627236}
{"deviceId":4,"value":31.061,"region":"EMEA","timestamp":1482236627236}
{"deviceId":101,"value":27.001,"region":"AMER","timestamp":1482236627236}
{"deviceId":102,"value":38.001,"region":"AMER","timestamp":1482236627236}
{"deviceId":103,"value":26.991,"region":"AMER","timestamp":1482236627236}
{"deviceId":104,"value":34.17,"region":"AMER","timestamp":1482236627236}

Features

The MQTT source allows you to plug in your own converter. Say you receive protobuf data: write a converter that knows how to convert from protobuf to SourceRecord, then set WITHCONVERTER= in the KCQL statement.

Configurations

The Kafka Connect framework requires the following in addition to any connector-specific configurations:

  • name (string): Name of the connector.
  • tasks.max (int): The number of tasks to scale output. Value: 1
  • connector.class (string): Name of the connector class. Value: com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector

Connector Configurations

  • connect.mqtt.kcql (string): Kafka Connect Query Language expression. Allows for expressive MQTT topic to Kafka topic routing. Currently, there is no support for filtering the fields from the incoming payload.
  • connect.mqtt.hosts (string): Specifies the MQTT connection endpoints. Example: tcp://broker.datamountaineer.com:1883
Optional Configurations

  • connect.mqtt.service.quality (int): The Quality of Service (QoS) level is an agreement between the sender and the receiver of a message regarding delivery guarantees. There are 3 QoS levels in MQTT: at most once (0), at least once (1), exactly once (2). Default: 1
  • connect.mqtt.username (string): The MQTT connection username.
  • connect.mqtt.password (string): The MQTT connection password.
  • connect.mqtt.client.id (string): The client connection identifier. If it is not provided, the framework will generate one.
  • connect.mqtt.timeout (int): The timeout to wait for the broker connection to be established. Default: 3000 (ms)
  • connect.mqtt.clean (boolean): The clean session flag tells the broker whether the client wants to establish a persistent session. With a persistent session (the flag is false), the broker stores all subscriptions for the client and also all missed messages when subscribing with Quality of Service (QoS) 1 or 2. If clean session is set to true, the broker won’t store anything for the client and will also purge all information from a previous persistent session. Default: true
  • connect.mqtt.keep.alive (int): The keep-alive functionality ensures that the connection is still open and that broker and client are connected to one another. The client specifies a time interval in seconds and communicates it to the broker when the connection is established. The interval is the longest period that the broker and the client can go without sending a message. Default: 5000
  • connect.mqtt.ssl.ca.cert (string): Path to the CA certificate file to use with the MQTT connection.
  • connect.mqtt.ssl.cert (string): Path to the certificate file to use with the MQTT connection.
  • connect.mqtt.ssl.key (string): Certificate private key file path.
  • connect.mqtt.converter.throw.on.error (boolean): If set to false, the conversion exception is swallowed and processing carries on, but the message is lost. If set to true, the exception is thrown. Default: false
  • connect.converter.avro.schemas (string): If the AvroConverter is used, you need to provide an AVRO schema to be able to read and translate the raw bytes to an AVRO record. The format is $MQTT_TOPIC=$PATH_TO_AVRO_SCHEMA_FILE
  • connect.progress.enabled (boolean): Enables the output for how many records have been processed.

Example

name=mqtt-source
connector.class=com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector
tasks.max=1
connect.mqtt.clean=true
connect.mqtt.timeout=1000
connect.mqtt.keep.alive=1000
connect.mqtt.client.id=dm_source_id
connect.mqtt.converter.throw.on.error=true
connect.mqtt.hosts=tcp://127.0.0.1:11883
connect.mqtt.service.quality=1
connect.mqtt.kcql=INSERT INTO kjson SELECT * FROM /mjson WITHCONVERTER=`myclass`;INSERT INTO kavro SELECT * FROM /mavro WITHCONVERTER=`myclass`
connect.converter.avro.schemas=/mavro=$PATH_TO/temperaturemeasure.avro

Provide your own Converter

You can always provide your own logic for converting the raw MQTT message bytes to your AVRO record. If you have messages coming in Protobuf format you can deserialize the message based on the schema and create the AVRO record. All you have to do is create a new project and add our dependency:

Gradle:

compile "com.datamountaineer:kafka-connect-common:0.7.1"

Maven:

<dependency>
    <groupId>com.datamountaineer</groupId>
    <artifactId>kafka-connect-common</artifactId>
    <version>0.7.1</version>
</dependency>

Then all you have to do is implement com.datamountaineer.streamreactor.connect.converters.source.Converter.

Here is our BytesConverter class code:

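import java.util.Collections

import com.datamountaineer.streamreactor.connect.converters.MsgKey
import com.datamountaineer.streamreactor.connect.converters.source.Converter
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.source.SourceRecord

class BytesConverter extends Converter {
  // Passes the raw MQTT payload through unchanged as a BYTES value.
  override def convert(kafkaTopic: String, sourceTopic: String, messageId: String, bytes: Array[Byte]): SourceRecord = {
    new SourceRecord(Collections.singletonMap(Converter.TopicKey, sourceTopic), // source partition
      null,                                      // no source offset is tracked
      kafkaTopic,                                // target Kafka topic
      MsgKey.schema,                             // key schema
      MsgKey.getStruct(sourceTopic, messageId),  // key: MQTT topic and message id
      Schema.BYTES_SCHEMA,                       // value schema
      bytes)                                     // value: the payload as received
  }
}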

All our implementations send a MsgKey object as the Kafka message key. It contains the MQTT source topic and the MQTT message id.

Kubernetes

Helm charts are provided in our repo. Add the repo to your Helm instance and install. We recommend using Landscaper to manage Helm values, since typically each connector instance has its own deployment.

Add the Helm charts to your Helm instance:

helm repo add landoop https://landoop.github.io/kafka-helm-charts/

Troubleshooting

Please review the FAQs and join our Slack channel.