This example shows a continuously running windowed calculation. It aggregates credit card transaction attempts where the card has been blocked.
With the following streaming SQL statement, we will aggregate and analyze streaming events from Apache Kafka.
SET defaults.topic.autocreate=true;
SET commit.interval.ms='10000';

WITH tableCards AS (
  SELECT TABLE *
  FROM example_cc_data
  WHERE currency = 'EUR'
);

WITH stream AS (
  SELECT STREAM p.currency, p.amount
  FROM example_cc_payments AS p
    INNER JOIN tableCards AS c
      ON p._key = c._key
  WHERE c.blocked = true
);

INSERT INTO example_join
SELECT STREAM
    currency AS currency
  , sum(amount) AS total
  , count(*) AS usage
FROM stream
WINDOW BY TUMBLE 5s
GROUP BY currency
The streaming topology we want to achieve is effectively the following:
The underlying Kafka Streams application that continuously executes this code looks like this:
Let us have a look at the above streaming SQL.
Apart from the INSERT, which is self-explanatory, we have the following:
The SET defaults.topic.autocreate=true; at the beginning of the SQL allows Lenses to create the output topic defined after INSERT INTO if it does not already exist. We also use WITH <tableName> to create tableCards, a table over the example_cc_data topic, which in this example is our customers dictionary; the join that follows only needs its _key and blocked fields. We also filter it to keep only records that have EUR as the currency.
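To preview which cards the tableCards definition would match, you can run an equivalent standalone query in SQL Studio (a convenience check, not part of the processor):

SELECT _key
     , currency
     , blocked
FROM example_cc_data
WHERE currency = 'EUR'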
The example_cc_payments topic is our source of payment events. In the SELECT STREAM, we define a stream that performs an INNER JOIN between example_cc_payments and tableCards. All records within a 5-second tumbling window that belong to a customer with a blocked card are grouped by currency and written to the example_join topic.
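As a quick illustration of the window semantics: a tumbling window is fixed-size and non-overlapping, so with TUMBLE 5s, payments arriving at 12:00:01 and 12:00:04 are summed and counted together in the window covering 12:00:00 to 12:00:05, while a payment at 12:00:06 starts a fresh sum and count in the next window.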
To simplify our testing process and to run the above example in less than 60 seconds, we will be using SQL to create and populate the Apache Kafka topics:
CREATE TOPIC example_cc_data
CREATE TABLE example_cc_data (
    _key string
  , number string
  , customerFirstName string
  , customerLastName string
  , country string
  , currency string
  , blocked boolean
)
FORMAT(string, avro)
PROPERTIES(partitions=2, replication=1, compacted=true);
When run in SQL Studio, the above SQL will create a topic called example_cc_data with two partitions, a replication factor of one, and compaction enabled.
We also define the structure of the topic: the serde for the record Value is set to Avro and the serde for the Key to String. For the Avro part, note that Lenses also takes care of the schema for us (given that a Schema Registry has been configured with Lenses). If you navigate to the Schema Registry view in Lenses, you will notice that a schema named example_cc_data-value has been created with the exact structure we defined above.
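For reference, the registered value schema should look roughly like the sketch below; the exact record name and namespace are generated by Lenses, so treat those as assumptions here:

{
  "type": "record",
  "name": "example_cc_data",
  "fields": [
    { "name": "number", "type": "string" },
    { "name": "customerFirstName", "type": "string" },
    { "name": "customerLastName", "type": "string" },
    { "name": "country", "type": "string" },
    { "name": "currency", "type": "string" },
    { "name": "blocked", "type": "boolean" }
  ]
}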
POPULATE TOPIC example_cc_data
INSERT INTO example_cc_data(
    _key
  , number
  , customerFirstName
  , customerLastName
  , country
  , currency
  , blocked
) VALUES
("5162258362252394","5162258362252394","April","Paschall","GBR","GBP",false),
("5290441401157247","5290441401157247","Charisse","Daggett","USA","USD",false),
("5397076989446422","5397076989446422","Gibson","Chunn","USA","USD",true),
("5248189647994492","5248189647994492","Hector","Swinson","NOR","NOK",false),
("5196864976665762","5196864976665762","Booth","Spiess","CAN","CAD",false),
("5423023313257503","5423023313257503","Hitendra","Sibert","SWZ","CHF",false),
("5337899393425317","5337899393425317","Larson","Asbell","SWE","SEK",false),
("5140590381876333","5140590381876333","Zechariah","Schwarz","GER","EUR",true),
("5524874546065610","5524874546065610","Shulamith","Earles","FRA","EUR",true),
("5204216758311612","5204216758311612","Tangwyn","Gorden","GBR","GBP",false),
("5336077954566768","5336077954566768","Miguel","Gonzales","ESP","EUR",true),
("5125835811760048","5125835811760048","Randie","Ritz","NOR","NOK",true),
("5317812241111538","5317812241111538","Michelle","Fleur","FRA","EUR",true),
("5373595752176476","5373595752176476","Thurborn","Asbell","GBR","GBP",true),
("5589753170506689","5589753170506689","Noni","Gorden","AUT","EUR",false),
("5588152341005179","5588152341005179","Vivian","Glowacki","POL","EUR",false),
("5390713494347532","5390713494347532","Elward","Frady","USA","USD",true),
("5322449980897580","5322449980897580","Severina","Bracken","AUT","EUR",true);
The above SQL will populate the topic we created before with some data. We should note that you can use the Lenses API to insert records as easily as we did above. This way, your cron jobs and other background processes can call an API and write data to Apache Kafka without requiring you to set up a Kafka producer.
CREATE TOPIC example_cc_payments
CREATE TABLE example_cc_payments (
    id string
  , amount decimal
  , currency string
  , creditCardId string
)
FORMAT(string, avro)
PROPERTIES(partitions=1, replication=1, compacted=false);
The above SQL creates the example_cc_payments topic, which will store all payment events. As you can see, this topic is not compacted: a single customer can make multiple payments, and we want to keep every one of them. In contrast, a customer should exist only once in example_cc_data, which is why that topic is compacted and retains only the latest record per key.
POPULATE TOPIC example_cc_payments
INSERT INTO example_cc_payments (
    _key
  , id
  , amount
  , currency
  , creditCardId
) VALUES
("5373595752176476", "txn1", 100.2, "GBP", "5373595752176476"),
("5322449980897580", "txn2", 51.2, "EUR", "5322449980897580"),
("5140590381876333", "txn2", 389.1, "EUR", "5140590381876333"),
("5589753170506689", "txn1", 11.2, "GBP", "5589753170506689"),
("5336077954566768", "txn2", 559.8, "EUR", "5336077954566768"),
("5337899393425317", "txn2", 221.50, "SEK", "5337899393425317");
Same logic as the previous INSERT example.
Let’s use SQL to validate the messages:
SELECT customerLastName
     , customerFirstName
     , number
     , blocked
FROM example_cc_data
WHERE blocked = true
In the test data, 9 of the 18 credit cards are blocked:
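Given the data inserted above, the query should return these nine cards (row order may vary):

customerLastName  customerFirstName  number            blocked
Chunn             Gibson             5397076989446422  true
Schwarz           Zechariah          5140590381876333  true
Earles            Shulamith          5524874546065610  true
Gonzales          Miguel             5336077954566768  true
Ritz              Randie             5125835811760048  true
Fleur             Michelle           5317812241111538  true
Asbell            Thurborn           5373595752176476  true
Frady             Elward             5390713494347532  true
Bracken           Severina           5322449980897580  true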
With the topics and data available, the SQL processor can be created and started.
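Once the processor is running, a quick way to verify the output is to query the result topic in SQL Studio:

SELECT *
FROM example_join

In the test data, only three payments (51.2 + 389.1 + 559.8) come from blocked cards whose currency is EUR, so, assuming they all fall into the same 5-second window, example_join should contain a single record with currency EUR, total 1000.1 and usage 3.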