In this tutorial, we will see how to use Lenses SQL to alter the shape of your records.
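We will learn how to pick specific fields, change the record key, unwrap single-field values, apply functions, and build nested keys and values.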
In our example, we are getting data from speed sensors placed around a car racing circuit.
The upstream system registers speed measurement events as records in a Kafka topic.
An example of such an event is the following:
KEY: "9E6F72C8-6C9F-4EC2-A7B2-C8AC5A95A26C" VALUE: { "car_id": "car_2", "speedMph": 84, "sensor": { "id": "sensor_1", "lat": 45, "long": 0 }, "event_time": 159230614 }
We can replicate such a structure by running the following query in SQL Studio:
CREATE TABLE car_speed_events(
    car_id string
  , speedMph int
  , sensor.id string
  , sensor.lat double
  , sensor.long double
  , event_time long
)
FORMAT (string, avro);
Each event is keyed by a unique string generated by the upstream system.
We can again use SQL Studio to insert some data to play with:
INSERT INTO car_speed_events(
    _key
  , car_id
  , speedMph
  , sensor.id
  , sensor.lat
  , sensor.long
  , event_time
) VALUES
("9E6F72C8-6C9F-4EC2-A7B2-C8AC5A95A26C", "car-1", 50, "sensor_1", 45.1241, 1.177, 1591970345),
("0958C1E2-804F-4110-B463-7BEA25DA680C", "car-2", 54, "sensor_2", 45.122, 1.75, 1591970346),
("552E9A77-D23D-4EC7-96AB-7F106637ADBC", "car-3", 60, "sensor_1", 45.1241, 1.177, 1591970347),
("00F44EDD-1BF1-4AE8-A7FE-622F720CA3BC", "car-1", 55, "sensor_2", 45.122, 1.75, 1591970348),
("A7F19576-6EFA-40A0-91C3-D8999A41F453", "car-1", 56, "sensor_1", 45.1241, 1.177, 1591970348),
("5273803E-7F34-44F8-9A3B-B6CB9D24C1F9", "car-2", 58, "sensor_3", 45.123, 1.176, 1591970349),
("8401CD82-ABFF-4D1B-A564-986DC971F913", "car-2", 60, "sensor_1", 45.1241, 1.177, 1591970349),
("F1CF77A0-6CFB-472F-B368-28F011F46C64", "car-1", 53, "sensor_3", 45.123, 1.176, 1591970349);
In this section, we are only interested in the speed of individual cars; we do not care about the other fields.
We want the car id, which is currently part of the record Value, to become the new record Key, using the special as _key syntax. We also want the car speed as the new record Value.
To achieve that we can create a new SQL Processor using some simple projections:
SET defaults.topic.autocreate=true;

INSERT INTO only_car_speeds
SELECT STREAM
    car_id AS _key
  , speedMph
FROM car_speed_events;
Checking the records emitted by the processor we see that the shape of the records is
KEY: "car-1" VALUE: { "speedMph": 50 }
We want to avoid that intermediate wrapping of speedMph inside an object. To do that we can tell Lenses to unwrap the value with the special as _value syntax, saving some bytes and CPU cycles:
SET defaults.topic.autocreate=true;

INSERT INTO only_car_speeds_unwrapped
SELECT STREAM
    car_id AS _key
  , speedMph AS _value
FROM car_speed_events;
Now the shape of the records is what we had in mind:
KEY: "car-1" VALUE: 50
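To make the difference between the two projections concrete, here is a minimal sketch in plain Python. It is not Lenses code: it only models a record as a (key, value) pair and mimics what each query does to its shape. The function names project_wrapped and project_unwrapped are hypothetical, chosen for this illustration.

```python
# Plain-Python model of the two projections above; this is NOT Lenses code.
from typing import Any, Tuple

Record = Tuple[Any, Any]  # a record modelled as (key, value)


def project_wrapped(record: Record) -> Record:
    """Models: SELECT STREAM car_id AS _key, speedMph."""
    _old_key, value = record
    # The value stays an object with a single field.
    return value["car_id"], {"speedMph": value["speedMph"]}


def project_unwrapped(record: Record) -> Record:
    """Models: SELECT STREAM car_id AS _key, speedMph AS _value."""
    _old_key, value = record
    # The value becomes the bare number, with no wrapping object.
    return value["car_id"], value["speedMph"]


# First row inserted into car_speed_events earlier in this tutorial.
event: Record = (
    "9E6F72C8-6C9F-4EC2-A7B2-C8AC5A95A26C",
    {
        "car_id": "car-1",
        "speedMph": 50,
        "sensor": {"id": "sensor_1", "lat": 45.1241, "long": 1.177},
        "event_time": 1591970345,
    },
)

print(project_wrapped(event))    # ('car-1', {'speedMph': 50})
print(project_unwrapped(event))  # ('car-1', 50)
```

In both cases the original UUID key is discarded and replaced by the car id; only the shape of the value differs.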
This time we want to do some more complex manipulation: convert the speed from Mph to Kmph and build a descriptive string for the event.
An example of an output record would be:
{ "speedKmph": 160.93, "description": "Car car_1 was running at 100Mph" }
In this case, we are using CONCATENATE to concatenate multiple strings, CAST to convert an expression to another type, and *, the usual multiplication operator.
You can find the full list of supported functions in our documentation.
SET defaults.topic.autocreate=true;

INSERT INTO car_speeds_kmph
SELECT STREAM
    speedMph * 1.60934 AS speedKmph
  , CONCATENATE(
        'Car ',
        car_id,
        ' was running at ',
        CAST(speedMph AS STRING),
        'Mph') AS description
FROM car_speed_events;
If we check the resulting records, we can see that we obtained the shape we were looking for. Please note that the keys have been left untouched:
KEY: "9E6F72C8-6C9F-4EC2-A7B2-C8AC5A95A26C" VALUE: { speedKmph:"80.467", description:"Car car-1 was running at 50Mph" }
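The arithmetic and string building in this projection can be checked with a small plain-Python sketch. It is only a model of the query, not Lenses code: str() stands in for CAST(... AS STRING), "".join() stands in for CONCATENATE, and the function name describe is hypothetical.

```python
# Plain-Python model of the car_speeds_kmph projection; this is NOT Lenses code.
import math
from typing import Any, Dict, Tuple

MPH_TO_KMPH = 1.60934  # same conversion factor used in the query


def describe(key: Any, value: Dict[str, Any]) -> Tuple[Any, Dict[str, Any]]:
    """Return the record with the key untouched and a new two-field value."""
    speed_kmph = value["speedMph"] * MPH_TO_KMPH
    description = "".join(
        ["Car ", value["car_id"], " was running at ", str(value["speedMph"]), "Mph"]
    )
    return key, {"speedKmph": speed_kmph, "description": description}


key, value = describe(
    "9E6F72C8-6C9F-4EC2-A7B2-C8AC5A95A26C",
    {"car_id": "car-1", "speedMph": 50},
)

print(key)
print(value["description"])  # Car car-1 was running at 50Mph
print(math.isclose(value["speedKmph"], 80.467))  # True
```

Because no field is aliased to _key, the model returns the key exactly as it received it, matching the behaviour noted above.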
In this last example, we will show how to create composite keys and values in our projections.
We want both the sensor id and the event_time as the record Key. For the record Value, we want the car_id and the speed, expressed both as Mph and Kmph.
An example of the desired output record would be:
KEY: { "sensor_id": "sensor_1", "event_time": 1591970345 } VALUE: { "car_id": "car_1", "speed": { "mph": 100, "kmph": 160.934 } }
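Lenses SQL allows us to use nested aliases to build nested structures. Each dot in an alias adds one level of nesting: speed.mph becomes the field mph inside the object speed, and _key.sensor_id becomes the field sensor_id inside the record Key.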
SET defaults.topic.autocreate=true;

INSERT INTO car_speeds_by_sensor_and_time
SELECT STREAM
    sensor.id AS _key.sensor_id
  , event_time AS _key.event_time
  , car_id
  , speedMph AS speed.mph
  , speedMph * 1.60934 AS speed.kmph
FROM car_speed_events;
The resulting shape of the record is what we were aiming for:
KEY: { "sensor_id": "sensor_1", "event_time": 1591970345 } VALUE: { "car_id": "car-1", "speed": { "mph": 50, "kmph": 80.467 } }
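The way dotted aliases turn into nested objects can be sketched in plain Python. This is only an illustration of the idea, not how Lenses implements it: the helpers set_path and reshape are hypothetical, and the model only covers aliases of the form _key.something for the key (a bare _key alias is not handled).

```python
# Plain-Python model of nested aliases; this is NOT Lenses code.
import math
from typing import Any, Dict, List, Tuple


def set_path(target: Dict[str, Any], path: List[str], value: Any) -> None:
    """Store value under a nested path, creating intermediate objects as needed."""
    for name in path[:-1]:
        target = target.setdefault(name, {})
    target[path[-1]] = value


def reshape(projections: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Split alias -> value pairs into a (key, value) pair of nested dicts."""
    new_key: Dict[str, Any] = {}
    new_value: Dict[str, Any] = {}
    for alias, field in projections.items():
        parts = alias.split(".")
        if parts[0] == "_key":
            set_path(new_key, parts[1:], field)  # goes into the record Key
        else:
            set_path(new_value, parts, field)    # goes into the record Value
    return new_key, new_value


# Aliases from the query, applied to the first row inserted earlier.
record_key, record_value = reshape(
    {
        "_key.sensor_id": "sensor_1",
        "_key.event_time": 1591970345,
        "car_id": "car-1",
        "speed.mph": 50,
        "speed.kmph": 50 * 1.60934,
    }
)

print(record_key)    # {'sensor_id': 'sensor_1', 'event_time': 1591970345}
print(record_value)
```

Two aliases that share a prefix, such as speed.mph and speed.kmph, end up as sibling fields of the same nested object.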
Happy re-shaping!