blob: 8b99e15bc842e4986fdaacab8ac1937bb2cba8d7 [file] [log] [blame]
"""These basic tests may be applied to most types of executors."""
from __future__ import print_function
from functools import partial
from random import randint
import traceback
from threading import RLock
import sys
from concurrent.futures import Executor, CancelledError, wait, FIRST_COMPLETED
from six.moves.queue import Queue
try:
from time import monotonic
except ImportError:
from monotonic import monotonic
from hamcrest import (
assert_that,
equal_to,
calling,
raises,
instance_of,
has_length,
is_,
contains_string,
)
from pytest import fixture, skip
from more_executors import Executors, RetryPolicy, f_proxy, f_return
from .util import assert_soon, run_or_timeout
from .logging_util import dump_executor, add_debug_logging
TIMEOUT = 20.0
class SimulatedError(RuntimeError):
    """Error raised deliberately by tests to exercise failure paths."""
def map_noop(value):
    """Identity function: makes MapExecutor pass values through unchanged."""
    return value
def flat_map_noop(executor, value):
    """Pass *value* through unchanged, wrapped in a fresh future.

    Makes FlatMapExecutor a no-op; the given executor is used to
    produce the replacement future.
    """
    return executor.submit(map_noop, value)
def poll_noop(ds):
    """Poll function which immediately resolves every poll descriptor.

    Makes PollExecutor pass everything through unchanged: each
    descriptor's result is yielded back as-is.

    Returns a tiny interval so polling repeats again almost immediately.
    """
    # Plain loop rather than a list comprehension: this is executed for
    # its side effect (yield_result), not to build a list.
    for d in ds:
        d.yield_result(d.result)
    # Poll again soon
    return 0.0001
# Don't want to make this public API now, but do want to test if
# an executor returning everything as a proxy would work
class ProxyExecutor(Executor):
    """Executor wrapper which returns every submitted future as a proxy."""

    # pylint: disable=arguments-differ
    def __init__(self, delegate):
        self.__delegate = delegate

    def submit(self, *args, **kwargs):
        # Delegate the submit, then wrap the resulting future in a proxy
        inner = self.__delegate.submit(*args, **kwargs)
        return f_proxy(inner)

    def shutdown(self, *args, **kwargs):
        return self.__delegate.shutdown(*args, **kwargs)
@fixture
def retry_executor_ctor():
    """Constructor for a named thread pool with single-attempt retry."""

    def out():
        return Executors.thread_pool(name="test-retry").with_retry(max_attempts=1)

    return out
@fixture
def threadpool_executor_ctor():
    """Constructor for a plain 20-worker thread pool executor."""

    def out():
        return Executors.thread_pool(max_workers=20)

    return out
@fixture
def sync_executor_ctor():
    """Constructor for a synchronous executor."""
    return lambda: Executors.sync()
@fixture
def map_executor_ctor(threadpool_executor_ctor):
    """Constructor for a thread pool wrapped with a no-op map."""

    def out():
        return threadpool_executor_ctor().with_map(map_noop)

    return out
@fixture
def flat_map_executor_ctor(threadpool_executor_ctor):
    """Constructor for a thread pool wrapped with a no-op flat map."""

    def out():
        delegate = threadpool_executor_ctor()
        # The same thread pool is used to produce the flat-mapped futures
        flat_map_fn = partial(flat_map_noop, delegate)
        return delegate.with_flat_map(flat_map_fn)

    return out
@fixture
def proxy_executor_ctor(threadpool_executor_ctor):
    """Constructor for a proxy-future executor wrapping a thread pool."""
    return lambda: ProxyExecutor(threadpool_executor_ctor())
@fixture
def throttle_executor_ctor(threadpool_executor_ctor):
    """Constructor for a thread pool throttled to 10 concurrent futures."""

    def out():
        return threadpool_executor_ctor().with_throttle(10)

    return out
@fixture
def cancel_on_shutdown_executor_ctor(threadpool_executor_ctor):
    """Constructor for a thread pool which cancels futures on shutdown."""

    def out():
        return threadpool_executor_ctor().with_cancel_on_shutdown()

    return out
