"""Executor wrapper that intercepts ``submit`` for fleche-decorated functions.
Motivation: users passing a :func:`fleche.fleche`-decorated function to
``executor.submit(...)`` have to remember to call :meth:`.BoundWrapper.bind`
themselves to carry the active cache/metadata state into the worker. They also
pay the submit/serialisation cost even when the result is already cached.
:func:`wrap_executor` patches the ``submit`` method of an executor *instance*
(we cannot subclass, since callers pass us instances of third-party executors)
so that:
* non-fleche callables are forwarded unchanged,
* fleche callables whose result is already cached are returned via an
already-completed :class:`~concurrent.futures.Future` without touching the
executor, and
* otherwise the call is bound via :meth:`.BoundWrapper.bind` and submitted to
the original ``submit``.
Executors that declare their own keyword-only parameters on ``submit``
(e.g. ``resources=`` or ``resource_dict=``) have those split off from the
caller's ``**kwargs`` and forwarded to the underlying ``submit`` while the
remaining keyword arguments are bound as part of the function payload.
"""
from concurrent.futures import Future
from inspect import signature, Parameter
from types import SimpleNamespace
__all__ = ["wrap_executor"]
def _is_fleche_function(func) -> bool:
return isinstance(getattr(func, "fleche", None), SimpleNamespace)
def _split_submit_kwargs(submit_method, kwargs):
"""Partition ``kwargs`` into (executor-reserved, function-payload).
Any keyword-only parameter declared on ``submit_method``'s signature is
reserved for the executor; everything else (including arguments matched by
a ``**kwargs`` catch-all) is forwarded to the callable.
``concurrent.futures.Executor.submit`` has no keyword-only parameters, so
for stdlib executors this is a pure pass-through.
"""
try:
sig = signature(submit_method)
except (TypeError, ValueError):
return {}, dict(kwargs)
submit_only = {
name for name, p in sig.parameters.items()
if p.kind is Parameter.KEYWORD_ONLY
}
for_submit, for_func = {}, {}
for k, v in kwargs.items():
if k in submit_only:
for_submit[k] = v
else:
for_func[k] = v
return for_submit, for_func
[docs]
def wrap_executor(executor):
"""Monkey-patch ``executor.submit`` to intercept fleche-wrapped functions.
Calling :func:`wrap_executor` on an executor that is already wrapped is a
no-op: the patch is not stacked, and the original ``submit`` continues to
refer to the pre-wrap method.
Args:
executor: any object with a ``submit(func, *args, **kwargs)`` method
(e.g. :class:`concurrent.futures.Executor` subclass instances).
Returns:
The same ``executor`` instance, with a replaced ``submit`` attribute.
"""
original_submit = executor.submit
if getattr(original_submit, "_fleche_wrapped", False):
return executor
def submit(func, *args, **kwargs):
if not _is_fleche_function(func):
return original_submit(func, *args, **kwargs)
submit_kwargs, func_kwargs = _split_submit_kwargs(
original_submit, kwargs
)
if func.fleche.contains(*args, **func_kwargs):
fut: Future = Future()
fut.set_result(func.fleche.load(*args, **func_kwargs))
return fut
bound = func.fleche.bind(*args, **func_kwargs)
return original_submit(bound, **submit_kwargs)
submit._fleche_wrapped = True # type: ignore
executor.submit = submit
return executor