A set of Kafka Connect sink connectors for writing records from Kafka to Elastic.
Requires: Elasticsearch 6.x or 7.x (the connector ships in elastic6 and elastic7 variants).
The following KCQL is supported:
INSERT | UPSERT INTO <elastic_index> SELECT FIELD, ... FROM kafka_topic [PK FIELD,...] [WITHDOCTYPE=<your_document_type>] [WITHINDEXSUFFIX=<your_suffix>]
Examples:
-- Insert mode, select all fields from topicA and write to indexA
INSERT INTO indexA SELECT * FROM topicA

-- Insert mode, select 3 fields and rename from topicB
-- and write to indexB
INSERT INTO indexB SELECT x AS a, y, zc FROM topicB PK y

-- UPSERT
UPSERT INTO indexC SELECT id, string_field FROM topicC PK id
The PK keyword can be used to specify the fields which will be used for the key value. The field values will be concatenated and separated by a '-'. If no fields are set, the topic name, partition and message offset are used.
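For example, the following statement (hypothetical index, topic and field names) builds the document key from two fields, so a record with customer_id=42 and order_id=7 is written with the key 42-7:

UPSERT INTO indexD SELECT customer_id, order_id, amount FROM topicD PK customer_id, order_id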
WITHDOCTYPE allows you to associate a document type with the inserted document.
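For example, to tag inserted documents with a document type (the index, topic and type names here are illustrative):

INSERT INTO indexA SELECT * FROM topicA WITHDOCTYPE=order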
WITHINDEXSUFFIX allows you to specify a suffix for your index name; date formats are supported.
Example:
WITHINDEXSUFFIX=_suffix_{YYYY-MM-dd}
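Used in a full statement this would write to a dated index such as indexA_suffix_2016-05-06 (index and topic names are illustrative):

INSERT INTO indexA SELECT * FROM topicA WITHINDEXSUFFIX=_suffix_{YYYY-MM-dd}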
The connector supports error policies.
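As a sketch, assuming the standard stream-reactor property names (the retry interval is in milliseconds; check your connector version's documentation for the exact keys and accepted values):

connect.elastic.error.policy=RETRY
connect.elastic.max.retries=3
connect.elastic.retry.interval=60000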
The Sink will automatically create missing indexes at startup.
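Once the connector is running, you can confirm the indexes exist by querying Elasticsearch directly (assuming it is reachable on localhost:9200):

curl -s "localhost:9200/_cat/indices?v"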
To launch the stack with Elasticsearch 6:

export CONNECTOR=elastic6
docker-compose up -d elastic

Or with Elasticsearch 7:

export CONNECTOR=elastic7
docker-compose up -d elastic
If you are using Lenses, log in to Lenses and navigate to the connectors page, select Elastic as the sink and paste the following:
For Elasticsearch 6:

name=elastic
connector.class=com.datamountaineer.streamreactor.connect.elastic6.ElasticSinkConnector
tasks.max=1
topics=orders
connect.elastic.protocol=http
connect.elastic.hosts=elastic
connect.elastic.port=9200
connect.elastic.cluster.name=elasticsearch
connect.elastic.kcql=INSERT INTO orders SELECT * FROM orders
connect.progress.enabled=true

For Elasticsearch 7:

name=elastic
connector.class=com.datamountaineer.streamreactor.connect.elastic7.ElasticSinkConnector
tasks.max=1
topics=orders
connect.elastic.protocol=http
connect.elastic.hosts=elastic
connect.elastic.port=9200
connect.elastic.cluster.name=elasticsearch
connect.elastic.kcql=INSERT INTO orders SELECT * FROM orders
connect.progress.enabled=true
To start the connector using the command line, log into the lenses-box container:
docker exec -ti lenses-box /bin/bash
and create a connector.properties file containing the properties above.
Create the connector with the connect-cli:
connect-cli create elastic < connector.properties
Wait for the connector to start and check it’s running:
connect-cli status elastic
In the lenses-box container, start the kafka-avro-console-producer:
kafka-avro-console-producer \
  --broker-list localhost:9092 --topic orders \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"created","type":"string"},{"name":"product","type":"string"},{"name":"price","type":"double"},{"name":"qty","type":"int"}]}'
The console is now waiting for input; enter the following:
{ "id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2, "qty": 100 }
Log in to the elastic container:
docker exec -ti elastic bin/elasticsearch-sql-cli
Run the following:
SELECT * FROM orders;
Bring down the stack:
docker-compose down