Kafka Connect sink connector for writing data from Kafka to Kudu.
The following KCQL is supported:
INSERT | UPSERT INTO <kudu-table> SELECT FIELDS, ... FROM <kafka-topic> [AUTOCREATE] [DISTRIBUTE BY FIELD, ... INTO NBR_OF_BUCKETS BUCKETS] [AUTOEVOLVE]
Examples:
-- Insert mode, select all fields from topicA and write to tableA
INSERT INTO tableA SELECT * FROM topicA
-- Insert mode, select 3 fields from topicB, renaming x and z, and write to tableB
INSERT INTO tableB SELECT x AS a, y, z AS c FROM topicB
-- Upsert mode, select all fields from topicC,
-- auto create tableC and auto evolve, use field1 as the primary key
UPSERT INTO tableC SELECT * FROM topicC AUTOCREATE DISTRIBUTEBY field1 INTO 10 BUCKETS AUTOEVOLVE
Kudu does not currently have a concept of databases like Impala does. To write to a table managed by Impala, set the target table name accordingly:
impala::database.impala_table
Insert is the default write mode of the sink.
Kafka can provide exactly-once delivery semantics. However, to ensure that no errors are produced when unique constraints are enforced on the target tables, the sink can run in UPSERT mode. If the error policy is set to NOOP, the sink discards the error and continues processing. Note that it currently makes no attempt to distinguish violations of integrity constraints from other exceptions, such as casting issues.
The connector supports Kudu upserts, which replace the existing row if a match is found on the primary keys. If records arrive with the same value for the field, or group of fields, used as the primary key on the target table, but with different values in other fields, the existing record in the target table is updated.
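A minimal sketch of the upsert behaviour, assuming the quickstart connector below has its KCQL changed to UPSERT INTO impala::default.orders SELECT * FROM orders. The file name upsert-records.json is hypothetical; it holds two records that share primary key id 1 but differ in price and qty.

```shell
# Two records with the same primary key (id=1) but different price and qty.
# In UPSERT mode the second record should replace the first, leaving one row with qty 250.
cat > upsert-records.json <<'EOF'
{"id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2, "qty": 100}
{"id": 1, "created": "2016-05-06 13:54:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.5, "qty": 250}
EOF
```

Redirecting this file into the kafka-avro-console-producer command from the quickstart (appending < upsert-records.json) should then leave a single row for id 1 in Kudu, whereas in INSERT mode the second record would be expected to fail on the duplicate key and be handled by the error policy.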
If you set AUTOCREATE, the sink uses the schema attached to the topic to create a table in Kudu. The primary keys are set by the PK keyword. The key field values are concatenated, separated by a -. If no fields are set, the topic name, partition and message offset are used.
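The key-building rule described above can be illustrated with a small shell function. build_key is a hypothetical helper that only mimics the documented behaviour (join the key field values with -, or fall back to topic, partition and offset); it is not the connector's own code.

```shell
# Hypothetical helper illustrating the documented key rule; not connector code.
# Usage: build_key TOPIC PARTITION OFFSET [KEY_FIELD_VALUE...]
build_key() {
  topic=$1 partition=$2 offset=$3
  shift 3
  if [ "$#" -gt 0 ]; then
    # Key field values were given: concatenate them, separated by '-'.
    # The subshell keeps the IFS change local.
    (IFS=-; printf '%s\n' "$*")
  else
    # No key fields: fall back to topic name, partition and message offset.
    printf '%s-%s-%s\n' "$topic" "$partition" "$offset"
  fi
}

build_key orders 0 42 EU 1001   # prints EU-1001
build_key orders 0 42           # prints orders-0-42
```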
Kudu supports several partitioning strategies. The sink supports only HASH partitioning: you can control which fields are hashed and how many buckets are created. This is configured with the DISTRIBUTEBY clause, for example:
DISTRIBUTE BY id INTO 10 BUCKETS
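To illustrate what hash partitioning into 10 buckets means, the sketch below maps a value of the distribution column to a bucket by hashing it and taking the result modulo the bucket count. It uses cksum purely as a stand-in hash; Kudu applies its own internal hash function, so the actual bucket numbers will differ. bucket_for is a hypothetical name.

```shell
# Illustration only: cksum (a CRC) stands in for Kudu's internal hash function.
# Prints the bucket, in the range 0..BUCKETS-1, that a column value maps to.
bucket_for() {
  value=$1 buckets=$2
  # cksum prints "CRC BYTECOUNT" for stdin; keep only the CRC.
  crc=$(printf '%s' "$value" | cksum | cut -d ' ' -f 1)
  echo $((crc % buckets))
}

for id in 1 2 3 42; do
  echo "id=$id -> bucket $(bucket_for "$id" 10)"
done
```

The same value always lands in the same bucket, which is why rows are spread across tablets by the hashed columns rather than by arrival order.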
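To query a Kudu table from Impala, for example one created by the sink, map it with an external table:
CREATE EXTERNAL TABLE my_mapping_table STORED AS KUDU TBLPROPERTIES ( 'kudu.table_name' = 'my_kudu_table' );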
The connector supports auto evolution of tables for each topic. When enabled, the connector identifies new schemas for each topic based on the schema version in the Schema Registry. New columns are identified and an ALTER TABLE DDL statement is issued against Kudu. All new columns are set as nullable.
Schema evolution can occur upstream, for example when new fields are added or the data type of a field in the topic's schema changes.
Fields cannot be deleted upstream. Fields should be of the Avro union type [null, <type>] with a default set. This allows the connector to retrieve either the default value or null. The connector is not aware that a field has been deleted, because a value is always supplied to it.
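For example, a later version of the quickstart orders schema could add an optional field as a union of null and a concrete type, with null as the default. The field name discount is hypothetical; the schema is held in a shell variable, SCHEMA_V2, so it can be passed to the producer.

```shell
# Version 2 of the quickstart schema, adding a hypothetical optional field.
# "null" is listed first in the union so that the default value null is valid Avro.
SCHEMA_V2='{"type":"record","name":"myrecord","fields":[
  {"name":"id","type":"int"},
  {"name":"created","type":"string"},
  {"name":"product","type":"string"},
  {"name":"price","type":"double"},
  {"name":"qty","type":"int"},
  {"name":"discount","type":["null","double"],"default":null}
]}'
printf '%s\n' "$SCHEMA_V2"
```

It could be used as --property value.schema="$SCHEMA_V2" in the producer command from the quickstart. Because the new field has a default, adding it is a backward-compatible change under the Schema Registry's compatibility rules.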
If an upstream field is removed and the topic does not follow the Schema Registry's evolution rules, i.e. it is not full or backward compatible, any errors are handled according to the configured error policy.
The connector supports error policies.
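As a hedged example, the fragment below selects how the sink reacts to write errors. The property names connect.kudu.error.policy, connect.kudu.max.retries and connect.kudu.retry.interval, the RETRY and THROW values, and their described semantics follow the usual Stream Reactor naming convention but are assumptions here; only NOOP is mentioned above. The file name error-policy.properties is hypothetical.

```shell
# Assumed property names and semantics (Stream Reactor convention);
# verify them against the Kudu sink's configuration reference before use.
cat > error-policy.properties <<'EOF'
# NOOP ignores errors, THROW fails the task, RETRY re-attempts the write.
connect.kudu.error.policy=RETRY
connect.kudu.max.retries=10
# Assumed to be in milliseconds.
connect.kudu.retry.interval=60000
EOF
```

These lines could be appended to a connector's properties file, for example with cat error-policy.properties >> connector.properties, before the connector is created.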
Bring up the stack:
export CONNECTOR=kudu
docker-compose up -d kudu
Log in to the Kudu container and start the impala-shell:
docker exec -ti kudu impala-shell
In the impala-shell, create the orders table and check that it is empty:
CREATE TABLE orders (
  id INT,
  created VARCHAR,
  product VARCHAR,
  qty INT,
  price DOUBLE,
  PRIMARY KEY (id)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU
TBLPROPERTIES ('kudu.num_tablet_replicas' = '1');
SELECT * FROM orders;
If you are using Lenses, log in to Lenses, navigate to the connectors page, select Kudu as the sink and paste the following:
name=kudu
connector.class=com.datamountaineer.streamreactor.connect.kudu.sink.KuduSinkConnector
tasks.max=1
topics=orders
connect.kudu.master=kudu
connect.kudu.kcql=INSERT INTO impala::default.orders SELECT * FROM orders
To start the connector from the command line, log in to the lenses-box container:
docker exec -ti lenses-box /bin/bash
and create a connector.properties file containing the properties above.
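One way to create the file from the lenses-box shell is a heredoc; the content is the same configuration as above, one property per line.

```shell
# Write the quickstart sink configuration to connector.properties.
# The quoted 'EOF' stops the shell from expanding anything inside the heredoc.
cat > connector.properties <<'EOF'
name=kudu
connector.class=com.datamountaineer.streamreactor.connect.kudu.sink.KuduSinkConnector
tasks.max=1
topics=orders
connect.kudu.master=kudu
connect.kudu.kcql=INSERT INTO impala::default.orders SELECT * FROM orders
EOF
```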
Create the connector with connect-cli:
connect-cli create kudu < connector.properties
Wait for the connector to start and check it’s running:
connect-cli status kudu
In the lenses-box container, start the Kafka Avro console producer:
kafka-avro-console-producer \
  --broker-list localhost:9092 --topic orders \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"created","type":"string"},{"name":"product","type":"string"},{"name":"price","type":"double"},{"name":"qty","type":"int"}]}'
The console is now waiting for input. Enter the following record:
{ "id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2, "qty": 100 }
In the impala-shell in the Kudu container, check that the record has arrived:
SELECT * FROM orders;
Bring down the stack:
docker-compose down