4.1
Bloomberg
A Kafka Connect source connector that subscribes to Bloomberg feeds via the Bloomberg Labs open API and writes the data to Kafka.
Requires a Bloomberg BPIPE subscription.
KCQL support
KCQL is not supported.
Quickstart
Launch the stack
- Copy the docker-compose file.
- Bring up the stack.
docker-compose up -d fastdata
Start the connector
If you are using Lenses, log in to Lenses, navigate to the connectors page, select Bloomberg as the source and paste the following:
name=bloomberg
connector.class=com.datamountaineer.streamreactor.connect.bloomberg.BloombergSourceConnector
connect.bloomberg.server.host=localhost
connect.bloomberg.server.port=8194
connect.bloomberg.service.uri=//blp/mktdata
connect.bloomberg.subscriptions=AAPL US Equity:LAST_PRICE,BID,ASK;IBM US Equity:BID,ASK,HIGH,LOW,OPEN
connect.bloomberg.authentication.mode=USER_AND_APPLICATION
connect.bloomberg.buffer.size=4096
connect.bloomberg.payload.type=avro
connect.bloomberg.kafka.topic=bloomberg
To start the connector without using Lenses, log into the fastdata container:
export CONNECTOR=bloomberg
docker exec -ti fastdata /bin/bash
and create a connector.properties file containing the properties above.
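One way to create the file from the shell is a heredoc. This is a minimal sketch that reuses the quickstart values; it uses the `//blp/mktdata` service URI listed in the Options table and sets the authentication mode once. Adjust host, port and subscriptions to your own BPIPE setup.

```shell
# Write the quickstart properties to connector.properties in the current directory.
# The quoted 'EOF' delimiter prevents the shell from expanding anything inside the block.
cat > connector.properties <<'EOF'
name=bloomberg
connector.class=com.datamountaineer.streamreactor.connect.bloomberg.BloombergSourceConnector
connect.bloomberg.server.host=localhost
connect.bloomberg.server.port=8194
connect.bloomberg.service.uri=//blp/mktdata
connect.bloomberg.subscriptions=AAPL US Equity:LAST_PRICE,BID,ASK;IBM US Equity:BID,ASK,HIGH,LOW,OPEN
connect.bloomberg.authentication.mode=USER_AND_APPLICATION
connect.bloomberg.buffer.size=4096
connect.bloomberg.payload.type=avro
connect.bloomberg.kafka.topic=bloomberg
EOF
```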
Create the connector with the connect-cli:
connect-cli create bloomberg < connector.properties
Wait for the connector to start and check that it is running:
connect-cli status bloomberg
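If you would rather script the wait than re-run the status command by hand, the check can be sketched against the Kafka Connect REST API. The assumptions here are that the REST endpoint is reachable at `http://localhost:8083` and that the status JSON lists `state` first inside the `connector` object. `connector_state` and `wait_for_connector` are hypothetical helper names, not part of connect-cli.

```shell
# Assumption: Kafka Connect REST API address; override CONNECT_URL if yours differs.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Read a connector status JSON document on stdin and print the connector-level state.
# This is a crude text match for illustration; a JSON tool is more robust in real scripts.
connector_state() {
  tr -d ' \n' | sed -n 's/.*"connector":{"state":"\([A-Z_]*\)".*/\1/p'
}

# Poll the status endpoint every 2 seconds, up to 30 attempts.
# Returns 0 once the connector reports RUNNING, 1 if it never does.
wait_for_connector() {
  name="$1"
  attempts=0
  while [ "$attempts" -lt 30 ]; do
    state=$(curl -s "$CONNECT_URL/connectors/$name/status" | connector_state)
    [ "$state" = "RUNNING" ] && return 0
    attempts=$((attempts + 1))
    sleep 2
  done
  return 1
}
```

For example, `wait_for_connector bloomberg && echo "bloomberg is running"` blocks until the connector is up or the attempts run out.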
Check for records in Kafka
Check the records in Lenses or via the console:
kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--topic bloomberg \
--from-beginning
Clean up
Bring down the stack:
docker-compose down
Options
Name | Description | Type | Default Value |
---|---|---|---|
connect.bloomberg.server.host | The hostname of the machine running the Bloomberg service | string | |
connect.bloomberg.server.port | The port on which the Bloomberg service runs (8194 is the default) | int | |
connect.bloomberg.service.uri | The Bloomberg service type: Market Data (//blp/mktdata) or Message scrape (//blp/msgscrape) | string | |
connect.bloomberg.authentication.mode | Optional parameter setting how the authentication should be done. It can be APPLICATION_ONLY or USER_AND_APPLICATION. Follow the Bloomberg API documentation for how to configure this | string | |
connect.bloomberg.buffer.size | Specifies the size of the queue that holds the updates received from Bloomberg. If the buffer is full, it won’t accept new items until it is drained. | int | |
connect.bloomberg.service.authorization | Whether the service requires authorization: true in the normal case, false when running against a simulator. Example: true | boolean | false |
connect.progress.enabled | Enables the output for how many records have been processed | boolean | false |
connect.bloomberg.subscriptions | Provides the list of securities and the fields to subscribe to. Example: “AAPL US Equity:LAST_PRICE,BID,ASK;IBM US Equity:BID,ASK,HIGH,LOW,OPEN” | string | |
connect.bloomberg.kafka.topic | The name of the Kafka topic to which the data from Bloomberg is written. | string | |
connect.bloomberg.payload.type | Specifies how the data is serialized before it is sent to Kafka. Two modes are supported: json (default) and avro. | string | |
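The `connect.bloomberg.subscriptions` value packs several securities into one string: entries are separated by `;`, the security name is separated from its fields by `:`, and fields are separated by `,`. The sketch below only illustrates that layout by expanding the string into one line per security and field; `list_subscriptions` is a hypothetical helper, and the connector's own parsing may differ in details such as whitespace handling.

```shell
# Hypothetical helper: print one "security<TAB>field" line for every subscribed field.
list_subscriptions() {
  printf '%s\n' "$1" | tr ';' '\n' | while IFS=: read -r security fields; do
    # Skip empty entries, for example after a trailing ';'.
    [ -n "$security" ] || continue
    printf '%s\n' "$fields" | tr ',' '\n' | while IFS= read -r field; do
      printf '%s\t%s\n' "$security" "$field"
    done
  done
}

list_subscriptions 'AAPL US Equity:LAST_PRICE,BID,ASK;IBM US Equity:BID,ASK,HIGH,LOW,OPEN'
```

Running this against the example value prints three lines for `AAPL US Equity` and five for `IBM US Equity`, which is a quick way to sanity-check a long subscription string before deploying it.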