DLF asked ·

Getting TransportException "Connection has been closed" when using structured streaming to read and write data to Cassandra

I'm using Structured Streaming to read data from and save data to Cassandra.

The Cassandra version is 3.11.3 and Spark 2.2 is used.

The spark-cassandra-connector 2.0.10 driver is used. The app runs fine at the start, but after roughly every 20 hours it stops with this exception:

java.io.IOException: Exception during execution of SELECT "is_delete", "region_code", "gateway_id", "purpose_code", "address_code" FROM "hk_ods"."frm_firealarm_gateway" WHERE token("gateway_id") > ? AND token("gateway_id") <= ?   ALLOW FILTERING: [node1-fzy/192.168.1.191:9042] Connection has been closed
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.com$datastax$spark$connector$rdd$CassandraTableScanRDD$$fetchTokenRange(CassandraTableScanRDD.scala:350)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$17.apply(CassandraTableScanRDD.scala:367)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$17.apply(CassandraTableScanRDD.scala:367)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Caused by: com.datastax.driver.core.exceptions.TransportException: [node1-fzy/192.168.1.191:9042] Connection has been closed
    at com.datastax.driver.core.exceptions.TransportException.copy(TransportException.java:38)
    at com.datastax.driver.core.exceptions.TransportException.copy(TransportException.java:24)
    at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
    at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245)
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:68)
    at sun.reflect.GeneratedMethodAccessor46.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:40)
    at com.sun.proxy.$Proxy13.execute(Unknown Source)
    at com.datastax.spark.connector.cql.DefaultScanner.scan(Scanner.scala:34)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.com$datastax$spark$connector$rdd$CassandraTableScanRDD$$fetchTokenRange(CassandraTableScanRDD.scala:342)
    ... 22 more
Caused by: com.datastax.driver.core.exceptions.TransportException: [node1-fzy/192.168.1.191:9042] Connection has been closed
    at com.datastax.driver.core.Connection$ConnectionCloseFuture.force(Connection.java:1210)
    at com.datastax.driver.core.Connection$ConnectionCloseFuture.force(Connection.java:1195)
    at com.datastax.driver.core.Connection.defunct(Connection.java:445)
    at com.datastax.driver.core.Connection$Dispatcher.exceptionCaught(Connection.java:1128)
    at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
    at io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:844)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    ... 1 more

[image: 1610168059349.png]

The image above shows the status of node2 (192.168.1.192). The Cassandra process has not been killed, but the exception indicates the connection to node2 has been closed.

Here is the system average load. Nothing shows a sharp load spike around 1:30 am.

[image: 1610167960594.png]

My dev cluster contains 3 nodes, each with 4 cores and 16 GB of memory, mainly running services such as CDH, Storm, and Cassandra. The average CPU load on all 3 nodes is around 70% and memory usage is about 80%. The operating system is Ubuntu 14.04 LTS.

The Structured Streaming app consumes signal events from Kafka, joins them with Cassandra tables to complete the signal info, performs some business logic, and then writes the results back to Cassandra and KairosDB, with about 20 query streams in total. A minimal sketch of the read/join pattern is shown below.
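For reference, here is a minimal sketch of that read/join pattern (assuming Spark 2.2 with spark-cassandra-connector 2.0.10; the Kafka broker, topic, and payload parsing are simplified placeholders, while the keyspace, table, and columns come from the failing query in the stack trace):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("signal-enrichment")
      .config("spark.cassandra.connection.host", "192.168.1.191")
      .getOrCreate()

    // Streaming side: raw signal events from Kafka (real payload parsing omitted;
    // the broker, topic and parsed gateway_id column are placeholders).
    val signals = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka1:9092")
      .option("subscribe", "signal-topic")
      .load()
      .selectExpr("CAST(value AS STRING) AS gateway_id")

    // Static side: the Cassandra table from the stack trace. Unless it is cached,
    // this side is typically re-read on each micro-batch, which issues the
    // token-range scans seen in the failing SELECT.
    val gateways = spark.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> "hk_ods", "table" -> "frm_firealarm_gateway"))
      .load()
      .select("gateway_id", "region_code", "purpose_code", "address_code", "is_delete")

    // Stream-static inner join; with ~20 query streams, every batch hits Cassandra.
    val enriched = signals.join(gateways, Seq("gateway_id"))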

Tags: cassandra, spark-cassandra-connector

jaroslaw.grabowski_50515 answered ·

Hi! Could you check the system log to see if the OOM killer is killing your DSE process?

If Spark overloads your nodes, use throttling to minimize the impact: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/reference.md
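For example, something along these lines (a sketch only; the values are placeholders and the exact property names should be confirmed against reference.md for your connector version):

    import org.apache.spark.SparkConf

    // Illustrative throttling settings; tune the numbers for your cluster.
    val conf = new SparkConf()
      .set("spark.cassandra.input.reads_per_sec", "100")         // limit read requests per core per second
      .set("spark.cassandra.output.throughput_mb_per_sec", "5")  // cap write throughput per core
      .set("spark.cassandra.output.concurrent.writes", "5")      // limit concurrent write batches per task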



Unfortunately, it seems the Cassandra process has never been killed except manually. After restarting the Structured Streaming app, it ran fine for about 18 hours and then the same exception showed up, but for another node and another table query. The exception appeared at 01:30 am, and the system monitor shows no heavy load.

 java.io.IOException: Exception during execution of SELECT "ammeter_id", "ct_change_ratio", "pt_change_ratio", "organise_unit_id", "gateway_id", "is_delete" FROM "hk_ods"."em_ammetermanage" WHERE token("ammeter_id") > ? AND token("ammeter_id") <= ?   ALLOW FILTERING: [/192.168.1.192:9042] Connection has been closed

It seems this error shows up randomly across the nodes of the Cassandra cluster, and neither the Prometheus monitoring data nor the Cassandra system log gives any indication of nodes going down.


Could the app architecture described in the revised question above cause this exception, given that many Cassandra tables are read to join with the streaming DataFrame?

Erick Ramirez answered ·

There's a known issue with specific versions of Cassandra that I want to rule out quickly.

Please update your original question and provide the version of Cassandra and/or DSE you're using. Cheers!


The Cassandra version is 3.11.3 and I use Spark 2.2. The connector version is spark-cassandra-connector_2.11-2.0.10.


Thanks for the update. It isn't the known issue I was thinking of.

I'm going to reach out to the DataStax Analytics team and get them to respond to your post. Cheers!

DLF replied to Erick Ramirez ·

Thanks very much. I checked the Cassandra system.log and there is no related error info. I really don't know what causes this.

Thanks for your quick reply!


Have you checked the logs of node1-fzy/192.168.1.191 to see what the node is doing around the time (or a few seconds immediately before) the exception was thrown by the driver?

DLF replied to Erick Ramirez ·

And just now I intentionally shut down one node with kill -9 <pid> and got the same exception. It seems some nodes of my cluster go down every night!

But the Cassandra system log does not show any sign of a shutdown. I'm setting up Prometheus JMX monitoring for my cluster now and hope it can give some clue.


@Erick Ramirez After about another 18 hours, the same error occurred, but pointing to another table and another node. It seems "Connection has been closed" happens randomly between nodes, because I have also seen it report [/192.168.1.190:9042].

Does this have something to do with my code described above, where many Cassandra tables are read to join with the streaming DataFrame, causing the connection to be closed forcibly?

But it's strange that this exception only shows up after the app has been running for hours. The streaming data input is smooth and there is no huge burst of data at any particular time.
