Settings¶
When defining a data stream flow with the SQL-like syntax, the engine offers syntax constructs to tweak a few parameters. These parameters are set via the SET operator.
SET `auto.offset.reset` = 'earliest';
SET `processing.guarantee`= 'exactly_once'; // Kafka 0.11+ only; enables exactly-once semantics
SET `commit.interval.ms` = 1000; // The frequency with which to save the position of the processor
Since a Lenses data flow maps to a Kafka Streams flow, the user can tweak the underlying Kafka producer/consumer settings as well as the processing settings. The example above shows how to do so.
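As a further illustration, the sketch below combines a consumer, a producer, and a Kafka Streams setting in one flow. The keys are standard Kafka configurations; the values are examples only, not recommendations:

SET `max.poll.records` = 500; // consumer: cap the records returned per poll
SET `compression.type` = 'snappy'; // producer: compress records written to the target topic
SET `commit.interval.ms` = 1000; // Kafka Streams: commit the processor position every second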
A user can also set the target topic configurations; refer to the Apache Kafka documentation for the full list of topic-specific configuration options.
To set a configuration for the flow result topic, prefix the key with `topic.`. For example, to set the cleanup policy to compact and flush.messages to 5 messages, use the following SQL:
SET `topic.cleanup.policy`='compact';
SET `topic.flush.messages`= 5;
...
Apart from the topic, producer/consumer, or Kafka Streams configurations, Lenses SQL allows you to set the following:
Setting | Description | Type | Example |
---|---|---|---|
autocreate | Creates the target topic if it does not exist. If the Kafka setup does not allow automatic topic creation, the flow will fail! | BOOLEAN | SET autocreate=true |
partitions | The number of partitions to create for the target topic. Applies only when autocreate is set to true. | INTEGER | SET partitions=2 |
replication | The number of replicas to create for the target topic. Applies only when autocreate is set to true. | INTEGER | SET replication=3 |
decimal.scale | Specifies the decimal scale when working with AVRO records that involve a decimal type. | INTEGER | SET `decimal.scale`=18 |
decimal.precision | Specifies the decimal precision when working with AVRO records that involve a decimal type. | INTEGER | SET `decimal.precision`=38 |
output.schema.name | Controls the output schema name when working with AVRO. | STRING | SET `output.schema.name`='my-own-schema-name' |
output.schema.namespace | Controls the output schema namespace when working with AVRO. | STRING | SET `output.schema.namespace`='my-own-schema-namespace' |
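For instance, a flow that creates its own output topic and writes AVRO records could combine these settings as in the sketch below (the schema names and values are hypothetical, chosen only to illustrate the syntax):

SET autocreate = true;
SET partitions = 2;
SET replication = 3;
SET `decimal.scale` = 18;
SET `decimal.precision` = 38;
SET `output.schema.name` = 'HotSensor';
SET `output.schema.namespace` = 'com.example.sensors';
...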
Important
Each SET instruction needs to be terminated with a semicolon: ;
Here is an example of setting the commit interval to 5 seconds and enabling exactly-once semantics (Apache Kafka 0.11+):
SET `commit.interval.ms` = 5000;
SET `processing.guarantee`= 'exactly_once';
INSERT INTO `hot_sensors`
SELECT
ip
, lat
, `long`
, (temp * 1.8 + 32) as metric
FROM `sensors`
WHERE temp > 30
RocksDB Settings¶
As discussed in the table-stream duality, a table is materialized on the machine running the streaming flow by an instance of RocksDB. Tweaking the runtime configuration of this key-value store is only required on rare occasions. The keys the user can reference with the SET operator are listed in the table below; an example follows the table.
Key | Type | Description |
---|---|---|
rocksdb.table.block.cache.size | LONG | The amount of cache, in bytes, that RocksDB will use. If the value is non-positive, the cache is not used. Default: 8M |
rocksdb.table.block.size | LONG | Approximate size of user data packed per block. Default: 4K |
rocksdb.table.block.cache.compressed.num.shard.bits | INT | Controls the number of shards for the compressed block cache |
rocksdb.table.block.cache.num.shard.bits | INT | Controls the number of shards for the block cache |
rocksdb.table.block.cache.compressed.size | LONG | Size of the compressed block cache. If 0, block_cache_compressed is set to null |
rocksdb.table.block.restart.interval | INT | Sets the block restart interval |
rocksdb.table.block.cache.size.and.filter | BOOL | Indicates whether index/filter blocks are put in the block cache. If not specified, each 'table reader' object pre-loads the index/filter block during table initialization |
rocksdb.table.block.checksum.type | STRING | Sets the checksum type to be used with this table. Available values: kNoChecksum, kCRC32c, kxxHash |
rocksdb.table.block.hash.allow.collision | BOOL | Influences the behavior when kHashSearch is used. If false, stores a precise prefix-to-block-range mapping; if true, does not store the prefix and allows prefix hash collisions (less memory consumption) |
rocksdb.table.block.index.type | STRING | Sets the index type to be used with this table. Available values: kBinarySearch, kHashSearch |
rocksdb.table.block.no.cache | BOOL | Disables the block cache. If set to true, no block cache is used. Default: false |
rocksdb.table.block.whole.key.filtering | BOOL | If true, places whole keys in the filter (not just prefixes). This must generally be true for gets to be efficient. Default: true |
rocksdb.table.block.pinl0.filter | BOOL | Indicates whether to pin L0 index/filter blocks to the block cache. If not specified, defaults to false |
rocksdb.total.threads | INT | The maximum number of threads RocksDB should use |
rocksdb.write.buffer.size | LONG | Sets the number of bytes the database builds up in memory (backed by an unsorted log on disk) before converting to a sorted on-disk file |
rocksdb.table.block.size.deviation | INT | Used to close a block before it reaches the configured block_size. If the percentage of free space in the current block is less than this number and adding a new record would exceed the configured block size, the block is closed and the new record is written to the next block. Default: 10 |
rocksdb.table.block.format.version | INT | There are currently three versions. 0: written by default by all RocksDB versions; can be read by very old RocksDB versions; does not support changing the checksum (default is CRC32). 1: can be read by RocksDB versions since 3.0; supports non-default checksums such as xxHash; written by RocksDB when BlockBasedTableOptions::checksum is something other than kCRC32c (version 0 is silently upconverted). 2: can be read by RocksDB versions since 3.10; changes the way compressed blocks are encoded with LZ4, BZip2, and Zlib compression; if you don't plan to run RocksDB before version 3.10, you should probably use this. This option only affects newly written tables; when reading existing tables, the version information is read from the footer |
rocksdb.compaction.style | STRING | Available values: LEVEL, UNIVERSAL, FIFO |
rocksdb.max.write.buffer | INT | |
rocksdb.base.background.compaction | INT | |
rocksdb.background.compaction.max | INT | |
rocksdb.subcompaction.max | INT | |
rocksdb.background.flushes.max | INT | |
rocksdb.log.file.max | LONG | |
rocksdb.log.file.roll.time | LONG | |
rocksdb.compaction.auto | BOOL | |
rocksdb.compaction.level.max | INT | |
rocksdb.files.opened.max | INT | |
rocksdb.wal.ttl | LONG | |
rocksdb.wal.size.limit | LONG | |
rocksdb.memtable.concurrent.write | BOOL | |
rocksdb.os.buffer | BOOL | |
rocksdb.data.sync | BOOL | |
rocksdb.fsync | BOOL | |
rocksdb.log.dir | STRING | |
rocksdb.wal.dir | STRING | |
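For example, to bound RocksDB's thread usage and enlarge its write buffer for a given flow, apply the SET operator to these keys as in the sketch below (the values are illustrative only, not tuning advice):

SET `rocksdb.total.threads` = 4; // cap the threads RocksDB may use
SET `rocksdb.write.buffer.size` = 67108864; // 64 MB in memory before flushing to a sorted on-disk file
SET `rocksdb.table.block.cache.size` = 16777216; // 16 MB block cache
...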