
andy.smith_191512 asked ·

Why does LOCAL_QUORUM with the Kafka connector result in a large number of timeouts?

Hi, we are trying to use the Kafka connector but we are receiving strange errors. I have set the connector configuration to use `LOCAL_QUORUM`.

Is this expected? Is it simply the Cassandra server not keeping up with the volume? If not, what might I be doing wrong?

 [2020-06-04 11:53:09,665] ERROR Problem when getting queryFuture. This is likely a bug in the connector, please report. (com.datastax.kafkaconnector.DseSinkTask)
java.util.concurrent.ExecutionException: com.datastax.oss.driver.api.core.servererrors.WriteTimeoutException: Cassandra timeout during SIMPLE write query at consistency LOCAL_QUORUM (2 replica were required but only 0 acknowledged the write)
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
        at com.datastax.kafkaconnector.DseSinkTask.lambda$put$4(DseSinkTask.java:151)
        at com.datastax.kafkaconnector.TaskStateManager.waitRunTransitionLogic(TaskStateManager.java:33)
        at com.datastax.kafkaconnector.DseSinkTask.put(DseSinkTask.java:108)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: com.datastax.oss.driver.api.core.servererrors.WriteTimeoutException: Cassandra timeout during SIMPLE write query at consistency LOCAL_QUORUM (2 replica were required but only 0 acknowledged the write)

UPDATE: It seems strange to me that we are having timeout issues, since we are not pushing data at a very high rate: we have a batch size of 30 and only 10 concurrent requests across 10 Kafka Connect tasks, with no other activity on the server.

Maybe my configuration reveals a clue as to why:

[
   {
    "name": "cassandra-sink-spm-datastax",
    "config": {
        "name": "cassandra-sink-spm-datastax",
        "connector.class": "com.datastax.kafkaconnector.DseSinkConnector",
        "tasks.max": "10",
        "contactPoints": "${env:APP_CASSANDRA_HOST}",
        "loadBalancing.localDc": "eu-west-1",
        "port": "${env:APP_CASSANDRA_PORT}",
        "_cloud.secureConnectBundle": "",
        "ignoreErrors": "true",
        "maxConcurrentRequests": "10",
        "maxNumberOfRecordsInBatch": "30",
        "queryExecutionTimeout": "30",
        "connectionPoolLocalSize": "4",
        "jmx": "false",
        "_compression": "None",
        "auth.provider": "DSE",
        "auth.username": "${env:APP_CASSANDRA_USERNAME}",
        "auth.password": "${env:APP_CASSANDRA_PASSWORD}",
        "_auth.gssapi.keyTab": "",
        "_auth.gssapi.principal": "",
        "_auth.gssapi.service": "dse",
        "ssl.provider": "JDK",
        "ssl.hostnameValidation": "false",
        "_ssl.keystore.password": "",
        "_ssl.keystore.path": "",
        "_ssl.openssl.keyCertChain": "",
        "_ssl.openssl.privateKey": "",
        "_ssl.truststore.password": "",
        "_ssl.truststore.path": "",
        "_ssl.cipherSuites": "",
        "topics": "dev_90_in_vt04_engine_spm",
        "topic.dev_90_in_vt04_engine_spm.dev_import_test.spm.mapping": "imo=value.imo, record_id=value.record_id, timestamp=value.timestamp, revolution_per_minute_average=value.revolution_per_minute_average, m_average=value.m_average, n_average=value.n_average, speed_through_water_average_per_minute=value.speed_through_water_average_per_minute, fuel_temperature_average_1=value.fuel_temperature_average_1, fuel_temperature_average_2=value.fuel_temperature_average_2, fuel_temperature_average_3=value.fuel_temperature_average_3, me_hfo_density=value.me_hfo_density, ae_hfo_density=value.ae_hfo_density, blr_hfo_density=value.blr_hfo_density, me_mgo_density=value.me_mgo_density, ae_mgo_density=value.ae_mgo_density, blr_mgo_density=value.blr_mgo_density, hfo_actual_calorific_value=value.hfo_actual_calorific_value, mgo_actual_calorific_value=value.mgo_actual_calorific_value, hfo_reference_calorific_value=value.hfo_reference_calorific_value, mgo_reference_calorific_value=value.mgo_reference_calorific_value, actual_revolutions_counter=value.actual_revolutions_counter, me_total_power_counter=value.me_total_power_counter, me_hfo_consumption_volume_counter=value.me_hfo_consumption_volume_counter, ae_hfo_consumption_volume_counter=value.ae_hfo_consumption_volume_counter, blr_hfo_consumption_volume_counter=value.blr_hfo_consumption_volume_counter, me_mgo_consumption_volume_counter=value.me_mgo_consumption_volume_counter, ae_mgo_consumption_volume_counter=value.ae_mgo_consumption_volume_counter, blr_mgo_consumption_volume_counter=value.blr_mgo_consumption_volume_counter, me_hfo_consumption_mass_counter=value.me_hfo_consumption_mass_counter, ae_hfo_consumption_mass_counter=value.ae_hfo_consumption_mass_counter, blr_hfo_consumption_mass_counter=value.blr_hfo_consumption_mass_counter, me_mgo_consumption_mass_counter=value.me_mgo_consumption_mass_counter, ae_mgo_consumption_mass_counter=value.ae_mgo_consumption_mass_counter, blr_mgo_consumption_mass_counter=value.blr_mgo_consumption_mass_counter, distance_sailed=value.distance_sailed, time_average=value.time_average, me_hfo_volume_1hour_average=value.me_hfo_volume_1hour_average, me_hfo_mass_1hour_average=value.me_hfo_mass_1hour_average, ae_hfo_volume_1hour_average=value.ae_hfo_volume_1hour_average, ae_hfo_mass_1hour_average=value.ae_hfo_mass_1hour_average, bl_hfo_volume_1hour_average=value.bl_hfo_volume_1hour_average, bl_hfo_mass_1hour_average=value.bl_hfo_mass_1hour_average, me_mgo_volume_1hour_average=value.me_mgo_volume_1hour_average, me_mgo_mass_1hour_average=value.me_mgo_mass_1hour_average, ae_mgo_volume_1hour_average=value.ae_mgo_volume_1hour_average, ae_mgo_mass_1hour_average=value.ae_mgo_mass_1hour_average, bl_mgo_volume_1hour_average=value.bl_mgo_volume_1hour_average, bl_mgo_mass_1hour_average=value.bl_mgo_mass_1hour_average, me_sfoc=value.me_sfoc, hull_efficiency=value.hull_efficiency, overall_efficiency=value.overall_efficiency, ae_total_power=value.ae_total_power, ae_1_power_counter=value.ae_1_power_counter, ae_2_power_counter=value.ae_2_power_counter, ae_3_power_counter=value.ae_3_power_counter, ae_4_power_counter=value.ae_4_power_counter, ae_total_sfoc=value.ae_total_sfoc, treserved=value.treserved, torque_offset=value.torque_offset",
        "topic.dev_90_in_vt04_engine_spm.dev_import_test.spm.query": "INSERT INTO dev_import_test.spm(imo,record_id,timestamp,revolution_per_minute_average,m_average,n_average,speed_through_water_average_per_minute,fuel_temperature_average_1,fuel_temperature_average_2,fuel_temperature_average_3,me_hfo_density,ae_hfo_density,blr_hfo_density,me_mgo_density,ae_mgo_density,blr_mgo_density,hfo_actual_calorific_value,mgo_actual_calorific_value,hfo_reference_calorific_value,mgo_reference_calorific_value,actual_revolutions_counter,me_total_power_counter,me_hfo_consumption_volume_counter,ae_hfo_consumption_volume_counter,blr_hfo_consumption_volume_counter,me_mgo_consumption_volume_counter,ae_mgo_consumption_volume_counter,blr_mgo_consumption_volume_counter,me_hfo_consumption_mass_counter,ae_hfo_consumption_mass_counter,blr_hfo_consumption_mass_counter,me_mgo_consumption_mass_counter,ae_mgo_consumption_mass_counter,blr_mgo_consumption_mass_counter,distance_sailed,time_average,me_hfo_volume_1hour_average,me_hfo_mass_1hour_average,ae_hfo_volume_1hour_average,ae_hfo_mass_1hour_average,bl_hfo_volume_1hour_average,bl_hfo_mass_1hour_average,me_mgo_volume_1hour_average,me_mgo_mass_1hour_average,ae_mgo_volume_1hour_average,ae_mgo_mass_1hour_average,bl_mgo_volume_1hour_average,bl_mgo_mass_1hour_average,me_sfoc,hull_efficiency,overall_efficiency,ae_total_power,ae_1_power_counter,ae_2_power_counter,ae_3_power_counter,ae_4_power_counter,ae_total_sfoc,treserved,torque_offset) VALUES (:imo,:record_id,:timestamp,:revolution_per_minute_average,:m_average,:n_average,:speed_through_water_average_per_minute,:fuel_temperature_average_1,:fuel_temperature_average_2,:fuel_temperature_average_3,:me_hfo_density,:ae_hfo_density,:blr_hfo_density,:me_mgo_density,:ae_mgo_density,:blr_mgo_density,:hfo_actual_calorific_value,:mgo_actual_calorific_value,:hfo_reference_calorific_value,:mgo_reference_calorific_value,:actual_revolutions_counter,:me_total_power_counter,:me_hfo_consumption_volume_counter,:ae_hfo_consumption_volume_counter,:blr_hfo_consumption_volume_counter,:me_mgo_consumption_volume_counter,:ae_mgo_consumption_volume_counter,:blr_mgo_consumption_volume_counter,:me_hfo_consumption_mass_counter,:ae_hfo_consumption_mass_counter,:blr_hfo_consumption_mass_counter,:me_mgo_consumption_mass_counter,:ae_mgo_consumption_mass_counter,:blr_mgo_consumption_mass_counter,:distance_sailed,:time_average,:me_hfo_volume_1hour_average,:me_hfo_mass_1hour_average,:ae_hfo_volume_1hour_average,:ae_hfo_mass_1hour_average,:bl_hfo_volume_1hour_average,:bl_hfo_mass_1hour_average,:me_mgo_volume_1hour_average,:me_mgo_mass_1hour_average,:ae_mgo_volume_1hour_average,:ae_mgo_mass_1hour_average,:bl_mgo_volume_1hour_average,:bl_mgo_mass_1hour_average,:me_sfoc,:hull_efficiency,:overall_efficiency,:ae_total_power,:ae_1_power_counter,:ae_2_power_counter,:ae_3_power_counter,:ae_4_power_counter,:ae_total_sfoc,:treserved,:torque_offset)",
        "topic.dev_90_in_vt04_engine_spm.dev_import_test.spm.consistencyLevel": "LOCAL_QUORUM",
        "topic.dev_90_in_vt04_engine_spm.dev_import_test.spm.ttl": "-1",
        "topic.dev_90_in_vt04_engine_spm.dev_import_test.spm.ttlTimeUnit" : "SECONDS",
        "topic.dev_90_in_vt04_engine_spm.dev_import_test.spm.timestampTimeUnit" : "MICROSECONDS",
        "topic.dev_90_in_vt04_engine_spm.dev_import_test.spm.nullToUnset": "true",
        "topic.dev_90_in_vt04_engine_spm.dev_import_test.spm.deletesEnabled": "false",
        "topic.dev_90_in_vt04_engine_spm.codec.locale": "en",
        "topic.dev_90_in_vt04_engine_spm.codec.timeZone": "UTC",
        "topic.dev_90_in_vt04_engine_spm.codec.timestamp": "CQL_TIMESTAMP",
        "topic.dev_90_in_vt04_engine_spm.codec.date": "ISO_LOCAL_DATE",
        "topic.dev_90_in_vt04_engine_spm.codec.time": "ISO_LOCAL_TIME",
        "topic.dev_90_in_vt04_engine_spm.codec.unit": "MILLISECONDS",
        "transforms": "Cast",
        "transforms.Cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
        "transforms.Cast.spec": "timestamp:string",
        "_transforms": "TimestampConverter",
        "_transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "_transforms.TimestampConverter.format": "yyyy-MM-dd",
        "_transforms.TimestampConverter.field": "timestamp",
        "_transforms.TimestampConverter.target.type": "string"
    }
  }
]

I've pasted the full stack trace below.

[2020-06-04 14:13:28,560] ERROR Problem when getting queryFuture. This is likely a bug in the connector, please report. (com.datastax.kafkaconnector.DseSinkTask)
java.util.concurrent.ExecutionException: com.datastax.oss.driver.api.core.servererrors.ServerError: Internal Server Error
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
        at com.datastax.kafkaconnector.DseSinkTask.lambda$put$4(DseSinkTask.java:151)
        at com.datastax.kafkaconnector.TaskStateManager.waitRunTransitionLogic(TaskStateManager.java:33)
        at com.datastax.kafkaconnector.DseSinkTask.put(DseSinkTask.java:108)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: com.datastax.oss.driver.api.core.servererrors.ServerError: Internal Server Error
Tags: connector, kafka

1 Answer

Erick Ramirez answered ·

The stack trace is incomplete but, on the face of it, the error is being generated because the coordinator did not get any responses back from any replica. The WriteTimeoutException indicates that the replicas did not respond within write_request_timeout_in_ms, usually because the commitlog disk cannot keep up with the IO requests.
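(For reference, write_request_timeout_in_ms is a server-side setting in cassandra.yaml. The value below is only the shipped default in recent releases, not necessarily what your cluster is running, so treat it as an example.)

    # cassandra.yaml (on each Cassandra node)
    # How long the coordinator waits for replica acknowledgements on writes
    # before returning a WriteTimeoutException to the client (default 2 seconds)
    write_request_timeout_in_ms: 2000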

You can confirm this by reviewing the logs and looking for evidence of dropped mutations being logged. Similarly, you should be able to see dropped mutations in the output of nodetool tpstats. Cheers!
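As a concrete illustration (the commands are examples only; the system.log path assumes a default package install and will differ on other setups):

    # Look for dropped-mutation warnings in the Cassandra log
    # (log path is an assumption based on a default package install)
    grep -i "dropped" /var/log/cassandra/system.log

    # Per-node thread pool and dropped-message statistics;
    # non-zero MUTATION counts in the "Dropped" section are the tell-tale sign
    nodetool tpstats

Run both checks on every node in the local DC, since LOCAL_QUORUM needs acknowledgements from a quorum of the local replicas.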

2 comments

Thanks Erick. So it sounds like, if this fails, the data will likely not get into Cassandra at all?


[EDIT: Additional info re-posted into original question.]


You didn't mention whether you checked for dropped mutations as I stated in my answer. Could you do that please? Cheers!

P.S. I've converted your post into a comment since it's not an "answer". :)
