Elastic


A set of Kafka Connect sink connectors for writing records from Kafka to Elastic.

Requires:

  • Elastic 6 or
  • Elastic 7+

KCQL support 

The following KCQL is supported:

INSERT | UPSERT
INTO <elastic_index >
SELECT FIELD, ...
FROM kafka_topic
[PK FIELD,...]
[WITHDOCTYPE=<your_document_type>]
[WITHINDEXSUFFIX=<your_suffix>]
[PROPERTIES (...)]

Examples:

-- Insert mode, select all fields from topicA and write to indexA
INSERT INTO indexA SELECT * FROM topicA

-- Insert mode, select 3 fields and rename from topicB
-- and write to indexB
INSERT INTO indexB SELECT x AS a, y, zc FROM topicB PK y

-- UPSERT
UPSERT INTO indexC SELECT id, string_field FROM topicC PK id

Kafka Tombstone Handling 

It is possible to configure how the Connector handles a null value payload (called Kafka tombstones). Please use the behavior.on.null.values property in your KCQL with one of the possible values:

  • IGNORE (ignores tombstones entirely)
  • FAIL (throws Exception if tombstone happens)
  • DELETE (deletes index with specified id)

Example:

INSERT INTO indexA SELECT * FROM topicA PROPERTIES ('behavior.on.null.values'='IGNORE')

Concepts 

Primary Keys 

The PK keyword can be used to specify the fields which will be used for the key value. The field values will be concatenated and separated by a -. If no fields are set the topic name, partition and message offset are used.

Document Type 

WITHDOCTYPE allows you to associate a document type to the document inserted.

Index Suffix 

WITHINDEXSUFFIX allows you to specify a suffix to your index and we support date format.

Example:

WITHINDEXSUFFIX=_suffix_{YYYY-MM-dd}

Index Names 

1. Static Index Names 

To use a static index name, define the target index in the KCQL statement without any prefixes:

INSERT INTO index_name SELECT * FROM topicA

This will consistently create an index named index_name for any messages consumed from topicA.

2. Extracting Index Names from Headers, Keys, and Values 

2.1. Headers 

To extract an index name from a message header, use the _header prefix followed by the header name:

INSERT INTO _header.gate SELECT * FROM topicA

This statement extracts the value from the gate header field and uses it as the index name.

For headers with names that include dots, enclose the entire target in backticks (```) and each segment which consists of a field name in single quotes ('):

INSERT INTO `_header.'prefix.abc.suffix'` SELECT * FROM topicA

In this case, the value of the header named prefix.abc.suffix is used to form the index name.

2.2. Keys 

To use the full value of the message key as the index name, use the _key prefix:

INSERT INTO _key SELECT * FROM topicA

For example, if the message key is "freddie", the resulting index name will be freddie.

2.3. Values 

To extract an index name from a field within the message value, use the _value prefix followed by the field name:

INSERT INTO _value.name SELECT * FROM topicA

This example uses the value of the name field from the message’s value. If the field contains "jason", the index name will be jason.

Nested Fields in Values 

To access nested fields within a value, specify the full path using dot notation:

INSERT INTO _value.name.firstName SELECT * FROM topicA

If the firstName field is nested within the name structure, its value (e.g., "hans") will be used as the index name.

Fields with Dots in Their Names 

For field names that include dots, enclose the entire target in backticks (```) and each segment which consists of a field name in single quotes ('):

INSERT INTO `_value.'customer.name'.'first.name'` SELECT * FROM topicA

If the value structure contains:

{
  "customer.name": {
    "first.name": "hans"
  }
}

The extracted index name will be hans.

Error polices 

The connector supports Error polices.

Auto Index Creation 

The Sink will automatically create missing indexes at startup.

Please note that this feature is not compatible with index names extracted from message headers/keys/values.

Quickstart 

Launch the stack 


  1. Copy the docker-compose file.
  2. Bring up the stack.

For the Elasticsearch 6 Connector: 

export CONNECTOR=elastic6
docker-compose up -d elastic

For the Elasticsearch 7+ Connector: 

export CONNECTOR=elastic7
docker-compose up -d elastic

Start the connector 

If you are using Lenses, login into Lenses and navigate to the connectors page, select Elastic as the sink and paste the following:

For the Elasticsearch 6 Connector: 

name=elastic
connector.class=io.lenses.streamreactor.connect.elastic6.ElasticSinkConnector
tasks.max=1
topics=orders
connect.elastic.protocol=http
connect.elastic.hosts=elastic
connect.elastic.port=9200
connect.elastic.cluster.name=elasticsearch
connect.elastic.kcql=INSERT INTO orders SELECT * FROM orders
connect.progress.enabled=true

For the Elasticsearch 7+ Connector: 

name=elastic
connector.class=io.lenses.streamreactor.connect.elastic7.ElasticSinkConnector
tasks.max=1
topics=orders
connect.elastic.protocol=http
connect.elastic.hosts=elastic
connect.elastic.port=9200
connect.elastic.cluster.name=elasticsearch
connect.elastic.kcql=INSERT INTO orders SELECT * FROM orders
connect.progress.enabled=true

To start the connector using the command line, log into the lenses-box container:


docker exec -ti lenses-box /bin/bash

and create a connector.properties file containing the properties above.

Create the connector, with the connect-cli:

connect-cli create elastic < connector.properties

Wait for the connector to start and check it’s running:

connect-cli status elastic

Inserting test data 

In the to lenses-box container start the kafka producer shell:


kafka-avro-console-producer \
 --broker-list localhost:9092 --topic orders \
 --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"created","type":"string"},{"name":"product","type":"string"},{"name":"price","type":"double"}, {"name":"qty", "type":"int"}]}'

