A Kafka Connect sink connector for writing records from Kafka to AWS S3 Buckets.
The following KCQL is supported:
INSERT INTO bucketAddress:pathPrefix SELECT * FROM kafka-topic [PARTITIONBY (partition[, partition] ...)] [STOREAS storage_format] [WITHPARTITIONER partitioner] [WITH_FLUSH_SIZE = flush_size] [WITH_FLUSH_INTERVAL = flush_interval] [WITH_FLUSH_COUNT = flush_count]
Examples:
-- Insert mode, select all fields from all configured topics (`*`) and -- write underneath prefix on testS3Bucket. Using the wildcard ensures -- you don't have to define the topics twice in your connector configuration. INSERT INTO testS3Bucket:pathToWriteTo SELECT * FROM `*` -- Insert mode, select all fields from topicA and -- write underneath prefix on testS3Bucket INSERT INTO testS3Bucket:pathToWriteTo SELECT * FROM topicA -- Insert mode, select all fields from topicA and -- write underneath prefix on testS3Bucket as AVRO with -- a maximum of 5000 records per file INSERT INTO testS3Bucket:pathToWriteTo SELECT * FROM topicA STOREAS `AVRO` WITH_FLUSH_COUNT = 5000 -- Insert mode, select all fields from "topic-with-headers" and -- write underneath "headerpartitioningdemo" prefix on bucket -- "kafka-connect-aws-s3-test" partitioning files by country -- code and facilityNum data in the message header INSERT INTO kafka-connect-aws-s3-test:headerpartitioningdemo SELECT * FROM topic-with-headers PARTITIONBY _header.facilityCountryCode, _header.facilityNum STOREAS `TEXT` WITH_FLUSH_COUNT = 100
The connector accepts messages containing AVRO, text, binary byte information from Kafka Connect and can output it to a number of different formats on Amazon S3.
An example configuration is provided:
name=S3SinkConnectorS3 # this can be anything connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector topics=$TOPIC_NAME tasks.max=1 connect.s3.kcql=insert into $BUCKET_NAME:$PREFIX_NAME select * from $TOPIC_NAME STOREAS `json` WITH_FLUSH_COUNT = 5000 aws.access.key=ACCESS_KEY aws.secret.key=SECRET_KEY aws.auth.mode=Credentials
You should replace $BUCKET_NAME, $PREFIX_NAME and $TOPIC_NAME with the names of the bucket, desired prefix and topic.
Please read below for a detailed explanation of these and other options, including the meaning of WITH_FLUSH_COUNT and its alternatives.
2 Authentication modes are available:
ACCESS_KEY and SECRET_KEY are credentials generated within AWS IAM and must be set and configured with permissions to write to the desired S3 bucket.
aws.auth.mode=Credentials aws.access.key=ACCESS_KEY aws.secret.key=SECRET_KEY
In this auth mode no credentials need be supplied. If no auth mode is specified, then this default will be used.
aws.auth.mode=Default
The credentials will be discovered through the default chain, in this order:
Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEYJava System Properties - aws.accessKeyId and aws.secretKeyWeb Identity Token credentials from the environment or containerCredential profiles file at the default location (~/.aws/credentials)EC2 Credentials delivered through the Amazon EC2 container serviceInstance profile credentials delivered through the Amazon EC2 metadata service
The full details of the default chain are available on S3 Documentation
Format configuration is provided by kcql.
The options for json, avro and parquet will look like the below:
connect.s3.kcql=insert into $BUCKET_NAME:$PREFIX_NAME select * from $TOPIC_NAME STOREAS `JSON`
The options for the formats are case-insensitive, but they are presented here in the most readable form.
Using JSON as an output format allows you to convert the complex AVRO structure of the message to a simpler schemaless Json format on output.
STOREAS `JSON`
Using Avro as the output format allows you to output the Avro message.
STOREAS `Avro`
Using Parquet as the output format allows you to output the Avro message to a file readable by a parquet reader, including schemas.
STOREAS `Parquet`
If the incoming kafka message contains text only and this is to be pushed through to the S3 sink as is, then this option may be desired.
STOREAS `Text`
It will may be required to use the additional configuration options for this connector to ensure that the value is presented as a String.
value.converter=org.apache.kafka.connect.storage.StringConverter key.converter=org.apache.kafka.connect.storage.StringConverter
This converts the fields of the Avro message to string values to be written out to a CSV file. There are 2 options for CSV format, to write CSV files with the column headers or without.
STOREAS `CSV_WithHeaders` STOREAS `CSV`
Bytes can be written from the message key and/or the message value depending on which option is configured.
The key only/ value only options can be useful for, as an example, stitching together multiple parts of a binary file.
The content sizes are output first to an (8-byte) long.
In order to ensure the message is passed through as bytes it may be necessary to set the additional configuration options
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
Please see the KCQL options available and the results of these configurations:
Bytes_KeyAndValueWithSizes
Bytes_KeyWithSize
Bytes_ValueWithSize
Bytes_KeyOnly
Bytes_ValueOnly
Flush configuration is provided by kcql
The flush occurs after the configured number of records written to the sink.
For example, if you want a file written for every record, you would set the FLUSH_COUNT to 1. If you want a file written for each 10,000 records, then you would set the FLUSH_COUNT to 10000
connect.s3.kcql=insert into $BUCKET_NAME:$PREFIX_NAME select * from $TOPIC_NAME STOREAS `json` WITH_FLUSH_COUNT = 1
The flush occurs after the configured size in bytes is exceeded.
For example, to flush after each 10000 bytes written to a file, you would set the FLUSH_SIZE to 10000.
connect.s3.kcql=insert into $BUCKET_NAME:$PREFIX_NAME select * from $TOPIC_NAME STOREAS `json` WITH_FLUSH_SIZE = 10000
The flush occurs after the configured interval.
For example, to roll over to a new file after each 10 minutes, you would set the FLUSH_INTERVAL to 600 (10 minutes * 60 seconds)
connect.s3.kcql=insert into $BUCKET_NAME:$PREFIX_NAME select * from $TOPIC_NAME STOREAS `json` WITH_FLUSH_INTERVAL = 600
There are 2 options for grouping (partitioning) the output files.
In your S3 bucket the sink files will be stored as
bucket/prefix/topic/partition/offset.ext
Where .ext is the appropriate file extension.
(Please note: the prefix must not be divided into subpaths.)
This allows you to store the sink files in your S3 bucket as
bucket/prefix/customKey1=customValue/topic(partition_offset).ext
or
bucket/prefix/customValue/topic(partition_offset).ext
The custom keys and values can be taken from the kafka message key, from the value record, or the message headers (supporting string-coaxable headers only).
Again, .ext is the appropriate file extension.
The number of partitions you may configure on your sink is unbounded but bear in mind restrictions on AWS S3 key lengths.
To pull fields from the message values, just use the name of the field from the Avro message.
The fields from the message must be primitive types (string, int, long, etc) in order to partition by them.
Add this to your KCQL string:
PARTITIONBY fieldA[,fieldB]
It is possible to partition by the entire message key, as long as the key is coercible into a primitive type:
PARTITIONBY _key
Where the Kafka message key is not a primitive but a complex Avro object, it is possible to partition by individual fields within the key.
PARTITIONBY _key.fieldA[, _key.fieldB]
Kafka message headers may be used for partitioning. In this case the header must contain a primitive type easily coercible to a String type.
PARTITIONBY _header.<header_key1>[,_header.<header_key2>]
The above partition types can be mixed to configure advanced partitioning.
For example
PARTITIONBY fieldA, _key.fieldB, _headers.fieldC
The formats can be specified using the preconfigured strings from Oracle's Java Documentation
An example of the KCQL configuration:
PARTITIONBY _date.uuuu,_date.LL,_date.dd WITHPARTITIONER=Values
This will partition by year/month/day for example 2020/10/25
WITHPARTITIONER=KeysAndValues WITHPARTITIONER=Values
To ensure the file can be restored by the S3 Source in order, you should configure the padding strategy.
connect.s3.padding.strategy
The options available are:
LeftPad - add 0’s to the left, eg 5 padded to 3 digits becomes 005
LeftPad
RightPad - add 0’s to the right, eg 5 padded to 3 digits becomes 500
RightPad
NoOp - this is the default value, and retains the legacy behaviour
NoOp
connect.s3.padding.length
Specify the length of the string you wish to pad to. Default, if not set, is 8. If connect.s3.padding.strategy is not set then this property will be ignored.
Dependent on the format required, this sink supports the following Kafka payloads:
* Schema required when writing to Avro and Parquet formats.
** However you will not be able to partition by Json or XML properties as Text message content is not parsed.
See connect payloads for more information.
AVRO and Parquet present options for compressing the file as it writes.
The S3 Sink connector exposes these options to allow the advanced user to configure this compression.
connect.s3.compression.codec Allows specification of the compression codec to use. The options are:
connect.s3.compression.codec
connect.s3.compression.level Currently required only for certain codecs when writing to an AVRO sink (see the matrix below). An integer from 1 - 9 specifying the compression level, 9 being the maximum amount of compression.
connect.s3.compression.level
The options for the connect.s3.compression.codec are listed below along with whether they are supported by the Avro or the Parquet writer.
Where the configurations also require a connect.s3.compression.level that is marked with the ⚙️ icon.
Please note that not all compression libraries are bundled with the S3 connector, therefore you may need to add some to the class path to ensure they work.
The connector supports Error polices.
The S3 Sink Connector requires AWS access keys and a bucket to be set up prior to running.
The S3 connector requires AWS access keys to be able to perform actions in AWS.
You can configure these through AWS console or CLI tools.
Please see AWS documentation for more details.
Please ensure you use sensible selections for security and access options. More information on the command at AWS’s documentation.
aws create-bucket –bucket test-kafka-connect-bucket
If you are using Lenses, login into Lenses and navigate to the connectors page, select S3 as the sink and paste the following:
To start the connector using the command line, log into the lenses-box container:
docker exec -ti lenses-box /bin/bash
and create a connector.properties file containing the properties above.
Create the connector, with the connect-cli:
connect-cli create aws-s3-sink < connector.properties
Wait for the connector to start and check it’s running:
connect-cli status aws-s3-sink
In the lenses-box container start the kafka producer shell:
kafka-avro-console-producer \ --broker-list localhost:9092 --topic orders \ --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"created","type":"string"},{"name":"product","type":"string"},{"name":"price","type":"double"}, {"name":"qty", "type":"int"}]}'
the console is now waiting for your input, enter the following:
{ "id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2, "qty": 100 }
Use the aws console. Alternatively on the command line run
aws ls s3://mybucket/mykey --recursive
Bring down the stack:
docker-compose down
The connector builds complete files locally before uploading in one operation in the commit.
To optionally supply a directory to write the files to locally. If none is supplied and BuildLocal mode is used, then a directory will be created in your system temporary directory (eg /tmp)
connect.s3.local.tmp.directory
Files are currently limited to 5GB. This can be addressed in future if it is required.
Various properties for managing the error handling are supplied.
Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed THROW - the error is allowed to propagate. RETRY - The exception causes the Connect framework to retry the message. The number of retries is set by connect.s3.max.retries. All errors will be logged automatically, even if the code swallows them.
The maximum number of times to try the write again.
The time in milliseconds between retries.
Number of times to retry the http request, in the case of a resolvable error on the server side.
If greater than zero, used to determine the delay after which to retry the http request in milliseconds. Based on an exponential backoff algorithm.
On this page