MayuriD asked:

DirectJoin does not work with Structured Streaming DataFrame

When I read data from Cassandra and join it with a simple batch DataFrame (created with spark.range()), the physical plan shows a Cassandra Direct Join.
But when I do the same join with a streaming DataFrame, the physical plan shows a regular SortMergeJoin instead. I am using the latest Cassandra connector jar, i.e. 3.1.0, with Spark 3.1.2.

What is the best way to read data from Cassandra in a streaming query?

Here are sample code snippets to reproduce it.

Create table:

CREATE TABLE test_ks.test (
    value bigint PRIMARY KEY,
    v int
);

Working code:

// create a simple batch DataFrame
val range = spark.range(1, 1000).selectExpr("cast(id as long) value")

// read Cassandra data
val joinTarget = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "test", "keyspace" -> "test_ks"))
  .load()

range.join(joinTarget, Seq("value")).explain()

Output of the working code:

== Physical Plan ==
*(2) Project [value#2L, v#5]
+- Cassandra Direct Join [value = value#2L] test_ks.test - Reading (value, v) Pushed {}
   +- *(1) Project [id#0L AS value#2L]
      +- *(1) Range (1, 1000, step=1, splits=12)


Non-working code:

// create a streaming DataFrame (the rate source emits timestamp and value columns)
val range = spark
  .readStream
  .format("rate")
  .option("rowsPerSecond", 1)
  .load()

// read Cassandra data
val joinTarget = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "test", "keyspace" -> "test_ks"))
  .load()

range.join(joinTarget, Seq("value")).explain()

Output of the non-working code:

== Physical Plan ==
*(5) Project [value#1L, timestamp#0, v#5]
+- *(5) SortMergeJoin [value#1L], [value#4L], Inner
   :- *(2) Sort [value#1L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(value#1L, 200), ENSURE_REQUIREMENTS, [id=#31]
   :     +- *(1) Filter isnotnull(value#1L)
   :        +- StreamingRelation rate, [timestamp#0, value#1L]
   +- *(4) Sort [value#4L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(value#4L, 200), ENSURE_REQUIREMENTS, [id=#39]
         +- *(3) Project [value#4L, v#5]
            +- BatchScan[value#4L, v#5] Cassandra Scan: test_ks.test
 - Cassandra Filters: []
 - Requested Columns: [value,v]


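So when I join the streaming DataFrame with the Cassandra DataFrame, the plan shows:

Cassandra Filters: []

That is, no join keys are pushed down to Cassandra, and the whole table is scanned and then shuffled for the SortMergeJoin.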

Erick Ramirez commented:

Please provide the following to give us a better idea of the issue:

  • a minimal code sample that reproduces the issue
  • the explain() output for that code sample
  • a minimal code sample of the case you're comparing it against (the one that works)

Once you provide the additional info, I'll get the Analytics team to have a look at it. Cheers!
MayuriD replied:

@Erick Ramirez I've edited the question and added minimal code for both the working and the non-working case, along with their output.
Thanks!
MayuriD replied:

@Erick Ramirez Hi, have you or your Analytics team had a chance to look at the issue? I have provided the additional information. Thanks in advance!
MayuriD commented:

@Erick Ramirez Got the solution. Thanks a bunch!

1 Answer

jaroslaw.grabowski_50515 answered:

Hi! Probably your datasets aren't big enough, so the `auto` behavior of the `directJoinSetting` option turns direct join off. Set `directJoinSetting` to `on` and it should be fine: add `"directJoinSetting" -> "on"` to your options map.

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/reference.md#cassandra-datasource-table-options
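A minimal sketch of the fix applied to the non-working example, assuming the `test_ks.test` table from the question, a Cassandra node reachable through `spark.cassandra.connection.host`, and the connector jar on the classpath. Direct join in connector 3.x relies on the connector's Catalyst extensions (`com.datastax.spark.connector.CassandraSparkExtensions`), which the asker presumably already had enabled since the batch case worked. The object name `StreamStaticDirectJoin` and the console sink are hypothetical choices for illustration, not part of the original thread.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object StreamStaticDirectJoin {

  // Table options from the question plus the answer's fix:
  // force direct join instead of leaving the decision to "auto".
  val cassandraOptions: Map[String, String] = Map(
    "table" -> "test",
    "keyspace" -> "test_ks",
    "directJoinSetting" -> "on"
  )

  // Builds the stream-static join from the question.
  def joinedStream(spark: SparkSession): DataFrame = {
    // Streaming side: the rate source emits (timestamp, value: Long) rows.
    val range = spark.readStream
      .format("rate")
      .option("rowsPerSecond", 1)
      .load()

    // Static side: read through the connector's DataSource with the options above.
    val joinTarget = spark.read
      .format("org.apache.spark.sql.cassandra")
      .options(cassandraOptions)
      .load()

    range.join(joinTarget, Seq("value"))
  }

  def main(args: Array[String]): Unit = {
    // Assumption: the connection host is passed via spark-submit --conf.
    // The extensions must be set before the session is created.
    val spark = SparkSession.builder()
      .appName("stream-static-direct-join")
      .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
      .getOrCreate()

    val joined = joinedStream(spark)
    // Per the thread, the plan is expected to show "Cassandra Direct Join" now.
    joined.explain()

    // Hypothetical sink for illustration: print each micro-batch to the console.
    val query = joined.writeStream
      .format("console")
      .outputMode("append")
      .start()
    query.awaitTermination()
  }
}
```

Keeping the options in one map means the batch and streaming variants can share the same table configuration; only `directJoinSetting` differs from the original snippet.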


MayuriD commented:

It worked after adding `"directJoinSetting" -> "on"` to my options map.
Thank you so much, @jaroslaw.grabowski_50515!