@fixture
def map_retry_executor_ctor(threadpool_executor_ctor):
    """Constructor for a retry-then-map executor chain."""

    def out():
        return threadpool_executor_ctor().with_retry(RetryPolicy()).with_map(map_noop)

    return out
@fixture
def retry_map_executor_ctor(threadpool_executor_ctor):
    """Constructor for a map-then-retry executor chain."""

    def out():
        return threadpool_executor_ctor().with_map(map_noop).with_retry(RetryPolicy())

    return out
@fixture
def timeout_executor_ctor(threadpool_executor_ctor):
    """Constructor for a thread pool with a 60-second future timeout."""

    def out():
        return threadpool_executor_ctor().with_timeout(60.0)

    return out
@fixture
def cancel_poll_map_retry_executor_ctor(threadpool_executor_ctor):
    """Constructor chaining retry, map, poll then cancel-on-shutdown."""

    def out():
        ex = threadpool_executor_ctor()
        ex = ex.with_retry(RetryPolicy())
        ex = ex.with_map(map_noop)
        ex = ex.with_poll(poll_noop)
        return ex.with_cancel_on_shutdown()

    return out
@fixture
def cancel_retry_map_poll_executor_ctor(threadpool_executor_ctor):
    """Constructor chaining poll, map, retry then cancel-on-shutdown."""

    def out():
        ex = threadpool_executor_ctor()
        ex = ex.with_poll(poll_noop)
        ex = ex.with_map(map_noop)
        ex = ex.with_retry(RetryPolicy())
        return ex.with_cancel_on_shutdown()

    return out
@fixture
def retry_map_poll_executor_ctor(threadpool_executor_ctor):
    """Constructor chaining poll, map then retry."""

    def out():
        ex = threadpool_executor_ctor()
        ex = ex.with_poll(poll_noop)
        ex = ex.with_map(map_noop)
        return ex.with_retry(RetryPolicy())

    return out
def random_cancel(_value):
    """Cancel function for the poll executor which randomly succeeds,
    fails or raises. This targets the stress test: futures must still
    satisfy the invariants of the Future API regardless of what the
    cancel function does."""
    select = randint(0, 300)
    if select >= 200:
        raise RuntimeError("simulated error from cancel")
    # roughly one third True, one third False
    return select < 100
@fixture
def poll_executor_ctor(threadpool_executor_ctor):
    """Constructor for a poll executor using the randomized cancel function."""

    def out():
        return threadpool_executor_ctor().with_poll(poll_noop, random_cancel)

    return out
def everything_executor(base_executor, name):
    """Wrap base_executor in every available executor type, several
    times over. The resulting (absurd) composition is used to check
    that arbitrarily deep chains still behave like a plain executor.
    """
    ex = base_executor.with_poll(poll_noop, name=name)
    ex = ex.with_map(map_noop)
    ex = ex.with_retry(RetryPolicy())
    ex = ex.with_cancel_on_shutdown()
    ex = ex.with_retry(max_attempts=1, max_sleep=0.1)
    ex = ex.with_retry(RetryPolicy())
    ex = ex.with_throttle(10)
    ex = ex.with_flat_map(partial(flat_map_noop, base_executor))
    ex = ex.with_timeout(120.0)
    ex = ex.with_poll(poll_noop)
    ex = ex.with_poll(poll_noop)
    ex = ex.with_cancel_on_shutdown()
    ex = ex.with_flat_map(lambda x: f_proxy(f_return(x)))
    ex = ex.with_throttle(16)
    ex = ex.with_flat_map(partial(flat_map_noop, base_executor))
    ex = ex.with_map(map_noop)
    ex = ex.with_timeout(180.0)
    ex = ex.with_map(map_noop)
    return ex.with_retry(RetryPolicy())
@fixture
def everything_sync_executor_ctor(sync_executor_ctor):
    """Constructor for the everything-chain built on a sync executor."""

    def out():
        return everything_executor(sync_executor_ctor(), "everything-sync")

    return out
