mmarovic asked mmarovic edited

How to control partition size when reading data with Apache Spark?

Hey there!

I recently asked a question on the ASF cassandra slack channel and was redirected here, so I'm hoping someone here can help me out.

I'm currently using org.apache.cassandra.hadoop.cql3.CqlInputFormat to read data from Cassandra (3.11.5) into Apache Spark. Some of the partitions are too big to fit into my Spark executor memory. From what I understand, Spark partitions correspond to Cassandra token ranges by default, and the parameters cassandra.input.split.size and cassandra.input.split.size_mb are supposed to split these further into smaller parts so that memory issues can be avoided. However, setting them didn't solve my problem, so I started debugging.

From what I gathered, sub-splitting relies on the data in the system.size_estimates table. The class queries that table for a specific token range and uses the result to calculate how many additional splits it should create. However, because the system keyspace uses LocalStrategy replication, the query returns no results if it was sent to the "wrong" node (one that has no data about that specific token range). In that case the input format has nothing to work with, so it cannot create any sub-splits and the initial partition remains too big.
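To make the failure mode concrete, here is a minimal sketch of the sub-split calculation described above. The function name, the row layout and the exact formula are assumptions for illustration, not a copy of CqlInputFormat. The point is the fallback when the size_estimates query returns no row.

```python
from typing import Optional, Tuple


def estimate_subsplits(row: Optional[Tuple[int, int]],
                       split_size: int = 0,
                       split_size_mb: int = 0) -> int:
    """Return how many sub-splits one token range would get.

    row is (mean_partition_size_bytes, partitions_count) as read from
    system.size_estimates, or None when the queried node has no row
    for that range. The formula is an illustrative assumption.
    """
    if row is None:
        # No estimate available: the range cannot be subdivided.
        return 1
    mean_partition_size, partitions_count = row
    if split_size_mb > 0:
        # Size-based splitting: estimated bytes divided by the target size.
        total_bytes = mean_partition_size * partitions_count
        count = total_bytes // (split_size_mb * 1024 * 1024)
    else:
        # Count-based splitting: partitions divided by the target count.
        count = partitions_count // max(split_size, 1)
    return max(count, 1)
```

With split_size=1, a range with 150 estimated partitions yields 150 sub-splits, while the same range queried on a node without estimates (row is None) stays as a single split.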

I even went to the extreme of setting cassandra.input.split.size to 1, which should mean that each Spark partition corresponds to a single Cassandra partition from a single token range. In my test case I had two nodes, one with 16 token ranges and 2453 partitions in total, and the other with 17 token ranges and 3220 partitions in total (according to the system.size_estimates table). In theory, by setting that parameter to 1 I should get 2453 + 3220 = 5673 partitions in Apache Spark. In practice, I get 2470 partitions. This is because all the queries are sent to the first node, so its token ranges are split properly according to the parameter, but the token ranges from the second node are not split at all because no size estimates can be found for them. In the end, I get 2453 "good" partitions (split according to the parameter) and 17 "bad" partitions (not split because they're from the second node), which adds up to 2470.
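The arithmetic above can be checked with a few lines. The numbers are the ones from this test case; the helper names are made up for the example.

```python
# Per-node totals taken from the test case: (token ranges, partitions).
node_a = (16, 2453)
node_b = (17, 3220)


def expected_splits(nodes):
    # With split size 1, every Cassandra partition becomes one Spark partition.
    return sum(partitions for _, partitions in nodes)


def observed_splits(answering, others):
    # Ranges owned by the answering node are sub-split; each range of any
    # other node stays as a single, unsplit Spark partition.
    return answering[1] + sum(ranges for ranges, _ in others)


print(expected_splits([node_a, node_b]))   # 5673
print(observed_splits(node_a, [node_b]))   # 2470
```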

So my questions are as follows:

1. Am I doing something wrong? After analysing the code, this looks like a bug, but if that were the case I'd expect more people to be hitting it. I tried to find something about it in JIRA, but nothing came up.
2. Can I somehow fetch the data for all token ranges with a single query and deal with this in my app?
3. If not, can I somehow force the client to send the query to a specific node, so I can reliably find the data I need for a specific token range, or for all token ranges by iterating over all nodes in the cluster?

As an additional note, I'm using Cassandra as a backend for the Janusgraph graph database, so I don't have much control over the schema or the type of the data stored there.

Also, the Janusgraph code provides only a single host to the cassandra.input.thrift.address parameter, which would explain why only the ranges from a single node are split successfully. If I change this to provide both nodes, token ranges are split at random, depending on whether the node the query was sent to is the same as the node that owns the current token range (referring to the describeSplits() method).



Update 1:

I managed to solve questions 2 & 3 so now I can programmatically get the proper size estimates for each node. However, to use this and get the expected behavior in Spark, I'd have to basically reimplement org.apache.cassandra.hadoop.cql3.CqlInputFormat and change some other Janusgraph classes to work with this new input format. This could work as a last resort solution, but I'm still primarily interested in using the "official" implementation, so question 1 still remains.
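For readers wondering what "getting the estimates for each node" could look like, below is a minimal sketch of only the merge step, assuming the size_estimates rows have already been fetched from each node separately. It is not the code actually used here: the function name, the row layout and the tie-breaking rule are hypothetical.

```python
from typing import Dict, Iterable, Tuple

TokenRange = Tuple[int, int]   # (range_start, range_end)
Estimate = Tuple[int, int]     # (mean_partition_size, partitions_count)
Row = Tuple[int, int, int, int]  # (range_start, range_end, mean_size, count)


def merge_estimates(per_node: Dict[str, Iterable[Row]]) -> Dict[TokenRange, Estimate]:
    """Combine size_estimates rows collected from every node into one map."""
    merged: Dict[TokenRange, Estimate] = {}
    for _node, rows in per_node.items():
        for start, end, mean_size, count in rows:
            key = (start, end)
            # Assumption: if several nodes report the same range, keep the
            # estimate with the larger partition count (the safer choice
            # when the goal is to avoid oversized splits).
            if key not in merged or count > merged[key][1]:
                merged[key] = (mean_size, count)
    return merged
```

The resulting map can then be looked up by token range regardless of which node originally held the estimate.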

Update 2:

While trying to implement my own version, I noticed another issue. The token ranges used to generate the initial partitions are obtained via the Metadata.getTokenRanges() method. In my test case, this method returns 32 token ranges. However, system.size_estimates contains 33 rows for 33 token ranges. After comparing the ranges, I noticed that the wrapping range from Metadata.getTokenRanges():

(9136685927276781333, -8825951397969454570)

is split into two rows in system.size_estimates:

(9136685927276781333, -9223372036854775808)
(-9223372036854775808, -8825951397969454570)

The query used in the input filter code to fetch the ranges is:

SELECT mean_partition_size, partitions_count
FROM system.size_estimates
WHERE keyspace_name = ? AND table_name = ? AND range_start = ? AND range_end = ?

If that's the case, the wrapping range can never be sub-split: the query looks for a single row with the wrapping bounds, but system.size_estimates only stores the two unwrapped halves, so no estimate is ever found.
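A possible workaround is to unwrap the range before looking up its estimates and then combine the two rows. The sketch below assumes Murmur3Partitioner (minimum token -2^63, which matches the values above) and a dictionary of estimates keyed by (range_start, range_end). The function names and the way the two rows are combined are assumptions, not the library's behaviour.

```python
MIN_TOKEN = -(2 ** 63)  # minimum token, assuming Murmur3Partitioner


def unwrap(start, end):
    """Split a wrapping (start, end] range at the minimum token."""
    if start < end or end == MIN_TOKEN:
        # Not wrapping, or already ending at the minimum token.
        return [(start, end)]
    return [(start, MIN_TOKEN), (MIN_TOKEN, end)]


def estimate_for(start, end, estimates):
    """Combine the estimates of all unwrapped pieces of a range.

    estimates maps (range_start, range_end) to
    (mean_partition_size, partitions_count). Returns None if no piece
    has an estimate.
    """
    pieces = [estimates[p] for p in unwrap(start, end) if p in estimates]
    if not pieces:
        return None
    total_count = sum(count for _, count in pieces)
    total_bytes = sum(mean * count for mean, count in pieces)
    # Weighted mean partition size across the pieces.
    mean = total_bytes // total_count if total_count else 0
    return (mean, total_count)
```

For the range above, unwrap(9136685927276781333, -8825951397969454570) returns exactly the two ranges stored in system.size_estimates.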

I don't know if this is different in newer versions, but I didn't see anything related to this in the 3.11.x release notes.

Anyway, really looking forward to hearing from someone about this!

1 comment

Erick Ramirez ♦♦ commented ·

Thanks for posting the question here. As discussed on ASF Slack, I'm going to request one of the DataStax Analytics engineers to respond. Cheers!


1 Answer

jaroslaw.grabowski_50515 answered mmarovic edited

1. I don't think so; this does look like a bug.

The thing with org.apache.cassandra.hadoop.cql3.CqlInputFormat is that it's a legacy way of retrieving data in Hadoop (and, by extension, Spark). I highly recommend porting your apps to spark-cassandra-connector. SCC doesn't have the issue you described and brings a lot of other benefits.

2 comments

mmarovic commented ·

Thanks for the response. Unfortunately, since I'm using this via an external library (hadoop-gremlin), I can't change that easily. I can either reimplement these InputFormat classes and fix the issues in my implementation, or write my own InputFormat classes that use the spark-cassandra-connector. I'll check with the library devs about this.

mmarovic commented ·

For anyone else that comes across this problem: it looks like the issues were fixed in and the fixed classes should be available in cassandra 4.0 jars. I haven't been able to confirm this by fully testing, but the changes in the code show that at least the problems I came across were resolved.
