4.1
RethinkDB
A Kafka Connect source connector to write events from RethinkDB to Kafka.
KCQL support
The following KCQL is supported:
INSERT INTO kafka_topic
SELECT *
FROM rethink_table
[INITIALIZE]
[BATCH ...]
Selection of individual fields from the RethinkDB document is not supported.
Examples:
-- Insert into Kafka the change feed from tableA
INSERT INTO topicA SELECT * FROM tableA
-- Insert into topicA the change feed from tableA, read from the start
INSERT INTO topicA SELECT * FROM tableA INITIALIZE
-- Insert into topicA the change feed from tableA, read from the start,
-- batching 100 rows before sending to Kafka
INSERT INTO topicA SELECT * FROM tableA INITIALIZE BATCH 100
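The supported grammar can be illustrated with a small parser sketch. This is an illustrative Python helper, not part of the connector; the `parse_kcql` name and the returned dictionary shape are assumptions for demonstration only:

```python
import re

# Hypothetical helper: parses the KCQL subset shown above.
# Grammar: INSERT INTO <topic> SELECT * FROM <table> [INITIALIZE] [BATCH n]
KCQL = re.compile(
    r"INSERT INTO (?P<topic>\w+) SELECT \* FROM (?P<table>\w+)"
    r"(?P<init> INITIALIZE)?(?: BATCH (?P<batch>\d+))?$"
)

def parse_kcql(stmt):
    m = KCQL.match(stmt.strip())
    if not m:
        raise ValueError("unsupported KCQL: " + stmt)
    return {
        "topic": m.group("topic"),
        "table": m.group("table"),
        "initialize": m.group("init") is not None,
        "batch": int(m.group("batch")) if m.group("batch") else None,
    }

print(parse_kcql("INSERT INTO topicA SELECT * FROM tableA INITIALIZE BATCH 100"))
```

For example, the statement above parses to topic `topicA`, table `tableA`, with initialization enabled and a batch size of 100.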
Concepts
RethinkDB supports change feeds. This connector listens to the change feed of each table referenced in the KCQL statement.
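RethinkDB change feed events carry the previous and new version of a row in `old_val` and `new_val` fields. A minimal sketch of how such events can be classified; the `classify_change` helper is hypothetical and not part of the connector:

```python
# Hypothetical helper: classifies a RethinkDB changefeed document.
# Change events carry both row versions:
#   insert -> old_val is None; delete -> new_val is None; otherwise update.
def classify_change(change):
    old, new = change.get("old_val"), change.get("new_val")
    if old is None and new is not None:
        return "insert"
    if old is not None and new is None:
        return "delete"
    return "update"

print(classify_change({"old_val": None, "new_val": {"name": "lenses"}}))  # insert
```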
Initialization
The connector can read from the start of the changefeed to bootstrap a Kafka topic. To initialize use the INITIALIZE clause in the KCQL statement.
Batching
The connector can batch records when writing to a Kafka topic. To set the batching use the BATCH clause in the KCQL statement.
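The BATCH clause works together with `connect.rethink.linger.ms` (see Options below): records are flushed when the batch fills or when the linger period expires, whichever comes first. The following is an illustrative sketch of that flush policy, not the connector's actual implementation:

```python
import time

# Illustrative sketch of batch-or-linger flushing (not the connector's code).
class Batcher:
    def __init__(self, batch_size, linger_ms, now=time.monotonic):
        self.batch_size = batch_size
        self.linger_s = linger_ms / 1000.0
        self.now = now            # injectable clock, useful for testing
        self.buffer = []
        self.started = None

    def add(self, record):
        if not self.buffer:
            self.started = self.now()
        self.buffer.append(record)
        return self.maybe_flush()

    def maybe_flush(self):
        full = len(self.buffer) >= self.batch_size
        lingered = self.buffer and self.now() - self.started >= self.linger_s
        if full or lingered:
            out, self.buffer = self.buffer, []
            return out            # records ready to send to Kafka
        return None
```

With `batch_size=100` and `linger_ms=5000` (the default linger), a quiet table still flushes its buffered rows roughly every five seconds rather than waiting for 100 records.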
Quickstart
Launch the stack
- Copy the docker-compose file.
- Bring up the stack.
export CONNECTOR=rethinkdb
docker-compose up -d rethink
Inserting test data
Go to the RethinkDB Admin Console at http://localhost:8080/#tables.
- Add a table called lenses.
- In the Data Explorer tab, insert the following and hit run to insert the record into the table.
r.table('lenses').insert([
{ name: "datamountaineers-rule", tv_show: "Battlestar Galactica",
posts: [
{title: "Decommissioning speech3", content: "The Cylon War is long over..."},
{title: "We are at war", content: "Moments ago, this ship received word..."},
{title: "The new Earth", content: "The discoveries of the past few days..."}
]
}
])
Start the connector
If you are using Lenses, log in to Lenses, navigate to the connectors page, select RethinkDB as the source and paste the following:
name=rethink
connector.class=com.datamountaineer.streamreactor.connect.rethink.source.ReThinkSourceConnector
tasks.max=1
connect.rethink.host=rethink
connect.rethink.port=28015
connect.rethink.db=test
connect.rethink.kcql=INSERT INTO rethink SELECT * FROM lenses INITIALIZE
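As an alternative to a properties file, the same configuration can be submitted to the standard Kafka Connect REST API (by default on port 8083). A sketch of building the JSON payload from the properties above; the host names match this quickstart's stack, and the payload shape is the standard Connect `{"name": ..., "config": ...}` form:

```python
import json

# Build the Kafka Connect REST payload from the properties shown above.
properties = """
name=rethink
connector.class=com.datamountaineer.streamreactor.connect.rethink.source.ReThinkSourceConnector
tasks.max=1
connect.rethink.host=rethink
connect.rethink.port=28015
connect.rethink.db=test
connect.rethink.kcql=INSERT INTO rethink SELECT * FROM lenses INITIALIZE
"""

config = dict(
    line.split("=", 1) for line in properties.strip().splitlines()
)
name = config.pop("name")
payload = json.dumps({"name": name, "config": config}, indent=2)
print(payload)
# POST this body to http://localhost:8083/connectors
```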
To start the connector without using Lenses, log into the fastdata container:
docker exec -ti fastdata /bin/bash
and create a connector.properties file containing the properties above.
Create the connector with the connect-cli:
connect-cli create rethink < connector.properties
Wait for the connector to start and check it's running:
connect-cli status rethink
Check for records in Kafka
Check the records in Lenses or via the console:
kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--topic rethink \
--from-beginning
Clean up
Bring down the stack:
docker-compose down
Options
Name | Description | Type | Default Value |
---|---|---|---|
connect.rethink.host | RethinkDB server host. | string | localhost |
connect.rethink.db | The RethinkDB database to read from. | string | connect_rethink_sink |
connect.rethink.port | Client port of the RethinkDB server to connect to. | int | 28015 |
connect.rethink.kcql | The KCQL expression for the connector. | string | |
connect.rethink.username | The username to connect to RethinkDB with. | string | |
connect.rethink.password | The password for the user. | password | |
connect.rethink.auth.key | The authorization key to use in combination with the certificate file. | password | |
connect.rethink.cert.file | Certificate file to use for a secure TLS connection to the RethinkDB servers. Cannot be used with username/password. | string | |
connect.rethink.linger.ms | The number of milliseconds to wait before flushing the received messages to Kafka. The records will be flushed if the batch size is reached before the linger period has expired. | long | 5000 |
connect.progress.enabled | Enables output of how many records have been processed. | boolean | false |