Web Socket API¶
Lenses provides a set of WebSocket APIs in order to interact with Apache Kafka data in real-time from a JavaScript client for example. It can be used to build powerful Front-End Web applications with full Lenses SQL support.
To speed up integration we provide a open source client JavaScript library; if you are using React/Redux you can get going fast and focus on your business requirements.
npm install rxjs redux-lenses-streaming --save
Introduction¶
The API is straightforward. Once a WebSocket connection has been opened, the client needs to make sure it follows the protocol. Here is the template for each message the client can send to the back end:
{
"type":" SUBSCRIBE/UNSUBSCRIBE/PUBLISH/COMMIT/LOGIN",
"content":"The json text for the specific request",
"correlationId":1000,
"authToken" : "Authorization token or empty"
}
Below you can find the list of supported functionality:
- A client can and should
AUTHENTICATE
first to obtain the token allowing all other requests to be accepted. - A client can
SUBSCRIBE
to a topic via SQL. Please check the Lenses SQL section for the full details on how to set filters and use functions. - A client can
UNSUBSCRIBE
from a topic. - A client can
PUBLISH
messages to a topic. The current version supports only string/json. In the future, we will add support for AVRO. - A client can
COMMIT
the (topic, partition) offsets. - A client subscription must specify the decoder type. This allows reading correctly the content of the Kafka Message Key/Value parts.
The following decoders are supported
STRING
- the byte[] payload received from Kafka is read as aString
INT
- the byte[] payload received from Kafka is read as anINT
LONG
- the byte[] payload received from Kafka is read as aLONG
AVRO
- the byte[] payload received from Kafka is read as anAVRO
JSON
- the byte[] payload received from Kafka is read as anAVRO
BINARY
- the byte[] payload received from Kafka is kept as it is inBinary
format
Field | Description | Type |
---|---|---|
type | Describes the action the back end will take in response to the request.
The available values are:
LOGIN , SUBSCRIBE , UNSUBSCRIBE ,PUBLISH , COMMIT |
String |
content | Contains the JSON content of the actual request. The content
is strictly related to the
type described shortly |
String |
correlationId | A unique identifier in order for the client to link the response
with the request made
|
Long |
authToken | A unique token identifying the user making the request. This token can
only be obtained once the
LOGIN request has completed successfully |
String |
The response received from the back end follows this template:
{
"correlationId": Long,
"type" : "ERROR/INVALIDREQUEST/KAFKAMSG/HEARTBEAT/SUCCESS",
"content": String
}
Each request, LOGIN
, PUBLISH
, SUBSCRIBE
, UNSUBSCRIBE
will be followed by an acknowledgment if it was
successful. Use the correlationId
to handle these acknowledgments, for example after login in you will receive the
token
in the content
field and the message type will be SUCCESS
, then if you SUBSCRIBE
you will receive a
second acknowledgment message of type SUCCESS
. Set a different correlationId
for each type to distinguish.
Field | Description | Type |
---|---|---|
correlationId | The unique identifier the client has provided in
| the request associated with the response
|
Long |
type | Describes what response content the client has
received. Available values are
ERROR ,INVALIDREQUEST , KAFKAMSG , HEARTBEAT , SUCCESS |
String |
content | Contains the actual response content. Each
response type has its own content layout
|
String |
Protocol Definition¶
All requests made are constrained by user permissions on the back end. If the user has only Read
access
then publishing a record to a topic will not be allowed. If the user has only No-Data
user role, then
retrieving messages from Kafka will not be allowed either. See the Security section for role definitions.
Login¶
The first thing to do when a WebSocket connection has been opened is to obtain an authorization token.
To do so the client will have to send the following LOGIN
request format:
{
"type" : "LOGIN",
"content" : "{
"user" : String,
"password" : String,
}",
"correlationId": Long,
"authToken": String
}
Login Options List Reference¶
Field | Description | Type |
---|---|---|
content | Contains a JSON with two fields user and password
to obtain the token for
|
String |
correlationId | A unique number the back end will send back
as part of the response
|
Long |
authToken | For this request type the authorization token
is not validated
|
String |
Note
The content
field value is a string containing a JSON!
A successful login response will look like this:
{
"correlationId" : Long,
"type" : "SUCCESS",
"content" : String
}
Field | Description | Type |
---|---|---|
content | Contains the authorization token | String |
correlationId | A unique number sent in the request | Long |
If the user or password provided is not correct, the client will receive an error response. In this case, the response format looks like this:
{
"correlationId": Long,
"type" : "ERROR",
"content": String
}
Field | Description | Type |
---|---|---|
content | Contains the description error. | String |
correlationId | A unique number sent in the request. | Long |
Authentication Example¶
For authentication, you can use the WebSocket or REST APIs. For WS use the endpoint: ws://lenses-host:port/api/kafka/ws
A request example of a login request can be seen in the Python code below:
# Login Payload with user=admin & password=admin
login_payload = {
"type" : "LOGIN",
"content" : "{
"user" : "admin",
"password" : "admin",
}",
"correlationId": Long,
"authToken": ""
}
# Endpoint definition for lenses-host=localhost, port = 3030 & myclientid="unique long string"
self.url = 'ws://localhost:3030' + "/api/kafka/ws/" + self.myclientid
# Create a websocet connection on the endpoint defined above
ws = websocket.create_connection(self.url)
# Send the login payload
ws.send(json.dumps(login_payload))
print(json.loads(ws.recv()), '\n', json.loads(ws.recv())['content'])
{'content': '2803db07-9837-475e-b1ea-f896ce550ddd', 'correlationId': 2, 'type': 'SUCCESS'}
'2803db07-9837-475e-b1ea-f896ce550ddd'
Once you have received an authentication token, you should use it in all the request made. The token will become unavailable on logout.
An alternative way to authenticate would be via http://lenses-host:port/api/login
Authentication example via the REST /api/login
API with curl
curl --request POST --data '{"user":"admin", "password":"admin"}' http://localhost:3030/api/login -compress -w '\n'
24e75cf5-ccb1-4229-a032-618cb25751e8
Publishing¶
In order to publish a message to a topic the client has to send the following request:
{
"type" : "PUBLISH",
"content" : "{
"topic" : String,
"key" : String,
"value" : String
}",
"correlationId": Long,
"authToken": String
}
Field | Description | Type |
---|---|---|
content | Contains a JSON with three fields: topic, key, and
value. The last two fields are optional. Do not
set the field if you want to send null values
|
String |
correlationId | A unique number the back end will send back
as part of the response
|
Long |
authToken | The authorization token. The back end will check
if the user roles allow such action
|
String |
Note
Remember, the content for key/value are sent to the target Kafka topic are sent as String!
The content
field value is a string containing a JSON!
Subscription¶
To receive messages from a Kafka topic the client has to send a SUBSCRIBE
request.
{
"type" : "SUBSCRIBE",
"content" : "{
"sqls" : [
String,
String
]
}",
"correlationId" : Long,
"authToken" : String
}
Field | Description | Type |
---|---|---|
content | Contains a JSON with one field: SQLs. The field is
an array of Lenses SQL values
|
String |
sqls | An array of Lenses SQL values. The format is a SQL like
syntax allowing you to use functions, filter
and allows for field selection. See template below
|
Array of String |
correlationId | A unique number the back end will send back
as part of the response
|
Long |
authToken | The authorization token. The back end will
check if the user roles allow such action
|
String |
SELECT *
FROM $TOPIC
WHERE [ .. AND ...]
You can provide more than one SQL statement if you want to subscribe to more than 1 topic.
Please visit the Lenses SQL Engine section for full details on what it
supports. The response from the back end can be a SUCCESS
or an ERROR
.
Once the subscription has been successful, messages arriving in the Kafka topic(-s) and matching the filter will be delivered. A message received by the client will have this structure:
{
"content": [
{
"key" : "...",
"value" : "{...}",
"topic" : "topicA",
"partition" : Int,
"offset" : Long,
"timestamp" : Long
},
..
],
"correlationId": Long,
"type" : "KAFKAMSG"
}
Field | Description | Type |
---|---|---|
content | Contains a JSON with six fields: key, value,
topic, partition, offset and timestamp
|
String |
content.key | Contains the Kafka message key value. If the key
is null, the field will not be present
|
String |
content.value | Contains the Kafka message value part. If the
value is null, the field will not be present
|
String |
content.topic | Contains Kafka message topic name | String |
content.partition | Contains Kafka message partition number | Int |
content.offset | Contains Kafka message offset | Long |
content.timestamp | Contains the Kafka message timestamp | Long |
correlationId | A unique number the back end will send back
as part of the response
|
Long |
authToken | The authorization token. The back end will check
if the user roles allow such action
|
String |
Note
The timestamp
field requires Kafka 0.10.2+ and correct broker settings/or client publishing the timestamp.
Un-subscribe¶
A client can choose at any point to stop receiving messages from a given topic(-s). In order to do so, it has to send the following message:
{
"type" :"UNSUBSCRIBE",
"content": "{
"topics": [
"topic":String,
..
]
}",
"correlationId": Long,
"authToken" : String,
}
Field | Description | Type |
---|---|---|
content | Contains a JSON with one field: topics. The field
should contain an array of strings representing
the topics to unsubscribe from
|
String |
correlationId | A unique number the back end will send back
as part of the response
|
Long |
authToken | The authorization token. The back end will check
if the user roles allow such action
|
String |
Although the subscription allows you to specify via Lenses SQL the partitions to subscribe to, the unsubscribe does not support selective partition dropping from the subscription.
Note
Executing a subscribe call with a new Lenses SQL for a topic already in the subscription, will unsubscribe first and subscribe again.
Offsets Commit¶
The JavaScript client can decide when to commit the offset in Kafka. This way, when the client reopens a connection and resubscribes to the same Kafka topic it will receive the Kafka messages from where it left it.
To commit offsets the client has to send the following message structure:
{
"type" :"COMMIT",
"content": "{
"commits": [
{
"topic": String,
"partition": Int,
"offset" : Long
},
...
]
}",
"correlationId": Long,
"authToken" : String
}
Field | Description | Type |
---|---|---|
content | Contains a JSON with one field: commits. The field
should contain an array of elements with three
fields: topic, partition and offset
|
String |
content.commits.topic | The Kafka topic to commit the offsets for | String |
content.commits.partition | The Kafka topic partition to commit the offsets to | Int |
content.commits.offset | The offsets number to retain | Long |
correlationId | A unique number the back end will send back
as part of the response
|
Long |
authToken | The authorization token. The back end will check
if the user roles allow such action
|
String |
Since the commits
field is an array, more than one (topic, partition, offset) tuple can be provided at once.
Note
The content field value is a string containing a JSON!
Heartbeat¶
The REST API makes sure it keeps the connection open in case there is no data going back and forth between the client and the back end. Therefore, the client should be able to handle messages with the following structure:
{
"type" : "HEARTBEAT"
}
When such messages are received the client can discard them.
Working with New SQL Engine¶
You can access the new SQL Engine (see Lenses SQL Engine for options) from the /api/ws/v1/execute
endpoint.
First, you need to authenticate and export the token (see Authentication Example).
To send a request command to the new SQL Engine, first encode the command to the URL together with the token as shown below
.
Example command before encoding
CREATE TABLE customer (id string, address.line string, address.city string, address.postcode int, email string)
FORMAT (string, json)
PROPERTIES (partitions=1, compacted=true)
Example command after encoding (Python example)
self._sql = {"sql": "CREATE TABLE customer (id string, address.line string, address.city string, address.postcode int, email string)
FORMAT (string, json)
PROPERTIES (partitions=1, compacted=true)"
}
self._sql = {'sql': self._command}
self._command = 'ws://localhost:3030/api/ws/v1/sql/execute?' + urlencode(sql) + '&' + urlencode({'stats': '2'}) + '&' + urlencode({'token': self.token})
ws = create_connection(self._command)
In order to read the response from the above example, you must extend your code to do the appropriate iterations and checks as described below:
The client should do a loop for incoming messages and stop when Key: type, Value: END
is received. Below we present the different values that Key: type
can have.
Value: END
Value: RECORD
Value: STATS
Value: ERROR
The RECORD & STATS
entries have additional fields, with data
being the most important. The data in the STATS
entries hold stats related with your request, while
the data in the RECORD
entries hold the actual data of your request.
All requests have one END
& STATS
entry, and also Zero or more RECORD/S (>=0)
.
Output by reading the response from the example above CREATE TABLE customer...
{'type': 'RECORD', 'data': {'rownum': 0, 'value': 'Topic customer has been created'}}
{'type': 'STATS', 'data': {'results': 0, 'bytesRead': 0, 'duration': 0, 'id': 'e626273c-133c-41a0-b287-c4b62418c853', 'recordsSkipped': 0, 'recordsScanned': 0}}
{'type': 'END'}
Output after executing DESCRIBE TABLE customer
{'type': 'RECORD', 'data': {'rownum': 0, 'value': {'type': 'string', 'name': '_key'}}}
{'type': 'RECORD', 'data': {'rownum': 1, 'value': {'type': 'string', 'name': '_value.id'}}}
{'type': 'RECORD', 'data': {'rownum': 2, 'value': {'type': 'string', 'name': '_value.address.line'}}}
{'type': 'RECORD', 'data': {'rownum': 3, 'value': {'type': 'string', 'name': '_value.address.city'}}}
{'type': 'RECORD', 'data': {'rownum': 4, 'value': {'type': 'int', 'name': '_value.address.postcode'}}}
{'type': 'RECORD', 'data': {'rownum': 5, 'value': {'type': 'string', 'name': '_value.email'}}}
{'type': 'STATS', 'data': {'duration': 0, 'recordsSkipped': 0, 'results': 0, 'id': '049a86f6-c72c-496f-a4e4-b2a1d16143ff', 'bytesRead': 0, 'recordsScanned': 0}}
{'type': 'END'}
Output after executing DROP TABLE customer
{'type': 'STATS', 'data': {'recordsSkipped': 0, 'id': '3eb4b575-9e68-4836-8e41-f5ef3dd5f9e1', 'duration': 0, 'results': 0, 'recordsScanned': 0, 'bytesRead': 0}}
{'type': 'END'}