A websocket that lets the client request data using SQL as input. More information on the Lenses SQL Snapshot engine can be read here.
Use the following URL to open a websocket connection. Depending on the deployment, use a ws (not secure) or wss (secure) connection:
WS | WSS ${LENSES_URL}/api/ws/v2/sql/execute
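As a minimal sketch of choosing between ws and wss, the helper below derives the websocket URL from an HTTP base URL. It assumes LENSES_URL is supplied as an http or https address (the page does not say so), and the name sql_execute_url is hypothetical.

```python
from urllib.parse import urlsplit, urlunsplit

# Path taken from the endpoint shown above.
EXECUTE_PATH = "/api/ws/v2/sql/execute"

# Assumption: an http deployment maps to ws and an https deployment to wss.
_SCHEMES = {"http": "ws", "https": "wss", "ws": "ws", "wss": "wss"}


def sql_execute_url(lenses_url: str) -> str:
    """Build the websocket URL of the SQL execute endpoint (hypothetical helper)."""
    parts = urlsplit(lenses_url)
    if parts.scheme not in _SCHEMES:
        raise ValueError(f"unsupported scheme: {parts.scheme!r}")
    # Keep any base path of the deployment and append the endpoint path.
    path = parts.path.rstrip("/") + EXECUTE_PATH
    return urlunsplit((_SCHEMES[parts.scheme], parts.netloc, path, "", ""))
```

For instance, an https base URL yields a wss address, so the secure variant is selected without a separate flag.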
Once the connection is established, the server expects the client to send a JSON message with the following structure:
{ "token": String, "sql": String, "live": Optional[Boolean], "stats": Optional[Int] }
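The request message can be assembled as in the following sketch. It assumes that optional fields may simply be left out when not set; the function name build_request is hypothetical, and the meaning of the stats value is not specified on this page, so the code only passes it through.

```python
import json
from typing import Optional


def build_request(token: str, sql: str,
                  live: Optional[bool] = None,
                  stats: Optional[int] = None) -> str:
    """Serialize the first message sent after the connection opens."""
    message = {"token": token, "sql": sql}
    # Assumption: unset optional fields are omitted rather than sent as null.
    if live is not None:
        message["live"] = live
    if stats is not None:
        message["stats"] = stats
    return json.dumps(message)
```

The returned string is what a websocket client would send as its first text frame.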
The client should wait for incoming messages and stop when EOF is received. The server sends JSON-encoded messages with the following payload structure:
{ "type": <Enumeration>, "data": <Dependent on the type field> }
Because the server sends several types of messages, the type attribute determines the structure of the data payload. The possible type values are listed below:
HEARTBEAT
The data attribute is null. The client should ignore these messages.
{ "type": "HEARTBEAT" }
ERROR
In this case the payload structure is:
{ "type": "ERROR", "data": String }
STATS
When stats is requested, the payload format is:
{ "type": "STATS", "data": { "id": String, "bytesRead": Long, "sources": Map[UUID, String], "sourceConfigs": Map[UUID, Map[String, Any]], "sourcesStats": Map[UUID, Map[Int, Map[String, Long]]], "badRecordsTotal": Long, "recordsSkipped": Long, "results": Long, "recordsScanned": Long, "duration": Long, "startedAt": Long } }
An example payload:
{ "type": "STATS", "data": { "id": "6a29ef99-3914-4640-b0dc-ec4a07c7ffc5", "bytesRead": 51972426, "sources": { "ad1afc73-3dec-41e7-b679-aa35c55e3328": "cc_payments" }, "sourceConfigs": { "ad1afc73-3dec-41e7-b679-aa35c55e3328": { "live.aggs": true, "max.idle.time": 20000, "format.timestamp": true, "show.bad.records": true, "kafka.offset.timeout": 10000, "max.query.time": 3600000, "max.size": 209715200 } }, "sourcesStats": { "ad1afc73-3dec-41e7-b679-aa35c55e3328": { "0": { "begin": 0, "offsetBound": 541950, "bytesRead": 51972426, "recordsScanned": 541951, "end": 541950 } } }, "badRecordsTotal": 0, "recordsSkipped": 0, "results": 119, "recordsScanned": 541951, "duration": 18196, "startedAt": 1678901355694 } }
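To illustrate how the nested sourcesStats map in the STATS example can be read, the sketch below sums recordsScanned over the partitions of each source and labels the result with the source name from sources. The function name scanned_per_source is hypothetical, and the code assumes every partition entry carries a recordsScanned counter, as in the example.

```python
from typing import Any, Dict


def scanned_per_source(stats_data: Dict[str, Any]) -> Dict[str, int]:
    """Total recordsScanned per source, keyed by source name."""
    totals: Dict[str, int] = {}
    for source_id, partitions in stats_data.get("sourcesStats", {}).items():
        # Fall back to the UUID when the source has no readable name.
        name = stats_data.get("sources", {}).get(source_id, source_id)
        scanned = sum(p.get("recordsScanned", 0) for p in partitions.values())
        totals[name] = totals.get(name, 0) + scanned
    return totals
```

Applied to the data object of the example payload, this yields a single entry for cc_payments whose total matches the top-level recordsScanned value.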
METADATA
The payload format is:
{ "type": "METADATA", "data": { "fields": Array[String] } }
END
{ "type": "END" }
BADRECORD
{ "type": "BADRECORD", "data": { "metadata": { "offset": Optional[Long], "partition": Optional[Long], "timestamp": Optional[Long], "table": String, "sourceId": String } } }
SENTINEL
{ "type": "SENTINEL", "data": { "event": String, "reason": { "type": String, "timeout": Optional[Long], "source": String, "sourceId": String, "amount": Long } } }
For the RECORD type, the payload contains the data returned to the client, and its content depends on the SQL input. The payload format is:
{ "type": "RECORD", "data": { "fields": <dynamic>, "value": <dynamic>, "metadata": <dynamic>, "rownum": Optional[Long] } }
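The message types listed above suggest a simple dispatch on the type attribute. The following sketch works on any iterable of raw JSON strings, so it is independent of the websocket library used. It assumes that an END message marks the end of the result set; the page itself only says to stop at EOF, which here corresponds to the iterable being exhausted. The name collect_records is hypothetical.

```python
import json
from typing import Any, Dict, Iterable, List


def collect_records(raw_messages: Iterable[str]) -> List[Dict[str, Any]]:
    """Return the data payload of every RECORD message, in arrival order."""
    records: List[Dict[str, Any]] = []
    for raw in raw_messages:
        message = json.loads(raw)
        kind = message.get("type")
        if kind == "HEARTBEAT":
            continue  # the data attribute is null; nothing to do
        if kind == "END":
            break  # assumption: END closes the result set
        if kind == "RECORD":
            records.append(message.get("data"))
        # Other types (STATS, METADATA, ...) are skipped in this sketch.
    return records
```

A real client would typically also surface the STATS, BADRECORD and SENTINEL messages instead of dropping them.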
For a topic that contains nested data for both key and value, the query
SELECT * FROM sea_vessel_position_reports
produces records such as:
{ "type": "RECORD", "data": { "key": { "MMSI": 219005662 }, "value": { "Type": 1, "Repeat": 0, "MMSI": 219005662, "Speed": 0.1, "Accuracy": false, "Longitude": 12.570543333333333, "Latitude": 55.85040166666667, "location": "55.850402,12.570543", "Course": 177.9, "Heading": 511, "Second": 20, "RAIM": false, "Radio": 33577, "Status": 3, "Turn": -128.0, "Maneuver": 0, "Timestamp": 1491318144456507320 }, "metadata": { "offset": 71, "partition": 1, "timestamp": "2023-03-13T11:42:24.267Z", "__keysize": 10, "__valuesize": 79, "keySchemaId": 4, "valueSchemaId": 5 }, "rownum": 71 } }
If the Kafka message key is null, the key attribute is omitted from the response:
{ "type": "RECORD", "data": { "value": { "Type": 1, "Repeat": 0, "MMSI": 219005662, "Speed": 0.1, "Accuracy": false, "Longitude": 12.570543333333333, "Latitude": 55.85040166666667, "location": "55.850402,12.570543", "Course": 177.9, "Heading": 511, "Second": 20, "RAIM": false, "Radio": 33577, "Status": 3, "Turn": -128.0, "Maneuver": 0, "Timestamp": 1491318144456507320 }, "metadata": { "offset": 71, "partition": 1, "timestamp": "2023-03-13T11:42:24.267Z", "__keysize": 10, "__valuesize": 79, "keySchemaId": 4, "valueSchemaId": 5 }, "rownum": 71 } }
Projecting the key does not return the key attribute; the projected field is returned under value. The query
SELECT _key.MMSI FROM sea_vessel_position_reports
produces the following output:
{ "type": "RECORD", "data": { "value": { "MMSI": 265527470 }, "metadata": { "offset": 86, "partition": 1, "timestamp": "2023-03-13T11:42:29.107Z", "__keysize": 10, "__valuesize": 79, "keySchemaId": 4, "valueSchemaId": 5 }, "rownum": 86 } }
Aggregating data does not return the key and metadata attributes. The query
SELECT count(*), currency FROM cc_payments GROUP BY currency
produces records such as:
{ "type": "RECORD", "data": { "value": { "count": 20066, "currency": "NOR" }, "rownum": 90 }, "rowId": "fc767952-97e5-46cf-868c-7cbfbcde21fb", "statementIndex": null }
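Because key, metadata and rownum can each be absent from a RECORD payload, client code benefits from treating them as optional. The sketch below shows one way to do that; the Record class and parse_record function are hypothetical names, not part of the Lenses API.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class Record:
    value: Any
    key: Any = None                             # absent for null keys and projections
    metadata: Optional[Dict[str, Any]] = None   # absent for aggregations
    rownum: Optional[int] = None


def parse_record(message: Dict[str, Any]) -> Record:
    """Convert a decoded RECORD message into a Record with optional parts."""
    if message.get("type") != "RECORD":
        raise ValueError("not a RECORD message")
    data = message.get("data") or {}
    return Record(
        value=data.get("value"),
        key=data.get("key"),
        metadata=data.get("metadata"),
        rownum=data.get("rownum"),
    )
```

With the aggregation example above, parse_record returns a Record whose key and metadata are both None.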