# blob: 2298ad8d2c12e0c6965c5746d786751e64e2e536 [file] [log] [blame]
from functools import partial
from pytest import fixture
from more_executors import Executors
from more_executors._impl.event import GLOBAL_HANDLER
from .util import assert_soon, thread_names, assert_no_extra_threads
def poll_noop(descriptors):
    """Poll function that completes every descriptor with its own result.

    For each element of ``descriptors``, the element's ``result`` attribute
    is handed straight back to that element's ``yield_result`` method.
    Nothing is returned.
    """
    for item in descriptors:
        item.yield_result(item.result)
@fixture(autouse=True)
def reset_event_handler():
    """Autouse fixture: clear ``GLOBAL_HANDLER.shutdown`` after each test."""
    # Nothing to set up; hand control to the test immediately.
    yield
    # Teardown: put the flag back to False so later tests start clean.
    # NOTE(review): presumably GLOBAL_HANDLER.on_exiting() (called by
    # test_no_leak_after_exit) is what sets this flag - confirm.
    GLOBAL_HANDLER.shutdown = False
@fixture
def ctor_sync():
    """Provide ``Executors.sync`` itself (uncalled) as the constructor."""
    constructor = Executors.sync
    return constructor
@fixture
def ctor_thread_pool():
    """Provide ``Executors.thread_pool`` itself (uncalled) as the constructor."""
    constructor = Executors.thread_pool
    return constructor
@fixture
def ctor_with_retry():
    """Provide the bound ``with_retry`` method of a new ``Executors.sync()``.

    The sync executor is created here, when the fixture is set up; the
    returned bound method is what the tests call as the constructor.
    """
    base = Executors.sync()
    return base.with_retry
@fixture
def ctor_with_poll():
    """Provide a zero-argument constructor for a sync executor with polling.

    Each call evaluates ``Executors.sync().with_poll(poll_noop)`` afresh.
    """

    def build():
        base = Executors.sync()
        return base.with_poll(poll_noop)

    return build
@fixture
def ctor_with_throttle():
    """Provide a zero-argument constructor for a throttled sync executor.

    Each call evaluates ``Executors.sync().with_throttle(2)`` afresh.
    """

    def build():
        base = Executors.sync()
        return base.with_throttle(2)

    return build
@fixture
def ctor_with_timeout():
    """Provide a zero-argument constructor for a sync executor with a timeout.

    Each call evaluates ``Executors.sync().with_timeout(30.0)`` afresh.
    """

    def build():
        base = Executors.sync()
        return base.with_timeout(30.0)

    return build
@fixture
def ctor_with_poll_throttle():
    """Provide a zero-argument constructor chaining polling then throttling.

    Each call evaluates
    ``Executors.sync().with_poll(poll_noop).with_throttle(2)`` afresh.
    """

    def build():
        polled = Executors.sync().with_poll(poll_noop)
        return polled.with_throttle(2)

    return build
# Suffixes of the ``ctor_*`` fixtures used to parametrize
# ``executor_with_worker_thread_ctor``; the same names are also appended to
# the parameters of ``executor_ctor``. Order determines parametrization order.
EXECUTORS_WITH_WORKER_THREAD = [
    "with_retry",
    "with_poll",
    "with_throttle",
    "with_poll_throttle",
    "with_timeout",
]
@fixture(params=["sync", "thread_pool"] + EXECUTORS_WITH_WORKER_THREAD)
def executor_ctor(request):
    """Parametrized fixture yielding each executor constructor in turn.

    The current parameter is prefixed with ``ctor_`` and the fixture of that
    name is looked up dynamically, so a test using this fixture runs once per
    listed constructor.
    """
    fixture_name = "ctor_" + request.param
    return request.getfixturevalue(fixture_name)
@fixture(params=EXECUTORS_WITH_WORKER_THREAD)
def executor_with_worker_thread_ctor(request):
    """Parametrized fixture over the EXECUTORS_WITH_WORKER_THREAD constructors.

    The current parameter is prefixed with ``ctor_`` and the fixture of that
    name is looked up dynamically.
    """
    fixture_name = "ctor_" + request.param
    return request.getfixturevalue(fixture_name)
def test_no_leak_on_noop(executor_ctor):
    """An executor that is created and dropped unused leaves no extra threads."""
    # Capture the thread names before the executor exists; the check compares
    # against this baseline.
    baseline = thread_names()
    check = partial(assert_no_extra_threads, baseline)

    instance = executor_ctor()
    # Drop the only reference without ever submitting anything.
    del instance

    assert_soon(check)
def mult2(x):
    """Return ``x * 2``; the trivial job submitted by the tests in this file."""
    doubled = x * 2
    return doubled
def test_no_leak_on_discarded_futures(executor_ctor):
    """Dropping an executor and its unread futures leaves no extra threads."""
    baseline = thread_names()
    check = partial(assert_no_extra_threads, baseline)

    executor = executor_ctor()
    futures = []
    for n in range(1000):
        futures.append(executor.submit(mult2, n))

    # Release the executor first, then the futures, without reading any result.
    del executor, futures

    assert_soon(check)
def get_future_results(futures):
    """Return a list holding ``result()`` of each future, in iteration order."""
    collected = []
    for future in futures:
        collected.append(future.result())
    return collected
def test_no_leak_on_completed_futures(executor_ctor):
    """Once all results are read and the futures dropped, no threads remain."""
    baseline = thread_names()
    check = partial(assert_no_extra_threads, baseline)

    executor = executor_ctor()
    # The list of futures is only a temporary here: after this statement only
    # the plain result values remain referenced, not the futures themselves.
    results = get_future_results(
        [executor.submit(mult2, n) for n in range(1000)]
    )
    del executor

    assert_soon(check)
def test_no_leak_on_completed_held_futures(executor_ctor):
    """Completed futures that are still referenced must not keep threads alive."""
    baseline = thread_names()
    check = partial(assert_no_extra_threads, baseline)

    executor = executor_ctor()
    # Unlike test_no_leak_on_completed_futures, ``futures`` stays bound for
    # the rest of the test, so the future objects remain referenced.
    futures = [executor.submit(mult2, n) for n in range(1000)]
    get_future_results(futures)
    del executor

    assert_soon(check)
def test_no_leak_after_exit(executor_with_worker_thread_ctor):
    """After ``GLOBAL_HANDLER.on_exiting()``, no extra threads remain.

    Here neither the executor nor its futures are deleted; only the exit
    handler is invoked.
    """
    baseline = thread_names()
    check = partial(assert_no_extra_threads, baseline)

    executor = executor_with_worker_thread_ctor()
    futures = [executor.submit(mult2, n) for n in range(1000)]

    # Let at least one of them complete to guarantee that a thread has started
    # before we proceed to on_exiting().
    first = futures[0]
    first.result()

    GLOBAL_HANDLER.on_exiting()

    assert_soon(check)