Bringing together the Apache Cassandra experts from the community and DataStax.

Want to learn? Have a question? Want to share your expertise? You are in the right place!

Not sure where to begin? Getting Started

 

question

DataBroker avatar image
DataBroker asked Erick Ramirez answered

Cannot do direct joins with multiple tables with the Spark connector

Hello Team,

I just want to perform a join with data (cassandraTable1, cassandraTable2) from 2 cassandra tables and a stream dataset (partitionKeyStream) from a kafka topic :

val joinedDataDf = partitionKeyStream  
    .join(cassandraTable1, partitionKeyStream("id") === cassandraTable1("id"), "left")  
    .join(cassandraTable2, partitionKeyStream("id") === cassandraTable2("id"), "left")

I got the plan below with joinedDataDf.explain :

 == Physical Plan ==
SortMergeJoin [id#496], [id#492], LeftOuter
:- *(3) Sort [id#496 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id#496, 200), true, [id=#230] :
     +- *(2) Project [id#496, latitude#112] :
        +- Cassandra Direct Join [id_number = id#496] key1.table1 - Reading (id_number, latitude) Pushed {} :
           +- *(1) Project ... :
              +- *(1) Filter ... :
                 +- StreamingRelation kafka, [key#8, value#9, topic#10, partition#11, offset#12L, timestamp#13, timestampType#14, headers#15] 
+- *(5) Sort [id#492 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#492, 200), true, [id=#237]
      +- *(4) Project [id#492, longitude#493]
         +- BatchScan[id#157, longitude#183] Cassandra Scan: key1.table2

When I execute my code with a single join (only data from table1 or table2), it seems all good. But with 2 joins, spark executes a Direct Join with the first, but does a full scan for the second.

I am using : spark 3.0.2 with the cassandra-spark-connector 3.0.1 and scala 2.12

Is there a way to only do direct joins ? How can i manage that ?

Thanks a lot.

spark-cassandra-connector
1 comment
10 |1000 characters needed characters left characters exceeded

Up to 8 attachments (including images) can be used with a maximum of 1.0 MiB each and 10.0 MiB total.

UPDATE :

I tried to seperate the queries from table1 and table2 then perform a join between these 2 datasets => I had to add a watermark column as it is a stream-stream join. But, I am bloqued with the same full table scan ...

Does someone have a clue on that, please ?

0 Likes 0 ·

1 Answer

Erick Ramirez avatar image
Erick Ramirez answered

I'm not sure if the table scan is necessary because of the way JOINs work when multiple tables are involved.

If you want, you can try to force a direct join by making sure it is always on with:

val joinedDataDf = partitionKeyStream  
  .join(t1.directJoin(AlwaysOn), partitionKeyStream("id") === t1("id"), "left")  
  .join(t2.directJoin(AlwaysOn), partitionKeyStream("id") === t2("id"), "left")

In the meantime, I'm going to reach out to the Analytics team here at DataStax. Cheers!

Share
10 |1000 characters needed characters left characters exceeded

Up to 8 attachments (including images) can be used with a maximum of 1.0 MiB each and 10.0 MiB total.