HazelCast Sink¶
Download connector Hazelcast Connector 1.2 for Kafka Hazelcast Connector 1.1 for Kafka
This HazelCast Sink allows you to write events from Kafka to HazelCast. The connector takes the value from the Kafka Connect SinkRecords and inserts/updates an entry in HazelCast. The Sink supports writing to a reliable topic, ring buffer, queue, set, list, imap, multi-map, and icache. This Connector has been developed in collaboration with HazelCast.
Prerequisites¶
- Apache Kafka 0.11.x of above
- Kafka Connect 0.11.x or above
- Hazelcast 3.6.4 or higher
- Java 1.8
Features¶
- The KCQL routing querying - Kafka topic payload field selection is supported, allowing you to chose the fields to be written to Hazelcast
- Encoding as JSON or AVRO in Hazelcast via KCQL
- Storing in a Hazelcast RELIABLE_TOPIC, RING_BUFFER, QUEUE, SET, LIST, IMAP, MULTI_MAP, ICACHE via KCQL
- Parallel writes for improved performance
- Error policies for handling failures.
KCQL Support¶
INSERT INTO entity_name SELECT { FIELD, ... } FROM kafka_topic [PK FIELD, ...] WITHFORMAT
{ JSON|AVRO } STOREAS { RELIABLE_TOPIC|RING_BUFFER|QUEUE|SET|LIST|IMAP|MULTI_MAP|ICACHE }
Tip
You can specify multiple KCQL statements separated by ;
to have a the connector sink multiple topics.
The Hazelcast sink supports KCQL, Kafka Connect Query Language. The following support KCQL is available:
- Field selection
- Target Hazelcast storage selection, RELIABLE_TOPIC, RING_BUFFER, QUEUE, SET, LIST, IMAP, MULTI_MAP or ICACHE
- Serialiaztion format in Hazelcast, JSON, text or AVRO
- Setting the primary keys for MAPS and MULTI_MAP.
Example:
-- Insert mode, select all fields from topicA and write to tableA
INSERT INTO tableA SELECT * FROM topicA
-- Insert mode, select 3 fields and rename from topicB and write to tableB, store as serialized AVRO encoded byte arrays
INSERT INTO tableB SELECT x AS a, y AS b and z AS c FROM topicB WITHFORMAT avro STOREAS RING_BUFFER
This is set in the connect.hazelcast.kcql
option.
Primary Keys¶
When inserting into MAP or MULTI_MAP we need a key, the PK
keyword can be used to specifiy
the fields which will be used for the key value. The field values will be concatenated and separated
by a -
. If no fields are set the topic name, partition and message offset are used.
With Format¶
Hazelcast requires that data stored in collections and topics is serializable. The Sink offers two modes to store data.
AVRO In this mode the Sink converts the SinkRecords from Kafka to AVRO encoded byte arrays. JSON In this mode the Sink converts the SinkRecords from Kafka to JSON strings.
This behaviour is controlled by the KCQL statement in the connect.hazelcast.kcql
option. The default is JSON.
Stored As¶
The Hazelcast Sink supports storing data in RingBuffers, ReliableTopics, Queues, Sets, Lists, IMaps, Multi-maps and
ICaches. This behaviour is controlled by the KCQL statement in the connect.hazelcast.kcql
option. Note that
IMaps, Multi-maps and ICaches support a key as well as a value.
-- store into a ring buffer
INSERT INTO tableB SELECT x AS a, y AS b and z AS c FROM topicB WITHFORMAT avro STOREAS RING_BUFFER
-- store into a reliable topic
INSERT INTO tableB SELECT x AS a, y AS b and z AS c FROM topicB WITHFORMAT avro STOREAS RELIABLE_TOPIC
-- store into a queue
INSERT INTO tableB SELECT x AS a, y AS b and z AS c FROM topicB WITHFORMAT avro STOREAS QUEUE
-- store into a set
INSERT INTO tableB SELECT x AS a, y AS b and z AS c FROM topicB WITHFORMAT avro STOREAS SET
-- store into a list
INSERT INTO tableB SELECT x AS a, y AS b and z AS c FROM topicB WITHFORMAT avro STOREAS LIST
-- store into an i-map with field1 used as the map key
INSERT INTO tableB SELECT x AS a, y AS b and z AS c FROM topicB PK field1 WITHFORMAT avro STOREAS IMAP
-- store into a multi-map with field1 used as the map key
INSERT INTO tableB SELECT x AS a, y AS b and z AS c FROM topicB PK field1 WITHFORMAT avro STOREAS MULTI_MAP
-- store into an i-cache with field1 used as the cache key
INSERT INTO tableB SELECT x AS a, y AS b and z AS c FROM topicB PK field1 WITHFORMAT avro STOREAS ICACHE
Parallel Writes¶
By default each task in the Sink will write the records it receives sequentially, the Sink optionally supports parallel writes where an executorThreadPool is started and records are written in parallel. While this results in better performance we can’t guarantee the order of the writes.
To enable parallel writes set the connect.hazelcast.parallel.write
configuration option to true
Lenses QuickStart¶
The easiest way to try out this is using Lenses Box the pre-configured docker, that comes with this connector pre-installed. You would need to Connectors –> New Connector –> Sink –> HazelCast and paste your configuration
HazelCast Setup¶
Download and install HazelCast from here
When you download and extract the Hazelcast ZIP or TAR.GZ package, you will see 3 scripts under the /bin
folder which
provide basic functionality for member and cluster management.
The following are the names and descriptions of each script:
- start.sh - Starts a Hazelcast member with the default configuration in the working directory.
- stop.sh - Stops the Hazelcast member that was started in the current working directory.
Start HazelCast:
➜ bin/start.sh
INFO: [10.128.137.102]:5701 [dev] [3.6.4] Address[10.128.137.102]:5701 is STARTING
Aug 16, 2016 2:43:04 PM com.hazelcast.nio.tcp.nonblocking.NonBlockingIOThreadingModel
INFO: [10.128.137.102]:5701 [dev] [3.6.4] TcpIpConnectionManager configured with Non Blocking IO-threading model: 3 input threads and 3 output threads
Aug 16, 2016 2:43:07 PM com.hazelcast.cluster.impl.MulticastJoiner
INFO: [10.128.137.102]:5701 [dev] [3.6.4]
Members [1] {
Member [10.128.137.102]:5701 this
}
Aug 16, 2016 2:43:07 PM com.hazelcast.core.LifecycleService
INFO: [10.128.137.102]:5701 [dev] [3.6.4] Address[10.128.137.102]:5701 is STARTED
This will start Hazelcast with a default group called dev and password dev-pass
Installing the Connector¶
Connect, in production should be run in distributed mode
- Install and configure a Kafka Connect cluster
- Create a folder on each server called
plugins/lib
- Copy into the above folder the required connector jars from the stream reactor download
- Edit
connect-avro-distributed.properties
in theetc/schema-registry
folder and uncomment theplugin.path
option. Set it to the root directory i.e. plugins you deployed the stream reactor connector jars in step 2. - Start Connect,
bin/connect-distributed etc/schema-registry/connect-avro-distributed.properties
Connect Workers are long running processes so set an init.d
or systemctl
service accordingly.
Starting the Connector (Distributed)¶
Download, and install Stream Reactor. Follow the instructions here if you haven’t already done so. All paths in the quickstart are based on the location you installed Stream Reactor.
Once the Connect has started we can now use the kafka-connect-tools cli to post in our distributed properties file for HazelCast. For the CLI to work including when using the dockers you will have to set the following environment variable to point the Kafka Connect Rest API.
export KAFKA_CONNECT_REST="http://myserver:myport"
➜ bin/connect-cli create hazelcast-sink < conf/hazelcast-sink.properties
name=hazelcast-sink
connector.class=com.datamountaineer.streamreactor.connect.hazelcast.sink.HazelCastSinkConnector
tasks.max=1
topics=hazelcast-topic
connect.hazelcast.cluster.members=locallhost
connect.hazelcast.group.name=dev
connect.hazelcast.group.password=dev-pass
connect.hazelcast.kcql=INSERT INTO sink-test SELECT * FROM hazelcast-topic WITHFORMAT JSON
If you switch back to the terminal you started Kafka Connect in, you should see the Elastic Search Sink being accepted and the task starting.
We can use the CLI to check if the connector is up but you should be able to see this in logs as well.
#check for running connectors with the CLI
➜ bin/connect-cli ps
hazelcast-sink
INFO
__ __
/ / ____ _____ ____/ /___ ____ ____
/ / / __ `/ __ \/ __ / __ \/ __ \/ __ \
/ /___/ /_/ / / / / /_/ / /_/ / /_/ / /_/ /
/_____/\__,_/_/ /_/\__,_/\____/\____/ .___/
/_/
/ / / /___ _____ ___ / / ____/___ ______/ /_/ ___/(_)___ / /__
/ /_/ / __ `/_ / / _ \/ / / / __ `/ ___/ __/\__ \/ / __ \/ //_/
/ __ / /_/ / / /_/ __/ / /___/ /_/ (__ ) /_ ___/ / / / / / ,<
/_/ /_/\__,_/ /___/\___/_/\____/\__,_/____/\__//____/_/_/ /_/_/|_|
by Andrew Stevenson
(com.datamountaineer.streamreactor.connect.hazelcast.sink.HazelCastSinkTask:41)
Members [1] {
Member [172.17.0.2]:5701
}
INFO: HazelcastClient[dev-kafka-connect-05e64989-41d9-433e-ad21-b54894486384][3.6.4] is CLIENT_CONNECTED
Test Records¶
Tip
If your input topic doesn’t match the target use Lenses SQL to transform in real-time the input, no Java or Scala required!
Now we need to put some records it to the hazelcast-topic topics. We can use the kafka-avro-console-producer
to do this.
Start the producer and pass in a schema to register in the Schema Registry. The schema has a firstname
field of type
string a lastname
field of type string, an age
field of type int and a salary
field of type double.
bin/kafka-avro-console-producer \
--broker-list localhost:9092 --topic hazelcast-topic \
--property value.schema='{"type":"record","name":"User",
"fields":[{"name":"firstName","type":"string"},{"name":"lastName","type":"string"},{"name":"age","type":"int"},{"name":"salary","type":"double"}]}'
Now the producer is waiting for input. Paste in the following:
{"firstName": "John", "lastName": "Smith", "age":30, "salary": 4830}
Check for records in HazelCast¶
Now check the logs of the connector you should see this:
[2016-08-20 16:53:58,608] INFO Received 1 records. (com.datamountaineer.streamreactor.connect.hazelcast.sink.HazelCastWriter:62)
[2016-08-20 16:53:58,644] INFO Written 1 (com.datamountaineer.streamreactor.connect.hazelcast.sink.HazelCastWriter:71)
Configurations¶
The Kafka Connect framework requires the following in addition to any connectors specific configurations:
Config | Description | Type | Value |
---|---|---|---|
name |
Name of the connector | string | This must be unique across the Connect cluster |
topics |
The topics to sink.
The connector will check this matchs the KCQL statement
|
string | |
tasks.max |
The number of tasks to scale output | int | 1 |
connector.class |
Name of the connector class | string | com.datamountaineer.streamreactor.connect.hazelcast.sink.HazelcastSinkConnector |
Connector Configurations¶
Config | Description | Type |
---|---|---|
connect.hazelcast.kcql |
KCQL expression describing field selection and routes | string |
connect.hazelcast.cluster.members |
Address List is the initial list of
cluster addresses to which the client
will connect. The client uses this list to
find an alive node. Although it may be enough
to give only one address of a node in the
cluster (since all nodes communicate with each other),
it is recommended that you give the addresses for all the nodes
|
string |
connect.hazelcast.group.name |
The group name of the connector
in the target Hazelcast cluster
|
string |
Optional Configurations¶
Config | Description | Type | Default |
---|---|---|---|
connect.hazelcast.batch.size |
Specifies how many records to insert together
at one time. If the connect framework provides
fewer records when it is
calling the Sink it won’t wait to
fulfill this value but rather execute it
|
int | 1000 |
connect.hazelcast.group.password |
The password for the group name | string | dev-pass |
connect.hazelcast.timeout |
Connection timeout is the timeout value in
milliseconds for nodes to accept client
connection requests
|
int | 5000 |
connect.hazelcast.retries |
The number of times a client will
retry the connection at startup |
int | 2 |
connect.hazelcast.keep.alive |
Enables/disables the SO_KEEPALIVE socket option
|
boolean | true |
connect.hazelcast.tcp.no.delay |
Enables/disables the SO_REUSEADDR socket option
|
boolean | true |
connect.hazelcast.linger.seconds |
Enables/disables SO_LINGER with the specified
linger time in seconds
|
int | 3 |
connect.hazelcast.buffer.size |
Sets the SO_SNDBUF and SO_RCVBUF options
to the specified value in KB for this Socket |
int | 32 |
connect.hazelcast.parallel.write |
All the sink to write in parallel the records
received from Kafka on each poll.
Order of writes in not guaranteed
|
boolean | true |
connect.hazelcast.error.policy |
Specifies the action to be
taken if an error occurs while inserting the data.
There are three available options, NOOP, the error
is swallowed, THROW, the error is allowed
to propagate and retry.
For RETRY the Kafka message is redelivered up
to a maximum number of times specified by the
connect.hazelcast.max.retries option |
string | THROW |
connect.hazelcast.max.retries |
The maximum number of times a message
is retried. Only valid when the
connect.hazelcast.error.policy is set to RETRY |
string | 10 |
connect.hazelcast.retry.interval |
The interval, in milliseconds between retries,
if the sink is using
connect.hazelcast.error.policy set to RETRY |
string | 60000 |
connect.progress.enabled |
Enables the output for how many
records have been processed
|
boolean | false |
Example¶
name=hazelcast-sink
connector.class=com.datamountaineer.streamreactor.connect.hazelcast.sink.HazelCastSinkConnector
tasks.max=1
topics=hazelcast-topic
connect.hazelcast.cluster.members=locallhost
connect.hazelcast.group.name=dev
connect.hazelcast.group.password=dev-pass
connect.hazelcast.kcql=INSERT INTO sink-test SELECT * FROM hazelcast-topic WITHFORMAT JSON STOREAS RING_BUFFER
connect.progress.enabled=true
Schema Evolution¶
Upstream changes to schemas are handled by Schema registry which will validate the addition and removal or fields, data type changes and if defaults are set. The Schema Registry enforces AVRO schema evolution rules. More information can be found here.
The Sink serializes either an AVRO or JSON representation of the Sink record to the target reliable topic in Hazelcast. Hazelcast is agnostic to the schema.
Kubernetes¶
Helm Charts are provided at our repo, add the repo to your Helm instance and install. We recommend using the Landscaper to manage Helm Values since typically each Connector instance has its own deployment.
Add the Helm charts to your Helm instance:
helm repo add landoop https://landoop.github.io/kafka-helm-charts/
TroubleShooting¶
Please review the FAQs and join our slack channel