Hello everyone,
During a readStream operation, how does the Spark connector handle record deletes/updates (when the record has already been fetched from the DB)? Do these changes get reflected in the readStream join?
Thanks,
Rahul
When you don't cache the dataframe read from Cassandra, Spark queries the database each time the join is performed. So if you first had the data in Cassandra and then modified it, at least the next microbatch of Structured Streaming should see the updated data.
You may see this demonstrated in the following Apache Zeppelin notebook (you need Zeppelin 0.9.0-preview1 to load it)
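As a rough illustration of the point above, here is a minimal sketch (Scala, Spark 3.x with the Spark Cassandra Connector) of a stream-static join where the Cassandra dataframe is not cached; the keyspace, table, column and Kafka topic names are all hypothetical:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("stream-join-cassandra")
  .config("spark.cassandra.connection.host", "127.0.0.1")
  .getOrCreate()

// Static dataframe over the Cassandra table -- NOT cached, so the
// connector issues new reads against Cassandra whenever the join runs.
val cassandraDF = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "test", "table" -> "users"))
  .load()

// Streaming source (Kafka here, just as an example).
val streamDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .load()
  .selectExpr("CAST(key AS STRING) AS user_id")

// Stream-static join: re-evaluated per microbatch, so updates/deletes in
// Cassandra become visible starting with the next microbatch.
val joined = streamDF.join(cassandraDF, Seq("user_id"))

joined.writeStream
  .format("console")
  .outputMode("append")
  .start()
  .awaitTermination()
```

Note that if you called `cassandraDF.cache()`, the joined data would be frozen at the time of caching and later changes in Cassandra would not show up.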
@alex.ott Thank you for your quick response.
My assumption is that there is some change notification on a table (depending on the query) that gets propagated to the executors. This also implies that not all data is sent; only the changes are transported to the executors.
Consider the case of an update: if we update a column (or add a new column) that is not part of the primary key, how is this notification sent by the client to the Spark executor? I believe some kind of merging would need to be done on the executor (otherwise we would have two or more copies of the record on the executor).
Let's say the update is somehow propagated; since that data/row is already present in Spark memory, I want to know how the connector updates the executor memory with the new row. Can you please point me to the connector code?
Delete: a record gets deleted in the DB; how does Cassandra notify the Spark executors? Can you please point me to the relevant section of the connector code?
-Rahul
No notifications are sent from Cassandra to the Spark executors; the connector simply reads the data anew. The trick is that when you join on the primary or partition key, the Spark Cassandra Connector optimizes the query by reading only the necessary data instead of scanning the whole table. For dataframes this is often called a direct join; please read this blog post on that functionality in SCC 2.5.0: https://www.datastax.com/blog/2020/05/advanced-apache-cassandra-analytics-now-open-all
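For reference, a minimal sketch of how the direct join path is typically exercised with SCC 2.5.0+ (the keyspace, table and column names here are hypothetical). The Catalyst extensions class is what allows the connector to replace a full-table scan with targeted partition reads when the join covers the full partition key:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("direct-join-example")
  .config("spark.cassandra.connection.host", "127.0.0.1")
  .config("spark.sql.extensions",
          "com.datastax.spark.connector.CassandraSparkExtensions")
  .getOrCreate()

val cassandraDF = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "test", "table" -> "users"))
  .load()

// Assume `toJoin` has a `user_id` column matching the partition key
// of test.users.
val toJoin = spark.range(1, 1000).selectExpr("CAST(id AS INT) AS user_id")

val joined = toJoin.join(cassandraDF, Seq("user_id"))

// When the optimization applies, the physical plan contains a
// "Cassandra Direct Join" node instead of a scan of the whole table.
joined.explain()
```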
Thank you @alex.ott again. I spent some time reading the link. It seems that a "direct join" reads from specific partitions instead of doing a complete table scan, which achieves better scalability and lower latency.
However, it does not address my question. The implementations of readStream and read have to be distinctly different, IMO. A read is a one-time evaluation, so it makes sense that the query can be pushed down to the DB and the DB can take care of its internal state. However, this is not the case with readStream(), IMO. The Structured Streaming programming model states that new data will be appended in Spark memory. Please correct me if I'm mistaken.
Is a Structured Streaming source implemented in the connector?
Best,
Rahul
If a partition is read at time t, Cassandra will return the latest version of that partition at that time, whatever it is. If that partition was updated, say, m milliseconds later, it's irrelevant, since the version at time t+m is not what was requested, only the version at time t.
With writes to C* taking place at high velocity, it is plausible that the data may have been updated between the time that the read was requested and the time that a response was returned. This is expected in a distributed environment. Cheers!
[EDIT] I've got a feeling that my response doesn't address your needs. Feel free to comment and I'd be happy to update my answer. :)
@Erick Ramirez Thank you for taking time.
I have updated my comment above. My concern is how the table cached on the executor is updated when records change. If a record gets deleted or updated, the connector/client should inform the executor or handle the Spark cached table accordingly. It would really help if you could point me to the connector code where this is handled.
Best,
Rahul