Lateral Joins

With Lateral Joins you can combine a data source with any array expression. As a result, you will get a new data source, where every record of the original one will be joined with the values of the lateral array expression.

Assume you have a source where elements is an array field:

field1  field2  elements
a       1       [1, 2]
b       2       [3, 4, 5]
c       3       [6]

Then a Lateral Join of source with elements is a new table, where every record of source will be joined with all the single items of the value of elements for that record:

field1  field2  elements   element
a       1       [1, 2]     1
a       1       [1, 2]     2
b       2       [3, 4, 5]  3
b       2       [3, 4, 5]  4
b       2       [3, 4, 5]  5
c       3       [6]        6

In this way the single elements of the array become available and can be used as normal fields in the query.
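
The expansion described above can be modelled in a few lines of Python. This is only a sketch of the semantics, not how the engine implements lateral joins: the helper name lateral_join and the list-of-dicts representation of records are assumptions made for illustration.

```python
def lateral_join(records, array_expr, alias):
    """Pair every record with each item of the array computed from it.

    records    -- iterable of dicts, one per source record
    array_expr -- function mapping a record to a list
                  (stands in for the lateral array expression)
    alias      -- field name under which each single item is exposed
    """
    joined = []
    for record in records:
        for item in array_expr(record):
            # Keep all original fields and add the single item under the alias.
            joined.append({**record, alias: item})
    return joined


source = [
    {"field1": "a", "field2": 1, "elements": [1, 2]},
    {"field1": "b", "field2": 2, "elements": [3, 4, 5]},
    {"field1": "c", "field2": 3, "elements": [6]},
]

rows = lateral_join(source, lambda r: r["elements"], "element")
for row in rows:
    print(row["field1"], row["field2"], row["elements"], row["element"])
```

Each source record contributes as many output rows as its array has items, so a record whose array is empty contributes none.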

Syntax 

A query using lateral joins looks like a regular query apart from the definition of its source:

SELECT (STREAM|TABLE)
  <projection>
FROM
  <source> LATERAL
  <lateralArrayExpression> AS <lateralAlias>
WHERE
  <filterExpression>;

  • projection: as in a single-table select, all the fields from <source> are available in the projection. In addition, the special field <lateralAlias> is available.
  • source: the source of data. Note: it is not possible to specify a normal join as a source of a lateral join. This limitation will be removed in the future.
  • lateralArrayExpression: any expression that evaluates to an array. Fields from <source> are available for defining this expression.
  • filterExpression: a filter expression specifying which records should be kept. Records that do not satisfy it are dropped.

Single Lateral Joins 

Assume you have a topic batched_readings populated with the following records:

batched_readings

_key  meter_id  readings
a     1         [100, 80, 95, 91]
b     2         [87, 93, 100]
c     1         [88, 89, 92, 94]
d     2         [81]

As you can see, readings is a field containing arrays of integers.

We define a processor as follows:

INSERT INTO readings
SELECT STREAM
    meter_id,
    reading
FROM
    batched_readings
    LATERAL readings AS reading
WHERE
    reading > 90

The processor will emit the following records:

_key  meter_id  reading
a     1         100
a     1         95
a     1         91
b     2         93
b     2         100
c     1         92
c     1         94

Things to notice:

  • We used the aliased lateral expression reading both in the projection and in the WHERE.
  • The _key for each emitted record is the one of the original record. As usual, you can change this behaviour by projecting on the key with a projection like expression AS _key.
  • batched_readings records with keys a, b and c have been split into multiple records. That’s because they contain multiple readings greater than 90.
  • Record d disappeared, because it has no readings greater than 90.
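
The points above can be checked with a small Python model of the query. It is a sketch of the expected semantics under the assumption that records are plain dicts; it does not use any engine API, and the variable names emitted and dropped are made up for this illustration.

```python
batched_readings = [
    {"_key": "a", "meter_id": 1, "readings": [100, 80, 95, 91]},
    {"_key": "b", "meter_id": 2, "readings": [87, 93, 100]},
    {"_key": "c", "meter_id": 1, "readings": [88, 89, 92, 94]},
    {"_key": "d", "meter_id": 2, "readings": [81]},
]

emitted = [
    # Projection: the original _key is carried over unchanged.
    {"_key": rec["_key"], "meter_id": rec["meter_id"], "reading": reading}
    for rec in batched_readings       # FROM batched_readings
    for reading in rec["readings"]    # LATERAL readings AS reading
    if reading > 90                   # WHERE reading > 90
]

for row in emitted:
    print(row["_key"], row["meter_id"], row["reading"])

# Keys that produced no output because every reading was filtered out.
dropped = {rec["_key"] for rec in batched_readings} - {row["_key"] for row in emitted}
print("dropped:", sorted(dropped))
```

Because the filter is applied after the array has been unpacked, it acts on single readings rather than on whole input records.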

Multiple Lateral Joins 

It is possible to use multiple LATERAL joins in the same FROM clause.

Assume you have a topic batched_nested_readings populated with the following records:

batched_nested_readings

_key  meter_id  nested_readings
a     1         [[100, 80], [95, 91]]
b     2         [[87], [93, 100]]
c     1         [[88, 89], [92, 94]]
d     2         [[81]]

Notice how nested_readings contains arrays of arrays of integers.

To get the same results as in the previous example, we use a first lateral join to unpack the first level of nested_readings into an array that we call readings. We then define a second lateral join on readings to extract the single values:

INSERT INTO readings
SELECT STREAM
    meter_id,
    reading
FROM
    batched_nested_readings
    LATERAL nested_readings AS readings
    LATERAL readings AS reading
WHERE
    reading > 90
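
Chaining two lateral joins behaves like two nested loops. The following Python sketch models that reading of the query; it assumes records are plain dicts and is not taken from the engine itself.

```python
batched_nested_readings = [
    {"_key": "a", "meter_id": 1, "nested_readings": [[100, 80], [95, 91]]},
    {"_key": "b", "meter_id": 2, "nested_readings": [[87], [93, 100]]},
    {"_key": "c", "meter_id": 1, "nested_readings": [[88, 89], [92, 94]]},
    {"_key": "d", "meter_id": 2, "nested_readings": [[81]]},
]

emitted = [
    (rec["_key"], rec["meter_id"], reading)
    for rec in batched_nested_readings
    for readings in rec["nested_readings"]  # first LATERAL: one inner array at a time
    for reading in readings                 # second LATERAL: one integer at a time
    if reading > 90                         # WHERE reading > 90
]

for key, meter_id, reading in emitted:
    print(key, meter_id, reading)
```

The second loop can refer to readings only because the first one introduced it, which mirrors how the alias of the first lateral join is used as the array expression of the second.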

Complex Lateral expressions 

In the previous examples we used a simple field as the <lateralArrayExpression>. In this section we will see how any array expression can be used for it.

Assume you have a topic day_night_readings populated with the following records:

day_night_readings

_key  meter_id  readings_day  readings_night
a     1         [100, 80]     [95, 91]
b     2         [87, 93]      [100]
c     1         [88]          [89, 92, 94]
d     2         [81]          []

We can make use of Array Functions to lateral join day_night_readings on the concatenation of the two readings fields:

INSERT INTO readings
SELECT STREAM
    meter_id,
    reading
FROM
    day_night_readings
    LATERAL flatten([readings_day, readings_night]) AS reading
WHERE
    reading > 90

The processor defined in this way will emit the following records:

_key  meter_id  reading
a     1         100
a     1         95
a     1         91
b     2         93
b     2         100
c     1         92
c     1         94
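
To see how a computed array expression feeds the lateral join, the query can be modelled in Python. The flatten helper below is a stand-in written for this sketch: it assumes that flatten concatenates the arrays it is given (one level of nesting), which is how the query above uses it, and it is not the engine's own function.

```python
from itertools import chain

day_night_readings = [
    {"_key": "a", "meter_id": 1, "readings_day": [100, 80], "readings_night": [95, 91]},
    {"_key": "b", "meter_id": 2, "readings_day": [87, 93], "readings_night": [100]},
    {"_key": "c", "meter_id": 1, "readings_day": [88], "readings_night": [89, 92, 94]},
    {"_key": "d", "meter_id": 2, "readings_day": [81], "readings_night": []},
]


def flatten(arrays):
    """Concatenate a list of arrays into a single array (assumed semantics)."""
    return list(chain.from_iterable(arrays))


emitted = [
    (rec["_key"], rec["meter_id"], reading)
    for rec in day_night_readings
    # The lateral array expression is evaluated once per source record.
    for reading in flatten([rec["readings_day"], rec["readings_night"]])
    if reading > 90  # WHERE reading > 90
]

for key, meter_id, reading in emitted:
    print(key, meter_id, reading)
```

The empty readings_night array of record d simply adds nothing to the concatenation, so only its single day reading is evaluated against the filter.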