Kafka Connect sink connector for writing data from Kafka to Azure Service Bus.
Stream Reactor Azure Service Bus Sink Connector is designed to effortlessly translate Kafka records into your Azure Service Bus cluster. It leverages Microsoft Azure API to transfer data to Service Buses in a seamless manner, allowing for their safe transition and safekeeping both payloads and metadata (see Payload support). It supports both types of Service Buses: Queues and Topics. Azure Service Bus Source Connector provides its user with AT-LEAST-ONCE guarantee as the data is committed (marked as read) in Kafka topic (for assigned topic and partition) once Connector verifies it was successfully committed to designated Service Bus topic.
The following KCQL is supported:
INSERT INTO <your-service-bus> SELECT * FROM <your-kafka-topic> PROPERTIES(...);
It allows you to map Kafka topic of name <your-kafka-topic> to Service Bus of name <your-service-bus> using the PROPERTIES specified (please check Configuring KCQL mappings for your QUEUEs and TOPICs for more info on necessary properties)
<your-kafka-topic>
<your-service-bus>
The selection of fields from the Service Bus message is not supported.
You can connect to an Azure Service Bus by passing your connection string in configuration. The connection string can be found in the Shared access policies section of your Azure Portal.
Example:
connect.servicebus.connection.string=Endpoint=sb://YOURNAMESPACE.servicebus.windows.net/;SharedAccessKeyName=YOUR_KEYNAME;SharedAccessKey=YOUR_ACCESS_KEY=
Learn more about different methods of connecting to Service Bus on the Azure Website.
This sink supports the following Kafka payloads:
MessageId
Azure Service Bus doesn’t allow to send messages with null content (payload)
Null Payload (sometimes referred as Kafka Tombstone) is a known concept in Kafka messages world. However, because of Service Bus limitations around that matter, we aren’t allowed to send messages with null payload and we have to drop them instead.
Please keep that in mind when using Service Bus and designing business logic around null payloads!
The Azure Service Bus Connector connects to Service Bus via Microsoft API. In order to smoothly configure your mappings you have to pay attention to PROPERTIES part of your KCQL mappings. There are two cases here: reading from Service Bus of type QUEUE and of type TOPIC. Please refer to the relevant sections below. In case of further questions check Azure Service Bus documentation to learn more about those mechanisms.
In order to be writing to the queue there’s an additional parameter that you need to pass with your KCQL mapping in the PROPERTIES part. This parameter is servicebus.type and it can take one of two values depending on the type of the service bus: QUEUE or TOPIC. Naturally for Queue we’re interested in QUEUE here and we need to pass it.
servicebus.type
QUEUE
Example KCQL:
connect.servicebus.kcql=INSERT INTO azure-queue SELECT * FROM kafka-topic PROPERTIES('servicebus.type'='QUEUE');
This is sufficient to enable you to create the mapping with your queue.
In order to be writing to the topic there is an additional parameter that you need to pass with your KCQL mapping in the PROPERTIES part:
TOPIC
connect.servicebus.kcql=INSERT INTO azure-topic SELECT * FROM kafka-topic PROPERTIES('servicebus.type'='TOPIC');
This is sufficient to enable you to create the mapping with your topic.
The following example presents all the mandatory configuration properties for the Service Bus connector. Please note there are also optional parameters listed in Configuration parameters section. Feel free to tweak the configuration to your requirements.
connector.class=io.lenses.streamreactor.connect.azure.servicebus.sink.AzureServiceBusSinkConnector name=AzureEventHubsSinkConnector tasks.max=1 value.converter=org.apache.kafka.connect.storage.StringConverter key.converter=org.apache.kafka.connect.storage.StringConverter connect.servicebus.connection.string="Endpoint=sb://MYNAMESPACE.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SOME_SHARED_ACCESS_STRING"; connect.servicebus.kcql=INSERT INTO output-servicebus SELECT * FROM input-topic PROPERTIES('servicebus.type'='QUEUE');
On this page