A Kafka Connect sink connector for writing records from Kafka to Cassandra.
Requires:
The following KCQL is supported:
INSERT INTO <your-cassandra-table> SELECT FIELD,... FROM <your-table> [TTL=Time to live]
Examples:
-- Insert mode, select all fields from topicA and -- write to tableA INSERT INTO tableA SELECT * FROM topicA -- Insert mode, select 3 fields and rename from topicB -- and write to tableB INSERT INTO tableB SELECT x AS a, y, c FROM topicB -- Insert mode, select 3 fields and rename from topicB -- and write to tableB with TTL INSERT INTO tableB SELECT x, y FROM topicB TTL=100000
The connector convert the value of Kafka messages to JSON and uses the Cassandra JSON insert feature to write records.
Compacted topics in Kafka retain the last message per key. Deletion in Kafka occurs by tombstoning. If compaction is enabled on the topic and a message is sent with a null payload, Kafka flags this record for delete and is compacted/removed from the topic.
Deletion in Cassandra is supported based on fields in the key of messages with a empty/null payload. A Cassandra delete statement must be provided which specifies the Cassandra CQL delete statement and with parameters to bind field values from the key to, for example, with the delete statement of:
DELETE FROM orders WHERE id = ? and product = ?
If a message was received with a empty/null value and key fields key.id and key.product the final bound Cassandra statement would be:
# Message # "{ "key": { "id" : 999, "product" : "DATAMOUNTAINEER" }, "value" : null }" # DELETE FROM orders WHERE id = 999 and product = "DATAMOUNTAINEER" # connect.cassandra.delete.enabled=true # connect.cassandra.delete.statement=DELETE FROM orders WHERE id = ? and product = ? # connect.cassandra.delete.struct_flds=id,product
Deletion will only occur if a message with an empty payload is received from Kafka.
This sink supports the following Kafka payloads:
See connect payloads for more information.
The connector supports Error polices.
export CONNECTOR=cassandra docker-compose up -d cassandra
Log in into your Cassandra container and create the target table:
docker exec -ti cassandra cqlsh
CREATE KEYSPACE demo WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 3}; USE demo; CREATE TABLE orders ( id INT , created VARCHAR , product VARCHAR , qty INT , price FLOAT , PRIMARY KEY (id, created) ) WITH CLUSTERING ORDER BY (created ASC);
If you are using Lenses, log in into Lenses and navigate to the connectors page, select Cassandra as the sink and paste the following:
name=cassandra-sink connector.class=com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector tasks.max=1 topics=orders connect.cassandra.kcql=INSERT INTO orders SELECT * FROM orders connect.cassandra.port=9042 connect.cassandra.key.space=demo connect.cassandra.contact.points=cassandra
To start the connector using the command line, log into the lenses-box container:
docker exec -ti lenses-box /bin/bash
and create a connector.properties file containing the properties above.
Create the connector, with the connect-cli:
connect-cli create cassandra-sink < connector.properties
Wait for the connector to start and check it’s running:
connect-cli status cassandra-sink
In the lenses-box container start the kafka producer shell:
kafka-avro-console-producer \ --broker-list localhost:9092 --topic orders \ --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"created","type":"string"},{"name":"product","type":"string"},{"name":"price","type":"double"}, {"name":"qty", "type":"int"}]}'
the console is now waiting for your input, enter the following:
{ "id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2, "qty": 100 }
In the cassandra container run:
SELECT * FROM orders;
Bring down the stack:
docker-compose down
On this page