Bringing together the Apache Cassandra experts from the community and DataStax.


landonvg_125049 asked:

Spark Cassandra error with DateRangeType

We added a DateRangeType field to a UDT, and now our Spark job that reads those records is failing with this error, running in AWS:

Exception in thread "main" java.util.NoSuchElementException: key not found: 'org.apache.cassandra.db.marshal.DateRangeType'
   at scala.collection.MapLike$class.default(MapLike.scala:228)

We are running on DSE 5.1.15. Here are the relevant dependencies:

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.1.1</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <classifier>sources</classifier>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.1.1</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>com.datastax.client</groupId>
      <artifactId>dse-byos_2.11</artifactId>
      <version>5.1.10</version>
      <scope>provided</scope>
    </dependency>

Running locally, we get a different issue:

WARN  [dse-app-client-thread-pool-0] 2019-09-17 13:54:59,114  CodecRegistry.java:357 - Ignoring codec DateRangeCodec ['org.apache.cassandra.db.marshal.DateRangeType' <-> com.datastax.driver.dse.search.DateRange] because it collides with previously registered codec DateRangeCodec ['org.apache.cassandra.db.marshal.DateRangeType' <-> com.datastax.driver.dse.search.DateRange]

We can't find the DateRangeType class anywhere on our classpath. What do we need to do?

Tags: dse, spark

Russell Spitzer answered:

Based on the posted stack trace, the issue is actually not with the TypeCodec registry. The issue is that the customFromDriver method provided by DseTypeConverter does not account for the DateRange type. This means Spark won't be able to run with DateRange types unless this converter is updated.
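To see why a missing type mapping surfaces as this particular exception, here is a simplified, self-contained illustration. This is not the actual DSE or connector source; the class name `TypeLookupSketch`, the map contents, and the method `targetTypeFor` are all invented for the sketch. The point is that the converter keeps a lookup table keyed by Cassandra marshal class names, and Scala's `Map.default` throws `NoSuchElementException("key not found: …")` for any class absent from it:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

// Simplified illustration (not DSE source): a converter maps Cassandra
// marshal class names to target types. Any marshal class missing from the
// map produces the "key not found" error seen in the stack trace.
public class TypeLookupSketch {

    private static final Map<String, String> FROM_DRIVER = new HashMap<>();
    static {
        FROM_DRIVER.put("org.apache.cassandra.db.marshal.UTF8Type", "String");
        FROM_DRIVER.put("org.apache.cassandra.db.marshal.Int32Type", "Int");
        // Note: no entry for org.apache.cassandra.db.marshal.DateRangeType
    }

    static String targetTypeFor(String marshalClass) {
        String t = FROM_DRIVER.get(marshalClass);
        if (t == null) {
            // Mirrors what Scala's Map.default does for a missing key
            throw new NoSuchElementException("key not found: '" + marshalClass + "'");
        }
        return t;
    }

    public static void main(String[] args) {
        System.out.println(targetTypeFor("org.apache.cassandra.db.marshal.UTF8Type"));
        try {
            targetTypeFor("org.apache.cassandra.db.marshal.DateRangeType");
        } catch (NoSuchElementException e) {
            System.out.println("Caught: " + e.getMessage());
        }
    }
}
```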


@Russell Spitzer Where do we go from here? Your other comment below says this will likely need a DSE JIRA. @brent.hale_101199 posted a small example that shows how to reproduce this behavior, but that post has been removed (it was posted as an answer due to length).


Could you provide an update on https://datastax.jira.com/browse/DSP-19779? We've hit up against this again and wonder if there has been a fix.


What will we need? A new version of dse-byos_2.11-5.1.15.jar?


There are still no public versions with a fix. Reading the type is difficult in the OSS/DSE drivers (3.x, 1.x) since the driver type is not serializable. While we could do a patch that allows us not to fail on reading the schema, it would not allow us to read the column.


The fix/patch would be helpful. In our current circumstances, not reading that column would be fine. We defined the column in one of our UDTs as an experiment. We are not using it anymore, but because we can't delete the field (since it is in a UDT), it is killing us.

brent.hale_101199 answered:

Another reply that was too big to post as a comment.


Trying to simplify things, I tried the following, combining steps from your docs:

In cqlsh:

CREATE TABLE taxi_trips(id int PRIMARY KEY, pickup_dropoff_range 'DateRangeType');
INSERT INTO taxi_trips(id, pickup_dropoff_range) VALUES (1, '[2017-02-02T14:57:00 TO 2017-02-02T15:10:17]');
INSERT INTO taxi_trips(id, pickup_dropoff_range) VALUES (2, '[2017-02-01T09:00:03 TO 2017-02-01T09:32:00.001]');
INSERT INTO taxi_trips(id, pickup_dropoff_range) VALUES (3, '[2017-02-03T12:10:01.358 TO 2017-02-03T12:19:57]');

Then in the spark shell (dse spark):

val rdd = sc.cassandraTable("gencat", "taxi_trips")

rdd: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:19


val firstRow = rdd.first

java.util.NoSuchElementException: key not found: 'org.apache.cassandra.db.marshal.DateRangeType'


We're on DSE 5.1.15.


What's going on with DateRangeType and Spark?


@Russell Spitzer, could you look at this post? I am reproducing our problem in these very simple steps.

brent.hale_101199 answered:

I tried to put this in a comment but exceeded the character limit.

This is what we tried:

  private void run(String[] args) {
    processArgs(args);

    final JavaSparkContext sc = SparkUtil.createSparkContext(GetIds.class.getName(), cassUrl, cassUser, cassPwd);

    CodecRegistry.DEFAULT_INSTANCE.register(DateRangeCodec.INSTANCE);

    process(sc);

    SparkUtil.stop(sc);
  }

  private void process(JavaSparkContext sc) {
    LOGGER.info("In process");

    JavaRDD<String> idRDD = javaFunctions(sc)
        .cassandraTable(CASS_KEYSPACE, CliUtil.getTableNameFromResourceType(resourceType), mapColumnTo(String.class))
        .select("id");

    JavaRDD<String> filteredIdRDD = idRDD.filter(SparkUtil.bucketFilterIds(numOfBuckets, bucketNumber));

    filteredIdRDD.saveAsTextFile(outputFilename);
  }

This still results in


2019-09-18 14:59:07,889 INFO cql.CassandraConnector (main) Connected to Cassandra cluster: rmsstage-db

Exception in thread "main" java.util.NoSuchElementException: key not found: 'org.apache.cassandra.db.marshal.DateRangeType'


@Russell Spitzer the suggested fix doesn't seem to be working.


That is in the main method, so it would only run on the driver machine. You need to invoke the registration from a singleton (a public static class) that is imported on the executors.


@Russell Spitzer Can you give us instructions and/or example code on how to do that?
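A minimal sketch of the singleton pattern Russell describes. The class name `DateRangeCodecRegistrar` and the stand-in `REGISTERED` set are invented so the sketch runs standalone; in the real job, the line inside the static initializer would instead be the call from this thread, `CodecRegistry.DEFAULT_INSTANCE.register(DateRangeCodec.INSTANCE);`. The key idea is that a static initializer runs once per JVM that loads the class, so forcing the class to load inside executor-side code registers the codec on every executor, not just the driver:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of executor-side codec registration via a singleton. The REGISTERED
// set is a stand-in for the driver's codec registry so this compiles and runs
// without DSE on the classpath (assumption for this sketch).
public final class DateRangeCodecRegistrar {

    // Stand-in for CodecRegistry.DEFAULT_INSTANCE (hypothetical)
    static final Set<String> REGISTERED = ConcurrentHashMap.newKeySet();

    static {
        // Real job: CodecRegistry.DEFAULT_INSTANCE.register(DateRangeCodec.INSTANCE);
        // Runs exactly once per JVM, the first time this class is loaded.
        REGISTERED.add("DateRangeCodec");
    }

    private DateRangeCodecRegistrar() {}

    // Call from code that runs on the executors (e.g. at the top of a
    // map/filter lambda); merely referencing the class forces the static
    // initializer to run in that JVM. Repeat calls are harmless no-ops.
    public static void ensureRegistered() {
        // intentionally empty: class loading already performed the registration
    }
}
```

In the Spark job, you would call `DateRangeCodecRegistrar.ensureRegistered()` inside the function passed to an RDD transformation, so the class loads in every executor JVM rather than only in the driver's `main`.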

Russell Spitzer answered:

The local issue just shows the codec being registered twice, which isn't a problem. The first error says that "DateRangeType", a DSE Search-specific type, is not being found when trying to read a column of that type.


In 5.1 I do not believe this type has an automatically registered codec, which means you can get around this either by registering the codec or by avoiding reading the column with this type.

To register the codec you would need to invoke something like


CodecRegistry.DEFAULT_INSTANCE.register(DateRangeCodec.instance);



in a singleton (this makes it run on all nodes).

Or, if you want to ignore that column, select all columns except the one with this type.


Thanks, we'll give it a try.


Note that we are already avoiding reading the column with that type; we are reading only the ID PK column, which is a String.

To compile, we had to uppercase INSTANCE:
CodecRegistry.DEFAULT_INSTANCE.register(DateRangeCodec.INSTANCE);

