
Error when filtering Spark Cassandra table after parsing binary Avro column

JJorczik asked · jaroslaw.grabowski_50515 commented

When filtering on an Avro-parsed column of a Cassandra table that was read with the Spark Cassandra Connector (SCC), it appears that some of the binary Avro records are malformed:

org.apache.spark.SparkException: Malformed records are detected in record parsing. Current parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.

However, this error only occurs in this specific setting, which makes me think it is not actually about malformed Avro records. When I collect the Cassandra dataframe and create a new dataframe from it (avoiding SCC), everything works fine. When I run a limit clause before Avro parsing with more elements than the dataframe contains (which should not change the result), it also works fine. When I perform the Avro parsing in "PERMISSIVE" mode it seems to work, but multiple runs showed that the number of rows in the filtered dataframe is not consistent: count() returned different results across runs with exactly the same setup (see the sketch after the code below). Do you have any idea what is going wrong here?

Steps to reproduce error:

  1. Read a table that contains a binary Avro column from Cassandra with SCC
  2. Filter the table so that the binary Avro column of all rows follows the same schema
  3. Parse the binary Avro column with Spark-Avro in "FAILFAST" mode
  4. Filter on a column that was parsed in the previous step
  5. Run a Spark action such as count() on the dataframe

High-level code:

from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import from_avro

spark = SparkSession.builder.getOrCreate()

# Read the Cassandra table through SCC
myTable = spark.read.format("org.apache.spark.sql.cassandra").options(table="myTable", keyspace="myKeyspace", directJoinSetting="on").load()
# Keep only rows whose Avro payload follows the same schema
myTableFiltered = myTable.filter(myTable["schemaName"] == "mySchema")
# Parse the binary Avro column in FAILFAST mode
jsonFormatSchema = open("/path/to/schema.json", "r").read()
avroTable = myTableFiltered.withColumn("avroColumn", from_avro(data="binaryAvro", jsonFormatSchema=jsonFormatSchema, options={"mode": "FAILFAST"}))
# Filter on a parsed field, then run an action
avroTableFiltered = avroTable.filter(avroTable["avroColumn.value"].isNotNull())
print(avroTableFiltered.count())

# Workaround 1: materialize and rebuild the dataframe before parsing Avro
# myTable = spark.createDataFrame(myTable.collect())

# Workaround 2: apply a limit larger than the row count before parsing Avro
# myTable = spark.read.format("org.apache.spark.sql.cassandra").options(table="myTable", keyspace="myKeyspace", directJoinSetting="on").load().limit(1000)
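
For reference, a sketch of the "PERMISSIVE"-mode variant mentioned above, reusing the names defined in the snippet (this is how the inconsistent counts were observed, not a fix):

# PERMISSIVE mode yields null for records it cannot parse instead of failing
avroTablePermissive = myTableFiltered.withColumn(
    "avroColumn",
    from_avro(data="binaryAvro", jsonFormatSchema=jsonFormatSchema, options={"mode": "PERMISSIVE"}))
permissiveFiltered = avroTablePermissive.filter(avroTablePermissive["avroColumn.value"].isNotNull())
# Repeated identical runs of this count returned different values
print(permissiveFiltered.count())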

Used versions:

  • SCC: 3.0.0
  • Spark: 3.0.0
  • Spark-Avro: 3.0.0

Tags: spark-cassandra-connector

1 Answer

jaroslaw.grabowski_50515 answered

I don't think this is an SCC-related problem. Try counting `myTableFiltered`; it should always give you the same result (assuming no one changes the underlying data).

Once you confirm this, look into your Avro records; some of them are probably broken.
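
A minimal sketch of that check, assuming the same dataframe name as in the question:

# Count the pre-parse dataframe several times; a stable result suggests
# the Cassandra read itself is deterministic
counts = [myTableFiltered.count() for _ in range(5)]
print(counts)  # all values should be identical if the underlying data is unchanged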

2 comments

Counting 'myTableFiltered' always returns the same number, as expected. If this problem is not related to SCC, how do you explain that it works when running 'myTable = spark.createDataFrame(myTable.collect())' before Avro parsing? If some Avro records were malformed, this workaround should not change anything, right?


I can't explain your code, but I can help debug it if you provide runnable code, the Cassandra schema, and data inserts.
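
For illustration, a minimal repro skeleton could start like this; the keyspace, table, and column names match the question's snippet, but everything else here (contact point, schema, types) is an assumption, with the actual Avro blobs still to be supplied:

# Hypothetical repro setup using the Python cassandra-driver
from cassandra.cluster import Cluster

cluster = Cluster(["127.0.0.1"])  # assumed local contact point
session = cluster.connect()
session.execute("""
    CREATE KEYSPACE IF NOT EXISTS "myKeyspace"
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}
""")
session.execute("""
    CREATE TABLE IF NOT EXISTS "myKeyspace"."myTable" (
        id uuid PRIMARY KEY,
        "schemaName" text,
        "binaryAvro" blob
    )
""")
# INSERTs of real Avro-encoded payloads (serialized against schema.json)
# would go here, followed by the exact Spark code from the question.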
