Ryan Quey asked ·

Can I specify optional fields in my Kafka connector mapping when using the DataStax Kafka Connector?

When running the DataStax Kafka Connector, I am getting a lot of errors that look like this:

Required field 'value.my_field' (mapped to column my_field) was missing from record (or may refer to an invalid function). Please remove it from the mapping. (com.datastax.oss.kafka.sink.CassandraSinkTask)

The official docs seem to give the solution: I just have to remove the field from the mapping. Is it possible to make the field optional instead, so that Kafka messages that do have the field set the corresponding column in the C* record, and messages that don't have it leave the C* column null?

It seems like it should be possible, but I'm unsure how I would go about doing that.

I do have hope, though: at least when using Structs, it appears to be possible to allow for optional fields, if the example in the docs is correct.

kafka-connector

1 Answer

Tomasz Lelek answered ·

You can make the fields optional in the connector.

For example, if you define the following mapping:

topic.my_topic.my_ks.my_table.mapping = bigintcol=value.bigint, intcol=value.int

and the record has a schema like this:

Schema schema =
        SchemaBuilder.struct()
            .name("Kafka")
            .field("bigint", Schema.OPTIONAL_INT64_SCHEMA)
            .field("int", Schema.OPTIONAL_INT32_SCHEMA)
            .build();

then you can leave optional fields unset. For example, you can send a Kafka record that has only one field set:

Struct value = new Struct(schema).put("bigint", 1000L);

As a result, when you retrieve data from my_ks.my_table, you will see that only the value for bigintcol was inserted.
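For anyone who wants to try the schema side of this without a running connector, here is a minimal, self-contained sketch. It only exercises the Kafka Connect data API (Schema, SchemaBuilder, Struct) and does not touch the connector or Cassandra. The class name OptionalFieldExample, the requiredSchema() variant, and the isValid() helper are hypothetical names added for illustration; they are not part of kafka-sink.

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;

// Hypothetical demo class; assumes the Kafka Connect API jar is on the classpath.
public class OptionalFieldExample {

    // Same schema as in the answer: both fields are optional.
    static Schema optionalSchema() {
        return SchemaBuilder.struct()
            .name("Kafka")
            .field("bigint", Schema.OPTIONAL_INT64_SCHEMA)
            .field("int", Schema.OPTIONAL_INT32_SCHEMA)
            .build();
    }

    // Hypothetical variant for comparison: "int" is declared as required.
    static Schema requiredSchema() {
        return SchemaBuilder.struct()
            .name("KafkaRequired")
            .field("bigint", Schema.OPTIONAL_INT64_SCHEMA)
            .field("int", Schema.INT32_SCHEMA)
            .build();
    }

    // Returns true if the Struct passes Kafka Connect's own validation,
    // false if validate() rejects it with a DataException.
    static boolean isValid(Struct value) {
        try {
            value.validate();
            return true;
        } catch (DataException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Only "bigint" is set; the optional "int" field is left unset.
        Struct partial = new Struct(optionalSchema()).put("bigint", 1000L);
        System.out.println("bigint = " + partial.get("bigint"));
        System.out.println("int    = " + partial.get("int"));
        System.out.println("valid with optional schema: " + isValid(partial));

        // The same data against a schema where "int" is required.
        Struct missing = new Struct(requiredSchema()).put("bigint", 1000L);
        System.out.println("valid with required schema: " + isValid(missing));
    }
}
```

The unset optional field reads back as null and the Struct still validates, whereas the variant with a required field does not validate. That matches the point above: optionality is declared in the record schema rather than in the mapping.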

2 comments

I see, so in order to mark it as optional in the connector, you just mark it as optional in the schema?

Also, is this documented somewhere so I can read more?


Yes, you need to make it optional in the Kafka event schema definition.

Unfortunately, we don't have this feature documented yet, but I will make sure it is documented in the next kafka-sink release.

If you are interested in a working example, you can inspect this integration test:

https://github.com/datastax/kafka-sink/blob/5ec53eb1dfe82b75a6659773530d2aff60f8cd2a/sink/src/it/java/com/datastax/oss/kafka/sink/ccm/StructEndToEndCCMIT.java#L346-L376