@fixture
def everything_threadpool_executor_ctor(threadpool_executor_ctor):
    """Constructor for the everything-chain built on a thread pool."""

    def out():
        return everything_executor(threadpool_executor_ctor(), "everything-threadpool")

    return out
# All executor flavours exercised by the parametrized tests below;
# each entry "<name>" corresponds to a "<name>_executor_ctor" fixture.
EXECUTOR_TYPES = [
    "threadpool",
    "retry",
    "map",
    "retry_map",
    "map_retry",
    "poll",
    "retry_map_poll",
    "sync",
    "timeout",
    "throttle",
    "cancel_poll_map_retry",
    "cancel_retry_map_poll",
    "flat_map",
    "proxy",
    "everything_sync",
    "everything_threadpool",
]
@fixture(params=EXECUTOR_TYPES)
def any_executor(request):
    """Yield a freshly-constructed executor of each type in EXECUTOR_TYPES.

    Teardown behavior depends on whether the current test failed:
    on success, a blocking shutdown (with a timeout guard); on failure,
    a non-blocking shutdown plus a dump of the executor's captured logs.
    """
    ctor = request.getfixturevalue(request.param + "_executor_ctor")
    ex = ctor()
    # Capture log messages onto the executor itself,
    # for use with dump_executor if test fails.
    add_debug_logging(ex)
    # Want to know if the test failed; is there a better way
    # than counting the failure counts here??
    failed_before = request.session.testsfailed
    yield ex
    failed_diff = request.session.testsfailed - failed_before
    if not failed_diff:
        try:
            kwargs = {}
            # Effectively "Python >= 3.9": any released 3.9.x version_info
            # tuple compares greater than (3, 9). cancel_futures was added
            # to Executor.shutdown in 3.9.
            if sys.version_info > (3, 9):
                kwargs["cancel_futures"] = True
            run_or_timeout(ex.shutdown, True, **kwargs)
            return
        except Exception:
            print("Shutdown failed")
            dump_executor(ex)
            raise
    # Current test failed:
    # - dump state of executor for improved debugging
    # - use non-blocking shutdown, as blocking has a high chance
    # of hanging
    print("Test failed at time %s" % monotonic())
    dump_executor(ex)
    ex.shutdown(False)
@fixture(params=EXECUTOR_TYPES)
def any_executor_ctor(request):
    """Return (without calling) the constructor for each executor type."""
    fixture_name = "%s_executor_ctor" % request.param
    return request.getfixturevalue(fixture_name)
def test_submit_results(any_executor):
    """Submitted callables eventually yield their computed results."""

    def double(x):
        return x * 2

    inputs = range(0, 1000)
    futures = [any_executor.submit(double, value) for value in inputs]
    for future in futures:
        assert_that(not future.cancelled())
    outcomes = [future.result(TIMEOUT) for future in futures]
    assert_that(outcomes, equal_to([value * 2 for value in inputs]))
def test_future_outlive_executor(any_executor_ctor):
    """Futures stay usable after their executor goes out of scope."""

    def make_futures(executor):
        out = []
        for y in [1, 2, 3, 4]:
            out.append(executor.submit(lambda x: x * 2, y))
        return out

    futures = make_futures(any_executor_ctor())
    assert [f.result(TIMEOUT) for f in futures] == [2, 4, 6, 8]
def test_broken_callback(any_executor):
    """A done-callback which raises doesn't break futures or results."""
    inputs = range(0, 1000)
    invoked_with = []

    def double(x):
        return x * 2

    def exploding_callback(future):
        invoked_with.append(future)
        raise RuntimeError("simulated broken callback")

    futures = [any_executor.submit(double, x) for x in inputs]
    for future in futures:
        try:
            future.add_done_callback(exploding_callback)
        except RuntimeError:
            # This is allowed - if future is done already,
            # the callback was invoked directly without exception handler.
            pass
    for future in futures:
        assert_that(not future.cancelled())
    results = [future.result(TIMEOUT) for future in futures]
    assert_that(results, equal_to([x * 2 for x in inputs]))
    # assert_soon as there's no guarantee that callbacks
    # are invoked before result() returns.
    assert_soon(lambda: assert_that(invoked_with, has_length(len(futures))))
    for future in futures:
        assert_that(future in invoked_with)
