zia.shaikh_181698 asked ·

Datastax sink connector issue

Hi Members,


I am trying to configure the Cassandra Kafka sink connector to write data from a Kafka topic to a Cassandra table in the cloud. I am using kafka-connect-dse-1.2.0.jar to connect to DDAC (Cassandra 3.11) in the cloud, but I am getting a timeout while connecting. Kindly help.

===========================errors=======================================

com.datastax.oss.driver.api.core.AllNodesFailedException: Could not reach any contact point, make sure you've provided valid addresses (showing first 1, use getErrors() for more:


Node(endPoint=<some-host-name>:9042, hostId=null, hashCode=69fed51f): com.datastax.oss.driver.api.core.DriverTimeoutException: [s3|control|id: 0x5244925d, L:/10.192.166.201:60743 - R:<some-host-name>:9042] init query AUTH_RESPONSE: timed out after 500 ms)

at com.datastax.oss.driver.api.core.AllNodesFailedException.copy(AllNodesFailedException.java:95)

at com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures.getUninterruptibly(CompletableFutures.java:149)

at com.datastax.oss.driver.api.core.session.SessionBuilder.build(SessionBuilder.java:501)

at com.datastax.kafkaconnector.state.LifeCycleManager.buildDseSession(LifeCycleManager.java:542)

at com.datastax.kafkaconnector.state.LifeCycleManager.lambda$startTask$0(LifeCycleManager.java:108)

at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)

at com.datastax.kafkaconnector.state.LifeCycleManager.startTask(LifeCycleManager.java:104)

at com.datastax.kafkaconnector.DseSinkTask.start(DseSinkTask.java:74)

at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:301)

at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:189)

at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)

at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)

at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)

[2019-11-25 18:45:31,596] ERROR WorkerSinkTask{id=dse-connector-json-example5-2} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)


[2019-11-25 18:45:32,542] INFO Could not access native clock (see debug logs for details), falling back to Java system clock (com.datastax.oss.driver.internal.core.time.Clock)

[2019-11-25 18:45:33,345] INFO [s4] Failed to connect with protocol DSE_V2, retrying with DSE_V1 (com.datastax.oss.driver.internal.core.channel.ChannelFactory)

[2019-11-25 18:45:36,627] ERROR WorkerSinkTask{id=dse-connector-json-example5-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)

com.datastax.oss.driver.api.core.DriverTimeoutException: query 'SELECT * FROM system.local' timed out after PT0.5S

at com.datastax.oss.driver.api.core.DriverTimeoutException.copy(DriverTimeoutException.java:34)

at com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures.getUninterruptibly(CompletableFutures.java:149)

at com.datastax.oss.driver.api.core.session.SessionBuilder.build(SessionBuilder.java:501)

at com.datastax.kafkaconnector.state.LifeCycleManager.buildDseSession(LifeCycleManager.java:542)

at com.datastax.kafkaconnector.state.LifeCycleManager.lambda$startTask$0(LifeCycleManager.java:108)

at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)

at com.datastax.kafkaconnector.state.LifeCycleManager.startTask(LifeCycleManager.java:104)

at com.datastax.kafkaconnector.DseSinkTask.start(DseSinkTask.java:74)

at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:301)

at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:189)

at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)

at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)

at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)

[2019-11-25 18:45:36,628] ERROR WorkerSinkTask{id=dse-connector-json-example5-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)



datastax connector not able to connect ddac cassandra
csplinter answered ·

Looking at the error "Could not reach any contact point, make sure you've provided valid addresses" ... I would check to make sure that `contactPoints` and `loadBalancing.localDc` are correctly configured in your connector configuration file.
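As a rough illustration of that check, here is a minimal Python sketch that flags the two settings when they are missing or blank. Only the setting names `contactPoints` and `loadBalancing.localDc` come from the answer above; the helper `missing_settings`, the host name, and the sample values are hypothetical, and this is not part of the connector itself.

```python
import json

# Setting names taken from the answer above.
REQUIRED_SETTINGS = ("contactPoints", "loadBalancing.localDc")


def missing_settings(config: dict) -> list:
    """Return the required settings that are absent or blank in config."""
    missing = []
    for name in REQUIRED_SETTINGS:
        value = config.get(name)
        if value is None or not str(value).strip():
            missing.append(name)
    return missing


# Hypothetical connector settings; replace the host and data center with your own.
example_config = {
    "contactPoints": "cassandra-host.example.com",
    "loadBalancing.localDc": "",  # left blank on purpose to show the check
}

if __name__ == "__main__":
    print(json.dumps(example_config, indent=2))
    print("missing or blank:", missing_settings(example_config))
```

Running a check like this against the settings you actually submit to Kafka Connect is a quick way to rule out a typo before looking at network or authentication problems.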

Tomasz Lelek answered ·

Hello, when using the Kafka connector with the cloud, you need to provide three parameters in the connector settings file that you submit:

"cloud.secureConnectBundle": "/path/to/secure-connect-db.zip",
"auth.username": "CLOUD_USERNAME",
"auth.password": "CLOUD_PASSWORD"

The bundle can be downloaded from the cloud UI. The username and password must match those you specified when you created the database in the cloud.
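To make the shape of these settings concrete, here is a minimal Python sketch that assembles the three parameters and prints them as JSON. The three setting names come from the answer above; the helper name `cloud_connection_settings` and the blank-value check are assumptions made for illustration, and the values are the same placeholders used above.

```python
import json


def cloud_connection_settings(bundle_path: str, username: str, password: str) -> dict:
    """Build the three cloud settings named in the answer above."""
    settings = {
        "cloud.secureConnectBundle": bundle_path,
        "auth.username": username,
        "auth.password": password,
    }
    # Hypothetical sanity check: refuse blank values rather than submit them.
    blank = [name for name, value in settings.items() if not value.strip()]
    if blank:
        raise ValueError("blank cloud settings: " + ", ".join(blank))
    return settings


if __name__ == "__main__":
    example = cloud_connection_settings(
        "/path/to/secure-connect-db.zip", "CLOUD_USERNAME", "CLOUD_PASSWORD"
    )
    # These entries would be merged into the rest of your connector settings.
    print(json.dumps(example, indent=2))
```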

In addition, your mapping can only use the keyspace that was created for your cloud database. For example, a mapping such as:

"topic.avro-stream.some_keyspace.avro_udt_table.mapping": "id=key.key,....",

needs to be changed to:

"topic.avro-stream.CLOUD_KEYSPACE.avro_udt_table.mapping": "id=key.key,....",

Here CLOUD_KEYSPACE is the name of the keyspace in your cloud database.
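If you have several mappings to update, the rename can be scripted. The sketch below assumes the setting name has the form `topic.<topic>.<keyspace>.<table>.mapping`, as in the examples above; the function name `with_cloud_keyspace` is hypothetical and not part of the connector.

```python
def with_cloud_keyspace(setting_name: str, cloud_keyspace: str) -> str:
    """Swap the keyspace segment of a topic mapping setting name."""
    # Split from the right so a topic name that itself contains dots stays intact.
    parts = setting_name.rsplit(".", 3)
    if len(parts) != 4 or not parts[0].startswith("topic.") or parts[3] != "mapping":
        raise ValueError("not a topic mapping setting: " + setting_name)
    prefix, _old_keyspace, table, suffix = parts
    return ".".join([prefix, cloud_keyspace, table, suffix])


if __name__ == "__main__":
    old_name = "topic.avro-stream.some_keyspace.avro_udt_table.mapping"
    print(with_cloud_keyspace(old_name, "CLOUD_KEYSPACE"))
```

Only the setting name changes; the mapping value (the `id=key.key,....` part) stays as it was.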
