Hive


Kafka Connect source connector for reading data from Hive and writing to Kafka.

The following versions of the Hive connector are available:

  • Hive (Hive 2.1+)

KCQL support 

The following KCQL is supported:

INSERT INTO <topic>
SELECT FIELDS,...
FROM <hive-table>

Examples:

-- Insert into kafka_topicA all fields from hive_tableA
INSERT INTO kafka_topicA SELECT * FROM hive_tableA

Concepts 

Kerberos 

For Hive clusters using Kerberos for authentication, the connector supports two modes. The mode is controlled via the connect.hive.security.kerberos.auth.mode configuration. The supported values are:

  • KEYTAB
  • USERPASSWORD

The connect.hive.security.kerberos.ticket.renew.ms configuration controls the interval (in milliseconds) at which to renew the Kerberos ticket obtained during the login step.
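
For illustration, a minimal sketch combining these settings (KEYTAB and 3600000 are the defaults; all keys are described in the Options section below):

connect.hive.security.kerberos.enabled=true
connect.hive.security.kerberos.auth.mode=KEYTAB
# Renew the Kerberos ticket every hour (the default)
connect.hive.security.kerberos.ticket.renew.ms=3600000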

Keytab 

When this mode is configured, these extra configurations need to be set:

connect.hive.security.principal=....
connect.hive.security.keytab=/path/to/the/keytab
connect.hive.security.namenode.principal=....

The keytab file needs to be available at the same path on all Connect cluster workers. If the file is missing, an error will be raised. You can find the details of these configurations in the Options section.
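
For example, assuming a hypothetical EXAMPLE.COM realm and host names (all values below are placeholders), the keytab settings might look like:

connect.hive.security.principal=hive/hive.example.com@EXAMPLE.COM
connect.hive.security.keytab=/etc/security/keytabs/hive.keytab
connect.hive.security.namenode.principal=nn/namenode.example.com@EXAMPLE.COM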

User-password 

For setups where a keytab is not available, Kerberos authentication can be handled via a user and password approach. In this case, the following configurations are required by the source:

connect.hive.security.kerberos.user = jsmith
connect.hive.security.kerberos.password=password123
connect.hive.security.kerberos.krb5=/path/to/the/krb5
connect.hive.security.kerberos.jaas=/path/to/the/jaas
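
The JAAS file uses the standard Java JAAS configuration syntax. A minimal sketch, illustrative only; the entry name must match connect.hive.security.kerberos.jaas.entry.name, which defaults to com.sun.security.jgss.initiate:

com.sun.security.jgss.initiate {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=false;
};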

Quickstart 

Launch the stack 


  1. Copy the docker-compose file.
  2. Bring up the stack.
export CONNECTOR=hive
docker-compose up -d hive

Inserting test data 

Log in to the container and start the Hive shell:

docker exec -ti hive hive

and execute the following:

CREATE DATABASE lenses;
USE lenses;

CREATE TABLE cities (
    city STRING
    , state STRING
    , population INT
    , country STRING)
STORED AS PARQUET;

INSERT INTO TABLE cities VALUES ("Philadelphia", "PA", 1568000, "USA");
INSERT INTO TABLE cities VALUES ("Chicago", "IL", 2705000, "USA");
INSERT INTO TABLE cities VALUES ("New York", "NY", 8538000, "USA");

SELECT *
FROM cities;

Start the connector 

If you are using Lenses, log in, navigate to the Connectors page, select Hive as the source, and paste the following:

name=hive-source-example
connector.class=com.landoop.streamreactor.connect.hive.source.HiveSourceConnector
tasks.max=1
topics=hive
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
connect.hive.kcql=INSERT INTO cities SELECT * FROM cities
connect.hive.database.name=lenses
connect.hive.metastore=thrift
connect.hive.metastore.uris=thrift://hive-metastore:9083
connect.hive.fs.defaultFS=hdfs://namenode:8020

To start the connector using the command line, log into the lenses-box container:


docker exec -ti lenses-box /bin/bash

and create a connector.properties file containing the properties above.

Create the connector with the connect-cli:

connect-cli create hive < connector.properties


Wait for the connector to start and check it’s running:

connect-cli status hive
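
Alternatively, the connector can be managed through the Kafka Connect REST API. A sketch, assuming the worker's REST interface listens on the default port 8083:

# Create the connector; the config JSON mirrors connector.properties
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors \
  --data '{
    "name": "hive-source-example",
    "config": {
      "connector.class": "com.landoop.streamreactor.connect.hive.source.HiveSourceConnector",
      "tasks.max": "1",
      "connect.hive.kcql": "INSERT INTO cities SELECT * FROM cities",
      "connect.hive.database.name": "lenses",
      "connect.hive.metastore": "thrift",
      "connect.hive.metastore.uris": "thrift://hive-metastore:9083",
      "connect.hive.fs.defaultFS": "hdfs://namenode:8020"
    }
  }'

# Check its status
curl http://localhost:8083/connectors/hive-source-example/status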

Check for records in Kafka 

Check the records in Lenses or via the console:

kafka-avro-console-consumer \
    --bootstrap-server localhost:9092 \
    --topic cities \
    --from-beginning
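
Note that the example configuration above sets the JSON converter with schemas disabled, so the records can also be read with the plain console consumer:

kafka-console-consumer \
    --bootstrap-server localhost:9092 \
    --topic cities \
    --from-beginning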

Clean up 

Bring down the stack:

docker-compose down

Options 

Name | Description | Type | Default Value
connect.hive.database.name | Sets the database name | string |
connect.hive.metastore | Protocol used by the Hive metastore | string |
connect.hive.kcql | Contains the Kafka Connect Query Language describing the flow from Apache Hive tables to Apache Kafka topics. | string |
connect.hive.fs.defaultFS | HDFS filesystem default URI | string |
connect.hive.metastore.uris | URI to point to the metastore | string |
connect.hive.hdfs.conf.dir | The Hadoop configuration directory. | string |
connect.hive.conf.dir | The Hive configuration directory. | string |
connect.hive.refresh.frequency | The frequency, in seconds, at which to refresh the Hive file listing | int | 0
connect.hive.security.principal | The principal to use when HDFS is using Kerberos for authentication. | string |
connect.hive.security.keytab | The path to the keytab file for the HDFS connector principal. This keytab file should only be readable by the connector user. | string |
connect.hive.security.namenode.principal | The principal for the HDFS Namenode. | string |
connect.hive.security.kerberos.ticket.renew.ms | The period in milliseconds to renew the Kerberos ticket. | long | 3600000
connect.hive.security.kerberos.user | The user name to log in with. Used when auth.mode is set to USERPASSWORD | string |
connect.hive.security.kerberos.password | The user password to log in to Kerberos with. Used when auth.mode is set to USERPASSWORD | password |
connect.hive.security.kerberos.krb5 | The path to the KRB5 file | string |
connect.hive.security.kerberos.jaas | The path to the JAAS file | string |
connect.hive.security.kerberos.jaas.entry.name | The entry in the JAAS file to use | string | com.sun.security.jgss.initiate
connect.progress.enabled | Enables the output of how many records have been processed | boolean | false
connect.hive.security.kerberos.enabled | Configuration indicating whether HDFS is using Kerberos for authentication. | boolean | false
connect.hive.security.kerberos.auth.mode | The authentication mode for Kerberos. It can be KEYTAB or USERPASSWORD | string | KEYTAB
--
Last modified: November 18, 2024