Sometimes you have a topic that is almost exactly what you need, except that the key of the record requires a bit of massaging.
In our example, we have a topic containing events coming from temperature sensors.
Each record contains the sensor’s measured temperature, the time of the measurement, and the id of the sensor. The key is a unique string (for example, a UUID) that the upstream system assigns to the event.
You can replicate the example by creating a topic in SQL Studio:
CREATE TABLE temperature_events(
    sensor_id string
    , temperature int
    , event_time long
)
FORMAT (string, avro);
We can also insert some example data to experiment with:
INSERT INTO temperature_events(
    _key
    , sensor_id
    , temperature
    , event_time
) VALUES
("9E6F72C8-6C9F-4EC2-A7B2-C8AC5A95A26C", "sensor-1", 12, 1591970345),
("0958C1E2-804F-4110-B463-7BEA25DA680C", "sensor-2", 13, 1591970346),
("552E9A77-D23D-4EC7-96AB-7F106637ADBC", "sensor-3", 28, 1591970347),
("00F44EDD-1BF1-4AE8-A7FE-622F720CA3BC", "sensor-1", 13, 1591970348),
("A7F19576-6EFA-40A0-91C3-D8999A41F453", "sensor-1", 12, 1591970348),
("5273803E-7F34-44F8-9A3B-B6CB9D24C1F9", "sensor-2", 31, 1591970349),
("8401CD82-ABFF-4D1B-A564-986DC971F913", "sensor-2", 30, 1591970349),
("F1CF77A0-6CFB-472F-B368-28F011F46C64", "sensor-1", 13, 1591970349);
You can explore the topic in Lenses to check the content and the shape of what you just inserted.
Let’s say you need that same stream of events, but with the sensor_id as the record key instead of the UUID.
With the special SELECT ... as _key syntax, a few lines are enough to define our new re-keying processor:
SET defaults.topic.autocreate=true;
INSERT INTO events_by_sensor
SELECT STREAM sensor_id as _key
FROM temperature_events;
The query above takes the sensor_id from the value of the record and uses it as the new key. All the value fields remain untouched.
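To make the transformation concrete, here is a minimal sketch in plain Python that models what the re-keying does to each record. It is not Lenses code: the `rekey_by_sensor` helper and the in-memory `records` list are hypothetical names used only for illustration, and the sample data is a subset of the rows inserted earlier.

```python
# Plain-Python model of the re-keying query; this is not Lenses code.
# Each record is a (key, value) pair, mirroring the temperature_events topic.
records = [
    ("9E6F72C8-6C9F-4EC2-A7B2-C8AC5A95A26C",
     {"sensor_id": "sensor-1", "temperature": 12, "event_time": 1591970345}),
    ("0958C1E2-804F-4110-B463-7BEA25DA680C",
     {"sensor_id": "sensor-2", "temperature": 13, "event_time": 1591970346}),
    ("552E9A77-D23D-4EC7-96AB-7F106637ADBC",
     {"sensor_id": "sensor-3", "temperature": 28, "event_time": 1591970347}),
]


def rekey_by_sensor(record):
    """Replace the key with the value's sensor_id; leave the value untouched."""
    _old_key, value = record
    return value["sensor_id"], value


rekeyed = [rekey_by_sensor(r) for r in records]

for key, value in rekeyed:
    print(key, value)
```

The UUID key is dropped, the sensor_id takes its place, and the value (including its own sensor_id field) passes through unchanged.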
Maybe the sensor_id is not enough, and for some reason, you also need the hour of the measurement in the key. In this case, the key will become a composite object with two fields: the sensor_id and the event_hour:
SET defaults.topic.autocreate=true;
INSERT INTO events_by_sensor_and_hour
SELECT STREAM
    temperature_events.sensor_id as _key.sensor_id
    , temperature_events.event_time / 3600 as _key.event_hour
FROM temperature_events;
As you can see, you can build composite objects in Lenses with ease, just by listing all the structure’s fields one after the other. (To learn more about this, see our documentation.)
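The arithmetic behind event_hour can be checked with a short Python sketch. It assumes that event_time is a Unix timestamp in seconds and that the intended result is a whole number of hours, so it uses floor division. The `composite_key` helper is a hypothetical name, and the dictionary is only a model of the two-field key, not the exact format Lenses writes.

```python
# Plain-Python model of the composite key; this is not Lenses code.
# Assumption: event_time is a Unix timestamp expressed in seconds.
SECONDS_PER_HOUR = 3600


def composite_key(value):
    """Build the two-field key: sensor_id plus the hour of the measurement."""
    return {
        "sensor_id": value["sensor_id"],
        # Floor division yields whole hours elapsed since the Unix epoch.
        "event_hour": value["event_time"] // SECONDS_PER_HOUR,
    }


first = {"sensor_id": "sensor-1", "temperature": 12, "event_time": 1591970345}
last = {"sensor_id": "sensor-1", "temperature": 13, "event_time": 1591970349}

print(composite_key(first))  # {'sensor_id': 'sensor-1', 'event_hour': 442213}
print(composite_key(last))   # {'sensor_id': 'sensor-1', 'event_hour': 442213}
```

All the sample timestamps fall within the same hour (442213), so with this data every event from a given sensor ends up with the same composite key.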
In the last example, the _key output storage format will be inferred automatically by the system as JSON. If you need more control, you can use the STORE AS clause before the SELECT.
The following example creates a topic like the previous one, but with the keys stored as AVRO:
SET defaults.topic.autocreate=true;
INSERT INTO events_by_sensor_and_hour_avro_key
STORE KEY AS AVRO
SELECT STREAM
    temperature_events.sensor_id as _key.sensor_id
    , FLOOR(temperature_events.event_time / 3600) as _key.event_hour
FROM temperature_events;
Happy re-keying!