Stream Processing¶
The Apache Kafka ecosystem is enhanced by the Kafka Streams API, which allows for processing data streams. The framework targets the Java runtime, so traditionally you needed to know Java, Kotlin, Scala, etc. to make use of it. Not anymore: a Kafka Streams flow can be described by anyone with basic SQL skills.
Streaming Syntax¶
The second usage of LSQL is defining Kafka Streams flows through a SQL-like syntax. Everything covered so far applies to the SELECT statements written for stream processing. The syntax, which might look complex at first, is as follows:
[ SET autocreate = true;]
[ SET partitions = 1;]
[ SET replication = 2;]
[ SET `decimal.scale`= 18;]
[ SET `decimal.precision`= 38;]
[ SET `ANY KAFKA STREAMS CONFIG. See Kafka documentation StreamsConfig, ConsumerConfig and ProducerConfig` = '';]
[ SET `topic.[ANY KAFKA Log CONFIG. See LogConfig]` = '';]
[ SET `rocksdb.[RocksDB specific configurations. See the section below on RocksDB]`= '';]
INSERT INTO _TARGET_TOPIC_
[WITH
_ID_NAME_ AS (SELECT [STREAM] ...FROM _TOPIC_ ...),
_ID_NAME_ AS (SELECT [STREAM] ...FROM _TOPIC_ ...)
]
SELECT select_expr [, select_expr ...]
FROM _ID_NAME_ INNER JOIN _OTHER_ID_NAME_ ON join_expr
[WHERE condition_expr]
[GROUP BY group_by_expr]
[HAVING having_expr]
If you are not familiar with the Apache Kafka stream processing API, please follow its documentation.
Important
Streaming SQL is not your typical RDBMS SQL. Core concepts around stream processing with Apache Kafka, the Table/Stream duality, and the implications of creating a Table versus a Stream instance need to be understood first.
Using LSQL for streaming allows you to perform:
- Transformations
- Aggregation
- Joins
We will go through each one in detail, but before we do so we need to expand on the syntax you have seen earlier.
Important
When using AVRO payloads the schema needs to be present. The LSQL engine performs static validation against the existing schema. If the schema is missing, an error is returned.
Windowing¶
Windowing allows you to control how to group records that share the same key for stateful operations such as aggregations or joins. Windows are tracked per record key. LSQL supports the full spectrum of windowing functionality available in the Kafka Streams API.
Note
A record is discarded and will not be processed by the window if it arrives after the retention period has passed.
You can use the following types of windows in LSQL:
Hopping time windows
These are windows based on time intervals. They model fixed-sized, (possibly) overlapping windows. A hopping window is defined by two properties: the window’s size and its advance interval (aka “hop”). The advance interval specifies how much a window moves forward relative to the previous one. For example, you can configure a hopping window with a size of 5 minutes and an advance interval of 1 minute. Since hopping windows can overlap, a data record may belong to more than one such window.
...
GROUP BY HOP(5,m,1,m)
...
Tumbling time windows
These are a special case of hopping time windows and, like the latter, are windows based on time intervals. They model fixed-size, non-overlapping, gap-less windows. A tumbling window is defined by a single property: the window’s size. A tumbling window is a hopping window whose window size is equal to its advance interval. Since tumbling windows never overlap, a data record will belong to one and only one window.
...
GROUP BY tumble(1,m)
...
Sliding windows
These express a fixed-size window that slides continuously over the time axis. Here, two data records are said to be included in the same window if the difference between their timestamps is within the window size. Thus, sliding windows are not aligned to the epoch but to the data record timestamps.
...
GROUP BY SLIDING(1,m)
...
Session windows
These are used to aggregate key-based events into sessions. Sessions represent a period of activity separated by a defined gap of inactivity. Any events processed that fall within the inactivity gap of an existing session are merged into that session. If an event falls outside of the session gap, a new session is created. Session windows are tracked independently across keys (e.g. windows of different keys typically have different start and end times) and their sizes vary (even windows for the same key typically have different sizes). As such, session windows can’t be pre-computed and are instead derived from analyzing the timestamps of the data records.
...
GROUP BY SESSION(10,m, 5, m)
...
All the window functions allow the user to specify the time unit. The supported time units are:
Keyword | Unit |
---|---|
MS | milliseconds |
S | seconds |
M | minutes |
H | hours |
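For example, a 1-hour window advancing every 15 minutes combines the H and M units; this is only a sketch and the grouping field name is illustrative:
...
GROUP BY HOP(1,h,15,m), region
...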
Table or Stream¶
When using the Apache Kafka Streams API you can build a KStream or a KTable from a topic. To distinguish between the two, LSQL uses the keyword STREAM. When the keyword is missing, an instance of KTable is created; when the keyword is present, an instance of KStream is created.
Important
Use SELECT STREAM to create a KStream instance. Use SELECT to create a KTable instance.
Important
When creating a table, the Kafka messages must have a non-null key. Otherwise, the record is ignored.
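To make the distinction concrete, here is a minimal sketch reusing the WITH syntax covered later in this chapter (the product topic and productName field are illustrative): productTable, defined without the STREAM keyword, becomes a KTable, while productStream, defined with SELECT STREAM, becomes a KStream.
WITH
productTable AS
(
SELECT productName
FROM `product`
WHERE _ktype = 'STRING'
AND _vtype = AVRO
),
productStream AS
(
SELECT STREAM productName
FROM `product`
WHERE _ktype = 'STRING'
AND _vtype = AVRO
)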
KStream Settings¶
LSQL allows you to define a Kafka Streams flow with SQL-like syntax. From the target topic settings to the producer/consumer settings, LSQL allows the user to override the defaults. These configurations are set via the standard SQL pattern of setting variables with SET. For example:
SET `auto.offset.reset` = 'smallest';
SET `processing.guarantee`= 'exactly_once'; //this is for Kafka 0.11+ enabling exactly once semantics
SET `commit.interval.ms` = 1000; //The frequency with which to save the position of the processor.
Any of the target topic specific configurations can also be specified here. Follow the Apache Kafka documentation for the full list of topic-specific configuration options. To set the configuration for the flow result topic you need to prefix the key with `topic.`. For example, to set the cleanup policy to `compact` and to flush messages every 5 messages (`flush.messages`), you need to configure LSQL as follows:
SET `topic.cleanup.policy`='compact';
SET `topic.flush.messages`= 5;
...
Apart from the topic, producer/consumer or Kafka stream configs, LSQL allows you to set the following:
Setting | Description | Type | Example |
---|---|---|---|
autocreate | If the target topic does not exist, it will be created. **If the Kafka setup does not allow for auto topic creation the flow will fail!** | BOOLEAN | SET autocreate=true |
partitions | The number of partitions to create for the target topic. Applies only when autocreate is set to true (which defaults to false). | INTEGER | SET partitions=2 |
replication | How many replicas to create for the target topic. Applies only when autocreate is set to true (which defaults to false). | INTEGER | SET replication=3 |
decimal.scale | When working with AVRO records where the decimal type is involved, it specifies the decimal scale. | INTEGER | SET `decimal.scale`=18 |
decimal.precision | When working with AVRO records where the decimal type is involved, it specifies the decimal precision. | INTEGER | SET `decimal.precision`=38 |
Important
Each SET .. instruction needs to be followed by a semicolon (;).
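As a sketch, the topic-level settings from the table above can be combined at the top of a flow (the values shown are illustrative):
SET autocreate = true;
SET partitions = 2;
SET replication = 3;
SET `decimal.scale` = 18;
SET `decimal.precision` = 38;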
Here is an example of setting the commit interval to 5 seconds and enabling exactly-once semantics (Apache Kafka 0.11+):
SET `commit.interval.ms` = 5000;
SET `processing.guarantee`= 'exactly_once';
INSERT INTO `hot_sensors`
SELECT
ip
, lat
, `long`
, (temp * 1.8 + 32) as metric
FROM `sensors`
WHERE _ktype = 'LONG'
AND _vtype = AVRO
AND temp > 30
Note
Configuring a stream flow via code requires the following configuration keys to be set: default.key.serde and default.value.serde. LSQL takes care of this based on the values specified in the SQL, so you don’t have to set them.
RocksDB Settings¶
Whenever the Kafka Streams application requires state, it relies on RocksDB. Configuring this key-value store breaks the pattern found in the Kafka Streams API: to customize its settings one has to provide an implementation of org.apache.kafka.streams.state.RocksDBConfigSetter. LSQL covers most of the available settings; here is the entire list:
Key | Type | Description |
---|---|---|
rocksdb.table.block.cache.size | LONG | Set the amount of cache in bytes that will be used by RocksDB. If cacheSize is non-positive, the cache will not be used. Default: 8M |
rocksdb.table.block.size | LONG | Approximate size of user data packed per block. Default: 4K |
rocksdb.table.block.cache.compressed.num.shard.bits | INT | Controls the number of shards for the block compressed cache (TableFormatConfig.setBlockCacheCompressedNumShardBits) |
rocksdb.table.block.cache.num.shard.bits | INT | Controls the number of shards for the block cache |
rocksdb.table.block.cache.compressed.size | LONG | Size of the compressed block cache. If 0, then block_cache_compressed is set to null |
rocksdb.table.block.restart.interval | INT | Set block restart interval |
rocksdb.table.block.cache.size.and.filter | BOOL | Indicates whether index/filter blocks are put into the block cache. If not specified, each ‘table reader’ object will pre-load the index/filter block during table initialization |
rocksdb.table.block.checksum.type | STRING | Sets the checksum type to be used with this table. Available values: kNoChecksum, kCRC32c, kxxHash |
rocksdb.table.block.hash.allow.collision | BOOL | Influences the behavior when kHashSearch is used. If false, stores a precise prefix-to-block-range mapping; if true, does not store the prefix and allows prefix hash collisions (less memory consumption) |
rocksdb.table.block.index.type | STRING | Sets the index type to be used with this table. Available values: kBinarySearch, kHashSearch |
rocksdb.table.block.no.cache | BOOL | Disable the block cache. If this is set to true, no block cache is used. Default: false |
rocksdb.table.block.whole.key.filtering | BOOL | If true, place whole keys in the filter (not just prefixes). This must generally be true for gets to be efficient. Default: true |
rocksdb.table.block.pinl0.filter | BOOL | Indicates whether L0 index/filter blocks are pinned to the block cache. If not specified, defaults to false |
rocksdb.total.threads | INT | The max threads RocksDB should use |
rocksdb.write.buffer.size | LONG | Sets the number of bytes the database will build up in memory (backed by an unsorted log on disk) before converting to a sorted on-disk file |
rocksdb.table.block.size.deviation | INT | This is used to close a block before it reaches the configured ‘block_size’. If the percentage of free space in the current block is less than this specified number and adding a new record to the block would exceed the configured block size, then this block is closed and the new record is written to the next block. Default: 10 |
rocksdb.table.block.format.version | INT | There are currently three versions: 0 - written out by all RocksDB versions by default; can be read by really old RocksDB versions; does not support changing the checksum (default is CRC32). 1 - can be read by RocksDB versions since 3.0; supports non-default checksums, like xxHash; it is written by RocksDB when BlockBasedTableOptions::checksum is something other than kCRC32c (version 0 is silently upconverted). 2 - can be read by RocksDB versions since 3.10; changes the way compressed blocks are encoded with LZ4, BZip2, and Zlib compression; if you don’t plan to run RocksDB before version 3.10, you should probably use this. This option only affects newly written tables; when reading existing tables, the version information is read from the footer |
rocksdb.compaction.style | STRING | Available values: LEVEL, UNIVERSAL, FIFO |
rocksdb.max.write.buffer | INT | |
rocksdb.base.background.compaction | INT | |
rocksdb.background.compaction.max | INT | |
rocksdb.subcompaction.max | INT | |
rocksdb.background.flushes.max | INT | |
rocksdb.log.file.max | LONG | |
rocksdb.log.fle.roll.time | LONG | |
rocksdb.compaction.auto | BOOL | |
rocksdb.compaction.level.max | INT | |
rocksdb.files.opened.max | INT | |
rocksdb.wal.ttl | LONG | |
rocksdb.wal.size.limit | LONG | |
rocksdb.memtable.concurrent.write | BOOL | |
rocksdb.os.buffer | BOOL | |
rocksdb.data.sync | BOOL | |
rocksdb.fsync | BOOL | |
rocksdb.log.dir | STRING | |
rocksdb.wal.dir | STRING | |
All the rocksdb.table.* configurations are applied to a BlockBasedTableConfig instance on which the values are set. This is followed by a call to Options.setTableFormatConfig(tableConfig).
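As an illustration, a few of these settings can be applied with SET before the INSERT statement; the values below are only a sketch and would need tuning for a real workload:
SET `rocksdb.table.block.cache.size` = 16000000;
SET `rocksdb.table.block.size` = 8192;
SET `rocksdb.total.threads` = 4;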
Transformations¶
This is the basic and most common use case: transforming an incoming topic to morph the messages using any of the capabilities provided by the SELECT statement. That includes:
- Selecting specific fields
- Applying supported functions to derive a new field
- Filtering the records based on your criteria
Let’s imagine we have a topic containing sensor specific data:
{
"device_id": 1,
"ip": "191.35.83.75",
"timestamp": 1447886791,
"lat": 22,
"long": 82,
"scale": "Celsius",
"temp": 22.0,
"device_name": "sensor-AbC-193X",
"humidity": 15,
"zipcode": 95498
}
We want to select only the data where the temperature is over 30 degrees Celsius. Furthermore, we want the temperature value to be expressed in Fahrenheit, and we only need the ip, lat and long fields from the initial data. To do so we can write this LSQL statement:
INSERT INTO `hot_sensors`
SELECT
ip
, lat
, long
, (temp * 1.8 + 32) AS metric
FROM `sensors`
WHERE _ktype = 'LONG'
AND _vtype = AVRO
AND temp > 30
This is the simplest flow you could write and the query will end up producing records looking like this:
{
"ip": "191.35.83.75",
"lat": 22,
"long": 82,
"metric": 71.6
}
The SQL syntax allows you to access nested fields or a complex field. We can slightly change the structure above to illustrate this. The new data looks like this:
{
"description":"Sensor embedded in exhaust pipes in the ceilings",
"ip":"204.116.105.67",
"id":5,
"temp":40,
"c02_level":1574,
"geo":{
"lat":35.93,
"long":-85.46
}
}
First, we write the SQL to address the nested fields:
INSERT INTO `new_sensors`
SELECT
ip
, geo.lat
, geo.long
, temp
FROM `sensors`
WHERE _ktype = 'LONG'
AND _vtype = AVRO
The result of applying this query will be AVRO records with the following format:
{
"ip":"204.116.105.67",
"lat":35.93,
"long":-85.46,
"temp":40
}
If the user selects a complex field, the entire substructure is copied over. For example:
INSERT INTO `new_sensors`
SELECT
ip
, geo
, temp
FROM `sensors`
WHERE _ktype = 'LONG'
AND _vtype = AVRO
The new records will have this format:
{
"ip":"204.116.105.67",
"geo":{
"lat":35.93,
"long":-85.46
},
"temp":40
}
These examples are for records of type AVRO, but similar support is provided for JSON payloads.
Aggregation¶
Typical streaming aggregation involves scenarios similar to these:
- Counting the number of visitors on your website per region
- Totalling the amount of Foreign Exchange transactions for GBP-USD on a 15-minute interval
- Totalling the sales made in each of the company stores every day
- Retaining the minimum and maximum stock value over a 30-minute interval
These are just a few examples; the list goes on. LSQL gives you a way to quickly express such aggregations over Kafka streams with either JSON or AVRO payloads.
Imagine a trading system needs to display the number of transactions made for each currency pair (GBPUSD is a currency exchange ticker). Such functionality can be easily achieved with a query like this:
INSERT INTO `total_transactions`
SELECT count(*) AS transaction_count
FROM `fx`
WHERE _ktype = BYTES
AND _vtype = AVRO
GROUP BY ticker
Remember this is a stateful stream, so you could potentially see the values for a ticker more than once, depending on how many transactions flow through the Kafka topic. The result of this query could be the following:
Key | Value |
---|---|
GBPUSD | 1 |
CHFYEN | 1 |
USDEUR | 1 |
GBPUSD | 3 |
USDEUR | 5 |
Suppose the user needs to look only at specific tickers. There are two approaches: apply the filter in the WHERE clause (best for performance) or rely on the HAVING clause. Both are covered by the queries below:
INSERT INTO `total_transactions`
SELECT count(*) AS transaction_count
FROM `fx`
WHERE _ktype = BYTES
AND _vtype = AVRO
AND ticker LIKE '%GBP%'
GROUP BY ticker
--OR
INSERT INTO `total_transactions`
SELECT count(*) as transaction_count
FROM `fx`
WHERE _ktype = BYTES
AND _vtype = AVRO
GROUP BY ticker
HAVING ticker in ('GBPUSD', 'EURDKK', 'SEKCHF')
The HAVING clause allows the usage of any of the LSQL supported functions to achieve your filter requirements. To illustrate that, we will keep all the tickers containing USD as well as those in the list GBPUSD, EURDKK, SEKCHF:
INSERT INTO `total_transactions`
SELECT count(*) as transaction_count
FROM `fx`
WHERE _ktype = BYTES
AND _vtype = AVRO
GROUP BY ticker
HAVING ticker IN ('GBPUSD', 'EURDKK', 'SEKCHF') OR ticker LIKE '%USD%'
There are scenarios where the grouping is done by the record key part. Assume the fx topic contains the ticker in the key part. In that case, the queries become:
INSERT INTO `total_transactions`
SELECT count(*) AS transaction_count
FROM `fx`
WHERE _ktype = 'STRING'
AND _vtype = AVRO
GROUP BY _key
-- OR adding a filter
INSERT INTO `total_transactions`
SELECT count(*) AS transaction_count
FROM `fx`
WHERE _ktype = 'STRING'
AND _vtype = AVRO
AND _key.* LIKE '%GBP%'
GROUP BY _key
Important
Every time a GROUP BY on a field is involved, the resulting key on the target topic is a STRING! This is to allow joins on multiple fields.
In version 1, LSQL does not support arithmetic on aggregation functions. By that we mean you can not do `SUM(fieldA)/count(*)`.
We are looking at solutions to address this in upcoming versions. Meanwhile, here is how you can achieve it for now:
SET `auto.offset.reset`='latest';
SET autocreate = true;
SET `commit.interval.ms` = 3000;
INSERT INTO sensor_data_avg
WITH
avgStream as
(
SELECT STREAM
COUNT(*) as total,
SUM(temperature) AS temperatureTotal,
SUM(humidity) AS humidityTotal,
MIN(temperature) AS minTemperature,
MAX(temperature) AS maxTemperature,
MIN(humidity) AS minHumidity,
MAX(humidity) AS maxHumidity
FROM `sensor_data`
WHERE _ktype='STRING' AND _vtype='JSON'
GROUP BY TUMBLE(2,s),_key
)
SELECT STREAM
temperatureTotal/total AS avgTemperature,
humidityTotal/total AS avgHumidity,
minTemperature,
maxTemperature,
minHumidity,
maxHumidity
FROM avgStream
Notice the last SELECT statement uses the output data from the first one in order to achieve the average calculation.
Important
Arithmetic on aggregate functions (SUM/COUNT/MIN/MAX) is not supported!
Using Window¶
So far we have shown simple aggregations without involving windows, which might solve some of a user’s requirements. However, aggregating over a window is a very common scenario in streaming. Window support was introduced earlier; please revisit the windowing section.
Keeping with the trend of IoT scenarios, imagine a stream of metrics from devices across the globe. The data structure looks like this:
{
"device_id": 2,
"device_type": "sensor-gauge",
"ip": "193.156.90.200",
"cca3": "NOR",
"cn": "Norway",
"temp": 18,
"signal": 26,
"battery_level": 8,
"timestamp": 1475600522
}
The following query counts all the records received from each country over a tumbling window of 30 seconds:
INSERT INTO norway_sensors_count
SELECT count(*) AS total
FROM sensors
WHERE _ktype = BYTES
AND _vtype = AVRO
GROUP BY tumble(30,s), cca3
The result would be records emitted on a 30-second interval, looking similar to this:
Key | Value |
---|---|
NOR | 10 |
ROM | 2 |
FRA | 126 |
UK | 312 |
US | 289 |
NOR | 2 |
FRA | 16 |
UK | 352 |
US | 219 |
Note
Remember the key value will be of type String.
So far we have only done counting, but LSQL provides support for SUM, MIN and MAX as well. Maybe your system processes customer orders and you want to compute, every hour, the total amount of orders over the last 24 hours:
SELECT
product
, SUM(amount) AS amount
FROM Orders
WHERE _ktype = BYTES
AND _vtype = AVRO
GROUP BY HOP(24,h,1,h), product
Joins¶
A join operation merges two streams based on the keys of their data records. The result is a new stream.
Note
LSQL supports joins on the key, but also allows the user to join on fields from the key or value part. This ends up with both sides having the record key remapped; the new key is the result of the string concatenation of the fields involved.
Kafka Streams supports the following join operations:
KStream-to-KStream
Joins are always windowed joins, since otherwise the memory and state required to compute the join would grow infinitely in size. Here, a newly received record from one of the streams is joined with the other stream’s records within the specified window interval to produce one result for each matching pair. A new KStream instance representing the resulting stream of the join is returned from this operator.
KTable-to-KTable
Joins are join operations designed to be consistent with the ones in relational databases. Here, both changelog streams are materialized into local state stores first. When a new record is received from one of the streams, it is joined with the other stream’s materialized state stores to produce one result for each matching pair. A new KTable instance is produced representing the result stream of the join, which is also a changelog stream of the represented table.
KStream-to-KTable
Joins allow you to perform table lookups against a changelog stream (KTable) upon receiving a new record from another record stream (KStream). An example use case would be to enrich a stream of orders (KStream) with the order details (KTable). Only records received from the record stream will trigger the join and produce results, not vice versa. This results in a brand new KStream instance representing the result stream of the join.
Here is a table of joins supported by Apache Kafka Streams:
Left Operand | Right Operand | Inner Join | Left Join | Outer Join |
---|---|---|---|---|
KStream | KStream | Yes | Yes | Yes |
KTable | KTable | Yes | Yes | Yes |
KStream | KTable | Yes | Yes | No |
LSQL supports these join operators:
- INNER
- LEFT
- OUTER
- RIGHT
Note
RIGHT JOIN will be expressed in terms of LEFT JOIN (the two operands are swapped).
Given the table above, here is a list of joins NOT possible by default in the Kafka Streams API:
- KTable RIGHT JOIN KStream
- KTable OUTER JOIN KStream
- KStream RIGHT JOIN KTable
LSQL ALLOWS the user to perform these operations; however, there are some costs associated with doing so. Before going into more detail, we need to give an overview of the context at hand.
We said already that a RIGHT JOIN is expressed as a LEFT JOIN; as a result, the above list becomes the following:
- KStream LEFT JOIN KTable
- KTable OUTER JOIN KStream
- KTable LEFT JOIN KStream
The challenge here is that a KTable can only be joined with another KTable. Furthermore, at the moment there is no straightforward way to go from a KStream instance to a KTable one. The only solution is to use an intermediary topic and then build the required KTable off that topic. Of course, this will hurt performance since the data has to be written to a topic and read again to allow the join to happen. The topology description for the flow will reflect such a scenario. Given this information, the above joins become:
- KTABLE LEFT JOIN KTable
- KTable OUTER JOIN KTABLE
- KTABLE LEFT JOIN KTABLE
A KStream OUTER JOIN KTable, despite not having support in the Kafka Streams API, is translated to a KTable OUTER JOIN KTable.
Important
LSQL transforms the flow as required to allow the join type to happen. Make sure you fully understand the implications of joins which require going through an intermediary topic.
Repartition¶
The Apache Kafka Streams API does not allow joining two streams with a different partition count. This can easily be the case in real systems. For example, with an order and an order-detail topic, the partition count on the latter will be smaller since traffic is lower. To allow such a join, LSQL makes sure it brings the two in line. As a result, it will have to create an order-repartition topic (the name is just an illustration) matching the right operand version.
Such an operation has a direct impact on performance since the entire topic is copied over just to perform the join. The topology viewer allows the user to see when such a flow change appears.
Using WITH¶
As you have seen earlier, the full syntax for LSQL contains the following:
[WITH
_ID_NAME_ AS (SELECT [STREAM] ...FROM _TOPIC_ ...),
_ID_NAME_ AS (SELECT [STREAM] ...FROM _TOPIC_ ...)
]
This allows you to break down your query complexity. For example, let’s consider the scenario where you have a topic for static product details which you will want to join against the topic containing the orders. From the product details, you only need to store the product name. This is the SQL to use to achieve such behavior:
...
INSERT INTO ...
WITH
productTable AS
(
SELECT productName
FROM `product`
WHERE _ktype = 'STRING'
AND _vtype = AVRO
)
SELECT ..
FROM ... JOIN productTable ON ...
...
Any name registered via WITH, in the example above productTable, can be referenced after its definition.
If your requirements are such that you need to define multiple entries, you can do so by separating the WITH entries with a comma. For example:
WITH
productTable as
(
SELECT productName
FROM `product`
WHERE _ktype = 'STRING'
AND _vtype = AVRO
),
userTable as
(
SELECT firstName, secondName
FROM `user`
WHERE _ktype = 'LONG'
AND _vtype = AVRO
)
The examples define tables (which translate into instances of KTable), but you can specify a stream (which translates to an instance of KStream) by simply adding STREAM after the SELECT:
WITH
productStream AS
(
SELECT STREAM productName
FROM `product`
WHERE _ktype = 'STRING'
AND _vtype = AVRO
)
Important
If the right operand is not found in the list of entities defined by WITH, a Stream instance will be created. SELECT STREAM or SELECT within a join targets the left operand only.
Join on Key¶
When joins were introduced at the beginning of the chapter, it was stated that two records are matched when their keys are equal. Here is how you would join orders and order details for AVRO records:
INSERT INTO orders_enhanced
SELECT STREAM
o.orderNumber
, o.status
, SUM(od.quantityOrdered * od.priceEach) total
FROM `order_details` AS od
INNER JOIN `orders` AS o
ON o._key = od._key
WHERE o._ktype = 'LONG'
AND o._vtype = AVRO
AND od._ktype = 'LONG'
AND od._vtype = AVRO
GROUP BY TUMBLE(2,s),o.orderNumber
Important
You cannot join two topics when the left operand value decoder differs from the right operand value decoder. Joining values from different decoder types is not supported.
When joining streams, the join needs to happen over a JoinWindow. The GROUP BY TUMBLE(2,s) is used as part of the aggregation, but LSQL also builds from it a JoinWindow instance to use when joining the streams, before applying the grouping. The translation between Window and JoinWindow happens as described in the table below:
Window Type | Join Window |
---|---|
tumble(duration) | JoinWindows.of(duration) |
hop(duration,advance) | JoinWindows.of(duration).until(advance) |
session(inactivity,duration) | JoinWindows.of(inactivity).until(advance) |
slide(duration) | JoinWindows.of(duration) |
If your topics have JSON payloads the above query should be:
INSERT INTO orders_enhanced
SELECT STREAM
o.orderNumber
, o.status
, SUM(od.quantityOrdered * od.priceEach) total
FROM `order_details` AS od
INNER JOIN `orders` AS o
ON o._key = od._key
WHERE o._ktype = 'LONG'
AND o._vtype = JSON
AND od._ktype = 'LONG'
AND od._vtype = JSON
GROUP BY TUMBLE(2,s),o.orderNumber
You can still join two streams without aggregating by simply doing the following:
INSERT INTO `orders_enhanced`
SELECT STREAM
od.orderNumber
, od.productCode
, od.quantityOrdered
, od.priceEach
, od.orderLineNumber
, p.productName
FROM `product` as p
INNER JOIN `order_details` AS od
ON p._key = od._key
WHERE p._ktype = BYTES
AND p._vtype = AVRO
AND od._ktype = BYTES
AND od._vtype = AVRO
GROUP BY TUMBLE(2,s)
Although GROUP BY is still used, it is not actually applying any grouping since no grouping fields were defined.
If your product topic key is not AVRO, you can specify, as in the example above, _ktype = BYTES. This gives some performance benefit by not having to deserialize the LONG of the earlier example and serialize it back to the output topic.
Important
Do not use BYTES when the payload is AVRO. The AVRO content is retained but the Schema Registry entry on the target will not be created.
All the functions supported by LSQL can be used in the select list. However, unless grouping is involved, the analytic ones (SUM, MIN, MAX, COUNT) are not allowed.
INSERT INTO `orders_enhanced`
SELECT STREAM
od.orderNumber
, od.productCode
, od.quantityOrdered
, od.priceEach
, od.orderLineNumber
, concat(od.productCode,'-',p.productName) AS productName
FROM `order_details` AS od
LEFT JOIN `product` as p
ON p.productCode = od.productCode
WHERE p._ktype = BYTES
AND p._vtype = AVRO
AND od._ktype = BYTES
AND od._vtype = AVRO
GROUP BY TUMBLE(4,s)
Join on Fields¶
It is not always the case that the topic key is the value to join on; maybe it was an oversight in initial development. LSQL has this covered as well: it allows you to choose a field from the Kafka message value part to use during the join.
INSERT INTO `order_details`
SELECT STREAM
o.orderNumber
, o.status
, o.flags
, od.productCode
FROM `order_details` AS od
INNER JOIN `orders` AS o
ON o.orderNumber = od.orderNumber
WHERE o._ktype = BYTES
AND o._vtype = AVRO
AND od._ktype = BYTES
AND od._vtype = AVRO
GROUP BY TUMBLE(2,s)
There is a trade-off here. Joining on a field like above means the stream needs to be remapped to allow for the new key. All such groupings will result in a String key. The reason is that LSQL allows you to join on more than one field; the key is a string concatenation of all the values involved.
Important
Joining on value field(s) will re-map the stream/table and the new key type will be String. Re-mapping a table has its cost since it has to move the data from the KTable to a new topic and build a new instance of the table.
The standard way to handle joins with a table is to define the table via WITH. An optimal solution for joining orders with products, to get the product name attached to the order, looks like this:
INSERT INTO `orders_enhanced`
WITH
productTable AS
(
SELECT productName
FROM `product`
WHERE _ktype = 'STRING'
AND _vtype = AVRO
)
SELECT STREAM
od.orderNumber
, od.productCode
, od.quantityOrdered
, od.priceEach
, od.orderLineNumber
, p.productName
FROM `order_details` AS od
LEFT JOIN productTable AS p
ON p._key = od.productCode
WHERE od._ktype = BYTES
AND od._vtype = AVRO
First, a productTable is defined and becomes the right operand for a LEFT JOIN. It is required for od.productCode to be of type String since the key on the table is String. Also, notice that _ktype is still required for order_details (od._ktype = BYTES). The resulting schema will have the productName field as an optional string since the right side might not be present.