I am using cassandra java reactive api like below to read . When I am running 700 request / sec and in each request 1200 rows are fetched. It is throwing below error . It is not able to handle back pressure . How to handle backpressures on read java native reactive driver.
@Override public Flux<ItemInventory> getItem(String id, String market) { BoundStatement bound = findByIdAndMarket.bind(id,market); ReactiveResultSet rs = session.executeReactive(bound); // 1200 rows return Flux.from(rs) .map( row -> { return ItemInventory.builder() .item(row.getString("item")) .build() } ); }
com.datastax.oss.driver.api.core.DriverTimeoutException: Query timed out after PT10S 2022-03-07T23:11:17.27-0800 [APP/PROC/WEB/0] OUT at com.datastax.oss.driver.internal.core.cql.CqlRequestHandler.lambda$scheduleTimeout$1(CqlRequestHandler.java:206) ~[java-driver-core-4.13.0.jar:na] 2022-03-07T23:11:17.27-0800 [APP/PROC/WEB/0] OUT Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 2022-03-07T23:11:17.27-0800 [APP/PROC/WEB/0] OUT *__checkpoint ⇢ Handler com.gap.inventory.controller.ItemController#getItem(String, String, String, String, String) [DispatcherHandler] 2022-03-07T23:11:17.27-0800 [APP/PROC/WEB/0] OUT *__checkpoint ⇢ HTTP GET "/api/v1/item_inventory_buckets/3404180010000?market=US" [ExceptionHandlingWebHandler] 2022-03-07T23:11:17.27-0800 [APP/PROC/WEB/0] OUT Original Stack Trace: 2022-03-07T23:11:17.27-0800 [APP/PROC/WEB/0] OUT at com.datastax.oss.driver.internal.core.cql.CqlRequestHandler.lambda$scheduleTimeout$1(CqlRequestHandler.java:206) ~[java-driver-core-4.13.0.jar:na] 2022-03-07T23:11:17.27-0800 [APP/PROC/WEB/0] OUT at io.netty.util.HashedWheelTimer$HashedWheelTimeout.run(HashedWheelTimer.java:715) ~[netty-common-4.1.74.Final.jar:4.1.74.Final] 2022-03-07T23:11:17.27-0800 [APP/PROC/WEB/0] OUT at io.netty.util.concurrent.ImmediateExecutor.execute(ImmediateExecutor.java:34) ~[netty-common-4.1.74.Final.jar:4.1.74.Final] 2022-03-07T23:11:17.27-0800 [APP/PROC/WEB/0] OUT at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:703) ~[netty-common-4.1.74.Final.jar:4.1.74.Final] 2022-03-07T23:11:17.27-0800 [APP/PROC/WEB/0] OUT at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:790) ~[netty-common-4.1.74.Final.jar:4.1.74.Final] 2022-03-07T23:11:17.27-0800 [APP/PROC/WEB/0] OUT at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:503) ~[netty-common-4.1.74.Final.jar:4.1.74.Final] 2022-03-07T23:11:17.27-0800 [APP/PROC/WEB/0] OUT at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.74.Final.jar:4.1.74.Final]