Hello Team,
I want to join a streaming dataset (partitionKeyStream), read from a Kafka topic, with data from two Cassandra tables (cassandraTable1, cassandraTable2):
val joinedDataDf = partitionKeyStream
  .join(cassandraTable1, partitionKeyStream("id") === cassandraTable1("id"), "left")
  .join(cassandraTable2, partitionKeyStream("id") === cassandraTable2("id"), "left")
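For context, the input datasets are built roughly like this (simplified; the Kafka host, topic, and value parsing are placeholders for my actual setup):

import org.apache.spark.sql.cassandra._

// Cassandra tables, read through the connector
val cassandraTable1 = spark.read.cassandraFormat("table1", "key1").load()
val cassandraTable2 = spark.read.cassandraFormat("table2", "key1").load()

// Stream of partition keys from Kafka; the value holds the id (simplified)
val partitionKeyStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
  .option("subscribe", "my-topic")                     // placeholder
  .load()
  .selectExpr("CAST(value AS STRING) AS id")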
joinedDataDf.explain gives me the plan below:
== Physical Plan ==
SortMergeJoin [id#496], [id#492], LeftOuter
:- *(3) Sort [id#496 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id#496, 200), true, [id=#230]
:     +- *(2) Project [id#496, latitude#112]
:        +- Cassandra Direct Join [id_number = id#496] key1.table1 - Reading (id_number, latitude) Pushed {}
:           +- *(1) Project ...
:              +- *(1) Filter ...
:                 +- StreamingRelation kafka, [key#8, value#9, topic#10, partition#11, offset#12L, timestamp#13, timestampType#14, headers#15]
+- *(5) Sort [id#492 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#492, 200), true, [id=#237]
      +- *(4) Project [id#492, longitude#493]
         +- BatchScan[id#157, longitude#183] Cassandra Scan: key1.table2
When I run the code with a single join (against table1 or table2 alone), everything looks fine: the connector performs a Direct Join. But with two joins, Spark does a Direct Join for the first table and falls back to a full table scan (the BatchScan above) for the second.
I am using Spark 3.0.2 with the Spark Cassandra Connector 3.0.1 and Scala 2.12.
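The session is created with the connector's Catalyst extensions enabled (the first join being converted shows they are active); roughly:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("direct-join-test")
  .config("spark.cassandra.connection.host", "localhost") // placeholder
  // Registers the connector's Catalyst rules, including the direct join optimization
  .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
  .getOrCreate()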
Is there a way to get direct joins for both tables? How can I manage that?
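One thing I was considering is the connector's per-DataFrame direct join hint; if I read the docs correctly, something like the sketch below should force the conversion for each table (I have not verified it yet for the two-join case):

import org.apache.spark.sql.cassandra._ // brings in the directJoin implicit and AlwaysOn

val ct1 = cassandraTable1.directJoin(AlwaysOn)
val ct2 = cassandraTable2.directJoin(AlwaysOn)

val joinedDataDf = partitionKeyStream
  .join(ct1, partitionKeyStream("id") === ct1("id"), "left")
  .join(ct2, partitionKeyStream("id") === ct2("id"), "left")

Is this the intended way to use the hint, or is there a session-level setting I am missing?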
Thanks a lot.