clayj_175802 asked:

How can the Spark connector be configured for SSL using classpath resources?

Hi, we're using the Spark Cassandra connector with SSL and want to make sure we're following best practices. Apologies in advance for the length of this multi-part explanation/question.

Connector 2.4.3

We specify the SSL connection options in our SparkSession builder, along the lines of the sketch below.
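
For illustration, here is a condensed version of that builder configuration (the host, master and passwords are placeholders; "classpath:ssl/truststore" is the URI form we would like the connector to accept):

import org.apache.spark.sql.SparkSession;

public class SslSparkSessionSketch {

  public static void main(String[] args) {
    // Placeholder host/passwords; the ssl.*.path values use the classpath: form
    // we would like the connector to resolve instead of a plain file system path.
    SparkSession spark =
        SparkSession.builder()
            .appName("ssl-classpath-sketch")
            .master("local[*]")
            .config("spark.cassandra.connection.host", "localhost")
            .config("spark.cassandra.connection.ssl.enabled", true)
            .config("spark.cassandra.connection.ssl.clientAuth.enabled", true)
            .config("spark.cassandra.connection.ssl.trustStore.path", "classpath:ssl/truststore")
            .config("spark.cassandra.connection.ssl.trustStore.password", "changeit")
            .config("spark.cassandra.connection.ssl.keyStore.path", "classpath:ssl/keystore")
            .config("spark.cassandra.connection.ssl.keyStore.password", "changeit")
            .getOrCreate();
  }
}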

However, to facilitate cloud/container deployment, our truststore and keystore are packaged as classpath jar resources rather than placed at a file system path. The connector appears to expect/require the values for those properties to be file system paths rather than more general URIs. For example, specifying the URI "classpath:ssl/truststore" results in an exception (connector 2.4.3):

2021-04-12 11:17:23.861 ERROR [boundedElastic-1] .e.b.a.SparkCassandraConnectorDiagnostic : java.nio.file.InvalidPathException Illegal char <:> at index 9: classpath:ssl/truststore
java.nio.file.InvalidPathException: Illegal char <:> at index 9: classpath:ssl/truststore
    at sun.nio.fs.WindowsPathParser.normalize(WindowsPathParser.java:182) ~[na:1.8.0_232]
    at sun.nio.fs.WindowsPathParser.parse(WindowsPathParser.java:153) ~[na:1.8.0_232]
    at sun.nio.fs.WindowsPathParser.parse(WindowsPathParser.java:77) ~[na:1.8.0_232]
    at sun.nio.fs.WindowsPath.parse(WindowsPath.java:94) ~[na:1.8.0_232]
    at sun.nio.fs.WindowsFileSystem.getPath(WindowsFileSystem.java:255) ~[na:1.8.0_232]
    at java.nio.file.Paths.get(Paths.java:84) ~[na:1.8.0_232]
    at com.datastax.spark.connector.cql.DefaultConnectionFactory$$anonfun$trustStore$lzycompute$1$1.apply(CassandraConnectionFactory.scala:92) ~[spark-cassandra-connector_2.11-2.4.3.jar:2.4.3]
    ...

So, the first question may actually be an enhancement request: could DefaultConnectionFactory be modified to use java.nio.file.Paths#get(URI) so that classpath or other URI-based resources can be used?

Alternatively, we are using a custom CassandraConnectionFactory, specified via the connection option spark.cassandra.connection.factory, solely for the purpose of resolving URI-based resources, and then passing the resulting SSLContext through the driver's Cluster$Builder#withSSL method. That has been working fine with connector 2.4.3. We are not sure a custom factory is the best option for this purpose, but we found no other.
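
To make that concrete, the helper our custom factory uses to resolve URI-based stores (the SSLConfiguration class referenced in the samples below) boils down to roughly the following trust-store-only sketch. The class and method names here are illustrative, and the real helper also handles the keystore and key managers:

import java.io.InputStream;
import java.net.URL;
import java.security.KeyStore;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

public final class SslContextSketch {

  // Build an SSLContext whose trust anchors come from a store located by a
  // "classpath:" or standard URL specification (e.g. file:/path/truststore).
  public static SSLContext trustStoreContext(String storeSpec, char[] storePassword)
      throws Exception {
    try (InputStream in = open(storeSpec)) {
      KeyStore trustStore = KeyStore.getInstance("JKS");
      trustStore.load(in, storePassword);
      TrustManagerFactory tmf =
          TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
      tmf.init(trustStore);
      SSLContext context = SSLContext.getInstance("TLS");
      context.init(null, tmf.getTrustManagers(), null);
      return context;
    }
  }

  // "classpath:" is not a scheme java.net.URL understands, so resolve it via the class loader.
  private static InputStream open(String spec) throws Exception {
    if (spec.startsWith("classpath:")) {
      String resource = spec.substring("classpath:".length());
      InputStream in =
          Thread.currentThread().getContextClassLoader().getResourceAsStream(resource);
      if (in == null) {
        throw new IllegalArgumentException("classpath resource not found: " + resource);
      }
      return in;
    }
    return new URL(spec).openStream();
  }
}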

Connector 3.x

Meanwhile, we are now trying to upgrade to connector 3.x so we can move to Spark 3.x and, in turn, to Java 11. Of course, more recent versions of the (OSS) driver replace Cluster with CqlSession, so newer versions of CassandraConnectionFactory have changed signatures, and we adapted our custom factory to comply.

However, we observe different behavior with the 3.x custom factory. In 2.4.3, we see our custom factory method CassandraConnectionFactory#createCluster called only once for the life of the application.

But in 3.0 (and 2.5.x), our custom factory method CassandraConnectionFactory#createSession is called repeatedly, apparently once per Spark Dataset operation that employs the connector. Not only is the cost of SSL session creation non-trivial, but the sessions don't appear to be closed, as we see WARNings in the Spark executor logs:

You have too many session instances: (n) active, expected less than 4 (see 'advanced.session-leak.threshold' in the configuration)

In version 3.0.1, some repeated createSession calls appear to come from Scanner (re)construction:

at com.datastax.spark.connector.cql.CassandraConnector$.createSession(CassandraConnector.scala:167) ~[spark-cassandra-connector_2.12-3.0.1.jar:3.0.1]
at com.datastax.spark.connector.cql.CassandraConnector$.$anonfun$sessionCache$1(CassandraConnector.scala:161) ~[spark-cassandra-connector_2.12-3.0.1.jar:3.0.1]
at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:32) ~[spark-cassandra-connector_2.12-3.0.1.jar:3.0.1]
at com.datastax.spark.connector.cql.RefCountedCache.syncAcquire(RefCountedCache.scala:69) ~[spark-cassandra-connector_2.12-3.0.1.jar:3.0.1]
at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:57) ~[spark-cassandra-connector_2.12-3.0.1.jar:3.0.1]
at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:81) ~[spark-cassandra-connector_2.12-3.0.1.jar:3.0.1]
at com.datastax.spark.connector.cql.DefaultScanner.<init>(Scanner.scala:34) ~[spark-cassandra-connector_2.12-3.0.1.jar:3.0.1]
at com.datastax.spark.connector.cql.CassandraConnectionFactory.getScanner(CassandraConnectionFactory.scala:38) ~[spark-cassandra-connector_2.12-3.0.1.jar:3.0.1]
at com.datastax.spark.connector.rdd.CassandraTableScanRDD.compute(CassandraTableScanRDD.scala:288) ~[spark-cassandra-connector_2.12-3.0.1.jar:3.0.1]
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) ~[spark-core_2.12-3.1.1.jar:3.1.1]
...

So we introduced a static cache for the CqlSession in our custom factory (shown in the update below). But all of this causes us to question whether we have chosen an appropriate solution.

[UPDATE]

Sample 2.4.3 Cluster factory

import java.net.URL;
import javax.net.ssl.SSLContext;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Cluster.Builder;
import com.datastax.driver.core.RemoteEndpointAwareJdkSSLOptions;
import com.datastax.spark.connector.cql.CassandraConnectionFactory;
import com.datastax.spark.connector.cql.CassandraConnectorConf;
import com.datastax.spark.connector.cql.DefaultConnectionFactory;
import com.datastax.spark.connector.cql.DefaultScanner;
import com.datastax.spark.connector.cql.Scanner;
import com.datastax.spark.connector.rdd.ReadConf;
import scala.collection.IndexedSeq;
import scala.collection.immutable.Set;
import scala.collection.immutable.Set$;
 
/**
 * A CassandraSSLConnectionFactory that supports loading keystores and truststores from the classpath, and more
 * generally from URLs. {@link SSLConfiguration.Settings#getKeyStore()} and {@link
 * SSLConfiguration.Settings#getTrustStore()} must be valid URL specifications, e.g. file:/path/truststore or
 * classpath:ssl/truststore.
 */
public class CassandraSSLConnectionFactory implements CassandraConnectionFactory {
  
 
  @Override
  public Cluster createCluster(final CassandraConnectorConf conf) {
  
 
    // create a builder w/o ssl configuration as it will fail with keystore resources
    final Builder builder =
        DefaultConnectionFactory.clusterBuilder(
            new CassandraConnectorConf(
                conf.hosts(),
                conf.port(),
                conf.authConf(),
                conf.localDC(),
                conf.keepAliveMillis(),
                conf.minReconnectionDelayMillis(),
                conf.maxReconnectionDelayMillis(),
                conf.maxConnectionsPerExecutor(),
                conf.compression(),
                conf.queryRetryCount(),
                conf.connectTimeoutMillis(),
                conf.readTimeoutMillis(),
                conf.connectionFactory(),
                CassandraConnectorConf.DefaultCassandraSSLConf()));
    // add custom built SSLOptions if ssl enabled
    if (conf.cassandraSSLConf().enabled())
      builder.withSSL(RemoteEndpointAwareJdkSSLOptions.builder().withSSLContext(getContext(conf)).build());
    return builder.build();
  }
 
  private SSLContext getContext(final CassandraConnectorConf conf) {
  
    // SSLConfiguration helper handles URI-based stores
    try {
  
      return SSLConfiguration.getContext(
          SSLConfiguration.SettingsImpl.builder()
              .keyPassword(conf.cassandraSSLConf().keyStorePassword().get())
              .keyStore(new URL(conf.cassandraSSLConf().keyStorePath().get()))
              .keyStorePassword(conf.cassandraSSLConf().keyStorePassword().get())
              .keyStoreType(conf.cassandraSSLConf().keyStoreType())
              .trustStore(new URL(conf.cassandraSSLConf().trustStorePath().get()))
              .trustStorePassword(conf.cassandraSSLConf().trustStorePassword().get())
              .trustStoreType(conf.cassandraSSLConf().trustStoreType())
              .build());
    } catch (Exception e) {
  
      throw new RuntimeException(e);
    }
  }
...
}



Sample 3.0.1 CqlSession factory

import java.net.URL;
import java.util.stream.Stream;
import javax.net.ssl.SSLContext;
import org.springframework.stereotype.Component;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.datastax.oss.driver.api.core.config.ProgrammaticDriverConfigLoaderBuilder;
import com.datastax.spark.connector.cql.CassandraConnectionFactory;
import com.datastax.spark.connector.cql.CassandraConnectorConf;
import com.datastax.spark.connector.cql.CassandraConnectorConf.CassandraSSLConf;
import com.datastax.spark.connector.cql.DefaultConnectionFactory;
import com.datastax.spark.connector.cql.DefaultScanner;
import com.datastax.spark.connector.cql.IpBasedContactInfo;
import com.datastax.spark.connector.cql.Scanner;
import com.datastax.spark.connector.rdd.ReadConf;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import scala.collection.IndexedSeq;
import scala.collection.immutable.Set;
import scala.collection.immutable.Set$;
 
/**
 * A CassandraSSLConnectionFactory that supports loading keystores and truststores from the classpath, and more
 * generally from URLs. {@link SSLConfiguration.Settings#getKeyStore()} and {@link
 * SSLConfiguration.Settings#getTrustStore()} must be valid URL specifications, e.g. file:/path/truststore or
 * classpath:ssl/truststore. N.B. the DefaultConnectionFactory (circa 4.7) expects file-based stores, compliant with
 * java Paths#get, and does not support classpath resources.
 */
@Component
public class CassandraSSLConnectionFactory implements CassandraConnectionFactory {
  
 
  private static final long serialVersionUID = 1L;
 
  @Override
  public CqlSession createSession(final CassandraConnectorConf conf) {
  
    if (Cache.cqlSession == null) Cache.init(conf);
    return Cache.cqlSession;
  }
 
  ...
 
  /**
   * Static singleton cache prevents session leak in spark executors due to repeated invocation of the custom factory
   */
  @NoArgsConstructor(access = AccessLevel.PRIVATE)
  private static class Cache {
  
 
    private static volatile CqlSession cqlSession;
 
    private static synchronized void init(final CassandraConnectorConf conf) {
  
      if (cqlSession != null) return; // already initialized
      ProgrammaticDriverConfigLoaderBuilder configBuilder = DriverConfigLoader.programmaticBuilder();
      DriverConfigLoader configLoader = DefaultConnectionFactory.connectorConfigBuilder(conf, configBuilder).build();
      CqlSessionBuilder builder = CqlSession.builder().withConfigLoader(configLoader);
      if (isSSL(conf)) {
  
        IpBasedContactInfo info = IpBasedContactInfo.class.cast(conf.contactInfo());
        builder.withSslContext(getContext(info.cassandraSSLConf()));
        builder.withAuthProvider(info.authConf().authProvider().get());
      }
      cqlSession = builder.build();
    }
 
    private static boolean isSSL(final CassandraConnectorConf conf) {
  
      return Stream.of(conf)
          .map(CassandraConnectorConf::contactInfo)
          .filter(IpBasedContactInfo.class::isInstance)
          .map(IpBasedContactInfo.class::cast)
          .map(IpBasedContactInfo::cassandraSSLConf)
          .anyMatch(CassandraSSLConf::enabled);
    }
 
    private static SSLContext getContext(final CassandraSSLConf cassandraSSLConf) {
  
      // SSLConfiguration helper handles URL-based stores
      try {
  
        return SSLConfiguration.getContext(
            SSLConfiguration.SettingsImpl.builder()
                .keyPassword(cassandraSSLConf.keyStorePassword().get())
                .keyStore(new URL(cassandraSSLConf.keyStorePath().get()))
                .keyStorePassword(cassandraSSLConf.keyStorePassword().get())
                .keyStoreType(cassandraSSLConf.keyStoreType())
                .trustStore(new URL(cassandraSSLConf.trustStorePath().get()))
                .trustStorePassword(cassandraSSLConf.trustStorePassword().get())
                .trustStoreType(cassandraSSLConf.trustStoreType())
                .build());
      } catch (Exception e) {
  
        throw new RuntimeException(e);
      }
    }
  }
}


Reproduction Example 3.0.1

import java.io.Serializable;
import java.util.Arrays;
import javax.annotation.PostConstruct;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.springframework.stereotype.Component;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.spark.connector.cql.CassandraConnectionFactory;
import com.datastax.spark.connector.cql.CassandraConnectorConf;
import com.datastax.spark.connector.cql.DefaultConnectionFactory;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
@RequiredArgsConstructor
public class Repro {

  // create table if not exists expctr.repro (key text, value text, primary key (key))

  @Data
  public static class Dao implements Serializable {
    public String key;
    public String value;
  }

  @PostConstruct
  public void init() {

    String dc = "datacenter1";
    String host = "localhost";
    String keyspace = "expctr";
    String table = "repro";
    String user = "user";
    String password = "password";

    // String keyStore = "file:/C:/ssl/keystore";
    String keyStore = "C:/ssl/keystore";
    String keyStorePassword = "password";
    // String trustStore = "file:/C:/ssl/truststore";
    String trustStore = "C:/ssl/truststore";
    String trustStorePassword = "password";

    SparkSession sparkSession =
        SparkSession.builder()
            .appName("repro")
            .master("local[*]")
            .config("spark.cassandra.connection.factory", CustomFactory.class.getName())
            .config("spark.cassandra.connection.localDC", dc)
            .config("spark.cassandra.connection.host", host)
            .config("spark.cassandra.connection.port", "9042")
            .config("spark.cassandra.connection.keepAliveMS", "3600000")
            .config("spark.cassandra.connection.ssl.clientAuth.enabled", true)
            .config("spark.cassandra.auth.username", user)
            .config("spark.cassandra.auth.password", password)
            .config("spark.cassandra.connection.ssl.enabled", true)
            .config("spark.cassandra.connection.ssl.keyStore.path", keyStore)
            .config("spark.cassandra.connection.ssl.keyStore.password", keyStorePassword)
            .config("spark.cassandra.connection.ssl.trustStore.path", trustStore)
            .config("spark.cassandra.connection.ssl.trustStore.password", trustStorePassword)
            .config("spark.sql.catalog.mycluster", "com.datastax.spark.connector.datasource.CassandraCatalog")
            .config("spark.sql.defaultCatalog", "mycluster")
            .getOrCreate();

    Dao dao = new Dao();
    dao.key = "key1";
    dao.value = "value1";

    Dataset<Row> dataset = sparkSession.createDataFrame(Arrays.asList(dao), Dao.class);

    for (int i = 0; i < 10; i++) {
      long t = System.currentTimeMillis();
      log.info("before {} dataset.write", i);
      dataset
          .write()
          .mode(SaveMode.Append)
          .format("org.apache.spark.sql.cassandra")
          .option("keyspace", keyspace)
          .option("table", table)
          .save();
      log.info("after {} dataset.write", i);
      log.info("elapsed {} time {} ms", i, System.currentTimeMillis() - t);
    }
  }

  public static class CustomFactory implements CassandraConnectionFactory {
    @Override
    public CqlSession createSession(final CassandraConnectorConf conf) {
      log.debug("createSession");
      return DefaultConnectionFactory.createSession(conf);
    }
  }
}


Reproduction Log Excerpt

2021-04-16 12:37:58.715  INFO [main] Repro    : before 0 dataset.write
2021-04-16 12:37:58.835 DEBUG [main] Repro    : createSession
2021-04-16 12:37:59.049 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Adding initial contact points [Node(endPoint=/10.9.57.230:9042, hostId=null, hashCode=512d45b3)]
2021-04-16 12:38:04.526 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Starting schema refresh
2021-04-16 12:38:04.831 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Applying schema refresh took 2 ms
2021-04-16 12:38:10.444 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Starting schema refresh
2021-04-16 12:38:10.821 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Applying schema refresh took 4 ms
2021-04-16 12:38:11.825 DEBUG [Executor task launch worker for task 0.0 in stage 0.0 (TID 0)] Repro    : createSession
2021-04-16 12:38:11.880 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Adding initial contact points [Node(endPoint=/10.9.57.230:9042, hostId=null, hashCode=61ca49b4)]
2021-04-16 12:38:17.290 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Starting schema refresh
2021-04-16 12:38:17.705 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Applying schema refresh took 1 ms
2021-04-16 12:38:23.125  INFO [main] Repro    : after 0 dataset.write
2021-04-16 12:38:23.125  INFO [main] Repro    : elapsed 0 time 24410 ms
2021-04-16 12:38:23.125  INFO [main] Repro    : before 1 dataset.write
2021-04-16 12:38:23.162 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Starting schema refresh
2021-04-16 12:38:23.538 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Applying schema refresh took 1 ms
2021-04-16 12:38:23.578 DEBUG [Executor task launch worker for task 0.0 in stage 1.0 (TID 1)] Repro    : createSession
2021-04-16 12:38:23.602  WARN [Executor task launch worker for task 0.0 in stage 1.0 (TID 1)] c.d.o.d.i.core.session.DefaultSession    : You have too many session instances: 5 active, expected less than 4 (see 'advanced.session-leak.threshold' in the configuration)
2021-04-16 12:38:23.630 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Adding initial contact points [Node(endPoint=/10.9.57.230:9042, hostId=null, hashCode=45879f62)]
2021-04-16 12:38:29.031 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Starting schema refresh
2021-04-16 12:38:29.331 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Applying schema refresh took 1 ms
2021-04-16 12:38:34.662  INFO [main] Repro    : after 1 dataset.write
2021-04-16 12:38:34.663  INFO [main] Repro    : elapsed 1 time 11538 ms
2021-04-16 12:38:34.663  INFO [main] Repro    : before 2 dataset.write
2021-04-16 12:38:34.685 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Starting schema refresh
2021-04-16 12:38:35.059 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Applying schema refresh took 907 us
2021-04-16 12:38:35.094 DEBUG [Executor task launch worker for task 0.0 in stage 2.0 (TID 2)] Repro    : createSession
2021-04-16 12:38:35.112  WARN [Executor task launch worker for task 0.0 in stage 2.0 (TID 2)] c.d.o.d.i.core.session.DefaultSession    : You have too many session instances: 6 active, expected less than 4 (see 'advanced.session-leak.threshold' in the configuration)
2021-04-16 12:38:35.142 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Adding initial contact points [Node(endPoint=/10.9.57.230:9042, hostId=null, hashCode=54a035b3)]
2021-04-16 12:38:40.532 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Starting schema refresh
2021-04-16 12:38:40.831 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Applying schema refresh took 1 ms
2021-04-16 12:38:46.163  INFO [main] Repro    : after 2 dataset.write
2021-04-16 12:38:46.164  INFO [main] Repro    : elapsed 2 time 11501 ms
2021-04-16 12:38:46.164  INFO [main] Repro    : before 3 dataset.write
2021-04-16 12:38:46.181 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Starting schema refresh
2021-04-16 12:38:46.600 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Applying schema refresh took 1 ms
2021-04-16 12:38:46.627 DEBUG [Executor task launch worker for task 0.0 in stage 3.0 (TID 3)] Repro    : createSession
2021-04-16 12:38:46.646  WARN [Executor task launch worker for task 0.0 in stage 3.0 (TID 3)] c.d.o.d.i.core.session.DefaultSession    : You have too many session instances: 7 active, expected less than 4 (see 'advanced.session-leak.threshold' in the configuration)
2021-04-16 12:38:46.679 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Adding initial contact points [Node(endPoint=/10.9.57.230:9042, hostId=null, hashCode=184b25)]
2021-04-16 12:38:52.055 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Starting schema refresh
2021-04-16 12:38:52.359 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Applying schema refresh took 1 ms
2021-04-16 12:38:57.678  INFO [main] Repro    : after 3 dataset.write
2021-04-16 12:38:57.678  INFO [main] Repro    : elapsed 3 time 11514 ms
2021-04-16 12:38:57.678  INFO [main] Repro    : before 4 dataset.write
2021-04-16 12:38:57.693 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Starting schema refresh
2021-04-16 12:38:58.058 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Applying schema refresh took 743 us
2021-04-16 12:38:58.086 DEBUG [Executor task launch worker for task 0.0 in stage 4.0 (TID 4)] Repro    : createSession
2021-04-16 12:38:58.106  WARN [Executor task launch worker for task 0.0 in stage 4.0 (TID 4)] c.d.o.d.i.core.session.DefaultSession    : You have too many session instances: 8 active, expected less than 4 (see 'advanced.session-leak.threshold' in the configuration)
2021-04-16 12:38:58.131 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Adding initial contact points [Node(endPoint=/10.9.57.230:9042, hostId=null, hashCode=7b2a85c4)]
2021-04-16 12:39:03.504 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Starting schema refresh
2021-04-16 12:39:03.799 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Applying schema refresh took 1 ms
2021-04-16 12:39:09.149  INFO [main] Repro    : after 4 dataset.write
2021-04-16 12:39:09.149  INFO [main] Repro    : elapsed 4 time 11471 ms
2021-04-16 12:39:09.149  INFO [main] Repro    : before 5 dataset.write
2021-04-16 12:39:09.166 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Starting schema refresh
2021-04-16 12:39:09.570 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Applying schema refresh took 490 us
2021-04-16 12:39:09.597 DEBUG [Executor task launch worker for task 0.0 in stage 5.0 (TID 5)] Repro    : createSession
2021-04-16 12:39:09.617  WARN [Executor task launch worker for task 0.0 in stage 5.0 (TID 5)] c.d.o.d.i.core.session.DefaultSession    : You have too many session instances: 9 active, expected less than 4 (see 'advanced.session-leak.threshold' in the configuration)
2021-04-16 12:39:09.635 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Adding initial contact points [Node(endPoint=/10.9.57.230:9042, hostId=null, hashCode=6f12276c)]
2021-04-16 12:39:15.020 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Starting schema refresh
2021-04-16 12:39:15.330 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Applying schema refresh took 1 ms
2021-04-16 12:39:20.633  INFO [main] Repro    : after 5 dataset.write
2021-04-16 12:39:20.633  INFO [main] Repro    : elapsed 5 time 11484 ms
2021-04-16 12:39:20.633  INFO [main] Repro    : before 6 dataset.write
2021-04-16 12:39:20.648 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Starting schema refresh
2021-04-16 12:39:21.041 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Applying schema refresh took 607 us
2021-04-16 12:39:21.061 DEBUG [Executor task launch worker for task 0.0 in stage 6.0 (TID 6)] Repro    : createSession
2021-04-16 12:39:21.078  WARN [Executor task launch worker for task 0.0 in stage 6.0 (TID 6)] c.d.o.d.i.core.session.DefaultSession    : You have too many session instances: 10 active, expected less than 4 (see 'advanced.session-leak.threshold' in the configuration)
2021-04-16 12:39:21.095 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Adding initial contact points [Node(endPoint=/10.9.57.230:9042, hostId=null, hashCode=72b6c62b)]
2021-04-16 12:39:26.485 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Starting schema refresh
2021-04-16 12:39:26.779 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Applying schema refresh took 1 ms
2021-04-16 12:39:32.098  INFO [main] Repro    : after 6 dataset.write
2021-04-16 12:39:32.098  INFO [main] Repro    : elapsed 6 time 11465 ms
2021-04-16 12:39:32.098  INFO [main] Repro    : before 7 dataset.write
2021-04-16 12:39:32.111 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Starting schema refresh
2021-04-16 12:39:32.475 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Applying schema refresh took 514 us
2021-04-16 12:39:32.500 DEBUG [Executor task launch worker for task 0.0 in stage 7.0 (TID 7)] Repro    : createSession
2021-04-16 12:39:32.513  WARN [Executor task launch worker for task 0.0 in stage 7.0 (TID 7)] c.d.o.d.i.core.session.DefaultSession    : You have too many session instances: 11 active, expected less than 4 (see 'advanced.session-leak.threshold' in the configuration)
2021-04-16 12:39:32.527 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Adding initial contact points [Node(endPoint=/10.9.57.230:9042, hostId=null, hashCode=c2802a4)]
2021-04-16 12:39:37.902 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Starting schema refresh
2021-04-16 12:39:38.205 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Applying schema refresh took 962 us
2021-04-16 12:39:43.534  INFO [main] Repro    : after 7 dataset.write
2021-04-16 12:39:43.534  INFO [main] Repro    : elapsed 7 time 11436 ms
2021-04-16 12:39:43.534  INFO [main] Repro    : before 8 dataset.write
2021-04-16 12:39:43.546 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Starting schema refresh
2021-04-16 12:39:43.906 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Applying schema refresh took 395 us
2021-04-16 12:39:43.928 DEBUG [Executor task launch worker for task 0.0 in stage 8.0 (TID 8)] Repro    : createSession
2021-04-16 12:39:43.943  WARN [Executor task launch worker for task 0.0 in stage 8.0 (TID 8)] c.d.o.d.i.core.session.DefaultSession    : You have too many session instances: 12 active, expected less than 4 (see 'advanced.session-leak.threshold' in the configuration)
2021-04-16 12:39:43.963 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Adding initial contact points [Node(endPoint=/10.9.57.230:9042, hostId=null, hashCode=406064e6)]
2021-04-16 12:39:49.348 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Starting schema refresh
2021-04-16 12:39:49.651 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Applying schema refresh took 898 us
2021-04-16 12:39:54.965  INFO [main] Repro    : after 8 dataset.write
2021-04-16 12:39:54.965  INFO [main] Repro    : elapsed 8 time 11431 ms
2021-04-16 12:39:54.965  INFO [main] Repro    : before 9 dataset.write
2021-04-16 12:39:54.979 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Starting schema refresh
2021-04-16 12:39:55.342 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Applying schema refresh took 321 us
2021-04-16 12:39:55.366 DEBUG [Executor task launch worker for task 0.0 in stage 9.0 (TID 9)] Repro    : createSession
2021-04-16 12:39:55.380  WARN [Executor task launch worker for task 0.0 in stage 9.0 (TID 9)] c.d.o.d.i.core.session.DefaultSession    : You have too many session instances: 13 active, expected less than 4 (see 'advanced.session-leak.threshold' in the configuration)
2021-04-16 12:39:55.399 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Adding initial contact points [Node(endPoint=/10.9.57.230:9042, hostId=null, hashCode=9b1d914)]
2021-04-16 12:40:00.837 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Starting schema refresh
2021-04-16 12:40:01.133 DEBUG [cassandra-admin-0] c.d.o.d.i.core.metadata.MetadataManager  : [cassandra] Applying schema refresh took 1 ms
2021-04-16 12:40:06.446  INFO [main] Repro    : after 9 dataset.write
2021-04-16 12:40:06.446  INFO [main] Repro    : elapsed 9 time 11481 ms


spark-cassandra-connector

jaroslaw.grabowski_50515 answered:

@clayj_175802 as posted in one of your previous questions, could you try with 3.0.1 and get back to us?


Both 3.0.0 (with driver 4.9.0) and 3.0.1 (with driver 4.10.0) work with the sample code above, and both invoke createSession for every Spark executor task without closing that session. Caching and reusing a single session in the factory mitigates both the creation cost and the leak, but we question whether this behavior, and our workaround, is the preferred approach. See the question for a sample stack trace of the calls in 3.0.1.


Could you capture a stack trace of the session opening?

clayj_175802 replied to jaroslaw.grabowski_50515:

It's added to the question, for 3.0.1.

Erick Ramirez answered:

I don't have a quick answer for you yet, but it would be ideal if you could provide the minimal sample code you used for v2.4.3 and the corresponding minimal code you attempted in v3.0. Cheers!


Not sure how to attach code samples. This comment option is limited to 1000 characters. Should I "Write an Answer"?


Similar to Stack Overflow, the idea is for you to update your question with the details since the comments section isn't intended to be used for that purpose. Cheers!
