JSON is a widely used way to represent and store data. However, on its own it is less than ideal for ensuring data consistency and data quality. JSON alone carries no description of the data shape: the fields and their types. Nothing stops a producer from changing the type of a field between two consecutive messages. Imagine a first message with a field called tag holding a string value, followed by a second message with the same field, but this time holding a number:
{ "tag" : "this-is-awesome" }
{ "tag" : 123 }
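To make the problem concrete, here is a minimal Python sketch (not part of Lenses; the helper name is hypothetical) that flags a field whose type changes between consecutive messages:

```python
import json

def detect_type_drift(messages):
    """Return the set of fields whose JSON type changes across messages."""
    seen = {}        # field name -> type name first observed
    drifted = set()
    for raw in messages:
        for field, value in json.loads(raw).items():
            t = type(value).__name__
            if field in seen and seen[field] != t:
                drifted.add(field)
            seen.setdefault(field, t)
    return drifted

# The two messages from the example above: same field, different types.
msgs = ['{ "tag" : "this-is-awesome" }', '{ "tag" : 123 }']
print(detect_type_drift(msgs))  # {'tag'}
```

Nothing in plain JSON prevents the second message from being produced; the drift only becomes visible once you compare messages.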
Fields with null values add further complexity. More often than not, fields whose value is null are simply left out of the payload. When converting JSON to AVRO, fields that can be null therefore need extra attention.
AVRO is a self-describing data representation: whether a given field exists, and whether it is nullable, is known from its schema rather than by inspecting subsequent messages. Avoiding different resulting AVRO schemas just because the input JSON varies should be a well-established goal.
Lenses learns about your topics and infers their schema. At the moment, this process is not continuous, for performance reasons: repeatedly reading the topic data would impact the Kafka cluster. The schema is inferred from a sample of the data, not by reading the entire topic and merging all the schemas identified.
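The consequence of sampling can be sketched in a few lines of Python. This is a simplified stand-in for the inference described above, not the actual Lenses implementation: a field that never appears in the sampled messages cannot appear in the inferred schema.

```python
import json

def infer_schema(sample):
    """Infer a flat field -> type-name mapping from a sample of JSON messages."""
    schema = {}
    for raw in sample:
        for field, value in json.loads(raw).items():
            schema.setdefault(field, type(value).__name__)
    return schema

# Only the early messages are sampled; a later message adds "tags",
# so the inferred schema never sees it.
sampled = ['{"deviceid": "iot123", "temp": 54.98}']
later   = '{"deviceid": "iot456", "temp": 54.18, "tags": "tag1,tag2"}'

schema = infer_schema(sampled)
print("tags" in schema)  # False: the sampled data never contained it
```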
When it comes to JSON, understanding the full schema is never going to be perfect. Here are a few scenarios to highlight the limitations of JSON.
Imagine your topic at the moment contains messages with this payload structure:
{ "deviceid" : "iot123", "temp" : 54.98, "humidity" : 32.43, "coords" : { "latitude" : 47.615694, "longitude" : -122.3359976 } }
Then a few days later, some messages received contain the structure below. By then, Lenses has already inferred the schema from what was available, so this new field, tags, will not appear in it.
{ "deviceid" : "iot123", "temp" : 54.98, "humidity" : 32.43, "coords" : { "latitude" : 47.615694, "longitude" : -122.3359976 }, "tags" : "tag1,tag2" }
When processing the topic data via Lenses SQL streaming, the user needs to add the tags field manually. Later in the article, you will see how.
Another challenge arises when the first messages contain all the possible fields. Let us flip the earlier example: this time, every message contains the field tags. A few days later, some messages arrive without it. By then, the tags field's schema has been identified as string, but not as nullable.
When processing the topic data via SQL streaming, the topic schema in Lenses needs to be manually updated for the field tags to highlight it is nullable. With the examples below, you will see how.
I have a topic with messages, some of which contain the field tags, some of which do not. When I try to use the field in a SQL processor, Lenses gives me this error:
Cannot create the Lenses SQL Processor.Unknown field [tags]. Available fields are:deviceid,temp,humidity,coords.
What can I do to get this working?
It looks like the tags field has not been picked up by Lenses schema inference. Given the inference challenges explained above, this is a realistic scenario. The fix is to update the schema Lenses holds so that it matches the latest data.
First, let’s recreate the context.
We first create the topic and insert the data. To do so, navigate to the Lenses SQL Studio screen and run the queries below.
CREATE TABLE iot_1(
    _key string
  , deviceid string
  , temp double
  , humidity double
  , coords.latitude double
  , coords.longitude double
)
FORMAT(string, json);

INSERT INTO iot_1(
    _key
  , deviceid
  , temp
  , humidity
  , coords.latitude
  , coords.longitude
)
VALUES ("iot123", "iot123", 54.98, 32.43, 47.615694, -122.3359976);
To check the data run this query:
select * from iot_1;
So far, we have replicated the scenario where at least one producer pushes data to iot_1 and Lenses has identified the schema. Next, let's push a new message containing a new field, tags. To do so, navigate to the iot_1 topic details, choose “Insert Messages” from the Actions drop-down, and paste the following in the dialog presented:
[
  {
    "key": "iot456",
    "value": {
      "deviceid": "iot456",
      "temp": 54.18,
      "humidity": 33.03,
      "coords": {
        "latitude": 47.615094,
        "longitude": -122.3356976
      },
      "tags": "tag1,tag2"
    }
  }
]
Next, we want to create a processor using the field tags.
SET defaults.topic.autocreate=true;

INSERT INTO iot_1_processed
STORE VALUE AS AVRO
SELECT STREAM deviceid, tags
FROM iot_1
Trying to run this replicates the error reported.
To get your SQL processor working, you need to manually update the schema Lenses inferred so that it contains the field tags. Navigate to the iot_1 topic details and then to the schema tab. From the actions, choose Edit schemas and paste the following:
{
  "type": "record",
  "name": "record0",
  "fields": [
    { "name": "deviceid", "type": "string" },
    { "name": "temp", "type": "double" },
    { "name": "humidity", "type": "double" },
    {
      "name": "coords",
      "type": {
        "type": "record",
        "name": "record",
        "fields": [
          { "name": "latitude", "type": "double" },
          { "name": "longitude", "type": "double" }
        ]
      }
    },
    { "name": "tags", "type": ["null", "string"] }
  ]
}
The change is this:
{ "name": "tags", "type": ["null","string"] }
Save the change and go back to the SQL processor; this time it allows you to reference the tags field and runs successfully.
I am trying to convert my data to AVRO, but I get this error on my SQL processor:
Error serializing Avro message Caused by: java.lang.NullPointerException: null of ...
Lenses has identified the tags field, but not as one that can contain null values. As explained already, schema inference cannot always pick this up. Before jumping to the solution, let's recreate the context.
We start by creating a topic and inserting data as it would come from producers. To do so, navigate to the Lenses SQL Studio screen and run the queries below.
CREATE TABLE iot_2(
    _key string
  , deviceid string
  , temp double
  , humidity double
  , coords.latitude double
  , coords.longitude double
  , tags string
)
FORMAT(string, json);

INSERT INTO iot_2(
    _key
  , deviceid
  , temp
  , humidity
  , coords.latitude
  , coords.longitude
  , tags
)
VALUES ("iot123", "iot123", 54.98, 32.43, 47.615694, -122.3359976, "tag1,tag2");
Let’s check the data by running this query:
select * from iot_2;
Next, let’s insert a message without the tags field. Navigate to the iot_2 topic details and, from the Actions option, choose Insert Messages. Paste the content below - notice that there is no field named tags:
[
  {
    "key": "iot456",
    "value": {
      "deviceid": "iot456",
      "temp": 54.18,
      "humidity": 33.03,
      "coords": {
        "latitude": 47.615094,
        "longitude": -122.3356976
      }
    }
  }
]
Now, let’s create a processor which converts to Avro:
SET defaults.topic.autocreate=true;

INSERT INTO iot_2_fails
STORE VALUE AS AVRO
SELECT STREAM deviceid, tags
FROM iot_2
The processor will fail when it processes the second message: the tags field is now null, but the schema says null values are not allowed.
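Why the union type matters can be shown with a simplified Avro-style type check. This is illustrative Python, not the Lenses serializer: a plain string type rejects null, while a union containing "null" accepts it.

```python
def accepts(avro_type, value):
    """Simplified Avro-style check for string fields:
    a plain type rejects None, a union containing "null" accepts it."""
    types = avro_type if isinstance(avro_type, list) else [avro_type]
    if value is None:
        return "null" in types
    return "string" in types and isinstance(value, str)

# The second message has no tags field, i.e. tags is effectively null.
print(accepts("string", None))                   # False -> serialization fails
print(accepts(["null", "string"], None))         # True  -> nullable union works
print(accepts(["null", "string"], "tag1,tag2"))  # True
```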
There are two ways to address this.
If you do not know whether the field will always be present in the future, you can write the query so that the field is nullable on the output topic. The solution is to mark the input field as nullable, regardless of its schema. See as_nullable(tags) in the SQL below:
SET defaults.topic.autocreate=true;

INSERT INTO iot_2_as_nullable
STORE VALUE AS AVRO
SELECT STREAM deviceid, as_nullable(tags) as tags
FROM iot_2
The resulting topic schema will be:
{
  "type": "record",
  "name": "record",
  "fields": [
    { "name": "deviceid", "type": "string" },
    { "name": "tags", "type": ["null", "string"] }
  ]
}
The output topic will contain two records, with the last entry having a null tags.
The second way to fix the processor is to update the source topic schema, marking the tags field as nullable. Navigate to the iot_2 topic and then to the schema tab. From the Actions options, choose Edit schemas and set the tags field's type to ["null", "string"], as in the schema shown earlier.
Save the change and go back to the SQL processors page.
Use this code to create a new processor:
SET defaults.topic.autocreate=true;

INSERT INTO iot_2_works
STORE VALUE AS AVRO
SELECT STREAM deviceid, tags
FROM iot_2
I receive JSON messages and I don’t know the entire schema, but I want to process only the fields I care about. How can that work without a schema for the entire JSON?
For stream processing, Lenses only needs to know about the fields you target.
Consider a scenario where this type of message is received from the upstream system:
{
  "InvoiceNumber": "GBPGB011",
  "TotalSalePrice": 89000,
  "SaleDate": "2015-04-30T00:00:00",
  "Customer": {
    "Name": "Wonderland Wheels",
    "CreditRisk": false,
    "Reseller": true,
    "..."
  },
  "Address": {
    "First Line": "21A",
    "Second Line": "Buckingham Palace Road",
    "Town": "London",
    "PostCode": "E7 4BR",
    "CountryName": "United Kingdom",
    "CountryISO": "GBR ",
    "..."
  },
  "CustomerComments": [ "Excellent", "Wonderful", "Superb" ],
  "Salesdetails": [
    {
      "LineItem": 1,
      "Make": "Porsche",
      "Model": "944",
      "SellingPrice": 8500,
      "..."
    },
    {
      "LineItem": 2,
      "Make": "Bentley",
      "Model": "Flying Spur",
      "..."
    }
  ],
  "id": "f58a70dc-f107-d3ba-acda-02f39893eb44",
  "_rid": "molfALBK0z8BAAAAAAAAAA==",
  "_etag": "00000000-0000-0000-c2fa",
  "_attachments": "attachments/",
  "_ts": 1549993225,
  "..."
}
Imagine these fields are required: InvoiceNumber, SaleDate, Address (with its first and second lines, Town, PostCode, and CountryName), and Salesdetails (Make, Model, and SellingPrice).
To process the data, it is enough to have a schema covering only these fields rather than the entire set of possible fields. The topic schema would then be:
{
  "type": "record",
  "name": "Invoice",
  "namespace": "io.lenses.core.sql.common.metrics",
  "fields": [
    { "name": "InvoiceNumber", "type": "string" },
    { "name": "SaleDate", "type": "string" },
    {
      "name": "Address",
      "type": {
        "type": "record",
        "name": "Address",
        "fields": [
          { "name": "FirstLine", "type": ["null", "string"] },
          { "name": "SecondLine", "type": ["null", "string"] },
          { "name": "Town", "type": "string" },
          { "name": "PostCode", "type": "string" },
          { "name": "CountryName", "type": "string" }
        ]
      }
    },
    {
      "name": "Salesdetails",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "SalesDetails",
          "fields": [
            { "name": "Make", "type": "string" },
            { "name": "Model", "type": "string" },
            { "name": "SellingPrice", "type": "double" }
          ]
        }
      }
    }
  ]
}
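The idea that only the targeted fields need a schema can be sketched in Python: project the incoming document down to the fields of interest and ignore everything else. The field names follow the invoice example above; the helper itself is illustrative, not part of Lenses.

```python
import json

REQUIRED = ["InvoiceNumber", "SaleDate", "Address", "Salesdetails"]

def project(raw, fields):
    """Keep only the fields of interest; unknown fields are ignored."""
    doc = json.loads(raw)
    return {f: doc[f] for f in fields if f in doc}

message = json.dumps({
    "InvoiceNumber": "GBPGB011",
    "SaleDate": "2015-04-30T00:00:00",
    "Address": {"Town": "London"},
    "Salesdetails": [{"Make": "Porsche", "Model": "944", "SellingPrice": 8500}],
    "_etag": "00000000-0000-0000-c2fa",  # internal field we do not care about
})

slim = project(message, REQUIRED)
print(sorted(slim))  # ['Address', 'InvoiceNumber', 'SaleDate', 'Salesdetails']
```

Any extra fields added upstream later simply pass through unnoticed, which is exactly why the partial schema keeps working.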