blob: 276d07d435593c3d4a3649cd8c53bc50b3bc1672 [file] [log] [blame]
from concurrent.futures import ThreadPoolExecutor
import traceback
from pytest import fixture
from hamcrest import assert_that, equal_to, instance_of, calling, raises
from more_executors import MapExecutor
@fixture
def executor():
return ThreadPoolExecutor()
def div(x, y):
return x / y
def div10(x):
return div(10, x)
def test_basic_map(executor):
map_executor = MapExecutor(executor, lambda x: x * 10)
inputs = [1, 2, 3]
expected_result = [20, 40, 60]
futures = [map_executor.submit(lambda x: x * 2, x) for x in inputs]
results = [f.result() for f in futures]
assert_that(results, equal_to(expected_result))
def get_traceback(future):
exception = future.exception()
if "__traceback__" in dir(exception):
return exception.__traceback__
return future.exception_info()[1]
def test_map_error_fn(executor):
map_executor = MapExecutor(executor, error_fn=lambda ex: "<error: %s>" % ex)
inputs = [1, 2, 0]
futures = [map_executor.submit(lambda x: 10 / x, x) for x in inputs]
# All futures should succeed, the last having been made successful
# via the error_fn
assert_that(futures[0].result(), equal_to(10))
assert_that(futures[1].result(), equal_to(5))
fail_result = futures[2].result()
assert fail_result.startswith("<error: ")
assert "division" in fail_result
def test_map_error_fn_raised(executor):
error = ValueError("I don't like the number 4!")
def mult2(x):
if x == 4:
raise error
return x * 2
def raise_(ex):
raise RuntimeError("Oops, some error! %s" % ex)
map_executor = MapExecutor(executor, error_fn=raise_)
inputs = [1, 2, 4]
futures = [map_executor.submit(mult2, x) for x in inputs]
# First two should succeed
assert_that(futures[0].result(), equal_to(2))
assert_that(futures[1].result(), equal_to(4))
# Last should fail and should have the error raised from error_fn
assert "Oops, some error" in str(futures[2].exception())
def test_map_error_fn_propagate(executor):
error = ValueError("I don't like the number 4!")
def mult2(x):
if x == 4:
raise error
return x * 2
def raise_(ex):
raise ex
map_executor = MapExecutor(executor, error_fn=raise_)
inputs = [1, 2, 4]
futures = [map_executor.submit(mult2, x) for x in inputs]
# First two should succeed
assert_that(futures[0].result(), equal_to(2))
assert_that(futures[1].result(), equal_to(4))
# Last should fail, and should have *exactly* the raised error passed through
assert futures[2].exception() is error
def test_map_exception(executor):
map_executor = MapExecutor(executor, div10)
inputs = [1, 2, 0]
futures = [map_executor.submit(lambda v: v, x) for x in inputs]
# First two should succeed and give the mapped value
assert_that(futures[0].result(), equal_to(10))
assert_that(futures[1].result(), equal_to(5))
# The third should have crashed
assert_that(futures[2].exception(), instance_of(ZeroDivisionError))
assert_that(calling(futures[2].result), raises(ZeroDivisionError))
# It should give an accurate traceback
tb = get_traceback(futures[2])
formatted = traceback.format_tb(tb)
assert "div10" in "".join(formatted)
def test_exception_propagate(executor):
map_executor = MapExecutor(executor, lambda x: x)
f = map_executor.submit(div10, 0.0)
# It should have failed
assert_that(f.exception(), instance_of(ZeroDivisionError))
# It should give an accurate traceback
tb = get_traceback(f)
formatted = traceback.format_tb(tb)
assert "div10" in "".join(formatted)