In this article, we will be enriching customer call events with their customer details.
Enriching data streams with extra information by performing an efficient lookup is a common scenario for streaming SQL on Apache Kafka.
Topics involved:
customer_details
customer_call_details
-- Streaming enrichment: every call event is joined with the customer record
-- that shares its key's customer id, and the pair is written to customers_callInfo.

-- Allow the target topic to be created automatically when it does not exist.
SET defaults.topic.autocreate=true;

INSERT INTO customers_callInfo
SELECT STREAM
    calls._value AS call            -- the full call-event value
  , customer._value AS customer     -- the full customer-details value
FROM customer_call_details AS calls
    -- customer_details is read as a table and used as the lookup side of the join
    INNER JOIN (SELECT TABLE * FROM customer_details) AS customer
        ON customer._key.customer.id = calls._key.customer.id
To simplify our testing process and run the above example in less than 60 seconds, we will use SQL to create and populate the two input Apache Kafka topics (the output topic, customers_callInfo, is auto-created by the query above):
CREATE TOPIC customer_details
-- Customer lookup topic. Fields prefixed with _key belong to the record key;
-- all other fields belong to the record value.
-- Fields marked "null" are optional.
CREATE TABLE customer_details(
    _key.customer.typeID string
  , _key.customer.id string
  , customer.name string
  , customer.middleName string null
  , customer.surname string
  , customer.nationality string
  , customer.passportNumber string
  , customer.phoneNumber string
  , customer.email string null
  , customer.address string
  , customer.country string
  , customer.driverLicense string null
  , package.typeID string
  , package.description string
  , active boolean
)
FORMAT(avro, avro)
-- compacted=true: the topic is created as a compacted topic
PROPERTIES(partitions=5, replication=1, compacted=true);
POPULATE TOPIC customer_details
-- Sample customers for the lookup topic, one row per customer.
-- The first two values of each row populate the record key (customer type, customer id).
-- "-" is used as a placeholder for middle name, address and driver license.
-- Email addresses follow the pattern <first name><surname initial>@mydomain.com.
INSERT INTO customer_details(
    _key.customer.typeID
  , _key.customer.id
  , customer.name
  , customer.middleName
  , customer.surname
  , customer.nationality
  , customer.passportNumber
  , customer.phoneNumber
  , customer.email
  , customer.address
  , customer.country
  , customer.driverLicense
  , package.typeID
  , package.description
  , active
) VALUES
    ("userType1","5162258362252394","April","-","Paschall","GBR","APGBR...","1999153354","aprilP@mydomain.com","-","GBR","-","TypeA","Desc.",true),
    ("internal","5290441401157247","Charisse","-","Daggett","USA","CDUSA...","6418577217","charisseD@mydomain.com","-","USA","-","TypeC","Desc.",true),
    ("internal","5397076989446422","Gibson","-","Chunn","USA","GCUSA...","8978860472","gibsonC@mydomain.com","-","USA","-","TypeC","Desc.",true),
    ("partner","5248189647994492","Hector","-","Swinson","NOR","HSNOR...","8207437436","hectorS@mydomain.com","-","NOR","-","TypeA","Desc.",true),
    ("userType1","5196864976665762","Booth","-","Spiess","CAN","BSCAN...","6220504387","boothS@mydomain.com","-","CAN","-","TypeA","Desc.",true),
    ("userType2","5423023313257503","Hitendra","-","Sibert","SWZ","HSSWZ...","6731834082","hitendraS@mydomain.com","-","SWZ","-","TypeA","Desc.",true),
    ("userType2","5337899393425317","Larson","-","Asbell","SWE","LASWE...","2844252229","larsonA@mydomain.com","-","SWE","-","TypeA","Desc.",true),
    ("partner","5140590381876333","Zechariah","-","Schwarz","GER","ZSGER...","4936431929","ZechariahS@mydomain.com","-","GER","-","TypeB","Desc.",true),
    ("internal","5524874546065610","Shulamith","-","Earles","FRA","SEFRA...","2119087327","ShulamithE@mydomain.com","-","FRA","-","TypeC","Desc.",true),
    ("userType1","5204216758311612","Tangwyn","-","Gorden","GBR","TGGBR...","9172511192","TangwynG@mydomain.com","-","GBR","-","TypeA","Desc.",true),
    ("userType1","5336077954566768","Miguel","-","Gonzales","ESP","MGESP...","5664871802","MiguelG@mydomain.com","-","ESP","-","TypeA","Desc.",true),
    ("userType3","5125835811760048","Randie","-","Ritz","NOR","RRNOR...","3245795477","RandieR@mydomain.com","-","NOR","-","TypeA","Desc.",true),
    ("userType1","5317812241111538","Michelle","-","Fleur","FRA","MFFRA...","7708177986","MichelleF@mydomain.com","-","FRA","-","TypeA","Desc.",true),
    ("userType1","5373595752176476","Thurborn","-","Asbell","GBR","TAGBR...","5927996719","ThurbornA@mydomain.com","-","GBR","-","TypeA","Desc.",true),
    ("userType3","5589753170506689","Noni","-","Gorden","AUT","NGAUT...","7288041910","NoniG@mydomain.com","-","AUT","-","TypeA","Desc.",true),
    ("userType2","5588152341005179","Vivian","-","Glowacki","POL","VGPOL...","9001088901","VivianG@mydomain.com","-","POL","-","TypeA","Desc.",true),
    ("partner","5390713494347532","Elward","-","Frady","USA","EFUSA...","2407143487","ElwardF@mydomain.com","-","USA","-","TypeB","Desc.",true),
    ("userType1","5322449980897580","Severina","-","Bracken","AUT","SBAUT...","7552231346","SeverinaB@mydomain.com","-","AUT","-","TypeA","Desc.",true);
