A Kafka Connect sink connector for writing records from Kafka to Hazelcast.
The following KCQL is supported:
INSERT INTO <hazelcast-namespace> SELECT FIELD, ... FROM <your-kafka-topic> [PK FIELD, ...] WITHFORMAT JSON|AVRO STOREAS RELIABLE_TOPIC|RING_BUFFER|QUEUE|SET|LIST|IMAP|MULTI_MAP|ICACHE
Examples:
-- store into a ring buffer
INSERT INTO lenses SELECT x,y,z FROM topicB WITHFORMAT AVRO STOREAS RING_BUFFER
-- store into a reliable topic
INSERT INTO lenses SELECT x,y,z FROM topicB WITHFORMAT AVRO STOREAS RELIABLE_TOPIC
-- store into a queue
INSERT INTO lenses SELECT x,y,z FROM topicB WITHFORMAT AVRO STOREAS QUEUE
-- store into a set
INSERT INTO lenses SELECT x,y,z FROM topicB WITHFORMAT AVRO STOREAS SET
-- store into a list
INSERT INTO lenses SELECT x,y,z FROM topicB WITHFORMAT AVRO STOREAS LIST
-- store into an i-map with field1 used as the map key
INSERT INTO lenses SELECT x,y,z FROM topicB PK field1 WITHFORMAT AVRO STOREAS IMAP
-- store into a multi-map with field1 used as the map key
INSERT INTO lenses SELECT x,y,z FROM topicB PK field1 WITHFORMAT AVRO STOREAS MULTI_MAP
-- store into an i-cache with field1 used as the cache key
INSERT INTO lenses SELECT x,y,z FROM topicB PK field1 WITHFORMAT AVRO STOREAS ICACHE
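To generate statements like these from a script, the grammar above can be expressed as a small string builder. This is a minimal Python sketch, not part of the connector: build_kcql, STORE_TYPES and FORMATS are hypothetical names, and the function only checks the WITHFORMAT and STOREAS keywords before assembling the clauses in grammar order.

```python
# Hypothetical helper: assembles a KCQL statement following the grammar above.
# It only formats strings; it is not part of the connector.
STORE_TYPES = {"RELIABLE_TOPIC", "RING_BUFFER", "QUEUE", "SET", "LIST",
               "IMAP", "MULTI_MAP", "ICACHE"}
FORMATS = {"JSON", "AVRO"}


def build_kcql(target, topic, fields=("*",), pk=(), fmt="JSON", store_as="QUEUE"):
    """Return an INSERT ... STOREAS statement for connect.hazelcast.kcql."""
    if fmt not in FORMATS:
        raise ValueError(f"unsupported format: {fmt}")
    if store_as not in STORE_TYPES:
        raise ValueError(f"unsupported storage type: {store_as}")
    parts = [f"INSERT INTO {target}", f"SELECT {','.join(fields)}", f"FROM {topic}"]
    if pk:
        # PK fields are used as the key for IMAP, MULTI_MAP and ICACHE targets
        parts.append(f"PK {','.join(pk)}")
    parts.append(f"WITHFORMAT {fmt}")
    parts.append(f"STOREAS {store_as}")
    return " ".join(parts)


print(build_kcql("lenses", "topicB", fields=("x", "y", "z"), pk=("field1",),
                 fmt="AVRO", store_as="IMAP"))
# INSERT INTO lenses SELECT x,y,z FROM topicB PK field1 WITHFORMAT AVRO STOREAS IMAP
```

With the defaults, build_kcql("orders", "orders") yields the QUEUE statement used in the quickstart configuration.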
The connector takes the value of each Kafka Connect SinkRecord and inserts or updates an entry in Hazelcast.
When inserting into an IMAP, MULTI_MAP or ICACHE a key is required. Use the PK keyword to specify the fields that make up the key; their values are concatenated, separated by a hyphen ("-"). If no PK fields are set, the topic name, partition and message offset are used instead.
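A minimal Python sketch of the key rule, assuming the record value is a plain dict. build_key is a hypothetical name, and the topic, partition and offset fallback is assumed to use the same "-" separator; check the connector source if the exact fallback format matters to you.

```python
# Sketch of the key rule described above. The record value is modelled as a
# plain dict. ASSUMPTION: the fallback key also uses "-" as the separator.
def build_key(value, pk_fields, topic, partition, offset):
    if pk_fields:
        # join the values of the PK fields, in the order listed, with "-"
        return "-".join(str(value[f]) for f in pk_fields)
    # no PK fields: fall back to topic, partition and offset (separator assumed)
    return f"{topic}-{partition}-{offset}"


record = {"id": 1, "product": "OP-DAX-P-20150201-95.7", "qty": 100}
print(build_key(record, ["id", "qty"], "orders", 0, 42))  # 1-100
print(build_key(record, [], "orders", 0, 42))             # orders-0-42
```

Because values are joined with a plain "-", keys can collide when a field value itself contains a hyphen: ("x-y", "z") and ("x", "y-z") both become "x-y-z". PK fields whose values cannot contain the separator, such as numeric IDs, avoid this.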
Hazelcast requires that data stored in collections and topics is serializable. The Sink offers two modes to store data.
AVRO: the Sink converts the SinkRecords from Kafka to Avro-encoded byte arrays.
JSON: the Sink converts the SinkRecords from Kafka to JSON strings.
The mode is selected with the WITHFORMAT clause of the KCQL statement in the connect.hazelcast.kcql option. The default is JSON.
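To make JSON mode concrete, the following Python sketch projects the SELECTed fields from a record value and encodes them as a JSON string. It is an illustration only, assuming the value is a plain dict; to_json is a hypothetical name, and the connector's own converter may differ in field order and number formatting. AVRO mode is not shown because it needs a third-party Avro library.

```python
import json


# Sketch of JSON mode: keep only the SELECTed fields ("*" keeps all of them)
# and encode the result as a JSON string, the form stored in Hazelcast.
def to_json(value, fields):
    if fields == ["*"]:
        selected = value
    else:
        selected = {f: value[f] for f in fields}
    return json.dumps(selected)


order = {"id": 1, "created": "2016-05-06 13:53:00",
         "product": "OP-DAX-P-20150201-95.7", "price": 94.2, "qty": 100}
print(to_json(order, ["id", "price"]))  # {"id": 1, "price": 94.2}
```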
The Hazelcast Sink supports storing data in RingBuffers, ReliableTopics, Queues, Sets, Lists, IMaps, MultiMaps and ICaches. The target structure is selected with the STOREAS clause of the KCQL statement in the connect.hazelcast.kcql option.
By default each task in the Sink writes the records it receives sequentially. The Sink optionally supports parallel writes, where an executor thread pool is started and records are written in parallel. This improves throughput, but the order of the writes is no longer guaranteed.
To enable parallel writes, set the connect.hazelcast.parallel.write configuration option to true.
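The ordering trade-off can be illustrated with Python's ThreadPoolExecutor. In this sketch, write is a hypothetical stand-in for a Hazelcast write, and the pool of four threads is an arbitrary choice; it does not reproduce the connector's executor, only the fact that a thread pool writes every record but does not promise their order.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

lock = threading.Lock()
written = []


def write(record):
    # hypothetical stand-in for a Hazelcast write; the lock keeps the list safe
    with lock:
        written.append(record)


records = list(range(100))

# Sequential (default): writes land in the order the task received them.
for r in records:
    write(r)
sequential = list(written)

# Parallel (connect.hazelcast.parallel.write=true): every record is written,
# but thread scheduling decides the final order.
written.clear()
with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(write, records))
parallel = list(written)

print(sequential == records)        # True
print(sorted(parallel) == records)  # True: same records, order not promised
```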
The connector supports error policies.
To try the connector, start the Hazelcast container with docker-compose:
export CONNECTOR=hazelcast
docker-compose up -d hazelcast
Next, configure and start Hazelcast so that the connector can connect to the cluster. Log in to the Hazelcast container and install an editor:
docker exec -ti hazelcast /bin/bash
apt-get update && apt-get install -y vim
Then create a file called hazelcast.xml with the following content:
<hazelcast xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.hazelcast.com/schema/config http://www.hazelcast.com/schema/config/hazelcast-config-3.8.xsd"
           xmlns="http://www.hazelcast.com/schema/config">
    <group>
        <name>dev</name>
        <password>dev-pass</password>
    </group>
</hazelcast>
Then start Hazelcast:
export JAVA_OPTS="-Dhazelcast.config=hazelcast.xml -Dgroup.name=dev -Dgroup.password=dev-pass"
./server.sh
If you are using Lenses, log in to Lenses, navigate to the connectors page, select Hazelcast as the sink and paste the following:
name=hazelcast
connector.class=com.datamountaineer.streamreactor.connect.hazelcast.sink.HazelCastSinkConnector
tasks.max=1
topics=orders
connect.hazelcast.cluster.members=hazelcast
connect.hazelcast.group.name=dev
connect.hazelcast.group.password=dev-pass
connect.hazelcast.kcql=INSERT INTO orders SELECT * FROM orders WITHFORMAT JSON STOREAS QUEUE
To start the connector from the command line instead, log in to the lenses-box container:
docker exec -ti lenses-box /bin/bash
and create a connector.properties file containing the properties above.
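If you prefer to script this step, the following Python sketch writes the same properties to connector.properties, one key=value pair per line. It assumes Python 3 is available where you create the file; editing the file by hand works just as well.

```python
from pathlib import Path

# The connector configuration from above, written as key=value lines
# for `connect-cli create hazelcast < connector.properties`.
config = {
    "name": "hazelcast",
    "connector.class": "com.datamountaineer.streamreactor.connect.hazelcast.sink.HazelCastSinkConnector",
    "tasks.max": "1",
    "topics": "orders",
    "connect.hazelcast.cluster.members": "hazelcast",
    "connect.hazelcast.group.name": "dev",
    "connect.hazelcast.group.password": "dev-pass",
    "connect.hazelcast.kcql": "INSERT INTO orders SELECT * FROM orders WITHFORMAT JSON STOREAS QUEUE",
}

path = Path("connector.properties")
path.write_text("".join(f"{k}={v}\n" for k, v in config.items()))
print(path.read_text().splitlines()[0])  # name=hazelcast
```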
Create the connector with connect-cli:
connect-cli create hazelcast < connector.properties
Wait for the connector to start and check it’s running:
connect-cli status hazelcast
In the lenses-box container, start the Kafka Avro console producer:
kafka-avro-console-producer \
  --broker-list localhost:9092 --topic orders \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"created","type":"string"},{"name":"product","type":"string"},{"name":"price","type":"double"}, {"name":"qty", "type":"int"}]}'
The console is now waiting for input. Enter the following record on a single line:
{ "id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2, "qty": 100 }
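The Avro console producer fails on records that do not match value.schema, so it can help to check the sample record first. This Python sketch is a simplified check covering only the primitive types used in this schema (int, string, double); problems and AVRO_TO_PY are hypothetical names, and it is not a substitute for Avro's own validation.

```python
import json

schema = json.loads('{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"created","type":"string"},{"name":"product","type":"string"},{"name":"price","type":"double"}, {"name":"qty", "type":"int"}]}')
record = json.loads('{ "id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2, "qty": 100 }')

# Map the primitive Avro types used in this schema onto Python types.
AVRO_TO_PY = {"int": int, "string": str, "double": (int, float)}


def problems(schema, record):
    """List missing, mistyped and undeclared fields (simplified check)."""
    issues = []
    declared = {f["name"]: f["type"] for f in schema["fields"]}
    for name, avro_type in declared.items():
        if name not in record:
            issues.append(f"missing field: {name}")
        elif isinstance(record[name], bool) or not isinstance(record[name], AVRO_TO_PY[avro_type]):
            # bool is a subclass of int in Python, so reject it explicitly
            issues.append(f"{name}: expected {avro_type}")
    for name in record.keys() - declared.keys():
        issues.append(f"unexpected field: {name}")
    return issues


print(problems(schema, record))  # []
```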
Log in to the Hazelcast container:
docker exec -ti hazelcast /bin/bash
Start the console app:
java -cp hazelcast-all-3.8.4.jar com.hazelcast.client.console.ClientConsoleApp
In the app, switch to the orders namespace (the target named in the KCQL INSERT INTO clause) and check the queue:
ns orders
q.iterator
Bring down the stack:
docker-compose down