sumeet asked · Erick Ramirez answered

How do I handle backpressure with Reactive programming in the Java driver?

I am using the Cassandra Java driver's reactive API (shown below) to read data. At around 700 requests/sec, with roughly 1,200 rows fetched per request, it throws the error below and cannot keep up with the load. How do I handle backpressure on reads with the native Java reactive driver?

@Override
public Flux<ItemInventory> getItem(String id, String market) {
    BoundStatement bound = findByIdAndMarket.bind(id, market);
    ReactiveResultSet rs = session.executeReactive(bound);  // ~1200 rows

    return Flux.from(rs)
        .map(row -> ItemInventory.builder()
            .item(row.getString("item"))
            .build());
}
com.datastax.oss.driver.api.core.DriverTimeoutException: Query timed out after PT10S
   2022-03-07T23:11:17.27-0800 [APP/PROC/WEB/0] OUT     at com.datastax.oss.driver.internal.core.cql.CqlRequestHandler.lambda$scheduleTimeout$1(CqlRequestHandler.java:206) ~[java-driver-core-4.13.0.jar:na]
   2022-03-07T23:11:17.27-0800 [APP/PROC/WEB/0] OUT     Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
   2022-03-07T23:11:17.27-0800 [APP/PROC/WEB/0] OUT     *__checkpoint ⇢ Handler com.gap.inventory.controller.ItemController#getItem(String, String, String, String, String) [DispatcherHandler]
   2022-03-07T23:11:17.27-0800 [APP/PROC/WEB/0] OUT     *__checkpoint ⇢ HTTP GET "/api/v1/item_inventory_buckets/3404180010000?market=US" [ExceptionHandlingWebHandler]
   2022-03-07T23:11:17.27-0800 [APP/PROC/WEB/0] OUT Original Stack Trace:
   2022-03-07T23:11:17.27-0800 [APP/PROC/WEB/0] OUT             at com.datastax.oss.driver.internal.core.cql.CqlRequestHandler.lambda$scheduleTimeout$1(CqlRequestHandler.java:206) ~[java-driver-core-4.13.0.jar:na]
   2022-03-07T23:11:17.27-0800 [APP/PROC/WEB/0] OUT             at io.netty.util.HashedWheelTimer$HashedWheelTimeout.run(HashedWheelTimer.java:715) ~[netty-common-4.1.74.Final.jar:4.1.74.Final]
   2022-03-07T23:11:17.27-0800 [APP/PROC/WEB/0] OUT             at io.netty.util.concurrent.ImmediateExecutor.execute(ImmediateExecutor.java:34) ~[netty-common-4.1.74.Final.jar:4.1.74.Final]
   2022-03-07T23:11:17.27-0800 [APP/PROC/WEB/0] OUT             at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:703) ~[netty-common-4.1.74.Final.jar:4.1.74.Final]
   2022-03-07T23:11:17.27-0800 [APP/PROC/WEB/0] OUT             at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:790) ~[netty-common-4.1.74.Final.jar:4.1.74.Final]
   2022-03-07T23:11:17.27-0800 [APP/PROC/WEB/0] OUT             at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:503) ~[netty-common-4.1.74.Final.jar:4.1.74.Final]
   2022-03-07T23:11:17.27-0800 [APP/PROC/WEB/0] OUT             at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.74.Final.jar:4.1.74.Final]

1 Answer

Erick Ramirez answered

Unfortunately, there isn't a mechanism for clients to manage backpressure against the server. More information is provided in the Advanced topics section of Reactive Style Programming with the Java driver.

But based on your description, the DriverTimeoutException indicates that the coordinators are overloaded with requests and the replicas are likely not able to cope either.
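As a side note, the PT10S in the error corresponds to the driver's per-request timeout. You can tune that timeout and the page size in the driver's application.conf; these are standard driver 4.x options, though the values below are only illustrative and won't fix an overloaded cluster:

```hocon
# Illustrative Java driver 4.x settings (values are examples, not recommendations)
datastax-java-driver {
  basic.request {
    timeout = 10 seconds   # per-request timeout (the PT10S in the error)
    page-size = 500        # fetch fewer rows per page to lighten each response
  }
}
```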

700 requests per second isn't particularly high, so it suggests to me that your cluster doesn't have enough capacity. I would suggest checking your cluster monitoring to see whether the cluster is hitting the peak traffic it can serve. If so, you may need to review the size of your cluster and look at increasing capacity by adding more nodes. Cheers!
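Until capacity is added, you can also cap the number of in-flight queries at the application level, since the driver won't do it for you. Here is a minimal, hypothetical sketch using a plain Semaphore (QueryThrottle and maxInFlight are made-up names, not driver features):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Semaphore;

// Hypothetical helper: caps how many queries run concurrently so the
// application stops issuing new requests once the limit is reached.
public class QueryThrottle {
    private final Semaphore permits;

    public QueryThrottle(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    // Blocks the caller while maxInFlight queries are already running.
    public <T> T run(Callable<T> query) throws Exception {
        permits.acquire();
        try {
            return query.call();
        } finally {
            permits.release();
        }
    }
}
```

In a Reactor pipeline, a similar effect can be achieved with flatMap's concurrency argument, which limits how many inner publishers are subscribed to at once.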
