Kafka Connect sink connector for writing data from Kafka to HBase.
KCQL support
The following KCQL is supported:
INSERT INTO <table_name>
SELECT FIELD, ...
FROM <kafka-topic>
[PK FIELD, ...]
-- Insert mode, select all fields from topicA and write to tableA
-- and use the default rowkey (topic name, partition, offset)
INSERT INTO tableA SELECT * FROM topicA
-- Insert mode, select 3 fields from topicB and write to tableB
-- use field y from the topic as the row key
INSERT INTO tableB SELECT x, y, z FROM topicB PK y
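To make the statement shape concrete, here is a minimal Python sketch that splits a KCQL statement of the form shown above into its parts. It is an illustration only, not the connector's parser: the `parse_kcql` function and the `Kcql` tuple are hypothetical names, and the regular expression covers only the INSERT INTO / SELECT / FROM / PK clauses described here.

```python
import re
from typing import List, NamedTuple

# Simplified pattern for the KCQL subset described in this section.
# This is NOT the connector's real parser; other KCQL clauses are ignored.
KCQL_PATTERN = re.compile(
    r"^\s*INSERT\s+INTO\s+(?P<table>\S+)"
    r"\s+SELECT\s+(?P<fields>.+?)"
    r"\s+FROM\s+(?P<topic>\S+)"
    r"(?:\s+PK\s+(?P<pk>.+?))?\s*$",
    re.IGNORECASE,
)


class Kcql(NamedTuple):
    table: str
    fields: List[str]
    topic: str
    primary_keys: List[str]


def _split_csv(text: str) -> List[str]:
    """Split a comma-separated list and drop empty entries."""
    return [part.strip() for part in text.split(",") if part.strip()]


def parse_kcql(statement: str) -> Kcql:
    """Parse one statement; raise ValueError if it does not match the subset."""
    match = KCQL_PATTERN.match(statement)
    if match is None:
        raise ValueError(f"unsupported KCQL statement: {statement!r}")
    return Kcql(
        table=match.group("table"),
        fields=_split_csv(match.group("fields")),
        topic=match.group("topic"),
        # An absent PK clause yields an empty list (default row key applies).
        primary_keys=_split_csv(match.group("pk") or ""),
    )
```

For example, `parse_kcql("INSERT INTO tableB SELECT x, y, z FROM topicB PK y")` yields table `tableB`, topic `topicB`, fields `x`, `y`, `z` and a single primary key `y`.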
Primary Keys
The PK keyword specifies the fields used to build the row key. The field values are concatenated, separated by a hyphen (-). If no fields are set, the topic name, partition and message offset are used instead.
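The key rule can be sketched in a few lines of Python. This is a hedged illustration rather than the connector's code: `build_row_key` is a hypothetical helper, and the separator used for the default topic/partition/offset key is an assumption, since only the separator for PK fields is stated here.

```python
from typing import Any, Mapping, Sequence


def build_row_key(
    record: Mapping[str, Any],
    pk_fields: Sequence[str],
    topic: str,
    partition: int,
    offset: int,
    pk_separator: str = "-",
    default_separator: str = "-",  # ASSUMPTION: not specified in this document
) -> str:
    """Return the row key for one record under the PK rule described above."""
    if pk_fields:
        # PK fields set: concatenate their values in the order given.
        return pk_separator.join(str(record[name]) for name in pk_fields)
    # No PK fields: fall back to topic name, partition and offset.
    return default_separator.join([topic, str(partition), str(offset)])
```

With `PK firstName, lastName`, a record such as `{"firstName": "John", "lastName": "Smith"}` would get the key `John-Smith` under this rule.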
Error policies
The connector supports error policies.
Launch the stack
- Copy the docker-compose file.
- Bring up the stack.
export CONNECTOR=hbase
docker-compose up -d hbase
Preparing the target system
The sink expects a table to already exist:
docker exec -ti hbase /bin/bash
hbase shell
create 'person',{NAME=>'lenses', VERSIONS=>1}
Start the connector
HBase requires an hbase-site.xml file. Log in to fastdatadev and create a file called hbase-site.xml in the root directory:
docker exec -ti hbase /bin/bash
and add the following as the contents:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
If you are using Lenses, log in to Lenses, navigate to the connectors page, select HBase as the sink and paste the following:
connect.hbase.kcql=INSERT INTO person SELECT * FROM hbase PK firstName, lastName
To start the connector without using Lenses, log into the fastdatadev container:
docker exec -ti fastdata /bin/bash
and create a connector.properties file containing the properties above.
Create the connector with the connect-cli:
connect-cli create hbase < connector.properties
Wait for the connector to start and check that it is running:
connect-cli status hbase
Inserting test data
In the fastdata container, start the Kafka Avro console producer:
kafka-avro-console-producer \
--broker-list localhost:9092 --topic hbase \
--property value.schema='{"type":"record","name":"User","namespace":"com.datamountaineer.streamreactor.connect.rethink"
The console is now waiting for your input. Enter the following:
{"firstName": "John", "lastName": "Smith", "age":30, "salary": 4830}
Check for data in HBase
In the HBase container run:
scan 'person'
Clean up
Bring down the stack:
docker-compose down
Name | Description | Type | Default Value |
connect.hbase.conf.dir | The HBase configuration directory. | string | |
connect.hbase.column.family | The hbase column family. | string | |
connect.hbase.kcql | The KCQL statement that maps the Kafka topic to the HBase table. | string | |
connect.hbase.error.policy | Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed. THROW - the error is allowed to propagate. RETRY - the exception causes the Connect framework to retry the message; the number of retries is bounded by connect.hbase.max.retries. The error is logged automatically. | string | THROW |
connect.hbase.retry.interval | The time in milliseconds between retries. | int | 60000 |
connect.hbase.max.retries | The maximum number of times to try the write again. | int | 20 |
connect.hbase.security.principal | The principal to use when HDFS is using Kerberos for authentication. | string | |
connect.hbase.security.keytab | The path to the keytab file for the HDFS connector principal. This keytab file should only be readable by the connector user. | string | |
connect.hbase.namenode.principal | The principal for HDFS Namenode. | string | |
connect.hbase.security.kerberos.ticket.renew.ms | The period in milliseconds to renew the Kerberos ticket. | long | 3600000 |
connect.hbase.security.kerberos.user | The user name for logging in. Used when auth.mode is set to USERPASSWORD | string | |
connect.hbase.security.kerberos.password | The user password to login to Kerberos. Used when auth.mode is set to USERPASSWORD | password | |
connect.hbase.security.kerberos.krb5 | The path to the KRB5 file | string | |
connect.hbase.security.kerberos.jaas | The path to the JAAS file | string | |
connect.hbase.security.kerberos.jaas.entry.name | The entry in the jaas file to consider | string | com.sun.security.jgss.initiate |
connect.progress.enabled | Enables the output for how many records have been processed | boolean | false |
connect.hbase.security.kerberos.enabled | Configuration indicating whether HDFS is using Kerberos for authentication. | boolean | false |
connect.hbase.security.kerberos.auth.mode | The authentication mode for Kerberos. It can be KEYTAB or USERPASSWORD | string | KEYTAB |
connect.hbase.security.kerberos.debug | Configuration to enable Kerberos debug logging | boolean | false |
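The interplay of connect.hbase.error.policy, connect.hbase.max.retries and connect.hbase.retry.interval can be pictured with a small Python sketch. It is a simplified model under stated assumptions, not the connector's implementation: `write_with_policy` and its `sleep` parameter are hypothetical, and in a real deployment the Connect framework redelivers the message rather than a local loop.

```python
import time
from typing import Callable


def write_with_policy(
    write: Callable[[], None],
    policy: str = "THROW",           # table default for the error policy
    max_retries: int = 20,           # table default for max retries
    retry_interval_ms: int = 60000,  # table default for the retry interval
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Run `write` once and react to failure according to `policy`.

    Returns True if the write succeeded, False if a failure was swallowed
    (NOOP). Re-raises the error for THROW, or for RETRY once the retry
    budget is exhausted.
    """
    policy = policy.upper()
    if policy not in {"NOOP", "THROW", "RETRY"}:
        raise ValueError(f"unknown error policy: {policy}")
    retries = 0
    while True:
        try:
            write()
            return True
        except Exception:
            if policy == "NOOP":
                return False  # error is swallowed
            if policy == "THROW" or retries >= max_retries:
                raise  # error propagates to the caller
            retries += 1
            sleep(retry_interval_ms / 1000.0)  # wait before the next attempt
```

Passing a stub for `sleep` keeps the sketch easy to try out without waiting for the full retry interval.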