Cassandra


A Kafka Connect sink connector for writing records from Kafka to Cassandra.

Requires:

  • Cassandra 2.2.4+ if your are on version 2.
  • or 3.0.1+ if you are on version 3.

KCQL support 

The following KCQL is supported:

INSERT INTO <your-cassandra-table>
SELECT FIELD,...
FROM <your-table>
[TTL=Time to live]

Examples:

-- Insert mode, select all fields from topicA and
-- write to tableA
INSERT INTO tableA SELECT * FROM topicA

-- Insert mode, select 3 fields and rename from topicB
-- and write to tableB
INSERT INTO tableB SELECT x AS a, y, c FROM topicB

-- Insert mode, select 3 fields and rename from topicB
-- and write to tableB with TTL
INSERT INTO tableB SELECT x, y FROM topicB TTL=100000

Concepts 

The connector convert the value of Kafka messages to JSON and uses the Cassandra JSON insert feature to write records.

Deletion in Cassandra 

Compacted topics in Kafka retain the last message per key. Deletion in Kafka occurs by tombstoning. If compaction is enabled on the topic and a message is sent with a null payload, Kafka flags this record for delete and is compacted/removed from the topic.

Deletion in Cassandra is supported based on fields in the key of messages with a empty/null payload. A Cassandra delete statement must be provided which specifies the Cassandra CQL delete statement and with parameters to bind field values from the key to, for example, with the delete statement of:

DELETE FROM orders WHERE id = ? and product = ?

If a message was received with a empty/null value and key fields key.id and key.product the final bound Cassandra statement would be:

# Message
# "{ "key": { "id" : 999, "product" : "DATAMOUNTAINEER" }, "value" : null }"
# DELETE FROM orders WHERE id = 999 and product = "DATAMOUNTAINEER"

# connect.cassandra.delete.enabled=true
# connect.cassandra.delete.statement=DELETE FROM orders WHERE id = ? and product = ?
# connect.cassandra.delete.struct_flds=id,product

Deletion will only occur if a message with an empty payload is received from Kafka.

Kafka payload support 

This sink supports the following Kafka payloads:

  • Schema.Struct and Struct (Avro)
  • Schema.Struct and JSON
  • No Schema and JSON

See connect payloads for more information.

Error polices 

The connector supports Error polices.

Quickstart 

Launch the stack 


  1. Copy the docker-compose file.
  2. Bring up the stack.
export CONNECTOR=cassandra
docker-compose up -d cassandra

Preparing the target system 

Log in into your Cassandra container and create the target table:

docker exec -ti cassandra cqlsh

CREATE KEYSPACE demo
WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 3};

USE demo;

CREATE TABLE orders (
    id INT
    , created VARCHAR
    , product VARCHAR
    , qty INT
    , price FLOAT
    , PRIMARY KEY (id, created)
    )
WITH CLUSTERING ORDER BY (created ASC);

Start the connector 

If you are using Lenses, log in into Lenses and navigate to the connectors page, select Cassandra as the sink and paste the following:

name=cassandra-sink
connector.class=com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector
tasks.max=1
topics=orders
connect.cassandra.kcql=INSERT INTO orders SELECT * FROM orders
connect.cassandra.port=9042
connect.cassandra.key.space=demo
connect.cassandra.contact.points=cassandra

To start the connector using the command line, log into the lenses-box container:


docker exec -ti lenses-box /bin/bash

and create a connector.properties file containing the properties above.

Create the connector, with the connect-cli:

connect-cli create cassandra-sink < connector.properties

Wait for the connector to start and check it’s running:

connect-cli status cassandra-sink

Inserting test data 

In the lenses-box container start the kafka producer shell:


kafka-avro-console-producer \
 --broker-list localhost:9092 --topic orders \
 --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"created","type":"string"},{"name":"product","type":"string"},{"name":"price","type":"double"}, {"name":"qty", "type":"int"}]}'

the console is now waiting for your input, enter the following:


{
  "id": 1,
  "created": "2016-05-06 13:53:00",
  "product": "OP-DAX-P-20150201-95.7",
  "price": 94.2,
  "qty": 100
}

Check for data in Cassandra 

In the cassandra container run:

SELECT * FROM orders;

Clean up 

Bring down the stack:

docker-compose down

Options 

NameDescriptionTypeDefault Value
connect.cassandra.contact.pointsInitial contact point host for Cassandra including port.stringlocalhost
connect.cassandra.portCassandra native port.int9042
connect.cassandra.key.spaceKeyspace to write to.string
connect.cassandra.usernameUsername to connect to Cassandra with.string
connect.cassandra.passwordPassword for the username to connect to Cassandra with.password
connect.cassandra.ssl.enabledSecure Cassandra driver connection via SSL.booleanfalse
connect.cassandra.trust.store.pathPath to the client Trust Store.string
connect.cassandra.trust.store.passwordPassword for the client Trust Store.password
connect.cassandra.trust.store.typeType of the Trust Store, defaults to JKSstringJKS
connect.cassandra.key.store.typeType of the Key Store, defauts to JKSstringJKS
connect.cassandra.ssl.client.cert.authEnable client certification authentication by Cassandra. Requires KeyStore options to be set.booleanfalse
connect.cassandra.key.store.pathPath to the client Key Store.string
connect.cassandra.key.store.passwordPassword for the client Key Storepassword
connect.cassandra.consistency.levelConsistency refers to how up-to-date and synchronized a row of Cassandra data is on all of its replicas. Cassandra offers tunable consistency. For any given read or write operation, the client application decides how consistent the requested data must be.string
connect.cassandra.fetch.sizeThe number of records the Cassandra driver will return at once.int5000
connect.cassandra.load.balancing.policyCassandra Load balancing policy. ROUND_ROBIN, TOKEN_AWARE, LATENCY_AWARE or DC_AWARE_ROUND_ROBIN. TOKEN_AWARE and LATENCY_AWARE use DC_AWARE_ROUND_ROBINstringTOKEN_AWARE
connect.cassandra.error.policySpecifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed THROW - the error is allowed to propagate. RETRY - The exception causes the Connect framework to retry the message. The number of retries is set by connect.cassandra.max.retries. All errors will be logged automatically, even if the code swallows them.stringTHROW
connect.cassandra.max.retriesThe maximum number of times to try the write again.int20
connect.cassandra.retry.intervalThe time in milliseconds between retries.int60000
connect.cassandra.threadpool.sizeThe sink inserts all the data concurrently. To fail fast in case of an error, the sink has its own thread pool. Set the value to zero and the threadpool will default to 4* NO_OF_CPUs. Set a value greater than 0 and that would be the size of this threadpool.int0
connect.cassandra.delete.struct_fldsFields in the key struct data type used in there delete statement. Comma-separated in the order they are found in connect.cassandra.delete.statement. Keep default value to use the record key as a primitive type.list[]
connect.cassandra.delete.statementDelete statement for cassandrastring
connect.cassandra.kcqlKCQL expression describing field selection and routes.string
connect.cassandra.default.valueBy default a column omitted from the JSON map will be set to NULL. Alternatively, if set UNSET, pre-existing value will be preserved.string
connect.cassandra.delete.enabledEnables row deletion from cassandrabooleanfalse
connect.progress.enabledEnables the output for how many records have been processedbooleanfalse
--
Last modified: November 18, 2024