4.1
CoAP
A Kafka Connect sink connector for writing records from Kafka to a CoAP server.
KCQL support
The following KCQL is supported:
INSERT
INTO <your-coap-resource>
SELECT FIELD, ...
FROM kafka_topic
Examples:
-- Insert mode, select all fields from topicA and write to resourceA
INSERT INTO resourceA SELECT * FROM topicA
-- Insert mode, select 3 fields and rename from topicB and write to resourceA
INSERT INTO resourceA SELECT x AS a, y,z FROM topicB
Concepts
The connector writes messages to CoAP with the Content-Format set to application/json.
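For illustration, here is a minimal Python sketch of what the sink does with a record: the record's fields are serialized to a JSON body, which is then written to the CoAP resource. The record values below are taken from the quickstart example; this is not the connector's actual code.

```python
import json

# Example sink record, matching the orders topic used in the quickstart below.
record = {"id": "1", "created": "2016-05-06 13:53:00",
          "product": "OP-DAX-P-20150201-95.7", "price": "94.2", "qty": "100"}

# The connector serializes the record to a JSON body and writes it to the
# CoAP resource with Content-Format: application/json.
payload = json.dumps(record)
print(payload)
```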
DTLS Secure connections
The connector uses the Californium Java API and, for secure connections, the Scandium security module provided by Californium. Scandium (Sc) is an implementation of Datagram Transport Layer Security (DTLS) 1.2, as defined in RFC 6347.
Please refer to the Californium documentation for more information.
The connector supports:
- SSL trust and key stores
- Public/Private PEM keys and PSK client/identity
- PSK Client Identity
The sink will attempt secure connections in the following order if the URI scheme of connect.coap.uri is set to secure, i.e. coaps. If connect.coap.username is set, PSK client identity authentication is used; if connect.coap.private.key.file is also set, public/private key authentication will additionally be attempted. Otherwise, the SSL trust and key stores are used.
The private key must be in PKCS#8 rather than PKCS#1 format. Use OpenSSL to convert it:
openssl pkcs8 -in privatekey.pem -topk8 -nocrypt -out privatekey-pkcs8.pem
Only the cipher suites TLS_PSK_WITH_AES_128_CCM_8 and TLS_PSK_WITH_AES_128_CBC_SHA256 are currently supported.
Loading specific certificates can be achieved by providing a comma separated list for the connect.coap.certs configuration option. The certificate chain can be set by the connect.coap.cert.chain.key configuration option.
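Putting the DTLS options together, a secure sink configuration might look like the following sketch. The option names come from the Options table below; the URI, identity, passwords, and file paths are illustrative placeholders, not defaults.

```properties
connect.coap.uri=coaps://coap-server:5684
# PSK client identity authentication
connect.coap.username=client-identity
connect.coap.password=secret
# Optional public/private key authentication (private key in PKCS#8)
connect.coap.private.key.file=/etc/coap/privatekey-pkcs8.pem
connect.coap.public.key.file=/etc/coap/publickey.pem
# Otherwise, SSL trust and key stores are used
connect.coap.truststore.path=/etc/coap/truststore.jks
connect.coap.truststore.pass=rootPass
connect.coap.keystore.path=/etc/coap/keystore.jks
connect.coap.keystore.pass=rootPass
```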
Kafka payload support
This sink supports the following Kafka payloads:
- Schema.Struct and Struct (Avro)
- Schema.Struct and JSON
- No Schema and JSON
See connect payloads for more information.
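To make the JSON payload modes concrete, here is a Python sketch (for illustration only) of the same record in the two JSON forms: Connect's JsonConverter envelope when schemas are enabled, and plain schemaless JSON. The field names are hypothetical.

```python
import json

# No Schema and JSON: the record itself, as produced with
# value.converter.schemas.enable=false.
schemaless = {"id": "1", "qty": "100"}

# Schema.Struct and JSON: Connect's JsonConverter envelope when
# value.converter.schemas.enable=true wraps the payload with its schema.
with_schema = {
    "schema": {
        "type": "struct",
        "fields": [
            {"field": "id", "type": "string"},
            {"field": "qty", "type": "string"},
        ],
    },
    "payload": {"id": "1", "qty": "100"},
}

# Both forms carry the same record data.
assert with_schema["payload"] == schemaless
print(json.dumps(with_schema))
```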
Error policies
The connector supports Error policies.
Quickstart
Launch the stack
- Copy the docker-compose file.
- Bring up the stack.
export CONNECTOR=coap
docker-compose up -d coap
Start the connector
If you are using Lenses, log in to Lenses and navigate to the connectors page, select CoAP as the sink and paste the following:
name=coap
connector.class=com.datamountaineer.streamreactor.connect.coap.sink.CoapSinkConnector
tasks.max=1
topics=orders
connect.coap.uri=coap://coap-server:5683
connect.coap.kcql=INSERT INTO create1 SELECT * FROM orders
To start the connector without using Lenses, log into the fastdata container:
docker exec -ti fastdata /bin/bash
and create a connector.properties file containing the properties above.
Create the connector with the connect-cli:
connect-cli create coap < connector.properties
Wait for the connector to start and check that it's running:
connect-cli status coap
Inserting test data
In the fastdata container, start the Kafka Avro console producer:
kafka-avro-console-producer \
--broker-list localhost:9092 --topic orders \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"string"},{"name":"created","type":"string"},{"name":"product","type":"string"},{"name":"price","type":"string"}, {"name":"qty", "type":"string"}]}'
The console is now waiting for your input; enter the following:
{"id": "1", "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": "94.2", "qty":"100"}
Check for data in the CoAP server
docker exec -ti coap coap get coap://coap-server:5683/create1
Clean up
Bring down the stack:
docker-compose down
Options
Name | Description | Type | Default Value |
---|---|---|---|
connect.coap.uri | The CoAP server to connect to. | string | localhost |
connect.coap.truststore.path | The path to the truststore. | string | |
connect.coap.truststore.pass | The password of the trust store. | password | rootPass |
connect.coap.certs | The certificates to load from the trust store. | list | [] |
connect.coap.keystore.path | The path to the keystore. | string | |
connect.coap.keystore.pass | The password of the key store. | password | rootPass |
connect.coap.cert.chain.key | The key to use to get the certificate chain. | string | client |
connect.coap.port | The port the DTLS connector will bind to on the Connector host. | int | 0 |
connect.coap.host | The hostname the DTLS connector will bind to on the Connector host. | string | localhost |
connect.coap.username | CoAP PSK identity. | string | |
connect.coap.password | CoAP PSK secret. | password | |
connect.coap.private.key.file | Path to the private key file, for use with PSK credentials. The key must be in PKCS#8 rather than PKCS#1 format; use OpenSSL to convert: openssl pkcs8 -in privatekey.pem -topk8 -nocrypt -out privatekey-pkcs8.pem. Only cipher suites TLS_PSK_WITH_AES_128_CCM_8 and TLS_PSK_WITH_AES_128_CBC_SHA256 are currently supported. | string | |
connect.coap.public.key.file | Path to the public key file, for use with PSK credentials. | string | |
connect.coap.error.policy | Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed; THROW - the error is allowed to propagate; RETRY - the exception causes the Connect framework to retry the message. The number of retries is set by connect.coap.max.retries. All errors will be logged automatically, even if the code swallows them. | string | THROW |
connect.coap.max.retries | The maximum number of times to try the write again. | int | 20 |
connect.coap.retry.interval | The time in milliseconds between retries. | int | 60000 |
connect.progress.enabled | Enables output of how many records have been processed. | boolean | false |
connect.coap.batch.size | The number of events to take from the internal queue to batch together to send to Kafka. The records will be flushed if the linger period has expired first. | int | 100 |
connect.source.linger.ms | The number of milliseconds to wait before flushing the received messages to Kafka. The records will be flushed if the batch size is reached before the linger period has expired. | int | 5000 |
connect.coap.kcql | The KCQL statement to select and route resources to topics. | string | |
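As an example of the error-handling options working together, the fragment below configures the RETRY policy using the options from the table above; the numeric values are illustrative, not the defaults.

```properties
# Retry failed writes instead of failing the task
connect.coap.error.policy=RETRY
# Give up after 10 attempts
connect.coap.max.retries=10
# Wait 30 seconds between attempts
connect.coap.retry.interval=30000
```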