DataBroker asked · DataBroker commented

SparkSQL LIMIT predicate does not limit returned rows

Hello Team,

I want to retrieve only 10000 rows of data from a Cassandra table:

val spark = SparkSession.builder
  .master(sparkMasterUrl)
  .config("spark.sql.catalog.cassandra", "com.datastax.spark.connector.datasource.CassandraCatalog")
  .config("spark.sql.catalog.casscatalog", "com.datastax.spark.connector.datasource.CassandraCatalog")
  .config("spark.cassandra.connection.host", cassandraHosts)
  .config("spark.cassandra.connection.port", cassandraConnectPort)
  .config("spark.cassandra.auth.username", cassandraUser)
  .config("spark.cassandra.auth.password", cassandraPassword)
  .config("spark.cassandra.input.consistency.level", "ONE")
  .config("spark.cassandra.concurrent.reads", 128)
  .withExtensions(new com.datastax.spark.connector.CassandraSparkExtensions)
  .getOrCreate()

val inputDf = spark.sql("select col1, col2 from casscatalog.MyKeySpace.MyTable limit 10000")
inputDf.write.format("csv").save("/output/file/path/")

But I don't understand why Spark does a full scan of the table (50M rows) instead of just writing the first 10000 rows it reads.
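For reference, one way to see what is actually pushed down to the connector is to print the physical plan (a minimal sketch; the plan output itself is omitted here):

// Print the physical plan to check whether the LIMIT reaches the Cassandra scan
inputDf.explain(true)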

spark:3.0.2, spark-cassandra-connector:3.0.1, scala:2.12

Thanks a lot!

spark-cassandra-connector

1 Answer

jaroslaw.grabowski_50515 answered · DataBroker commented

SCC should read <limit> rows for every partition and apply the global <limit> afterwards.

Are you sure that all 50M rows are read? Could you experiment with a smaller value that is easier to work with, e.g. limit 10? We should see <num_partitions> * 10 rows read.
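A minimal sketch of that experiment (table and column names taken from the question; the output path is a placeholder, and the rows actually read can be checked in the Spark UI input metrics):

// Re-run the query with limit 10 and inspect the plan and input metrics
val df10 = spark.sql("select col1, col2 from casscatalog.MyKeySpace.MyTable limit 10")
df10.explain(true)                                   // shows where the limit sits in the plan
df10.write.format("csv").save("/output/test/path/")  // then compare rows read in the Spark UI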

1 comment

DataBroker commented ·

Thanks for your reply @jaroslaw.grabowski_50515

I launched a job with limit 2, and it took almost 40 minutes to successfully write the output file (with 2 rows).

The Cassandra table was created with a primary key (col1) and there is no clustering key, so each partition holds exactly one row. So the limit 2 is applied to each of those single-row partitions, am I right?

(In this case, partition key = primary key.)

Is there a way to simply read 2 rows and write them without scanning the whole table?

Because when I do a show(2), it is very fast!
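For example, something along these lines is also fast, since take(), like show(), executes incrementally instead of scanning everything (just a sketch, not verified against the connector):

// Fetch 2 rows on the driver the way show(2) does, then write the tiny result
val rows = inputDf.take(2)
val smallDf = spark.createDataFrame(
  spark.sparkContext.parallelize(rows.toSeq),
  inputDf.schema)
smallDf.write.format("csv").save("/output/file/path/")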

Cheers
