Cassandra Source¶
Download connector: Cassandra Connector for Kafka 2.1.0, Cassandra Connector for Kafka 1.0
Blog: Tuning the Cassandra -> Kafka Connector by Mike from WalmartLabs
Kafka Connect Cassandra is a Source Connector for reading data from Cassandra and writing to Kafka.
Prerequisites¶
- Apache Kafka 0.11.x or above
- Kafka Connect 0.11.x or above
- Cassandra 3.0.9 or above
- Java 1.8
Features¶
- KCQL routing - Allows for table to topic routing
- Incremental mode with timestamp, timeuuid and token support via KCQL
- Bulk mode
- Initial offset - Allows you to select where to start from
KCQL Support¶
The Cassandra source supports KCQL, the Kafka Connect Query Language. The following KCQL support is available:
- Selection of which tables to track for new records
- Setting bulk or incremental mode
- Selection of which field to track for incremental changes
Tip
You can specify multiple KCQL statements separated by ; to have the connector write to multiple topics.
INSERT INTO kafka_topic SELECT FIELD,... FROM cassandra_table [PK FIELD] [INCREMENTALMODE=TIMESTAMP|TIMEUUID|TOKEN]
-- Select all columns from table orders and insert into a topic called orders-topic, use column created to track new rows.
-- Incremental mode set to TIMEUUID
INSERT INTO orders-topic SELECT * FROM orders PK created INCREMENTALMODE=TIMEUUID
-- Select created, product, price from table orders and insert into a topic called orders-topic, use column created to track new rows.
INSERT INTO orders-topic SELECT created, product, price FROM orders PK created
The PK keyword identifies the column used to track deltas in the target tables. If the incremental mode is set to TOKEN, this column value is wrapped inside Cassandra's token function.
Note
The connector only accepts one column as the PK regardless of incremental mode used.
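As an illustration of the syntax above, here is a minimal Python sketch that assembles KCQL statements and joins several of them with `;`. The helper `build_kcql` and the second table `trades` are hypothetical and not part of the connector; the rule that an incremental mode needs a PK column is an assumption drawn from the description above.

```python
from typing import Optional

VALID_MODES = {"TIMESTAMP", "TIMEUUID", "TOKEN"}


def build_kcql(topic: str, table: str, pk: Optional[str] = None,
               mode: Optional[str] = None, fields: str = "*") -> str:
    """Build one KCQL statement for the Cassandra source (hypothetical helper)."""
    statement = f"INSERT INTO {topic} SELECT {fields} FROM {table}"
    if mode is not None:
        if mode.upper() not in VALID_MODES:
            raise ValueError(f"unsupported incremental mode: {mode}")
        if pk is None:
            # Assumption: incremental tracking needs exactly one PK column.
            raise ValueError("incremental mode needs a PK column to track")
        statement += f" PK {pk} INCREMENTALMODE={mode.upper()}"
    elif pk is not None:
        statement += f" PK {pk}"
    return statement


# Two statements in one connect.cassandra.kcql value, separated by ';'.
kcql = ";".join([
    build_kcql("orders-topic", "orders", pk="created", mode="TIMEUUID"),
    build_kcql("trades-topic", "trades"),  # hypothetical table, bulk mode
])
print(kcql)
```

Generating the statements from one place keeps the single PK column and the mode keyword consistent across tables.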
Modes¶
Incremental¶
In incremental mode the connector supports querying based on a column in the tables with CQL data type of Timestamp or TimeUUID.
Incremental mode is set by specifying INCREMENTALMODE in the KCQL statement as either TIMESTAMP, TIMEUUID or TOKEN.
Kafka Connect tracks the latest record it retrieved from each table, so it can start at the correct location on the next iteration (or in case of a crash). In this case the maximum value of the records returned by the result-set is tracked and stored in Kafka by the framework. If no offset is found for the table at startup a default timestamp of 1900-01-01 is used. This is then passed to a prepared statement containing a range query.
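The offset tracking described above can be sketched in a few lines of Python. This is a simplified in-memory model, not the connector's code: the `poll` function and the row layout are assumptions made for illustration, and the real connector stores the offset in Kafka through the Connect framework.

```python
from datetime import datetime, timezone

# Default lower bound used when no offset has been stored yet.
DEFAULT_OFFSET = datetime(1900, 1, 1, tzinfo=timezone.utc)


def poll(rows, stored_offset, upper_bound):
    """Return the rows in (offset, upper_bound] and the new offset to store."""
    lower = stored_offset if stored_offset is not None else DEFAULT_OFFSET
    batch = [r for r in rows if lower < r["created"] <= upper_bound]
    # The maximum tracked value in the result set becomes the next offset.
    new_offset = max((r["created"] for r in batch), default=lower)
    return batch, new_offset


utc = timezone.utc
table = [
    {"id": 1, "created": datetime(2016, 5, 6, 11, 20, 0, tzinfo=utc)},
    {"id": 2, "created": datetime(2016, 5, 6, 11, 21, 0, tzinfo=utc)},
    {"id": 3, "created": datetime(2016, 5, 6, 11, 22, 0, tzinfo=utc)},
]

# First poll: no stored offset, so the range starts at 1900-01-01.
first_batch, offset = poll(table, None, datetime(2016, 5, 6, 11, 30, tzinfo=utc))

# A new row arrives; the second poll only picks up rows after the offset.
table.append({"id": 4, "created": datetime(2016, 5, 6, 11, 31, 37, tzinfo=utc)})
second_batch, offset = poll(table, offset, datetime(2016, 5, 6, 11, 45, tzinfo=utc))

print(len(first_batch), [r["id"] for r in second_batch])
```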
Tip
You can limit where the connector starts by setting the connect.cassandra.initial.offset
option.
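A small Python sketch for producing a value for this option is shown below. It assumes the `yyyy-MM-dd HH:mm:ss.SSS'Z'` pattern listed in the configuration table and that the value is expressed in UTC; `format_initial_offset` is a hypothetical helper name.

```python
from datetime import datetime, timezone


def format_initial_offset(moment: datetime) -> str:
    """Render a timezone-aware datetime as yyyy-MM-dd HH:mm:ss.SSS'Z' in UTC."""
    utc_moment = moment.astimezone(timezone.utc)
    millis = utc_moment.microsecond // 1000
    return utc_moment.strftime("%Y-%m-%d %H:%M:%S") + f".{millis:03d}Z"


start = datetime(2016, 5, 6, 9, 23, 28, 123000, tzinfo=timezone.utc)
print(f"connect.cassandra.initial.offset={format_initial_offset(start)}")
```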
Specifying TOKEN causes the connector to wrap the values in the token function. Only a PRIMARY KEY field of type token is supported. Your Cassandra cluster must use the Byte Ordered Partitioner, although this is generally not recommended because it creates hotspots in the cluster. However, if the Byte Ordered Partitioner is not used, the connector will miss all of the new rows whose token(PK column) falls “behind” the token recorded as the offset. This is because Cassandra’s other partitioners don’t order the tokens.
Warning
You must use the Byte Ordered Partitioner for the TOKEN mode to work correctly or data will be missing from the Kafka topic.
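To see why token ordering matters, the following Python sketch simulates TOKEN-mode polling under two token functions. It is only a model: `hashed_token` uses MD5 as a stand-in for a hashing partitioner and is not Cassandra's actual token computation, and the keys are invented.

```python
import hashlib


def byte_ordered_token(key: str) -> bytes:
    # Byte ordered: the token preserves the byte order of the key.
    return key.encode("utf-8")


def hashed_token(key: str) -> int:
    # Stand-in for a hashing partitioner: token order is unrelated to key order.
    return int.from_bytes(hashlib.md5(key.encode("utf-8")).digest(), "big")


def simulate(keys, token_fn):
    """Insert keys one by one, polling for token > stored offset after each insert."""
    offset = None
    captured = []
    for key in keys:
        token = token_fn(key)
        if offset is None or token > offset:
            captured.append(key)
            offset = token
        # Otherwise the row falls "behind" the offset and is never returned.
    return captured


keys = [f"order-{i:04d}" for i in range(1, 21)]
ordered = simulate(keys, byte_ordered_token)
hashed = simulate(keys, hashed_token)
print(f"byte ordered: {len(ordered)} of {len(keys)} rows captured")
print(f"hashed:       {len(hashed)} of {len(keys)} rows captured")
```

With ordered tokens every new row lands ahead of the offset. With hashed tokens, any row whose token is smaller than the largest token seen so far is skipped.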
Below are examples of the types of queries the connector will issue against Cassandra:
-- for a tracked column of type timeuuid
SELECT * FROM demo.orders WHERE created > maxTimeuuid(?) AND created <= minTimeuuid(?)
-- for a tracked column of type timestamp
SELECT * FROM demo.orders WHERE created > ? AND created <= ?
-- for token
SELECT * FROM demo.orders WHERE created > token(?) and created <= token(?)
Bulk¶
In bulk mode the connector extracts the full table; no WHERE clause is attached to the query. Bulk mode is set when no incremental mode is present in the KCQL statement.
Warning
Watch out for the poll interval. After each interval, the bulk query will be executed again.
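A rough back-of-the-envelope calculation shows how quickly repeated bulk queries add up. The sketch below assumes every poll re-reads the whole table and ignores the time the query itself takes; `records_emitted` is a hypothetical helper.

```python
def records_emitted(row_count: int, poll_interval_ms: int, window_ms: int) -> int:
    """Estimate records written to Kafka in bulk mode over a time window."""
    polls = window_ms // poll_interval_ms
    return polls * row_count


ONE_DAY_MS = 24 * 60 * 60 * 1000

# A 3-row table polled every second versus every hour, over one day.
print(records_emitted(3, 1_000, ONE_DAY_MS))
print(records_emitted(3, 3_600_000, ONE_DAY_MS))
```

Raising `connect.cassandra.import.poll.interval` is the main lever for keeping duplicate volume under control in bulk mode.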
Lenses QuickStart¶
The easiest way to try this out is with Lenses Box, the pre-configured Docker image that comes with this connector pre-installed. Go to Connectors –> New Connector –> Source –> Cassandra and paste your configuration.
Cassandra Setup¶
First, download and install Cassandra if you don’t have a compatible cluster available.
#make a folder for cassandra
mkdir ~/cassandra
#Download Cassandra
wget http://apache.cs.uu.nl/cassandra/3.5/apache-cassandra-3.5-bin.tar.gz
#extract archive to cassandra folder
tar -xvf apache-cassandra-3.5-bin.tar.gz -C ~/cassandra
#Set up environment variables (the archive unpacks into apache-cassandra-3.5)
export CASSANDRA_HOME=~/cassandra/apache-cassandra-3.5
export PATH=$PATH:$CASSANDRA_HOME/bin
#Start Cassandra
sudo sh $CASSANDRA_HOME/bin/cassandra
Test Data¶
Once you have installed and started Cassandra, create a table to extract records from. This snippet creates a table called orders and inserts 3 rows representing fictional orders for some options and futures on a trading platform.
Start the Cassandra cql shell and execute the following:
CREATE KEYSPACE demo WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 3};
use demo;
create table orders (id int, created timeuuid, product text, qty int, price float, PRIMARY KEY (id, created))
WITH CLUSTERING ORDER BY (created asc);
INSERT INTO orders (id, created, product, qty, price) VALUES (1, now(), 'OP-DAX-P-20150201-95.7', 100, 94.2);
INSERT INTO orders (id, created, product, qty, price) VALUES (2, now(), 'OP-DAX-C-20150201-100', 100, 99.5);
INSERT INTO orders (id, created, product, qty, price) VALUES (3, now(), 'FU-KOSPI-C-20150201-100', 200, 150);
SELECT * FROM orders;
id | created | price | product | qty
----+--------------------------------------+-------+-------------------------+-----
1 | 17fa1050-137e-11e6-ab60-c9fbe0223a8f | 94.2 | OP-DAX-P-20150201-95.7 | 100
2 | 17fb6fe0-137e-11e6-ab60-c9fbe0223a8f | 99.5 | OP-DAX-C-20150201-100 | 100
3 | 17fbbe00-137e-11e6-ab60-c9fbe0223a8f | 150 | FU-KOSPI-C-20150201-100 | 200
(3 rows)
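The `created` values above are version 1 (time-based) UUIDs, which is what makes them usable for incremental tracking: each one embeds a timestamp. As a sketch, Python's standard `uuid` module can recover it. The helper name `timeuuid_to_datetime` is an assumption; the constant is the number of 100-nanosecond intervals between the UUID epoch (1582-10-15) and the Unix epoch.

```python
import uuid
from datetime import datetime, timedelta, timezone

# 100-ns intervals between 1582-10-15 (UUID epoch) and 1970-01-01 (Unix epoch).
UUID_EPOCH_OFFSET = 0x01B21DD213814000


def timeuuid_to_datetime(value: str) -> datetime:
    """Extract the embedded UTC timestamp from a version 1 UUID."""
    parsed = uuid.UUID(value)
    if parsed.version != 1:
        raise ValueError("not a time-based (version 1) UUID")
    micros = (parsed.time - UUID_EPOCH_OFFSET) // 10
    return datetime(1970, 1, 1, tzinfo=timezone.utc) + timedelta(microseconds=micros)


created = timeuuid_to_datetime("17fa1050-137e-11e6-ab60-c9fbe0223a8f")
print(created.isoformat())
```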
Installing the Connector¶
In production, Connect should be run in distributed mode.
- Install and configure a Kafka Connect cluster
- Create a folder on each server called plugins/lib
- Copy into the above folder the required connector jars from the Stream Reactor download
- Edit connect-avro-distributed.properties in the etc/schema-registry folder and uncomment the plugin.path option. Set it to the root directory, i.e. plugins, where you deployed the Stream Reactor connector jars
- Start Connect: bin/connect-distributed etc/schema-registry/connect-avro-distributed.properties
Connect Workers are long-running processes, so set up an init.d or systemctl service accordingly.
Starting the Connector (Distributed)¶
Download and install Stream Reactor to your Kafka Connect cluster. Follow the instructions here if you haven’t already done so. All paths in the quickstart are based on the location where you installed Stream Reactor.
Once Connect has started, we can use the kafka-connect-tools CLI to post in our distributed properties file for Cassandra. If you are using the Docker images, you will also have to set the following environment variable so the CLI can connect to the Kafka Connect REST API.
export KAFKA_CONNECT_REST="http://myserver:myport"
➜ bin/connect-cli create cassandra-source-orders < conf/cassandra-source-incr.properties
name=cassandra-source-orders
connector.class=com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceConnector
connect.cassandra.key.space=demo
connect.cassandra.kcql=INSERT INTO orders-topic SELECT * FROM orders PK created INCREMENTALMODE=TIMEUUID
connect.cassandra.contact.points=localhost
connect.cassandra.username=cassandra
connect.cassandra.password=cassandra
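If you prefer to talk to the Kafka Connect REST API directly instead of using the CLI, the properties above need to be turned into a JSON document with a `name` and a `config` object. The Python sketch below does only that conversion; `properties_to_payload` is a hypothetical helper, and sending the result (typically a POST to the `/connectors` endpoint of your Connect worker) is left out.

```python
import json

PROPERTIES = """\
name=cassandra-source-orders
connector.class=com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceConnector
connect.cassandra.key.space=demo
connect.cassandra.kcql=INSERT INTO orders-topic SELECT * FROM orders PK created INCREMENTALMODE=TIMEUUID
connect.cassandra.contact.points=localhost
connect.cassandra.username=cassandra
connect.cassandra.password=cassandra
"""


def properties_to_payload(text: str) -> dict:
    """Convert simple key=value properties into a Connect REST payload."""
    config = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first '=' only: the KCQL value contains '=' itself.
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    return {"name": config["name"], "config": config}


payload = properties_to_payload(PROPERTIES)
print(json.dumps(payload, indent=2))
```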
We can use the CLI to check if the connector is up but you should be able to see this in logs as well.
#check for running connectors with the CLI
➜ bin/connect-cli ps
cassandra-source-orders
INFO
__ __
/ / ____ _____ ____/ /___ ____ ____
/ / / __ `/ __ \/ __ / __ \/ __ \/ __ \
/ /___/ /_/ / / / / /_/ / /_/ / /_/ / /_/ /
/_____/\__,_/_/ /_/\__,_/\____/\____/ .___/
/_/
______ __ _____
/ ____/___ _______________ _____ ____/ /________ _/ ___/____ __ _______________
/ / / __ `/ ___/ ___/ __ `/ __ \/ __ / ___/ __ `/\__ \/ __ \/ / / / ___/ ___/ _ \
/ /___/ /_/ (__ |__ ) /_/ / / / / /_/ / / / /_/ /___/ / /_/ / /_/ / / / /__/ __/
\____/\__,_/____/____/\__,_/_/ /_/\__,_/_/ \__,_//____/\____/\__,_/_/ \___/\___/
By Andrew Stevenson. (com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceTask:64)
If you switch back to the terminal you started the Connector in you should see the Cassandra Source being accepted and the task starting and processing the 3 existing rows.
INFO Source task Thread[WorkerSourceTask-cassandra-source-orders-0,5,main] finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:342)
INFO Query SELECT * FROM demo.orders WHERE created > maxTimeuuid(?) AND created <= minTimeuuid(?) ALLOW FILTERING executing with bindings (2016-05-06 09:23:28+0200, 2016-05-06 13:44:33+0200). (com.datamountaineer.streamreactor.connect.cassandra.source.CassandraTableReader:156)
INFO Querying returning results for demo.orders. (com.datamountaineer.streamreactor.connect.cassandra.source.CassandraTableReader:185)
INFO Processed 3 rows for table orders-topic.orders (com.datamountaineer.streamreactor.connect.cassandra.source.CassandraTableReader:206)
INFO Found 3. Draining entries to batchSize 100. (com.datamountaineer.streamreactor.connect.queues.QueueHelpers$:45)
INFO Found 0. Draining entries to batchSize 100. (com.datamountaineer.streamreactor.connect.queues.QueueHelpers$:45)
Check Kafka, 3 rows as before.
➜ bin/kafka-avro-console-consumer \
--zookeeper localhost:2181 \
--topic orders-topic \
--from-beginning
{"id":{"int":1},"created":{"string":"17fa1050-137e-11e6-ab60-c9fbe0223a8f"},"price":{"float":94.2},"product":{"string":"OP-DAX-P-20150201-95.7"},"qty":{"int":100}}
{"id":{"int":2},"created":{"string":"17fb6fe0-137e-11e6-ab60-c9fbe0223a8f"},"price":{"float":99.5},"product":{"string":"OP-DAX-C-20150201-100"},"qty":{"int":100}}
{"id":{"int":3},"created":{"string":"17fbbe00-137e-11e6-ab60-c9fbe0223a8f"},"price":{"float":150.0},"product":{"string":"FU-KOSPI-C-20150201-100"},"qty":{"int":200}}
The Source tasks will continue to poll but not pick up any new rows yet.
Inserting New Data¶
Now let’s insert a row into the Cassandra table. Start the CQL shell and execute the following:
use demo;
INSERT INTO orders (id, created, product, qty, price) VALUES (4, now(), 'FU-DATAMOUNTAINEER-C-20150201-100', 500, 10000);
SELECT * FROM orders;
id | created | price | product | qty
----+--------------------------------------+-------+-----------------------------------+-----
1 | 17fa1050-137e-11e6-ab60-c9fbe0223a8f | 94.2 | OP-DAX-P-20150201-95.7 | 100
2 | 17fb6fe0-137e-11e6-ab60-c9fbe0223a8f | 99.5 | OP-DAX-C-20150201-100 | 100
4 | 02acf5d0-1380-11e6-ab60-c9fbe0223a8f | 10000 | FU-DATAMOUNTAINEER-C-20150201-100 | 500
3 | 17fbbe00-137e-11e6-ab60-c9fbe0223a8f | 150 | FU-KOSPI-C-20150201-100 | 200
(4 rows)
cqlsh:demo>
Check the logs.
INFO Query SELECT * FROM demo.orders WHERE created > maxTimeuuid(?) AND created <= minTimeuuid(?) ALLOW FILTERING executing with bindings (2016-05-06 13:31:37+0200, 2016-05-06 13:45:33+0200). (com.datamountaineer.streamreactor.connect.cassandra.source.CassandraTableReader:156)
INFO Querying returning results for demo.orders. (com.datamountaineer.streamreactor.connect.cassandra.source.CassandraTableReader:185)
INFO Processed 1 rows for table orders-topic.orders (com.datamountaineer.streamreactor.connect.cassandra.source.CassandraTableReader:206)
INFO Found 0. Draining entries to batchSize 100. (com.datamountaineer.streamreactor.connect.queues.QueueHelpers$:45)
Check Kafka.
➜ bin/kafka-avro-console-consumer \
--zookeeper localhost:2181 \
--topic orders-topic \
--from-beginning
{"id":{"int":1},"created":{"string":"17fa1050-137e-11e6-ab60-c9fbe0223a8f"},"price":{"float":94.2},"product":{"string":"OP-DAX-P-20150201-95.7"},"qty":{"int":100}}
{"id":{"int":2},"created":{"string":"17fb6fe0-137e-11e6-ab60-c9fbe0223a8f"},"price":{"float":99.5},"product":{"string":"OP-DAX-C-20150201-100"},"qty":{"int":100}}
{"id":{"int":3},"created":{"string":"17fbbe00-137e-11e6-ab60-c9fbe0223a8f"},"price":{"float":150.0},"product":{"string":"FU-KOSPI-C-20150201-100"},"qty":{"int":200}}
{"id":{"int":4},"created":{"string":"02acf5d0-1380-11e6-ab60-c9fbe0223a8f"},"price":{"float":10000.0},"product":{"string":"FU-DATAMOUNTAINEER-C-20150201-100"},"qty":{"int":500}}
Bingo, we have our extra row.
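The console consumer prints each optional field wrapped in its Avro type, for example `{"int":4}`. If you want plain values for a quick check, a short Python sketch like the following can flatten one output line. The `unwrap` helper is hypothetical and only handles the single-level wrappers shown above.

```python
import json


def unwrap(record: dict) -> dict:
    """Strip the {"type": value} wrappers the Avro console consumer prints."""
    flat = {}
    for field, value in record.items():
        if isinstance(value, dict) and len(value) == 1:
            (value,) = value.values()
        flat[field] = value
    return flat


line = (
    '{"id":{"int":4},"created":{"string":"02acf5d0-1380-11e6-ab60-c9fbe0223a8f"},'
    '"price":{"float":10000.0},"product":{"string":"FU-DATAMOUNTAINEER-C-20150201-100"},'
    '"qty":{"int":500}}'
)
order = unwrap(json.loads(line))
print(order)
```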
Data Types¶
The Source connector supports copying tables in bulk and incrementally to Kafka.
The following CQL data types are supported:
CQL Type | Connect Data Type |
---|---|
TimeUUID | Optional String |
UUID | Optional String |
Inet | Optional String |
Ascii | Optional String |
Text | Optional String |
Timestamp | Optional String |
Date | Optional String |
Tuple | Optional String |
UDT | Optional String |
Boolean | Optional Boolean |
TinyInt | Optional Int8 |
SmallInt | Optional Int16 |
Int | Optional Int32 |
Decimal | Optional String |
Float | Optional Float32 |
Counter | Optional Int64 |
BigInt | Optional Int64 |
VarInt | Optional Int64 |
Double | Optional Int64 |
Time | Optional Int64 |
Blob | Optional Bytes |
Map | Optional [String | MAP] |
List | Optional [String | ARRAY] |
Set | Optional [String | ARRAY] |
Note
For Map, List and Set the value is extracted from the Cassandra Row and inserted as a JSON string representation by default. If you need to use these types as objects, add the “connect.cassandra.mapping.collection.to.json=false” configuration.
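The difference matters on the consumer side. The Python sketch below models it with an invented `map<text, text>` column: with the default setting the field arrives as a JSON string that must be parsed again, while with the setting disabled it arrives as a structured value.

```python
import json

# Hypothetical map<text, text> column value read from a Cassandra row.
attributes = {"currency": "EUR", "venue": "EUREX"}

# Default (mapping.collection.to.json=true): the field is a JSON string.
as_json_string = json.dumps(attributes)

# A consumer has to parse the string to get the collection back.
parsed = json.loads(as_json_string)

print(type(as_json_string).__name__, as_json_string)
print(type(parsed).__name__, parsed)
```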
Configurations¶
The Kafka Connect framework requires the following in addition to any connector-specific configurations:
Config | Description | Type | Value |
---|---|---|---|
name | Name of the connector | string | |
tasks.max | The number of tasks to scale output | int | 1 |
connector.class | Name of the connector class | string | com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceConnector |
Config | Description | Type |
---|---|---|
connect.cassandra.contact.points | Contact points (hosts) in the Cassandra cluster | string |
connect.cassandra.key.space | Keyspace the source tables belong to | string |
connect.cassandra.username | Username to connect to Cassandra with | string |
connect.cassandra.password | Password to connect to Cassandra with | string |
connect.cassandra.import.mode | Either bulk or incremental. Watch out with bulk mode, as it may repeatedly pull in the same data | string |
connect.cassandra.kcql | Kafka Connect Query Language expression. Allows for expressive table to topic routing, field selection and renaming. In incremental mode the timestamp column can be specified by PK colName. Example: INSERT INTO TOPIC1 SELECT * FROM TABLE1 PK myTimeUUIDCol. The timestamp column must be of CQL type TimeUUID | string |
Optional Configurations¶
Config | Description | Type | Default |
---|---|---|---|
connect.cassandra.port | Port for the native Java driver | int | 9042 |
connect.cassandra.ssl.enabled | Enables SSL communication against an SSL-enabled Cassandra cluster | boolean | false |
connect.cassandra.key.store.path | Path to the client Key Store | string | |
connect.cassandra.trust.store.password | Password for the Trust Store | string | |
connect.cassandra.trust.store.path | Path to the client Trust Store | string | |
connect.cassandra.trust.store.type | Type of the Trust Store | string | JKS |
connect.cassandra.ssl.client.cert.auth | Enable client certificate authentication by Cassandra. Requires KeyStore options to be set | string | false |
connect.cassandra.key.store.password | Password for the client Key Store | string | |
connect.cassandra.key.store.type | Type of the Key Store | string | JKS |
connect.cassandra.import.poll.interval | The polling interval between queries against tables, in milliseconds | int | 1 second |
connect.cassandra.task.buffer.size | The size of the queue for buffering result set records before they are written to Kafka | int | 10000 |
connect.cassandra.batch.size | The number of records the Source task should drain from the reader queue | int | 1000 |
connect.cassandra.fetch.size | The maximum number of rows the Cassandra driver will fetch at one time | int | 5000 |
connect.cassandra.slice.duration | Duration in milliseconds to query for in the target Cassandra table. Used to restrict the query timestamp span | long | 10000 |
connect.cassandra.initial.offset | The initial timestamp to start querying in Cassandra from (yyyy-MM-dd HH:mm:ss.SSS'Z') | string | 1900-01-01 00:00:00.0000000Z |
connect.progress.enabled | Enables the output for how many records have been processed | boolean | false |
connect.cassandra.consistency.level | Consistency refers to how up-to-date and synchronized a row of Cassandra data is on all of its replicas. Cassandra offers tunable consistency. For any given read or write operation, the client application decides how consistent the requested data must be. Please refer to the Cassandra documentation for further details | string | ONE |
connect.cassandra.mapping.collection.to.json | Map columns of type Map, List and Set to JSON strings | boolean | true |
connect.cassandra.default.value | By default a column omitted from the JSON map will be set to NULL. Alternatively, if set to UNSET, the pre-existing value will be preserved | string | |
connect.cassandra.slice.delay.ms | A delay between the current time and the time range of the query | int | 3000 |
connect.cassandra.time.slice.ms | The range of time in milliseconds that the source task will use for the timestamp/timeuuid query | int | 10000 |
connect.cassandra.import.allow.filtering | Enable ALLOW FILTERING in incremental selects | boolean | true |
connect.cassandra.assigned.tables | The tables a task has been assigned | | |
connect.cassandra.delete.enabled | Enables row deletion from Cassandra | boolean | false |
connect.cassandra.delete.statement | Delete statement for Cassandra. If connect.cassandra.delete.enabled is true, connect.cassandra.delete.statement is required | | |
connect.cassandra.delete.struct_flds | Fields in the key struct data type used in the delete statement. Comma-separated, in the order they are found in connect.cassandra.delete.statement. Keep the default value to use the record key as a primitive type | | |
Bulk Example¶
name=cassandra-source-orders-bulk
connector.class=com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceConnector
connect.cassandra.key.space=demo
connect.cassandra.kcql=INSERT INTO TOPIC_Y SELECT * FROM TABLE_X
connect.cassandra.contact.points=localhost
connect.cassandra.username=cassandra
connect.cassandra.password=cassandra
Incremental Example¶
name=cassandra-source-orders-incremental
connector.class=com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceConnector
connect.cassandra.key.space=demo
connect.cassandra.kcql=INSERT INTO TOPIC_Y SELECT * FROM TABLE_X PK created INCREMENTALMODE=TIMEUUID
connect.cassandra.contact.points=localhost
connect.cassandra.username=cassandra
connect.cassandra.password=cassandra
Schema Evolution¶
Upstream changes to schemas are handled by the Schema Registry, which will validate the addition and removal of fields, data type changes and whether defaults are set. The Schema Registry enforces AVRO schema evolution rules. More information can be found here.
For the Source connector, at present no column selection is handled: every column from the table is queried, so column additions and deletions are handled in accordance with the compatibility mode of the Schema Registry.
Future releases will support table auto-creation and adding columns on changes to the topic schema.
Kubernetes¶
Helm Charts are provided at our repo. Add the repo to your Helm instance and install. We recommend using the Landscaper to manage Helm Values, since typically each Connector instance has its own deployment.
Add the Helm charts to your Helm instance:
helm repo add landoop https://landoop.github.io/kafka-helm-charts/
TroubleShooting¶
Please review the FAQs and join our Slack channel.