
zia.shaikh_181698 asked · csplinter answered

DataStax sink connector issue

Hi Members,


I am trying to create a configuration for the Cassandra Kafka sink connector to write data from a Kafka topic to a Cassandra table in the cloud. I am using kafka-connect-dse-1.2.0.jar to connect to DDAC Cassandra 3.11 in the cloud, but I am getting a timeout while connecting. Kindly help.
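For reference, a sink connector like this is registered by POSTing a JSON config to the Kafka Connect REST API (port 8083 on the Connect worker by default). A minimal sketch of such a config is shown below; the connector name comes from the logs that follow and the connector class is inferred from the package names in the stack trace, while the topic, keyspace, table, column, credential, and data center names are placeholders:

{
  "name": "dse-connector-json-example5",
  "config": {
    "connector.class": "com.datastax.kafkaconnector.DseSinkConnector",
    "tasks.max": "3",
    "topics": "json-stream",
    "contactPoints": "<some-host-name>",
    "loadBalancing.localDc": "dc1",
    "auth.username": "<username>",
    "auth.password": "<password>",
    "topic.json-stream.my_keyspace.my_table.mapping": "id=value.id, name=value.name"
  }
}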

=========================== errors ===========================

com.datastax.oss.driver.api.core.AllNodesFailedException: Could not reach any contact point, make sure you've provided valid addresses (showing first 1, use getErrors() for more:
Node(endPoint=<some-host-name>:9042, hostId=null, hashCode=69fed51f): com.datastax.oss.driver.api.core.DriverTimeoutException: [s3|control|id: 0x5244925d, L:/10.192.166.201:60743 - R:<some-host-name>:9042] init query AUTH_RESPONSE: timed out after 500 ms)
    at com.datastax.oss.driver.api.core.AllNodesFailedException.copy(AllNodesFailedException.java:95)
    at com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures.getUninterruptibly(CompletableFutures.java:149)
    at com.datastax.oss.driver.api.core.session.SessionBuilder.build(SessionBuilder.java:501)
    at com.datastax.kafkaconnector.state.LifeCycleManager.buildDseSession(LifeCycleManager.java:542)
    at com.datastax.kafkaconnector.state.LifeCycleManager.lambda$startTask$0(LifeCycleManager.java:108)
    at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
    at com.datastax.kafkaconnector.state.LifeCycleManager.startTask(LifeCycleManager.java:104)
    at com.datastax.kafkaconnector.DseSinkTask.start(DseSinkTask.java:74)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:301)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:189)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2019-11-25 18:45:31,596] ERROR WorkerSinkTask{id=dse-connector-json-example5-2} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
[2019-11-25 18:45:32,542] INFO Could not access native clock (see debug logs for details), falling back to Java system clock (com.datastax.oss.driver.internal.core.time.Clock)
[2019-11-25 18:45:33,345] INFO [s4] Failed to connect with protocol DSE_V2, retrying with DSE_V1 (com.datastax.oss.driver.internal.core.channel.ChannelFactory)
[2019-11-25 18:45:36,627] ERROR WorkerSinkTask{id=dse-connector-json-example5-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
com.datastax.oss.driver.api.core.DriverTimeoutException: query 'SELECT * FROM system.local' timed out after PT0.5S
    at com.datastax.oss.driver.api.core.DriverTimeoutException.copy(DriverTimeoutException.java:34)
    at com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures.getUninterruptibly(CompletableFutures.java:149)
    at com.datastax.oss.driver.api.core.session.SessionBuilder.build(SessionBuilder.java:501)
    at com.datastax.kafkaconnector.state.LifeCycleManager.buildDseSession(LifeCycleManager.java:542)
    at com.datastax.kafkaconnector.state.LifeCycleManager.lambda$startTask$0(LifeCycleManager.java:108)
    at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
    at com.datastax.kafkaconnector.state.LifeCycleManager.startTask(LifeCycleManager.java:104)
    at com.datastax.kafkaconnector.DseSinkTask.start(DseSinkTask.java:74)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:301)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:189)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2019-11-25 18:45:36,628] ERROR WorkerSinkTask{id=dse-connector-json-example5-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)




Tomasz Lelek answered · edited

Hello, when using the Kafka connector with a cloud database, you need to provide three parameters in the settings file that you submit for the connector:

"cloud.secureConnectBundle" = "/path/to/secure-connect-db.zip"
"auth.username": "CLOUD_USERNAME",
"auth.password": "CLOUD_PASSWORD"

The bundle can be downloaded from the cloud UI. The username and password need to match those specified when the database was created in the cloud.

Besides that, your mapping can only use the keyspace that was created for your cloud database, so, for example, a mapping such as:

"topic.avro-stream.some_keyspace.avro_udt_table.mapping": "id=key.key,....",

needs to be changed to:

"topic.avro-stream.CLOUD_KEYSPACE.avro_udt_table.mapping": "id=key.key,....",

where CLOUD_KEYSPACE is the keyspace in your cloud database.
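Putting those pieces together, a complete cloud configuration might look like the sketch below. The connector class is inferred from the package names in the stack trace above; the connector name, topic, and table are placeholders, and CLOUD_KEYSPACE stands for your actual cloud keyspace:

{
  "name": "cloud-sink-example",
  "config": {
    "connector.class": "com.datastax.kafkaconnector.DseSinkConnector",
    "tasks.max": "1",
    "topics": "avro-stream",
    "cloud.secureConnectBundle": "/path/to/secure-connect-db.zip",
    "auth.username": "CLOUD_USERNAME",
    "auth.password": "CLOUD_PASSWORD",
    "topic.avro-stream.CLOUD_KEYSPACE.avro_udt_table.mapping": "id=key.key,...."
  }
}

Note that the secure connect bundle already carries the addresses of the cloud database, so contactPoints should not need to be set when cloud.secureConnectBundle is provided.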


csplinter answered

Looking at the error "Could not reach any contact point, make sure you've provided valid addresses" ... I would check to make sure that `contactPoints` and `loadBalancing.localDc` are correctly configured in your connector configuration file.
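For a self-managed DDAC cluster (as opposed to a cloud database with a secure connect bundle), those two settings would look something like the following sketch, where the host name is the placeholder from the logs above and dc1 stands for your actual data center name:

"contactPoints": "<some-host-name>",
"loadBalancing.localDc": "dc1"

The value of loadBalancing.localDc must match the data center name that nodetool status reports for the nodes listed in contactPoints; a mismatch, an unreachable address, or a blocked port 9042 can all surface as the timeouts shown in the question.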
