question

JJorczik asked · Erick Ramirez edited

Which tasks should have locality level node_local with a collocated Spark/Cassandra setup?

We have collocated Spark and Cassandra nodes and observe that only a subset of the created tasks run at locality level node_local when reading a Cassandra table with SCC and performing a count on it. All shuffle-read tasks run node_local, but the shuffle writes run at locality level any. Is this the expected behaviour? When replacing count with collect, we observe only tasks with locality level any. Shouldn't transformations that do not trigger a shuffle also create tasks with locality level node_local?
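For reference, a minimal sketch of the kind of job described above, written with the SCC DataFrame source in a spark-shell session (the keyspace/table names "ks"/"kv" are placeholders, the original may use the RDD API instead, and the comments restate the observations rather than expected behaviour):

// read the table through the SCC DataFrame source ("ks"/"kv" are placeholders)
val df = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "ks", "table" -> "kv"))
  .load()

// count: the shuffle-read tasks show NODE_LOCAL, the shuffle writes show ANY
df.count()

// collect: no shuffle is involved, yet every task runs at locality level ANY
df.collect()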

Tags: spark-cassandra-connector, data locality

jaroslaw.grabowski_50515 answered · JJorczik edited

Hi! A simple read followed by collect should definitely produce at least some tasks with a locality level better than ANY. Perhaps it's a misconfiguration around addresses? SCC exposes preferred locations to Spark via this method: https://github.com/datastax/spark-cassandra-connector/blob/b2.5/connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraTableScanRDD.scala#L287

Once you have your RDD, you can obtain all of its partitions and see which locations are preferred. Do these locations match the addresses used by Spark?

Also what SCC/DSE version is this?


JJorczik commented

I have tried the helper function from this thread: https://stackoverflow.com/questions/50872579/how-to-determine-preferred-location-for-partitions-of-pyspark-dataframe. Do you think it works correctly?

The preferred locations of each partition after reading a Cassandra table are empty. Can you explain how all shuffle read tasks can be node_local after shuffling when using count if there is no preferred location information?

We are using SCC version 3.0.0, Spark version 3.0.0 and Cassandra version 3.11.4. We have collocated Spark worker and Cassandra nodes by running a Spark Docker container and a Cassandra Docker container on the same VM. We expected that SCC uses the VM's IP for deciding about locality. Did we do something wrong here?

jaroslaw.grabowski_50515 ♦ commented

Fire up spark-shell and create an RDD:

import com.datastax.spark.connector._
val rdd = spark.sparkContext.cassandraTable(keyspace = "ks", table = "kv")

This RDD is actually a CassandraTableScanRDD instance, so it exposes the preferred locations (L287 of the file linked above):

rdd.partitions.foreach(p => println(rdd.preferredLocations(p)))

You should see a preferred location for each partition. If these addresses don't match your executors (check the web UI), then no locality can be achieved and all the tasks are marked as ANY.
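If you prefer to compare programmatically instead of eyeballing the web UI, a quick check along these lines should work in the same spark-shell session (this is just a rough sketch, not an SCC API; getExecutorMemoryStatus is keyed by "host:port" and also includes the driver):

// hosts the SCC partitions would prefer to run on
val preferred = rdd.partitions.flatMap(p => rdd.preferredLocations(p)).toSet

// hosts the executors (and driver) registered with, stripped of the port
val executors = spark.sparkContext.getExecutorMemoryStatus.keys.map(_.split(":")(0)).toSet

// an empty intersection means NODE_LOCAL tasks are impossible
println(preferred.intersect(executors))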

In the basic scenario, shuffle reads should run locally because the executors use the local filesystem to store the spills (there is no room for address misconfiguration there).

JJorczik commented

You were right that it was about address configuration. We modified the worker addresses in our Spark setup and now we observe node_local tasks even without shuffling. The preferred locations also match our executors' addresses. Thank you for your help!

Erick Ramirez answered · Erick Ramirez edited

To me it makes sense that writes don't have to be node_local, since any node in the DC can act as the coordinator for the write request, which then sends the mutation to all replicas.

If you don't think this is the correct behaviour, feel free to provide a minimal code sample which replicates the problem, including the table schema and the versions of SCC, Spark and Cassandra you're using.

In the meantime, I'm going to reach out to the Analytics team here at DataStax for ideas or known issues in some versions. Cheers!
