Robospecta asked Erick Ramirez commented

Does Kafka sink connector support topic names with periods?

Hi All,

I'm using the DataStax sink connector 1.4.0 with Confluent Cloud Kafka to write data to a Cassandra database. I am running the sink connector in standalone mode using the cp-all-in-one Docker image and passing configuration through environment variables.

The topic names I'm using in Confluent Kafka contain periods ("."). I know that periods are used as the delimiter separating the topic name, keyspace, and table name in the mapping section of the connector's configuration.

Assuming:
Topic name is "enviro.raw.wind".
Keyspace name is "enviro".
Table name is "raw_wind".

My mapping would be:
topic.enviro.raw.wind.enviro.raw_wind.mapping=<mapping>

Running it understandably produces an error, as the underlying code would be unsure which periods to treat as delimiters:

java.lang.IllegalArgumentException: The setting: topic.enviro.raw.wind.enviro.raw_wind.mapping does not match topic.keyspace.table nor topic.codec regular expression pattern

I suspect this might be a bug, and one that is not easily solved without changing the structure or delimiter of the DataStax configuration. If that is the case, I can probably change my topic names without much trouble. At the very least, I thought it would be worth documenting this behaviour and confirming that it is a known issue for anyone else having this problem, as I couldn't find any mention of this limitation in forums or documentation.

Thanks.

kafka-connector

1 Answer

Erick Ramirez answered Erick Ramirez commented

Support for Kafka topic names that contain dot/period (.) was added in v1.1.0 (KAF-104). This means that enviro.raw.wind is a supported topic name and should work since you're running with v1.4.0.

The JSON configuration should look like:

{
  "name": "my-kafka-sink",
  "config": {
    "connector.class": "com.datastax.kafkaconnector.DseSinkConnector",
    "tasks.max": "1",
    "topics": "my.kafka.topic",
    "topic.my.kafka.topic.ks_name.table_name.mapping": "pk=key, column=value"
  }
}
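To illustrate why a dotted topic name need not be ambiguous, here is a minimal Python sketch. It is not the connector's actual parser; the pattern and the function name parse_mapping_key are hypothetical. It assumes keyspace and table names contain only letters, digits, and underscores, so the last two segments before ".mapping" can be taken as keyspace and table and everything before them as the topic.

```python
import re

# Hypothetical pattern for illustration only; NOT the connector's real regex.
# Assumption: keyspace and table names match \w+ (no periods), so the greedy
# topic group can absorb every period that belongs to the topic name.
MAPPING_KEY = re.compile(
    r"^topic\.(?P<topic>.+)\.(?P<keyspace>\w+)\.(?P<table>\w+)\.mapping$"
)


def parse_mapping_key(key):
    """Split a 'topic.<topic>.<keyspace>.<table>.mapping' key into its parts."""
    match = MAPPING_KEY.match(key)
    if match is None:
        raise ValueError(f"not a topic.keyspace.table mapping key: {key!r}")
    return match.group("topic"), match.group("keyspace"), match.group("table")


if __name__ == "__main__":
    print(parse_mapping_key("topic.my.kafka.topic.ks_name.table_name.mapping"))
    print(parse_mapping_key("topic.enviro.raw.wind.enviro.raw_wind.mapping"))
```

Under that assumption, a key only becomes unparseable when the keyspace or table segment itself is altered, for example if an underscore in the table name is turned into a period before the connector sees it.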

I'm wondering if there was an issue with your configuration that is causing the parser to fail. If you post details of your configuration, we'd be happy to review and possibly identify the cause of the failure. Cheers!


Ah yes! I had seen that in my searching but had forgotten about it. I couldn't find any more relevant information on the Jira ticket and, given I wasn't having any luck, assumed it must have been about some other feature.

With your confirmation that it should be possible, I took another look. The example in my question was altered from my real configuration; in the outline below of where I went wrong, I use the actual values to avoid any further confusion. Apologies for perhaps leading you astray.

It turns out I was seeing the error noted in my question because I was trying to set the consistency level and had not escaped the underscores in my table name. I figured this out by reviewing the Confluent standalone all-in-one Connect Docker image and the documentation for dub.py, which translates the environment variables in my Docker Compose file into the configuration used by the image.
https://github.com/confluentinc/confluent-docker-utils/blob/master/confluent/docker_utils/dub.py


So for a topic "enviro.dds.measured.tide" and a table "dds_measured_tide" in the "enviro" keyspace, these were my environment variables (which get translated into configuration).

# What I had
CONNECTOR_topic.enviro.dds.measured.tide.enviro.dds_measured_tide.CONSISTENCY_LEVEL: ONE

# What I should have had 
# (Note the double "__" to properly escape the table name, as dub.py is replacing single "_" with ".")
CONNECTOR_topic.enviro.dds.measured.tide.enviro.dds__measured__tide.CONSISTENCY_LEVEL: ONE


I also made sure to escape the underscores in my table name in my mapping configuration to avoid any issues, and it works a treat.
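For anyone else hitting this, the substitution rule described above can be sketched in a few lines of Python. This is a simplified illustration of only the two rules mentioned here (a single "_" becomes ".", a double "__" becomes a literal "_"); it is not the real dub.py code, which may differ in other details such as case handling. The function name env_name_to_property and the prefix handling are assumptions made for the example.

```python
def env_name_to_property(env_name, prefix="CONNECTOR_"):
    """Translate an environment variable name into a property key.

    Hypothetical helper, not taken from dub.py. It applies only the two
    rules described in this thread: "__" is an escaped underscore and a
    single "_" becomes a period.
    """
    raw = env_name[len(prefix):] if env_name.startswith(prefix) else env_name
    # Split on the escape sequence first so escaped underscores survive,
    # then turn every remaining single underscore into a period.
    parts = raw.split("__")
    return "_".join(part.replace("_", ".") for part in parts)


if __name__ == "__main__":
    unescaped = "CONNECTOR_topic.enviro.dds.measured.tide.enviro.dds_measured_tide.mapping"
    escaped = "CONNECTOR_topic.enviro.dds.measured.tide.enviro.dds__measured__tide.mapping"
    print(env_name_to_property(unescaped))  # table name is broken up by periods
    print(env_name_to_property(escaped))    # table name keeps its underscores
```

With the unescaped name, the table segment arrives as "dds.measured.tide", so the key no longer has a recognisable topic.keyspace.table shape, which is consistent with the IllegalArgumentException in the question.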

Appreciate your help @Erick Ramirez, thanks for the prompt response.


Not a problem at all. I'm just glad you figured it out in the end. Cheers!
