A Kafka Connect sink connector for writing records from Kafka to HTTP endpoints.
Below is an example of a typical configuration for the HTTP Sink Connector (remember, you still need to provide your own endpoint):
name=lenseshttp
connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
tasks.max=1
topics=topicToRead
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
connect.http.authentication.type=none
connect.http.method=POST
connect.http.endpoint=http://endpoint.local/receive
connect.http.request.content="My Static Content Template"
connect.http.batch.count=1
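If you prefer to manage connectors through the Kafka Connect REST API, the same configuration can be submitted as JSON. This is a minimal sketch, assuming a Connect worker with its REST interface on localhost:8083:

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "lenseshttp",
    "config": {
      "connector.class": "io.lenses.streamreactor.connect.http.sink.HttpSinkConnector",
      "tasks.max": "1",
      "topics": "topicToRead",
      "value.converter": "org.apache.kafka.connect.storage.StringConverter",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "connect.http.authentication.type": "none",
      "connect.http.method": "POST",
      "connect.http.endpoint": "http://endpoint.local/receive",
      "connect.http.request.content": "My Static Content Template",
      "connect.http.batch.count": "1"
    }
  }'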
The Lenses HTTP sink comes with multiple options for content templating of the HTTP request.
If you do not want any part of the key, value, headers or other data to form part of the message, you can use static templating:
connect.http.request.content="My Static Content Template"
When you are confident you will be generating a single HTTP request per Kafka message, you can use the simpler templating.
In the content property of your configuration you can define template substitutions, as in the following example (please note the XML is only an example; your template can consist of any text format that can be submitted in an HTTP request):
connect.http.request.content="<product><id>{{value.name}}</id></product>"
To collapse multiple messages into a single HTTP request, you can use the multiple-message template. This happens automatically if the template has a messages tag. See the example below:
<messages>
  {{#message}}
  <message>
    <topic>{{topic}}</topic>
    <employee>{{value.employeeId}}</employee>
    <order>{{value.orderNo}}</order>
    <groupDomain>{{value.groupDomain}}</groupDomain>
  </message>
  {{/message}}
</messages>
Again, this is an XML example, but your message body can consist of anything, including plain text, JSON or YAML.
Your connector configuration will look like this:
connect.http.request.content="<messages>{{#message}}<message><topic>{{topic}}</topic><employee>{{value.employeeId}}</employee><order>{{value.orderNo}}</order><groupDomain>{{value.groupDomain}}</groupDomain></message>{{/message}}</messages>"
The final result will be HTTP requests with bodies like this:
<messages>
  <message>
    <topic>myTopic</topic>
    <employee>Abcd1234</employee>
    <order>10</order>
    <groupDomain>myExampleGroup.uk</groupDomain>
  </message>
  <message>
    <topic>myTopic</topic>
    <employee>Efgh5678</employee>
    <order>11</order>
    <groupDomain>myExampleGroup.uk</groupDomain>
  </message>
</messages>
When using simple and multiple-message templating, template variables can also be used in other parts of the request. The endpoint URL (including the protocol, e.g. http://lenses.io) is itself a Content Template, so it can contain substitutions from the message key/value/headers etc. If you are batching multiple Kafka messages into a single request, the first message is used for the substitution of the URL.
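As an illustration (the path segment here is hypothetical), an endpoint that routes by a field of the message value could be templated like this:

connect.http.endpoint="http://endpoint.local/receive/{{value.groupDomain}}"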
Currently, the HTTP Sink supports no authentication, BASIC HTTP authentication and OAuth2 authentication.
By default, no authentication is set. This can also be configured explicitly:
connect.http.authentication.type=none
BASIC auth can be configured by providing a configuration like this:
connect.http.authentication.type=basic
connect.http.authentication.basic.username=user
connect.http.authentication.basic.password=password
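With these credentials, each request carries a standard HTTP Basic Authorization header, i.e. the base64 encoding of user:password:

Authorization: Basic dXNlcjpwYXNzd29yZA==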
OAuth2 authentication can be configured by providing a configuration like this:
connect.http.authentication.type=oauth2
connect.http.authentication.oauth2.token.url=http://myoauth2.local/getToken
connect.http.authentication.oauth2.client.id=clientId
connect.http.authentication.oauth2.client.secret=client-secret
connect.http.authentication.oauth2.token.property=access_token
connect.http.authentication.oauth2.client.scope=any
connect.http.authentication.oauth2.client.headers=header:value
To customise the headers sent with your HTTP request, you can supply a headers list in a key-value fashion.
Each header key and value is also a Content Template, so it can contain substitutions from the message key/value/headers etc. If you are batching multiple Kafka messages into a single request, the first message is used for the substitution of the headers.
Example:
connect.http.request.headers="Content-Type","text/plain","X-User","{{header.kafkauser}}","Product","{{value.product.id}}"
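Given a hypothetical message carrying a kafkauser header of bob and a value whose product.id field is 1234, the example above would produce headers like:

Content-Type: text/plain
X-User: bob
Product: 1234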
Enabling SSL connections between Kafka Connect and the HTTP endpoint ensures that the communication between these services is secure, protecting sensitive data from being intercepted or tampered with. SSL (or TLS) encrypts data in transit, verifying the identity of both parties and ensuring data integrity. Please check out the SSL Configuration Properties section in order to set it up.
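As a minimal sketch, assuming a PKCS12 truststore at a hypothetical path, the SSL properties from the option reference below could be set like this:

ssl.truststore.location=/path/to/truststore.p12
ssl.truststore.password=truststore-password
ssl.truststore.type=PKCS12
ssl.protocol=TLSv1.3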
The connector offers three distinct flush options for data management: a record count (connect.http.batch.count), a batch size in bytes (connect.http.batch.size) and a time interval in seconds (connect.http.time.interval).
It’s worth noting that the interval flush is a continuous process that acts as a fail-safe mechanism, ensuring that batches are periodically flushed even if the other flush options are not configured or haven’t reached their thresholds.
Consider a scenario where the flush size is set to 10MB, only 9.8MB of data has accumulated, and no new Kafka messages arrive for 6 hours. To prevent undue delays, the interval flush guarantees that the batch is flushed (i.e. the HTTP request is sent) once the specified time interval has elapsed. This ensures the timely delivery of data even in situations where the other flush conditions are not met.
The flush options are configured using the connect.http.batch.count, connect.http.batch.size and connect.http.time.interval properties. The settings are optional and, if not specified, the defaults are:
connect.http.batch.count=50000
connect.http.batch.size=500000000
connect.http.time.interval=3600
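For example, to flush a request after every 100 records or every 60 seconds, whichever threshold is reached first, you could override the defaults like this (illustrative values, not a recommendation):

connect.http.batch.count=100
connect.http.time.interval=60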
The following configuration examples show how to apply this connector to different message types. They include converters, which are required to instruct Kafka Connect on how to read the source content.
In this case the converters are irrelevant as we are not using the message content to populate our message template.
connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
topics=mytopic
tasks.max=1
connect.http.method=POST
connect.http.endpoint="https://my-endpoint.example.com"
connect.http.request.content="My Static Content Template"
connect.http.batch.count=1
The HTTP request body contains the value of the message, which is retained as a string value via the StringConverter.
connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
topics=mytopic
tasks.max=1
connect.http.method=POST
connect.http.endpoint="https://my-endpoint.example.com"
connect.http.request.content="{{value}}"
connect.http.batch.count=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
Specific fields from the JSON message are substituted into the HTTP request body alongside some static content.
connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
topics=mytopic
tasks.max=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
connect.http.method=POST
connect.http.endpoint="https://my-endpoint.example.com"
connect.http.request.content="product: {{value.product}}"
connect.http.batch.size=1
value.converter.schemas.enable=false
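For instance, a hypothetical message value of {"product": "myProduct", "price": 9.99} would yield a request body of:

product: myProduct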
The entirety of the message value is substituted into a placeholder in the message body. The message is treated as a string via the StringConverter.
connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
topics=mytopic
tasks.max=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
connect.http.method=POST
connect.http.endpoint="https://my-endpoint.example.com"
connect.http.request.content="whole product message: {{value}}"
connect.http.time.interval=5
Fields from the AVRO message are substituted into the message body in the following example:
connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
topics=mytopic
tasks.max=1
connect.http.method=POST
connect.http.endpoint="https://my-endpoint.example.com"
connect.http.request.content="product: {{value.product}}"
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schemas.enable=true
value.converter.schema.registry.url=http://schema-registry:8081
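This assumes the value schema contains a product field; a hypothetical Avro schema compatible with the template above might look like:

{
  "type": "record",
  "name": "Product",
  "fields": [
    {"name": "product", "type": "string"}
  ]
}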
Starting from version 8.1, as a pilot release, we give our customers the ability to use functionality called Reporter, which (if enabled) writes Success and Error processing reports to a specified Kafka topic. Reports don’t have a key, and details about the status can be found in the message headers and value.
To enable this functionality, set one (or both, if you want full reporting) of the properties below:
connect.reporting.error.config.enabled=true
connect.reporting.success.config.enabled=true
Then specify the other connectivity properties just as you would when configuring a Kafka producer. Full configuration options can be found in the Success Reporter Properties and Error Reporter Properties sections. Below are two examples: one with a local/plain configuration, the other using SASL connection parameters.
This is the most common scenario, for on-premises Kafka clusters used just for monitoring:
connect.reporting.error.config.enabled=true
connect.reporting.error.config.bootstrap.servers=localhost:9094
connect.reporting.error.config.topic=http-monitoring
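Reports can then be inspected with the standard console consumer; this sketch assumes Kafka’s CLI tools are available and uses print.headers to surface the status headers:

kafka-console-consumer.sh \
  --bootstrap-server localhost:9094 \
  --topic http-monitoring \
  --from-beginning \
  --property print.headers=true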
This is a more robust scenario, for connecting to an external Kafka cluster:
connect.reporting.error.config.enabled=true
connect.reporting.error.config.bootstrap.servers=my-kafka-cluster.com:9093
connect.reporting.error.config.security.protocol=SASL_SSL
connect.reporting.error.config.sasl.mechanism=PLAIN
connect.reporting.error.config.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="MYUSER" password="MYPASSWORD";
This sink connector supports the following options as part of its configuration:
connect.http.method
connect.http.endpoint
connect.http.request.content
connect.http.authentication.type
connect.http.request.headers
connect.http.batch.count
connect.http.batch.size
connect.http.time.interval
connect.http.upload.sync.period
connect.http.error.threshold
connect.http.retries.on.status.codes
connect.http.retries.max.retries
connect.http.retries.max.timeout.ms
connect.http.connection.timeout.ms
ssl.truststore.location
ssl.truststore.password
ssl.truststore.type (JKS or PKCS12)
ssl.keystore.location
ssl.keystore.password
ssl.keystore.type
ssl.protocol (TLSv1.2 or TLSv1.3)
ssl.keymanager.algorithm
ssl.trustmanager.algorithm
connect.reporting.error.config.enabled (default: false)
connect.reporting.error.config.bootstrap.servers
connect.reporting.error.config.topic
connect.reporting.error.config.location
connect.reporting.error.config.sasl.jaas.config
connect.reporting.error.config.sasl.mechanism
connect.reporting.success.config.enabled
connect.reporting.success.config.bootstrap.servers
connect.reporting.success.config.topic
connect.reporting.success.config.location
connect.reporting.success.config.sasl.jaas.config
connect.reporting.success.config.sasl.mechanism