blob: fd4f085192a55c8b2ca1822c553e327f39a48fde [file] [log] [blame]
"""Tests of the retry behavior in RetryExecutor."""
from concurrent.futures import ThreadPoolExecutor, Future
from threading import Event
from six.moves.queue import Queue
from hamcrest import (
assert_that,
equal_to,
is_,
calling,
raises,
all_of,
instance_of,
has_string,
)
from pytest import fixture
try:
from unittest.mock import MagicMock, call
except ImportError:
from mock import MagicMock, call
from more_executors import RetryExecutor, ExceptionRetryPolicy, RetryPolicy
from ..util import assert_soon
@fixture
def max_attempts():
return 10
@fixture
def executor(max_attempts):
policy = ExceptionRetryPolicy(
max_attempts=max_attempts,
exponent=2.0,
sleep=0.001,
max_sleep=0.010,
exception_base=Exception,
)
ex = RetryExecutor(ThreadPoolExecutor(), policy)
yield ex
ex.shutdown(wait=True)
def test_basic_retry(executor):
for _ in range(0, 100):
do_test_basic_retry(executor)
def do_test_basic_retry(executor):
fn = MagicMock()
fn.side_effect = [
ValueError("error 1"),
ValueError("error 2"),
ValueError("error 3"),
"result",
]
done_callback = MagicMock()
future = executor.submit(fn, "a1", "a2", kw1=1, kw2=2)
future.add_done_callback(done_callback)
# It should give the correct result
assert_that(future.result(10), equal_to("result"))
# It should not have any exception
assert_that(future.exception(), is_(None))
# It should have called the done callback exactly once
# (It could still be queued for call from another thread)
assert_soon(lambda: done_callback.assert_called_once_with(future))
# It should have called the function 4 times with
# exactly the submitted args
fn.assert_has_calls(
[
call("a1", "a2", kw1=1, kw2=2),
call("a1", "a2", kw1=1, kw2=2),
call("a1", "a2", kw1=1, kw2=2),
call("a1", "a2", kw1=1, kw2=2),
]
)
def test_fail(executor):
calls = []
def fn():
calls.append(None)
raise ValueError("error %s" % len(calls))
done_callback = MagicMock()
future = executor.submit(fn)
future.add_done_callback(done_callback)
# It should raise the exception from result()
assert_that(calling(future.result), raises(ValueError, "error 10"))
# exception() should be the same
assert_that(
future.exception(), all_of(instance_of(ValueError), has_string("error 10"))
)
# It should have called the done callback exactly once
# (It could still be queued for call from another thread)
assert_soon(lambda: done_callback.assert_called_once_with(future))
def test_cancel_delegate():
queue = Queue(1)
def null_submit(*args, **kwargs):
# makes a future which will never run
# (thus will be able to cancel)
queue.put(None)
return Future()
inner = MagicMock()
inner.submit.side_effect = null_submit
executor = RetryExecutor(inner)
def never_called():
raise AssertionError("this should not have been called!")
f = executor.submit(never_called)
def recancel(future):
# See what happens if we try to cancel it again from within
# the callback...
assert_that(future.cancel(), str(future))
f.add_done_callback(recancel)
# Wait until it was definitely submitted to inner
queue.get(True)
# Though it has been submitted to delegate executor,
# the inner future is cancelable,
# so the outer future should be as well
assert_that(f.cancel(), str(f))
# Should be cancelled
assert_that(f.cancelled(), str(f))
# Should have only submitted once
inner.submit.assert_called_once_with(never_called)
# Calling cancel yet again should be harmless
assert_that(f.cancel(), str(f))
def test_order(executor):
# Using a policy with a bigger sleep just for this test,
# as higher sleep time is needed for non-flaky results.
policy = ExceptionRetryPolicy(
max_attempts=10,
exponent=2.0,
sleep=0.040,
max_sleep=0.400,
exception_base=Exception,
)
f1_attempt = [0]
f2_attempt = [0]
calls = []
futures = []
def f2():
attempt = f2_attempt[0] + 1
f2_attempt[0] = attempt
calls.append("f2 %s" % attempt)
if attempt < 6:
raise ValueError("Simulated error %s" % attempt)
return "f2 success"
def f1():
attempt = f1_attempt[0] + 1
f1_attempt[0] = attempt
calls.append("f1 %s" % attempt)
if attempt == 3:
futures.append(executor.submit_retry(policy, f2))
if attempt < 6:
raise ValueError("Simulated error %s" % attempt)
return "f1 success"
futures.append(executor.submit_retry(policy, f1))
# Both should eventually succeed
assert_that(futures[0].result(), equal_to("f1 success"))
assert_that(futures[1].result(), equal_to("f2 success"))
assert_that(
calls,
equal_to(
[
# The calls should occur in this order.
# Comments list the number of time units expected until
# a function's next call.
"f1 1", # f1: +1
"f1 2", # f1: +2
"f1 3", # f1: +4, f2: +0
"f2 1", # f1: +4, f2: +1
"f2 2", # f1: +3, f2: +2
"f2 3", # f1: +1, f2: +4
"f1 4", # f1: +8, f2: +3
"f2 4", # f1: +5, f2: +8
# note: f1 hits max sleep time after next attempt
"f1 5", # f1: +10, f2: +3
"f2 5", # f1: +7, f2: +10
"f1 6", # f1: fin, f2: +3
"f2 6", # f1: fin, f2: fin
]
),
)
def test_override_policy(executor):
"""Should be able to provide a custom policy by subclassing RetryPolicy."""
class SubRetryPolicy(RetryPolicy):
def should_retry(self, attempt, future):
return future.result() < 3
fn = MagicMock()
fn.side_effect = [1, 2, 3, AssertionError("unexpected call")]
policy = SubRetryPolicy()
result = executor.submit_retry(policy, fn).result()
assert_that(result, equal_to(3))
assert_that(fn.call_count, equal_to(3))
def test_only_retry_exception_type(executor):
policy = ExceptionRetryPolicy(
exception_base=[RuntimeError, ValueError, ArithmeticError],
max_attempts=10,
max_sleep=0.01,
)
errors = [ValueError("error 1"), RuntimeError("error 2"), Exception("error 3")]
fn = MagicMock()
fn.side_effect = errors
future = executor.submit_retry(policy, fn)
# It should have retried on the first two due to exception match, then failed
assert_that(future.exception(), is_(errors[-1]))
assert_that(fn.call_count, equal_to(len(errors)))
def test_cancel_stops_retry(executor, max_attempts):
calls = [0, 0]
future = []
cancelled = [None, None]
error = RuntimeError("Simulated error")
def fn(idx, do_cancel):
calls[idx] += 1
if calls[idx] == 2 and do_cancel:
cancelled[idx] = future[idx].cancel()
raise error
future.append(executor.submit(fn, idx=0, do_cancel=False))
future.append(executor.submit(fn, idx=1, do_cancel=True))
# Both futures should fail
assert future[0].exception() is error
assert future[1].exception() is error
# future0 didn't cancel, so it should have run as many
# times as permitted
assert cancelled[0] is None
assert calls[0] == max_attempts
# future1 had cancel() called.
# The call to cancel() failed, but it stopped any further retries.
assert cancelled[1] is False
assert calls[1] == 2
def test_cancel_stops_retry_race():
"""cancel will stop retries even if called during critical section of callback.
See issue #150.
"""
in_should_retry = Event()
can_leave_should_retry = Event()
class TestRetryPolicy(RetryPolicy):
def should_retry(self, attempt, future):
in_should_retry.set()
can_leave_should_retry.wait(timeout=5.0)
return True
executor = RetryExecutor(ThreadPoolExecutor(), retry_policy=TestRetryPolicy())
calls = [0]
error = RuntimeError("Simulated error")
def fn():
calls[0] += 1
raise error
# Start the callable
future = executor.submit(fn)
# It'll fail; wait until we're in the "critical section" of the failure
# callback, where it's checking if job should be retried
in_should_retry.wait(timeout=5.0)
# Now it's in should_retry().
# Cancel it:
cancelled = future.cancel()
# The cancel result should be False since the future was in progress
assert not cancelled
# Now let should_retry() proceed
can_leave_should_retry.set()
# The future should fail
assert future.exception(timeout=5.0) is error
# And it should have called the function exactly once
assert calls[0] == 1