Hive
Kafka Connect source connector for reading data from Hive and writing to Kafka.
Two versions of the Hive connector are available:
- Hive (Hive 2.1+)
- Hive 1.1 (Hive 1.1)
KCQL support
The following KCQL is supported:
INSERT INTO <topic>
SELECT FIELDS,...
FROM <hive-table>
Examples:
-- Insert into kafka_topicA all fields from hive_tableA
INSERT INTO kafka_topicA SELECT * FROM hive_tableA
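The grammar also supports projecting individual fields; for example (topic, table, and field names here are illustrative):
-- Insert into kafka_topicB only the selected fields from hive_tableB
INSERT INTO kafka_topicB SELECT city, population FROM hive_tableB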
Concepts
Kerberos
For Hive clusters that use Kerberos for authentication, the connector supports two modes, selected via the connect.hive.security.kerberos.auth.mode configuration. The supported values are:
- KEYTAB
- USERPASSWORD
The connect.hive.security.kerberos.ticket.renew.ms configuration controls the interval (in milliseconds) at which the Kerberos ticket obtained during login is renewed.
Keytab
When this mode is configured, these extra configurations need to be set:
connect.hive.security.principal=....
connect.hive.security.keytab=/path/to/the/keytab
connect.hive.security.namenode.principal=....
The keytab file must be available at the same path on every Connect cluster worker; if the file is missing, an error is raised. Details of these configurations can be found in the Optional Configurations section.
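Putting it together, a minimal keytab-based setup might look like the following sketch (the principal names and keytab path are placeholders to be replaced with values from your cluster):
# Sketch only: replace the principals and path with your own values
connect.hive.security.kerberos.enabled=true
connect.hive.security.kerberos.auth.mode=KEYTAB
connect.hive.security.principal=hive/host.example.com@EXAMPLE.COM
connect.hive.security.keytab=/path/to/the/keytab
connect.hive.security.namenode.principal=nn/host.example.com@EXAMPLE.COM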
User-password
For setups where a keytab is not available, Kerberos authentication can be handled with a user name and password. In this case, the following configurations are required by the source:
connect.hive.security.kerberos.user=jsmith
connect.hive.security.kerberos.password=password123
connect.hive.security.kerberos.krb5=/path/to/the/krb5
connect.hive.security.kerberos.jaas=/path/to/the/jaas
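As a rough sketch, the JAAS file typically contains a Kerberos login entry under the entry name the connector looks up (com.sun.security.jgss.initiate by default, configurable via connect.hive.security.kerberos.jaas.entry.name); the exact Krb5LoginModule options depend on your KDC setup:
// Sketch only: module options vary per environment
com.sun.security.jgss.initiate {
  com.sun.security.auth.module.Krb5LoginModule required;
};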
Quickstart
Launch the stack
- Copy the docker-compose file.
- Bring up the stack.
export CONNECTOR=hive
docker-compose up -d hive
Insert test data
Log in to the container and start the Hive shell:
docker exec -ti hive hive
and execute the following:
CREATE DATABASE lenses;
USE lenses;
CREATE TABLE cities (
city STRING
, state STRING
, population INT
, country STRING)
STORED AS PARQUET;
INSERT INTO TABLE cities VALUES ("Philadelphia", "PA", 1568000, "USA");
INSERT INTO TABLE cities VALUES ("Chicago", "IL", 2705000, "USA");
INSERT INTO TABLE cities VALUES ("New York", "NY", 8538000, "USA");
SELECT *
FROM cities;
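The final SELECT should return the three rows just inserted, along the lines of:
Philadelphia   PA   1568000   USA
Chicago        IL   2705000   USA
New York       NY   8538000   USA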
Start the connector
If you are using Lenses, log in to Lenses, navigate to the connectors page, select Hive as the source, and paste the following:
name=hive-source-example
connector.class=com.landoop.streamreactor.connect.hive.source.HiveSourceConnector
tasks.max=1
topics=hive
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
connect.hive.kcql=INSERT INTO cities SELECT * FROM cities
connect.hive.database.name=lenses
connect.hive.metastore=thrift
connect.hive.metastore.uris=thrift://hive-metastore:9083
connect.hive.fs.defaultFS=hdfs://namenode:8020
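With the JSON converters above and schemas disabled, each Hive row should land on the cities topic as a plain JSON record, roughly of the shape:
{"city":"Philadelphia","state":"PA","population":1568000,"country":"USA"}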
To start the connector without using Lenses, log into the fastdata container:
docker exec -ti fastdata /bin/bash
and create a connector.properties file containing the properties above.
Create the connector with the connect-cli:
connect-cli create hive < connector.properties
Wait for the connector to start and check that it is running:
connect-cli status hive
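Alternatively, the connector can be registered directly against the Kafka Connect REST API (assumed here to be listening on localhost:8083), with the same configuration expressed as JSON:
curl -s -X POST -H "Content-Type: application/json" http://localhost:8083/connectors \
  -d '{
    "name": "hive-source-example",
    "config": {
      "connector.class": "com.landoop.streamreactor.connect.hive.source.HiveSourceConnector",
      "tasks.max": "1",
      "connect.hive.kcql": "INSERT INTO cities SELECT * FROM cities",
      "connect.hive.database.name": "lenses",
      "connect.hive.metastore": "thrift",
      "connect.hive.metastore.uris": "thrift://hive-metastore:9083",
      "connect.hive.fs.defaultFS": "hdfs://namenode:8020"
    }
  }'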
Check for records in Kafka
Check the records in Lenses or via the console:
kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic cities \
--from-beginning
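If the pipeline is wired up correctly, the consumer should print one JSON record per Hive row, for example:
{"city":"Philadelphia","state":"PA","population":1568000,"country":"USA"}
{"city":"Chicago","state":"IL","population":2705000,"country":"USA"}
{"city":"New York","state":"NY","population":8538000,"country":"USA"}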
Clean up
Bring down the stack:
docker-compose down
Options
Name | Description | Type | Default Value |
---|---|---|---|
connect.hive.database.name | Sets the database name | string | |
connect.hive.metastore | Protocol used by the Hive metastore | string | |
connect.hive.kcql | Contains the Kafka Connect Query Language describing the flow from Apache Kafka topics to Apache Hive tables. | string | |
connect.hive.fs.defaultFS | HDFS filesystem default URI | string | |
connect.hive.metastore.uris | URI to point to the metastore | string | |
connect.hive.hdfs.conf.dir | The Hadoop configuration directory. | string | |
connect.hive.conf.dir | The Hive configuration directory. | string | |
connect.hive.refresh.frequency | The frequency, in seconds, at which to refresh the Hive file listing | int | 0 |
connect.hive.security.principal | The principal to use when HDFS is using Kerberos for authentication. | string | |
connect.hive.security.keytab | The path to the keytab file for the HDFS connector principal. This keytab file should only be readable by the connector user. | string | |
connect.hive.security.namenode.principal | The principal for the HDFS Namenode. | string | |
connect.hive.security.kerberos.ticket.renew.ms | The period in milliseconds to renew the Kerberos ticket. | long | 3600000 |
connect.hive.security.kerberos.user | The user name used to log in. Used when auth.mode is set to USERPASSWORD | string | |
connect.hive.security.kerberos.password | The password used to log in to Kerberos. Used when auth.mode is set to USERPASSWORD | password | |
connect.hive.security.kerberos.krb5 | The path to the KRB5 file | string | |
connect.hive.security.kerberos.jaas | The path to the JAAS file | string | |
connect.hive.security.kerberos.jaas.entry.name | The entry in the jaas file to consider | string | com.sun.security.jgss.initiate |
connect.progress.enabled | Enables the output for how many records have been processed | boolean | false |
connect.hive.security.kerberos.enabled | Configuration indicating whether HDFS is using Kerberos for authentication. | boolean | false |
connect.hive.security.kerberos.auth.mode | The authentication mode for Kerberos. It can be KEYTAB or USERPASSWORD | string | KEYTAB |