Kafka Connect Examples
The easiest way to manage Connections is via Lenses GUI under their respective pages, however it is also possible to do it directly via API, Helm or Lenses CLI. In such case, some connection type-specific values have to be used. Here are few examples of such configuration in YAML format.
- Find out more about managing Kafka Connect Connections via API
- Find out more about managing Connections via **Lenses **provision
- Find out more about installing Lenses via Helm
Simple configuration, with JMX metrics
The URLs (workers) should always have a scheme defined (http:// or https://).
This example uses an optional AES-256 key. The key decodes values encoded with AES-256 to enable passing encrypted values to connectors. It is only needed if your cluster uses AES-256 Decryption plugin.
Copyyour-cluster-name:
tags: ["tag1"]
templateName: KafkaConnect
configurationObject:
workers:
- http://my-kc.worker1:8083
- http://my-kc.worker2:8083
aes256Key: PasswordPasswordPasswordPassword
# all metrics properties are optional
metricsPort: 9581
metricsType: JMX
metricsSsl: false
Misc metrics configurations
Find more about multiple options of configuring services’ metrics (like secured JMX, Jolokia, etc) under Services Metrics
Basic authentication
For Basic Authentication, define username
and password
properties.
your-cluster-name:
tags: ["tag1"]
templateName: KafkaConnect
configurationObject:
workers:
- http://my-kc.worker1:8083
- http://my-kc.worker2:8083
username: my-username
password: my-password
TLS with custom truststore
A custom truststore is needed when the Kafka Connect workers are served over TLS (encryption-in-transit) and their certificates are not signed by a trusted CA.
Copyyour-cluster-name:
tags: ["tag1"]
templateName: KafkaConnect
configurationObject:
workers:
- https://my-kc.worker1:8083
- https://my-kc.worker2:8083
sslTruststore:
fileRef:
filePath: /path/to/my/truststore.jks
sslTruststorePassword: myPassword
TLS with client authentication
A custom truststore might be necessary too (see above).
Copyyour-cluster-name:
tags: ["tag1"]
templateName: KafkaConnect
configurationObject:
workers:
- https://my-kc.worker1:8083
- https://my-kc.worker2:8083
sslKeystore:
fileRef:
filePath: /path/to/my/keystore.jks
sslKeyPassword: keyPassword
sslKeystorePassword: keystorePassword
TLS with Basic Authentication
As above - a custom truststore is needed when the Kafka Connect workers are served over TLS (encryption-in-transit) and their certificates are not signed by a trusted CA.
Copyyour-cluster-name:
tags: ["tag1"]
templateName: KafkaConnect
configurationObject:
workers:
- https://my-kc.worker1:8083
- https://my-kc.worker2:8083
sslTruststore:
fileRef:
filePath: /path/to/my/truststore.jks
sslTruststorePassword: myPassword
username: my-username
password: my-password
Custom Connector
If you have a custom connector that you want to enable it for the topology view, or a connector not supported out
of the box, you will need to set the lenses.connectors.info
configuration entry.
Here is how you can configure connectors in order to appear in the topology graph:
Copylenses {
...
connectors.info = [
{
class.name = "The connector full classpath"
name = "The name which will be presented in the UI"
instance = "Details about the instance. Contains the connector configuration field which holds the information. If a database is involved it would be the DB connection details, if it is a file it would be the file path, etc"
sink = true
extractor.class = "The full classpath for the implementation knowing how to extract the Kafka topics involved. This is only required for a Source"
icon = "file.png"
description = "A description for the connector"
author = "The connector author"
}
...
]
}
Source connectors
Source connectors do not have a standard way to identify target topics in Kafka. The extractor.class option, as seen above, allows Lenses to identify which Kafka topics the connector writes to. When using this extractor it is also expected to provide a property configuration which specifies the field within the connector runtime configuration containing the topics to publish to. Here is an example for a file stream source connector:
Copyclass.name = "org.apache.kafka.connect.file.FileStreamSource"
name = "File"
instance = "file"
sink = false
property = "topic"
extractor.class = "io.lenses.config.kafka.connect.SimpleTopicsExtractor"