Hi All,
I'm hoping someone can help me understand why I'm getting a "InvalidQueryException: Timestamp is not yet supported." when running my kafka connect worker using the datastax kafka connector.
Here is the output detailing both my connector configuration and the exception.
connect | [2021-06-09 03:36:05,624] INFO CassandraSinkTask starting with config: connect | Global configuration: connect | contactPoints: [cassandra.ap-southeast-2.amazonaws.com] connect | port: 9142 connect | maxConcurrentRequests: 500 connect | maxNumberOfRecordsInBatch: 32 connect | jmx: true connect | SSL configuration: connect | provider: JDK connect | hostnameValidation: false connect | keystore.path: connect | keystore.password: [hidden] connect | truststore.path: /etc/confluent/docker/cassandra_truststore.jks connect | truststore.password: [hidden] connect | openssl.keyCertChain: connect | openssl.privateKey: connect | Authentication configuration: connect | provider: PLAIN connect | username: temp_michael-at-226611388535 connect | password: [hidden] connect | gssapi.keyTab: connect | gssapi.principal: connect | gssapi.service: dse connect | Topic configurations: connect | name: enviro.dds.measured.tide, codec settings: locale: en_US, timeZone: UTC, timestamp: ISO_INSTANT, date: ISO_LOCAL_DATE, time: ISO_LOCAL_TIME, unit: MILLISECONDS connect | Table configurations: connect | {keyspace: enviro, table: dds_measured_tide, cl: LOCAL_ONE, ttl: -1, nullToUnset: true, deletesEnabled: true, mapping: connect | source=key connect | time_utc=value.validTime_Utc connect | height_m=value.height_m connect | residual_m=value.residual_m connect | } connect | datastax-java-driver configuration: connect | datastax-java-driver.advanced.metrics.session.cql-requests.refresh-interval=30 seconds connect | datastax-java-driver.advanced.connection.pool.local.size=4 connect | datastax-java-driver.advanced.metrics.node.cql-messages.highest-latency=35 seconds connect | datastax-java-driver.basic.request.timeout=30 seconds connect | datastax-java-driver.basic.load-balancing-policy.local-datacenter=ap-southeast-2 connect | datastax-java-driver.advanced.protocol.compression=none connect | datastax-java-driver.advanced.metrics.session.enabled.1=cql-client-timeouts connect | datastax-java-driver.advanced.metrics.session.enabled.0=cql-requests connect | (com.datastax.oss.kafka.sink.state.LifeCycleManager) connect | [2021-06-09 03:36:05,835] INFO PID obtained through native call to getpid(): 1 (com.datastax.oss.driver.api.core.uuid.Uuids) connect | [2021-06-09 03:36:06,095] INFO DataStax Java driver for Apache Cassandra(R) (com.datastax.oss:java-driver-core) version 4.6.0 (com.datastax.oss.driver.internal.core.DefaultMavenCoordinates) connect | [2021-06-09 03:36:06,183] INFO Could not register Graph extensions; this is normal if Tinkerpop was explicitly excluded from classpath (com.datastax.oss.driver.internal.core.context.InternalDriverContext) connect | [2021-06-09 03:36:06,226] INFO Using native clock for microsecond precision (com.datastax.oss.driver.internal.core.time.Clock) connect | [2021-06-09 03:36:06,555] INFO [s0] Failed to connect with protocol DSE_V2, retrying with DSE_V1 (com.datastax.oss.driver.internal.core.channel.ChannelFactory) connect | [2021-06-09 03:36:06,692] INFO [s0] Failed to connect with protocol DSE_V1, retrying with V4 (com.datastax.oss.driver.internal.core.channel.ChannelFactory) connect | [2021-06-09 03:36:07,094] WARN [s0] Unsupported partitioner 'com.amazonaws.cassandra.DefaultPartitioner', token map will be empty. (com.datastax.oss.driver.internal.core.metadata.token.DefaultTokenFactoryRegistry) connect | [2021-06-09 03:36:08,331] ERROR WorkerSinkTask{id=cassandra-sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) connect | java.lang.RuntimeException: Prepare failed for statement: INSERT INTO enviro.dds_measured_tide(source,time_utc,height_m,residual_m) VALUES (:source,:time_utc,:height_m,:residual_m) USING TIMESTAMP :kafka_internal_timestamp or DELETE FROM enviro.dds_measured_tide WHERE source = :source AND time_utc = :time_utc connect | at com.datastax.oss.kafka.sink.state.LifeCycleManager.lambda$prepareStatementsAsync$14(LifeCycleManager.java:654) connect | at java.base/java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986) connect | at java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970) connect | at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) connect | at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) connect | at com.datastax.oss.driver.internal.core.cql.CqlPrepareAsyncProcessor.lambda$process$1(CqlPrepareAsyncProcessor.java:74) connect | at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) connect | at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) connect | at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) connect | at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) connect | at com.datastax.oss.driver.internal.core.cql.CqlPrepareHandler.lambda$setFinalResult$2(CqlPrepareHandler.java:251) connect | at java.base/java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:783) connect | at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) connect | at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) connect | at com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures.lambda$allDone$1(CompletableFutures.java:64) connect | at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) connect | at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) connect | at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) connect | at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) connect | at com.datastax.oss.driver.internal.core.adminrequest.AdminRequestHandler.setFinalResult(AdminRequestHandler.java:200) connect | at com.datastax.oss.driver.internal.core.adminrequest.ThrottledAdminRequestHandler.setFinalResult(ThrottledAdminRequestHandler.java:158) connect | at com.datastax.oss.driver.internal.core.adminrequest.AdminRequestHandler.onResponse(AdminRequestHandler.java:190) connect | at com.datastax.oss.driver.internal.core.channel.InFlightHandler.channelRead(InFlightHandler.java:255) connect | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) connect | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) connect | at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355) connect | at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) connect | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) connect | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) connect | at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355) connect | at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321) connect | at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295) connect | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) connect | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) connect | at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355) connect | at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1470) connect | at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1219) connect | at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1266) connect | at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:498) connect | at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:437) connect | at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276) connect | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) connect | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) connect | at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355) connect | at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) connect | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) connect | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) connect | at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) connect | at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) connect | at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) connect | at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) connect | at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) connect | at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) connect | at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) connect | at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) connect | at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) connect | at java.base/java.lang.Thread.run(Thread.java:829) connect | Caused by: com.datastax.oss.driver.api.core.servererrors.InvalidQueryException: Timestamp is not yet supported. connect | [2021-06-09 03:36:08,335] INFO Task is stopped. (com.datastax.oss.kafka.sink.TaskStateManager) connect | [2021-06-09 03:36:08,339] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics) connect | [2021-06-09 03:36:08,339] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics) connect | [2021-06-09 03:36:08,340] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics) connect | [2021-06-09 03:36:08,348] INFO App info kafka.consumer for connector-consumer-cassandra-sink-connector-0 unregistered (org.apache.kafka.common.utils.AppInfoParser)
Here is a sample of some events on the enviro.dds.measured.tide topic with the key on the left, json value on the right.
enviro.lyttelton.measured.tide.tide.tides ☺��{"height_m": 0.9828569037656907, "residual_m": 0.1378569037656907, "validTime_Utc": "2021-06-03T07:00:00Z", "astro_m": 0.845} enviro.lyttelton.measured.tide.tide.tides ☺��{"height_m": 0.9743093645484953, "residual_m": 0.10530936454849527, "validTime_Utc": "2021-06-03T07:05:00Z", "astro_m": 0.869} enviro.lyttelton.measured.tide.tide.tides ☺��{"height_m": 1.007846153846154, "residual_m": 0.11384615384615404, "validTime_Utc": "2021-06-03T07:10:00Z", "astro_m": 0.894} enviro.lyttelton.measured.tide.tide.tides ☺��{"height_m": 1.0251747899159667, "residual_m": 0.10617478991596663, "validTime_Utc": "2021-06-03T07:15:00Z", "astro_m": 0.919} enviro.lyttelton.measured.tide.tide.tides ☺��{"height_m": 1.0367911073825506, "residual_m": 0.0907911073825507, "validTime_Utc": "2021-06-03T07:20:00Z", "astro_m": 0.946} enviro.lyttelton.measured.tide.tide.tides ☺��{"height_m": 1.0381566164154108, "residual_m": 0.06515661641541082, "validTime_Utc": "2021-06-03T07:25:00Z", "astro_m": 0.973} enviro.lyttelton.measured.tide.tide.tides ☺��{"height_m": 1.050763247863248, "residual_m": 0.04976324786324815, "validTime_Utc": "2021-06-03T07:30:00Z", "astro_m": 1.001} enviro.lyttelton.measured.tide.tide.tides ☺��{"height_m": 1.0799898904802026, "residual_m": 0.049989890480202526, "validTime_Utc": "2021-06-03T07:35:00Z", "astro_m": 1.03} enviro.lyttelton.measured.tide.tide.tides ☺��{"height_m": 1.0995008389261747, "residual_m": 0.03950083892617462, "validTime_Utc": "2021-06-03T07:40:00Z", "astro_m": 1.06} enviro.lyttelton.measured.tide.tide.tides ☺��{"height_m": 1.1511108312342573, "residual_m": 0.0611108312342572, "validTime_Utc": "2021-06-03T07:45:00Z", "astro_m": 1.09}
Here is the structure of my table
select * from system_schema.columns where keyspace_name = 'enviro' AND table_name = 'dds_measured_tide'; "keyspace_name","clustering_order","kind","column_name_bytes","column_name","position","type","table_name" "enviro","none","partition_key","c291cmNl","source",0,"ascii","dds_measured_tide" "enviro","asc","clustering","dGltZV91dGM=","time_utc",0,"timestamp","dds_measured_tide" "enviro","none","regular","aGVpZ2h0X20=","height_m",-1,"float","dds_measured_tide" "enviro","none","regular","cmVzaWR1YWxfbQ==","residual_m",-1,"float","dds_measured_tide"
My suspicion is that this error is telling me that the format used by the codec in parsing the value.validTime_Utc field from my events isn't valid. But as far as I can see my configuration to use ISO_INSTANT should support the format given by value.validTime_Utc.
My only other suspicion is that it is something to do with the "using timestamp" portion of the insert and something wrong with the kafka timestamp.
I figure it's not working because either my suspicions above are incorrect and I'm misunderstanding or missing information, or I have incorrect configuration.
Relevant documentation I have reviewed is here. I also took a search of the forums but couldn't find a similar problem.
https://docs.datastax.com/en/kafka/doc/kafka/configuration_reference/kafkaDates.html
https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html
Thanks.