This source connector ingests records from GCP Pub/Sub subscriptions into your Kafka cluster, making it useful for backing up or streaming data from Pub/Sub into your Kafka infrastructure. The connector provides at-least-once delivery semantics: each record is guaranteed to reach the Kafka topic at least once.
name=GcpPubSubSourceDemo
connector.class=io.lenses.streamreactor.connect.gcp.pubsub.source.GCPPubSubSourceConnector
topics=kafka_topic_to_write_to
tasks.max=1
connect.pubsub.gcp.auth.mode=File
connect.pubsub.gcp.file=/path/to/gcp-service-account-key.json
connect.pubsub.gcp.project.id=gcp-project-id
connect.pubsub.kcql=insert into `kafka_topic_to_write_to` select * from `gcp-subscription-id`
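If you deploy connectors through the Kafka Connect REST API rather than a properties file, the equivalent JSON payload would look like the following. This is the standard Kafka Connect request format, with the same names and values as the example above:

```json
{
  "name": "GcpPubSubSourceDemo",
  "config": {
    "connector.class": "io.lenses.streamreactor.connect.gcp.pubsub.source.GCPPubSubSourceConnector",
    "topics": "kafka_topic_to_write_to",
    "tasks.max": "1",
    "connect.pubsub.gcp.auth.mode": "File",
    "connect.pubsub.gcp.file": "/path/to/gcp-service-account-key.json",
    "connect.pubsub.gcp.project.id": "gcp-project-id",
    "connect.pubsub.kcql": "insert into `kafka_topic_to_write_to` select * from `gcp-subscription-id`"
  }
}
```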
The connector uses KCQL, a SQL-like syntax, to configure its behaviour. The full KCQL syntax is:
INSERT INTO kafka-topic SELECT * FROM subscriptionId [PROPERTIES('property.1'=x, 'property.2'=x)]
Please note that you can employ escaping within KCQL for the INSERT INTO and SELECT * FROM clauses when necessary. For example, if you need to use a topic name that contains a hyphen, you can escape it as follows:
INSERT INTO `my-topic-with-hyphen` SELECT * FROM subscriptionId
The source and target of the data are specified via the INSERT INTO... SELECT * FROM clause. The connector writes all records from the given subscription to the given topic:
INSERT INTO... SELECT * FROM
INSERT INTO my-topic SELECT * FROM subscriptionId;
The optional PROPERTIES clause adds a layer of configurability to the connector: multiple properties can be applied in a single statement, delimited by commas.
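For illustration, a full KCQL statement with two properties attached might look like this (the property names here are the generic placeholders from the syntax above, not real connector settings):

```
INSERT INTO my-topic SELECT * FROM subscriptionId PROPERTIES('property.1'=10, 'property.2'='value')
```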
PROPERTIES
The following properties are supported:
The connector offers three distinct authentication modes: Default, Credentials, and File.
The simplest mode to configure is “Default”, as it requires no additional configuration.
...
connect.pubsub.gcp.auth.mode=Default
...
When selecting the “Credentials” mode, it is essential to provide the necessary credentials. Alternatively, if you prefer not to configure these properties explicitly, the connector will follow the credentials retrieval order as described here.
Here’s an example configuration for the “Credentials” mode:
...
connect.pubsub.gcp.auth.mode=Credentials
connect.pubsub.gcp.credentials=$GCP_CREDENTIALS
connect.pubsub.gcp.project.id=$GCP_PROJECT_ID
...
And here is an example configuration using the “File” mode:
...
connect.pubsub.gcp.auth.mode=File
connect.pubsub.gcp.file=/home/secure-stuff/gcp-read-credential.txt
...
Remember that when using “File” mode, the file must exist on every worker node in your Kafka Connect cluster and be readable by the Kafka Connect process.
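As a pre-flight sanity check on each worker, you can verify that the key file exists, is readable, and parses as a service-account key. The helper below is a hypothetical utility, not part of the connector; the fields it checks are the standard ones found in a GCP service-account JSON key:

```python
import json
import os

# Standard fields present in a GCP service-account JSON key.
REQUIRED_FIELDS = {"type", "project_id", "private_key", "client_email"}


def check_key_file(path):
    """Return a list of problems found with the service-account key file."""
    problems = []
    if not os.path.isfile(path):
        return ["file not found: " + path]
    if not os.access(path, os.R_OK):
        # The Kafka Connect process must be able to read this file.
        problems.append("file not readable by this process: " + path)
    try:
        with open(path) as f:
            key = json.load(f)
    except (OSError, json.JSONDecodeError) as exc:
        return problems + ["not valid JSON: %s" % exc]
    missing = REQUIRED_FIELDS - key.keys()
    if missing:
        problems.append("missing fields: %s" % sorted(missing))
    if key.get("type") != "service_account":
        problems.append("type is not 'service_account'")
    return problems
```

An empty result means the file passed all checks; otherwise each entry describes one problem to fix before starting the connector.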
For enhanced security and flexibility when using the “Credentials” mode, it is highly advisable to utilize Connect Secret Providers. You can find detailed information on how to use the Connect Secret Providers here. This approach ensures robust security practices while handling access credentials.
Two modes are available: Default Mode and Compatibility Mode.
Compatibility Mode is intended to ensure compatibility with existing tools, while Default Mode offers a simpler, modern redesign of the functionality.
You can choose whichever suits your requirements.
connect.pubsub.output.mode=DEFAULT
Each Pub/Sub message is transformed into a single Kafka record, structured as follows:
The Kafka Key is mapped from the Pub/Sub MessageID, a unique ID for a Pub/Sub message.
The Kafka Value is mapped from the body of the Pub/Sub message.
The Kafka Headers include:
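The Default-mode mapping can be sketched in a few lines. This is an illustration of the record structure described above, not the connector's implementation, and treating the Pub/Sub message attributes as the Kafka headers is an assumption made for this sketch:

```python
# Hypothetical sketch of the Default output mode mapping.
def to_kafka_record(message_id, data, attributes):
    """Map one Pub/Sub message to a (key, value, headers) Kafka record."""
    key = message_id          # Kafka key <- Pub/Sub MessageID
    value = data              # Kafka value <- Pub/Sub message body
    headers = dict(attributes)  # assumption: attributes carried as headers
    return key, value, headers
```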
connect.pubsub.output.mode=COMPATIBILITY
The Key is a structure with these fields:
The Value is a structure with these fields: