When fetching results from cassandra db using single thread it works fine, as soon as the number of threads are increased all the threads slows down. Example using 1 thead it takes 5 sec, but with 2 each takes 10 sec. Below are the details of how I am executing the fetching and cassandra model.
In each execution of the below function it creates approx 30 async Queries to cassandra db. It queries the db based on date and cmId as the partition key.
These are the below 3 queries happening
findByCreateDayAndCmId - Select * from conversation_detail where create_day=? and cm_id=?;
findByCreateDayAndCmIdAndTimestampGreaterThanEqual - Select * from conversation_detail where create_day= ? and cm_id=? and create_date >= ?;
findByCreateDayAndCmIdAndTimestampLessThanEqual - Select * from conversation_detail where create_day=? and cm_id=? and create_date <= ?
The table structure for conversation_detail is below, the partition key is create_day. And cm_id,create_date,id are clustering columns, I have queried based on these only.
So to query for 1 month of data it is making 30 async request. Its works fine when only 1 thread is executing the function but if we increase the number of threads it slows down.
1 month of data is approx 374000 rows,table contains data 11500000 rows total, also each partition has approx 15000 rows only.
Table Structure : -
CREATE TABLE conversation_detail ( id int, cm_id int, ticket_id text, task_id int, create_date text, conversation_type text, conversation_id int, type text, agent_id int, assign_reference int, queue_key text, folder_id int, create_day date, PRIMARY KEY ((create_day),cm_id,create_date,id) );
public List<ConversationDetail> getConversationDetailByCreateDateAndCustomerId(Timestamp startDateTimestamp, Timestamp endDateTimestamp, int cmId) { LocalDate startDate = startDateTimestamp.toInstant().atZone(ZoneId.systemDefault()).toLocalDate(); LocalDate endDate = endDateTimestamp.toInstant().atZone(ZoneId.systemDefault()).toLocalDate(); List<ConversationDetail> results = new ArrayList<>(); List<ResultSetFuture> futures = new ArrayList<>(); try { for (LocalDate date = startDate.plusDays(1); date.isBefore(endDate); date = date.plusDays(1)) { ResultSetFuture resultSetFuture = session.executeAsync(findByCreateDayAndCmId.bind(date, cmId).setReadTimeoutMillis(20000)); futures.add(resultSetFuture); } futures.add(session.executeAsync(findByCreateDayAndCmIdAndTimestampGreaterThanEqual.bind(startDate, cmId, startDateTimestamp).setReadTimeoutMillis(20000))); futures.add(session.executeAsync(findByCreateDayAndCmIdAndTimestampLessThanEqual.bind(endDate, cmId, endDateTimestamp).setReadTimeoutMillis(20000))); for (ResultSetFuture future : futures) { try { ResultSet rows = future.getUninterruptibly(); List<ConversationDetail> all = mapper.map(rows).all(); results.addAll(all); }catch (Exception e){ System.out.println("Exception1 " + e.getMessage()); } } }catch (Exception e){ System.out.println("Exception2 "+e.getMessage()); } return results; }