Kudu
Kafka Connect sink connector for writing data from Kafka to Kudu.
KCQL support
The following KCQL is supported:
INSERT | UPSERT
INTO <kudu-table>
SELECT FIELDS, ...
FROM <kafka-topic>
[AUTOCREATE]
[DISTRIBUTEBY FIELD, ... INTO NBR_OF_BUCKETS BUCKETS]
[AUTOEVOLVE]
Examples:
-- Insert mode, select all fields from topicA and write to tableA
INSERT INTO tableA SELECT * FROM topicA
-- Insert mode, select 3 fields from topicB, renaming two, and write to tableB
INSERT INTO tableB SELECT x AS a, y, z AS c FROM topicB
-- Upsert mode, select all fields from topicC,
-- auto create tableC and auto evolve, use field1 as the primary key
UPSERT INTO tableC
SELECT * FROM topicC AUTOCREATE DISTRIBUTEBY field1 INTO 10 BUCKETS AUTOEVOLVE
Concepts
Kudu does not currently have a concept of databases, unlike Impala. If you want to insert into a table managed by Impala, you need to set the target table accordingly, using the impala:: prefix:
impala::database.impala_table
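For example, to write into an Impala table orders in the default database (the same target used in the quickstart below):

```sql
INSERT INTO impala::default.orders SELECT * FROM orders
```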
Insert Mode
Insert is the default write mode of the sink.
Kafka Connect sinks provide at-least-once delivery semantics, so duplicate records may be delivered. To ensure no errors are produced when unique constraints have been implemented on the target tables, the sink can run in UPSERT mode. If the error policy has been set to NOOP, the sink will discard the error and continue processing; however, it currently makes no attempt to distinguish violations of integrity constraints from other exceptions, such as casting issues.
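As a sketch, the error policy can be relaxed via the connector properties (the RETRY values shown are the defaults documented under Options below):

```properties
# Swallow errors and continue processing (errors are still logged)
connect.kudu.error.policy=NOOP

# Or retry failed writes instead:
# connect.kudu.error.policy=RETRY
# connect.kudu.max.retries=20
# connect.kudu.retry.interval=60000
```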
Upsert Mode
The connector supports Kudu upserts, which replace the existing row if a match is found on the primary keys. If records are delivered with the same field or group of fields that form the primary key of the target table, but with different values, the existing record in the target table will be updated.
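For example, assuming id is the primary key of tableC, the following KCQL writes every record as an upsert, so a later record carrying an existing id overwrites the earlier row (table and topic names are illustrative):

```sql
UPSERT INTO tableC SELECT * FROM topicC
```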
Auto Create
If you set AUTOCREATE, the sink will use the schema attached to the topic to create a table in Kudu. The primary key is set by the PK keyword; the values of the chosen fields are concatenated, separated by a -. If no fields are set, the topic name, partition and message offset are used.
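A minimal AUTOCREATE sketch with an explicit primary key (the field names are illustrative); here the values of firstName and lastName would be concatenated with a - to form the key:

```sql
INSERT INTO tableA SELECT * FROM topicA AUTOCREATE PK firstName,lastName
```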
Kudu has a number of partition strategies. The sink only supports the HASH partition strategy, you can control which fields are used to hash and the number of buckets which should be created. This behaviour is controlled via the DISTRIBUTEBY clause.
DISTRIBUTEBY id INTO 10 BUCKETS
Tables created by the sink are not automatically visible to Impala. To query them from Impala, map the underlying Kudu table with an external table:
CREATE EXTERNAL TABLE my_mapping_table
STORED AS KUDU
TBLPROPERTIES (
'kudu.table_name' = 'my_kudu_table'
);
Auto Evolution
The connector supports auto evolution of tables for each topic. When set, the connector will identify new schemas for each topic based on the schema version from the Schema Registry. New columns will be identified and an ALTER TABLE DDL statement issued against Kudu. All new columns are set as nullable.
Schema evolution can occur upstream, for example new fields being added or a change in the data type of a field in the schema of the topic. Fields cannot be deleted upstream, and new fields should be of Avro union type [null, <type>] so that they are nullable.
If an upstream field is removed and the topic is not following the Schema Registry's evolution rules, i.e. is not FULL or BACKWARD compatible, any errors are handled according to the error policy.
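For example, a new optional field added upstream as a nullable union with a null default keeps the schema backward compatible (the salary field here is illustrative):

```json
{
  "type": "record",
  "name": "myrecord",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "created", "type": "string"},
    {"name": "salary", "type": ["null", "double"], "default": null}
  ]
}
```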
Error policies
The connector supports error policies; see the connect.kudu.error.policy option under Options below.
Quickstart
Launch the stack
- Copy the docker-compose file.
- Bring up the stack.
export CONNECTOR=kudu
docker-compose up -d kudu
Prepare the target system
Log into the Kudu container and start the impala-shell:
docker exec -ti kudu impala-shell
CREATE TABLE orders (
id INT
, created VARCHAR
, product VARCHAR
, qty INT
, price DOUBLE
, PRIMARY KEY (id)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU
TBLPROPERTIES (
'kudu.num_tablet_replicas' = '1'
);
Verify the table was created:
SELECT * FROM orders;
Start the connector
If you are using Lenses, log into Lenses, navigate to the connectors page, select Kudu as the sink and paste the following:
name=kudu
connector.class=com.datamountaineer.streamreactor.connect.kudu.sink.KuduSinkConnector
tasks.max=1
topics=orders
connect.kudu.master=kudu
connect.kudu.kcql=INSERT INTO impala::default.orders SELECT * FROM orders
To start the connector without using Lenses, log into the fastdata container:
docker exec -ti fastdata /bin/bash
and create a connector.properties file containing the properties above.
Create the connector with the connect-cli:
connect-cli create kudu < connector.properties
Wait for the connector to start and check it's running:
connect-cli status kudu
Inserting test data
In the fastdata container, start the Kafka Avro console producer:
kafka-avro-console-producer \
--broker-list localhost:9092 --topic orders \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"created","type":"string"},{"name":"product","type":"string"},{"name":"price","type":"double"}, {"name":"qty", "type":"int"}]}'
The console is now waiting for input; enter the following:
{"id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2, "qty":100}
Check for data in Kudu
In the Kudu container:
SELECT * FROM orders;
Clean up
Bring down the stack:
docker-compose down
Options
Name | Description | Type | Default Value |
---|---|---|---|
connect.kudu.master | Kudu master address, comma separated list. | string | localhost |
connect.kudu.kcql | KCQL expression describing field selection and routes from Kafka topics to Kudu tables. | string | |
connect.kudu.error.policy | Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message, with the number of retries set by connect.kudu.max.retries. All errors will be logged automatically, even if the code swallows them. | string | THROW |
connect.kudu.retry.interval | The time in milliseconds between retries. | int | 60000 |
connect.kudu.max.retries | The maximum number of times to try the write again. | int | 20 |
connect.kudu.schema.registry.url | URL for the Schema Registry. | string | http://localhost:8081 |
connect.kudu.write.flush.mode | Specifies the Kudu write flush mode: SYNC - flush each sink record, batching is disabled; BATCH_BACKGROUND - flush batches of sink records in a background thread; BATCH_SYNC - flush batches of sink records synchronously. | string | SYNC |
connect.kudu.mutation.buffer.space | Kudu Session mutation buffer space | int | 1000 |
connect.progress.enabled | Enables the output for how many records have been processed | boolean | false |
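For example, to trade latency for throughput you might enable background batching and enlarge the mutation buffer; the values below are illustrative, not tuned recommendations:

```properties
# Flush batches of sink records from a background thread
connect.kudu.write.flush.mode=BATCH_BACKGROUND
# Allow more pending operations in the Kudu session buffer
connect.kudu.mutation.buffer.space=10000
```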