5.0

HBase

Kafka Connect sink connector for writing data from Kafka to HBase.

KCQL support 

The following KCQL is supported:

INSERT | UPSERT 
INTO <table_name> 
SELECT FIELD, ... 
FROM <kafka-topic> 
[PK FIELD, ...]

Examples:

-- Insert mode, select all fields from topicA and write to tableA 
-- and use the default rowkey (topic name, partition, offset)
INSERT INTO tableA SELECT * FROM topicA

-- Insert mode, select 3 fields and rename from topicB 
-- use field y from the topic as the row key
INSERT INTO tableB SELECT x, y, z FROM topicB PK y

Concepts 

Primary Keys 

The PK keyword can be used to specify the fields which will be used for the key value. The field values will be concatenated and separated by a -. If no fields are set the topic name, partition and message offset are used.

Error polices 

The connector supports Error polices .

Quickstart 

Launch the stack 


  1. Copy the docker-compose file.
  2. Bring up the stack.
export CONNECTOR=hbase
docker-compose up -d hbase

Preparing the target system 

The sink expects a table to already exist:

docker exec -ti hbase /bin/bash

hbase shell 
create 'person',{NAME=>'lenses', VERSIONS=>1}

Start the connector 

HBase requires a hbase-site.xml file. Login to fastdatadev and create a file called hbase-site.xml in the root directory:

docker exec -ti hbase /bin/bash

and add the following as the contents"

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>hbase:2181</value>
  </property>
  <property>
    <name>hbase.zookeeper.property.clientPort</name>
    <value>2181</value>
  </property>
</configuration>

If you are using Lenses, login into Lenses and navigate to the connectors page , select HBase as the sink and paste the following:

name=person-hbase
connector.class=com.datamountaineer.streamreactor.connect.hbase.HbaseSinkConnector
tasks.max=1
topics=hbase
connect.hbase.column.family=lenses
connect.hbase.kcql=INSERT INTO person SELECT * FROM hbase PK firstName, lastName
connect.hbase.conf.dir=/

To start the connector without using Lenses, log into the fastdatadev container:


docker exec -ti fastdata /bin/bash

and create a connector.properties file containing the properties above.

Create the connector, with the connect-cli :

connect-cli create hbase < connector.properties

connect-cli create hbase < connector.properties

Wait a for the connector to start and check its running:

connect-cli status hbase

Inserting test data 

In the to fastdata container start the kafka producer shell:


kafka-avro-console-producer \
  --broker-list localhost:9092 --topic rethink \
  --property value.schema='{"type":"record","name":"User","namespace":"com.datamountaineer.streamreactor.connect.rethink"
  ,"fields":[{"name":"firstName","type":"string"},{"name":"lastName","type":"string"},{"name":"age","type":"int"},{"name":"salary","type":"double"}]}'

the console is now waiting for your input, enter the following:


{"firstName": "John", "lastName": "Smith", "age":30, "salary": 4830}

Check for data in HBase 

In the HBase container run:

scan 'person'

Clean up 

Bring down the stack:

docker-compose down

Options 

NameDescriptionTypeDefault Value
connect.hbase.conf.dirThe HBase configuration directory.string
connect.hbase.column.familyThe hbase column family.string
connect.hbase.kcqlconnect.hbase.kcqlstring
connect.hbase.error.policySpecifies the action to be taken if an error occurs while inserting the data. There are two available options: NOOP - the error is swallowed THROW - the error is allowed to propagate. RETRY - The exception causes the Connect framework to retry the message. The number of retries is based on The error will be logged automaticallystringTHROW
connect.hbase.retry.intervalThe time in milliseconds between retries.int60000
connect.hbase.max.retriesThe maximum number of times to try the write again.int20
connect.hbase.security.principalThe principal to use when HDFS is using Kerberos to for authentication.string
connect.hbase.security.keytabThe path to the keytab file for the HDFS connector principal. This keytab file should only be readable by the connector user.string
connect.hbase.namenode.principalThe principal for HDFS Namenode.string
connect.hbase.security.kerberos.ticket.renew.msThe period in milliseconds to renew the Kerberos ticket.long3600000
connect.hbase.security.kerberos.userThe user name for login in. Used when auth.mode is set to USERPASSWORDstring
connect.hbase.security.kerberos.passwordThe user password to login to Kerberos. Used when auth.mode is set to USERPASSWORDpassword
connect.hbase.security.kerberos.krb5The path to the KRB5 filestring
connect.hbase.security.kerberos.jaasThe path to the JAAS filestring
connect.hbase.security.kerberos.jaas.entry.nameThe entry in the jaas file to considerstringcom.sun.security.jgss.initiate
connect.progress.enabledEnables the output for how many records have been processedbooleanfalse
connect.hbase.security.kerberos.enabledConfiguration indicating whether HDFS is using Kerberos for authentication.booleanfalse
connect.hbase.security.kerberos.auth.modeThe authentication mode for Kerberos. It can be KEYTAB or USERPASSWORDstringKEYTAB
connect.hbase.security.kerberos.debugConfiguration to enable Kerberos debug loggingbooleanfalse