Question

Anatoly asked · Erick Ramirez edited

Not able to do direct join on clustering key

Hello!

I have a question about the C* Spark connector (Spark 3.1.1, connector 3.1.0): I'm trying to get the direct join working on a table with a clustering key.
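For completeness, a sketch of the session setup assumed by the plans below (the direct join optimization needs the connector's Catalyst extensions; the catalog name matches the analyzed plans further down, and the contact point is a placeholder):

import org.apache.spark.sql.SparkSession

// Direct joins require the connector's Catalyst extensions to be registered.
// The catalog name "cassandracatalog" matches the analyzed plans below.
val spark = SparkSession.builder()
  .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
  .config("spark.sql.catalog.cassandracatalog", "com.datastax.spark.connector.datasource.CassandraCatalog")
  .config("spark.cassandra.connection.host", "127.0.0.1") // placeholder contact point
  .getOrCreate()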

There are a couple of tables in C*:

create table test_ks.skv(
 s varchar,
 k int,
 v int,
 primary key (s, k)
);

create table test_ks.skvv(
 s varchar,
 k int, 
 v int, 
 v1 int, 
 primary key ((s, k), v)
);
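The DataFrames skvDf and skvvDf used below are read through the catalog, presumably along these lines:

// Sketch: load the two tables as DataFrames via the Cassandra catalog
// (the catalog name appears in the analyzed plans below)
val skvDf  = spark.read.table("cassandracatalog.test_ks.skv")
val skvvDf = spark.read.table("cassandracatalog.test_ks.skvv")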

I'm joining them against the following dataset:

val stringsDf = spark.range(1, 100).selectExpr("cast(id as string) skey")

When I join on the table whose partition key is the single column s (test_ks.skv), the plan shows a direct join:

val resultDf = skvDf.join(stringsDf, stringsDf("skey") === skvDf("s") && skvDf("k") === lit(5))
== Physical Plan ==
Cassandra Direct Join [s = skey#2] test_ks.skv - Reading (s, k, v) Pushed {("k" = ?:5)}
+- *(1) Project [cast(id#0L as string) AS skey#2]
   +- *(1) Range (1, 100, step=1, splits=16)

But for the table with the composite partition key (s, k) and a clustering key (test_ks.skvv), there is no direct join :(

val resultDf = skvvDf.join(stringsDf, stringsDf("skey") === skvvDf("s") && skvvDf("k") === lit(5))
== Physical Plan ==
*(2) BroadcastHashJoin [s#4], [skey#2], Inner, BuildRight, false
:- *(2) Filter (k#5 = 5)
:  +- BatchScan[s#4, k#5, v#6, v1#7] Cassandra Scan: test_ks.skvv
 - Cassandra Filters: []
 - Requested Columns: [s,k,v,v1]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]),false), [id=#44]
   +- *(1) Project [cast(id#0L as string) AS skey#2]
      +- *(1) Range (1, 100, step=1, splits=16)

Moreover, adding the clustering key to the join condition doesn't solve the problem:

val resultDf = skvvDf.join(stringsDf, stringsDf("skey") === skvvDf("s") && skvvDf("k") === lit(5) && skvvDf("v") === lit(10))
== Physical Plan ==
*(2) BroadcastHashJoin [s#4], [skey#2], Inner, BuildRight, false
:- *(2) Filter (k#5 = 5)
:  +- BatchScan[s#4, k#5, v#6, v1#7] Cassandra Scan: test_ks.skvv
 - Cassandra Filters: [["v" = ?, 10]]
 - Requested Columns: [s,k,v,v1]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]),false), [id=#73]
   +- *(1) Project [cast(id#0L as string) AS skey#2]
      +- *(1) Range (1, 100, step=1, splits=16)

Is this expected behavior, and why?

[UPDATE] Calling directJoin with AlwaysOn doesn't work either:

scala> val resultDf = skvvDf.join(stringsDf.directJoin(AlwaysOn), stringsDf("skey") === skvvDf("s") && skvvDf("k") === lit(5))
resultDf: org.apache.spark.sql.DataFrame = [s: string, k: int ... 3 more fields]

scala> resultDf.explain(true)
== Parsed Logical Plan ==
Join Inner, ((skey#2 = s#4) AND (k#5 = 5))
:- SubqueryAlias cassandracatalog.test_ks.skvv
:  +- RelationV2[s#4, k#5, v#6, v1#7] skvv
+- Project [cast(id#0L as string) AS skey#2]
   +- Range (1, 100, step=1, splits=Some(16))

== Analyzed Logical Plan ==
s: string, k: int, v: int, v1: int, skey: string
Join Inner, ((skey#2 = s#4) AND (k#5 = 5))
:- SubqueryAlias cassandracatalog.test_ks.skvv
:  +- RelationV2[s#4, k#5, v#6, v1#7] skvv
+- Project [cast(id#0L as string) AS skey#2]
   +- Range (1, 100, step=1, splits=Some(16))

== Optimized Logical Plan ==
Join Inner, (skey#2 = s#4)
:- Filter (k#5 = 5)
:  +- RelationV2[s#4, k#5, v#6, v1#7] skvv
+- Project [cast(id#0L as string) AS skey#2]
   +- Range (1, 100, step=1, splits=Some(16))

== Physical Plan ==
*(2) BroadcastHashJoin [s#4], [skey#2], Inner, BuildRight, false
:- *(2) Filter (k#5 = 5)
:  +- BatchScan[s#4, k#5, v#6, v1#7] Cassandra Scan: test_ks.skvv
 - Cassandra Filters: []
 - Requested Columns: [s,k,v,v1]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]),false), [id=#54]
   +- *(1) Project [cast(id#0L as string) AS skey#2]
      +- *(1) Range (1, 100, step=1, splits=16)

1 Answer

Erick Ramirez answered

If you haven't already seen it, please try joinWithCassandraTable().

Have a look at the examples in Performing Efficient Joins with Cassandra. Cheers!
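A minimal sketch of that API against the table from the question (each (String, Int) tuple is matched onto skvv's composite partition key (s, k)):

import com.datastax.spark.connector._   // adds joinWithCassandraTable to RDDs

// Pull only the rows whose partition key matches a tuple in the RDD
val joined = sc.parallelize(Seq(("1", 5), ("2", 5)))
  .joinWithCassandraTable("test_ks", "skvv")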

5 comments

Anatoly commented:

It works well with joinWithCassandraTable():

scala> val stringsRDD = sc.parallelize(0 to 5).map(_.toString)
stringsRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at map at <console>:24

scala> val intsRDD = sc.parallelize(0 to 5)
intsRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:27

scala> val strIntRDD = stringsRDD.zip(intsRDD)
strIntRDD: org.apache.spark.rdd.RDD[(String, Int)] = ZippedPartitionsRDD2[3] at zip at <console>:30

scala> val internalJoin = strIntRDD.joinWithCassandraTable("test_ks", "skvv")
internalJoin: com.datastax.spark.connector.rdd.CassandraJoinRDD[(String, Int),com.datastax.spark.connector.CassandraRow] = CassandraJoinRDD[4] at RDD at CassandraRDD.scala:18

scala> internalJoin.collect.foreach(println)
((0,0),CassandraRow{s: 0, k: 0, v: 0, v1: 0})
((3,3),CassandraRow{s: 3, k: 3, v: -3, v1: -3})

But why doesn't it work with DataFrames? Should I report a bug?

Erick Ramirez commented:

Could you try explicitly setting directJoinSetting to on? This should force a direct join regardless of the threshold (directJoinSizeRatio). Details here. Cheers!
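A sketch of what that might look like in the session (the bare conf key is an assumption based on the connector docs; treat it as unverified):

// Assumption: forces direct joins regardless of directJoinSizeRatio
spark.conf.set("directJoinSetting", "on")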

Anatoly commented:

This setting was switched on in my environment during the above tests.
Erick Ramirez commented:

One more thing to try is to call the directJoin() function with AlwaysOn:

val resultDf = skvvDf.join(stringsDf.directJoin(AlwaysOn), ... )
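(AlwaysOn is the connector's DirectJoinSetting value; it presumably needs the connector's Dataset extensions in scope:)

import org.apache.spark.sql.cassandra._   // brings directJoin() and AlwaysOn into scope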
Anatoly commented:

[Update posted in question body]