the console is now waiting for your input, enter the following:


{
  "id": 1,
  "created": "2016-05-06 13:53:00",
  "product": "OP-DAX-P-20150201-95.7",
  "price": 94.2,
  "qty": 100
}

Check for data in Elastic 

Login to the elastic container:

 docker exec -ti elastic bin/elasticsearch-sql-cli

Run the following:

 SELECT * FROM orders;

Clean up 

Bring down the stack:

docker-compose down

Options 

NameDescriptionTypeDefault Value
connect.elastic.protocolURL protocol (http, https)stringhttp
connect.elastic.hostsList of hostnames for Elastic Search cluster node, not including protocol or port.stringlocalhost
connect.elastic.portPort on which Elastic Search node listens onstring9300
connect.elastic.tableprefixTable prefix (optional)string
connect.elastic.cluster.nameName of the elastic search cluster, used in local mode for setting the connectionstringelasticsearch
connect.elastic.write.timeoutThe time to wait in millis. Default is 5 minutes.int300000
connect.elastic.batch.sizeHow many records to process at one time. As records are pulled from Kafka it can be 100k+ which will not be feasible to throw at Elastic search at onceint4000
connect.elastic.use.http.usernameUsername if HTTP Basic Auth required default is null.string
connect.elastic.use.http.passwordPassword if HTTP Basic Auth required default is null.string
connect.elastic.error.policySpecifies the action to be taken if an error occurs while inserting the data There are two available options: NOOP - the error is swallowed THROW - the error is allowed to propagate. RETRY - The exception causes the Connect framework to retry the message. The number of retries is based on The error will be logged automaticallystringTHROW
connect.elastic.max.retriesThe maximum number of times to try the write again.int20
connect.elastic.retry.intervalThe time in milliseconds between retries.int60000
connect.elastic.kcqlKCQL expression describing field selection and routes.string
connect.elastic.pk.separatorSeparator used when have more that one field in PKstring-
connect.progress.enabledEnables the output for how many records have been processedbooleanfalse

SSL Configuration Properties 

Property NameDescription
ssl.truststore.locationPath to the truststore file containing the trusted CA certificates for verifying broker certificates.
ssl.truststore.passwordPassword for the truststore file to protect its integrity.
ssl.truststore.typeType of the truststore (e.g., JKS, PKCS12). Default is JKS.
ssl.keystore.locationPath to the keystore file containing the client’s private key and certificate chain for client authentication.
ssl.keystore.passwordPassword for the keystore to protect the private key.
ssl.keystore.typeType of the keystore (e.g., JKS, PKCS12). Default is JKS.
ssl.protocolThe SSL protocol used for secure connections (e.g., TLSv1.2, TLSv1.3). Default is TLS.
ssl.keymanager.algorithmAlgorithm used by the KeyManager to manage certificates. Default value is the key manager factory algorithm configured for the Java Virtual Machine.
ssl.trustmanager.algorithmAlgorithm used by the TrustManager to manage certificates. Default value is the key manager factory algorithm configured for the Java Virtual Machine.

KCQL Properties 

NameDescriptionTypeDefault Value
behavior.on.null.valuesSpecifies behavior on Kafka tombstones: IGNORE , DELETE or FAILstringIGNORE

SSL Configuration 

Enabling SSL connections between Kafka Connect and Elasticsearch ensures that the communication between these services is secure, protecting sensitive data from being intercepted or tampered with. SSL (or TLS) encrypts data in transit, verifying the identity of both parties and ensuring data integrity.

While newer versions of Elasticsearch have SSL enabled by default for internal communication, it’s still necessary to configure SSL for client connections, such as those from Kafka Connect. Even if Elasticsearch has SSL enabled by default, Kafka Connect still needs these configurations to establish a secure connection. By setting up SSL in Kafka Connect, you ensure:

  • Data encryption: Prevents unauthorized access to data being transferred.
  • Authentication: Confirms that Kafka Connect and Elasticsearch are communicating with trusted entities.
  • Compliance: Meets security standards for regulatory requirements (such as GDPR or HIPAA).

Configuration Example 

ssl.truststore.location=/path/to/truststore.jks
ssl.truststore.password=your_truststore_password
ssl.truststore.type=JKS  # Can also be PKCS12 if applicable

ssl.keystore.location=/path/to/keystore.jks
ssl.keystore.password=your_keystore_password
ssl.keystore.type=JKS  # Can also be PKCS12 if applicable

ssl.protocol=TLSv1.2  # Or TLSv1.3 for stronger security

ssl.trustmanager.algorithm=PKIX  # Default algorithm for managing certificates

Terminology: 

  • Truststore: Holds certificates to check if the node’s certificate is valid.
  • Keystore: Contains your client’s private key and certificate to prove your identity to the node.
  • SSL Protocol: Use TLSv1.2 or TLSv1.3 for up-to-date security.
  • Password Security: Protect passwords by encrypting them or using secure methods like environment variables or secret managers.
--
Last modified: October 2, 2024