Bringing together the Apache Cassandra experts from the community and DataStax.


muthuselvam.dwh_190721 asked:

How do I convert CassandraRow to a DataFrame?

Hi,

I'm trying to perform a left outer join with a Cassandra table using leftJoinWithCassandraTable in Scala, and then convert the output into a DataFrame. Unfortunately, I couldn't convert the CassandraRow into a DataFrame. I'd appreciate any support in this regard.


1 Answer

alex.ott answered:

Instead of joining using the generic CassandraRow, it's better to use a case class for your data in Cassandra when performing the join:

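import com.datastax.spark.connector._
// Data: a case class whose fields match the columns of test.sstest (definition not shown)
val data = sc.parallelize(Seq(Data(1, new java.util.Date(119, 2, 1), 20), Data(2, new java.util.Date(119, 2, 1), 20)))
// perform left join between new data and data in Cassandra
val joined = data.leftJoinWithCassandraTable[Data]("test", "sstest")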
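The typed join above, leftJoinWithCassandraTable[Data], returns pairs of (left element, Option[matching Cassandra row]), so getting to a DataFrame is mostly a matter of flattening each pair into one case class. A minimal sketch, assuming hypothetical field names for Data (the answer doesn't show its definition) and a hypothetical output type JoinedRow. Dates are converted because Spark SQL encoders support java.sql.Timestamp but not java.util.Date.

```scala
import java.sql.Timestamp
import java.util.Date

// Hypothetical field names: the answer does not show how Data is defined.
case class Data(id: Int, ts: Date, value: Int)

// Hypothetical flat output row. java.util.Date is converted to java.sql.Timestamp
// so Spark can encode it; Option fields become nullable columns.
case class JoinedRow(
    id: Int,
    ts: Timestamp,
    value: Int,
    cassTs: Option[Timestamp],
    cassValue: Option[Int])

// A left join pairs each input element with Option[matching Cassandra row];
// None means no row with that key exists in the table.
def flatten(pair: (Data, Option[Data])): JoinedRow = {
  val (left, right) = pair
  JoinedRow(
    left.id,
    new Timestamp(left.ts.getTime),
    left.value,
    right.map(r => new Timestamp(r.ts.getTime)),
    right.map(_.value))
}
```

In spark-shell, with Data replaced by your own case class, this would be applied to the real join result as joined.map(flatten).toDF() after import spark.implicits._, giving one column per JoinedRow field, with nulls where Cassandra had no matching row.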

But the real solution would be to upgrade to the freshly released Spark Cassandra Connector 2.5.0 and perform the join at the DataFrame level (the so-called direct join), as described in the following blog post.


Hi Alex,

Thanks for the suggestion. I'm new to Spark, so please excuse my ignorance. I tried the option above, but specifying the type parameter [Data] doesn't work for me.

We are still using DSE 5.1.6, and I'm not sure whether I can use the direct join. I'm trying the format below. The problem I'm facing is parsing the joined output and converting it into a DataFrame so I can use it.


import com.datastax.spark.connector._

val joined = KafkaData.rdd
  .repartitionByCassandraReplica("keyspace", "Table")
  .leftJoinWithCassandraTable("keyspace", "Table")
  .withConnector(cassconn)
  .select("col a", "col b", "col c")
  .on(PartitionKeyColumns)


The joined output comes back as a CassandraLeftJoinRDD[Row, Any] rather than as an RDD that I can map to columns and convert to a DataFrame through the SqlContext. Kindly advise.
