These scenarios partition data by date and time. The InsertRecordTimestampHeaders transform writes the record timestamp into headers, and the KCQL PARTITIONBY clause partitions on those headers.
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="yyyy-MM-dd-HH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date STORE AS X

transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.year.format="'year='yyyy"
transforms.partition.month.format="'month='MM"
transforms.partition.day.format="'day='dd"
transforms.partition.hour.format="'hour='HH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.year, _header.month, _header.day, _header.hour
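The format properties above mix quoted literals ('year=') with date pattern letters (yyyy). The following is a minimal sketch of how such patterns render, assuming they follow java.time.format.DateTimeFormatter syntax and that timestamps are interpreted in UTC; neither is confirmed by this page. The class name, the sample timestamp, and the "/" join used to display the values are hypothetical and only illustrate the header values, not the connector's actual path layout.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

public class TimestampHeaderSketch {

    // Render each configured pattern against one record timestamp (UTC is an assumption).
    static Map<String, String> renderHeaders(Instant timestamp, Map<String, String> patterns) {
        ZonedDateTime ts = timestamp.atZone(ZoneOffset.UTC);
        Map<String, String> headers = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : patterns.entrySet()) {
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern(e.getValue(), Locale.ROOT);
            headers.put(e.getKey(), formatter.format(ts));
        }
        return headers;
    }

    // Hypothetical display helper: header values joined in PARTITIONBY order.
    static String joinValues(Map<String, String> headers) {
        return String.join("/", headers.values());
    }

    // Patterns copied from the year/month/day/hour scenario, applied to a sample timestamp.
    static String examplePath() {
        Map<String, String> patterns = new LinkedHashMap<>();
        patterns.put("year", "'year='yyyy");
        patterns.put("month", "'month='MM");
        patterns.put("day", "'day='dd");
        patterns.put("hour", "'hour='HH");
        return joinValues(renderHeaders(Instant.parse("2024-03-05T14:10:00Z"), patterns));
    }

    public static void main(String[] args) {
        System.out.println(examplePath()); // year=2024/month=03/day=05/hour=14
    }
}
```

Text inside single quotes is emitted verbatim, which is how the key=value style segment names are produced without a separate prefix setting.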
Like the previous scenario, this one uses record timestamp headers, but it partitions data by year, month, and day only.
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.year.format="'year='yyyy"
transforms.partition.month.format="'month='MM"
transforms.partition.day.format="'day='dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.year, _header.month, _header.day
Extending the previous scenarios, these partition data by year, month, day, hour, and minute, allowing for more granular time-based partitioning.
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.year.format="yyyy"
transforms.partition.month.format="MM"
transforms.partition.day.format="dd"
transforms.partition.hour.format="HH"
transforms.partition.minute.format="mm"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.year, _header.month, _header.day, _header.hour, _header.minute

transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'date='yyyy-MM-dd"
transforms.partition.hour.format="'time='HHmm"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date, _header.hour
This scenario partitions data by date (year, month, and day) and hour. The transform inserts record timestamp headers, and PARTITIONBY references them.
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'data_date='yyyy-MM-dd"
transforms.partition.hour.format="'hour='HH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date, _header.hour
This scenario partitions data by date and hour using record timestamp headers.
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'dt='yyyy-MM-dd"
transforms.partition.hour.format="'hour='HH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date, _header.hour
Data is partitioned by raw creation date (raw_cre_dt) using a record timestamp header.
#"'raw_cre_dt'=YYYY'-'MM'-'dd"
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'raw_cre_dt='yyyy-MM-dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
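The commented pattern above writes the year as YYYY, while the property uses yyyy. Assuming the transform parses its format properties with java.time.format.DateTimeFormatter (an assumption; this page does not say), the two letters are not interchangeable: y is the calendar year and Y is the week-based year, which differs around the turn of the year. The sketch below illustrates the difference; the class name and the sample date are hypothetical.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

public class YearPatternSketch {

    // Format a date with the given pattern; Locale.US fixes the week definition used by 'Y'.
    static String format(String pattern, LocalDate date) {
        return DateTimeFormatter.ofPattern(pattern, Locale.US).format(date);
    }

    public static void main(String[] args) {
        LocalDate date = LocalDate.of(2024, 12, 30);

        // Calendar year: the partition matches the record's date.
        System.out.println(format("'raw_cre_dt='yyyy-MM-dd", date)); // raw_cre_dt=2024-12-30

        // Week-based year: 30 December 2024 already belongs to week 1 of 2025.
        System.out.println(format("'raw_cre_dt='YYYY-MM-dd", date)); // raw_cre_dt=2025-12-30
    }
}
```

Under that assumption, the lowercase yyyy in the property is the form that keeps records from the last days of December in the correct year's partition.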
Data is partitioned by creation timestamp (creation-ts) using a record timestamp header.
#"'creation-ts'=YYYY-MM-dd"
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'creation-ts='yyyy-MM-dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
This scenario partitions data by the createdAt date using a record timestamp header.
#"'createdAt'=YYYY'-'MM'-'dd"
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'createdAt='yyyy-MM-dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
Similar to the previous scenario, these partition data by the creation date: the first uses createdAt with hourly granularity (yyyyMMddHH), the second uses created_at with daily granularity.
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'createdAt='yyyyMMddHH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date

transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'created_at='yyyy-MM-dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
Data is partitioned by creation date (creation_ds) using a record timestamp header.
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'creation_ds='yyyy-MM-dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
Data is partitioned by data date (data_date) using a record timestamp header.
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'data_date='yyyy-MM-dd"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
Data is partitioned by date and hour, combined in a single record timestamp header.
transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'date_hour='yyyy-MM-dd-HH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date

transforms=partition
transforms.partition.type=io.lenses.connect.smt.header.InsertRecordTimestampHeaders
transforms.partition.date.format="'data_date='yyyy-MM-dd-HH"
connect.gcpstorage.kcql=INSERT INTO $bucket:prefix SELECT * FROM kafka_topic PARTITIONBY _header.date
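These configurations fold the hour into the date header, whereas earlier scenarios keep date and hour in separate headers. A rough sketch of the difference in rendered header values follows, again assuming java.time.format.DateTimeFormatter pattern syntax and UTC; the class name and the sample timestamp are hypothetical.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

public class CombinedHeaderSketch {

    // Render one pattern against a record timestamp (UTC is an assumption).
    static String render(String pattern, Instant timestamp) {
        return DateTimeFormatter.ofPattern(pattern, Locale.ROOT)
                .withZone(ZoneOffset.UTC)
                .format(timestamp);
    }

    public static void main(String[] args) {
        Instant ts = Instant.parse("2024-03-05T14:10:00Z");

        // One header carrying date and hour: a single PARTITIONBY field.
        System.out.println(render("'data_date='yyyy-MM-dd-HH", ts)); // data_date=2024-03-05-14

        // Two headers, as in the date + hour scenarios: two PARTITIONBY fields.
        System.out.println(render("'data_date='yyyy-MM-dd", ts));    // data_date=2024-03-05
        System.out.println(render("'hour='HH", ts));                 // hour=14
    }
}
```

The combined form yields one partition value per hour, while the split form yields a date value with an hour value beneath it; which one suits a given consumer depends on how downstream tools discover partitions.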
Data is partitioned based on the date and hour, employing record timestamp headers for this partitioning scheme.
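The default Confluent partitioning scheme stores objects under the structure <prefix>/<topic>/<encodedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>: a directory path built from the prefix, topic, and encoded partition, followed by a file name built from the topic, Kafka partition, start offset, and format.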
<prefix>/<topic>/<encodedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>
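To make the structure above concrete, here is a small sketch that assembles an object name from its parts. The class, method, and sample values are hypothetical, and any zero-padding a real connector applies to the start offset is deliberately not modelled.

```java
public class ConfluentObjectNameSketch {

    // Assemble <prefix>/<topic>/<encodedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>.
    static String objectName(String prefix, String topic, String encodedPartition,
                             int kafkaPartition, long startOffset, String format) {
        String directory = String.join("/", prefix, topic, encodedPartition);
        String fileName = topic + "+" + kafkaPartition + "+" + startOffset + "." + format;
        return directory + "/" + fileName;
    }

    public static void main(String[] args) {
        // Sample values are illustrative only.
        System.out.println(objectName("topics", "orders", "partition=3", 3, 1200L, "avro"));
        // topics/orders/partition=3/orders+3+1200.avro
    }
}
```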