def test_submit_delayed_results(any_executor, request):
    """Results become available once blocked callables are released."""
    if "sync" in request.node.name:
        skip("test not applicable with sync executor")
    gate = Queue()

    def blocked_double(value):
        gate.get(True)
        return value * 2

    inputs = [1, 2, 3]
    futures = [any_executor.submit(blocked_double, value) for value in inputs]
    for future in futures:
        assert_that(not future.cancelled())
    # They're not guaranteed to be "running" yet, but should
    # become so soon
    assert_soon(lambda: assert_that(all([f.running() for f in futures])))
    # OK, they're not done yet though.
    for future in futures:
        assert_that(not future.done())
    # Let them proceed - one token per submitted callable
    for _ in inputs:
        gate.put(None)
    results = [future.result(TIMEOUT) for future in futures]
    assert_that(results, equal_to([2, 4, 6]))
def test_cancel(any_executor):
    """Cancelled futures obey Future API invariants; others complete."""
    for _ in range(0, 100):

        def double(x):
            return x * 2

        remaining = set([2, 4, 6])
        futures = [any_executor.submit(double, x) for x in [1, 2, 3]]
        cancelled = []
        for f in futures:
            # There's no way we can be sure if cancel is possible.
            # We can only try...
            if not f.cancel():
                assert_that(not f.cancelled(), str(f))
                continue
            cancelled.append(f)
            assert_that(f.cancelled(), str(f))
            assert_that(not f.running(), str(f))
            assert_that(f.done(), str(f))
            # Cancelling multiple times should be fine
            assert_that(f.cancel())
            assert_that(f.cancel())
        for f in futures:
            if f in cancelled:
                assert_that(
                    calling(f.result).with_args(TIMEOUT), raises(CancelledError), str(f)
                )
            else:
                result = f.result(TIMEOUT)
                assert_that(result in remaining)
                remaining.remove(result)
def test_blocked_cancel(any_executor, request):
    """A future whose function is mid-execution cannot be cancelled."""
    if "sync" in request.node.name:
        skip("test not applicable with sync executor")
    gate = Queue(1)
    running_signal = Queue(1)

    def blocked_fn():
        gate.get()
        running_signal.put(None)
        gate.get()
        return 123

    future = any_executor.submit(blocked_fn)
    # Wait until blocked_fn is certainly running
    gate.put(None)
    running_signal.get()
    # Since the function is in progress,
    # it should NOT be possible to cancel it
    assert_that(not future.cancel(), str(future))
    # assert_soon since, by blocking on running_signal.get(), we only
    # guarantee that the future is running from the innermost
    # executor/future's point of view, but this may not have propagated
    # to the outermost future yet
    assert_soon(lambda: assert_that(future.running(), str(future)))
    # Re-check cancel after running() is true
    assert_that(not future.cancel(), str(future))
    # Let blocked_fn proceed and the future should be able to complete
    gate.put(None)
    assert_that(future.result(TIMEOUT), equal_to(123))
def get_traceback(future):
    """Return the traceback associated with a failed future.

    Uses the exception's __traceback__ attribute where available
    (Python 3); otherwise falls back to the future's exception_info()
    method (Python 2 futures provide this instead).
    """
    exception = future.exception()
    # hasattr is the idiomatic (and far cheaper) spelling of the
    # original `"__traceback__" in dir(exception)` check
    if hasattr(exception, "__traceback__"):
        return exception.__traceback__
    return future.exception_info()[1]
