Kafka Connect sink connector for writing data from Kafka to Redis.
The following KCQL is supported:
[INSERT INTO <redis-cache>] SELECT FIELD, ... FROM <kafka-topic> [PK FIELD] [STOREAS SortedSet(key=FIELD)|GEOADD|STREAM]
The purpose of this mode is to cache in Redis [Key-Value] pairs. Imagine a Kafka topic with currency foreign exchange rate messages:
{ "symbol": "USDGBP" , "price": 0.7943 } { "symbol": "EURGBP" , "price": 0.8597 }
You may want to store in Redis: the symbol as the Key and the price as the Value. This will effectively make Redis a caching system, which multiple other applications can access to get the (latest) value. To achieve that using this particular Kafka Redis Sink Connector, you need to specify the KCQL as:
SELECT price from yahoo-fx PK symbol
This will update the keys USDGBP , EURGBP with the relevant price using the (default) JSON format:
Key=EURGBP Value={ "price": 0.7943 }
Composite keys are support with the PK clause, a delimiter can be set with optional configuration property connect.redis.pk.delimiter
To insert messages from a Kafka topic into 1 Sorted Set use the following KCQL syntax:
INSERT INTO cpu_stats SELECT * from cpuTopic STOREAS SortedSet(score=timestamp) TTL=60
This will create and add entries to the (sorted set) named cpu_stats. The entries will be ordered in the Redis set based on the score that we define it to be the value of the timestamp field of the AVRO message from Kafka. In the above example, we are selecting and storing all the fields of the Kafka message.
The TTL statement allows setting a time to live on the sorted set. If not specified to TTL is set.
The connector can create multiple sorted sets by promoting each value of one field from the Kafka message into one Sorted Set and selecting which values to store into the sorted-sets. Set KCQL clause to define the filed using PK (primary key)
SELECT temperature, humidity FROM sensorsTopic PK sensorID STOREAS SortedSet(score=timestamp)
The connector can also prefix the name of the Key using the INSERT statement for Multiple SortedSets:
INSERT INTO FX- SELECT price from yahoo-fx PK symbol STOREAS SortedSet(score=timestamp) TTL=60
This will create key with names FX-USDGBP , FX-EURGBP etc.
To insert messages from a Kafka topic with GEOADD use the following KCQL syntax:
INSERT INTO cpu_stats SELECT * from cpuTopic STOREAS GEOADD
To insert messages from a Kafka topic to a Redis Stream use the following KCQL syntax:
INSERT INTO redis_stream_name SELECT * FROM my-kafka-topic STOREAS STREAM
To insert message from a Kafka topic to a Redis PubSub use the following KCQL syntax:
SELECT * FROM topic STOREAS PubSub (channel=myfield)
The channel to write to in Redis is determined by field in the payload of the Kafka message set in the KCQL statment, in this case a field called myfield.
myfield
This sink supports the following Kafka payloads:
See connect payloads for more information.
The connector supports Error polices.
export CONNECTOR=redis docker-compose up -d redis
If you are using Lenses, login into Lenses and navigate to the connectors page, select Redis as the sink and paste the following:
name=redis connector.class=com.datamountaineer.streamreactor.connect.redis.sink.RedisSinkConnector tasks.max=1 topics=redis connect.redis.host=redis connect.redis.port=6379 connect.redis.kcql=INSERT INTO lenses SELECT * FROM redis STOREAS STREAM
To start the connector using the command line, log into the lenses-box container:
docker exec -ti lenses-box /bin/bash
and create a connector.properties file containing the properties above.
Create the connector, with the connect-cli:
connect-cli create redis < connector.properties
Wait for the connector to start and check it’s running:
connect-cli status redis
In the lenses-box container start the kafka producer shell:
kafka-avro-console-producer \ --broker-list localhost:9092 --topic redis \ --property value.schema='{"type":"record","name":"User", "fields":[{"name":"firstName","type":"string"},{"name":"lastName","type":"string"},{"name":"age","type":"int"},{"name":"salary","type":"double"}]}'
the console is now waiting for your input, enter the following:
{ "firstName": "John", "lastName": "Smith", "age": 30, "salary": 4830 }
docker exec -ti redis redis-cli get "John"
Bring down the stack:
docker-compose down
On this page