Tables
The term table is equivalent to a topic in Apache Kafka. A Kafka record is made of the following parts:
- Key
- Value
- Timestamp
- Topic
- Partition
- Offset
- Headers (a collection of key-value pairs)
The Key and Value components can hold any type of data. To achieve maximum performance, Kafka is unaware of the record storage format and of what a record contains. Typically a Key is a primitive value: a string (a customer's unique identifier, say an email), an int (a payment transaction identifier) or a long (an IoT device's unique identifier). However, nothing stops the user from using complex or nested structures as keys. The same applies to the Value component.
Here are the rules for retrieving each part of a Kafka record using SQL:
- Key - Use _key to look at the Key component. Using just _key will select the Key value. If the key contains complex data, use _key.fieldA to address a specific field in the record.
- Value - Access a field in the Value component directly, or optionally use the _value prefix. The following expressions are equivalent: SELECT firstName FROM customer and SELECT _value.firstName FROM customer. When using _value (i.e. SELECT _value FROM customer), the entire Value component data is returned.
- Headers - To retrieve a record header, use the _header prefix. For example, SELECT _header.lenses FROM customer returns the value of a header named lenses.
- Metadata - All the other components of a record can be selected using the _meta prefix. For example, SELECT _meta.partition, _meta.timestamp, _meta.offset FROM customer returns the record table-partition, the record timestamp and its offset within the table-partition.
-- Selecting fields from the Key and Value
SELECT _key.device.id
, _key.device.tags[0] as model
, temperature
, humidity
, geo.latitude
, params[1].value as battery
FROM iot_readings
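The example above covers only the Key and Value. A comparable sketch for the _header and _meta prefixes might look like the following. It assumes the customer topic from the rules above carries a header named lenses and a firstName field in its Value; both names are taken from the earlier one-line examples and are illustrative, not a real schema.

```sql
-- Selecting a header and the record metadata alongside a Value field
-- (assumes a header named "lenses" and a Value field "firstName" exist)
SELECT firstName
    , _header.lenses as lenses_header
    , _meta.partition
    , _meta.timestamp
    , _meta.offset
FROM customer
```

Each returned row then pairs a Value field with the header value and the record's position within its table-partition.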
AVRO is recommended
When you use the AVRO storage format for your Kafka records together with a Schema Registry (SR), the data schema is enforced: your program cannot send data that does not match the schema. AVRO also provides schema evolution rules out of the box. This makes it possible to extend your schema, say by adding a new field, without impacting the records already stored. Schema management can be achieved by introducing a Schema Registry into your infrastructure. Lenses supports the Confluent and Hortonworks schema registries and provides a web interface to explore and manage the schemas, view their change history, and edit and configure the evolution rules.
Table Schema
For each table, the SQL engine tracks and maintains its schema. Given a Kafka record, this means there is a schema for the Key and one for the Value.
You can use the DESCRIBE TABLE command to quickly see the schema of a given table.
DESCRIBE TABLE payments
/*
Would yield a result like this
_key String
_value.id String
_value.time String
_value.amount decimal<8,2>
_value.currency String
_value.creditCardId String
_value.merchantId Long
*/
In order to understand the record content, the SQL engine needs to have the Key and Value storage format set for each topic. The full list of formats currently supported out of the box is as follows:
- JSON
- AVRO
- XML
- CSV
- PROTOBUF
- STRING
- INT
- LONG
- BYTES (default)
- Custom
For the Google Protobuf format or any other custom format, see the documentation on configuring custom formats to learn how they can be enabled.
How Lenses identifies data types
The format type for the key and the value is configurable for each table in Lenses.
Whenever a new table is added, or Lenses runs for the first time, it will try to detect the storage format for both.
A user can always override the detected types by setting the Key and Value formats for each topic, as seen in this screenshot:
Important
Only a user with TableStorageWrite permission can set up the table/topic storage format information in Lenses. Follow the
Advanced users can instruct the SQL engine to use the formats provided in the query code itself. For example, imagine a topic where the message key is an INT and the message value is JSON, but the user decides the value part should be read as STRING. To achieve that, the following code can be used:
SELECT *
FROM payments
If Lenses fails to read the specified storage format, it will fall back to the default BYTES storage format.