5.0
Snapshot Engine
Enhanced the information provided by the /api/ws/v2/sql/execute endpoint
New fields and new message types were added; existing messages and message formats remain unchanged.
Bad records are now streamed by the socket
A new message type was added so that bad records (records that for some reason could not be read) can be streamed.
Bad records from a Kafka source include the information shown below:
{
  "type": "BADRECORD",
  "data": {
    "metadata": {
      "timestamp": 1618843939222,
      "partition": 3,
      "offset": 5032,
      "sourceId": "4ceebad2-7c5d-4937-ab79-39c6be492aed",
      "table": "example_topic"
    }
  },
  "rowId": null,
  "statementIndex": null
}
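A client could pick bad records out of the stream along the following lines. This is a minimal sketch rather than Lenses client code: it assumes each socket frame is a single JSON message shaped like the example above, that `timestamp` is in epoch milliseconds, and the function name `describe_bad_record` is hypothetical.

```python
import json
from datetime import datetime, timezone
from typing import Optional


def describe_bad_record(raw_message: str) -> Optional[str]:
    """Return a one-line description of a BADRECORD message, or None for other types."""
    message = json.loads(raw_message)
    if message.get("type") != "BADRECORD":
        return None
    # Field names follow the Kafka example above; other sources may differ,
    # so every lookup tolerates missing keys.
    metadata = (message.get("data") or {}).get("metadata") or {}
    timestamp_ms = metadata.get("timestamp")
    # Assumption: the timestamp is expressed in epoch milliseconds.
    when = (
        datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc).isoformat()
        if timestamp_ms is not None
        else "unknown time"
    )
    return (
        f"bad record in {metadata.get('table')} "
        f"partition {metadata.get('partition')} "
        f"offset {metadata.get('offset')} at {when}"
    )
```

Returning `None` for every other message type lets the same function sit in front of an existing handler without changing how record or stats messages are processed.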
Sentinel records added
A sentinel record type was added. Currently these are used to indicate that a given query was abruptly stopped.
For each early termination reason, a different shape was added:
The scanned size is bigger than the allowed amount
{ "type": "SENTINEL", "data": { "reason": { "type": "SCANNEDSIZE", "sourceId": "4ceebad2-7c5d-4937-ab79-39c6be492aed", "source": "example_topic", "amount": 1000 }, "event": "SOURCETERMINATION" }, "rowId": null, "statementIndex": null }
An idle timeout was reached while retrieving data from a source
{ "type": "SENTINEL", "data": { "reason": { "type": "IDLETIMEOUT", "timeout": 0, "sourceId": "4ceebad2-7c5d-4937-ab79-39c6be492aed", "source": "example_topic" }, "event": "SOURCETERMINATION" }, "rowId": null, "statementIndex": null }
The query time for a given source was exceeded
{ "type": "SENTINEL", "data": { "reason": { "type": "COMPLETIONTIMEOUT", "timeout": 0, "sourceId": "4ceebad2-7c5d-4937-ab79-39c6be492aed", "source": "example_topic" }, "event": "SOURCETERMINATION" }, "rowId": null, "statementIndex": null }
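The three shapes above differ only in the `reason` object, so a client can handle them with one lookup. The sketch below is illustrative: the reason types come from the examples above, while the function name `explain_sentinel`, the wording of the descriptions, and the fallback for unrecognized reasons are assumptions.

```python
import json
from typing import Optional

# Reason types taken from the examples above; the descriptions are paraphrases.
REASONS = {
    "SCANNEDSIZE": "scanned size exceeded the allowed amount",
    "IDLETIMEOUT": "idle timeout reached while retrieving data",
    "COMPLETIONTIMEOUT": "query time for the source was exceeded",
}


def explain_sentinel(raw_message: str) -> Optional[str]:
    """Return a readable explanation of a SENTINEL message, or None for other types."""
    message = json.loads(raw_message)
    if message.get("type") != "SENTINEL":
        return None
    data = message.get("data") or {}
    reason = data.get("reason") or {}
    reason_type = reason.get("type")
    description = REASONS.get(reason_type, f"unrecognized reason {reason_type}")
    # SCANNEDSIZE carries "amount"; the two timeout reasons carry "timeout".
    limit = reason.get("amount", reason.get("timeout"))
    return f"{data.get('event')} on {reason.get('source')}: {description} (limit: {limit})"
```

The sketch reports the limit as a bare number because the examples above do not state its unit.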
New Stats fields
Stats messages now include detailed information regarding the query run. The new message looks as follows:
{
  "type": "STATS",
  "data": {
    "id": "674de6ad-9fbd-478c-b9da-b050c5b2deec",
    "bytesRead": 81323,
    "sources": {
      "4ceebad2-7c5d-4937-ab79-39c6be492aed": "example_topic"
    },
    "sourceConfigs": {
      "4ceebad2-7c5d-4937-ab79-39c6be492aed": {
        "max.idle.time": 20000,
        "show.bad.records": true,
        "kafka.offset.timeout": 10000,
        "max.query.time": 3600000,
        "max.size": 209715200
      }
    },
    "sourcesStats": {
      "4ceebad2-7c5d-4937-ab79-39c6be492aed": {
        "0": {
          "badRecords": 1,
          "bytesRead": 92592,
          "recordsScanned": 41,
          "begin": 0,
          "end": 40,
          "offsetBound": 10
        }
      }
    },
    "badRecordsTotal": 1,
    "recordsSkipped": 0,
    "results": 1,
    "recordsScanned": 39,
    "duration": 1,
    "startedAt": 1618847917476
  },
  "rowId": null,
  "statementIndex": null
}
property | meaning |
---|---|
$.sources | each topic reference in the query is assigned a unique sourceId; this object maps each sourceId to its topic name |
$.sourceConfigs | the config options used to instantiate each source, keyed by sourceId |
$.sourcesStats | an object containing the stats collected for each source |
$.sourcesStats['sourceId'] | the stats for a specific source, per partition; for non-Kafka sources, partition -1 is used |
$.sourcesStats['sourceId']['partition'] | the object containing the stats collected for a given source partition; all fields are optional and not guaranteed to appear |
$.sourcesStats['sourceId']['partition'].badRecords | count of bad records read for the given source and partition |
$.sourcesStats['sourceId']['partition'].bytesRead | total bytes scanned while reading the given source and partition |
$.sourcesStats['sourceId']['partition'].recordsScanned | count of all records scanned while reading the given source and partition |
$.sourcesStats['sourceId']['partition'].begin | offset of the first element scanned for the given source and partition |
$.sourcesStats['sourceId']['partition'].end | offset of the last element scanned for the given source and partition |
$.sourcesStats['sourceId']['partition'].offsetBound | the last offset the snapshot engine expects to scan for the given source and partition |
$.badRecordsTotal | total count of bad records read |
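The per-partition stats described in the table can be rolled up per source as sketched below. This is an illustrative helper, not part of the Lenses API: the name `summarize_sources` and the shape of its return value are assumptions, and it takes the `data` object of a STATS message as input. Results are keyed by sourceId rather than topic name, since the same topic may be referenced more than once in a query.

```python
from typing import Any, Dict


def summarize_sources(stats_data: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
    """Sum the per-partition counters of each source in a STATS "data" object."""
    names = stats_data.get("sources", {})
    summary: Dict[str, Dict[str, Any]] = {}
    for source_id, partitions in stats_data.get("sourcesStats", {}).items():
        totals: Dict[str, Any] = {"badRecords": 0, "bytesRead": 0, "recordsScanned": 0}
        for partition_stats in partitions.values():
            for field in ("badRecords", "bytesRead", "recordsScanned"):
                # All per-partition fields are optional, so default to 0.
                totals[field] += partition_stats.get(field, 0)
        # Resolve the topic name through $.sources; None if the id is unmapped.
        totals["source"] = names.get(source_id)
        summary[source_id] = totals
    return summary
```

For the example message above, this would report 1 bad record, 92592 bytes read, and 41 records scanned for `example_topic`.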
Removed support for skip.bad.records
Since bad records can now be streamed, the behavior controlled by SET skip.bad.records=true|false is no longer useful and is therefore no longer supported in the SNAPSHOT engine.
Added show.bad.records
A new flag, show.bad.records, controls whether the socket streams the bad records read from the topic.
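The sketch below shows one way a client might set the flag and then confirm it took effect. Both helpers are hypothetical. The first assumes the flag is set with the same `SET key=value` form shown for skip.bad.records and that statements are separated by a semicolon; the second reads the `sourceConfigs` object of a STATS message, where the flag appears in the example earlier in this section.

```python
from typing import Any, Dict, Optional


def with_show_bad_records(query: str, show: bool) -> str:
    """Prefix a query with a SET statement for show.bad.records (assumed syntax)."""
    flag = "true" if show else "false"
    return f"SET show.bad.records={flag};\n{query}"


def bad_records_enabled(stats_data: Dict[str, Any]) -> Dict[str, Optional[bool]]:
    """Map each sourceId to the show.bad.records value reported in a STATS "data" object."""
    return {
        source_id: config.get("show.bad.records")  # None if the option is not reported
        for source_id, config in stats_data.get("sourceConfigs", {}).items()
    }
```

Checking the reported config instead of trusting the request is a cautious choice: it reflects what the engine says it used for each source.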