A Kafka Connect source connector for reading records from AWS S3 buckets into Kafka.
The following KCQL is supported:
INSERT INTO kafka-topic SELECT * FROM bucketAddress:pathPrefix [BATCH=batch] [STOREAS storage_format] [LIMIT limit]
Examples:
-- Insert mode, select all fields from the json files
-- within the bucket and path given on testS3Bucket
-- using the default number of records
INSERT INTO topicA SELECT * FROM testS3Bucket:pathToReadFrom

-- Insert mode, select all fields from the avro files
-- within the bucket and path given on testS3Bucket
-- retrieving 5000 records on each poll from Kafka
-- Connect
INSERT INTO topicA SELECT * FROM testS3Bucket:pathToReadFrom STOREAS `AVRO` LIMIT 5000

-- Insert mode, select the content from the text (*.txt)
-- files under a path within the bucket.
-- The connector will return 5000 results for each poll call.
-- Each request to list files to S3 will retrieve a list of 100 files at a time.
INSERT INTO topicA SELECT * FROM `kafka-connect-aws-s3-test:some/other/path` BATCH=100 STOREAS `TEXT` LIMIT 5000
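As a rough illustration of how the optional clauses combine, the following Python sketch assembles a KCQL statement from its parts. The helper name `build_kcql` and its parameters are hypothetical and not part of the connector; the clause order simply follows the grammar shown above.

```python
from typing import Optional


def build_kcql(topic: str, bucket: str, prefix: str,
               batch: Optional[int] = None,
               store_as: Optional[str] = None,
               limit: Optional[int] = None) -> str:
    """Assemble a KCQL statement (hypothetical helper, for illustration only)."""
    parts = [f"INSERT INTO {topic} SELECT * FROM {bucket}:{prefix}"]
    if batch is not None:
        parts.append(f"BATCH={batch}")          # optional [BATCH=batch]
    if store_as is not None:
        parts.append(f"STOREAS `{store_as}`")   # optional [STOREAS storage_format]
    if limit is not None:
        parts.append(f"LIMIT {limit}")          # optional [LIMIT limit]
    return " ".join(parts)


print(build_kcql("topicA", "testS3Bucket", "pathToReadFrom",
                 store_as="AVRO", limit=5000))
```

Omitting every optional argument yields the first example statement, which relies on the connector's defaults.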
The connector reads files in Avro, Parquet, CSV, text, JSON, or binary (bytes) format from an S3 bucket into Kafka Connect.
An example configuration is provided:
# the connector name can be anything
name=S3SourceConnectorParquet
connector.class=io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
tasks.max=1
connect.s3.kcql=insert into $TOPIC_NAME select * from $BUCKET_NAME:$PREFIX_NAME STOREAS `parquet`
connect.s3.aws.secret.key=SECRET_KEY
connect.s3.aws.access.key=ACCESS_KEY
connect.s3.aws.auth.mode=Credentials
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8089
You should replace $BUCKET_NAME, $PREFIX_NAME and $TOPIC_NAME with the names of the bucket, desired prefix and topic.
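If you would rather script the substitution than edit the file by hand, one possible approach is Python's `string.Template`, whose `$NAME` placeholder syntax happens to match the placeholders used here. This is only a sketch: the function name `render_kcql_property` is hypothetical, and the bucket, prefix and topic values are sample names.

```python
from string import Template

# The KCQL property from the example configuration, with its three placeholders.
KCQL_PROPERTY = Template(
    "connect.s3.kcql=insert into $TOPIC_NAME select * "
    "from $BUCKET_NAME:$PREFIX_NAME STOREAS `parquet`"
)


def render_kcql_property(topic: str, bucket: str, prefix: str) -> str:
    """Fill in the placeholders; raises KeyError if one is left unset."""
    return KCQL_PROPERTY.substitute(
        TOPIC_NAME=topic, BUCKET_NAME=bucket, PREFIX_NAME=prefix
    )


print(render_kcql_property("test_topic", "test-kafka-connect-bucket", "parquetTest"))
```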
Please read below for a detailed explanation of these and other options.
Two authentication modes are available:
In Credentials mode, ACCESS_KEY and SECRET_KEY are credentials generated within AWS IAM. They must be set, and they must be granted permission to read from the desired S3 bucket.
connect.s3.aws.auth.mode=Credentials
connect.s3.aws.access.key=ACCESS_KEY
connect.s3.aws.secret.key=SECRET_KEY
The S3 Source Connector requires at least the following permissions:
In Default mode, no credentials need to be supplied. If no auth mode is specified, this mode is used.
connect.s3.aws.auth.mode=Default
The credentials will be discovered through the default chain, in this order:
1. Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
2. Java System Properties - aws.accessKeyId and aws.secretKey
3. Web Identity Token credentials from the environment or container
4. Credential profiles file at the default location (~/.aws/credentials)
5. EC2 Credentials delivered through the Amazon EC2 container service
6. Instance profile credentials delivered through the Amazon EC2 metadata service
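The chain is a first-match lookup: each source is tried in turn and the first one that yields credentials wins. The Python sketch below illustrates that idea for only the two sources that are easy to inspect from a script (environment variables and the profiles file). It is not the AWS SDK's implementation, it skips the other sources entirely, and the function names are hypothetical.

```python
import os
from pathlib import Path
from typing import Mapping, Optional


def from_environment(env: Mapping[str, str]) -> bool:
    """True if both AWS environment variables are set."""
    return "AWS_ACCESS_KEY_ID" in env and "AWS_SECRET_ACCESS_KEY" in env


def from_profile_file(home: Path) -> bool:
    """True if a credential profiles file exists at the default location."""
    return (home / ".aws" / "credentials").is_file()


def first_available(env: Mapping[str, str], home: Path) -> Optional[str]:
    """Return the name of the first source that matches, in chain order."""
    chain = [
        ("environment variables", lambda: from_environment(env)),
        # Java system properties and web identity tokens are not modelled here.
        ("credential profiles file", lambda: from_profile_file(home)),
    ]
    for name, check in chain:
        if check():
            return name
    return None


print(first_available(os.environ, Path.home()))
```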
The full details of the default chain are available on S3 Documentation
Format configuration is provided by KCQL.
The options for JSON, Avro and Parquet look like the following:
connect.s3.kcql=insert into $TOPIC_NAME select * from $BUCKET_NAME:$PREFIX_NAME STOREAS `JSON`
The options for the formats are case-insensitive, but they are presented here in the most readable form.
There are multiple options for CSV and BYTES reading that you should be aware of. More is explained below.
The connector will automatically search for any files matching the expected extensions in the given path.
Using JSON as an input format allows you to read in files containing JSON content (delimited by new lines), line by line.
STOREAS `JSON`
Please note: the S3 Source connector does not parse the JSON. It handles JSON and Text identically.
value.converter=org.apache.kafka.connect.storage.StringConverter
Using Avro as the input format allows you to read the Avro-stored messages on S3 back into Kafka’s native format.
STOREAS `Avro`
It may also be necessary to configure the message converter:
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8089
Using Parquet as the input format allows you to read parquet files stored on S3, importing the Avro schemas and values.
STOREAS `Parquet`
If the source files on S3 consist of files containing lines of text, then using the text input format may be desired.
STOREAS `Text`
You may need to set additional configuration options, such as the value converter, to ensure that the value is presented as a String.
This reads CSV files stored on S3 as strings to be written to Kafka. There are two options for the CSV format; if WithHeaders is included, the first row is skipped.
STOREAS `CSV_WithHeaders`
STOREAS `CSV`
Please note there is little distinction between the handling of CSV and handling of TEXT (with the exception that the header row can be skipped). The CSV is not parsed within the connector.
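To make the difference between the two CSV options concrete, here is a small Python sketch of the behaviour described above: every line becomes one unparsed string, and the first line is dropped only when headers are declared. The function name `csv_lines_as_records` is hypothetical, and this is a model of the description, not the connector's code.

```python
from typing import Iterator


def csv_lines_as_records(content: str, with_headers: bool) -> Iterator[str]:
    """Yield each line as an unparsed string, skipping the header row if asked."""
    lines = content.splitlines()
    if with_headers:
        lines = lines[1:]  # CSV_WithHeaders: the first row is skipped
    for line in lines:
        yield line         # no field splitting; the CSV is not parsed


sample = "id,price\n1,94.2\n2,10.0\n"
print(list(csv_lines_as_records(sample, with_headers=True)))
print(list(csv_lines_as_records(sample, with_headers=False)))
```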
Bytes can be read back from S3 into message keys and/or values, depending on how the data was written.
This can be used to read back messages containing binary data that were written out using the S3 sink, or alternatively to read binary files to be loaded onto a Kafka topic.
Please see the KCQL options available and the results of these configurations:
Bytes_KeyAndValueWithSizes
Bytes_KeyWithSize
Bytes_ValueWithSize
Bytes_KeyOnly
Bytes_ValueOnly
Using the “With Sizes” options the Source assumes that the files will contain one or two (depending on configuration) 8-byte chunks of data at the start of the file instructing how many bytes to read for the content.
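The following Python sketch shows one way such a length-prefixed file could be decoded for the key-and-value case. The exact layout is an assumption made for illustration: sizes as big-endian signed 64-bit integers, key size before value size, then the key bytes followed by the value bytes. The text above only states that the sizes are 8-byte chunks at the start of the file. The function name is hypothetical.

```python
import struct
from typing import Tuple

HEADER = struct.Struct(">qq")  # ASSUMPTION: two big-endian 8-byte sizes, key first


def read_key_and_value(data: bytes) -> Tuple[bytes, bytes]:
    """Split a buffer into key and value using the two leading 8-byte sizes."""
    key_len, value_len = HEADER.unpack_from(data, 0)
    start = HEADER.size                      # 16 bytes of header
    key = data[start:start + key_len]
    value = data[start + key_len:start + key_len + value_len]
    return key, value


# Build a sample buffer in the assumed layout and decode it again.
sample = HEADER.pack(3, 5) + b"key" + b"value"
print(read_key_and_value(sample))
```

The single-size variants would read one 8-byte size instead of two under the same assumptions.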
To ensure the message is passed through as bytes, it may be necessary to set these additional configuration options:
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
To limit the number of file names the source reads from S3 in a single poll, use the BATCH clause:
BATCH = 100
In order to limit the number of result rows returned from the source in a single poll operation, you can use the LIMIT clause.
LIMIT 1000
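BATCH and LIMIT cap different things: the former caps how many file names are listed at a time, the latter how many rows a single poll returns. The toy Python sketch below shows only that capping, using a hypothetical `chunked` helper. It does not model how the connector schedules listings or polls internally.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunked(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield successive lists of at most `size` items."""
    it = iter(items)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


# 250 hypothetical file names listed 100 at a time (BATCH = 100).
file_names = [f"some/other/path/{i}.txt" for i in range(250)]
print([len(page) for page in chunked(file_names, 100)])

# 2500 hypothetical rows returned 1000 per poll (LIMIT 1000).
print([len(poll) for poll in chunked(range(2500), 1000)])
```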
To ensure that the original partitions are passed back into Kafka Connect when reading from the source, a partition extractor can be configured to extract the partition from the filename.
The following two new properties are introduced:
connect.s3.source.partition.extractor.type
Can either be hierarchical (which matches the topic/partition/offset.json format the sink uses by default) or regex (where you can supply a custom regular expression to extract the partition).
connect.s3.source.partition.extractor.regex
If using regex, this is the regular expression used to parse the file path and retrieve the partition. Exactly one capture group is expected. For an example, see the regex used for hierarchical, which is:
(?i)^(?:.*)\/([0-9]*)\/(?:[0-9]*)[.](?:Json|Avro|Parquet|Text|Csv|Bytes)$
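To see what the hierarchical expression captures, the pattern can be tried against sample paths. The sketch below uses Python's `re` module, whereas the connector runs on the JVM; for this particular pattern the two regex dialects should behave the same, but treat that as an assumption. The function name and the sample paths are hypothetical.

```python
import re
from typing import Optional

# The hierarchical pattern quoted above, unchanged.
HIERARCHICAL = re.compile(
    r"(?i)^(?:.*)\/([0-9]*)\/(?:[0-9]*)[.](?:Json|Avro|Parquet|Text|Csv|Bytes)$"
)


def extract_partition(path: str) -> Optional[int]:
    """Return the partition captured by the single group, or None if absent."""
    match = HIERARCHICAL.match(path)
    if match is None or match.group(1) == "":
        return None
    return int(match.group(1))


print(extract_partition("myPrefix/test_topic/3/100.json"))  # topic/partition/offset layout
print(extract_partition("jsonTest/test_topic/1.json"))      # no partition directory
```

A path without a numeric partition directory does not match, so no partition can be extracted from it with this expression.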
The S3 Source Connector requires AWS access keys and a bucket to be set up prior to running.
The S3 connector requires AWS access keys to be able to perform actions in AWS.
You can configure these through AWS console or CLI tools.
Please see AWS documentation for more details.
If you are using Lenses, you can create a Kafka topic using its topic management features.
Otherwise, change to the directory where Kafka is installed and create the topic:
cd ~/Software/kafka_2.12-2.5.0/bin
./kafka-topics.sh --create --bootstrap-server=localhost:9092 --topic=test_topic --partitions=3 --replication-factor=1
Please ensure you use sensible selections for security and access options. More information on the command is available in the AWS documentation.
aws s3api create-bucket --bucket test-kafka-connect-bucket
echo '{"id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2, "qty":100}' > 1.json
aws s3 cp 1.json s3://test-kafka-connect-bucket/jsonTest/test_topic/1.json
If you are using Lenses, log in to Lenses, navigate to the connectors page, select S3 as the source and paste the following:
# the connector name can be anything
name=S3SourceConnectorS3
connector.class=io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
topics=$TOPIC_NAME
tasks.max=1
connect.s3.kcql=insert into test_topic select * from test-kafka-connect-bucket:jsonTest STOREAS `json` LIMIT 5000
connect.s3.aws.access.key=ACCESS_KEY
connect.s3.aws.secret.key=SECRET_KEY
connect.s3.aws.auth.mode=Credentials
To start the connector using the command line, log into the lenses-box container:
docker exec -ti lenses-box /bin/bash
and create a connector.properties file containing the properties above.
Create the connector, with the connect-cli:
connect-cli create aws-s3-source < connector.properties
Wait for the connector to start and check it’s running:
connect-cli status aws-s3-source
If you are using Lenses, log in to Lenses, navigate to the explore page, and select the new topic you added.
View the topic and verify that it contains data.
Otherwise, you can use the kafka-console-consumer script that comes with Kafka.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic --from-beginning
Bring down the stack:
docker-compose down