Clients / Python¶
The Lenses python-library is a Python client that enables Python developers and data scientists to take advantage of the REST and WebSocket endpoints Lenses exposes. Users can:
- Manage topics
- Manage schemas
- Manage processors
- Manage connectors
- Browse topic data via SQL
- Subscribe to live continuous SQL queries via WebSockets
Installation¶
Dependencies¶
Install Pip3
curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py
python3.5 get-pip.py
rm -f get-pip.py
Virtual Environment (Recommended)¶
Install virtualenv and virtualenvwrapper
pip3.5 install virtualenv virtualenvwrapper
cat << EOF >> ~/.bashrc
export WORKON_HOME=$HOME/VirtEnv/.virtualenvs
export VIRTUALENVWRAPPER_PYTHON=/usr/bin/python3.5
export VIRTUALENVWRAPPER_VIRTUALENV_ARGS=' -p /usr/bin/python3.5'
export PROJECT_HOME=$HOME/VirtEnv
source /usr/bin/virtualenvwrapper.sh
EOF
source ~/.bashrc
# All virtualenvs and their packages will be installed under ~/VirtEnv/.virtualenvs
ls ~/VirtEnv/.virtualenvs
get_env_details initialize machinelab postactivate ...
Create a python virtual environment:
mkvirtualenv myvirtenv
Activate the virtualenv (activated by default after creation):
# To activate just run workon myvirtenv
[user@hostname]$ workon myvirtenv
(myvirtenv)[user@hostname]$
Exiting virtualenv:
(myvirtenv)[user@hostname]$ deactivate
[user@hostname]$
Remove installed virtualenv along with the relevant packages:
rmvirtualenv myvirtenv
Install Lenses Python¶
Clone the Python repo and execute the following command inside it (Python 3 should already be installed on your system):
python3 setup.py install
Alternatively, install it with pip3:
pip3 install lenses_python
Getting started¶
Basic authentication¶
First, we import the Python 3 client:
from lenses_python.lenses import lenses
data = lenses(<url>, <username>, <password>)
Next, get the credentials and roles for our account:
data.GetCredentials()
Example code:
data=lenses("http://127.0.0.1:3030","admin","admin")
data.GetCredentials()
{
'schemaRegistryDelete': True,
'success': True,
'token': 'c439f615-e511-4dfd-863b-76ad285b7572',
'user': {
'email': None,
'id':'admin',
'name': 'Lenses Admin',
'roles': ['admin', 'read', 'write', 'nodata']
}
}
Kerberos authentication¶
In this case we connect with Kerberos. First, install the Kerberos client (which provides the kinit utility) and set the configuration in /etc/krb5.conf. Then run kinit to create a Kerberos ticket, which the Python 3 library will later retrieve.
from lenses_python.lenses import lenses
data = lenses(url=<url>,kerberos_mode=1)
SQL Data Handlers¶
Using the Lenses SQL engine we can call the Lenses REST endpoints to execute SQL queries on Kafka topics. This is achieved using the SqlHandler method.
data.SqlHandler(<query>, <optional is_extract_pandas>, <optional stats>, <optional datetimelist>, <optional formatinglist>)
The is_extract_pandas parameter takes the value True or False. If it is omitted or False, the output is in JSON format; if True, the output is a pandas data frame. The stats parameter is an integer. When the output is a pandas data frame, we can also use the two optional arguments datetimelist and formatinglist to convert datetime strings to datetime objects:
- datetimelist - a list containing the keys whose values are datetime strings
- formatinglist - a list containing the date format for each element of datetimelist
If all dates share the same format, a single entry in formatinglist is enough.
For more info about the formats, check http://strftime.org/.
Example using the datetimelist and formatinglist arguments:
data.SqlHandler(
'SELECT * FROM `nyc_yellow_taxi_trip_data`',
['tpep_dropoff_datetime'],
['%Y-%m-%d %H:%M:%S'])
Managing Topics¶
Topics Information¶
To list topics:
data.TopicsNames()
To obtain the information for a specific topic as a dictionary:
data.TopicInfo(<topic name>)
If we want to list all topic information, we execute the following command:
data.GetAllTopics()
The output is a list of JSON records containing the information for every topic.
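For example, a minimal sketch that lists the topic names and fetches the details of the first one, assuming the connected client data from the Getting started section:
topics = data.TopicsNames()       # list of all topic names
info = data.TopicInfo(topics[0])  # dictionary describing the first topic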
Update topic configuration¶
data.UpdateTopicConfig(<topic name>, <optional configuration>, <optional filename>)
Example configuration:
{
"configs": [
{
"key": "cleanup.policy",
"value": "compact"
}
]
}
There are three options for setting the parameters:
- Topic name and configuration argument (second). This will update the topic with the provided configuration
- Topic name and configuration file (third). This will update the topic with the configuration from the file
- Provide only the configuration file with the topic name and config in the default section. This will load and update the topics specified in the configuration file. For example:
[Default]
topicname: my-topic
config:{
"configs": [
{
"key": "cleanup.policy",
"value": "compact"
}
]
}
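For example, a minimal sketch using an inline configuration (my-topic is a placeholder topic name):
config = {"configs": [{"key": "cleanup.policy", "value": "compact"}]}
data.UpdateTopicConfig("my-topic", config)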
Create new topic¶
data.CreateTopic(<topic name>, <replication>, <partitions>, <optional configuration>, <optional filename>)
Example of configuration:
{
"cleanup.policy": "compact",
"compression.type": "snappy"
}
There are three options for setting the parameters:
- Topic name and configuration argument. This will create the topic with the provided configuration
- Topic name and configuration file. This will create the topic with the configuration from the file
- Provide only the configuration file with the topic name and config in the default section. This will create the topics specified in the configuration file.
Example
config = {"cleanup.policy": "compact","compression.type": "snappy"}
data.CreateTopic("test-topic",1,1,config)
Delete topic¶
data.DeleteTopic(<topic name>)
Delete topic records¶
With this method we can delete specific records from a given topic. We provide the topic, the partition, and the offset. For example, to delete the first 100 records of topicA from partition 0:
data.DeleteTopicRecords("topicA", "0", "100")
Managing SQL Processors¶
Using Lenses we can deploy and manage SQL processors.
Create new Processor¶
data.CreateProcessor(<processor name>, <sql query>, <runners>, <cluster name>, <optional namespace>, <optional pipeline>)
The parameters are:
- sql query - The Lenses SQL to run
- runners - The number of runners to spawn
- cluster name - The cluster name, either the Connect cluster name or the Kubernetes cluster name. Use IN_PROC for in-process mode
- optional namespace - Kubernetes namespace, only applicable in Kubernetes mode
- optional pipeline - Kubernetes pipeline tag, only applicable in Kubernetes mode
Example
data.CreateProcessor("new-processor",
"SET autocreate=true;INSERT INTO topicB SELECT * FROM topicA",
1,
"dev",
"ns",
"pipeline")
On successful registration, Lenses returns an id for the processor, for example lsql_818a158c50a24d71952652ab49e75637.
Deleting a Processor¶
data.DeleteProcessor(<processor name>)
Resume a Processor¶
data.ResumeProcessor(<processor name>)
Pause a Processor¶
data.PauseProcessor(<processor name>)
Scale a Processor¶
Scaling a processor involves changing the number of runners: threads for IN_PROC mode, connect tasks for CONNECT mode, or pods for KUBERNETES mode.
data.UpdateProcessor(<processor name>, <number of runners>)
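For example, a minimal sketch that scales a processor to 4 runners, reusing the id returned by CreateProcessor:
data.UpdateProcessor("lsql_818a158c50a24d71952652ab49e75637", 4)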
Managing Schemas¶
Best practice is to use AVRO as the message format for your data. Lenses supports schema registries from both Confluent and Hortonworks (compatibility mode 0).
List all Subjects¶
data.GetAllSubjects()
List Subject Versions¶
data.ListVersionsSubj(<name of subject>)
Get a Schema by ID¶
data.GetSchemaById(<schema id>)
Register new Schema¶
data.RegisterNewSchema(<name of schema>, <optional schema>, <optional filename>)
There are three options for setting the parameters:
- Schema name and schema argument (second). This will register the provided schema under that name.
- Schema name and schema file (third). This will register the schema from the file under that name.
- Provide the schema file only. This will register the schema specified in the configuration file.
Example of schema configuration
{"schema": "{
"type":"record",
"name":"reddit_post_key",
"namespace":"com.landoop.social.reddit.post.key",
"fields":[
{
"name":"subreddit_id",
"type":"string"
}
]
}"
}
Example file configuration, no schema name:
[Default]
{"schema": "{
"type":"record",
"name":"reddit_post_key",
"namespace":"com.landoop.social.reddit.post.key",
"fields":[
{
"name":"subreddit_id",
"type":"string"
}
]
}"
}
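For example, a minimal sketch registering the schema above; it assumes the schema is passed as a dictionary in the shape shown, and reddit_post_key is a placeholder schema name:
schema = {"schema": "{\"type\":\"record\",\"name\":\"reddit_post_key\","
                    "\"namespace\":\"com.landoop.social.reddit.post.key\","
                    "\"fields\":[{\"name\":\"subreddit_id\",\"type\":\"string\"}]}"}
data.RegisterNewSchema("reddit_post_key", schema)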
Get Global Compatibility¶
data.GetGlobalCompatibility()
Get Compatibility of a Subject¶
data.GetCompatibility(<subject name>)
Delete specific Subjects¶
data.DeleteSubj(<name of subject>)
Delete Schema by Version¶
data.DeleteSchemaByVersion(<name of subject>, <version of subject>)
Change Compatibility of Subject¶
data.ChangeCompatibility(<name of subject>, <optional compatibility>, <optional filename>)
Example of compatibility
{'compatibility': 'BACKWARD'}
There are three options for setting the parameters:
- Subject name and compatibility (second). This will update the subject with the provided compatibility level.
- Subject name and compatibility file (third). This will update the schema with the compatibility from the file.
- Provide only the compatibility file. This will set the subject to the compatibility specified in the configuration file.
For example:
[Default]
compatibility:{"compatibility": "BACKWARD"}
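For example, a minimal sketch setting a subject to BACKWARD compatibility with an inline argument (the subject name is a placeholder):
data.ChangeCompatibility("reddit_post_key", {"compatibility": "BACKWARD"})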
Update Global Compatibility¶
This command updates the compatibility on the Schema Registry servers:
data.UpdateGlobalCompatibility(<optional compatibility>, <optional filename>)
Example
{'compatibility': 'BACKWARD'}
There are two options for setting the parameters:
- Compatibility argument. This will update the Schema Registry with the provided compatibility level.
- Provide only the compatibility file. This will set the Schema Registry to the compatibility specified in the configuration file.
For example:
[Default]
compatibility:{"compatibility": "BACKWARD"}
Managing Connectors¶
Lenses allows you to manage Kafka Connect connectors to load and unload data from Kafka.
List Connectors¶
data.ListAllConnectors(<name of cluster>)
Get Connector Information¶
This command retrieves the status and configuration of a connector:
data.GetInfoConnector(<name of cluster>, <name of connector>)
Example
connectorsnames = data.ListAllConnectors("dev")
data.GetInfoConnector("dev",connectorsnames[0])
The output will be something like this:
{
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"file": "/var/log/broker.log",
"name": "logs-broker",
"tasks.max": "1",
"topic": "logs_broker"
},
"name": "logs-broker",
"tasks": [{"connector": "logs-broker", "task": 0}]
}
Get Connector Configuration¶
data.GetConnectorConfig(<name of cluster>, <name of connector>)
Example
connectors = data.ListAllConnectors("dev")
data.GetConnectorConfig("dev", connectors[0])
Example output:
{
"connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"file": "/var/log/broker.log",
"name": "logs-broker",
"tasks.max": "1",
"topic": "logs_broker"
}
Get Connector Status¶
data.GetConnectorStatus(<name of cluster>, <name of connector>)
Example
connectors = data.ListAllConnectors("dev")
data.GetConnectorStatus("dev",connectors[0])
Example output:
{
"connector" : { "state": "RUNNING", "worker_id": "172.17.0.3:8083" },
"name" : "logs-broker",
"tasks" : [{ "id": 0, "state": "RUNNING", "worker_id": "172.17.0.3:8083" }]
}
Get Connector Tasks¶
data.GetConnectorTasks(<name of cluster>, <name of connector>)
Example
connectors = data.ListAllConnectors("dev")
data.GetConnectorTasks("dev",connectors[0])
Example of output:
[
{
"config": {
"file": "/var/log/broker.log",
"task.class": "org.apache.kafka.connect.file.FileStreamSourceTask",
"topic": "logs_broker"
},
"id": {"connector": "logs-broker", "task": 0}
}
]
Get Status of specific Task¶
data.GetStatusTask(<name of cluster>, <name of connector>, <task id>)
Restart Connector Task¶
data.RestartConnectorTask(<name of cluster>, <name of connector>, <task id>)
Create new Connector¶
data.CreateConnector(<name of cluster>, <optional configuration>, <optional filename>)
Example configuration:
{
"config": {
"connect.coap.kcql": "1",
"connector.class": "com.datamountaineer.streamreactor.connect.coap.sink.CoapSinkConnector"
},
"name": "name"
}
There are three options for setting the parameters:
- Set the cluster and the config (the connector name is part of the config).
- Set the cluster and the file to load the connector config from.
- Set the configuration file name only, specifying the cluster and connector name in the default section.
[Default]
cluster: my-cluster
connector: my-connector
config: {
"config": {
"connect.coap.kcql": "1",
"connector.class": "com.datamountaineer.streamreactor.connect.coap.sink.coapsinkconnector"
},
"name": "name"
}
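For example, a minimal sketch creating the connector above on a cluster named dev (the cluster and connector names are placeholders):
config = {
    "config": {
        "connect.coap.kcql": "1",
        "connector.class": "com.datamountaineer.streamreactor.connect.coap.sink.CoapSinkConnector"
    },
    "name": "name"
}
data.CreateConnector("dev", config)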
Set Connector Configuration¶
data.SetConnectorConfig(<name of cluster>, <name of connector>, <optional configuration>, <optional filename>)
Example configuration file:
{
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"task.max": 5,
"topics": "nyc_yellow_taxi_trip_data,reddit_posts,sea_vessel_position_reports,telecom_italia_data",
"file": "/dev/null",
"name": "nullsink"
}
There are three options for setting the parameters:
- Set the cluster, name of the connector and the config
- Set the cluster, name of the connector and the file to load the connector config from
- Set the configuration file name only, specifying the cluster and connector name in the default section.
[Default]
cluster: my-cluster
connector: my-connector
config: {
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"task.max": 5,
"topics": "nyc_yellow_taxi_trip_data,reddit_posts,sea_vessel_position_reports, telecom_italia_data",
"file": "/dev/null",
"name": "nullsink"
}
Delete a Connector¶
To delete a connector, call the DeleteConnector method, supplying the name of the cluster and the connector name.
data.DeleteConnector(<name of cluster>, <name of connector>)
Continuous Queries¶
Lenses allows clients to submit SQL and subscribe to the output of the query continuously via web sockets.
Subscribing¶
A client can SUBSCRIBE to a topic via SQL as follows:
data.SubscribeHandler(<url>, <client id>, <sql query>, <write>, <filename>, <print_results>, <optional datetimelist>, <optional formatinglist>)
Parameters:
- url - The Lenses WebSocket endpoint to subscribe to
- client id - A unique identifier of the client
- sql - The Lenses SQL query to run
- write - A boolean parameter, pre-defined as False. If True, incoming data is saved in a file whose name is given by the filename parameter
- filename - Name of the file to store the output to
- print_results - Print the incoming data, default False
The optional arguments datetimelist and formatinglist convert datetime strings to datetime objects:
- datetimelist - a list containing the keys whose values are datetime strings
- formatinglist - a list containing the date format for each element of datetimelist
If all dates share the same format, a single entry in formatinglist is enough.
For more info about the formats, check http://strftime.org/.
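For example, a minimal sketch that subscribes to a query, saves the records to a file, and echoes them as they arrive (the WebSocket URL and client id are placeholder values):
data.SubscribeHandler("ws://127.0.0.1:3030", "my-client-id",
                      "SELECT * FROM `nyc_yellow_taxi_trip_data`",
                      True,            # write: save incoming data to a file
                      "output.json",   # filename for the saved records
                      True)            # print_results: echo records as they arrive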
Publishing¶
A client can PUBLISH messages to a topic. The current version supports only string/json. In the future, we will add support for AVRO.
data.Publish(<url>, <client id>, <topic>, <key>, <value>)
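For example, a minimal sketch publishing a JSON message (the WebSocket URL, client id, topic, and key are placeholder values):
data.Publish("ws://127.0.0.1:3030", "my-client-id", "topicA", "key1", '{"amount": 10}')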
Commit¶
A client can COMMIT the (topic, partition) offsets:
data.Commit(<url>, <client id>, <topic>, <partition>, <offset>)
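For example, a minimal sketch committing offset 100 for partition 0 of topicA (the WebSocket URL and client id are placeholders; the partition and offset are passed as strings, as in the DeleteTopicRecords example):
data.Commit("ws://127.0.0.1:3030", "my-client-id", "topicA", "0", "100")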
ACLs Handler¶
Create/Update ACLs¶
data.SetAcl(<resourceType>, <resourceName>, <principal>, <permissionType>, <host>, <operation>)
- resourceType, string, required
- resourceName, string, required
- principal, string, required
- permissionType, string, required (either Allow or Deny)
- host, string, required
- operation, string, required
Example
data.SetAcl("Topic","transactions","GROUPA:UserA","Allow","*","Read")
The following operations are valid (depending on the Kafka version):
Resource Type | Operation
--- | ---
Topic | Read
Topic | Write
Topic | Describe
Topic | Delete
Topic | DescribeConfigs
Topic | AlterConfigs
Topic | All
Group | Read
Group | Describe
Group | All
Cluster | Create
Cluster | ClusterAction
Cluster | DescribeConfigs
Cluster | AlterConfigs
Cluster | IdempotentWrite
Cluster | Alter
Cluster | Describe
Cluster | All
TransactionalId | Describe
TransactionalId | Write
TransactionalId | All
Quota Handler¶
Create/Update Quota - All Users¶
data.SetQuotasAllUsers(config)
- config - The quota constraints
Example of config
{
"producer_byte_rate" : "100000",
"consumer_byte_rate" : "200000",
"request_percentage" : "75"
}
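For example, a minimal sketch applying these constraints to all users:
config = {
    "producer_byte_rate": "100000",
    "consumer_byte_rate": "200000",
    "request_percentage": "75"
}
data.SetQuotasAllUsers(config)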
Create/Update Quota - User all Clients¶
data.SetQuotaUserAllClients(user, config)
Where,
- user - The user to set the quota for
- config - The quota constraints
Example of config
{
"producer_byte_rate" : "100000",
"consumer_byte_rate" : "200000",
"request_percentage" : "75"
}
Create/Update a Quota - User/Client pair¶
data.SetQuotaUserClient(user, clientid, config)
Where,
- user - The user to set the quota for
- clientid - The client id to set the quota for
- config - The quota constraints
Example of config
{
"producer_byte_rate" : "100000",
"consumer_byte_rate" : "200000",
"request_percentage" : "75"
}
Create/Update a Quota - User¶
data.SetQuotaUser(user, config)
Where
- user - The user to set the quota for
- config - The quota constraints
Example of config
{
"producer_byte_rate" : "100000",
"consumer_byte_rate" : "200000",
"request_percentage" : "75"
}
Create/Update Quota - All Clients¶
data.SetQuotaAllClient(config)
Where,
- config - The quota constraints
Example of config,
{
"producer_byte_rate" : "100000",
"consumer_byte_rate" : "200000",
"request_percentage" : "75"
}
Create/Update a Quota - Client¶
data.SetQuotaClient(clientid, config)
- clientid - The client id to set the quota for
- config - The quota constraints
Example of config,
{
"producer_byte_rate" : "100000",
"consumer_byte_rate" : "200000",
"request_percentage" : "75"
}
Delete Quota - All Users¶
data.DeleteQuotaAllUsers(config)
- config - The list of quota settings to delete.
For example,
config=["producer_byte_rate","consumer_byte_rate"]
Delete Quota - User all Clients¶
data.DeleteQuotaUserAllClients(user, config)
- user - The user to delete the quota for
- config - The list of quota settings to delete.
For example
config=["producer_byte_rate","consumer_byte_rate"]
Delete a Quota - User/Client pair¶
data.DeleteQuotaUserClient(user, clientid, config)
- user - The user to delete the quota for
- clientid - The client id to delete the quota for
- config - The list of quota settings to delete.
For example
config=["producer_byte_rate","consumer_byte_rate"]
Delete a Quota - User¶
data.DeleteQuotaUser(user, config)
- user - The user to delete the quota for
- config - The list of quota settings to delete.
For example
config=["producer_byte_rate","consumer_byte_rate"]
Delete Quota - All Clients¶
data.DeleteQuotaAllClients(config)
- config - The list of quota settings to delete.
For example
config=["producer_byte_rate","consumer_byte_rate"]
Delete a Quota - Client¶
data.DeleteQuotaClient(clientid, config)
- clientid - The client id to delete the quota for
- config - The list of quota settings to delete.
For example
config=["producer_byte_rate","consumer_byte_rate"]
Troubleshooting¶
For troubleshooting or additional information, join our Slack channel.