Hi, we are trying to use the Kafka connector, but we are receiving strange errors. I have set the connector configuration to use `LOCAL_QUORUM`.
Is this expected, i.e. is the Cassandra server simply not keeping up with the volume? If not, what might I be doing wrong?
[2020-06-04 11:53:09,665] ERROR Problem when getting queryFuture. This is likely a bug in the connector, please report. (com.datastax.kafkaconnector.DseSinkTask)
java.util.concurrent.ExecutionException: com.datastax.oss.driver.api.core.servererrors.WriteTimeoutException: Cassandra timeout during SIMPLE write query at consistency LOCAL_QUORUM (2 replica were required
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
    at com.datastax.kafkaconnector.DseSinkTask.lambda$put$4(DseSinkTask.java:151)
    at com.datastax.kafkaconnector.TaskStateManager.waitRunTransitionLogic(TaskStateManager.java:33)
    at com.datastax.kafkaconnector.DseSinkTask.put(DseSinkTask.java:108)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.datastax.oss.driver.api.core.servererrors.WriteTimeoutException: Cassandra timeout during SIMPLE write query at consistency LOCAL_QUORUM (2 replica were required but only 0 acknowledged the wr
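As far as I understand, the "2 replica were required" part of the message follows from how a quorum is computed: floor(RF / 2) + 1 over the replicas in the local datacenter. Here is a small sketch of that arithmetic. The replication factor of my keyspace is not shown in this post, so the range of values below is only illustrative, and `quorum` is just a helper name I made up.

```python
def quorum(replication_factor: int) -> int:
    """Replicas that must acknowledge a (LOCAL_)QUORUM write: floor(RF / 2) + 1."""
    return replication_factor // 2 + 1


# Which local replication factors would produce "2 replica were required"?
# The range 1..5 is an illustrative assumption, not taken from my cluster.
matching = [rf for rf in range(1, 6) if quorum(rf) == 2]
print(matching)  # [2, 3]
```

So the error is consistent with a replication factor of 2 or 3 in the local datacenter, and with none of the required replicas acknowledging the write in time.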
UPDATE: It seems strange to me that we are having timeout issues, as we are not pushing data at a very high rate. We have a batch size of 30 (`maxNumberOfRecordsInBatch`) and only 10 concurrent requests (`maxConcurrentRequests`) for 10 Kafka Connect tasks (`tasks.max`), and there is no other activity on the server.
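As a rough back-of-the-envelope check of that load, here is a sketch of the worst case. I am assuming that `maxConcurrentRequests` applies per task and that every request carries a full batch; I have not verified either assumption.

```python
# Values taken from my connector configuration.
tasks_max = 10                 # "tasks.max"
max_concurrent_requests = 10   # "maxConcurrentRequests" (assumed to be per task)
max_records_in_batch = 30      # "maxNumberOfRecordsInBatch"

# Worst case: every task has all of its requests in flight, each with a full batch.
max_requests_in_flight = tasks_max * max_concurrent_requests
max_rows_in_flight = max_requests_in_flight * max_records_in_batch

print(max_requests_in_flight)  # 100
print(max_rows_in_flight)      # 3000
```

Under those assumptions the cluster would see at most 100 concurrent write requests, carrying up to 3000 rows in total.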
Maybe my configuration reveals a clue as to why we see these timeouts?
[ { "name": "cassandra-sink-spm-datastax", "config": { "name": "cassandra-sink-spm-datastax", "connector.class": "com.datastax.kafkaconnector.DseSinkConnector", "tasks.max": "10", "contactPoints": "${env:APP_CASSANDRA_HOST}", "loadBalancing.localDc": "eu-west-1", "port": "${env:APP_CASSANDRA_PORT}", "_cloud.secureConnectBundle": "", "ignoreErrors": "true", "maxConcurrentRequests": "10", "maxNumberOfRecordsInBatch": "30", "queryExecutionTimeout": "30", "connectionPoolLocalSize": "4", "jmx": "false", "_compression": "None", "auth.provider": "DSE", "auth.username": "${env:APP_CASSANDRA_USERNAME}", "auth.password": "${env:APP_CASSANDRA_PASSWORD}", "_auth.gssapi.keyTab": "", "_auth.gssapi.principal": "", "_auth.gssapi.service": "dse", "ssl.provider": "JDK", "ssl.hostnameValidation": "false", "_ssl.keystore.password": "", "_ssl.keystore.path": "", "_ssl.openssl.keyCertChain": "", "_ssl.openssl.privateKey": "", "_ssl.truststore.password": "", "_ssl.truststore.path": "", "_ssl.cipherSuites": "", "topics": "dev_90_in_vt04_engine_spm", "topic.dev_90_in_vt04_engine_spm.dev_import_test.spm.mapping": "imo=value.imo, record_id=value.record_id, timestamp=value.timestamp, revolution_per_minute_average=value.revolution_per_minute_average, m_average=value.m_average, n_average=value.n_average, speed_through_water_average_per_minute=value.speed_through_water_average_per_minute, fuel_temperature_average_1=value.fuel_temperature_average_1, fuel_temperature_average_2=value.fuel_temperature_average_2, fuel_temperature_average_3=value.fuel_temperature_average_3, me_hfo_density=value.me_hfo_density, ae_hfo_density=value.ae_hfo_density, blr_hfo_density=value.blr_hfo_density, me_mgo_density=value.me_mgo_density, ae_mgo_density=value.ae_mgo_density, blr_mgo_density=value.blr_mgo_density, hfo_actual_calorific_value=value.hfo_actual_calorific_value, mgo_actual_calorific_value=value.mgo_actual_calorific_value, hfo_reference_calorific_value=value.hfo_reference_calorific_value, 
mgo_reference_calorific_value=value.mgo_reference_calorific_value, actual_revolutions_counter=value.actual_revolutions_counter, me_total_power_counter=value.me_total_power_counter, me_hfo_consumption_volume_counter=value.me_hfo_consumption_volume_counter, ae_hfo_consumption_volume_counter=value.ae_hfo_consumption_volume_counter, blr_hfo_consumption_volume_counter=value.blr_hfo_consumption_volume_counter, me_mgo_consumption_volume_counter=value.me_mgo_consumption_volume_counter, ae_mgo_consumption_volume_counter=value.ae_mgo_consumption_volume_counter, blr_mgo_consumption_volume_counter=value.blr_mgo_consumption_volume_counter, me_hfo_consumption_mass_counter=value.me_hfo_consumption_mass_counter, ae_hfo_consumption_mass_counter=value.ae_hfo_consumption_mass_counter, blr_hfo_consumption_mass_counter=value.blr_hfo_consumption_mass_counter, me_mgo_consumption_mass_counter=value.me_mgo_consumption_mass_counter, ae_mgo_consumption_mass_counter=value.ae_mgo_consumption_mass_counter, blr_mgo_consumption_mass_counter=value.blr_mgo_consumption_mass_counter, distance_sailed=value.distance_sailed, time_average=value.time_average, me_hfo_volume_1hour_average=value.me_hfo_volume_1hour_average, me_hfo_mass_1hour_average=value.me_hfo_mass_1hour_average, ae_hfo_volume_1hour_average=value.ae_hfo_volume_1hour_average, ae_hfo_mass_1hour_average=value.ae_hfo_mass_1hour_average, bl_hfo_volume_1hour_average=value.bl_hfo_volume_1hour_average, bl_hfo_mass_1hour_average=value.bl_hfo_mass_1hour_average, me_mgo_volume_1hour_average=value.me_mgo_volume_1hour_average, me_mgo_mass_1hour_average=value.me_mgo_mass_1hour_average, ae_mgo_volume_1hour_average=value.ae_mgo_volume_1hour_average, ae_mgo_mass_1hour_average=value.ae_mgo_mass_1hour_average, bl_mgo_volume_1hour_average=value.bl_mgo_volume_1hour_average, bl_mgo_mass_1hour_average=value.bl_mgo_mass_1hour_average, me_sfoc=value.me_sfoc, hull_efficiency=value.hull_efficiency, overall_efficiency=value.overall_efficiency, 
ae_total_power=value.ae_total_power, ae_1_power_counter=value.ae_1_power_counter, ae_2_power_counter=value.ae_2_power_counter, ae_3_power_counter=value.ae_3_power_counter, ae_4_power_counter=value.ae_4_power_counter, ae_total_sfoc=value.ae_total_sfoc, treserved=value.treserved, torque_offset=value.torque_offset", "topic.dev_90_in_vt04_engine_spm.dev_import_test.spm.query": "INSERT INTO dev_import_test.spm(imo,record_id,timestamp,revolution_per_minute_average,m_average,n_average,speed_through_water_average_per_minute,fuel_temperature_average_1,fuel_temperature_average_2,fuel_temperature_average_3,me_hfo_density,ae_hfo_density,blr_hfo_density,me_mgo_density,ae_mgo_density,blr_mgo_density,hfo_actual_calorific_value,mgo_actual_calorific_value,hfo_reference_calorific_value,mgo_reference_calorific_value,actual_revolutions_counter,me_total_power_counter,me_hfo_consumption_volume_counter,ae_hfo_consumption_volume_counter,blr_hfo_consumption_volume_counter,me_mgo_consumption_volume_counter,ae_mgo_consumption_volume_counter,blr_mgo_consumption_volume_counter,me_hfo_consumption_mass_counter,ae_hfo_consumption_mass_counter,blr_hfo_consumption_mass_counter,me_mgo_consumption_mass_counter,ae_mgo_consumption_mass_counter,blr_mgo_consumption_mass_counter,distance_sailed,time_average,me_hfo_volume_1hour_average,me_hfo_mass_1hour_average,ae_hfo_volume_1hour_average,ae_hfo_mass_1hour_average,bl_hfo_volume_1hour_average,bl_hfo_mass_1hour_average,me_mgo_volume_1hour_average,me_mgo_mass_1hour_average,ae_mgo_volume_1hour_average,ae_mgo_mass_1hour_average,bl_mgo_volume_1hour_average,bl_mgo_mass_1hour_average,me_sfoc,hull_efficiency,overall_efficiency,ae_total_power,ae_1_power_counter,ae_2_power_counter,ae_3_power_counter,ae_4_power_counter,ae_total_sfoc,treserved,torque_offset) VALUES 
(:imo,:record_id,:timestamp,:revolution_per_minute_average,:m_average,:n_average,:speed_through_water_average_per_minute,:fuel_temperature_average_1,:fuel_temperature_average_2,:fuel_temperature_average_3,:me_hfo_density,:ae_hfo_density,:blr_hfo_density,:me_mgo_density,:ae_mgo_density,:blr_mgo_density,:hfo_actual_calorific_value,:mgo_actual_calorific_value,:hfo_reference_calorific_value,:mgo_reference_calorific_value,:actual_revolutions_counter,:me_total_power_counter,:me_hfo_consumption_volume_counter,:ae_hfo_consumption_volume_counter,:blr_hfo_consumption_volume_counter,:me_mgo_consumption_volume_counter,:ae_mgo_consumption_volume_counter,:blr_mgo_consumption_volume_counter,:me_hfo_consumption_mass_counter,:ae_hfo_consumption_mass_counter,:blr_hfo_consumption_mass_counter,:me_mgo_consumption_mass_counter,:ae_mgo_consumption_mass_counter,:blr_mgo_consumption_mass_counter,:distance_sailed,:time_average,:me_hfo_volume_1hour_average,:me_hfo_mass_1hour_average,:ae_hfo_volume_1hour_average,:ae_hfo_mass_1hour_average,:bl_hfo_volume_1hour_average,:bl_hfo_mass_1hour_average,:me_mgo_volume_1hour_average,:me_mgo_mass_1hour_average,:ae_mgo_volume_1hour_average,:ae_mgo_mass_1hour_average,:bl_mgo_volume_1hour_average,:bl_mgo_mass_1hour_average,:me_sfoc,:hull_efficiency,:overall_efficiency,:ae_total_power,:ae_1_power_counter,:ae_2_power_counter,:ae_3_power_counter,:ae_4_power_counter,:ae_total_sfoc,:treserved,:torque_offset)", "topic.dev_90_in_vt04_engine_spm.dev_import_test.spm.consistencyLevel": "LOCAL_QUORUM", "topic.dev_90_in_vt04_engine_spm.dev_import_test.spm.ttl": "-1", "topic.dev_90_in_vt04_engine_spm.dev_import_test.spm.ttlTimeUnit" : "SECONDS", "topic.dev_90_in_vt04_engine_spm.dev_import_test.spm.timestampTimeUnit" : "MICROSECONDS", "topic.dev_90_in_vt04_engine_spm.dev_import_test.spm.nullToUnset": "true", "topic.dev_90_in_vt04_engine_spm.dev_import_test.spm.deletesEnabled": "false", "topic.dev_90_in_vt04_engine_spm.codec.locale": "en", 
"topic.dev_90_in_vt04_engine_spm.codec.timeZone": "UTC", "topic.dev_90_in_vt04_engine_spm.codec.timestamp": "CQL_TIMESTAMP", "topic.dev_90_in_vt04_engine_spm.codec.date": "ISO_LOCAL_DATE", "topic.dev_90_in_vt04_engine_spm.codec.time": "ISO_LOCAL_TIME", "topic.dev_90_in_vt04_engine_spm.codec.unit": "MILLISECONDS", "transforms": "Cast", "transforms.Cast.type": "org.apache.kafka.connect.transforms.Cast$Value", "transforms.Cast.spec": "timestamp:string", "_transforms": "TimestampConverter", "_transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value", "_transforms.TimestampConverter.format": "yyyy-MM-dd", "_transforms.TimestampConverter.field": "timestamp", "_transforms.TimestampConverter.target.type": "string" } } ]
I've pasted the full stack trace below.
[2020-06-04 14:13:28,560] ERROR Problem when getting queryFuture. This is likely a bug in the connector, please report. (com.datastax.kafkaconnector.DseSinkTask)
java.util.concurrent.ExecutionException: com.datastax.oss.driver.api.core.servererrors.ServerError: Internal Server Error
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
    at com.datastax.kafkaconnector.DseSinkTask.lambda$put$4(DseSinkTask.java:151)
    at com.datastax.kafkaconnector.TaskStateManager.waitRunTransitionLogic(TaskStateManager.java:33)
    at com.datastax.kafkaconnector.DseSinkTask.put(DseSinkTask.java:108)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.datastax.oss.driver.api.core.servererrors.ServerError: Internal Server Error