question

fwy_187020 asked

Can't get pushdown filters to work with Java API

I am relatively new to developing with the Spark Cassandra Connector, and I am using the Java API to read large datasets from a Cassandra table into Spark. In the queries I have tested so far, the predicate pushdown feature of SCC is not working.

I have read the documentation of the feature and am using similar code: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md. Although the simple filters I am using are within the list of predicate pushdown restrictions at the bottom of that doc, the org.apache.spark.sql.cassandra log entries and the Spark UI pages show no evidence that any filters are being pushed down to Cassandra.

I see that the Scala code to read the dataset looks like this:

val df = spark
  .read
  .cassandraFormat("pushdownexample", "pushdowns")
  .load()

I don't see an equivalent cassandraFormat method in the Java API, but based on reading its Scala source I tried code like the following in Java.

Dataset<Row> df = spark
  .read()
  .format("org.apache.spark.sql.cassandra")
  .option("keyspace", "pushdowns")
  .option("table", "pushdownexample")
  .option("pushdownEnable", true)
  .load();

(I read that pushdown is enabled by default but tried the "pushdownEnable" option anyway.)

Assuming that I haven't made a stupid coding error elsewhere, should this approach work? Any suggestions on debugging it? I am using the Dataset.explain method in my code, in combination with the Spark UI SQL and job details, but can't find any clues. Those details show that the entire table is being scanned and read into Spark before any filters are applied.
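For reference, here is roughly how I am applying a filter and checking the plan. The column name and value below are placeholders rather than my real schema, and df is the DataFrame built as above:

import static org.apache.spark.sql.functions.col;

// Apply a simple equality filter, which should be eligible for pushdown.
Dataset<Row> filtered = df.filter(col("customer_id").equalTo("abc123"));

// If the predicate were pushed down, I would expect to see it listed under
// "PushedFilters" in the physical plan; in my tests that list stays empty.
filtered.explain(true);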

sparkconnector
1 comment

Erick Ramirez ♦♦ commented ·

@fwy_187020 Just acknowledging your question. Let me see if I can catch any of the engineers on the East coast. Cheers!


1 Answer

Russell Spitzer answered

You are correct that pushdowns are on by default. From your code everything looks correct, so what we really need is the explain output of your DataFrame. That will tell us exactly what predicates Spark constructed, and whether or not it believed they could be pushed.

Usually the reason a predicate isn't pushed down when it should be is that the types are mismatched, i.e. Spark believes it needs to cast the type of the predicate to something matching the column. That breaks the pushdown mechanism within Spark.
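As a rough illustration (assuming a hypothetical int column called user_id, with df being the DataFrame from your code):

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.lit;

// Literal type matches the column type: the comparison stays a plain
// column-versus-value predicate and should appear under PushedFilters.
df.filter(col("user_id").equalTo(lit(5))).explain();

// Literal is a long, so Catalyst wraps the column in a cast (something like
// "cast(user_id as bigint) = 5") and the connector no longer sees a simple
// predicate it can push down.
df.filter(col("user_id").equalTo(lit(5L))).explain();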

See http://www.russellspitzer.com/2016/04/18/Catalyst-Debugging/ for more details on debugging this issue.

7 comments

fwy_187020 commented ·

Thanks for the prompt reply and the link to your blog page.

After more debugging, I found that the problem is caused by code in the application I am testing that automatically applies the Spark lower() function to the filtered column before retrieving the data from Cassandra, in order to perform case-insensitive comparisons. The filter is pushed down correctly when lower() is not used. This kind of case isn't mentioned in the list of pushdown filter limitations, but probably should be.
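In case it helps anyone else, the difference looked roughly like this (the column name and value are placeholders, df is the DataFrame read from Cassandra):

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.lower;

// Direct comparison on the column: the filter shows up as pushed down.
df.filter(col("email").equalTo("user@example.com")).explain();

// Wrapping the column in lower() for a case-insensitive compare: the predicate
// is no longer a plain column comparison, so nothing is pushed down and the
// whole table is read into Spark before filtering.
df.filter(lower(col("email")).equalTo("user@example.com")).explain();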

Russell Spitzer commented ·

"cast" is just an example. Any function breaks Spark's ability to pushdown a predicate because it only allows for direct predicate on column pushdowns. There isn't really a way around this (yet) without rewriting the optimizer rules themselves.

1) Probably? But you most likely would not be able to push it down through the Spark SQL API. Spark basically has no way of handling non-Spark UDFs (especially for pushdown) in DataSource V1.

2) This would most likely be impossible. Basically the problem is that you are too far into the optimization process by the time the datasource pushdown rules get applied, and Spark will have already failed your analysis. You can look at the code in the SCC for handling the TTL and Writetime custom functions for an example of how we get around this with some difficulty. DSV2 should make this easier.

The debugging level is the same for both Java and Scala, but the default level has changed since I wrote the blog post.

fwy_187020 commented ·

That brings up other questions... I see that Cassandra does not provide UPPER or LOWER functions, or support case-insensitive behavior in general. We could add extra upper/lower-cased columns to our Cassandra tables to work around this limitation, but is there a more general solution? For example, is it feasible to:

1. Implement Cassandra user-defined functions for UPPER/LOWER; and

2. Use the SCC property "sql.pushdown.additionalClasses" and implement classes that would push the upper/lower-cased comparisons down to Cassandra via these UDFs?

If this makes sense, please pass along any examples that might help. In a quick search, I didn't turn up much.

BTW, a quick debugging note about the Java API that differs from the Scala examples: the pushdown filters are only logged by the explain() and show() methods if the log level for org.apache.spark.sql.cassandra is set to DEBUG, not INFO as with Scala.
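For anyone else hitting this, a minimal sketch of how I raised the level programmatically; this assumes the log4j 1.x API bundled with Spark 2.x, and with a newer Spark the equivalent setting would go in the log4j2 configuration file instead:

import org.apache.log4j.Level;
import org.apache.log4j.Logger;

// Enable DEBUG logging for the connector package so that explain()/show()
// report which filters were (or were not) pushed down.
Logger.getLogger("org.apache.spark.sql.cassandra").setLevel(Level.DEBUG);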
