5.0
AWS S3
A Kafka Connect Source connector for writing records from AWS S3 Buckets to Kafka.
KCQL support
The following KCQL is supported:
INSERT INTO kafka-topic SELECT * FROM bucketAddress:pathPrefix [BATCH=batch] [STOREAS storage_format] [LIMIT limit]
Examples:
-- Insert mode, select all fields from the json files
-- within the bucket and path given on testS3Bucket
-- using the default number of records
INSERT INTO topicA SELECT * FROM testS3Bucket:pathToReadFrom
-- Insert mode, select all fields from the avro files
-- within the bucket and path given on testS3Bucket
-- retrieving 5000 records on each poll from Kafka
-- Connect
INSERT INTO topicA SELECT * FROM testS3Bucket:pathToReadFrom STOREAS `AVRO` LIMIT 5000
-- Insert mode, select the content from the text (*.txt)
-- files under a path within the bucket.
-- The connector will return 5000 results for each poll call.
-- Each request to list files to S3 will retrieve a list of 100 files at a time.
INSERT INTO topicA SELECT * FROM `kafka-connect-aws-s3-test:some/other/path` BATCH=100 STOREAS `TEXT` LIMIT 5000
Concepts
The connector reads files in AVRO, Parquet, CSV, text, json, or binary/byte information from an S3 bucket into Kafka connect.
Configuration
An example configuration is provided:
name=S3SourceConnectorParquet # this can be anything
connector.class=io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
tasks.max=1
connect.s3.kcql=insert into $TOPIC_NAME select * from $BUCKET_NAME:$PREFIX_NAME STOREAS `parquet`
connect.s3.aws.secret.key=SECRET_KEY
connect.s3.aws.access.key=ACCESS_KEY
connect.s3.aws.auth.mode=Credentials
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8089
You should replace $BUCKET_NAME, $PREFIX_NAME and $TOPIC_NAME with the names of the bucket, desired prefix and topic.
Please read below for a detailed explanation of these and other options, including the meaning of WITH_FLUSH_COUNT and its alternatives.
Auth Mode configuration
2 Authentication modes are available:
Credentials
ACCESS_KEY and SECRET_KEY are credentials generated within AWS IAM and must be set and configured with permissions to write to the desired S3 bucket.
aws.auth.mode=Credentials
aws.access.key=ACCESS_KEY
aws.secret.key=SECRET_KEY
Default
In this auth mode no credentials need be supplied. If no auth mode is specified, then this default will be used.
aws.auth.mode=Default
The credentials will be discovered through the default chain, in this order:
- Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
- Java System Properties - aws.accessKeyId and aws.secretKey
- Web Identity Token credentials from the environment or container
- Credential profiles file at the default location (~/.aws/credentials)
- EC2 Credentials delivered through the Amazon EC2 container service
- Instance profile credentials delivered through the Amazon EC2 metadata service
The full details of the default chain are available on S3 Documentation
Format configuration
Format configuration is provided by kcql.
The options for json, avro and parquet will look like the below:
connect.s3.kcql=insert into $TOPIC_NAME select * from $BUCKET_NAME:$PREFIX_NAME STOREAS `JSON`
The options for the formats are case-insensitive, but they are presented here in the most readable form.
Input Formats and Expected Extensions
There are multiple options for CSV and BYTES reading that you should be aware of. More is explained below.
The connector will automatically search for any files matching the expected extensions in the given path.
Format | Expected Extensions | Variations |
---|---|---|
JSON | *.json | |
AVRO | *.avro | |
PARQUET | *.parquet | |
TEXT | *.text | |
CSV | *.csv | CSV, CSV_WithHeaders |
BYTES | *.bytes | Bytes_KeyAndValueWithSizes, Bytes_KeyWithSize, Bytes_ValueWithSize, Bytes_KeyOnly, Bytes_ValueOnly |
JSON Input Format Configuration
Using JSON as an input format allows you to read in files containing JSON content (delimited by new lines), line by line.
STOREAS `JSON`
Please note: The JSON is not parsed by the S3 Source connector. There is no difference in handling between Json and Text by the S3 Source connector.
value.converter=org.apache.kafka.connect.storage.StringConverter
Avro Input Format Configuration
Using Avro as the input format allows you to read the Avro-stored messages on S3 back into Kafka’s native format.
STOREAS `Avro`
It may also be necessary to configure the message converter:
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8089
Parquet Input Format Configuration
Using Parquet as the input format allows you to read parquet files stored on S3, importing the Avro schemas and values.
STOREAS `Parquet`
It may also be necessary to configure the message converter:
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8089
Text Input Format Configuration
If the source files on S3 consist of files containing lines of text, then using the text input format may be desired.
STOREAS `Text`
It may be required to use the additional configuration options for this connector to ensure that the value is presented as a String.
value.converter=org.apache.kafka.connect.storage.StringConverter
CSV Input Format Configuration
This reads CSV files written to S3 into a string to be written back to the Kafka source. There are 2 options for CSV format, if WithHeaders is included then the first row is skipped.
STOREAS `CSV_WithHeaders`
STOREAS `CSV`
Please note there is little distinction between the handling of CSV and handling of TEXT (with the exception that the header row can be skipped). The CSV is not parsed within the connector.
Byte(Binary) Input Format Configuration
Bytes can be read back in from S3 and back into message keys/values, depending on how the data was written to the source.
This can be used for reading back in a messages containing binary data that were written out using the s3 source, or alternatively reading binary files to be loaded onto a Kafka queue.
Please see the KCQL options available and the results of these configurations:
Option | KCQL Configuration | Records read from file as |
---|---|---|
Key and Value (With Sizes) | STOREAS Bytes_KeyAndValueWithSizes | Long Long Bytes Bytes |
Key (With Size) | STOREAS Bytes_KeyWithSize | Long Bytes |
Value (With Size) | STOREAS Bytes_ValueWithSize | Long Bytes |
Key Only | STOREAS Bytes_KeyOnly | Bytes |
Value Only | STOREAS Bytes_ValueOnly | Bytes |
Using the “With Sizes” options the Source assumes that the files will contain one or two (depending on configuration) 8-byte chunks of data at the start of the file instructing how many bytes to read for the content.
In order to ensure the message is passed through as bytes it may be necessary to set the additional configuration options
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
Limit Options
To limit the number of file names the source reads from S3 in a single poll, use
BATCH = 100
In order to limit the number of result rows returned from the source in a single poll operation, you can use the LIMIT clause.
LIMIT 1000
Partition Extraction
To ensure that the original partitions are passed back into Kafka Connect when reading from the source, a partition extractor can be configured to extract the partition from the filename.
The following two new properties are introduced:
connect.s3.source.partition.extractor.type
Can either be hierarchical
(which matches the topic/partition/offset.json
format the sink uses by default) or regex
(where you can supply a custom regular expression to extract the partition).
connect.s3.source.partition.extractor.regex
If using regex
, the regular expression to use to parse the file path and retrieve the partition. Only one lookup group is expected. For an example regex please see the regex for hierarchical
, which is:
(?i)^(?:.*)\/([0-9]*)\/(?:[0-9]*)[.](?:Json|Avro|Parquet|Text|Csv|Bytes)$
Quickstart
Preparing the source system
The S3 Source Connector requires AWS access keys and a bucket to be set up prior to running.
Configuring access keys
The S3 connector requires AWS access keys to be able to perform actions in AWS.
You can configure these through AWS console or CLI tools.
Please see AWS documentation for more details
Preparing a target kafka queue
If you are using lenses you can achieve creation of a Kafka queue simply by using the topic management features of lenses.
Otherwise, change to the directory you have kafka installed
cd ~/Software/kafka_2.12-2.5.0/bin
./kafka-topics.sh --create --bootstrap-server=localhost:9092 --topic=test_topic --partitions=3 --replication-factor=1
Creating a source bucket
Please ensure you use sensible selections for security and access options. More information on the command at AWS’s documentation .
aws create-bucket --bucket test-kafka-connect-bucket
Populate the source files in S3
echo '{"id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2, "qty":100}' > 1.json
aws s3 cp 1.json s3://test-kafka-connect-bucket/jsonTest/test_topic/1.json
Start the connector
If you are using Lenses, login into Lenses and navigate to the connectors page , select S3 as the source and paste the following:
name=S3SourceConnectorS3 # this can be anything
connector.class=io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
topics=$TOPIC_NAME
tasks.max=1
connect.s3.kcql=insert into test_topic select * from test-kafka-connect-bucket:jsonTest STOREAS `json` LIMIT 5000
connect.s3.aws.access.key=ACCESS_KEY
connect.s3.aws.secret.key=SECRET_KEY
connect.s3.aws.auth.mode=Credentials
To start the connector without using Lenses, log into the fastdatadev container:
docker exec -ti fastdata /bin/bash
and create a connector.properties file containing the properties above.
Create the connector, with the connect-cli :
connect-cli create aws-s3-source < connector.properties
Wait a for the connector to start and check its running:
connect-cli status aws-s3-source
Check for data in kafka
If you are using Lenses, login into Lenses and navigate to the explore page , and select the new topic you added.
View the topic and verify it has data in.
Otherwise, you can use the kafka-console-consumer script that comes with Kafka.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic jsonTest --from-beginning
Clean up
Bring down the stack:
docker-compose down
Options
Name | Description | Type | Available Values | Default Value |
---|---|---|---|---|
connect.s3.aws.access.key | Access Key | string | ||
connect.s3.aws.secret.key | Secret Key | string | ||
connect.s3.aws.region | Region | string | ||
connect.s3.aws.client | Client | string | aws, jclouds | AWS |
connect.s3.pool.max.connections | Connection Pool Max Connections | int | -1 (undefined) | 50 (aws), 30 (jclouds) |
connect.s3.aws.auth.mode | Auth Mode | string | Credentials, Default | Default |
connect.s3.custom.endpoint | Custom Endpoint | string | ||
connect.s3.kcql | KCQL String | string | kcql configuration | |
connect.s3.vhost.bucket | Enable Vhost Buckets | boolean | true, false | false |
connect.s3.source.partition.extractor.type | Extractor Type | string | hierarchical, regex | |
connect.s3.source.partition.extractor.regex | Extractor Regex | string |