question

aashish.aiesec_188335 asked ·

Can I get the number of records without calling an action on RDD?

While writing to a Cassandra table, I get the information below on the number of rows written and the time taken. From the log, I can see it comes from the TableWriter class. How can I find the same information while reading from Cassandra without calling an action on the RDD? I am not sure which method is used for the read.

2020-04-20 11:58:42 INFO  com.datastax.spark.connector.writer.TableWriter.logInfo:35 - Wrote 24 rows to my_keyspace.mytable in 0.153 s.


Code to write a Spark DataFrame to a Cassandra table:

myDF.write
  .format("org.apache.spark.sql.cassandra")
  .mode(saveMode)
  .options(Map("keyspace" -> "my_keyspace", "table" -> "my_table"))
  .save()

Code to read a Cassandra table into a Spark DataFrame:

val cassandraRDD = sparkSession.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map( "table" -> "my_table", "keyspace" -> "my_keyspace", "pushdown" -> "true"))
      .load()
sparkconnector


Russell Spitzer answered ·

No. Actions refer to anything which actually does work on the data source. By definition, there is no way to perform work without calling an action.

Basically you are asking if you can count without counting, and you cannot do that :)
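The same lazy-evaluation idea can be illustrated with plain Scala collections (a stand-in analogy, not Spark code): transformations on a view only describe work, and nothing is computed until a terminal operation, the collection's equivalent of an action, forces it.

```scala
// A plain-Scala analogy for RDD laziness: a view is lazy like an RDD.
// Mapping over the view records the transformation but does no work yet.
var evaluated = 0
val lazyPipeline = (1 to 10).view.map { n => evaluated += 1; n * 2 }

// No elements have been processed so far:
assert(evaluated == 0)

// Forcing the view is the "action" that actually does the work:
val total = lazyPipeline.sum
assert(evaluated == 10)
assert(total == 110)
```

Just as there is no way to learn `total` without traversing the view, there is no way to learn an RDD's row count without running an action over the data.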

If you are looking for the number of records coming out, see:

https://github.com/datastax/spark-cassandra-connector/blob/master/connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraTableScanRDD.scala#L368-L375

which logs the number of records and also reports these metrics via Codahale. For more info, check out:

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/11_metrics.md
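Since CassandraTableScanRDD emits its per-partition row counts at DEBUG level, one way to surface them without turning on DEBUG for all of Spark is to raise the level for just that logger. A minimal sketch, assuming Spark's default log4j 1.x configuration file (conf/log4j.properties):

```properties
# Keep the rest of Spark at INFO, but show the per-partition
# "Fetched N rows ..." messages from the connector's scan RDD.
log4j.rootCategory=INFO, console
log4j.logger.com.datastax.spark.connector.rdd.CassandraTableScanRDD=DEBUG
```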




Thank you. This is exactly what I was looking for. So the number of rows and the time taken to read them is logged at read time as well. But the difference is that at read time it is logged at DEBUG level, while at write time it is logged at INFO level.

Is there any particular reason for the above difference?

Russell Spitzer ·

No meaningful one, just the normal artifacts of software development by many authors.


Got it. But something is different with com.datastax.spark.connector.rdd.CassandraTableScanRDD which I am not able to figure out. I changed the log level of the Spark context to DEBUG. When run locally in IntelliJ, I can see the logs below.

2020-04-21 11:19:57 DEBUG com.datastax.driver.core.ControlConnection.tryConnect:275 - [Control connection] Refreshing schema
2020-04-21 11:09:27 DEBUG com.datastax.spark.connector.cql.Schema.logDebug:39 - Retrieving database schema from cluster mycluster.
2020-04-21 11:09:37 DEBUG com.datastax.spark.connector.rdd.CassandraTableScanRDD.logDebug:58 - Fetched 169 rows from mykeyspace.qtl_oden_pl for partition 17 in 0.431 s.

I am interested in the 3rd log here, which reports the rows and the time it took. But when run on the server, I get the 1st and 2nd debug messages, while the ones generated by CassandraTableScanRDD are missing. Do you know what the cause could be, or have a few pointers on where to look?

Erick Ramirez answered ·

@aashish.aiesec_188335 Spark does lazy evaluation so unless you call an action, no execution happens and so there's no result.

If you would like to get the number of rows, you can call cassandraCount() from a Cassandra-based RDD. For more information, see the documentation on Counting rows. Cheers!
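A minimal sketch of the cassandraCount() approach (not runnable standalone; it assumes a configured SparkContext `sc` with the spark-cassandra-connector on the classpath, and the keyspace/table names are placeholders):

```scala
import com.datastax.spark.connector._

// cassandraCount() pushes the counting down to Cassandra instead of
// pulling every row back into Spark, but note it is still an action.
val rowCount = sc.cassandraTable("my_keyspace", "my_table").cassandraCount()
println(s"Rows in my_keyspace.my_table: $rowCount")
```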


Thanks for your response. But I am not able to understand:

1. the reason that this task-level metric (number of records and duration) is logged in TableWriter but not in the reader; and

2. whether it is possible to modify the source code to log this information in the reader too. Which class should I be looking at to do that?

Also, one thing to clarify: I am reading data from Cassandra via multiple lookup (joinWithCassandraTable) transformations and calling an action on the final dataframe. My requirement here is to understand the number of records each task read in each lookup.

So yes, I am calling an action, but it is on the final dataframe. Is it possible to get the information I need without performing additional actions?
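One pattern that fits this constraint is to count with Spark accumulators as a side effect of the single final action, rather than with extra count() calls. This is an editorial sketch, not from the thread; it is not runnable standalone (it assumes a SparkContext `sc`, the connector on the classpath, and a hypothetical `keysRDD` of lookup keys):

```scala
import com.datastax.spark.connector._

// One accumulator per lookup; it is incremented as rows flow through
// the pipeline, so no action beyond the final one is needed.
val lookup1Rows = sc.longAccumulator("lookup1Rows")

val joined = keysRDD
  .joinWithCassandraTable("my_keyspace", "my_table")
  .map { row => lookup1Rows.add(1); row }

// ... further transformations, then the single action on the final result.
// Afterwards, lookup1Rows.value holds the rows read by this lookup.
```

Accumulator values are only reliable once the action has completed, and tasks that are retried may double-count, so treat the result as an operational metric rather than an exact count.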
