import org.apache.spark.sql.cassandra._
import com.datastax.spark.connector._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import com.typesafe.config.{Config, ConfigFactory}
import java.util.Calendar

/**
 * @author Anshita Saxena
 *
 * Starting from the root UI configured as `settings.aui`, collects that UI plus every UI
 * reachable from it through the AUI / disaggregation tables, adds the matching `upuishort`
 * values from the up-UI table, and writes all rows of the UI state table for that combined
 * set of UIs to `settings.outputFileName` as text.
 */
object ChildRetrievalRemoveApp {

  /**
   * Latest-state values for which a UI's children are resolved through the disaggregation
   * table (joined on `primaryrepositoryid`) instead of the AUI parent/child table.
   * NOTE(review): the business meaning of states 4 and 5 is not visible here — confirm.
   */
  private val DisaggregatedStates: Set[Long] = Set(4L, 5L)

  def main(args: Array[String]): Unit = {
    println("....Process Starts....")
    val setconf: Config = ConfigFactory.load()
    val settings: Settings = new Settings(setconf)
    val sc = new SparkContext(settings.masterURL, "structured_app", buildSparkConf(settings))
    // Always release the SparkContext (executors, event log), even if a query or the write fails.
    try {
      println("----Configuration_mine-----" + sc.getConf)
      val auis = collectAuis(sc, settings)
      val upuis = collectUpuis(sc, settings, auis)
      val totalSet = auis ++ upuis
      sc.cassandraTable(settings.keyspace, settings.uitable)
        // Bind an immutable snapshot of the UI set as the IN-list value.
        .where("ui in ?", totalSet.toList)
        .saveAsTextFile(settings.outputFileName)
    } finally {
      sc.stop()
    }
  }

  /** Builds the Spark configuration (Cassandra connection/credentials, memory, split size, consistency) from `settings`. */
  private def buildSparkConf(settings: Settings): SparkConf =
    new SparkConf(true)
      .set("spark.cassandra.connection.host", settings.serverIP)
      .set("spark.cassandra.auth.username", settings.username)
      .set("spark.cassandra.auth.password", settings.password)
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.executor.memory", settings.serverMemory)
      .set("spark.driver.memory", settings.driverMemory)
      .set("spark.cassandra.input.split.size_in_mb", settings.inputSplitSizeInMb)
      .set("spark.eventLog.enabled", "true")
      .set("spark.cassandra.input.consistency.level", settings.consistencyLevel)

  /**
   * Breadth-first traversal from `settings.aui`.
   *
   * For each UI, the latest row of `settings.uitable` is read (descending clustering order,
   * limit 1). If its `state` is in [[DisaggregatedStates]], the children are the `childui`
   * values of `settings.uidisaggregatetable` rows with the same `primaryrepositoryid`;
   * otherwise they are the `aui` values of `settings.auitable` rows whose `parentaui` is the UI.
   *
   * @return the root UI together with every UI discovered; each UI is queried at most once.
   */
  private def collectAuis(sc: SparkContext, settings: Settings): Set[String] = {
    import scala.collection.mutable

    val root = settings.aui
    val visited = mutable.Set(root)
    val pending = mutable.Queue(root)

    while (pending.nonEmpty) {
      val current = pending.dequeue()
      // NOTE(review): `first()` throws if `current` has no row in the state table (e.g. a
      // child without any recorded state), aborting the whole run — confirm that is intended.
      val latestState = sc.cassandraTable(settings.keyspace, settings.uitable)
        .select("ui", "primaryrepositoryid", "state")
        .where("ui = ?", current)
        .withDescOrder
        .limit(1)
        .first()

      val children =
        if (DisaggregatedStates.contains(latestState.getLong("state"))) {
          val prid = latestState.getString("primaryrepositoryid")
          sc.cassandraTable(settings.keyspace, settings.uidisaggregatetable)
            .select("aui", "childui", "primaryrepositoryid")
            .where("primaryrepositoryid = ?", prid)
            .collect()
            .map(_.getString("childui"))
        } else {
          sc.cassandraTable(settings.keyspace, settings.auitable)
            .select("aui", "parentaui")
            .where("parentaui = ?", current)
            .collect()
            .map(_.getString("aui"))
        }

      // `add` returns false for already-visited UIs, which prevents cycles and repeated queries.
      children.foreach { child =>
        if (visited.add(child)) pending.enqueue(child)
      }
    }

    visited.toSet
  }

  /**
   * Returns the `upuishort` values of all `settings.upuitable` rows whose `parentaui` is one
   * of `auis`. Issues one Cassandra query (Spark job) per UI in `auis`.
   */
  private def collectUpuis(sc: SparkContext, settings: Settings, auis: Set[String]): Set[String] =
    auis.flatMap { paui =>
      sc.cassandraTable(settings.keyspace, settings.upuitable)
        .select("upuishort", "parentaui")
        .where("parentaui = ?", paui)
        .collect()
        .map(_.getString("upuishort"))
    }
}