Robospecta asked · Erick Ramirez commented

Why do I get InvalidQueryException: Timestamp is not yet supported?

Hi All,

I'm hoping someone can help me understand why I'm getting an "InvalidQueryException: Timestamp is not yet supported." when running my Kafka Connect worker with the DataStax Kafka connector.

Here is the output detailing both my connector configuration and the exception:

connect | [2021-06-09 03:36:05,624] INFO CassandraSinkTask starting with config:
connect | Global configuration:
connect | contactPoints: [cassandra.ap-southeast-2.amazonaws.com]
connect | port: 9142
connect | maxConcurrentRequests: 500
connect | maxNumberOfRecordsInBatch: 32
connect | jmx: true
connect | SSL configuration:
connect | provider: JDK
connect | hostnameValidation: false
connect | keystore.path:
connect | keystore.password: [hidden]
connect | truststore.path: /etc/confluent/docker/cassandra_truststore.jks
connect | truststore.password: [hidden]
connect | openssl.keyCertChain:
connect | openssl.privateKey:
connect | Authentication configuration:
connect | provider: PLAIN
connect | username: temp_michael-at-226611388535
connect | password: [hidden]
connect | gssapi.keyTab:
connect | gssapi.principal:
connect | gssapi.service: dse
connect | Topic configurations:
connect | name: enviro.dds.measured.tide, codec settings: locale: en_US, timeZone: UTC, timestamp: ISO_INSTANT, date: ISO_LOCAL_DATE, time: ISO_LOCAL_TIME, unit: MILLISECONDS
connect | Table configurations:
connect | {keyspace: enviro, table: dds_measured_tide, cl: LOCAL_ONE, ttl: -1, nullToUnset: true, deletesEnabled: true, mapping:
connect | source=key
connect | time_utc=value.validTime_Utc
connect | height_m=value.height_m
connect | residual_m=value.residual_m
connect | }
connect | datastax-java-driver configuration:
connect | datastax-java-driver.advanced.metrics.session.cql-requests.refresh-interval=30 seconds
connect | datastax-java-driver.advanced.connection.pool.local.size=4
connect | datastax-java-driver.advanced.metrics.node.cql-messages.highest-latency=35 seconds
connect | datastax-java-driver.basic.request.timeout=30 seconds
connect | datastax-java-driver.basic.load-balancing-policy.local-datacenter=ap-southeast-2
connect | datastax-java-driver.advanced.protocol.compression=none
connect | datastax-java-driver.advanced.metrics.session.enabled.1=cql-client-timeouts
connect | datastax-java-driver.advanced.metrics.session.enabled.0=cql-requests
connect | (com.datastax.oss.kafka.sink.state.LifeCycleManager)
connect | [2021-06-09 03:36:05,835] INFO PID obtained through native call to getpid(): 1 (com.datastax.oss.driver.api.core.uuid.Uuids)
connect | [2021-06-09 03:36:06,095] INFO DataStax Java driver for Apache Cassandra(R) (com.datastax.oss:java-driver-core) version 4.6.0 (com.datastax.oss.driver.internal.core.DefaultMavenCoordinates)
connect | [2021-06-09 03:36:06,183] INFO Could not register Graph extensions; this is normal if Tinkerpop was explicitly excluded from classpath (com.datastax.oss.driver.internal.core.context.InternalDriverContext)
connect | [2021-06-09 03:36:06,226] INFO Using native clock for microsecond precision (com.datastax.oss.driver.internal.core.time.Clock)
connect | [2021-06-09 03:36:06,555] INFO [s0] Failed to connect with protocol DSE_V2, retrying with DSE_V1 (com.datastax.oss.driver.internal.core.channel.ChannelFactory)
connect | [2021-06-09 03:36:06,692] INFO [s0] Failed to connect with protocol DSE_V1, retrying with V4 (com.datastax.oss.driver.internal.core.channel.ChannelFactory)
connect | [2021-06-09 03:36:07,094] WARN [s0] Unsupported partitioner 'com.amazonaws.cassandra.DefaultPartitioner', token map will be empty. (com.datastax.oss.driver.internal.core.metadata.token.DefaultTokenFactoryRegistry)
connect | [2021-06-09 03:36:08,331] ERROR WorkerSinkTask{id=cassandra-sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
connect | java.lang.RuntimeException: Prepare failed for statement: INSERT INTO enviro.dds_measured_tide(source,time_utc,height_m,residual_m) VALUES (:source,:time_utc,:height_m,:residual_m) USING TIMESTAMP :kafka_internal_timestamp or DELETE FROM enviro.dds_measured_tide WHERE source = :source AND time_utc = :time_utc
connect | at com.datastax.oss.kafka.sink.state.LifeCycleManager.lambda$prepareStatementsAsync$14(LifeCycleManager.java:654)
connect | at java.base/java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986)
connect | at java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970)
connect | at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
connect | at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
connect | at com.datastax.oss.driver.internal.core.cql.CqlPrepareAsyncProcessor.lambda$process$1(CqlPrepareAsyncProcessor.java:74)
connect | at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
connect | at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
connect | at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
connect | at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
connect | at com.datastax.oss.driver.internal.core.cql.CqlPrepareHandler.lambda$setFinalResult$2(CqlPrepareHandler.java:251)
connect | at java.base/java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:783)
connect | at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
connect | at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
connect | at com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures.lambda$allDone$1(CompletableFutures.java:64)
connect | at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
connect | at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
connect | at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
connect | at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
connect | at com.datastax.oss.driver.internal.core.adminrequest.AdminRequestHandler.setFinalResult(AdminRequestHandler.java:200)
connect | at com.datastax.oss.driver.internal.core.adminrequest.ThrottledAdminRequestHandler.setFinalResult(ThrottledAdminRequestHandler.java:158)
connect | at com.datastax.oss.driver.internal.core.adminrequest.AdminRequestHandler.onResponse(AdminRequestHandler.java:190)
connect | at com.datastax.oss.driver.internal.core.channel.InFlightHandler.channelRead(InFlightHandler.java:255)
connect | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
connect | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
connect | at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
connect | at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
connect | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
connect | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
connect | at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
connect | at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321)
connect | at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295)
connect | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
connect | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
connect | at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
connect | at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1470)
connect | at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1219)
connect | at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1266)
connect | at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:498)
connect | at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:437)
connect | at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
connect | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
connect | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
connect | at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
connect | at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
connect | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
connect | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
connect | at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
connect | at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
connect | at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
connect | at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
connect | at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
connect | at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
connect | at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
connect | at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
connect | at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
connect | at java.base/java.lang.Thread.run(Thread.java:829)
connect | Caused by: com.datastax.oss.driver.api.core.servererrors.InvalidQueryException: Timestamp is not yet supported.
connect | [2021-06-09 03:36:08,335] INFO Task is stopped. (com.datastax.oss.kafka.sink.TaskStateManager)
connect | [2021-06-09 03:36:08,339] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics)
connect | [2021-06-09 03:36:08,339] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics)
connect | [2021-06-09 03:36:08,340] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics)
connect | [2021-06-09 03:36:08,348] INFO App info kafka.consumer for connector-consumer-cassandra-sink-connector-0 unregistered (org.apache.kafka.common.utils.AppInfoParser)

Here is a sample of some events on the enviro.dds.measured.tide topic, with the key on the left and the JSON value on the right:

enviro.lyttelton.measured.tide.tide.tides       ☺��{"height_m": 0.9828569037656907, "residual_m": 0.1378569037656907, "validTime_Utc": "2021-06-03T07:00:00Z", "astro_m": 0.845}
enviro.lyttelton.measured.tide.tide.tides       ☺��{"height_m": 0.9743093645484953, "residual_m": 0.10530936454849527, "validTime_Utc": "2021-06-03T07:05:00Z", "astro_m": 0.869}
enviro.lyttelton.measured.tide.tide.tides       ☺��{"height_m": 1.007846153846154, "residual_m": 0.11384615384615404, "validTime_Utc": "2021-06-03T07:10:00Z", "astro_m": 0.894}
enviro.lyttelton.measured.tide.tide.tides       ☺��{"height_m": 1.0251747899159667, "residual_m": 0.10617478991596663, "validTime_Utc": "2021-06-03T07:15:00Z", "astro_m": 0.919}
enviro.lyttelton.measured.tide.tide.tides       ☺��{"height_m": 1.0367911073825506, "residual_m": 0.0907911073825507, "validTime_Utc": "2021-06-03T07:20:00Z", "astro_m": 0.946}
enviro.lyttelton.measured.tide.tide.tides       ☺��{"height_m": 1.0381566164154108, "residual_m": 0.06515661641541082, "validTime_Utc": "2021-06-03T07:25:00Z", "astro_m": 0.973}
enviro.lyttelton.measured.tide.tide.tides       ☺��{"height_m": 1.050763247863248, "residual_m": 0.04976324786324815, "validTime_Utc": "2021-06-03T07:30:00Z", "astro_m": 1.001}
enviro.lyttelton.measured.tide.tide.tides       ☺��{"height_m": 1.0799898904802026, "residual_m": 0.049989890480202526, "validTime_Utc": "2021-06-03T07:35:00Z", "astro_m": 1.03}
enviro.lyttelton.measured.tide.tide.tides       ☺��{"height_m": 1.0995008389261747, "residual_m": 0.03950083892617462, "validTime_Utc": "2021-06-03T07:40:00Z", "astro_m": 1.06}
enviro.lyttelton.measured.tide.tide.tides       ☺��{"height_m": 1.1511108312342573, "residual_m": 0.0611108312342572, "validTime_Utc": "2021-06-03T07:45:00Z", "astro_m": 1.09}

Here is the structure of my table:

select * from system_schema.columns where keyspace_name = 'enviro' AND table_name = 'dds_measured_tide';

"keyspace_name","clustering_order","kind","column_name_bytes","column_name","position","type","table_name"
"enviro","none","partition_key","c291cmNl","source",0,"ascii","dds_measured_tide"
"enviro","asc","clustering","dGltZV91dGM=","time_utc",0,"timestamp","dds_measured_tide"
"enviro","none","regular","aGVpZ2h0X20=","height_m",-1,"float","dds_measured_tide"
"enviro","none","regular","cmVzaWR1YWxfbQ==","residual_m",-1,"float","dds_measured_tide"

My first suspicion is that this error is telling me that the codec can't parse the value.validTime_Utc field from my events. But as far as I can see, my configuration of ISO_INSTANT should support the format used by value.validTime_Utc (see the quick check below).
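As a sanity check on that first suspicion, the timestamp strings in the sample events do parse with ISO_INSTANT. A minimal standalone check (the class name is just for illustration):

import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAccessor;

public class ValidTimeCheck {
    public static void main(String[] args) {
        // Same format as the validTime_Utc field in the sample events above
        String validTimeUtc = "2021-06-03T07:00:00Z";

        // ISO_INSTANT is the formatter named in the connector's codec settings
        TemporalAccessor parsed = DateTimeFormatter.ISO_INSTANT.parse(validTimeUtc);
        System.out.println(Instant.from(parsed)); // prints 2021-06-03T07:00:00Z
    }
}

So the string format itself looks compatible with that codec setting.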

My only other suspicion is that it is something to do with the "USING TIMESTAMP" portion of the generated INSERT and something wrong with the Kafka record timestamp.

Either way, I figure it's not working because my suspicions above are off and I'm misunderstanding or missing something, or my configuration is incorrect.

The relevant documentation I have reviewed is linked below. I also searched the forums but couldn't find a similar problem.

https://docs.datastax.com/en/kafka/doc/kafka/configuration_reference/kafkaDates.html

https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html

Thanks.

kafka-connector

1 Answer

Erick Ramirez answered · Erick Ramirez commented

I've been going through the code for both the Kafka sink connector and the Java driver, and neither appears to generate the error Timestamp is not yet supported.

I have a feeling the Java driver is just propagating the error generated by the cluster. Is it possible that you're connecting to a cluster that is not Apache Cassandra?
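One way to check is to prepare the exact same statement with the standalone Java driver (the same driver the connector embeds) and see whether the server rejects it, with Kafka Connect out of the picture. A rough sketch, assuming the contact point, SSL and auth settings are supplied through the driver's application.conf:

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.servererrors.InvalidQueryException;

public class PrepareCheck {
    public static void main(String[] args) {
        // Connection details (contact point, port, SSL, auth, local datacenter)
        // are assumed to come from the driver's application.conf, mirroring the
        // datastax-java-driver.* settings shown in the connector config above.
        try (CqlSession session = CqlSession.builder().build()) {
            try {
                // The same statement the sink connector failed to prepare
                session.prepare(
                    "INSERT INTO enviro.dds_measured_tide "
                        + "(source, time_utc, height_m, residual_m) "
                        + "VALUES (:source, :time_utc, :height_m, :residual_m) "
                        + "USING TIMESTAMP :kafka_internal_timestamp");
                System.out.println("Statement prepared fine");
            } catch (InvalidQueryException e) {
                // If the cluster itself rejects USING TIMESTAMP, the same
                // "Timestamp is not yet supported." message surfaces here.
                System.out.println("Server rejected the statement: " + e.getMessage());
            }
        }
    }
}

If the same InvalidQueryException comes back, the cluster is rejecting the USING TIMESTAMP clause itself and the connector configuration isn't the problem.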

We have received reports of weird behaviours when connecting to AWS Keyspaces, but we don't test the drivers or connectors against it, so it isn't something we can validate. Cheers!

2 comments

Ah yes. I imagine you're right. We have been attempting to use the driver against AWS Keyspaces.

Apologies for leaving that information out of my initial post; having it upfront would probably have saved you some headache. Appreciate the time you've spent looking into this.


Not a problem, mate. I searched through Amazon's documentation to see whether timestamps are unsupported there, but I couldn't find anything relevant. Cheers!
