Bringing together the Apache Cassandra experts from the community and DataStax.

Asked by Anatoly · Edited by Erick Ramirez

Not able to do direct join on clustering key

Hello!

I have a question about the Spark Cassandra Connector (Spark 3.1.1, connector 3.1.0): I'm trying to get a direct join working on a table with a clustering key.

There are a couple of tables in C*:

create table test_ks.skv(
 s varchar,
 k int,
 v int,
 primary key (s, k)
);

create table test_ks.skvv(
 s varchar,
 k int, 
 v int, 
 v1 int, 
 primary key ((s, k), v)
);
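For context, `skvDf`/`skvvDf` are not defined in the snippets below; presumably they are loaded through the connector's DataSource, along the lines of the following setup sketch (this assumes a running Cassandra cluster and a configured connection, so it is not runnable standalone):

```scala
// Assumed setup: how skvDf/skvvDf could be created via the connector's
// DataSource API. Requires a live Cassandra cluster; shown for context only.
val skvDf = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "test_ks", "table" -> "skv"))
  .load()

val skvvDf = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "test_ks", "table" -> "skvv"))
  .load()
```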

I create the join using the following dataset:

val stringsDf = spark.range(1, 100).selectExpr("cast(id as string) skey")

When I join on the table without the clustering key, the plan shows a direct join:

val resultDf = skvDf.join(stringsDf, stringsDf("skey") === skvDf("s") && skvDf("k") === lit(5))
== Physical Plan ==
Cassandra Direct Join [s = skey#2] test_ks.skv - Reading (s, k, v) Pushed {("k" = ?:5)}
+- *(1) Project [cast(id#0L as string) AS skey#2]
 +- *(1) Range (1, 100, step=1, splits=16)
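For intuition about what that plan node buys you (this is plain Scala, not connector internals): a direct join turns the join into targeted per-key lookups against Cassandra, instead of scanning the whole table and joining in Spark. A toy model, where a `Map` stands in for the Cassandra table keyed by partition key:

```scala
// Toy model: the Cassandra table as a map from partition key to a value.
val table: Map[String, Int] = Map("1" -> 10, "2" -> 20, "3" -> 30)

// Spark-side join keys (like stringsDf's skey column)
val sparkKeys = Seq("2", "3", "99")

// Direct-join style: one point lookup per Spark-side key;
// keys absent from the table simply produce no row.
val directJoined = sparkKeys.flatMap(k => table.get(k).map(v => (k, v)))

// Full-scan style: read every row, then match against the Spark side
// (roughly what the BroadcastHashJoin over a full table scan does).
val scanJoined = table.toSeq.filter { case (k, _) => sparkKeys.contains(k) }

println(directJoined) // List((2,20), (3,30))
```

With a large table and a small set of join keys, the direct-join strategy touches far fewer partitions, which is why the connector prefers it below the `directJoinSizeRatio` threshold.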

But in the case of the table with the clustering key, there is no direct join:

val resultDf = skvvDf.join(stringsDf, stringsDf("skey") === skvvDf("s") && skvvDf("k") === lit(5))
== Physical Plan ==
*(2) BroadcastHashJoin [s#4], [skey#2], Inner, BuildRight, false
:- *(2) Filter (k#5 = 5)
: +- BatchScan[s#4, k#5, v#6, v1#7] Cassandra Scan: test_ks.skvv
 - Cassandra Filters: []
 - Requested Columns: [s,k,v,v1]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]),false), [id=#44]
 +- *(1) Project [cast(id#0L as string) AS skey#2]
 +- *(1) Range (1, 100, step=1, splits=16)

Moreover, adding the clustering key to the join condition doesn't solve the problem:

val resultDf = skvvDf.join(stringsDf, stringsDf("skey") === skvvDf("s") && skvvDf("k") === lit(5) && skvvDf("v") === lit(10))
== Physical Plan ==
*(2) BroadcastHashJoin [s#4], [skey#2], Inner, BuildRight, false
:- *(2) Filter (k#5 = 5)
: +- BatchScan[s#4, k#5, v#6, v1#7] Cassandra Scan: test_ks.skvv
 - Cassandra Filters: [["v" = ?, 10]]
 - Requested Columns: [s,k,v,v1]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]),false), [id=#73]
 +- *(1) Project [cast(id#0L as string) AS skey#2]
 +- *(1) Range (1, 100, step=1, splits=16)

Is this expected behavior, and if so, why?

[UPDATE] Calling directJoin with AlwaysOn doesn't work either:

scala> val resultDf = skvvDf.join(stringsDf.directJoin(AlwaysOn), stringsDf("skey") === skvvDf("s") && skvvDf("k") === lit(5))
resultDf: org.apache.spark.sql.DataFrame = [s: string, k: int ... 3 more fields]

scala> resultDf.explain(true)
== Parsed Logical Plan ==
Join Inner, ((skey#2 = s#4) AND (k#5 = 5))
:- SubqueryAlias cassandracatalog.test_ks.skvv
:  +- RelationV2[s#4, k#5, v#6, v1#7] skvv
+- Project [cast(id#0L as string) AS skey#2]
   +- Range (1, 100, step=1, splits=Some(16))

== Analyzed Logical Plan ==
s: string, k: int, v: int, v1: int, skey: string
Join Inner, ((skey#2 = s#4) AND (k#5 = 5))
:- SubqueryAlias cassandracatalog.test_ks.skvv
:  +- RelationV2[s#4, k#5, v#6, v1#7] skvv
+- Project [cast(id#0L as string) AS skey#2]
   +- Range (1, 100, step=1, splits=Some(16))

== Optimized Logical Plan ==
Join Inner, (skey#2 = s#4)
:- Filter (k#5 = 5)
:  +- RelationV2[s#4, k#5, v#6, v1#7] skvv
+- Project [cast(id#0L as string) AS skey#2]
   +- Range (1, 100, step=1, splits=Some(16))

== Physical Plan ==
*(2) BroadcastHashJoin [s#4], [skey#2], Inner, BuildRight, false
:- *(2) Filter (k#5 = 5)
:  +- BatchScan[s#4, k#5, v#6, v1#7] Cassandra Scan: test_ks.skvv
 - Cassandra Filters: []
 - Requested Columns: [s,k,v,v1]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]),false), [id=#54]
   +- *(1) Project [cast(id#0L as string) AS skey#2]
      +- *(1) Range (1, 100, step=1, splits=16)
Tags: spark-cassandra-connector

1 Answer

Answered by Erick Ramirez · Anatoly commented

If you haven't already seen it, please try joinWithCassandraTable().

Have a look at the examples in Performing Efficient Joins with Cassandra. Cheers!

5 comments

It works well with joinWithCassandraTable():

scala> val stringsRDD = sc.parallelize(0 to 5).map(_.toString)
stringsRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at map at <console>:24

scala> val intsRDD = sc.parallelize(0 to 5)
intsRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:27

scala> val strIntRDD = stringsRDD.zip(intsRDD)
strIntRDD: org.apache.spark.rdd.RDD[(String, Int)] = ZippedPartitionsRDD2[3] at zip at <console>:30

scala> val internalJoin = strIntRDD.joinWithCassandraTable("test_ks", "skvv")
internalJoin: com.datastax.spark.connector.rdd.CassandraJoinRDD[(String, Int),com.datastax.spark.connector.CassandraRow] = CassandraJoinRDD[4] at RDD at CassandraRDD.scala:18

scala> internalJoin.collect.foreach(println)
((0,0),CassandraRow{s: 0, k: 0, v: 0, v1: 0})
((3,3),CassandraRow{s: 3, k: 3, v: -3, v1: -3})

But why doesn't it work with DataFrames? Should I report a bug?


Could you try explicitly setting directJoinSetting to on? This should force a direct join regardless of the threshold (directJoinSizeRatio). Details here. Cheers!
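For reference, a sketch of how that setting can be passed at shell startup. `directJoinSetting` and `CassandraSparkExtensions` are the connector's own names; the package coordinates, Scala version, and contact host here are assumptions for this environment:

```shell
spark-shell \
  --packages com.datastax.spark:spark-cassandra-connector_2.12:3.1.0 \
  --conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions \
  --conf spark.cassandra.sql.directJoinSetting=on \
  --conf spark.cassandra.connection.host=127.0.0.1
```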

Anatoly (to Erick Ramirez):
This setting was switched on in my environment during the tests above.

One more thing to try is to call the directJoin() function so that it is AlwaysOn:

val resultDf = skvvDf.join(stringsDf.directJoin(AlwaysOn), ... )

[Update posted in question body]
