If you need asynchronous support in python unit tests, there are a few possibilities. A simple google search points you to a few packages, one being tornado. Tornado’s tornado.testing.AsyncTestCase seems straight-forward. However it does have a shortcoming: it’s not thread-safe. Trying to use it with a thread will result in a sporadic exception. Consider this thread-safe, functionally similar alternative.
from threading import Event from unittest import TestCase class TimeoutError(Exception): pass class AsyncTestCase(TestCase): """ Provides functionality similar to tornado.testing.AsyncTestCase except is thread-safe. """ def __init__(self): self._done = Event() def setUp(self): super(AsyncTestCase, self).setUp() self._done.clear() def tearDown(self): super(AsyncTestCase, self).tearDown() self._done.clear() def wait(self, timeout=None): """ Wait for event to be set. Raise an exception if the event isn't set by the timeout. """ if timeout is None: timeout = get_async_test_timeout() self._done.wait(timeout) if not self.is_done(): raise TimeoutError() def stop(self): """ Set the event to terminate wait. """ self._done.set() def is_done(self): return self._done.isSet() # taken from tornado. def get_async_test_timeout(default=5): """Get the global timeout setting for async tests. Returns a float, the timeout in seconds. """ try: return float(os.environ.get('ASYNC_TEST_TIMEOUT')) except (ValueError, TypeError): return default