Bringing together the Apache Cassandra experts from the community and DataStax.

manosso_194091 asked:

How do I map JSON nested field for the Kafka connector?

I'm trying to map a nested field of JSON messages into columns in Cassandra.

JSON messages look like this:

{
  "table":"CUSTOMER_ORDER",
  "op_type":"I",
  "op_ts":"2021-05-13 10:35:41.000337",
  "current_ts":"2021-05-26T22:24:20.227000",
  "pos":"00000000000000002478",
  "after":
  {
    "ID":"9",
    "CODE":"AAAA09",
    "CREATED":"2021-05-13 10:35:37",
    "STATUS":"DELIVERED",
    "UPDATE_TIME":"2021-05-13 10:35:37.021298000"
  }
}

I got it working by mapping the entire "after" record to a UDT column, but when I tried to map it column by column instead, it didn't work.

I added the following mapping for the topic in the connector configuration file:

- This works (table with a UDT column in Cassandra):

topic.CUSTOMER_ORDER.eshop.cust_ord2.mapping:
  r_table=value.table,
  op_type=value.op_type,
  op_ts=key.op_ts,
  current_ts=value.current_ts,
  pos=key.pos,
  after=value.after
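For context, the working target schema looks roughly like this (the type name and primary key here are illustrative, not my exact DDL; the mapping reads op_ts and pos from the message key):

```sql
-- Illustrative sketch of the UDT-based target schema (not the exact DDL)
CREATE TYPE eshop.after_t (
  "ID" text,
  "CODE" text,
  "CREATED" text,
  "STATUS" text,
  "UPDATE_TIME" text
);

CREATE TABLE eshop.cust_ord2 (
  r_table text,
  op_type text,
  op_ts text,
  current_ts text,
  pos text,
  after frozen<after_t>,
  PRIMARY KEY (op_ts, pos)
);
```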

- This doesn't work (table without a UDT column in Cassandra):

topic.CUSTOMER_ORDER.eshop.cust_ord7.mapping:
  r_table=value.table,
  op_type=value.op_type,
  op_ts=value.op_ts,
  current_ts=value.current_ts,
  pos=value.pos,
  ID=value.after.ID,
  CODE=value.after.CODE,
  CREATED=value.after.CREATED,
  STATUS=value.after.STATUS,
  UPDATE_TIME=value.after.UPDATE_TIME

I also tried adding the following settings to the sink connector configuration file:

transforms=flatten
transforms.flatten.type=org.apache.kafka.connect.transforms.Flatten$Value
transforms.flatten.delimiter=.
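My understanding of what Flatten is supposed to do (a rough sketch of the idea, not the actual SMT code) is to turn nested structures into dotted top-level fields:

```python
# Sketch of the Flatten idea: recursively lift nested fields to the top
# level, joining key names with the configured delimiter.
def flatten(record, delimiter=".", prefix=""):
    flat = {}
    for key, value in record.items():
        name = f"{prefix}{delimiter}{key}" if prefix else key
        if isinstance(value, dict):
            # Recurse into nested records, carrying the joined prefix
            flat.update(flatten(value, delimiter, name))
        else:
            flat[name] = value
    return flat

message = {
    "table": "CUSTOMER_ORDER",
    "after": {"ID": "9", "CODE": "AAAA09"},
}
print(flatten(message))
# {'table': 'CUSTOMER_ORDER', 'after.ID': '9', 'after.CODE': 'AAAA09'}
```

So after a successful Flatten the fields would appear as top-level after.ID, after.CODE, and so on. In my case it never gets that far, because the value apparently arrives as a plain String.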

But when I start the consumer and it tries to read the first message, I get the following error:

Caused by: org.apache.kafka.connect.errors.DataException: Only Struct objects supported for [flattening], found: java.lang.String

Has anyone been through something like that? Can anyone help me with this issue?

kafka-connector

1 Answer

Erick Ramirez answered:

You won't be able to map column-by-column because fields like value.after.ID and value.after.CODE are not available when they are nested in the source. Since the nested fields are not sent to the connector, there's nothing to "flatten".

Only the top-level value.after field exists, and it gets mapped to the UDT column. The nested fields inside value.after are never transformed, so they don't get flattened.

If you want the nested fields in value.after to be sent to the connector, you need to transform your source data upstream so the fields are not nested inside after, i.e. so the payload appears as:

{
  "table":"CUSTOMER_ORDER",
  "op_type":"I",
  "op_ts":"2021-05-13 10:35:41.000337",
  "current_ts":"2021-05-26T22:24:20.227000",
  "pos":"00000000000000002478",
  "ID":"9",
  "CODE":"AAAA09",
  "CREATED":"2021-05-13 10:35:37",
  "STATUS":"DELIVERED",
  "UPDATE_TIME":"2021-05-13 10:35:37.021298000"
}
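With the payload restructured like that, a column-by-column mapping along the lines of what you already tried should work, for example (your cust_ord7 mapping, adjusted so the fields are read from the top level):

```
topic.CUSTOMER_ORDER.eshop.cust_ord7.mapping:
  r_table=value.table,
  op_type=value.op_type,
  op_ts=value.op_ts,
  current_ts=value.current_ts,
  pos=value.pos,
  ID=value.ID,
  CODE=value.CODE,
  CREATED=value.CREATED,
  STATUS=value.STATUS,
  UPDATE_TIME=value.UPDATE_TIME
```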

If you cannot manipulate the source data then you have to map value.after to a UDT column. Cheers!
