Hi,
We're trying to upgrade an existing application to Spark 3.x using the latest Spark (3.0.1), DataStax Java driver (4.9.0), and Spark Cassandra Connector (3.0.0). Functionally everything works, but we are seeing significant performance degradation when using dataset.write to Cassandra. With the driver's DEBUG logging enabled (sketch below), we see repeated metadata refreshes. For example, the following log sequence:
Starting schema refresh ... Checking schema agreement ... Schema agreement reached (048f6a87-8c9a-3217-a55f-6160c29502a1), completing
repeats 36 times for a single dataset.write of a single row. As a result, operations that used to be sub-second now take 10+ seconds. We see the same behavior with both the DSV1 and DSV2 write paths.
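For reference, this is roughly how we surfaced those driver log messages; a minimal sketch, assuming Logback is the SLF4J backend (the Spring Boot default). In a Spring Boot app the property logging.level.com.datastax.oss.driver=DEBUG is equivalent.

// Raise the DataStax Java driver's log level to DEBUG programmatically.
// Assumes Logback as the SLF4J backend; adjust for other backends.
ch.qos.logback.classic.Logger driverLogger =
        (ch.qos.logback.classic.Logger) org.slf4j.LoggerFactory.getLogger("com.datastax.oss.driver");
driverLogger.setLevel(ch.qos.logback.classic.Level.DEBUG);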
We're running out of ideas about what could be causing the excessive metadata refreshes.
Would appreciate any ideas/suggestions.
[UPDATE] Ours is a Spring Boot Java application. The following component encapsulates a representative dataset.write. This example triggers 4 metadata refreshes (rather than 36); I don't know what factor(s) affect the number of repetitions:
import java.io.Serializable;
import java.util.Arrays;

import javax.annotation.PostConstruct;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.springframework.stereotype.Component;

import com.datastax.spark.connector.cql.CassandraConnectorConf;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
public class Repro {

    // create table if not exists expctr.repro (key text, value text, primary key (key))
    @Data
    public static class Dao implements Serializable {
        private static final long serialVersionUID = 1L;
        public String key;
        public String value;
    }

    @PostConstruct
    public void init() {
        String host = "10.18.51.105";
        String keyspace = "expctr";
        String table = "repro";

        SparkSession sparkSession = SparkSession.builder()
                .appName("expctr")
                .master("local[*]")
                .config(CassandraConnectorConf.ConnectionHostParam().name(), host)
                .config(CassandraConnectorConf.ConnectionPortParam().name(), "9042")
                // .config(CassandraConnectorConf.KeepAliveMillisParam().name(), "3600000")
                // .config("spark.sql.catalog.mycluster", "com.datastax.spark.connector.datasource.CassandraCatalog")
                // .config("spark.sql.defaultCatalog", "mycluster")
                // .withExtensions(new CassandraSparkExtensions())
                .getOrCreate();

        Dao dao = new Dao();
        dao.key = "key1";
        dao.value = "value1";
        Dataset<Row> dataset = sparkSession.createDataFrame(Arrays.asList(dao), Dao.class);

        long t = System.currentTimeMillis();
        log.info("before dataset.write");
        dataset
                .write()
                .mode(SaveMode.Append)
                .format("org.apache.spark.sql.cassandra")
                .option("keyspace", keyspace)
                .option("table", table)
                .save();
        log.info("after dataset.write");
        log.info("elapsed time {} ms", System.currentTimeMillis() - t);
    }
}
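For completeness, the DSV2 path we also tried looks roughly like this; a sketch assuming the commented-out catalog config above is enabled, so the connector's CassandraCatalog is registered as "mycluster". It shows the same repeated schema refreshes.

// DSV2 write through the Cassandra catalog (requires the
// spark.sql.catalog.mycluster config above to be uncommented).
try {
    dataset.writeTo("mycluster.expctr.repro").append();
} catch (org.apache.spark.sql.catalyst.analysis.NoSuchTableException e) {
    throw new RuntimeException(e);
}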