When I read a Cassandra table with spark.read and join it with a simple batch DataFrame, the physical plan shows a Cassandra Direct Join.
But when I do the same join with a streaming DataFrame, the physical plan shows a regular SortMergeJoin over a full Cassandra scan. I am using the latest Cassandra connector jar (3.1.0) with Spark 3.1.2.
What is the best way to read data from Cassandra in a streaming query?
Sample code to reproduce:
Create Table -
    CREATE TABLE test_ks.test (value bigint PRIMARY KEY, v int);
Working Code -
    // create a simple batch DataFrame
    val range = spark.range(1, 1000).selectExpr("cast(id as long) value")

    // read Cassandra data
    val joinTarget = spark.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> "test", "keyspace" -> "test_ks"))
      .load

    range.join(joinTarget, Seq("value")).explain()
Output of Working Code -
    == Physical Plan ==
    *(2) Project [value#2L, v#5]
    +- Cassandra Direct Join [value = value#2L] test_ks.test - Reading (value, v) Pushed {}
       +- *(1) Project [id#0L AS value#2L]
          +- *(1) Range (1, 1000, step=1, splits=12)
Non-working code -
    // create a streaming DataFrame
    val range = spark
      .readStream
      .format("rate")
      .option("rowsPerSecond", 1)
      .load

    // read Cassandra data
    val joinTarget = spark.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> "test", "keyspace" -> "test_ks"))
      .load

    range.join(joinTarget, Seq("value")).explain()
Output of non-working code -
    == Physical Plan ==
    *(5) Project [value#1L, timestamp#0, v#5]
    +- *(5) SortMergeJoin [value#1L], [value#4L], Inner
       :- *(2) Sort [value#1L ASC NULLS FIRST], false, 0
       :  +- Exchange hashpartitioning(value#1L, 200), ENSURE_REQUIREMENTS, [id=#31]
       :     +- *(1) Filter isnotnull(value#1L)
       :        +- StreamingRelation rate, [timestamp#0, value#1L]
       +- *(4) Sort [value#4L ASC NULLS FIRST], false, 0
          +- Exchange hashpartitioning(value#4L, 200), ENSURE_REQUIREMENTS, [id=#39]
             +- *(3) Project [value#4L, v#5]
                +- BatchScan[value#4L, v#5] Cassandra Scan: test_ks.test
                   - Cassandra Filters: []
                   - Requested Columns: [value,v]
So when I join the streaming DataFrame with the Cassandra DataFrame, the plan shows that no filters are pushed down to Cassandra, i.e. the whole table is scanned:
    Cassandra Filters: []
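
One workaround I am considering is to do the join inside foreachBatch, where each micro-batch arrives as a regular batch DataFrame, so the planner may treat it like the working case above. The following is only a sketch and I have not confirmed that it yields a direct join. Assumptions that are not part of my code above: the app name and connection host are placeholders, the connector's CassandraSparkExtensions are registered through spark.sql.extensions, and the connector's directJoinSetting option is set to "on" to force the direct join rather than rely on its automatic size check.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object StreamDirectJoinSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("stream-direct-join-sketch") // placeholder name
      // Registers the connector's Catalyst extensions, which the direct join relies on.
      .config("spark.sql.extensions",
        "com.datastax.spark.connector.CassandraSparkExtensions")
      .config("spark.cassandra.connection.host", "127.0.0.1") // assumption: local node
      .getOrCreate()

    // Static Cassandra side of the join, same table as in the question.
    val joinTarget = spark.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map(
        "table" -> "test",
        "keyspace" -> "test_ks",
        // Assumption: force the direct join instead of the automatic decision.
        "directJoinSetting" -> "on"))
      .load()

    // Called once per micro-batch; `batch` is a plain batch DataFrame here.
    def joinBatch(batch: DataFrame, batchId: Long): Unit = {
      val joined = batch.join(joinTarget, Seq("value"))
      joined.explain()              // check whether the plan shows a direct join
      joined.show(truncate = false) // replace with the real sink
    }

    val rateStream = spark.readStream
      .format("rate")
      .option("rowsPerSecond", 1L)
      .load()

    // Passing a method value avoids the overload ambiguity of foreachBatch
    // that function literals can hit on Scala 2.12.
    val query = rateStream.writeStream
      .foreachBatch(joinBatch _)
      .start()

    query.awaitTermination()
  }
}
```

If the plan printed per batch still shows a SortMergeJoin, then this approach does not help and I would like to know what the recommended alternative is.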