Filtering messages and copying them to a topic can be achieved using the WHERE clause.
In our example, an application registers bank transactions in a topic called payments, where records have this shape:
KEY: "6A461C60-02F3-4C01-94FB-092ECBDE0837" VALUE: { "amount": "12.10", "currency": "EUR", "from_account": "xxx", "to_account": "yyy", "time": 1591970345000 }
We can replicate such a structure by running the following query in SQL Studio:
CREATE TABLE payments(
    amount double,
    currency string,
    from_account string,
    to_account string,
    time datetime
)
FORMAT (string, avro);
Each event has a unique string key generated by the upstream system.
We can again use SQL Studio to insert some data to play with:
INSERT INTO payments(_key, amount, currency, from_account, to_account, time)
VALUES
("6A461C60-02F3-4C01-94FB-092ECBDE0837", 10, "EUR", "account-1", "account-2", 1590970345000),
("E5DA60E8-F622-43B2-8A93-B958E01E8AB3", 100000, "EUR", "account-1", "account-3", 1591070346000),
("0516A309-FB2B-4F6D-A11F-3C06A5D64B68", 5300, "USD", "account-2", "account-3", 1591170347000),
("0871491A-C915-4163-9C4B-35DEA0373B41", 6500, "EUR", "account-3", "account-1", 1591270348000),
("2B557134-9314-4F96-A640-1BF90887D846", 300, "EUR", "account-1", "account-4", 1591370348000),
("F4EDAE35-45B4-4841-BAB7-6644E2BBC844", 3400, "EUR", "account-2", "account-1", 1591470349000),
("F51A912A-96E9-42B1-9AC4-42E923A0A6F8", 7500, "USD", "account-2", "account-3", 1591570352000),
("EC8A08F1-75F0-49C8-AA08-A5E57997D27A", 2500000, "USD", "account-1", "account-3", 1591670356000),
("9DDBACFF-D42B-4042-95AC-DCDD84F0AC32", 1000, "GBP", "account-2", "account-3", 1591870401000);
Let’s assume we need to detect significant transactions that will then be fed into our anti-fraud system.
We want to copy those transactions into a new topic, keeping the content of the records unchanged.
For our first example, we will use a simple predicate to filter transactions with an amount of 5000 or more, regardless of the currency.
Lenses SQL supports all the common comparison operators, so for our goal it is enough to use a WHERE clause with a >= condition:
SET defaults.topic.autocreate=true;

INSERT INTO big_payments
SELECT STREAM *
FROM payments
WHERE amount >= 5000
Checking the records emitted by the processor, we see that we got the transactions we were looking for.
Because of the * projection, the content of the records has not changed.
KEY:"E5DA60E8-F622-43B2-8A93-B958E01E8AB3"
VALUE: { amount:100000, ... }
------------------------------------
KEY:"0516A309-FB2B-4F6D-A11F-3C06A5D64B68"
VALUE: { amount:5300, ... }
------------------------------------
KEY:"0871491A-C915-4163-9C4B-35DEA0373B41"
VALUE: { amount:6500, ... }
------------------------------------
KEY:"F51A912A-96E9-42B1-9AC4-42E923A0A6F8"
VALUE: { amount:7500, ... }
------------------------------------
KEY:"EC8A08F1-75F0-49C8-AA08-A5E57997D27A"
VALUE: { amount:2500000, ... }
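To double-check which of the sample records should pass the filter, the predicate can be mirrored outside Lenses. The following is only a plain-Python sketch, not Lenses SQL: the tuple layout and the function name `big_payments` are illustrative assumptions, and only the key, amount and currency columns of the sample data are reproduced.

```python
# Sample rows from the INSERT above, reduced to (key, amount, currency).
payments = [
    ("6A461C60-02F3-4C01-94FB-092ECBDE0837", 10, "EUR"),
    ("E5DA60E8-F622-43B2-8A93-B958E01E8AB3", 100000, "EUR"),
    ("0516A309-FB2B-4F6D-A11F-3C06A5D64B68", 5300, "USD"),
    ("0871491A-C915-4163-9C4B-35DEA0373B41", 6500, "EUR"),
    ("2B557134-9314-4F96-A640-1BF90887D846", 300, "EUR"),
    ("F4EDAE35-45B4-4841-BAB7-6644E2BBC844", 3400, "EUR"),
    ("F51A912A-96E9-42B1-9AC4-42E923A0A6F8", 7500, "USD"),
    ("EC8A08F1-75F0-49C8-AA08-A5E57997D27A", 2500000, "USD"),
    ("9DDBACFF-D42B-4042-95AC-DCDD84F0AC32", 1000, "GBP"),
]


def big_payments(rows, threshold=5000):
    """Hypothetical mirror of `WHERE amount >= 5000`: matching rows pass through unchanged."""
    return [row for row in rows if row[1] >= threshold]


for key, amount, currency in big_payments(payments):
    print(key, amount, currency)
```

Five of the nine sample rows satisfy the predicate, in the same order as they were inserted.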
Not all currencies are the same, so we would like to add a specific threshold for each currency. As a first cut, we combine multiple conditions with ANDs and ORs:
SET defaults.topic.autocreate=true;

INSERT INTO big_eur_usd_payments
SELECT STREAM *
FROM payments
WHERE (amount >= 5000 AND currency = 'EUR')
   OR (amount >= 5500 AND currency = 'USD')
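The effect of the combined condition on the sample data can be checked with a small Python sketch. It is an illustration only, not Lenses SQL; the function name `is_big_eur_usd` is an assumption, and the pairs are the amount and currency columns of the rows inserted earlier.

```python
def is_big_eur_usd(amount, currency):
    """Hypothetical mirror of the WHERE clause: each currency is paired with its own threshold."""
    return (amount >= 5000 and currency == "EUR") or (amount >= 5500 and currency == "USD")


# (amount, currency) pairs from the sample data, in insertion order.
samples = [
    (10, "EUR"), (100000, "EUR"), (5300, "USD"),
    (6500, "EUR"), (300, "EUR"), (3400, "EUR"),
    (7500, "USD"), (2500000, "USD"), (1000, "GBP"),
]

matches = [pair for pair in samples if is_big_eur_usd(*pair)]
print(matches)
```

Two things follow from the predicate itself: the 5300 USD payment no longer passes, because it is below the USD threshold, and a currency without its own branch (GBP here) can never match, whatever the amount.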
As an improvement, we want to capture the threshold for each currency in a single expression. We will use a CASE expression for that:
SET defaults.topic.autocreate=true;

INSERT INTO big_payments_case
SELECT STREAM *
FROM payments
WHERE amount >= (
    CASE
        WHEN currency = 'EUR' THEN 5000
        WHEN currency = 'USD' THEN 5500
        WHEN currency = 'GBP' THEN 4500
        ELSE 5000
    END
)
The processor emits the following records:
KEY:"E5DA60E8-F622-43B2-8A93-B958E01E8AB3"
VALUE: { amount:100000, currency:"EUR", ... }
------------------------------------
KEY:"0871491A-C915-4163-9C4B-35DEA0373B41"
VALUE: { amount:6500, currency:"EUR", ... }
------------------------------------
KEY:"F51A912A-96E9-42B1-9AC4-42E923A0A6F8"
VALUE: { amount:7500, currency:"USD", ... }
------------------------------------
KEY:"EC8A08F1-75F0-49C8-AA08-A5E57997D27A"
VALUE: { amount:2500000, currency:"USD", ... }
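The CASE expression behaves like a lookup table with a fallback value. A rough Python equivalent, given purely as a sketch (the names `THRESHOLDS`, `DEFAULT_THRESHOLD`, `threshold_for` and `is_big` are illustrative assumptions, not part of Lenses), could look like this:

```python
# One threshold per currency, as in the WHEN branches of the query.
THRESHOLDS = {"EUR": 5000, "USD": 5500, "GBP": 4500}

# Value of the ELSE branch, used for any currency not listed above.
DEFAULT_THRESHOLD = 5000


def threshold_for(currency):
    """Hypothetical mirror of the CASE expression: pick the threshold for a currency."""
    return THRESHOLDS.get(currency, DEFAULT_THRESHOLD)


def is_big(amount, currency):
    """Hypothetical mirror of `WHERE amount >= (CASE ... END)`."""
    return amount >= threshold_for(currency)


print(is_big(5300, "USD"))  # below the USD threshold
print(is_big(4500, "GBP"))  # exactly at the GBP threshold
print(is_big(5000, "CHF"))  # unlisted currency, falls back to the default
```

Unlike a chain of AND/OR conditions, every currency gets a threshold here, so adding a new one is a single extra branch rather than a new parenthesised condition.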
In this section, we will find all the transactions that happened during the (UTC) night. To do that we can use one of our many date and time functions.
You will also see how to use a CAST expression to convert from one type to another.
SET defaults.topic.autocreate=true;

INSERT INTO night_payments
SELECT STREAM *
FROM payments
WHERE CAST(HOUR(time) AS INT) >= 0
  AND CAST(HOUR(time) AS INT) <= 5
Checking the output, we can see that three transactions satisfied our predicate:
KEY:"6A461C60-02F3-4C01-94FB-092ECBDE0837"
VALUE: { amount:10, currency:"EUR", time:1590970345000, ... }
------------------------------------
KEY:"E5DA60E8-F622-43B2-8A93-B958E01E8AB3"
VALUE: { amount:100000, currency:"EUR", time:1591070346000, ... }
------------------------------------
KEY:"EC8A08F1-75F0-49C8-AA08-A5E57997D27A"
VALUE: { amount:2500000, currency:"USD", time:1591670356000, ... }
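The hour of each sample timestamp can be verified independently. The sketch below uses Python's standard library rather than Lenses SQL, and assumes the time field holds epoch milliseconds interpreted in UTC, as the sample values and the "(UTC) night" wording suggest; `utc_hour` is an illustrative helper name.

```python
from datetime import datetime, timezone


def utc_hour(epoch_millis):
    """Return the hour of day (0-23) in UTC for an epoch timestamp in milliseconds."""
    return datetime.fromtimestamp(epoch_millis // 1000, tz=timezone.utc).hour


# The `time` column of the nine sample records, in insertion order.
times = [
    1590970345000, 1591070346000, 1591170347000,
    1591270348000, 1591370348000, 1591470349000,
    1591570352000, 1591670356000, 1591870401000,
]

# Same range as the query: hour between 0 and 5, inclusive.
night = [t for t in times if 0 <= utc_hour(t) <= 5]

for t in night:
    print(t, utc_hour(t))
```

Only the first, second and eighth timestamps fall between 00:00 and 05:59 UTC, which matches the three records emitted by the processor.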
Let’s imagine that we have to build some intelligence around all the payments we process, but we have neither the capacity nor the need to process all of them.
We therefore decide to build a reduced copy of the payments topic, with only 10% of the original records.
To do that, we are going to use the RANDINT function:
SET defaults.topic.autocreate=true;

INSERT INTO payments_sample
SELECT STREAM *
FROM payments
WHERE CAST(ABS(RANDINT()) AS DOUBLE) / 2147483647 < 0.1
RANDINT generates a random integer. We take its absolute value to make sure it is positive, and then normalise the result by dividing it by the maximum possible integer (2147483647), which gives an (almost) uniform sample of numbers between 0 and 1.
We have to CAST to double on the way; otherwise, the division would be between integers, and the result would always be 0.
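The normalisation and the need for the cast can be illustrated outside Lenses. The following Python sketch is a simulation under stated assumptions: `random.randint` stands in for RANDINT and is assumed to cover the signed 32-bit range, and `sample_fraction` is a hypothetical helper that reports which share of simulated records would pass a given threshold.

```python
import random

INT_MAX = 2147483647  # divisor used in the query: the largest 32-bit signed integer


def sample_fraction(n, threshold, seed=0):
    """Simulate n records and return the fraction that passes the sampling predicate."""
    rng = random.Random(seed)  # seeded so the simulation is repeatable
    kept = 0
    for _ in range(n):
        r = rng.randint(-INT_MAX, INT_MAX)  # stand-in for RANDINT()
        if abs(r) / INT_MAX < threshold:    # floating-point division, like the CAST to double
            kept += 1
    return kept / n


# Integer division truncates, so any value below INT_MAX collapses to 0.
print(123456789 // INT_MAX)  # 0

# Floating-point division keeps the fractional part needed for the comparison.
print(123456789 / INT_MAX)

# Share of simulated records kept with a threshold of 0.1.
print(sample_fraction(100_000, 0.1))
```

With a threshold of 0.1 the kept share comes out close to 10%, while the integer-division variant would compare 0 against the threshold for practically every record and therefore keep everything.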