Connect SQL engine¶
Kafka Connect is the framework to use for moving data in and out of Apache Kafka. We provide 25+ Connectors. Each source or sink has its own specific connection details. Working with our clients, we realized that the way Connect expects configuration does not allow an easy connector management. Let us take a JMS Sink as an example. We want to push data from Kafka topics to JMS Topics. For that, we need a mapping between the JMS source and the Kafka target. Let us add on top of this a secure JMS connection. Last, we want specific fields from each JMS payload. The way the majority of connectors are written makes them cumbersome to configure and manage such instances. Keep things simple and make Kafka accessible is our motto, therefore we provide a SQL like syntax to describe the above.
Here is an example of a Lenses SQL for Connect, for moving data from Kafka to JMS:
INSERT INTO /sensors
SELECT sensorId AS id,
minTemperature,
maxTemperature,
avgTemperature
FROM sensors_data
STOREAS JSON
WITHTYPE TOPIC
The SQL like syntax describes and drives the work the JMS sink will perform. It will pick the fields from the Kafka topic named sensors_data and push them to a JMS topic /sensors. The resulting JMS messages payload will be in JSON. Easy!
“Keep things simple and make Kafka accessible is our motto.”
All our connectors support the following:
- AVRO and JSON support
- Multiple Kafka topic to target mapping (for sinks)
- Multiple sources to target Kafka topic (for sources)
- Field selection, extraction, and filtering
- Auto creation and auto evolution for the target storage
- Error handling policies
The Syntax¶
Here the full syntax that our connectors share:
[INSERT|UPSERT]
INTO $TARGET
SELECT *|columns (i.e col1,col2 | col1 AS column1,col2, field1.field2.field3, field1.field2.field3 as f3)
FROM $TOPIC_NAME
[ IGNORE field1, field2,.. ]
[ AUTOCREATE ]
[ WITHSTRUCTURE ]
[ PK columns ]
[ WITHTARGET = target]
[ AUTOEVOLVE ]
[ BATCH = N ]
[ CAPITALIZE ]
[ INITIALIZE ]
[ PROJECTTO versionNumber]
[ PARTITIONBY cola[,colb] ]
[ DISTRIBUTEBY cola[,colb] ]
[ CLUSTERBY value1[,value2] INTO bucketsNumber BUCKETS ]
[ TIMESTAMP cola|sys_current ]
[ TIMESTAMPUNIT = timestampUnit]
[ WITHFORMAT TEXT|AVRO|JSON|BINARY|OBJECT|MAP ]
[ WITHUNWRAP ]
[ STOREAS $YOUR_TYPE([key=value, .....]) ]
[ WITHTAG (field1, tag2= constant, field2 as tag2) ]
[ INCREMENTALMODE = incrementalMode ]
[ WITHTYPE type ]
[ WITHDOCTYPE = docType ]
[ WITHINDEXSUFFIX = suffix ]
[ TTL = ttlType ]
[ WITHCONVERTER = converterClass ]
[ WITHJMSSELECTOR = jmsSelector ]
[ WITHKEY (field1, field2.field3) ]
[ KEYDELIM = value ]
Of course, the keywords which are non-SQL specific are not applicable at the same time. Each connector documentation details the keys it is using.
The SELECT
mode is useful for target systems that do not support the concept of namespaces (Key-Value stores such as HazelCast or Redis):
SELECT *|columns
FROM $TOPIC_NAME
[ IGNORE field1, field2,.. ]
[ PK columns ]
[ WITHSTRUCTURE ]
[ WITHFORMAT JSON|AVRO|BINARY ]
[ WITHUNWRAP ]
[ WITHGROUP theConsumerGroup]
[ WITHOFFSET (offset1)(offset2) ]
[ WITHPARTITION (partition),[(partition, offset) ]
[ SAMPLE $RECORDS_NUMBER EVERY $SLIDE_WINDOW ]
[ LIMIT limitValue ]
[ STOREAS $YOUR_TYPE([key=value, .....]) ]
[ WITHTAG (field1, tag2= constant, field2 as tag2) ]
[ INCREMENTALMODE = incrementalMode ]
[ WITHDOCTYPE = docType ]
[ WITHINDEXSUFFIX = suffix ]
[ WITHCONVERTER = converterClass ]
Here is a Redis Connector example:
"connect.redis.sink.kcql": "INSERT INTO sensorTimeseries SELECT sensorID, temperature, humidity FROM sensorsTopic STOREAS SortedSet (score=ts)",
The above SQL instructs the Redis Kafka Sink Connector to select three fields and store the time-series data into a Sorted Set, using the incoming field ts for scoring each message.