I'm trying to map a nested field of my JSON messages into separate columns in Cassandra.
The JSON messages look like this:
{ "table":"CUSTOMER_ORDER", "op_type":"I", "op_ts":"2021-05-13 10:35:41.000337", "current_ts":"2021-05-26T22:24:20.227000", "pos":"00000000000000002478", "after": { "ID":"9", "CODE":"AAAA09", "CREATED":"2021-05-13 10:35:37", "STATUS":"DELIVERED", "UPDATE_TIME":"2021-05-13 10:35:37.021298000" } }
It works when I map the whole "after" record to a UDT, but when I try to map it column by column, it doesn't work.
I added the following field mappings for the topic in the connector configuration file:
- This works (table with a UDT in Cassandra, sketched below):
topic.CUSTOMER_ORDER.eshop.cust_ord2.mapping: r_table=value.table, op_type=value.op_type, op_ts=key.op_ts, current_ts=value.current_ts, pos=key.pos, after=value.after
- Like this, it doesn't work (table without a UDT in Cassandra, also sketched below):
topic.CUSTOMER_ORDER.eshop.cust_ord7.mapping: r_table=value.table, op_type=value.op_type, op_ts=value.op_ts, current_ts=value.current_ts, pos=value.pos, ID=value.after.ID, CODE=value.after.CODE, CREATED=value.after.CREATED, STATUS=value.after.STATUS, UPDATE_TIME=value.after.UPDATE_TIME
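For context, the two target tables look roughly like this. This is a simplified sketch: the UDT name (order_after), the text column types, and the choice of pos as primary key are just my shorthand for this question, not the exact DDL.

-- Table used by the working mapping (whole "after" record stored in a UDT)
CREATE TYPE eshop.order_after (
    "ID" text,
    "CODE" text,
    "CREATED" text,
    "STATUS" text,
    "UPDATE_TIME" text
);

CREATE TABLE eshop.cust_ord2 (
    pos text PRIMARY KEY,
    r_table text,
    op_type text,
    op_ts text,
    current_ts text,
    after frozen<order_after>
);

-- Table I want to use with the column-by-column mapping (no UDT)
CREATE TABLE eshop.cust_ord7 (
    pos text PRIMARY KEY,
    r_table text,
    op_type text,
    op_ts text,
    current_ts text,
    "ID" text,
    "CODE" text,
    "CREATED" text,
    "STATUS" text,
    "UPDATE_TIME" text
);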
To flatten the nested "after" struct, I tried adding the following settings to the sink connector configuration file:
transforms=flatten
transforms.flatten.type=org.apache.kafka.connect.transforms.Flatten$Value
transforms.flatten.delimiter=.
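My understanding of Flatten$Value with delimiter "." is that it should turn the nested "after" fields into top-level fields, so the record value would end up looking something like this (my expectation, not something I've been able to verify):

{
  "table": "CUSTOMER_ORDER",
  "op_type": "I",
  "op_ts": "2021-05-13 10:35:41.000337",
  "current_ts": "2021-05-26T22:24:20.227000",
  "pos": "00000000000000002478",
  "after.ID": "9",
  "after.CODE": "AAAA09",
  "after.CREATED": "2021-05-13 10:35:37",
  "after.STATUS": "DELIVERED",
  "after.UPDATE_TIME": "2021-05-13 10:35:37.021298000"
}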
But when I start the consumer and it tries to read the first message, I get the following error:
Caused by: org.apache.kafka.connect.errors.DataException: Only Struct objects supported for [flattening], found: java.lang.String
As far as I understand, this means the Flatten transform is receiving the record value as a plain String rather than a Struct, so it can't flatten it. Has anyone run into something like this? Can anyone help me with this issue?