GCP Storage


The Kafka connector is designed to seamlessly ingest records from GCP Storage Buckets into your Kafka cluster. When paired with the GCP Storage Sink Connector, it empowers you to efficiently stream data from GCP Storage to Kafka or facilitate backup and restore operations.

Notably, this connector supports exactly-once semantics, to the extent that they are supported by Apache Kafka itself. Follow the documentation here to enable exactly-once semantics for your Kafka Connect cluster.

To enhance data integrity, we recommend enabling the transactional producer or, at the very least, the idempotent producer.

Configurations 

| Name | Description | Type | Available Values | Default Value |
|---|---|---|---|---|
| connect.gcpstorage.gcp.auth.mode | Specifies the authentication mode for connecting to GCP. | string | “Credentials”, “File” or “Default” | “Default” |
| connect.gcpstorage.gcp.credentials | For “auth.mode” credentials: GCP Authentication credentials string. | string | | (Empty) |
| connect.gcpstorage.gcp.file | For “auth.mode” file: Local file path for file containing GCP authentication credentials. | string | | (Empty) |
| connect.gcpstorage.gcp.project.id | GCP Project ID. | string | | (Empty) |
| connect.gcpstorage.gcp.quota.project.id | GCP Quota Project ID. | string | | (Empty) |
| connect.gcpstorage.endpoint | Endpoint for GCP Storage. | string | | |
| connect.gcpstorage.error.policy | Defines the error handling policy when errors occur during data transfer to or from GCP Storage. | string | “NOOP”, “THROW”, “RETRY” | “THROW” |
| connect.gcpstorage.max.retries | Sets the maximum number of retries the connector will attempt before reporting an error to the Connect Framework. | int | | 20 |
| connect.gcpstorage.retry.interval | Specifies the interval (in milliseconds) between retry attempts by the connector. | int | | 60000 |
| connect.gcpstorage.http.max.retries | Sets the maximum number of retries for the underlying HTTP client when interacting with GCP Storage. | long | | 5 |
| connect.gcpstorage.http.retry.interval | Specifies the retry interval (in milliseconds) for the underlying HTTP client. An exponential backoff strategy is employed. | long | | 50 |
| connect.gcpstorage.kcql | Kafka Connect Query Language (KCQL) Configuration to control the connector behaviour | string | kcql configuration | |
| connect.gcpstorage.source.extension.excludes | A comma-separated list of file extensions to exclude from the source file search. | string | file extension filtering | |
| connect.gcpstorage.source.extension.includes | A comma-separated list of file extensions to include in the source file search. | string | file extension filtering | |
| connect.gcpstorage.source.partition.extractor.type | Type of Partition Extractor (Hierarchical or Regex) | string | hierarchical, regex | |
| connect.gcpstorage.source.partition.extractor.regex | Regex Pattern for Partition Extraction (if applicable) | string | | |
| connect.gcpstorage.source.partition.search.continuous | If set to true the connector will continuously search for new partitions. | boolean | true, false | true |
| connect.gcpstorage.source.partition.search.excludes | A comma-separated list of paths to exclude from the partition search. | string | | “.indexes” |
| connect.gcpstorage.source.partition.search.interval | The interval in milliseconds between searching for new partitions. | long | | 300000 |
| connect.gcpstorage.source.partition.search.recurse.levels | Controls how many levels deep to recurse when searching for new partitions | int | | 0 |

For a complete example configuration please see GCP Storage Examples.
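
As a rough illustration of how the settings above fit together, the following Python sketch assembles a small source configuration and renders it in properties format. Only property names from the table are used. The topic, bucket and path are placeholders, and `render_properties` is a hypothetical helper written for this example, not part of the connector.

```python
# Hypothetical helper: renders connector settings as .properties lines.
def render_properties(settings: dict) -> str:
    return "\n".join(f"{key}={value}" for key, value in settings.items())


# Placeholder values for illustration only (topic, bucket and path are assumptions).
source_settings = {
    "connect.gcpstorage.gcp.auth.mode": "Default",
    "connect.gcpstorage.error.policy": "RETRY",
    "connect.gcpstorage.max.retries": 20,
    "connect.gcpstorage.retry.interval": 60000,
    "connect.gcpstorage.kcql": (
        "INSERT INTO `my-topic` SELECT * FROM testbucket:pathToReadFrom STOREAS `JSON`"
    ),
}

if __name__ == "__main__":
    print(render_properties(source_settings))
```

A complete deployment also needs the usual Kafka Connect settings (connector name, connector class, task count), which are outside the scope of this sketch.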

KCQL (Kafka Connect Query Language) 

The connector uses a SQL-like syntax to configure the connector behaviour. The full KCQL syntax is:

    INSERT INTO $kafka-topic
    SELECT *
    FROM bucketAddress:pathPrefix
    [BATCH=batch]
    [STOREAS storage_format]
    [LIMIT limit]
    [PROPERTIES(
      'property.1'=x,
      'property.2'=x,
    )]

Please note that you can employ escaping within KCQL for the INSERT INTO, SELECT * FROM, and PARTITIONBY clauses when necessary. For example, if you need to use a topic name that contains a hyphen, you can escape it as follows:

    INSERT INTO `my-topic-with-hyphen`
    SELECT *
    FROM bucketAddress:pathPrefix
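
The syntax above can be assembled programmatically. The sketch below is a minimal, hypothetical Python helper (`build_kcql` is not part of the connector) that emits the clauses in the order shown in the syntax block and wraps the topic in backticks when it contains characters other than letters, digits and underscores. That escaping rule is an assumption made for this example; the connector may accept more names unescaped.

```python
def escape_identifier(name: str) -> str:
    """Wrap a name in backticks unless it only has letters, digits and underscores.

    The rule is an assumption for illustration, not the connector's own logic.
    """
    is_plain = name.replace("_", "").isalnum()
    return name if is_plain else f"`{name}`"


def build_kcql(topic, bucket, prefix, batch=None, store_as=None, limit=None, properties=None):
    """Assemble a KCQL statement, adding only the optional clauses that are set."""
    parts = [
        f"INSERT INTO {escape_identifier(topic)}",
        "SELECT *",
        f"FROM {bucket}:{prefix}",
    ]
    if batch is not None:
        parts.append(f"BATCH={batch}")
    if store_as is not None:
        parts.append(f"STOREAS `{store_as}`")
    if limit is not None:
        parts.append(f"LIMIT {limit}")
    if properties:
        rendered = ", ".join(f"'{key}'='{value}'" for key, value in properties.items())
        parts.append(f"PROPERTIES({rendered})")
    return " ".join(parts)


if __name__ == "__main__":
    print(build_kcql("my-topic-with-hyphen", "bucketAddress", "pathPrefix", store_as="JSON"))
```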

Target Kafka Topic 

The target Kafka topic is specified via the INSERT INTO clause. The connector will write all the records to the given topic:

    INSERT INTO `my-topic` SELECT * FROM testbucket:pathToReadFrom;

SQL projection 

Currently, the connector does not offer support for SQL projection; consequently, anything other than a SELECT * query is disregarded. The connector will faithfully write all the record fields to Kafka exactly as they are.

Source GCP Storage Bucket 

The GCP Storage source location is defined within the FROM clause. The connector will read all files from the given location considering the data partitioning and ordering options. Each data partition will be read by a single connector task.

Properties 

The PROPERTIES clause is optional and adds a layer of configurability to the connector. It enhances versatility by permitting the application of multiple configurations (delimited by ‘,’). The following properties are supported:

| Name | Description | Type | Available Values | Default Value |
|---|---|---|---|---|
| read.text.mode | Controls how Text content is read | Enum | Regex, StartEndTag, StartEndLine | |
| read.text.regex | Regular Expression for Text Reading (if applicable) | String | | |
| read.text.start.tag | Start Tag for Text Reading (if applicable) | String | | |
| read.text.end.tag | End Tag for Text Reading (if applicable) | String | | |
| read.text.buffer.size | Text Buffer Size (for optimization) | Int | | |
| read.text.start.line | Start Line for Text Reading (if applicable) | String | | |
| read.text.end.line | End Line for Text Reading (if applicable) | String | | |
| read.text.trim | Trim Text During Reading | Boolean | | |
| store.envelope | Messages are stored as “Envelope” | Boolean | | |

Throttling 

To limit the number of file names the source reads from GCP Storage in a single poll, use the BATCH clause. The default value, if not specified, is 1000:

    BATCH = 100

In order to limit the number of result rows returned from the source in a single poll operation, you can use the LIMIT clause. The default value, if not specified, is 10000.

    LIMIT 10000

File Extension Filtering 

The GCP Storage Source Connector allows you to filter the files to be processed based on their extensions. This is controlled by two properties: connect.gcpstorage.source.extension.excludes and connect.gcpstorage.source.extension.includes.

Excluding File Extensions 

The connect.gcpstorage.source.extension.excludes property is a comma-separated list of file extensions to exclude from the source file search. If this property is not configured, all files are considered. For example, to exclude .txt and .csv files, you would set this property as follows:

connect.gcpstorage.source.extension.excludes=txt,csv

Including File Extensions 

The connect.gcpstorage.source.extension.includes property is a comma-separated list of file extensions to include in the source file search. If this property is not configured, all files are considered. For example, to include only .json and .xml files, you would set this property as follows:

connect.gcpstorage.source.extension.includes=json,xml

Note: If both connect.gcpstorage.source.extension.excludes and connect.gcpstorage.source.extension.includes are set, the connector first applies the exclusion filter and then the inclusion filter.
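
The combined effect of the two properties can be pictured with a short Python sketch. It is an illustration of the exclude-then-include order described in the note, not the connector's implementation. The function names are hypothetical, and the sketch compares extensions exactly as written because the text does not specify how letter case is handled.

```python
from pathlib import PurePosixPath


def parse_extensions(value):
    """Split a comma-separated property value such as "txt,csv" into a set."""
    if not value:
        return set()
    return {item.strip() for item in value.split(",") if item.strip()}


def is_selected(object_key, excludes, includes):
    """Apply the exclusion filter first, then the inclusion filter."""
    extension = PurePosixPath(object_key).suffix.lstrip(".")
    if extension in excludes:
        return False
    # An empty include set means "no include filter configured".
    if includes and extension not in includes:
        return False
    return True


if __name__ == "__main__":
    excludes = parse_extensions("txt,csv")
    includes = parse_extensions("json,xml")
    for key in ["data/a.json", "data/b.txt", "data/c.avro"]:
        print(key, is_selected(key, excludes, includes))
```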

Time-based partitioning 

For examples of time-based partitioning using an SMT (Single Message Transform), please see Time-based Partitioning Examples.

Backup and Restore 

When used in tandem with the GCP Storage Sink Connector, the GCP Storage Source Connector becomes a powerful tool for restoring Kafka topics from GCP Storage. To enable this behavior, you should set store.envelope to true. This configuration ensures that the source expects the following data structure in GCP Storage:

{
  "key": <the message Key, which can be a primitive or a complex object>,
  "value": <the message Value, which can be a primitive or a complex object>,
  "headers": {
    "header1": "value1",
    "header2": "value2"
  },
  "metadata": {
    "offset": 0,
    "partition": 0,
    "timestamp": 0,
    "topic": "topic"
  }
}

When the messages are sent to Kafka, the GCP Storage Source Connector ensures that it correctly maps the key, value, headers, and metadata fields (including timestamp and partition) to their corresponding Kafka message fields. Please note that the envelope functionality can only be used with data stored in GCP Storage as Avro, JSON, or Parquet formats.
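
To make the mapping concrete, here is a minimal Python sketch that parses one JSON envelope and collects the fields a restored Kafka record would carry. `RestoredRecord` and `from_envelope` are hypothetical names for this example. The sketch also assumes the target topic comes from the INSERT INTO clause rather than from the envelope metadata.

```python
import json
from dataclasses import dataclass, field
from typing import Any


@dataclass
class RestoredRecord:
    topic: str
    partition: int
    timestamp: int
    key: Any
    value: Any
    headers: dict = field(default_factory=dict)


def from_envelope(line: str, target_topic: str) -> RestoredRecord:
    """Map one JSON envelope onto the fields of a Kafka record."""
    envelope = json.loads(line)
    metadata = envelope["metadata"]
    return RestoredRecord(
        topic=target_topic,  # assumption: taken from INSERT INTO, not the envelope
        partition=metadata["partition"],
        timestamp=metadata["timestamp"],
        key=envelope.get("key"),
        value=envelope.get("value"),
        headers=envelope.get("headers", {}),
    )
```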

Partition Extraction 

When the envelope feature is not in use, and data restoration is required, the responsibility falls on the connector to establish the original topic partition value. To ensure that the source correctly conveys the original partitions back to Kafka Connect during reads from the source, a partition extractor can be configured to extract this information from the GCP Storage object key.

To configure the partition extractor, you can utilize the connect.gcpstorage.source.partition.extractor.type property, which supports two options:

  • hierarchical: This option aligns with the default format used by the sink, topic/partition/offset.json.
  • regex: When selected, you can provide a custom regular expression to extract the partition information. With this option you must also set the connect.gcpstorage.source.partition.extractor.regex property. Note that the expression is expected to contain exactly one capturing group, which yields the partition. For an example of a regular expression pattern, please refer to the pattern used for hierarchical, which is:
(?i)^(?:.*)\/([0-9]*)\/(?:[0-9]*)[.](?:Json|Avro|Parquet|Text|Csv|Bytes)$
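
The pattern above can be tried out against sample object keys. The following Python sketch applies it with the standard `re` module and returns the single captured group as the partition number. The object keys are made up for illustration, `extract_partition` is a hypothetical helper, and the connector itself runs on the JVM, so engine details may differ slightly.

```python
import re
from typing import Optional

# The hierarchical pattern quoted above, unchanged.
HIERARCHICAL_PATTERN = re.compile(
    r"(?i)^(?:.*)\/([0-9]*)\/(?:[0-9]*)[.](?:Json|Avro|Parquet|Text|Csv|Bytes)$"
)


def extract_partition(object_key: str, pattern=HIERARCHICAL_PATTERN) -> Optional[int]:
    """Return the partition captured by the pattern's single group, if any."""
    match = pattern.match(object_key)
    if match is None or match.group(1) == "":
        return None
    return int(match.group(1))


if __name__ == "__main__":
    for key in ["my-topic/3/000000000100.json", "my-topic/readme.txt"]:
        print(key, extract_partition(key))
```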

GCP storage 

The connector supports a range of storage formats, each with its own distinct functionality:

  • JSON: The connector will read files containing JSON content, each line representing a distinct record.
  • Avro: The connector will read Avro-stored messages from GCP Storage and translate them into Kafka’s native format.
  • Parquet: The connector will read Parquet-stored messages from GCP Storage and translate them into Kafka’s native format.
  • Text: The connector will read files containing lines of text, each line representing a distinct record.
  • CSV: The connector will read files containing lines of text, each line representing a distinct record.
  • CSV_WithHeaders: The connector will read files containing lines of text, each line representing a distinct record, while skipping the header row.
  • Bytes: The connector will read files containing bytes; each file is translated to a Kafka message.

Use the STOREAS clause to configure the storage format. The following options are available:

    STOREAS `JSON`
    STOREAS `Avro`
    STOREAS `Parquet`
    STOREAS `Text`
    STOREAS `CSV`
    STOREAS `CSV_WithHeaders`
    STOREAS `Bytes`

Text extended 

When using Text storage, the connector provides additional configuration options to finely control how text content is processed.

Regex 

In Regex mode, the connector applies a regular expression pattern, and only when a line matches the pattern is it considered a record. For example, to include only lines that start with a number, you can use the following configuration:

   connect.gcpstorage.kcql=insert into $kafka-topic select * from lensesio:regex STOREAS `text` PROPERTIES('read.text.mode'='regex', 'read.text.regex'='^[1-9].*')
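
The effect of this mode can be previewed offline. The Python sketch below keeps only the lines that match the pattern, using the expression from the example above. `read_regex_mode` is a hypothetical helper, not connector code. Note that `[1-9]` does not match a leading `0`, so a line beginning with zero is dropped.

```python
import re


def read_regex_mode(text: str, pattern: str) -> list:
    """Keep only the lines that match the pattern; each kept line is one record."""
    compiled = re.compile(pattern)
    return [line for line in text.splitlines() if compiled.match(line)]


if __name__ == "__main__":
    sample = "1 apple\nbanana\n0 zero\n42 answer"
    print(read_regex_mode(sample, r"^[1-9].*"))
```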

Start-End line 

In Start-End Line mode, the connector reads text content between specified start and end lines, inclusive. This mode is useful when you need to extract records that fall within defined boundaries. For instance, to read records where the first line is ‘SSM’ and the last line is an empty line (’’), you can configure it as follows:

   connect.gcpstorage.kcql=insert into $kafka-topic select * from lensesio:multi_line STOREAS `text` PROPERTIES('read.text.mode'='startEndLine', 'read.text.start.line'='SSM', 'read.text.end.line'='')

To trim the start and end lines, set the read.text.trim property to true:

    connect.gcpstorage.kcql=insert into $kafka-topic select * from lensesio:multi_line STOREAS `text` PROPERTIES('read.text.mode'='startEndLine', 'read.text.start.line'='SSM', 'read.text.end.line'='', 'read.text.trim'='true')
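
As a rough model of Start-End Line mode, the following Python sketch groups lines into records that begin at the start line and finish at the end line, inclusive. It makes several assumptions that the text does not confirm: lines inside a record are joined with a newline, a block without a closing end line is discarded, and the `read.text.trim` option is not modelled. `read_start_end_line` is a hypothetical helper.

```python
def read_start_end_line(text: str, start_line: str, end_line: str) -> list:
    """Group lines into records delimited by start_line and end_line (inclusive)."""
    records = []
    current = None  # None means we are currently outside a record
    for line in text.splitlines():
        if current is None:
            if line == start_line:
                current = [line]
        else:
            current.append(line)
            if line == end_line:
                records.append("\n".join(current))
                current = None
    return records


if __name__ == "__main__":
    sample = "SSM\na\nb\n\nignored\nSSM\nc\n\n"
    for record in read_start_end_line(sample, "SSM", ""):
        print(repr(record))
```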

Start-End tag 

In Start-End Tag mode, the connector reads text content between specified start and end tags, inclusive. This mode is particularly useful when a single line of text in GCP Storage corresponds to multiple output Kafka messages. For example, to read XML records enclosed between ‘<SSM>’ and ‘</SSM>’, configure it as follows:

   connect.gcpstorage.kcql=insert into $kafka-topic select * from lensesio:xml STOREAS `text` PROPERTIES('read.text.mode'='startEndTag', 'read.text.start.tag'='<SSM>', 'read.text.end.tag'='</SSM>')
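
The following Python sketch illustrates how one line of text can yield several records in this mode. It scans for each start tag, finds the next end tag, and emits the span between them with both tags included. `read_start_end_tag` is a hypothetical helper, and dropping text outside the tags or after an unclosed start tag is an assumption of this sketch.

```python
def read_start_end_tag(text: str, start_tag: str, end_tag: str) -> list:
    """Return every span from start_tag to the next end_tag, tags included."""
    records = []
    position = 0
    while True:
        start = text.find(start_tag, position)
        if start == -1:
            break
        end = text.find(end_tag, start + len(start_tag))
        if end == -1:
            break  # unclosed start tag: nothing more to emit
        position = end + len(end_tag)
        records.append(text[start:position])
    return records


if __name__ == "__main__":
    print(read_start_end_tag("<SSM>a</SSM><SSM>b</SSM>", "<SSM>", "</SSM>"))
```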

Storage to output matrix 

Depending on the storage format of Kafka topics’ messages, the need for replication to a different cluster, and the specific data analysis requirements, there exists a guideline on how to effectively utilize converters for both sink and source operations. This guidance aims to optimize performance and minimize unnecessary CPU and memory usage.

| GCP Storage Format | Kafka Output Format | Restore or replicate cluster | Analytics | Sink Converter | Source Converter |
|---|---|---|---|---|---|
| JSON | STRING | Same, Other | Yes, No | StringConverter | StringConverter |
| AVRO, Parquet | STRING | Same, Other | Yes | StringConverter | StringConverter |
| AVRO, Parquet | STRING | Same, Other | No | ByteArrayConverter | ByteArrayConverter |
| JSON | JSON | Same, Other | Yes | JsonConverter | StringConverter |
| JSON | JSON | Same, Other | No | StringConverter | StringConverter |
| AVRO, Parquet | JSON | Same, Other | Yes, No | JsonConverter | JsonConverter or Avro Converter (Glue, Confluent) |
| AVRO, Parquet, JSON | BYTES | Same, Other | Yes, No | ByteArrayConverter | ByteArrayConverter |
| AVRO, Parquet | AVRO | Same | Yes | Avro Converter (Glue, Confluent) | Avro Converter (Glue, Confluent) |
| AVRO, Parquet | AVRO | Same | No | ByteArrayConverter | ByteArrayConverter |
| AVRO, Parquet | AVRO | Other | Yes, No | Avro Converter (Glue, Confluent) | Avro Converter (Glue, Confluent) |
| AVRO, Parquet | Protobuf | Same | Yes | Protobuf Converter (Glue, Confluent) | Protobuf Converter (Glue, Confluent) |
| AVRO, Parquet | Protobuf | Same | No | ByteArrayConverter | ByteArrayConverter |
| AVRO, Parquet | Protobuf | Other | Yes, No | Protobuf Converter (Glue, Confluent) | Protobuf Converter (Glue, Confluent) |
| AVRO, Parquet, JSON | Other | Same, Other | Yes, No | ByteArrayConverter | ByteArrayConverter |

Set the key.converter and value.converter properties according to the table above.
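
As a small aid, the Python sketch below turns the short converter names from the table into `key.converter` and `value.converter` property lines. The fully qualified class names are the standard Apache Kafka ones. Avro and Protobuf converters are left out because their class names depend on the schema registry in use (Glue or Confluent). `converter_properties` is a hypothetical helper.

```python
# Standard Apache Kafka converter classes for the short names used in the table.
CONVERTER_CLASSES = {
    "StringConverter": "org.apache.kafka.connect.storage.StringConverter",
    "ByteArrayConverter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "JsonConverter": "org.apache.kafka.connect.json.JsonConverter",
}


def converter_properties(key_converter: str, value_converter: str) -> list:
    """Render the two converter settings as .properties lines."""
    return [
        f"key.converter={CONVERTER_CLASSES[key_converter]}",
        f"value.converter={CONVERTER_CLASSES[value_converter]}",
    ]


if __name__ == "__main__":
    print("\n".join(converter_properties("StringConverter", "ByteArrayConverter")))
```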

Auth Mode 

The connector offers three distinct authentication modes:

  • Default: This mode relies on the default GCP authentication chain, simplifying the authentication process.
  • File: This mode uses a local (to the connect worker) path for a file containing GCP authentication credentials.
  • Credentials: In this mode, explicit configuration of a GCP Credentials string is required for authentication.

The simplest example to configure in the connector is the “Default” mode, as this requires no other configuration.

...
connect.gcpstorage.gcp.auth.mode=Default
...

When selecting the “Credentials” mode, it is essential to provide the necessary credentials. Alternatively, if you prefer not to configure these properties explicitly, the connector will follow the credentials retrieval order as described here.

Here’s an example configuration for the “Credentials” mode:

...
connect.gcpstorage.gcp.auth.mode=Credentials
connect.gcpstorage.gcp.credentials=$GCP_CREDENTIALS
connect.gcpstorage.gcp.project.id=$GCP_PROJECT_ID
...

And here is an example configuration using the “File” mode:

...
connect.gcpstorage.gcp.auth.mode=File
connect.gcpstorage.gcp.file=/home/secure-stuff/gcp-read-credential.txt
...

Remember that when using “File” mode, the file must exist on every worker node in your Kafka Connect cluster and be readable by the Kafka Connect process.
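
Before deploying, it can help to check that the settings required by the chosen mode are present. The Python sketch below is a hypothetical pre-flight check, not something the connector provides. It assumes that “File” mode needs `connect.gcpstorage.gcp.file`, that “Credentials” mode needs `connect.gcpstorage.gcp.credentials`, and that an unset mode falls back to “Default”, in line with the configuration table.

```python
# Settings each auth mode is assumed to require, based on the descriptions above.
REQUIRED_BY_MODE = {
    "Default": [],
    "File": ["connect.gcpstorage.gcp.file"],
    "Credentials": ["connect.gcpstorage.gcp.credentials"],
}


def missing_auth_settings(settings: dict) -> list:
    """Return the names of required auth settings that are absent or empty."""
    mode = settings.get("connect.gcpstorage.gcp.auth.mode", "Default")
    if mode not in REQUIRED_BY_MODE:
        raise ValueError(f"Unknown auth mode: {mode!r}")
    return [name for name in REQUIRED_BY_MODE[mode] if not settings.get(name)]


if __name__ == "__main__":
    print(missing_auth_settings({"connect.gcpstorage.gcp.auth.mode": "File"}))
```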

For enhanced security and flexibility when using the “Credentials” mode, it is highly advisable to use Connect Secret Providers. You can find detailed information on how to use the Connect Secret Providers here. This approach keeps access credentials out of plain-text connector configuration.

Ordering 

The GCP Storage sink zero-pads file names to ensure precise ordering. This leverages optimizations offered by the GCP Storage API and guarantees that files are listed in the correct sequence.

When using the GCP Storage source alongside the GCP Storage sink, the connector will adopt the same ordering method, ensuring data processing follows the correct chronological order.
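
Why zero-padding matters can be seen with a short Python sketch: when names are sorted as plain strings, padded numbers sort in numeric order while unpadded ones do not. The padding width of 12 and the `.json` extension are assumptions chosen for illustration, and `padded_name` is a hypothetical helper.

```python
PAD_WIDTH = 12  # assumed width for illustration; the sink's actual width may differ


def padded_name(offset: int, extension: str = "json") -> str:
    """Build a file name whose numeric part is left-padded with zeros."""
    return f"{offset:0{PAD_WIDTH}d}.{extension}"


offsets = [9, 10, 100, 2]
padded = sorted(padded_name(offset) for offset in offsets)
unpadded = sorted(f"{offset}.json" for offset in offsets)

if __name__ == "__main__":
    print(padded)    # string order matches numeric order
    print(unpadded)  # string order does not match numeric order
```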

--
Last modified: November 18, 2024