Cassandra Source

Download connector Cassandra Connector for Kafka 2.1.0 Cassandra Connector for Kafka 1.0

Blog:Tuning the Cassandra -> Kafka Connector by Mike from WalmartLabs

Kafka Connect Cassandra is a Source Connector for reading data from Cassandra and writing to Kafka.

Prerequisites

  • Apache Kafka 0.11.x or above
  • Kafka Connect 0.11.x or above
  • Cassandra 3.0.9 or above
  • Java 1.8

Features

  1. The KCQL routing querying - Allows for the table to topic routing
  2. Incremental mode with timestamp, timeuuid and tokens support via kcql
  3. Bulk mode
  4. Initial offset - Allows you to select where to start from

KCQL Support

The Cassandra source supports KCQL, Kafka Connect Query Language. The following support KCQL is available:

  1. Selection of which tables to track for new records
  2. Setting bulk or incremental mode
  3. Selection of which field to track for incremental changes

Tip

You can specify multiple KCQL statements separated by ; to have a the connector sink multiple topics.

INSERT INTO kafka_topic SELECT FIELD,... FROM cassandra_table [PK FIELD] [INCREMENTALMODE=TIMESTAMP|TIMEUUID|TOKEN]

-- Select all columns from table orders and insert into a topic called orders-topic, use column created to track new rows.
-- Incremental mode set to TIMEUUID
INSERT INTO orders-topic SELECT * FROM orders PK created INCREMENTALMODE=TIMEUUID

-- Select created, product, price from table orders and insert into a topic called orders-topic, use column created to track new rows.
INSERT INTO orders-topic SELECT created, product, price FROM orders PK created.

The PK keyword identifies the column used to track deltas in the target tables. If the incremental mode is set to TOKEN this column value is wrapped inside Cassandras token function.

Note

The connector only accepts one column as the PK regardless of incremental mode used.

Modes

Incremental

In incremental mode the connector supports querying based on a column in the tables with CQL data type of Timestamp or TimeUUID. Incremental mode is set by specifying INCREMENTALMODE in the kcql statement as either TIMESTAMP, TIMEUUID or TOKEN.

Kafka Connect tracks the latest record it retrieved from each table, so it can start at the correct location on the next iteration (or in case of a crash). In this case the maximum value of the records returned by the result-set is tracked and stored in Kafka by the framework. If no offset is found for the table at startup a default timestamp of 1900-01-01 is used. This is then passed to a prepared statement containing a range query.

Tip

You can limit where the connector starts by setting the connect.cassandra.initial.offset option.

Specifying TOKEN causes the connector to wrap the values in the token function. Only a PRIMARY KEY field of type token is supported. Your Cassandra cluster must use the Byte Ordered partitioner but this it is generally not recommended due to the creation of hotspots in the cluster. However, if Byte Ordered Partitioner is not used, the KC connector will miss all of the new rows whose token(PK column) falls “behind” the token recorded as the offset. This is because Cassandra’s other partitioners don’t order the tokens.

Warning

You must use the Byte Order Partitioner for the TOKEN mode to work correctly or data will be missing from Kafka topic.

Below are examples of the types of queries the connector will issue against Cassandra:

-- for timestamp type `timeuuid`
SELECT * FROM demo.orders WHERE created > maxTimeuuid(?) AND created <= minTimeuuid(?)

-- for timestamp type as `timestamp`
SELECT * FROM demo.orders WHERE created > ? AND created <= ?

-- for token
SELECT * FROM demo.orders WHERE created > token(?) and created <= token(?)

Bulk

In bulk mode the connector extracts the full table, nowhere clause is attached to the query. Bulk mode is set when no incremental mode is present in the KCQL statement.

Warning

Watch out for the poll interval. After each interval, the bulk query will be executed again.

Lenses QuickStart

The easiest way to try out this is using Lenses Box the pre-configured docker, that comes with this connector pre-installed. You would need to Connectors –> New Connector –> Source –> Cassandra and paste your configuration

../../_images/lenses-create-cassandra-source-connector.png

Cassandra Setup

