A Kafka Connect sink connector for writing records from Kafka to Azure CosmosDB using the SQL API.
The following KCQL is supported:
INSERT | UPSERT INTO <your-collection> SELECT FIELD, ... FROM kafka_topic [PK FIELDS,...]
Examples:
-- Insert mode, select all fields from topicA
-- and write to collectionA
INSERT INTO collectionA SELECT * FROM topicA

-- UPSERT mode, select 3 fields and
-- rename from topicB and write to tableB
-- with primary key as the field id from the topic
UPSERT INTO tableB SELECT x AS a, y, z AS c FROM topicB PK id
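To make the field selection in the UPSERT example concrete, the following Python sketch mimics what `SELECT x AS a, y, z AS c` does to a single record. It is an illustration only: the `project` helper and the sample record are hypothetical and are not part of the connector.

```python
def project(record, fields):
    """Keep only the selected fields and rename them.

    `fields` maps source field name -> output field name.
    This helper is hypothetical; it only imitates the KCQL projection.
    """
    return {alias: record[name] for name, alias in fields.items() if name in record}


# Equivalent of: SELECT x AS a, y, z AS c
selection = {"x": "a", "y": "y", "z": "c"}

# Hypothetical record read from topicB
record = {"id": "1", "x": 10, "y": 20, "z": 30, "extra": "not selected"}

projected = project(record, selection)
print(projected)
```

Fields that are not named in the SELECT clause, such as `extra` here, do not appear in the projected record.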
Insert is the default write mode of the sink. It inserts messages from Kafka topics into DocumentDB.
A failure to insert a record in DocumentDB may occur due to integrity constraints or other exceptions such as casting issues. Because Kafka provides at-least-once delivery semantics, a record may be delivered more than once, so this mode may produce errors if unique constraints have been implemented on the target collection. If the error policy has been set to NOOP, the sink discards the error and continues processing. However, it currently makes no attempt to distinguish violations of integrity constraints from other exceptions such as casting issues.
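The interaction between insert mode, redelivery, and the NOOP error policy can be sketched in a few lines of Python. This is a simplified model, not the connector's implementation: the in-memory `store`, the `DuplicateKeyError` class, and the `write_batch` helper are all hypothetical, and `THROW` is used here only as a label for fail-fast behaviour.

```python
class DuplicateKeyError(Exception):
    """Hypothetical stand-in for a unique-constraint violation."""


def insert(store, doc):
    # Plain insert: fails if a document with the same id already exists.
    if doc["id"] in store:
        raise DuplicateKeyError(doc["id"])
    store[doc["id"]] = doc


def write_batch(store, docs, error_policy="THROW"):
    """Insert each document; under NOOP, discard any error and carry on."""
    discarded = []
    for doc in docs:
        try:
            insert(store, doc)
        except Exception as exc:
            # All exceptions are treated alike, mirroring the text above:
            # constraint violations are not distinguished from other errors.
            if error_policy == "NOOP":
                discarded.append((doc, exc))
                continue
            raise
    return discarded


store = {}
order = {"id": "1", "qty": "100"}

# The same record delivered twice, as can happen with at-least-once delivery.
discarded = write_batch(store, [order, dict(order)], error_policy="NOOP")
print(len(store), len(discarded))
```

With any policy other than NOOP in this sketch, the second delivery raises and the batch stops.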
The Sink supports DocumentDB upsert functionality which replaces the existing row if a match is found on the primary keys.
This mode works nicely with at-least-once delivery semantics on Kafka, as order is guaranteed within partitions. If the same record is delivered twice to the sink, it results in an idempotent write: the existing record is updated with the values of the second delivery, which are the same.
If records are delivered with the same field or group of fields that are used as the primary key on the target table, but different values, the existing record in the target table will be updated.
Since records are delivered in the order they were written per partition, the write is idempotent on failure or restart. Redelivery produces the same result.
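The idempotency argument above can be illustrated with a small Python model. It is a sketch under assumptions: the in-memory `store` and the `upsert` helper are hypothetical, and keying the store on the PK field values is a simplification of how the target collection identifies a document.

```python
def upsert(store, doc, pk_fields):
    """Replace the document whose primary key matches, or add it if absent."""
    key = tuple(doc[f] for f in pk_fields)
    store[key] = doc


store = {}
first = {"id": "1", "product": "OP-DAX-P-20150201-95.7", "qty": "100"}

upsert(store, first, ["id"])
snapshot = dict(store)

# Redelivery of the same record leaves the store unchanged.
upsert(store, dict(first), ["id"])
unchanged = store == snapshot

# Same primary key, different values: the existing record is updated.
updated = {"id": "1", "product": "OP-DAX-P-20150201-95.7", "qty": "150"}
upsert(store, updated, ["id"])

print(unchanged, store[("1",)]["qty"])
```

Because each write replaces the whole document for a given key, replaying records in their original per-partition order always ends in the same final state.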
This sink supports the following Kafka payloads:
See connect payloads for more information.
The connector supports error policies.
export CONNECTOR=documentdb
docker-compose up -d lenses-box
If you are using Lenses, log in to Lenses, navigate to the connectors page, select CosmosDB as the sink, and paste the following:
name=cosmosdb
connector.class=io.lenses.streamreactor.connect.azure.documentdb.sink.DocumentDbSinkConnector
tasks.max=1
topics=orders-string
connect.documentdb.kcql=INSERT INTO orders SELECT * FROM orders-string
connect.documentdb.db=dm
connect.documentdb.endpoint=[YOUR_AZURE_ENDPOINT]
connect.documentdb.db.create=true
connect.documentdb.master.key=[YOUR_MASTER_KEY]
connect.documentdb.batch.size=10
To start the connector using the command line, log into the lenses-box container:
docker exec -ti lenses-box /bin/bash
and create a connector.properties file containing the properties above.
Create the connector, with the connect-cli:
connect-cli create cosmosdb < connector.properties
Wait for the connector to start and check it’s running:
connect-cli status cosmosdb
In the lenses-box container, start the Kafka producer shell:
kafka-avro-console-producer \
  --broker-list localhost:9092 --topic orders-string \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"string"},{"name":"created","type":"string"},{"name":"product","type":"string"},{"name":"price","type":"string"}, {"name":"qty", "type":"string"}]}'
The console is now waiting for your input. Enter the following:
{ "id": "1", "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": "94.2", "qty": "100" }
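If you want to produce more test records than the single one above, a short Python script can generate them as single-line JSON, one record per line, ready to be pasted or piped into the producer. This is a convenience sketch, not part of the connector: the `to_producer_line` helper is hypothetical, and it assumes the all-string schema passed to the producer command above.

```python
import json

# Field names taken from the value schema used with the console producer.
SCHEMA_FIELDS = ["id", "created", "product", "price", "qty"]


def to_producer_line(record):
    """Serialize a record as one line of JSON with every value as a string."""
    missing = [f for f in SCHEMA_FIELDS if f not in record]
    if missing:
        raise ValueError(f"missing fields: {missing}")
    # The schema declares every field as a string, so coerce each value.
    return json.dumps({f: str(record[f]) for f in SCHEMA_FIELDS})


line = to_producer_line(
    {
        "id": 2,
        "created": "2016-05-06 13:54:00",
        "product": "OP-DAX-P-20150201-95.7",
        "price": 94.2,
        "qty": 100,
    }
)
print(line)
```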
In the Azure portal, select the dm database in the Data Explorer, select Query, and run:
SELECT * FROM c
Bring down the stack:
docker-compose down