Hi, We're using the spark cassandra connector with SSL, and we want to make sure we're following best practices. Sorry in advance for the length of this multi-part explanation/question.
Connector 2.4.3
We specify the SSL connection options in our SparkSession builder.
However, in order to facilitate cloud/container deployment, our truststore/keystore are packaged as classpath jar resources rather than a file system path. It appears the connector expects/requires the value for those properties to be file system paths and not a more general URI. For example, specifying the URI "classpath:ssl/truststore" results in exception (connector 2.4.3):
2021-04-12 11:17:23.861 ERROR [boundedElastic-1] .e.b.a.SparkCassandraConnectorDiagnostic : java.nio.file.InvalidPathException Illegal char <:> at index 9: classpath:ssl/truststorejava.nio.file.InvalidPathException: Illegal char <:> at index 9: classpath:ssl/truststore at sun.nio.fs.WindowsPathParser.normalize(WindowsPathParser.java:182) ~[na:1.8.0_232] at sun.nio.fs.WindowsPathParser.parse(WindowsPathParser.java:153) ~[na:1.8.0_232] at sun.nio.fs.WindowsPathParser.parse(WindowsPathParser.java:77) ~[na:1.8.0_232] at sun.nio.fs.WindowsPath.parse(WindowsPath.java:94) ~[na:1.8.0_232] at sun.nio.fs.WindowsFileSystem.getPath(WindowsFileSystem.java:255) ~[na:1.8.0_232] at java.nio.file.Paths.get(Paths.java:84) ~[na:1.8.0_232] at com.datastax.spark.connector.cql.DefaultConnectionFactory$$anonfun$trustStore$lzycompute$1$1.apply(CassandraConnectionFactory.scala:92) ~[spark-cassandra-connector_2.11-2.4.3.jar:2.4.3] ...
So, the first question may actually be an enhancement request: Can/could the DefaultConnectionFactory be modified to use java.nio.file.Paths#get(URI) to allow classpath or other URI-based resources?
Alternatively, we are utilizing a custom CassandraConnectionFactory specified by connection option spark.cassandra.connection.factory solely for the purpose of resolving URI-based resources, then using the driver's Cluster$Builder#withSSL method. That has been working fine in connector 2.4.3. Although we are not sure using a custom factory for this purpose is the best option, we found no other.
Connector 3.x
Meanwhile, we are now trying to upgrade to connector 3.x in order to move to spark 3.x in order to move to java11. Of course, more recent versions of the (oss) driver replace Cluster with CqlSession. Accordingly, newer versions of CassandraConnectionFactory have changed signatures, and we adapted our custom factory to comply.
However, we observe different behavior with the 3.x custom factory. In 2.4.3, we see our custom factory method CassandraConnectionFactory#createCluster called only once for the life of the application.
But in 3.0 (and 2.5.x), we see our custom factory method CassandraConnectionFactory#createSession called repeatedly, apparently for each spark Dataset operation employing the connector. Not only is the cost for ssl session creation non-trivial, but they don't appear to be closed. as we see WARNings in the spark executor logs:
You have too many session instances: (n) active, expected less than 4 (see 'advanced.session-leak.threshold' in the configuration)
In version 3.0.1, some repeated createSession calls appear to come from Scanner (re)construction:
at com.datastax.spark.connector.cql.CassandraConnector$.createSession(CassandraConnector.scala:167) ~[spark-cassandra-connector_2.12-3.0.1.jar:3.0.1] at com.datastax.spark.connector.cql.CassandraConnector$.$anonfun$sessionCache$1(CassandraConnector.scala:161) ~[spark-cassandra-connector_2.12-3.0.1.jar:3.0.1] at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:32) ~[spark-cassandra-connector_2.12-3.0.1.jar:3.0.1] at com.datastax.spark.connector.cql.RefCountedCache.syncAcquire(RefCountedCache.scala:69) ~[spark-cassandra-connector_2.12-3.0.1.jar:3.0.1] at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:57) ~[spark-cassandra-connector_2.12-3.0.1.jar:3.0.1] at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:81) ~[spark-cassandra-connector_2.12-3.0.1.jar:3.0.1] at com.datastax.spark.connector.cql.DefaultScanner.<init>(Scanner.scala:34) ~[spark-cassandra-connector_2.12-3.0.1.jar:3.0.1] at com.datastax.spark.connector.cql.CassandraConnectionFactory.getScanner(CassandraConnectionFactory.scala:38) ~[spark-cassandra-connector_2.12-3.0.1.jar:3.0.1] at com.datastax.spark.connector.rdd.CassandraTableScanRDD.compute(CassandraTableScanRDD.scala:288) ~[spark-cassandra-connector_2.12-3.0.1.jar:3.0.1] at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) ~[spark-core_2.12-3.1.1.jar:3.1.1] ...
So, we introduced a static cache for the CqlSession in our custom factory. But all this causes us to question whether we have chosen an appropriate solution.
[UPDATE]
Sample 2.4.3 Cluster factory
import java.net.URL; import javax.net.ssl.SSLContext; import com.datastax.driver.core.Cluster; import com.datastax.driver.core.Cluster.Builder; import com.datastax.driver.core.RemoteEndpointAwareJdkSSLOptions; import com.datastax.spark.connector.cql.CassandraConnectionFactory; import com.datastax.spark.connector.cql.CassandraConnectorConf; import com.datastax.spark.connector.cql.DefaultConnectionFactory; import com.datastax.spark.connector.cql.DefaultScanner; import com.datastax.spark.connector.cql.Scanner; import com.datastax.spark.connector.rdd.ReadConf; import scala.collection.IndexedSeq; import scala.collection.immutable.Set; import scala.collection.immutable.Set$; /** * A CassandraSSLConnectionFactory that supports loading keystores and truststores from the classpath, and more * generally from URLs. {@link SSLConfiguration.Settings#getKeyStore()} and {@link * SSLConfiguration.Settings#getTrustStore()} must be valid URL specification, e.g. file:/path/truststore or * classpath:ssl/truststore. */ public class CassandraSSLConnectionFactory implements CassandraConnectionFactory { @Override public Cluster createCluster(final CassandraConnectorConf conf) { // create a builder w/o ssl configuration as it will fail with keystore resources final Builder builder = DefaultConnectionFactory.clusterBuilder( new CassandraConnectorConf( conf.hosts(), conf.port(), conf.authConf(), conf.localDC(), conf.keepAliveMillis(), conf.minReconnectionDelayMillis(), conf.maxReconnectionDelayMillis(), conf.maxConnectionsPerExecutor(), conf.compression(), conf.queryRetryCount(), conf.connectTimeoutMillis(), conf.readTimeoutMillis(), conf.connectionFactory(), CassandraConnectorConf.DefaultCassandraSSLConf())); // add custom built SSLOptions if ssl enabled if (conf.cassandraSSLConf().enabled()) builder.withSSL(RemoteEndpointAwareJdkSSLOptions.builder().withSSLContext(getContext(conf)).build()); return builder.build(); } private SSLContext getContext(final CassandraConnectorConf conf) { // SSLConfiguration helper handles URI-based stores try { return SSLConfiguration.getContext( SSLConfiguration.SettingsImpl.builder() .keyPassword(conf.cassandraSSLConf().keyStorePassword().get()) .keyStore(new URL(conf.cassandraSSLConf().keyStorePath().get())) .keyStorePassword(conf.cassandraSSLConf().keyStorePassword().get()) .keyStoreType(conf.cassandraSSLConf().keyStoreType()) .trustStore(new URL(conf.cassandraSSLConf().trustStorePath().get())) .trustStorePassword(conf.cassandraSSLConf().trustStorePassword().get()) .trustStoreType(conf.cassandraSSLConf().trustStoreType()) .build()); } catch (Exception e) { throw new RuntimeException(e); } } ... }
Sample 3.0.1 CqlSession factory
import java.net.URL; import java.util.stream.Stream; import javax.net.ssl.SSLContext; import org.springframework.stereotype.Component; import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.CqlSessionBuilder; import com.datastax.oss.driver.api.core.config.DriverConfigLoader; import com.datastax.oss.driver.api.core.config.ProgrammaticDriverConfigLoaderBuilder; import com.datastax.spark.connector.cql.CassandraConnectionFactory; import com.datastax.spark.connector.cql.CassandraConnectorConf; import com.datastax.spark.connector.cql.CassandraConnectorConf.CassandraSSLConf; import com.datastax.spark.connector.cql.DefaultConnectionFactory; import com.datastax.spark.connector.cql.DefaultScanner; import com.datastax.spark.connector.cql.IpBasedContactInfo; import com.datastax.spark.connector.cql.Scanner; import com.datastax.spark.connector.rdd.ReadConf; import lombok.AccessLevel; import lombok.NoArgsConstructor; import scala.collection.IndexedSeq; import scala.collection.immutable.Set; import scala.collection.immutable.Set$; /** * A CassandraSSLConnectionFactory that supports loading keystores and truststores from the classpath, and more * generally from URLs. {@link SSLConfiguration.Settings#getKeyStore()} and {@link * SSLConfiguration.Settings#getTrustStore()} must be valid URL specification, e.g. file:/path/truststore or * classpath:ssl/truststore. N.B. the DefaultConnectionFactory (circa 4.7) expects file-based stores, compliant with * java Paths#get, and does not support classpath resources. */ @Component public class CassandraSSLConnectionFactory implements CassandraConnectionFactory { private static final long serialVersionUID = 1L; @Override public CqlSession createSession(final CassandraConnectorConf conf) { if (Cache.cqlSession == null) Cache.init(conf); return Cache.cqlSession; } ... /** * Static singleton cache prevents session leak in spark executors due to repeated invocation of the custom factory */ @NoArgsConstructor(access = AccessLevel.PRIVATE) private static class Cache { private static volatile CqlSession cqlSession; private static synchronized void init(final CassandraConnectorConf conf) { if (cqlSession != null) return; // already initialized ProgrammaticDriverConfigLoaderBuilder configBuilder = DriverConfigLoader.programmaticBuilder(); DriverConfigLoader configLoader = DefaultConnectionFactory.connectorConfigBuilder(conf, configBuilder).build(); CqlSessionBuilder builder = CqlSession.builder().withConfigLoader(configLoader); if (isSSL(conf)) { IpBasedContactInfo info = IpBasedContactInfo.class.cast(conf.contactInfo()); builder.withSslContext(getContext(info.cassandraSSLConf())); builder.withAuthProvider(info.authConf().authProvider().get()); } cqlSession = builder.build(); } private static boolean isSSL(final CassandraConnectorConf conf) { return Stream.of(conf) .map(CassandraConnectorConf::contactInfo) .filter(IpBasedContactInfo.class::isInstance) .map(IpBasedContactInfo.class::cast) .map(IpBasedContactInfo::cassandraSSLConf) .anyMatch(CassandraSSLConf::enabled); } private static SSLContext getContext(final CassandraSSLConf cassandraSSLConf) { // SSLConfiguration helper handles URL-base stores try { return SSLConfiguration.getContext( SSLConfiguration.SettingsImpl.builder() .keyPassword(cassandraSSLConf.keyStorePassword().get()) .keyStore(new URL(cassandraSSLConf.keyStorePath().get())) .keyStorePassword(cassandraSSLConf.keyStorePassword().get()) .keyStoreType(cassandraSSLConf.keyStoreType()) .trustStore(new URL(cassandraSSLConf.trustStorePath().get())) .trustStorePassword(cassandraSSLConf.trustStorePassword().get()) .trustStoreType(cassandraSSLConf.trustStoreType()) .build()); } catch (Exception e) { throw new RuntimeException(e); } } } }
Reproduction Example 3.0.1
import java.io.Serializable; import java.util.Arrays; import javax.annotation.PostConstruct; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.springframework.stereotype.Component; import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.spark.connector.cql.CassandraConnectionFactory; import com.datastax.spark.connector.cql.CassandraConnectorConf; import com.datastax.spark.connector.cql.DefaultConnectionFactory; import lombok.Data; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @Component @Slf4j @RequiredArgsConstructor public class Repro { // create table if not exists expctr.repro (key text, value text, primary key (key)) @Data public static class Dao implements Serializable { public String key; public String value; } @PostConstruct public void init() { String dc = "datacenter1"; String host = "localhost"; String keyspace = "expctr"; String table = "repro"; String user = "user"; String password = "password"; // String keyStore = "file:/C:/ssl/keystore"; String keyStore = "C:/ssl/keystore"; String keyStorePassword = "password"; // String trustStore = "file:/C:/ssl/truststore"; String trustStore = "C:/ssl/truststore"; String trustStorePassword = "password"; SparkSession sparkSession = SparkSession.builder() .appName("repro") .master("local[*]") .config("spark.cassandra.connection.factory", CustomFactory.class.getName()) .config("spark.cassandra.connection.localDC", dc) .config("spark.cassandra.connection.host", host) .config("spark.cassandra.connection.port", "9042") .config("spark.cassandra.connection.keepAliveMS", "3600000") .config("spark.cassandra.connection.ssl.clientAuth.enabled", true) .config("spark.cassandra.auth.username", user) .config("spark.cassandra.auth.password", password) .config("spark.cassandra.connection.ssl.enabled", true) .config("spark.cassandra.connection.ssl.keyStore.path", keyStore) .config("spark.cassandra.connection.ssl.keyStore.password", keyStorePassword) .config("spark.cassandra.connection.ssl.trustStore.path", trustStore) .config("spark.cassandra.connection.ssl.trustStore.password", trustStorePassword) .config("spark.sql.catalog.mycluster", "com.datastax.spark.connector.datasource.CassandraCatalog") .config("spark.sql.defaultCatalog", "mycluster") .getOrCreate(); Dao dao = new Dao(); dao.key = "key1"; dao.value = "value1"; Dataset<Row> dataset = sparkSession.createDataFrame(Arrays.asList(dao), Dao.class); for (int i = 0; i < 10; i++) { long t = System.currentTimeMillis(); log.info("before {} dataset.write", i); dataset .write() .mode(SaveMode.Append) .format("org.apache.spark.sql.cassandra") .option("keyspace", keyspace) .option("table", table) .save(); log.info("after {} dataset.write", i); log.info("elapsed {} time {} ms", i, System.currentTimeMillis() - t); } } public static class CustomFactory implements CassandraConnectionFactory { @Override public CqlSession createSession(final CassandraConnectorConf conf) { log.debug("createSession"); return DefaultConnectionFactory.createSession(conf); } } }
Reproduction Log Excerpt
2021-04-16 12:37:58.715 INFO [main] Repro : before 0 dataset.write 2021-04-16 12:37:58.835 DEBUG [main] Repro : createSession 2021-04-16 12:37:59.049 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Adding initial contact points [Node(endPoint=/10.9.57.230:9042, hostId=null, hashCode=512d45b3)] 2021-04-16 12:38:04.526 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Starting schema refresh 2021-04-16 12:38:04.831 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Applying schema refresh took 2 ms 2021-04-16 12:38:10.444 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Starting schema refresh 2021-04-16 12:38:10.821 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Applying schema refresh took 4 ms 2021-04-16 12:38:11.825 DEBUG [Executor task launch worker for task 0.0 in stage 0.0 (TID 0)] Repro : createSession 2021-04-16 12:38:11.880 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Adding initial contact points [Node(endPoint=/10.9.57.230:9042, hostId=null, hashCode=61ca49b4)] 2021-04-16 12:38:17.290 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Starting schema refresh 2021-04-16 12:38:17.705 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Applying schema refresh took 1 ms 2021-04-16 12:38:23.125 INFO [main] Repro : after 0 dataset.write 2021-04-16 12:38:23.125 INFO [main] Repro : elapsed 0 time 24410 ms 2021-04-16 12:38:23.125 INFO [main] Repro : before 1 dataset.write 2021-04-16 12:38:23.162 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Starting schema refresh 2021-04-16 12:38:23.538 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Applying schema refresh took 1 ms 2021-04-16 12:38:23.578 DEBUG [Executor task launch worker for task 0.0 in stage 1.0 (TID 1)] Repro : createSession 2021-04-16 12:38:23.602 WARN [Executor task launch worker for task 0.0 in stage 1.0 (TID 1)] c.d.o.d.i.core.session.DefaultSession : You have too many session instances: 5 active, expected less than 4 (see 'advanced.session-leak.threshold' in the configuration) 2021-04-16 12:38:23.630 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Adding initial contact points [Node(endPoint=/10.9.57.230:9042, hostId=null, hashCode=45879f62)] 2021-04-16 12:38:29.031 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Starting schema refresh 2021-04-16 12:38:29.331 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Applying schema refresh took 1 ms 2021-04-16 12:38:34.662 INFO [main] Repro : after 1 dataset.write 2021-04-16 12:38:34.663 INFO [main] Repro : elapsed 1 time 11538 ms 2021-04-16 12:38:34.663 INFO [main] Repro : before 2 dataset.write 2021-04-16 12:38:34.685 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Starting schema refresh 2021-04-16 12:38:35.059 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Applying schema refresh took 907 us 2021-04-16 12:38:35.094 DEBUG [Executor task launch worker for task 0.0 in stage 2.0 (TID 2)] Repro : createSession 2021-04-16 12:38:35.112 WARN [Executor task launch worker for task 0.0 in stage 2.0 (TID 2)] c.d.o.d.i.core.session.DefaultSession : You have too many session instances: 6 active, expected less than 4 (see 'advanced.session-leak.threshold' in the configuration) 2021-04-16 12:38:35.142 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Adding initial contact points [Node(endPoint=/10.9.57.230:9042, hostId=null, hashCode=54a035b3)] 2021-04-16 12:38:40.532 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Starting schema refresh 2021-04-16 12:38:40.831 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Applying schema refresh took 1 ms 2021-04-16 12:38:46.163 INFO [main] Repro : after 2 dataset.write 2021-04-16 12:38:46.164 INFO [main] Repro : elapsed 2 time 11501 ms 2021-04-16 12:38:46.164 INFO [main] Repro : before 3 dataset.write 2021-04-16 12:38:46.181 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Starting schema refresh 2021-04-16 12:38:46.600 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Applying schema refresh took 1 ms 2021-04-16 12:38:46.627 DEBUG [Executor task launch worker for task 0.0 in stage 3.0 (TID 3)] Repro : createSession 2021-04-16 12:38:46.646 WARN [Executor task launch worker for task 0.0 in stage 3.0 (TID 3)] c.d.o.d.i.core.session.DefaultSession : You have too many session instances: 7 active, expected less than 4 (see 'advanced.session-leak.threshold' in the configuration) 2021-04-16 12:38:46.679 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Adding initial contact points [Node(endPoint=/10.9.57.230:9042, hostId=null, hashCode=184b25)] 2021-04-16 12:38:52.055 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Starting schema refresh 2021-04-16 12:38:52.359 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Applying schema refresh took 1 ms 2021-04-16 12:38:57.678 INFO [main] Repro : after 3 dataset.write 2021-04-16 12:38:57.678 INFO [main] Repro : elapsed 3 time 11514 ms 2021-04-16 12:38:57.678 INFO [main] Repro : before 4 dataset.write 2021-04-16 12:38:57.693 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Starting schema refresh 2021-04-16 12:38:58.058 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Applying schema refresh took 743 us 2021-04-16 12:38:58.086 DEBUG [Executor task launch worker for task 0.0 in stage 4.0 (TID 4)] Repro : createSession 2021-04-16 12:38:58.106 WARN [Executor task launch worker for task 0.0 in stage 4.0 (TID 4)] c.d.o.d.i.core.session.DefaultSession : You have too many session instances: 8 active, expected less than 4 (see 'advanced.session-leak.threshold' in the configuration) 2021-04-16 12:38:58.131 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Adding initial contact points [Node(endPoint=/10.9.57.230:9042, hostId=null, hashCode=7b2a85c4)] 2021-04-16 12:39:03.504 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Starting schema refresh 2021-04-16 12:39:03.799 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Applying schema refresh took 1 ms 2021-04-16 12:39:09.149 INFO [main] Repro : after 4 dataset.write 2021-04-16 12:39:09.149 INFO [main] Repro : elapsed 4 time 11471 ms 2021-04-16 12:39:09.149 INFO [main] Repro : before 5 dataset.write 2021-04-16 12:39:09.166 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Starting schema refresh 2021-04-16 12:39:09.570 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Applying schema refresh took 490 us 2021-04-16 12:39:09.597 DEBUG [Executor task launch worker for task 0.0 in stage 5.0 (TID 5)] Repro : createSession 2021-04-16 12:39:09.617 WARN [Executor task launch worker for task 0.0 in stage 5.0 (TID 5)] c.d.o.d.i.core.session.DefaultSession : You have too many session instances: 9 active, expected less than 4 (see 'advanced.session-leak.threshold' in the configuration) 2021-04-16 12:39:09.635 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Adding initial contact points [Node(endPoint=/10.9.57.230:9042, hostId=null, hashCode=6f12276c)] 2021-04-16 12:39:15.020 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Starting schema refresh 2021-04-16 12:39:15.330 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Applying schema refresh took 1 ms 2021-04-16 12:39:20.633 INFO [main] Repro : after 5 dataset.write 2021-04-16 12:39:20.633 INFO [main] Repro : elapsed 5 time 11484 ms 2021-04-16 12:39:20.633 INFO [main] Repro : before 6 dataset.write 2021-04-16 12:39:20.648 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Starting schema refresh 2021-04-16 12:39:21.041 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Applying schema refresh took 607 us 2021-04-16 12:39:21.061 DEBUG [Executor task launch worker for task 0.0 in stage 6.0 (TID 6)] Repro : createSession 2021-04-16 12:39:21.078 WARN [Executor task launch worker for task 0.0 in stage 6.0 (TID 6)] c.d.o.d.i.core.session.DefaultSession : You have too many session instances: 10 active, expected less than 4 (see 'advanced.session-leak.threshold' in the configuration) 2021-04-16 12:39:21.095 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Adding initial contact points [Node(endPoint=/10.9.57.230:9042, hostId=null, hashCode=72b6c62b)] 2021-04-16 12:39:26.485 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Starting schema refresh 2021-04-16 12:39:26.779 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Applying schema refresh took 1 ms 2021-04-16 12:39:32.098 INFO [main] Repro : after 6 dataset.write 2021-04-16 12:39:32.098 INFO [main] Repro : elapsed 6 time 11465 ms 2021-04-16 12:39:32.098 INFO [main] Repro : before 7 dataset.write 2021-04-16 12:39:32.111 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Starting schema refresh 2021-04-16 12:39:32.475 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Applying schema refresh took 514 us 2021-04-16 12:39:32.500 DEBUG [Executor task launch worker for task 0.0 in stage 7.0 (TID 7)] Repro : createSession 2021-04-16 12:39:32.513 WARN [Executor task launch worker for task 0.0 in stage 7.0 (TID 7)] c.d.o.d.i.core.session.DefaultSession : You have too many session instances: 11 active, expected less than 4 (see 'advanced.session-leak.threshold' in the configuration) 2021-04-16 12:39:32.527 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Adding initial contact points [Node(endPoint=/10.9.57.230:9042, hostId=null, hashCode=c2802a4)] 2021-04-16 12:39:37.902 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Starting schema refresh 2021-04-16 12:39:38.205 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Applying schema refresh took 962 us 2021-04-16 12:39:43.534 INFO [main] Repro : after 7 dataset.write 2021-04-16 12:39:43.534 INFO [main] Repro : elapsed 7 time 11436 ms 2021-04-16 12:39:43.534 INFO [main] Repro : before 8 dataset.write 2021-04-16 12:39:43.546 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Starting schema refresh 2021-04-16 12:39:43.906 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Applying schema refresh took 395 us 2021-04-16 12:39:43.928 DEBUG [Executor task launch worker for task 0.0 in stage 8.0 (TID 8)] Repro : createSession 2021-04-16 12:39:43.943 WARN [Executor task launch worker for task 0.0 in stage 8.0 (TID 8)] c.d.o.d.i.core.session.DefaultSession : You have too many session instances: 12 active, expected less than 4 (see 'advanced.session-leak.threshold' in the configuration) 2021-04-16 12:39:43.963 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Adding initial contact points [Node(endPoint=/10.9.57.230:9042, hostId=null, hashCode=406064e6)] 2021-04-16 12:39:49.348 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Starting schema refresh 2021-04-16 12:39:49.651 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Applying schema refresh took 898 us 2021-04-16 12:39:54.965 INFO [main] Repro : after 8 dataset.write 2021-04-16 12:39:54.965 INFO [main] Repro : elapsed 8 time 11431 ms 2021-04-16 12:39:54.965 INFO [main] Repro : before 9 dataset.write 2021-04-16 12:39:54.979 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Starting schema refresh 2021-04-16 12:39:55.342 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Applying schema refresh took 321 us 2021-04-16 12:39:55.366 DEBUG [Executor task launch worker for task 0.0 in stage 9.0 (TID 9)] Repro : createSession 2021-04-16 12:39:55.380 WARN [Executor task launch worker for task 0.0 in stage 9.0 (TID 9)] c.d.o.d.i.core.session.DefaultSession : You have too many session instances: 13 active, expected less than 4 (see 'advanced.session-leak.threshold' in the configuration) 2021-04-16 12:39:55.399 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Adding initial contact points [Node(endPoint=/10.9.57.230:9042, hostId=null, hashCode=9b1d914)] 2021-04-16 12:40:00.837 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Starting schema refresh 2021-04-16 12:40:01.133 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager : [cassandra] Applying schema refresh took 1 ms 2021-04-16 12:40:06.446 INFO [main] Repro : after 9 dataset.write 2021-04-16 12:40:06.446 INFO [main] Repro : elapsed 9 time 11481 ms