When filtering on an Avro-parsed column of a Cassandra table that was read with SCC, Spark reports that some of the binary Avro records are malformed:
org.apache.spark.SparkException: Malformed records are detected in record parsing. Current parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
However, this error only occurs in this specific setting, which makes me think that the Avro records themselves are not actually malformed:
- When collecting the Cassandra dataframe and creating a new dataframe from the collected rows (so that SCC is no longer involved), parsing works just fine.
- When running a limit() before Avro parsing with more elements than the dataframe contains (which should not change the result), it also works fine.
- When parsing in "PERMISSIVE" mode, no exception is raised, but the number of rows in the filtered dataframe is not consistent: count() returned different results over multiple runs with exactly the same setup (see the sketch below).

Do you have any idea what is going wrong here?
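For reference, the PERMISSIVE-mode check looks roughly like this (a sketch; it reuses myTableFiltered and jsonFormatSchema from the high-level code further down):

```python
from pyspark.sql.avro.functions import from_avro

# Same pipeline as below, but parsing in PERMISSIVE instead of FAILFAST mode
avroTablePermissive = myTableFiltered.withColumn(
    "avroColumn",
    from_avro(data="binaryAvro", jsonFormatSchema=jsonFormatSchema,
              options={"mode": "PERMISSIVE"}))
permissiveFiltered = avroTablePermissive.filter(
    avroTablePermissive["avroColumn.value"].isNotNull())

# Running the same action several times returned different row counts
for _ in range(3):
    print(permissiveFiltered.count())
```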
Steps to reproduce the error:
- Read a table that contains a binary Avro column from Cassandra with SCC
- Filter the table so that the binary Avro column of all remaining rows follows the same schema
- Parse the binary Avro column with Spark-Avro in "FAILFAST" mode
- Filter on a column that was parsed in the previous step
- Run a Spark action such as count() on the dataframe
High-level code:

```python
from pyspark.sql.avro.functions import from_avro

# Read the table from Cassandra with SCC
myTable = spark.read.format("org.apache.spark.sql.cassandra") \
    .options(table="myTable", keyspace="myKeyspace", directJoinSetting="on") \
    .load()

# Keep only rows whose binary Avro column follows the same schema
myTableFiltered = myTable.filter(myTable["schemaName"] == "mySchema")

# Parse the binary Avro column in FAILFAST mode
jsonFormatSchema = open("/path/to/schema.json", 'r').read()
avroTable = myTableFiltered.withColumn(
    "avroColumn",
    from_avro(data="binaryAvro", jsonFormatSchema=jsonFormatSchema,
              options={"mode": "FAILFAST"}))

# Filter on a field of the parsed column and run an action
avroTableFiltered = avroTable.filter(avroTable["avroColumn.value"].isNotNull())
print(avroTableFiltered.count())

# Workaround 1: before parsing Avro, rebuild the dataframe from collected rows
# myTable = spark.createDataFrame(myTable.collect())

# Workaround 2: before parsing Avro, apply a limit larger than the row count
# myTable = spark.read.format("org.apache.spark.sql.cassandra") \
#     .options(table="myTable", keyspace="myKeyspace", directJoinSetting="on") \
#     .load().limit(1000)
```
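To make the difference concrete, this is roughly how Workaround 1 compares against the SCC-backed pipeline on the same data (a sketch; parse_and_filter is a helper I introduce here purely for illustration):

```python
# Sketch: run the identical parse/filter pipeline on the SCC-backed dataframe
# and on a dataframe rebuilt from collected rows.
def parse_and_filter(df):
    parsed = df.withColumn(
        "avroColumn",
        from_avro(data="binaryAvro", jsonFormatSchema=jsonFormatSchema,
                  options={"mode": "FAILFAST"}))
    return parsed.filter(parsed["avroColumn.value"].isNotNull())

# Fails with the SparkException quoted above
print(parse_and_filter(myTableFiltered).count())

# Works fine once the data no longer comes through SCC
localTable = spark.createDataFrame(myTableFiltered.collect())
print(parse_and_filter(localTable).count())
```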
Versions used:
- SCC: 3.0.0
- Spark: 3.0.0
- Spark-Avro: 3.0.0