4.1
VoltDB
Kafka Connect sink connector for writing data from Kafka to VoltDB.
KCQL support
The following KCQL is supported:
INSERT | UPSERT
INTO <table_name>
SELECT FIELD, ...
FROM <kafka-topic>
Examples:
-- Insert mode, select all fields from topicA and write to tableA
INSERT INTO tableA SELECT * FROM topicA
-- Insert mode, select 3 fields from topicB, rename them and write to tableB
INSERT INTO tableB SELECT x AS a, y AS b, z AS c FROM topicB
-- Upsert mode, select 3 fields from topicB (renaming x to a) and write to tableB
UPSERT INTO tableB SELECT x AS a, y, z FROM topicB
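To make the grammar above concrete, here is a minimal Python sketch that splits a statement of exactly this shape into its write mode, target table, source topic and field mapping. It is not the parser the connector uses: parse_kcql and Route are hypothetical names, and only the forms shown in the examples are handled.

```python
import re
from dataclasses import dataclass
from typing import Dict

# Hypothetical, simplified grammar covering only the form documented above:
#   (INSERT|UPSERT) INTO <table> SELECT <fields> FROM <topic>
_KCQL = re.compile(
    r"^\s*(INSERT|UPSERT)\s+INTO\s+(\w+)\s+SELECT\s+(.+?)\s+FROM\s+(\S+)\s*$",
    re.IGNORECASE,
)


@dataclass
class Route:
    mode: str               # "INSERT" or "UPSERT"
    table: str              # target VoltDB table
    topic: str              # source Kafka topic
    fields: Dict[str, str]  # source field -> target column; "*" selects all


def parse_kcql(statement: str) -> Route:
    match = _KCQL.match(statement)
    if match is None:
        raise ValueError(f"unsupported KCQL: {statement!r}")
    mode, table, select_list, topic = match.groups()
    fields: Dict[str, str] = {}
    for item in select_list.split(","):
        parts = item.split()
        if len(parts) == 1:  # "y" keeps its name
            fields[parts[0]] = parts[0]
        elif len(parts) == 3 and parts[1].upper() == "AS":  # "x AS a" renames
            fields[parts[0]] = parts[2]
        else:
            raise ValueError(f"unsupported field expression: {item.strip()!r}")
    return Route(mode.upper(), table, topic, fields)
```

For example, parse_kcql("UPSERT INTO tableB SELECT x AS a, y, z FROM topicB").fields gives {'x': 'a', 'y': 'y', 'z': 'z'}, while a select list joined with "and" instead of a comma is rejected.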
Concepts
Insert Mode
Insert is the default write mode of the sink.
Kafka can provide exactly-once delivery semantics; however, a record can still reach the sink more than once (for example, after a task restart), and if unique constraints have been defined on the target table, the repeated insert fails. To avoid such errors, run the sink in UPSERT mode. If the error policy is set to NOOP, the sink discards the error and continues processing; however, it currently makes no attempt to distinguish violations of integrity constraints from other exceptions, such as casting issues.
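The interaction between re-delivery, unique constraints and the NOOP policy can be modelled with a toy in-memory table. This sketch is illustrative only: PersonTable, UniqueViolation and write are hypothetical names, not connector classes, and a failed int() conversion stands in for a casting error.

```python
from typing import Dict, Tuple


class UniqueViolation(Exception):
    """Hypothetical stand-in for a primary-key (unique constraint) violation."""


class PersonTable:
    """Toy in-memory model of the person table, PRIMARY KEY (firstname, lastname)."""

    def __init__(self) -> None:
        self.rows: Dict[Tuple[str, str], dict] = {}

    def insert(self, row: dict) -> None:
        key = (row["firstname"], row["lastname"])
        if key in self.rows:
            raise UniqueViolation(f"duplicate primary key {key}")
        # A value that cannot be cast to INT fails here with ValueError,
        # standing in for the "casting issues" mentioned above.
        self.rows[key] = {**row, "age": int(row["age"])}


def write(table: PersonTable, row: dict, error_policy: str = "THROW") -> None:
    """Insert one row, applying a NOOP or THROW error policy."""
    try:
        table.insert(row)
    except Exception:
        if error_policy == "NOOP":
            return  # swallowed, whether it was a constraint or a cast failure
        raise  # THROW: let the error propagate
```

Writing the same row twice under NOOP leaves a single row and raises nothing, and a row whose age is "thirty" is dropped just as silently: the caller cannot tell which kind of error was discarded.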
Upsert Mode
The connector supports VoltDB upserts, which replace the existing row if a match is found on the primary key. If a record arrives with the same value(s) in the field or group of fields used as the primary key of the target table, but with different values in its other fields, the existing record in the target table is updated.
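Continuing the toy model, an upsert keyed on (firstname, lastname) either adds a new row or replaces the matching one, so a re-delivered or changed record no longer raises a constraint error. upsert is a hypothetical helper, not VoltDB's or the connector's API.

```python
from typing import Dict, Tuple

Key = Tuple[str, str]  # (firstname, lastname), the person table's primary key


def upsert(rows: Dict[Key, dict], row: dict) -> str:
    """Insert row, or replace the existing row with the same primary key."""
    key: Key = (row["firstname"], row["lastname"])
    action = "updated" if key in rows else "inserted"
    rows[key] = dict(row)  # replace the stored row, as described above
    return action
```

Sending John Smith twice with a different age therefore yields "inserted" and then "updated", and the table still holds one row carrying the newer values.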
Error policies
The connector supports error policies; see connect.volt.error.policy in the Options table below.
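A rough Python model of the three policies listed under connect.volt.error.policy follows. write_with_policy and RetriesExhausted are hypothetical; max_retries and retry_interval_ms mirror connect.volt.max.retries and connect.volt.retry.interval and default to the documented 20 and 60000. Treat the local loop as an approximation: in the connector, RETRY hands the failure back to the Connect framework, which redelivers the records.

```python
import logging
import time
from typing import Callable

log = logging.getLogger("voltdb-sink-sketch")


class RetriesExhausted(Exception):
    """Hypothetical: raised once RETRY has used up max_retries attempts."""


def write_with_policy(
    write: Callable[[], None],
    policy: str = "THROW",
    max_retries: int = 20,           # cf. connect.volt.max.retries
    retry_interval_ms: int = 60000,  # cf. connect.volt.retry.interval
    sleep: Callable[[float], None] = time.sleep,  # injectable for testing
) -> bool:
    """Return True if the write succeeded, False if NOOP swallowed an error."""
    retries_left = max_retries
    while True:
        try:
            write()
            return True
        except Exception as err:
            log.warning("write failed: %s", err)  # logged under every policy
            if policy == "NOOP":
                return False
            if policy == "THROW":
                raise
            if policy != "RETRY":
                raise ValueError(f"unknown error policy: {policy!r}") from err
            if retries_left == 0:
                raise RetriesExhausted(f"gave up after {max_retries} retries") from err
            retries_left -= 1
            sleep(retry_interval_ms / 1000)  # wait before the next attempt
```

Passing sleep explicitly keeps the retry timing observable without actually waiting a minute between attempts.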
Quickstart
Launch the stack
- Copy the docker-compose file.
- Bring up the stack.
export CONNECTOR=volt
docker-compose up -d voltdb
Preparing the target system
Log into the VoltDB container and start the VoltDB shell:
docker exec -ti voltdb sqlcmd
In the shell, create the target table and check that it is empty:
CREATE TABLE person(
firstname VARCHAR(128)
, lastname VARCHAR(128)
, age INT
, salary FLOAT
, PRIMARY KEY (firstname, lastname)
);
SELECT * FROM person;
Start the connector
If you are using Lenses, log into Lenses, navigate to the connectors page, select VoltDB as the sink and paste the following:
name=voltdb
connector.class=com.datamountaineer.streamreactor.connect.voltdb.VoltSinkConnector
tasks.max=1
topics=voltdb
connect.volt.servers=voltdb:21212
connect.volt.password=
connect.volt.username=
connect.volt.kcql=INSERT INTO person SELECT * FROM voltdb
To start the connector without using Lenses, log into the fastdata container:
docker exec -ti fastdata /bin/bash
and create a connector.properties file containing the properties above.
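If you prefer to script this step, the sketch below renders the same settings as plain key=value lines. VOLTDB_SINK and render_properties are illustrative names; the function does no escaping, which none of these values need.

```python
from typing import Dict

# The same settings as the Lenses example above.
VOLTDB_SINK: Dict[str, str] = {
    "name": "voltdb",
    "connector.class": "com.datamountaineer.streamreactor.connect.voltdb.VoltSinkConnector",
    "tasks.max": "1",
    "topics": "voltdb",
    "connect.volt.servers": "voltdb:21212",
    "connect.volt.password": "",
    "connect.volt.username": "",
    "connect.volt.kcql": "INSERT INTO person SELECT * FROM voltdb",
}


def render_properties(props: Dict[str, str]) -> str:
    """Render one key=value line per entry (no escaping is performed)."""
    return "".join(f"{key}={value}\n" for key, value in props.items())
```

Write the file with, for example, pathlib.Path("connector.properties").write_text(render_properties(VOLTDB_SINK)).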
Create the connector with the connect-cli:
connect-cli create voltdb < connector.properties
Wait for the connector to start and check that it is running:
connect-cli status voltdb
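connect-cli is a client for the Kafka Connect REST API, so the same check can be scripted against the GET /connectors/{name}/status endpoint, which reports the state of the connector and of each of its tasks. The sketch assumes the REST API is reachable at localhost:8083 (Connect's default REST port) from inside the fastdata container; fetch_status and is_running are hypothetical helpers.

```python
import json
import urllib.request


def fetch_status(name: str, base_url: str = "http://localhost:8083") -> dict:
    """GET /connectors/<name>/status (assumption: REST API on localhost:8083)."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status") as resp:
        return json.load(resp)


def is_running(status: dict) -> bool:
    """True only if the connector and every one of its tasks report RUNNING."""
    tasks = status.get("tasks", [])
    return (
        status.get("connector", {}).get("state") == "RUNNING"
        and len(tasks) > 0
        and all(task.get("state") == "RUNNING" for task in tasks)
    )
```

is_running(fetch_status("voltdb")) should return True once the connector and its single task (tasks.max=1) are up; a task in the FAILED state makes it return False even if the connector itself is RUNNING.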
Inserting test data
In the fastdata container, start the Kafka Avro console producer:
kafka-avro-console-producer \
--broker-list localhost:9092 --topic voltdb \
--property value.schema='{"type":"record","name":"User","namespace":"com.datamountaineer.streamreactor.connect.rethink"
,"fields":[{"name":"firstName","type":"string"},{"name":"lastName","type":"string"},{"name":"age","type":"int"},{"name":"salary","type":"double"}]}'
The console is now waiting for your input. Enter the following:
{"firstName": "John", "lastName": "Smith", "age":30, "salary": 4830}
Check for data in VoltDB
In the VoltDB shell (sqlcmd), run:
SELECT * FROM person;
Clean up
Bring down the stack:
docker-compose down
Options
Name | Description | Type | Default Value |
---|---|---|---|
connect.volt.retry.interval | The time in milliseconds between retries | int | 60000 |
connect.volt.servers | Comma-separated list of VoltDB servers, each given as server[:port] | string | |
connect.volt.username | The user to connect to the VoltDB database as | string | |
connect.volt.password | The password for the VoltDB user | password | |
connect.volt.kcql | KCQL expression describing field selection and routes | string | |
connect.volt.error.policy | Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message, up to connect.volt.max.retries times. The error will be logged automatically. | string | THROW |
connect.volt.max.retries | The maximum number of times to try the write again | int | 20 |
connect.progress.enabled | Enables output showing how many records have been processed | boolean | false |
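The table can be read as a small validation spec. The sketch below parses a connector.properties file of the kind used in the quickstart, fills in the documented defaults and checks the error policy. It handles only plain key=value lines, not the full Java properties syntax (line continuations, escapes, ':' separators). Treating connect.volt.servers and connect.volt.kcql as mandatory is an assumption, since the table lists no default for either; load_options is a hypothetical helper.

```python
from typing import Dict

# Defaults copied from the Options table above.
DEFAULTS: Dict[str, str] = {
    "connect.volt.retry.interval": "60000",
    "connect.volt.error.policy": "THROW",
    "connect.volt.max.retries": "20",
    "connect.progress.enabled": "false",
}
# Assumption: treated as required because the table gives them no default.
REQUIRED = ("connect.volt.servers", "connect.volt.kcql")
ERROR_POLICIES = {"NOOP", "THROW", "RETRY"}


def load_options(text: str) -> Dict[str, str]:
    """Parse plain key=value lines and fill in the documented defaults."""
    options = dict(DEFAULTS)
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue  # blank line or comment
        key, _, value = line.partition("=")
        options[key.strip()] = value.strip()
    missing = [key for key in REQUIRED if not options.get(key)]
    if missing:
        raise ValueError(f"missing required option(s): {', '.join(missing)}")
    policy = options["connect.volt.error.policy"].upper()
    if policy not in ERROR_POLICIES:
        raise ValueError(f"unknown error policy: {policy}")
    for key in ("connect.volt.retry.interval", "connect.volt.max.retries"):
        if not options[key].isdigit():
            raise ValueError(f"{key} must be a non-negative integer")
    return options
```

Empty values such as connect.volt.password= are kept as empty strings, matching the quickstart configuration.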