Hazelcast
A Kafka Connect sink connector for writing records from Kafka to Hazelcast.
KCQL support
The following KCQL is supported:
INSERT INTO <hazelcast-namespace>
SELECT FIELD, ...
FROM <your-kafka-topic>
[PK FIELD, ...]
WITHFORMAT JSON|AVRO
STOREAS RELIABLE_TOPIC|RING_BUFFER|QUEUE|SET|LIST|IMAP|MULTI_MAP|ICACHE
Examples:
-- store into a ring buffer
INSERT INTO lenses SELECT x,y,z FROM topicB WITHFORMAT AVRO STOREAS RING_BUFFER
-- store into a reliable topic
INSERT INTO lenses SELECT x,y,z FROM topicB WITHFORMAT AVRO STOREAS RELIABLE_TOPIC
-- store into a queue
INSERT INTO lenses SELECT x,y,z FROM topicB WITHFORMAT AVRO STOREAS QUEUE
-- store into a set
INSERT INTO lenses SELECT x,y,z FROM topicB WITHFORMAT AVRO STOREAS SET
-- store into a list
INSERT INTO lenses SELECT x,y,z FROM topicB WITHFORMAT AVRO STOREAS LIST
-- store into an i-map with field1 used as the map key
INSERT INTO lenses SELECT x,y,z FROM topicB PK field1 WITHFORMAT AVRO STOREAS IMAP
-- store into a multi-map with field1 used as the map key
INSERT INTO lenses SELECT x,y,z FROM topicB PK field1 WITHFORMAT AVRO STOREAS MULTI_MAP
-- store into an i-cache with field1 used as the cache key
INSERT INTO lenses SELECT x,y,z FROM topicB PK field1 WITHFORMAT AVRO STOREAS ICACHE
Concepts
The connector takes the value from the Kafka Connect SinkRecords and inserts/updates an entry in Hazelcast. The Sink supports writing to (a conceptual sketch follows this list):
- reliable topics
- ring buffers
- queues
- sets
- lists
- IMaps
- multi-maps
- ICaches
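Conceptually, for each record the write path looks roughly like the sketch below. This is an illustration only, using the Hazelcast 4+/5 Java client API; the cluster name, addresses, target names and payload are assumptions taken from the examples on this page, not the connector's actual code.

import com.hazelcast.client.HazelcastClient;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.core.HazelcastInstance;

public class SinkWriteSketch {
    public static void main(String[] args) {
        // Connect the way the sink does: cluster name plus member addresses.
        ClientConfig config = new ClientConfig();
        config.setClusterName("dev");
        config.getNetworkConfig().addAddress("hazelcast:5701");
        HazelcastInstance client = HazelcastClient.newHazelcastClient(config);

        String json = "{\"x\":1,\"y\":2,\"z\":3}";

        // STOREAS QUEUE: the serialized record is offered to a queue
        // named after the KCQL target.
        client.getQueue("lenses").offer(json);

        // STOREAS IMAP: the record is put under a key derived from the
        // PK fields (see Primary Keys below).
        client.getMap("lenses").put("field1-value", json);

        client.shutdown();
    }
}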
Primary Keys
When inserting into IMAP, MULTI_MAP or ICACHE an entry key is needed; the PK keyword specifies the fields whose values are used to build it. The field values are concatenated and separated by a hyphen (-). If no PK fields are set, the topic name, partition and message offset are used instead.
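To illustrate the key construction, here is a hypothetical helper that mirrors the documented behaviour; the method name and types are illustrative, not the connector's code.

import java.util.List;

public class KeyBuilder {
    // Concatenate the PK field values with "-", or fall back to
    // topic-partition-offset when no PK fields are declared.
    static String buildKey(List<String> pkValues, String topic, int partition, long offset) {
        if (pkValues == null || pkValues.isEmpty()) {
            return topic + "-" + partition + "-" + offset;
        }
        return String.join("-", pkValues);
    }

    public static void main(String[] args) {
        System.out.println(buildKey(List.of("a", "b"), "orders", 0, 42L)); // a-b
        System.out.println(buildKey(List.of(), "orders", 0, 42L));         // orders-0-42
    }
}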
With Format
Hazelcast requires that data stored in collections and topics is serializable. The Sink offers two modes to store data:
- AVRO: the Sink converts the SinkRecords from Kafka to AVRO-encoded byte arrays.
- JSON: the Sink converts the SinkRecords from Kafka to JSON strings.
This behaviour is controlled by the WITHFORMAT clause of the KCQL statement in the connect.hazelcast.kcql option. The default is JSON.
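For example, to store Avro-encoded records in a reliable topic (the topic and target names below are illustrative):

connect.hazelcast.kcql=INSERT INTO orders SELECT * FROM orders WITHFORMAT AVRO STOREAS RELIABLE_TOPIC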
Stored As
The Hazelcast Sink supports storing data in RingBuffers, ReliableTopics, Queues, Sets, Lists, IMaps, Multi-maps and ICaches. This behaviour is controlled by the KCQL statement in the connect.hazelcast.kcql option.
Parallel Writes
By default each task in the Sink writes the records it receives sequentially. The Sink optionally supports parallel writes, in which an executor thread pool is started and records are written in parallel. While this results in better throughput, the order of the writes cannot be guaranteed.
To enable parallel writes, set the connect.hazelcast.parallel.write configuration option to true.
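For example (a sketch to combine with your other connector properties; the pool size shown is an arbitrary choice):

connect.hazelcast.parallel.write=true
# 0 would default the pool to 4 x the number of CPUs
connect.hazelcast.threadpool.size=8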
Error policies
The connector supports error policies.
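For example, to have failed writes retried (the values shown are the defaults from the options table below):

connect.hazelcast.error.policy=RETRY
connect.hazelcast.max.retries=20
connect.hazelcast.retry.interval=60000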
Quickstart
Launch the stack
- Copy the docker-compose file.
- Bring up the stack.
export CONNECTOR=hazelcast
docker-compose up -d hazelcast
Next, configure and start Hazelcast so that the Connector can join the cluster. Log in to the Hazelcast container and create a file called hazelcast.xml with the following content:
docker exec -ti hazelcast /bin/bash
apt-get update && apt-get install -y vim
<hazelcast xmlns="http://www.hazelcast.com/schema/config"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.hazelcast.com/schema/config
                               http://www.hazelcast.com/schema/config/hazelcast-config-5.0.xsd">
    <cluster-name>dev</cluster-name>
</hazelcast>
Then start Hazelcast:
hz start -c hazelcast.xml
Start the connector
If you are using Lenses, log in to Lenses, navigate to the connectors page, select Hazelcast as the sink and paste the following:
name=hazelcast
connector.class=com.datamountaineer.streamreactor.connect.hazelcast.sink.HazelCastSinkConnector
tasks.max=1
topics=orders
connect.hazelcast.cluster.members=hazelcast
connect.hazelcast.cluster.name=dev
connect.hazelcast.kcql=INSERT INTO orders SELECT * FROM orders WITHFORMAT JSON STOREAS QUEUE
To start the connector without using Lenses, log into the fastdata container:
docker exec -ti fastdata /bin/bash
and create a connector.properties file containing the properties above.
Create the connector with the connect-cli:
connect-cli create hazelcast < connector.properties
Wait for the connector to start and check that it's running:
connect-cli status hazelcast
Inserting test data
In the fastdata container, start the Kafka Avro console producer:
kafka-avro-console-producer \
--broker-list localhost:9092 --topic orders \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"created","type":"string"},{"name":"product","type":"string"},{"name":"price","type":"double"}, {"name":"qty", "type":"int"}]}'
The console is now waiting for your input; enter the following:
{"id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2, "qty":100}
Check for data in Hazelcast
Log in to the hazelcast container:
docker exec -ti hazelcast /bin/bash
Start the console app:
java -cp hazelcast-all-3.8.4.jar com.hazelcast.client.console.ClientConsoleApp
In the app, switch to the dev namespace and check the queue:
ns dev
q.iterator
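Alternatively, the queue can be inspected programmatically. A minimal sketch, assuming a Hazelcast 4+/5 Java client on the classpath and the cluster settings used in this quickstart:

import com.hazelcast.client.HazelcastClient;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.collection.IQueue;
import com.hazelcast.core.HazelcastInstance;

public class CheckQueue {
    public static void main(String[] args) {
        ClientConfig config = new ClientConfig();
        config.setClusterName("dev");
        config.getNetworkConfig().addAddress("localhost:5701");
        HazelcastInstance client = HazelcastClient.newHazelcastClient(config);

        // The sink wrote one JSON string per Kafka record to the "orders" queue.
        IQueue<Object> queue = client.getQueue("orders");
        for (Object item : queue) {
            System.out.println(item);
        }
        client.shutdown();
    }
}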
Clean up
Bring down the stack:
docker-compose down
Options
Name | Description | Type | Default Value |
---|---|---|---|
ssl.keymanager.algorithm | The algorithm used by key manager factory for SSL connections. Default value is the key manager factory algorithm configured for the Java Virtual Machine. | string | SunX509 |
ssl.keystore.type | The file format of the key store file. This is optional for client. | string | JKS |
ssl.keystore.location | The location of the key store file. This is optional for client and can be used for two-way authentication for client. | string | |
ssl.truststore.location | The location of the trust store file. | string | |
ssl.provider | The name of the security provider used for SSL connections. Default value is the default security provider of the JVM. | string | |
ssl.secure.random.implementation | The SecureRandom PRNG implementation to use for SSL cryptography operations. | string | |
ssl.key.password | The password of the private key in the key store file. This is optional for client. | password | |
ssl.trustmanager.algorithm | The algorithm used by trust manager factory for SSL connections. Default value is the trust manager factory algorithm configured for the Java Virtual Machine. | string | PKIX |
ssl.truststore.password | The password for the trust store file. If a password is not set access to the truststore is still available, but integrity checking is disabled. | password | |
ssl.endpoint.identification.algorithm | The endpoint identification algorithm to validate server hostname using server certificate. | string | https |
ssl.enabled.protocols | The list of protocols enabled for SSL connections. | list | [TLSv1.2, TLSv1.1, TLSv1] |
ssl.protocol | The SSL protocol used to generate the SSLContext. Default setting is TLS, which is fine for most cases. Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities. | string | TLS |
ssl.cipher.suites | A list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. By default all the available cipher suites are supported. | list | |
ssl.truststore.type | The file format of the trust store file. | string | JKS |
ssl.keystore.password | The store password for the key store file. This is optional for client and only needed if ssl.keystore.location is configured. | password | |
connect.hazelcast.cluster.members | Address List is the initial list of cluster addresses to which the client will connect. The client uses this list to find an alive node. Although it may be enough to give only one address of a node in the cluster (since all nodes communicate with each other), it is recommended that you give the addresses for all the nodes. | list | |
connect.hazelcast.timeout | Connection timeout is the timeout value in milliseconds for nodes to accept client connection requests. | long | 5000 |
connect.hazelcast.retries | Number of times a client will retry the connection at startup. | int | 2 |
connect.hazelcast.keep.alive | Enables/disables the SO_KEEPALIVE socket option. The default value is true. | boolean | true |
connect.hazelcast.tcp.no.delay | Enables/disables the TCP_NODELAY socket option. The default value is true. | boolean | true |
connect.hazelcast.reuse.address | Enables/disables the SO_REUSEADDR socket option. The default value is true. | boolean | true |
connect.hazelcast.ssl.enabled | Enables ssl | boolean | false |
connect.hazelcast.linger.seconds | Enables/disables SO_LINGER with the specified linger time in seconds. The default value is 3. | int | 3 |
connect.hazelcast.buffer.size | Sets the SO_SNDBUF and SO_RCVBUF options to the specified value in KB for this Socket. The default value is 32. | int | 32 |
connect.hazelcast.cluster.name | The target Hazelcast cluster name. | string | |
connect.progress.enabled | Enables the output for how many records have been processed | boolean | false |
connect.hazelcast.kcql | The KCQL statement(s) describing how to map data from the Kafka topic(s) into Hazelcast. | string | |
connect.hazelcast.error.policy | Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message, the number of retries being set by connect.hazelcast.max.retries. The error will be logged automatically. | string | THROW |
connect.hazelcast.retry.interval | The time in milliseconds between retries. | int | 60000 |
connect.hazelcast.max.retries | The maximum number of times to try the write again. | int | 20 |
connect.hazelcast.threadpool.size | The sink inserts all the data concurrently. To fail fast in case of an error, the sink has its own thread pool. Set the value to 0 and the thread pool size defaults to 4 x the number of CPUs; set a value greater than 0 to use exactly that many threads. | int | 0 |
connect.hazelcast.parallel.write | Allows the sink to write the records received from Kafka on each poll in parallel. | boolean | false |