First, download and install Cassandra if you don’t have a compatible cluster available.

#make a folder for cassandra
mkdir cassandra

#Download Cassandra
wget http://apache.cs.uu.nl/cassandra/3.5/apache-cassandra-3.5-bin.tar.gz

#extract archive to cassandra folder
tar -xvf apache-cassandra-3.5-bin.tar.gz -C cassandra

#Set up environment variables
export CASSANDRA_HOME=~/cassandra/apache-cassandra-3.5-bin
export PATH=$PATH:$CASSANDRA_HOME/bin

#Start Cassandra
sudo sh ~/cassandra/bin/cassandra

Test Data

Once you have installed and started Cassandra create a table to extract records from. This snippet creates a table called orders and inserts 3 rows representing fictional orders or some options and futures on a trading platform.

Start the Cassandra cql shell and execute the following:

CREATE KEYSPACE demo WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 3};
use demo;

create table orders (id int, created timeuuid, product text, qty int, price float, PRIMARY KEY (id, created))
WITH CLUSTERING ORDER BY (created asc);

INSERT INTO orders (id, created, product, qty, price) VALUES (1, now(), 'OP-DAX-P-20150201-95.7', 100, 94.2);
INSERT INTO orders (id, created, product, qty, price) VALUES (2, now(), 'OP-DAX-C-20150201-100', 100, 99.5);
INSERT INTO orders (id, created, product, qty, price) VALUES (3, now(), 'FU-KOSPI-C-20150201-100', 200, 150);

SELECT * FROM orders;

 id | created                              | price | product                 | qty
----+--------------------------------------+-------+-------------------------+-----
  1 | 17fa1050-137e-11e6-ab60-c9fbe0223a8f |  94.2 |  OP-DAX-P-20150201-95.7 | 100
  2 | 17fb6fe0-137e-11e6-ab60-c9fbe0223a8f |  99.5 |   OP-DAX-C-20150201-100 | 100
  3 | 17fbbe00-137e-11e6-ab60-c9fbe0223a8f |   150 | FU-KOSPI-C-20150201-100 | 200

(3 rows)

Installing the Connector

Connect, in production should be run in distributed mode

  1. Install and configure a Kafka Connect cluster
  2. Create a folder on each server called plugins/lib
  3. Copy into the above folder the required connector jars from the stream reactor download
  4. Edit connect-avro-distributed.properties in the etc/schema-registry folder and uncomment the plugin.path option. Set it to the root directory i.e. plugins you deployed the stream reactor connector jars in step 2.
  5. Start Connect, bin/connect-distributed etc/schema-registry/connect-avro-distributed.properties

Connect Workers are long running processes so set an init.d or systemctl service accordingly.

Starting the Connector (Distributed)

Download, and install Stream Reactor to your Kafka Connect cluster. Follow the instructions here if you haven’t already done so. All paths in the quickstart are based on the location you installed Stream Reactor.

Once the Connect has started we can now use the kafka-connect-tools cli to post in our distributed properties file for Cassandra. If you are using the dockers you will have to set the following environment variable too for the CLI to connect to the Kafka Connect Rest API.

export KAFKA_CONNECT_REST="http://myserver:myport"
➜  bin/connect-cli create cassandra-source-orders < conf/cassandra-source-incr.properties

name=cassandra-source-orders
connector.class=com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceConnector
connect.cassandra.key.space=demo
connect.cassandra.kcql=INSERT INTO orders-topic SELECT * FROM orders PK created INCREMENTALMODE=TIMEUUID
connect.cassandra.contact.points=localhost
connect.cassandra.username=cassandra
connect.cassandra.password=cassandra

We can use the CLI to check if the connector is up but you should be able to see this in logs as well.