def test_submit_mixed(any_executor):
    """Mixed success/failure submissions each resolve appropriately."""

    def crash_on_even(x):
        if x % 2 == 0:
            raise SimulatedError("Simulated error on %s" % x)
        return x * 2

    futures = [any_executor.submit(crash_on_even, x) for x in [1, 2, 3, 4]]
    for f in futures:
        assert_that(not f.cancelled())
    # Success
    assert_that(futures[0].result(TIMEOUT), equal_to(2))
    # Crash, via exception
    assert_that(futures[1].exception(TIMEOUT), instance_of(SimulatedError))
    formatted = "".join(traceback.format_tb(get_traceback(futures[1])))
    assert_that(formatted, contains_string("crash_on_even"))
    # Success
    assert_that(futures[2].result(TIMEOUT), equal_to(6))
    # Crash, via result
    assert_that(
        calling(futures[3].result).with_args(TIMEOUT),
        raises(SimulatedError, "Simulated error on 4"),
    )
def test_submit_staggered(any_executor, request):
    """Repeatedly run the staggered-submission scenario."""
    if "sync" in request.node.name:
        skip("test not applicable with sync executor")
    for _ in range(100):
        do_test_submit_staggered(any_executor)
def test_double_shutdown(any_executor):
"""Shutting down an executore more than once is OK."""
any_executor.shutdown()
any_executor.shutdown()
def test_submit_after_shutdown(any_executor):
    """Submitting after executor shutdown raises an error."""
    any_executor.shutdown()
    attempt = calling(any_executor.submit).with_args(lambda: 123)
    assert_that(
        attempt, raises(RuntimeError, "cannot schedule new futures after shutdown")
    )
def do_test_submit_staggered(executor):
    """Body of test_submit_staggered: interleave submission and completion.

    Submits a batch of callables which block on two checkpoints, submits
    a second batch while the first is mid-flight, lets a couple complete,
    then releases everything and verifies all results.
    """
    values = [1, 2, 3]
    expected_results = [2, 4, 6, 2, 4, 6]
    q1 = Queue()
    q2 = Queue()

    def fn(value):
        # Block at two checkpoints so the test controls progress;
        # the queued values themselves are discarded.
        q1.get(True)
        q2.get(True)
        return value * 2

    futures = [executor.submit(fn, x) for x in values]
    for f in futures:
        assert_that(not f.cancelled())
    # They're not guaranteed to be "running" yet, but should
    # become so soon
    assert_soon(lambda: assert_that(all([f.running() for f in futures])))
    # OK, they're not done yet though.
    for f in futures:
        assert_that(not f.done())
    # Let them proceed to first checkpoint.
    # (Plain loops rather than side-effect comprehensions here and below.)
    for _ in futures:
        q1.put(None)
    # Submit some more
    futures.extend([executor.submit(fn, x) for x in values])
    # Let a couple of futures complete
    q2.put(None)
    q2.put(None)
    (done, not_done) = wait(futures, return_when=FIRST_COMPLETED, timeout=TIMEOUT)
    # Might have received 1, or 2
    if len(done) == 1:
        (more_done, _more_not_done) = wait(
            not_done, return_when=FIRST_COMPLETED, timeout=TIMEOUT
        )
        done = done | more_done
    assert_that(done, has_length(2))
    for f in done:
        assert_that(f.done(), str(f))
    # OK, let them all finish up now: three futures still wait on q1,
    # four still wait on q2
    for _ in range(3):
        q1.put(None)
    for _ in range(4):
        q2.put(None)
    results = [f.result(TIMEOUT) for f in futures]
    assert_that(results, equal_to(expected_results))
