5.0
InfluxDB
Kafka Connect sink connector for writing data from Kafka to InfluxDB v1.x
KCQL support
The following KCQL is supported:
INSERT INTO <your-measure>
SELECT FIELD, ...
FROM kafka_topic_name
[WITHTIMESTAMP FIELD|sys_time]
[WITHTAG(FIELD|(constant_key=constant_value)]
Examples:
-- Insert mode, select all fields from topicA and write to indexA
INSERT INTO measureA SELECT * FROM topicA
-- Insert mode, select 3 fields and rename from topicB and write to indexB,
-- use field Y as the point measurement
INSERT INTO measureB SELECT x AS a, y AS b, c FROM topicB WITHTIMESTAMP y
-- Insert mode, select 3 fields and rename from topicB and write to indexB,
-- use field Y as the current system time for Point measurement
INSERT INTO measureB SELECT x AS a, y AS b, z FROM topicB WITHTIMESTAMP sys_time()
-- Tagging using constants
INSERT INTO measureA SELECT * FROM topicA WITHTAG (DataMountaineer=awesome, Influx=rulz!)
-- Tagging using fields in the payload. Say we have a Payment structure
-- with these fields: amount, from, to, note
INSERT INTO measureA SELECT * FROM topicA WITHTAG (from, to)
-- Tagging using a combination of fields in the payload and constants.
-- Say we have a Payment structure with these fields: amount, from, to, note
INSERT INTO measureA SELECT * FROM topicA WITHTAG (from, to, provider=DataMountaineer)
Concepts
Kafka payload support
This sink supports the following Kafka payloads:
- Schema.Struct and Struct (Avro)
- Schema.Struct and JSON
- No Schema and JSON
See connect payloads for more information.
Tags
InfluxDB allows via the client API to provide a set of tags (key-value) to each point added. The current connector version allows you to provide them via the KCQL.
Error policies
The connector supports Error policies .
Quickstart
Launch the stack
- Copy the docker-compose file.
- Bring up the stack.
export CONNECTOR=influx
docker-compose up -d influxdb
Peparing the target system
Login into the influx container:
docker exec -ti influxdb influx
execute the following:
CREATE DATABASE mydb
Start the connector
If you are using Lenses, login into Lenses and navigate to the connectors page , select InfluxDB as the sink and paste the following:
name=influxdb
connector.class=com.datamountaineer.streamreactor.connect.influx.InfluxSinkConnector
tasks.max=1
topics=influx
connect.influx.url=http://influxdb:8086
connect.influx.db=mydb
connect.influx.username=admin
connect.influx.kcql=INSERT INTO influxMeasure SELECT * FROM influx WITHTIMESTAMP sys_time()
To start the connector without using Lenses, log into the fastdatadev container:
docker exec -ti fastdata /bin/bash
and create a connector.properties file containing the properties above.
Create the connector, with the connect-cli :
connect-cli create influxdb < connector.properties
connect-cli create influxdb < connector.properties
Wait a for the connector to start and check its running:
connect-cli status influxdb
Inserting test data
In the to fastdata container start the kafka producer shell:
kafka-avro-console-producer \
--broker-list localhost:9092 \
--topic influx \
--property value.schema='{"type":"record","name":"User",
"fields":[{"name":"company","type":"string"},{"name":"address","type":"string"},{"name":"latitude","type":"float"},{"name":"longitude","type":"float"}]}'
the console is now waiting for your input, enter the following:
{"company": "DataMountaineer","address": "MontainTop","latitude": -49.817964,"longitude": -141.645812}
Check for data in Influxdb
In the Influx container run:
USE mydb
SELECT * FROM influxMeasure
Clean up
Bring down the stack:
docker-compose down
Options
Name | Description | Type | Default Value |
---|---|---|---|
connect.influx.url | The InfluxDB database url. | string | |
connect.influx.db | The database to store the values to. | string | |
connect.influx.username | The user to connect to the influx database | string | |
connect.influx.password | The password for the influxdb user. | password | |
connect.influx.kcql | KCQL expression describing field selection and target measurements. | string | |
connect.progress.enabled | Enables the output for how many records have been processed by the connector | boolean | false |
connect.influx.error.policy | Specifies the action to be taken if an error occurs while inserting the data. There are two available options: NOOP - the error is swallowed THROW - the error is allowed to propagate. RETRY - The exception causes the Connect framework to retry the message. The number of retries is based on The error will be logged automatically | string | THROW |
connect.influx.retry.interval | The time in milliseconds between retries. | int | 60000 |
connect.influx.max.retries | The maximum number of times to try the write again. | int | 20 |
connect.influx.retention.policy | Determines how long InfluxDB keeps the data - the options for specifying the duration of the retention policy are listed below. Note that the minimum retention period is one hour. DURATION determines how long InfluxDB keeps the data - the options for specifying the duration of the retention policy are listed below. Note that the minimum retention period is one hour. m minutes h hours d days w weeks INF infinite Default retention is autogen from 1.0 onwards or default for any previous version | string | autogen |
connect.influx.consistency.level | Specifies the write consistency. If any write operations do not meet the configured consistency guarantees, an error will occur and the data will not be indexed. The default consistency-level is ALL. | string | ALL |