The input and output data for the engines is stored in Kafka. A storage format is the way to define how the information is stored.
Data sent and received from the brokers is a sequence of bytes. Therefore, data which is being sent to Kafka is first transformed to raw bytes. When reading the data, it is paramount to know how it was written to be able to recreate the original information.
There are many ways to store data:
Choosing an appropriate format can have some great benefits:
Storage formats can be classified in primitives and complex.
The list of supported formats are:
Primitives and storage types are linked together. For bytes, integer, long or string(text) the storage format and the type representation are directly coupled. For example an integer is stored as 8 bytes, the same way it is stored at runtime. A string is stored as a sequence of bytes using the platform default charset (UTF-8). Complex types, however, can store raw primitives as well.
Encoding composite data structures requires a different approach to storage format.
AVRO is a compact, fast, binary storage format.
The format relies on schemas. When AVRO data is read, the schema used when writing is required. Having a schema, enforces the payload to respect an agreed contract; the schema represents the data payload format contract. This avoids having corrupted data records.
Here is an example of a schema example:
{ "type": "record", "name": "LensesExample", "namespace": "io.lenses.example", "fields": [ { "name": "id", "type": "long" }, { "name": "username", "type": "string" }, { "name": "followers", "type": { "type": "array", "items": "string" } }, { "name": "settingsMap", "type": { "type": "map", "values": "string" } }, { "name": "relationship", "type": { "type": "enum", "name": "Relationship", "symbols": [ "MARRIED", "FRIEND", "COLLEAGUE", "STRANGER" ] } }, { "name": "address", "type": { "type": "record", "name": "Address", "fields": [ { "name": "number", "type": "int" }, { "name": "street", "type": "string" }, { "name": "city", "type": "string" } ] } } ] }
Best practices for working with AVRO require is to use a schema manager. The reason is to avoid embedding the schema with each message, since the schema payload might be bigger than the actual data itself. The schema identifier will be part of the data sent to a Kafka topic, thus reducing the disk space used and network traffic. Also having a schema manager allows a centralized place for applications to share, use, and evolve schemas.
In a scenario where the raw bytes contain the schema, Lenses can be extended with custom code to allow the engines to process the data. See below Custom Format.
Protocol buffers are Google’s language-neutral, platform-neutral, extensible mechanism for serializing structured data – think XML, but smaller, faster, and simpler. You define how you want your data to be structured once, then you can use special generated source code to easily write and read your structured data to and from a variety of data streams and using a variety of languages.
Like AVRO the Protobuf format relies on schema. When the data is read, the schema used when writing is required. Having a schema, enforces the payload to respect an agreed contract; the schema represents the data payload format contract. This avoids having corrupted data records.
message Person { string name = 1; int32 id = 2; string email = 3; }
AVRO best practices apply for Google Protobuf.
The format is easy for humans to read and write, and since it is a text format it borrows the storage from the string primitive.
Given this SQL statement:
INSERT INTO <target_topic> SELECT STREAM country , COUNT(*) as total , MAXK(points,3) as maxpoints , AVG(points) as avgpoints FROM <source_topic>
it produces this text output:
{ "country":"UK", "total": 1235, "maxpoints" : [ 9900, 8291, 8111], "avgpoints" : 212345.65 }
JSON storage format can write primitives directly as well. Considering a record Key of type Long and value 100 it will store it as the bytes representation for the text 100.
100
This is another human-readable text format. Following the query above the output would be:
<root> <country>UK</country> <total>1235</total> <maxpoints> <item>9900</item> <item>8291</item> <item>8111</item> </maxpoints> <avgpoints>212345.65</avgpoints> </root>
It is possible to extend Lenses to work with a different storage format which is not provided out of the box.
This makes it possible to handle storage formats like Google Protobuf (without Schema Registry) or Thrift. This extension has to be able to read and translate the raw record bytes into a meaningful data structure.
A byte array from a record which holds Google Protobuf data will be passed to the custom code. Using the Protobuf marshaller the custom extension will inflate the raw bytes to an object Lenses engines can work with..
For example, if a Kafka topic contains GPS coordinates. The data consists of a latitude and a longitude, both of which are doubles and stored as latitude:longitude, such as 45.623412:10.419433. Kafka will receive the raw bytes representation for the GPS coordinates text. Providing the code which can translate this back to an entity Lenses can work with, allows the engines to be able to process the records.
latitude:longitude
45.623412:10.419433
Lenses provides a Java API for hooking up custom storage into the engines.
The following Java code implements the interface Serde. Notice that the return type of the deserializer is an GenericRecord and that is populated with the two fields parsed from the bytes.
public class LatLngSerde implements Serde { private Schema schema = SchemaBuilder.builder() .record("lat_lng") .fields() .requiredDouble("lat") .requiredDouble("lng") .endRecord(); @Override public Schema getSchema() { return schema; } @Override public Serializer serializer(Properties properties) { return new Serializer() { @Override public byte[] serialize(GenericRecord record) throws IOException { double lat = (double) record.get("lat"); double lng = (double) record.get("lng"); String data = lat + ":" + lng; return data.getBytes("UTF-8"); } @Override public void close() throws IOException { } }; } @Override public Deserializer deserializer(Properties properties) { throw new NotImplementedException("Not required for now."); } }
To be able to compile the code this library dependency is required to be added to the build system:
<dependency> <groupId>com.landoop</groupId> <artifactId>lenses-serde</artifactId> <version>1.0.2</version> </dependency>
Once the code is built it can be deployed in Lenses. Follow the #deploy_custom_serde link to get the details.
CSV is a text storage format where the payload consists of a delimited sequence of values.
value1,value2,...,valueN
Lenses will lift the data into a structure like below:
{ "content":[ //value1, //value2, //.. //valueN ] }
All the items in the content field are text. If any of the values represent numbers the CAST function can be used to convert to the required type.
content
CAST
These are storage formats created by the Streaming mode. When performing windowed aggregations the resulting record key carries the window information with it. There are two storage formats for windowed aggregations:
They wrap the Key used during the aggregation. The Key storage format can be any of the other ones mentioned above, apart from custom ones.
For each windowed key the following schema applies:
{ "value": //The Aggregation Key schema. It can be a primitive or complext type "window": { "start": //epoch time when the window started, "end": //epoch time when the window ends. Only for session windows. } }
Kafka and Google Protobuf
Access your Kafka data stored with Google Protobuf
Legacy protobuf support
Access Protobuf data through serde plugin - Lenses 4.3 or earlier releases
On this page