4.1
Coap
A Kafka Connect Coap source connector and to stream messages from a CoAP server and write them to a Kafka topic.
KCQL support
The following KCQL is supported:
INSERT INTO <your-topic>
SELECT *
FROM <coap-resource>
Examples:
INSERT INTO topicA SELECT * FROM resourceA
Selection of fields from the Coap message is not supported.
Concepts
The Source Connector automatically converts the CoAP response into a Kafka Connect Struct to be store in Kafka as AVRO or JSON dependent on the Converters used in Connect. The schema is fixed can found Data Types .
The key of the Struct message sent to Kafka is made from the source defined in the message, the resource on the CoAP server and the message id.
Quickstart
Launch the stack
- Copy the docker-compose file.
- Bring up the stack.
export CONNECTOR=coap
docker-compose up -d coap
Inserting test data
The coap docker starts a CoaP server with an observable resource.
Start the connector
If you are using Lenses, login into Lenses and navigate to the connectors page , select Coap as the source and paste the following:
name=coap
connector.class=com.datamountaineer.streamreactor.connect.coap.source.CoapSourceConnector
tasks.max=1
connect.coap.uri=coap://coap
connect.coap.kcql= INSERT INTO coap SELECT * FROM obs-pumping
To start the connector without using Lenses, log into the fastdatadev container:
docker exec -ti fastdata /bin/bash
and create a connector.properties file containing the properties above.
Create the connector, with the connect-cli :
connect-cli create coap < connector.properties
Wait a for the connector to start and check its running:
connect-cli status coap
Check for records in Kafka
Check the records in Lenses or with via the console:
kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--topic coap \
--from-beginning
Data type conversion
The schema is fixed.
The following schema is used for the key:
Name | Type |
---|---|
source | Optional string |
source_resource | Optional String |
message_id | Optional int32 |
The following schema is used for the payload:
Name | Type |
---|---|
message_id | Optional int32 |
type | Optional String |
code | Optional String |
raw_code | Optional int32 |
rtt | Optional int64 |
is_last | Optional boolean |
is_notification | Optional boolean |
source | Optional String |
destination | Optional String |
timestamp | Optional int64 |
token | Optional String |
is_duplicate | Optional boolean |
is_confirmable | Optional boolean |
is_rejected | Optional boolean |
is_acknowledged | Optional boolean |
is_canceled | Optional boolean |
accept | Optional int32 |
block1 | Optional String |
block2 | Optional String |
content_format | Optional int32 |
etags | Array of Optional Strings |
location_path | Optional String |
location_query | Optional String |
max_age | Optional int64 |
observe | Optional int32 |
proxy_uri | Optional String |
size_1 | Optional String |
size_2 | Optional String |
uri_host | Optional String |
uri_port | Optional int32 |
uri_path | Optional String |
uri_query | Optional String |
payload | Optional String |
Clean up
Bring down the stack:
docker-compose down
Options
Name | Description | Type | Default Value |
---|---|---|---|
connect.coap.uri | The COAP server to connect to. | string | localhost |
connect.coap.truststore.path | The path to the truststore. | string | |
connect.coap.truststore.pass | The password of the trust store. | password | rootPass |
connect.coap.certs | The password of the trust store. | list | [] |
connect.coap.keystore.path | The path to the truststore. | string | |
connect.coap.keystore.pass | The password of the key store. | password | rootPass |
connect.coap.cert.chain.key | The key to use to get the certificate chain. | string | client |
connect.coap.port | The port the DTLS connector will bind to on the Connector host. | int | 0 |
connect.coap.host | The hostname the DTLS connector will bind to on the Connector host. | string | localhost |
connect.coap.username | CoAP PSK identity. | string | |
connect.coap.password | CoAP PSK secret. | password | |
connect.coap.private.key.file | Path to the private key file for use in with PSK credentials in PKCS8 rather than PKCS1 Use open SSL to convert. openssl pkcs8 -in privatekey.pem -topk8 -nocrypt -out privatekey-pkcs8.pem Only cipher suites TLS_PSK_WITH_AES_128_CCM_8 and TLS_PSK_WITH_AES_128_CBC_SHA256 are currently supported. | string | |
connect.coap.public.key.file | Path to the public key file for use in with PSK credentials | string | |
connect.coap.error.policy | Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP - the error is swallowed THROW - the error is allowed to propagate. RETRY - The exception causes the Connect framework to retry the message. The number of retries is set by connect.cassandra.max.retries. All errors will be logged automatically, even if the code swallows them. | string | THROW |
connect.coap.max.retries | The maximum number of times to try the write again. | int | 20 |
connect.coap.retry.interval | The time in milliseconds between retries. | int | 60000 |
connect.progress.enabled | Enables the output for how many records have been processed | boolean | false |
connect.coap.batch.size | The number of events to take from the internal queue to batch together to send to Kafka. The records willbe flushed if the linger period has expired first. | int | 100 |
connect.source.linger.ms | The number of milliseconds to wait before flushing the received messages to Kafka. The records willbe flushed if the batch size is reached before the linger period has expired. | int | 5000 |
connect.coap.kcql | The KCQL statement to select and route resources to topics. | string |