AWS S3


A Kafka Connect sink connector for writing records from Kafka to AWS S3 Buckets.

KCQL support 

The following KCQL is supported:

INSERT INTO bucketAddress:pathPrefix
SELECT *
FROM kafka-topic
[PARTITIONBY (partition[, partition] ...)]
[STOREAS storage_format]
[WITHPARTITIONER partitioner]
[WITH_FLUSH_SIZE = flush_size]
[WITH_FLUSH_INTERVAL = flush_interval]
[WITH_FLUSH_COUNT = flush_count]

Examples:

-- Insert mode, select all fields from all configured topics (`*`) and 
-- write underneath prefix on testS3Bucket.  Using the wildcard ensures
-- you don't have to define the topics twice in your connector configuration.
INSERT INTO testS3Bucket:pathToWriteTo SELECT * FROM `*`

-- Insert mode, select all fields from topicA and
-- write underneath prefix on testS3Bucket
INSERT INTO testS3Bucket:pathToWriteTo SELECT * FROM topicA

-- Insert mode, select all fields from topicA and
-- write underneath prefix on testS3Bucket as AVRO with
-- a maximum of 5000 records per file
INSERT INTO testS3Bucket:pathToWriteTo SELECT * FROM topicA STOREAS `AVRO` WITH_FLUSH_COUNT = 5000

-- Insert mode, select all fields from "topic-with-headers" and
-- write underneath "headerpartitioningdemo" prefix on bucket
-- "kafka-connect-aws-s3-test" partitioning files by country
-- code and facilityNum data in the message header
INSERT INTO kafka-connect-aws-s3-test:headerpartitioningdemo SELECT * FROM topic-with-headers PARTITIONBY _header.facilityCountryCode, _header.facilityNum STOREAS `TEXT` WITH_FLUSH_COUNT = 100
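The clause order in the grammar above can be sketched as a small string builder. This is only an illustration: `build_kcql` and its parameter names are hypothetical helpers, not part of the connector, and the `WITHPARTITIONER=` spelling is taken from the examples later on this page.

```python
from typing import Optional, Sequence


def build_kcql(bucket: str, prefix: str, topic: str,
               partition_by: Sequence[str] = (),
               store_as: Optional[str] = None,
               partitioner: Optional[str] = None,
               flush_size: Optional[int] = None,
               flush_interval: Optional[int] = None,
               flush_count: Optional[int] = None) -> str:
    """Assemble a KCQL statement, emitting optional clauses in grammar order."""
    parts = [f"INSERT INTO {bucket}:{prefix} SELECT * FROM {topic}"]
    if partition_by:
        parts.append("PARTITIONBY " + ", ".join(partition_by))
    if store_as is not None:
        parts.append(f"STOREAS `{store_as}`")
    if partitioner is not None:
        parts.append(f"WITHPARTITIONER={partitioner}")
    if flush_size is not None:
        parts.append(f"WITH_FLUSH_SIZE = {flush_size}")
    if flush_interval is not None:
        parts.append(f"WITH_FLUSH_INTERVAL = {flush_interval}")
    if flush_count is not None:
        parts.append(f"WITH_FLUSH_COUNT = {flush_count}")
    return " ".join(parts)


if __name__ == "__main__":
    # Reproduces the AVRO example shown above.
    print(build_kcql("testS3Bucket", "pathToWriteTo", "topicA",
                     store_as="AVRO", flush_count=5000))
```

Generating the statement from named arguments keeps the optional clauses in a consistent order when many connectors are configured from templates.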

Concepts 

The connector accepts messages containing Avro, text or binary data from Kafka Connect and can write them to Amazon S3 in a number of different formats.

Configuration 

An example configuration is provided:

# The connector name can be anything
name=S3SinkConnectorS3
connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
topics=$TOPIC_NAME
tasks.max=1
connect.s3.kcql=insert into $BUCKET_NAME:$PREFIX_NAME select * from $TOPIC_NAME STOREAS `json` WITH_FLUSH_COUNT = 5000
aws.access.key=ACCESS_KEY
aws.secret.key=SECRET_KEY
aws.auth.mode=Credentials

You should replace $BUCKET_NAME, $PREFIX_NAME and $TOPIC_NAME with the names of the bucket, desired prefix and topic.

Please read below for a detailed explanation of these and other options, including the meaning of WITH_FLUSH_COUNT and its alternatives.
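If the connector is deployed through the Kafka Connect REST API rather than a properties file, the same settings are sent as a JSON document with a `name` and a `config` map. The sketch below converts simple `key=value` text into that shape. The function name is hypothetical, and it handles only plain `key=value` lines and full-line `#` comments, not the complete Java properties syntax.

```python
import json


def properties_to_connector_payload(text: str) -> dict:
    """Parse key=value lines and wrap them as {"name": ..., "config": {...}}."""
    config = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and full-line comments
        # Split on the first '=' only, because KCQL values contain '=' too.
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    name = config.pop("name")
    return {"name": name, "config": config}


if __name__ == "__main__":
    sample = """
    # The connector name can be anything
    name=S3SinkConnectorS3
    connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
    topics=orders
    tasks.max=1
    connect.s3.kcql=insert into my-bucket:my-prefix select * from orders STOREAS `json` WITH_FLUSH_COUNT = 5000
    aws.auth.mode=Default
    """
    print(json.dumps(properties_to_connector_payload(sample), indent=2))
```

The bucket, prefix and topic names in the sample are placeholders.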

Auth Mode configuration 

Two authentication modes are available:

Credentials 

ACCESS_KEY and SECRET_KEY are credentials generated within AWS IAM and must be set and configured with permissions to write to the desired S3 bucket.

aws.auth.mode=Credentials
aws.access.key=ACCESS_KEY
aws.secret.key=SECRET_KEY

Default 

In this auth mode no credentials need be supplied. If no auth mode is specified, then this default will be used.

aws.auth.mode=Default

The credentials will be discovered through the default chain, in this order:

  • Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
  • Java System Properties - aws.accessKeyId and aws.secretKey
  • Web Identity Token credentials from the environment or container
  • Credential profiles file at the default location (~/.aws/credentials)
  • EC2 Credentials delivered through the Amazon EC2 container service
  • Instance profile credentials delivered through the Amazon EC2 metadata service

The full details of the default chain are available in the AWS documentation.
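The ordering of the chain matters because the first provider that yields credentials wins. The sketch below models only that first-match behaviour. It is not the AWS SDK implementation: `from_environment` and `resolve` are hypothetical names, and only the environment-variable step is modelled concretely.

```python
import os
from typing import Callable, Dict, List, Mapping, Optional, Tuple

Credentials = Dict[str, str]
Provider = Callable[[], Optional[Credentials]]


def from_environment(env: Optional[Mapping[str, str]] = None) -> Optional[Credentials]:
    """First step of the chain: AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY."""
    env = os.environ if env is None else env
    key = env.get("AWS_ACCESS_KEY_ID")
    secret = env.get("AWS_SECRET_ACCESS_KEY")
    if key and secret:
        return {"access_key": key, "secret_key": secret}
    return None


def resolve(chain: List[Tuple[str, Provider]]) -> Tuple[str, Credentials]:
    """Return the name and credentials of the first provider that supplies any."""
    for name, provider in chain:
        credentials = provider()
        if credentials is not None:
            return name, credentials
    raise LookupError("no provider in the chain supplied credentials")


if __name__ == "__main__":
    chain = [
        ("environment", from_environment),
        # Stand-in for the later steps (profile file, instance profile, ...).
        ("fallback", lambda: {"access_key": "demo", "secret_key": "demo"}),
    ]
    print(resolve(chain)[0])
```

In practice this means that credentials exported in the worker's environment take precedence over an instance profile.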

Format configuration 

Format configuration is provided by KCQL.

The options for JSON, Avro and Parquet look like the following:

connect.s3.kcql=insert into $BUCKET_NAME:$PREFIX_NAME select * from $TOPIC_NAME STOREAS `JSON`

The options for the formats are case-insensitive, but they are presented here in the most readable form.

JSON Output Format Configuration 

Using JSON as an output format allows you to convert the complex AVRO structure of the message to a simpler schemaless JSON format on output.

STOREAS `JSON`

Avro Output Format Configuration 

Using Avro as the output format allows you to output the Avro message.

STOREAS `Avro`

Parquet Output Format Configuration 

Using Parquet as the output format allows you to output the Avro message to a file readable by a Parquet reader, including schemas.

STOREAS `Parquet`

Text Output Format Configuration 

If the incoming Kafka message contains text only and this is to be pushed through to the S3 sink as is, then this option may be desired.

STOREAS `Text`

It may be necessary to use additional configuration options for this connector to ensure that the value is presented as a String.

value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.storage.StringConverter

CSV Output Format Configuration 

This converts the fields of the Avro message to string values to be written out to a CSV file. There are two options for the CSV format: writing CSV files with column headers or without.

STOREAS `CSV_WithHeaders`
STOREAS `CSV`

Byte (Binary) Output Format Configuration 

Bytes can be written from the message key and/or the message value depending on which option is configured.

The key-only and value-only options can be useful for, as an example, stitching together multiple parts of a binary file.

In the options that include sizes, each content size is written first, as an 8-byte long.

To ensure the message is passed through as bytes, it may be necessary to set the additional configuration options:

value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter

Please see the KCQL options available and the results of these configurations:

| Option | KCQL Configuration | Records written to file as |
| --- | --- | --- |
| Key and Value (With Sizes) | STOREAS Bytes_KeyAndValueWithSizes | Long Long Bytes Bytes |
| Key (With Size) | STOREAS Bytes_KeyWithSize | Long Bytes |
| Value (With Size) | STOREAS Bytes_ValueWithSize | Long Bytes |
| Key Only | STOREAS Bytes_KeyOnly | Bytes |
| Value Only | STOREAS Bytes_ValueOnly | Bytes |
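To make the "Long Long Bytes Bytes" layout concrete, the following sketch encodes and decodes one record in the Bytes_KeyAndValueWithSizes shape. Two things are assumptions rather than facts from this page: that the longs are big-endian (the usual convention for Java data streams), and that the key size precedes the value size. The function names are hypothetical.

```python
import struct
from typing import Tuple

# Two signed 8-byte longs, big-endian (assumed byte order).
_SIZES = struct.Struct(">qq")


def encode_key_and_value_with_sizes(key: bytes, value: bytes) -> bytes:
    """Lay out one record as: key size, value size, key bytes, value bytes."""
    return _SIZES.pack(len(key), len(value)) + key + value


def decode_key_and_value_with_sizes(buf: bytes, offset: int = 0) -> Tuple[bytes, bytes, int]:
    """Read one record at `offset`; return (key, value, offset of next record)."""
    key_len, value_len = _SIZES.unpack_from(buf, offset)
    key_start = offset + _SIZES.size
    value_start = key_start + key_len
    end = value_start + value_len
    return buf[key_start:value_start], buf[value_start:end], end


if __name__ == "__main__":
    data = (encode_key_and_value_with_sizes(b"k1", b"first")
            + encode_key_and_value_with_sizes(b"k2", b"second"))
    position = 0
    while position < len(data):
        key, value, position = decode_key_and_value_with_sizes(data, position)
        print(key, value)
```

Because each record carries its own sizes, several records can be concatenated in one file and read back sequentially.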

Flush configuration 

Flush configuration is provided by KCQL.

FLUSH_COUNT 

The flush occurs after the configured number of records has been written to the sink.

For example, if you want a file written for every record, you would set the FLUSH_COUNT to 1. If you want a file written for each 10,000 records, then you would set the FLUSH_COUNT to 10000.

connect.s3.kcql=insert into $BUCKET_NAME:$PREFIX_NAME select * from $TOPIC_NAME STOREAS `json` WITH_FLUSH_COUNT = 1

FLUSH_SIZE 

The flush occurs after the configured size in bytes is exceeded.

For example, to flush after each 10000 bytes written to a file, you would set the FLUSH_SIZE to 10000.

connect.s3.kcql=insert into $BUCKET_NAME:$PREFIX_NAME select * from $TOPIC_NAME STOREAS `json` WITH_FLUSH_SIZE = 10000

FLUSH_INTERVAL 

The flush occurs after the configured interval.

For example, to roll over to a new file every 10 minutes, you would set the FLUSH_INTERVAL to 600 (10 minutes * 60 seconds).

connect.s3.kcql=insert into $BUCKET_NAME:$PREFIX_NAME select * from $TOPIC_NAME STOREAS `json` WITH_FLUSH_INTERVAL = 600
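The three flush triggers can be pictured as one decision function. This is a sketch under assumptions, not the connector's code: it assumes that any configured trigger firing is enough to flush, and the class and field names are hypothetical.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FlushPolicy:
    flush_count: Optional[int] = None     # WITH_FLUSH_COUNT, in records
    flush_size: Optional[int] = None      # WITH_FLUSH_SIZE, in bytes
    flush_interval: Optional[int] = None  # WITH_FLUSH_INTERVAL, in seconds


@dataclass
class OpenFile:
    records: int          # records written to the current file
    bytes_written: int    # bytes written to the current file
    opened_at: float      # time the file was started, in seconds


def should_flush(policy: FlushPolicy, current: OpenFile, now: float) -> bool:
    """Return True as soon as any configured trigger has fired."""
    if policy.flush_count is not None and current.records >= policy.flush_count:
        return True
    if policy.flush_size is not None and current.bytes_written > policy.flush_size:
        return True
    if policy.flush_interval is not None and now - current.opened_at >= policy.flush_interval:
        return True
    return False


if __name__ == "__main__":
    policy = FlushPolicy(flush_count=5000, flush_interval=600)
    print(should_flush(policy, OpenFile(records=120, bytes_written=4096, opened_at=0.0), now=601.0))
```

Combining a count or size trigger with an interval is a common way to keep files reasonably large on busy topics while still bounding latency on quiet ones.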

Partitioning Options 

There are 2 options for grouping (partitioning) the output files.

Default Partitioning 

S3 Bucket Layout 

In your S3 bucket the sink files will be stored as

bucket/prefix/topic/partition/offset.ext

Where .ext is the appropriate file extension.

(Please note: the prefix must not be divided into subpaths.)
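As an illustration of the default layout, the helper below builds the object key (the part after the bucket name) from its components. `default_object_key` is a hypothetical name, and the check on the prefix reflects the note above that the prefix must not be divided into subpaths.

```python
def default_object_key(prefix: str, topic: str, partition: int, offset: int, ext: str) -> str:
    """Build prefix/topic/partition/offset.ext for the default partitioning layout."""
    if "/" in prefix:
        raise ValueError("the prefix must not be divided into subpaths")
    return f"{prefix}/{topic}/{partition}/{offset}.{ext}"


if __name__ == "__main__":
    # Placeholder values; the extension depends on the configured format.
    print(default_object_key("pathToWriteTo", "topicA", 0, 5000, "json"))
```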

Custom Partitioning 

S3 Bucket Layout (Custom) 

This allows you to store the sink files in your S3 bucket as

bucket/prefix/customKey1=customValue/topic(partition_offset).ext

or

bucket/prefix/customValue/topic(partition_offset).ext

The custom keys and values can be taken from the Kafka message key, from the value record, or from the message headers (only headers coercible to a string are supported).

Again, .ext is the appropriate file extension.

(Please note: the prefix must not be divided into subpaths.)
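The two custom layouts differ only in whether each partition segment carries its key. The sketch below builds both forms. It assumes that the first layout corresponds to the KeysAndValues partitioner and the second to the Values partitioner, and `custom_object_key` is a hypothetical helper, not a connector API.

```python
from typing import Sequence, Tuple


def custom_object_key(prefix: str, topic: str, partition: int, offset: int, ext: str,
                      partitions: Sequence[Tuple[str, str]],
                      partitioner: str = "KeysAndValues") -> str:
    """Build prefix/<partition segments>/topic(partition_offset).ext."""
    if partitioner == "KeysAndValues":
        segments = [f"{key}={value}" for key, value in partitions]
    elif partitioner == "Values":
        segments = [str(value) for _, value in partitions]
    else:
        raise ValueError(f"unknown partitioner: {partitioner}")
    file_name = f"{topic}({partition}_{offset}).{ext}"
    return "/".join([prefix, *segments, file_name])


if __name__ == "__main__":
    parts = [("facilityCountryCode", "GB"), ("facilityNum", "12")]
    print(custom_object_key("headerpartitioningdemo", "topic-with-headers", 0, 100, "txt", parts))
    print(custom_object_key("headerpartitioningdemo", "topic-with-headers", 0, 100, "txt", parts, "Values"))
```

The partition values and the file extension in the usage lines are placeholders chosen for the example.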

Configuring Custom Partitioning 

The number of partitions you may configure on your sink is unbounded but bear in mind restrictions on AWS S3 key lengths.

Partitions from Message Values 

To pull fields from the message values, just use the name of the field from the Avro message.

The fields from the message must be primitive types (string, int, long, etc) in order to partition by them.

Add this to your KCQL string:

    PARTITIONBY fieldA[,fieldB]

Partitions from Message Keys 

It is possible to partition by the entire message key, as long as the key is coercible into a primitive type:

    PARTITIONBY _key

Where the Kafka message key is not a primitive but a complex Avro object, it is possible to partition by individual fields within the key.

    PARTITIONBY _key.fieldA[, _key.fieldB]

Partitions from Message Headers 

Kafka message headers may be used for partitioning. In this case the header must contain a primitive type easily coercible to a String type.

    PARTITIONBY _header.<header_key1>[,_header.<header_key2>]

Mixing Partition Types 

The above partition types can be mixed to configure advanced partitioning.

For example:

    PARTITIONBY fieldA, _key.fieldB, _header.fieldC

Partitioning by Date 

The date formats can be specified using the pattern strings described in Oracle's Java documentation.

An example of the KCQL configuration:

    PARTITIONBY _date.uuuu,_date.LL,_date.dd WITHPARTITIONER=Values

This will partition by year/month/day, for example 2020/10/25.
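The year/month/day result can be reproduced with a small sketch. It assumes that uuuu, LL and dd are Java date-time pattern letters for a four-digit year, a two-digit month and a two-digit day, and maps just those three onto Python's strftime codes. The mapping table and function name are hypothetical, and which timestamp the connector uses for the date is not covered here.

```python
from datetime import date
from typing import Sequence

# Only the three patterns used in the example above are mapped.
_PATTERN_TO_STRFTIME = {"uuuu": "%Y", "LL": "%m", "dd": "%d"}


def date_partition_path(day: date, patterns: Sequence[str]) -> str:
    """Render each _date.<pattern> partition as one path segment (Values style)."""
    return "/".join(day.strftime(_PATTERN_TO_STRFTIME[p]) for p in patterns)


if __name__ == "__main__":
    print(date_partition_path(date(2020, 10, 25), ["uuuu", "LL", "dd"]))
```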

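Configuring partition display 

The partitioner controls how custom partitions appear in the object path: KeysAndValues writes each partition as customKey=customValue, while Values writes the value only.

    WITHPARTITIONER=KeysAndValues
    WITHPARTITIONER=Values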

Padding Strategy 

To ensure that files can be restored in order by the S3 source connector, you should configure the padding strategy.

connect.s3.padding.strategy

The options available are:

  • LeftPad - add 0’s to the left, e.g. 5 padded to 3 digits becomes 005
  • RightPad - add 0’s to the right, e.g. 5 padded to 3 digits becomes 500
  • NoOp - this is the default value, and retains the legacy behaviour

connect.s3.padding.length

Specify the length of the string you wish to pad to. Default, if not set, is 8. If connect.s3.padding.strategy is not set then this property will be ignored.
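The effect of each strategy, and the reason padding helps ordering, can be seen in a few lines. S3 lists keys in lexicographic order, so unpadded numbers such as 9 and 10 sort the wrong way round. The `pad` function below is a hypothetical illustration of the three strategies, not the connector's implementation.

```python
def pad(value: int, strategy: str = "NoOp", length: int = 8) -> str:
    """Apply the named padding strategy to a number."""
    text = str(value)
    if strategy == "LeftPad":
        return text.rjust(length, "0")
    if strategy == "RightPad":
        return text.ljust(length, "0")
    if strategy == "NoOp":
        return text
    raise ValueError(f"unknown padding strategy: {strategy}")


if __name__ == "__main__":
    offsets = [9, 10, 100]
    # Unpadded strings sort lexicographically, which breaks numeric order.
    print(sorted(pad(n) for n in offsets))
    # Left-padded strings sort in the same order as the numbers themselves.
    print(sorted(pad(n, "LeftPad") for n in offsets))
```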

Kafka payload support 

Depending on the format required, this sink supports the following Kafka payloads:

  • AVRO (Structs, Maps, Arrays, Primitives). *
  • Text (including any other primitives and string-based formats including JSON, XML **)
  • Bytes (including any binary formats)

* Schema required when writing to Avro and Parquet formats.

** However, you will not be able to partition by JSON or XML properties, as Text message content is not parsed.

Output Formats 

| format | variations |
| --- | --- |
| json | |
| avro | |
| parquet | |
| text | |
| csv | CSV, CSV_WithHeaders |
| bytes | Bytes_KeyAndValueWithSizes, Bytes_KeyWithSize, Bytes_ValueWithSize, Bytes_KeyOnly, Bytes_ValueOnly |

See connect payloads for more information.

AVRO and Parquet Compression 

AVRO and Parquet offer options for compressing the file as it is written.

The S3 Sink connector exposes these options to allow the advanced user to configure this compression.

Configuration 

connect.s3.compression.codec Allows specification of the compression codec to use. The options are:

  • UNCOMPRESSED
  • SNAPPY
  • GZIP
  • LZ0
  • LZ4
  • BROTLI
  • BZIP2
  • ZSTD
  • DEFLATE
  • XZ

If no option is specified then it defaults to UNCOMPRESSED.

connect.s3.compression.level Currently required only for certain codecs when writing to an AVRO sink (see the matrix below). An integer from 1 - 9 specifying the compression level, 9 being the maximum amount of compression.

Compression Support Matrix 

The options for the connect.s3.compression.codec are listed below along with whether they are supported by the Avro or the Parquet writer.

Codecs that also require connect.s3.compression.level are marked with the ⚙️ icon.

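| Compression | Avro Support | Avro (requires Level) | Parquet Support |
| --- | --- | --- | --- |
| UNCOMPRESSED | | | |
| SNAPPY | | | |
| GZIP | | | |
| LZ0 | | | |
| LZ4 | | | |
| BROTLI | | | |
| BZIP2 | | | |
| ZSTD | | ⚙️ | |
| DEFLATE | | ⚙️ | |
| XZ | | ⚙️ | |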

Please note that not all compression libraries are bundled with the S3 connector, therefore you may need to add some to the class path to ensure they work.
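A configuration check along the following lines can catch a missing compression level before the connector is deployed. It is a sketch only: the set of codecs needing a level is read from the ⚙️ markers in the matrix (ZSTD, DEFLATE and XZ), the rule is applied only when writing Avro as described above, and `check_compression` is a hypothetical helper. It does not check whether a codec is supported by a given writer.

```python
from typing import List, Optional

CODECS = ("UNCOMPRESSED", "SNAPPY", "GZIP", "LZ0", "LZ4",
          "BROTLI", "BZIP2", "ZSTD", "DEFLATE", "XZ")

# Codecs marked as requiring connect.s3.compression.level when writing Avro.
LEVEL_REQUIRED_FOR_AVRO = frozenset({"ZSTD", "DEFLATE", "XZ"})


def check_compression(codec: str = "UNCOMPRESSED",
                      level: Optional[int] = None,
                      storage_format: str = "avro") -> List[str]:
    """Return a list of problems found; an empty list means the settings look consistent."""
    problems = []
    if codec not in CODECS:
        problems.append(f"unknown codec: {codec}")
    if level is not None and not 1 <= level <= 9:
        problems.append("connect.s3.compression.level must be between 1 and 9")
    if storage_format.lower() == "avro" and codec in LEVEL_REQUIRED_FOR_AVRO and level is None:
        problems.append(f"{codec} requires connect.s3.compression.level for Avro output")
    return problems


if __name__ == "__main__":
    print(check_compression("ZSTD"))
    print(check_compression("ZSTD", level=3))
```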

Error policies 

The connector supports Error policies.

Quickstart 

Preparing the target system 

The S3 Sink Connector requires AWS access keys and a bucket to be set up prior to running.

Configuring access keys 

The S3 connector requires AWS access keys to be able to perform actions in AWS.

You can configure these through AWS console or CLI tools.

Please see AWS documentation for more details.

Creating a target bucket 

Create the bucket with the AWS CLI, using sensible selections for security and access options. More information on the command is available in AWS’s documentation.

aws s3api create-bucket --bucket test-kafka-connect-bucket

Start the connector 

If you are using Lenses, log in to Lenses, navigate to the connectors page, select S3 as the sink and paste the following:

    # The connector name can be anything
    name=S3SinkConnectorS3
    connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
    topics=$TOPIC_NAME
    tasks.max=1
    connect.s3.kcql=insert into $BUCKET_NAME:$PREFIX_NAME select * from $TOPIC_NAME STOREAS `json` WITH_FLUSH_COUNT = 5000
    aws.access.key=ACCESS_KEY
    aws.secret.key=SECRET_KEY
    aws.auth.mode=Credentials

To start the connector using the command line, log into the lenses-box container:


docker exec -ti lenses-box /bin/bash

and create a connector.properties file containing the properties above.

Create the connector, with the connect-cli:

connect-cli create aws-s3-sink < connector.properties

Wait for the connector to start and check it’s running:

connect-cli status aws-s3-sink

Inserting test data 

In the lenses-box container, start the Kafka producer shell:


kafka-avro-console-producer \
 --broker-list localhost:9092 --topic orders \
 --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"created","type":"string"},{"name":"product","type":"string"},{"name":"price","type":"double"}, {"name":"qty", "type":"int"}]}'

The console is now waiting for your input. Enter the following:


{
  "id": 1,
  "created": "2016-05-06 13:53:00",
  "product": "OP-DAX-P-20150201-95.7",
  "price": 94.2,
  "qty": 100
}

Check for data in S3 

Use the AWS console. Alternatively, on the command line run:

aws s3 ls s3://mybucket/mykey --recursive

Clean up 

Bring down the stack:

docker-compose down

Local file writing 

The connector builds complete files locally before uploading them in a single operation on commit.

You can optionally supply a directory to write the files to locally. If none is supplied and BuildLocal mode is used, then a directory will be created in your system temporary directory (e.g. /tmp).

    connect.s3.local.tmp.directory

Files are currently limited to 5GB. This can be addressed in future if it is required.

Error handling 

Various properties for managing the error handling are supplied.

connect.s3.error.policy 

Specifies the action to be taken if an error occurs while inserting the data. There are three available options:

  • NOOP - the error is swallowed.
  • THROW - the error is allowed to propagate.
  • RETRY - the exception causes the Connect framework to retry the message. The number of retries is set by connect.s3.max.retries.

All errors will be logged automatically, even if the code swallows them.

connect.s3.max.retries 

The maximum number of times to try the write again.

connect.s3.retry.interval 

The time in milliseconds between retries.
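The three policies and the two retry settings can be pictured with the following simplified model. In the connector the retry is carried out by the Connect framework. Here it is simulated in-process purely for illustration, and `write_with_policy` with its parameters is a hypothetical name. The defaults of 20 retries and 60000 ms are the ones listed in the options table.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def write_with_policy(write: Callable[[], T],
                      policy: str = "THROW",
                      max_retries: int = 20,
                      retry_interval_ms: int = 60000,
                      sleep: Callable[[float], None] = time.sleep,
                      log: Callable[[str], None] = print) -> Optional[T]:
    """Run `write` and react to failures according to the error policy."""
    if policy not in ("NOOP", "THROW", "RETRY"):
        raise ValueError(f"unknown error policy: {policy}")
    retries = 0
    while True:
        try:
            return write()
        except Exception as err:
            log(f"write failed: {err!r}")  # errors are logged under every policy
            if policy == "NOOP":
                return None  # swallow the error and carry on
            if policy == "THROW" or retries >= max_retries:
                raise  # propagate, or give up once retries are exhausted
            retries += 1
            sleep(retry_interval_ms / 1000)  # wait before the next attempt


if __name__ == "__main__":
    attempts = {"count": 0}

    def flaky_write() -> str:
        attempts["count"] += 1
        if attempts["count"] < 3:
            raise IOError("transient failure")
        return "written"

    print(write_with_policy(flaky_write, "RETRY", max_retries=5, retry_interval_ms=10))
```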

connect.s3.http.max.retries 

Number of times to retry the http request, in the case of a resolvable error on the server side.

connect.s3.http.retry.interval 

If greater than zero, used to determine the delay after which to retry the http request in milliseconds. Based on an exponential backoff algorithm.
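As a rough picture of an exponential backoff schedule, the sketch below doubles the delay on each retry, starting from the configured interval. The doubling factor is an assumption, since the exact algorithm is not described here, and `backoff_delays_ms` is a hypothetical name. The defaults of 50 ms and 5 retries are those listed in the options table.

```python
from typing import List


def backoff_delays_ms(retry_interval_ms: int = 50, max_retries: int = 5, factor: int = 2) -> List[int]:
    """Return the delay before each retry, growing by `factor` every time."""
    if retry_interval_ms <= 0:
        # Behaviour for non-positive intervals is not described; no schedule is modelled.
        return []
    return [retry_interval_ms * factor ** attempt for attempt in range(max_retries)]


if __name__ == "__main__":
    print(backoff_delays_ms())
```

Under these assumptions the total wait across all five retries stays below two seconds, which suits short-lived server-side errors.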

Options 

| Name | Description | Type | Available Values | Default Value |
| --- | --- | --- | --- | --- |
| connect.s3.aws.access.key | Access Key | string | | |
| connect.s3.aws.secret.key | Secret Key | string | | |
| connect.s3.aws.region | Region | string | | |
| connect.s3.aws.client | Client | string | aws, jclouds | AWS |
| connect.s3.pool.max.connections | Connection Pool Max Connections | int | -1 (undefined) | 50 (aws), 30 (jclouds) |
| connect.s3.aws.auth.mode | Auth Mode | string | Credentials, Default | Default |
| connect.s3.custom.endpoint | Custom Endpoint | string | | |
| connect.s3.vhost.bucket | Enable Vhost Buckets | boolean | true, false | false |
| connect.s3.error.policy | Error Policy | string | NOOP, THROW, RETRY | THROW |
| connect.s3.max.retries | Maximum Retries | int | | 20 |
| connect.s3.retry.interval | Retry Interval | int | | 60000 |
| connect.s3.http.max.retries | HTTP Maximum Retries | long | | 5 |
| connect.s3.http.retry.interval | HTTP Retry Interval (Exponential Backoff) | long | | 50 |
| connect.s3.local.tmp.directory | Local Staging Area | string | | |
| connect.s3.kcql | KCQL String | string | kcql configuration | |
| connect.s3.compression.codec | Compression Codec | string | UNCOMPRESSED, SNAPPY, GZIP, LZ0, LZ4, BROTLI, BZIP2, ZSTD, DEFLATE, XZ | UNCOMPRESSED |
| connect.s3.compression.level | Compression Level | int | 1-9 | |
| connect.s3.padding.strategy | Padding Strategy | string | NoOp, LeftPad, RightPad | NoOp |
| connect.s3.padding.length | Length to Pad To | int | | |
| connect.s3.seek.max.files | Max Index File Error Threshold | int | | 5 |
| connect.s3.seek.migration.enabled | One-time Connector Upgrade from Legacy Seek | boolean | true, false | false |
--
Last modified: November 18, 2024