Aggregations are stateful transformations that group an unbounded set of inputs into sub-sets and then aggregate each of these sub-sets into a single output; they are stateful because they need to maintain the current state of the computation between the application of each input.
To group a given input dataset into sub-sets, a key function needs to be specified; the result of applying this key function to an input record is used as a discriminator (sometimes called a pivot) to determine the sub-set into which each input record is to be bucketed.
The specific transformation that each aggregation performs is described by the aggregate functions used in the input query. See the complete list of aggregate functions.
Notice that the behavior described above is precisely what a Table does: for any given key, the state is continuously updated as new events with that key are received. In the case of Aggregations, new events are input records from the original dataset that map to a given key, thus ending up in one bucket or another.
Whenever Aggregations are used, the result will be a Table. Each entry will have its key set to the grouping discriminator, and its value set to the current state of the computation for all input records matching that key.
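This table-like behavior can be sketched in a few lines of Python. The model below is a deliberate simplification with hypothetical records, not how Lenses implements aggregations: each record is mapped to a key, and per-key state is updated as records arrive.

```python
# Simplified model of a keyed aggregation (not Lenses internals): the result
# is a table mapping each key to the current state of the computation.
def aggregate(records, key_fn, init, update):
    table = {}
    for record in records:
        key = key_fn(record)                       # grouping discriminator
        table[key] = update(table.get(key, init), record)
    return table

# Hypothetical records; COUNT(*) ... GROUP BY country expressed in this model:
records = [
    {"player": "billy", "points": 50, "country": "uk"},
    {"player": "willy", "points": 30, "country": "uk"},
    {"player": "maria", "points": 70, "country": "spain"},
]
counts = aggregate(records, key_fn=lambda r: r["country"],
                   init=0, update=lambda state, _: state + 1)
print(counts)  # {'uk': 2, 'spain': 1}
```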
The complete syntax for aggregations is:
SELECT (STREAM | TABLE) <aggregated projection1> [, aggregated projection2] ... [, aggregated projectionN] [, projection1] ... [, projectionM] FROM <source> [WINDOW BY <window description>] GROUP BY <expression> ;
The specific syntactical elements of the above are:
- (STREAM | TABLE): whether the <source> is to be read as a Stream or as a Table.
- <source>: the source topic the query reads from.
- <aggregated projection1>: a projection using an aggregate function, for example COUNT(*) as x; aggregate functions can also be combined with other functions, as in CAST(COUNT(*) as STRING) as stringed.
- [, aggregated projection2] ... [, aggregated projectionN]: optional further aggregated projections.
- [, projection1] ... [, projectionM]: optional further non-aggregated projections.
- [WINDOW BY <window description>]: an optional clause adding a time dimension to the grouping; it can only be used when the source is read as a STREAM.
- GROUP BY <expression>: the expression whose result is used as the grouping discriminator and becomes the key of the query's result.
Most of the rules and syntax described for Projections apply to aggregated projections as well, but there are some additional syntactical rules due to the specific nature of Aggregations. Because the key of the result is determined entirely by the GROUP BY clause, a projection cannot target the key; projections like COUNT(*) as _key.a or SUM(x) as _key are therefore not allowed. Likewise, unqualified references to the key, like _key.a, COUNT(_key.b) or _key, are not allowed; to refer to the key of the input, the field must be qualified with the source, as in <source>._key.a or COUNT(<source>._key.b).
INSERT INTO target-topic SELECT STREAM COUNT(*) AS records FROM input-topic GROUP BY field1;
As previously mentioned, the GROUP BY clause determines the key of the query’s result; the above query will group all records in input-topic by the value of field1 in each record, and target-topic’s key will have the schema of field1.
Just like in the case of the Projections, the Streaming mode takes an opinionated approach here and will simplify the result schema and Storage Format in case of single field structures.
In the case above, assuming for example that field1 is an integer, target-topic’s key will not be a structure with a single integer field1 field, but rather just the value of field1; the resulting Storage Format is going to be INT, and the label field1 will simply be dropped.
In case the above behavior is not desirable, specifying an explicit alias allows overriding it.
INSERT INTO target-topic SELECT STREAM COUNT(*) AS records FROM input-topic GROUP BY field1 AS keep_me;
This will result in target-topic’s key being a structure with a field keep_me, with the same schema as field1. The corresponding Storage Format will match the input format for input-topic, AVRO or JSON.
An example will help clarify how aggregations work, as well as how they behave depending on the semantics of the input dataset they are applied to.
Assume that we have a Kafka topic (gaming-sessions) containing these records:
This data describes a series of gaming sessions performed by players. Each gaming session records the player (used as the Key), the points achieved, and the country where the game took place.
Let’s now assume that what we want to calculate is the total points achieved by players in a given country, as well as the average points per game. One way to achieve the desired behavior is to build a Stream from the input topic; remember that this means that each event will be considered in isolation.
INSERT INTO target-topic SELECT STREAM SUM(points) AS total_points, AVG(points) AS average_points FROM gaming-sessions GROUP BY country
Explanations for each element of this syntax can be found below, but very briefly, this builds a Stream from gaming-sessions, grouping all events by country (i.e. all records with the same country will be aggregated together) and finally calculating the total (total_points) and the average (average_points) of all points for each group.
The final result in target-topic will be (disregarding intermediate events):
The results are calculated from the totality of the input records because, in a Stream, each event is independent and unrelated to any other.
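The Stream-mode semantics can be modeled as a fold over every event ever received for each group. The session data below is hypothetical; this is a sketch of the semantics, not of Lenses itself.

```python
from collections import defaultdict

# In Stream mode every event counts towards its group, forever.
sessions = [
    ("billy", 50, "uk"),
    ("willy", 30, "uk"),
    ("noel", 10, "uk"),
    ("maria", 70, "spain"),
    ("billy", 20, "spain"),   # billy plays again, this time from spain
]

state = defaultdict(lambda: {"sum": 0, "count": 0})
for _player, points, country in sessions:
    state[country]["sum"] += points     # SUM(points)
    state[country]["count"] += 1        # needed for AVG(points)

for country, s in state.items():
    print(country, "total:", s["sum"], "avg:", s["sum"] / s["count"])
# uk total: 90 avg: 30.0
# spain total: 90 avg: 45.0
```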
We now want to calculate something similar to what we obtained before, but we want to keep track of only the last session played by each player, as it might give us a better snapshot of both the performance and location of players worldwide. The statistics we want to gather are the same as before: total and average points per country.
The way to achieve this is simply to read gaming-sessions into a Table, rather than a Stream, and aggregate that.
INSERT INTO target-topic SELECT TABLE SUM(points) AS total_points, AVG(points) AS average_points FROM gaming-sessions GROUP BY country
Compare this with the behavior from the previous scenario: the key difference is that the value for uk now includes only willy and noel, because the last event moved billy to the spain bucket, removing all data regarding him from his original group.
The previous section described the behavior of aggregations when applied to Tables, and highlighted how aggregations not only need to be able to add the latest values received to the current state of a group, but also to subtract an obsolete value that has just been reassigned to a new group. As we saw above, this is easy to do in the case of SUM and AVG.
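The add-and-subtract mechanics can be sketched as follows: when a key's latest value changes, the obsolete value is first subtracted from its previous group, then the new value is added to its new group. This is a simplified model with hypothetical data, not Lenses internals.

```python
# Table-mode aggregation sketch: one latest row per player upstream, and a
# per-country SUM(points) state maintained by retract-then-add.
latest = {}          # player -> (points, country): the upstream Table
sum_by_country = {}  # country -> running SUM(points)

def upsert(player, points, country):
    if player in latest:                           # subtract the obsolete value
        old_points, old_country = latest[player]
        sum_by_country[old_country] -= old_points
    latest[player] = (points, country)             # add the new value
    sum_by_country[country] = sum_by_country.get(country, 0) + points

upsert("billy", 50, "uk")
upsert("willy", 30, "uk")
upsert("billy", 20, "spain")   # billy moves: uk must lose his 50 points
print(sum_by_country)          # {'uk': 30, 'spain': 20}
```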
However, consider what would happen if we wanted to add a new statistic to the ones calculated above: the maximum points achieved by a player in a given country.
In the Stream scenario, this can be achieved simply by adding MAXK(points,1) as max_points to the query.
INSERT INTO target-topic SELECT STREAM SUM(points) AS total_points, AVG(points) AS average_points, MAXK(points,1) AS max_points FROM gaming-sessions GROUP BY country
In the Table scenario, however, things are different. We know that the final event moves billy from uk to spain, so we need to subtract from uk all information related to billy. In the case of SUM and AVG that’s possible, because subtracting billy’s points from the current value of the aggregation will return the correct result.
But that’s not possible for MAXK. MAXK(points, 1) only keeps track of 1 value, the highest seen so far, and if that’s removed, what value should take its place? The aggregation function cannot inspect the entire topic’s data to search for the correct answer. The only state the aggregation function has access to is that single number, 90, which is now invalid.
This problem explains why some aggregate functions can be used on both Streams and Tables (e.g. SUM), while others can be used only on Streams (e.g. MAXK).
The key factor is whether a hypothetical subtraction operation would need access to all previous inputs to calculate its new value (like MAXK) or just to the aggregated state (like SUM).
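The difference can be made concrete with a small sketch using hypothetical numbers: SUM can undo a contribution using only its own state, whereas MAX cannot.

```python
# SUM: the aggregated state alone is enough to subtract a value.
total = 0
for points in (90, 40):
    total += points        # total == 130
total -= 90                # retract the 90: correct result from state alone
print(total)               # 40

# MAX: once the maximum must be retracted, the state is useless -- the
# second-best value (40) was never kept, only the single number 90.
best = 0
for points in (90, 40):
    best = max(best, points)
print(best)                # 90, with no way to "subtract" it to recover 40
```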
A common scenario that arises in the context of aggregations is adding a time dimension to the grouping logic expressed in the query. For example, one might want to group all input records that have a given value of a field and that were received within 1 hour of each other.
To express this, Lenses SQL Streaming supports windowed aggregations, adding a WINDOW BY clause to the query. Given their semantics, Tables cannot be aggregated using a window, because it would not make sense: a Table represents the latest state of a set of (Key, Value) pairs, not a series of events interspersed over a time continuum, so trying to window it is not a sensible operation.
Details of the supported types of windows, their specific syntax, as well as the fundamental relationship between stream processing and time, can be found in the Time and Windows page of this documentation.
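As a rough sketch of what a window adds to the grouping logic (hypothetical timestamps; refer to the Time and Windows page for the real syntax and semantics), a 1-hour tumbling window effectively extends the grouping key with a time bucket:

```python
# Grouping key extended with a 1-hour tumbling-window bucket (sketch only).
WINDOW_MS = 60 * 60 * 1000

def window_start(ts_ms):
    return ts_ms - (ts_ms % WINDOW_MS)   # align timestamp to its window

events = [(0, "uk"), (10_000, "uk"), (3_700_000, "uk")]  # (timestamp, country)
counts = {}
for ts, country in events:
    key = (country, window_start(ts))    # same country, new hour => new group
    counts[key] = counts.get(key, 0) + 1
print(counts)  # {('uk', 0): 2, ('uk', 3600000): 1}
```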
Filtering the input of aggregated queries is similar to filtering non-aggregated ones: when a WHERE <expression> statement is used, where <expression> is a valid SQL boolean expression, all records that do not match the predicate will be left out.
However, aggregate functions add a further dimension to what it might be desirable to filter.
We might be interested in filtering based on some condition on the groups themselves; for example, we might want to count all input records that have a given value of field1, but only if the total is greater than 3. In this case, WHERE would not help, because it has access neither to the groups nor to the results of the aggregated projections. The query below is what is needed.
INSERT INTO target-topic SELECT STREAM COUNT(*) as sessions FROM gaming-sessions GROUP BY country HAVING sessions > 3;
The above query uses the HAVING clause to express a filter at a grouping level. Using this feature it is possible to express a predicate on the result of aggregated projections and filter out the output records that do not satisfy it.
HAVING
Note that only aggregated projections specified in the SELECT clause can be used within the HAVING clause.
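The effect of HAVING can be modeled as a filter applied after the per-group aggregation, in contrast to WHERE, which filters records before they are grouped. The sketch below uses hypothetical data.

```python
from collections import Counter

# COUNT(*) ... GROUP BY country, then HAVING sessions > 3, as a model:
countries = ["uk", "uk", "uk", "uk", "spain", "spain"]
sessions = Counter(countries)                          # GROUP BY + COUNT(*)
result = {c: n for c, n in sessions.items() if n > 3}  # HAVING sessions > 3
print(result)  # {'uk': 4}
```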