In this tutorial, we will see how we can read data from multiple topics, process it as needed, and write the results to as many output topics as we need, all with a single SQL Processor.
Let’s assume that we have a topic (game-sessions) that contains data regarding remote gaming sessions by users.
Each gaming session will contain:

- the points the user scored during the session
- session metadata: the country where the game was played and the language of play

The above structure represents the value of each record in our game-sessions topic.
Additionally, each record is keyed by the user's details: their pid, name, surname, and age.
In light of the above, a record might look like the following (in JSON for simplicity):
{ "key":{ "pid": 1, "name": "Billy", "surname": "Lagrange", "age": 30 }, "value":{ "points": 5, "sessionMetadata": { "country": "Italy", "language": "IT" } } }
Finally, let’s assume we also have another, normalised, compacted topic user-details, keyed by an int matching the pid from topic game-sessions and containing user information like address and period of membership to the platform.
{ "key": 1, "value":{ "fullName": "Billy Lagrange", "memberYears": 3, "address": { "country": "Italy", "street": "Viale Monza 5", "city": "Milan" } } }
We can replicate these structures using SQL Studio and the following queries (in the FORMAT clause, the first argument is the key format and the second is the value format):
```sql
CREATE TABLE game-sessions(
    _key.pid int
    , _key.name string
    , _key.surname string
    , _key.age int
    , points double
    , sessionMetadata.country string
    , sessionMetadata.language string
)
FORMAT (avro, avro);

CREATE TABLE user-details(
    fullName string
    , memberYears int
    , address.country string
    , address.street string
    , address.city string
)
FORMAT (int, avro);
```
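To confirm that both topics were created, a quick check from SQL Studio (assuming your Lenses version supports it) is:

```sql
-- List the available tables/topics;
-- game-sessions and user-details should appear in the output
SHOW TABLES;
```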
We can then use SQL Studio again to insert the data we will use in the rest of the tutorial:
```sql
INSERT INTO game-sessions(
    _key.pid
    , _key.name
    , _key.surname
    , _key.age
    , points
    , sessionMetadata.country
    , sessionMetadata.language
) VALUES
(1, 'Billy', 'Lagrange', 35, 5, 'Italy', 'IT'),
(1, 'Billy', 'Lagrange', 35, 30, 'Italy', 'IT'),
(1, 'Billy', 'Lagrange', 35, 0, 'Italy', 'IT'),
(2, 'Maria', 'Rossi', 27, 50, 'Italy', 'IT'),
(2, 'Maria', 'Rossi', 27, 10, 'Italy', 'IT'),
(3, 'Jorge', 'Escudero', 27, 10, 'Spain', 'ES'),
(4, 'Juan', 'Suarez', 22, 80, 'Mexico', 'ES'),
(5, 'John', 'Bolden', 40, 10, 'USA', 'EN'),
(6, 'Dave', 'Holden', 31, 30, 'UK', 'EN'),
(7, 'Nigel', 'Calling', 50, 5, 'UK', 'EN'),
(2, 'Maria', 'Rossi', 27, 10, 'UK', 'EN'),
(1, 'Billy', 'Lagrange', 35, 50, 'Italy', 'IT'),
(3, 'Jorge', 'Escudero', 27, 16, 'Spain', 'ES'),
(4, 'Juan', 'Suarez', 22, 70, 'Mexico', 'ES'),
(5, 'John', 'Bolden', 40, 10, 'USA', 'EN'),
(6, 'Dave', 'Holden', 31, 50, 'Italy', 'IT'),
(6, 'Dave', 'Holden', 31, 70, 'Spain', 'ES'),
(2, 'Maria', 'Rossi', 27, 70, 'Italy', 'IT'),
(1, 'Billy', 'Lagrange', 35, 50, 'Italy', 'IT');

INSERT INTO user-details(
    _key
    , fullName
    , memberYears
    , address.country
    , address.street
    , address.city
) VALUES
(1, 'Billy Lagrange', 3, 'Italy', 'Viale Monza 5', 'Milan'),
(2, 'Maria Rossi', 1, 'Italy', 'Stazione Termini', 'Rome'),
(3, 'Jorge Escudero', 5, 'Spain', '50 Passeig de Gracia', 'Barcelona'),
(4, 'Juan Suarez', 0, 'Mexico', 'Plaza Real', 'Tijuana'),
(5, 'John Bolden', 2, 'USA', '100 Wall Street', 'New York'),
(6, 'Dave Holden', 1, 'UK', '25 Bishopsgate', 'London'),
(7, 'Nigel Calling', 6, 'UK', '10 Queen Anne Street', 'Brighton');
```
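Before building the processor, we can sanity-check the inserted data with a quick snapshot query in SQL Studio. This is just an illustrative spot check; adjust it as needed:

```sql
-- Spot-check a few records from the input topic
SELECT *
FROM game-sessions
LIMIT 5;
```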
Let’s now imagine that, given the above data, we receive the following requirements:

- Count how many games are played per country, based on the data in the game-sessions topic, and write the results to a games-per-country topic.
- Enrich each record from game-sessions with the corresponding user's memberYears from user-details, and write the enriched records to a games-sessions-normalised topic.
We can obtain the above with the following query:
```sql
SET defaults.topic.autocreate=true;
SET commit.interval.ms='1000';

WITH userDetailsTable AS(
    SELECT TABLE *
    FROM user-details
);

WITH joinedAndNormalised AS(
    SELECT STREAM
        gs.*
        , ud.memberYears
    FROM game-sessions AS gs
        JOIN userDetailsTable AS ud
            ON (gs._key.pid = ud._key)
);

INSERT INTO games-per-country
SELECT STREAM
    COUNT(*) AS gamesPlayed
FROM game-sessions
GROUP BY sessionMetadata.country;

INSERT INTO games-sessions-normalised
SELECT STREAM *
FROM joinedAndNormalised;
```
Once the processor is created, the UI will display a processor graph similar to the following:
Finally, the content of the output topics games-per-country and games-sessions-normalised can now be inspected in the Lenses Explore screen:
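Alternatively, assuming the processor has been running long enough to commit results, the same topics can be queried directly from SQL Studio. A minimal sketch:

```sql
-- Per-country game counts; the record key holds the grouping country
SELECT * FROM games-per-country;

-- Game sessions enriched with the user's memberYears
SELECT * FROM games-sessions-normalised;
```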
In this tutorial, we learned how to read data from multiple topics, combine and process it in different ways, and save the results to as many output topics as needed.
For more details about the specific operations used within this tutorial, please refer to the Aggregations and Joins sections of the user documentation.
Good luck and happy streaming!