#check for running connectors with the CLI
➜ bin/connect-cli ps
cassandra-source
   INFO
    __                    __
   / /   ____ _____  ____/ /___  ____  ____
  / /   / __ `/ __ \/ __  / __ \/ __ \/ __ \
 / /___/ /_/ / / / / /_/ / /_/ / /_/ / /_/ /
/_____/\__,_/_/ /_/\__,_/\____/\____/ .___/
                                   /_/
     ______                                __           _____
    / ____/___ _______________ _____  ____/ /________ _/ ___/____  __  _______________
   / /   / __ `/ ___/ ___/ __ `/ __ \/ __  / ___/ __ `/\__ \/ __ \/ / / / ___/ ___/ _ \
  / /___/ /_/ (__  |__  ) /_/ / / / / /_/ / /  / /_/ /___/ / /_/ / /_/ / /  / /__/  __/
  \____/\__,_/____/____/\__,_/_/ /_/\__,_/_/   \__,_//____/\____/\__,_/_/   \___/\___/

   By Andrew Stevenson. (com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceTask:64)

If you switch back to the terminal you started the Connector in you should see the Cassandra Source being accepted and the task starting and processing the 3 existing rows.

INFO Source task Thread[WorkerSourceTask-cassandra-source-orders-0,5,main] finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:342)
INFO Query SELECT * FROM demo.orders WHERE created > maxTimeuuid(?) AND created <= minTimeuuid(?)  ALLOW FILTERING executing with bindings (2016-05-06 09:23:28+0200, 2016-05-06 13:44:33+0200). (com.datamountaineer.streamreactor.connect.cassandra.source.CassandraTableReader:156)
INFO Querying returning results for demo.orders. (com.datamountaineer.streamreactor.connect.cassandra.source.CassandraTableReader:185)
INFO Processed 3 rows for table orders-topic.orders (com.datamountaineer.streamreactor.connect.cassandra.source.CassandraTableReader:206)
INFO Found 3. Draining entries to batchSize 100. (com.datamountaineer.streamreactor.connect.queues.QueueHelpers$:45)
INFO Found 0. Draining entries to batchSize 100. (com.datamountaineer.streamreactor.connect.queues.QueueHelpers$:45)

Check Kafka, 3 rows as before.

➜ bin/kafka-avro-console-consumer \
--zookeeper localhost:2181 \
--topic orders-topic \
--from-beginning
{"id":{"int":1},"created":{"string":"Thu May 05 13:24:22 CEST 2016"},"price":{"float":94.2},"product":{"string":"DAX-P-20150201-95.7"},"qty":{"int":100}}
{"id":{"int":2},"created":{"string":"Thu May 05 13:26:21 CEST 2016"},"price":{"float":99.5},"product":{"string":"OP-DAX-C-20150201-100"},"qty":{"int":100}}
{"id":{"int":3},"created":{"string":"Thu May 05 13:26:44 CEST 2016"},"price":{"float":150.0},"product":{"string":"FU-KOSPI-C-20150201-100"},"qty":{"int":200}}

The Source tasks will continue to poll but not pick up any new rows yet.

Inserting New Data

Now let’s insert a row into the Cassandra table. Start the CQL shell and execute the following:

use demo;

INSERT INTO orders (id, created, product, qty, price) VALUES (4, now(), 'FU-DATAMOUNTAINEER-C-20150201-100', 500, 10000);

SELECT * FROM orders;

 id | created                              | price | product                           | qty
----+--------------------------------------+-------+-----------------------------------+-----
  1 | 17fa1050-137e-11e6-ab60-c9fbe0223a8f |  94.2 |            OP-DAX-P-20150201-95.7 | 100
  2 | 17fb6fe0-137e-11e6-ab60-c9fbe0223a8f |  99.5 |             OP-DAX-C-20150201-100 | 100
  4 | 02acf5d0-1380-11e6-ab60-c9fbe0223a8f | 10000 | FU-DATAMOUNTAINEER-C-20150201-100 | 500
  3 | 17fbbe00-137e-11e6-ab60-c9fbe0223a8f |   150 |           FU-KOSPI-C-20150201-100 | 200

(4 rows)
cqlsh:demo>

Check the logs.

