blob: 309db2bd22081dac92e4e9daad279974eda4a9ab [file] [log] [blame]
from pytest import fixture
from hamcrest import assert_that, equal_to, instance_of, calling, raises, same_instance
from more_executors import Executors, FlatMapExecutor
@fixture
def executor():
    """Provide a thread-pool executor and shut it down after the test.

    Yields:
        The executor returned by ``Executors.thread_pool()``.
    """
    pool = Executors.thread_pool()
    yield pool
    # Teardown: stop the pool and wait for its workers so that threads are
    # not left running after the test that requested this fixture.
    pool.shutdown(wait=True)
def add1(x):
    """Return ``x`` plus one."""
    incremented = x + 1
    return incremented
def mult10(x):
    """Return ``x`` multiplied by ten."""
    scaled = x * 10
    return scaled
def div10by(x):
    """Return ten divided by ``x``.

    The division is performed as-is, so a zero ``x`` is not guarded against.
    """
    quotient = 10 / x
    return quotient
def test_basic_flat_map(executor):
    """Submitted results are passed through the flat-map function."""

    def double(value):
        return value * 2

    def mult10_async(value):
        # The flat-map function schedules further work and returns its future.
        return executor.submit(mult10, value)

    flat_map_executor = FlatMapExecutor(executor, mult10_async)

    submitted = []
    for number in (1, 2, 3):
        submitted.append(flat_map_executor.submit(double, number))

    # Each input is doubled by the submitted callable, then multiplied by ten.
    results = [future.result() for future in submitted]
    assert_that(results, equal_to([20, 40, 60]))
def test_flat_map_exception(executor):
    """A failure in the flat-mapped future is reported by the outer future."""

    def identity(value):
        return value

    def divide_async(value):
        return executor.submit(div10by, value)

    flat_map_executor = FlatMapExecutor(executor, divide_async)

    ok_one, ok_two, failing = [
        flat_map_executor.submit(identity, number) for number in (1, 2, 0)
    ]

    # Non-zero inputs complete with the mapped value.
    assert_that(ok_one.result(), equal_to(10))
    assert_that(ok_two.result(), equal_to(5))

    # The zero input fails: exception() exposes the error, result() raises it.
    assert_that(failing.exception(), instance_of(ZeroDivisionError))
    assert_that(calling(failing.result), raises(ZeroDivisionError))
def test_flat_map_nofuture(executor):
    """A flat-map function that returns a non-Future causes a TypeError."""

    def sometimes_future(value):
        # Only the input 2 produces a future; other inputs produce a bare value.
        if value != 2:
            return mult10(value)
        return executor.submit(mult10, value)

    flat_map_executor = FlatMapExecutor(executor, sometimes_future)

    futures = []
    for number in [1, 2, 3]:
        futures.append(flat_map_executor.submit(lambda v: v, number))
    first, second, third = futures

    # The input that produced a real future yields the mapped value.
    assert_that(second.result(), equal_to(20))

    # The inputs whose mapped value was not a future fail with TypeError.
    assert_that(first.exception(), instance_of(TypeError))
    assert_that(calling(third.result), raises(TypeError))
def test_chained_flat_map(executor):
    """Values pass through every stage of a chain of flat-maps."""

    def times_ten(value):
        return executor.submit(mult10, value)

    def plus_one(value):
        return executor.submit(add1, value)

    first_stage = executor.with_flat_map(times_ten)
    flat_map_executor = first_stage.with_flat_map(plus_one)

    first, second = [
        flat_map_executor.submit(lambda v: v, number) for number in (1, 2)
    ]

    # Expected values are (input * 10) + 1.
    assert_that(first.result(), equal_to(11))
    assert_that(second.result(), equal_to(21))
def test_chained_flat_map_exception(executor):
    """Executor propagates innermost exception in the case of chained flatmaps.

    The first flat-map function always raises, so the second one must never
    be invoked and the futures must fail with that very exception object.
    """
    my_exception = RuntimeError("testing")
    fn2_called = []

    def fn1(x):
        raise my_exception

    def fn2(x):
        # Record the call so the test can prove this function was skipped.
        fn2_called.append(0)
        raise AssertionError("Can't get here!")  # pragma: no cover

    flat_map_executor = executor.with_flat_map(fn1).with_flat_map(fn2)
    inputs = [1, 2]
    futures = [flat_map_executor.submit(lambda v: v, x) for x in inputs]

    # The innermost exception should have been raised (exactly).
    # same_instance checks identity and, unlike a bare boolean assertion,
    # reports the actual exception object when the check fails.
    assert_that(futures[0].exception(), same_instance(my_exception))
    assert_that(futures[1].exception(), same_instance(my_exception))

    # The outermost function should never have been called
    assert_that(fn2_called, equal_to([]))