question

ortizfabio_185816 asked · Erick Ramirez edited

Why does my Spark app using connector v2.0.8 hang on 2 tasks?

I have a job that writes to a Cassandra table from Spark. The job has 36 executors and 46,857 tasks. All of the tasks complete except two; there are always two tasks hanging. When I dump the threads on that executor and look at them, I see the hanging ones in a state of TIMED_WAITING with a lock of: "Lock(java.util.concurrent.ThreadPoolExecutor$Worker@1804210130}) ".

The stack trace for that thread shows it hanging at RateLimiter.scala:29.

Below is the stack trace. I would appreciate any hints about this problem.


Thanks

java.lang.Thread.sleep(Native Method)
com.datastax.spark.connector.writer.RateLimiter$$anonfun$$lessinit$greater$default$4$1.apply$mcVJ$sp(RateLimiter.scala:29)
com.datastax.spark.connector.writer.RateLimiter$$anonfun$$lessinit$greater$default$4$1.apply(RateLimiter.scala:29)
com.datastax.spark.connector.writer.RateLimiter$$anonfun$$lessinit$greater$default$4$1.apply(RateLimiter.scala:29)
com.datastax.spark.connector.writer.RateLimiter.maybeSleep(RateLimiter.scala:63)
com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1$$anonfun$apply$2.apply(TableWriter.scala:236)
com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1$$anonfun$apply$2.apply(TableWriter.scala:233)
scala.collection.Iterator$class.foreach(Iterator.scala:893)
com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:233)
com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:210)
com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:145)
com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:210)
com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:197)
com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:183)
com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
org.apache.spark.scheduler.Task.run(Task.scala:108)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
sparkconnector
2 comments


Erick Ramirez ♦♦ commented ·

@ortizfabio_185816 The stack trace is a bit difficult to read. Are you able to upload it somewhere (like https://gist.github.com/) then post the URL here? Also, can you provide a bit more background on what your app is doing so engineers looking at this have a bit more context? Cheers!

ortizfabio_185816 commented ·

Erick:

I have put the stack trace here: https://gist.github.com/ortizfabio/84f791a129debbc8f5844e186a849f07

Thanks


1 Answer

Russell Spitzer answered · ortizfabio_185816 commented

When a few tasks hang, it is usually a symptom of data skew in previous operations. Basically, you end up with a Spark partition that holds significantly more data than the average partition in that stage.

The trace provided shows that the code is waiting in the "write speed" rate limiter, which most likely means that these very large tasks are writing data but keep triggering the rate limiter, which pauses the writes to maintain the limit set in the configuration. By default, write speed is unlimited.
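To rule that out, it is worth checking whether a write throughput cap was ever set explicitly. Below is a minimal sketch, assuming the connector 2.0.x property name spark.cassandra.output.throughput_mb_per_sec from the connector reference (verify the exact name for your connector version):

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Hypothetical configuration sketch: the property name is taken from the
// Spark Cassandra Connector 2.0.x reference and may differ in other versions.
val conf = new SparkConf()
  .setAppName("cassandra-writer")
  // If this is set, it is the limit the RateLimiter in the stack trace enforces.
  .set("spark.cassandra.output.throughput_mb_per_sec", "5")

val spark = SparkSession.builder().config(conf).getOrCreate()

// Verify what the running job actually uses (None means no cap was set):
println(spark.conf.getOption("spark.cassandra.output.throughput_mb_per_sec"))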


I would check the driver UI to see whether those specific tasks report any data going through them, and of course check that your write speed is not limited (as in the sketch above). But usually a previous operation did some shuffle or cogroup that ended up placing almost all of the data in one or two partitions (for example, a groupBy on userName where "test" appears in billions of records). A rough way to check for that kind of key skew is sketched below.
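This is only a sketch; the input path and the userName column are placeholders for whatever key the upstream shuffle or cogroup was actually keyed on.

import org.apache.spark.sql.SparkSession

// Hypothetical skew check: count rows per key and look at the heaviest keys.
val spark = SparkSession.builder().appName("skew-check").getOrCreate()

val keyCounts = spark.read.parquet("/path/to/input")   // placeholder input
  .groupBy("userName")                                  // placeholder key column
  .count()

// A single key with billions of rows would explain one or two tasks being
// far larger than the rest of the stage.
keyCounts.orderBy(keyCounts("count").desc).show(10, truncate = false)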

The sleep is measured like this:


val overflow = currentFill - bucketSize   // how far past the bucket capacity we have written
val delay = 1000L * overflow / rate       // milliseconds to sleep to get back under the rate
sleep(delay)


The sleep can only be long if the amount of records inserted in the last time period was extremely large, so it is unlikely the issue is there.
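To make that concrete, here is a small worked example with made-up numbers (the rate and bucket size below are assumptions for illustration, not the connector's defaults):

// If the bucket holds 2 seconds' worth of data and 3 seconds' worth has been
// written, the overflow is 1 second's worth and the task sleeps ~1000 ms.
val rate        = 5L * 1024 * 1024          // assumed cap: 5 MB/s
val bucketSize  = 2 * rate                  // assumed bucket: 2 seconds' worth
val currentFill = 3 * rate                  // bytes written in the current window

val overflow = currentFill - bucketSize     // bytes over the bucket capacity
val delay    = 1000L * overflow / rate      // => 1000 ms
println(s"sleep for $delay ms")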

9 comments


ortizfabio_185816 commented ·

I wish it were just that simple. As you can see from the picture below, every executor has exactly the same load. There is a large number of partitions (2000) distributing the load.
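For reference, one way to verify that claim per Spark partition just before the save (myRdd below is a hypothetical name for the RDD passed to saveToCassandra):

// Count records held by each Spark partition; a handful of partitions far
// larger than the rest would point back at skew.
val sizes = myRdd
  .mapPartitionsWithIndex((idx, it) => Iterator((idx, it.size.toLong)))
  .collect()

sizes.sortBy(p => -p._2).take(10).foreach { case (idx, n) =>
  println(s"partition $idx: $n records")
}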

Erick Ramirez ♦♦ ortizfabio_185816 commented ·

@ortizfabio_185816 I agree with Russell that this is data-related because otherwise, the other tasks wouldn't have worked. Do you have reason to believe the problem lies with the connector? If so, it would be good to hear your reasoning behind it.

In any case, did you manage to review the stdout/stderr for the tasks to see if they yield clues as to where they're stuck? Cheers!

ortizfabio_185816 commented ·

I can also see in Grafana that there are 0 writes going into that table.

cassweb-precalc.png (40.3 KiB)
ortizfabio_185816 commented ·

I am not sure I added the comments in the right place, but they are appended to this answer. Can you please take a look by clicking the double comment icon?


Thanks

Erick Ramirez ♦♦ ortizfabio_185816 commented ·

@ortizfabio_185816 I converted your comments since they weren't "answers" -- they were comments to Russell's answer. Cheers!

ortizfabio_185816 Erick Ramirez ♦♦ commented ·

The problem is with the connector because when there is a mutation exception in any partition, the driver hangs the whole job. I can see from Grafana that no inserts are taking place, but the driver and all partitions are still there.

This is a problem with this driver. First, it is very hard to configure so that it does not produce a mutation exception. Second, if it does, it should retry rather than hang.
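For reference, the retry behaviour is configurable; a hedged sketch assuming the 2.0.x property names spark.cassandra.query.retry.count and spark.cassandra.connection.timeout_ms from the connector reference (verify them for your version; the values are illustrative, not recommendations):

import org.apache.spark.SparkConf

// Assumed 2.0.x property names; check the connector reference for your version.
val conf = new SparkConf()
  .set("spark.cassandra.query.retry.count", "10")        // retry a failed write up to 10 times
  .set("spark.cassandra.connection.timeout_ms", "30000") // allow slower connections before failing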


ortizfabio_185816 commented ·

For example, I ran the job again and got an error. As you can see from the Spark UI, it is still running.

It is supposed to be writing to Cassandra, which is the last task. However, I see this exception in the errors:

Cassandra timeout during write query at consistency LOCAL_ONE (1 replica were required but only 0 acknowledged the write)
Please check the executor logs for more exceptions and information at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1$$anonfun$apply$3.apply(TableWriter.scala:243)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1$$anonfun$apply$3.apply(TableWriter.scala:241)

When I look at the cluster through Grafana for the last hour, there are no writes. From the same Grafana I can see 30+ clients connected, which is what I have configured (see attachments).
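Regarding the LOCAL_ONE write timeout above, here is a hedged sketch of the write-side settings that reduce pressure on the coordinator (2.0.x property names from the connector reference; the values are illustrative only, not recommendations):

import org.apache.spark.SparkConf

// Assumed 2.0.x property names; smaller batches and fewer in-flight writes
// per task give the coordinator a better chance to acknowledge in time.
val conf = new SparkConf()
  .set("spark.cassandra.output.concurrent.writes", "2")
  .set("spark.cassandra.output.batch.size.bytes", "4096")
  .set("spark.cassandra.output.throughput_mb_per_sec", "2")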

capture.png (13.0 KiB)
capture.png (8.8 KiB)
ortizfabio_185816 commented ·

Yes, it is a data-skew-related problem.
