DataBroker asked Erick Ramirez answered

Cannot do direct joins with multiple tables with the Spark connector

Hello Team,

I want to join data from two Cassandra tables (cassandraTable1, cassandraTable2) with a streaming dataset (partitionKeyStream) read from a Kafka topic:

val joinedDataDf = partitionKeyStream  
    .join(cassandraTable1, partitionKeyStream("id") === cassandraTable1("id"), "left")  
    .join(cassandraTable2, partitionKeyStream("id") === cassandraTable2("id"), "left")

I got the plan below from joinedDataDf.explain:

== Physical Plan ==
SortMergeJoin [id#496], [id#492], LeftOuter
:- *(3) Sort [id#496 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id#496, 200), true, [id=#230]
:     +- *(2) Project [id#496, latitude#112]
:        +- Cassandra Direct Join [id_number = id#496] key1.table1 - Reading (id_number, latitude) Pushed {}
:           +- *(1) Project ...
:              +- *(1) Filter ...
:                 +- StreamingRelation kafka, [key#8, value#9, topic#10, partition#11, offset#12L, timestamp#13, timestampType#14, headers#15]
+- *(5) Sort [id#492 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#492, 200), true, [id=#237]
      +- *(4) Project [id#492, longitude#493]
         +- BatchScan[id#157, longitude#183] Cassandra Scan: key1.table2

When I run the code with a single join (against only table1 or only table2), everything looks fine. With two joins, however, Spark performs a Direct Join for the first table but a full scan for the second.

I am using Spark 3.0.2 with spark-cassandra-connector 3.0.1 and Scala 2.12.

Is there a way to get only direct joins? How can I achieve that?

Thanks a lot.

I tried separating the queries on table1 and table2 and then joining the two resulting datasets. That required adding a watermark column, since it is a stream-stream join. But I am still blocked by the same full table scan ...

Does anyone have a clue about this, please?

1 Answer

Erick Ramirez answered

I'm not sure whether the table scan is necessary because of the way JOINs work when multiple tables are involved.

If you want, you can try to force a direct join by making sure it is always on with:

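import org.apache.spark.sql.cassandra._  // provides directJoin and AlwaysOn
// t1, t2: the two Cassandra DataFrames (cassandraTable1, cassandraTable2 in the question)
val joinedDataDf = partitionKeyStream
  .join(t1.directJoin(AlwaysOn), partitionKeyStream("id") === t1("id"), "left")
  .join(t2.directJoin(AlwaysOn), partitionKeyStream("id") === t2("id"), "left")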
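
The same preference can also be expressed when each table is loaded, through the connector's directJoinSetting read option (documented values: on, off, auto). Below is a minimal sketch, assuming the keyspace key1 and the tables table1 and table2 seen in the plan above. The object and helper names (DirectJoinOptionSketch, loadWithDirectJoin) are hypothetical, and whether this changes the plan for the second join of a streaming query has not been verified here:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.cassandra._ // adds cassandraFormat to DataFrameReader

// Hypothetical helper, not part of the connector.
object DirectJoinOptionSketch {

  // Loads a Cassandra table and asks the connector to prefer a direct join
  // whenever one is possible, regardless of the size-ratio heuristic.
  def loadWithDirectJoin(spark: SparkSession, keyspace: String, table: String): DataFrame =
    spark.read
      .cassandraFormat(table, keyspace)
      .option("directJoinSetting", "on")
      .load()
}

// Assumed usage, with names taken from the question's plan:
//   val t1 = DirectJoinOptionSketch.loadWithDirectJoin(spark, "key1", "table1")
//   val t2 = DirectJoinOptionSketch.loadWithDirectJoin(spark, "key1", "table2")
```

Running joinedDataDf.explain again afterwards shows whether both joins now appear as Cassandra Direct Join instead of a BatchScan.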

In the meantime, I'm going to reach out to the Analytics team here at DataStax. Cheers!