INFO Query SELECT * FROM demo.orders WHERE created > maxTimeuuid(?) AND created <= minTimeuuid(?)  ALLOW FILTERING executing with bindings (2016-05-06 13:31:37+0200, 2016-05-06 13:45:33+0200). (com.datamountaineer.streamreactor.connect.cassandra.source.CassandraTableReader:156)
INFO Querying returning results for demo.orders. (com.datamountaineer.streamreactor.connect.cassandra.source.CassandraTableReader:185)
INFO Processed 1 rows for table orders-topic.orders (com.datamountaineer.streamreactor.connect.cassandra.source.CassandraTableReader:206)
INFO Found 0. Draining entries to batchSize 100. (com.datamountaineer.streamreactor.connect.queues.QueueHelpers$:45)

Check Kafka.

➜  bin/kafka-avro-console-consumer \
--zookeeper localhost:2181 \
--topic orders-topic \
--from-beginning

{"id":{"int":1},"created":{"string":"17fa1050-137e-11e6-ab60-c9fbe0223a8f"},"price":{"float":94.2},"product":{"string":"OP-DAX-P-20150201-95.7"},"qty":{"int":100}}
{"id":{"int":2},"created":{"string":"17fb6fe0-137e-11e6-ab60-c9fbe0223a8f"},"price":{"float":99.5},"product":{"string":"OP-DAX-C-20150201-100"},"qty":{"int":100}}
{"id":{"int":3},"created":{"string":"17fbbe00-137e-11e6-ab60-c9fbe0223a8f"},"price":{"float":150.0},"product":{"string":"FU-KOSPI-C-20150201-100"},"qty":{"int":200}}
{"id":{"int":4},"created":{"string":"02acf5d0-1380-11e6-ab60-c9fbe0223a8f"},"price":{"float":10000.0},"product":{"string":"FU-DATAMOUNTAINEER-C-20150201-100"},"qty":{"int":500}}

Bingo, we have our extra row.

Data Types

The Source connector supports copying tables in bulk and incrementally to Kafka.

The following CQL data types are supported:

CQL Type Connect Data Type
TimeUUID Optional String
UUID Optional String
Inet Optional String
Ascii Optional String
Text Optional String
Timestamp Optional String
Date Optional String
Tuple Optional String
UDT Optional String
Boolean Optional Boolean
TinyInt Optional Int8
SmallInt Optional Int16
Int Optional Int32
Decimal Optional String
Float Optional Float32
Counter Optional Int64
BigInt Optional Int64
VarInt Optional Int64
Double Optional Int64
Time Optional Int64
Blob Optional Bytes
Map Optional [String | MAP]
List Optional [String | ARRAY]
Set Optional [String | ARRAY]

Note

For Map, List and Set the value is extracted from the Cassandra Row and inserted as a JSON string representation by default. If you need to use this types as objects. Add “connect.cassandra.mapping.collection.to.json=false” configuration.

Configurations

The Kafka Connect framework requires the following in addition to any connectors specific configurations:

Config Description Type Value
name Name of the connector string  
tasks.max The number of tasks to scale output int 1
connector.class Name of the connector class string com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceConnector
Config Description Type
connect.cassandra.contact.points Contact points (hosts) in Cassandra cluster string
connect.cassandra.key.space Key space the tables to write belong to string
connect.cassandra.username Username to connect to Cassandra with string
connect.cassandra.password Password to connect to Cassandra with string
connect.cassandra.import.mode
Either bulk or incremental
WATCH OUT WITH BULK MODE AS MAY REPEATEDLY PULL IN THE SAME DATA
string
connect.cassandra.kcq
Kafka connect query language expression. Allows for expressive table
to topic routing, field selection and renaming. In incremental mode
the timestampColumn can be specified by PK colName
Examples:
INSERT INTO TOPIC1 SELECT * FROM TOPIC1 PK myTimeUUICol
The timestamp column must be of CQL Type TimeUUID
string

Optional Configurations

