Question

taakkon asked:

CodecNotFoundException when writing to a blob field inside a UDT using Spark connector

I get a CodecNotFoundException when writing to a blob field inside a UDT, using Spark 3.1.2, spark-cassandra-connector 3.1.0 and Cassandra 3.11.10.

I have a UDT:

CREATE TYPE testks.bar (
    bin blob
);

And a table:

CREATE TABLE testks.foo (
    pk text PRIMARY KEY,
    b frozen<bar>
);

When I try inserting a new row into foo with non-null data for the field bin inside the UDT bar, using spark-cassandra-connector 3.1.0 from spark-shell (Spark 3.1.2), the write fails with this exception:
com.datastax.oss.driver.api.core.type.codec.CodecNotFoundException: Codec not found for requested operation: [BLOB <-> java.nio.HeapByteBuffer]

I tried both the RDD and Dataset APIs and got the same exception in both cases.
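
For reference, the RDD attempt looked roughly like this (UDT value built by hand via the connector's UDTValue; the Dataset version is the case-class mapping shown in the comments below):

import com.datastax.spark.connector._

// Build the UDT value by hand and write through the RDD API
val udt = UDTValue.fromMap(Map("bin" -> "Bar".getBytes))
sc.parallelize(Seq(("somePk", udt))).saveToCassandra("testks", "foo", SomeColumns("pk", "b"))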

If I use table:

CREATE TABLE testks.foo2 (
    pk text PRIMARY KEY,
    bin blob
);

I can insert rows without issues. Reading data from both foo and foo2 into Spark also works.
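
For example, a read like this returns the rows with no errors:

import org.apache.spark.sql.cassandra._

// Reading the table containing the UDT column works
spark.read.cassandraFormat("foo", "testks").load().show()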

It seems strange that the codec BLOB <-> java.nio.HeapByteBuffer can't be found when writing to a blob inside a UDT, since I'd assume the same codec is used when reading blobs and when writing to a blob field that is not inside a UDT. Any ideas?

spark-cassandra-connector
4 comments

There's now a Jira issue about this: https://datastax-oss.atlassian.net/browse/SPARKC-673

Thanks to imarkiew for filing it!


I tried writing to a blob inside a UDT with Spark 2.4.6, spark-cassandra-connector 2.4.3 and Cassandra 3.11.10, and it worked:

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.cassandra._
import spark.implicits._

case class Bar(bin: Array[Byte])
case class Foo(pk: String, b: Bar)
val foo = Foo("somePk", Bar("Bar".getBytes))
val ds = Seq(foo).toDS()
ds.write.cassandraFormat("foo", "testks").mode(SaveMode.Append).save()

This is starting to look like a regression, or am I missing something here?

@Erick Ramirez Can you confirm this is a bug, please? If this works as expected and won't be fixed, I'll need to change my schema and perhaps use a Base64-encoded string instead of a blob, but I'd rather not do that. Right now I'm stuck on this, and it's a blocker for development.
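
The fallback I have in mind would look something like this (assuming bar.bin is changed from blob to text; BarB64/FooB64 are just illustrative names):

import java.util.Base64
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.cassandra._
import spark.implicits._

// Workaround idea: Base64-encode the bytes into a text field instead of a blob
case class BarB64(bin: String)
case class FooB64(pk: String, b: BarB64)

val encoded = Base64.getEncoder.encodeToString("Bar".getBytes)
val ds = Seq(FooB64("somePk", BarB64(encoded))).toDS()
ds.write.cassandraFormat("foo", "testks").mode(SaveMode.Append).save()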

Any estimate on when this bug will be fixed?

1 Answer

Erick Ramirez answered:

You will need to create a class which has a field for every element in the UDT so they can be mapped to rows.

See the Mapper section of the connector documentation for details. Cheers!

4 comments

Thanks for taking the time to answer! I thought I had the correct mappings, but maybe I made a mistake somewhere. Could you help me with the mapping, please?

Writing to table foo throws the exception:

case class Bar(bin: Array[Byte])
case class Foo(pk: String, b: Bar)
val foo = Foo("somePk", Bar("Bar".getBytes))
val ds = spark.createDataset(Seq(foo))
ds.writeTo("cat.testks.foo").append()

21/12/08 08:38:11 ERROR Utils: Aborting task
com.datastax.oss.driver.api.core.type.codec.CodecNotFoundException: Codec not found for requested operation: [BLOB <-> java.nio.HeapByteBuffer]
at com.datastax.oss.driver.internal.core.type.codec.registry.CachingCodecRegistry.createCodec(CachingCodecRegistry.java:609)
... 

Writing to table foo2 works:

case class Foo2(pk: String, bin: Array[Byte])
val foo2 = Foo2("somePk", "Bar".getBytes)
val ds2 = spark.createDataset(Seq(foo2))
ds2.writeTo("cat.testks.foo2").append()

@Erick Ramirez

com.datastax.oss.driver.api.core.type.codec.CodecNotFoundException: Codec not found for requested operation: [BLOB <-> java.nio.HeapByteBuffer]

I have the same issue; it seems to be a bug. I'm working on a project where this is a blocker.


In my case it happens while writing a DataFrame.
