blob: 39a7e26401e3f6bff9ca9db1f2ee1803b02500b5 [file] [log] [blame]
import sys
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from .wrap import CanCustomizeBind
from .metrics import metrics, track_future
from .helpers import ShutdownHelper
class CustomizableThreadPoolExecutor(CanCustomizeBind, ThreadPoolExecutor):
def __init__(self, *args, **kwargs):
self.__name = kwargs.pop("name", "default")
self.__shutdown = ShutdownHelper()
if (
sys.version_info >= (3, 6)
and len(args) < 2
and "thread_name_prefix" not in kwargs
and self.__name != "default"
):
kwargs["thread_name_prefix"] = "ThreadPoolExecutor-%s" % self.__name
super(CustomizableThreadPoolExecutor, self).__init__(*args, **kwargs)
metrics.EXEC_TOTAL.labels(type="threadpool", executor=self.__name).inc()
metrics.EXEC_INPROGRESS.labels(type="threadpool", executor=self.__name).inc()
def shutdown(self, *args, **kwargs):
if self.__shutdown():
metrics.EXEC_INPROGRESS.labels(
type="threadpool", executor=self.__name
).dec()
super(CustomizableThreadPoolExecutor, self).shutdown(*args, **kwargs)
def submit(self, *args, **kwargs):
with self.__shutdown.ensure_alive():
f = super(CustomizableThreadPoolExecutor, self).submit(*args, **kwargs)
track_future(f, type="threadpool", executor=self.__name)
return f
class CustomizableProcessPoolExecutor(CanCustomizeBind, ProcessPoolExecutor):
def __init__(self, *args, **kwargs):
kwargs.pop("name", None)
super(CustomizableProcessPoolExecutor, self).__init__(*args, **kwargs)