devloper asked absurdfarce answered

Python3: cassandra.cluster.NoHostAvailable: (“Unable to connect to any servers using keyspace 'test'”, ['127.0.0.1']) when using execute_async future

I am trying to fetch data from one Cassandra table and, after making some changes, insert it into another table. Both tables are in the keyspace "test". Fetching data from the first table works fine. However, in the future handler that processes the output of the first query, the insert into the other table (on the same Cassandra instance) fails. The application reports "cassandra.cluster.NoHostAvailable: ("Unable to connect to any servers using keyspace 'test'", ['127.0.0.1'])". I am not sure where I am going wrong.

import threading
from threading import Event
from cassandra.query import SimpleStatement
from cassandra.cluster import Cluster


hosts=['127.0.0.1']
keyspace="test"
thread_local = threading.local()
cluster_ = Cluster(hosts)
def get_session():
    if hasattr(thread_local, "cassandra_session"):
        print("got session from threadlocal")
        return thread_local.cassandra_session
    print(" Connecting to Cassandra Host " + str(hosts))
    session_ = cluster_.connect(keyspace)
    print(" Connecting and creating session to Cassandra KeySpace " + keyspace)
    thread_local.cassandra_session = session_
    return session_


class PagedResultHandler(object):

    def __init__(self, future):
        self.error = None
        self.finished_event = Event()
        self.future = future
        self.future.add_callbacks(
            callback=self.handle_page,
            errback=self.handle_error)

    def handle_page(self, rows):
        for row in rows:
            process_row(row)

        if self.future.has_more_pages:
            self.future.start_fetching_next_page()
        else:
            self.finished_event.set()

    def handle_error(self, exc):
        self.error = exc
        self.finished_event.set()

def process_row(row):
    print(row)
    session_ = get_session()
    stmt = session_.prepare(
        "INSERT INTO test.data(customer,snr,rttt, event_time) VALUES (?,?,?,?)")
    results = session_.execute(stmt,
                               [row.customer, row.snr, row.rttt,row.created_time])
    print("Done")

session = get_session()
query = "select * from test.data_log"
statement = SimpleStatement(query, fetch_size=1000)
future = session.execute_async(statement)
handler = PagedResultHandler(future)
handler.finished_event.wait()
if handler.error:
    raise handler.error
cluster_.shutdown()

When I execute the Python file, the application throws "cassandra.cluster.NoHostAvailable: ("Unable to connect to any servers using keyspace 'test'", ['127.0.0.1'])" from the get_session() call in process_row(). The first call to Cassandra clearly succeeds without any issues. There is no connectivity problem: the Cassandra instance is running fine locally and I can query the data using cqlsh. If I call process_row() outside the future handler, everything works. I am not sure what needs to be done to make it work from the future handler.

Connecting to Cassandra Host ['127.0.0.1']
Connecting and creating session to Cassandra KeySpace test
Row(customer='abcd', snr=100, rttt=121, created_time=datetime.datetime(2020, 8, 8, 2, 26, 51))
 Connecting to Cassandra Host ['127.0.0.1']
Traceback (most recent call last):
  File "test/check.py", in <module>
    raise handler.error
  File "cassandra/cluster.py", line 4579, in cassandra.cluster.ResponseFuture._set_result
  File "cassandra/cluster.py", line 4777, in cassandra.cluster.ResponseFuture._set_final_result
  File "test/check.py", in handle_page
    process_row(row)
  File "test/check.py", in process_row
    session_ = get_session()
  File "/test/check.py", in get_session
    session_ = cluster_.connect(keyspace)
  File "cassandra/cluster.py", line 1715, in cassandra.cluster.Cluster.connect
  File "cassandra/cluster.py", line 1772, in cassandra.cluster.Cluster._new_session
  File "cassandra/cluster.py", line 2553, in cassandra.cluster.Session.__init__
cassandra.cluster.NoHostAvailable: ("Unable to connect to any servers using keyspace 'test'", ['127.0.0.1'])

Process finished with exit code 1

1 Answer

absurdfarce answered

There are a couple of things that might be contributing to the problem you're seeing. The first is a multiple-connection issue; the second is a synchronous operation within your callback.


The multiple-connection issue occurs because process_row() is called from within a callback. If the result isn't already available when the callback is added, the Python driver executes that callback on its IO event thread rather than on your main thread (see https://docs.datastax.com/en/developer/python-driver/3.24/api/cassandra/cluster/#cassandra.cluster.ResponseFuture.add_callback), so your re-use logic based on a thread-specific structure doesn't actually lead to re-use. Note the duplicate "Connecting to Cassandra Host ['127.0.0.1']" messages in your log: the session isn't being re-used as you might expect. Fortunately this is easy to fix. Session is itself thread-safe, so you can create a single instance in your initialization logic and re-use it everywhere.
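To see why the thread-local cache doesn't help, here is a minimal stdlib-only sketch. It mimics the question's get_session(), with a plain string standing in for a real Session (an assumption purely for illustration), and calls it once on the main thread and once on a second thread named "io-thread", a hypothetical stand-in for the driver's IO event thread:

```python
import threading

# Same idea as the question's cache: attributes are stored per thread.
thread_local = threading.local()
created = []  # records every "session" that had to be built


def get_session():
    """Mimics the question's get_session(); returns a string, not a real Session."""
    if hasattr(thread_local, "session"):
        return thread_local.session
    session = "session-for-" + threading.current_thread().name
    created.append(session)
    thread_local.session = session
    return session


def callback():
    # Runs on another thread, where thread_local has no "session" attribute yet.
    get_session()


main_session = get_session()  # cached for the main thread only
worker = threading.Thread(target=callback, name="io-thread")
worker.start()
worker.join()
print(created)  # two entries: the cache did not carry over to the second thread
```

Two "sessions" are created, which mirrors the two "Connecting to Cassandra Host" lines in the log. With a single module-level session, both threads would simply share one object.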


The synchronous operation is the Session.execute() call in process_row(). That function is called from a callback that runs asynchronously, so you're mixing sync and async code, which can certainly lead to trouble. The add_callback() docs linked above give some additional context:


"Note: in the case that the result is not available when the callback is added, the callback is executed by IO event thread. This means that the callback should not block or attempt further synchronous requests, because no further IO will be processed until the callback returns."


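Session.execute() is a blocking function, and so is Cluster.connect(), which get_session() calls from inside the callback the first time it runs there. That connect() call is the one that fails in your traceback.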
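Why blocking inside the callback goes wrong can be sketched without Cassandra at all. In this stdlib-only sketch, a ThreadPoolExecutor with a single worker is a hypothetical stand-in for the driver's IO event thread (the driver's real reactor differs in detail). A callback running on that worker queues more work on the same worker and waits for it; since the only worker is busy running the callback, the wait can only end by timing out:

```python
import concurrent.futures

# Hypothetical stand-in for the driver's single IO event thread.
io_thread = concurrent.futures.ThreadPoolExecutor(max_workers=1)


def network_reply():
    # Work that only the IO thread can perform (e.g. reading a server response).
    return "reply"


def blocking_callback(timeout=1.0):
    # Runs ON the IO thread, then waits for work queued on that same thread.
    inner = io_thread.submit(network_reply)
    try:
        return inner.result(timeout=timeout)
    except concurrent.futures.TimeoutError:
        # network_reply() cannot start until this function returns,
        # so the blocking wait can never succeed.
        return "timed out"


outcome = io_thread.submit(blocking_callback).result()
io_thread.shutdown(wait=True)
print(outcome)  # prints: timed out
```

In the question, the analogous wait plausibly happens inside cluster_.connect(), which waits for connection setup that the blocked IO thread would normally complete; once the driver's own timeouts expire, a NoHostAvailable error is a consistent outcome.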


The following variation on your code works for me locally. I removed the thread-local logic per the discussion above and fixed the sync/async mismatch:


from threading import Event
from cassandra.query import SimpleStatement
from cassandra.cluster import Cluster

hosts = ['127.0.0.1']
keyspace = "testkeyspace"

# One Cluster and one Session, created up front; Session is thread-safe.
cluster_ = Cluster(hosts)
session = cluster_.connect(keyspace)
# Prepare the insert once instead of once per row.
insert_stmt = session.prepare("INSERT INTO testkeyspace.other_testtable(key,value) VALUES (?,?)")


class PagedResultHandler(object):
    def __init__(self, future):
        self.error = None
        self.finished_event = Event()
        self.future = future
        self.future.add_callbacks(
            callback=self.handle_page,
            errback=self.handle_error)

    def handle_page(self, rows):
        print("In handle_page")
        for row in rows:
            process_row(row)
        if self.future.has_more_pages:
            self.future.start_fetching_next_page()
        else:
            self.finished_event.set()

    def handle_error(self, exc):
        self.error = exc
        self.finished_event.set()


def process_row(row):
    print(row)
    # Non-blocking insert: never call Session.execute() from inside a callback.
    future = session.execute_async(insert_stmt, [row.key, row.value])

    def done_cb(x):
        print("Done")
    future.add_callback(done_cb)


query = "select * from testkeyspace.testtable"
statement = SimpleStatement(query, fetch_size=1000)
future = session.execute_async(statement)
handler = PagedResultHandler(future)
handler.finished_event.wait()
if handler.error:
    raise handler.error
cluster_.shutdown()
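In the variation above, finished_event is set as soon as the last page has been handled, but the inserts started by process_row() may still be in flight when the cluster is shut down, and a failed insert goes unreported because only a success callback is attached. Below is a minimal sketch of one way to handle both, assuming the question's table and column names. PendingWrites is a hypothetical helper (not part of the driver) that counts in-flight inserts, records failures through add_callbacks(), and lets the main thread wait for all of them before shutdown(). copy_rows() is likewise hypothetical; it needs cassandra-driver and a running cluster, and nothing calls it when the module loads:

```python
import threading


class PendingWrites:
    """Hypothetical helper (not part of cassandra-driver): tracks in-flight
    async writes and lets another thread wait until all have finished."""

    def __init__(self):
        self._lock = threading.Lock()
        self._pending = 0
        self._closed = False            # True once no more writes will start
        self._all_done = threading.Event()
        self.errors = []                # exceptions passed to failed()

    def started(self):
        with self._lock:
            self._pending += 1

    def finished(self, _result=None):
        # Usable as a ResponseFuture callback; the result is ignored.
        with self._lock:
            self._pending -= 1
            self._signal_if_idle()

    def failed(self, exc):
        # Usable as a ResponseFuture errback.
        with self._lock:
            self.errors.append(exc)
            self._pending -= 1
            self._signal_if_idle()

    def close(self):
        # Call once the last page has been handled.
        with self._lock:
            self._closed = True
            self._signal_if_idle()

    def _signal_if_idle(self):
        # Caller must hold self._lock.
        if self._closed and self._pending == 0:
            self._all_done.set()

    def wait(self, timeout=None):
        return self._all_done.wait(timeout)


def copy_rows(hosts, keyspace):
    # Hypothetical wiring; table and column names are taken from the question.
    from cassandra.cluster import Cluster
    from cassandra.query import SimpleStatement

    cluster = Cluster(hosts)
    session = cluster.connect(keyspace)
    insert = session.prepare(
        "INSERT INTO test.data(customer, snr, rttt, event_time) VALUES (?,?,?,?)")
    writes = PendingWrites()
    pages_done = threading.Event()
    read_errors = []
    future = session.execute_async(
        SimpleStatement("SELECT * FROM test.data_log", fetch_size=1000))

    def handle_page(rows):
        for row in rows:
            writes.started()
            f = session.execute_async(
                insert, [row.customer, row.snr, row.rttt, row.created_time])
            f.add_callbacks(callback=writes.finished, errback=writes.failed)
        if future.has_more_pages:
            future.start_fetching_next_page()
        else:
            pages_done.set()

    def handle_error(exc):
        read_errors.append(exc)
        pages_done.set()

    future.add_callbacks(callback=handle_page, errback=handle_error)
    pages_done.wait()
    writes.close()   # no further inserts will be started
    writes.wait()    # block until every insert has succeeded or failed
    cluster.shutdown()
    return read_errors + writes.errors
```

This sketch does not limit how many inserts are in flight at once. For large tables, if the rows can be read first and written afterwards from the main thread, cassandra.concurrent.execute_concurrent_with_args(), which bounds concurrency, may be a simpler fit.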

