Tuning

The SQL engine can be tuned for each query, from controlling the resources involved when running the aforementioned query. To instruct the engine what to do, a user would have to set a few parameters using the SET statement.

Control execution

Querying a table can take a long time if it contains a lot of records. The underlying Kafka topic has to be read and the filter conditions applied and the projections made. Even more, the SELECT statement could end up bringing a large amount of data to the client. To be able to constrain the resources involved, the SQL engine allows for context customization, which ends up driving the execution, thus giving control to the user. Here is the list of context parameters to overwrite:

Name Description Usage
max.size
The maximum amount of data to return.
It can be expressed as bytes (1024b), as kilo-bytes (1024k)
, as mega-bytes (10m) or as giga-bytes (5g).
Default is 20MB
SET max.bytes = '20m';
will set a max of 20MB to be returned
max.query.time
The maximum amount of time the query
is allowed to run. It can be specified as millisecon (2000ms)
,as hours (2h), minutes (10m) or seconds (60s).
Default is 1 hour
SET max.time = '60000ms';
sets a one minute query limit
max.idle.time
The amount of time to wait
when no more records are read from the source
before the query is completed.
Default is 5 seconds.
SET max.idle.time = '5s';;
sets a maximum of 5 calls returning 0.
LIMIT N
The maximum of records to return.
Default is 10000
SELECT *
FROM payments
LIMIT 100;
skip.bad.records
Flag to drive the behavior
of handling topic records when their payload
does not correspond with the table storage format.
Default is true. This means bad records are
skipped. Set it to false to fail the query
on first invalid payload record.
SET skip.bad.records=false;
format.timestamp
Flag to control the values
for Avro date time. Avro encodes date time via
Long values. Set the value to true if you want
the values to be returned as text and in a human
readable format.
SET format.timestamp=true;
live.aggs
Flag to control if aggregation
queries should be allowed to run. Since they accumulate
data they require more memory to retain the state.
SET live.aggs=true;
optimize.kafka.partition
When enabled, it will use the primitive
used for the _key filter to determine the partition the same
way the default Kafka partitioner logic does. Therefore,
queries like SELECT * FROM trips WHERE _key='customer_id_value';
on multiple partition topics will only read one partition
as opposed to the entire topic. To disable it,
set the flag to false.
SET optimize.kafka.partition=false;
query.parallel
When used, it will parallelize
the query. The number provided will be capped by the
target topic partitions count.
SET query.parallel=2;
query.buffer
Internal buffer when
processing the messages.
SET query.buffer=50000;
kafka.offset.timeout
Timeout for retrieving target topic
start/end offsets.
SET kafka.offset.timeout=20000;

All the above values can be given a default value via the configuration file. Using lenses.sql.settings as prefix the format.timestamp can be set like this:

lenses.sql.settings.format.timestamp=true

Query tuning

Lenses SQL uses Kafka Consumer to read the data. This means that an advanced user with knowledge of Kafka could tweak the consumer properties to achieve better throughput. This would occur on very rare occasions. The query context can receive Kafka consumer settings. For example, the max.poll.records consumer can be set as:

SET max.poll.records= 100000;
SELECT *
FROM payments
LIMIT 1000000

Example

The fact is that streaming SQL is operating on unbounded streams of events: a query would normally be a never-ending query. In order to bring query termination semantics into Apache Kafka we introduced 4 controls:

  • LIMIT = 10000 - Force the query to terminate when 10,000 records are matched.
  • max.bytes = 20000000 - Force the query to terminate once 20 MBytes have been retrieved.
  • max.time = 60000 - Force the query to terminate after 60 seconds.
  • max.zero.polls = 8 - Force the query to terminate after 8 consecutive polls are empty, indicating we have exhausted a topic.

Thus when retrieving data, you can set a limit of 1GByte to the maximum number of bytes retrieved and maximum query time of one hour as follows:

SET max.bytes = 1000000000;
SET max.time = 60000000;
SELECT * from topicA WHERE customer.id = "XXX"