tomas.bartalos_162914 asked:

Issue with Cassandra-side pushdown in Spark connector

I'd like to join a Spark DataFrame containing partition keys with a Cassandra table while avoiding a full-table scan. I've read this feature is called "direct join" and is available directly on DataFrames in version 3.0.0 via DSEDirectJoinExec. Since I'm using OSS Spark 2.4.2, I guess I'm stuck with the RDD approach.

I have a problem with the conversion from RDD[CassandraSQLRow] to a DataFrame. I've tried to follow Russell Spitzer's answer:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.cassandra._   // cassandraFormat and CassandraSQLRow
import com.datastax.spark.connector._     // joinWithCassandraTable

// idsDF is a DataFrame with the Cassandra partition keys
val joined = idsDF.rdd.joinWithCassandraTable[CassandraSQLRow](key, table)
val right = joined.map { case (_, right) => right }
val rightAsRow = right.asInstanceOf[RDD[Row]]

// cassandraFormat takes (table, keyspace)
val schema = spark.read.cassandraFormat(table, key).load().schema
spark.createDataFrame(rightAsRow, schema)

The result is:

org.apache.spark.unsafe.types.UTF8String is not a valid external type for schema of string
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, id), StringType), true, false) AS id#358

When I create my own RowReaderFactory with the same implementation as CassandraSQLRowReader, just with UTF8String replaced by String, it works.

What am I doing wrong?

Spark version: 2.4.2
spark-cassandra-connector: 2.4.2

Tags: spark-cassandra-connector, join, dataframe, rdd
Russell Spitzer answered:

The main issue is that `createDataFrame` expects you will only use Spark's known external types, so you can't use CassandraSQLRow, because that actually reads back into types for Spark's internal API (which requires UTF8String and doesn't accept String). Instead, just read back into CassandraRows (no SQL) or a case class with standard Scala types. Sorry, I forgot about that aspect of the CassandraSQLRow class.
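For illustration, here is a minimal sketch of the case-class route. The keyspace "ks", table "users", and its two text columns are hypothetical; substitute your own schema:

import com.datastax.spark.connector._

// Case class with standard Scala types matching the (hypothetical) C* table
// ks.users(id text PRIMARY KEY, name text)
case class User(id: String, name: String)

// Wrap each partition key in a tuple so the connector can write the join keys,
// then read the matching rows back into the case class
val joinedUsers = idsDF.rdd
  .map(row => Tuple1(row.getString(0)))
  .joinWithCassandraTable[User]("ks", "users")

// Standard Scala types are valid external types, so toDF works directly
import spark.implicits._
val usersDF = joinedUsers.map { case (_, user) => user }.toDF()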


tomas.bartalos_162914 commented:

Thank you for the quick reply. When I use RDD[CassandraRow] (the default behaviour), I have to map each c: CassandraRow => Row(c.columnValues), and then I have a problem with type conversions like java.util.Date => java.sql.Timestamp. I was looking for the most efficient way to convert a CassandraJoinRDD to a DataFrame. In the end I used CassandraSQLRow with a map from UTF8String to String. But maybe there is a better way to get to Spark's external types?
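A minimal sketch of that manual conversion, assuming `joined` comes from joinWithCassandraTable without a type parameter (so it yields CassandraRow values) and the table has a timestamp column; Spark's external type for TimestampType is java.sql.Timestamp:

import org.apache.spark.sql.Row

// Map each CassandraRow to a Spark Row, converting java.util.Date
// (how the connector returns a Cassandra timestamp) to java.sql.Timestamp
val rows = joined.map { case (_, c) =>
  val converted = c.columnValues.map {
    case d: java.util.Date => new java.sql.Timestamp(d.getTime)
    case other => other
  }
  Row(converted: _*)
}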

Russell Spitzer replied:

There probably aren't any benefits to any particular conversion approach; I imagine they will all be more or less the same. For simplicity I generally just make a case class that matches the Spark SQL types for my C* schema.


tomas.bartalos_162914 answered:

As Russell Spitzer concluded, for effortless conversion it's best to use case classes. If you need to infer the schema from Cassandra without using case classes, this is a workaround:

import org.apache.spark.sql.Row
import org.apache.spark.sql.cassandra._   // cassandraFormat and CassandraSQLRow
import org.apache.spark.unsafe.types.UTF8String
import com.datastax.spark.connector._     // joinWithCassandraTable

val joined = idsDF.rdd.joinWithCassandraTable[CassandraSQLRow](key, table)
val joinedRight = joined.map { case (_, right) =>
  // CassandraSQLRow holds Catalyst-internal UTF8String values;
  // convert them back to String, the external type createDataFrame expects
  val colsWithoutUtf8 = right.columnValues.map {
    case s: UTF8String => s.toString
    case c => c
  }
  Row(colsWithoutUtf8: _*)
}

// cassandraFormat takes (table, keyspace)
val schema = spark.read.cassandraFormat(table, key).load().schema
spark.createDataFrame(joinedRight, schema)
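A quick sanity check of the result (hypothetical usage):

val df = spark.createDataFrame(joinedRight, schema)
df.printSchema()   // columns and types should match the Cassandra table
df.show(5, truncate = false)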

