The SET syntax allows you to customize the behavior of the underlying Kafka Consumer/Producer, Kafka Streams (including RocksDB parameters), topic creation and error handling.
The general syntax is:
SET <setting_name>=<setting_value>;
The computed result is written back to a Kafka topic. The SQL processor can create the topics if they are not present. There are two levels of settings: generic (or default) settings applying to all target topics, and specific (or topic-related) settings allowing a distinct setup for a given topic. Maybe one of the output topics requires a different partition count or replication factor than the defaults.
To set the defaults follow this syntax:
SET defaults.topic.<topic_setting_key> = <value>;
For example, to have every target topic use a cleanup.policy of compact,delete:
SET defaults.topic.cleanup.policy='compact,delete';
All the keys applicable for defaults are also valid for a specific topic. Controlling the settings for a given topic can be done via:
SET topic.<topic_name>.<topic_setting_key>=<value>;
SET topic.market_risk.cleanup.policy='compact,delete';
-- escape the topic name if it contains . or - or other non-alphanumeric characters
SET topic.`market.risk`.cleanup.policy='compact,delete';
SET topic.`market-risk`.cleanup.policy='compact,delete';
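For instance, assuming partitions is among the supported topic setting keys, one output topic can be given a different partition count than the defaults:
-- partitions is an assumed topic setting key; the counts are illustrative
SET defaults.topic.partitions=3;
SET topic.market_risk.partitions=12;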
The streaming engine allows users to define how errors are handled when writing to or reading from a topic.
Both sides can be set at once by doing:
SET error.policy= '<error_policy>';
or individually as described in the sections below.
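For example, to route all problematic records, on both the read and the write side, to a dead letter queue (the dlq policy is described below):
-- applies the same policy to both reading and writing
SET error.policy= 'dlq';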
Data being processed might be corrupted or not aligned with the topic format (maybe you expect an Avro payload but the raw bytes represent a JSON document). Setting what happens in these scenarios can be done like this:
SET error.policy.read= '<error_policy>';
While data is being written, multiple errors can occur (maybe there were some network issues). Setting what happens in these scenarios can be done like this:
SET error.policy.write= '<error_policy>';
There are three possible values to control the behavior. One of them, dlq, keeps the application running while routing the problematic records to a dead letter queue topic.
When dlq is used, the dead.letter.queue setting is required; it defaults to lenses.sql.dlq. The value is the target topic where the problematic records will be sent to:
SET dead.letter.queue = '<dead_letter_topic>';
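Putting the pieces together, a minimal sketch that keeps the flow running when a record cannot be read and collects the bad records in a topic named market_risk_dlq (the topic name is illustrative):
-- market_risk_dlq is an illustrative topic name
SET error.policy.read= 'dlq';
SET dead.letter.queue = 'market_risk_dlq';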
Using the SET syntax, the underlying Kafka Streams and Kafka Producer and Consumer settings can be adjusted.
SET <setting_key>=<value>;
Two commonly adjusted Kafka Streams keys are processing.guarantee, which can be set to EXACTLY_ONCE, and commit.interval.ms, which controls how often the processing progress is committed.
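A minimal sketch using the two keys above (the commit interval value is illustrative):
SET processing.guarantee='EXACTLY_ONCE';
-- commit every 5 seconds; 5000 is an illustrative value
SET commit.interval.ms=5000;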
Alongside the keys above, the Kafka consumer and producer settings can also be tweaked.
SET session.timeout.ms=120000;
SET max.poll.records=20000;
Some of the configuration keys for the consumer and the producer share the same name. At times there is a requirement to distinguish between them. To do that, the keys have to be prefixed with consumer or producer:
SET consumer.<duplicate_config_key>=<value_1>;
SET producer.<duplicate_config_key>=<value_2>;
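For instance, request.timeout.ms is a key shared by both clients; a sketch giving the producer a longer timeout (both values are illustrative):
-- request.timeout.ms exists for both the consumer and the producer
SET consumer.request.timeout.ms=30000;
SET producer.request.timeout.ms=60000;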
Stateful data flow applications might require, on rare occasions, some of the parameters for the underlying RocksDB to be tweaked.
To set the properties, use:
SET rocksdb.<key> = <value>;
The values accepted depend on the key. The checksum type can be one of kNoChecksum, kCRC32c or kxxHash; the index type can be kBinarySearch or kHashSearch; and the compaction style can be LEVEL, UNIVERSAL or FIFO.
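For instance, a hypothetical tweak of the compaction style and the block cache size (the key names rocksdb.compaction.style and rocksdb.table.block.cache.size are assumptions, not taken from the list above):
-- assumed key names; verify against the supported RocksDB keys
SET rocksdb.compaction.style='LEVEL';
SET rocksdb.table.block.cache.size=33554432;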