Bringing together the Apache Cassandra experts from the community and DataStax.


question

peter.kovgan_176371 asked ·

Is there a simple way to convert an RDD[CassandraRow] to a DataFrame?

Hi,

I found two ways to fetch data from Cassandra:

spark.read... - with this one, even if I apply select(column, column), the selection appears to happen only on the client side

sc.cassandraTable(...) - this one selects on the Cassandra side, but does not provide a good way to convert the RDD to a DataFrame

So I do something pretty complex to get a DataFrame:


val data = spark.sparkContext.cassandraTable(keyspace, table)
  .select("event_log_multiplier", "resolution", "ip_country",
          "user_hash", "man_vs_machine_collection")
  .filter(row => row.getInt("event_log_multiplier") <= toPk &&
                 row.getInt("event_log_multiplier") >= fromPk)

val sqlContext = spark.sqlContext
import sqlContext.implicits._

// colNames is a Seq of the column names, defined elsewhere
val selectedData = data
  .keyBy(row => (
    row.getStringOption("resolution"),
    row.getStringOption("ip_country"),
    row.getStringOption("user_hash"),
    row.getStringOption("man_vs_machine_collection")))
  .map(_._1)
  .toDF("resolution", "ip_country", "user_hash", "man_vs_machine_collection")
  .na.fill("-1000", colNames)



That means I need to list every column I want in the DataFrame, like row.getStringOption("resolution"), and if I have 100+ columns my code will be a nightmare.

Is there a simple way to convert an RDD[CassandraRow] to a DataFrame?

Thanks!

Tags: spark-connector, dataframe

1 Answer

Russell Spitzer answered ·

DataFrames Support Cassandra-Side Pushdowns and Column Pruning

Both cassandraTable and the DataFrame reader perform Cassandra-side pushdowns and column pruning. The DataFrame approach automatically translates the pruning and filters from your request. You can see exactly what is happening by running explain() on your query and checking what is passed down to the Cassandra source.

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md#automatic--predicate-pushdown-and-column-pruning
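For example, a minimal sketch of the DataFrame approach (the keyspace/table names, column names, and the fromPk/toPk bounds are taken from the question; this assumes a Spark session already configured against your cluster):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val df = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> keyspace, "table" -> table))
  .load()
  // Column pruning: only these columns are requested from Cassandra
  .select("event_log_multiplier", "resolution", "ip_country",
          "user_hash", "man_vs_machine_collection")
  // Predicate pushdown: the connector translates these filters
  .filter($"event_log_multiplier" >= fromPk && $"event_log_multiplier" <= toPk)

// The physical plan shows which filters and columns were pushed
// down to the Cassandra source
df.explain()
```

No manual row-to-tuple mapping is needed; the result is already a DataFrame with the selected columns.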

So that should fix your problem, since you don't actually need to use cassandraTable in the first place.

Converting from RDDs to DataFrames

If you are interested in converting an RDD to a DataFrame (this should only be done in the rare circumstances where you can only do something with the RDD API), you can use the implicit transform.

The import is here:
https://github.com/datastax/spark-cassandra-connector/blob/c724585a3185d96726323e7354e2c564c496c00f/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSQLRow.scala#L55

That lets you read an RDD directly into the CassandraSQLRow class, which can be converted to a DataFrame without calling out explicit columns. This is what the Datasource approach itself uses to create an RDD; see
https://github.com/datastax/spark-cassandra-connector/blob/c724585a3185d96726323e7354e2c564c496c00f/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSourceRelation.scala#L87

With this RDD you can call createDataFrame(rdd, schema), where the schema can be generated from your "selects".
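Putting that together, a sketch of this path (the keyspace/table and column names come from the question; the implicit reader import is my reading of the connector source linked above, so treat the exact names as an assumption):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.cassandra.CassandraSQLRow
// Implicit RowReaderFactory for CassandraSQLRow (see the linked source)
import org.apache.spark.sql.cassandra.CassandraSQLRow.CassandraSQLRowReader
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import com.datastax.spark.connector._

// Read directly into CassandraSQLRow, which already implements Spark's Row,
// so no per-column mapping code is needed
val rdd: RDD[Row] = spark.sparkContext
  .cassandraTable[CassandraSQLRow](keyspace, table)
  .select("resolution", "ip_country", "user_hash", "man_vs_machine_collection")
  .map(r => r: Row) // upcast; RDD is invariant in its element type

// Schema matching the selected columns (could also be generated
// programmatically from the list of selected column names)
val schema = StructType(Seq(
  StructField("resolution", StringType),
  StructField("ip_country", StringType),
  StructField("user_hash", StringType),
  StructField("man_vs_machine_collection", StringType)
))

val df = spark.createDataFrame(rdd, schema)
```

With 100+ columns you would build the schema from a Seq of names instead of listing the fields by hand.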
