Examples
Joins
-- LEFT JOIN messages from two topics using a 4-second tumbling window, storing the results in the target topic
INSERT INTO `toTopic`
SELECT STREAM
od.orderNumber
, od.productCode
, od.quantityOrdered
, od.priceEach
, od.orderLineNumber
, CONCAT(od.productCode,'-',p.productName) AS productName
FROM OrdersDetailsTopic AS od
LEFT JOIN `ProductTopic` AS p
ON p.productCode = od.productCode
GROUP BY TUMBLE(4,s)
-- RIGHT JOIN messages from two topics over a 10-minute tumbling window
INSERT INTO `toTopic`
WITH
product AS
(
SELECT productName
FROM `ProductTopic`
)
SELECT STREAM
od.orderNumber
, od.productCode
, od.quantityOrdered
, od.priceEach
, od.orderLineNumber
, p.productName
FROM product AS p
RIGHT JOIN `OrdersDetailsTopic` AS od
ON p._key = od.productCode
GROUP BY TUMBLE(10,m)
-- LEFT JOIN two topics with JSON data
INSERT INTO `toTopic`
SELECT STREAM
od.orderNumber
, od.productCode
, od.quantityOrdered
, od.priceEach
, od.orderLineNumber
, CONCAT(od.productCode,'-',p.productName) AS productName
FROM OrdersDetailsTopic AS od
LEFT JOIN `ProductTopic` AS p
ON p.productCode = od.productCode
GROUP BY TUMBLE(4,s)
-- Full LEFT JOIN of a stream with a table
SET `auto.offset.reset`='latest';
INSERT INTO `toTopic`
WITH
tableTelecom AS (
SELECT *
FROM `telecom_data`
)
SELECT STREAM
data.squareId
, grid.polygon
FROM `activity` AS data
LEFT JOIN tableTelecom AS grid
ON data._key = grid._key
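The windowed examples above group records with TUMBLE(4,s) and TUMBLE(10,m). As a rough illustration of what a tumbling window means, the Python sketch below assigns event timestamps to fixed, non-overlapping buckets. It is not part of the SQL engine: the function name, the sample timestamps, and the assumption that windows are aligned to the epoch are all illustrative choices, not behavior confirmed by this page.

```python
def tumbling_window_start(timestamp_ms: int, size_ms: int) -> int:
    """Return the start of the fixed-size window that contains timestamp_ms.

    Assumption: windows are aligned to the epoch (timestamp 0).
    """
    return timestamp_ms - (timestamp_ms % size_ms)


FOUR_SECONDS_MS = 4_000  # mirrors TUMBLE(4,s)

# Hypothetical event timestamps, in milliseconds.
events = [500, 3_999, 4_000, 7_250, 8_001]

# Group every event under the start of its window.
buckets = {}
for ts in events:
    buckets.setdefault(tumbling_window_start(ts, FOUR_SECONDS_MS), []).append(ts)

for start, members in sorted(buckets.items()):
    print(f"window [{start}, {start + FOUR_SECONDS_MS}) -> {members}")
```

Each event lands in exactly one bucket, which is what distinguishes a tumbling window from a sliding one, where windows may overlap.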
Joins on optional field
Sometimes the records arriving on a topic do not always carry a given field (or set of fields), yet you still want to join on it. To do that, first build two streams (left and right in the query below) that keep only the records containing header.correlationId and header.recordId. Then join the two resulting streams:
INSERT INTO new_records
WITH left as
(
SELECT STREAM header.correlationId, header.recordId
FROM `topicA`
WHERE exists(header.correlationId) is true AND exists(header.recordId) is true
),
right as
(
SELECT STREAM header.correlationId, header.recordId as tdfRecId
FROM `topicB`
WHERE exists(header.correlationId) is true AND exists(header.recordId) is true
)
SELECT STREAM left.correlationId, left.recordId AS tdsRecId, right.tdfRecId
FROM left INNER JOIN right
ON left.correlationId = right.correlationId
GROUP BY SLIDING(1,m)
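The filter-then-join pattern above can be pictured with a small Python sketch. It only mirrors the logic of the query and is not how the engine executes it: the helper names and sample records are hypothetical, the sliding window is ignored, exists(...) is assumed to mean "field present and not null", and both topics are assumed to carry correlationId in the header.

```python
def has_fields(record: dict, *names: str) -> bool:
    """True when every named field is present (and not None) in record['header']."""
    header = record.get("header") or {}
    return all(header.get(name) is not None for name in names)


def join_on_correlation_id(topic_a: list, topic_b: list) -> list:
    """Inner join of two record lists on header.correlationId.

    Records missing the optional fields are dropped before joining.
    """
    left = [r["header"] for r in topic_a if has_fields(r, "correlationId", "recordId")]
    right = [r["header"] for r in topic_b if has_fields(r, "correlationId", "recordId")]

    # Index the right side by the join key.
    right_by_id = {}
    for header in right:
        right_by_id.setdefault(header["correlationId"], []).append(header)

    return [
        {
            "correlationId": lh["correlationId"],
            "tdsRecId": lh["recordId"],
            "tdfRecId": rh["recordId"],
        }
        for lh in left
        for rh in right_by_id.get(lh["correlationId"], [])
    ]


# Hypothetical sample records; some lack the optional header fields.
topic_a = [
    {"header": {"correlationId": "c1", "recordId": "a1"}},
    {"header": {"recordId": "a2"}},  # no correlationId: filtered out
    {"payload": 1},                  # no header at all: filtered out
]
topic_b = [
    {"header": {"correlationId": "c1", "recordId": "b1"}},
    {"header": {"correlationId": "c2", "recordId": "b2"}},  # no match on the left
    {"header": {"correlationId": "c1"}},                    # no recordId: filtered out
]

joined = join_on_correlation_id(topic_a, topic_b)
print(joined)
```

Filtering before the join keeps records with missing fields from ever reaching the join condition, so the comparison never has to deal with absent keys.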