Every message in Kafka carries a timestamp, and Lenses Engine Streaming mode uses it by default for time-dependent operations like aggregations and joins.
Sometimes though that timestamp is not exactly what you need, and you would like to use a field in the record value or key as the new timestamp.
In our toy example, we have a simple topic that collects electricity meter reading events:
CREATE TABLE electricity_events(
    KW double,
    customer_id int,
    event_time long
)
FORMAT (string, avro);
We can also insert some example data to do our experiments:
INSERT INTO electricity_events(KW, customer_id, event_time) VALUES
(1.0, 1, 1592848400000),
(2.0, 2, 1592848400000),
(1.5, 3, 1592848400000),
(2.0, 1, 1592848405000),
(1.0, 2, 1592848405000),
(2.5, 3, 1592848405000),
(3.0, 1, 1592848410000),
(0.5, 2, 1592848410000),
(4.0, 3, 1592848405000);
If you query the events, you can see that Kafka sets a timestamp for each record. That timestamp is, in our case, the time when the record was inserted. As you can see, it is totally unrelated to the event_time field we have in the payload.
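To make the mismatch concrete, here is a small sketch (plain Python, not part of Lenses) that renders one payload event_time and one Kafka record timestamp from the metadata below as UTC dates:

```python
from datetime import datetime, timezone

def to_utc(ms):
    """Render an epoch-milliseconds value as a UTC timestamp (seconds precision)."""
    return datetime.fromtimestamp(ms // 1000, tz=timezone.utc).isoformat()

# event_time from the payload vs. the Kafka record timestamp from the metadata:
print(to_utc(1592848400000))  # 2020-06-22T17:53:20+00:00
print(to_utc(1594041840812))  # 2020-07-06T13:24:00+00:00
```

The two instants are weeks apart: the payload describes when the reading happened, the record timestamp only says when we ran the INSERT.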
[
{"value":{"KW":1,"customer_id":1,"event_time":1592848400000},"metadata":{...,"timestamp":1594041840812,...}},
{"value":{"KW":2,"customer_id":2,"event_time":1592848400000},"metadata":{...,"timestamp":1594041840821,...}},
{"value":{"KW":1.5,"customer_id":3,"event_time":1592848400000},"metadata":{...,"timestamp":1594041840828,...}},
{"value":{"KW":2,"customer_id":1,"event_time":1592848405000},"metadata":{...,"timestamp":1594041840834,...}},
{"value":{"KW":1,"customer_id":2,"event_time":1592848405000},"metadata":{...,"timestamp":1594041840842,...}},
{"value":{"KW":2.5,"customer_id":3,"event_time":1592848405000},"metadata":{...,"timestamp":1594041840848,...}},
{"value":{"KW":3,"customer_id":1,"event_time":1592848410000},"metadata":{...,"timestamp":1594041840855,...}},
{"value":{"KW":0.5,"customer_id":2,"event_time":1592848410000},"metadata":{...,"timestamp":1594041840862,...}},
{"value":{"KW":4,"customer_id":3,"event_time":1592848405000},"metadata":{...,"timestamp":1594041840868,...}}
]
(If you want to know more about how to configure the timestamp Kafka sets on records, check our documentation.)
We would like to transform our original stream of events, aggregating events with a hopping window of 10s width and an increment of 5s, computing the average for each window.
You can create a new processor that streams those averages, using the special WINDOW BY ... syntax:
SET defaults.topic.autocreate=true;

INSERT INTO electricity_events_avg_wrong
SELECT STREAM
    customer_id,
    AVG(KW) as KW
FROM electricity_events
WINDOW BY HOP 10s,5s
GROUP BY customer_id
For customer 1, we have three input events, with a 5s delay between them, so we expect four output events for that customer, one for each of the four hopping windows those events fall into.
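To see where the number four comes from, here is a plain-Python sketch (the names are ours, not a Lenses API) that enumerates the 10s/5s hopping windows containing each of customer 1's event_time values:

```python
WIDTH_MS = 10_000   # window width: 10s
ADVANCE_MS = 5_000  # hop increment: 5s

def windows_for(ts_ms):
    """Return the [start, end) hopping windows that contain ts_ms."""
    last_start = (ts_ms // ADVANCE_MS) * ADVANCE_MS  # latest aligned start <= ts_ms
    return [(s, s + WIDTH_MS) for s in range(last_start, ts_ms - WIDTH_MS, -ADVANCE_MS)]

# Customer 1's three event_time values, 5s apart:
events = [1592848400000, 1592848405000, 1592848410000]
distinct = {w for t in events for w in windows_for(t)}
print(len(distinct))  # 4 distinct hopping windows, hence four expected output records
```

Each event falls into two overlapping windows, and the three events share some of them, leaving four distinct windows overall.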
But checking the emitted records, we see that only two are produced.
This is because, by default, windowing operations work on the record timestamp, and in our case all the timestamps are nearly identical: they coincide with the time the records were inserted.
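As a quick sanity check, here is a plain-Python sketch (our own code, not the Lenses runtime) showing that customer 1's Kafka-assigned timestamps from the query above, which are only milliseconds apart, fall into just two 10s/5s hopping windows:

```python
WIDTH_MS, ADVANCE_MS = 10_000, 5_000  # 10s windows, 5s hop

# Kafka record timestamps for customer 1, taken from the metadata above:
record_ts = [1594041840812, 1594041840834, 1594041840855]

windows = set()
for ts in record_ts:
    start = (ts // ADVANCE_MS) * ADVANCE_MS  # latest aligned window start <= ts
    while start > ts - WIDTH_MS:             # collect every window covering ts
        windows.add((start, start + WIDTH_MS))
        start -= ADVANCE_MS
print(len(windows))  # 2: hence only two output records for the customer
```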
Fortunately, we can change this behavior using the special EVENTTIME BY ... syntax, specifying an expression to use as the timestamp:
SET defaults.topic.autocreate=true;

INSERT INTO electricity_events_avg
SELECT STREAM
    customer_id,
    AVG(KW) as KW
FROM electricity_events
EVENTTIME BY event_time
WINDOW BY HOP 10s,5s
GROUP BY customer_id
As you can see, the results have been windowed using event_time as the timestamp:
[
{"key":{"value":1,"window":{"start":1592848395000,"end":null}},"value":{"customer_id":1,"KW":1}, ...},
{"key":{"value":1,"window":{"start":1592848400000,"end":null}},"value":{"customer_id":1,"KW":1.5}, ...},
{"key":{"value":1,"window":{"start":1592848405000,"end":null}},"value":{"customer_id":1,"KW":2.5}, ...},
{"key":{"value":1,"window":{"start":1592848410000,"end":null}},"value":{"customer_id":1,"KW":3}, ...}
]
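As a cross-check, the per-window averages above can be reproduced with a small plain-Python sketch (our own code, not the Lenses runtime), assigning customer 1's readings to 10s/5s hopping windows keyed by event_time:

```python
from collections import defaultdict

WIDTH_MS, ADVANCE_MS = 10_000, 5_000  # 10s windows, 5s hop

# Customer 1's (event_time, KW) readings from the inserted data:
events = [(1592848400000, 1.0), (1592848405000, 2.0), (1592848410000, 3.0)]

readings = defaultdict(list)
for ts, kw in events:
    start = (ts // ADVANCE_MS) * ADVANCE_MS  # latest aligned window start <= ts
    while start > ts - WIDTH_MS:             # every hopping window covering ts
        readings[start].append(kw)
        start -= ADVANCE_MS

averages = {start: sum(kws) / len(kws) for start, kws in sorted(readings.items())}
print(averages)
# {1592848395000: 1.0, 1592848400000: 1.5, 1592848405000: 2.5, 1592848410000: 3.0}
```

The four window starts and KW averages match the processor's output exactly.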
To learn more about time and time windows in Lenses SQL, see Streaming Time Windows.