clayj_175802 asked:

Why are we seeing repeated metadata refreshes with the Spark Cassandra Connector v3?

Hi,

We're trying to upgrade an existing application to Spark 3.x using the latest Spark (3.0.1), DataStax Java driver (4.9.0), and Spark Cassandra Connector (3.0.0). Functionally everything works, but we are seeing significant performance degradation when using dataset.write to Cassandra. With DEBUG logging enabled for the DataStax driver, we see repeated metadata refreshes. For example, the following sequence of log statements:

Starting schema refresh
...
Checking schema agreement
...
Schema agreement reached (048f6a87-8c9a-3217-a55f-6160c29502a1), completing

repeats 36 times for a single dataset.write of a single row. As a result, operations that were sub-second now take 10+ seconds. We see the same behavior with both the DSV1 and DSV2 approaches.
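For reference, we turn on the driver DEBUG output programmatically at startup. This is a minimal sketch, assuming Logback is the SLF4J backend (the DriverDebugLogging class name is just for illustration):

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import org.slf4j.LoggerFactory;

public class DriverDebugLogging {

  public static void enable() {
    // Raise the level for the DataStax Java driver 4.x packages; the schema
    // refresh messages quoted above are emitted by the driver internals.
    Logger driverLogger = (Logger) LoggerFactory.getLogger("com.datastax.oss.driver");
    driverLogger.setLevel(Level.DEBUG);
  }
}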

We're running out of ideas about what could be causing the excessive metadata refreshes.

Would appreciate any ideas/suggestions.

[UPDATE] Ours is a Spring Boot Java application. The following component encapsulates a representative dataset.write. This example performs 4 metadata refreshes (rather than 36); I don't know what factor(s) affect the number of repetitions:

import java.io.Serializable;
import java.util.Arrays;
import javax.annotation.PostConstruct;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.springframework.stereotype.Component;
import com.datastax.spark.connector.cql.CassandraConnectorConf;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
 
@Component
@Slf4j
public class Repro {
 
  // create table if not exists expctr.repro (key text, value text, primary key (key))
 
  @Data
  public static class Dao implements Serializable {
    private static final long serialVersionUID = 1L;
    public String key;
    public String value;
  }
 
  @PostConstruct
  public void init() {
 
    String host = "10.18.51.105";
    String keyspace = "expctr";
    String table = "repro";
 
    SparkSession sparkSession =
        SparkSession.builder()
            .appName("expctr")
            .master("local[*]")
            .config(CassandraConnectorConf.ConnectionHostParam().name(), host)
            .config(CassandraConnectorConf.ConnectionPortParam().name(), "9042")
            // .config(CassandraConnectorConf.KeepAliveMillisParam().name(), "3600000")
            // .config("spark.sql.catalog.mycluster", "com.datastax.spark.connector.datasource.CassandraCatalog")
            // .config("spark.sql.defaultCatalog", "mycluster")
            // .withExtensions(new CassandraSparkExtensions())
            .getOrCreate();
 
    Dao dao = new Dao();
    dao.key = "key1";
    dao.value = "value1";
 
    Dataset<Row> dataset = sparkSession.createDataFrame(Arrays.asList(dao), Dao.class);
 
    long t = System.currentTimeMillis();
 
    log.info("before dataset.write");
 
    dataset
        .write()
        .mode(SaveMode.Append)
        .format("org.apache.spark.sql.cassandra")
        .option("keyspace", keyspace)
        .option("table", table)
        .save();
 
    log.info("after dataset.write");
 
    log.info("elapsed time {} ms", System.currentTimeMillis() - t);
  }
}
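The DSV2 path we also tried looks roughly like this. It's a sketch that assumes the catalog options commented out above are enabled (catalog mycluster, keyspace expctr, table repro); the ReproDsv2 class name is just for illustration:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;

public class ReproDsv2 {

  // Same single-row append, but through the Spark 3 DataFrameWriterV2 API;
  // we observe the same repeated schema refreshes on this path as well.
  public static void write(Dataset<Row> dataset) throws NoSuchTableException {
    dataset.writeTo("mycluster.expctr.repro").append();
  }
}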
spark-cassandra-connector

1 Answer

jaroslaw.grabowski_50515 answered:

Hi, do you have repro steps for this issue?

5 comments

[Reply posted in original question — see the [UPDATE] above.]


From which version of the connector are you upgrading?

This certainly looks like something that could be improved. Could you create a JIRA ticket? https://datastax-oss.atlassian.net/jira/software/c/projects/SPARKC/issues

clayj_175802 replied to jaroslaw.grabowski_50515:

Prior to the attempted Spark 3.x upgrade, our product had been using Spark 2.4.5, DataStax driver 4.4.0, and Spark Cassandra Connector 2.4.3. With those versions, no DataStax driver DEBUG logging is seen during the dataset.write operation (between the "before"/"after" log events).
