We want to automate the creation of Kafka topics and their schemas. How can we do that?
Lenses supports managing Kafka topics together with their schemas. For topics stored as Avro, the schema is also registered with the Schema Registry connected to Lenses. For schemaless storage formats such as JSON, CSV, or XML, Lenses attempts to infer the schema, and Schema Registry is not involved. Having a schema improves the Explore screen experience and makes it possible to build stream analytics pipelines with SQL processors.
Here is the API to call to create a topic and attach a schema to it.
POST /api/v1/kafka/topic
Request body
{
  "name": "$YOUR_TOPIC",
  "replication": 1,
  "partitions": 1,
  "configs": {
    "$KAFKA_TOPIC_CONFIG_KEY": "$VALUE"
  },
  "format": {
    "key": {
      "format": "INT/STRING/LONG/AVRO/JSON/CSV/XML",
      "schema": "$AVRO_SCHEMA"
    },
    "value": {
      "format": "INT/STRING/LONG/AVRO/JSON/CSV/XML",
      "schema": "$AVRO_SCHEMA"
    }
  }
}
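The template above can also be assembled programmatically. The following is a minimal Python sketch, not part of the Lenses documentation: the function name `topic_request` and its parameters are hypothetical, and it assumes the schema is passed as an already serialized string and only needs to be present when one is supplied (as in the STRING key of the example further down).

```python
# Formats listed in the request template above.
ALLOWED_FORMATS = {"INT", "STRING", "LONG", "AVRO", "JSON", "CSV", "XML"}


def format_entry(fmt, schema=None):
    """Build the "key" or "value" object of the "format" section."""
    if fmt not in ALLOWED_FORMATS:
        raise ValueError(f"unsupported format: {fmt}")
    entry = {"format": fmt}
    if schema is not None:
        # Assumption: the schema is already a JSON string ($AVRO_SCHEMA).
        entry["schema"] = schema
    return entry


def topic_request(name, partitions=1, replication=1, configs=None,
                  key_format="STRING", key_schema=None,
                  value_format="STRING", value_schema=None):
    """Return the request body for POST /api/v1/kafka/topic as a dict."""
    return {
        "name": name,
        "replication": replication,
        "partitions": partitions,
        "configs": dict(configs or {}),
        "format": {
            "key": format_entry(key_format, key_schema),
            "value": format_entry(value_format, value_schema),
        },
    }
```

Rejecting unknown formats before the call is a design choice of this sketch; the API itself may accept or reject values differently.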
As an example, consider a topic named ‘input’. The message key is text (i.e., STRING), and the message value is Avro with this schema:
{
  "type": "record",
  "name": "ConnectDefault",
  "namespace": "io.lenses.connect.avro",
  "fields": [
    { "name": "EVENTID", "type": "string" },
    { "name": "EVENTDATETIME", "type": "string" },
    { "name": "CLIENTID", "type": "string" },
    { "name": "CUSTOMERCATEGORY", "type": "string" },
    { "name": "CUSTOMERID", "type": "string" },
    { "name": "ENTITYID", "type": "string" },
    { "name": "EVENTCORRELATIONID", "type": [ "null", "string" ], "default": null },
    { "name": "EVENTSOURCE", "type": "string" },
    { "name": "SUPERPRODUCTID", "type": "string" },
    { "name": "TENANTID", "type": "string" },
    { "name": "TREATMENTCODE", "type": [ "null", "string" ], "default": null },
    { "name": "SAPREASONCODE", "type": "string" },
    { "name": "TRXAMOUNT", "type": "string" },
    { "name": "TRXCODE", "type": "string" },
    { "name": "TRXDESCRIPTION", "type": "string" },
    { "name": "SourceType", "type": [ "null", "string" ], "default": null }
  ]
}
Open the Admin section from the top navigation bar. In the left-hand navigation, go to Groups and add a new group.
From there, use the left-hand navigation to open the Service accounts page and add a new entry.
Copy the token generated for the service account; the API call below uses it.
{
  "name": "input",
  "replication": 1,
  "partitions": 3,
  "configs": {},
  "format": {
    "key": { "format": "STRING" },
    "value": {
      "format": "AVRO",
      "schema": "{\"type\":\"record\",\"name\":\"ConnectDefault\",\"namespace\":\"io.lenses.connect.avro\",\"fields\":[{\"name\":\"EVENTID\",\"type\":\"string\"},{\"name\":\"EVENTDATETIME\",\"type\":\"string\"},{\"name\":\"CLIENTID\",\"type\":\"string\"},{\"name\":\"CUSTOMERCATEGORY\",\"type\":\"string\"},{\"name\":\"CUSTOMERID\",\"type\":\"string\"},{\"name\":\"ENTITYID\",\"type\":\"string\"},{\"name\":\"EVENTCORRELATIONID\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"EVENTSOURCE\",\"type\":\"string\"},{\"name\":\"SUPERPRODUCTID\",\"type\":\"string\"},{\"name\":\"TENANTID\",\"type\":\"string\"},{\"name\":\"TREATMENTCODE\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"SAPREASONCODE\",\"type\":\"string\"},{\"name\":\"TRXAMOUNT\",\"type\":\"string\"},{\"name\":\"TRXCODE\",\"type\":\"string\"},{\"name\":\"TRXDESCRIPTION\",\"type\":\"string\"},{\"name\":\"SourceType\",\"type\":[\"null\",\"string\"],\"default\":null}]}"
    }
  }
}
Save the JSON above as body.json. One way is to run the command below, paste the JSON, hit Enter, and then press CTRL+D (or CTRL+C) to end the input:
cat > body.json
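Escaping the Avro schema by hand is error-prone, so the file can also be generated. The sketch below is one possible approach in Python, under these assumptions: the names `render_body` and `write_body` are hypothetical, and the schema is abbreviated to two of the fields shown earlier to keep the example short. The schema is serialized to a compact string first; serializing the whole body then escapes its quotes.

```python
import json
from pathlib import Path

# Abbreviated copy of the Avro schema above (two of its fields), for brevity.
schema = {
    "type": "record",
    "name": "ConnectDefault",
    "namespace": "io.lenses.connect.avro",
    "fields": [
        {"name": "EVENTID", "type": "string"},
        {"name": "EVENTCORRELATIONID", "type": ["null", "string"], "default": None},
    ],
}


def render_body(topic, avro_schema, partitions=3, replication=1):
    """Return the request body as JSON text with the schema embedded as a string."""
    # Inner dumps: schema dict -> compact JSON string.
    schema_text = json.dumps(avro_schema, separators=(",", ":"))
    body = {
        "name": topic,
        "replication": replication,
        "partitions": partitions,
        "configs": {},
        "format": {
            "key": {"format": "STRING"},
            "value": {"format": "AVRO", "schema": schema_text},
        },
    }
    # Outer dumps: escapes the quotes inside schema_text (the \" form).
    return json.dumps(body, indent=2)


def write_body(path, topic, avro_schema):
    """Write the rendered body to a file such as body.json."""
    Path(path).write_text(render_body(topic, avro_schema) + "\n", encoding="utf-8")
```

Calling `write_body("body.json", "input", schema)` would produce a file in the same shape as the JSON above, with the shortened schema.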
Lenses authenticates and authorizes API requests using a token passed in the X-Kafka-Lenses-Token header. The service account created earlier already provides this token.
curl --header "Content-Type: application/json" \
  --header "X-Kafka-Lenses-Token: demo-sa:48cd653d-1889-42b6-9f38-6e5116db691a" \
  -i -X POST \
  --data @body.json \
  http://localhost:24015/api/v1/kafka/topic
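For automation from a script rather than a shell, the same request can be sketched with Python's standard library. This is an illustrative equivalent of the curl call, not an official client: the function names are hypothetical, the host and port are taken from the curl example, and the token is expected to be supplied by the caller (for instance from an environment variable) rather than hardcoded.

```python
import json
import urllib.request

LENSES_URL = "http://localhost:24015"  # host and port from the curl example


def build_create_topic_request(body, token, base_url=LENSES_URL):
    """Build (but do not send) the POST request for topic creation."""
    return urllib.request.Request(
        url=f"{base_url}/api/v1/kafka/topic",
        data=json.dumps(body).encode("utf-8"),
        method="POST",
        headers={
            "Content-Type": "application/json",
            "X-Kafka-Lenses-Token": token,
        },
    )


def create_topic(body, token, base_url=LENSES_URL):
    """Send the request and return (status, response text).

    urlopen raises urllib.error.HTTPError for 4xx/5xx responses.
    """
    request = build_create_topic_request(body, token, base_url)
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.status, response.read().decode("utf-8")
```

Keeping request construction separate from sending makes it possible to inspect the URL, headers, and payload without a running Lenses instance.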
On a successful call, there will be an ‘input’ Kafka topic and an ‘input-value’ Schema Registry subject.
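An automated pipeline may want to confirm the result. The sketch below checks for the subject through the Schema Registry REST endpoint `GET /subjects`, which returns a JSON array of subject names. It rests on assumptions not stated in this page: the Schema Registry is reachable directly and without authentication at a URL you supply, and the function names are hypothetical. The `<topic>-value` naming follows the example above.

```python
import json
import urllib.request


def expected_value_subject(topic):
    """Subject name for the topic's value schema, as in the example (input-value)."""
    return f"{topic}-value"


def list_subjects(registry_url):
    """Fetch all subject names from Schema Registry (GET /subjects)."""
    with urllib.request.urlopen(f"{registry_url}/subjects", timeout=10) as response:
        return json.loads(response.read().decode("utf-8"))


def value_subject_registered(topic, subjects):
    """Return True if the topic's value subject is among the given subjects."""
    return expected_value_subject(topic) in subjects


# Example use, assuming a registry URL of your own:
#   subjects = list_subjects("http://localhost:8081")
#   value_subject_registered("input", subjects)
```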