Config Description Type Default
connect.cassandra.port Port for the native Java driver. Default = 9042 int  
connect.cassandra.ssl.enabled
Enables SSL communication against SSL enable Cassandra cluster.
boolean false
connect.cassandra.key.store.path Path to the client Key Store. string  
connect.cassandra.trust.store.password Password for truststore string  
connect.cassandra.trust.store.path Path to the client Trust Store. string  
connect.cassandra.trust.store.type Type of the Trust Store, defaults to JKS   JKS
connect.cassandra.ssl.client.cert.auth Enable client certification authentication by Cassandra. Requires KeyStore options to be set. string false
connect.cassandra.key.store.password Password for the client Key Store string  
connect.cassandra.key.store.type Type of the Key Store, defaults to JKS   JKS
connect.cassandra.import.poll.interval
The polling interval between queries against tables in milliseconds.
int 1 second
connect.cassandra.task.buffer.size
The size of the queue for buffering resultset records before write to Kafka.
int 10000
connect.cassandra.batch.size
The number of records the Source task should drain from the reader queue
int 1000
connect.cassandra.fetch.size The max number of rows the Cassandra driver will fetch at one time int 5000
connect.cassandra.slice.duration
Duration in milliseconds to query for in target Cassandra table. Used to restrict query timestamp span.
long 10000
connect.cassandra.initial.offset
The initial timestamp to start querying in Cassandra from (yyyy-MM-dd HH:mm:ss.SSS’Z’).
string 1900-01-01 00:00:00.0000000Z
connect.progress.enabled Enables the output for how many records have been processed. boolean false
connect.cassandra.consistency.level
Consistency refers to how up-to-date and synchronized a row of Cassandra data is on all of its replicas.
Cassandra offers tunable consistency. For any given read or write operation,
the client application decides how consistent the requested data must be.
Please refer the to Cassandra documentation for further details
string ONE
connect.cassandra.mapping.collection.to.json Mapping columns with type Map, List and Set like json boolean true
connect.cassandra.default.value
By default a column omitted from the JSON map will be set to NULL.
Alternatively, if set UNSET, pre-existing value will be preserved.
string  
connect.cassandra.slice.delay.ms A delay between the current time and the time range of the query. Int 3000
connect.cassandra.time.slice.ms The range of time in milliseconds the source task the timestamp/timeuuid will use for query Int 10000
connect.cassandra.import.allow.filtering Enable ALLOW FILTERING in incremental selects. boolean true
connect.cassandra.assigned.tables The tables a task has been assigned.    
connect.cassandra.delete.enabled Enables row deletion from cassandra boolean false
connect.cassandra.delete.statement
Delete statement for cassandra
If connect.cassandra.delete.enabled is true, connect.cassandra.delete.statement is required.
   
connect.cassandra.delete.struct_flds
Fields in the key struct data type used in there delete statement.
Comma-separated in the order they are found in connect.cassandra.delete.statement.
Keep default value to use the record key as a primitive type.
   

Bulk Example

name=cassandra-source-orders-bulk
connector.class=com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceConnector
connect.cassandra.key.space=demo
connect.cassandra.kcql=INSERT INTO TABLE_X SELECT * FROM TOPIC_Y
connect.cassandra.contact.points=localhost
connect.cassandra.username=cassandra
connect.cassandra.password=cassandra

Incremental Example

name=cassandra-source-orders-incremental
connector.class=com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceConnector
connect.cassandra.key.space=demo
connect.cassandra.kcql=INSERT INTO TABLE_X SELECT * FROM TOPIC_Y PK created INCREMENTALMODE=TIMEUUID
connect.cassandra.contact.points=localhost
connect.cassandra.username=cassandra
connect.cassandra.password=cassandra

Schema Evolution

Upstream changes to schemas are handled by Schema registry which will validate the addition and removal or fields, data type changes and if defaults are set. The Schema Registry enforces AVRO schema evolution rules. More information can be found here.

For the Source connector, at present no column selection is handled, every column from the table is queried to column additions and deletions are handled in accordance with the compatibility mode of the Schema Registry.

Future releases will support tables auto-creation and adding columns on changes to the topic schema.

Kubernetes

Helm Charts are provided at our repo, add the repo to your Helm instance and install. We recommend using the Landscaper to manage Helm Values since typically each Connector instance has its own deployment.

Add the Helm charts to your Helm instance:

helm repo add landoop https://landoop.github.io/kafka-helm-charts/

TroubleShooting

Please review the FAQs and join our slack channel