anshita333saxena_187432 asked · Erick Ramirez edited

Concurrent programming for cassandraTable RDDs in Spark-Scala

Is it possible to write concurrent code for cassandraTable queries?

My requirement: I have a Set containing all the secondary-index values, and I need to query each value against the cassandraTable to fetch the matching records.

Here is the code snippet (parentaui is the secondary-indexed column):

// Result set of child UPUIs, accumulated across all lookups
var mainUPUISet = scala.collection.mutable.Set[String]()
for (paui <- mainAUISet) {
    // One secondary-index query per value in mainAUISet
    val rddFullAui = sc.cassandraTable(settings.keyspace, settings.upuitable)
        .select("upuishort", "parentaui")
        .where("parentaui = ?", paui)
    val uiChildSet: Set[CassandraRow] = rddFullAui.collect().toSet
    for (ui <- uiChildSet) {
        mainUPUISet += ui.getString("upuishort")
    }
}

The problem is that this code takes about an hour, because mainAUISet contains a lot of values.
I am thinking of using concurrent programming (Futures) so that all the values in mainAUISet are processed in parallel.
I tried the code below in spark-shell and it works fine:

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Failure, Success}

// Store some test values
var mainAUISet = scala.collection.mutable.Set(
    "0380021675741861160DB220339638300",
    "0594853000151565280C6192714580300",
    "0403130022731276610BM130615522300")
// Initialize the result set
var mainUPUISet = scala.collection.mutable.Set[String]()
// Future-based function to provide the concurrency
def getChildUpuis(paui: String, mainUPUISet: scala.collection.mutable.Set[String]) = Future {
    val r = scala.util.Random
    val randomSleepTime = r.nextInt(3000)
    println(s"For $paui, sleep time is $randomSleepTime")
    val rddFullAui = sc.cassandraTable("bat_tpd_pri_msg", "upui")
        .select("upuishort", "parentaui")
        .where("parentaui = ?", paui)
    val uiChildSet: Set[CassandraRow] = rddFullAui.collect().toSet
    for (ui <- uiChildSet) { mainUPUISet += ui.getString("upuishort") }
}
// Call the future function for all the values
for (paui <- mainAUISet) { getChildUpuis(paui, mainUPUISet) }
// Sleep the main thread while the futures run
def sleep(time: Long): Unit = Thread.sleep(time)
sleep(15000)
var totalSet = mainAUISet ++ mainUPUISet

But with spark-submit, this code fails with the following error:

ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() for one-way message.
org.apache.spark.SparkException: Could not find CoarseGrainedScheduler.

After reading up on this error, I found that it comes from re-partitioning:
https://stackoverflow.com/questions/41338617/spark-could-not-find-coarsegrainedscheduler

Can anyone please suggest whether we can use concurrent programming here, whether concurrent programming is supported for cassandraTable RDDs, and what the best way to achieve this requirement in Spark is?
Please reply ASAP.

Thanks.

spark-cassandra-connector

1 Answer

Russell Spitzer answered · anshita333saxena_187432 commented

There are no issues with using concurrent programming with any Spark actions, but you may not get the outcome you desire. Since RDDs schedule their own work and are already parallelized, you may not actually increase performance by running multiple driver actions at once.

What's more, your current code pulls all records back to the driver (the collect operation), which could be very expensive and overload the driver JVM.

Now, the error you are showing is most likely unrelated to any of this. It is most likely caused by one of your executors being overwhelmed by the operation and shutting down. But that also means it is probably more of a downstream error than the actual cause.

Try looking further up in your logs for any other exceptions or failures.


So in summary:

1) Concurrency is fine, but probably not beneficial unless you have a much larger Spark cluster than the number of partitions being processed
2) Doing lots of "collect" operations is not great behavior if the datasets are large (see the sketch after this list)
3) The error posted is more of a symptom than a cause; look for other exceptions in either the driver log or the executor logs
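
For example, a minimal sketch along these lines (assuming the sc, settings, and mainAUISet names from the question's first snippet) that keeps only the needed column and collects a single time:

// Build one filtered RDD per value, union them, and collect once
val perKeyRdds = mainAUISet.toSeq.map { paui =>
    sc.cassandraTable(settings.keyspace, settings.upuitable)
        .select("upuishort")
        .where("parentaui = ?", paui)
        .map(_.getString("upuishort")) // ship only the column, not whole rows
}
val mainUPUISet = sc.union(perKeyRdds).distinct().collect().toSet

This still issues one secondary-index query per value, but the deduplication happens on the executors and only one small collect hits the driver.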

14 comments

anshita333saxena_187432 commented:

Hi @Russell Spitzer, I appreciate your help with the directions.

Digging further into the logs as you recommended, I saw that out of the 3 RDDs, 1 RDD's result was recorded and the other two were shut down. Because the sleep(15000) call waits only 15 seconds, the job exited after 15 seconds. When I increased the sleep time from 15 to 20 seconds, it recorded the output for all three values of the mainAUISet set successfully.

Actually, I am expecting around 8000 values in the mainAUISet set. I need to query this set against the DB (a 12-node cluster) from Spark (a 2-node cluster). The intention is not to go to the DB directly and then run the queries there. If I run the 8000 values one by one with the code pasted above, it takes 1.2 hours to give me the result (Set -> for loop over each value -> collect its corresponding records from the cassandraTable).

Erick Ramirez ♦♦ commented:

@anshita333saxena_187432 A friendly note to let you know that I converted your post to a comment since it is not an "answer". If you need to make a really long comment, please split it across 2 separate posts. Cheers!

Russell Spitzer commented:

You shouldn't really be sleeping to wait for futures (and again, I don't think you need them at all for this use case; I wrote a blog post about this); you should await your futures.
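
A minimal sketch of awaiting (assuming the same sc, keyspace, and table from the question; having each future return its own result also avoids the unsynchronized writes to the shared mutable Set):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Each future returns its own result set instead of mutating shared state
def getChildUpuis(paui: String): Future[Set[String]] = Future {
    sc.cassandraTable("bat_tpd_pri_msg", "upui")
        .select("upuishort")
        .where("parentaui = ?", paui)
        .map(_.getString("upuishort"))
        .collect()
        .toSet
}

// Kick off all lookups, then block until every one has finished
val futures = mainAUISet.toSeq.map(getChildUpuis)
val mainUPUISet = Await.result(Future.sequence(futures), Duration.Inf).flatten.toSet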

Anything involving secondary indexes is probably going to be a bad idea; see CASSANDRA-10050.

For this sort of use case I would probably just cache the table and then pass over it with my filter. You could either use the Spark caching mechanism, or copy all the data into a Parquet file and then filter on that. Basically you'd be doing an "IN" clause lookup. This is actually built-in functionality with DataFrames (which I also suggest you use instead of RDDs).
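
A rough sketch of the cached-DataFrame approach (assuming Spark 2.x with the connector's DataFrame support, a spark session in scope, and the keyspace/table names from the question):

import org.apache.spark.sql.functions.col

// Read the table once as a DataFrame and cache it
val upuiDf = spark.read
    .format("org.apache.spark.sql.cassandra")
    .options(Map("keyspace" -> "bat_tpd_pri_msg", "table" -> "upui"))
    .load()
    .select("upuishort", "parentaui")
    .cache()

// One pass with an IN-clause style filter instead of 8000 separate queries
val matches = upuiDf.filter(col("parentaui").isin(mainAUISet.toSeq: _*))
val mainUPUISet = matches.select("upuishort").collect().map(_.getString(0)).toSet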

The concurrency here is almost certainly not doing anything, especially with a two-node Spark cluster. The first read will most certainly consume all of the cores and queue up tasks, causing the second job to just queue up even more tasks (not to mention the third, fourth, fifth, ... nth).

Russell Spitzer commented:

In summary,

1) Forget about the concurrency, and if you do use it, await your futures
2) Secondary indexes have major performance problems in C*; it's probably best to avoid them as well
3) Multiple full-scan filters should always be done on a cached dataset: cache, then scan

anshita333saxena_187432 commented:

@Russell Spitzer Thanks for the further directions.

The pain point is that 8000 values need to be queried against the table one by one, because the column parentaui is a secondary-indexed column.

Do you think there is any other way to achieve this?
