4.1
RethinkDB
Kafka Connect sink connector for writing data from Kafka to RethinkDB.
KCQL support
The following KCQL is supported:
INSERT | UPSERT
INTO <table_name>
SELECT FIELD, ...
FROM <kafka-topic>
[AUTOCREATE]
[PK FIELD, ...]
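As a rough illustration of how these clauses combine, the Python sketch below assembles a KCQL statement from its parts. The helper `build_kcql` is hypothetical and not part of the connector; it only follows the clause order listed above, and the table, topic and field names are taken from the quickstart on this page.

```python
def build_kcql(mode, table, topic, fields=("*",), autocreate=False, pk=()):
    """Assemble a KCQL string in the clause order listed above.

    Hypothetical helper for illustration only; not part of the connector.
    """
    mode = mode.upper()
    if mode not in ("INSERT", "UPSERT"):
        raise ValueError("mode must be INSERT or UPSERT")
    parts = [
        f"{mode} INTO {table}",
        f"SELECT {', '.join(fields)}",
        f"FROM {topic}",
    ]
    if autocreate:
        parts.append("AUTOCREATE")
    if pk:
        parts.append("PK " + ", ".join(pk))
    return " ".join(parts)


statement = build_kcql(
    "upsert", "lenses", "rethink",
    fields=("firstName", "lastName", "age"),
    autocreate=True, pk=("firstName",),
)
print(statement)
# UPSERT INTO lenses SELECT firstName, lastName, age FROM rethink AUTOCREATE PK firstName
```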
Concepts
Autocreate
The connector can autocreate a table in RethinkDB using the AUTOCREATE clause.
Primary Key
When creating tables and inserting records, the primary keys can be defined with the PK clause.
If none are specified, a concatenation of the topic name, partition and offset is used when inserting records. When the table is created with no keys set, the primary key field is called id.
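The fallback key can be pictured with a minimal Python sketch. The function name `default_key` and the "-" separator are assumptions made for illustration; the text only states that the topic name, partition and offset are concatenated.

```python
def default_key(topic, partition, offset, sep="-"):
    """Build a fallback document id from a Kafka record's coordinates.

    The separator is an assumption; the connector's actual format may differ.
    """
    return sep.join([topic, str(partition), str(offset)])


# A record read from topic "rethink", partition 0, offset 42.
doc = {"firstName": "John", "lastName": "Smith"}
doc["id"] = default_key("rethink", 0, 42)
print(doc["id"])  # rethink-0-42
```

Because a topic, partition and offset identify exactly one Kafka record, a key built this way is unique per record and is the same if the record is delivered again.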
Insert mode
The connector supports an insert mode. This corresponds to RethinkDB’s ERROR conflict policy.
Upsert mode
The connector supports an upsert mode. This corresponds to RethinkDB’s REPLACE conflict policy.
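The difference between the two conflict policies can be sketched with an in-memory stand-in for a table keyed by id. This is not the RethinkDB driver: the `write` function and `DuplicateKeyError` are hypothetical, and raising an exception on a duplicate key is a simplification of how a rejected write is reported.

```python
class DuplicateKeyError(Exception):
    """Raised by this sketch when a key already exists under the "error" policy."""


def write(table, doc, conflict="error"):
    """Store doc in table (a dict keyed by doc["id"]).

    conflict="error":   reject a document whose id already exists (insert mode).
    conflict="replace": overwrite the existing document (upsert mode).
    """
    if conflict not in ("error", "replace"):
        raise ValueError(f"unsupported conflict policy: {conflict}")
    key = doc["id"]
    if key in table and conflict == "error":
        raise DuplicateKeyError(key)
    table[key] = doc


table = {}
write(table, {"id": "john", "age": 30}, conflict="error")    # new key: stored
write(table, {"id": "john", "age": 31}, conflict="replace")  # upsert: overwritten

try:
    write(table, {"id": "john", "age": 32}, conflict="error")  # insert: rejected
    rejected = False
except DuplicateKeyError:
    rejected = True

print(table["john"]["age"], rejected)  # 31 True
```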
Kafka payload support
This sink supports the following Kafka payloads:
- Schema.Struct and Struct (Avro)
- Schema.Struct and JSON
- No Schema and JSON
See connect payloads for more information.
Error policies
The connector supports Error policies.
Quickstart
Launch the stack
- Copy the docker-compose file.
- Bring up the stack.
export CONNECTOR=rethinkdb
docker-compose up -d rethink
Start the connector
If you are using Lenses, log in to Lenses, navigate to the connectors page, select RethinkDB as the sink and paste the following:
name=rethink
connector.class=com.datamountaineer.streamreactor.connect.rethink.sink.ReThinkSinkConnector
tasks.max=1
topics=rethink
connect.rethink.db=test
connect.rethink.host=rethink
connect.rethink.port=28015
connect.rethink.kcql=INSERT INTO lenses SELECT * FROM rethink
To start the connector without using Lenses, log into the fastdata container:
docker exec -ti fastdata /bin/bash
and create a connector.properties file containing the properties above.
Create the connector with the connect-cli:
connect-cli create rethink < connector.properties
Wait for the connector to start and check that it is running:
connect-cli status rethink
Inserting test data
In the fastdata container, start the Kafka producer shell:
kafka-avro-console-producer \
--broker-list localhost:9092 --topic rethink \
--property value.schema='{"type":"record","name":"User","namespace":"com.datamountaineer.streamreactor.connect.rethink"
,"fields":[{"name":"firstName","type":"string"},{"name":"lastName","type":"string"},{"name":"age","type":"int"},{"name":"salary","type":"double"}]}'
The console is now waiting for your input. Enter the following:
{"firstName": "John", "lastName": "Smith", "age":30, "salary": 4830}
Check for data in RethinkDB
Go to the RethinkDB admin console at http://localhost:8080/#tables.
- In the Data Explorer tab, insert the following and hit Run.
r.table('lenses')
Clean up
Bring down the stack:
docker-compose down
Options
Name | Description | Type | Default Value |
---|---|---|---|
connect.rethink.host | Rethink server host. | string | localhost |
connect.rethink.db | The RethinkDB database to write to. | string | connect_rethink_sink |
connect.rethink.port | Client port of rethink server to connect to. | int | 28015 |
connect.rethink.kcql | The KCQL expression for the connector. | string | |
connect.rethink.rethink.username | The user name to connect to rethink with. | string | |
connect.rethink.password | The password for the user. | password | |
connect.rethink.rethink.auth.key | The authorization key to use in combination with the certificate file. | password | |
connect.rethink.rethink.cert.file | Certificate file to use for secure TLS connection to the rethinkdb servers. Cannot be used with username/password. | string | |
connect.rethink.error.policy | Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message. The number of retries is based on connect.rethink.max.retries. The error will be logged automatically. | string | THROW |
connect.rethink.retry.interval | The time in milliseconds between retries. | int | 60000 |
connect.rethink.max.retries | The maximum number of times to try the write again. | int | 20 |
connect.progress.enabled | Enables the output for how many records have been processed | boolean | false |
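
The three error policies in the table can be sketched as a small Python wrapper around a write function. This is a simplified model, not the connector's code: in the connector the retry is carried out by the Connect framework, whereas here a local loop stands in for it. The names `write_with_policy` and `flaky_write` are hypothetical, and the defaults mirror the table values for connect.rethink.max.retries and connect.rethink.retry.interval.

```python
import time


def write_with_policy(write, record, policy="THROW", max_retries=20,
                      retry_interval_ms=60000, sleep=time.sleep):
    """Call write(record) and handle failures according to the policy.

    NOOP:  swallow the error and return None.
    THROW: let the error propagate.
    RETRY: wait retry_interval_ms and try again, at most max_retries times.
    """
    if policy not in ("NOOP", "THROW", "RETRY"):
        raise ValueError(f"unknown policy: {policy}")
    retries_left = max_retries
    while True:
        try:
            return write(record)
        except Exception:
            if policy == "NOOP":
                return None
            if policy == "THROW" or retries_left == 0:
                raise
            retries_left -= 1
            sleep(retry_interval_ms / 1000.0)


# A writer that fails twice before succeeding, to exercise RETRY.
calls = {"n": 0}


def flaky_write(record):
    calls["n"] += 1
    if calls["n"] < 3:
        raise IOError("simulated write failure")
    return "ok"


result = write_with_policy(flaky_write, {"id": 1}, policy="RETRY",
                           max_retries=5, retry_interval_ms=0,
                           sleep=lambda seconds: None)
print(result, calls["n"])  # ok 3
```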