REST API

Lenses provides a rich set of REST APIs for interacting with Apache Kafka, topics, offsets, and consumers, as well as the microservices of your data streaming platform. Lenses treats security as a first-class concern: it provides role-based access and auditing on the APIs and protects sensitive data such as passwords.

Note

Lenses exposes a single port and serves both the UI and the APIs from it. For example, if Lenses is accessible at http://localhost:3030, then the API is accessible under http://localhost:3030/api/xx using content-type:application/json.

Error Codes

All APIs use standard HTTP status codes for any request that results in an error (4xx or 5xx statuses).

Authentication API

All requests must be authenticated using an HTTP Header x-kafka-lenses-token:myToken. You can obtain the token via the following login API or you can use a service account.

All REST APIs are protected via role-based authentication that is either BASIC or LDAP based, depending on how Lenses security has been set up. To use the APIs, first authenticate as an appropriate user to receive an access token, then send that token with every subsequent request.

POST /api/login

Attributes

  • user, string
  • password, string

Error Code

  • 401 UNAUTHORIZED

To run the example below, we recommend installing the jq tool.

Example Request

# login and receive the access token
HOST="http://localhost:9991"
curl -X POST -H "Content-Type:application/json" -d '{"user":"admin", "password":"******"}' "${HOST}/api/login" --compressed -s

Example Response

"a1f44cb8-0f37-4b96-828c-57bbd8d4934b"

Note

Once an access token has been retrieved, it must be sent with every subsequent API request, for example: -H "X-Kafka-Lenses-Token:${TOKEN}"
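The login flow above can be sketched in Python using only the standard library. This is a minimal sketch rather than an official client: the helper names (build_login_request, parse_token, auth_headers) are hypothetical, the host is the one used in the curl example, and it assumes the login response body is a JSON-encoded string, as in the example response.

```python
import json
import urllib.request


def build_login_request(host, user, password):
    """Build (but do not send) the POST /api/login request."""
    body = json.dumps({"user": user, "password": password}).encode("utf-8")
    return urllib.request.Request(
        f"{host}/api/login",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def parse_token(response_body):
    # Assumption: the body is a JSON string, as shown in the example response.
    return json.loads(response_body)


def auth_headers(token):
    """Headers to attach to every subsequent API call."""
    return {
        "Content-Type": "application/json",
        "X-Kafka-Lenses-Token": token,
    }


login = build_login_request("http://localhost:9991", "admin", "secret")
# To actually send it: urllib.request.urlopen(login).read().decode("utf-8")
token = parse_token('"a1f44cb8-0f37-4b96-828c-57bbd8d4934b"')
print(auth_headers(token))
```

Building the request separately from sending it keeps the sketch runnable without a Lenses instance; a 401 UNAUTHORIZED response would surface as urllib.error.HTTPError when the request is sent.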

Topic API

Create Topic

POST /api/topics

Attributes

  • topicName, string, Required
  • replication, int
  • partitions, int
  • configs, key - value

Example Request

{
    "topicName": "topicA",
    "replication": 1,
    "partitions": 1,
    "configs": {
        "cleanup.policy": "compact",
        "compression.type": "snappy"
    }
}
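As an illustration, the request body above could be assembled and wrapped in an authenticated request as follows. This is a sketch under assumptions: create_topic_request is a hypothetical helper, the host and token values are placeholders, and optional attributes are simply omitted when not supplied.

```python
import json
import urllib.request


def create_topic_request(host, token, topic_name, partitions=None,
                         replication=None, configs=None):
    """Build (but do not send) a POST /api/topics request."""
    payload = {"topicName": topic_name}  # the only required attribute
    if replication is not None:
        payload["replication"] = replication
    if partitions is not None:
        payload["partitions"] = partitions
    if configs:
        payload["configs"] = configs
    return urllib.request.Request(
        f"{host}/api/topics",
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "X-Kafka-Lenses-Token": token,
        },
        method="POST",
    )


request = create_topic_request(
    "http://localhost:3030", "my-token", "topicA",
    partitions=1, replication=1,
    configs={"cleanup.policy": "compact", "compression.type": "snappy"},
)
print(request.get_method(), request.full_url)
print(request.data.decode("utf-8"))
```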

Delete Topic

DELETE /api/topics/(string: topicName)

Parameters

  • topicName, string

Delete Records from Topic-Partition up to a specified Offset

Important

This works only for Kafka versions 0.11+.

DELETE /api/topics/{string: topicName}/{int: partition}/{long: offset}

Parameters

  • topicName, string
  • partition, int
  • offset, long

Example Response

The response is a message similar to the one below, indicating that the records are being deleted. If the deletion succeeds, an audit entry is created, along with a notification indicating that.

Records from topic 'test_topic' and partition '0' up to offset '1260', are being deleted.
When the process is completed, an audit will appear in the audits tab.

Update Topic Configuration

PUT /api/configs/topics/(string: topicName)

Parameters

  • topicName, string

Attributes

  • configs, an array of topic config key-values

Example Request

PUT /api/topics/config/topicA

{
    "configs": [{
        "key": "cleanup.policy",
        "value": "compact"
    }]
}
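Note that the configs attribute here is an array of key/value entries, unlike the key-value map used when creating a topic. A small conversion helper might look like the sketch below; topic_config_body is a hypothetical name, and converting every value to a string is an assumption based on the example request.

```python
import json


def topic_config_body(settings):
    """Turn a plain dict of topic settings into the array-of-entries body."""
    return {
        "configs": [
            # Assumption: values are sent as strings, as in the example.
            {"key": key, "value": str(value)}
            for key, value in settings.items()
        ]
    }


body = topic_config_body({"cleanup.policy": "compact", "retention.ms": 86400000})
print(json.dumps(body, indent=4))
```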

Get Topic information

GET /api/topics/(string: topicName)

Parameters

  • topicName, string

Example Response

{
    "topicName": "topicA",
    "keyType": "AVRO",
    "valueType": "AVRO",
    "partitions": 1,
    "replication": 1,
    "isControlTopic": false,
    "keySchema": null,
    "valueSchema": null,
    "messagesPerSecond": 0,
    "totalMessages": 1737056563,
    "timestamp": 1515415557251,
    "isMarkedForDeletion": false,
    "config": [{
        "configuration": "cleanup.policy",
        "value": "compact",
        "defaultValue": "delete",
        "documentation": "A string that is either \"delete\" or \"compact\". This string designates the retention policy to use on old log segments. The default policy (\"delete\") will discard old segments when their retention time or size limit has been reached. The \"compact\" setting will enable log compaction on the topic."
    }],
    "consumers": [],
    "messagesPerPartition": [{
        "partition": 0,
        "messages": 1737056563,
        "begin": 0,
        "end": 1737056563
    }]
}

Topic Metadata API

Lenses has a system topic that stores metadata for every topic in your system. The Topic Metadata API lets you retrieve, add, and delete metadata for an existing topic.

Retrieve All Stored Metadata

GET /api/metadata/topics

Example Response

[{
    keyType: "AVRO",
    valueType: "AVRO",
    topicName: "reddit_posts",
    valueSchema: "{
        "type":"record",
        "name":"reddit_post","namespace":"com.landoop.social.reddit.post",
        "doc":"Schema for Reddit Posts dataset. [https://www.kaggle.com/reddit/reddit-comments-may-2015/discussion/16213]",
        "fields":[]
        }",
    keySchema: "{
        "type":"record",
        "name":"reddit_post_key",
        "namespace":"com.landoop.social.reddit.post.key",
        "fields":[]}"
}]

Retrieve Metadata For A Topic

GET /api/metadata/topics/{topicName: string}

Parameters

  • topicName, string, The topic to retrieve metadata for.

Example Response

{
    keyType: "AVRO",
    valueType: "AVRO",
    topicName: "reddit_posts",
    valueSchema: "{
        "type":"record",
        "name":"reddit_post","namespace":"com.landoop.social.reddit.post",
        "doc":"Schema for Reddit Posts dataset. [https://www.kaggle.com/reddit/reddit-comments-may-2015/discussion/16213]",
        "fields":[]
        }",
    keySchema: "{
        "type":"record",
        "name":"reddit_post_key",
        "namespace":"com.landoop.social.reddit.post.key",
        "fields":[]
        }"
}

Add Metadata For An Existing Topic

POST /api/metadata/topics/{topicName: string}

Parameters

Param        Type    Required/Optional
topicName    string  required
keyType      string  required
valueType    string  required
keySchema    string  optional
valueSchema  string  optional
Example Request

POST /api/metadata/topics/reddit_posts?keyType=JSON&valueType=JSON

Delete Metadata For An Existing Topic

DELETE /api/system/topics/metadata/{topicName}

Parameters

  • topicName, string, The topic to delete metadata for.

Data API

The Data REST APIs let you fetch data from topics via Lenses SQL queries. You can also subscribe to and produce messages to the Kafka topic live stream via the WebSocket APIs.

Lenses SQL URL Encoding

SQL statements must be URL-encoded before the request is sent, using standard URL encoding. Here is an example:

Assume the query below:

SELECT * FROM `topicA`
LIMIT 1000

The encoded version looks like this:

SELECT+*+FROM+%60topicA%60%0ALIMIT+1000
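One way to produce this encoding is Python's urllib.parse.quote_plus. This is a sketch, not the only valid encoding: encode_sql is a hypothetical helper, and safe="*" is passed only so that the asterisk stays literal and the output matches the example above (quote_plus would otherwise emit %2A, which is an equivalent encoding).

```python
from urllib.parse import quote_plus


def encode_sql(sql):
    """URL-encode a Lenses SQL statement for use in the sql query parameter."""
    # Spaces become '+', backticks '%60', newlines '%0A'; '*' is kept as-is.
    return quote_plus(sql, safe="*")


encoded = encode_sql("SELECT * FROM `topicA`\nLIMIT 1000")
print(encoded)
```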

Lenses SQL Validation

GET /api/sql/validation

URL Params

  • sql, string (Encoded URL Lenses SQL query), Required

Example Request

curl -X GET -H "Content-Type:application/json" -H "X-Kafka-Lenses-Token:${TOKEN}" "${HOST}/api/sql/validation?sql=SELECT+*+FROM+%60topicA%60%0ALIMIT+1000"

Example Response

{
    "isValid": true,
    "line": 0,
    "column": 0,
    "message": null
}

Example Error

{
    "isValid": false,
    "line": 4,
    "column": 1,
    "message": "Invalid syntax.Encountered \"LIIT\" at line 4, column 1.\nWas expecting one of:\n    <EOF> ... "
}

Lenses SQL Get Data

GET /api/sql/data

URL Params

  • sql, string (Encoded URL Lenses SQL query), Required
  • stats, integer, enables statistics and sets the interval in milliseconds at which they are pushed (stats=4000 means every 4 seconds), Optional
  • offsets, boolean, if true, the offsets for the topic are fetched and attached to the end record, Optional

How to make a Request

The client should be configured to expect Server-Sent Events:

"Accept": "application/json, text/event-stream"
GET /api/sql/data?sql=SELECT+*+FROM+%60topicA%60%0ALIMIT+1000&stats=4000&offsets=true

The client should wait for incoming messages and stop when EOF is received.

Each message starts with "data:" followed by a number; the number indicates the type of the data received (see below). Based on the message type number, you should handle each message differently. Here are the possible values for the message type:

  • 0 - Heartbeat that keeps the connection open; skip it (for example, continue the read loop), i.e. data:0
  • 1 - JSON record payload, which contains topic metadata and a value string field holding the data coming from the topic, i.e. data:1{timestamp:..., partition:..., value:"{...}"}
  • 2 - Stop JSON message. When received, you can exit the loop. It also contains statistics, i.e. data:2{isTimeRemaining:true/false, size:...., offsets:...}
  • 3 - Error JSON message (if any), i.e. data:3{fromLine:...,toLine:...,fromColumn:...,toColumn:...,error:...}

You can use the Lenses-CLI code to see the response data structure.

Example Response for a record (message type 1)

data:1{"timestamp":1532460006300,"partition":2,"key":"{\"key\":\"value\"}","offset":560207858,"topic":"","value":"{the fetched data}"}

Example Response for stop (message type 2)

data:2{"size":2062,"isTimeRemaining":true,"offsets":[{"partition":2,"min":560207858,"max":568636162},{"partition":1,"min":338050641,"max":347448266},{"partition":0,"min":355312551,"max":364217747}],"isStopped":false,"totalRecords":3,"isTopicEnd":false,"skippedRecords":0,"recordsLimit":3,"totalSizeRead":2062}

Example Response for an error (message type 3)

data:3{"fromLine":4,"toLine":4,"fromColumn":1,"toColumn":1,"error":"Invalid syntax.Encountered \"LIIT\" at line 4, column 1.\nWas expecting one of:\n    <EOF> ... "}
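The message-type handling described above can be sketched as a small parser. This is an illustrative sketch, not the Lenses client code: parse_data_line and collect_records are hypothetical helpers, and it assumes each message arrives on a single line of the form data:<type digit><JSON>.

```python
import json

HEARTBEAT, RECORD, STOP, ERROR = 0, 1, 2, 3


def parse_data_line(line):
    """Return (message_type, payload) or None for lines that carry no message."""
    line = line.strip()
    if not line.startswith("data:"):
        return None
    body = line[len("data:"):]
    if not body or not body[0].isdigit():
        return None
    payload = json.loads(body[1:]) if len(body) > 1 else None
    return int(body[0]), payload


def collect_records(lines):
    """Consume lines until the stop message; return (records, stop_payload)."""
    records = []
    for line in lines:
        parsed = parse_data_line(line)
        if parsed is None:
            continue
        message_type, payload = parsed
        if message_type == HEARTBEAT:
            continue  # keep-alive only
        if message_type == RECORD:
            records.append(payload)
        elif message_type == STOP:
            return records, payload
        elif message_type == ERROR:
            raise RuntimeError(payload["error"])
    return records, None


sample = [
    "data:0",
    'data:1{"timestamp":1532460006300,"partition":2,"key":"k",'
    '"offset":560207858,"topic":"","value":"{}"}',
    'data:2{"size":2062,"isTimeRemaining":true,"totalRecords":1}',
]
records, stats = collect_records(sample)
print(len(records), stats["totalRecords"])
```

In a real client, the lines argument would be the decoded lines of the streaming HTTP response rather than an in-memory list.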

Note

See the Go or Python client implementations for more details.

Processor API

Before using the Processor API, make sure you know which execution mode your Lenses instance uses to run the processors (see Lenses Configuration).

Get All Processors

GET /api/streams

Example response

{
    [
        "targets" : {
            "cluster" : "minikube",
            "version" : null,
            "namespaces" : [ "lenses-sql" ]
        },
        "streams" : {
            "id": "lsql_c7f177994db640cab95a99900c4fb7e7",
            "name": "cc_payments_freq_1min",
            "clusterName": "MINISHIFT",
            "user": "admin",
            "namespace": "lenses-sql",
            "uptime": 3387090,
            "sql": "SET autocreate=true;SET `commit.interval.ms`='120000';SET `auto.offset.reset`='latest'; INSERT INTO `cc_payments_1m_freq` SELECT STREAM COUNT(*) as `count` FROM cc_payments GROUP BY tumble(1, m)",
            "runners": 1,
            "deploymentState": "RUNNING",
            "topicValueDecoder": "LONG",
            "pipeline": "lsql",
            "startTs": 1522180383208,
            "toTopic": "cc_payments_1m_freq",
            "runnerState": {
                "e94433cf-cf99-4e46-9adc-407bb06b35c2": {
                "id": "e94433cf-cf99-4e46-9adc-407bb06b35c2",
                "worker": "worker1",
                "state": "RUNNING",
                "errorMsg": ""
                }
            }
        }
    ]
}

Get a Processor

Retrieve a specific processor.

GET /api/streams/(string: processorId)

Parameters

  • processorId is the id of the processor

The response is a SqlStream.

Example response

{
    "streams" : {
        "id": "lsql_c7f177994db640cab95a99900c4fb7e7",
        "name": "cc_payments_freq_1min",
        "clusterName": "MINISHIFT",
        "user": "admin",
        "namespace": "lenses-sql",
        "uptime": 3387090,
        "sql": "SET autocreate=true;SET `commit.interval.ms`='120000';SET `auto.offset.reset`='latest'; INSERT INTO cc_payments_1m_freq SELECT STREAM COUNT(*) as `count` FROM cc_payments GROUP BY tumble(1, m)",
        "runners": 1,
        "deploymentState": "RUNNING",
        "topicValueDecoder": "LONG",
        "pipeline": "lsql",
        "startTs": 1522180383208,
        "toTopic": "cc_payments_1m_freq",
        "runnerState": {
            "e94433cf-cf99-4e46-9adc-407bb06b35c2": {
            "id": "e94433cf-cf99-4e46-9adc-407bb06b35c2",
            "worker": "worker1",
            "state": "RUNNING",
            "errorMsg": ""
            }
        }
    }
}

Create Processor

POST /api/streams

Attributes

  • name, string, Required
  • sql, string, Required
  • runners, int
  • clusterName, string, used for both Kubernetes and Connect modes
  • namespace, string, used only for Kubernetes mode
  • pipeline, string, used only for Kubernetes mode

Example Request

{
     "name": "myProcessor",
     "sql": "SET autocreate=true;INSERT INTO topicB SELECT * FROM topicA",
     "runners": 1,
     "clusterName": "myCluster",
     "namespace": "ns"
 }

Response

The response includes the id for the processor.

Pause Processor

PUT /api/streams/(string: processorId)/pause

Parameters

  • processorId, string, Required

Resume Processor

PUT /api/streams/(string: processorId)/resume

Parameters

  • processorId, string, Required

Update Processor Runners

PUT /api/streams/(string: processorId)/scale/(int: numberOfRunners)

Parameters

  • processorId, string, Required
  • numberOfRunners, int, Required

Delete Processor

DELETE /api/streams/(string: processorId)

Parameters

  • processorId, string, Required
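The pause, resume, scale, and delete endpoints above differ only in HTTP method and path suffix, so a client can derive them from one helper. The sketch below is illustrative: processor_call is a hypothetical function name, and percent-encoding the processor id is a defensive assumption rather than a documented requirement.

```python
from urllib.parse import quote


def processor_call(action, processor_id, runners=None):
    """Return the (HTTP method, path) pair for a processor lifecycle action."""
    base = f"/api/streams/{quote(processor_id, safe='')}"
    if action in ("pause", "resume"):
        return "PUT", f"{base}/{action}"
    if action == "scale":
        if runners is None:
            raise ValueError("scale requires the number of runners")
        return "PUT", f"{base}/scale/{int(runners)}"
    if action == "delete":
        return "DELETE", base
    raise ValueError(f"unknown action: {action}")


processor_id = "lsql_c7f177994db640cab95a99900c4fb7e7"
for action in ("pause", "resume", "delete"):
    print(processor_call(action, processor_id))
print(processor_call("scale", processor_id, runners=3))
```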

Access Control Lists API

Kafka Access Control Lists can be viewed and set via the REST endpoints. Only admin users are allowed to add or modify ACLs.

Create/Update

PUT /api/acl

Attributes

  • resourceType, string, required
  • resourceName, string, required
  • principal, string, required
  • permissionType, string, required (either Allow or Deny)
  • host, string, required
  • operation, string, required

ACL Operations

The following operations are valid (depending on the Kafka version)

Resource Type    Operation
Topic            Read
Topic            Write
Topic            Describe
Topic            Delete
Topic            DescribeConfigs
Topic            AlterConfigs
Topic            All
Group            Read
Group            Describe
Group            Delete
Group            All
Cluster          Create
Cluster          ClusterAction
Cluster          DescribeConfigs
Cluster          AlterConfigs
Cluster          IdempotentWrite
Cluster          Alter
Cluster          Describe
Cluster          All
TransactionalId  Describe
TransactionalId  Write
TransactionalId  All
DelegationToken  Describe
DelegationToken  All

Example Request

Allow read from any host on topic transactions for principal principalType:principalName.

{
    "resourceType": "Topic",
    "resourceName": "transactions",
    "principal": "principalType:principalName" ,
    "permissionType": "Allow",
    "host": "*",
    "operation": "Read"
}
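Before sending an ACL, a client can check the resource type and operation against the table of valid operations. The following is a sketch under assumptions: acl_body and VALID_OPERATIONS are hypothetical names, the table is copied from this page, and the server remains the authority on what a given Kafka version accepts.

```python
import json

# Valid operations per resource type, copied from the table on this page.
# Which ones are actually available depends on the Kafka version.
VALID_OPERATIONS = {
    "Topic": {"Read", "Write", "Describe", "Delete",
              "DescribeConfigs", "AlterConfigs", "All"},
    "Group": {"Read", "Describe", "Delete", "All"},
    "Cluster": {"Create", "ClusterAction", "DescribeConfigs", "AlterConfigs",
                "IdempotentWrite", "Alter", "Describe", "All"},
    "TransactionalId": {"Describe", "Write", "All"},
    "DelegationToken": {"Describe", "All"},
}


def acl_body(resource_type, resource_name, principal,
             permission_type, host, operation):
    """Validate the combination locally and return the PUT /api/acl body."""
    if operation not in VALID_OPERATIONS.get(resource_type, set()):
        raise ValueError(f"{operation} is not valid for {resource_type}")
    if permission_type not in ("Allow", "Deny"):
        raise ValueError("permissionType must be Allow or Deny")
    return {
        "resourceType": resource_type,
        "resourceName": resource_name,
        "principal": principal,
        "permissionType": permission_type,
        "host": host,
        "operation": operation,
    }


acl = acl_body("Topic", "transactions", "principalType:principalName",
               "Allow", "*", "Read")
print(json.dumps(acl, indent=4))
```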

Get ACLs

GET /api/acl

Example Response

[{
    "resourceType": "Topic",
    "resourceName": "transactions",
    "principal": "principalType:principalName" ,
    "permissionType": "Allow",
    "host": "*",
    "operation": "Read"
}]

Delete ACLs

DELETE /api/acl

Example Request

{
    "resourceType": "Topic",
    "resourceName": "transactions",
    "principal": "principalType:principalName" ,
    "permissionType": "Allow",
    "host": "*",
    "operation": "Read"
}

Alert API

Alerts are divided into two categories:

  • Infrastructure - These are out of the box alerts that can be toggled on and off
  • Consumer group - These are user-defined alerts on consumer groups

Alert notifications are the result of an Alert Setting Condition being met on an Alert Setting.

Get All Alert Settings

Retrieves the configured alert settings

GET /api/alerts/settings

Single AlertSetting structure.

  • conditions is the condition to be met for the alert to be fired. Type of a Map[string][string], can be empty
  • conditionRegex is the regular expression to validate the condition set based on the conditionTemplate. Type of string, can be empty
  • description is the description of the alert. Type of string, cannot be empty
  • enabled is a boolean flag to indicate if this alert is enabled
  • docs is a documentation link. Type of string, can be empty
  • id is the id of this alert. Type of int, must be positive
  • category is the category of the alert, either Infrastructure or Consumers
  • isAvailable indicates whether the alert is available; some alerts require JMX and are disabled when JMX is not available. Type of boolean, true/false
  • conditionTemplate is an optional string template for the condition, e.g. lag >= $Threshold-Number on group $Consumer-Group and topic $Topic-Name

Example Response

{
  "categories": {
    "Infrastructure": [
      {
        "id": 1,
        "description": "License is invalid",
        "category": "Infrastructure",
        "enabled": true,
        "isAvailable": true
      },
      {
        "id": 1000,
        "description": "Kafka Broker is down",
        "category": "Infrastructure",
        "enabled": false,
        "isAvailable": true
      },
      {
        "id": 1001,
        "description": "Zookeeper Node is down",
        "category": "Infrastructure",
        "enabled": true,
        "isAvailable": true
      },
      {
        "id": 1002,
        "description": "Connect Worker is down",
        "category": "Infrastructure",
        "enabled": true,
        "isAvailable": true
      },
      {
        "id": 1003,
        "description": "Schema Registry is down",
        "category": "Infrastructure",
        "enabled": true,
        "isAvailable": true
      },
      {
        "id": 1004,
        "description": "Alert Manager is down",
        "category": "Infrastructure",
        "enabled": true,
        "isAvailable": true
      },
      {
        "id": 1005,
        "description": "Under replicated partitions",
        "category": "Infrastructure",
        "enabled": true,
        "isAvailable": true
      },
      {
        "id": 1006,
        "description": "Partitions offline",
        "category": "Infrastructure",
        "enabled": true,
        "isAvailable": true
      },
      {
        "id": 1007,
        "description": "Active Controllers",
        "category": "Infrastructure",
        "enabled": true,
        "isAvailable": true
      },
      {
        "id": 1008,
        "description": "Multiple Broker Versions",
        "category": "Infrastructure",
        "enabled": true,
        "isAvailable": true
      },
      {
        "id": 1009,
        "description": "File-open descriptors high capacity on Brokers",
        "category": "Infrastructure",
        "enabled": true,
        "isAvailable": true
      },
      {
        "id": 1010,
        "description": "Average % the request handler is idle",
        "category": "Infrastructure",
        "enabled": true,
        "isAvailable": true
      },
      {
        "id": 1011,
        "description": "Fetch requests failure",
        "category": "Infrastructure",
        "enabled": true,
        "isAvailable": true
      },
      {
        "id": 1012,
        "description": "Produce requests failure",
        "category": "Infrastructure",
        "enabled": true,
        "isAvailable": true
      },
      {
        "id": 1013,
        "description": "Broker disk usage is greater than the cluster average",
        "category": "Infrastructure",
        "enabled": true,
        "isAvailable": true
      },
      {
        "id": 1014,
        "description": "Leader Imbalance",
        "category": "Infrastructure",
        "enabled": true,
        "isAvailable": true
      }
    ],
    "Consumers": [
      {
        "id": 2000,
        "description": "Consumer Lag exceeded",
        "category": "Consumers",
        "enabled": true,
        "docs": "Raises an alert when the consumer lag exceeds the threshold",
        "conditionTemplate": "lag >= $Threshold-Number on group $Consumer-Group and topic $Topic-Name",
        "conditionRegex": "lag >= ([1-9][0-9]*) on group (\\b\\S+\\b) and topic (\\b\\S+\\b)",
        "conditions": {
          "28bbad2b-69bb-4c01-8e37-28e2e7083aa9": "lag >= 100000 on group group and topic topicA",
          "4318a5a7-32e4-43af-9f3f-438e64d14a11": "lag >= 1000000 on group consumerA and topic topicA",
          "92dd89a1-83c0-4251-8610-36fce780a824": "lag >= 1000000 on group minikube.default.london-keyword-count and topic reddit_posts",
          "bdb01792-b79f-44d0-a39e-7f3e0906a89b": "lag >= 100000 on group connect-nullsink and topic topicA",
          "f4dd550f-bcb6-4e02-b28b-18bbd426aaf2": "lag >= 100000 on group groupA and topic topicA"
        },
        "isAvailable": true
      }
    ]
  }
}

Get an Alert Setting

Retrieves the specified alert setting

GET /api/alerts/settings/{alert_setting_id: Int}

Parameters

  • alert_setting_id is the alert setting id to retrieve. Type of Int

For more information on available identifiers, see: Alert Identifiers.

The response is AlertSetting.

  • conditions is the condition to be met for the alert to be fired. Type of a Map[string][string], can be empty
  • conditionRegex is the regular expression to validate the condition set based on the conditionTemplate. Type of string, can be empty
  • description is the description of the alert. Type of string, cannot be empty
  • enabled is a boolean flag to indicate if this alert is enabled
  • docs is a documentation link. Type of string, can be empty
  • id is the id of this alert. Type of int, must be positive
  • category is the category of the alert, either Infrastructure or Consumers
  • isAvailable indicates whether the alert is available; some alerts require JMX and are disabled when JMX is not available. Type of boolean, true/false
  • conditionTemplate is an optional string template for the condition, e.g. lag >= $Threshold-Number on group $Consumer-Group and topic $Topic-Name

Example Response

{
  "id": 2000,
  "description": "Consumer Lag exceeded",
  "category": "Consumers",
  "enabled": true,
  "docs": "Raises an alert when the consumer lag exceeds the threshold",
  "conditionTemplate": "lag >= $Threshold-Number on group $Consumer-Group and topic $Topic-Name",
  "conditionRegex": "lag >= ([1-9][0-9]*) on group (\\b\\S+\\b) and topic (\\b\\S+\\b)",
  "conditions": {
    "28bbad2b-69bb-4c01-8e37-28e2e7083aa9": "lag >= 100000 on group group and topic topicA",
    "4318a5a7-32e4-43af-9f3f-438e64d14a11": "lag >= 1000000 on group consumerA and topic topicA",
    "92dd89a1-83c0-4251-8610-36fce780a824": "lag >= 1000000 on group minikube.default.london-keyword-count and topic reddit_posts",
    "bdb01792-b79f-44d0-a39e-7f3e0906a89b": "lag >= 100000 on group connect-nullsink and topic topicA",
    "f4dd550f-bcb6-4e02-b28b-18bbd426aaf2": "lag >= 100000 on group groupA and topic topicA"
  },
  "isAvailable": true
}

Enable an Alert Setting

Enables an alert setting

PUT /api/alerts/settings/{alert_setting_id: Int}

Parameters

  • alert_setting_id is the alert setting to enable. Type of Int

For more information on available identifiers, see: Alert Identifiers.

Get Alert Setting Conditions

Retrieves the conditions for an alert setting

GET /api/alerts/settings/{alert_setting_id: Int}/condition

Parameters

  • alert_setting_id is the alert setting id to get the condition for. Type of Int

For more information on available identifiers, see: Alert Identifiers.

Example Response

{
    "28bbad2b-69bb-4c01-8e37-28e2e7083aa9": "lag >= 100000 on group group and topic topicA",
    "4318a5a7-32e4-43af-9f3f-438e64d14a11": "lag >= 1000000 on group consumerA and topic topicA",
    "92dd89a1-83c0-4251-8610-36fce780a824": "lag >= 1000000 on group minikube.default.london-keyword-count and topic reddit_posts",
    "bdb01792-b79f-44d0-a39e-7f3e0906a89b": "lag >= 100000 on group connect-nullsink and topic topicA",
    "f4dd550f-bcb6-4e02-b28b-18bbd426aaf2": "lag >= 100000 on group groupA and topic topicA"
}

Create/Update an Alert Setting Condition

POST /api/alerts/settings/{alert_setting_id: Int}/condition/{condition_uuid: String}

Parameters

The request is a plain text expression for the specific condition:

  • alert_setting_id is the alert setting id to set the condition for. Type of Int
  • condition_uuid is the condition id to update. Type of String

For more information on available identifiers, see: Alert Identifiers.

Example Request: lag >= 100000 on group group and topic topicA
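A client can check a condition expression against the conditionRegex of the alert setting before posting it. The sketch below uses the regex shown for alert 2000 (Consumer Lag exceeded) in the example responses on this page; parse_lag_condition is a hypothetical helper, and the assumption is that Python's re module interprets this particular pattern the same way the server does.

```python
import re

# conditionRegex for alert setting 2000, after JSON unescaping.
CONDITION_REGEX = r"lag >= ([1-9][0-9]*) on group (\b\S+\b) and topic (\b\S+\b)"


def parse_lag_condition(condition):
    """Return (threshold, group, topic) or raise ValueError if it is malformed."""
    match = re.fullmatch(CONDITION_REGEX, condition)
    if match is None:
        raise ValueError(f"condition does not match the template: {condition!r}")
    threshold, group, topic = match.groups()
    return int(threshold), group, topic


print(parse_lag_condition("lag >= 100000 on group group and topic topicA"))
```

The validated string is then sent unchanged as the plain-text request body.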

Delete an Alert Setting Condition

DELETE /api/alerts/settings/{alert_setting_id: Int}/condition/{condition_uuid: String}

Attributes

  • alert_setting_id is the alert setting id to delete the condition for. Type of Int
  • condition_uuid is the condition id to delete. Type of String

For more information on available identifiers, see: Alert Identifiers.

Get All Alert Notifications

GET /api/alerts

The response is a list of Alerts.

  • labels is a list of key-value pairs. It will contain at least severity
  • annotations is a list of key-value pairs. The keys will contain the summary, source, and docs keys.
  • startsAt is the time, in ISO format, for when the alert starts
  • endsAt is the time the alert ended at
  • generatorURL is a unique URL identifying the creator of this alert. It matches AlertManager requirements for providing this field
  • alertId is a unique identifier for the setting corresponding to this alert. See id in the alert settings API.

Example Response

[{
    "endsAt": null,
    "startsAt": "2018-03-27T21:23:23.634+02:00",
    "alertId": 1000,
    "annotations": {
        "source": "",
        "summary": "Broker on 1 is down"
    },
    "labels": {
        "category": "Infrastructure",
        "severity": "HIGH",
        "instance": "instance101"
    },
    "generatorURL": "http://lenses"
}]

Alert Identifiers

License

Alert Identifier Description
1 License is invalid

Infrastructure

Alert Identifier Description
1000 Kafka Broker is down
1001 Zookeeper Node is down
1002 Connect Worker is down
1003 Schema Registry is down
1004 Alert Manager is down
1005 Under replicated partitions
1006 Partitions offline
1007 Active Controllers
1008 Multiple Broker Versions
1009 File-open descriptors high capacity on Brokers
1010 Average % the request handler is idle
1011 Fetch requests failure
1012 Produce requests failure
1013 Broker disk usage is greater than the cluster average
1014 Leader Imbalance

Consumers

Alert Identifier Description
2000 Consumer Lag exceeded

Connect

Alert Identifier Description
3000 Connector deleted

Topics

Alert Identifier Description
4000 Topic has been created
4001 Topic has been deleted
4002 Topic data has been deleted

Alert SSE API

Alert notifications can be pushed in real time to the client via a Server-Sent Events (SSE) endpoint.

GET /api/sse/alerts

The response is a text/event-stream of Alerts. Empty keep-alive heartbeats are sent every 10 seconds. Each alert is sent as data:{alertId: …, labels: …}, with the Alert JSON following the data: prefix. An empty data: message means the stream has stopped. If a message is empty or shorter than 5 characters, ignore it: it is a heartbeat.

  • labels is a list of key-value pairs. It will contain at least severity
  • annotations is a list of key-value pairs. The keys will contain the summary, source, and docs keys.
  • startsAt is the time, in ISO format, for when the alert starts
  • endsAt is the time the alert ended at
  • generatorURL is a unique URL identifying the creator of this alert. It matches AlertManager requirements for providing this field
  • alertId is a unique identifier for the setting corresponding to this alert. See id in the alert settings API.

Example Response

data:{
    "endsAt": null,
    "startsAt": "2018-03-27T21:23:23.634+02:00",
    "alertId": 1000,
    "annotations": {
        "source": "",
        "summary": "Broker on 1 is down"
    },
    "labels": {
        "category": "Infrastructure",
        "severity": "HIGH",
        "instance": "instance101"
    },
    "generatorURL": "http://lenses"
}
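The rules for reading the alert stream can be sketched as a line classifier. This is an interpretation under assumptions, not the reference client: classify_alert_line is a hypothetical helper, each alert is assumed to arrive on a single data: line (the example above is wrapped for readability), and any line that is not an alert or a stop marker is simply ignored.

```python
import json


def classify_alert_line(line):
    """Return ("alert", dict), ("stop", None) or ("ignore", None) for one line."""
    line = line.rstrip("\r\n")
    if line == "data:":
        return "stop", None  # empty data: message ends the stream
    if len(line) < 5:
        return "ignore", None  # heartbeat or blank separator line
    if line.startswith("data:"):
        return "alert", json.loads(line[len("data:"):])
    return "ignore", None  # anything else is not an alert message


sample = [
    "",
    'data:{"alertId":1000,"labels":{"severity":"HIGH"},"endsAt":null}',
    "data:",
]
for line in sample:
    print(classify_alert_line(line))
```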

Prometheus API

Prometheus can be used to poll metric information from infrastructure services.

Some important metrics, such as consumer lag, are not exposed by Kafka. Lenses provides an additional Prometheus endpoint at http://lenseshost:port/metrics, which can be added to the Prometheus targets to bring in additional critical monitoring information. Authentication is not required, so that Prometheus can freely poll this API.

GET /metrics

The response is a list of Prometheus entries for the consumer lag per partition and the aggregated lag per topic.

lenses_partition_consumer_lag{topic="iot_data",partition="3",consumerGroup="my.group.a"} 537
lenses_topic_consumer_lag{topic="iot_data",consumerGroup="my.group.a"} 4176
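For ad hoc inspection outside Prometheus, lines like the two above can be parsed with a couple of regular expressions. This is a simplified sketch, not a full parser for the Prometheus text format: parse_metric is a hypothetical helper, and it does not handle escaped quotes in label values, comments, or timestamps.

```python
import re

METRIC_LINE = re.compile(
    r"^(?P<name>[a-zA-Z_:][a-zA-Z0-9_:]*)\{(?P<labels>.*)\}\s+(?P<value>\S+)$"
)
LABEL = re.compile(r'(\w+)="([^"]*)"')


def parse_metric(line):
    """Return (name, labels dict, float value), or None if the line does not match."""
    match = METRIC_LINE.match(line.strip())
    if match is None:
        return None
    labels = dict(LABEL.findall(match.group("labels")))
    return match.group("name"), labels, float(match.group("value"))


lines = [
    'lenses_partition_consumer_lag{topic="iot_data",partition="3",consumerGroup="my.group.a"} 537',
    'lenses_topic_consumer_lag{topic="iot_data",consumerGroup="my.group.a"} 4176',
]
for line in lines:
    print(parse_metric(line))
```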

Quota API

Quotas can be viewed and set using the REST endpoints presented in the following section. Only admin users are allowed to add or remove entries.

Get Quotas

GET /api/quotas

The response is a list of Quotas.

  • entityType can be either CLIENT, CLIENTS, CLIENTS DEFAULT, USER, USERS, USERCLIENT or USERS DEFAULT
  • entityName is the Kafka client id for CLIENT and CLIENTS, and the user name for USER, USERS and USERCLIENT
  • child is optional, present only for entityType USERCLIENT, and is the client id
  • properties is a map of the quota constraints: producer_byte_rate, consumer_byte_rate and request_percentage
  • url is the URL of this quota in Lenses

Example Response

[
    {
        "entityType": "CLIENT",
        "entityName" : "my-client-id",
        "properties": {
                        "producer_byte_rate" : "100000",
                        "consumer_byte_rate" : "200000",
                        "request_percentage" : "75"
                        },
        "url" : "/api/quotas/clients"
    }
]

Create/Update Quota - All Users

Default for all users.

PUT /api/quotas/users

Attributes

  • config The quota constraints

Example Request

{
    "producer_byte_rate" : "100000",
    "consumer_byte_rate" : "200000",
    "request_percentage" : "75"
}

Create/Update Quota - User all Clients

Create/update for all client ids for a particular user.

PUT /api/quotas/users/(string: user)/clients

Parameters

  • user The user to set the quota for

Attributes

  • config The quota constraints

Example Request

{
    "producer_byte_rate" : "100000",
    "consumer_byte_rate" : "200000",
    "request_percentage" : "75"
}

Create/Update a Quota - User/Client pair

Quotas for a user and client id pair.

PUT /api/quotas/users/(string: user)/clients/(string: client-id)

Parameters

  • user The user to set the quota for
  • client-id The client id to set the quota for

Attributes

  • config The quota constraints

Example Request

{
    "producer_byte_rate" : "100000",
    "consumer_byte_rate" : "200000",
    "request_percentage" : "75"
}

Create/Update a Quota - User

Quota for a user.

PUT /api/quotas/users/(string: user)

Parameters

  • user The user to set the quota for

Attributes

  • config The quota constraints

Example Request

{
    "producer_byte_rate" : "100000",
    "consumer_byte_rate" : "200000",
    "request_percentage" : "75"
}

Create/Update Quota - All Clients

Default for all clients.

PUT /api/quotas/clients

Attributes

  • config The quota constraints

Example Request

{
    "producer_byte_rate" : "100000",
    "consumer_byte_rate" : "200000",
    "request_percentage" : "75"
}

Create/Update a Quota - Client

Quotas for a client id.

PUT /api/quotas/clients/(string: client-id)

Parameters

  • client-id The client id to set the quota for

Attributes

  • config The quota constraints

Example Request

{
    "producer_byte_rate" : "100000",
    "consumer_byte_rate" : "200000",
    "request_percentage" : "75"
}

Delete Quota - All Users

Delete default for all users.

DELETE /api/quotas/users

Attributes

  • list of properties to be removed from the quota’s config

Example Request

[
    "producer_byte_rate",
    "consumer_byte_rate",
    "request_percentage"
]

Delete Quota - User all Clients

Delete for all client ids for a user.

DELETE /api/quotas/users/(string: user)/clients

Parameters

  • user The user to delete the quota for

Attributes

  • list of properties to be removed from the quota’s config

Example Request

[
    "producer_byte_rate",
    "consumer_byte_rate",
    "request_percentage"
]

Delete a Quota - User/Client pair

Delete quotas for a user and client id pair.

DELETE /api/quotas/users/(string: user)/clients/(string: client-id)

Parameters

  • user The user to delete the quota for
  • client-id The client id to delete the quota for

Attributes

  • list of properties to be removed from the quota’s config

Example Request

[
    "producer_byte_rate",
    "consumer_byte_rate",
    "request_percentage"
]

Delete a Quota - User

Delete a quota for a user.

DELETE /api/quotas/users/(string: user)

Parameters

  • user The user to delete the quota for

Attributes

  • list of properties to be removed from the quota’s config

Example Request

[
    "producer_byte_rate",
    "consumer_byte_rate",
    "request_percentage"
]

Delete Quota - All Clients

Delete defaults for all clients.

DELETE /api/quotas/clients

Attributes

  • list of properties to be removed from the quota’s config

Example Request

[
    "producer_byte_rate",
    "consumer_byte_rate",
    "request_percentage"
]

Delete a Quota - Client

Delete quotas for a client id.

DELETE /api/quotas/clients/(string: client-id)

Parameters

  • client-id The client id to delete the quota for

Attributes

  • list of properties to be removed from the quota’s config

Example Request

[
    "producer_byte_rate",
    "consumer_byte_rate",
    "request_percentage"
]
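
The delete endpoints above all take a JSON array of property names as the request body. A minimal Python sketch of building such a request follows; it is not sent anywhere. The helper name `build_quota_delete`, the host, the token and the client id are hypothetical, and the check against the three property names is an assumption based only on the examples in this section.

```python
import json
import urllib.request

# The three quota properties that appear in the examples of this section.
QUOTA_PROPERTIES = ("producer_byte_rate", "consumer_byte_rate", "request_percentage")


def build_quota_delete(host, token, path, properties):
    """Build (without sending) a DELETE request that removes quota properties."""
    unknown = [name for name in properties if name not in QUOTA_PROPERTIES]
    if unknown:
        raise ValueError(f"unknown quota properties: {unknown}")
    return urllib.request.Request(
        url=host.rstrip("/") + path,
        data=json.dumps(list(properties)).encode("utf-8"),  # JSON array body
        method="DELETE",
        headers={
            "Content-Type": "application/json",
            "X-Kafka-Lenses-Token": token,
        },
    )


# Hypothetical host, token and client id, used only for illustration.
delete_request = build_quota_delete(
    "http://localhost:3030",
    "myToken",
    "/api/quotas/clients/clientA",
    ["producer_byte_rate"],
)
print(delete_request.get_method(), delete_request.full_url)
```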

Quotas Precedence

Quotas specified through the REST endpoints above create entries in ZooKeeper. The ZooKeeper paths dictate the following order of precedence, from highest (1) to lowest (8):

  1. /config/users/<user>/clients/<client-id>
  2. /config/users/<user>/clients/<default>
  3. /config/users/<user>
  4. /config/users/<default>/clients/<client-id>
  5. /config/users/<default>/clients/<default>
  6. /config/users/<default>
  7. /config/clients/<client-id>
  8. /config/clients/<default>

Tip

The highest-priority quota that matches is always enforced. For example, if clientA has a client quota of 10MB, userA has a user quota of 20MB, and an application runs as clientA-userA, then the effective quota is 20MB because the user quota takes precedence over the client quota.
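
The precedence list can be read as a simple lookup: try each path in order and use the first one that has a quota. The Python sketch below illustrates that order only. It is not how Kafka or Lenses implement the lookup, and the function name `resolve_quota`, the dictionary layout and the labels `10MB`/`20MB` are assumptions for this example.

```python
def resolve_quota(quotas, user, client_id):
    """Return (path, config) of the highest-precedence quota, or None.

    `quotas` maps ZooKeeper-style paths (as listed above) to quota configs.
    """
    candidates = [
        f"/config/users/{user}/clients/{client_id}",
        f"/config/users/{user}/clients/<default>",
        f"/config/users/{user}",
        f"/config/users/<default>/clients/{client_id}",
        "/config/users/<default>/clients/<default>",
        "/config/users/<default>",
        f"/config/clients/{client_id}",
        "/config/clients/<default>",
    ]
    for path in candidates:  # ordered from highest to lowest precedence
        if path in quotas:
            return path, quotas[path]
    return None


# The scenario from the tip: a 10MB client quota and a 20MB user quota.
configured = {
    "/config/clients/clientA": {"label": "10MB"},
    "/config/users/userA": {"label": "20MB"},
}
print(resolve_quota(configured, "userA", "clientA"))
```

With these two entries the user path is found at position 3, before the client path at position 7, so the 20MB quota is the one returned.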

Connector API

Lenses proxies the Kafka Connect APIs. If Lenses manages multiple Connect clusters, include in the request path the alias of the target cluster, as defined in your Lenses configuration.

# List active connectors
GET /api/proxy-connect/(string: clusterAlias)/connectors

# Create new connector
POST /api/proxy-connect/(string: clusterAlias)/connectors [CONNECTOR_CONFIG]

# Get information about a specific connector
GET /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)

# Get connector config
GET /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/config

# Set connector config
PUT /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/config

# Get connector status
GET /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/status

# Pause a connector
PUT /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/pause

# Resume a paused connector
PUT /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/resume

# Restart a connector
POST /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/restart

# Get list of connector tasks
GET /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/tasks

# Get current status of a task
GET /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/tasks/(string: task_id)/status

# Restart a connector task
POST /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)/tasks/(string: task_id)/restart

# Remove a running connector
DELETE /api/proxy-connect/(string: clusterAlias)/connectors/(string: connectorName)

# List available connector plugins
GET /api/proxy-connect/(string: clusterAlias)/connector-plugins
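
Every Connect proxy endpoint above shares the prefix /api/proxy-connect/(clusterAlias). As a small sketch, a helper can assemble these URLs and percent-encode each path segment. The function name `connect_proxy_url`, the host, the alias `dev` and the connector name `my-sink` are hypothetical.

```python
from urllib.parse import quote


def connect_proxy_url(host, cluster_alias, *segments):
    """Join path segments under /api/proxy-connect/<clusterAlias>.

    Each segment is percent-encoded so names with spaces or slashes stay valid.
    """
    parts = [cluster_alias, *segments]
    encoded = "/".join(quote(str(part), safe="") for part in parts)
    return f"{host.rstrip('/')}/api/proxy-connect/{encoded}"


# Hypothetical host, cluster alias and connector name.
print(connect_proxy_url("http://localhost:3030", "dev", "connectors"))
print(connect_proxy_url("http://localhost:3030", "dev", "connectors", "my-sink", "status"))
print(connect_proxy_url("http://localhost:3030", "dev", "connectors", "my-sink", "tasks", 0, "restart"))
```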

Schemas API

The Schema Registry proxy API interacts directly with the Schema Registry services. It iterates over the available hosts and, if a request fails, retries with the next available one.

# List all available subjects
GET /api/proxy-sr/subjects

# List all versions of a particular subject
GET /api/proxy-sr/subjects/(string: subject)/versions

# Delete a subject and associated compatibility level
DELETE /api/proxy-sr/subjects/(string: subject)

# Get the schema for a particular schema id
GET /api/proxy-sr/schemas/ids/(int: id)

# Get the schema at a particular version
# - versionId string "latest" or numeric values between 1 and 2^31-1
#   The string “latest” refers to the last registered schema under the specified subject.
#   Note that there may be a new latest schema that gets registered right after this request is served.
GET /api/proxy-sr/subjects/(string: subject)/versions/(versionId: version)

# Register a new schema under a particular subject
POST /api/proxy-sr/subjects/(string: subject)/versions

# Delete a particular version of a subject
DELETE /api/proxy-sr/subjects/(string: subject)/versions/(versionId: version)

# Update global compatibility level
PUT /api/proxy-sr/config

# Get global compatibility level
GET /api/proxy-sr/config

# Change compatibility level of a subject
PUT /api/proxy-sr/config/(string: subject)

# Get compatibility level of a subject
GET /api/proxy-sr/config/(string: subject)
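
When registering a new schema, the Schema Registry itself expects the schema as a JSON-encoded string inside a "schema" field. The sketch below builds such a body in Python. It assumes that the Lenses proxy forwards the body unchanged, which this document does not state; the function name `schema_registration_body` and the sample record are made up for illustration.

```python
import json


def schema_registration_body(avro_schema):
    """Wrap an Avro schema (a dict) in the {"schema": "<string>"} envelope.

    The inner json.dumps turns the schema into a string; the outer one
    escapes that string so the whole body is valid JSON.
    """
    return json.dumps({"schema": json.dumps(avro_schema)})


# A made-up Avro record used only for illustration.
sample_schema = {
    "type": "record",
    "name": "Payment",
    "fields": [{"name": "id", "type": "string"}],
}
registration_body = schema_registration_body(sample_schema)
print(registration_body)
```

The resulting string would be sent as the body of POST /api/proxy-sr/subjects/(string: subject)/versions.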

Tip

You can still use the REST endpoints of Schema Registry and Kafka Connect directly. Lenses simply proxies each request to the correct endpoint while also recording the relevant audit information.

Logs API

GET /api/logs/INFO
GET /api/logs/METRICS

Example Response

A list of Log lines.

[
    {
        "timestamp": 1532570466516,
        "logger": "akka.actor.ActorSystemImpl",
        "stacktrace": "",
        "thread": "default-akka.actor.default-dispatcher-3",
        "message": "Request: POST->http://localhost:24015/api/login returned 200 OK in 3ms",
        "time": "2018-07-26 04:01:06.516",
        "level": "INFO"
    },
    {
        "timestamp": 1532570466601,
        "logger": "akka.actor.ActorSystemImpl",
        "stacktrace": "",
        "thread": "default-akka.actor.default-dispatcher-3",
        "message": "Request: GET->http://localhost:24015/api/auth returned 200 OK in 2ms",
        "time": "2018-07-26 04:01:06.601",
        "level": "INFO"
    }
]

Note

The data structure is the same for both INFO and METRICS. Follow the implementation for Go for more details
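
As an illustration of consuming this response, the Python sketch below turns the JSON list into one printable line per entry, optionally filtered by level. The function name `format_log_lines` and the output layout are assumptions; only the field names come from the example response above.

```python
import json


def format_log_lines(response_text, level=None):
    """Render each log entry as 'time level [logger] message'."""
    entries = json.loads(response_text)
    return [
        f"{e['time']} {e['level']} [{e['logger']}] {e['message']}"
        for e in entries
        if level is None or e["level"] == level
    ]


# One entry copied from the example response above.
sample_logs = json.dumps([
    {
        "timestamp": 1532570466516,
        "logger": "akka.actor.ActorSystemImpl",
        "stacktrace": "",
        "thread": "default-akka.actor.default-dispatcher-3",
        "message": "Request: POST->http://localhost:24015/api/login returned 200 OK in 3ms",
        "time": "2018-07-26 04:01:06.516",
        "level": "INFO",
    }
])
for line in format_log_lines(sample_logs, level="INFO"):
    print(line)
```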

Audits API

GET /api/audit

Example Response

A list of Audit entries.

[
    {
        "type": $AuditEntryType,
        "change": $AuditEntryChange,
        "userId": "string",
        "timestamp": 329185402,
        "content": {"map key": "map string value", ...}
    },
    ...
]
$AuditEntryType = "TOPIC" OR "TOPIC_DATA" OR "QUOTAS" OR "BROKER_CONFIG" OR "ACL" OR "SCHEMA" OR "PROCESSOR" OR "CONNECTOR"
$AuditEntryChange = "ADD" OR "REMOVE" OR "UPDATE" OR "INSERT"
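
The shape described above can be checked on the client side before an entry is processed. The following Python sketch is one possible check; the function name `validate_audit_entry` is hypothetical, and the allowed values are copied from the $AuditEntryType and $AuditEntryChange lists above.

```python
AUDIT_ENTRY_TYPES = {
    "TOPIC", "TOPIC_DATA", "QUOTAS", "BROKER_CONFIG",
    "ACL", "SCHEMA", "PROCESSOR", "CONNECTOR",
}
AUDIT_ENTRY_CHANGES = {"ADD", "REMOVE", "UPDATE", "INSERT"}


def validate_audit_entry(entry):
    """Return a list of problems found in one audit entry (empty if none)."""
    problems = []
    if entry.get("type") not in AUDIT_ENTRY_TYPES:
        problems.append("unexpected type")
    if entry.get("change") not in AUDIT_ENTRY_CHANGES:
        problems.append("unexpected change")
    if not isinstance(entry.get("userId"), str):
        problems.append("userId is not a string")
    if not isinstance(entry.get("timestamp"), int):
        problems.append("timestamp is not an integer")
    if not isinstance(entry.get("content"), dict):
        problems.append("content is not a map")
    return problems


# A made-up entry that follows the structure above.
sample_entry = {
    "type": "QUOTAS",
    "change": "UPDATE",
    "userId": "johnDoe",
    "timestamp": 329185402,
    "content": {"map key": "map string value"},
}
print(validate_audit_entry(sample_entry))
```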

Audits API (SSE, Live updates)

GET /api/sse/audit

How to make a Request

The client should be configured to expect Server-Sent Events (SSE) by sending the following request header:

"Accept": "application/json, text/event-stream"

The client should keep reading incoming messages indefinitely and stop only at EOF.

Each message starts with "data:" and ends with "\n". Skip any message that does not have this prefix.

The remaining message after the "data:" is the Audit entry as a JSON object, see below.

The incoming data structure looks like this:

{
    "type": $AuditEntryType,
    "change": $AuditEntryChange,
    "userId": "string",
    "timestamp": 329185402,
    "content": {"map key": "map string value", ...}
}
$AuditEntryType = "TOPIC" OR "TOPIC_DATA" OR "QUOTAS" OR "BROKER_CONFIG" OR "ACL" OR "SCHEMA" OR "PROCESSOR" OR "CONNECTOR"
$AuditEntryChange = "ADD" OR "REMOVE" OR "UPDATE" OR "INSERT"

Example Response

data:{"type":"TOPIC","change":"INSERT","userId":"johnDoe","timestamp":1532559518100,"content":{"record":"{\"key\":\"PLAINTEXT://...\"}","topic":"topicName"}}

Note

Follow the implementation for Go for more details
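
The reading rules above (keep only lines that start with "data:", then parse the remainder as JSON) can be sketched in Python as follows. The function name `iter_audit_entries` is hypothetical, and the sketch assumes the stream has already been decoded into text lines; opening the HTTP connection is left out.

```python
import json


def iter_audit_entries(lines):
    """Yield one dict per 'data:' line and skip everything else."""
    for raw in lines:
        line = raw.rstrip("\n")
        if not line.startswith("data:"):
            continue  # not an audit message, e.g. a blank or keep-alive line
        yield json.loads(line[len("data:"):])


# The example response from above, preceded by two lines that must be skipped.
SAMPLE_LINE = r'data:{"type":"TOPIC","change":"INSERT","userId":"johnDoe","timestamp":1532559518100,"content":{"record":"{\"key\":\"PLAINTEXT://...\"}","topic":"topicName"}}'
sample_stream = ["\n", ": keep-alive\n", SAMPLE_LINE + "\n"]

for audit_entry in iter_audit_entries(sample_stream):
    print(audit_entry["type"], audit_entry["change"], audit_entry["userId"])
```

Because the function accepts any iterable of lines, the same loop could be fed from a live response object once its bytes are decoded.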

Permissions Matrix

Topic API

Method - Endpoint Admin Write Read NoData
POST /api/topics/    
DELETE /api/topics/{topicName}      
DELETE /api/topics/{topicName}/{partition}/{offset}      
PUT /api/topics/config/{topicName}    
GET /api/topics/{topicName}

Metadata API

Method - Endpoint Admin Write Read NoData
GET /api/metadata/topics  
GET /api/metadata/topics/{topicName}  
GET /api/metadata/topics/{topicName}    
GET /api/metadata/topics/{topicName}    

Data API

Method - Endpoint Admin Write Read NoData
GET /api/sql/validation
GET /api/sql/data

Processor API

Method - Endpoint Admin Write Read NoData
GET /api/streams
GET /api/streams/{processorId}
POST /api/streams    
PUT /api/streams/{processorId}/pause    
PUT /api/streams/{processorId}/resume    
PUT /api/streams/{processorId}/scale/{numberOfRunners}    
DELETE /api/streams/{processorId}    

Access Control Lists API

Method - Endpoint Admin Write Read NoData
GET /api/acl      
PUT /api/acl      
DELETE /api/acl      

Alerts API

Method - Endpoint Admin Write Read NoData
GET /api/alerts
POST /api/alerts    
GET /api/alerts/settings    
GET /api/alerts/settings/{alert_setting_id}
PUT /api/alerts/settings/{alert_setting_id}    
GET /api/alerts/settings/{alert_setting_id}/condition
POST /api/alerts/settings/{alert_setting_id}/condition    
DELETE /api/alerts/settings/{alert_setting_id}/condition/{condition_uuid}    

Alerts SSE API

Method - Endpoint Admin Write Read NoData
GET /api/sse/alerts

Prometheus API

Method - Endpoint Admin Write Read NoData
GET /metrics

Quota API

Method - Endpoint Admin Write Read NoData
GET /api/quotas  
PUT /api/quotas/users      
DELETE /api/quotas/users      
PUT /api/quotas/clients      
DELETE /api/quotas/clients      
PUT /api/quotas/users/{userName}      
DELETE /api/quotas/users/{userName}      
PUT /api/quotas/clients/{clientId}      
DELETE /api/quotas/clients/{clientId}      
PUT /api/quotas/users/{userName}/clients      
DELETE /api/quotas/users/{userName}/clients      
PUT /api/quotas/users/{userName}/clients/{clientId}      
DELETE /api/quotas/users/{userName}/clients/{clientId}      

Connector API

Method - Endpoint Admin Write Read NoData
GET /api/proxy-connect/{clusterAlias}/connectors
POST /api/proxy-connect/{clusterAlias}/connectors [CONNECTOR_CONFIG]    
GET /api/proxy-connect/{clusterAlias}/connectors/{connectorName}
GET /api/proxy-connect/{clusterAlias}/connectors/{connectorName}/config
PUT /api/proxy-connect/{clusterAlias}/connectors/{connectorName}/config    
GET /api/proxy-connect/{clusterAlias}/connectors/{connectorName}/status
PUT /api/proxy-connect/{clusterAlias}/connectors/{connectorName}/pause    
PUT /api/proxy-connect/{clusterAlias}/connectors/{connectorName}/resume    
POST /api/proxy-connect/{clusterAlias}/connectors/{connectorName}/restart    
GET /api/proxy-connect/{clusterAlias}/connectors/{connectorName}/tasks
GET /api/proxy-connect/{clusterAlias}/connectors/{connectorName}/tasks/{task_id}/status
POST /api/proxy-connect/{clusterAlias}/connectors/{connectorName}/tasks/{task_id}/restart    
DELETE /api/proxy-connect/{clusterAlias}/connectors/{connectorName}    
GET /api/proxy-connect/{clusterAlias}/connector-plugins

Schema API

Method - Endpoint Admin Write Read NoData
GET /api/proxy-sr/subjects
GET /api/proxy-sr/subjects/{subject}/versions
DELETE /api/proxy-sr/subjects/{subject}    
GET /api/proxy-sr/schemas/ids/{id}
GET /api/proxy-sr/subjects/{subject}/versions/{version}
POST /api/proxy-sr/subjects/{subject}/versions    
DELETE /api/proxy-sr/subjects/{subject}/versions/{version}    
PUT /api/proxy-sr/config    
GET /api/proxy-sr/config
PUT /api/proxy-sr/config/{subject}    
GET /api/proxy-sr/config/{subject}

Logs API

Method - Endpoint Admin Write Read NoData
GET /api/logs/INFO
GET /api/logs/METRICS

Audits API

Method - Endpoint Admin Write Read NoData
GET /api/audit
GET /api/sse/audit

Security Concerns

We recommend using firewall rules to block direct access to the underlying REST APIs (e.g. Schema Registry, Kafka Connect, Kubernetes) and using the Lenses APIs instead, to take advantage of fine-grained, secure access policies.