rk20.storage_134453 asked ·

How does the Spark connector handle deletes/updates in streaming use cases?

Hello everyone,


During a readStream operation, how does the Spark connector handle record deletes/updates (when the record has already been fetched from the DB)? Do these changes get reflected in the readStream join?


Thanks,

Rahul

spark-cassandra-connector

alex.ott answered ·

If you don't cache the dataframe read from Cassandra, then Spark will query the database each time the join happens. So if you first had the data in Cassandra and then modified it, at least the next microbatch of Structured Streaming should pick up the updated data.

You may see this demonstrated in the following Apache Zeppelin notebook (you need Zeppelin 0.9.0-preview1 to load it)
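
For illustration, here's a minimal sketch of the pattern being described: a stream-static join where the Cassandra dataframe is not cached, so each microbatch's join re-reads from Cassandra and sees any updates made in the meantime. The Kafka source, keyspace, table, and column names are illustrative assumptions, not taken from this thread.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()

// Streaming side: events arriving from Kafka (illustrative source)
val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .load()
  .selectExpr("CAST(key AS STRING) AS user_id")

// Static side: the Cassandra table. It is NOT cached, so every microbatch's
// join triggers a fresh read and therefore sees updates/deletes made since
// the previous microbatch.
val users = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "ks", "table" -> "users"))
  .load()

// Stream-static join, re-evaluated for each microbatch
val joined = events.join(users, Seq("user_id"))

val query = joined.writeStream
  .format("console")
  .outputMode("append")
  .start()

query.awaitTermination()
```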


@alex.ott Thank you for your quick response.

My assumption was that there is some change notification on a table (depending on the query) which gets propagated to the executors. This would also imply that not all data is sent; only the changes are transported to the executors.

Consider the case of an update: if we update a column (or a new column is added) that is not part of the primary key, how is this notification sent by the client to the Spark executor? I believe some kind of merging needs to be done on the executor (otherwise we will end up with two or more copies of the record in the executor).

Let's say the update is somehow propagated: since that data/row is already present in Spark memory, I want to know how the connector updates the executor memory with the new row. Can you please point me to the connector code?


Delete: a record is deleted in the DB; how does Cassandra notify the Spark executors? Can you please point me to the relevant section of the connector code?


-Rahul

alex.ott replied to rk20.storage_134453 ·

No notifications are sent from Cassandra to the Spark executors - it simply reads the data anew. The trick is that when you're joining on the primary key or partition key, the Spark Cassandra Connector optimizes the join by reading only the necessary data instead of the whole table. For dataframes this is often called a direct join - please read this blog post on that functionality in SCC 2.5.0: https://www.datastax.com/blog/2020/05/advanced-apache-cassandra-analytics-now-open-all
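
A minimal sketch of what's being described, assuming SCC 2.5.0+ and a table ks.users whose partition key is user_id (all names are illustrative):

```scala
import org.apache.spark.sql.SparkSession

// Enable the connector's Catalyst extensions (SCC 2.5.0+), which add the
// direct-join optimization among other features
val spark = SparkSession.builder
  .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
  .getOrCreate()

val users = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "ks", "table" -> "users"))
  .load()

// Some dataframe whose user_id column matches the table's partition key
val keys = spark.range(1, 100).withColumnRenamed("id", "user_id")

// Because the join is on the full partition key, the connector can plan a
// direct join: per-key queries against Cassandra rather than a full table scan
val joined = keys.join(users, Seq("user_id"))
joined.explain()  // the physical plan should mention the Cassandra direct join
```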


Thank you @alex.ott again. I spent some time reading the link. It seems that a "direct join" reads from specific partitions instead of doing a complete table scan, and hence achieves better scalability and lower latency.

However, it does not address my question. The implementations of readStream and read have to be distinctly different, imo. A read is a one-time evaluation, so it makes sense that the query can be pushed down to the DB, which can take care of its internal state. That is not the case for readStream(), imo. The Structured Streaming programming model states that new data is appended in Spark memory. Please correct me if I'm mistaken.

Is a Structured Streaming source implemented in the connector?


Best,

Rahul

Erick Ramirez answered ·

If a partition is read at time t, Cassandra will return the latest version of that partition as of that time, whatever it is. If that partition was updated, say, m milliseconds later, that's irrelevant, since the version at time t+m is not what was requested; the version at time t was.

With writes to C* taking place at high velocity, it is plausible that the data may have been updated between the time that the read was requested and the time that a response was returned. This is expected in a distributed environment. Cheers!

[EDIT] I've got a feeling that my response doesn't address your needs. Feel free to comment and I'd be happy to update my answer. :)


@Erick Ramirez Thank you for taking time.

I have updated my comment above. My concern is how the executor's cached table is updated when records change. If a record gets deleted or updated, the connector/client should update the Spark cached table accordingly. It would really help if you could point me to the connector code where this is handled.

Best,

Rahul
