jagannath.bilgi@anant.us asked ·

Spark to cassandra connection from different data centers

I am trying to connect from Spark in one data center (used only for processing) to Cassandra (where the data resides) in another data center, using DSE 5.1.17. Would you please advise how this can be achieved? When I submit a Spark job, it always connects to the Cassandra instance on the Spark node.
Note: the user used for connecting to Spark is different from the Cassandra user.
I am able to connect to both C* and Spark if the same user is used.

[UPDATE]

Environment.

  • sparkUser - Present in spark cluster
  • cassUser - Present in C* and spark cluster
  • "superusers" = "Y"

Fails

dse -u sparkUser -p sparkPwd spark --master "dse://sparkIP" --conf spark.cassandra.connection.host=cassIP --conf spark.cassandra.auth.username=cassUser --conf spark.cassandra.auth.password=cassPwd --conf spark.hadoop.cassandra.host=cassIP

Succeeds

dse -u cassUser -p cassPwd spark --master "dse://sparkIP" --conf spark.cassandra.connection.host=cassIP --conf spark.cassandra.auth.username=cassUser --conf spark.cassandra.auth.password=cassPwd --conf spark.hadoop.cassandra.host=cassIP

Expectation

The command should also succeed with the different user, i.e. the first option above (the one that currently fails).

spark-cassandra-connector

1 Answer

jaroslaw.grabowski_50515 answered ·
16 comments

Hi Jaroslaw,

Thank you for the answer. I am able to get the data using sc.cassandraTable. I also have a requirement to get the data using spark.sql. Below is my requirement.

I have a composite partition key and need to query based on the first column of the partition key. I tried using sc.cassandraTable with a where clause, but it failed with the error below.

java.lang.UnsupportedOperationException: Partition key predicate must include all partition key columns or partition key columns need to be indexed. Missing columns:
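
For reference, the call I tried was roughly of this shape (keyspace, table and column names are illustrative):

import com.datastax.spark.connector._

// where() on only the first column of the composite partition key
// fails with the exception quoted above
val rows = sc.cassandraTable("ks", "table1").where("col1 = ?", "abc")
rows.collect()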

I am able to get results using a prepared SQL statement (spark.sql). However, that query fails because it refers to the Cassandra instance on the Spark cluster instead of the Cassandra cluster.

Would you please suggest how to resolve this?

Thanks and regards

Jagannath S Bilgi


Not sure what happened here but a part of the conversation got deleted.

To reiterate, you are able to do a full table scan with Spark-side filtering for RDDs and DFs. Querying data like this is the least efficient way possible.

To connect to a different cluster, follow the blog post mentioned above. Set up different connection parameters for the clusters. Use a different user and password if needed. Here is the settings summary: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/reference.md#cassandra-connection-parameters
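
A minimal sketch of what that can look like for RDDs, reusing the host and credentials from your command (the connector uses whatever CassandraConnector is in implicit scope; keyspace and table names are illustrative):

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

// Connector pointing at the remote (data) cluster; host and credentials
// are the placeholders from the spark-submit command above
val remoteConnector = CassandraConnector(sc.getConf
  .set("spark.cassandra.connection.host", "cassIP")
  .set("spark.cassandra.auth.username", "cassUser")
  .set("spark.cassandra.auth.password", "cassPwd"))

val rdd = {
  // make the remote cluster the default connection for reads in this block
  implicit val c = remoteConnector
  sc.cassandraTable("ks", "table1")
}

DataFrames can be scoped to a named cluster in a similar way; the blog post and the reference page above describe the exact settings.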


Hi Jaroslaw,

Would you please advise on the above? If the solution mentioned in the previous thread is final, please confirm the same.


For DFs, querying with a subset of the partition key performs a full table scan with Spark-side filtering. For RDDs you could achieve the same effect with

sc.cassandraTable(ks, table).filter(...)

But again, it's a full table scan with a filter applied by Spark.
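
For reference, the DF read of that shape, using the keyspace, table and column names from elsewhere in this thread, would look roughly like:

import org.apache.spark.sql.functions.col

// Only part of the partition key (col1) is constrained, so nothing is
// pushed down to Cassandra; Spark filters the rows after a full scan
val df = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "ks", "table" -> "table1"))
  .load()
  .filter(col("col1") === "abc")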


My assumption here was that Spark would apply the token-aware policy along with the first partition key column, thereby preventing a full table scan.


Cassandra is not able to build the partition key hash with only a subset of the partitioning values, so this case ends with a full table scan.
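
Put differently, a where() that constrains every partition key column can be turned into a token lookup and pushed down; with only col1 it cannot. A sketch, assuming the partition key is (col1, col2):

import com.datastax.spark.connector._

// Whole partition key constrained: pushed down to Cassandra
sc.cassandraTable("ks", "table1").where("col1 = ? AND col2 = ?", "abc", "xyz")

// Only part of the partition key: no token can be computed, so this fails
// unless col1 is indexed (this is the error you quoted earlier)
sc.cassandraTable("ks", "table1").where("col1 = ?", "abc")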


Thank you for the response.

Before proceeding further, would you please advise whether the statements below perform the same?

table1 has a composite partition key and col1 is the first column of the partition key.

spark.sql("Select col1, col2, col3 from ks.table1 where col1 = 'abc'")

sc.cassandraTable("ks","table1").filter( r => r.getString("col1") == "abc")

Apart from the above, I also need to run queries like the one below. Please advise whether the connector supports the same.

row_number() over(partition by