Connecting to AWS Keyspaces from AWS EMR using pyspark

digui asked · Erick Ramirez edited

Hello everyone, I'm still taking baby steps with both Spark and Cassandra, so I'd really appreciate it if someone could shed some light here.

I'm trying to connect to a table in AWS Keyspaces through a Jupyter notebook running on top of my AWS EMR cluster:

Cluster configs:

Release label: emr-5.30.1
Hadoop distribution: Amazon 2.8.5
Applications: Hive 2.3.6, Pig 0.17.0, Hue 4.6.0, Livy 0.7.0, Spark 2.4.5

%%configure -f 
{
  "jars": ["s3://test-bucket/lib/spark-cassandra-connector_2.11-2.5.1.jar"]
}

from pyspark.sql import *
from pyspark.sql.types import *

spark = SparkSession.builder \
    .appName('SparkCassandraApp') \
    .config('spark.jars.packages', 'com.datastax.spark:spark-cassandra-connector_2.11-2.5.1') \
    .config('spark.cassandra.connection.host', 'cassandra.sa-east-1.amazonaws.com') \
    .config('spark.cassandra.connection.port', '9142') \
    .config('spark.cassandra.connection.ssl.enabled', 'true') \
    .config('spark.cassandra.connection.ssl.trustStore.password', 'cassandratest') \
    .config('spark.cassandra.auth.username', 'myusername') \
    .config('spark.cassandra.auth.password', 'mypassword') \
    .getOrCreate()

keyspace = 'test'
table = 'test_table'
df = (
    spark.read
    .format("org.apache.spark.sql.cassandra")
    .options(table=table, keyspace=keyspace)
    .load()
)

As you can see above, I'm using spark-cassandra-connector_2.11-2.5.1.jar.

I'm getting the following error:

An error occurred while calling o111.load.
: java.lang.NoClassDefFoundError: com/datastax/spark/connector/util/Logging
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:405)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
    at scala.util.Try.orElse(Try.scala:84)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:634)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:194)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.datastax.spark.connector.util.Logging
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 32 more

Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 172, in load
    return self._df(self._jreader.load())
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o111.load.
: java.lang.NoClassDefFoundError: com/datastax/spark/connector/util/Logging
    ... (same Java stack trace as above)

Am I missing something here?

Any help is appreciated! Thanks!

Tags: spark-cassandra-connector, unsupported, aws keyspaces

Erick Ramirez answered · Erick Ramirez commented

The stack trace you posted indicates an environmental issue, possibly a classpath that isn't configured correctly. I don't think the Spark connector is the problem here.
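
As a quick sanity check (a minimal sketch, assuming a live notebook session named `spark`), you can ask the driver what it believes is on its classpath; if the connector jar doesn't show up, it was never shipped to the JVM:

# Hedged diagnostic: inspect the jar-related settings the driver actually
# received. Empty output suggests the jar/package configuration never took effect.
conf = spark.sparkContext.getConf()
print(conf.get("spark.jars", ""))           # jars distributed with the app
print(conf.get("spark.jars.packages", ""))  # Maven coordinates, if any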

Consequently, I would recommend getting help from AWS since you are using their stack. AWS Keyspaces is a black box and there's limited public information on what it really is. My suspicion is that it's not really Cassandra under the hood but DynamoDB, with a skin/engine sitting on top of Dynamo that provides a CQL API so it can pretend to be a Cassandra instance to clients.

I'll reach out to the Analytics team here at DataStax to see if they can provide clues but I think since you're using AWS, they should be your first port of call. Cheers!

2 comments

digui commented:

Thanks @jaroslaw.grabowski_50515 and @Erick Ramirez for the replies. I actually changed the version of the package and started getting this:


Unsupported partitioner: com.amazonaws.cassandra.DefaultPartitioner

Honestly, I wanted to get a quick start with Keyspaces before trying to set up a cluster in our Kubernetes environment, but given that it's not really being a "quick" start, I'm gonna start looking into creating the cluster. Thanks a lot for the help though!

Erick Ramirez replied to digui:

That's because the connector only supports two Cassandra partitioners:

  • Murmur3Partitioner
  • RandomPartitioner

The connector doesn't know anything about AWS partitioners. Cheers!
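
If you want to confirm what partitioner the endpoint reports, you can query it directly over CQL. This is a minimal sketch using the DataStax Python driver (`cassandra-driver` 3.17+ for the `ssl_context` parameter), reusing the hostname and credentials from the question; a production connection to Keyspaces should also load Amazon's root certificate and verify it:

import ssl
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider

# Sketch only: certificate verification is disabled here for brevity.
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
auth = PlainTextAuthProvider(username='myusername', password='mypassword')
cluster = Cluster(['cassandra.sa-east-1.amazonaws.com'], port=9142,
                  ssl_context=ssl_context, auth_provider=auth)
session = cluster.connect()
row = session.execute('SELECT partitioner FROM system.local').one()
print(row.partitioner)  # e.g. com.amazonaws.cassandra.DefaultPartitioner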

jaroslaw.grabowski_50515 answered

Hi! It seems that you've placed only the `spark-cassandra-connector` jar on your classpath with the `jars` option. You also need the connector's dependencies on your classpath. You could either:

- use `spark.jars.packages` before the driver is started (pass it as a `--conf` argument, or via the notebook's `%%configure` cell). Setting it on the SparkSession builder may not work, because the driver JVM is already running by the time the builder executes;

- or use `spark-cassandra-connector-assembly` in `jars`; it is a fat jar with all the needed dependencies bundled in (see the sketch below).
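
For example, either of the following `%%configure` cells would put the dependencies in place before the driver starts (a sketch: the assembly S3 path is hypothetical, and the Maven coordinates match the Scala 2.11 / connector 2.5.1 versions used in the question):

%%configure -f
{
  "conf": {
    "spark.jars.packages": "com.datastax.spark:spark-cassandra-connector_2.11:2.5.1"
  }
}

or:

%%configure -f
{
  "jars": ["s3://test-bucket/lib/spark-cassandra-connector-assembly_2.11-2.5.1.jar"]
}

With `spark.jars.packages`, Spark resolves the connector's transitive dependencies from Maven automatically, which is exactly what passing the bare connector jar misses.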

Cheers!
