Is it possible to use concurrent programming for cassandraTable queries?
My requirement is: I have a Set containing all the secondary-index values, and I need to query the cassandraTable for each value in this Set to fetch the matching records.
Here is the code snippet (parentaui is the secondary-indexed column):
var mainUPUISet = scala.collection.mutable.Set[String]()

for (paui <- mainAUISet) {
  val rddFullAui = sc.cassandraTable(settings.keyspace, settings.upuitable)
    .select("upuishort", "parentaui")
    .where("parentaui = ?", paui)
  val uiChildSet: Set[CassandraRow] = rddFullAui.collect().toSet
  for (ui <- uiChildSet) {
    mainUPUISet += ui.getString("upuishort")
  }
}
The problem is that this code takes about an hour, because mainAUISet contains a lot of values.
I am thinking of using concurrent programming (Futures) so that the lookups for all the values in mainAUISet run in parallel.
I tried the code below in spark-shell and it works fine:
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Failure, Success}

// Storing some test values
var mainAUISet = scala.collection.mutable.Set(
  "0380021675741861160DB220339638300",
  "0594853000151565280C6192714580300",
  "0403130022731276610BM130615522300")

// Initializing the result set
var mainUPUISet = scala.collection.mutable.Set[String]()

// Future-based function to provide the concurrency
def getChildUpuis(paui: String, mainUPUISet: scala.collection.mutable.Set[String]) = Future {
  val r = scala.util.Random
  val randomSleepTime = r.nextInt(3000)
  println(s"For $paui, sleep time is $randomSleepTime")
  val rddFullAui = sc.cassandraTable("bat_tpd_pri_msg", "upui")
    .select("upuishort", "parentaui")
    .where("parentaui = ?", paui)
  val uiChildSet: Set[CassandraRow] = rddFullAui.collect().toSet
  for (ui <- uiChildSet) {
    mainUPUISet += ui.getString("upuishort")
  }
}

// Calling the future function for all the values
for (paui <- mainAUISet) {
  getChildUpuis(paui, mainUPUISet)
}

// Sleeping the main thread while the futures run
def sleep(time: Long): Unit = Thread.sleep(time)
sleep(15000)

var totalSet = mainAUISet ++ mainUPUISet
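As a side note, I am aware the shell experiment has two hazards even when it runs: scala.collection.mutable.Set is not thread-safe under concurrent updates, and the fixed 15-second sleep is an arbitrary wait. A minimal sketch of the same fan-out that avoids both, collecting results through Future.traverse and blocking with Await instead of sleep (fetchChildren here is a hypothetical stub standing in for the real cassandraTable lookup):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical stub for the real per-paui Cassandra lookup:
// returns the child upuishort values for one parentaui.
def fetchChildren(paui: String): Set[String] =
  Set(s"$paui-child1", s"$paui-child2")

// Fan out one Future per parentaui, then wait for all of them to finish.
// Results come back through the Futures themselves, so no shared mutable
// Set and no arbitrary Thread.sleep is needed.
def collectChildren(pauis: Set[String]): Set[String] = {
  val allLookups = Future.traverse(pauis.toSeq)(paui => Future(fetchChildren(paui)))
  Await.result(allLookups, 5.minutes).flatten.toSet
}

val mainAUISet = Set("aui1", "aui2", "aui3")
val totalSet = mainAUISet ++ collectChildren(mainAUISet)
// totalSet now holds the 3 parents plus their 6 children
```

The same pattern should drop into the shell snippet by replacing the stub body with the cassandraTable query, though each collect() still runs as its own Spark job.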
But under spark-submit, this code fails with the error:
ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() for one-way message. org.apache.spark.SparkException: Could not find CoarseGrainedScheduler.
After researching this error, I learned that it can be caused by re-partitioning:
https://stackoverflow.com/questions/41338617/spark-could-not-find-coarsegrainedscheduler
Can anyone suggest whether concurrent programming can be used here at all, whether it is supported for cassandraTable RDDs, and what the best way is to achieve this requirement in Spark?
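One alternative I am considering, instead of driver-side Futures, is to run all the point queries inside a single Spark job: parallelize the paui values to the executors and issue the queries there through the connector's session pool. This is only a sketch, untested against a live cluster, assuming spark-cassandra-connector 2.x with the bundled DataStax Java driver; the keyspace and table names are the ones from my snippet:

```scala
import scala.collection.JavaConverters._
import com.datastax.spark.connector.cql.CassandraConnector

// One Spark job instead of one driver-side collect() per paui.
// CassandraConnector is serializable, so it can be used inside the closure.
val connector = CassandraConnector(sc.getConf)

val mainUPUISet = sc.parallelize(mainAUISet.toSeq)
  .mapPartitions { pauis =>
    connector.withSessionDo { session =>
      // Materialize the results inside withSessionDo, before the
      // session is handed back to the pool.
      pauis.toVector.flatMap { paui =>
        session.execute(
          "SELECT upuishort FROM bat_tpd_pri_msg.upui WHERE parentaui = ?",
          paui)
          .all().asScala
          .map(_.getString("upuishort"))
      }.iterator
    }
  }
  .collect()
  .toSet

val totalSet = mainAUISet ++ mainUPUISet
```

This keeps all the secondary-index lookups inside Spark's own scheduling, so there are no driver-side threads racing the SparkContext.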
Please reply ASAP.
Thanks.