CREATE TOPIC customer_call_details
-- Call-event topic. Fields prefixed with _key belong to the record key
-- (customer type and customer id); the remaining fields form the record value.
-- compacted=false: every call event is kept as a separate record.
CREATE TABLE customer_call_details(
    _key.customer.typeID string
  , _key.customer.id string
  , callInfoCustomerID string
  , callInfoType string
  , callInfoDuration int
  , callInfoInit int
)
FORMAT(avro, avro)
PROPERTIES(partitions=1, replication=1, compacted=false);
POPULATE TOPIC customer_call_details
-- Sample call events, one row per call, in the order they are produced.
-- The first two values of each row populate the record key (customer type, customer id);
-- callInfoCustomerID repeats the customer id inside the record value.
INSERT INTO customer_call_details(
    _key.customer.typeID
  , _key.customer.id
  , callInfoCustomerID
  , callInfoType
  , callInfoDuration
  , callInfoInit
) VALUES
    ("userType1", "5322449980897580","5322449980897580", "CallTypeA", 470, 0),
    ("internal", "5290441401157247","5290441401157247", "CallTypeC", 67, 0),
    ("partner", "5140590381876333","5140590381876333", "CallTypeB", 377, 0),
    ("internal", "5397076989446422","5397076989446422", "CallTypeC", 209, 0),
    ("userType2", "5337899393425317","5337899393425317", "CallTypeA", 209, 0),
    ("partner", "5140590381876333","5140590381876333", "CallTypeB", 887, 0),
    ("userType1", "5322449980897580","5322449980897580", "CallTypeA", 203, 0),
    ("partner", "5140590381876333","5140590381876333", "CallTypeB", 1698, 0),
    ("userType3", "5589753170506689","5589753170506689", "CallTypeA", 320, 1),
    ("internal", "5290441401157247","5290441401157247", "CallTypeC", 89, 0),
    ("partner", "5140590381876333","5140590381876333", "CallTypeB", 355, 0),
    ("internal", "5290441401157247","5290441401157247", "CallTypeC", 65, 0),
    ("userType2", "5337899393425317","5337899393425317", "CallTypeA", 43, 1),
    ("partner", "5390713494347532","5390713494347532", "CallTypeB", 530, 0),
    ("internal", "5397076989446422","5397076989446422", "CallTypeC", 270, 0),
    ("userType3", "5589753170506689","5589753170506689", "CallTypeA", 1633, 0),
    ("internal", "5290441401157247","5290441401157247", "CallTypeC", 110, 0),
    ("userType1", "5322449980897580","5322449980897580", "CallTypeA", 540, 0),
    ("internal", "5290441401157247","5290441401157247", "CallTypeC", 168, 0),
    ("userType3", "5589753170506689","5589753170506689", "CallTypeA", 1200, 0),
    ("internal", "5290441401157247","5290441401157247", "CallTypeC", 1200, 0),
    ("partner", "5390713494347532","5390713494347532", "CallTypeB", 22, 0),
    ("userType3", "5589753170506689","5589753170506689", "CallTypeA", 333, 1),
    ("internal", "5397076989446422","5397076989446422", "CallTypeC", 87, 0),
    ("partner", "5390713494347532","5390713494347532", "CallTypeB", 123, 0),
    ("userType2", "5337899393425317","5337899393425317", "CallTypeA", 182, 1),
    ("partner", "5140590381876333","5140590381876333", "CallTypeB", 844, 0),
    ("partner", "5390713494347532","5390713494347532", "CallTypeB", 56, 1),
    ("internal", "5397076989446422","5397076989446422", "CallTypeC", 36, 0),
    ("partner", "5140590381876333","5140590381876333", "CallTypeB", 794, 0),
    ("userType3", "5589753170506689","5589753170506689", "CallTypeA", 440, 0),
    ("internal", "5397076989446422","5397076989446422", "CallTypeC", 52, 0),
    ("userType1", "5322449980897580","5322449980897580", "CallTypeA", 770, 0),
    ("internal", "5397076989446422","5397076989446422", "CallTypeC", 627, 0),
    ("partner", "5140590381876333","5140590381876333", "CallTypeB", 555, 0),
    ("userType2", "5337899393425317","5337899393425317", "CallTypeA", 55, 1);
-- Lists the call events whose key's customer id matches the key of a record
-- in customer_details; only call-side fields are projected.
SELECT
    p.callInfoCustomerID AS customerID
  , p.callInfoType
  , p.callInfoInit
FROM customer_call_details AS p
    INNER JOIN customer_details AS c
        ON p._key.customer.id = c._key.customer.id
On this page