Joins

A join operation assembles new records by combining two or more source records. Every join needs a join condition, which specifies the part of the data to join on. The default join is an INNER join: each record produced requires a left and a right source record whose values match the join condition. A LEFT join produces a record for every left-hand record, even if no matching right-hand record is found. A RIGHT join is the opposite of a LEFT join: every right-hand record is kept, even without a match on the left. The last join type, FULL JOIN, returns all records from both sources, whether or not a match is found.
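To make the four join types concrete, here is a minimal Python sketch of their semantics. It is not Lenses SQL or Kafka Streams code. The `join` helper and the `payments` and `customers` sample data are hypothetical, and each source is reduced to a dictionary keyed by the join value.

```python
# Illustrative only: plain-Python join semantics, not the Lenses SQL engine.
def join(left, right, how="inner"):
    """Join two dicts keyed by the join value; a missing side is None."""
    if how == "inner":
        keys = [k for k in left if k in right]
    elif how == "left":
        keys = list(left)
    elif how == "right":
        keys = list(right)
    elif how == "full":
        keys = list(left) + [k for k in right if k not in left]
    else:
        raise ValueError(f"unknown join type: {how}")
    return [(k, left.get(k), right.get(k)) for k in keys]


payments = {"c1": 10.0, "c2": 25.5}      # hypothetical left source
customers = {"c2": "Ann", "c3": "Bob"}   # hypothetical right source

for how in ("inner", "left", "right", "full"):
    print(how, join(payments, customers, how))
```

Only `c2` exists on both sides, so it is the only record an INNER join produces. The other join types add the unmatched records with `None` on the missing side.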

When joining two or more streams, the result is a new stream.

Join Condition

In Lenses SQL, a join between two streams (or tables, since the two are dual) is performed on the record Key value. That does not mean only the Key can appear in the join condition. The engine allows the user to join directly on the key:

    ...
    FROM payments INNER JOIN customer on payments._key = customer._key

but it is not limited to that. A user can join on the Key fields or even Value fields, for example:

    ...
    FROM payments INNER JOIN customer on payments.customerId = customer._key

    -- OR
    ...
    FROM payments INNER JOIN customer on payments.customerId = customer.id

The requirement is that the record Key storage type (STRING, INT, AVRO) is the same on both the left and right source records. Whenever the join condition uses a field from the Key or the Value, Lenses SQL re-keys the record before applying the join. The re-keyed record Key contains the STRING value of the corresponding field (a field that is not a STRING is converted into one).
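The re-keying step can be pictured with a short Python sketch. This is only an illustration of the idea, not the engine's implementation. The `rekey` helper and the sample records are hypothetical.

```python
# Sketch only: mimics the re-keying described above using plain tuples and dicts.
def rekey(records, field):
    """Return (new_key, value) pairs keyed by the STRING form of value[field]."""
    return [(str(value[field]), value) for _old_key, value in records]


# Hypothetical payments keyed by payment id; customerId is an INT value field.
payments = [
    ("p-1", {"customerId": 42, "amount": 9.99}),
    ("p-2", {"customerId": 7, "amount": 15.00}),
]
# Hypothetical customer table already keyed by a STRING customer id.
customers = {"42": {"name": "Ann"}}

rekeyed = rekey(payments, "customerId")
joined = [(key, value, customers[key]) for key, value in rekeyed if key in customers]
print(joined)
```

After re-keying, both sides share a STRING key, so the join can match on it even though `customerId` was originally an INT.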

Note

Lenses SQL supports joins on the Key, but also allows the user to join based on Value/Key fields.

Repartition

Joins on streaming data have special requirements, driven by the implementation of the Apache Kafka Streams API, which the Lenses SQL engine uses. A stream (which maps to a Kafka topic) can be partitioned to allow better throughput. This means that whenever stream S1 and stream S2 are joined, their partition counts must match. The reason is the rule described above: a join is performed on the record Key, so records with the same Key must land in corresponding partitions on both sides.

Kafka topics with different partitioning are a common scenario. For example, with an order and an order-detail topic, the partition count on the latter will be smaller since data volume is lower. To allow such a join, the SQL engine brings the two in line. As a result, it has to create an order-repartition topic (the name is just an illustration) matching the right-side partition count. Such an operation has a direct impact on performance, since the entire topic is copied.
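The following Python sketch shows why partition counts matter for a key-based join. It assumes a simple "hash of the key modulo partition count" placement. `zlib.crc32` stands in for Kafka's actual partitioner hash purely to keep the example deterministic, and `partition_for`, `repartition` and the sample topic are hypothetical.

```python
import zlib


def partition_for(key: str, partition_count: int) -> int:
    # Assumption: crc32 is a stand-in for the real partitioner hash.
    return zlib.crc32(key.encode("utf-8")) % partition_count


def repartition(records, partition_count):
    """Copy every record into a new layout with the given partition count."""
    partitions = [[] for _ in range(partition_count)]
    for key, value in records:
        partitions[partition_for(key, partition_count)].append((key, value))
    return partitions


orders = [(f"order-{i}", {"status": "NEW"}) for i in range(8)]

# The same key can map to a different partition number under 6 and 3 partitions,
# so a task reading partition N of both topics would miss matching records.
moved = [k for k, _ in orders if partition_for(k, 6) != partition_for(k, 3)]
print("keys placed differently:", moved)

# Repartitioning copies the whole topic so both sides use the same count.
aligned = repartition(orders, 3)
print([len(p) for p in aligned])
```

Every record is rewritten during `repartition`, which is where the performance cost of the extra topic comes from.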

For each continuous SQL query, the topology viewer shows the path the data takes from input to output.

Joining the streams

In the "table is a stream and a stream is a table" section, the duality of stream and table was described. Since a table is a stream and a stream is a table, combining them in a join is possible:

  • Stream-to-Stream Joins are always windowed joins – otherwise the memory and state required to compute the join would grow infinitely in size. Here, a newly received record from one of the streams is joined with the other stream’s records within the specified window interval to produce one result for each matching pair. The result of this operation is a new stream.
  • Table-to-Table Joins are join operations designed to be consistent with the ones in relational databases. Here, both changelog streams are materialized into local state stores first. When a new record is received from one of the streams, it is joined with the other stream’s materialized state stores to produce one result for each matching pair. A new table is produced representing the result stream of the join, which is also a changelog stream of the represented table.
  • Stream-to-Table Joins allow you to perform table lookups against a changelog stream (table) upon receiving a new record from another record stream. An example use case would be to enrich a stream of orders with the order details table. Only records received from the record stream will trigger the join and produce results, not vice versa. This results in a brand new stream containing the records that result from the join.
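The stream-to-table case in the list above can be sketched in a few lines of Python. This is a simplified illustration under stated assumptions, not Kafka Streams code: the table is a dictionary holding the latest value per key, events arrive in a single ordered list, and `stream_table_join` and the sample events are hypothetical.

```python
def stream_table_join(events):
    """events: ordered list of (source, key, value), source is 'table' or 'stream'."""
    table = {}    # materialized state: latest value per key
    output = []   # the resulting stream
    for source, key, value in events:
        if source == "table":
            table[key] = value   # a table update changes state but emits nothing
        elif key in table:       # a stream record triggers the lookup (inner join)
            output.append((key, value, table[key]))
    return output


events = [
    ("stream", "p1", {"qty": 1}),    # no product known yet, so nothing is emitted
    ("table", "p1", "Widget"),
    ("stream", "p1", {"qty": 2}),
    ("table", "p1", "Widget v2"),    # table update alone produces no result
    ("stream", "p1", {"qty": 3}),
]

print(stream_table_join(events))
```

Only the two stream records that arrive after the table has a value for `p1` produce output, and each one sees the table value current at that moment.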

Lenses materializes the stream flow using the Apache Kafka Streams framework. The framework allows these join operations:

Left Operand   Right Operand   Inner Join   Left Join   Outer Join
Stream         Stream          Yes          Yes         Yes
Table          Table           Yes          Yes         Yes
Stream         Table           Yes          Yes         No

Given the table above, here is a list of joins NOT possible by default in the Kafka Streams API:

  • Table RIGHT JOIN Stream
  • Table OUTER JOIN Stream
  • Stream RIGHT JOIN Table

The Lenses SQL engine allows the user to perform these operations. However, there are costs associated with doing so. We already said that a RIGHT JOIN is expressed as a LEFT JOIN with the operands swapped. The remaining challenge is that a Table can only be joined with another Table. The solution is to use an intermediary topic and then build the required table from that topic. Of course, this impacts performance, since the data has to be written to a new topic and read again. The topology viewer for the flow reflects such a scenario. Given this information, the joins in the above list become:

  • Table LEFT JOIN Table
  • Table OUTER JOIN Table
  • Table LEFT JOIN Table

Important

Lenses SQL transforms the flow as required to allow the join type to happen. Make sure you fully understand the implications of joins that require going through an intermediary topic.

Using WITH

The reference syntax for SQL streaming contains the following snippet:

[WITH
   _ID_NAME_ AS (SELECT [STREAM] ...FROM _TOPIC_ ...),
   _ID_NAME_ AS (SELECT [STREAM] ...FROM _TOPIC_ ...)
]

This means that a user can break down code complexity by defining smaller code blocks which can be reused. To give an example, consider a product topic which needs to be joined with the orders topic. This is a typical scenario for a stream-to-table join, or lookup. From the product record, only the value of the productName field is required.

...
INSERT INTO ...
WITH
productTable AS
 (
   SELECT productName
   FROM `product`
 )

SELECT order.value
      , productTable.productName
FROM  order JOIN productTable ON order.productId = productTable._key

The example above defines a table. Similarly, a stream can be defined by adding the STREAM keyword:

...
INSERT INTO ...
WITH
productStream AS
 (
   SELECT STREAM productName
   FROM `product`
 )

Any name registered via WITH (productStream in the example above) can be referenced after its definition. To define multiple entries, use a single WITH and separate the entries with a comma. For example:

WITH
productTable as
 (
   SELECT productName
   FROM `product`
 ),
userTable as
 (
    SELECT firstName, secondName
    FROM `user`
 )
...

Join on Key

When joins were introduced at the beginning of the chapter, it was stated that two records are matched when their keys are equal. Here is how you would join orders and order details:

INSERT INTO orders_enhanced
SELECT STREAM
      o.orderNumber
    , o.status
    , SUM(od.quantityOrdered * od.priceEach) total
FROM  `order_details` AS od
    INNER JOIN `orders` AS o
        ON o._key = od._key
GROUP BY TUMBLE(2,s),o.orderNumber

Important

You cannot join two records when their Key storage formats differ.

When joining streams, the join needs to happen over a time window. The GROUP BY TUMBLE(2,s) clause instructs the engine to perform the aggregation over a 2-second interval.
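As a rough illustration of what a 2-second tumbling window means, the Python sketch below assigns each record to a fixed, non-overlapping window and sums quantity times price per window and order number. It only models the windowing idea and is not how the engine is implemented. The function names, the millisecond timestamps and the sample records are hypothetical.

```python
from collections import defaultdict


def tumbling_window_start(timestamp_ms: int, size_ms: int) -> int:
    """Start of the fixed-size, non-overlapping window containing the timestamp."""
    return timestamp_ms - (timestamp_ms % size_ms)


def totals_per_window(records, size_ms=2000):
    """Sum quantity * price per (window start, order number)."""
    totals = defaultdict(float)
    for timestamp_ms, order_number, quantity, price in records:
        window = tumbling_window_start(timestamp_ms, size_ms)
        totals[(window, order_number)] += quantity * price
    return dict(totals)


# Hypothetical joined records: (timestamp in ms, orderNumber, quantityOrdered, priceEach)
records = [
    (500, 10100, 2, 10.0),
    (1900, 10100, 1, 5.0),
    (2100, 10100, 3, 1.0),   # falls into the next 2-second window
]

print(totals_per_window(records))
```

The first two records share the window starting at 0 ms, while the third starts a new window at 2000 ms, so the same order number yields one total per window.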

All the functions described here can be used in the SELECT projection. However, the aggregate functions SUM, MIN, MAX and COUNT require grouping.

INSERT INTO `orders_enhanced`
SELECT STREAM
    od.orderNumber
    , od.productCode
    , od.quantityOrdered
    , od.priceEach
    , od.orderLineNumber
    , concat(od.productCode,'-',p.productName) AS productName
FROM  `order_details` AS od
    LEFT JOIN `product` as p
        ON p.productCode = od.productCode
GROUP BY TUMBLE(4,s)

Although GROUP BY is still used, it is not actually applying any grouping since no grouping fields were defined. The operator has been overloaded by the Lenses SQL engine to allow describing the time window required for the join.

Join on Fields

It is not always the case that the record Key is the value to join on. Lenses SQL allows the user to choose a field from the record Key, or even the Value, to apply the join on.

INSERT INTO `orders_enhanced`
SELECT STREAM
    o.orderNumber
    , o.status
    , o.flags
    , od.productCode
FROM  `order_details` AS od
    INNER JOIN `orders` AS o
        ON o.orderNumber = od.orderNumber
GROUP BY TUMBLE(2,s)

There is a trade-off here. Joining on a field like the above means that the stream needs to be remapped to allow for the new key. All groupings will result in a STRING key. The join condition is not limited to a single field; multiple fields can be used as part of the condition. The resulting record Key will be a string concatenation of all involved values.

Important

Joining on one or more value fields remaps the stream/table, and the new key type will be STRING. Remapping a stream/table has a cost, since it requires data to be copied and reread.

The standard way to handle joins with a table is to define the table via WITH. An optimal solution for joining orders with products to get the product name attached to the order looks like this:

INSERT INTO `orders_enhanced`
WITH
productTable AS
 (
   SELECT productName
   FROM `product`
 )
SELECT STREAM
    od.orderNumber
    , od.productCode
    , od.quantityOrdered
    , od.priceEach
    , od.orderLineNumber
    , p.productName
FROM  `order_details` AS od
    LEFT JOIN  productTable AS p
        ON p._key = od.productCode