In this tutorial, we will see how to automate the creation of SQL processors.
The requirement is to automate an SQL processor’s deployment, including creating the input topic and its associated data schema. We have an input topic, let’s call it ‘input’; the topic message key is text (i.e., STRING) and the message value is Avro with this schema:
{ "type": "record", "name": "ConnectDefault", "namespace": "io.lenses.connect.avro", "fields": [ { "name": "EVENTID", "type": "string" }, { "name": "EVENTDATETIME", "type": "string" }, { "name": "CLIENTID", "type": "string" }, { "name": "CUSTOMERCATEGORY", "type": "string" }, { "name": "CUSTOMERID", "type": "string" }, { "name": "ENTITYID", "type": "string" }, { "name": "EVENTCORRELATIONID", "type": ["null", "string"], "default": null }, { "name": "EVENTSOURCE", "type": "string" }, { "name": "SUPERPRODUCTID", "type": "string" }, { "name": "TENANTID", "type": "string" }, { "name": "TREATMENTCODE", "type": ["null", "string"], "default": null }, { "name": "SAPREASONCODE", "type": "string" }, { "name": "TRXAMOUNT", "type": "string" }, { "name": "TRXCODE", "type": "string" }, { "name": "TRXDESCRIPTION", "type": "string" }, { "name": "SourceType", "type": ["null", "string"], "default": null } ] }
Then we have the SQL processor code to rename and re-align the fields:
SET defaults.topic.autocreate=true;

INSERT INTO output1
SELECT STREAM
    EVENTID as EventID
  , EVENTDATETIME as EventDateTime
  , EVENTSOURCE as EventSource
  , TENANTID as TenantId
  , ENTITYID as EntityId
  , CUSTOMERID as CustomerID
  , CLIENTID as ClientID
  , SUPERPRODUCTID as SuperProductID
  , CUSTOMERCATEGORY as CustomerCategory
  , SAPREASONCODE as SAPReasonCode
  , TRXAMOUNT as TrxAmount
  , TRXDESCRIPTION as TrxDescription
  , TREATMENTCODE as TreatmentCode
  , SourceType
FROM input
Navigate to the Admin section using the top bar navigation. From the left-hand navigation, go to Groups and add a new group.
Then, following the same left-hand navigation, go to the Service accounts page and add a new entry:
Grab the token provided for the service account; step 3 will use it.
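If you plan to script the remaining steps, it can be convenient to keep the token in an environment variable rather than pasting it into each command. A minimal sketch, using the demo token that appears in the example commands later in this tutorial:

# Store the service account token for reuse in subsequent API calls
export LENSES_TOKEN="demo-sa:48cd653d-1889-42b6-9f38-6e5116db691a"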
Lenses SQL processors require a schema, even for formats other than Avro. When a topic is created with AVRO as the message key or value format, the schema is first registered with the Schema Registry.
First, let’s create the request payload. Save this JSON to a file named body.json
{ "name": "input", "replication": 1, "partitions": 3, "configs": {}, "format": { "key": { "format": "STRING" }, "value": { "format": "AVRO", "schema": "{\"type\":\"record\",\"name\":\"ConnectDefault\",\"namespace\":\"io.lenses.connect.avro\",\"fields\":[{\"name\":\"EVENTID\",\"type\":\"string\"},{\"name\":\"EVENTDATETIME\",\"type\":\"string\"},{\"name\":\"CLIENTID\",\"type\":\"string\"},{\"name\":\"CUSTOMERCATEGORY\",\"type\":\"string\"},{\"name\":\"CUSTOMERID\",\"type\":\"string\"},{\"name\":\"ENTITYID\",\"type\":\"string\"},{\"name\":\"EVENTCORRELATIONID\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"EVENTSOURCE\",\"type\":\"string\"},{\"name\":\"SUPERPRODUCTID\",\"type\":\"string\"},{\"name\":\"TENANTID\",\"type\":\"string\"},{\"name\":\"TREATMENTCODE\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"SAPREASONCODE\",\"type\":\"string\"},{\"name\":\"TRXAMOUNT\",\"type\":\"string\"},{\"name\":\"TRXCODE\",\"type\":\"string\"},{\"name\":\"TRXDESCRIPTION\",\"type\":\"string\"},{\"name\":\"SourceType\",\"type\":[\"null\",\"string\"],\"default\":null}]}" } } }
Paste the content after running this command:

cat > body.json
[Paste the JSON above]
[Hit Enter, then CTRL+D]
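Because the escaped schema string is easy to get wrong, it is worth checking that the file is well-formed JSON before posting it. A quick sketch, assuming Python 3 is installed (jq would work just as well):

# Fails with a parse error if body.json is not valid JSON
python3 -m json.tool body.json > /dev/null && echo "body.json is valid JSON"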
Lenses authentication and authorization require a token passed via the X-Kafka-Lenses-Token header. Since we created the service account entry, this token is already available.
curl --header "Content-Type: application/json" \
  --header "X-Kafka-Lenses-Token: demo-sa:48cd653d-1889-42b6-9f38-6e5116db691a" \
  -i -X POST \
  --data @body.json \
  http://localhost:24015/api/v1/kafka/topic
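When this request runs in an automated pipeline rather than an interactive terminal, you usually want the job to stop if topic creation fails. A minimal sketch, assuming the same endpoint and token as above, checks the HTTP status code returned by curl:

#!/usr/bin/env bash
set -euo pipefail

# Create the topic and capture only the HTTP status code
status=$(curl -s -o /dev/null -w "%{http_code}" \
  --header "Content-Type: application/json" \
  --header "X-Kafka-Lenses-Token: demo-sa:48cd653d-1889-42b6-9f38-6e5116db691a" \
  -X POST \
  --data @body.json \
  http://localhost:24015/api/v1/kafka/topic)

# Treat any non-2xx response as a failure so the pipeline stops here
if [[ "$status" != 2* ]]; then
  echo "Topic creation failed with HTTP $status" >&2
  exit 1
fi
echo "Topic created (HTTP $status)"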
It is assumed the SQL processor deployment mode is IN_PROC, the development mode. Lenses supports KUBERNETES as its production mode. For more details about the other modes, see SQL details.
Before we issue the curl command, let’s create the request body JSON.
{ "name": "myProcessor", "runnerCount": 1, "sql": "SET defaults.topic.autocreate=true;\n\nINSERT INTO output1\nSELECT STREAM \n EVENTID as EventID\n , EVENTDATETIME as EventDateTime\n , EVENTSOURCE as EventSource\n , TENANTID as TenantId\n , ENTITYID as EntityId\n , CUSTOMERID as CustomerID\n , CLIENTID as ClientID\n , SUPERPRODUCTID as SuperProductID\n , CUSTOMERCATEGORY as CustomerCategory\n , SAPREASONCODE as SAPReasonCode\n , TRXAMOUNT as TrxAmount\n , TRXDESCRIPTION as TrxDescription\n , TREATMENTCODE as TreatmentCode\n , SourceType\nFROM input", "settings": {} }
cat > processor.json
[Paste the JSON above]
[Hit Enter, then CTRL+D]
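Escaping the SQL statement into a single JSON string by hand is error-prone when automating. If jq is available, an alternative sketch is to keep the statement in a plain file (here a hypothetical processor.sql) and let jq build the request body:

# -Rs reads processor.sql as one raw string; jq handles the JSON escaping
jq -Rs '{name: "myProcessor", runnerCount: 1, sql: ., settings: {}}' \
  processor.sql > processor.json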
curl --header "Content-Type: application/json" \
  --header "X-Kafka-Lenses-Token: demo-sa:48cd653d-1889-42b6-9f38-6e5116db691a" \
  -i -X POST \
  --data @processor.json \
  http://localhost:24015/api/v1/streams
Once the last curl command has executed, Lenses will have the processor up and running, processing your data.
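To run the whole flow from a CI/CD job, the two API calls can be combined into one small script. The sketch below reuses the endpoint, token, and file names from this tutorial; adjust them for your environment:

#!/usr/bin/env bash
set -euo pipefail

# Connection details from this tutorial; replace with your own in CI/CD
LENSES_URL="http://localhost:24015"
LENSES_TOKEN="demo-sa:48cd653d-1889-42b6-9f38-6e5116db691a"

# Step 1: create the input topic (this also registers the Avro schema)
curl --fail -s -X POST \
  --header "Content-Type: application/json" \
  --header "X-Kafka-Lenses-Token: ${LENSES_TOKEN}" \
  --data @body.json \
  "${LENSES_URL}/api/v1/kafka/topic"

# Step 2: deploy the SQL processor
curl --fail -s -X POST \
  --header "Content-Type: application/json" \
  --header "X-Kafka-Lenses-Token: ${LENSES_TOKEN}" \
  --data @processor.json \
  "${LENSES_URL}/api/v1/streams"

echo "Topic and SQL processor deployed"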
In this tutorial, you learned how to automate your data pipelines and even integrate your CI/CD with Lenses to create your SQL processors automatically.
Good luck and happy streaming!