Configure Lenses¶
Lenses uses two configuration files. The main configuration file, lenses.conf, contains
runtime configuration options. Security-related configuration is stored in a separate file,
security.conf. This way sensitive data can be protected by administrators, while the rest of
the configuration remains accessible to team members.
The full list of options can be found in the Option Reference section.
License¶
In order to run, Lenses requires a license file, which can be obtained by contacting us.
Once you have received your license, store it in a file (e.g. license.json) and update
the configuration to point to it. Make sure the configuration value contains the full file path.
# License file allowing connecting to up to N brokers
lenses.license.file="license.json"
Third Party Licenses
The license folder contains the third-party licenses for all the open source software and libraries Lenses uses.
A complete list is also available at https://lenses.io/third-party-software
Host and Port¶
During startup, Lenses binds to the IP and port set in the configuration file. Use
the ip and port configuration entries to set a different value. By default Lenses binds to port 9991.
# Set the ip:port for Lenses to bind to
lenses.ip = 0.0.0.0
lenses.port = 9991
Java Options¶
The following environment variables control the Java configuration options when starting Lenses:
LENSES_HEAP_OPTS - The heap space settings. The default is -Xmx3g -Xms512m
LENSES_JMX_OPTS - JMX options to set
LENSES_LOG4J_OPTS - Logging options
LENSES_PERFORMANCE_OPTS - Any extra options. The default is -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true
Logging¶
Lenses uses Logback for logging. The logback.xml file is picked up from the installation folder.
To point to a different location for the Logback configuration file, export LENSES_LOG4J_OPTS
as shown below:
export LENSES_LOG4J_OPTS="-Dlogback.configurationFile=file:mylogback.xml"
Log levels¶
Logback supports hot reloading of changes to the logback.xml file. The default refresh period is 30 seconds, and
this can be adjusted via the scanPeriod attribute of the configuration element:
<configuration scan="true" scanPeriod="30 seconds" >
...
</configuration>
The default log level is set to INFO. To change it, adjust the level attribute of the corresponding logger entry, for example <logger name="akka" level="DEBUG"/>.
The default loggers are:
<logger name="akka" level="INFO"/>
<logger name="org.apache.zookeeper.ClientCnxn" level="ERROR"/>
<logger name="com.typesafe.sslconfig.ssl.DisabledComplainingHostnameVerifier" level="ERROR"/>
<logger name="org.apache.kafka.clients.consumer.ConsumerConfig" level="ERROR"/>
<logger name="org.apache.kafka.common.utils.AppInfoParser" level="WARN"/>
<logger name="org.apache.kafka.clients.consumer.internals.AbstractCoordinator" level="WARN"/>
<logger name="io.confluent.kafka.serializers.KafkaAvroDeserializerConfig" level="WARN"/>
<logger name="org.I0Itec.zkclient" level="WARN"/>
<logger name="org.apache.zookeeper" level="WARN"/>
<logger name="org.apache.calcite" level="OFF"/>
All log entries are written to the output using the following pattern: %d{ISO8601} %-5p [%c{2}:%L] %m%n
Log location¶
All the logs Lenses produces can be found in the logs directory; however, we recommend following the
Twelve-Factor App approach to logging and logging to stdout, especially when
using a container orchestration engine such as Kubernetes. Leave log collection to agents such as Filebeat, Logstash,
Fluentd and Flume.
Security¶
Options related to Lenses security, starting with lenses.security.***, should be stored in a separate file named security.conf.
You can set the path to the security configuration file in the main configuration file lenses.conf via the key
lenses.secret.file. This way security.conf can be managed only by the administration team
and have tighter access control than the rest of the configuration.
Lenses supports the following login modes: BASIC and LDAP.
The security mode is configured through the lenses.security.mode option.
If set to BASIC, the users are defined via the lenses.security.users option and their roles via lenses.security.groups. For example:
# Security mode is BASIC by default; the alternative is LDAP.
lenses.security.mode=BASIC
# Define the user groups and their roles. At least one user group needs to be set
lenses.security.groups=[
{"name": "adminGroup", "roles": ["admin", "write", "read"]},
{"name": "writeGroup", "roles": ["read", "write"], topic: { blacklist: ["payment.*"] } },
{"name": "readGroup", "roles": ["read"], topic: { whitelist: [ "users.*" ] } },
{"name": "nodataGroup", "roles": ["nodata"]}
]
# Define the users and link each one to the group(s) it belongs to
lenses.security.users=[
{"username": "admin", "password": "admin999", "displayname": "Lenses Admin", "groups": ["adminGroup"]},
{"username": "write", "password": "write1", "displayname": "Write User", "groups": ["writeGroup"]},
{"username": "read", "password": "read1", "displayname": "Read Only", "groups": ["readGroup"]},
{"username": "nodata", "password": "nodata1", "displayname": "No Data", "groups": ["nodataGroup"]}
]
Note
A group with the admin role automatically inherits write, read and nodata.
A group with the write role automatically inherits read and nodata as well. The
permission matrix contains additional details on roles and access levels.
Both LDAP and BASIC require lenses.security.groups. This entry specifies the roles for each group and,
optionally, which topics can be accessed through a whitelist/blacklist approach. In the example above, the writeGroup
can access all topics apart from the ones starting with payment. The readGroup uses the whitelisting approach
and can only access topics with names starting with users. The entries for topic.whitelist and topic.blacklist are expected
to be regular expressions.
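The whitelist/blacklist rules can be illustrated with a small sketch. It is not the Lenses implementation: the function name is hypothetical, and it assumes that a pattern has to match the whole topic name and that a whitelist takes precedence when both lists are given.

```python
import re


def topic_allowed(topic, whitelist=None, blacklist=None):
    """Illustrative check only; assumes full-match semantics for each pattern."""
    if whitelist is not None:
        # Whitelisting: only topics matching at least one pattern are visible.
        return any(re.fullmatch(pattern, topic) for pattern in whitelist)
    if blacklist is not None:
        # Blacklisting: every topic is visible except those matching a pattern.
        return not any(re.fullmatch(pattern, topic) for pattern in blacklist)
    # No restriction configured: the group can access all topics.
    return True


print(topic_allowed("payments", blacklist=["payment.*"]))
print(topic_allowed("users.eu", whitelist=["users.*"]))
```

Because the entries are regular expressions, the dot in users.* stands for any character, so the pattern covers every name that begins with users, not only names of the form users.something.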
Note
You can control which topics a user group can access. This is how Lenses handles multi-tenancy over Kafka. Leaving out the setting means the group will be able to access all topics. If a topic is restricted no LSQL processor or Connect instance can be created by the users in the group.
LDAP¶
To map LDAP roles to Lenses roles, set the lenses.security.mode value to LDAP and then use the ldap configuration
section to provide the settings.
Each enterprise LDAP setup might be different, so Lenses offers a plugin mechanism that allows a custom implementation
for retrieving the user role list.
The project template for a custom implementation can be found on GitHub.
With the implementation ready, all that is required is to drop the jar file into the Lenses lib folder and
set the configuration entry lenses.security.ldap.plugin.class to the fully qualified class name of the implementation.
Lenses provides a default implementation out of the box via the com.landoop.kafka.lenses.security.LdapMemberOfUserGroupPlugin class.
Here is the template for the LDAP configuration section:
lenses.security.mode=LDAP
lenses.security.ldap.url="ldaps://mycompany.com:636"
lenses.security.ldap.base="OU=Users,DC=mycompany,DC=com"
lenses.security.ldap.user="$LDAP_USER"
lenses.security.ldap.password="$LDAP_USER_PASSWORD"
lenses.security.ldap.filter="(&(objectClass=person)(sAMAccountName=<user>))"
//LDAP roles retriever settings
lenses.security.ldap.plugin.class="com.landoop.kafka.lenses.security.LdapMemberOfUserGroupPlugin"
lenses.security.ldap.plugin.group.extract.regex="(?i)CN=(\\w+),ou=ServiceGroups.*"
lenses.security.ldap.plugin.memberof.key="memberOf"
lenses.security.ldap.plugin.person.name.key = "sn"
Key | Description | Optional | Type | Default |
---|---|---|---|---|
url | The LDAP server URL. For example: ldap://mycompany.com:10389 | No | String | N/A |
base | Your LDAP base. For example: dc=jboss,dc=org | No | String | N/A |
user | Your LDAP user. For example: uid=admin,ou=system | No | String | N/A |
password | Your LDAP user password. | No | String | N/A |
filter | The LDAP search filter; it must yield a unique result. See the default value. <user> is required since it is replaced at runtime with the current user id. | Yes | String | (&(objectClass=person)(sAMAccountName=<user>)) |
plugin.class | The fully qualified class name of the LDAP roles retriever implementation | Yes | String | N/A |
plugin.memberof.key | Your LDAP member-of key entry. This is the key under which a role is attached to the user entry. For example, memberOf: cn=AdminR,ou=Groups,dc=jboss,dc=org links the AdminR role to the current user entry. | Yes | String | memberOf |
plugin.person.name.key | Your LDAP person entry attribute containing the user's full name. | Yes | String | sn |
plugin.group.extract.regex | The regular expression used to extract the role from each memberOf entry (see above). The default value matches the earlier example for memberOf. | Yes | String | (?i)CN=(\\w+),ou=Groups.* |
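The <user> placeholder in the filter can be pictured with the following sketch. It only illustrates the substitution: the function name is hypothetical, and whether Lenses escapes the user id before substituting it is not stated here, so the RFC 4515 escaping shown is an assumption about what a careful implementation would do.

```python
# Characters that RFC 4515 requires to be escaped inside a filter value.
_LDAP_ESCAPES = {
    "\\": r"\5c",
    "*": r"\2a",
    "(": r"\28",
    ")": r"\29",
    "\x00": r"\00",
}


def render_filter(template, user_id):
    """Replace <user> in the filter template with the escaped user id."""
    escaped = "".join(_LDAP_ESCAPES.get(ch, ch) for ch in user_id)
    return template.replace("<user>", escaped)


default_filter = "(&(objectClass=person)(sAMAccountName=<user>))"
print(render_filter(default_filter, "jsmith"))
```

The user id jsmith is a made-up value; with the default filter the search is expected to return exactly one entry for the user logging in.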
Note
The configuration entries lenses.security.ldap.plugin.memberof.key, lenses.security.ldap.plugin.person.name.key
and lenses.security.ldap.plugin.group.extract.regex are specific to the implementation Lenses provides out of the box.
Any custom implementation may require different entries under lenses.security.ldap.plugin.
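To see what the group extraction regex captures, here is a rough sketch in Python. It is not the plugin's code: the function name and the sample memberOf values are invented, and it assumes the pattern is applied from the start of each value and that the first capture group is the role name. Note that the configuration file writes \\w because of string escaping, while the pattern itself contains \w.

```python
import re

# Same pattern as in the configuration template, without the config-file escaping.
GROUP_REGEX = r"(?i)CN=(\w+),ou=ServiceGroups.*"


def extract_groups(member_of_values, pattern=GROUP_REGEX):
    """Return the captured group name for every memberOf value that matches."""
    compiled = re.compile(pattern)
    groups = []
    for value in member_of_values:
        match = compiled.match(value)
        if match:
            groups.append(match.group(1))
    return groups


sample = [
    "cn=LensesAdmins,OU=ServiceGroups,DC=mycompany,DC=com",
    "CN=Other,OU=Users,DC=mycompany,DC=com",
]
print(extract_groups(sample))
```

Since \w does not match hyphens or spaces, group names containing such characters would need a wider capture group in the configured expression.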
Here is a sample configuration for an LDAP-enabled Lenses:
lenses.security.mode=LDAP
lenses.security.ldap.url="ldaps://landoop.ldap.url:636"
lenses.security.ldap.base="DC=landoop,DC=com"
lenses.security.ldap.password=*****
lenses.security.ldap.user="UID=smiths,OU=ServiceAccounts,DC=landoop,DC=com"
lenses.security.ldap.filter="(&(objectclass=person)(CN=<user>))"
lenses.security.ldap.plugin.class="com.landoop.kafka.lenses.security.LdapMemberOfUserGroupPlugin"
lenses.security.ldap.plugin.memberof.key="memberOf"
lenses.security.ldap.plugin.group.extract.regex="(?i)CN=(\\w+),ou=ServiceGroups.*"
lenses.security.ldap.plugin.person.name.key ="sn"
Service Accounts¶
Service accounts allow easier integration with the Lenses API. A typical use case is enabling your CI/CD tools to interact with Lenses.
Specific users and their authorization tokens can be defined via the lenses.security.service.accounts option.
Here is how two service accounts can be created:
lenses.security.groups=[
{"name": "group1", "roles": ["admin", "write", "read"]},
{"name": "group2", "roles": ["read", "write"], topic: { blacklist: ["payment.*"] } },
{"name": "group3", "roles": ["read"], topic: { whitelist: [ "users.*" ] } }
]
lenses.security.service.accounts=[
{
"username": "jenkins",
"token": "1231543kn!!1",
"groups": ["group1", "group2"]
},
{
"username": "goCli",
"token": "12345678",
"groups": ["group3"]
}
]
Your CI system (like Jenkins) and the Lenses Go CLI tool can then call the API without having to log in first.
All that is required is for every HTTP request to contain the HTTP header X-Kafka-Lenses-Token:$TOKEN.
Each service account is linked to one or more user groups, which restrict the actions it can execute.
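As a minimal sketch of how a script might attach the token, the snippet below builds (but does not send) a request with the Python standard library. The helper name, the host and the /api/placeholder path are hypothetical; substitute a real endpoint from the Lenses API documentation.

```python
from urllib.request import Request


def lenses_request(base_url, path, token):
    """Build a request that carries the service-account token header."""
    url = base_url.rstrip("/") + "/" + path.lstrip("/")
    return Request(url, headers={"X-Kafka-Lenses-Token": token})


# Placeholder host and path; the token matches the goCli account defined above.
req = lenses_request("http://lenses-host:9991", "/api/placeholder", "12345678")
print(req.full_url)
print(req.header_items())
```

Passing the resulting object to urllib.request.urlopen would perform the call. HTTP header names are case-insensitive, so the way urllib normalises the capitalisation of the header name does not matter to the server.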
SSL Authentication and Encryption¶
If your Kafka cluster uses TLS certificates for authentication, set the broker
protocol to SSL and then pass in any keystore and truststore
configurations to the consumer and producer settings by prefixing them with
lenses.kafka.settings., such as:
lenses.kafka.settings.consumer.security.protocol=SSL
lenses.kafka.settings.consumer.ssl.truststore.location=/var/private/ssl/client.truststore.jks
lenses.kafka.settings.consumer.ssl.truststore.password=test1234
lenses.kafka.settings.consumer.ssl.keystore.location=/var/private/ssl/client.keystore.jks
lenses.kafka.settings.consumer.ssl.keystore.password=test1234
lenses.kafka.settings.consumer.ssl.key.password=test1234
lenses.kafka.settings.producer.security.protocol=SSL
lenses.kafka.settings.producer.ssl.truststore.location=/var/private/ssl/client.truststore.jks
lenses.kafka.settings.producer.ssl.truststore.password=test1234
lenses.kafka.settings.producer.ssl.keystore.location=/var/private/ssl/client.keystore.jks
lenses.kafka.settings.producer.ssl.keystore.password=test1234
lenses.kafka.settings.producer.ssl.key.password=test1234
If TLS certificates are only used for encryption of data on the wire, the keystore settings may be omitted:
lenses.kafka.settings.consumer.security.protocol=SSL
lenses.kafka.settings.consumer.ssl.truststore.location=/var/private/ssl/client.truststore.jks
lenses.kafka.settings.consumer.ssl.truststore.password=test1234
lenses.kafka.settings.producer.security.protocol=SSL
lenses.kafka.settings.producer.ssl.truststore.location=/var/private/ssl/client.truststore.jks
lenses.kafka.settings.producer.ssl.truststore.password=test1234
If your brokers’ CA certificate is embedded in the system-wide truststore, you can omit the truststore settings.
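Since the same client settings are repeated for the consumer and the producer, a small helper can generate the prefixed entries. This is only a convenience sketch with a hypothetical function name; it emits lines in the key=value form used in the examples above.

```python
def lenses_kafka_settings(client_settings, clients=("consumer", "producer")):
    """Prefix each Kafka client setting with lenses.kafka.settings.<client>."""
    lines = []
    for client in clients:
        for key, value in client_settings.items():
            lines.append(f"lenses.kafka.settings.{client}.{key}={value}")
    return lines


encryption_only = {
    "security.protocol": "SSL",
    "ssl.truststore.location": "/var/private/ssl/client.truststore.jks",
    "ssl.truststore.password": "test1234",
}
print("\n".join(lenses_kafka_settings(encryption_only)))
```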
SASL Authentication¶
In order for Lenses to access Kafka in an environment set up with Kerberos (SASL), you need to provide Lenses with a JAAS file as in the example below. If Lenses is to be used with an ACL-enabled cluster, it is advised to use the same principal as the brokers, so that it has super user permissions.
Note
A system configured to work with Kerberos usually provides a
system-wide Kerberos configuration file (krb5.conf) that points to
the location of the KDC and includes other configuration options
necessary to authenticate. If your system is missing this file, please
contact your administrator.
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/path/to/keytab-file"
storeKey=true
useTicketCache=false
serviceName="kafka"
principal="principal@MYREALM";
};
/*
Optional section for authentication to zookeeper
Please also remember to set lenses.zookeeper.security.enabled=true
*/
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/path/to/keytab-file"
storeKey=true
useTicketCache=false
principal="principal@MYREALM";
};
Once the JAAS file is in place, add it to LENSES_OPTS before starting Lenses:
export LENSES_OPTS="-Djava.security.auth.login.config=/opt/lenses/jaas.conf"
Finally, set the security protocol in the Lenses configuration file:
lenses.kafka.settings.consumer.security.protocol=SASL_PLAINTEXT
lenses.kafka.settings.producer.security.protocol=SASL_PLAINTEXT
lenses.kafka.settings.kstreams.security.protocol=SASL_PLAINTEXT
By default, the connection to Zookeeper remains unauthenticated. This only affects
the Quota entries, which are written without any Zookeeper ACLs to protect them.
The option lenses.zookeeper.security.enabled may be used to change this behaviour,
but in that case it is very important to use the brokers’ principal for Lenses.
If Lenses is configured with a different principal, the brokers will not be
able to manipulate the Quota entries and will fail to start. Please contact
our support if help is needed with this feature.
SASL_SSL Authentication and Encryption¶
In this security protocol, Kafka uses a SASL method for authentication and TLS certificates for encryption of data on the wire. As such the configuration is a combination of the SSL/TLS and SASL configurations.
Please provide Lenses with a JAAS file as described in the previous section and add it to LENSES_OPTS:
export LENSES_OPTS="-Djava.security.auth.login.config=/opt/lenses/jaas.conf"
Set Lenses to use SASL_SSL for its producer and consumer part. If your CA’s certificate isn’t part of the system wide truststore, please provide Lenses with a truststore as well:
lenses.kafka.settings.consumer.security.protocol=SASL_SSL
lenses.kafka.settings.consumer.ssl.truststore.location=/var/private/ssl/client.truststore.jks
lenses.kafka.settings.consumer.ssl.truststore.password=test1234
lenses.kafka.settings.producer.security.protocol=SASL_SSL
lenses.kafka.settings.producer.ssl.truststore.location=/var/private/ssl/client.truststore.jks
lenses.kafka.settings.producer.ssl.truststore.password=test1234
System Topics¶
Lenses is a stateless application and thus an excellent fit for Kubernetes or OpenShift. During startup it creates
a few system topics for storing monitoring, auditing, cluster, user profile and processor information.
These topics are configured by the topics configuration block:
# topics created on start-up that Lenses uses to store state
lenses.topics.audits = "_kafka_lenses_audits"
lenses.topics.cluster = "_kafka_lenses_cluster"
lenses.topics.metrics = "_kafka_lenses_metrics"
lenses.topics.profiles = "_kafka_lenses_profiles"
lenses.topics.processors = "_kafka_lenses_processors"
lenses.topics.alerts.storage = "_kafka_lenses_alerts"
lenses.topics.lsql.storage= "_kafka_lenses_lsql_storage"
lenses.topics.alerts.settings = "_kafka_lenses_alerts_settings"
Warning
These topics are created and managed by the platform automatically. If you are using ACLs, grant the user running the Lenses application permission to manage these topics.
If ACLs are already enabled on your Kafka cluster, set the ACLs for the Lenses user and server for
the following topics: _kafka_lenses_audits, _kafka_lenses_cluster, _kafka_lenses_metrics, _kafka_lenses_profiles, _kafka_lenses_processors,
_kafka_lenses_alerts, _kafka_lenses_lsql_storage and _kafka_lenses_alerts_settings.
kafka-acls \
--authorizer-properties zookeeper.connect=my_zk:2181 \
--add \
--allow-principal User:Lenses \
--allow-host lenses-host \
--operation Read \
--operation Write \
--operation Alter \
--topic topic
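The command above has to be repeated once per system topic. The sketch below prints one kafka-acls invocation for each default topic name without executing anything; the function name is hypothetical, and the principal, host and Zookeeper address are the placeholder values from the command above.

```python
import shlex

# Default system topic names from the lenses.topics.* configuration block.
SYSTEM_TOPICS = [
    "_kafka_lenses_audits",
    "_kafka_lenses_cluster",
    "_kafka_lenses_metrics",
    "_kafka_lenses_profiles",
    "_kafka_lenses_processors",
    "_kafka_lenses_alerts",
    "_kafka_lenses_lsql_storage",
    "_kafka_lenses_alerts_settings",
]


def acl_commands(topics, principal="User:Lenses", host="lenses-host",
                 zookeeper="my_zk:2181"):
    """Return one shell-quoted kafka-acls command line per topic."""
    commands = []
    for topic in topics:
        args = [
            "kafka-acls",
            "--authorizer-properties", f"zookeeper.connect={zookeeper}",
            "--add",
            "--allow-principal", principal,
            "--allow-host", host,
            "--operation", "Read",
            "--operation", "Write",
            "--operation", "Alter",
            "--topic", topic,
        ]
        commands.append(shlex.join(args))
    return commands


for command in acl_commands(SYSTEM_TOPICS):
    print(command)
```

If the topic names were changed in lenses.conf, the list has to be adjusted accordingly before the printed commands are run.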
JMX Monitoring¶
Enabling JMX¶
To use the full potential of Lenses to monitor the cluster, JMX should be enabled for Kafka Brokers, Schema Registry,
Zookeeper and Kafka Connect instances.
To enable JMX, the JMX_PORT environment variable should be set to the desired port for each running process.
Additional options should be set in order to access JMX remotely (from a different host), as shown below.
- Kafka Brokers
Set the JMX_PORT and KAFKA_JMX_OPTS environment variables:
export JMX_PORT=[JMX_PORT]
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.rmi.port=[JMX_PORT]"
- Kafka Connect
Set the JMX_PORT and KAFKA_JMX_OPTS environment variables:
export JMX_PORT=[JMX_PORT]
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.rmi.port=[JMX_PORT]"
- Schema Registry
Set the JMX_PORT and SCHEMA_REGISTRY_JMX_OPTS environment variables:
export JMX_PORT=[JMX_PORT]
export SCHEMA_REGISTRY_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.rmi.port=[JMX_PORT]"
- Zookeeper
Set the JMX_PORT and ZOOKEEPER_SERVER_OPTS environment variables:
export JMX_PORT=[JMX_PORT]
export ZOOKEEPER_SERVER_OPTS="$ZOOKEEPER_SERVER_OPTS -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.rmi.port=[JMX_PORT]"
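The remote-access flags are identical for every service; only the port and the name of the options variable change. The following sketch, with hypothetical function names and a made-up port, shows how the [JMX_PORT] placeholder is filled in. It does not handle the Zookeeper case, where the existing $ZOOKEEPER_SERVER_OPTS value is prepended.

```python
def jmx_remote_opts(port):
    """Build the JVM flags for unauthenticated, non-SSL remote JMX access."""
    flags = [
        "-Dcom.sun.management.jmxremote",
        "-Dcom.sun.management.jmxremote.authenticate=false",
        "-Dcom.sun.management.jmxremote.ssl=false",
        "-Dcom.sun.management.jmxremote.local.only=false",
        f"-Dcom.sun.management.jmxremote.rmi.port={port}",
    ]
    return " ".join(flags)


def export_lines(opts_variable, port):
    """Return the two export statements for one service."""
    return [
        f"export JMX_PORT={port}",
        f'export {opts_variable}="{jmx_remote_opts(port)}"',
    ]


# 9581 is an arbitrary example port.
print("\n".join(export_lines("KAFKA_JMX_OPTS", 9581)))
```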
Lenses configuration¶
Lenses integrates with and monitors Kafka Brokers, Zookeeper nodes, Schema Registries and Kafka Connect clusters.
To configure which cluster to monitor, set the corresponding endpoints in the lenses.conf file. If JMX is enabled
for the services mentioned earlier, set the jmx configuration entries as seen below.
# Set up infrastructure end-points
lenses.kafka.brokers = "" // "PLAINTEXT://host1:9092,PLAINTEXT://host2:9092"
lenses.zookeeper.hosts = [] // [{url:"localhost:2181",jmx:"localhost:12181"}]
lenses.zookeeper.chroot = "" // "kafka"
lenses.schema.registry.urls = [] // [{url:"http://localhost:8081",jmx:"localhost:18081"}]
lenses.connect.clusters = [] // [{name: "connectClusterA", statuses: "connect-statuses", configs: "connect-configs", offsets: "connect-offsets", urls:[{url: "http://localhost:8083", jmx:"localhost:18083"}] }]
# If using Grafana, set the url, e.g. http://grafana-host:port
lenses.grafana = ""
To configure Kafka Connect correctly you must provide:
- Name for the cluster
- For each worker provide its REST endpoint and optionally the JMX connection
- The Kafka Connect backing topics for status, configs, and offsets
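These requirements can be checked before starting Lenses with a short script. The sketch below is not part of Lenses; the function name is hypothetical and it works on a plain Python dictionary mirroring one entry of lenses.connect.clusters.

```python
REQUIRED_KEYS = ("name", "statuses", "configs", "offsets", "urls")


def check_connect_cluster(cluster):
    """Return a list of problems found in one Connect cluster definition."""
    problems = [f"missing '{key}'" for key in REQUIRED_KEYS if not cluster.get(key)]
    for worker in cluster.get("urls") or []:
        # The REST endpoint is required for each worker; jmx is optional.
        if "url" not in worker:
            problems.append("worker entry without a REST 'url'")
    return problems


cluster_a = {
    "name": "connectClusterA",
    "statuses": "connect-statuses",
    "configs": "connect-configs",
    "offsets": "connect-offsets",
    "urls": [{"url": "http://localhost:8083", "jmx": "localhost:18083"}],
}
print(check_connect_cluster(cluster_a))
```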
Note
The Kafka Brokers JMX endpoints are picked up automatically from Zookeeper.
Expose Lenses JMX¶
Lenses can expose its own JMX endpoint so that other systems can monitor
it. To enable it, set the lenses.jmx.port option; to disable it, comment out
the entry. The Prometheus JMX exporter may also be used, which makes
Lenses metrics available to Prometheus. The JMX exporter configuration file and
a jmx_exporter build are provided within the monitoring suite. The JMX exporter can run as a Java agent, in which case it must be set via
the LENSES_OPTS environment variable:
export LENSES_OPTS="-javaagent:/path/to/jmx_exporter/fastdata_agent.jar=9102:/path/to/jmx_exporter/client.yml"
In order to monitor from remote hosts, JMX remote access should be enabled as well.
export LENSES_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.rmi.port=[JMX_PORT]"
Alert System Integration¶
Alertmanager¶
This is the preferred way to route alerts to downstream gateways. You can read more on Alertmanager here. Lenses can push alert notifications to the Alertmanager Web hook. To enable the functionality, provide the Alertmanager endpoint(s) via the following setting:
lenses.alert.manager.endpoints="http://host1:port1,http://host2:port2"
Slack¶
To integrate Lenses alerting with Slack, add an Incoming WebHook integration in Slack. Select the #channel where Lenses will post alerts and copy the Webhook URL:
lenses.alert.plugins.slack.enabled = true
lenses.alert.plugins.slack.webhook.url = "https://hooks.slack.com/services/SECRET/YYYYYYYYYYY/XXXXXXXX"
lenses.alert.plugins.slack.username = "lenses"
lenses.alert.plugins.slack.channel = "#devops"
SQL Processors¶
The Lenses SQL Engine allows users to browse and query topics and also build and execute Kafka Streams flows
with a SQL-like syntax. Three execution modes are currently available: IN_PROC, CONNECT and KUBERNETES.
The last two are available to Enterprise clients and offer fault-tolerant and performant execution modes
for Kafka Streams apps built via Lenses SQL.
To configure the execution mode, update the lenses.sql.execution.mode setting.
See Configuring SQL Processors for more details.
Topology¶
When using a Kafka Connector and/or Lenses SQL, Lenses will build a graph of all the data flows, and users can interact with this graph via the topology screen.
This provides a high-level view of how your data moves in and out of Kafka. LSQL processors (Kafka Streams applications written with LSQL)
are managed automatically. Out of the box, Lenses supports over 45 Kafka Connect connectors. To enable a
custom connector, or a connector not supported out of the box, set the lenses.connectors.info configuration entry.
ACLs¶
Lenses may manage ACLs in Zookeeper or via the brokers and the Kafka Admin protocol. The latter mode is mandatory for Kafka version 1.1 onwards. The default method is Zookeeper. To switch to the Kafka Admin method, set the following in your Lenses configuration file:
lenses.acls.broker.mode=true
Configuration¶
For Topology to work at its best, configuration needs to be provided for the connectors.
lenses {
...
connectors.info = [
{
class.name = "The connector full classpath"
name = "The name which will be presented in the UI"
instance = "Details about the instance. Contains the connector configuration field which holds the information. If a database is involved it would be the DB connection details, if it is a file it would be the file path, etc"
sink = true
extractor.class = "The full classpath for the implementation knowing how to extract the Kafka topics involved. This is only required for a Source"
icon = "file.png"
description = "A description for the connector"
author = "The connector author"
}
...
]
}
Source connectors require extra configuration in order to extract the topic information from the connector configuration. The extractor class should be
com.landoop.kafka.lenses.connect.SimpleTopicsExtractor. When using this extractor, a property entry must also be provided;
it specifies the field in the connector runtime configuration that lists the topics to publish to.
Below you can find an example of the file stream source:
class.name = "org.apache.kafka.connect.file.FileStreamSourceConnector"
name = "File"
instance = "file"
sink = false
property = "topic"
extractor.class = "com.landoop.kafka.lenses.connect.SimpleTopicsExtractor"
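Conceptually, the simple extractor reads the field named by property from the connector's runtime configuration and treats its value as a list of topics. The sketch below is a guess at that behaviour, not the actual class: the function name is hypothetical and comma-separated values are an assumption.

```python
def simple_topics(connector_config, property_name):
    """Read the named field and split it into topic names (comma-separated assumed)."""
    raw = connector_config.get(property_name, "")
    return [topic.strip() for topic in raw.split(",") if topic.strip()]


# A made-up runtime configuration for the file stream source.
file_source_config = {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "file": "/var/log/app.log",
    "topic": "topicA, topicB",
}
print(simple_topics(file_source_config, "topic"))
```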
Here are some of the entries the default Lenses configuration provides automatically - of course, all Landoop connectors are already covered.
lenses {
...
connectors.info = [
{
class.name = "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector"
name = "Cassandra"
instance = "connect.cassandra.contact.points"
sink = true
icon = "cassandra.jpg"
description = "Store Kafka data into Cassandra"
docs = "//lenses.stream/connectors/sink/cassandra.html"
author = "Landoop"
},
{
class.name = "com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceConnector"
name = "Cassandra"
instance = "connect.cassandra.contact.points"
sink = false
property = "connect.cassandra.kcql"
extractor.class = "com.landoop.kafka.lenses.connect.KcqlInsertTopicsExtractor"
icon = "cassandra.jpg"
description = "Extract Cassandra data using the CQL driver into Kafka"
docs = "//lenses.stream/connectors/source/cassandra.html"
author = "Landoop"
},
{
class.name = "com.datamountaineer.streamreactor.connect.ftp.source.FtpSourceConnector"
name = "Ftp"
instance = "connect.ftp.address"
sink = false
property = "connect.ftp.monitor.tail,connect.ftp.monitor.update"
extractor.class = "com.landoop.kafka.lenses.connect.FtpTopicsExtractor"
icon = "ftp.png"
description = "Tail remote FTP folders and bring messages in Kafka"
docs = "//lenses.stream/connectors/source/ftp.html"
author = "Landoop"
},
{
class.name = "com.datamountaineer.streamreactor.connect.jms.source.JMSSourceConnector"
name = "Jms"
instance = "connect.jms.url"
sink = false
property = "connect.jms.kcql"
extractor.class = "com.landoop.kafka.lenses.connect.KcqlInsertTopicsExtractor"
icon = "jms.png"
description = "Get data from JMS into Kafka"
docs = "//lenses.stream/connectors/source/jms.html"
author = "Landoop"
},
{
class.name = "org.apache.kafka.connect.file.FileStreamSinkConnector"
name = "File"
instance = "file"
sink = true
extractor.class = "com.landoop.kafka.lenses.connect.SimpleTopicsExtractor"
icon = "file.png"
description = "Store Kafka data into files"
author = "Apache Kafka"
},
{
class.name = "org.apache.kafka.connect.file.FileStreamSourceConnector"
name = "File"
instance = "file"
sink = false
property = "topic"
extractor.class = "com.landoop.kafka.lenses.connect.SimpleTopicsExtractor"
icon = "file.png"
description = "Tail files or folders and stream data into Kafka"
author = "Apache Kafka"
},
{
class.name = "io.debezium.connector.mysql.MySqlConnector"
name = "CDC MySQL"
instance = "database.hostname"
sink = false
property = "database.history.kafka.topic"
extractor.class = "com.landoop.kafka.lenses.connect.SimpleTopicsExtractor"
icon = "debezium.png"
description = "CDC data from RDBMS into Kafka"
docs = "//debezium.io/docs/connectors/mysql/"
author = "Debezium"
}
...
]
}
Note
If the connector configuration is not present in the configuration, Lenses will ignore the connector and therefore it won’t show up in the topology view.
Key | Description | Optional | Type | Default |
---|---|---|---|---|
class.name | The connector full class name, e.g. org.apache.kafka.connect.file.FileStreamSourceConnector | No | String | N/A |
name | The name as it will appear in the topology view. For example: File | No | String | N/A |
instance | Contains the connector configuration key(s) the extractor instance will use to get the information. For the FTP source, for example, the value is set to connect.ftp.monitor.tail,connect.ftp.monitor.update. The extractor reads both fields and provides the list of topics involved | No | String | N/A |
extractor.class | Used for Connect sources only, to extract the target topics from the connector runtime configuration. | No | String | N/A |
icon | The path to an icon file the UI will use to display the connector. | Yes | String | N/A |
description | A short sentence describing what the connector does | Yes | String | N/A |
author | Who is providing the connector | Yes | String | N/A |
Lenses provides these classes to extract the information from a Kafka Connect connector configuration:
Class | Description |
---|---|
com.landoop.kafka.lenses.connect.KcqlInsertTopicsExtractor | Extracts the target Kafka topics defined by a KCQL statement. This targets our own source connectors, which describe their action via a SQL-like syntax. The class can extract from syntax like INSERT INTO $TOPICA SELECT ... |
com.landoop.kafka.lenses.connect.FtpTopicsExtractor | Extracts the target topics for the FTP source |
com.landoop.kafka.lenses.connect.SimpleTopicsExtractor | Extracts the information from a connector configuration entry. For example, topic:topicA, topicB |
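The idea behind extracting target topics from KCQL can be approximated with a regular expression. This is only an approximation for illustration: the real extractor presumably relies on a proper KCQL parser, the function name is hypothetical, and the sample statements (and their semicolon separator) are made up.

```python
import re

# Captures the token that follows INSERT INTO, up to the next whitespace.
INSERT_INTO = re.compile(r"INSERT\s+INTO\s+(\S+)\s+SELECT", re.IGNORECASE)


def kcql_target_topics(kcql):
    """Return the target topic of every INSERT INTO ... SELECT statement found."""
    return [match.group(1) for match in INSERT_INTO.finditer(kcql)]


statements = "INSERT INTO topicA SELECT * FROM tableA; insert into topicB SELECT id FROM tableB"
print(kcql_target_topics(statements))
```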
Option Reference¶
Config | Description | Required | Type |
---|---|---|---|
lenses.ip |
Bind HTTP at the given endpoint. Used in conjunction with lenses.port |
no | string |
lenses.port |
The HTTP port the HTTP server listens for connections: serves UI, Rest and WS APIs | no | int |
lenses.jmx.port |
The port to bind an JMX agent to enable JVM monitoring | no | int |
lenses.license.file |
The full path to the license file | yes | string |
lenses.secret.file |
The full path to security.conf containing security credentials read more |
yes | string |
lenses.topics.audits |
Topic to store system auditing information. Keep track of WHO did WHAT and WHEN.
When a topic, config, connector is Created/Updated or Deleted an audit message is stored.
We advise not to change the defaults neither to delete the topic
|
yes | string |
lenses.topics.metrics |
Topic to store stream processor metrics. When your state-less stream processors are
running in Kubernetes or Kafka Connect, this topic collects healthcheck and performance metrics.
We advise not to change the defaults neither to delete the topic
|
yes | string |
lenses.topics.cluster |
Topic to store broker details. Infrastructure information is used to determine
config changes, failures and new nodes added or removed in a cluster.
We advise not to change the defaults neither to delete the topic
|
yes | string |
lenses.topics.profiles |
Topic to store user preferences. Bookmark your most used topics, connectors or SQL processors.
We advise not to change the defaults neither to delete the topic
|
yes | string |
lenses.topics.processors |
Topic to store the SQL processors details.
We advise not to change the defaults neither to delete the topic
|
yes | string |
lenses.topics.alerts.storage |
Topic to store the alerts raised.
We advise not to change the defaults neither to delete the topic
|
yes | string |
lenses.topics.alerts.settings |
Topic to store the alerts configurations.
We advise not to change the defaults neither to delete the topic
|
yes | string |
lenses.topics.lsql.storage |
Topic to store all data access SQL queries. Know WHO access WHAT data and WHEN.
We advise not to change the defaults neither to delete the topic
|
yes | string |
lenses.kafka.brokers |
A list of host/port pairs to use for establishing the initial connection to the Kafka
cluster. Add just a few broker addresses here and Lenses will bootstrap and discover the
full cluster membership (which may change dynamically). This list should be in the form
"host1:port1,host2:port2,host3:port3" |
yes | string |
lenses.zookeeper.hosts |
Provide the details of all available ZooKeeper nodes. For every node specify the
connection URL (host:port) and, if JMX is enabled, the JMX endpoint (host:port). The configuration should be
[{url:"hostname1:port1",jmx:"hostname1:port2"},{url:"hostname2:port3",jmx:"hostname2:port4"}] |
yes | string |
lenses.zookeeper.chroot |
You can add your
znode (chroot) path if you are using one. Please do not add leading or trailing slashes. For example, if you use the ZooKeeper chroot /kafka for
your Kafka cluster, set this value to
kafka |
no | string |
lenses.zookeeper.security.enabled |
Enables a secured connection to ZooKeeper.
The default value is false. Please read about this setting before enabling it.
|
no | boolean |
lenses.acls.broker.mode |
Whether to manage ACLs via the Kafka Admin Client (through the brokers). Mandatory for Kafka 1.1 or greater.
By default this is set to false and Lenses manages ACLs directly in ZooKeeper.
|
no | boolean |
lenses.schema.registry.urls |
Provide the details of all available Schema Registry nodes, or list the load balancer address if one is
used. For every instance specify the connection URL and, if JMX is enabled, the JMX endpoint (host:port).
The configuration should be
[{url:"http://host1:port1", jmx:"host1:port2"},{url:"http://host2:port3",jmx:"host2:port4"}] |
no | string |
lenses.connect.clusters |
Provide all available Kafka Connect clusters. For each cluster give a name, list the 3 backing topics,
and provide the workers' connection details (host:port) and, if enabled and on Kafka 1.0.0
or later, their JMX endpoints. See example here
|
no | array |
lenses.alert.manager.endpoints |
Comma-separated Alert Manager endpoints. If provided, Lenses will push raised
alerts to the downstream notification gateway. The configuration should be
"http://host1:port1,http://host2:port2" |
no | string |
lenses.alert.manager.source |
How to identify the source of an Alert in Alert Manager. Default is
Lenses, but you might want to override it with
UAT for example |
no | string |
lenses.alert.manager.generator.url |
A unique URL identifying the creator of this alert. Default is
http://lenses, but you might want to override it with
http://<my_instance_url> for example |
no | string |
lenses.grafana |
If using Grafana, provide its URL. The configuration should be
"http://grafana-host:port" |
no | string |
lenses.sql.max.bytes |
Used when reading data from a Kafka topic. This is the maximum data size in bytes to return
from an LSQL query. If the query returns more data than this limit, any records received after
the limit is reached are discarded. This can be overridden in the LSQL query.
Default value is 20971520 (20 MB)
|
yes | long |
lenses.sql.max.time |
Used when reading data from a Kafka topic. This is the time in milliseconds the
query will be allowed to run. If the time is exhausted, the query returns the records found so far.
This can be overridden in the LSQL query. Default value is 3600000 (1 hour)
|
yes | int |
lenses.sql.sample.default |
Number of messages to take in every sampling attempt | yes | int |
lenses.sql.sample.window |
How frequently to sample a topic for new messages when tailing it | yes | int |
lenses.metrics.workers |
Number of workers to distribute the load of querying JMX endpoints and collecting metrics | yes | int |
lenses.offset.workers |
Number of workers to distribute the load of querying topic offsets | yes | int |
lenses.sql.execution.mode |
The SQL execution mode: IN_PROC, CONNECT or KUBERNETES. Read more | yes | string |
lenses.sql.state.dir |
Directory location to store the state of KStreams. If using CONNECT mode, this folder
must already exist on each Kafka Connect worker
|
no | string |
lenses.sql.monitor.frequency |
How frequently SQL processors emit healthcheck and performance metrics to
lenses.topics.metrics |
no | int |
lenses.kubernetes.image.name |
The docker/container repository url and name of the Lenses SQL runner | no | string |
lenses.kubernetes.image.tag |
The Lenses SQL runner image tag | no | string |
lenses.kubernetes.config.file |
The location of the kubectl config file | no | string |
lenses.kubernetes.service.account |
The service account to deploy with. This account should be able to pull images
from
lenses.kubernetes.image.name |
no | string |
lenses.kubernetes.pull.policy |
The pull policy for Kubernetes containers: IfNotPresent or Always |
no | string |
lenses.kubernetes.runner.mem.limit |
The memory limit applied to the Container | no | string |
lenses.kubernetes.runner.mem.request |
The memory requested for the Container | no | string |
lenses.kubernetes.runner.java.opts |
Advanced JVM and GC memory tuning parameters | no | string |
lenses.interval.summary |
The interval (in msec) to check for new topics, or topic config changes | no | long |
lenses.interval.consumers |
The interval (in msec) to read all consumer info | no | int |
lenses.interval.partitions.messages |
The interval (in msec) to refresh partitions info | no | long |
lenses.interval.type.detection |
The interval (in msec) to check the topic payload type | no | long |
lenses.interval.user.session.ms |
The duration (in msec) that a client session stays alive for. Default is 4 hours | no | long |
lenses.interval.user.session.refresh |
The interval (in msec) to check whether a client session is idle and should be terminated | no | long |
lenses.interval.schema.registry.healthcheck |
The interval (in msec) to check the status of schema registry instances | no | long |
lenses.interval.topology.topics.metrics |
The interval (in msec) to refresh the topology status page | no | long |
lenses.interval.alert.manager.healthcheck |
The interval (in msec) to check the status of the Alert manager instances | no | long |
lenses.interval.alert.manager.publish |
The interval (in msec) on which unresolved alerts are published to alert manager | no | long |
lenses.interval.jmx.refresh.zk |
The interval (in msec) to get Zookeeper JMX | yes | long |
lenses.interval.jmx.refresh.sr |
The interval (in msec) to get Schema Registry JMX | yes | long |
lenses.interval.jmx.refresh.broker |
The interval (in msec) to get Broker JMX | yes | long |
lenses.interval.jmx.refresh.alert.manager |
The interval (in msec) to get Alert Manager JMX | yes | long |
lenses.interval.jmx.refresh.connect |
The interval (in msec) to get Connect JMX | yes | long |
lenses.interval.jmx.refresh.brokers.in.zk |
The interval (in msec) to refresh the brokers from Zookeeper | yes | long |
lenses.kafka.ws.poll.ms |
Max time (in msec) a consumer polls for data on each WS API request
|
no | int |
lenses.kafka.ws.buffer.size |
Max buffer size for WS consumer | no | int |
lenses.kafka.ws.max.poll.records |
Specify the maximum number of records returned in a single call to poll(). It will
impact how many records will be pushed at once to the WS client
|
no | int |
lenses.kafka.ws.heartbeat.ms |
The interval (in msec) to send messages to the client to keep the TCP connection open | no | int |
lenses.access.control.allow.methods |
Restrict the HTTP verbs allowed to initiate a cross-origin HTTP request | no | string |
lenses.access.control.allow.origin |
Restrict cross-origin HTTP requests to specific hosts | no | string |
lenses.schema.registry.topics |
The backing topic where schemas are stored | no | string |
lenses.schema.registry.delete |
Allows subjects to be deleted in the Schema Registry. Default is disabled.
Requires schema-registry version 3.3.0 or later
|
no | boolean |
lenses.allow.weak.SSL |
Allow connecting to https:// services even when they use self-signed certificates |
no | boolean |
lenses.telemetry.enable |
Enable or disable telemetry data collection | no | boolean |
lenses.curator.retries |
The number of attempts to read the broker metadata from Zookeeper | no | int |
lenses.curator.initial.sleep.time.ms |
The initial amount of time to wait between retries to ZK | no | int |
lenses.zookeeper.max.session.ms |
The max time (in msec) to wait for the ZooKeeper server to
reply to a request. The implementation requires that the timeout be a
minimum of 2 times the tickTime (as set in the server configuration)
|
no | int |
lenses.zookeeper.max.connection.ms |
The duration (in msec) to wait for the Zookeeper client to establish a new connection | no | int |
lenses.akka.request.timeout.ms |
The maximum time (in msec) to wait for an Akka Actor to reply | no | int |
lenses.kafka.control.topics |
List of Kafka topics to be marked as system topics |
no | string |
lenses.alert.buffer.size |
The number of most recently raised alerts to keep in the cache | no | int |
lenses.kafka.settings.consumer |
Allow additional Kafka consumer settings to be specified. When Lenses creates
an instance of KafkaConsumer class it will use these properties during initialization
|
no | string |
lenses.kafka.settings.producer |
Allow additional Kafka producer settings to be specified. When Lenses creates
an instance of KafkaProducer class it will use these properties during initialization
|
no | string |
lenses.kafka.settings.kstream |
Allow additional Kafka KStreams settings to be specified
|
no | string |
The last three keys allow you to configure the settings of the consumers, producers and KStreams that Lenses uses internally.
Example: lenses.kafka.settings.producer.compression.type = snappy
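As a rough sketch, and assuming lenses.conf accepts the nested block syntax as well as dotted keys (the Example section on this page uses both), the same kind of override can be written in either form. The property names below are standard Kafka client and Kafka Streams settings; the values are illustrative assumptions, not recommendations.

```
# Dotted form: one property per line
lenses.kafka.settings.consumer.fetch.max.wait.ms = 500   # illustrative value

# Block form: several properties grouped under the same prefix
lenses.kafka.settings.producer {
  compression.type = snappy
  linger.ms = 20                                          # illustrative value
}

# Kafka Streams settings follow the same pattern
lenses.kafka.settings.kstream {
  num.stream.threads = 2                                  # illustrative value
}
```

The block form is usually easier to read once more than one or two properties are overridden for the same client.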
Default Values
Config | Default |
---|---|
lenses.ip |
0.0.0.0 |
lenses.port |
9991 |
lenses.jmx.port |
9992 |
lenses.license.file |
license.json |
lenses.secret.file |
security.conf |
lenses.topics.audits |
_kafka_lenses_audits |
lenses.topics.metrics |
_kafka_lenses_metrics |
lenses.topics.cluster |
_kafka_lenses_cluster |
lenses.topics.profiles |
_kafka_lenses_profiles |
lenses.topics.processors |
_kafka_lenses_processors |
lenses.topics.alerts.storage |
_kafka_lenses_alerts |
lenses.topics.lsql.storage |
_kafka_lenses_lsql_storage |
lenses.topics.alerts.settings |
_kafka_lenses_alerts_settings |
lenses.kafka.brokers |
PLAINTEXT://localhost:9092 |
lenses.zookeeper.hosts |
[{url:"localhost:2181", jmx:"localhost:11991"}] |
lenses.zookeeper.chroot |
|
lenses.schema.registry.urls |
[{url:"http://localhost:8081", jmx:"localhost:10081"}] |
lenses.connect.clusters |
[{name: "dev", urls: [{url:"http://localhost:8083", jmx:"localhost:11100"}], statuses: "connect-statuses", configs: "connect-configs", offsets: "connect-offsets" }] |
lenses.alert.manager.endpoints |
|
lenses.grafana |
|
lenses.sql.max.bytes |
20971520 |
lenses.sql.max.time |
3600000 |
lenses.sql.sample.default |
2 |
lenses.sql.sample.window |
200 |
lenses.metrics.workers |
16 |
lenses.offset.workers |
5 |
lenses.sql.execution.mode |
IN_PROC |
lenses.sql.state.dir |
logs/lenses-sql-kstream-state |
lenses.sql.monitor.frequency |
10000 |
lenses.kubernetes.image.name |
eu.gcr.io/lenses-container-registry/lenses-sql-processor |
lenses.kubernetes.image.tag |
2.0 |
lenses.kubernetes.config.file |
/home/lenses/.kube/config |
lenses.kubernetes.service.account |
default |
lenses.kubernetes.pull.policy |
IfNotPresent |
lenses.kubernetes.watch.reconnect.limit |
10 |
lenses.kubernetes.runner.mem.limit |
768Mi |
lenses.kubernetes.runner.mem.request |
512Mi |
lenses.kubernetes.runner.java.opts |
-Xms256m -Xmx512m -XX:MaxPermSize=128m -XX:MaxNewSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true |
lenses.schema.registry.topics |
_schemas |
lenses.schema.registry.delete |
false |
lenses.interval.summary |
10000 |
lenses.interval.consumers |
10000 |
lenses.interval.partitions.messages |
10000 |
lenses.interval.type.detection |
30000 |
lenses.interval.user.session.ms |
14400000 |
lenses.interval.user.session.refresh |
60000 |
lenses.interval.schema.registry.healthcheck |
30000 |
lenses.interval.topology.topics.metrics |
30000 |
lenses.interval.alert.manager.healthcheck |
5000 |
lenses.interval.alert.manager.publish |
30000 |
lenses.interval.jmx.refresh.zk |
5000 |
lenses.interval.jmx.refresh.sr |
5000 |
lenses.interval.jmx.refresh.broker |
5000 |
lenses.interval.jmx.refresh.alert.manager |
6000 |
lenses.interval.jmx.refresh.connect |
5000 |
lenses.interval.jmx.refresh.brokers.in.zk |
5000 |
lenses.kafka.ws.poll.ms |
1000 |
lenses.kafka.ws.buffer.size |
10000 |
lenses.kafka.ws.max.poll.records |
1000 |
lenses.kafka.ws.heartbeat.ms |
30000 |
lenses.access.control.allow.methods |
GET,POST,PUT,DELETE,OPTIONS |
lenses.access.control.allow.origin |
|
lenses.allow.weak.SSL |
true |
lenses.telemetry.enable |
true |
lenses.curator.retries |
3 |
lenses.curator.initial.sleep.time.ms |
2000 |
lenses.zookeeper.max.session.ms |
10000 |
lenses.zookeeper.max.connection.ms |
10000 |
lenses.akka.request.timeout.ms |
10000 |
lenses.kafka.control.topics |
["connect-configs", "connect-offsets", "connect-status", "connect-statuses", "_schemas", "__consumer_offsets", "_kafka_lenses_", "lsql_", "__transaction_state"] |
lenses.alert.buffer.size |
100 |
lenses.kafka.settings.consumer |
{reconnect.backoff.ms = 1000, retry.backoff.ms = 1000} |
lenses.kafka.settings.producer |
{reconnect.backoff.ms = 1000, retry.backoff.ms = 1000} |
lenses.alert.manager.source |
Lenses |
lenses.alert.manager.generator.url |
http://lenses |
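Several of the larger defaults above are easier to read once converted from raw bytes or milliseconds. The fragment below simply restates four of the defaults from the table with the arithmetic spelled out in comments; it is a reading aid rather than a recommended configuration.

```
lenses.sql.max.bytes = 20971520                # 20 * 1024 * 1024 bytes = 20 MB
lenses.sql.max.time = 3600000                  # 60 * 60 * 1000 msec = 1 hour
lenses.interval.user.session.ms = 14400000     # 4 * 60 * 60 * 1000 msec = 4 hours
lenses.interval.user.session.refresh = 60000   # 60 * 1000 msec = 1 minute
```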
Example¶
# Set the ip:port for Lenses to bind to
lenses.ip = 0.0.0.0
lenses.port = 9991
#lenses.jmx.port = 9992
# License file allowing connecting to up to N brokers
lenses.license.file = "license.json"
# Lenses security configuration is managed in an external file
lenses.secret.file = "security.conf"
# Topics created on start-up that Lenses uses to store state
lenses.topics.audits = "_kafka_lenses_audits"
lenses.topics.metrics = "_kafka_lenses_metrics"
lenses.topics.cluster = "_kafka_lenses_cluster"
lenses.topics.profiles = "_kafka_lenses_profiles"
lenses.topics.processors = "_kafka_lenses_processors"
lenses.topics.alerts.storage = "_kafka_lenses_alerts"
lenses.topics.lsql.storage = "_kafka_lenses_lsql_storage"
lenses.topics.alerts.settings = "_kafka_lenses_alerts_settings"
# Set up infrastructure end-points
lenses.kafka.brokers = "PLAINTEXT://localhost:9092"
lenses.zookeeper.hosts = [{url:"localhost:2181", jmx:"localhost:11991"}]
lenses.zookeeper.chroot = ""
# Optional integrations
lenses.schema.registry.urls = [{url:"http://localhost:8081", jmx:"localhost:10081"}]
lenses.connect.clusters = [{name: "dev", urls: [{url:"http://localhost:8083", jmx:"localhost:11100"}], statuses: "connect-statuses", configs: "connect-configs", offsets: "connect-offsets" }]
lenses.alert.manager.endpoints = "" # "http://host1:port1,http://host2:port2"
lenses.grafana = "" # "http://grafana-host:port"
# Set up Lenses SQL
lenses.sql.max.bytes = 20971520
lenses.sql.max.time = 3600000
lenses.sql.sample.default = 2 # Sample 2 messages every 200 msec
lenses.sql.sample.window = 200
# Set up Lenses workers
lenses.metrics.workers = 16
lenses.offset.workers = 5
# Set up Lenses SQL processing engine
lenses.sql.execution.mode = "IN_PROC" # "CONNECT" # "KUBERNETES"
lenses.sql.state.dir = "logs/lenses-sql-kstream-state"
lenses.sql.monitor.frequency = 10000
# Kubernetes configuration
lenses.kubernetes.image.name = "eu.gcr.io/lenses-container-registry/lenses-sql-processor"
lenses.kubernetes.image.tag = "2.0"
lenses.kubernetes.config.file = "/home/lenses/.kube/config"
lenses.kubernetes.service.account = "default"
lenses.kubernetes.pull.policy = "IfNotPresent"
lenses.kubernetes.watch.reconnect.limit = 10
lenses.kubernetes.runner.mem.limit = "768Mi"
lenses.kubernetes.runner.mem.request = "512Mi"
lenses.kubernetes.runner.java.opts = "-Xms256m -Xmx512m -XX:MaxPermSize=128m -XX:MaxNewSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true"
# Schema Registry topics and whether to allow deleting schemas in schema registry
lenses.schema.registry.topics = "_schemas"
lenses.schema.registry.delete = false
# Lenses internal refresh (in msec)
lenses.interval.summary = 10000
lenses.interval.consumers = 10000
lenses.interval.partitions.messages = 10000
lenses.interval.type.detection = 30000
lenses.interval.user.session.ms = 14400000
lenses.interval.user.session.refresh = 60000
lenses.interval.schema.registry.healthcheck = 30000
lenses.interval.topology.topics.metrics = 30000
lenses.interval.alert.manager.healthcheck = 5000
lenses.interval.alert.manager.publish = 30000
# Lenses JMX internal refresh (in msec)
lenses.interval.jmx.refresh.zk = 30000
lenses.interval.jmx.refresh.sr = 30000
lenses.interval.jmx.refresh.broker = 30000
lenses.interval.jmx.refresh.alert.manager = 30000
lenses.interval.jmx.refresh.connect = 30000
lenses.interval.jmx.refresh.brokers.in.zk = 30000
# Lenses Web Socket API
lenses.kafka.ws.poll.ms = 1000
lenses.kafka.ws.buffer.size = 10000
lenses.kafka.ws.max.poll.records = 1000
lenses.kafka.ws.heartbeat.ms = 30000
# Set access control
lenses.access.control.allow.methods = "GET,POST,PUT,DELETE,OPTIONS"
lenses.access.control.allow.origin = "*"
# Whether to allow self-signed certificates and telemetry
lenses.allow.weak.SSL = true
lenses.telemetry.enable = true
# Zookeeper connections configs
lenses.curator.retries = 3
lenses.curator.initial.sleep.time.ms = 2000
lenses.zookeeper.max.session.ms = 10000
lenses.zookeeper.max.connection.ms = 10000
lenses.akka.request.timeout.ms = 10000
lenses.kafka.control.topics = ["connect-configs", "connect-offsets", "connect-status", "connect-statuses", "_schemas", "__consumer_offsets", "_kafka_lenses_", "lsql_", "__transaction_state"]
# Set up Alerts and Integrations
lenses.alert.buffer.size = 100
lenses.alert.manager.source = "Lenses"
lenses.alert.manager.generator.url = "http://lenses" # A unique URL identifying the creator of this alert.
# We override the aggressive defaults. Don't go too low, as it will affect performance when the cluster is down
lenses.kafka.settings.consumer {
reconnect.backoff.ms = 1000
retry.backoff.ms = 1000
}
# We override the aggressive defaults. Don't go too low, as it will affect performance when the cluster is down
lenses.kafka.settings.producer {
reconnect.backoff.ms = 1000
retry.backoff.ms = 1000
}
lenses.kafka.settings.producer.compression.type=snappy
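
The example above targets a single-node local setup. For a multi-node cluster, the endpoint section might look like the following sketch. All hostnames are hypothetical placeholders, the ports are copied from the defaults above, and the multi-line array layout assumes the configuration parser accepts line breaks inside lists.

```
# Hypothetical hosts: a few brokers are enough, Lenses discovers the rest
lenses.kafka.brokers = "PLAINTEXT://kafka-1.example.com:9092,PLAINTEXT://kafka-2.example.com:9092"

# One entry per ZooKeeper node; jmx is only needed when JMX is enabled
lenses.zookeeper.hosts = [
  {url:"zk-1.example.com:2181", jmx:"zk-1.example.com:11991"},
  {url:"zk-2.example.com:2181", jmx:"zk-2.example.com:11991"},
  {url:"zk-3.example.com:2181", jmx:"zk-3.example.com:11991"}
]

# Kafka registered under the /kafka znode: no leading or trailing slash
lenses.zookeeper.chroot = "kafka"

# Manage ACLs through the brokers (listed above as mandatory for Kafka 1.1 or greater)
lenses.acls.broker.mode = true

# One entry per Schema Registry instance, or a single load balancer address
lenses.schema.registry.urls = [
  {url:"http://sr-1.example.com:8081", jmx:"sr-1.example.com:10081"},
  {url:"http://sr-2.example.com:8081", jmx:"sr-2.example.com:10081"}
]
```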