Kafka Connect Cassandra is a Source Connector that reads data from Cassandra and writes it to Kafka.
The following KCQL is supported:
INSERT INTO <your-topic> SELECT FIELD,... FROM <your-cassandra-table> [PK FIELD] [WITHFORMAT JSON] [INCREMENTALMODE=TIMESTAMP|TIMEUUID|TOKEN|DSESEARCHTIMESTAMP] [WITHKEY(<your-key-field>)]
Examples:
-- Select all columns from table orders and insert into a topic
-- called orders-topic, use column created to track new rows.
-- Incremental mode set to TIMEUUID
INSERT INTO orders-topic SELECT * FROM orders PK created INCREMENTALMODE=TIMEUUID

-- Select created, product, price from table orders and insert
-- into a topic called orders-topic, use column created to track new rows.
INSERT INTO orders-topic SELECT created, product, price FROM orders PK created
The connector can load data in two ways: in bulk, where the whole table is read on every poll, or incrementally, where only rows added since the last poll are read. Which mode is used is determined by the mode clause on the KCQL statement:
The connector can write JSON to your Kafka topic using the WITHFORMAT JSON clause, but the key and value converters must be set:
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
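As a sketch, a connector configuration that combines WITHFORMAT JSON with the string converters could look like the following (the keyspace, table, topic, and tracking column names are illustrative, based on the orders example used later in this page):

```
name=cassandra-json-source
connector.class=io.lenses.streamreactor.connect.cassandra.source.CassandraSourceConnector
connect.cassandra.key.space=demo
connect.cassandra.kcql=INSERT INTO orders-topic SELECT * FROM orders PK created WITHFORMAT JSON INCREMENTALMODE=TIMEUUID
connect.cassandra.contact.points=cassandra
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
```

With this configuration the record values land on the topic as JSON strings rather than Avro.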
To facilitate scenarios such as retaining the latest value for a given device identifier, or supporting Kafka Streams joins without having to re-map the topic data, the connector supports WITHKEY in the KCQL syntax.
INSERT INTO <topic> SELECT <fields> FROM <column_family> PK <PK_field> WITHFORMAT JSON WITHUNWRAP INCREMENTALMODE=<mode> WITHKEY(<key_field>)
Multiple key fields are supported using a delimiter:
// values enclosed in `[` and `]` are optional
WITHKEY(field1 [, field2.A , field3]) [KEYDELIMITER='.']
The resulting Kafka record key will be the string concatenation of the values of the specified fields. Optionally, the delimiter can be set via the KEYDELIMITER keyword.
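For example, a statement keying records on two fields with a custom delimiter might look like the following (the topic, table, and field names here are illustrative, not from the examples above):

```
INSERT INTO device-topic SELECT * FROM devices PK created INCREMENTALMODE=TIMEUUID WITHFORMAT JSON WITHKEY(device_id, region) KEYDELIMITER='-'
```

A row with device_id "sensor1" and region "eu" would then produce the record key "sensor1-eu".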
This mode tracks new records added to a table. The column to track is identified by the PK clause in the KCQL statement. Only one column can be used to track new records. The supported Cassandra column data types are:
If set to TOKEN, the column value is wrapped inside Cassandra's token function, which needs unwrapping with the WITHUNWRAP command.
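As an illustrative sketch, a KCQL statement using TOKEN mode together with WITHUNWRAP could look like this (topic, table, and column names are assumptions based on the orders example):

```
INSERT INTO orders-topic SELECT * FROM orders PK id WITHUNWRAP INCREMENTALMODE=TOKEN
```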
DSESEARCHTIMESTAMP will issue DSE Search queries using Solr instead of a native Cassandra query.
INSERT INTO <topic> SELECT a, b, c, d FROM keyspace.table WHERE solr_query='pkCol:{2020-03-23T15:02:21Z TO 2020-03-23T15:30:12.989Z]' INCREMENTALMODE=DSESEARCHTIMESTAMP
The connector can be configured to:

- start from a particular offset: connect.cassandra.initial.offset
- control how often the table is polled: connect.cassandra.import.poll.interval
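As a sketch, the two options above might be set as follows (the values shown are illustrative assumptions, not defaults):

```
# start importing from this point in time (illustrative value)
connect.cassandra.initial.offset=2020-01-01 00:00:00.0000000Z
# poll the table every 10 seconds (value in milliseconds)
connect.cassandra.import.poll.interval=10000
```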
For a more detailed explanation, see the Cassandra to Kafka options in the connector's configuration reference.
export CONNECTOR=cassandra
docker-compose up -d cassandra
Log in to your container:
docker exec -ti cassandra cqlsh
and create the following keyspace and table, and insert test data:
CREATE KEYSPACE demo WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 3};
USE demo;

CREATE TABLE orders (
    id int,
    created timeuuid,
    product text,
    qty int,
    price float,
    PRIMARY KEY (id, created)
) WITH CLUSTERING ORDER BY (created asc);

INSERT INTO orders (id, created, product, qty, price) VALUES (1, now(), 'OP-DAX-P-20150201-95.7', 100, 94.2);
INSERT INTO orders (id, created, product, qty, price) VALUES (2, now(), 'OP-DAX-C-20150201-100', 100, 99.5);
INSERT INTO orders (id, created, product, qty, price) VALUES (3, now(), 'FU-KOSPI-C-20150201-100', 200, 150);
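To confirm the test data was written, you can query the table from the same cqlsh session; the three inserted rows should come back:

```
SELECT id, product, qty, price FROM orders;
```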
If you are using Lenses, log in and navigate to the connectors page, select Cassandra as the source, and paste the following:
name=cassandra
connector.class=io.lenses.streamreactor.connect.cassandra.source.CassandraSourceConnector
connect.cassandra.key.space=demo
connect.cassandra.kcql=INSERT INTO orders-topic SELECT * FROM orders PK created INCREMENTALMODE=TIMEUUID
connect.cassandra.contact.points=cassandra
To start the connector using the command line, log into the lenses-box container:
docker exec -ti lenses-box /bin/bash
and create a connector.properties file containing the properties above.
Create the connector with the connect-cli:
connect-cli create cassandra-source < connector.properties
Wait for the connector to start and check it’s running:
connect-cli status cassandra-source
Check the records in Lenses or via the console:
kafka-avro-console-consumer \
    --bootstrap-server localhost:9092 \
    --topic orders-topic \
    --from-beginning
The following CQL data types are supported: