I need to publish a Kafka record to a Kafka topic.
Sometimes there is the need to publish data to a topic. Maybe a previous record contained incorrect information or was missing data, and downstream systems have been impacted. Inserting one record with the correct information would correct the problems. Or maybe, one control message will trigger a data workflow. Finally, using the Kafka default command-line tools is a bit too much for your data stewards, business analysts, developers, or data scientists. You want a flexible RBAC system and audit out of the box.
Writing SQL to handle the insert using SQL studio is one option to publish the data. However, suppose the JSON representation for my data (quite an often scenario) is available, and it is a complex structure. In that case, this functionality can speed up your task.
We assume a topic named myTopic already exists and has the permission to publish data to it. Furthermore, the topic is configured to retain records with Json keys and AVRO values. The Avro schema is set to:
myTopic
{ "type": "record", "name": "milanoRecord", "namespace": "io.lenses.telecom.telecomitalia.grid", "doc": "Schema for Grid for Telecommunications Data from Telecom Italia.", "fields": [ { "name": "SquareId", "type": "int", "doc": " The id of the square that is part of the Milano GRID" }, { "name": "Polygon", "type": { "type": "array", "items": { "type": "record", "name": "coordinates", "fields": [ { "name": "longitude", "type": "double" }, { "name": "latitude", "type": "double" } ] } } } ] }
We now want to insert a record to this topic with this content:
{ "SquareId": 51 }
{ "SquareId": 51, "Polygon": [ { "longitude": "9.161512740279838", "latitude": "45.35868769924141" }, { "longitude": "9.164513162265347", "latitude": "45.358683417093154" }, { "longitude": "9.164507032776845", "latitude": "45.3565681059862" }, { "longitude": "9.161506722580903", "latitude": "45.35657238782037" }, { "longitude": "9.161512740279838", "latitude": "45.35868769924141" } ] }
To do so, navigate to the Explore screen and then to the details page for the topic myTopic. Follow the Actions button and chose the option Insert Messages. The following dialog appears, prompting the user for the data to insert.
Actions
Insert Messages
The input accepts an array of records, but for now, we would insert just one. Use the following JSON as the data content and click the Insert Messages button:
[ { "key": { "SquareId": 51 }, "value": { "SquareId": 51, "Polygon": [ { "longitude": "9.161512740279838", "latitude": "45.35868769924141" }, { "longitude": "9.164513162265347", "latitude": "45.358683417093154" }, { "longitude": "9.164507032776845", "latitude": "45.3565681059862" }, { "longitude": "9.161506722580903", "latitude": "45.35657238782037" }, { "longitude": "9.161512740279838", "latitude": "45.35868769924141" } ] } } ]
Reload the page, and the following record appears on the Data tab:
Data
Lenses has taken the JSON input and validated the key content is valid. For the record Value, it used the Avro schema to transform the input to Avro before it published it. Any misalignment with the Avro schema will yield an error back to the user. Therefore, the data will not be published to the Kafka topic.
The input JSON should omit the key or value fields to insert records with nullable keys or values. The following content will insert a record with a nullable value.
key
value
[ { "key": { "SquareId": 51 } } ]
To insert a record with a nullable key requires the input JSON to omit the key field. If the target topic is compacted, inserting nullable keys are rejected.
[ { "value": { "SquareId": 51, "Polygon": [ { "longitude": "9.161512740279838", "latitude": "45.35868769924141" }, { "longitude": "9.164513162265347", "latitude": "45.358683417093154" }, { "longitude": "9.164507032776845", "latitude": "45.3565681059862" }, { "longitude": "9.161506722580903", "latitude": "45.35657238782037" }, { "longitude": "9.161512740279838", "latitude": "45.35868769924141" } ] } } ]
Lenses will allow inserting primitive types for a Kafka record like String, Integer, Long. It does not support inserting records when the target topic storage involves Bytes, Xml, or Time-window types (which result from aggregation).
At times, there is a requirement to publish records containing headers. Adding them to a record requires a header field in the input provided to the insert messages dialog.
header
Publishing headers is optional and is triggered by the presence of the headers field.Here are the rules for inserting headers:
headers
[ { "headers": { "h1": "...", "h2": "..." }, "key" : "...", "value": "..." } ]
[ { "headers": { "h1": "1" } } ]
[ { "headers": { "h1": "1.1" }, "..." } ]
[ { "headers": { "h1": "null" }, "..." } ]
[ { "headers": { "h1": { "a": "..." }, "h2": [ { "x": "..." } ] }, "..." } ]
If the rules above are not met, then an error will be returned.
Here is an example for all the supported headers features:
[ { "headers":{ "nullable": null, "int": 1, "double": 1.1, "text" : "example text", "boolean": true, "nested1": { "x": 1, "y": "example test", "z": { "a": 2 } }, "nested2": [ { "x": 1, "y": "example test" }, { "x": 2, "y": "example test" } ] }, "key": { "SquareId": 51 }, "value": { "SquareId": 51, "Polygon": [ { "longitude": "9.161512740279838", "latitude": "45.35868769924141" }, { "longitude": "9.164513162265347", "latitude": "45.358683417093154" }, { "longitude": "9.164507032776845", "latitude": "45.3565681059862" }, { "longitude": "9.161506722580903", "latitude": "45.35657238782037" }, { "longitude": "9.161512740279838", "latitude": "45.35868769924141" } ] } } ]
At the moment, headers have these types: double, longs are rendered as a String, a functionality gap that will be closed soon.
On this page