5.0
AWS S3
A Kafka Connect sink connector for writing records from Kafka to AWS S3 Buckets.
KCQL support
The following KCQL is supported:
INSERT INTO bucketAddress:pathPrefix SELECT * FROM kafka-topic [PARTITIONBY (partition[, partition] ...)] [STOREAS storage_format] [WITHPARTITIONER partitioner] [WITH_FLUSH_SIZE flush_size] [WITH_FLUSH_INTERVAL flush_interval] [WITH_FLUSH_COUNT flush_count]
Examples:
-- Insert mode, select all fields from all configured topics (`*`) and
-- write underneath prefix on testS3Bucket. Using the wildcard ensures
-- you don't have to define the topics twice in your connector configuration.
INSERT INTO testS3Bucket:pathToWriteTo SELECT * FROM `*`
-- Insert mode, select all fields from topicA and
-- write underneath prefix on testS3Bucket
INSERT INTO testS3Bucket:pathToWriteTo SELECT * FROM topicA
-- Insert mode, select all fields from topicA and
-- write underneath prefix on testS3Bucket as AVRO with
-- a maximum of 5000 records per file
INSERT INTO testS3Bucket:pathToWriteTo SELECT * FROM topicA STOREAS `AVRO` WITH_FLUSH_COUNT = 5000
-- Insert mode, select all fields from "topic-with-headers" and
-- write underneath "headerpartitioningdemo" prefix on bucket
-- "kafka-connect-aws-s3-test" partitioning files by country
-- code and facilityNum data in the message header
INSERT INTO kafka-connect-aws-s3-test:headerpartitioningdemo SELECT * FROM topic-with-headers PARTITIONBY _header.facilityCountryCode, _header.facilityNum STOREAS `TEXT` WITH_FLUSH_COUNT = 100
Concepts
The connector accepts messages containing AVRO, text or binary (byte) data from Kafka Connect and can write them to Amazon S3 in a number of different formats.
Configuration
An example configuration is provided:
name=S3SinkConnectorS3 # this can be anything
connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
topics=$TOPIC_NAME
tasks.max=1
connect.s3.kcql=insert into $BUCKET_NAME:$PREFIX_NAME select * from $TOPIC_NAME STOREAS `json` WITH_FLUSH_COUNT = 5000
aws.access.key=ACCESS_KEY
aws.secret.key=SECRET_KEY
aws.auth.mode=Credentials
You should replace $BUCKET_NAME, $PREFIX_NAME and $TOPIC_NAME with the names of the bucket, desired prefix and topic.
Please read below for a detailed explanation of these and other options, including the meaning of WITH_FLUSH_COUNT and its alternatives.
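As a minimal sketch of the placeholder substitution described above, the example configuration can be rendered with Python's `string.Template`. The helper name `render_config` and the bucket, prefix and topic values are made up for illustration; only the property names and the KCQL come from the example.

```python
from string import Template

# Template copied from the example configuration above. The $NAME
# placeholders are the ones the text says you should replace.
CONFIG_TEMPLATE = Template("""\
name=S3SinkConnectorS3
connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
topics=$TOPIC_NAME
tasks.max=1
connect.s3.kcql=insert into $BUCKET_NAME:$PREFIX_NAME select * from $TOPIC_NAME STOREAS `json` WITH_FLUSH_COUNT = 5000
aws.access.key=ACCESS_KEY
aws.secret.key=SECRET_KEY
aws.auth.mode=Credentials
""")


def render_config(bucket: str, prefix: str, topic: str) -> str:
    """Fill in the three placeholders; raises KeyError if one is missing."""
    return CONFIG_TEMPLATE.substitute(
        BUCKET_NAME=bucket, PREFIX_NAME=prefix, TOPIC_NAME=topic
    )


# Hypothetical values, for illustration only.
config = render_config("test-kafka-connect-bucket", "orders-prefix", "orders")
print(config)
```

The access key and secret key lines are left untouched here; they would still need real values when using the Credentials auth mode.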
Auth Mode configuration
Two authentication modes are available:
Credentials
ACCESS_KEY and SECRET_KEY are credentials generated within AWS IAM and must be set and configured with permissions to write to the desired S3 bucket.
aws.auth.mode=Credentials
aws.access.key=ACCESS_KEY
aws.secret.key=SECRET_KEY
Default
In this auth mode no credentials need be supplied. If no auth mode is specified, then this default will be used.
aws.auth.mode=Default
The credentials will be discovered through the default chain, in this order:
- Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
- Java System Properties - aws.accessKeyId and aws.secretKey
- Web Identity Token credentials from the environment or container
- Credential profiles file at the default location (~/.aws/credentials)
- EC2 Credentials delivered through the Amazon EC2 container service
- Instance profile credentials delivered through the Amazon EC2 metadata service
The full details of the default chain are available on S3 Documentation
Format configuration
Format configuration is provided by KCQL.
The options for json, avro and parquet will look like the below:
connect.s3.kcql=insert into $BUCKET_NAME:$PREFIX_NAME select * from $TOPIC_NAME STOREAS `JSON`
The options for the formats are case-insensitive, but they are presented here in the most readable form.
JSON Output Format Configuration
Using JSON as an output format allows you to convert the complex AVRO structure of the message to a simpler schemaless JSON format on output.
STOREAS `JSON`
Avro Output Format Configuration
Using Avro as the output format allows you to output the Avro message.
STOREAS `Avro`
Parquet Output Format Configuration
Using Parquet as the output format allows you to output the Avro message to a file readable by a parquet reader, including schemas.
STOREAS `Parquet`
Text Output Format Configuration
If the incoming Kafka message contains only text that should be passed through to the S3 sink as is, then this option may be desired.
STOREAS `Text`
It may be necessary to use the following additional configuration options for this connector to ensure that the value is presented as a String.
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
CSV Output Format Configuration
This converts the fields of the Avro message to string values to be written out to a CSV file. There are 2 options for CSV format, to write CSV files with the column headers or without.
STOREAS `CSV_WithHeaders`
STOREAS `CSV`
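To make the difference between the two CSV options concrete, here is a small sketch using Python's `csv` module. It only illustrates the idea of stringifying each field and optionally emitting a header row; the function name `to_csv` is hypothetical, and the connector's actual quoting and line-ending rules are not described here, so those details are assumptions.

```python
import csv
import io


def to_csv(records, with_headers: bool) -> str:
    """Render a list of flat dicts as CSV text.

    with_headers=True mimics CSV_WithHeaders, False mimics CSV.
    Assumption: every record has the same fields in the same order.
    """
    out = io.StringIO()
    writer = csv.writer(out, lineterminator="\n")
    if with_headers and records:
        writer.writerow(records[0].keys())
    for record in records:
        # Each field value is converted to its string form.
        writer.writerow(str(value) for value in record.values())
    return out.getvalue()


order = {"id": 1, "created": "2016-05-06 13:53:00", "price": 94.2, "qty": 100}
print(to_csv([order], with_headers=True))
print(to_csv([order], with_headers=False))
```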
Byte(Binary) Output Format Configuration
Bytes can be written from the message key and/or the message value depending on which option is configured.
The key only/ value only options can be useful for, as an example, stitching together multiple parts of a binary file.
The content sizes are output first to an (8-byte) long.
In order to ensure the message is passed through as bytes, it may be necessary to set the following additional configuration options:
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
Please see the KCQL options available and the results of these configurations:
Option | KCQL Configuration | Records written to file as |
---|---|---|
Key and Value (With Sizes) | STOREAS Bytes_KeyAndValueWithSizes | Long Long Bytes Bytes |
Key (With Size) | STOREAS Bytes_KeyWithSize | Long Bytes |
Value (With Size) | STOREAS Bytes_ValueWithSize | Long Bytes |
Key Only | STOREAS Bytes_KeyOnly | Bytes |
Value Only | STOREAS Bytes_ValueOnly | Bytes |
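The "Long Long Bytes Bytes" layout of `Bytes_KeyAndValueWithSizes` can be sketched as below. This is an illustration rather than the connector's own code: the function names are hypothetical, and the byte order (big-endian) is an assumption, since the text only states that each size is an 8-byte long.

```python
import struct

# Assumption: sizes are big-endian signed 8-byte longs (">q").
SIZES = struct.Struct(">qq")  # key size, then value size


def encode_record(key: bytes, value: bytes) -> bytes:
    """Write both sizes first, then the key bytes, then the value bytes."""
    return SIZES.pack(len(key), len(value)) + key + value


def decode_record(buf: bytes, offset: int = 0):
    """Read one record starting at offset; return (key, value, next_offset)."""
    key_len, value_len = SIZES.unpack_from(buf, offset)
    key_start = offset + SIZES.size
    value_start = key_start + key_len
    end = value_start + value_len
    return buf[key_start:value_start], buf[value_start:end], end


# Two records written back to back can be read sequentially.
data = encode_record(b"k1", b"hello") + encode_record(b"k2", b"world!")
key, value, pos = decode_record(data)
print(key, value, pos)
```

The key-only and value-only variants carry no sizes, so a reader of those files has to know the record boundaries by other means.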
Flush configuration
Flush configuration is provided by KCQL.
FLUSH_COUNT
The flush occurs after the configured number of records have been written to the sink.
For example, if you want a file written for every record, you would set the FLUSH_COUNT to 1. If you want a file written for every 10,000 records, then you would set the FLUSH_COUNT to 10000.
connect.s3.kcql=insert into $BUCKET_NAME:$PREFIX_NAME select * from $TOPIC_NAME STOREAS `json` WITH_FLUSH_COUNT = 1
FLUSH_SIZE
The flush occurs after the configured size in bytes is exceeded.
For example, to flush after each 10000 bytes written to a file, you would set the FLUSH_SIZE to 10000.
connect.s3.kcql=insert into $BUCKET_NAME:$PREFIX_NAME select * from $TOPIC_NAME STOREAS `json` WITH_FLUSH_SIZE = 10000
FLUSH_INTERVAL
The flush occurs after the configured interval.
For example, to roll over to a new file every 10 minutes, you would set the FLUSH_INTERVAL to 600 (10 minutes * 60 seconds).
connect.s3.kcql=insert into $BUCKET_NAME:$PREFIX_NAME select * from $TOPIC_NAME STOREAS `json` WITH_FLUSH_INTERVAL = 600
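The three flush triggers can be summarised in a short sketch. The class and method names are hypothetical, and the way the triggers combine (a flush as soon as any configured threshold is reached) is an assumption; the text above only describes each trigger on its own.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FlushPolicy:
    flush_count: Optional[int] = None     # records per file
    flush_size: Optional[int] = None      # bytes per file
    flush_interval: Optional[int] = None  # seconds per file

    def should_flush(self, records: int, bytes_written: int,
                     seconds_open: float) -> bool:
        """Return True when any configured threshold has been hit."""
        if self.flush_count is not None and records >= self.flush_count:
            return True
        # The size trigger fires once the configured size is exceeded.
        if self.flush_size is not None and bytes_written > self.flush_size:
            return True
        if (self.flush_interval is not None
                and seconds_open >= self.flush_interval):
            return True
        return False


every_record = FlushPolicy(flush_count=1)
ten_minutes = FlushPolicy(flush_interval=600)
print(every_record.should_flush(records=1, bytes_written=120, seconds_open=0.2))
print(ten_minutes.should_flush(records=3, bytes_written=500, seconds_open=599))
```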
Partitioning Options
There are 2 options for grouping (partitioning) the output files.
Default Partitioning
S3 Bucket Layout
In your S3 bucket the sink files will be stored as
bucket/prefix/topic/partition/offset.ext
Where .ext is the appropriate file extension.
(Please note: the prefix must not be divided into subpaths.)
Custom Partitioning
S3 Bucket Layout
This allows you to store the sink files in your S3 bucket as
bucket/prefix/customKey1=customValue/topic(partition_offset).ext
or
bucket/prefix/customValue/topic(partition_offset).ext
The custom keys and values can be taken from the Kafka message key, from the value record, or from the message headers (supporting string-coercible headers only).
Again, .ext is the appropriate file extension.
(Please note: the prefix must not be divided into subpaths.)
Configuring Custom Partitioning
The number of partitions you may configure on your sink is unbounded but bear in mind restrictions on AWS S3 key lengths.
Partitions from Message Values
To pull fields from the message values, just use the name of the field from the Avro message.
The fields from the message must be primitive types (string, int, long, etc) in order to partition by them.
Add this to your KCQL string:
PARTITIONBY fieldA[,fieldB]
Partitions from Message Keys
It is possible to partition by the entire message key, as long as the key is coercible into a primitive type:
PARTITIONBY _key
Where the Kafka message key is not a primitive but a complex Avro object, it is possible to partition by individual fields within the key.
PARTITIONBY _key.fieldA[, _key.fieldB]
Partitions from Message Headers
Kafka message headers may be used for partitioning. In this case the header must contain a primitive type easily coercible to a String type.
PARTITIONBY _header.<header_key1>[,_header.<header_key2>]
Mixing Partition Types
The above partition types can be mixed to configure advanced partitioning.
For example:
PARTITIONBY fieldA, _key.fieldB, _header.fieldC
Partitioning by Date
The date formats can be specified using the date and time pattern letters described in Oracle's Java Documentation.
An example of the KCQL configuration:
PARTITIONBY _date.uuuu,_date.LL,_date.dd WITHPARTITIONER=Values
This will partition by year/month/day, for example 2020/10/25.
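The year/month/day example can be reproduced with a small sketch. Python's `strftime` codes stand in for the Java pattern letters here, and only the three patterns from the example are mapped. Which timestamp the connector uses for `_date` is not stated above, so the caller supplies one. The function name is hypothetical.

```python
from datetime import datetime

# Stand-ins for the Java pattern letters used in the KCQL example.
# Assumption: only these three patterns are covered by this sketch.
JAVA_TO_STRFTIME = {"uuuu": "%Y", "LL": "%m", "dd": "%d"}


def date_partition_path(fields, when: datetime) -> str:
    """Turn ['_date.uuuu', '_date.LL', '_date.dd'] into 'YYYY/MM/DD'."""
    parts = []
    for field in fields:
        pattern = field.split(".", 1)[1]  # drop the '_date.' prefix
        parts.append(when.strftime(JAVA_TO_STRFTIME[pattern]))
    return "/".join(parts)


fields = ["_date.uuuu", "_date.LL", "_date.dd"]
print(date_partition_path(fields, datetime(2020, 10, 25)))
```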
Configuring partition display
WITHPARTITIONER=KeysAndValues
WITHPARTITIONER=Values
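The two partitioner settings presumably correspond to the two layouts shown under Custom Partitioning: `KeysAndValues` producing `customKey1=customValue` path segments and `Values` producing `customValue` only. Under that assumption, the object key could be assembled roughly as follows; the function name and sample values are hypothetical.

```python
def custom_partition_key(prefix, partitions, topic, partition, offset, ext,
                         partitioner="KeysAndValues"):
    """Build the object key (without the bucket) for custom partitioning.

    partitions is an ordered list of (name, value) pairs taken from the
    message value, key or headers.
    """
    if partitioner == "KeysAndValues":
        segments = [f"{name}={value}" for name, value in partitions]
    elif partitioner == "Values":
        segments = [str(value) for _, value in partitions]
    else:
        raise ValueError(f"unknown partitioner: {partitioner}")
    file_name = f"{topic}({partition}_{offset}).{ext}"
    return "/".join([prefix, *segments, file_name])


parts = [("facilityCountryCode", "GB"), ("facilityNum", 7)]
print(custom_partition_key("demo", parts, "topicA", 0, 42, "json"))
print(custom_partition_key("demo", parts, "topicA", 0, 42, "json", "Values"))
```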
Padding Strategy
To ensure the file can be restored by the S3 Source in order, you should configure the padding strategy.
connect.s3.padding.strategy
The options available are:
- LeftPad: add 0’s to the left, e.g. 5 padded to 3 digits becomes 005
- RightPad: add 0’s to the right, e.g. 5 padded to 3 digits becomes 500
- NoOp: this is the default value, and retains the legacy behaviour
connect.s3.padding.length
Specify the length of the string you wish to pad to. Default, if not set, is 8. If connect.s3.padding.strategy is not set then this property will be ignored.
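A short sketch shows why padding matters for ordering. It assumes the padding is applied to numeric parts of the file name, such as the offset; the function name `pad` is hypothetical, while the strategy names and the default length of 8 come from the text above.

```python
def pad(value: str, strategy: str = "NoOp", length: int = 8) -> str:
    """Apply one of the three padding strategies to a numeric string."""
    if strategy == "LeftPad":
        return value.rjust(length, "0")
    if strategy == "RightPad":
        return value.ljust(length, "0")
    if strategy == "NoOp":
        return value
    raise ValueError(f"unknown padding strategy: {strategy}")


offsets = ["5", "10", "100"]

# Without padding, a lexicographic listing does not follow numeric order.
print(sorted(offsets))
# With LeftPad, lexicographic order and numeric order agree.
print(sorted(pad(o, "LeftPad", 3) for o in offsets))
```

Left padding is the variant that makes a plain alphabetical listing of object keys match the numeric order of the offsets.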
Kafka payload support
Dependent on the format required, this sink supports the following Kafka payloads:
- AVRO (Structs, Maps, Arrays, Primitives). *
- Text (including any other primitives and string based formats including Json, XML **)
- Bytes (including any binary formats)
* Schema required when writing to Avro and Parquet formats.
** However you will not be able to partition by Json or XML properties as Text message content is not parsed.
Output Formats
format | variations |
---|---|
json | |
avro | |
parquet | |
text | |
csv | CSV, CSV_WithHeaders |
bytes | Bytes_KeyAndValueWithSizes, Bytes_KeyWithSize, Bytes_ValueWithSize, Bytes_KeyOnly, Bytes_ValueOnly |
See connect payloads for more information.
AVRO and Parquet Compression
AVRO and Parquet present options for compressing the file as it writes.
The S3 Sink connector exposes these options to allow the advanced user to configure this compression.
Configuration
connect.s3.compression.codec
Allows specification of the compression codec to use. The options are:
- UNCOMPRESSED
- SNAPPY
- GZIP
- LZ0
- LZ4
- BROTLI
- BZIP2
- ZSTD
- DEFLATE
- XZ
If no option is specified then it defaults to UNCOMPRESSED.
connect.s3.compression.level
Currently required only for certain codecs when writing to an AVRO sink (see the matrix below). An integer from 1 - 9 specifying the compression level, 9 being the maximum amount of compression.
Compression Support Matrix
The options for the connect.s3.compression.codec are listed below along with whether they are supported by the Avro or the Parquet writer.
Configurations that also require a connect.s3.compression.level are marked with the ⚙️ icon.
Compression | Avro Support | Avro (requires Level) | Parquet Support |
---|---|---|---|
UNCOMPRESSED | ✅ | | ✅ |
SNAPPY | ✅ | | ✅ |
GZIP | | | ✅ |
LZ0 | | | ✅ |
LZ4 | | | ✅ |
BROTLI | | | ✅ |
BZIP2 | ✅ | | |
ZSTD | ✅ | ⚙️ | ✅ |
DEFLATE | ✅ | ⚙️ | |
XZ | ✅ | ⚙️ | |
Please note that not all compression libraries are bundled with the S3 connector, therefore you may need to add some to the class path to ensure they work.
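The level requirement can be expressed as a small pre-flight check. This sketch covers only the rule for the Avro codecs marked with ⚙️ in the matrix (ZSTD, DEFLATE and XZ) and the 1 to 9 range; the function name is hypothetical and the connector's own validation may differ.

```python
# Avro codecs marked with the level icon in the matrix.
AVRO_CODECS_REQUIRING_LEVEL = {"ZSTD", "DEFLATE", "XZ"}


def check_avro_compression(codec: str = "UNCOMPRESSED", level=None):
    """Validate connect.s3.compression.codec / .level for an Avro sink."""
    if codec in AVRO_CODECS_REQUIRING_LEVEL:
        if level is None:
            raise ValueError(f"{codec} requires connect.s3.compression.level")
        if not 1 <= level <= 9:
            raise ValueError("compression level must be between 1 and 9")
    return codec, level


print(check_avro_compression("ZSTD", 3))
print(check_avro_compression("SNAPPY"))
```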
Error policies
The connector supports Error policies.
Quickstart
Preparing the target system
The S3 Sink Connector requires AWS access keys and a bucket to be set up prior to running.
Configuring access keys
The S3 connector requires AWS access keys to be able to perform actions in AWS.
You can configure these through the AWS console or CLI tools.
Please see the AWS documentation for more details.
Creating a target bucket
Please ensure you use sensible selections for security and access options. More information on the command is available in AWS’s documentation.
aws s3api create-bucket --bucket test-kafka-connect-bucket
Start the connector
If you are using Lenses, log into Lenses and navigate to the connectors page, select S3 as the sink and paste the following:
name=S3SinkConnectorS3 # this can be anything
connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
topics=$TOPIC_NAME
tasks.max=1
connect.s3.kcql=insert into $BUCKET_NAME:$PREFIX_NAME select * from $TOPIC_NAME STOREAS `json` WITH_FLUSH_COUNT = 5000
aws.access.key=ACCESS_KEY
aws.secret.key=SECRET_KEY
aws.auth.mode=Credentials
To start the connector without using Lenses, log into the fastdatadev container:
docker exec -ti fastdata /bin/bash
and create a connector.properties file containing the properties above.
Create the connector with the connect-cli:
connect-cli create aws-s3-sink < connector.properties
Wait for the connector to start and check it's running:
connect-cli status aws-s3-sink
Inserting test data
In the fastdata container, start the Kafka producer shell:
kafka-avro-console-producer \
--broker-list localhost:9092 --topic orders \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"created","type":"string"},{"name":"product","type":"string"},{"name":"price","type":"double"}, {"name":"qty", "type":"int"}]}'
The console is now waiting for your input. Enter the following:
{"id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2, "qty":100}
Check for data in S3
Use the AWS console. Alternatively, on the command line run:
aws s3 ls s3://mybucket/mykey --recursive
Clean up
Bring down the stack:
docker-compose down
Local File Writing
The connector builds complete files locally before uploading in one operation in the commit.
You can optionally supply a directory to write the files to locally. If none is supplied and BuildLocal mode is used, then a directory will be created in your system temporary directory (e.g. /tmp).
connect.s3.local.tmp.directory
Files are currently limited to 5GB. This can be addressed in future if it is required.
Error handling
Various properties for managing the error handling are supplied.
connect.s3.error.policy
Specifies the action to be taken if an error occurs while inserting the data. There are three available options:
- NOOP - the error is swallowed.
- THROW - the error is allowed to propagate.
- RETRY - the exception causes the Connect framework to retry the message. The number of retries is set by connect.s3.max.retries.
All errors will be logged automatically, even if the code swallows them.
connect.s3.max.retries
The maximum number of times to try the write again.
connect.s3.retry.interval
The time in milliseconds between retries.
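The behaviour of the three error policies, together with connect.s3.max.retries and connect.s3.retry.interval, can be simulated in a few lines. This is only a model: in the connector the retry is carried out by the Connect framework rather than by a local loop, and the function name and `sleep` parameter are hypothetical. The default values used here are 20 retries and 60000 ms.

```python
import time


def write_with_policy(write, policy="THROW", max_retries=20,
                      retry_interval_ms=60000, sleep=time.sleep):
    """Call write() and react to failures according to the error policy."""
    if policy not in ("NOOP", "THROW", "RETRY"):
        raise ValueError(f"unknown error policy: {policy}")
    retries = 0
    while True:
        try:
            return write()
        except Exception as err:
            print(f"write failed: {err}")  # errors are always logged
            if policy == "NOOP":
                return None  # swallow the error
            if policy == "THROW" or retries >= max_retries:
                raise  # let the error propagate
            retries += 1
            sleep(retry_interval_ms / 1000)  # wait, then try again


def always_fails():
    raise IOError("S3 unavailable")


# NOOP swallows the failure and carries on.
print(write_with_policy(always_fails, policy="NOOP"))
```

Passing a custom `sleep` callable makes the retry path easy to exercise without waiting for the real interval.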
connect.s3.http.max.retries
Number of times to retry the http request, in the case of a resolvable error on the server side.
connect.s3.http.retry.interval
If greater than zero, used to determine the delay after which to retry the http request in milliseconds. Based on an exponential backoff algorithm.
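For intuition, an exponential backoff schedule for the HTTP retries might look like the sketch below. The doubling factor and the absence of jitter or an upper bound are assumptions; the text only says the delay is based on an exponential backoff algorithm. The function name is hypothetical, and the values 50 and 5 are the defaults for connect.s3.http.retry.interval and connect.s3.http.max.retries.

```python
def backoff_delays_ms(retry_interval_ms: int = 50, max_retries: int = 5):
    """Delay before each retry, doubling every time (assumed factor of 2)."""
    if retry_interval_ms <= 0:
        return []  # an interval of zero or less disables the delay
    return [retry_interval_ms * 2 ** attempt for attempt in range(max_retries)]


print(backoff_delays_ms())
```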
Options
Name | Description | Type | Available Values | Default Value |
---|---|---|---|---|
connect.s3.aws.access.key | Access Key | string | | |
connect.s3.aws.secret.key | Secret Key | string | | |
connect.s3.aws.region | Region | string | | |
connect.s3.aws.client | Client | string | aws, jclouds | AWS |
connect.s3.pool.max.connections | Connection Pool Max Connections | int | -1 (undefined) | 50 (aws), 30 (jclouds) |
connect.s3.aws.auth.mode | Auth Mode | string | Credentials, Default | Default |
connect.s3.custom.endpoint | Custom Endpoint | string | | |
connect.s3.vhost.bucket | Enable Vhost Buckets | boolean | true, false | false |
connect.s3.error.policy | Error Policy | string | NOOP, THROW, RETRY | THROW |
connect.s3.max.retries | Maximum Retries | int | | 20 |
connect.s3.retry.interval | Retry Interval | int | | 60000 |
connect.s3.http.max.retries | HTTP Maximum Retries | long | | 5 |
connect.s3.http.retry.interval | HTTP Retry Interval (Exponential Backoff) | long | | 50 |
connect.s3.local.tmp.directory | Local Staging Area | string | | |
connect.s3.kcql | KCQL String | string | kcql configuration | |
connect.s3.compression.codec | Compression Codec | string | UNCOMPRESSED, SNAPPY, GZIP, LZ0, LZ4, BROTLI, BZIP2, ZSTD, DEFLATE, XZ | UNCOMPRESSED |
connect.s3.compression.level | Compression Level | int | 1-9 | |
connect.s3.padding.strategy | Padding Strategy | string | NoOp, LeftPad, RightPad | NoOp |
connect.s3.padding.length | Length to Pad To | int | | |
connect.s3.seek.max.files | Max Index File Error Threshold | int | | 5 |
connect.s3.seek.migration.enabled | One-time Connector Upgrade from Legacy Seek | boolean | true, false | false |