Working with topics
Get Topics List
To get a list of all Kafka topic names, call:
topicsList = lenses_lib.LstOfTopicsNames()
Get topic descriptions
kafkaTopics = lenses_lib.GetAllTopics()
Details for a topic
topicInfo = lenses_lib.TopicInfo('connect-configs')
Create a topic
| Parameter Name | Description | Required | Type | 
|---|---|---|---|
| topicName | Topic’s Name | Yes | String | 
| partitions | Topic’s partitions | Yes | Int | 
| replication | Topic’s replication number | Yes | Int | 
| config | Dict with Topic options | No | Json | 
To create a topic, first create a dictionary with the desired topic options:
config = {
    "cleanup.policy": "compact",
    "compression.type": "snappy"
}
Then call the CreateTopic method to create the topic:
result = lenses_lib.CreateTopic(
    name="test_topic",
    partitions=1,
    replication=1,
    config=config
)
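The two calls shown so far can be combined so that a topic is only created when it is not already listed. The following is a minimal sketch, not part of the library: `create_topic_if_missing` is a hypothetical helper, and it assumes that `LstOfTopicsNames()` returns a collection of topic-name strings.

```python
def create_topic_if_missing(client, name, partitions, replication, config=None):
    """Create `name` unless it is already listed; return True if it was created."""
    # Assumption: LstOfTopicsNames() returns an iterable of topic-name strings.
    if name in client.LstOfTopicsNames():
        return False

    # Same keyword arguments as the CreateTopic call above.
    kwargs = {"name": name, "partitions": partitions, "replication": replication}
    if config is not None:
        # config is optional, so only pass it when the caller supplied one.
        kwargs["config"] = config
    client.CreateTopic(**kwargs)
    return True
```

With a connected client this could be called as `create_topic_if_missing(lenses_lib, "test_topic", 1, 1, config)`.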
Update topic configuration
| Parameter Name | Description | Required | Type | 
|---|---|---|---|
| topicname | Topic’s Name | Yes | String | 
| data_params | Dict with Topic options | Yes | Json | 
Create the configuration with the desired options:
config = {"configs": [{"key": "cleanup.policy", "value": "compact"}]}
Use the UpdateTopicConfig method to update the topic’s configuration:
result = lenses_lib.UpdateTopicConfig('test_topic', config)
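CreateTopic takes a flat dictionary of options, while UpdateTopicConfig takes a list of key/value entries under `configs`. If you already have options in the flat form, a small conversion avoids writing the list by hand. `to_update_payload` below is a hypothetical helper for illustration, not part of the library.

```python
def to_update_payload(options):
    """Convert {"cleanup.policy": "compact"} into the {"configs": [...]} shape."""
    return {
        "configs": [
            # One entry per option, in the order the dictionary yields them.
            {"key": key, "value": value}
            for key, value in options.items()
        ]
    }


payload = to_update_payload({"cleanup.policy": "compact"})
```

Here `payload` has the same structure as the `config` dictionary passed to UpdateTopicConfig above.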
Publish data to a topic
| Parameter Name | Description | Required | Type | 
|---|---|---|---|
| name | Topic’s Name | Yes | String | 
| key | Record’s key | Yes | String/Json | 
| value | Record’s value | Yes | String/Json | 
| clientId | Client’s ID | Yes | String | 
Publishing data to a topic is as easy as:
result = lenses_lib.Publish(
    "test_topic",
    "test_key",
    "{'value':1}"
)
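The value in the call above uses single quotes inside the string, so it is not valid JSON. If the consumers of your topic expect JSON (an assumption about your setup, not a requirement stated here), one option is to build the string with the standard `json` module:

```python
import json

record = {"value": 1}

# json.dumps produces double-quoted keys, which JSON parsers accept.
value = json.dumps(record)

# The single-quoted form is rejected by a JSON parser.
try:
    json.loads("{'value':1}")
    single_quoted_is_valid = True
except json.JSONDecodeError:
    single_quoted_is_valid = False
```

The resulting `value` string can then be passed as the third argument to Publish.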
Subscribe to a topic
| Parameter Name | Description | Required | Type | 
|---|---|---|---|
| dataFunc | Custom function to work with data | Yes | String | 
| query | SQL Query | Yes | String/Json | 
| clientId | Client’s ID | No | String | 
Note: First define a custom function to work with your data. A custom function is needed because the query is continuous.
def print_data(message):
    print(message)
Subscribing to a topic is as easy as:
lenses_lib.Subscribe(print_data, "select * from my_topic")
{'content': 'test_topic', 'correlationId': 1, 'type': 'SUCCESS'}
{'key': '8387236824701691257', 'offset': 0, 'partition': 0, 'timestamp': 1580157301897, 'topic': 'test_topic', 'value': "{'value':1}"}
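The sample output shows two kinds of messages: an acknowledgement with a `type` field and a data record with `offset`, `partition` and `value` fields. A callback that keeps only the data records could look like the sketch below. It assumes each message arrives as a Python dictionary shaped like the samples; `collect_records` and `records` are illustrative names.

```python
records = []


def collect_records(message):
    """Keep data records and skip control messages such as the SUCCESS ack."""
    # Assumption: data records carry an 'offset' field and control
    # messages do not, as in the sample output above.
    if "offset" in message:
        records.append(message)
```

Pass `collect_records` as the first argument to Subscribe in place of `print_data`.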
Delete records from a topic
| Parameter Name | Description | Required | Type | 
|---|---|---|---|
| name | Topic’s Name | Yes | String | 
| partition | Topic’s Partition | Yes | Int | 
| offset | End offset | Yes | Int | 
Records can be deleted by providing a range of offsets:
result = lenses_lib.DeleteTopicRecords('test_topic', "0", "10")
Delete a topic
| Parameter Name | Description | Required | Type | 
|---|---|---|---|
| topicname | Topic’s Name | Yes | String | 
Delete a topic called test_topic by using the DeleteTopic method:
result = lenses_lib.DeleteTopic("test_topic")
