Deserializers
Lenses works out of the box with messages in AVRO, PROTOBUF, JSON, XML and primitive formats.
When messages use a format that Lenses does not support natively, you need to provide a deserialization envelope, also known as a serde. A serde handles reading raw bytes into a meaningful data structure and translating data back into raw bytes, based on the storage format.
Deserialising a custom format
Suppose a Kafka topic contains GPS coordinates, each consisting of a latitude and a longitude encoded as a UTF-8 string such as “45.623412:10.419433”.
A serde can decode this format by implementing a simple interface. First, add the following library dependency to your project:
<dependency>
    <groupId>com.landoop</groupId>
    <artifactId>lenses-serde</artifactId>
    <version>1.0.3</version>
</dependency>
Then, see the following Java code implementing the Serde interface. The return type of the deserializer is a GenericRecord with the two fields parsed from the bytes.
In this simple example, we do not worry about error handling, such as ensuring that the tokens have valid double values.
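// Lenses imports: the package name is assumed from the lenses-serde artifact.
import com.landoop.lenses.serde.Deserializer;
import com.landoop.lenses.serde.Serde;
import com.landoop.lenses.serde.Serializer;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

public class LatLngSerde implements Serde {

    // Avro schema describing the decoded record: two required doubles.
    private final Schema schema = SchemaBuilder.builder()
            .record("lat_lng")
            .fields()
            .requiredDouble("lat")
            .requiredDouble("lng")
            .endRecord();

    @Override
    public Schema getSchema() {
        return schema;
    }

    @Override
    public Serializer serializer(Properties properties) {
        // This example only deserializes, so no serializer is provided.
        return null;
    }

    @Override
    public Deserializer deserializer(Properties properties) {
        return new Deserializer() {
            @Override
            public GenericRecord deserialize(byte[] bytes) throws IOException {
                // Decode explicitly as UTF-8 rather than the platform default charset.
                String data = new String(bytes, StandardCharsets.UTF_8);
                // Split "lat:lng" into its two tokens.
                String[] tokens = data.split(":");
                double lat = Double.parseDouble(tokens[0]);
                double lng = Double.parseDouble(tokens[1]);
                GenericRecord record = new GenericData.Record(schema);
                record.put("lat", lat);
                record.put("lng", lng);
                return record;
            }

            @Override
            public void close() throws IOException {
                // Nothing to release.
            }
        };
    }
}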
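The example above skips error handling and returns no serializer. As a rough sketch of how those gaps might be filled, the helper below validates the “lat:lng” payload before parsing and also encodes a pair of coordinates back into bytes. LatLngCodec, decode and encode are hypothetical names and are not part of the lenses-serde library. The range checks (latitude within ±90, longitude within ±180) are an assumption about what counts as a valid coordinate. The class uses only the Java standard library so it can be tried in isolation.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; not part of the lenses-serde library.
final class LatLngCodec {

    private LatLngCodec() {
    }

    /** Decodes UTF-8 "lat:lng" bytes into {lat, lng}, rejecting malformed input. */
    static double[] decode(byte[] bytes) throws IOException {
        if (bytes == null) {
            throw new IOException("Payload is null");
        }
        String data = new String(bytes, StandardCharsets.UTF_8).trim();
        // A limit of -1 keeps trailing empty tokens, so "1.0:" is seen as two tokens.
        String[] tokens = data.split(":", -1);
        if (tokens.length != 2) {
            throw new IOException("Expected 'lat:lng' but got: " + data);
        }
        double lat;
        double lng;
        try {
            lat = Double.parseDouble(tokens[0]);
            lng = Double.parseDouble(tokens[1]);
        } catch (NumberFormatException e) {
            throw new IOException("Non-numeric coordinate in: " + data, e);
        }
        // Assumed validity ranges; the negated form also rejects NaN.
        if (!(lat >= -90 && lat <= 90) || !(lng >= -180 && lng <= 180)) {
            throw new IOException("Coordinate out of range: " + data);
        }
        return new double[] {lat, lng};
    }

    /** Encodes coordinates back into the UTF-8 "lat:lng" text form. */
    static byte[] encode(double lat, double lng) {
        return (lat + ":" + lng).getBytes(StandardCharsets.UTF_8);
    }
}
```

Inside LatLngSerde, the deserialize method could call LatLngCodec.decode(bytes) and copy the two values into the GenericRecord, and a serializer could use LatLngCodec.encode in the opposite direction. Malformed input is reported as an IOException here because deserialize already declares it. Whether Lenses expects a different failure signal is not stated in this section.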