In this tutorial we will see how data stored in Apache Kafka with Google Protobuf can be visualised and processed. Once the data is lifted into Lenses, data masking and stream processing with SQL can be unleashed. The code can be found on GitHub.
I want to view my Google Protobuf data in Kafka, protect the sensitive information, and be able to process it.
Let’s assume that we have a topic (cards) that contains data regarding credit cards. The Google Protobuf schema is this:
```protobuf
syntax = "proto2";

package io.lenses.examples.serde.protobuf;

option java_package = "io.lenses.examples.serde.protobuf.generated";
option java_outer_classname = "CardData";

message CreditCard {
  required string name = 1;
  required string country = 2;
  required string currency = 3;
  required string cardNumber = 4;
  required bool blocked = 5;
  required string type = 6;
}
```
Lenses exposes a lightweight library for plugging Google Protobuf payloads in Kafka into Lenses. All that is needed is to implement the Serde interface:
```xml
<dependency>
  <groupId>com.landoop</groupId>
  <artifactId>lenses-serde</artifactId>
  <version>${lenses.serdes.version}</version>
</dependency>
```
The plugin implementation has to provide two methods:
getSchema()
deserializer(Properties properties)
At the moment, serializer(Properties properties) is not required, since it is not used by Lenses.
```java
@Override
public Serializer serializer(Properties properties) {
  // not required
  throw new NotImplementedException();
}

@Override
public Deserializer deserializer(Properties properties) {
  // REQUIRED
}

@Override
public Schema getSchema() {
  // REQUIRED
}
```
The example provides two deserializer implementations: one that round-trips through Avro bytes, and one that builds the GenericRecord directly.
The first thing to do is provide the schema for your Kafka payload. The avro-protobuf library can handle that automatically: using the JVM class generated from the Protobuf schema, and ProtobufData, the following code obtains the Avro schema:
```java
private final static Schema schema =
    ProtobufData.get().getSchema(CardData.CreditCard.class);

@Override
public Schema getSchema() {
  return schema;
}
```
The next step is to translate the raw bytes, which store the card details as Google Protobuf, into a GenericRecord. Step one is to lift the raw bytes into an instance of the CreditCard class:
```java
CardData.CreditCard card = CardData.CreditCard.parseFrom(bytes);
```
For step two, the card details are written to an in-memory array as Avro:
```java
ProtobufDatumWriter<CardData.CreditCard> pbWriter = new ProtobufDatumWriter<>(schema);
ByteArrayOutputStream out = new ByteArrayOutputStream();
Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
pbWriter.write(card, encoder);
encoder.flush();
The last step reads the Avro bytes back as a GenericRecord:
```java
GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
Decoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
GenericRecord record = datumReader.read(null, decoder);
return record;
```
Going from bytes to CreditCard to Avro bytes to GenericRecord can be short-circuited to avoid the intermediary bytes. This is what the second implementation does, at the expense of more code.
Lifting the raw bytes into a CreditCard remains the same. The next, and last, step requires creating an instance of GenericRecord and populating it with the values from the CreditCard instance:
```java
GenericRecord record = new GenericData.Record(schema);
record.put("name", card.getName());
record.put("country", card.getCountry());
record.put("currency", card.getCurrency());
record.put("cardNumber", card.getCardNumber());
record.put("blocked", card.getBlocked());
// field is named "type" in the schema derived from the Protobuf definition
record.put("type", card.getType());
```
Quite often, the payloads sent over Kafka using Google Protobuf contain nested data. Let’s consider this logical representation for a class NestedObj:
{ "a": 123, "b": { "x":"value1" } }
The implementation for getSchema stays the same; the only change is to also extract the Avro schema for the b field.
```java
private final static Schema schema =
    ProtobufData.get().getSchema(NestedObj.class);
private final static Schema fieldBSchema = schema.getField("b").schema();

@Override
public Schema getSchema() {
  return schema;
}
```
Next, the deserializer code needs to create and populate the GenericRecord, including the nested one for field b:
```java
@Override
public Deserializer deserializer(Properties properties) {
  return new Deserializer() {
    @Override
    public GenericRecord deserialize(byte[] bytes) throws IOException {
      NestedObj obj = NestedObj.parseFrom(bytes);
      GenericRecord record = new GenericData.Record(schema);
      record.put("a", obj.getA());
      GenericRecord recordFieldB = new GenericData.Record(fieldBSchema);
      recordFieldB.put("x", obj.getB().getX());
      record.put("b", recordFieldB);
      return record;
    }
  };
}
```
If you get the source code, run the following command in the folder containing the pom.xml file:
```bash
mvn clean package
```
Follow the docs to make these two artifacts available to Lenses:
```
target/lenses-serde-protobuf-example-1.0.0.jar
deps/avro-protobuf-1.8.2.jar
```
In this tutorial you learned how to enable Lenses to work with data stored in Kafka as Google Protobuf. Once the plugin is provided to Lenses, the topic containing the data is associated with it, and the data can be queried using Lenses SQL. Alongside that, data policies apply to the data, so you can make sure sensitive information is not available to the users accessing it.
Last but not least, you can process the topic data as a stream using a SQL processor. The simplest example converts the data to Avro:
```sql
SET defaults.topic.autocreate=true;

INSERT INTO credit_card_as_avro
STORE VALUE AS AVRO
SELECT STREAM *
FROM credit_card_protobuf
```
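Beyond format conversion, the same processor can project and filter the stream. As a sketch, the following selects only non-sensitive fields of blocked cards into a new topic; the target topic name is an assumption, and the exact syntax may vary by Lenses version:

```sql
SET defaults.topic.autocreate=true;

-- hypothetical target topic; keeps only non-sensitive fields
INSERT INTO blocked_cards
STORE VALUE AS AVRO
SELECT STREAM name, country, currency
FROM credit_card_protobuf
WHERE blocked = true
```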