Question

ortizfabio_185816 asked:

Why does the Spark connector not retry when TableWriter.writeInternal gets an exception?

Hi,


I am using spark-cassandra-connector_2.11-2.0.8 and I have been getting exceptions like this while writing to Cassandra:

Cassandra timeout during write query at consistency LOCAL_ONE (1 replica were required but only 0 acknowledged the write)


What is not clear to me is what happens to my job when such an exception occurs. Clearly this is an exception that could be retried. But looking at the class TableWriter, starting at line 241:

queryExecutor.getLatestException().map {
  case exception =>
    throw new IOException(
      s"""Failed to write statements to $keyspaceName.$tableName. The
         |latest exception was
         |  ${exception.getMessage}
         |
         |Please check the executor logs for more exceptions and information
       """.stripMargin)
}


I see an exception being raised, but it appears that at this point the current task is aborted, thus failing the job. Is this the correct understanding?

If so, why wouldn't the driver retry instead of failing the job?


Thanks


sparkconnector

1 Answer

Russell Spitzer answered:

It actually does retry. There are several layers of retry here.

First, on actual execution, the query goes through the retry policy specified by the CassandraConnectionFactory, which uses MultipleRetryPolicy. That policy is configured by parameters explained in the Reference Docs.

If the query times out that many times, the error bubbles back up to the task, which then fails.
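
For example, the retry count can be raised when building the Spark configuration. This is only a minimal sketch: the property name comes from the connector's query.retry.count setting (with the spark.cassandra. prefix), and the value 120 is an arbitrary illustration, not a recommendation.

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Minimal sketch: raise the per-query retry count used by MultipleRetryPolicy.
// The value 120 is only an illustration, not a recommendation.
val conf = new SparkConf()
  .set("spark.cassandra.connection.host", "127.0.0.1") // assumed host for the example
  .set("spark.cassandra.query.retry.count", "120")

val spark = SparkSession.builder().config(conf).getOrCreate()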

At this point there is yet *another* layer of retries. Spark has a configurable number of task attempts (spark.task.maxFailures) before it decides a task is unsolvable and aborts the whole job.
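
As another sketch, the task-attempt limit can be raised in the same way, assuming the property is applied before the SparkContext is created; the value 8 is arbitrary.

import org.apache.spark.SparkConf

// Minimal sketch: allow more task attempts before Spark aborts the whole job.
// spark.task.maxFailures must be set before the context starts; 8 is an arbitrary value.
val sparkConf = new SparkConf()
  .setAppName("cassandra-write-job") // hypothetical application name
  .set("spark.task.maxFailures", "8")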


This means that for a job to actually be aborted, the insert must fail internally 60 times and then the task as a whole must fail 4 times, giving a total of 240 failures before the job is aborted.

If this is the case you can increase timeouts or retries, but in all likelihood the cluster is simply being overwhelmed to the point that no queries can succeed. While we would like to have better automatic throughput throttling, that is currently not available, and it's up to the user to choose a sustainable write rate.
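
If you do want to throttle by hand, the connector's output settings can be lowered. A rough sketch follows, with illustrative values only; check the Reference Docs for the defaults in your connector version.

import org.apache.spark.SparkConf

// Rough sketch: slow writes down so the cluster is not overwhelmed.
// All values below are illustrative, not recommendations.
val writeConf = new SparkConf()
  .set("spark.cassandra.output.throughput_mb_per_sec", "5") // cap write throughput per core (MB/s)
  .set("spark.cassandra.output.concurrent.writes", "2")     // fewer batches in flight per task
  .set("spark.cassandra.output.batch.size.rows", "50")      // smaller batches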

3 comments

ortizfabio_185816 commented:

Russell, thanks for your reply.

I have seen the link you gave me and I am aware of query.retry.count, but I was under the impression that it was for reads. I am doing writes; does it still work for writes?

I am also aware of spark.task.maxFailures; however, my impression is that the task will be restarted, inserting again any data it had previously inserted. That can get the job done, but it might delay it.

I would appreciate it if you could confirm the query.retry.count option.


Thanks

Erick Ramirez ♦♦ commented:

Just letting you know that I've converted your post to a comment since it's not an "answer". Cheers!

ortizfabio_185816 commented:

I accepted this answer because currently the only way to retry is at the Spark level, using the spark.task.maxFailures parameter. The other parameters mentioned here are for reading, not writing.