class StressTester(object):
    """Fires a self-propagating storm of tasks at an executor, then
    verifies every resulting future against its recorded expectation.

    Tasks randomly return values, raise errors, resubmit from done
    callbacks, or cancel other futures; the executor's futures must
    uphold the Future API invariants throughout.
    """

    # Cap on how many futures the test creates in total
    FUTURES_LIMIT = 500
    # Sentinel recorded as the expected "result" of a cancelled future
    CANCELLED = object()

    def __init__(self, executor):
        self.executor = executor
        # Guards all mutable state below; stress_fn runs concurrently
        self.lock = RLock()
        self.futures = []
        # future -> ident, used to look up expectations during verify
        self.future_idents = {}
        # ident -> expected outcome (a value, an Exception, or CANCELLED)
        self.expected_results = {}
        self.idents = 0

    def next_ident(self, msg):
        """Return a new unique identifier string incorporating msg."""
        with self.lock:
            self.idents = self.idents + 1
            return "%s %d" % (msg, self.idents)

    def cancel_something(self):
        """Best-effort: pick some future and cancel it."""
        for f in self.futures:
            if f.cancel():
                with self.lock:
                    ident = self.future_idents[f]
                    self.expected_results[ident] = self.CANCELLED
                return

    def stress_fn(self, ident, behavior):
        """Task body: optionally spawn another task, then act per behavior.

        behavior 0 returns a value, 1 raises, 2 resubmits from a done
        callback, 3 cancels some other future; any other value simply
        returns a (different) value.
        """
        sub_future = None
        if len(self.futures) < self.FUTURES_LIMIT:
            sub_ident = self.next_ident("submit from [%s]" % ident)
            sub_future = self.executor.submit(self.stress_fn, sub_ident, randint(0, 3))
            self.add_future(sub_future, sub_ident)
        # Return a value
        if behavior == 0:
            with self.lock:
                assert ident not in self.expected_results
                self.expected_results[ident] = ident
            return ident
        # Raise an exception
        if behavior == 1:
            error = RuntimeError("error %s" % ident)
            with self.lock:
                assert ident not in self.expected_results
                self.expected_results[ident] = error
            raise error
        # Submit again from callback
        if behavior == 2 and sub_future:
            sub_future.add_done_callback(
                lambda f: self.stress_fn(
                    self.next_ident("case2 from [%s]" % sub_ident), 2
                )
            )
            with self.lock:
                assert ident not in self.expected_results
                self.expected_results[ident] = ident * 3
            return ident * 3
        if behavior == 3:
            self.cancel_something()
            with self.lock:
                self.expected_results[ident] = ident * 4
            return ident * 4
        # Fallback: record the expectation under the lock, consistent
        # with every other branch (previously this write was unlocked)
        with self.lock:
            self.expected_results[ident] = ident * 5
        return ident * 5

    def add_future(self, future, ident):
        """Record a future if under the limit; otherwise cancel it.

        Returns True when the future was recorded, False when rejected.
        """
        with self.lock:
            if len(self.futures) < self.FUTURES_LIMIT:
                self.futures.append(future)
                self.future_idents[future] = ident
                return True
        # Over the limit - cancel outside the lock
        future.cancel()
        return False

    def start(self):
        """Seed the executor with an initial batch of random tasks."""
        for _ in range(0, 100):
            value = randint(0, 3)
            ident = self.next_ident("init")
            future = self.executor.submit(self.stress_fn, ident, value)
            if not self.add_future(future, ident):
                break

    def verify(self):
        """Wait for all futures, then check each against its expectation."""
        # Wait until all the expected futures have been created
        assert_soon(
            lambda: assert_that(len(self.futures), equal_to(self.FUTURES_LIMIT))
        )
        # The timeout here is so that the test fails rather than hangs forever,
        # if something goes wrong.
        (done, _not_done) = wait(self.futures, 60.0)
        assert_that(len(done), equal_to(len(self.futures)))
        for f in self.futures:
            self.verify_future(f)

    def verify_future(self, f):
        """Check one future's outcome against its recorded expectation."""
        assert f.done(), str(f)
        ident = self.future_idents[f]
        assert ident in self.expected_results, "missing entry %s for future %s" % (
            ident,
            f,
        )
        expected_result = self.expected_results[ident]
        if expected_result is self.CANCELLED:
            assert_that(f.cancelled())
        elif isinstance(expected_result, Exception):
            assert_that(f.exception(TIMEOUT), is_(expected_result))
        else:
            assert_that(f.result(TIMEOUT), equal_to(expected_result))
def test_stress(any_executor, request):
    """Run the stress tester against every executor type except sync."""
    if "sync" in request.node.name:
        # The test as written currently will blow the stack on sync executor
        skip("test not applicable with sync executor")
    stress = StressTester(any_executor)
    stress.start()
    stress.verify()