Hello!
I have a question about the C* Spark connector (Spark 3.1.1, connector 3.1.0): I'm trying to get a direct join working on a table with a clustering key.
There are two tables in C*:
create table test_ks.skv (
    s varchar,
    k int,
    v int,
    primary key (s, k)
);

create table test_ks.skvv (
    s varchar,
    k int,
    v int,
    v1 int,
    primary key ((s, k), v)
);
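For context, skvDf and skvvDf are not defined in the snippets below; presumably they are read through the connector's catalog, which the plans in the update reference as cassandracatalog. A minimal sketch of that setup (my assumption, using the connector's documented config keys):

// Sketch (assumption, not shown in the original snippets): the session is
// presumably configured with the connector's extensions (which enable the
// direct-join optimization) and a catalog named "cassandracatalog",
// matching the catalog name visible in the plans below.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.sql.extensions",
    "com.datastax.spark.connector.CassandraSparkExtensions")
  .config("spark.sql.catalog.cassandracatalog",
    "com.datastax.spark.connector.datasource.CassandraCatalog")
  .getOrCreate()

// Hypothetical definitions of the DataFrames used in the joins below:
val skvDf  = spark.table("cassandracatalog.test_ks.skv")
val skvvDf = spark.table("cassandracatalog.test_ks.skvv")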
I'm joining against the following dataset:
val stringsDf = spark.range(1, 100).selectExpr("cast(id as string) skey")
When I do the join against the first table (test_ks.skv), there is a direct join in the plan:
val resultDf = skvDf.join(stringsDf, stringsDf("skey") === skvDf("s") && skvDf("k") === lit(5))
== Physical Plan ==
Cassandra Direct Join [s = skey#2] test_ks.skv - Reading (s, k, v) Pushed {("k" = ?:5)}
+- *(1) Project [cast(id#0L as string) AS skey#2]
   +- *(1) Range (1, 100, step=1, splits=16)
But in the case of the second table (test_ks.skvv) there is no direct join :(
val resultDf = skvvDf.join(stringsDf, stringsDf("skey") === skvvDf("s") && skvvDf("k") === lit(5))
== Physical Plan ==
*(2) BroadcastHashJoin [s#4], [skey#2], Inner, BuildRight, false
:- *(2) Filter (k#5 = 5)
:  +- BatchScan[s#4, k#5, v#6, v1#7] Cassandra Scan: test_ks.skvv - Cassandra Filters: [] - Requested Columns: [s,k,v,v1]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]),false), [id=#44]
   +- *(1) Project [cast(id#0L as string) AS skey#2]
      +- *(1) Range (1, 100, step=1, splits=16)
Moreover, adding the clustering key to the join condition doesn't solve the problem:
val resultDf = skvvDf.join(stringsDf, stringsDf("skey") === skvvDf("s") && skvvDf("k") === lit(5) && skvvDf("v") === lit(10))
== Physical Plan ==
*(2) BroadcastHashJoin [s#4], [skey#2], Inner, BuildRight, false
:- *(2) Filter (k#5 = 5)
:  +- BatchScan[s#4, k#5, v#6, v1#7] Cassandra Scan: test_ks.skvv - Cassandra Filters: [["v" = ?, 10]] - Requested Columns: [s,k,v,v1]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]),false), [id=#73]
   +- *(1) Project [cast(id#0L as string) AS skey#2]
      +- *(1) Range (1, 100, step=1, splits=16)
Is this expected behavior, and if so, why?
[UPDATE] Calling directJoin with AlwaysOn doesn't work either:
scala> val resultDf = skvvDf.join(stringsDf.directJoin(AlwaysOn), stringsDf("skey") === skvvDf("s") && skvvDf("k") === lit(5))
resultDf: org.apache.spark.sql.DataFrame = [s: string, k: int ... 3 more fields]

scala> resultDf.explain(true)
== Parsed Logical Plan ==
Join Inner, ((skey#2 = s#4) AND (k#5 = 5))
:- SubqueryAlias cassandracatalog.test_ks.skvv
:  +- RelationV2[s#4, k#5, v#6, v1#7] skvv
+- Project [cast(id#0L as string) AS skey#2]
   +- Range (1, 100, step=1, splits=Some(16))

== Analyzed Logical Plan ==
s: string, k: int, v: int, v1: int, skey: string
Join Inner, ((skey#2 = s#4) AND (k#5 = 5))
:- SubqueryAlias cassandracatalog.test_ks.skvv
:  +- RelationV2[s#4, k#5, v#6, v1#7] skvv
+- Project [cast(id#0L as string) AS skey#2]
   +- Range (1, 100, step=1, splits=Some(16))

== Optimized Logical Plan ==
Join Inner, (skey#2 = s#4)
:- Filter (k#5 = 5)
:  +- RelationV2[s#4, k#5, v#6, v1#7] skvv
+- Project [cast(id#0L as string) AS skey#2]
   +- Range (1, 100, step=1, splits=Some(16))

== Physical Plan ==
*(2) BroadcastHashJoin [s#4], [skey#2], Inner, BuildRight, false
:- *(2) Filter (k#5 = 5)
:  +- BatchScan[s#4, k#5, v#6, v1#7] Cassandra Scan: test_ks.skvv - Cassandra Filters: [] - Requested Columns: [s,k,v,v1]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]),false), [id=#54]
   +- *(1) Project [cast(id#0L as string) AS skey#2]
      +- *(1) Range (1, 100, step=1, splits=16)
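For completeness: directJoin and AlwaysOn above come from the connector's implicits. As far as I can tell, the imports behind the snippets are roughly these (an assumption on my part, based on the connector's documented API):

// Assumed imports (not shown in the original snippets):
// directJoin(...) and AlwaysOn are provided by the connector's
// implicits in org.apache.spark.sql.cassandra, while lit comes
// from Spark's standard SQL functions.
import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.functions.lit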