Working with topics
Get Topics List
To get a list of all Kafka topic names, call:
topicsList = lenses_lib.LstOfTopicsNames()
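The returned names can then be filtered like any Python list. The sketch below assumes that LstOfTopicsNames() returns a plain list of strings, which is not confirmed here; the helper `user_topics`, the prefixes, and the sample names are hypothetical and only illustrate the idea.

```python
def user_topics(topic_names, internal_prefixes=("_", "connect-")):
    """Return the sorted names that do not start with an internal-looking prefix."""
    prefixes = tuple(internal_prefixes)
    return sorted(name for name in topic_names if not name.startswith(prefixes))


# Sample data standing in for the result of lenses_lib.LstOfTopicsNames()
sample_names = ["test_topic", "connect-configs", "__consumer_offsets", "payments"]
print(user_topics(sample_names))  # ['payments', 'test_topic']
```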
Get topic descriptions
kafkaTopics = lenses_lib.GetAllTopics()
Details for a topic
topicInfo = lenses_lib.TopicInfo('connect-configs')
Create a topic
Parameter Name | Description | Required | Type |
---|---|---|---|
topicName | Topic’s Name | Yes | String |
partitions | Topic’s partitions | Yes | Int |
replication | Topic’s replication number | Yes | Int |
config | Dict with Topic options | No | Json |
To create a topic, first create a dictionary with the desired topic options:
config = {
    "cleanup.policy": "compact",
    "compression.type": "snappy"
}
Then call the CreateTopic method to create the topic:
result = lenses_lib.CreateTopic(
    name="test_topic",
    partitions=1,
    replication=1,
    config=config
)
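It can help to validate the arguments before sending the request. The following is a minimal sketch, not part of the library: `build_create_topic_args` is a hypothetical helper, and the keyword names it produces (name, partitions, replication, config) are taken from the example call above.

```python
def build_create_topic_args(name, partitions, replication, config=None):
    """Validate topic settings and return keyword arguments for CreateTopic."""
    if not isinstance(name, str) or not name:
        raise ValueError("name must be a non-empty string")
    for label, value in (("partitions", partitions), ("replication", replication)):
        # bool is a subclass of int, so reject it explicitly
        if isinstance(value, bool) or not isinstance(value, int) or value < 1:
            raise ValueError(f"{label} must be a positive integer")
    args = {"name": name, "partitions": partitions, "replication": replication}
    if config is not None:
        args["config"] = dict(config)  # copy so the caller's dict is not shared
    return args


args = build_create_topic_args("test_topic", 1, 1, {"cleanup.policy": "compact"})
print(args)
# With a connected client (assumed): result = lenses_lib.CreateTopic(**args)
```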
Update topic configuration
Parameter Name | Description | Required | Type |
---|---|---|---|
topicname | Topic’s Name | Yes | String |
data_params | Dict with Topic options | Yes | Json |
Create the configuration with the desired options:
config = {"configs": [{"key": "cleanup.policy", "value": "compact"}]}
Then call the UpdateTopicConfig method to update the topic’s configuration:
result = lenses_lib.UpdateTopicConfig('test_topic', config)
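The update payload is a list of key/value entries rather than a flat dictionary. As a sketch, a flat dictionary like the one used for CreateTopic can be converted into that shape; `to_update_payload` is a hypothetical helper, and the output format simply mirrors the example above.

```python
def to_update_payload(options):
    """Convert {"key": "value"} options into the {"configs": [...]} shape."""
    return {
        "configs": [{"key": key, "value": value} for key, value in options.items()]
    }


config = to_update_payload({"cleanup.policy": "compact"})
print(config)
# {'configs': [{'key': 'cleanup.policy', 'value': 'compact'}]}
```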
Publish data to a topic
Parameter Name | Description | Required | Type |
---|---|---|---|
name | Topic’s Name | Yes | String |
key | Record’s key | Yes | String/Json |
value | Record’s value | Yes | String/Json |
clientId | Client’s ID | Yes | String |
To publish a record, call the Publish method with the topic name, the record key, and the record value:
result = lenses_lib.Publish(
    "test_topic",
    "test_key",
    "{'value':1}"
)
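Note that the value in the example is a Python-style string with single quotes, which is not valid JSON. If consumers expect JSON, the value can be serialized with the standard json module first. This is a sketch under the assumption that Publish accepts a JSON string as the value; `encode_value` is a hypothetical helper.

```python
import json


def encode_value(value):
    """Return strings unchanged; JSON-encode anything else compactly."""
    if isinstance(value, str):
        return value
    return json.dumps(value, separators=(",", ":"), sort_keys=True)


payload = encode_value({"value": 1})
print(payload)  # {"value":1}
# With a connected client (assumed):
# result = lenses_lib.Publish("test_topic", "test_key", payload)
```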
Subscribe to a topic
Parameter Name | Description | Required | Type |
---|---|---|---|
dataFunc | Custom function to work with data | Yes | Function |
query | SQL Query | Yes | String/Json |
clientId | Client’s ID | No | String |
Note: First define a custom function to work with your data. A custom function is needed because the query is continuous.
def print_data(message):
    print(message)
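To subscribe, pass the custom function and an SQL query to the Subscribe method:
lenses_lib.Subscribe(print_data, "select * from test_topic")
With print_data as the custom function, each incoming message is printed, for example:
{'content': 'test_topic', 'correlationId': 1, 'type': 'SUCCESS'}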
{'key': '8387236824701691257', 'offset': 0, 'partition': 0, 'timestamp': 1580157301897, 'topic': 'test_topic', 'value': "{'value':1}"}
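The sample output shows two kinds of messages: a status message with a type field and a record with an offset field. A callback can separate them instead of printing everything. The sketch below assumes messages arrive as dictionaries shaped like the sample output; `RecordCollector` is a hypothetical class, not part of the library.

```python
class RecordCollector:
    """Callable that sorts incoming messages into records and status messages."""

    def __init__(self):
        self.records = []
        self.status = []

    def __call__(self, message):
        # Records in the sample output carry an 'offset'; status messages do not.
        if isinstance(message, dict) and "offset" in message:
            self.records.append(message)
        else:
            self.status.append(message)


collector = RecordCollector()
# Messages copied from the sample output, fed in by hand for illustration
collector({"content": "test_topic", "correlationId": 1, "type": "SUCCESS"})
collector({"key": "8387236824701691257", "offset": 0, "partition": 0,
           "timestamp": 1580157301897, "topic": "test_topic",
           "value": "{'value':1}"})
print(len(collector.records), len(collector.status))  # 1 1
# With a connected client (assumed):
# lenses_lib.Subscribe(collector, "select * from test_topic")
```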
Delete records from a topic
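Parameter Name | Description | Required | Type |
---|---|---|---|
name | Topic’s Name | Yes | String |
partition | Topic’s Partition | Yes | Int |
offset | End offset | Yes | Int |
Records can be deleted from a partition by providing the partition number and the end offset of the range to delete: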
result = lenses_lib.DeleteTopicRecords('test_topic', "0", "10")
Delete a topic
Parameter Name | Description | Required | Type |
---|---|---|---|
topicname | Topic’s Name | Yes | String |
To delete the topic called test_topic, call the DeleteTopic method:
result = lenses_lib.DeleteTopic("test_topic")