blob: 44f2c7bf67f4200cf5c19aae575219bbb3c81bff [file] [log] [blame]
import time
import pytest
from more_executors import Executors, f_return, f_and, f_nocancel
from .bool_utils import (
falsey,
truthy,
as_future,
assert_future_equal,
resolve_inputs,
resolve_value,
)
from ..util import assert_in_traceback
cases = [
[(falsey,), falsey],
[(truthy,), truthy],
[(falsey, falsey), falsey],
[(falsey, truthy), falsey],
[(truthy, falsey), falsey],
[(truthy, truthy), truthy],
[(falsey, falsey, falsey), falsey],
[(falsey, falsey, truthy), falsey],
[(falsey, truthy, falsey), falsey],
[(falsey, truthy, truthy), falsey],
[(truthy, falsey, falsey), falsey],
[(truthy, falsey, truthy), falsey],
[(truthy, truthy, falsey), falsey],
[(truthy, truthy, truthy), truthy],
[(truthy, truthy, falsey, truthy), falsey],
]
@pytest.mark.parametrize("inputs, expected_result", cases)
def test_and(inputs, expected_result, falsey, truthy):
inputs = resolve_inputs(inputs, falsey, truthy)
expected_result = resolve_value(expected_result, falsey, truthy)
f_inputs = [as_future(x) for x in inputs]
future = f_and(*f_inputs)
assert_future_equal(future, expected_result)
def test_and_order_sync():
f_inputs = [as_future(x) for x in [1, 2, 3]]
future = f_and(*f_inputs)
assert_future_equal(future, 3)
def test_and_order_async():
executor = Executors.thread_pool(max_workers=2)
def delay_then(delay, value):
time.sleep(delay)
return value
f_inputs = [
executor.submit(delay_then, 0.1, 123),
executor.submit(delay_then, 0.05, 456),
]
future = f_and(*f_inputs)
assert_future_equal(future, 123)
def test_and_cancels():
calls = set()
error = RuntimeError("simulated error")
def delayed_call(delay):
if delay is error:
raise error
time.sleep(delay)
calls.add(delay)
return delay
executor = Executors.thread_pool(max_workers=2)
with executor:
futures = [
executor.submit(delayed_call, x) for x in (0.5, error, 0.2, 2.0, 3.0)
]
future = f_and(*futures)
exception = future.exception()
# error should have been propagated
assert exception is error
# 0.5 definitely should have been invoked
assert 0.5 in calls
# up to 1 more of the calls might have succeeded,
# but we can't say which.
# Why: because the cancels race with the thread pool,
# once 'error' completes then f_and and the thread pool
# are both trying to grab the next futures ASAP,
# and either cancel or run could win.
assert len(calls) in (1, 2)
# This could not have been cancelled since it was
# submitted earlier than the terminal
assert futures[0].done()
# The future which terminated the and()
assert futures[1].done()
# at least 2 of the remaining futures should have been cancelled
assert len([f for f in futures if f.cancelled()]) >= 2
def test_and_with_nocancel():
calls = set()
error = RuntimeError("simulated error")
def delayed_call(delay):
if delay is error:
raise error
time.sleep(delay)
calls.add(delay)
return delay
executor = Executors.thread_pool(max_workers=2)
futures = [executor.submit(delayed_call, x) for x in (0.5, error, 0.2, 1.1, 1.2)]
futures = [f_nocancel(f) for f in futures]
future = f_and(*futures)
exception = future.exception()
# error should have been propagated
assert exception is error
# nothing else has been called yet
assert not calls
# but if we wait on the last future...
assert futures[-1].result() == 1.2
# then all calls were made thanks to nocancel
assert calls == set([0.5, 0.2, 1.1, 1.2])
def test_and_propagate_traceback():
def inner_test_fn():
raise RuntimeError("oops")
def my_test_fn(inner_fn=None):
if inner_fn:
inner_fn()
return True
executor = Executors.thread_pool()
futures = [
executor.submit(my_test_fn),
executor.submit(my_test_fn),
executor.submit(my_test_fn, inner_fn=inner_test_fn),
executor.submit(my_test_fn),
]
future = f_and(*futures)
assert_in_traceback(future, "inner_test_fn")
def test_and_large():
inputs = [f_return(True) for _ in range(0, 100000)]
assert f_and(*inputs).result() is True