Question

anshita333saxena_187432 asked:

How can I optimize my Spark app for retrieving all the child records in Scylla?

In this use case, I need to fetch records from tables designed as a parent-child relation. Given a value, I query it against the parent table, collect its children, and then query each child against the child table, recursing until no further children exist, so that I fetch the children of all the parents. We have Spark (2-node cluster) and the DB (12-node cluster).
Parent element count = approx. 8k (extracted on the basis of the given value), ~30G
Child element count = approx. 11k (extracted on the basis of the 8k values), ~412G
Total records = approx. 76k (extracted on the basis of the 11k + 8k = 19k values), ~126G

The current code takes 2.1 hours to run and collect the data.
executor-memory: 30GB and driver-memory: 10GB

In the first two tables, parentaui is a secondary index. As per your suggestion, and as raised in CASSANDRA-10050, secondary indexes do not perform well.

I tried a couple of approaches:
1. Using Datasets: we saw that cached reads are slower.
2. Using RDDs: still trying different approaches.

The code that took 2.1 hours:
1. Create a queue (in parent-child retrieval, order is important) and a set (elements must be unique).
2. Filter the RDD at read time with a WHERE clause on parentaui (secondary-index filter), recursing until the queue is empty, since we need to retrieve the children of children (see the sketch after this list) ---> around 8k extracted.
3. With the values from step 2, query the child table with a WHERE clause on parentaui (secondary-index filter) ---> around 11k extracted.
4. After collecting all the records, query the final table with a WHERE clause on the primary key and store the records ---> around 76k extracted (based on 11k + 8k = 19k values).
The intention is to reduce the processing time.
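A minimal sketch of the traversal in steps 1-3 (here `rootValue` and the `lookupChildren` helper are hypothetical; `lookupChildren` stands in for the secondary-index read on parentaui):

import scala.collection.mutable

// Breadth-first traversal: the queue preserves parent-before-child order,
// the set keeps each element unique.
val queue = mutable.Queue(rootValue)
val seen  = mutable.Set.empty[String]
while (queue.nonEmpty) {
  val current = queue.dequeue()
  if (seen.add(current))               // true only the first time we see it
    queue ++= lookupChildren(current)  // hypothetical secondary-index read
}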

Attaching code for reference (doing an rdd.collect().toSet operation to collect the items for further retrieval): child_retrieval.txt
Please let me know if you need further details; I believe we can optimize the time for this sort of use case.

sparkconnector

1 Answer

Russell Spitzer answered:

When using Datasets and an IN clause, you should not use caching. We talked about this a little in the past thread, but if you are doing a single filtering step, caching is not important. Caching is only for repeated uses of the same RDD.
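For example (a sketch; `parentIds` and the keyspace/table names are placeholders):

import org.apache.spark.sql.functions.col

// A single filtering step: read, filter, consume once. No .cache() needed,
// since the result is never reused.
val children = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "ks", "table" -> "child"))
  .load()
  .filter(col("parentaui").isin(parentIds: _*))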

Secondary indexes may be acceptable on Scylla, which you were using in the logs you previously submitted. If that is the case, I would try an approach without Spark at all, since it doesn't seem like you actually need Spark for a scatter-gather approach. For this I would just issue all the requests asynchronously and await them all for the results.

If secondary indexes aren't acceptable, then I would have a hard time believing you can do better than a single pass looking up all your keys at the same time. The equivalent RDD code, if we are avoiding the pushdown, would be `rdd.filter(row => parentHash.contains(row.getString("key")))` or something like that. I am not sure what you mean about ordering, since sets are not intrinsically ordered in Scala, and I'm pretty sure you would lose ordering when combining the sets. The best you could get is a SortedSet, which, if sorted on the parent hash, would be more or less the same as looking up the records in order. If that isn't acceptable, you can always index the records you retrieve by the ordinal of the parent used to look them up and sort afterwards.
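A sketch of that last option (assuming the rows have already been tagged with the parent's ordinal, like the `tagged` RDD in approach 2 below):

// tagged: RDD[(Int, CassandraRow)] of (parent ordinal, row) pairs
val ordered = tagged.sortByKey().values // rows restored to parent lookup order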

Again, this is difficult to tell from your question, since you are asking about many things at once, and I would recommend small, focused questions where possible, but it does sound like you are looking up records based on a primary key at some point? For that you would want to use one of the custom Cassandra joins in the connector, such as `joinWithCassandraTable`.
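A sketch of that join (keyspace and table names are placeholders; assumes a single-column primary key):

import com.datastax.spark.connector._

// Each key is looked up directly against the table's partition key,
// instead of scanning the whole table.
val records = sc.parallelize(keys.toSeq)
  .map(Tuple1(_))
  .joinWithCassandraTable("ks", "final_table")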

All of this assumes you are working with a lot of data, and it does not sound like this result set is ever that large, so Spark may not actually be helpful here.

In summation, my approaches would be (pseudo-code below):

1) Are secondary indexes good in Scylla? Do async requests:

// Fire all the child queries asynchronously up front (Java driver 3.x API;
// the query shape is illustrative)...
val futures = for (parent <- parents) yield
  session.executeAsync("SELECT ... FROM child WHERE parentaui = ?", parent)

// ...then block once for everything; the results come back in the same
// order as `parents`.
val results = futures.map(_.getUninterruptibly)


2) Are secondary indexes bad in Scylla? Do a full table scan, with no caching unless repeated scans are needed:

// Map of parentId -> ordinal, so results can be re-sorted into parent order.
val indexOfParent: Map[String, Int] = parents.zipWithIndex.toMap

// Full scan, keeping only rows whose parent we know, tagged with the ordinal.
val tagged = rdd.flatMap { row =>
  indexOfParent.get(row.getString("parent")).map(ordinal => (ordinal, row))
}

// DataFrame version; you would have to do an ordinal lookup for this as well.
df.filter(col("parent").isin(parents: _*))


4 comments

anshita333saxena_187432 commented:

Thanks @Russell Spitzer for the further directions. Let me try these.
I am using a queue (FIFO) for the ordering, because this use case requires accessing children of children.
I read in an article that one should avoid queues and sets and use arrays, but if I do not traverse element by element, like parent ---> child ---> grandchildren, I will not be able to collect the result.
Think of it as single-pack ---> outers ---> big-packs (a hierarchical tree).
So if you pass big-packs, you would expect to get outers. The ordering is needed only for this.

anshita333saxena_187432 commented:

Just for record retrieval I need ordering (traversal: queue; storage: set), but once the records are stored and we hit the upui table, ordering is not needed, so I use sets there.

anshita333saxena_187432 commented:

@Russell Spitzer
Since I need to extract the data here based on the transactional relationship between the parent and child records, I tried implementing this use case with the Python Cassandra driver.
I got good performance in terms of time.

Here is the performance comparison:
Spark ----> 2.5 hrs (from the snippet I shared in the previous post)
Python ----> minimum = 4 minutes, maximum = 10 minutes

Russell Spitzer commented:

That's great! I'm glad you found an efficient way to get your results.
