I'm using Structured Streaming to read data from and save data to Cassandra.
The Cassandra version is 3.11.3, Spark is 2.2, and the spark-cassandra-connector driver is 2.0.10. The app runs fine at the start, but roughly every 20 hours it halts with this exception:
java.io.IOException: Exception during execution of SELECT "is_delete", "region_code", "gateway_id", "purpose_code", "address_code" FROM "hk_ods"."frm_firealarm_gateway" WHERE token("gateway_id") > ? AND token("gateway_id") <= ? ALLOW FILTERING: [node1-fzy/192.168.1.191:9042] Connection has been closed
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.com$datastax$spark$connector$rdd$CassandraTableScanRDD$$fetchTokenRange(CassandraTableScanRDD.scala:350)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$17.apply(CassandraTableScanRDD.scala:367)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$17.apply(CassandraTableScanRDD.scala:367)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Caused by: com.datastax.driver.core.exceptions.TransportException: [node1-fzy/192.168.1.191:9042] Connection has been closed
    at com.datastax.driver.core.exceptions.TransportException.copy(TransportException.java:38)
    at com.datastax.driver.core.exceptions.TransportException.copy(TransportException.java:24)
    at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
    at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245)
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:68)
    at sun.reflect.GeneratedMethodAccessor46.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:40)
    at com.sun.proxy.$Proxy13.execute(Unknown Source)
    at com.datastax.spark.connector.cql.DefaultScanner.scan(Scanner.scala:34)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.com$datastax$spark$connector$rdd$CassandraTableScanRDD$$fetchTokenRange(CassandraTableScanRDD.scala:342)
    ... 22 more
Caused by: com.datastax.driver.core.exceptions.TransportException: [node1-fzy/192.168.1.191:9042] Connection has been closed
    at com.datastax.driver.core.Connection$ConnectionCloseFuture.force(Connection.java:1210)
    at com.datastax.driver.core.Connection$ConnectionCloseFuture.force(Connection.java:1195)
    at com.datastax.driver.core.Connection.defunct(Connection.java:445)
    at com.datastax.driver.core.Connection$Dispatcher.exceptionCaught(Connection.java:1128)
    at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
    at io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:844)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    ... 1 more
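For reference, the dependency setup looks roughly like this (a sketch assuming sbt and Scala 2.11; the exact patch versions may differ, but the artifacts are the ones named above):

    // build.sbt (sketch)
    libraryDependencies ++= Seq(
      "org.apache.spark"   %% "spark-sql"                 % "2.2.0" % "provided",
      "org.apache.spark"   %% "spark-sql-kafka-0-10"      % "2.2.0",
      "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.10"
    )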
The image above shows the status of node2 (192.168.1.192). The Cassandra process has not been killed, yet the exception indicates that the connection to that node has been closed.
Here is the system average load. Nothing shows a sharp load spike around 1:30 am.
My cluster contains 3 nodes, each with 4 cores and 16 GB of memory, used as a dev environment; it mainly runs CDH, Storm, Cassandra, and other services. The average CPU load on all three nodes is around 70% and memory usage is about 80%. The operating system is Ubuntu 14.04 LTS.
The Structured Streaming app consumes signals from Kafka, joins them with a Cassandra table to complete the signal info, performs some logical computation, and then writes the results back to Cassandra and KairosDB, with about 20 query streams in total. A rough sketch of one of these streams is shown below.
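Each stream looks roughly like this sketch (the topic, payload format, and most column names are illustrative, not my exact schema; on Spark 2.2 there is no foreachBatch, so the sink is a ForeachWriter, and the actual Cassandra/KairosDB writes are omitted):

    import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}
    import org.apache.spark.sql.functions._

    val spark = SparkSession.builder()
      .appName("firealarm-signal-enrichment")
      .config("spark.cassandra.connection.host", "192.168.1.191")
      .getOrCreate()

    // Static DataFrame over the Cassandra table used to enrich the signal;
    // this is the read path that fails with "Connection has been closed".
    val gatewayInfo = spark.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> "hk_ods", "table" -> "frm_firealarm_gateway"))
      .load()
      .select("gateway_id", "region_code", "address_code", "purpose_code", "is_delete")

    // Streaming source: raw signals from Kafka (CSV payload is just an example).
    val signals = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node1-fzy:9092")
      .option("subscribe", "firealarm_signals")
      .load()
      .select(split(col("value").cast("string"), ",").as("f"))
      .select(col("f").getItem(0).as("gateway_id"), col("f").getItem(1).as("signal_value"))

    // Stream-static join to complete the signal info, then write the result out.
    val enriched = signals.join(gatewayInfo, Seq("gateway_id"))

    val query = enriched.writeStream
      .outputMode("append")
      .option("checkpointLocation", "/tmp/checkpoints/firealarm_signals")
      .foreach(new ForeachWriter[Row] {
        def open(partitionId: Long, version: Long): Boolean = true
        def process(row: Row): Unit = {
          // write the enriched row to Cassandra and KairosDB (omitted)
        }
        def close(errorOrNull: Throwable): Unit = ()
      })
      .start()

    query.awaitTermination()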