Elastic
A set of Kafka Connect sink connectors for writing records from Kafka to Elastic.
Requires one of:
- Elastic 6
- Elastic 7+
KCQL support
The following KCQL is supported:
INSERT | UPSERT
INTO <elastic_index>
SELECT FIELD, ...
FROM kafka_topic
[PK FIELD,...]
[WITHDOCTYPE=<your_document_type>]
[WITHINDEXSUFFIX=<your_suffix>]
Examples:
-- Insert mode, select all fields from topicA and write to indexA
INSERT INTO indexA SELECT * FROM topicA
-- Insert mode, select 3 fields and rename from topicB
-- and write to indexB
INSERT INTO indexB SELECT x AS a, y, zc FROM topicB PK y
-- UPSERT
UPSERT INTO indexC SELECT id, string_field FROM topicC PK id
Concepts
Primary Keys
The PK keyword can be used to specify the fields which will be used for the key value. The field values are concatenated and separated by a - (configurable via connect.elastic.pk.separator). If no fields are set, the topic name, partition and message offset are used.
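For example, a hypothetical statement such as the following builds the document key from the fields firstName and lastName, so a record with firstName=John and lastName=Smith is written under the key John-Smith:
-- key is the concatenation of firstName and lastName, e.g. John-Smith
UPSERT INTO indexA SELECT * FROM topicA PK firstName,lastName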
Document Type
WITHDOCTYPE allows you to associate a document type with the documents inserted.
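For example, a minimal sketch (the document type order is a hypothetical name):
INSERT INTO indexA SELECT * FROM topicA WITHDOCTYPE=order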
Index Suffix
WITHINDEXSUFFIX allows you to specify a suffix for your index name; date formats are supported.
Example:
WITHINDEXSUFFIX=_suffix_{YYYY-MM-dd}
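For example, assuming the suffix is resolved against the date the record is written, a hypothetical statement targeting the index orders on 2016-05-06 would write to orders_suffix_2016-05-06:
INSERT INTO orders SELECT * FROM topicA WITHINDEXSUFFIX=_suffix_{YYYY-MM-dd}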
Error policies
The connector supports error policies.
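For example, a minimal sketch of enabling the RETRY policy, using the retry options described in the Options table below:
connect.elastic.error.policy=RETRY
connect.elastic.max.retries=20
connect.elastic.retry.interval=60000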
Auto Index Creation
The Sink will automatically create missing indexes at startup.
Quickstart
Launch the stack
- Copy the docker-compose file.
- Bring up the stack.
For the Elasticsearch 6 Connector:
export CONNECTOR=elastic6
docker-compose up -d elastic
For the Elasticsearch 7+ Connector:
export CONNECTOR=elastic7
docker-compose up -d elastic
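If you do not already have the docker-compose file to hand, a minimal sketch of the stack is shown below; the image tags and port mappings are assumptions, and the service names fastdata and elastic match the commands used in the rest of this quickstart:
# docker-compose.yml (sketch)
services:
  fastdata:
    image: lensesio/fast-data-dev
    ports:
      - 3030:3030   # fast-data-dev UI
      - 9092:9092   # Kafka broker
  elastic:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.17.0
    environment:
      - discovery.type=single-node
    ports:
      - 9200:9200   # Elasticsearch REST API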
Start the connector
If you are using Lenses, log in to Lenses, navigate to the connectors page, select Elastic as the sink and paste the following:
For the Elasticsearch 6 Connector:
name=elastic
connector.class=com.datamountaineer.streamreactor.connect.elastic6.ElasticSinkConnector
tasks.max=1
topics=orders
connect.elastic.protocol=http
connect.elastic.hosts=elastic
connect.elastic.port=9200
connect.elastic.cluster.name=elasticsearch
connect.elastic.kcql=INSERT INTO orders SELECT * FROM orders
connect.progress.enabled=true
For the Elasticsearch 7+ Connector:
name=elastic
connector.class=com.datamountaineer.streamreactor.connect.elastic7.ElasticSinkConnector
tasks.max=1
topics=orders
connect.elastic.protocol=http
connect.elastic.hosts=elastic
connect.elastic.port=9200
connect.elastic.cluster.name=elasticsearch
connect.elastic.kcql=INSERT INTO orders SELECT * FROM orders
connect.progress.enabled=true
To start the connector without using Lenses, log into the fastdata container:
docker exec -ti fastdata /bin/bash
and create a connector.properties file containing the properties above.
Create the connector with the connect-cli:
connect-cli create elastic < connector.properties
Wait for the connector to start and check it's running:
connect-cli status elastic
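Alternatively, if you prefer the standard Kafka Connect REST API to connect-cli (assuming the Connect worker listens on its default port 8083), the same status check can be done with:
curl localhost:8083/connectors/elastic/status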
Inserting test data
In the fastdata container, start the Kafka Avro console producer:
kafka-avro-console-producer \
--broker-list localhost:9092 --topic orders \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"created","type":"string"},{"name":"product","type":"string"},{"name":"price","type":"double"}, {"name":"qty", "type":"int"}]}'
The console is now waiting for input. Enter the following:
{"id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2, "qty":100}
Check for data in Elastic
Attach to the elastic container and start the Elasticsearch SQL CLI:
docker exec -ti elastic bin/elasticsearch-sql-cli
Run the following:
SELECT * FROM orders;
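Alternatively, query the index over HTTP from the host (assuming port 9200 is exposed by the stack):
curl localhost:9200/orders/_search?pretty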
Clean up
Bring down the stack:
docker-compose down
Options
Name | Description | Type | Default Value |
---|---|---|---|
connect.elastic.protocol | URL protocol (http, https) | string | http |
connect.elastic.hosts | List of hostnames for the Elasticsearch cluster nodes, excluding protocol and port. | string | localhost |
connect.elastic.port | Port on which the Elasticsearch node listens | string | 9300 |
connect.elastic.tableprefix | Table prefix (optional) | string | |
connect.elastic.cluster.name | Name of the Elasticsearch cluster; used in local mode when setting up the connection | string | elasticsearch |
connect.elastic.write.timeout | The write timeout in milliseconds. Default is 5 minutes. | int | 300000 |
connect.elastic.batch.size | How many records to process in one batch. Records pulled from Kafka can number 100k+, which is not feasible to send to Elasticsearch in a single request | int | 4000 |
connect.elastic.use.http.username | Username if HTTP Basic Auth is required. Default is null. | string | |
connect.elastic.use.http.password | Password if HTTP Basic Auth is required. Default is null. | string | |
connect.elastic.error.policy | Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message, with the number of retries set by connect.elastic.max.retries. The error will be logged automatically | string | THROW |
connect.elastic.max.retries | The maximum number of times to try the write again. | int | 20 |
connect.elastic.retry.interval | The time in milliseconds between retries. | int | 60000 |
connect.elastic.kcql | KCQL expression describing field selection and routes. | string | |
connect.elastic.pk.separator | Separator used when more than one field is set in PK | string | - |
connect.progress.enabled | Enables logging of how many records have been processed | boolean | false |
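As an illustration, a hypothetical configuration combining several of these options (the hostnames, port and credentials are placeholders):
name=elastic-secure
connector.class=com.datamountaineer.streamreactor.connect.elastic7.ElasticSinkConnector
tasks.max=1
topics=orders
connect.elastic.protocol=https
connect.elastic.hosts=es-node1,es-node2
connect.elastic.port=9243
connect.elastic.use.http.username=elastic
connect.elastic.use.http.password=changeme
connect.elastic.error.policy=RETRY
connect.elastic.max.retries=20
connect.elastic.retry.interval=60000
connect.elastic.kcql=UPSERT INTO orders SELECT * FROM orders PK id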