question

clayj_175802 asked:

Why are we seeing repeated metadata refresh with the Spark connector v3?

Hi,

We're trying to upgrade an existing application to Spark 3.x using the latest Spark (3.0.1), DataStax driver (4.9.0), and Spark Cassandra Connector (3.0.0). Functionally everything works, but we are seeing significant performance degradation when using dataset.write to Cassandra. With DataStax DEBUG logging enabled, we see repeated metadata refreshes occurring. For example, the following log statements:

Starting schema refresh
...
Checking schema agreement
...
Schema agreement reached (048f6a87-8c9a-3217-a55f-6160c29502a1), completing

repeat 36 times for a single dataset.write with a single row. As a result, operations that were sub-second now take 10+ seconds. We see the same behavior with both the DSV1 and DSV2 approaches.
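
(In case it helps anyone reproducing this: the driver DEBUG logging mentioned above can also be enabled programmatically. This is just a sketch and assumes logback-classic is the SLF4J binding; it uses the com.datastax.oss.driver logger name, which covers the schema-refresh messages. An equivalent logger entry in logback.xml works just as well.)

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import org.slf4j.LoggerFactory;

public class DriverDebugLogging {

  public static void enable() {
    // Raise the DataStax Java driver loggers to DEBUG so the schema/metadata
    // refresh messages ("Starting schema refresh", "Checking schema agreement",
    // "Schema agreement reached ...") become visible.
    Logger driverLogger = (Logger) LoggerFactory.getLogger("com.datastax.oss.driver");
    driverLogger.setLevel(Level.DEBUG);
  }
}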

We're running out of ideas about what could be causing the excessive metadata refreshes.

Would appreciate any ideas/suggestions.

[UPDATE] Ours is a Spring Boot Java application. The following component encapsulates a representative dataset.write. This example performs 4 metadata refreshes (rather than 36); I don't know what factor(s) affect the number of repetitions:

import java.io.Serializable;
import java.util.Arrays;
import javax.annotation.PostConstruct;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.springframework.stereotype.Component;
import com.datastax.spark.connector.cql.CassandraConnectorConf;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
 
@Component
@Slf4j
public class Repro {
 
  // create table if not exists expctr.repro (key text, value text, primary key (key))
 
  @Data
  public static class Dao implements Serializable {
    private static final long serialVersionUID = 1L;
    public String key;
    public String value;
  }
 
  @PostConstruct
  public void init() {
 
    String host = "10.18.51.105";
    String keyspace = "expctr";
    String table = "repro";
 
    SparkSession sparkSession =
        SparkSession.builder()
            .appName("expctr")
            .master("local[*]")
            .config(CassandraConnectorConf.ConnectionHostParam().name(), host)
            .config(CassandraConnectorConf.ConnectionPortParam().name(), "9042")
            // .config(CassandraConnectorConf.KeepAliveMillisParam().name(), "3600000")
            // .config("spark.sql.catalog.mycluster", "com.datastax.spark.connector.datasource.CassandraCatalog")
            // .config("spark.sql.defaultCatalog", "mycluster")
            // .withExtensions(new CassandraSparkExtensions())
            .getOrCreate();
 
    Dao dao = new Dao();
    dao.key = "key1";
    dao.value = "value1";
 
    Dataset<Row> dataset = sparkSession.createDataFrame(Arrays.asList(dao), Dao.class);
 
    long t = System.currentTimeMillis();
 
    log.info("before dataset.write");
 
    dataset
        .write()
        .mode(SaveMode.Append)
        .format("org.apache.spark.sql.cassandra")
        .option("keyspace", keyspace)
        .option("table", table)
        .save();
 
    log.info("after dataset.write");
 
    log.info("elapsed time {} ms", System.currentTimeMillis() - t);
  }
}
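
(For reference, the DSV2 approach mentioned above, which shows the same refresh behavior, uses the catalog registration that is commented out in the snippet. A rough sketch, assuming the arbitrary catalog name "mycluster" and the same Dao/table as the Repro component:)

import java.util.Arrays;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import com.datastax.spark.connector.cql.CassandraConnectorConf;

public class ReproDsv2 {

  public static void run(String host) throws NoSuchTableException {
    // Same connection settings as the Repro component above, plus the Cassandra
    // catalog registered under an arbitrary name ("mycluster").
    SparkSession sparkSession =
        SparkSession.builder()
            .appName("expctr")
            .master("local[*]")
            .config(CassandraConnectorConf.ConnectionHostParam().name(), host)
            .config(CassandraConnectorConf.ConnectionPortParam().name(), "9042")
            .config("spark.sql.catalog.mycluster",
                "com.datastax.spark.connector.datasource.CassandraCatalog")
            .getOrCreate();

    // Reuses the Dao bean from the Repro component above (same package assumed).
    Repro.Dao dao = new Repro.Dao();
    dao.key = "key1";
    dao.value = "value1";
    Dataset<Row> dataset = sparkSession.createDataFrame(Arrays.asList(dao), Repro.Dao.class);

    // DataFrameWriterV2 path: append into the catalog-qualified table.
    // append() declares the checked NoSuchTableException, hence the throws clause.
    dataset.writeTo("mycluster.expctr.repro").append();
  }
}
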
spark-cassandra-connector

1 Answer

jaroslaw.grabowski_50515 answered:

Hi, do you have repro steps for this issue?

11 comments

clayj_175802 commented:

[Reply posted in original question]

jaroslaw.grabowski_50515 ♦ commented:

From which version of the connector are you upgrading?

This certainly looks like something that may be improved. Could you create a jira? https://datastax-oss.atlassian.net/jira/software/c/projects/SPARKC/issues

clayj_175802 commented:

Prior to the attempted Spark 3.x upgrade, our product had been using Spark 2.4.5, DataStax driver 4.4.0, and Spark Cassandra Connector 2.4.3. With those versions, no DataStax DEBUG logging is seen during the dataset.write operation (between the "before"/"after" log events).

Show more comments
clayj_175802 commented:

Any further insights regarding this issue? Has anyone else seen/reported this? Thanks.

jaroslaw.grabowski_50515 ♦ commented:

Could you please see if the proposed fix (https://datastax-oss.atlassian.net/browse/SPARKC-633) helps in your case?

clayj_175802 commented:

It appears this behavior was introduced in connector 2.5.0. As mentioned in an earlier comment, prior to the attempted Spark 3 upgrade our product was (and still is) using Spark 2.4.5, driver 4.4.0, and connector 2.4.3. With that combination, the Repro test shows no schema refreshes during the dataset.write. Changing only the connector to 2.5.0 (or later) introduces the refreshes.

jaroslaw.grabowski_50515 ♦ commented:

That's probable, as 2.5.x ships a new Java driver. We don't have an answer for you yet; I'll get back to you as soon as we have something interesting.

jaroslaw.grabowski_50515 ♦ commented:

@clayj_175802 3.0.1 was recently released; could you try that version? It contains a couple of fixes that remove unwanted session reopening.

clayj_175802 commented:

Fixed in 3.0.1
