5.0
Joins
Joins allow rows from different sources to be combined.
Lenses allows two sources of data to be combined either on the equality of their _key facet or using a user-provided expression.
A query using joins looks like a regular query apart from the definition of its source and, in some cases, the need to specify a window duration (WITHIN):
SELECT (STREAM|TABLE)
<projection>
FROM
<sourceA> [LEFT|RIGHT|INNER|OUTER] JOIN
<sourceB> [ON <joinExpression>]
[WITHIN <duration>]
WHERE
<filterExpression>;
- projection: the projection of a join expression differs very little from a regular projection. The only important consideration is that, since data is selected from two sources, some field names may be common to both. Using the table.field syntax is recommended to avoid this ambiguity.
- sourceA/sourceB: the two sources of data to combine.
- duration: only used when two streams are joined. Specifies the time interval within which to search for matching records.
- joinExpression: a boolean expression that specifies how records from the two sources are matched. If it is omitted, records are matched on the equality of their _key facet.
- filterExpression: a filter expression specifying which records are kept in the result.
Join types
When two sources of data are combined it is possible to control which records to keep when a match is not found:
Disclaimer: The following examples do not take windowing and/or table materialization concerns into consideration; these are covered in the windowing section.
Customers
_key | id | name |
---|---|---|
1 | 1 | John |
2 | 2 | Frank |
Orders
_key | customer_id | item |
---|---|---|
1 | 1 | Computer |
2 | 1 | Mouse |
3 | 3 | Keyboard |
INNER JOIN / JOIN
WITH customersTable AS (SELECT TABLE * FROM customers);
WITH ordersStream AS (SELECT STREAM * FROM orders);
INSERT INTO result
SELECT STREAM
customersTable.name
, ordersStream.item
FROM
ordersStream JOIN customersTable
ON customersTable.id = ordersStream.customer_id;
This join type will only emit records where a match has occurred.
name | item |
---|---|
John | Computer |
John | Mouse |
(Notice that there is no order with customer_id = 2, nor is there a customer with id = 3, so neither of these rows is present in the result.)
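To make the matching rule concrete, the following Python sketch models the Customers and Orders tables above as in-memory lists and reproduces the INNER JOIN result. It is an illustrative model of the semantics only, not how Lenses evaluates joins; the function name `inner_join` is hypothetical, and customer ids are assumed to be unique, as they would be in a table.

```python
# Illustrative in-memory copy of the example data (not a Lenses API).
customers = [
    {"_key": 1, "id": 1, "name": "John"},
    {"_key": 2, "id": 2, "name": "Frank"},
]
orders = [
    {"_key": 1, "customer_id": 1, "item": "Computer"},
    {"_key": 2, "customer_id": 1, "item": "Mouse"},
    {"_key": 3, "customer_id": 3, "item": "Keyboard"},
]


def inner_join(orders, customers):
    """Emit a (name, item) row only when an order matches a customer."""
    by_id = {c["id"]: c for c in customers}  # index the table side by join field
    rows = []
    for order in orders:
        customer = by_id.get(order["customer_id"])
        if customer is not None:  # no match means no output row
            rows.append((customer["name"], order["item"]))
    return rows


print(inner_join(orders, customers))
# [('John', 'Computer'), ('John', 'Mouse')]
```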
LEFT JOIN
WITH customersTable AS (SELECT TABLE * FROM customers);
WITH ordersStream AS (SELECT STREAM * FROM orders);
INSERT INTO result
SELECT STREAM
customersTable.name
, ordersStream.item
FROM
ordersStream LEFT JOIN customersTable
ON customersTable.id = ordersStream.customer_id;
This join type selects all the records from the left side of the join regardless of a match:
name | item |
---|---|
John | Computer |
John | Mouse |
null | Keyboard |
(Notice that all the rows from orders are present, but since there is no customer with id = 3, no name can be set for the Keyboard order.)
RIGHT JOIN
WITH customersTable AS (SELECT TABLE * FROM customers);
WITH ordersStream AS (SELECT STREAM * FROM orders);
INSERT INTO result
SELECT STREAM
customersTable.name
, ordersStream.item
FROM
customersTable RIGHT JOIN ordersStream
ON customersTable.id = ordersStream.customer_id;
A right join can be seen as a mirror of a LEFT JOIN. It selects all the records from the right side of the join regardless of a match:
name | item |
---|---|
John | Computer |
John | Mouse |
null | Keyboard |
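The mirror relationship between LEFT and RIGHT joins can be checked with a small Python model of the same data: a right join is a left join with the two sides swapped. This is a sketch of the semantics under the assumption that join values on the table side are unique; `left_join`, `right_join` and `to_name_item` are hypothetical helpers, not part of Lenses.

```python
# Illustrative in-memory copy of the example data (not a Lenses API).
customers = [
    {"_key": 1, "id": 1, "name": "John"},
    {"_key": 2, "id": 2, "name": "Frank"},
]
orders = [
    {"_key": 1, "customer_id": 1, "item": "Computer"},
    {"_key": 2, "customer_id": 1, "item": "Mouse"},
    {"_key": 3, "customer_id": 3, "item": "Keyboard"},
]


def left_join(left, right, left_field, right_field):
    """Keep every left row, paired with its matching right row or None."""
    index = {row[right_field]: row for row in right}  # assumes unique join values
    return [(row, index.get(row[left_field])) for row in left]


def right_join(left, right, left_field, right_field):
    """Mirror of left_join: keep every right row instead."""
    swapped = left_join(right, left, right_field, left_field)
    return [(l_row, r_row) for r_row, l_row in swapped]


def to_name_item(customer, order):
    return (customer["name"] if customer else None, order["item"])


left_rows = [to_name_item(c, o) for o, c in left_join(orders, customers, "customer_id", "id")]
right_rows = [to_name_item(c, o) for c, o in right_join(customers, orders, "id", "customer_id")]
print(left_rows)  # [('John', 'Computer'), ('John', 'Mouse'), (None, 'Keyboard')]
print(left_rows == right_rows)  # True
```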
OUTER JOIN
WITH customersStream AS (SELECT STREAM * FROM customers);
WITH ordersStream AS (SELECT STREAM * FROM orders);
INSERT INTO result
SELECT STREAM
customersStream.name
, ordersStream.item
FROM
ordersStream OUTER JOIN customersStream
ON customersStream.id = ordersStream.customer_id;
An outer join can be seen as the union of left and right joins. It selects all records from both sides of the join, regardless of whether a match occurs:
name | item |
---|---|
John | Computer |
John | Mouse |
null | Keyboard |
Frank | null |
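Treating the outer join as "left join plus the unmatched right-side rows" gives the following Python sketch, which reproduces the four rows above. It models only the result set for the example data (windowing is ignored, as in the disclaimer); `outer_join` is a hypothetical name and customer ids are assumed to be unique.

```python
customers = [
    {"_key": 1, "id": 1, "name": "John"},
    {"_key": 2, "id": 2, "name": "Frank"},
]
orders = [
    {"_key": 1, "customer_id": 1, "item": "Computer"},
    {"_key": 2, "customer_id": 1, "item": "Mouse"},
    {"_key": 3, "customer_id": 3, "item": "Keyboard"},
]


def outer_join(orders, customers):
    """Union of left and right joins: unmatched rows from either side survive."""
    by_id = {c["id"]: c for c in customers}
    matched_ids = set()
    rows = []
    # Left-join pass: every order, with its customer's name or None.
    for order in orders:
        customer = by_id.get(order["customer_id"])
        if customer is not None:
            matched_ids.add(customer["id"])
        rows.append((customer["name"] if customer else None, order["item"]))
    # Right-side remainder: customers that matched no order at all.
    for customer in customers:
        if customer["id"] not in matched_ids:
            rows.append((customer["name"], None))
    return rows


print(outer_join(orders, customers))
# [('John', 'Computer'), ('John', 'Mouse'), (None, 'Keyboard'), ('Frank', None)]
```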
Matching expression (ON)
By default, if no ON expression is provided, the join will be evaluated based on the equality of the _key facet. This means that the following queries are equivalent:
SELECT TABLE *
FROM customers JOIN orders;
SELECT TABLE *
FROM customers JOIN orders
ON customers._key = orders._key;
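The equivalence of the two queries can be sketched in Python with a join function whose matching predicate defaults to `_key` equality. The rows reuse the `_key` values from the Customers and Orders tables above; `join` and `key_equality` are hypothetical names, and the nested loop is chosen for clarity, not as a claim about how Lenses executes joins.

```python
# Only the fields this sketch needs, taken from the example tables.
customers = [{"_key": 1, "name": "John"}, {"_key": 2, "name": "Frank"}]
orders = [
    {"_key": 1, "item": "Computer"},
    {"_key": 2, "item": "Mouse"},
    {"_key": 3, "item": "Keyboard"},
]


def key_equality(left_row, right_row):
    """The implicit matching rule when no ON expression is given."""
    return left_row["_key"] == right_row["_key"]


def join(left, right, on=key_equality):
    """Nested-loop inner join; `on` defaults to _key equality."""
    return [(l, r) for l in left for r in right if on(l, r)]


implicit = join(customers, orders)
explicit = join(customers, orders, on=lambda c, o: c["_key"] == o["_key"])
print(implicit == explicit)  # True
print([(c["name"], o["item"]) for c, o in implicit])  # [('John', 'Computer'), ('Frank', 'Mouse')]
```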
When an expression is provided, however, there are limitations on which kinds of expressions can be evaluated.
Currently, the following expression types are supported:
- Equality expressions (=) with one table on each side:
customers.id = orders.user_id
customers.id - 1 = orders.user_id - 1
substr(customers.name, 5) = orders.item
- Any boolean expression that references only one table:
len(customers.name) > 10
substr(customers.name, 1) = "J"
len(customers.name) > 10 OR customers._key > 1
- Allowed expressions combined with the AND operator:
customers._key = orders.user_id AND len(customers.name) > 10
len(customers.name) > 10 AND substr(customers.name, 1) = "J"
substr(customers.name, 5) = orders.item AND (len(customers.name) > 10 OR customers._key > 1)
Any expression that does not follow the rules above will be rejected, for example when:
- More than one table is referenced on one side of the equality operator:
concat(customers.name, orders.item) = "John"
customers._key - orders.customer_id = 0
- A boolean expression that is not separated by AND references more than one table:
customers._key = 1 OR customers._key = orders.customer_id
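The three rules above can be expressed as a small checker. In this Python sketch an ON expression is assumed to be already split into its AND-separated conjuncts, and each conjunct only records which tables it references; `Equality`, `Predicate` and `on_expression_allowed` are hypothetical names for illustration, not a Lenses parser, and how Lenses treats expressions that reference no table at all is not covered.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Equality:
    """A `lhs = rhs` conjunct, recording which tables each side references."""
    lhs_tables: frozenset
    rhs_tables: frozenset


@dataclass(frozen=True)
class Predicate:
    """Any other boolean conjunct (it may contain OR internally)."""
    tables: frozenset


def conjunct_allowed(conjunct) -> bool:
    if isinstance(conjunct, Equality):
        # Rule 1: an equality with exactly one table on each side.
        if len(conjunct.lhs_tables) == 1 and len(conjunct.rhs_tables) == 1:
            return True
        # Otherwise it is only valid as a single-table expression (rule 2).
        return len(conjunct.lhs_tables | conjunct.rhs_tables) == 1
    # Rule 2: any other boolean expression must reference a single table.
    return len(conjunct.tables) == 1


def on_expression_allowed(conjuncts) -> bool:
    """Rule 3: every AND-separated conjunct must itself be allowed."""
    return all(conjunct_allowed(c) for c in conjuncts)


def tables(*names):
    return frozenset(names)


C, O = "customers", "orders"
allowed = [
    [Equality(tables(C), tables(O))],  # customers.id = orders.user_id
    [Equality(tables(C), tables())],  # substr(customers.name, 1) = "J"
    [Equality(tables(C), tables(O)), Predicate(tables(C))],  # ... AND len(customers.name) > 10
]
rejected = [
    [Equality(tables(C, O), tables())],  # concat(customers.name, orders.item) = "John"
    [Predicate(tables(C, O))],  # customers._key = 1 OR customers._key = orders.customer_id
]
print([on_expression_allowed(e) for e in allowed])  # [True, True, True]
print([on_expression_allowed(e) for e in rejected])  # [False, False]
```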
Windowing: stream-to-stream joins (WITHIN)
When two streams are joined, Lenses needs to know how far into the past and into the future to look for a matching record.
This approach is called a “Sliding Window” and works like this:
Customers
arrival | id | name |
---|---|---|
t = 6s | 1 | Frank |
t = 20s | 2 | John |
Purchases
arrival | item | customer_id |
---|---|---|
t = 10s | Computer | 1 |
t = 11s | Keyboard | 2 |
SELECT STREAM
customers.name
, orders.item
FROM
orders LEFT JOIN customers
ON customers.id = orders.customer_id
WITHIN 5s;
When the Computer record arrives at t = 10s, the window is [10-5, 10+5]s = [5, 15]s, and only one matching customer, Frank (t = 6s), falls within it. When the Keyboard record arrives at t = 11s, the window is [11-5, 11+5]s = [6, 16]s, and no customer with id = 2 falls within it.
This means that running the query produces the following result:
name | item |
---|---|
Frank | Computer |
null | Keyboard |
Note: John will not match the Keyboard purchase, since t = 20s is not within the window interval [11-5, 11+5]s.
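The window test can be sketched in Python using the arrival times from the tables above. This is a simplified model under two stated assumptions: the window bounds are treated as inclusive, and the join is evaluated over the complete history at once, so it says nothing about when Lenses actually emits results. `windowed_left_join` is a hypothetical name.

```python
# (arrival_s, id, name) and (arrival_s, item, customer_id), from the tables above.
customers = [(6, 1, "Frank"), (20, 2, "John")]
purchases = [(10, "Computer", 1), (11, "Keyboard", 2)]


def windowed_left_join(purchases, customers, within_s):
    """For each purchase at time t, match customers that arrived in [t - w, t + w]."""
    rows = []
    for t, item, customer_id in purchases:
        low, high = t - within_s, t + within_s  # inclusive bounds (an assumption)
        names = [
            name
            for arrival, cid, name in customers
            if cid == customer_id and low <= arrival <= high
        ]
        if names:
            rows.extend((name, item) for name in names)
        else:
            rows.append((None, item))  # LEFT JOIN keeps the unmatched purchase
    return rows


print(windowed_left_join(purchases, customers, within_s=5))
# [('Frank', 'Computer'), (None, 'Keyboard')]
```

Widening the window to 10s would make John's record (t = 20s) fall inside the Keyboard window [1, 21]s, which is a quick way to see how the duration changes the result.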
Read more about time-windows here.
Non-windowed joins (stream-to-table and table-to-stream)
When streaming data, records can be produced at different rates and even out of order. This means that a match is often not found because the matching record has not arrived yet.
The following example shows a join between a stream and a table where the purchase information is made available before the customer information.
(Notice that the purchase of a “Keyboard” by customer_id = 2
is produced before the record with the customer details.)
Customers
arrival | id | name |
---|---|---|
t = 0s | 1 | Frank |
t = 10s | 2 | John |
Purchases
arrival | item | customer_id |
---|---|---|
t = 0s | Computer | 1 |
t = 1s | Keyboard | 2 |
Running the following query:
WITH customersTable AS (SELECT TABLE * FROM customers);
WITH ordersStream AS (SELECT STREAM * FROM orders);
SELECT STREAM
customersTable.name
, ordersStream.item
FROM
ordersStream LEFT JOIN customersTable
ON customersTable.id = ordersStream.customer_id;
would result in the following:
name | item |
---|---|
Frank | Computer |
null | Keyboard |
If the record for customer id = 2
becomes available later:
arrival | id | name |
---|---|---|
t = 10s | 2 | John |
a new record is emitted, and the result looks like the following:
name | item |
---|---|
Frank | Computer |
null | Keyboard |
John | Keyboard |
Notice that “Keyboard” appears twice: once for when the customer data is missing and once for when it becomes available.
This scenario happens whenever a stream is joined with a table using a non-inner join.
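The first two result rows follow from one idea: each stream record is matched against the table as it exists when that record is processed. The Python sketch below models only that lookup side, replaying the example's events in arrival order; the later re-emission triggered by the table update, described above, is deliberately not modeled. Events with equal timestamps are assumed to be processed in the order listed, and `lookup_left_join` is a hypothetical name.

```python
# Arrival-ordered events from the example: (time_s, kind, record).
events = [
    (0, "customer", {"id": 1, "name": "Frank"}),
    (0, "order", {"item": "Computer", "customer_id": 1}),
    (1, "order", {"item": "Keyboard", "customer_id": 2}),
    (10, "customer", {"id": 2, "name": "John"}),
]


def lookup_left_join(events):
    """Match each order against the table contents at the time it is processed."""
    table = {}  # materialized customers table: id -> latest name
    emitted = []
    # sorted() is stable, so events with equal timestamps keep their list order.
    for _, kind, record in sorted(events, key=lambda e: e[0]):
        if kind == "customer":
            table[record["id"]] = record["name"]  # upsert: latest value per id wins
        else:
            emitted.append((table.get(record["customer_id"]), record["item"]))
    return emitted


print(lookup_left_join(events))
# [('Frank', 'Computer'), (None, 'Keyboard')]
```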
Table/Stream joins compatibility table
The following table shows which combinations of table/stream joins are available:
Left | Right | Allowed types | Window | Result |
---|---|---|---|---|
Stream | Stream | All | Required | Stream |
Table | Table | All | No | Table |
Table | Stream | RIGHT JOIN | No | Stream |
Stream | Table | INNER, LEFT JOIN | No | Stream |
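The compatibility table can also be read as a lookup. The Python sketch below encodes its rows and rejects unsupported combinations; it assumes “All” means the four join types described in this section, and it does not enforce anything for the “No” window entries beyond what the table states. `result_kind` and `COMPATIBILITY` are hypothetical names.

```python
JOIN_TYPES = frozenset({"INNER", "LEFT", "RIGHT", "OUTER"})

# (left, right) -> (allowed join types, window required, result kind)
COMPATIBILITY = {
    ("STREAM", "STREAM"): (JOIN_TYPES, True, "STREAM"),
    ("TABLE", "TABLE"): (JOIN_TYPES, False, "TABLE"),
    ("TABLE", "STREAM"): (frozenset({"RIGHT"}), False, "STREAM"),
    ("STREAM", "TABLE"): (frozenset({"INNER", "LEFT"}), False, "STREAM"),
}


def result_kind(left, right, join_type, has_within):
    """Return the result kind, or raise ValueError for an unsupported combination."""
    allowed, window_required, result = COMPATIBILITY[(left, right)]
    if join_type not in allowed:
        raise ValueError(f"{join_type} JOIN is not allowed for {left}/{right}")
    if window_required and not has_within:
        raise ValueError("stream-to-stream joins require a WITHIN window")
    return result


print(result_kind("STREAM", "TABLE", "LEFT", has_within=False))  # STREAM
print(result_kind("TABLE", "TABLE", "OUTER", has_within=False))  # TABLE
```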
Key decoder types
In order to evaluate a join between two sources, the key facets of both sources have to share the same initial format.
If the formats are not the same, the join cannot be evaluated. To address this, an intermediate topic with the correct format can be created using a STORE AS statement. This newly created topic can then be used as the new source.
Co-partitioning
In addition to the constraint above, joining requires that both sources have the same number of partitions.
When a mismatch is found, an additional step is added to the join evaluation to guarantee an equal number of partitions between the two sources. This step writes the data from the source topic with fewer partitions into an intermediate topic.
This newly created topic matches the partition count of the source with the highest partition count.
In the topology view, this step shows up as a Repartition Node.
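As a hypothetical sketch of the repartitioning decision described above, the Python code below picks which source gets copied and to how many partitions, and shows why differing partition counts matter: the partition index of a key depends on the partition count. The helper names are made up, and `zlib.crc32` stands in for Kafka's default partitioner, which actually uses a murmur2 hash.

```python
import zlib


def repartition_plan(left_partitions, right_partitions):
    """Return (side to copy into an intermediate topic, or None, and the target count)."""
    if left_partitions == right_partitions:
        return None, left_partitions
    target = max(left_partitions, right_partitions)
    side = "left" if left_partitions < right_partitions else "right"
    return side, target


def partition_for(key, num_partitions):
    # crc32 is only a stand-in: Kafka's default partitioner hashes keys with murmur2.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


side, target = repartition_plan(3, 6)
print(side, target)  # left 6

# Keys whose partition index differs between a 3-partition and a 6-partition
# topic under this stand-in hash; such keys would not line up without repartitioning.
keys = [f"customer-{i}" for i in range(10)]
mismatched = [k for k in keys if partition_for(k, 3) != partition_for(k, 6)]
print(mismatched)
```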
ON expressions and key change
Joining two topics is only possible if the two sources used in the join share the same key shape and decoder.
When an ON expression is specified, the original key facet has to change so that it matches the expression provided in the ON clause. Lenses does this calculation automatically. As a result, the key schema of the result will not be the same as that of either source; it will be a Lenses-calculated object equivalent to the join expression specified in the query.
Nullability
As discussed in the join types section, some fields may contain null values when non-inner joins are used.
For this reason, fields that may be null are typed as a union of null and their original type.
Joining more than 2 sources
Within the same query, joins may only be evaluated between two sources.
When a join between more than two sources is required, multiple queries can be combined using a WITH statement:
WITH customerOrders AS (
SELECT TABLE
customers.name
, orders.item
, orders._key AS order_id
FROM
customers INNER JOIN orders
ON orders.customer_id = customers.id
);
INSERT INTO target
SELECT TABLE *
FROM customerOrders INNER JOIN deliveryAddress
ON customerOrders.order_id = deliveryAddress.order_id;
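The two-step pattern can be mirrored in Python: the first function plays the role of the customerOrders query and the second joins its output with delivery addresses on order_id. The customer and order rows come from the earlier example tables, while the `delivery_address` contents are hypothetical values invented for this sketch, as are the function names.

```python
customers = {1: "John", 2: "Frank"}  # id -> name
orders = [(1, 1, "Computer"), (2, 1, "Mouse"), (3, 3, "Keyboard")]  # (_key, customer_id, item)
delivery_address = {1: "Street A", 2: "Street B"}  # hypothetical: order_id -> address


def customer_orders(customers, orders):
    """First query: customers INNER JOIN orders, keeping the order key as order_id."""
    return [
        {"name": customers[customer_id], "item": item, "order_id": key}
        for key, customer_id, item in orders
        if customer_id in customers
    ]


def with_addresses(rows, addresses):
    """Second query: customerOrders INNER JOIN deliveryAddress on order_id."""
    return [
        {**row, "address": addresses[row["order_id"]]}
        for row in rows
        if row["order_id"] in addresses
    ]


result = with_addresses(customer_orders(customers, orders), delivery_address)
for row in result:
    print(row)
# {'name': 'John', 'item': 'Computer', 'order_id': 1, 'address': 'Street A'}
# {'name': 'John', 'item': 'Mouse', 'order_id': 2, 'address': 'Street B'}
```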
Joining and Grouping
To group the results of a join, provide a GROUP BY
expression after the join expression:
SET defaults.topic.autocreate=true;
WITH customersTable AS (SELECT TABLE * FROM customers);
WITH ordersStream AS (SELECT STREAM * FROM orders);
WITH joined AS (
SELECT STREAM
customersTable.name
, ordersStream.item
FROM
ordersStream JOIN customersTable
ON customersTable.id = ordersStream.customer_id
GROUP BY customersTable.name
);
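Conceptually, the GROUP BY step partitions the joined rows by customer name. The Python sketch below applies that to the inner join result of the earlier example (John with Computer and Mouse); collecting items into a list is an illustrative aggregation chosen for this sketch, not something the query above specifies, and `group_by_name` is a hypothetical name.

```python
from collections import defaultdict

# Joined (name, item) rows from the INNER JOIN example earlier in this section.
joined = [("John", "Computer"), ("John", "Mouse")]


def group_by_name(rows):
    """Group joined rows by name, collecting the items of each group into a list."""
    groups = defaultdict(list)
    for name, item in rows:
        groups[name].append(item)
    return dict(groups)


print(group_by_name(joined))  # {'John': ['Computer', 'Mouse']}
```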
Stream-Table/Table-Stream joins: table materialization
Customers
emitted | processed | id | name |
---|---|---|---|
t = 0s | t = 20s | 1 | Frank |
Purchases
arrival | processed | item | customer_id |
---|---|---|---|
t = 0s | t = 10s | Computer | 1 |
WITH customersTable AS (SELECT TABLE * FROM customers);
WITH ordersStream AS (SELECT STREAM * FROM orders);
INSERT INTO result
SELECT TABLE
customersTable.name
, ordersStream.item
FROM
ordersStream OUTER JOIN customersTable
ON customersTable.id = ordersStream.customer_id;
When a join between a table and a stream is processed, Lenses will, for each stream input (orders in the example above), look for a matching record in the specified table (customers).
Notice that the record with Frank’s purchase information is processed at t = 10s,
at which point Frank’s customer information has not yet been processed. This means that no match will be found for this record.
At t = 20s,
however, the record with Frank’s customer information is processed; this triggers the emission of a new record only if an OUTER JOIN is used.
Filter optimizations
In some cases, filter expressions can help optimize a query. A filter can sometimes be broken down into multiple steps, some of which can be applied before the join node is evaluated. This type of optimization reduces the number of records going into the join node and consequently increases its speed.
For this reason, filters will in some cases show up before the join in the topology view.
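The idea behind this optimization can be checked on the example data: for an inner join, a filter that references only one side gives the same result whether it is applied after the join or pushed below it, while fewer records reach the join. The Python sketch below shows this; it illustrates the general relational rewrite rather than Lenses' optimizer. For non-inner joins such a rewrite is only safe for some predicates (for example, those on the preserved side of a LEFT JOIN). The names `inner_join` and `is_computer` are hypothetical.

```python
customers = [{"id": 1, "name": "John"}, {"id": 2, "name": "Frank"}]
orders = [
    {"customer_id": 1, "item": "Computer"},
    {"customer_id": 1, "item": "Mouse"},
    {"customer_id": 3, "item": "Keyboard"},
]


def inner_join(orders, customers):
    by_id = {c["id"]: c for c in customers}
    return [(o, by_id[o["customer_id"]]) for o in orders if o["customer_id"] in by_id]


def is_computer(order):
    # A filter that references only the orders side.
    return order["item"] == "Computer"


filter_after = [(o, c) for o, c in inner_join(orders, customers) if is_computer(o)]
pushed_down = [o for o in orders if is_computer(o)]
filter_before = inner_join(pushed_down, customers)

print(filter_after == filter_before)  # True
print(len(orders), "->", len(pushed_down), "records enter the join")  # 3 -> 1 records enter the join
```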