Kafka Connect sink connector for writing data from Kafka to HBase.
The following KCQL is supported:
INSERT | UPSERT INTO <table_name> SELECT FIELD, ... FROM <kafka-topic> [PK FIELD, ...]
Examples:
-- Insert mode, select all fields from topicA and write to tableA
-- and use the default rowkey (topic name, partition, offset)
INSERT INTO tableA SELECT * FROM topicA

-- Insert mode, select 3 fields from topicB
-- use field y from the topic as the row key
INSERT INTO tableB SELECT x, y, z FROM topicB PK y
The PK keyword specifies the fields used to build the row key. The field values are concatenated, separated by a hyphen (-). If no fields are set, the topic name, partition and message offset are used.
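The row key rule described above can be illustrated with a short Python sketch. It is not the connector's code: the function name `row_key` and its parameters are hypothetical, and the separator used for the default key (topic, partition, offset) is an assumption, since only the separator for PK fields is stated here.

```python
def row_key(record, pk_fields, topic, partition, offset):
    """Illustrative row key derivation (hypothetical helper, not the connector's API)."""
    if pk_fields:
        # PK fields: concatenate the field values, separated by a hyphen.
        return "-".join(str(record[field]) for field in pk_fields)
    # No PK fields: fall back to topic name, partition and offset.
    # ASSUMPTION: the same hyphen separator is used for the default key.
    return "-".join([topic, str(partition), str(offset)])


record = {"firstName": "John", "lastName": "Smith", "age": 30}
print(row_key(record, ["firstName", "lastName"], "hbase", 0, 42))  # John-Smith
print(row_key(record, [], "hbase", 0, 42))                         # hbase-0-42
```

With a PK clause, records that share the same key field values map to the same row key; with the default key, every message gets its own row key.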
The connector supports error policies.
export CONNECTOR=hbase
docker-compose up -d hbase
The sink expects a table to already exist:
docker exec -ti hbase /bin/bash
hbase shell
create 'person',{NAME=>'lenses', VERSIONS=>1}
HBase requires an hbase-site.xml file. Log in to lenses-box and create a file called hbase-site.xml in the root directory:
and add the following as the contents:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>hbase:2181</value>
  </property>
  <property>
    <name>hbase.zookeeper.property.clientPort</name>
    <value>2181</value>
  </property>
</configuration>
If you are using Lenses, log in to Lenses, navigate to the connectors page, select HBase as the sink and paste the following:
name=person-hbase
connector.class=com.datamountaineer.streamreactor.connect.hbase.HbaseSinkConnector
tasks.max=1
topics=hbase
connect.hbase.column.family=lenses
connect.hbase.kcql=INSERT INTO person SELECT * FROM hbase PK firstName, lastName
connect.hbase.conf.dir=/
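An easy mistake with this kind of configuration is a KCQL statement that reads from a topic not listed in `topics`. The following Python sketch checks for that before the connector is deployed. It is a convenience check only: the helper names are hypothetical, the properties parsing is deliberately simplified, and the regular expression only looks for the word after `FROM` rather than parsing KCQL properly.

```python
import re


def parse_properties(text):
    """Parse simple key=value lines; blank lines and # comments are skipped."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")  # split on the first '=' only
        props[key.strip()] = value.strip()
    return props


def missing_topics(props, kcql_key="connect.hbase.kcql"):
    """Return topics named after FROM in the KCQL that are absent from `topics`."""
    subscribed = {t.strip() for t in props.get("topics", "").split(",") if t.strip()}
    # Simplified extraction: take the token following each FROM keyword.
    sources = re.findall(r"\bFROM\s+([^\s;]+)", props.get(kcql_key, ""), flags=re.IGNORECASE)
    return [t for t in sources if t not in subscribed]


CONFIG = """\
name=person-hbase
connector.class=com.datamountaineer.streamreactor.connect.hbase.HbaseSinkConnector
tasks.max=1
topics=hbase
connect.hbase.column.family=lenses
connect.hbase.kcql=INSERT INTO person SELECT * FROM hbase PK firstName, lastName
connect.hbase.conf.dir=/
"""

print(missing_topics(parse_properties(CONFIG)))  # []
```

An empty list means every topic referenced in the KCQL is also subscribed to through `topics`.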
To start the connector using the command line, log into the lenses-box container:
docker exec -ti lenses-box /bin/bash
and create a connector.properties file containing the properties above.
Create the connector with connect-cli:
connect-cli create hbase < connector.properties
Wait for the connector to start and check it’s running:
connect-cli status hbase
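As an alternative to reading the CLI output by eye, the same check can be scripted against the Kafka Connect REST API, whose `GET /connectors/{name}/status` endpoint reports a state for the connector and for each task. The sketch below assumes the Connect REST interface is reachable at `http://localhost:8083`; that address and the helper names are assumptions for illustration, and the connector name must be whichever name the connector was registered under.

```python
import json
from urllib.request import urlopen


def fetch_status(name, base_url="http://localhost:8083"):
    """Fetch connector status from the Kafka Connect REST API.

    ASSUMPTION: base_url points at the Connect worker's REST interface.
    """
    with urlopen(f"{base_url}/connectors/{name}/status") as resp:
        return json.load(resp)


def is_running(status):
    """True when the connector and all of its reported tasks are RUNNING."""
    states = [status["connector"]["state"]]
    states += [task["state"] for task in status.get("tasks", [])]
    return all(state == "RUNNING" for state in states)


# Hand-written payload in the shape returned by the status endpoint.
sample = {
    "name": "hbase",
    "connector": {"state": "RUNNING", "worker_id": "127.0.0.1:8083"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "127.0.0.1:8083"}],
}
print(is_running(sample))  # True
```

Against a live worker, `is_running(fetch_status("hbase"))` would perform the check; a task in the `FAILED` state makes it return `False`.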
In the lenses-box container, start the Kafka Avro console producer:
kafka-avro-console-producer \
  --broker-list localhost:9092 --topic hbase \
  --property value.schema='{"type":"record","name":"User","namespace":"com.datamountaineer.streamreactor.connect.rethink","fields":[{"name":"firstName","type":"string"},{"name":"lastName","type":"string"},{"name":"age","type":"int"},{"name":"salary","type":"double"}]}'
The console is now waiting for your input. Enter the following:
{ "firstName": "John", "lastName": "Smith", "age": 30, "salary": 4830 }
In the HBase shell in the HBase container, run:
scan 'person'
Bring down the stack:
docker-compose down