Bringing together the Apache Cassandra experts from the community and DataStax.


tomas.bartalos_162914 asked:

Spark connector 3.0.0-beta write hangs forever for some executors

Hello,

I'm running a Spark Structured Streaming app that writes to C*. The app has 3 executors, and I'm experiencing a weird issue where only one executor is able to write to C* while the other 2 executors hang forever. Executor scheduling is unevenly balanced because the app runs 3 streams, and one stream's tasks are always scheduled on the same executor, the one that is able to write:

while the other 2 streams are evenly scheduled:

But as you can see, only one executor completes its tasks, while the other two hang forever (duration: 24.7 hours):

I'm using Spark 3.0 in standalone mode with Cassandra connector 3.0.0-beta, since I couldn't find a stable connector release for Spark 3. The same code has run for months without any problems with Spark Cassandra Connector 2.4.2 and Spark 2.4.4.

[UPDATE] My Cassandra DB version is 3.11.5.

I've found that when I create only one Spark executor (--executor-cores == --total-executor-cores), there is no hanging.
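Why that setting yields a single executor: in standalone mode, when --executor-cores is set, the master keeps launching executors of that size until --total-executor-cores (spark.cores.max) is used up, assuming the workers have enough free cores and memory. The hypothetical helper below only does that arithmetic; it is a sketch, not Spark's actual scheduler code.

```scala
object ExecutorCount {
  // Hypothetical helper: approximate number of executors a standalone master
  // launches for one application, assuming workers have enough free cores and
  // memory. Equivalent to --total-executor-cores / --executor-cores (integer division).
  def executorCount(totalExecutorCores: Int, executorCores: Int): Int = {
    require(executorCores > 0, "executorCores must be positive")
    totalExecutorCores / executorCores
  }

  def main(args: Array[String]): Unit = {
    println(executorCount(15, 5)) // the setup that hangs: 3 executors
    println(executorCount(5, 5))  // the workaround: 1 executor
  }
}
```

With the spark-submit settings from this question (15 total cores, 5 per executor) this gives 3 executors; setting both values to 5 gives the single-executor case that does not hang.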

Simplified code:

import org.apache.spark.sql.DataFrame

// Connector options shared by every write
val connectionOpts: Map[String, String] =
  Map(
    "spark.cassandra.connection.host" -> config.cassandraHost,
    "spark.cassandra.auth.username" -> config.cassandraUsername,
    "spark.cassandra.auth.password" -> config.cassandraPassword,
    "spark.cassandra.output.batch.grouping.key" -> "none",
    "spark.cassandra.output.ignoreNulls" -> "true"
  )

// df is streaming from a Kafka source; each micro-batch is passed to saveToCassandra
df.writeStream
  .option("checkpointLocation", conf.checkpointPath)
  .foreachBatch(saveToCassandra _)
  .start()

// Batch write of a single micro-batch (id is the batch ID supplied by Spark)
def saveToCassandra(df: DataFrame, id: Long): Unit = {
  df.write
    .format("org.apache.spark.sql.cassandra")
    .options(connectionOpts)
    .options(Map(
      "table" -> conf.cassandraTable,
      "keyspace" -> conf.cassandraKeyspace
    ))
    .mode("append")
    .save()
}

env: Spark 3.0.0 standalone cluster with 3 worker nodes

# create 3 executors (5 cores each)
"${SPARK_HOME}/bin/spark-submit" \
--driver-memory 3G \
--executor-memory 3G \
--master spark://spark-master-1:7077,spark-master-2:7077 \
--executor-cores=5 \
--total-executor-cores=15 \
--conf spark.kryoserializer.buffer.max.mb=512 \
--conf spark.driver.maxResultSize=0 \
...

The behavior is the same when I change spark.cassandra.output.batch.grouping.key to partition: executor balancing is uneven, and executors 1 and 2 hang. Attached stack traces: executor_stacktrace.txt, driver_stacktrace.txt

spark-cassandra-connector

jaroslaw.grabowski_50515 answered:

@tomas.bartalos_162914, how many partitions does your topic have?
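One way to check this from inside the app is to log the partition count of each micro-batch in the foreachBatch function. This is a sketch under the assumption that the Kafka source has no minPartitions option set; in that case a micro-batch normally gets one Spark partition per Kafka topic-partition that has new records in the batch, so the number reflects partitions across all subscribed topics. BatchDiagnostics and logBatchPartitions are hypothetical names.

```scala
import org.apache.spark.sql.DataFrame

object BatchDiagnostics {
  // Hypothetical helper: log and return how many Spark partitions a
  // micro-batch has, which is roughly how many write tasks it produces.
  // Call it at the top of the foreachBatch function (e.g. inside
  // saveToCassandra) before the Cassandra write.
  def logBatchPartitions(df: DataFrame, batchId: Long): Int = {
    val numPartitions = df.rdd.getNumPartitions
    println(s"batch $batchId: $numPartitions partitions")
    numPartitions
  }
}
```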

Could you take a thread dump of each hanging executor?
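For a hung executor, the usual routes are the Thread Dump link on the Executors tab of the Spark UI, or jstack against the executor's JVM process. As a hypothetical in-process alternative (not something Spark or the connector provides), a JVM can also render its own threads through Thread.getAllStackTraces; a minimal Scala sketch:

```scala
import scala.collection.JavaConverters._

object ThreadDump {
  // Hypothetical helper: render every live thread in the current JVM with its
  // state and stack frames, roughly the information jstack or the Spark UI
  // "Thread Dump" page shows.
  def dump(): String = {
    val sb = new StringBuilder
    for ((thread, frames) <- Thread.getAllStackTraces.asScala) {
      sb.append(s"${thread.getName} (${thread.getState})\n")
      frames.foreach(frame => sb.append(s"    at $frame\n"))
      sb.append("\n")
    }
    sb.toString
  }

  def main(args: Array[String]): Unit = print(dump())
}
```

Because it only reads the stacks of other threads, it can be called from a separate thread while task threads are blocked.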

Are you able to reproduce the issue with the simplified code? It has only one stream, and it works fine for me.

What's the reason for using foreachBatch instead of writeStream.format("org.apache.spark.sql.cassandra")?

Are you able to reproduce the issue with a different write target (like console)?
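A minimal sketch of that comparison, assuming the same Kafka-backed df from the question: start it against Spark's built-in console sink instead of Cassandra. If the console query keeps making progress, the hang is more likely on the Cassandra write path. ConsoleDebugSink is a hypothetical name, and checkpointPath is a placeholder that should point at a fresh directory rather than the real app's checkpoint.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQuery

object ConsoleDebugSink {
  // Sketch: same streaming DataFrame, but each micro-batch is printed by the
  // console sink (numRows and truncate are console sink options) instead of
  // being written to Cassandra.
  def start(df: DataFrame, checkpointPath: String): StreamingQuery =
    df.writeStream
      .format("console")
      .option("checkpointLocation", checkpointPath)
      .option("numRows", "20")
      .option("truncate", "false")
      .start()
}
```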

1 comment

I've added executor and driver thread dumps to the original question (the stack trace is the same for every task thread).

Our code consumes 3 Kafka streams with 8 topics each. Unfortunately, I'm not able to reproduce the issue with the simplified code. I even tried extracting the important part of the code into a mini app, and the issue doesn't appear there. However, every time I run the real app code in the staging environment, the issue is there :-(. Could this be related to the Structured Streaming checkpoints? I will investigate further and try to reproduce the behavior.

Erick Ramirez answered:

It would help if you could provide the minimum amount of code, plus your environment configuration, that would allow the developers to replicate the issue.

Can you also confirm the version of Apache Cassandra you're running against? We want to make sure the issue isn't caused by an unsupported DB version.

We've had reports of issues with the beta release, such as reduced throughput, which is being tracked under SPARKC-614, so thank you for testing out the beta release. Cheers!

2 comments

Thank you for your reply.

[Update posted in original question above]

[Post converted to comment since it's not an "answer"]

Erick Ramirez ♦♦, replying to tomas.bartalos_162914:

Thanks for the additional details. I'll share it with the Analytics team here in DataStax. Cheers!
