I am trying to read records from Cassandra with Spark Cassandra Connector 2.5 in Scala and map each record to a POJO, in a table where the UDT column is null. Reading primitive columns and converting them to the POJO works fine, but reading and converting the null UDT throws a TypeConversionException.
I have the following POJOs:
import java.io.Serializable;

import com.datastax.oss.driver.api.mapper.annotations.CqlName;
import com.datastax.oss.driver.api.mapper.annotations.Entity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder(toBuilder = true)
@NoArgsConstructor
@AllArgsConstructor
@Entity
public class Employee implements Serializable {
    @CqlName("name")
    private String name;
    @CqlName("address")
    private Address address;
}

@Data
@Builder(toBuilder = true)
@NoArgsConstructor
@AllArgsConstructor
@Entity
public class Address implements Serializable {
    @CqlName("house_no")
    private String houseNo;
    @CqlName("street")
    private String street;
}
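For reference, a schema along these lines reproduces the setup. The keyspace, table, column, and UDT field names come from the code and error above; the PRIMARY KEY choice and the frozen modifier are illustrative, not copied from my real DDL:

CREATE TYPE Sample_KS.address (
    house_no text,
    street text
);

CREATE TABLE Sample_KS.employee_records (
    name text PRIMARY KEY,
    -- address is null for some rows, which triggers the exception below
    address frozen<address>
);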
Scala code to read the records from Cassandra
import scala.reflect.runtime.universe._
import com.datastax.spark.connector._
import com.datastax.spark.connector.mapper.JavaBeanColumnMapper
import com.datastax.spark.connector.rdd.CassandraTableScanRDD
import com.datastax.spark.connector.types.TypeConverter

implicit object UDTValueToAddressConverter extends TypeConverter[Address] {
  val AddressTypeTag = typeTag[Address]
  def targetTypeTag = AddressTypeTag
  def convertPF = {
    case _ => Address.builder().build()
  }
}

TypeConverter.registerConverter(UDTValueToAddressConverter)
implicit val cm = new JavaBeanColumnMapper[Employee]

val employees: CassandraTableScanRDD[Employee] =
  spark.sparkContext.cassandraTable("Sample_KS", "employee_records")

employees.select("name", "address").take(10).foreach((x: Employee) => println(x))
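For what it's worth, here is a minimal sketch of how I would expect a null-aware converter to look, assuming convertPF receives either null or a com.datastax.spark.connector.UDTValue. The NullSafeAddressConverter name and the getStringOption calls are my own sketch against the connector's GettableData API, not verified code:

import scala.reflect.runtime.universe._
import com.datastax.spark.connector.UDTValue
import com.datastax.spark.connector.types.TypeConverter

implicit object NullSafeAddressConverter extends TypeConverter[Address] {
  def targetTypeTag = typeTag[Address]
  def convertPF = {
    // A null UDT column: map it to an empty Address instead of failing
    case null => Address.builder().build()
    // A populated UDT: copy its fields into the POJO
    // (getStringOption is my assumption about the GettableData API)
    case udt: UDTValue =>
      Address.builder()
        .houseNo(udt.getStringOption("house_no").orNull)
        .street(udt.getStringOption("street").orNull)
        .build()
  }
}

The only difference from my converter above is that the catch-all case _ is replaced by explicit null and UDTValue branches.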
Error
com.datastax.spark.connector.types.TypeConversionException: Failed to convert column address of Sample_KS.employee_records to com.dss.pojos.Address: null
    at com.datastax.spark.connector.mapper.GettableDataToMappedTypeConverter.tryConvert(GettableDataToMappedTypeConverter.scala:134)
    at com.datastax.spark.connector.mapper.GettableDataToMappedTypeConverter.convertedColumnValue(GettableDataToMappedTypeConverter.scala:160)
    at com.datastax.spark.connector.mapper.GettableDataToMappedTypeConverter.com$datastax$spark$connector$mapper$GettableDataToMappedTypeConverter$$setterParamValue(GettableDataToMappedTypeConverter.scala:209)
    at com.datastax.spark.connector.mapper.GettableDataToMappedTypeConverter$$anonfun$com$datastax$spark$connector$mapper$GettableDataToMappedTypeConverter$$invokeSetters$2.apply(GettableDataToMappedTypeConverter.scala:249)
    at com.datastax.spark.connector.mapper.GettableDataToMappedTypeConverter$$anonfun$com$datastax$spark$connector$mapper$GettableDataToMappedTypeConverter$$invokeSetters$2.apply(GettableDataToMappedTypeConverter.scala:248)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
    at com.datastax.spark.connector.mapper.GettableDataToMappedTypeConverter.com$datastax$spark$connector$mapper$GettableDataToMappedTypeConverter$$invokeSetters(GettableDataToMappedTypeConverter.scala:248)
    at com.datastax.spark.connector.mapper.GettableDataToMappedTypeConverter$$anonfun$convertPF$1.applyOrElse(GettableDataToMappedTypeConverter.scala:262)
    at com.datastax.spark.connector.types.TypeConverter$class.convert(TypeConverter.scala:44)
    at com.datastax.spark.connector.mapper.GettableDataToMappedTypeConverter.convert(GettableDataToMappedTypeConverter.scala:19)
    at com.datastax.spark.connector.rdd.reader.ClassBasedRowReader.read(ClassBasedRowReader.scala:33)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$15.apply(CassandraTableScanRDD.scala:345)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$15.apply(CassandraTableScanRDD.scala:345)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$12.next(Iterator.scala:445)
    at com.datastax.spark.connector.util.CountingIterator.next(CountingIterator.scala:16)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at com.datastax.spark.connector.util.CountingIterator.foreach(CountingIterator.scala:4)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)