Clients / Python¶
The Lenses python-library
is a Python client enabling Python developers and data scientists to take advantage
of the Rest and WebSocket endpoints Lenses exposes. Users can:
- Manage Kafka Topics
- Manage Registry
- Manage SQL Processors
- Manage Kafka Connectors
- Browse Topic data via SQL
- Subscribe to live continuous SQL queries via SQL
- Get Flows
- Manage Lenses Admin Interface
Installation¶
Dependencies¶
Listing of runtime and buildtime Dependencies of lensesPy
For installing the required Dependencies
Install Pip3:
curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py
python3.8 get-pip.py
rm -f get-pip.py
Runtime Dependencies¶
- requests==2.22.0
- websocket-client==0.56.0
- kerberos==1.3.0 *
Note: Kerberos is optional and will be installed only if your select kerberos support (see at installation step)
Build time Dependencies¶
- setuptools
- virtualenv
Build lensesio lib¶
Clone the Python repo and execute the following command inside the repo:
python3 setup.py sdist bdist_wheel
Install Lenses Python¶
Install lensesio lib
You can install by using pip
pip3 install dist/lensesio-3.0.0-py3-none-any.whl
Install with Kerberos support (Only Linux / Darwin)
pip3 install dist/lensesio-3.0.0-py3-none-any.whl[kerberos]
Getting started¶
Authentication¶
There are three different ways that can be used for authentication.
Parameter Name | Description |
---|---|
basic | Basic (Accont) Authentication |
service | Service Account (Token Based) |
krb5 | Kerberos Authentication |
Basic authentication¶
Parameters for the basic_auth method
Parameter Name | Description | Requried | Type |
---|---|---|---|
auth_type | Authentication Type | Yes | String |
url | Lenses Endpoint | Yes | String |
username | Username | Yes | String |
password | Password | Yes | String |
For basic authentication, issue:
from lensesio.lenses import main as main
lenses_lib = main(
auth_type="basic",
url=lenses_endpoint,
username=user,
password=psk
)
where lenses_endpoint, user, psk are python variables set by you with the endpoint, username and password
Kerberos authentication¶
Parameters for the kerberos_auth method
Parameter Name | Description | Requried | Type |
---|---|---|---|
auth_type | Authentication Type | Yes | String |
url | Lenses Endpoint | Yes | String |
krb_service | Service | Yes | String |
Note: Kerberos support is only supported for linux platform and is not enabled by default. To enable Kerberos support follow kerberos dependency step in the Install section
pip3 install dist/lensesio-3.0.0-py3-none-any.whl[kerberos]
For Kerberos authentcation, issue:
from lensesio.lenses import main
lenses_lib = main(
auth_type="krb5",
url="http://localhost:3030",
krb_service="HTTP@primef.dev.local"
)
Get User Info after authentication¶
To get the authenticated user info, issue:
userInfo = lenses_lib.UserInfo()
print(userInfo)
{'permissions': ['ManageConnectors',
'ViewDataPolicies',
'ViewTopology',
...
'security': {'http': False, 'kerberos': True, 'ldap': False},
'token': '***',
'user': '***'}
Kafka Topics¶
Examples with methods used to manage Kafka Topics
Get Topics List¶
To get a list with all kafka topics, issue
topicsList = lenses_lib.LstOfTopicsNames()
print(topicsList)
[
'connect-configs',
...
]
Get Topics Description¶
To get detailed description for all kafka topics, issue
kafkaTopics = lenses_lib.GetAllTopics()
print(kafkaTopics)
[{'configs': 25,
'consumers': 1,
'isCompacted': False,
...
'topicName': 'connect-configs',
{'configs': 25,
'consumers': 0,
...
Detailed info for a Topic¶
To get a detailed description for a particular topic, issue
topicInfo = lenses_lib.TopicInfo('connect-configs')
print(topicInfo)
{'applications': [],
'config': [{'defaultValue': 'more than 1000y',
'documentation': None,
'isDefault': True,
'name': 'message.timestamp.difference.max.ms',
'originalValue': '9223372036854775807',
'value': 'more than 1000y'},
{'defaultValue': '1 MB',
'documentation': None,
'isDefault': True,
'name': 'max.message.bytes',
'originalValue': '1000012',
'value': '1 MB'},
...
Create a Topic¶
Parameters for the CreateTopic method
Parameter Name | Description | Requried | Type |
---|---|---|---|
topicName | Topic’s Name | Yes | String |
partitions | Topic’s partitions | Yes | Int. |
replication | Topic’s replication number | Yes | Int. |
config | Dict with Topic options | No | Json |
To create a topic, first create a dictionary with the options below
config = {
"cleanup.policy": "compact",
"compression.type": "snappy"
}
Then issue the CreateTopic method in order to create the topic
result = lenses_lib.CreateTopic(
name="test_topic",
partitions=1,
replication=1,
config=config
)
print(result)
'Topic [test_topic] created'
Update Topic configuration¶
Parameters for the UpdateTopicConfig method
Parameter Name | Description | Requried | Type |
---|---|---|---|
topicname | Topic’s Name | Yes | String |
data_params | Dict with Topic options | Yes | Json |
Create the configuration with the desired options
config = {"configs": [{"key": "cleanup.policy", "value": "compact"}]}
Use the UpdateTopicConfig method to update the topic’s configuration
result = lenses_lib.UpdateTopicConfig('test_topic', config)
print(result)
'Topic [test_topic] updated config with [SetTopicConfiguration(...'
Publish data to a Topic¶
Parameters for the Publish method
Parameter Name | Description | Requried | Type |
---|---|---|---|
name | Topic’s Name | Yes | String |
key | Record’s key | Yes | String/Json |
value | Record’s value | Yes | String/Json |
clientId | Client’s ID | Yes | String |
Publishing data to a topic is as easy as
result = lenses_lib.Publish(
"test_topic",
"test_key",
"{'value':1}"
)
print(result)
{'content': None, 'correlationId': 1, 'type': 'SUCCESS'}
Subscribe data to a Topic¶
Parameters for the Subscribe method
Parameter Name | Description | Requried | Type |
---|---|---|---|
dataFunc | Custom function to work with data | Yes | String |
query | SQL Query | Yes | String/Json |
clientId | Client’s ID | No | String |
Note: First define a custom function to work with your data. A custom function is needed because the query is continuous.
def print_data(message):
print(message)
Subscribing to a topic is as easy as
lenses_lib.Subscribe(print_data, "select * from my_topic")
{'content': 'test_topic', 'correlationId': 1, 'type': 'SUCCESS'}
{'key': '8387236824701691257', 'offset': 0, 'partition': 0, 'timestamp': 1580157301897, 'topic': 'test_topic', 'value': "{'value':1}"}
Delete records from a Topic¶
Parameters for the DeleteTopicRecords method
Parameter Name | Description | Requried | Type |
---|---|---|---|
name | Topic’s Name | Yes | String |
partition | Topic’s Partition | Yes | Int. |
offset | End offset | Yes | Int. |
Records can be deleted by providing a range of offsets
result = lenses_lib.DeleteTopicRecords('test_topic', "0", "10")
print(result)
"Records from topic '%s' and partition '0' up to offset '10'" % 'test_topic'
Delete a Topic¶
Parameters for the DeleteTopic method
Parameter Name | Description | Requried | Type |
---|---|---|---|
topicname | Topic’s Name | Yes | String |
Delete a topic called test_topic by using the DeleteTopic method
result = lenses_lib.DeleteTopic("test_topic")
print(result)
"Topic 'test_topic' has been marked for deletion"
Kafka ACL¶
Examples of methods used to manage Kafka ACLs
List Kafka ACLs¶
Example of listing kafka acls. Here we have not set any acls, hence we get an empty list
result = lenses_lib.GetAcl()
print(result)
[]
Set Kafka ACLs¶
Parameters for the SetAcl method
Parameter Name | Description | Requried | Type |
---|---|---|---|
resourceType | https://kafka.apache.org/documentation/#security_authz_cli | Yes | String |
resourceName | Yes | String | |
principal | Yes | String | |
permissionType | Yes | String | |
host | Yes | String | |
operation | Yes | String |
result = lenses_lib.SetAcl("Topic", "transactions", "GROUPA:UserA", "Allow", "*", "Read")
print(result)
'OK'
Delete Kafka ACLs¶
Parameters for the DelAcl method
Parameter Name | Description | Requried | Type |
---|---|---|---|
resourceType | https://kafka.apache.org/documentation/#security_authz_cli | Yes | String |
resourceName | Yes | String | |
principal | Yes | String | |
permissionType | Yes | String | |
host | Yes | String | |
operation | Yes | String |
To delete a Kafka ACL, issue:
result = lenses_lib.DelAcl("Topic", "transactions", "GROUPA:UserA", "Allow", "*", "Read")
print(result)
'OK'
Managing Connect Distributed¶
Examples of methods used to manage Connect Distributed
Create a Connector¶
Parameters for the CreateConnector method
Parameter Name | Description | Requried | Type |
---|---|---|---|
cluster | Connect cluster | Yes | String |
configs | Connector’s config dict | Yes | Json |
If you have configured Lenses with Connect, then you can use the CreateConnector method for creating a new connector.
First, create the configuration that describes the connector
config = {
"name": "test_connector",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"tasks.max": "1",
"topic": "test_connector_topic"
}
}
The create the connector by issuing: Note: dev is the connect cluster’s name.
result = lenses_lib.CreateConnector('dev', config)
print(result)
{'name': 'test_connector',
'config': {'connector.class': 'org.apache.kafka.connect.file.FileStreamSourceConnector',
'tasks.max': '1',
'topic': 'test_connector_topic',
'name': 'test_connector'},
'tasks': [{'connector': 'test_connector', 'task': 0}],
'type': 'source'}
List all connectors¶
Parameters for the GetConnectors method
Parameter Name | Description | Requried | Type |
---|---|---|---|
cluster | Connect cluster | Yes | String |
configs | Connector’s config dict | Yes | Json |
result = lenses_lib.GetConnectors('dev')
print(result)
['test_connector', 'logs-broker', 'nullsink']
Get information about a connector¶
Parameters for the GetConnectorInfo method
Parameter Name | Description | Requried | Type |
---|---|---|---|
cluster | Connect cluster | Yes | String |
connector | Connector’s name | Yes | String |
result = lenses_lib.GetConnectorInfo('dev', 'test_connector')
print(result)
{'name': 'test_connector',
'config': {'connector.class': 'org.apache.kafka.connect.file.FileStreamSourceConnector',
'tasks.max': '1',
'name': 'test_connector',
'topic': 'test_connector_topic'},
'tasks': [{'connector': 'test_connector', 'task': 0}],
'type': 'source'}
Get Connector’s Configuration¶
Parameters for the GetConnectorConfig method
Parameter Name | Description | Requried | Type |
---|---|---|---|
cluster | Connect cluster | Yes | String |
connector | Connector’s name | Yes | String |
result = lenses_lib.GetConnectorConfig('dev', 'test_connector')
print(result)
{'connector.class': 'org.apache.kafka.connect.file.FileStreamSourceConnector',
'tasks.max': '1',
'name': 'test_connector',
'topic': 'test_connector_topic'}
Get Connector’s Status¶
Parameters for the GetConnectorStatus method
Parameter Name | Description | Requried | Type |
---|---|---|---|
cluster | Connect cluster | Yes | String |
connector | Connector’s name | Yes | String |
result = lenses_lib.GetConnectorStatus('dev', 'test_connector')
print(result)
{'name': 'test_connector',
'connector': {'state': 'RUNNING', 'worker_id': '10.15.3.1:8083'},
'tasks': [{'id': 0, 'state': 'RUNNING', 'worker_id': '10.15.3.1:8083'}],
'type': 'source'}
Get Connector’s Tasks¶
Parameters for the GetConnectorTasks method
Parameter Name | Description | Requried | Type |
---|---|---|---|
cluster | Connect cluster | Yes | String |
connector | Connector’s name | Yes | String |
result = lenses_lib.GetConnectorTasks('dev', 'test_connector')
print(result)
[{'id': {'connector': 'test_connector', 'task': 0},
'config': {'task.class': 'org.apache.kafka.connect.file.FileStreamSourceTask',
'batch.size': '2000',
'topic': 'test_connector_topic'}}]
Get Connector’s Task Status¶
Parameters for the GetStatusTask method
Parameter Name | Description | Requried | Type |
---|---|---|---|
cluster | Connect cluster | Yes | String |
connector | Connector’s name | Yes | String |
task_id | Connector’s task ID | Yes | Int. |
result = lenses_lib.GetStatusTask('dev', 'test_connector', '0')
print(result)
{'id': 0, 'state': 'RUNNING', 'worker_id': '10.15.3.1:8083'}
Restart a Task¶
Parameters for the RestartConnectorTask method
Parameter Name | Description | Requried | Type |
---|---|---|---|
cluster | Connect cluster | Yes | String |
connector | Connector’s name | Yes | String |
task_id | Connector’s task ID | Yes | Int. |
result = lenses_lib.RestartConnectorTask('dev', 'test_connector', '0')
Get Connector’s plugins¶
Parameters for the GetConnectorPlugins method
Parameter Name | Description | Requried | Type |
---|---|---|---|
cluster | Connect cluster | Yes | String |
result = lenses_lib.GetConnectorPlugins('dev')
print(result)
[{'author': 'Lenses.io',
'class': 'com.datamountaineer.streamreactor.connect.influx.InfluxSinkConnector',
'description': 'Store Kafka data into InfluxDB',
...
'version': '2.2.1-L0'},
{'author': 'Apache Kafka',
...
'version': '2.2.1-L0'}]
Pause a Connector¶
Parameters for the PauseConnector method
Parameter Name | Description | Requried | Type |
---|---|---|---|
cluster | Connect cluster | Yes | String |
result = lenses_lib.PauseConnector('dev', 'test_connector')
Resume a Connector¶
Parameters for the ResumeConnector method
Parameter Name | Description | Requried | Type |
---|---|---|---|
cluster | Connect cluster | Yes | String |
result = lenses_lib.ResumeConnector('dev', 'test_connector')
Restart a Connector¶
Parameters for the RestartConnector method
Parameter Name | Description | Requried | Type |
---|---|---|---|
cluster | Connect cluster | Yes | String |
result = lenses_lib.RestartConnector('dev', 'test_connector')
Update Connector’s configuration¶
Parameters for the SetConnectorConfig method
Parameter Name | Description | Requried | Type |
---|---|---|---|
cluster | Connect cluster | Yes | String |
connector | Connector’s name | Yes | String |
configs | Connector’s config dict | Yes | Json |
config = {
"name": "test_connector",
"connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"tasks.max": "5",
"topic": "test_connector_topic"
}
result = lenses_lib.SetConnectorConfig('dev', 'test_connector', config)
print(result)
{
"name":"test_connector",
"config":{
"name":"test_connector","connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector",
"tasks.max":"5","topic":"test_connector_topic"
},"tasks":[
{
"connector":"test_connector","task":0
}
],
"type":"source"
}
Delete a Connector¶
Parameters for the DeleteConnector method
Parameter Name | Description | Requried | Type |
---|---|---|---|
cluster | Connect cluster | Yes | String |
connector | Connector’s name | Yes | String |
result = lenses_lib.DeleteConnector('dev', 'test_connector')
Kafka Consumers¶
Examples with methods used to get Kafka Consumers
Get all kafka Consumers¶
result = lenses_lib.GetConsumers()
print(result)
[{'active': False,
'application': None,
'consumers': [],
'consumersCount': 0,
'coordinator': {'host': '-', 'id': -1, 'port': -1, 'rack': ''},
'id': 'connect-fast-data',
'maxLag': None,
'minLag': None,
'state': 'CoordinatorNotFound',
...
'topicPartitionsCount': 8}]
List all Consumer names¶
result = lenses_lib.GetConsumersNames()
print(result)
['connect-fast-data',
'lsql_dc343aa2b9a6441ab2f2e2143771abd7',
'connect-nullsink',
'schema-registry',
'UNKNOWN']
SQL Engine¶
Examples with methods used to execude sql code
Create a Topic via SQL Engine¶
Parameters for the ExecSQL method
Parameter Name | Description | Requried | Type |
---|---|---|---|
query | SQL Query | Yes | String |
query = (
"CREATE TABLE greetings(_key string, _value string) FORMAT (string, string)"
)
result = lenses_lib.ExecSQL(query)
print(result)
{
'data': [
{
'value': {
'flag': True,
'info': 'Topic greetings has been created'
...
...
}
Insert records into a Topic¶
query = (
"INSERT INTO greetings(_key, _value) VALUES('Hello', 'World')"
)
result = lenses_lib.ExecSQL(query)
print(result)
{
'data': [{'value': {'flag': True, 'info': '1 records inserted'}, 'rownum': 0}],
...
}
Query a Topic¶
query = (
"SELECT * FROM greetings limit 1"
)
result = lenses_lib.ExecSQL(query)
print(result)
{
'ERROR': [],
'data': [{'key': 'Hello',
'metadata': {'__keysize': 5,
'__valsize': 5,
'offset': 0,
'partition': 0,
'timestamp': 1579540609297},
'rownum': 0,
'value': 'World'}],
'metadata': {'fields': ['offset',
'partition',
...
...
}
Delete a Topic via SQL¶
query = (
"DROP TABLE greetings"
)
result = lenses_lib.ExecSQL(query)
print(result)
{'ERROR': [],
'data': [{'rownum': 0, 'value': True}],
...
}
SQL Processors¶
Examples with methods used to manage SQL Processors
Create a Processor¶
Parameters for the CreateProcessor method
Parameter Name | Description | Requried | Type |
---|---|---|---|
name | Processors name | Yes | String |
sql | SQL Query | Yes | String |
runners | SQL Processor’s runners | Yes | String |
clusterName | Cluster’s name | Yes | String |
namespace | K8 Namespace | No | String |
pipeline | SQL Pipeline tag | No | String |
query = (
"SET autocreate=true; insert into test_processor_target SELECT * FROM test_processor_source"
)
result = lenses_lib.CreateProcessor("test_processor1", query, 1, 'dev', 'ns', '1')
print(result)
lsql_fa101b766ec04586b156a1d7f725f771
Pause a Processor¶
Parameters for the PauseProcessor method
Parameter Name | Description | Requried | Type |
---|---|---|---|
processorName | Processors name / ID | Yes | String |
First get the processor’s ID
processor_id = lenses_lib.GetProcessorID('test_processor')
print(processor_id)
['lsql_fa101b766ec04586b156a1d7f725f771']
Next use the PauseProcessor method to pause the processor
result = lenses_lib.PauseProcessor(processor_id[0])
print(result)
'OK'
Resume a processor¶
Parameters for the ResumeProcessor method
Parameter Name | Description | Requried | Type |
---|---|---|---|
processorName | Processors name / ID | Yes | String |
processor_id = lenses_lib.GetProcessorID('test_processor')
result = lenses_lib.ResumeProcessor(processor_id[0])
Update Processor’s Runners¶
Parameters for the UpdateProcessorRunners method
Parameter Name | Description | Requried | Type |
---|---|---|---|
processorName | Processors name / ID | Yes | String |
numberOfRunners | Number of runners | Yes | Int./String |
processor_id = lenses_lib.GetProcessorID('test_processor')
lenses_lib.UpdateProcessorRunners(processor_id[0], '4')
Delete a Processor¶
Parameters for the DeleteProcessor method
Parameter Name | Description | Requried | Type |
---|---|---|---|
processorName | Processors name / ID | Yes | String |
processor_id = lenses_lib.GetProcessorID('test_processor')
result = lenses_lib.DeleteProcessor(processor_id[0])
Data Policy¶
Examples with methods used to manage data policies
Set a Policy¶
Parameters for the SetPolicy method
Parameter Name | Description | Requried | Type |
---|---|---|---|
name | Policy name | Yes | String |
obfuscation | Whether to protect messages at a field level | Yes | String |
impactType | The business impact levels in relation to the data | Yes | String |
category | Category of sensitivity in the data | Yes | String/List |
fields | Definition of fields that the data policy will apply to | YeS | String/List |
result = lenses_lib.SetPolicy("test_policy","All","HIGH","test_category",["test_field"])
print(result)
'c844ecd4-7cbd-4ec3-82f0-f3750a692efd'
View Policies¶
policies = lenses_lib.ViewPolicy()
print(policies)
[{'category': 'test_category',
'fields': ['test_field'],
'id': 'c844ecd4-7cbd-4ec3-82f0-f3750a692efd',
'impact': {'apps': [], 'connectors': [], 'processors': [], 'topics': []},
'impactType': 'HIGH',
'lastUpdated': '2020-01-20T17:27:49.368Z',
'lastUpdatedUser': 'devops@DEV.LOCAL',
'name': 'test_policy',
'obfuscation': 'All',
'versions': 0}]
Delete a Policy¶
Parameters for the DelPolicy method
Parameter Name | Description | Requried | Type |
---|---|---|---|
name | Policy name | Yes | String |
lenses_lib.DelPolicy("test_policy")
Kafka Quotas¶
Examples with methods used to manage Kafka Quotas
Get All Quotas¶
lenses_lib.GetQuotas()
Set Quotas All Users¶
Parameters for the SetQuotasAllUsers method
Parameter Name | Description | Requried | Type |
---|---|---|---|
config | Quota Configuration Dict | Yes | Json |
QUOTA_CONFIG = {
"producer_byte_rate": "100000",
"consumer_byte_rate": "200000",
"request_percentage": "75"
}
lenses_lib.SetQuotasAllUsers(QUOTA_CONFIG)
Set User Quota for a Client¶
Parameters for the SetQuotaUserClient method
Parameter Name | Description | Requried | Type |
---|---|---|---|
config | Quota Configuration Dict | Yes | Json |
user | The user to set the quota for | Yes | String |
clientid | The client id to set the quota for | Yes | String |
lenses_lib.SetQuotaUserClient('admin', 'admin', QUOTA_CONFIG)
Set Quota for User¶
Parameters for the SetQuotaUser method
Parameter Name | Description | Requried | Type |
---|---|---|---|
config | Quota Configuration Dict | Yes | Json |
user | The user to set the quota for | Yes | String |
lenses_lib.SetQuotaUser("admin", QUOTA_CONFIG)
Set Quota for all Clients¶
Parameters for the SetQuotaAllClient method
Parameter Name | Description | Requried | Type |
---|---|---|---|
config | Quota Configuration Dict | Yes | Json |
lenses_lib.SetQuotaAllClient(QUOTA_CONFIG)
Set Quota for a Client¶
Parameters for the SetQuotaClient method
Parameter Name | Description | Requried | Type |
---|---|---|---|
config | Quota Configuration Dict | Yes | Json |
clientid | The client id to set the quota for | Yes | String |
lenses_lib.SetQuotaClient("admin", QUOTA_CONFIG)
Delete Quota for all Users¶
Parameters for the DeleteQutaAllUsers method
Parameter Name | Description | Requried | Type |
---|---|---|---|
config | Quota Configuration Dict | Yes | Json |
config = ['consumer_byte_rate', 'producer_byte_rate', 'request_percentage']
lenses_lib.DeleteQutaAllUsers(config)
Delete User Quota for all Clients¶
Parameters for the DeleteQuotaUserAllClients method
Parameter Name | Description | Requried | Type |
---|---|---|---|
config | Quota Configuration Dict | Yes | Json |
user | The user to delete the quota for | Yes | String |
config = ['consumer_byte_rate', 'producer_byte_rate', 'request_percentage']
lenses_lib.DeleteQuotaUserAllClients("admin", config)
Delete User Quota for a Client¶
Parameters for the DeleteQuotaUserClient method
Parameter Name | Description | Requried | Type |
---|---|---|---|
config | Quota Configuration Dict | Yes | Json |
user | The user to delete the quota for | Yes | String |
clientid | The client id to delete the quota for | Yes | String |
config = ['consumer_byte_rate', 'producer_byte_rate', 'request_percentage']
lenses_lib.DeleteQuotaUserClient("admin", "admin", config)
Delete Quota for User¶
Parameters for the DeleteQuotaUser method
Parameter Name | Description | Requried | Type |
---|---|---|---|
config | Quota Configuration Dict | Yes | Json |
user | The user to delete the quota for | Yes | String |
config = ['consumer_byte_rate', 'producer_byte_rate', 'request_percentage']
lenses_lib.DeleteQuotaUser("admin", config)
Delete Quota for all Clients¶
Parameters for the DeleteQuotaAllClients method
Parameter Name | Description | Requried | Type |
---|---|---|---|
config | Quota Configuration Dict | Yes | Json |
config = ['consumer_byte_rate', 'producer_byte_rate', 'request_percentage']
lenses_lib.DeleteQuotaAllClients(config)
Delete Quota for a Client¶
Parameters for the DeleteQuotaClient method
Parameter Name | Description | Requried | Type |
---|---|---|---|
config | Quota Configuration Dict | Yes | Json |
clientid | The client id to delete the quota for | Yes | String |
config = ['consumer_byte_rate', 'producer_byte_rate', 'request_percentage']
lenses_lib.DeleteQuotaClient('admin', config)
Lenses Admin¶
Examples with methods used to manage the Lenses Admin Interface
Create a Group¶
Parameters for the CreateGroup method
Parameter Name | Description | Requried | Type |
---|---|---|---|
payload | Group configuration | Yes | Json |
group_payload = {
"name":"test_group",
"description":"test_description",
"scopedPermissions":[
"ViewKafkaConsumers",
"ManageKafkaConsumers",
"ViewConnectors",
"ManageConnectors",
"ViewSQLProcessors",
"ManageSQLProcessors",
"ViewSchemaRegistry",
"ManageSchemaRegistry",
"ViewTopology",
"ManageTopology"
],
"adminPermissions":[
"ViewDataPolicies",
"ManageDataPolicies",
"ViewAuditLogs",
"ViewUsers",
"ManageUsers",
"ViewAlertRules",
"ManageAlertRules",
"ViewKafkaSettings",
"ManageKafkaSettings",
"ViewLogs"
],
"namespaces":[
{
"wildcards":["*"],
"permissions":[
"CreateTopic",
"DropTopic",
"ConfigureTopic",
"QueryTopic",
"ShowTopic",
"ViewSchema",
"InsertData",
"DeleteData",
"UpdateSchema"
],"system":"Kafka","instance":"Dev"
}
]
}
lenses_lib.CreateGroup(group_payload)
View Groups¶
result = lenses_lib.GetGroups()
print(result)
[{'name': 'devops',
'description': None,
'namespaces': [{'wildcards': ['*'],
'permissions': ['CreateTopic',
...
...
]
Update a Group¶
Parameters for the UpdateGroup method
Parameter Name | Description | Requried | Type |
---|---|---|---|
payload | Group configuration | Yes | Json |
group | Group’s name | Yes | String |
group_payload = {
"name":"test_group","description":"test_description_updated","namespaces":[
{
"wildcards":["*"],
"permissions":[
"CreateTopic","DropTopic","ConfigureTopic","QueryTopic","ShowTopic",
"ViewSchema","InsertData","DeleteData","UpdateSchema"
],
"system":"Kafka","instance":"Dev"
}
],
"scopedPermissions":[
"ViewKafkaConsumers","ManageKafkaConsumers","ViewConnectors","ManageConnectors",
"ViewSQLProcessors","ManageSQLProcessors","ViewSchemaRegistry","ManageSchemaRegistry",
"ViewTopology","ManageTopology"
],
"adminPermissions":[
"ViewDataPolicies","ManageDataPolicies","ViewAuditLogs","ViewUsers","ManageUsers",
"ViewAlertRules","ManageAlertRules","ViewKafkaSettings","ManageKafkaSettings",
"ViewLogs"
],
"userAccounts":0,"serviceAccounts":0
}
result = lenses_lib.UpdateGroup("test_group", group_payload)
Delete a Group¶
Parameters for the DeleteGroup method
Parameter Name | Description | Requried | Type |
---|---|---|---|
group | Group’s name | Yes | String |
lenses_lib.DeleteGroup("test_group")
Create a User¶
Parameters for the CreateUser method
Parameter Name | Description | Requried | Type |
---|---|---|---|
acType | Accont Type: BASIC or KERBEROS | Yes | String |
username | Username | Yes | String |
password | Password | Yes | String |
Email address | No | String | |
groups | Lenses Groups | Yes | String/List |
Note: You must create a group prior to creating a user
lenses_lib.CreateUser(
acType="BASIC",
username="test_user",
password="test_user",
email=None,
groups="test_group_user"
)
Get all Users¶
result = lenses_lib.GetUsers()
print(result)
[{'username': 'test_user',
'email': None,
'groups': ['test_group_user'],
'isActive': False,
'type': 'BASIC'},
{'username': 'devops@DEV.LOCAL',
'email': None,
'groups': ['devops'],
'isActive': True,
'type': 'KERBEROS'}]
Update a User¶
Parameters for the UpdateUser method
Parameter Name | Description | Requried | Type |
---|---|---|---|
acType | Accont Type: BASIC or KERBEROS | Yes | String |
username | Username | Yes | String |
password | Password | Yes | String |
Email address | No | String | |
groups | Lenses Groups | Yes | String/List |
result = lenses_lib.UpdateUser(
acType="BASIC",
username="test_user",
password="test_user",
email="test_updated@localhost.localdomain",
groups="test_group_user"
)
Change User’s Password¶
Parameters for the UpdateUserPassword method
Parameter Name | Description | Requried | Type |
---|---|---|---|
username | Username | Yes | String |
password | Password | Yes | String |
result = lenses_lib.UpdateUserPassword(
username="test_user",
password="test_user_updated"
)
Delete a User¶
Parameters for the DeleteUser method
Parameter Name | Description | Requried | Type |
---|---|---|---|
username | Username | Yes | String |
result = lenses_lib.DeleteUser(
username="test_user",
)
Create a Service Account¶
Parameters for the CreateSA method
Parameter Name | Description | Requried | Type |
---|---|---|---|
name | Service Account Nam | Yes | String |
groups | Group or Groups | Yes | String/List |
owner | Owner of service account | Yes | String |
token | Token (Do not set for a random token) | No | String |
Note: You must first create a group and a user before creating a service account
result = lenses_lib.CreateSA(
name="test_sa",
groups="test_group_user",
owner="test_user",
token="test_sa_token"
)
Update a Service Account¶
Parameters for the UpdateSA method
Parameter Name | Description | Requried | Type |
---|---|---|---|
name | Service Account Nam | Yes | String |
groups | Group or Groups | Yes | String/List |
owner | Owner of service account | Yes | String |
result = lenses_lib.UpdateSA(
name="test_sa",
groups=["test_group_sa", "test_group_user"],
owner="test_user",
)
Add a new Token to a Service Account¶
Parameters for the UpdateSAToken method
Parameter Name | Description | Requried | Type |
---|---|---|---|
name | Service Account Nam | Yes | String |
token | Token (Do not set for a random token) | No | String |
Note: Adding a new token, automatically expires the old one
result = lenses_lib.UpdateSAToken(
name="test_sa",
token="test_sa_token_updated",
)
Add a new Random Token to a Service Account¶
Parameters for the UpdateSAToken method
Parameter Name | Description | Requried | Type |
---|---|---|---|
name | Service Account Nam | Yes | String |
Note: Adding a new token, automatically expires the old one
result = lenses_lib.UpdateSAToken(name="test_sa",)
Delete a Service Account¶
Parameters for the DeleteSA method
Parameter Name | Description | Requried | Type |
---|---|---|---|
name | Service Account Nam | Yes | String |
result = lenses_lib.DeleteSA(name="test_sa")
Get Lenses Configuration¶
result = lenses_lib.GetConfig()
print(result)
{'lenses.ip': '0.0.0.0',
'lenses.jmx.port': 9586,
...
'lenses.zookeeper.hosts': [{'jmx': '0.0.0.0:9585', 'url': '0.0.0.0:2181'}]}
Get Lenses Audits¶
result = lenses_lib.Audits()
print(result[0])
{'action': 'ADD',
'content': {'name': 'test_user',
'groups': 'test_group_user',
'password': '*****',
'type': 'BASIC'},
'resourceId': 'test_user',
'resourceName': None,
'timestamp': 1579543219695,
'type': 'USER_MANAGEMENT_USER',
'user': 'devops@DEV.LOCAL'}
Get Lenses Alerts¶
result = lenses_lib.Alerts()
print(result[0])
{'alertId': 4001,
'category': 'Topics',
'docs': None,
'instance': 'test_topic',
'level': 'INFO',
'map': {'topic': 'test_topic'},
'summary': "Topic 'test_topic' has been deleted by admin",
'tags': [],
'timestamp': 1579542199932}
Get Lenses Logs (Java Logs)¶
result = lenses_lib.GetLogs()
print(result)
[
...
{'level': 'INFO',
'logger': 'akka.actor.ActorSystemImpl',
'message': 'Request: GET->http://localhost:9991/api/audit?pageSize=999999999 '
'returned 200 OK in 43ms',
'stacktrace': '',
'thread': 'default-akka.actor.default-dispatcher-137',
'time': '2020-01-20 18:11:02.924',
'timestamp': 1579543862924},
{'level': 'INFO',
'logger': 'akka.actor.ActorSystemImpl',
'message': 'Request: GET->http://localhost:9991/api/audit?pageSize=999999999 '
'returned 200 OK in 12ms',
'stacktrace': '',
'thread': 'default-akka.actor.default-dispatcher-2',
'time': '2020-01-20 18:11:07.800',
'timestamp': 1579543867800},
{'level': 'INFO',
'logger': 'akka.actor.ActorSystemImpl',
'message': 'Request: '
'GET->http://localhost:9991/api/alerts?pageSize=999999999 '
'returned 200 OK in 15ms',
'stacktrace': '',
'thread': 'default-akka.actor.default-dispatcher-13',
'time': '2020-01-20 18:12:06.479',
'timestamp': 1579543926479},
...
]
Registry¶
Examples with methods used to manage Schemas
Get all Schemas¶
result = lenses_lib.GetAllSubjects()
['fast_vessel_processor-value',
'telecom_italia_data-key',
...
'logs_broker-value']
Register a new Schema¶
Parameters for the RegisterNewSchema method
Parameter Name | Description | Requried | Type |
---|---|---|---|
subject | Schemas Name | Yes | String |
subject_json | Schema | Yes | Json |
SCHEMA_CONFIG = {
'schema':
'{"type":"record","name":"reddit_post_key",'
'"namespace":"com.landoop.social.reddit.post.key",'
'"fields":[{"name":"testit_id","type":"string"}]}'
}
COMPATIBILITY_CONFIG = {'compatibility': 'BACKWARD'}
COMPATIBILITY_CONFIG_UPDATE = {'compatibility': 'FULL'}
result = lenses_lib.RegisterNewSchema('test_schema', SCHEMA_CONFIG).keys()
List Versions of a Schema¶
Parameters for the ListVersionsSubj method
Parameter Name | Description | Requried | Type |
---|---|---|---|
subject | Schemas Name | Yes | String |
result = lenses_lib.ListVersionsSubj('test_schema')
print(result)
[1]
Get Schema by ID¶
Parameters for the GetSchemaById method
Parameter Name | Description | Requried | Type |
---|---|---|---|
subjid | Schemas Name | Yes | Int. |
lenses_lib.GetSchemaById(schema_id)
Get Global Compatibility¶
result = lenses_lib.GetGlobalCompatibility()
Update Global Compatibility¶
Parameters for the UpdateGlobalCompatibility method
Parameter Name | Description | Requried | Type |
---|---|---|---|
compatibility | Schema compatibility | Yes | Json |
Note: See under register new schema for the COMPATIBILITY_CONFIG_UPDATE
lenses_lib.UpdateGlobalCompatibility(COMPATIBILITY_CONFIG_UPDATE)
Change Compatibility of a Schema¶
Parameters for the ChangeCompatibility method
Parameter Name | Description | Requried | Type |
---|---|---|---|
subject | Schema’s name | Yes | String |
compatibility | Schema compatibility | Yes | Json |
lenses_lib.ChangeCompatibility('test_schema', COMPATIBILITY_CONFIG_UPDATE)
Get Compatibility of a Schema¶
Parameters for the GetCompatibility method
Parameter Name | Description | Requried | Type |
---|---|---|---|
subject | Schema’s name | Yes | String |
lenses_lib.GetCompatibility('test_schema')
Update a Schema (If compatible)¶
Parameters for the UpdateSchema method
Parameter Name | Description | Requried | Type |
---|---|---|---|
subject | Schemas Name | Yes | String |
subject_json | Schema | Yes | Json |
SCHEMA_CONFIG_UPDATE = {
'schema':
'{"type":"record","name":"reddit_post_key",'
'"namespace":"com.landoop.social.reddit.post.key",'
'"fields":[{"name":"testit_id","type":"string","doc":"desc."}]}'
}
lenses_lib.UpdateSchema('test_schema', SCHEMA_CONFIG_UPDATE)
Get a certain version of a Schema¶
Parameters for the GetSchemaByVer method
Parameter Name | Description | Requried | Type |
---|---|---|---|
subject | Schemas Name | Yes | String |
verid | Version of Schema | Yes | Json |
lenses_lib.GetSchemaByVer('test_schema', subj_ver)
Delete a Schema version¶
Parameters for the DeleteSchemaByVersion method
Parameter Name | Description | Requried | Type |
---|---|---|---|
subject | Schemas Name | Yes | String |
verid | Version of Schema | Yes | Json |
lenses_lib.DeleteSchemaByVersion("test_schema", subj_ver)
Delete a Schema (all versions)¶
Parameters for the DeleteSubj method
Parameter Name | Description | Requried | Type |
---|---|---|---|
subject | Schemas Name | Yes | String |
lenses_lib.DeleteSubj("test_schema")
Virtual Environment (Recommended)¶
Install virtualenv and virtualenvwrapper:¶
pip3.5 install virtualenv virtualenvwrapper
cat << EOF >> ~/.bashrc
export WORKON_HOME=$HOME/VirtEnv/.virtualenvs
export VIRTUALENVWRAPPER_PYTHON=/usr/bin/python3.5
export VIRTUALENVWRAPPER_VIRTUALENV_ARGS=' -p /usr/bin/python3.5'
export PROJECT_HOME=$HOME/VirtEnv
source /usr/bin/virtualenvwrapper.sh
source ~/.bashrc
# All virtualenvs and their packages will be installed under ``~/VirtEnv/.virtualenvs``
ls ~/VirtEnv/.virtualenvs
get_env_details initialize machinelab postactivate ...
Create a python virtual environment:¶
mkvirtualenv myvirtenv
Activate the virtualenv (activated by default after creation):¶
# To activate just run workon myvirtenv
[user@hostname]$ workon myvirtenv
(myvirtenv)[user@hostname]$
Exiting virtualenv:¶
(myvirtenv)[user@hostname]$ deactivate
[user@hostname]$
Remove installed virtualenv along with the relevant packages:¶
rmvirtualenv myvirtenv
Integration Tests¶
Requirements¶
Will be handled automatically
Must be installed manually
Storage Requirements
Memory Requirements
Integration tests will run Lenses-Box, which requires ~4G of memory.
Start Lenses Box¶
Note: To run the docker images. First download a valid license to your host and copy it under repository/_resources/lenses-kerberos/license.json. You can find a dev license key [here](https://www.lenses.io/downloads/)
make docker
Run tests¶
Note: Run tests require a running container instance of lenses-box with container name lenses-box. Apart from the container, they also require: LensesUrl=”http://localhost:3030”, username=”admin”, password=”admin”. If you do not have such an instance running, make sure to follow the make docker target from above.
make test
Clean Dockers¶
To stop and remove the dockers containers that make docker target initiated, issue:
make docker_clean
TroubleShooting¶
For troubleshooting or additional information please join our slack channel.