Custom Serializer and Deserializer

By default, Lenses can interpret messages written in AVRO, JSON, and common primitive formats such as strings and longs. However, some topics may carry messages written in another format, such as protobuf, thrift, or your own proprietary format. To let Lenses handle these messages, you can supply a serde that handles serialization from a record into bytes and deserialization from bytes back into a record.

Lenses uses AVRO as its internal format, so a custom serde is expected to serialize an AVRO Record to bytes and to deserialize bytes to an AVRO Record. The serde is responsible for understanding what the bytes represent and how to interpret them. For example, a custom deserializer may be passed a byte array from a topic that holds protobuf data. Only this custom serde knows how to interpret that byte array, so it inflates the bytes into an object using the Protobuf marshaller and then uses that object to populate an AVRO Record, which is passed back to Lenses.

Let’s consider a simple example of a topic that contains a GPS coordinate. This consists of a lat and a long, both of which are doubles. We encode this as a string “lat:long”, such as “45.623412:10.419433”. The UTF-8 bytes of this string are what is published onto the topic.

It would be impossible for a system to interpret this without having the code required to decode / encode this format. Therefore we must provide a custom serde which can convert bytes back into the lat and long parts. Similarly, to allow Lenses to publish data we must be able to take a record that contains a lat and long and return the correctly encoded byte array.
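
Independent of Lenses, the wire format itself can be sketched using only the Java standard library. The class name LatLngCodec and its two methods are hypothetical and exist only for illustration; they are not part of the lenses-serde API.

```java
import java.nio.charset.StandardCharsets;

/** Stdlib-only sketch of the "lat:long" wire format; LatLngCodec is a hypothetical name. */
final class LatLngCodec {

    /** Formats the pair as "lat:long" and returns the UTF-8 bytes of that string. */
    static byte[] encode(double lat, double lng) {
        String data = lat + ":" + lng;
        return data.getBytes(StandardCharsets.UTF_8);
    }

    /** Reverses encode: returns {lat, long} parsed from the UTF-8 payload. */
    static double[] decode(byte[] bytes) {
        String data = new String(bytes, StandardCharsets.UTF_8);
        String[] tokens = data.split(":");
        return new double[] {
            Double.parseDouble(tokens[0]),
            Double.parseDouble(tokens[1])
        };
    }

    public static void main(String[] args) {
        byte[] payload = encode(45.623412, 10.419433);
        double[] latLng = decode(payload);
        System.out.println(new String(payload, StandardCharsets.UTF_8));
        System.out.println(latLng[0] + ", " + latLng[1]);
    }
}
```

A custom serde wraps this kind of logic and exchanges AVRO records with Lenses instead of raw doubles.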

The following Java code implements the Serde interface, providing a schema together with a serializer and a deserializer. Notice that the return type of the deserializer is a GenericRecord, which is populated with the two fields parsed from the bytes. In this simple example we don’t worry about error handling, such as ensuring the tokens are valid doubles.

First, add the library dependency to your project. Here is the Maven example:

<dependency>
    <groupId>com.landoop</groupId>
    <artifactId>lenses-serde</artifactId>
    <version>1.0.2</version>
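</dependency>

import java.io.IOException;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

// Serde, Serializer and Deserializer are provided by the lenses-serde dependency above
// and must be imported from that library as well.

public class LatLngSerde implements Serde {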
    private Schema schema = SchemaBuilder.builder()
                        .record("lat_lng")
                        .fields()
                        .requiredDouble("lat")
                        .requiredDouble("lng")
                        .endRecord();

    @Override
    public Schema getSchema() {
        return schema;
    }

    @Override
    public Serializer serializer(Properties properties) {
        return new Serializer() {

            @Override
            public byte[] serialize(GenericRecord record) throws IOException {
                double lat = (double) record.get("lat");
                double lng = (double) record.get("lng");
                String data = lat + ":" + lng;
                return data.getBytes("UTF-8");
            }

            @Override
            public void close() throws IOException {
            }
        };
    }

    @Override
    public Deserializer deserializer(Properties properties) {
        return new Deserializer() {
            @Override
            public GenericRecord deserialize(byte[] bytes) throws IOException {
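                // Decode explicitly as UTF-8 to match the serializer, rather than the platform default charset
                String data = new String(bytes, "UTF-8");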
                String[] tokens = data.split(":");
                double lat = Double.parseDouble(tokens[0]);
                double lng = Double.parseDouble(tokens[1]);

                GenericRecord record = new GenericData.Record(schema);
                record.put("lat", lat);
                record.put("lng", lng);
                return record;
            }

            @Override
            public void close() throws IOException {
            }
        };
    }
}

Note: It is not necessary to implement the serializer method if you will not publish data in this format. In that case, simply return null.
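
The deserializer above omits error handling. Below is a minimal sketch of a stricter parser, assuming that malformed payloads should surface as an IOException (the exception type that deserialize already declares). The names LatLngParser and parse are hypothetical and not part of the lenses-serde API.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: validates a "lat:long" payload before parsing it. */
final class LatLngParser {

    /** Returns {lat, long}, or throws IOException if the payload is malformed. */
    static double[] parse(byte[] bytes) throws IOException {
        if (bytes == null) {
            throw new IOException("Payload is null");
        }
        String data = new String(bytes, StandardCharsets.UTF_8);
        // A limit of -1 keeps trailing empty tokens, so "1:2:" yields three tokens and is rejected
        String[] tokens = data.split(":", -1);
        if (tokens.length != 2) {
            throw new IOException("Expected \"lat:long\" but got: " + data);
        }
        try {
            double lat = Double.parseDouble(tokens[0]);
            double lng = Double.parseDouble(tokens[1]);
            return new double[] {lat, lng};
        } catch (NumberFormatException e) {
            // Keep the original cause so the offending token can be diagnosed
            throw new IOException("Invalid coordinate in: " + data, e);
        }
    }
}
```

Inside a deserializer, the two returned values would be used to populate the GenericRecord in place of the unchecked Double.parseDouble calls.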

Installation of Custom Serdes

Once a serde has been written, it must be deployed to the Lenses instance. To do this, package the serde as a jar together with any dependencies it requires, and install the jar into %LENSES_HOME/serdes. Jars deployed there are picked up after a ten-second delay. Lenses then scans each jar to discover any serde implementations and makes them available as a decoder type in the UI.

The following screenshot shows such a serde that has been detected and is available for selection.

../_images/serde_selection.png

The following screenshot shows a topic which contains credit-card messages written in a protobuf format. As the serde is not being used, Lenses interprets the data as raw bytes.

../_images/raw_protobuf_data.png

In the final screenshot, we can see that once the serde is selected, the data is correctly decoded as credit-card data.

../_images/converted_protobuf_data.png