A set of Kafka Connect sink connectors for writing records from Kafka to Elasticsearch.
Requires: Elasticsearch 6.x or 7.x (a dedicated connector is provided for each major version).
The following KCQL is supported:
INSERT | UPSERT INTO <elastic_index> SELECT FIELD, ... FROM kafka_topic [PK FIELD,...] [WITHDOCTYPE=<your_document_type>] [WITHINDEXSUFFIX=<your_suffix>] [PROPERTIES (...)]
Examples:
-- Insert mode, select all fields from topicA and write to indexA
INSERT INTO indexA SELECT * FROM topicA

-- Insert mode, select 3 fields and rename from topicB
-- and write to indexB
INSERT INTO indexB SELECT x AS a, y, zc FROM topicB PK y

-- UPSERT
UPSERT INTO indexC SELECT id, string_field FROM topicC PK id
It is possible to configure how the connector handles a null value payload (a Kafka tombstone). Use the behavior.on.null.values property in the KCQL PROPERTIES clause with one of the following values:
IGNORE: the tombstone record is ignored
FAIL: the connector task fails
DELETE: the document matching the record key is deleted from the index
Example:
INSERT INTO indexA SELECT * FROM topicA PROPERTIES ('behavior.on.null.values'='IGNORE')
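For example, to have tombstone records remove the matching documents instead, the same statement can use the DELETE value:
INSERT INTO indexA SELECT * FROM topicA PROPERTIES ('behavior.on.null.values'='DELETE')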
The PK keyword can be used to specify the fields used to build the document key. The field values are concatenated and separated by a - (the separator is configurable via connect.elastic.pk.separator). If no fields are set, the topic name, partition and message offset are used.
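For instance, a key built from two fields (the index, topic and field names below are purely illustrative) could look like this:
UPSERT INTO indexD SELECT firstName, lastName FROM topicD PK firstName, lastName
With this statement the document id is the two field values joined by the separator, e.g. jane-doe.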
WITHDOCTYPE allows you to associate a document type with the inserted document.
WITHINDEXSUFFIX allows you to specify a suffix for your index name; date formats are supported:
WITHINDEXSUFFIX=_suffix_{YYYY-MM-dd}
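Both keywords can appear in a single statement. A sketch, assuming a document type of _doc:
INSERT INTO indexA SELECT * FROM topicA WITHDOCTYPE=_doc WITHINDEXSUFFIX=_suffix_{YYYY-MM-dd}
Records would then land in a date-suffixed index such as indexA_suffix_2024-01-01.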
To use a static index name, define the target index in the KCQL statement without any prefixes:
INSERT INTO index_name SELECT * FROM topicA
This will consistently create an index named index_name for any messages consumed from topicA.
To extract an index name from a message header, use the _header prefix followed by the header name:
INSERT INTO _header.gate SELECT * FROM topicA
This statement extracts the value from the gate header field and uses it as the index name.
For headers with names that include dots, enclose the entire target in backticks (`) and each dotted segment of the header name in single quotes ('):
INSERT INTO `_header.'prefix.abc.suffix'` SELECT * FROM topicA
In this case, the value of the header named prefix.abc.suffix is used to form the index name.
To use the full value of the message key as the index name, use the _key prefix:
INSERT INTO _key SELECT * FROM topicA
For example, if the message key is "freddie", the resulting index name will be freddie.
"freddie"
freddie
To extract an index name from a field within the message value, use the _value prefix followed by the field name:
INSERT INTO _value.name SELECT * FROM topicA
This example uses the value of the name field from the message’s value. If the field contains "jason", the index name will be jason.
To access nested fields within a value, specify the full path using dot notation:
INSERT INTO _value.name.firstName SELECT * FROM topicA
If the firstName field is nested within the name structure, its value (e.g., "hans") will be used as the index name.
For field names that include dots, enclose the entire target in backticks (`) and each dotted segment of the path in single quotes ('):
INSERT INTO `_value.'customer.name'.'first.name'` SELECT * FROM topicA
If the value structure contains:
{ "customer.name": { "first.name": "hans" } }
The extracted index name will be hans.
The connector supports error policies.
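A minimal sketch of the related settings, assuming RETRY is the chosen policy (the values below are illustrative, not defaults):
# retry failed writes before failing the task (illustrative values)
connect.elastic.error.policy=RETRY
connect.elastic.max.retries=3
connect.elastic.retry.interval=60000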
The Sink will automatically create missing indexes at startup.
Please note that this feature is not compatible with index names extracted from message headers/keys/values.
For Elasticsearch 6:

export CONNECTOR=elastic6
docker-compose up -d elastic

For Elasticsearch 7:

export CONNECTOR=elastic7
docker-compose up -d elastic
If you are using Lenses, log in to Lenses, navigate to the connectors page, select Elastic as the sink and paste the following:
For Elasticsearch 6:

name=elastic
connector.class=io.lenses.streamreactor.connect.elastic6.ElasticSinkConnector
tasks.max=1
topics=orders
connect.elastic.protocol=http
connect.elastic.hosts=elastic
connect.elastic.port=9200
connect.elastic.cluster.name=elasticsearch
connect.elastic.kcql=INSERT INTO orders SELECT * FROM orders
connect.progress.enabled=true

For Elasticsearch 7:

name=elastic
connector.class=io.lenses.streamreactor.connect.elastic7.ElasticSinkConnector
tasks.max=1
topics=orders
connect.elastic.protocol=http
connect.elastic.hosts=elastic
connect.elastic.port=9200
connect.elastic.cluster.name=elasticsearch
connect.elastic.kcql=INSERT INTO orders SELECT * FROM orders
connect.progress.enabled=true
To start the connector using the command line, log into the lenses-box container:
docker exec -ti lenses-box /bin/bash
and create a connector.properties file containing the properties above.
Create the connector, with the connect-cli:
connect-cli create elastic < connector.properties
Wait for the connector to start and check it’s running:
connect-cli status elastic
In the lenses-box container, start the Kafka Avro console producer:
kafka-avro-console-producer \
  --broker-list localhost:9092 --topic orders \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"created","type":"string"},{"name":"product","type":"string"},{"name":"price","type":"double"},{"name":"qty","type":"int"}]}'
The console is now waiting for your input; enter the following:
{ "id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2, "qty": 100 }
In the elastic container, start the Elasticsearch SQL CLI:
docker exec -ti elastic bin/elasticsearch-sql-cli
Run the following:
SELECT * FROM orders;
Bring down the stack:
docker-compose down
connect.elastic.protocol
connect.elastic.hosts
connect.elastic.port
connect.elastic.tableprefix
connect.elastic.cluster.name
connect.elastic.write.timeout
connect.elastic.batch.size
connect.elastic.use.http.username
connect.elastic.use.http.password
connect.elastic.error.policy
connect.elastic.max.retries
connect.elastic.retry.interval
connect.elastic.kcql
connect.elastic.pk.separator
connect.progress.enabled
ssl.truststore.location
ssl.truststore.password
ssl.truststore.type (JKS or PKCS12)
ssl.keystore.location
ssl.keystore.password
ssl.keystore.type
ssl.protocol (TLSv1.2, TLSv1.3 or TLS)
ssl.keymanager.algorithm
ssl.trustmanager.algorithm
Enabling SSL connections between Kafka Connect and Elasticsearch ensures that the communication between these services is secure, protecting sensitive data from being intercepted or tampered with. SSL (or TLS) encrypts data in transit, verifying the identity of both parties and ensuring data integrity.
While newer versions of Elasticsearch have SSL enabled by default for internal communication, it’s still necessary to configure SSL for client connections, such as those from Kafka Connect. Even if Elasticsearch has SSL enabled by default, Kafka Connect still needs these configurations to establish a secure connection. By setting up SSL in Kafka Connect, you ensure that data in transit is encrypted, that the identities of both parties are verified, and that data integrity is preserved. A typical client-side configuration looks like this:
ssl.truststore.location=/path/to/truststore.jks
ssl.truststore.password=your_truststore_password
ssl.truststore.type=JKS # Can also be PKCS12 if applicable
ssl.keystore.location=/path/to/keystore.jks
ssl.keystore.password=your_keystore_password
ssl.keystore.type=JKS # Can also be PKCS12 if applicable
ssl.protocol=TLSv1.2 # Or TLSv1.3 for stronger security
ssl.trustmanager.algorithm=PKIX # Default algorithm for managing certificates
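If the cluster also requires HTTP basic authentication, the connector's username and password options can be set alongside the SSL settings (the credentials below are placeholders):
# placeholder credentials for a secured cluster
connect.elastic.use.http.username=your_username
connect.elastic.use.http.password=your_password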