Robospecta asked:

Why do I get InvalidQueryException: Timestamp is not yet supported?

Hi All,

I'm hoping someone can help me understand why I'm getting an "InvalidQueryException: Timestamp is not yet supported." error when running my Kafka Connect worker with the DataStax Kafka connector.

Here is the output detailing both my connector configuration and the exception.

connect | [2021-06-09 03:36:05,624] INFO CassandraSinkTask starting with config:
connect | Global configuration:
connect | contactPoints: [cassandra.ap-southeast-2.amazonaws.com]
connect | port: 9142
connect | maxConcurrentRequests: 500
connect | maxNumberOfRecordsInBatch: 32
connect | jmx: true
connect | SSL configuration:
connect | provider: JDK
connect | hostnameValidation: false
connect | keystore.path:
connect | keystore.password: [hidden]
connect | truststore.path: /etc/confluent/docker/cassandra_truststore.jks
connect | truststore.password: [hidden]
connect | openssl.keyCertChain:
connect | openssl.privateKey:
connect | Authentication configuration:
connect | provider: PLAIN
connect | username: temp_michael-at-226611388535
connect | password: [hidden]
connect | gssapi.keyTab:
connect | gssapi.principal:
connect | gssapi.service: dse
connect | Topic configurations:
connect | name: enviro.dds.measured.tide, codec settings: locale: en_US, timeZone: UTC, timestamp: ISO_INSTANT, date: ISO_LOCAL_DATE, time: ISO_LOCAL_TIME, unit: MILLISECONDS
connect | Table configurations:
connect | {keyspace: enviro, table: dds_measured_tide, cl: LOCAL_ONE, ttl: -1, nullToUnset: true, deletesEnabled: true, mapping:
connect | source=key
connect | time_utc=value.validTime_Utc
connect | height_m=value.height_m
connect | residual_m=value.residual_m
connect | }
connect | datastax-java-driver configuration:
connect | datastax-java-driver.advanced.metrics.session.cql-requests.refresh-interval=30 seconds
connect | datastax-java-driver.advanced.connection.pool.local.size=4
connect | datastax-java-driver.advanced.metrics.node.cql-messages.highest-latency=35 seconds
connect | datastax-java-driver.basic.request.timeout=30 seconds
connect | datastax-java-driver.basic.load-balancing-policy.local-datacenter=ap-southeast-2
connect | datastax-java-driver.advanced.protocol.compression=none
connect | datastax-java-driver.advanced.metrics.session.enabled.1=cql-client-timeouts
connect | datastax-java-driver.advanced.metrics.session.enabled.0=cql-requests
connect | (com.datastax.oss.kafka.sink.state.LifeCycleManager)
connect | [2021-06-09 03:36:05,835] INFO PID obtained through native call to getpid(): 1 (com.datastax.oss.driver.api.core.uuid.Uuids)
connect | [2021-06-09 03:36:06,095] INFO DataStax Java driver for Apache Cassandra(R) (com.datastax.oss:java-driver-core) version 4.6.0 (com.datastax.oss.driver.internal.core.DefaultMavenCoordinates)
connect | [2021-06-09 03:36:06,183] INFO Could not register Graph extensions; this is normal if Tinkerpop was explicitly excluded from classpath (com.datastax.oss.driver.internal.core.context.InternalDriverContext)
connect | [2021-06-09 03:36:06,226] INFO Using native clock for microsecond precision (com.datastax.oss.driver.internal.core.time.Clock)
connect | [2021-06-09 03:36:06,555] INFO [s0] Failed to connect with protocol DSE_V2, retrying with DSE_V1 (com.datastax.oss.driver.internal.core.channel.ChannelFactory)
connect | [2021-06-09 03:36:06,692] INFO [s0] Failed to connect with protocol DSE_V1, retrying with V4 (com.datastax.oss.driver.internal.core.channel.ChannelFactory)
connect | [2021-06-09 03:36:07,094] WARN [s0] Unsupported partitioner 'com.amazonaws.cassandra.DefaultPartitioner', token map will be empty. (com.datastax.oss.driver.internal.core.metadata.token.DefaultTokenFactoryRegistry)
connect | [2021-06-09 03:36:08,331] ERROR WorkerSinkTask{id=cassandra-sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
connect | java.lang.RuntimeException: Prepare failed for statement: INSERT INTO enviro.dds_measured_tide(source,time_utc,height_m,residual_m) VALUES (:source,:time_utc,:height_m,:residual_m) USING TIMESTAMP :kafka_internal_timestamp or DELETE FROM enviro.dds_measured_tide WHERE source = :source AND time_utc = :time_utc
connect | at com.datastax.oss.kafka.sink.state.LifeCycleManager.lambda$prepareStatementsAsync$14(LifeCycleManager.java:654)
connect | at java.base/java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986)
connect | at java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970)
connect | at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
connect | at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
connect | at com.datastax.oss.driver.internal.core.cql.CqlPrepareAsyncProcessor.lambda$process$1(CqlPrepareAsyncProcessor.java:74)
connect | at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
connect | at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
connect | at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
connect | at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
connect | at com.datastax.oss.driver.internal.core.cql.CqlPrepareHandler.lambda$setFinalResult$2(CqlPrepareHandler.java:251)
connect | at java.base/java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:783)
connect | at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
connect | at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
connect | at com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures.lambda$allDone$1(CompletableFutures.java:64)
connect | at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
connect | at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
connect | at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
connect | at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
connect | at com.datastax.oss.driver.internal.core.adminrequest.AdminRequestHandler.setFinalResult(AdminRequestHandler.java:200)
connect | at com.datastax.oss.driver.internal.core.adminrequest.ThrottledAdminRequestHandler.setFinalResult(ThrottledAdminRequestHandler.java:158)
connect | at com.datastax.oss.driver.internal.core.adminrequest.AdminRequestHandler.onResponse(AdminRequestHandler.java:190)
connect | at com.datastax.oss.driver.internal.core.channel.InFlightHandler.channelRead(InFlightHandler.java:255)
connect | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
connect | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
connect | at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
connect | at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
connect | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
connect | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
connect | at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
connect | at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321)
connect | at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295)
connect | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
connect | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
connect | at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
connect | at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1470)
connect | at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1219)
connect | at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1266)
connect | at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:498)
connect | at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:437)
connect | at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
connect | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
connect | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
connect | at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
connect | at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
connect | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
connect | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
connect | at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
connect | at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
connect | at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
connect | at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
connect | at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
connect | at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
connect | at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
connect | at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
connect | at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
connect | at java.base/java.lang.Thread.run(Thread.java:829)
connect | Caused by: com.datastax.oss.driver.api.core.servererrors.InvalidQueryException: Timestamp is not yet supported.
connect | [2021-06-09 03:36:08,335] INFO Task is stopped. (com.datastax.oss.kafka.sink.TaskStateManager)
connect | [2021-06-09 03:36:08,339] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics)
connect | [2021-06-09 03:36:08,339] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics)
connect | [2021-06-09 03:36:08,340] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics)
connect | [2021-06-09 03:36:08,348] INFO App info kafka.consumer for connector-consumer-cassandra-sink-connector-0 unregistered (org.apache.kafka.common.utils.AppInfoParser)

Here is a sample of events on the enviro.dds.measured.tide topic, with the key on the left and the JSON value on the right.

enviro.lyttelton.measured.tide.tide.tides       ☺��{"height_m": 0.9828569037656907, "residual_m": 0.1378569037656907, "validTime_Utc": "2021-06-03T07:00:00Z", "astro_m": 0.845}
enviro.lyttelton.measured.tide.tide.tides       ☺��{"height_m": 0.9743093645484953, "residual_m": 0.10530936454849527, "validTime_Utc": "2021-06-03T07:05:00Z", "astro_m": 0.869}
enviro.lyttelton.measured.tide.tide.tides       ☺��{"height_m": 1.007846153846154, "residual_m": 0.11384615384615404, "validTime_Utc": "2021-06-03T07:10:00Z", "astro_m": 0.894}
enviro.lyttelton.measured.tide.tide.tides       ☺��{"height_m": 1.0251747899159667, "residual_m": 0.10617478991596663, "validTime_Utc": "2021-06-03T07:15:00Z", "astro_m": 0.919}
enviro.lyttelton.measured.tide.tide.tides       ☺��{"height_m": 1.0367911073825506, "residual_m": 0.0907911073825507, "validTime_Utc": "2021-06-03T07:20:00Z", "astro_m": 0.946}
enviro.lyttelton.measured.tide.tide.tides       ☺��{"height_m": 1.0381566164154108, "residual_m": 0.06515661641541082, "validTime_Utc": "2021-06-03T07:25:00Z", "astro_m": 0.973}
enviro.lyttelton.measured.tide.tide.tides       ☺��{"height_m": 1.050763247863248, "residual_m": 0.04976324786324815, "validTime_Utc": "2021-06-03T07:30:00Z", "astro_m": 1.001}
enviro.lyttelton.measured.tide.tide.tides       ☺��{"height_m": 1.0799898904802026, "residual_m": 0.049989890480202526, "validTime_Utc": "2021-06-03T07:35:00Z", "astro_m": 1.03}
enviro.lyttelton.measured.tide.tide.tides       ☺��{"height_m": 1.0995008389261747, "residual_m": 0.03950083892617462, "validTime_Utc": "2021-06-03T07:40:00Z", "astro_m": 1.06}
enviro.lyttelton.measured.tide.tide.tides       ☺��{"height_m": 1.1511108312342573, "residual_m": 0.0611108312342572, "validTime_Utc": "2021-06-03T07:45:00Z", "astro_m": 1.09}

Here is the structure of my table:

select * from system_schema.columns where keyspace_name = 'enviro' AND table_name = 'dds_measured_tide';

"keyspace_name","clustering_order","kind","column_name_bytes","column_name","position","type","table_name"
"enviro","none","partition_key","c291cmNl","source",0,"ascii","dds_measured_tide"
"enviro","asc","clustering","dGltZV91dGM=","time_utc",0,"timestamp","dds_measured_tide"
"enviro","none","regular","aGVpZ2h0X20=","height_m",-1,"float","dds_measured_tide"
"enviro","none","regular","cmVzaWR1YWxfbQ==","residual_m",-1,"float","dds_measured_tide"

My first suspicion is that the error is telling me that the format the codec uses to parse the value.validTime_Utc field from my events isn't valid. But as far as I can see, my configuration to use ISO_INSTANT should support the format of value.validTime_Utc.
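
The first suspicion can be checked outside the connector. The following is a minimal sketch, assuming the connector's ISO_INSTANT setting corresponds to the JDK's DateTimeFormatter.ISO_INSTANT (the log does not confirm this). The class name IsoInstantCheck is hypothetical; the input is the validTime_Utc value from the first sample event.

```java
import java.time.Instant;
import java.time.format.DateTimeFormatter;

// Hypothetical helper class for illustration; not part of the connector.
final class IsoInstantCheck {

    // Parses the text with the JDK's ISO_INSTANT formatter.
    // Throws java.time.format.DateTimeParseException if the text does not match.
    static Instant parse(String text) {
        return DateTimeFormatter.ISO_INSTANT.parse(text, Instant::from);
    }

    public static void main(String[] args) {
        // validTime_Utc value taken from the first sample event on the topic.
        Instant parsed = parse("2021-06-03T07:00:00Z");
        System.out.println(parsed + " = " + parsed.toEpochMilli() + " ms since epoch");
    }
}
```

If this parses without an exception, the string format itself is compatible with ISO_INSTANT, which would point away from the codec as the cause.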

My only other suspicion is that it has something to do with the "USING TIMESTAMP" portion of the insert, and that something is wrong with the Kafka timestamp.
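
For context on the second suspicion, here is a rough sketch of the kind of value that would be bound to :kafka_internal_timestamp. It assumes the connector takes the Kafka record timestamp (milliseconds since the epoch) and converts it to a CQL write timestamp (conventionally microseconds since the epoch); the post does not confirm that behaviour. The class, method name, and input value are hypothetical.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; not the connector's actual code.
final class WriteTimestampSketch {

    // Converts a Kafka record timestamp (ms since epoch) into the
    // microsecond value conventionally used by CQL "USING TIMESTAMP".
    static long toCqlWriteTimestampMicros(long kafkaRecordTimestampMillis) {
        return TimeUnit.MILLISECONDS.toMicros(kafkaRecordTimestampMillis);
    }

    public static void main(String[] args) {
        long recordTimestampMillis = 1622703600000L; // illustrative value only
        System.out.println(toCqlWriteTimestampMicros(recordTimestampMillis));
    }
}
```

Note that the log reports "Prepare failed for statement", so the statement is rejected while it is being prepared rather than while a particular record's values are being bound.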

I figure it's not working either because my suspicions above are wrong and I'm misunderstanding or missing something, or because my configuration is incorrect.

The relevant documentation I have reviewed is listed below. I also searched the forums but couldn't find a similar problem.

https://docs.datastax.com/en/kafka/doc/kafka/configuration_reference/kafkaDates.html

https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html

Thanks.

kafka-connector

0 Answers