Configuring SQL Processors¶
The Lenses SQL engine has been designed to support both topic browsing and stream processing with SQL.
The SQL streaming engine supports three execution modes: IN_PROC, CONNECT and KUBERNETES.
The last two are available to Enterprise clients and offer fault-tolerant, performant streaming applications built via Lenses SQL.
To configure the execution mode, update the lenses.sql.execution.mode configuration entry.
In Process Mode¶
IN_PROC is the default execution mode; set lenses.sql.execution.mode to IN_PROC. This mode targets development environments or those production setups where stream processing is kept to a minimum.
# Set up Lenses SQL processing engine
lenses.sql.execution.mode = "IN_PROC" // "CONNECT" // "KUBERNETES"
lenses.sql.state.dir = "logs/lenses-sql-kstream-state"
Lenses stores the internal state of apps in the above folder. If Lenses restarts, it picks up the state from this folder and continues processing. When running on Kubernetes, if a restart lands on a host where the state directory is not present, the state is first rebuilt before message processing resumes.
Kafka Connect Mode¶
This mode requires a Kafka Connect cluster at version 2.0 or 2.1. The Lenses SQL processor is added as a plugin (connector) to the Connect cluster.
Note
To run the Lenses SQL processors on a Kafka Connect cluster, read the section below. To execute in KUBERNETES mode, read the Kubernetes Mode section.
To configure Lenses for CONNECT execution mode:
- Edit the lenses.conf file and set the SQL execution mode to CONNECT
- Add one or more connect-distributed endpoints for each of your Lenses SQL enabled clusters in the lenses.connect.clusters configuration option.
The resulting lenses.conf should look like this:
lenses.connect.clusters = [
{
name: "sql_cluster",
urls: [
{
url:"http://localhost:8083",
jmx: "localhost:19555"
}
],
statuses: "connect-statuses",
configs: "connect-configs",
offsets: "connect-offsets"
}
]
....
# Set up Lenses SQL processing engine
lenses.sql.execution.mode = "CONNECT"
lenses.sql.state.dir = "logs/lenses-sql-kstream-state"
This configuration tells Lenses that the processor execution mode is CONNECT, and also which Connect clusters are enabled to run Lenses SQL.
Warning
When scaling out with CONNECT, the lenses.sql.state.dir must be created on all workers in any SQL-enabled Connect cluster! This maps internally to the connect.sql.state.store.dir option of the connector.
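The directory can be created ahead of time; a minimal sketch, to be run on every worker host of each SQL-enabled Connect cluster:

```shell
# Create the state directory used by the SQL runner's KStreams state.
# The path must match lenses.sql.state.dir (connect.sql.state.store.dir on the connector).
STATE_DIR="logs/lenses-sql-kstream-state"
mkdir -p "$STATE_DIR"
ls -d "$STATE_DIR"
```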
Connector Install¶
The connector (that is a JAR file) needs to be available to each worker in the Kafka Connect Cluster intended for SQL. The best way to add the connector plugin to a Kafka Connect instance is via the isolated classpath loader introduced into Connect in Kafka version 0.11.
Please note that the current Lenses SQL 2.2 connector requires a Kafka Connect cluster at version 2.0 or 2.1.
- Create a folder called plugins/lib and place the Lenses SQL Connector jar inside
- Set the plugin.path in the worker properties file to the location of the jar
- Restart the Connect worker.
# create folder
mkdir -p plugins/lib
# copy in the jar
cp lenses-sql-runners-x.x.x-all.jar plugins/lib
# append the plugin path to the worker properties file
echo "plugin.path=$PWD/plugins/lib" >> config/connect-distributed.properties
# restart the workers
bin/connect-distributed.sh config/connect-distributed.properties
Lenses automatically scans the Connect clusters specified in lenses.connect.clusters and identifies whether the Lenses SQL connector is available. Multiple Lenses SQL enabled Connect clusters can be specified. When a Lenses SQL connector is created, the user interface will require the user to select the target Connect cluster.
You can check whether the SQL runner is correctly picked up using the Connect CLI.
~|⇒ connect-cli plugins
Class name: com.landoop.connect.SQL, Type: source, Version: X.X.X
Class name: org.apache.kafka.connect.file.FileStreamSinkConnector, Type: sink, Version: 0.11.0.0-cp1
Class name: org.apache.kafka.connect.file.FileStreamSourceConnector, Type: source, Version: 0.11.0.0-cp1
~|⇒
Connect SQL with SASL and SSL¶
For Apache Kafka clusters that have SASL and/or SSL enabled, you need to take extra steps to enable the Connect cluster and the SQL Runner to operate.
- SASL for Connect Workers and SQL Runners
Set java.security.auth.login.config to your jaas.conf file and add it to the JVM options when starting each Connect worker. For example:
export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/connect/connect_jaas.conf"
bin/connect-distributed.sh config/connect-distributed.properties
The SASL mechanism for the SQL runners, GSSAPI, SCRAM or PLAINTEXT, will be set in security.mechanism of the sql.extras option for the runner. This value comes from the configuration of Lenses.
- SSL for Connect Workers
Connect workers have consumers and producers. Add the Kafka client SSL options to the worker properties files that you start the worker with. For example, in the connect-distributed.properties file add:
security.protocol=SSL
ssl.truststore.location=/etc/connect/connect.client.truststore.jks
ssl.truststore.password=streamreactor
ssl.keystore.location=/etc/connect/connect.client.keystore.jks
ssl.keystore.password=streamreactor
- SSL for the SQL Runners
SSL and SASL options are passed to the runners, which are Kafka Connectors, via the sql.extras option. This is a JSON string which contains the paths to the keystore and truststore, the passwords, and other options required for SSL/SASL. The paths to the truststore and keystore are the same as those set up for Lenses. For example, if you have set the keystore path for Lenses to /mnt/secrets, it must also exist in this location on each of the Kafka Connect worker hosts.
Warning
If you do not set the same paths, the SQL Runner will not start as the Kafka clients cannot find the key/truststores.
Custom Serde for Connect SQL¶
If custom serdes are required for the SQL Processors in Connect mode, the serde libraries (jar files) should be added to the same directory as the Lenses SQL connector's jars.
Kubernetes Mode¶
To enable execution of Lenses SQL processors on Kubernetes, change lenses.sql.execution.mode to KUBERNETES.
Additionally, Lenses requires access to a kubectl config file, and Kubernetes requires access to Landoop's Container Registry.
lenses.sql.execution.mode = "KUBERNETES"
# kubernetes configuration
lenses.kubernetes.config.file = "/home/lenses/.kube/config"
lenses.kubernetes.service.account = "default"
#lenses.kubernetes.processor.image.name = "" # Only needed if you use a custom image
#lenses.kubernetes.processor.image.tag = "" # Only needed if you use a custom image
The Docker images for the Lenses SQL Runners are hosted in the Landoop container registry. Kubernetes requires an image pull secret to be set up for each namespace you wish to deploy the Lenses SQL Runners to.
Enterprise customers will be provided with credentials to access the registry. For each namespace you wish to deploy to, the script bin/configure-image-secret can be run to set up the image pull secret:
The options for the script are, in ordinal position:

argument | Description |
---|---|
context | Kubectl context to use |
namespace | Namespace to create the secret in |
json_key_path | The path to the GCE service account user credential file |
email | The email to use, required for creating a docker-registry secret in Kubernetes |
gcr_registry | The Google container registry URL |
service_account | The Kubernetes service account to patch. This is optional; the 'default' service account in the namespace is patched if not set |
If you are not using the default service account, you need to set the correct service account via the lenses.kubernetes.service.account configuration entry. This tells Lenses to deploy the pods using this service account.
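For example, if the runners should run under a dedicated service account (the name below is illustrative), lenses.conf would contain:

```properties
lenses.kubernetes.service.account = "lenses-sql-runners"
```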
Kubernetes SQL with SASL and SSL¶
Note
SQL Processors are deployed with SASL/SSL only if lenses.kubernetes.processor.security.protocol is set to SASL_PLAINTEXT, SASL_SSL or SSL.
Lenses can be configured with SSL and SASL settings for the SQL Processors in the main lenses.conf file. Lenses will load all SSL and SASL settings starting with the key lenses.kubernetes.processor. A Kubernetes secret, labeled with lenses.io/app.type: lenses-secret, will be created in each namespace. This secret is then used by the processor pods to mount the JKS files, keytab, krb5.conf and sasl.jaas.config accordingly, and to set the environment variables for the processor to use. Below are the minimum requirements; additional SSL and SASL Java configurations can be added, prefixed with the lenses.kubernetes.processor key.
lenses.kubernetes.processor.security.protocol = SSL
lenses.kubernetes.processor.ssl.truststore.location = /var/private/ssl/client.truststore.jks
lenses.kubernetes.processor.ssl.truststore.password = test1234
lenses.kubernetes.processor.ssl.keystore.location = /var/private/ssl/client.keystore.jks
lenses.kubernetes.processor.ssl.keystore.password = test1234
lenses.kubernetes.processor.ssl.key.password = test1234
For security.protocol values that include SASL you must also provide:
- lenses.kubernetes.processor.jaas.conf.file, which is the path to a jaas.conf file for the processors to use.
- lenses.kubernetes.processor.keytab.file, which is the path to the Kerberos keytab if sasl.mechanism is GSSAPI.
- lenses.kubernetes.processor.krb5.file, which is the path to the Kerberos krb5 file if sasl.mechanism is GSSAPI.
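Putting these together, a sketch of the SASL-related entries in lenses.conf (the file paths are illustrative, not defaults):

```properties
lenses.kubernetes.processor.security.protocol = SASL_SSL
lenses.kubernetes.processor.jaas.conf.file = "/path/to/processor_jaas.conf"
lenses.kubernetes.processor.keytab.file = "/path/to/processor.keytab"
lenses.kubernetes.processor.krb5.file = "/path/to/krb5.conf"
```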
JKS and jaas.conf files are mounted in the pod at /mnt/secrets/, and the keytab and krb5.conf at /etc/.
Important
If you are using GSSAPI, the keytab entry in your jaas.conf file must be /etc/keytab! The keytab provided to Lenses is mounted at /etc/.
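For illustration, a jaas.conf following this rule might look like the sketch below (the principal is a placeholder; only the keyTab path is mandated by Lenses):

```properties
// Sketch only: the keyTab path must be /etc/keytab, where Lenses mounts it
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/etc/keytab"
  principal="lenses-sql@EXAMPLE.COM";
};
```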
Custom Serdes for Kubernetes SQL¶
If custom Serdes are required, they should be embedded in a new LSQL processor docker image. The template below may be used for the custom image:
FROM eu.gcr.io/lenses-container-registry/lenses-sql-processor:latest
ADD path/to/serde/jars/directory /opt/serde
ENV LENSES_SQL_RUNNERS_SERDE_CLASSPATH_OPTS=/opt/serde
Once the image is deployed in your registry, please set Lenses to use it (lenses.conf):
lenses.kubernetes.processor.image.name = "your/image-name"
lenses.kubernetes.processor.image.tag = "your-tag"
Note
A more elaborate example of building a custom Docker image would be to create a directory processor-docker and under that a subdirectory named serde:
mkdir -p processor-docker/serde
Once created, copy your serde jar files under processor-docker/serde. Then create the file processor-docker/Dockerfile with contents:
FROM eu.gcr.io/lenses-container-registry/lenses-sql-processor:latest
ADD serde /opt/serde
ENV LENSES_SQL_RUNNERS_SERDE_CLASSPATH_OPTS=/opt/serde
Proceed to build the Docker image:
cd processor-docker
docker build -t example/lsql-processor .
Once built, upload the image to your registry and set in lenses.conf:
lenses.kubernetes.processor.image.name = "example/lsql-processor"
lenses.kubernetes.processor.image.tag = "latest"
Lenses SQL Processor Config¶
When not deployed via Lenses, the connector or Kubernetes processor requires a minimal set of configurations; these are handled for you when submitting requests via Lenses.
Key | Description | Type | Importance |
---|---|---|---|
sql.bootstrap.servers | Kafka brokers to bootstrap the clients | string | high |
sql.schema.registry.url | The URL of the schema registry, including the protocol, e.g. http:// | string | high |
sql.state.store.dir | Location for the KStreams RocksDB directory | string | high |
sql | The Lenses SQL query to execute in the KStream | string | high |
sql.app.id | The Kafka consumer group | string | medium |
sql.metrics.topic | The topic to write connector metrics to | string | medium |
sql.metric.frequency | Frequency in msec to send state and metrics to the metrics topic | long | medium |
sql.enable.metrics | Enable state and metrics reporting to the Lenses metrics topic | boolean | medium |
sql.status.topic | Status backing topic of the Connect cluster, used to detect if the connector has been paused; the Connect framework does not expose this at runtime | string | high |
sql.extras | Contains specific connection settings as a JSON string. Used mainly for SSL/Kerberized clusters (CONNECT mode only) | string | medium |
The following default values are used if not provided:
Key | Default value |
---|---|
sql.bootstrap.servers | localhost:9092 |
sql.schema.registry.url | http://localhost:8081 |
sql.state.store.dir | logs/lenses-kafka-streams-state |
sql.lenses.id | lenses.connect.id.${UUID.randomUUID()} |
sql.metrics.topic | _kafka_lenses_metrics |
sql.metric.frequency | 5000 |
sql.enable.metrics | true |
sql.status.topic | connect-statuses |
sql.extras |
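As an illustration of these keys in use, a connector submission payload for the Connect REST API might look like the following sketch (the connector name, group id, topics and SQL statement are hypothetical; Lenses normally generates this configuration for you):

```json
{
  "name": "my-lsql-runner",
  "config": {
    "connector.class": "com.landoop.connect.SQL",
    "sql.bootstrap.servers": "localhost:9092",
    "sql.schema.registry.url": "http://localhost:8081",
    "sql.app.id": "my-lsql-runner-group",
    "sql.state.store.dir": "logs/lenses-kafka-streams-state",
    "sql": "INSERT INTO target-topic SELECT * FROM source-topic"
  }
}
```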
Helm Lenses SQL Processor¶
Helm is a package manager for Kubernetes which allows you to set, via configuration, the image, the container specs, the application environment, labels, and annotations. Helm can be downloaded from here and relies on kubectl. Helm and kubectl are not part of the Lenses package and must be installed separately.
Our existing Helm Charts are available on our GitHub repo.
The Lenses SQL processor chart, available for Enterprise users, is packaged in the SQL runner release.
To deploy the SQL runner Helm Chart, edit the values.yaml accordingly or set the values via the command line.
# Add the Landoop chart repository
helm repo add landoop https://landoop.github.io/kafka-helm-charts/
# Install with values.yaml in dry run mode
helm install charts/lenses-sql-processor-runner --name my-stream --namespace lenses --dry-run --debug
# Install
helm install charts/lenses-sql-processor-runner --name my-stream --namespace lenses
# Install and override with different values from a file
helm install -f myvalues.yaml ./helm
# Install and override with different values from command line
helm install charts/lenses-sql-processor-runner --name my-stream --namespace lenses --set sql.app.id=landoop,brokers.sslEnabled=true
Warning
Lenses will pick up and track deployments created via Helm; however, if you modify or delete them via Lenses, Helm is not aware of these changes. Future releases of Lenses will address this.
Important
The connector and Kubernetes artifacts are available only with an Enterprise license.
Helm Chart Options¶
Key | Description | Default |
---|---|---|
replicaCount | The number of runners/pods to deploy | 1 |
image.repository | The SQL runner image | eu.gcr.io/k8-engine/lenses-sql-processor |
image.tag | The SQL runner image tag | 2.1 |
resources.limits.memory | | 512Mi |
resources.requests.memory | | 256Mi |
monitoring.pipeline | An optional label to add to the deployment and pods | |
monitoring.enabled | Enable monitoring by adding Prometheus scrape annotations | true |
monitoring.port | The port metrics are exposed on | 9102 |
monitoring.path | The path metrics are exposed on | /metrics |
monitoring.logLevel | Log4j debug level | INFO |
serviceAccount | The Kubernetes service account to deploy as | default |
javaOpts | JVM options | -Xms256m -Xmx512m |
sql | The Lenses SQL statement to run | |
applicationId | The consumer group id to use | |
metricsTopicSuffix | The suffix to add to the topic the processor will report its metrics on | |
metricsFrequency | The frequency at which the runner will report its metrics, in milliseconds | 5000 |
stateStoreDir | The location used by the runner for the RocksDB state store | logs/state-store |
port | The port the runner will expose for status/stop/start and interactive queries | 8083 |
kafka | List of brokers. See brokers | |
schemaRegistries | List of schema registries. See schemaRegistries | |
- Bootstrap servers
brokers.bootstrapServers is a list of bootstrap servers. Multiple brokers are supported. If you are deploying brokers inside Kubernetes, they should be deployed as a statefulset. This allows the pods to have stable network identifiers. Each pod's address should be added as an entry. The address takes the form of:
<statefulset-name>-<pod ordinal identifier>.<service name>.<namespace>.svc.cluster.local
For example, for a statefulset of 3 replicas called broker with a headless service called broker, the addresses would be:
broker-0.broker.default.svc.cluster.local
broker-1.broker.default.svc.cluster.local
broker-2.broker.default.svc.cluster.local
Note
New brokers added or removed on scaling will not be reflected. Currently, Lenses will require a config update. Future releases will address this.
If you only have one broker you can set the service name.
If your brokers are outside Kubernetes add host names.
Key | Description | Default |
---|---|---|
kafka.ssl.enabled | SSL is enabled on the brokers | false |
kafka.ssl.truststoreFileData | The base64 encoded contents of the truststore | |
kafka.ssl.keystoreFileData | The base64 encoded contents of the keystore | |
kafka.ssl.truststorePassword | The truststore password | |
kafka.ssl.keystorePassword | The keystore password | |
kafka.sasl.enabled | SASL is enabled on the brokers | false |
kafka.sasl.keyTabData | The base64 encoded contents of the keytab file, if SASL is enabled with GSSAPI | |
kafka.sasl.jaasFileData | The contents of the jaas.conf file, if SASL is enabled | |
kafka.sasl.mechanism | The security.mechanism to use: GSSAPI, SCRAM or PLAINTEXT | GSSAPI |
kafka.sasl.krb5Conf | The contents of the krb5.conf file, if the SASL mechanism is GSSAPI | |
kafka.bootstrapServers.name | Host name of the broker | |
kafka.bootstrapServers.port | The PLAINTEXT Kafka port | 9092 |
kafka.bootstrapServers.sslPort | The SSL Kafka port | 9093 |
kafka.bootstrapServers.saslPort | The SASL_SSL Kafka port | 9094 |
kafka.bootstrapServers.saslPlainTextPort | The SASL_PLAINTEXT Kafka port | 9095 |
Example:
kafka:
  ssl:
    enabled: false
    trustStoreFileData:
    keyStoreFileData:
    trustStorePassword:
    keyStorePassword:
    keyPassword:
  sasl:
    enabled: false
    # keyTabData is the base64 encoded contents of the Kerberos keytab file, if using Kerberos; mounted in /mnt/secrets
    keyTabData: |-
    # jaasFileData is the contents of the Kafka jaas file, mounted in /mnt/secrets
    jaasFileData: |-
    # mechanism is the SASL authentication mechanism: GSSAPI, SCRAM or PLAIN
    mechanism: "GSSAPI"
    # krb5Conf is the Kerberos config data to be mounted into /etc
    krb5Conf: |-
  bootstrapServers:
    - name: kafka
      port: 9092
      sslPort: 9093
      saslSslPort: 9094
      saslPlainTextPort: 9095
- Schema Registries
schemaRegistries is a list of schema registries detailing the hostname, HTTP protocol, and port. Multiple schema registries are supported. If you are deploying multiple schema registries for high availability inside Kubernetes, they should be deployed as a statefulset. This allows the pods to have stable network identifiers. Each pod's address should be added as an entry. The address takes the form of:
<statefulset-name>-<pod ordinal identifier>.<service name>.<namespace>.svc.cluster.local
For example, for a statefulset of 2 replicas called schema-registry with a headless service called schema-registry, the addresses would be:
schema-registry-0.schema-registry.default.svc.cluster.local
schema-registry-1.schema-registry.default.svc.cluster.local
Note
New schema registries added or removed on scaling will not be reflected. Currently, Lenses will require a config update. Future releases will address this.
If you only have one schema registry you can set the service name.
If your schema registries are outside Kubernetes add host names.
Key | Description | Default |
---|---|---|
schemaRegistries.enabled | Enable schema registry support | false |
schemaRegistries.hosts.host | The host name of the schema registry instance | |
schemaRegistries.hosts.protocol | The HTTP protocol, http or https | http |
schemaRegistries.hosts.port | The port for the schema registry instance | 8081 |
Example:
schemaRegistries:
  enabled: true
  hosts:
    - host: schema-registry-1
      protocol: http
      port: 8081
    - host: schema-registry-2
      protocol: http
      port: 8081