5.0

AWS S3

A Kafka Connect Source connector for writing records from AWS S3 Buckets to Kafka.

KCQL support 

The following KCQL is supported:

INSERT INTO kafka-topic 
SELECT * 
FROM bucketAddress:pathPrefix
[BATCH=batch]
[STOREAS storage_format]
[LIMIT limit]

Examples:

-- Insert mode, select all fields from the json files
-- within the bucket and path given on testS3Bucket
-- using the default number of records
INSERT INTO topicA SELECT * FROM testS3Bucket:pathToReadFrom

-- Insert mode, select all fields from the avro files
-- within the bucket and path given on testS3Bucket
-- retrieving 5000 records on each poll from Kafka 
-- Connect
INSERT INTO topicA SELECT * FROM testS3Bucket:pathToReadFrom STOREAS `AVRO` LIMIT 5000

-- Insert mode, select the content from the text (*.text)
-- files under a path within the bucket.
-- The connector will return 5000 results for each poll call.
-- Each request to list files to S3 will retrieve a list of 100 files at a time.
INSERT INTO topicA SELECT * FROM `kafka-connect-aws-s3-test:some/other/path` BATCH=100 STOREAS `TEXT` LIMIT 5000
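
The optional clauses always appear in the same order: BATCH, then STOREAS, then LIMIT. As a minimal sketch, assuming you generate the statement from a script, it could be assembled as below. The helper name `build_kcql` and its parameters are hypothetical and not part of the connector; the connector only ever sees the resulting string.

```python
from typing import Optional


def build_kcql(topic: str, bucket: str, prefix: str,
               batch: Optional[int] = None,
               store_as: Optional[str] = None,
               limit: Optional[int] = None) -> str:
    """Assemble a source KCQL statement in the clause order shown above.

    Hypothetical helper for illustration only.
    """
    parts = [f"INSERT INTO {topic} SELECT * FROM {bucket}:{prefix}"]
    if batch is not None:
        parts.append(f"BATCH={batch}")
    if store_as is not None:
        parts.append(f"STOREAS `{store_as}`")
    if limit is not None:
        parts.append(f"LIMIT {limit}")
    return " ".join(parts)


# Reproduces the second example above.
print(build_kcql("topicA", "testS3Bucket", "pathToReadFrom",
                 store_as="AVRO", limit=5000))
```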

Concepts 

The connector reads files stored as Avro, Parquet, CSV, text, JSON, or binary (bytes) from an S3 bucket into Kafka Connect.

Configuration 

An example configuration is provided:

# The name can be anything
name=S3SourceConnectorParquet
connector.class=io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
tasks.max=1
connect.s3.kcql=insert into $TOPIC_NAME select * from $BUCKET_NAME:$PREFIX_NAME STOREAS `parquet`
connect.s3.aws.secret.key=SECRET_KEY
connect.s3.aws.access.key=ACCESS_KEY
connect.s3.aws.auth.mode=Credentials
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8089

You should replace $BUCKET_NAME, $PREFIX_NAME and $TOPIC_NAME with the names of the bucket, desired prefix and topic.
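
If the configuration is generated by a script, the placeholder substitution can be sketched as follows. This is only an illustration: the function name `render_kcql_line` and the prefix `parquetTest` are assumptions made for the example, and Python's `string.Template` is used simply because it happens to share the `$NAME` placeholder syntax.

```python
from string import Template

# Only the KCQL line carries placeholders; the other properties stay as they are.
KCQL_TEMPLATE = Template(
    "connect.s3.kcql=insert into $TOPIC_NAME select * "
    "from $BUCKET_NAME:$PREFIX_NAME STOREAS `parquet`"
)


def render_kcql_line(topic: str, bucket: str, prefix: str) -> str:
    """Fill in the three placeholders (hypothetical helper)."""
    # substitute() raises KeyError if any placeholder is left unfilled.
    return KCQL_TEMPLATE.substitute(
        TOPIC_NAME=topic, BUCKET_NAME=bucket, PREFIX_NAME=prefix
    )


print(render_kcql_line("test_topic", "test-kafka-connect-bucket", "parquetTest"))
```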

Please read below for a detailed explanation of these and other options.

Auth Mode configuration 

Two authentication modes are available:

Credentials 

ACCESS_KEY and SECRET_KEY are credentials generated within AWS IAM. They must be set, and they must carry permissions to read from the desired S3 bucket.

connect.s3.aws.auth.mode=Credentials
connect.s3.aws.access.key=ACCESS_KEY
connect.s3.aws.secret.key=SECRET_KEY

Default 

In this auth mode no credentials need to be supplied. If no auth mode is specified, this default is used.

connect.s3.aws.auth.mode=Default

The credentials will be discovered through the default chain, in this order:

  • Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
  • Java System Properties - aws.accessKeyId and aws.secretKey
  • Web Identity Token credentials from the environment or container
  • Credential profiles file at the default location (~/.aws/credentials)
  • EC2 Credentials delivered through the Amazon EC2 container service
  • Instance profile credentials delivered through the Amazon EC2 metadata service

Full details of the default chain are available in the AWS documentation.

Format configuration 

Format configuration is provided by kcql.

The options for json, avro and parquet will look like the below:

connect.s3.kcql=insert into $TOPIC_NAME select * from $BUCKET_NAME:$PREFIX_NAME STOREAS `JSON`

The options for the formats are case-insensitive, but they are presented here in the most readable form.

Input Formats and Expected Extensions 

There are multiple options for CSV and BYTES reading that you should be aware of. More is explained below.

The connector will automatically search for any files matching the expected extensions in the given path.

| Format  | Expected Extensions | Variations |
|---------|---------------------|------------|
| JSON    | *.json              |            |
| AVRO    | *.avro              |            |
| PARQUET | *.parquet           |            |
| TEXT    | *.text              |            |
| CSV     | *.csv               | CSV, CSV_WithHeaders |
| BYTES   | *.bytes             | Bytes_KeyAndValueWithSizes, Bytes_KeyWithSize, Bytes_ValueWithSize, Bytes_KeyOnly, Bytes_ValueOnly |

JSON Input Format Configuration 

Using JSON as an input format allows you to read in files containing JSON content (delimited by new lines), line by line.

STOREAS `JSON`

Please note: The JSON is not parsed by the S3 Source connector. There is no difference in handling between Json and Text by the S3 Source connector.

value.converter=org.apache.kafka.connect.storage.StringConverter

Avro Input Format Configuration 

Using Avro as the input format allows you to read the Avro-stored messages on S3 back into Kafka’s native format.

STOREAS `Avro`

It may also be necessary to configure the message converter:

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8089

Parquet Input Format Configuration 

Using Parquet as the input format allows you to read parquet files stored on S3, importing the Avro schemas and values.

STOREAS `Parquet`

It may also be necessary to configure the message converter:

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8089

Text Input Format Configuration 

If the source files on S3 contain lines of text, use the text input format.

STOREAS `Text`

It may be required to use the additional configuration options for this connector to ensure that the value is presented as a String.

value.converter=org.apache.kafka.connect.storage.StringConverter

CSV Input Format Configuration 

This reads CSV files stored on S3 as strings to be written to Kafka. There are two options for the CSV format: if WithHeaders is included, the first row is skipped.

STOREAS `CSV_WithHeaders`
STOREAS `CSV`

Please note there is little distinction between the handling of CSV and handling of TEXT (with the exception that the header row can be skipped). The CSV is not parsed within the connector.
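
The behaviour described above can be modelled in a few lines. This is a simplified sketch of the documented behaviour, not the connector's implementation: the function name `lines_as_records` is hypothetical, and the model assumes one record per line with no special handling of quoted fields.

```python
from typing import List


def lines_as_records(content: str, with_headers: bool = False) -> List[str]:
    """Return one unparsed string per line, optionally dropping the first row.

    with_headers=False models STOREAS `CSV` (and `TEXT`);
    with_headers=True models STOREAS `CSV_WithHeaders`.
    """
    lines = content.splitlines()
    return lines[1:] if with_headers else lines


sample = "id,product,price\n1,OP-DAX-P-20150201-95.7,94.2\n2,OP-DAX-C-20150201-100,99.5"

# Every line, including the header, is returned as-is.
print(lines_as_records(sample))
# The header row is skipped; the remaining rows are still plain strings.
print(lines_as_records(sample, with_headers=True))
```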

Byte(Binary) Input Format Configuration 

Bytes can be read from S3 back into message keys and values, depending on how the data was written.

This can be used to read back messages containing binary data that were written out using the S3 sink, or alternatively to read binary files to be loaded onto a Kafka topic.

Please see the KCQL options available and the results of these configurations:

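| Option                     | KCQL Configuration                 | Records read from file as |
|----------------------------|------------------------------------|---------------------------|
| Key and Value (With Sizes) | STOREAS Bytes_KeyAndValueWithSizes | Long Long Bytes Bytes     |
| Key (With Size)            | STOREAS Bytes_KeyWithSize          | Long Bytes                |
| Value (With Size)          | STOREAS Bytes_ValueWithSize        | Long Bytes                |
| Key Only                   | STOREAS Bytes_KeyOnly              | Bytes                     |
| Value Only                 | STOREAS Bytes_ValueOnly            | Bytes                     |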

Using the “With Sizes” options the Source assumes that the files will contain one or two (depending on configuration) 8-byte chunks of data at the start of the file instructing how many bytes to read for the content.
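
To make the "Long Long Bytes Bytes" layout concrete, here is a minimal decoding sketch. It rests on two assumptions that this page does not state: that each size is a big-endian signed 64-bit integer (the usual Java convention), and that the key size comes before the value size. The function name is hypothetical.

```python
import struct
from typing import Tuple


def decode_key_and_value_with_sizes(data: bytes) -> Tuple[bytes, bytes]:
    """Split a Bytes_KeyAndValueWithSizes payload into (key, value).

    Assumed layout: 8-byte key length, 8-byte value length, key bytes, value bytes.
    Assumed encoding of the lengths: big-endian signed 64-bit.
    """
    header = ">qq"
    key_len, value_len = struct.unpack_from(header, data, 0)
    offset = struct.calcsize(header)  # 16 bytes for the two lengths
    key = data[offset:offset + key_len]
    value = data[offset + key_len:offset + key_len + value_len]
    if len(key) != key_len or len(value) != value_len:
        raise ValueError("payload is shorter than the declared sizes")
    return key, value


# Build a sample payload in the assumed layout and decode it again.
sample = struct.pack(">qq", 3, 5) + b"key" + b"value"
print(decode_key_and_value_with_sizes(sample))
```

The single-size variants (Bytes_KeyWithSize, Bytes_ValueWithSize) would follow the same pattern with one length prefix instead of two.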

To ensure the message is passed through as bytes, it may be necessary to set these additional configuration options:

value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter

Limit Options 

To limit the number of file names the source reads from S3 in a single poll, use the BATCH clause:

BATCH = 100

In order to limit the number of result rows returned from the source in a single poll operation, you can use the LIMIT clause.

LIMIT 1000
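
Both clauses are upper bounds that apply per call, to different things: BATCH to file names listed, LIMIT to records returned. The sketch below is only a conceptual model of that idea, not the connector's implementation; the object keys and record values are made up for the example.

```python
from itertools import islice
from typing import Iterable, Iterator, List


def in_chunks(items: Iterable[str], size: int) -> Iterator[List[str]]:
    """Yield successive lists of at most `size` items."""
    iterator = iter(items)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


# Hypothetical object keys and record values, for illustration only.
file_names = [f"some/other/path/{i}.text" for i in range(5)]
records = [f"line {i}" for i in range(7)]

# BATCH=2: file names are listed at most 2 at a time.
listings = list(in_chunks(file_names, 2))
# LIMIT 3: each poll returns at most 3 records.
polls = list(in_chunks(records, 3))

print([len(batch) for batch in listings])
print([len(poll) for poll in polls])
```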

Partition Extraction 

To ensure that the original partitions are passed back into Kafka Connect when reading from the source, a partition extractor can be configured to extract the partition from the filename.

The following two new properties are introduced:

connect.s3.source.partition.extractor.type

Can either be hierarchical (which matches the topic/partition/offset.json format the sink uses by default) or regex (where you can supply a custom regular expression to extract the partition).

connect.s3.source.partition.extractor.regex

If using regex, this is the regular expression used to parse the file path and retrieve the partition. Exactly one capture group is expected. For an example, see the regex used for hierarchical, which is:

(?i)^(?:.*)\/([0-9]*)\/(?:[0-9]*)[.](?:Json|Avro|Parquet|Text|Csv|Bytes)$
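
To see what this expression captures, it can be tried against a few paths. The sketch below uses Python's `re` module, on the assumption that it treats this particular pattern the same way as the connector's regex engine; the function name `extract_partition` and the sample paths are hypothetical.

```python
import re
from typing import Optional

# The hierarchical expression quoted above, unchanged.
HIERARCHICAL = re.compile(
    r"(?i)^(?:.*)\/([0-9]*)\/(?:[0-9]*)[.](?:Json|Avro|Parquet|Text|Csv|Bytes)$"
)


def extract_partition(path: str) -> Optional[int]:
    """Return the partition captured by the single group, or None."""
    match = HIERARCHICAL.match(path)
    if match is None or match.group(1) == "":
        return None
    return int(match.group(1))


# topic/partition/offset.json layout: the partition segment is captured.
print(extract_partition("jsonTest/test_topic/3/100.json"))
# No partition segment between the topic and the file name: no match.
print(extract_partition("jsonTest/test_topic/1.json"))
```

A custom expression supplied through connect.s3.source.partition.extractor.regex can be checked the same way before deploying it.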

Quickstart 

Preparing the source system 

The S3 Source Connector requires AWS access keys and a bucket to be set up prior to running.

Configuring access keys 

The S3 connector requires AWS access keys to be able to perform actions in AWS.

You can configure these through the AWS console or the CLI tools.

Please see the AWS documentation for more details.

Preparing a target Kafka topic 

If you are using Lenses, you can create the Kafka topic using the topic management features of Lenses.

Otherwise, change to the directory where Kafka is installed:

cd ~/Software/kafka_2.12-2.5.0/bin
./kafka-topics.sh --create --bootstrap-server=localhost:9092 --topic=test_topic --partitions=3 --replication-factor=1

Creating a source bucket 

Please ensure you use sensible selections for security and access options. More information on the command is available in the AWS documentation.

aws s3api create-bucket --bucket test-kafka-connect-bucket

Populate the source files in S3 

echo '{"id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2, "qty":100}' > 1.json
aws s3 cp 1.json s3://test-kafka-connect-bucket/jsonTest/test_topic/1.json

Start the connector 

If you are using Lenses, log in to Lenses, navigate to the connectors page, select S3 as the source and paste the following:

# The name can be anything
name=S3SourceConnectorS3
connector.class=io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
topics=$TOPIC_NAME
tasks.max=1
connect.s3.kcql=insert into test_topic select * from test-kafka-connect-bucket:jsonTest STOREAS `json` LIMIT 5000 
connect.s3.aws.access.key=ACCESS_KEY
connect.s3.aws.secret.key=SECRET_KEY
connect.s3.aws.auth.mode=Credentials

To start the connector without using Lenses, log into the fastdatadev container:


docker exec -ti fastdata /bin/bash

and create a connector.properties file containing the properties above.

Create the connector with the connect-cli:

connect-cli create aws-s3-source < connector.properties

Wait for the connector to start and check that it is running:

connect-cli status aws-s3-source
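
If connect-cli is not available, Kafka Connect's REST interface accepts the same settings as a JSON document with a name and a config map, posted to the /connectors endpoint. The sketch below only builds that payload from properties text; it is a simplified reader (no line continuations or escapes), the function name is hypothetical, and sending the request to your Connect worker is left out because the worker address is not given on this page.

```python
import json
from typing import Dict


def properties_to_payload(text: str) -> Dict[str, object]:
    """Convert simple key=value properties into a Connect REST payload."""
    config: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        # Skip blank lines and whole-line comments.
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    name = config.pop("name")
    return {"name": name, "config": config}


properties = """
# The name can be anything
name=aws-s3-source
connector.class=io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
tasks.max=1
connect.s3.kcql=insert into test_topic select * from test-kafka-connect-bucket:jsonTest STOREAS `json` LIMIT 5000
connect.s3.aws.auth.mode=Default
"""

payload = properties_to_payload(properties)
print(json.dumps(payload, indent=2))
```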

Check for data in kafka 

If you are using Lenses, log in to Lenses, navigate to the explore page, and select the new topic you added.

View the topic and verify that it contains data.

Otherwise, you can use the kafka-console-consumer script that comes with Kafka.

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic --from-beginning

Clean up 

Bring down the stack:

docker-compose down

Options 

| Name | Description | Type | Available Values | Default Value |
|------|-------------|------|------------------|---------------|
| connect.s3.aws.access.key | Access Key | string | | |
| connect.s3.aws.secret.key | Secret Key | string | | |
| connect.s3.aws.region | Region | string | | |
| connect.s3.aws.client | Client | string | aws, jclouds | AWS |
| connect.s3.pool.max.connections | Connection Pool Max Connections | int | -1 (undefined) | 50 (aws), 30 (jclouds) |
| connect.s3.aws.auth.mode | Auth Mode | string | Credentials, Default | Default |
| connect.s3.custom.endpoint | Custom Endpoint | string | | |
| connect.s3.kcql | KCQL String | string | kcql configuration | |
| connect.s3.vhost.bucket | Enable Vhost Buckets | boolean | true, false | false |
| connect.s3.source.partition.extractor.type | Extractor Type | string | hierarchical, regex | |
| connect.s3.source.partition.extractor.regex | Extractor Regex | string | | |