This page describes the usage of the Stream Reactor Azure Event Hubs Source Connector.
The following KCQL is supported:
INSERT INTO <your-kafka-topic> SELECT * FROM <your-event-hub>;
The selection of fields from the Event Hubs message is not supported.
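As a minimal sketch, the KCQL statement is supplied through the connector's connect.eventhubs.kcql property. The topic and hub names below are placeholders, not values from this page:

```properties
# Hypothetical names; replace with your own Kafka topic and Event Hub
connect.eventhubs.kcql=INSERT INTO my-output-topic SELECT * FROM my-event-hub;
```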
io.lenses.streamreactor.connect.azure.eventhubs.source.AzureEventHubsSourceConnector
Currently, the Azure Event Hubs Connector supports raw bytes passthrough from the source Hub to the Kafka topic specified in the KCQL configuration.
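Because the connector forwards raw bytes, you will typically want Kafka Connect to write them to the topic unmodified. A minimal sketch using the standard Kafka Connect ByteArrayConverter (this pairing is a common choice, not something mandated here):

```properties
# Standard Kafka Connect converter that passes bytes through unchanged
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
```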
You can connect to Azure Event Hubs by passing specific JAAS parameters in the configuration.
Example:
connect.eventhubs.source.connection.settings.bootstrap.servers=NAMESPACENAME.servicebus.windows.net:9093
connect.eventhubs.source.connection.settings.sasl.mechanism=PLAIN
connect.eventhubs.source.connection.settings.security.protocol=SASL_SSL
connect.eventhubs.source.connection.settings.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
Learn more about the different methods of connecting to Event Hubs on the Azure website. The only caveat is to add the connector-specific prefix, as in the example above. See the Fine-tuning the Kafka Connector section below for more info.
The Azure Event Hubs Connector utilizes the Apache Kafka API implemented by Event Hubs. This also allows fine-tuning for user-specific needs, because the Connector passes every property carrying a specific prefix directly to the consumer. The prefix is connect.eventhubs.source.connection.settings., and any property a user specifies with it is automatically passed to the consumer.
Suppose a user wants to fine-tune how many data records come through the network at once. They specify the property below as part of their Azure Event Hubs Connector configuration before starting it:
connect.eventhubs.source.connection.settings.max.poll.records = 100
This means that the internal Kafka Consumer will poll at most 100 records at a time, as determined by the max.poll.records consumer configuration.
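Other standard Kafka consumer settings can be tuned the same way via the prefix. The values below are illustrative only, not recommendations:

```properties
# Properties after the prefix are passed verbatim to the internal consumer
connect.eventhubs.source.connection.settings.max.poll.records=100
connect.eventhubs.source.connection.settings.fetch.max.bytes=52428800
connect.eventhubs.source.connection.settings.fetch.min.bytes=1
```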
There are certain exceptions to this rule, as a couple of configuration parameters are used internally to ensure smooth consumption. Those exceptions are listed below:
client.id
group.id
key.deserializer
value.deserializer
enable.auto.commit (always set to false)
The example below presents all the parameters necessary to use the Event Hubs connector. It contains only the required parameters (nothing optional), so feel free to tweak it to your needs:
name=AzureEventHubsSourceConnector
connector.class=io.lenses.streamreactor.connect.azure.eventhubs.source.AzureEventHubsSourceConnector
tasks.max=1
connect.eventhubs.kcql=INSERT INTO azureoutput SELECT * FROM inputhub;
connect.eventhubs.source.connection.settings.bootstrap.servers=MYNAMESPACE.servicebus.windows.net:9093
connect.eventhubs.source.connection.settings.sasl.mechanism=PLAIN
connect.eventhubs.source.connection.settings.security.protocol=SASL_SSL
connect.eventhubs.source.connection.settings.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://MYNAMESPACE.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SOME_SHARED_ACCESS_STRING;EntityPath=inputhub";