Hey there!
I recently asked a question on the ASF cassandra slack channel and was redirected here, so I'm hoping someone here can help me out.
I'm currently using org.apache.cassandra.hadoop.cql3.CqlInputFormat to read some data from Cassandra (3.11.5) into Apache Spark. Some of the partitions are too big to fit into my Spark executor memory. From what I understand, the Spark partitions correspond to the Cassandra token ranges by default, but the parameters cassandra.input.split.size and cassandra.input.split.size_mb should be used to further split these into smaller parts so that memory issues can be avoided. However, this didn't solve my problem, so I started debugging.
From what I gathered, sub-splitting works by using the data from the system.size_estimates table. The class queries for a specific token range and then uses this data to calculate how many additional splits it should create. However, because the table uses LocalStrategy replication, each node only holds its own rows, so the query might return no results if it was sent to the "wrong" node (one that has no data about that specific token range). In that case, there is no data, so the input format cannot create any sub-splits, and the initial partition remains too big.
I even went to the extreme by setting cassandra.input.split.size to 1, which should mean that each Spark partition corresponds to a single partition from a single token range. In my test case I had two nodes, one with 16 token ranges with 2453 partitions total and the other with 17 token ranges with 3220 partitions total (according to the system.size_estimates table). In theory, by setting that parameter to 1 I should get 2453 + 3220 = 5673 partitions in Apache Spark. In practice, I get 2470 partitions. This is because all the queries are sent to the first node, so its token ranges are split properly according to the parameter, but the token ranges from the second node are not split at all because no size estimates can be found for those ranges. In the end, I get 2453 "good" partitions (split according to the parameter) and 17 "bad" partitions (not split because they're from the second node), which sums up to 2470.
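The arithmetic above can be reproduced with a small model. This is only a sketch under a simplified assumption (each token range yields ceil(partitions / split size) splits when an estimate is found, and exactly one split when it is not); it is not the actual CqlInputFormat code. The per-range partition counts are not given in this post, so the hypothetical spread() helper distributes each node's total evenly across its ranges. With a split size of 1 the distribution does not affect the result.

```python
import math


def spread(total, n):
    """Hypothetical helper: spread `total` partitions over `n` ranges as evenly as possible."""
    base, extra = divmod(total, n)
    return [base + 1] * extra + [base] * (n - extra)


def count_splits(estimates, split_size):
    """Count splits for a list of per-range partition estimates.

    A value of None (or 0) models a token range whose size estimate
    could not be found, so the range stays as a single split.
    """
    splits = 0
    for partitions in estimates:
        if not partitions:
            splits += 1
        else:
            splits += math.ceil(partitions / split_size)
    return splits


node1 = spread(2453, 16)  # 16 token ranges, 2453 partitions in total
node2 = spread(3220, 17)  # 17 token ranges, 3220 partitions in total

# Estimates visible for every range (the expected behaviour).
both_visible = count_splits(node1 + node2, split_size=1)

# Estimates visible only for the first node's ranges (the observed behaviour).
only_node1 = count_splits(node1 + [None] * len(node2), split_size=1)

print(both_visible, only_node1)  # prints: 5673 2470
```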
So my questions are as follows:
1. Am I doing something wrong? After analysing the code, this seems like a bug, but if that were the case I would expect more people to be having this problem. I tried to find some info in JIRA, but nothing came up.
2. Can I somehow fetch the data for all token ranges with a single query and deal with this in my app?
3. If not, can I somehow force the client to send the query to a specific node, so I can reliably find the data I need for a specific token range, or for all token ranges by ensuring that I iterate on all nodes in the cluster?
As an additional note, I'm using Cassandra as a backend for the JanusGraph graph database, so I don't have much control over the schema or the type of the data stored there.
Also, the JanusGraph code provides only a single host to the cassandra.input.thrift.address parameter, which would explain why only the ranges from a single node are split successfully. If I change this to provide both nodes, then token ranges are split at random, depending on whether the node the query was sent to is the same as the node that owns the current token range (referring to the describeSplits() method).
Cheers,
Mladen
Update 1:
I managed to solve questions 2 & 3, so now I can programmatically get the proper size estimates for each node. However, to use this and get the expected behavior in Spark, I'd basically have to reimplement org.apache.cassandra.hadoop.cql3.CqlInputFormat and change some other JanusGraph classes to work with this new input format. This could work as a last-resort solution, but I'm still primarily interested in using the "official" implementation, so question 1 still remains.
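For readers wondering what "size estimates for each node" could look like once collected, here is a minimal sketch of one possible way to combine them. It is not necessarily the approach used above. It assumes the rows have already been fetched from every node's system.size_estimates table by some other means; the function names, node addresses, and row values below are all hypothetical.

```python
def collect_estimates(rows_by_node):
    """Merge per-node size_estimates rows into one lookup table.

    rows_by_node maps a node address to a list of
    (range_start, range_end, mean_partition_size, partitions_count) tuples.
    If two nodes report the same range, the first one seen is kept.
    """
    estimates = {}
    for node, rows in rows_by_node.items():
        for start, end, mean_size, count in rows:
            estimates.setdefault((start, end), (mean_size, count))
    return estimates


def lookup(estimates, start, end):
    """Return (mean_partition_size, partitions_count), or None if the range is unknown."""
    return estimates.get((start, end))


# Hypothetical data: each node only knows about its own token ranges.
rows_by_node = {
    "10.0.0.1": [(0, 100, 64, 10)],
    "10.0.0.2": [(100, 200, 128, 20)],
}
estimates = collect_estimates(rows_by_node)

print(lookup(estimates, 100, 200))  # found, although it lives on the second node
print(lookup(estimates, 0, 50))     # None: no row with exactly these bounds
```

With the merged table, a lookup no longer depends on which node a query happened to reach, but it still requires the range bounds to match a stored row exactly.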
Update 2:
While trying to implement my own version, I noticed another issue. The token ranges used to generate the initial partitions are obtained via the Metadata.getTokenRanges() method. In my test case, this method returns 32 token ranges. However, system.size_estimates contains 33 rows for 33 token ranges. After comparing the ranges, I noticed that the wrapping range from Metadata.getTokenRanges():
(9136685927276781333, -8825951397969454570)
is split into two rows in system.size_estimates:
(9136685927276781333, -9223372036854775808)
(-9223372036854775808, -8825951397969454570)
The query used in the input filter code to fetch the ranges is:
SELECT mean_partition_size, partitions_count FROM system.size_estimates WHERE keyspace_name = ? AND table_name = ? AND range_start = ? AND range_end = ?
If that's the case, then the wrapping range can never be sub-split: its estimates can never be fetched, because the range limits used in the query differ from the limits stored in the table.
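To illustrate the mismatch, here is a minimal sketch of how a wrapping range could be unwrapped at the minimum token so that its bounds match the two rows shown above, and how the two estimates might then be combined. It assumes Murmur3 tokens (minimum token -2^63, which matches the -9223372036854775808 value in the rows); unwrap() and merge_estimates() are hypothetical names, not part of any Cassandra API.

```python
MIN_TOKEN = -2**63  # assumed minimum token (Murmur3 partitioner)


def unwrap(start, end):
    """Split a wrapping range (start > end) into two non-wrapping ranges.

    Non-wrapping ranges are returned unchanged as a one-element list.
    """
    if start > end and end != MIN_TOKEN:
        return [(start, MIN_TOKEN), (MIN_TOKEN, end)]
    return [(start, end)]


def merge_estimates(parts):
    """Combine (mean_partition_size, partitions_count) pairs.

    Returns the partition-weighted mean size and the total partition count.
    """
    total = sum(count for _, count in parts)
    if total == 0:
        return (0, 0)
    weighted = sum(mean * count for mean, count in parts)
    return (weighted / total, total)


pieces = unwrap(9136685927276781333, -8825951397969454570)
for piece in pieces:
    print(piece)
```

Each of the two printed ranges has bounds that match a row in the table, so both could be looked up with the query above and their results merged before computing sub-splits.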
I don't know if this is different in newer versions, but I didn't see anything related to this in the 3.11.x release notes.
Anyway, really looking forward to hearing from someone about this!