5.0

Working with topics

Get Topics List 

To get a list with all kafka topics, issue

topicsList = lenses_lib.LstOfTopicsNames()

Get topic descriptions 

kafkaTopics = lenses_lib.GetAllTopics()

Details for a topic 

topicInfo = lenses_lib.TopicInfo('connect-configs')

Create a topic 


Parameter NameDescriptionRequiredType
topicNameTopic’s NameYesString
partitionsTopic’s partitionsYesInt
replicationTopic’s replication numberYesInt
configDict with Topic optionsNoJson

To create a topic, first create a dictionary with the options below

config = {
    "cleanup.policy": "compact",
    "compression.type": "snappy"
}

Then issue the CreateTopic method in order to create the topic

result = lenses_lib.CreateTopic(
    name="test_topic",
    partitions=1,
    replication=1,
    config=config
)

Update topic configuration 


Parameter NameDescriptionRequiredType
topicnameTopic’s NameYesString
data_paramsDict with Topic optionsYesJson

Create the configuration with the desired options:

config = {"configs": [{"key": "cleanup.policy", "value": "compact"}]}

Use the UpdateTopicConfig method to update the topic’s configuration

result = lenses_lib.UpdateTopicConfig('test_topic', config)

Publish data to a topic 


Parameter NameDescriptionRequiredType
nameTopic’s NameYesString
keyRecord’s keyYesString/Json
valueRecord’s valueYesString/Json
clientIdClient’s IDYesString

Publishing data to a topic is as easy as.


result = lenses_lib.Publish(
    "test_topic",
    "test_key",
    "{'value':1}"
)

Subscribe data to a topic 


Parameter NameDescriptionRequiredType
dataFuncCustom function to work with dataYesString
querySQL QueryYesString/Json
clientIdClient’s IDNoString

Note: First define a custom function to work with your data. A custom function is needed because the query is continuous.

def print_data(message):
    print(message)

Subscribing to a topic is as easy as

lenses_lib.Subscribe(print_data, "select * from my_topic")
{'content': 'test_topic', 'correlationId': 1, 'type': 'SUCCESS'}
{'key': '8387236824701691257', 'offset': 0, 'partition': 0, 'timestamp': 1580157301897, 'topic': 'test_topic', 'value': "{'value':1}"}

Delete records from a topic 

Parameter Name Description Requried Type name Topic’s Name Yes String partition Topic’s Partition Yes Int. offset End offset Yes Int. Records can be deleted by providing a range of offsets

result = lenses_lib.DeleteTopicRecords('test_topic', "0", "10")

Delete a topic 

Parameter Name Description Requried Type topicname Topic’s Name Yes String

Delete a topic called test_topic by using the DeleteTopic method

result = lenses_lib.DeleteTopic("test_topic")