blob: c61894d8dc2f390b008ba8467347403f8f0bcc47 [file] [log] [blame] [edit]
import time
import logging
import weakref
import gc
import pytest
from more_executors import Executors, f_or, f_nocancel, f_return
from .bool_utils import (
falsey,
truthy,
as_future,
assert_future_equal,
resolve_inputs,
resolve_value,
)
from ..util import assert_in_traceback
LOG = logging.getLogger("test_or")
cases = [
[(falsey,), falsey],
[(truthy,), truthy],
[(falsey, falsey), falsey],
[(falsey, truthy), truthy],
[(truthy, falsey), truthy],
[(truthy, truthy), truthy],
[(falsey, falsey, falsey), falsey],
[(falsey, falsey, truthy), truthy],
[(falsey, truthy, falsey), truthy],
[(falsey, truthy, truthy), truthy],
[(truthy, falsey, falsey), truthy],
[(truthy, falsey, truthy), truthy],
[(truthy, truthy, falsey), truthy],
[(truthy, truthy, truthy), truthy],
]
@pytest.mark.parametrize("inputs, expected_result", cases)
def test_or(inputs, expected_result, falsey, truthy):
inputs = resolve_inputs(inputs, falsey, truthy)
expected_result = resolve_value(expected_result, falsey, truthy)
f_inputs = [as_future(x) for x in inputs]
future = f_or(*f_inputs)
assert_future_equal(future, expected_result)
def test_or_order_sync(falsey):
f_inputs = [as_future(x) for x in [falsey, 123, 456, falsey]]
future = f_or(*f_inputs)
assert_future_equal(future, 123)
def test_or_order_async(falsey):
executor = Executors.thread_pool(max_workers=2)
def delay_then(delay, value):
time.sleep(delay)
return value
f_inputs = [
as_future(falsey),
executor.submit(delay_then, 0.1, 123),
executor.submit(delay_then, 0.05, 456),
as_future(falsey),
]
future = f_or(*f_inputs)
assert_future_equal(future, 456)
def test_or_cancels():
calls = set()
def delayed_call(delay):
time.sleep(delay)
calls.add(delay)
return delay
executor = Executors.thread_pool(max_workers=2)
with executor:
futures = [executor.submit(delayed_call, x) for x in (0.5, 0.1, 0.2, 2.0, 3.0)]
future = f_or(*futures)
result = future.result()
# Result comes from the future which completed first
assert result == 0.1
assert 0.1 in calls
# There could have been one more call, since f_or
# cancelling other futures races with threadpool trying
# to pick them up
assert len(calls) in (2, 3)
# This could not have been cancelled since it was
# submitted earlier than the winner
assert futures[0].done()
# The winner 0.1
assert futures[1].done()
# at least 2 of the remaining futures should have been cancelled
assert len([f for f in futures if f.cancelled()]) >= 2
def test_or_with_nocancel():
calls = set()
def delayed_call(delay):
time.sleep(delay)
calls.add(delay)
return delay
executor = Executors.thread_pool(max_workers=2)
futures = [executor.submit(delayed_call, x) for x in (0.5, 0.1, 0.2, 1.0, 1.1)]
futures = [f_nocancel(f) for f in futures]
future = f_or(*futures)
result = future.result()
# Result comes from the future which completed first
assert result == 0.1
# Only this calls should have completed so far
assert calls == set([0.1])
# But if we wait on this...
assert futures[-1].result() == 1.1
# Then they've all completed now, thanks to nocancel
assert calls == set([0.1, 0.2, 0.5, 1.0, 1.1])
def test_or_propagate_traceback():
def inner_test_fn():
raise RuntimeError("oops")
def my_test_fn(inner_fn):
inner_fn()
executor = Executors.thread_pool()
future = f_or(
executor.submit(my_test_fn),
executor.submit(my_test_fn),
executor.submit(my_test_fn, inner_test_fn),
)
assert_in_traceback(future, "inner_test_fn")
def test_or_gc():
"""Inputs to f_or can be garbage collected if only output is held."""
inputs = [f_return(True), f_return(False)]
weak_input = weakref.ref(inputs[0])
out = f_or(*inputs)
assert out.result() is True
# the input future is still live since inputs is still live
gc.collect()
assert weak_input()
# But if we wipe out inputs, then it should be collected
inputs[:] = []
gc.collect()
assert not weak_input()
def test_or_large():
inputs = [f_return(0) for _ in range(0, 100000)]
assert f_or(*inputs).result() == 0