Bringing together the Apache Cassandra experts from the community and DataStax.

Want to learn? Have a question? Want to share your expertise? You are in the right place!

Not sure where to begin? Getting Started

 

question

mishra.anurag643_153409 avatar image
mishra.anurag643_153409 asked mishra.anurag643_153409 commented

Why does input.split.size_in_mb not break large Cassandra partition to fit into executor memory?

I am reading a cassandra table using pyspark . My spark job is failing as there is few large partitions in cassandra those do not fit into spark memory . I was under impression that input.split.size should divide such large partitions diving by 64 MB and creating number of spark partitions from large cassandra partition , but this does not seem to be happening . As per my research I have come to know that cassandra data is divided into 64 MB and if partition size is less than 64 MB in that case all smaller partitions are combined and created one partition size closer to 64 MB , but if cassandra partition is greater than 64 MB in that case input split size des not work and that large cassandra partition is also going to be large spark partition .

How to deal with such issues ? What can I do to break cassandra large partitions into spark smaller partitions ?

spark-cassandra-connector
10 |1000 characters needed characters left characters exceeded

Up to 8 attachments (including images) can be used with a maximum of 1.0 MiB each and 10.0 MiB total.

1 Answer

Erick Ramirez avatar image
Erick Ramirez answered mishra.anurag643_153409 commented

The input.split.size_in_mb property is a notional target for the Spark connector to split the Cassandra data into Spark partitions. As I explained to you in question #11500, the calculations are based on estimates so they will never be exact.

In reality, the Spark partitions will never be 64MB in size (size_in_mb) unless the Cassandra partitions are exactly the same. Spark doesn't read the table and stop reading when it's reached 64MB.

As an example let's say that based on the estimated table size and estimated number of C* partitions, each Spark partition is estimated to map to 200 token ranges in Cassandra.

For the first Spark partition, the token range might only contain 2 Cassandra partitions of size 3MB and 15MB so the actual size of the data in Spark partition is just 18MB.

But in the next Spark partition, the token range contains 28 Cassandra partitions that are mostly 1 to 4MB but there is one partition that is 56MB. The total size of this Spark partition ends up being a lot more than 64MB.

If you happen to have really large partitions, they would skew the size of the Spark partitions. For example, your table might have 99% of partitions with 3 to 10MB but 1% of partitions are larger than 250MB. Every time Spark retrieves data from a token range which has a large partition, the Spark partition will be larger than 250MB.

Note that Cassandra partitions are not distributed across multiple Spark partitions -- a Cassandra partition can only belong to one Spark partition. If you have a really bad case where you have large partitions which do not fit in the executor memory, the task will run into out-of-memory errors.

There isn't a straightforward way to deal with large partitions. It requires deep knowledge of the data and isn't something that can be answered in a Q&A forum.

Russell Spitzer who is one of the authors of the Spark connector provides some advice in his blog post which might interest you -- Dealing with large partitions. Cheers!

1 comment Share
10 |1000 characters needed characters left characters exceeded

Up to 8 attachments (including images) can be used with a maximum of 1.0 MiB each and 10.0 MiB total.

thanks a lot !! This is very good explanation and I have got most of details from here.


0 Likes 0 ·