Kafka Connect sink connector for writing data from Kafka to InfluxDB v1.x
The following KCQL is supported:
INSERT INTO <your-measure> SELECT FIELD, ... FROM kafka_topic_name [WITHTIMESTAMP FIELD|sys_time()] [WITHTAG (FIELD|constant_key=constant_value, ...)]
Examples:
-- Insert mode, select all fields from topicA and write to measureA
INSERT INTO measureA SELECT * FROM topicA

-- Insert mode, select 3 fields and rename from topicB and write to measureB,
-- use field y as the point timestamp
INSERT INTO measureB SELECT x AS a, y AS b, c FROM topicB WITHTIMESTAMP y

-- Insert mode, select 3 fields and rename from topicB and write to measureB,
-- use the current system time as the point timestamp
INSERT INTO measureB SELECT x AS a, y AS b, z FROM topicB WITHTIMESTAMP sys_time()

-- Tagging using constants
INSERT INTO measureA SELECT * FROM topicA WITHTAG (DataMountaineer=awesome, Influx=rulz!)

-- Tagging using fields in the payload. Say we have a Payment structure
-- with these fields: amount, from, to, note
INSERT INTO measureA SELECT * FROM topicA WITHTAG (from, to)

-- Tagging using a combination of fields in the payload and constants.
-- Say we have a Payment structure with these fields: amount, from, to, note
INSERT INTO measureA SELECT * FROM topicA WITHTAG (from, to, provider=DataMountaineer)
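To make the field selection concrete, the following Python sketch mimics what a projection such as SELECT x AS a, y AS b, c does to a single record: only the listed fields are kept, and aliased fields are renamed. The function name `apply_select` and the sample record are hypothetical, and this is an illustration of the semantics rather than the connector's KCQL implementation.

```python
def apply_select(record: dict, selection: dict) -> dict:
    """Keep only the selected fields, renaming each one to its alias.

    `selection` maps source field name -> output name
    (use the same name on both sides when there is no alias).
    """
    return {alias: record[name] for name, alias in selection.items()}


# SELECT x AS a, y AS b, c FROM topicB, applied to a made-up record
record = {"x": 1, "y": 1700000000000, "c": "ok", "ignored": True}
projected = apply_select(record, {"x": "a", "y": "b", "c": "c"})
print(projected)
```

Fields that are not listed, such as `ignored` here, do not appear in the result.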
This sink supports the following Kafka payloads:
See connect payloads for more information.
The InfluxDB client API lets you attach a set of tags (key-value pairs) to each point that is written. The current connector version lets you supply these tags through KCQL.
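As a rough illustration of how tags sit alongside fields in a point, the Python sketch below renders one record in InfluxDB line protocol, with tags taken from payload fields and from constants. It assumes that tagged payload fields are also kept as regular fields, which this page does not state; the function `to_line_protocol` and the sample Payment values are hypothetical, and the connector itself writes through the InfluxDB client rather than through code like this.

```python
def _escape(text: str, chars: str) -> str:
    """Backslash-escape each character in `chars` that occurs in `text`."""
    for ch in chars:
        text = text.replace(ch, "\\" + ch)
    return text


def _format_field_value(value) -> str:
    """Render a field value using line protocol literal syntax."""
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return "true" if value else "false"
    if isinstance(value, int):
        return f"{value}i"  # integers carry an 'i' suffix
    if isinstance(value, float):
        return repr(value)
    text = str(value).replace("\\", "\\\\").replace('"', '\\"')
    return f'"{text}"'  # strings are double-quoted


def to_line_protocol(measurement, record, tag_fields=(), constant_tags=None,
                     timestamp_ns=None):
    """Build one line: measurement[,tag=value...] field=value[,...] [timestamp]."""
    # Tags from payload fields plus constants (assumed to mirror WITHTAG).
    tags = {name: str(record[name]) for name in tag_fields}
    tags.update(constant_tags or {})

    tag_part = "".join(
        f",{_escape(k, ', =')}={_escape(v, ', =')}" for k, v in sorted(tags.items())
    )
    # Assumption: every payload field, tagged or not, is written as a field.
    field_part = ",".join(
        f"{_escape(k, ', =')}={_format_field_value(v)}" for k, v in record.items()
    )
    line = f"{_escape(measurement, ', ')}{tag_part} {field_part}"
    if timestamp_ns is not None:
        line += f" {timestamp_ns}"
    return line


# Made-up Payment record, tagged as in WITHTAG (from, to, provider=DataMountaineer)
payment = {"amount": 10.5, "from": "alice", "to": "bob", "note": "lunch"}
print(to_line_protocol("measureA", payment, tag_fields=("from", "to"),
                       constant_tags={"provider": "DataMountaineer"}))
```

Tags are sorted by key here only to keep the output stable; tag values are always strings, while fields keep their types.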
The connector supports error policies.
export CONNECTOR=influx
docker-compose up -d influxdb
Log in to the influxdb container:
docker exec -ti influxdb influx
Then execute the following:
CREATE DATABASE mydb
If you are using Lenses, log in to Lenses and navigate to the connectors page, select InfluxDB as the sink and paste the following:
name=influxdb
connector.class=com.datamountaineer.streamreactor.connect.influx.InfluxSinkConnector
tasks.max=1
topics=influx
connect.influx.url=http://influxdb:8086
connect.influx.db=mydb
connect.influx.username=admin
connect.influx.kcql=INSERT INTO influxMeasure SELECT * FROM influx WITHTIMESTAMP sys_time()
To start the connector using the command line, log into the lenses-box container:
docker exec -ti lenses-box /bin/bash
and create a connector.properties file containing the properties above.
Create the connector with connect-cli:
connect-cli create influxdb < connector.properties
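Where connect-cli is not at hand, the same properties can in principle be submitted to the Kafka Connect REST API, which accepts a JSON body with a `name` and a `config` map on POST /connectors. The Python sketch below only performs the conversion from properties text to that JSON body; the helper name `properties_to_payload` is hypothetical, and the address of the Connect REST endpoint depends on your deployment and is not given on this page.

```python
import json


def properties_to_payload(text: str) -> dict:
    """Turn key=value lines into the {"name": ..., "config": {...}} body
    expected by POST /connectors on the Kafka Connect REST API."""
    config = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        # Split on the first '=' only: KCQL values may contain '=' (for example in WITHTAG).
        key, sep, value = line.partition("=")
        if not sep:
            raise ValueError(f"not a key=value line: {raw!r}")
        config[key.strip()] = value.strip()
    if not config.get("name"):
        raise ValueError("missing 'name' property")
    return {"name": config["name"], "config": config}


properties = """\
name=influxdb
connector.class=com.datamountaineer.streamreactor.connect.influx.InfluxSinkConnector
tasks.max=1
topics=influx
connect.influx.url=http://influxdb:8086
connect.influx.db=mydb
connect.influx.username=admin
connect.influx.kcql=INSERT INTO influxMeasure SELECT * FROM influx WITHTIMESTAMP sys_time()
"""
print(json.dumps(properties_to_payload(properties), indent=2))
```

All configuration values stay strings, which is what the REST API expects.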
Wait for the connector to start and check it’s running:
connect-cli status influxdb
In the lenses-box container, start the Kafka producer shell:
kafka-avro-console-producer \
  --broker-list localhost:9092 \
  --topic influx \
  --property value.schema='{"type":"record","name":"User", "fields":[{"name":"company","type":"string"},{"name":"address","type":"string"},{"name":"latitude","type":"float"},{"name":"longitude","type":"float"}]}'
The console is now waiting for your input. Enter the following:
{ "company": "DataMountaineer", "address": "MontainTop", "latitude": -49.817964, "longitude": -141.645812 }
In the Influx container run:
USE mydb
SELECT * FROM influxMeasure
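The same check can also be made over the InfluxDB 1.x HTTP API, whose /query endpoint takes the database and the statement as the `db` and `q` parameters. The Python sketch below builds that URL and flattens the JSON response into one dictionary per row. It assumes port 8086 is reachable from where the script runs and that authentication is disabled; the function names are hypothetical.

```python
import json
from urllib.parse import urlencode
from urllib.request import urlopen


def query_url(base_url: str, database: str, statement: str) -> str:
    """Build the /query URL for an InfluxDB 1.x server."""
    params = urlencode({"db": database, "q": statement})
    return f"{base_url.rstrip('/')}/query?{params}"


def rows_from_response(response: dict) -> list:
    """Flatten the results -> series -> columns/values layout into row dicts."""
    rows = []
    for result in response.get("results", []):
        for series in result.get("series", []):
            columns = series["columns"]
            for values in series.get("values", []):
                rows.append(dict(zip(columns, values)))
    return rows


def fetch_rows(base_url: str, database: str, statement: str) -> list:
    """Run the statement against a live server and return its rows."""
    with urlopen(query_url(base_url, database, statement)) as resp:
        return rows_from_response(json.load(resp))


# Only builds the URL; call fetch_rows(...) instead once the stack is running.
print(query_url("http://localhost:8086", "mydb", "SELECT * FROM influxMeasure"))
```

An empty list from `fetch_rows` means the measurement has no points yet, for example because the connector has not flushed the record.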
Bring down the stack:
docker-compose down