In this tutorial, we will see how to use Lenses SQL to extract, manipulate and inspect the individual elements of an array.
You will learn to:
- explode an array field into individual records with the LATERAL syntax
- filter the generated records with a WHERE clause
- unwrap nested arrays by combining multiple LATERALs
- use array expressions, like flatten, as the right-hand side of a LATERAL join
In this example, we are getting data from a sensor.
Unfortunately, the upstream system registers the readings in batches, while what we need is a single record for every reading.
An example of such a record is the following:
KEY: 1 VALUE: { "meter_id": 1, "readings": [100, 101, 102] }
Notice how each record contains multiple readings, inside the readings array field.
We can replicate such a structure by running the following query in SQL Studio:
CREATE TABLE batched_readings(
    meter_id int,
    readings int[]
)
FORMAT(int, AVRO);
We can again use SQL Studio to insert some data to play with:
INSERT INTO batched_readings(_key, meter_id, readings) VALUES
(1, 1, [100, 92, 93, 101]),
(2, 2, [81, 82, 81]),
(3, 1, [95, 94, 93, 96]),
(4, 2, [80, 82])
What we want to obtain is a new topic readings, where each record contains a single reading, together with its meter_id. Considering the first record, we expect to explode it into four different records, one for each reading:
KEY: 1 VALUE: { "meter_id": 1, "reading": 100 }
-----------------------------
KEY: 1 VALUE: { "meter_id": 1, "reading": 92 }
-----------------------------
KEY: 1 VALUE: { "meter_id": 1, "reading": 93 }
-----------------------------
KEY: 1 VALUE: { "meter_id": 1, "reading": 101 }
In Lenses SQL you can easily achieve that with the special LATERAL syntax (see the documentation for more details).
You can create a processor defined as:
SET defaults.topic.autocreate=true;

INSERT INTO readings
SELECT STREAM
    meter_id,
    reading
FROM batched_readings
LATERAL readings as reading
The magic happens in batched_readings LATERAL readings as reading. With that we are basically saying:
- take the table batched_readings
- for each of its records, unfold the readings array, emitting one record per element
- expose that element in each emitted record as a new field called reading
We can then use both the original fields and the new reading field in the SELECT.
If you save the processor and run it, you will see that it will emit the records we expected.
One of the powerful features of a LATERAL join is that the expression you put in the LATERAL can be used as a normal field. This means that you can then use it, for example, also in a WHERE or in a GROUP BY.
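Before moving to the WHERE example, here is a minimal sketch of the GROUP BY case. The target topic reading_counts is our own hypothetical name, and the exact aggregation syntax should be checked against the documentation; the idea is simply to count how many times each distinct reading value appears:

SET defaults.topic.autocreate=true;

-- reading_counts is a hypothetical target topic, used only for illustration
INSERT INTO reading_counts
SELECT STREAM
    COUNT(*) as occurrences
FROM batched_readings
LATERAL readings as reading
GROUP BY reading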
In this section we will see how to filter records generated by a LATERAL using a normal WHERE.
We want to modify our previous processor so that it emits only the readings greater than 95.
To do that, it is enough to use reading as if it were a normal field, in the WHERE section:
SET defaults.topic.autocreate=true;

INSERT INTO readings
SELECT STREAM
    meter_id,
    reading
FROM batched_readings
LATERAL readings as reading
WHERE reading > 95
Running the processor, we get the following records:
KEY: 1 VALUE: { "meter_id": 1, "reading": 100 }
-----------------------------
KEY: 1 VALUE: { "meter_id": 1, "reading": 101 }
-----------------------------
KEY: 3 VALUE: { "meter_id": 1, "reading": 96 }
This example is similar to the previous one. The only difference is that the readings are now stored in batches of batches:
CREATE TABLE batched_readings_nested(
    meter_id int,
    nested_readings int[][]
)
FORMAT(int, AVRO);
As you can see, nested_readings is an array whose elements are arrays of integers.
We can again use SQL Studio to insert some data:
INSERT INTO batched_readings_nested(_key, meter_id, nested_readings) VALUES
(1, 1, [[100, 92], [93, 101]]),
(2, 2, [[81], [82, 81]]),
(3, 1, [[95, 94, 93], [96]]),
(4, 2, [[80, 82]])
We would like to define a processor that emits the same records as the previous one.
In this case, though, we are dealing with nested_readings, which is a multi-level array, so a single LATERAL join is not enough. But nesting one LATERAL inside another will do the job:
SET defaults.topic.autocreate=true;

INSERT INTO readings
SELECT STREAM
    meter_id,
    reading
FROM batched_readings_nested
LATERAL nested_readings as readings
LATERAL readings as reading
This is roughly what happens in the FROM clause:
- batched_readings_nested LATERAL nested_readings as readings unfolds the outer array, emitting an intermediate record for each inner array and exposing it as a field called readings
- ... LATERAL readings as reading then unfolds each of those inner arrays, emitting one record per single reading
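To see this concretely, consider the first record of batched_readings_nested: conceptually, the first LATERAL turns it into one intermediate record per inner array (these records are never emitted, they only exist within the query):

KEY: 1 VALUE: { "meter_id": 1, "readings": [100, 92] }
-----------------------------
KEY: 1 VALUE: { "meter_id": 1, "readings": [93, 101] }

The second LATERAL then explodes each of these into single readings, giving the same four records as in the first example.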
In this section we will see how it is possible to use any expression as the right-hand side of a LATERAL join, as long as it evaluates to an array.
We have a table where the meter readings are split into two columns, readings_day and readings_night.
CREATE TABLE day_night_readings(
    meter_id int,
    readings_day int[],
    readings_night int[]
)
FORMAT(int, AVRO);
Let’s insert the same data as the first example, but where the readings are split across the two columns.
INSERT INTO day_night_readings(_key, meter_id, readings_day, readings_night) VALUES
(1, 1, [100, 92], [93, 101]),
(2, 2, [81], [82, 81]),
(3, 1, [95, 94, 93], [96]),
(4, 2, [80], [81])
To extract the readings one by one, we first need to concatenate the two arrays readings_day and readings_night. We can achieve that using flatten, one of our array functions. We can then use the concatenated array in a LATERAL join:
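For the first record, for example, the expression would roughly evaluate like this:

flatten([readings_day, readings_night])
  = flatten([[100, 92], [93, 101]])
  = [100, 92, 93, 101]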
SET defaults.topic.autocreate=true;

INSERT INTO readings
SELECT STREAM
    meter_id,
    reading
FROM day_night_readings
LATERAL flatten([readings_day, readings_night]) as reading
The processor defined above will emit the records
KEY: 1 VALUE: { "meter_id": 1, "reading": 100 }
-----------------------------
KEY: 1 VALUE: { "meter_id": 1, "reading": 92 }
-----------------------------
KEY: 1 VALUE: { "meter_id": 1, "reading": 93 }
-----------------------------
KEY: 1 VALUE: { "meter_id": 1, "reading": 101 }
-----------------------------
KEY: 2 VALUE: { "meter_id": 2, "reading": 81 }
...
as we expected.