REST API¶
Lenses provides a rich set of REST APIs that can be used for interacting with Apache Kafka, topics, offsets, consumers as well as with the micro-services of your data streaming platform. Lenses considers security as a first-class citizen, provides role-based access and auditing on APIs, and protects sensitive data such as passwords.
Note
Lenses exposes a single PORT and serves both the UI and the APIs from the same PORT. For example, if Lenses is accessible
at http://localhost:3030 then the API is accessible under http://localhost:3030/api/xx using content-type:application/json
Error Codes
All APIs use a standard HTTP error codes for any requests that return an HTTP status indicating an error (4xx or 5xx statuses).
Authentication API¶
All requests must be authenticated using an HTTP Header x-kafka-lenses-token:myToken
. You can obtain the token via
the following login API or you can use a service account.
All REST APIs are protected via role-based authentication that is either BASIC or LDAP based, depending on how Lenses security has been set up. In order to be able to use the APIs, you will need to first authenticate via an appropriate user, then receive an access token and use that token for any subsequent request.
POST /api/login
Attributes
user
, stringpassword
, string
Error Code
401 UNAUTHORIZED
To run below example, we recommend installing the jq tool
Example Request
# login and receive the access token
HOST="http://localhost:9991"
curl -X POST -H "Content-Type:application/json" -d '{"user":"admin", "password":"******"}' ${HOST}/api/login --compress -s
Example Response
"a1f44cb8-0f37-4b96-828c-57bbd8d4934b"
Note
Once an Access Token has been retrieved, it will need to be used in every subsequent request to any API call
-H "X-Kafka-Lenses-Token:${TOKEN}"
Topic API¶
Create Topic¶
POST /api/topics
Attributes
topicName
, string, Requiredreplication
, intpartitions
, intconfigs
, key - value
Example Request
{
"topicName": "topicA",
"replication": 1,
"partitions": 1,
"configs": {
"cleanup.policy": "compact",
"compression.type": "snappy"
}
}
Delete Records from Topic-Partition up to a specified Offset¶
Important
This works only for kafka versions 0.11+
DELETE /api/topics/{string: topicName}/{int: partition}/{long: offset}
Parameters
topicName
, stringpartition
, intoffset
, long
Example Response
A message similar to the one below, indicating that records are being deleted and if everything was successful there will be an audit entry, as well as a notification indicating that.
Records from topic 'test_topic' and partition '0' up to offset '1260', are being deleted.
When the process is completed, an audit will appear in the audits tab.
Update Topic Configuration¶
PUT api/configs/topics/(string: topicName)
Parameters
topicName
, string
Attributes
configs
, an array of topic config key-values
Example Request
PUT /api/topics/config/topicA
{
"configs": [{
"key": "cleanup.policy",
"value": "compact"
}]
}
Get Topic information¶
GET api/topics/(string: topicName)
Parameters
topicName
, string
Example Response
{
"topicName": "topicA",
"keyType": "AVRO",
"valueType": "AVRO",
"partitions": 1,
"replication": 1,
"isControlTopic": false,
"keySchema": null
"valueSchema": null,
"messagesPerSecond": 0,
"totalMessages": 1737056563,
"timestamp": 1515415557251,
"isMarkedForDeletion": false,
"config": [{
"configuration": "cleanup.policy",
"value": "compact",
"defaultValue": "delete",
"documentation": "A string that is either \"delete\" or \"compact\". This string designates the retention policy to use on old log segments. The default policy (\"delete\") will discard old segments when their retention time or size limit has been reached. The \"compact\" setting will enable log compaction on the topic."
}],
"consumers": [],
"messagesPerPartition": [{
"partition": 0,
"messages": 1737056563,
"begin": 0,
"end": 1737056563
}]
}
Topic Metadata API¶
Lenses has a system topic that stores metadata for every topic in your system. Topic Metadata API provides a way to retrieve and also add/delete metadata for an existing topic.
Retrieve All Stored Metadata¶
GET /api/metadata/topics
Example Response
[{
keyType: "AVRO",
valueType: "AVRO",
topicName: "reddit_posts",
valueSchema: "{
"type":"record",
"name":"reddit_post","namespace":"com.landoop.social.reddit.post",
"doc":"Schema for Reddit Posts dataset. [https://www.kaggle.com/reddit/reddit-comments-may-2015/discussion/16213]",
"fields":[]
}",
keySchema: "{
"type":"record",
"name":"reddit_post_key",
"namespace":"com.landoop.social.reddit.post.key",
"fields":[]}"
}]
Retrieve Metadata For A Topic¶
GET /api/metadata/topics/{topicName: string}
Parameters
topicName
, string, The topic to retrieve metadata for.
Example Response
{
keyType: "AVRO",
valueType: "AVRO",
topicName: "reddit_posts",
valueSchema: "{
"type":"record",
"name":"reddit_post","namespace":"com.landoop.social.reddit.post",
"doc":"Schema for Reddit Posts dataset. [https://www.kaggle.com/reddit/reddit-comments-may-2015/discussion/16213]",
"fields":[]
}",
keySchema: "{
"type":"record",
"name":"reddit_post_key",
"namespace":"com.landoop.social.reddit.post.key",
"fields":[]
}"
}
Add Metadata For An Existing Topic¶
POST api/metadata/topics/{topicName: string}
Parameters
Param | Type | Required/Optional |
---|---|---|
topicName |
string | required |
keyType |
string | required |
valueType |
string | required |
keySchema |
string | optional |
valueSchema |
string | optional |
Example Request
POST api/metadata/topics/reddit_posts?keyType=JSON&valueType=JSON
Delete Metadata For An Existing Topic¶
DELETE api/system/topics/metadata/{topicName}
Parameters
topicName
, string, The topic to delete metadata for.
Data API¶
The REST APIs for getting Data allows you to get data from topics by sending Lenses SQL Queries. You can also subscribe and produce messages to the Kafka Topic Live Stream via the Web Socket APIs
LSQL URL Encoding¶
The LSQL statements need to be encoded in order to fire the request. You may want to follow standard instructions here for the encoding: Here is an example:
Assume the query below:
SELECT * FROM `topicA`
WHERE _vtype='AVRO'
AND _ktype='AVRO'
LIMIT 1000
The Encoded version will look like this:
SELECT+*+FROM+%60topicA%60%0AWHERE+_vtype%3D%27AVRO%27%0AAND+_ktype%3D%27AVRO%27%0ALIMIT+1000
LSQL Validation¶
GET /api/sql/validation
URL Params
sql
, string (Encoded URL LSQL query), Required
Example Request
curl -X GET -H "Content-Type:application/json" ${HOST}/api/sql/validation?sql=SELECT+*+FROM+%60topicA%60%0AWHERE+_vtype%3D%27AVRO%27%0AAND+_ktype%3D%27AVRO%27%0ALIMIT+1000
Example Response
{
"isValid": true,
"line": 0,
"column": 0,
"message": null
}
Example Error
{
"isValid": false,
"line": 4,
"column": 1,
"message": "Invalid syntax.Encountered \"LIIT\" at line 4, column 1.\nWas expecting one of:\n <EOF> ... "
}
LSQL Get Data¶
GET /api/sql/data
URL Params
sql
, string (Encoded URL LSQL query), Requiredstats
, integer, stats are enabled and pushed out every x(stats=4000 means every 4 seconds), Optionaloffsets
, boolean, the end record will pull the offsets for the topic and will be attached to the end record, Optional
How to make a Request
The client should be configured to expect Server Side Events
"Accept": "application/json, text/event-stream"
GET /api/sql/data?sql=SELECT+*+FROM+%60topicA%60%0AWHERE+_vtype%3D%27AVRO%27%0AAND+_ktype%3D%27AVRO%27%0ALIMIT+1000&stats=4000&offsets=true
The client should wait for incoming messages and stop when EOF
is received.
Each message starts with "data:"
followed by a number; the number indicates the type of the data received (see below).
Based on the message type number, you should handle each message differently.
Here are the possible values for the message type:
0
- Heartbeat to keep the connection open, you must SKIP this (addcontinue
), i.edata:0
1
- JSON payload record which contains topic metadata and thevalue
string field which is the data coming from the topic, i.edata:1{timestamp:..., partition:..., value:"{...}"}
2
- Stop JSON message. When recieved you can exit the loop. It also contains statistics, i.edata:2{isTimeRemaining:true/false, size:...., offsets:...}
3
- Error JSON message (if any) i.edata:3{fromLine:...,toLine:...,fromColumn:...,toColumn:...,error:...}
Example Response for a record (message type 1)
data:1{"timestamp":1532460006300,"partition":2,"key":"{\"key\":\"value\"}","offset":560207858,"topic":"","value":"{the fetched data}"}
Example Response for stop (message type 2)
data:2{"size":2062,"isTimeRemaining":true,"offsets":[{"partition":2,"min":560207858,"max":568636162},{"partition":1,"min":338050641,"max":347448266},{"partition":0,"min":355312551,"max":364217747}],"isStopped":false,"totalRecords":3,"isTopicEnd":false,"skippedRecords":0,"recordsLimit":3,"totalSizeRead":2062}
Example Response for an error (message type 3)
data:3{"fromLine":4,"toLine":4,"fromColumn":1,"toColumn":1,"error":"Invalid syntax.Encountered \"LIIT\" at line 4, column 1.\nWas expecting one of:\n <EOF> ... "}
Processor API¶
Before using the Processor API make sure you are aware of which mode your Lenses instance is running to execute the processors (Lenses Configuration).
Get All Processors¶
GET /api/streams
Example response
{
[
"targets" : {
"cluster" : "minikube",
"version" : null,
"namespaces" : [ "lenses-sql" ]
},
"streams" : {
"id": "lsql_c7f177994db640cab95a99900c4fb7e7",
"name": "cc_payments_freq_1min",
"clusterName": "MINISHIFT",
"user": "admin",
"namespace": "lenses-sql",
"uptime": 3387090,
"sql": "SET autocreate=true;SET `commit.interval.ms`='120000';SET `auto.offset.reset`='latest'; INSERT INTO `cc_payments_1m_freq` SELECT STREAM COUNT(*) as `count` FROM `cc_payments`WHERE _ktype='BYTES' and _vtype='BYTES' GROUP BY tumble(1, m)",
"runners": 1,
"deploymentState": "RUNNING",
"topicValueDecoder": "LONG",
"pipeline": "lsql",
"startTs": 1522180383208,
"toTopic": "cc_payments_1m_freq",
"runnerState": {
"e94433cf-cf99-4e46-9adc-407bb06b35c2": {
"id": "e94433cf-cf99-4e46-9adc-407bb06b35c2",
"worker": "worker1",
"state": "RUNNING",
"errorMsg": ""
}
}
}
]
}
Get a Processor¶
Retrieve a specific processor.
GET /api/streams/(string: processorId)
Parameters
processorId
is the id of the processor
The response is a SqlStream.
Example response
{
"streams" : {
"id": "lsql_c7f177994db640cab95a99900c4fb7e7",
"name": "cc_payments_freq_1min",
"clusterName": "MINISHIFT",
"user": "admin",
"namespace": "lenses-sql",
"uptime": 3387090,
"sql": "SET autocreate=true;SET `commit.interval.ms`='120000';SET `auto.offset.reset`='latest'; INSERT INTO `cc_payments_1m_freq` SELECT STREAM COUNT(*) as `count` FROM `cc_payments`WHERE _ktype='BYTES' and _vtype='BYTES' GROUP BY tumble(1, m)",
"runners": 1,
"deploymentState": "RUNNING",
"topicValueDecoder": "LONG",
"pipeline": "lsql",
"startTs": 1522180383208,
"toTopic": "cc_payments_1m_freq",
"runnerState": {
"e94433cf-cf99-4e46-9adc-407bb06b35c2": {
"id": "e94433cf-cf99-4e46-9adc-407bb06b35c2",
"worker": "worker1",
"state": "RUNNING",
"errorMsg": ""
}
}
}
}
Create Processor¶
POST /api/streams
Attributes
name
, string, Requiredsql
, string, Requiredrunners
, intclusterName
, string, used for both Kubernetes and Connect modesnamespace
, string, used only for Kubernetes modepipeline
, string, used only for Kubernetes mode
Example Request
{
"name": "myProcessor",
"sql": "SET `autocreate`=true;INSERT INTO `topicB` SELECT * FROM `topicA` WHERE _ktype='BYTES' AND _vtype='AVRO'",
"runners": 1,
"clusterName": "myCluster",
"namespace": "ns"
}
Response
The response includes the id
for the processor.
Pause Processor¶
PUT /api/streams/(string: processorId)/pause
Parameters
processorId
, string, Required
Resume Processor¶
PUT /api/streams/(string: processorId)/resume
Parameters
processorId
, string, Required
Update Processor Runners¶
PUT /api/streams/(string: processorId)/scale/(int: numberOfRunners)
Parameters
processorId
, string, Required- ` numberOfRunners`, int, Required
Delete Processor¶
DELETE /api/streams/(string: processorId)
Parameters
processorId
, string, Required
Access Control Lists API¶
Kafka Access Control Lists can be viewed and set via the rest endpoints. Only admin
users are allowed to add or modify ACLs.
Create/Update¶
PUT /api/acl
Attributes
resourceType
, string, requiredresourceName
, string, requiredprincipal
, string, requiredpermissionType
, string, required (either Allow or Deny)host
, string, requiredoperation
, string, required
ACL Operations
The following operations are valid (depending on the Kafka version)
Resource Type | Operation |
---|---|
Topic | Read |
Topic | Write |
Topic | Describe |
Topic | Delete |
Topic | DescribeConfigs |
Topic | AlterConfigs |
Topic | All |
Group | Read |
Group | Describe |
Group | Delete |
Group | All |
Cluster | Create |
Cluster | ClusterAction |
Cluster | DescribeConfigs |
Cluster | AlterConfigs |
Cluster | IdempotentWrite |
Cluster | Alter |
Cluster | Describe |
Cluster | All |
TransactionalId | Describe |
TransactionalId | Write |
TransactionalId | All |
DelegationToken | Describe |
DelegationToken | All |
Example Request
Allow read from any host on topic transactions for principal principalType:principalName.
{
"resourceType": "Topic",
"resourceName": "transactions",
"principal": "principalType:principalName" ,
"permissionType": "Allow",
"host": "*",
"operation": "Read"
}
Get ACLs¶
GET /api/acl
Example Response
[{
"resourceType": "Topic",
"resourceName": "transactions",
"principal": "principalType:principalName" ,
"permissionType": "Allow",
"host": "*",
"operation": "Read"
}]
Delete ACLs¶
DELETE /api/acl
Example Request
{
"resourceType": "Topic",
"resourceName": "transactions",
"principal": "principalType:principalName" ,
"permissionType": "Allow",
"host": "*",
"operation": "Read"
}
Alert API¶
Alerts are divided into two categories:
Infrastructure
- These are out of the box alerts that can be toggled on and offConsumer group
- These are user-defined alerts on consumer groups
Alert notifications are the result of an Alert Setting Condition being met on an Alert Setting.
Get All Alert Settings¶
Retrieves the configured alert settings
GET /api/alerts/settings
Single AlertSetting structure.
conditions
is the condition to be met for the alert to be fired. Type of a Map[string][string], can be emptyconditionRegex
is the regular expression to validate the condition set based on theconditionTemplate
. Type of string, can be emptydescription
is the description of the alert. Type of string, cannot be emptyenabled
is a boolean flag to indicate if this alert is enableddocs
is a documentation link. Type of string, can be emptyid
is the id of this alert. Type of int, must be positivecategory
is the category of the alert, eitherInfrastructure
orConsumers
isAvailable
indicates whether or not the alert is available, because some alerts need JMX enabled (If JMX not available, some alerts are disabled). Type of boolean, true/falseconditionTemplate
is an optional string template for the condition, .e.g lag >= $Threshold-Number on group $Consumer-Group and topic $Topic-Name
Example Response
{
"categories": {
"Infrastructure": [
{
"id": 1,
"description": "License is invalid",
"category": "Infrastructure",
"enabled": true,
"isAvailable": true
},
{
"id": 1000,
"description": "Kafka Broker is down",
"category": "Infrastructure",
"enabled": false,
"isAvailable": true
},
{
"id": 1001,
"description": "Zookeeper Node is down",
"category": "Infrastructure",
"enabled": true,
"isAvailable": true
},
{
"id": 1002,
"description": "Connect Worker is down",
"category": "Infrastructure",
"enabled": true,
"isAvailable": true
},
{
"id": 1003,
"description": "Schema Registry is down",
"category": "Infrastructure",
"enabled": true,
"isAvailable": true
},
{
"id": 1004,
"description": "Alert Manager is down",
"category": "Infrastructure",
"enabled": true,
"isAvailable": true
},
{
"id": 1005,
"description": "Under replicated partitions",
"category": "Infrastructure",
"enabled": true,
"isAvailable": true
},
{
"id": 1006,
"description": "Partitions offline",
"category": "Infrastructure",
"enabled": true,
"isAvailable": true
},
{
"id": 1007,
"description": "Active Controllers",
"category": "Infrastructure",
"enabled": true,
"isAvailable": true
},
{
"id": 1008,
"description": "Multiple Broker Versions",
"category": "Infrastructure",
"enabled": true,
"isAvailable": true
},
{
"id": 1009,
"description": "File-open descriptors high capacity on Brokers",
"category": "Infrastructure",
"enabled": true,
"isAvailable": true
},
{
"id": 1010,
"description": "Average % the request handler is idle",
"category": "Infrastructure",
"enabled": true,
"isAvailable": true
},
{
"id": 1011,
"description": "Fetch requests failure",
"category": "Infrastructure",
"enabled": true,
"isAvailable": true
},
{
"id": 1012,
"description": "Produce requests failure",
"category": "Infrastructure",
"enabled": true,
"isAvailable": true
},
{
"id": 1013,
"description": "Broker disk usage is greater than the cluster average",
"category": "Infrastructure",
"enabled": true,
"isAvailable": true
},
{
"id": 1014,
"description": "Leader Imbalance",
"category": "Infrastructure",
"enabled": true,
"isAvailable": true
}
],
"Consumers": [
{
"id": 2000,
"description": "Consumer Lag exceeded",
"category": "Consumers",
"enabled": true,
"docs": "Raises an alert when the consumer lag
exceeds the threshold",
"conditionTemplate": "lag >= $Threshold-Number
on group $Consumer-Group and topic $Topic-Name",
"conditionRegex": "lag >= ([1-9][0-9]*) on group (\\b\\S+\\b) and topic (\\b\\S+\\b)",
"conditions": {
"28bbad2b-69bb-4c01-8e37-28e2e7083aa9": "lag
>= 100000 on group group and topic topicA",
"4318a5a7-32e4-43af-9f3f-438e64d14a11": "lag
>= 1000000 on group consumerA and topic topicA",
"92dd89a1-83c0-4251-8610-36fce780a824": "lag
>= 1000000 on group minikube.default.london-keyword-count and topic reddit_posts",
"bdb01792-b79f-44d0-a39e-7f3e0906a89b": "lag
>= 100000 on group connect-nullsink and topic topicA",
"f4dd550f-bcb6-4e02-b28b-18bbd426aaf2": "lag
>= 100000 on group groupA and topic topicA"
},
"isAvailable": true
}
]
}
}
Get an Alert Setting¶
Retrieves the specified alert setting
GET /api/alerts/settings/{alert_setting_id: Int}
Parameters
alert_setting_id
is the alert setting id to retrieve. Type of IntFor more information on available identifiers, see: Alert Identifiers .
The response is AlertSetting.
conditions
is the condition to be met for the alert to be fired. Type of a Map[string][string], can be emptyconditionRegex
is the regular expression to validate the condition set based on theconditionTemplate
. Type of string, can be emptydescription
is the description of the alert. Type of string, cannot be emptyenabled
is a boolean flag to indicate if this alert is enableddocs
is a documentation link. Type of string, can be emptyid
is the id of this alert. Type of int, must be positivecategory
is the category of the alert, eitherInfrastructure
orConsumers
isAvailable
indicates whether or not the alert is available, because some alerts need JMX enabled (If JMX not available, some alerts are disabled). Type of boolean, true/falseconditionTemplate
is an optional string template for the condition, .e.g lag >= $Threshold-Number on group $Consumer-Group and topic $Topic-Name
Example Response
{
"id": 2000,
"description": "Consumer Lag exceeded",
"category": "Consumers",
"enabled": true,
"docs": "Raises an alert when the consumer lag exceeds the threshold",
"conditionTemplate": "lag >= $Threshold-Number on group $Consumer-Group and topic $Topic-Name",
"conditionRegex": "lag >= ([1-9][0-9]*) on group (\\b\\S+\\b) and topic (\\b\\S+\\b)",
"conditions": {
"28bbad2b-69bb-4c01-8e37-28e2e7083aa9": "lag >= 100000 on group group and topic topicA",
"4318a5a7-32e4-43af-9f3f-438e64d14a11": "lag >= 1000000 on group consumerA and topic topicA",
"92dd89a1-83c0-4251-8610-36fce780a824": "lag >= 1000000 on group minikube.default.london-keyword-count and topic reddit_posts",
"bdb01792-b79f-44d0-a39e-7f3e0906a89b": "lag >= 100000 on group connect-nullsink and topic topicA",
"f4dd550f-bcb6-4e02-b28b-18bbd426aaf2": "lag >= 100000 on group groupA and topic topicA"
},
"isAvailable": true
}
Enabled an Alert Setting¶
Enables an alert setting
PUT /api/alerts/settings/{alert_setting_id: Int}
Parameters
alert_setting_id
is the alert setting to enable. Type of IntFor more information on available identifiers, see: Alert Identifiers .
Get Alert Setting Conditions¶
Retrieves the conditions for an alert setting
GET /api/alerts/settings/{alert_setting_id: Int}/condition
Parameters
alert_setting_id
the alert setting id to get the condition for. Type of IntFor more information on available identifiers, see: Alert Identifiers .
Example Response
{
"28bbad2b-69bb-4c01-8e37-28e2e7083aa9": "lag >= 100000 on group group and topic topicA",
"4318a5a7-32e4-43af-9f3f-438e64d14a11": "lag >= 1000000 on group consumerA and topic topicA",
"92dd89a1-83c0-4251-8610-36fce780a824": "lag >= 1000000 on group minikube.default.london-keyword-count and topic reddit_posts",
"bdb01792-b79f-44d0-a39e-7f3e0906a89b": "lag >= 100000 on group connect-nullsink and topic topicA",
"f4dd550f-bcb6-4e02-b28b-18bbd426aaf2": "lag >= 100000 on group groupA and topic topicA"
}
Create/Update an Alert Setting Condition¶
POST /api/alerts/settings/{alert_setting_id: Int}/condition/{condition_uuid: String}
Parameters
The request is a plain text expression for the specific condition:
alert_setting_id
is the alert setting id to set the condition for. Type of Intcondition_uuid
is the condition id to update. Type of StringFor more information on available identifiers, see: Alert Identifiers .
Example Request: lag >= 100000 on group group and topic topicA
Delete an Alert Setting Condition¶
DELETE /api/alerts/settings/{alert_setting_id: Int}/condition/{condition_uuid: String)
Attributes
alert_setting_id
is the alert setting id to delete the condition for. Type of Intcondition_uuid
the condition id to delete. Type of StringFor more information on available identifiers, see: Alert Identifiers .
Get All Alert Notifications¶
GET /api/alerts
The response is List of Alerts.
labels
is a list of key-value pairs. It will contain at least severityannotations
is a list of key-value pairs. The keys will contain the summary, source, and docs keys.startsAt
is the time, in ISO format, for when the alert startsendsAt
is the time the alert ended atgeneratorURL
is a unique URL identifying the creator of this alert. It matches AlertManager requirements for providing this fieldalertId
is a unique identifier for the setting corresponding to this alert. Seeid
in the alert settings API.
Example Response
[{
"endsAt": null,
"startsAt": "2018-03-27T21:23:23.634+02:00",
"alertId": 1000,
"annotations": {
"source": "",
"summary": "Broker on 1 is down"
},
"labels": {
"category": "Infrastructure",
"severity": "HIGH",
"instance": "instance101"
},
"generatorURL": "http://lenses"
}]
Alert Identifiers¶
License
Alert Identifier | Description |
---|---|
1 | License is invalid |
Infrastructure
Alert Identifier | Description |
---|---|
1000 | Kafka Broker is down |
1001 | Zookeeper Node is down |
1002 | Connect Worker is down |
1003 | Schema Registry is down |
1004 | Alert Manager is down |
1005 | Under replicated partitions |
1006 | Partitions offline |
1007 | Active Controllers |
1008 | Multiple Broker Versions |
1009 | File-open descriptors high capacity on Brokers |
1010 | Average % the request handler is idle |
1011 | Fetch requests failure |
1012 | Produce requests failure |
1013 | Broker disk usage is lower than the average of the cluster |
1014 | Leader Imbalance |
Consumers
Alert Identifier | Description |
---|---|
2000 | Consumer Lag exceeded |
Connect
Alert Identifier | Description |
---|---|
3000 | Connector deleted |
Topics
Alert Identifier | Description |
---|---|
4000 | Topic has been created |
4001 | Topic has been deleted |
4002 | Topic data has been deleted |
Alert SSE API¶
Alert notifications can be pushed in real-time to the client via a Send Server Event endpoint.
GET /api/sse/alerts
The response is text/event-stream
of Alerts. Keep alive empty heartbeats are sent every 10 seconds.
It sends data:{alertId: …, labels: …}, the Alert is after the data: message prefix. When empty data: then it means stop,
if empty messages received or the length is smaller than 5 then ignore the incoming message, it’s the heartbeat.
labels
is a list of key-value pairs. It will contain at least severityannotations
is a list of key-value pairs. The keys will contain the summary, source, and docs keys.startsAt
is the time, in ISO format, for when the alert startsendsAt
is the time the alert ended atgeneratorURL
is a unique URL identifying the creator of this alert. It matches AlertManager requirements for providing this fieldalertId
is a unique identifier for the setting corresponding to this alert. Seeid
in the alert settings API.
Example Response
data:{
"endsAt": null,
"startsAt": "2018-03-27T21:23:23.634+02:00",
"alertId": 1000,
"annotations": {
"source": "",
"summary": "Broker on 1 is down"
},
"labels": {
"category": "Infrastructure",
"severity": "HIGH",
"instance": "instance101"
},
"generatorURL": "http://lenses"
}
Prometheus API¶
Prometheus can be used to poll metric information from infrastructure services.
Some important metrics, such as consumer lag are not exposed by Kafka. Lenses provides an additional Prometheus API
available at http://lenseshost:port/metrics
to be added to the Prometheus targets in order to bring in additional
critical monitoring information. Authentication is not required, so that Prometheus can freely poll this API.
GET /metrics
The response is a List of prometheus entries, for the consumer lag per partition and the aggregated lag per topic.
lenses_partition_consumer_lag{topic="iot_data",partition="3",consumerGroup="my.group.a"} 537
lenses_topic_consumer_lag{topic="iot_data",consumerGroup="my.group.a"} 4176
Quota API¶
Quotas can be viewed and set using the REST endpoints presented in the following section. Only admin
users are allowed to add or remove entries.
Get Quotas¶
GET /api/quotas
The response is List of Quotas.
entityType
can be eitherCLIENT
,CLIENTS
,CLIENTS DEFAULT
,USER
,USERS
,USERCLIENT
orUSERS DEFAULT
entityName
is Kafka client id forCLIENT
andCLIENTS
and user name forUSER
,USER
andUSERCLIENT
child
is optional and only present for entityTypeUSERCLIENT
and is the client idproperties
is a map of the quota constraints,producer_byte_rate
,consumer_byte_rate
andrequest_percentage
url
from this quota in Lenses
Example Response
[
{
"entityType": "CLIENT",
"entityName" : "my-client-id",
"properties": {
"producer_byte_rate" : "100000",
"consumer_byte_rate" : "200000",
"request_percentage" : "75"
},
"url" : "/api/quotas/clients"
}
]
Create/Update Quota - All Users¶
Default for all users.
PUT /api/quotas/users
Attributes
config
The quota constraints
Example Request
{
"producer_byte_rate" : "100000",
"consumer_byte_rate" : "200000",
"request_percentage" : "75"
}
Create/Update Quota - User all Clients¶
Create/update for all client ids for a particular user.
PUT /api/quotas/users/(string: user)/clients
Parameters
user
The user to set the quota for
Attributes
config
The quota constraints
Example Request
{
"producer_byte_rate" : "100000",
"consumer_byte_rate" : "200000",
"request_percentage" : "75"
}
Create/Update a Quota - User/Client pair¶
Quotas for a user and client id pair.
PUT /api/quotas/users/(string: user)/clients/(string: client-id)
Parameters
user
The user to set the quota forclient-id
The client id to set the quota for
Attributes
config
The quota constraints
Example Request
{
"producer_byte_rate" : "100000",
"consumer_byte_rate" : "200000",
"request_percentage" : "75"
}
Create/Update a Quota - User¶
Quota for a user.
PUT /api/quotas/users/(string: user)
Parameters
user
The user to set the quota for
Attributes
config
The quota constraints
Example Request
{
"producer_byte_rate" : "100000",
"consumer_byte_rate" : "200000",
"request_percentage" : "75"
}
Create/Update Quota - All Clients¶
Default for all clients.
PUT /api/quotas/clients
Attributes
config
The quota constraints
Example Request
{
"producer_byte_rate" : "100000",
"consumer_byte_rate" : "200000",
"request_percentage" : "75"
}
Create/Update a Quota - Client¶
Quotas for a client id.
PUT /api/quotas/clients/(string: client-id)
Parameters
client-id
The client id to set the quota for
Attributes
config
The quota constraints
Example Request
{
"producer_byte_rate" : "100000",
"consumer_byte_rate" : "200000",
"request_percentage" : "75"
}
Delete Quota - All Users¶
Delete default for all users.
DELETE /api/quotas/users
Attributes
- list of properties to be removed from the quota’s config
Example Request
[
"producer_byte_rate",
"consumer_byte_rate",
"request_percentage"
]
Delete Quota - User all Clients¶
Delete for all client ids for a user.
DELETE /api/quotas/users/(string: user)/clients
Parameters
user
The user to delete the quota for
Attributes
- list of properties to be removed from the quota’s config
Example Request
[
"producer_byte_rate",
"consumer_byte_rate",
"request_percentage"
]
Delete a Quota - User/Client pair¶
Delete quotas for a user and client id pair.
DELETE /api/quotas/users/(string: user)/clients/(string: client-id)
Parameters
user
The user to delete the quota forclient-id
The client id to delete the quota for
Attributes
- list of properties to be removed from the quota’s config
Example Request
[
"producer_byte_rate",
"consumer_byte_rate",
"request_percentage"
]
Delete a Quota - User¶
Delete a quota for a user.
DELETE /api/quotas/users/(string: user)
Parameters
user
The user to delete the quota for
Attributes
- list of properties to be removed from the quota’s config
Example Request
[
"producer_byte_rate",
"consumer_byte_rate",
"request_percentage"
]
Delete Quota - All Clients¶
Delete defaults for all clients.
DELETE /api/quotas/clients
Attributes
- list of properties to be removed from the quota’s config
Example Request
[
"producer_byte_rate",
"consumer_byte_rate",
"request_percentage"
]
Delete a Quota - Client¶
Delete quotas for a client id.
DELETE /api/quotas/clients/(string client-id)
Parameters
client-id
The client id to delete the quota for
Attributes
- list of properties to be removed from the quota’s config
Example Request
[
"producer_byte_rate",
"consumer_byte_rate",
"request_percentage"
]
Quotas Precedence
Quotas specified using the above rest endpoints, create entries in zookeeper. The paths in zookeeper dictate the following order of precedence:
- /config/users/<user>/clients/<client-id>
- /config/users/<user>/clients/<default>
- /config/users/<user>
- /config/users/<default>/clients/<client-id>
- /config/users/<default>/clients/<default>
- /config/users/<default>
- /config/clients/<client-id>
- /config/clients/<default>
Tip
The highest priority quota is always enforced. To use an example if clientA
has a client quota of 10MB and
userA
has a user quota 20MB and an application is running under clientA-userA
then the quota will be 20MB
because the user quota is of highest priority.
Connector API¶
Kafka Connect APIs are getting proxied via Lenses. In case multiple connect clusters are managed via Lenses you will need to include the alias used for this cluster as per your Lenses Configuration.
# List active connectors
GET /api/proxy-connect/(string: clusterAlias)/connectors
# Create new connector
POST /api/proxy-connect/(string: clusterAlias)/connectors [CONNECTOR_CONFIG]
# Get information about a specific connector
GET /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)
# Get connector config
GET /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/config
# Set connector config
PUT /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/config
# Get connector status
GET /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/status
# Pause a connector
PUT /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/pause
# Resume a paused connector
PUT /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/resume
# Restart a connector
POST /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/restart
# Get list of connector tasks
GET /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/tasks
# Get current status of a task
GET /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/tasks/(string: task_id)/status
# Restart a connector task
POST /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/tasks/(string: task_id)/restart
# Remove a running connector
DELETE /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)
# List available connector plugins
GET /api/proxy-connect/(string: clusterAlias)/connector-plugins
Schemas API¶
The Schema Registry proxy API interact directly with the schema registry services and iterate over the available hosts and in case of a failure retries with the next available one
# List all available subjects
GET /api/proxy-sr/subjects
# List all versions of a particular subject
GET /api/proxy-sr/subjects/(string: subject)/versions
# Delete a subject and associated compatibility level
DELETE /api/proxy-sr/subjects/(string: subject)
# Get the schema for a particular subject id
GET /api/proxy-sr/schemas/ids/{int: id}
# Get the schema at a particular version
# - versionId string "latest" or numeric values between 1 and 2^31-1
# The string “latest” refers to the last registered schema under the specified subject.
# Note that there may be a new latest schema that gets registered right after this request is served.
GET /api/proxy-sr/subjects/(string: subject)/versions/(versionId: version)
# Register a new schema under a particular subject
POST /api/proxy-sr/subjects/(string: subject)/versions
# Delete a particular version of a subject
DELETE /api/proxy-sr/subjects/(string: subject)/versions/(versionId: version)
# Update global compatibility level
PUT /api/proxy-sr/config
# Get global compatibility level
GET /api/proxy-sr/config
# Change compatibility level of a subject
PUT /api/proxy-sr/config/(string: subject)
# Get compatibility level of a subject
GET /api/proxy-sr/config/(string: subject)
Tip
You can still use the rest endpoints for Schema Registry and Kafka Connect directly. Lenses only proxies the queries to the correct endpoint by also tracking the relevant auditing information!
Logs API¶
GET api/logs/INFO
GET api/logs/METRICS
Example Response
A list of Log lines.
[
{
"timestamp": 1532570466516,
"logger": "akka.actor.ActorSystemImpl",
"stacktrace": "",
"thread": "default-akka.actor.default-dispatcher-3",
"message": "Request: POST->http://localhost:24015/api/login returned 200 OK in 3ms",
"time": "2018-07-26 04:01:06.516",
"level": "INFO"
},
{
"timestamp": 1532570466601,
"logger": "akka.actor.ActorSystemImpl",
"stacktrace": "",
"thread": "default-akka.actor.default-dispatcher-3",
"message": "Request: GET->http://localhost:24015/api/auth returned 200 OK in 2ms",
"time": "2018-07-26 04:01:06.601",
"level": "INFO"
}
]
Note
The data structure is the same for both INFO and METRICS. Follow the implementation for Go for more details
Audits API¶
GET api/audit
Example Response
A list of Audit entries.
[
{
"type": $AuditEntryType,
"change": $AuditEntryChange,
"userId": "string",
"timestamp": 329185402,
"content": {"map key": "map string value", ...}
},
...
]
$AuditEntryType = "TOPIC" OR "TOPIC_DATA" OR "QUOTAS" OR "BROKER_CONFIG" OR "ACL" OR "SCHEMA" OR "PROCESSOR" OR "CONNECTOR"
$AuditEntryChange = "ADD" OR "REMOVE" OR "UPDATE" OR "INSERT"
Audits API (SSE, Live updates)¶
GET api/sse/audit
How to make a Request
The client should be configured to expect Server Side Events
"Accept": "application/json, text/event-stream"
The client should wait(infinite loop/forever) for incoming messages and stop when EOF.
Each message received starts with "data:"
and ends with "\n"
.
If the message has not that prefix, skip it.
The remaining message after the "data:"
is the Audit entry as a JSON object, see below.
The incoming data structure looks like this:
{
"type": $AuditEntryType,
"change": $AuditEntryChange,
"userId": "string",
"timestamp": 329185402,
"content": {"map key": "map string value", ...}
}
$AuditEntryType = "TOPIC" OR "TOPIC_DATA" OR "QUOTAS" OR "BROKER_CONFIG" OR "ACL" OR "SCHEMA" OR "PROCESSOR" OR "CONNECTOR"
$AuditEntryChange = "ADD" OR "REMOVE" OR "UPDATE" OR "INSERT"
Example Response
data:{"type":"TOPIC","change":"INSERT","userId":"johnDoe","timestamp":1532559518100,"content":{"record":"{\"key\":\"PLAINTEXT://...\"}","topic":"topicName"}}
Note
Follow the implementation for Go for more details
Permissions Matrix¶
Topic API
Method - Endpoint | Admin | Write | Read | NoData |
---|---|---|---|---|
POST /api/topics/ |
✔ | ✔ | ||
DELETE /api/topics/{topicName} |
✔ | |||
DELETE /api/topics/{topicName}/{partition}/{offset} |
✔ | |||
PUT /api/topics/config/{topicName} |
✔ | ✔ | ||
GET /api/topics/{topicName} |
✔ | ✔ | ✔ | ✔ |
Metadata API
Method - Endpoint | Admin | Write | Read | NoData |
---|---|---|---|---|
GET /api/metadata/topics |
✔ | ✔ | ✔ | |
GET /api/metadata/topics/{topicName} |
✔ | ✔ | ✔ | |
GET /api/metadata/topics/{topicName} |
✔ | ✔ | ||
GET /api/metadata/topics/{topicName} |
✔ | ✔ |
Data API
Method - Endpoint | Admin | Write | Read | NoData |
---|---|---|---|---|
GET /api/sql/validation |
✔ | ✔ | ✔ | ✔ |
GET /api/sql/data |
✔ | ✔ | ✔ | ✔ |
Processor API
Method - Endpoint | Admin | Write | Read | NoData |
---|---|---|---|---|
GET /api/streams |
✔ | ✔ | ✔ | ✔ |
GET /api/streams/{processorId} |
✔ | ✔ | ✔ | ✔ |
POST /api/streams |
✔ | ✔ | ||
PUT /api/streams/{processorId}/pause |
✔ | ✔ | ||
PUT /api/streams/{processorId}/resume |
✔ | ✔ | ||
PUT /api/streams/{processorId}/scale/{numberOfRunners} |
✔ | ✔ | ||
DELETE /api/streams/{processorId} |
✔ | ✔ |
Access Control Lists API
Method - Endpoint | Admin | Write | Read | NoData |
---|---|---|---|---|
GET /api/acl |
✔ | |||
PUT /api/acl |
✔ | |||
DELETE /api/acl |
✔ |
Alerts API
Method - Endpoint | Admin | Write | Read | NoData |
---|---|---|---|---|
GET /api/alerts |
✔ | ✔ | ✔ | ✔ |
POST /api/alerts |
✔ | ✔ | ||
GET /api/alerts/settings |
✔ | ✔ | ||
GET /api/alerts/settings/{alert_setting_id} |
✔ | ✔ | ✔ | ✔ |
PUT /api/alerts/settings/{alert_setting_id} |
✔ | ✔ | ||
GET /api/alerts/settings/{alert_setting_id}/condition |
✔ | ✔ | ✔ | ✔ |
POST /api/alerts/settings/{alert_setting_id}/condition |
✔ | ✔ | ||
DELETE /api/alerts/settings/{alert_setting_id}/condition/{condition_uuid} |
✔ | ✔ |
Alerts SSE API
Method - Endpoint | Admin | Write | Read | NoData |
---|---|---|---|---|
GET /api/sse/alerts |
✔ | ✔ | ✔ | ✔ |
Prometheus API
Method - Endpoint | Admin | Write | Read | NoData |
---|---|---|---|---|
GET /metrics |
✔ | ✔ | ✔ | ✔ |
Quota API
Method - Endpoint | Admin | Write | Read | NoData |
---|---|---|---|---|
GET /api/quotas |
✔ | ✔ | ✔ | |
PUT /api/quotas/users |
✔ | |||
DELETE /api/quotas/users |
✔ | |||
PUT /api/quotas/clients |
✔ | |||
DELETE /api/quotas/clients |
✔ | |||
PUT /api/quotas/users/{userName} |
✔ | |||
DELETE /api/quotas/users/{userName} |
✔ | |||
PUT /api/quotas/clients/{clientId} |
✔ | |||
DELETE /api/quotas/clients/{clientId} |
✔ | |||
PUT /api/quotas/users/{userName}/clients |
✔ | |||
DELETE /api/quotas/users/{userName}/clients |
✔ | |||
PUT /api/quotas/users/{userName}/clients/{clientId} |
✔ | |||
DELETE /api/quotas/users/{userName}/clients/{clientId} |
✔ |
Connector API
Method - Endpoint | Admin | Write | Read | NoData |
---|---|---|---|---|
GET /api/proxy-connect/{clusterAlias}/connectors |
✔ | ✔ | ✔ | ✔ |
POST /api/proxy-connect/{clusterAlias}/connectors [CONNECTOR_CONFIG] |
✔ | ✔ | ||
GET /api/proxy-connect/{clusterAlias}/connectors/{connectorName} |
✔ | ✔ | ✔ | ✔ |
GET /api/proxy-connect/{clusterAlias}/connectors/{connectorName}/config |
✔ | ✔ | ✔ | ✔ |
PUT /api/proxy-connect/{clusterAlias}/connectors/{connectorName}/config |
✔ | ✔ | ||
GET /api/proxy-connect/{clusterAlias}/connectors/{connectorName}/status |
✔ | ✔ | ✔ | ✔ |
PUT /api/proxy-connect/{clusterAlias}/connectors/{connectorName}/pause |
✔ | ✔ | ||
PUT /api/proxy-connect/{clusterAlias}/connectors/{connectorName}/resume |
✔ | ✔ | ||
POST /api/proxy-connect/{clusterAlias}/connectors/{connectorName}/restart |
✔ | ✔ | ||
GET /api/proxy-connect/{clusterAlias}/connectors/{connectorName}/tasks |
✔ | ✔ | ✔ | ✔ |
GET /api/proxy-connect/{clusterAlias}/connectors/{connectorName}/tasks/{task_id}/status |
✔ | ✔ | ✔ | ✔ |
POST /api/proxy-connect/{clusterAlias}/connectors/{connectorName}/tasks/{task_id}/restart |
✔ | ✔ | ||
DELETE /api/proxy-connect/{clusterAlias}/connectors/{connectorName} |
✔ | ✔ | ||
GET /api/proxy-connect/{clusterAlias}/connector-plugins |
✔ | ✔ | ✔ | ✔ |
Schema API
Method - Endpoint | Admin | Write | Read | NoData |
---|---|---|---|---|
GET /api/proxy-sr/subjects |
✔ | ✔ | ✔ | ✔ |
GET /api/proxy-sr/subjects/{subject}/versions |
✔ | ✔ | ✔ | ✔ |
DELETE /api/proxy-sr/subjects/{subject} |
✔ | ✔ | ||
GET /api/proxy-sr/schemas/ids/{id} |
✔ | ✔ | ✔ | ✔ |
GET /api/proxy-sr/subjects/{subject}/versions/{version} |
✔ | ✔ | ✔ | ✔ |
POST /api/proxy-sr/subjects/{subject}/versions |
✔ | ✔ | ||
DELETE /api/proxy-sr/subjects/{subject}/versions/{version} |
✔ | ✔ | ||
PUT /api/proxy-sr/config |
✔ | ✔ | ||
GET /api/proxy-sr/config |
✔ | ✔ | ✔ | ✔ |
PUT /api/proxy-sr/config/{subject} |
✔ | ✔ | ||
GET /api/proxy-sr/config/{subject} |
✔ | ✔ | ✔ | ✔ |
Logs API
Method - Endpoint | Admin | Write | Read | NoData |
---|---|---|---|---|
GET /api/logs/INFO |
✔ | ✔ | ✔ | ✔ |
GET /api/logs/METRICS |
✔ | ✔ | ✔ | ✔ |
Audits API
Method - Endpoint | Admin | Write | Read | NoData |
---|---|---|---|---|
GET /api/audit |
✔ | ✔ | ✔ | ✔ |
GET /api/sse/audit |
✔ | ✔ | ✔ | ✔ |
Security Concerns
We recommend protecting via firewall rules direct access to any REST APIs (i.e. Schema Registry, Kafka Connect, Kubernetes) and then use Lenses APIs to take advantage of refined secure access policies.