Parallel Execution
==================
Fleche-decorated functions work with Python's ``concurrent.futures`` executors
and other process-/thread-based parallelism libraries. This page explains the
recommended patterns for each executor type.
Three complementary APIs cover the common cases:
- :class:`~fleche.BoundWrapper` — freeze the active cache and metadata into a
picklable callable that a worker can invoke without any further setup. Use
when you own the submission site and want explicit control.
- **Future pass-through** — a fleche-decorated function that internally
returns a :class:`~concurrent.futures.Future` is cached automatically when
the future completes. Use when the decorated function *is* the one that
submits work.
- :func:`~fleche.wrap_executor` — a one-line wrapper around an executor
instance that transparently binds fleche-decorated functions on ``submit``
**and** short-circuits cache hits without ever touching the executor. Use
when you want the most compact call site, or to avoid submit/serialise
overhead on cached inputs.
.. contents:: On this page
:local:
:depth: 2
ThreadPoolExecutor
------------------
For the most compact call site, use :func:`~fleche.wrap_executor` — it patches
the executor's ``submit`` once and then handles any fleche-decorated function
automatically, serving cache hits from a pre-completed
:class:`~concurrent.futures.Future` without ever submitting a task:
.. code-block:: python
import concurrent.futures
import fleche
from fleche.caches import Cache
from fleche.storage.memory import ValueMemory, CallMemory
@fleche.fleche
def compute(x):
return x ** 2
my_cache = Cache(ValueMemory({}), CallMemory({}))
with fleche.cache(my_cache):
with concurrent.futures.ThreadPoolExecutor() as executor:
fleche.wrap_executor(executor)
futures = [executor.submit(compute, i) for i in range(4)]
results = [f.result() for f in futures] # all cached in my_cache
Alternatively, use :class:`~fleche.BoundWrapper` when you need explicit
control over which callable is submitted — for example, to share a single
bound callable across multiple pools or helper modules.
``BoundWrapper.bind(func)`` captures the active cache at bind time and
restores it on every call — including inside worker threads — so all tasks
write to the same store:
.. code-block:: python
import concurrent.futures
import fleche
from fleche import BoundWrapper
from fleche.caches import Cache
from fleche.storage.memory import ValueMemory, CallMemory
@fleche.fleche
def compute(x):
return x ** 2
my_cache = Cache(ValueMemory({}), CallMemory({}))
with fleche.cache(my_cache):
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [executor.submit(BoundWrapper.bind(compute), i) for i in range(4)]
results = [f.result() for f in futures] # all cached in my_cache
Passing a bound function around
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Because :class:`~fleche.BoundWrapper` is a regular picklable callable, you can
bind it once and pass it anywhere — to multiple executor pools, helper modules,
or across function calls. The captured state travels with the callable:
.. code-block:: python
with fleche.cache(my_cache):
bound_compute = BoundWrapper.bind(compute) # bind once
# bound_compute carries my_cache — no cache context manager needed at the call site
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [executor.submit(bound_compute, i) for i in range(4)]
results = [f.result() for f in futures]
ProcessPoolExecutor
-------------------
Worker processes are independent OS processes with their own Python
interpreter. In-memory caches are **not shared** across processes — a result
computed in a worker is stored in the worker's own ephemeral cache and is lost
when the worker exits.
To share cached results, point both the parent and workers at the same
**file-** or **SQL-backed** storage. ``BoundWrapper.bind(func)`` embeds the
cache configuration into the callable so no manual worker-wrapper function is
needed:
.. code-block:: python
import concurrent.futures
import tempfile, os
import fleche
from fleche import BoundWrapper
from fleche.caches import Cache
from fleche.storage.pickle_file import ValuePickleFile, CallPickleFile
@fleche.fleche
def heavy_computation(x):
return x ** 3
with tempfile.TemporaryDirectory() as tmpdir:
file_cache = Cache(
ValuePickleFile.with_pickle(root=os.path.join(tmpdir, "values")),
CallPickleFile.with_pickle(root=os.path.join(tmpdir, "calls")),
)
with fleche.cache(file_cache):
with concurrent.futures.ProcessPoolExecutor() as executor:
futures = [executor.submit(BoundWrapper.bind(heavy_computation), i)
for i in range(8)]
results = [f.result() for f in futures]
assert file_cache.contains(heavy_computation.digest(3))
Fleche's file-based storage uses lock files to coordinate concurrent writes, so
multiple workers can safely write to the same directory.
.. note::
With the standard-library ``ProcessPoolExecutor`` (which uses ``pickle``),
fleche-decorated functions must be defined at **module level**. Lambdas and
locally-defined functions will raise a ``PicklingError``.
Third-party executor libraries that use ``dill`` or ``cloudpickle`` for
serialisation — such as `executorlib `_
or `parsl `_ — do not have this restriction and
can submit locally-defined functions directly.
BoundWrapper — Freezing State for Workers
-----------------------------------------
:class:`fleche.BoundWrapper` captures the current cache and metadata into
a picklable callable, so workers automatically use the correct state without
manual setup.
This also works for **plain functions that call fleche-decorated functions
internally**: the bound state propagates to all nested fleche calls.
.. code-block:: python
@fleche.fleche
def step_a(x):
return x + 1
def pipeline(x):
"""Plain function that uses fleche-decorated helpers."""
return step_a(x) * 3
with fleche.cache(file_cache):
bound_pipeline = BoundWrapper.bind(pipeline)
# In a worker process, step_a will use file_cache
with concurrent.futures.ProcessPoolExecutor() as executor:
future = executor.submit(bound_pipeline, 10)
assert future.result() == 33 # (10 + 1) * 3
executorlib
-----------
`executorlib `_ provides executors that
follow the same ``concurrent.futures`` interface but manage HPC resources
(SLURM, MPI, etc.). Because ``executorlib.SingleNodeExecutor`` spawns worker
processes, the same rules as ``ProcessPoolExecutor`` apply:
- In-memory caches are **not** shared with workers.
- Use **file-** or **SQL-backed** storage with :class:`~fleche.BoundWrapper`.
.. code-block:: python
# file_cache and heavy_computation as set up in the ProcessPoolExecutor example above
import fleche
from fleche import BoundWrapper
from executorlib import SingleNodeExecutor
with fleche.cache(file_cache):
with SingleNodeExecutor() as executor:
futures = [executor.submit(BoundWrapper.bind(heavy_computation), i)
for i in range(4)]
results = [f.result() for f in futures]
Future Pass-Through — Caching Async-style Results
--------------------------------------------------
As an alternative to :class:`~fleche.BoundWrapper`, you can write a
fleche-decorated function that itself submits work to an executor and returns
the resulting :class:`~concurrent.futures.Future`. Fleche detects the
``Future`` return value, passes it through to the caller unchanged, and
automatically caches the result once the future completes:
.. code-block:: python
import concurrent.futures
import fleche
from fleche.caches import Cache
from fleche.storage.memory import ValueMemory, CallMemory
_executor = concurrent.futures.ThreadPoolExecutor()
@fleche.fleche
def compute(x):
"""Submits work and returns a Future — fleche caches on completion."""
return _executor.submit(lambda: x ** 2)
my_cache = Cache(ValueMemory({}), CallMemory({}))
with fleche.cache(my_cache):
future = compute(4) # returns the Future immediately
result = future.result() # 16, and the result is now cached
On the next call, ``compute(4)`` returns the cached value directly (no
``Future`` is created).
.. note::
When a decorated function with future pass-through is called and a cached
value already exists, the function is **not** invoked and no ``Future`` is
created — the cached result is returned directly.
``wrap_executor`` — Transparent Submit Interception
---------------------------------------------------
:func:`fleche.wrap_executor` monkey-patches ``executor.submit`` on an existing
executor **instance** (no subclassing — callers pass us instances of
third-party executors) so that fleche-decorated functions are handled
automatically:
- Non-fleche callables pass straight through to the original ``submit``.
- Cache hits are served from an already-completed
:class:`~concurrent.futures.Future` without ever touching the executor,
avoiding submit/serialise overhead on cached inputs.
- Cache misses are bound via :meth:`~fleche.BoundWrapper.bind` so the parent's
cache and metadata travel with the call into the worker.
.. code-block:: python
import concurrent.futures, tempfile, os
import fleche
from fleche.caches import Cache
from fleche.storage.pickle_file import ValuePickleFile, CallPickleFile
@fleche.fleche
def heavy_computation(x):
return x ** 3
with tempfile.TemporaryDirectory() as tmpdir:
file_cache = Cache(
ValuePickleFile.with_pickle(root=os.path.join(tmpdir, "values")),
CallPickleFile.with_pickle(root=os.path.join(tmpdir, "calls")),
)
with fleche.cache(file_cache):
with concurrent.futures.ProcessPoolExecutor() as executor:
fleche.wrap_executor(executor) # patch once
# Submit exactly as you would a plain function.
futures = [executor.submit(heavy_computation, i) for i in range(8)]
results = [f.result() for f in futures]
# A second round with the same inputs never enters the worker pool:
with concurrent.futures.ProcessPoolExecutor() as executor:
fleche.wrap_executor(executor)
fut = executor.submit(heavy_computation, 3)
assert fut.done() # already-completed future
assert fut.result() == 27
Executor-specific keyword arguments
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Some executors (e.g. `executorlib
`_'s ``resource_dict``, dask's
``resources=``/``key=``) define their own keyword-only parameters on
``submit``. :func:`~fleche.wrap_executor` introspects the signature with
:func:`inspect.signature` and splits those off automatically — they are
forwarded to ``submit`` while the rest are bound as part of the callable's
payload:
.. code-block:: python
# file_cache and heavy_computation as set up in the wrap_executor example above
from executorlib import SingleNodeExecutor
with fleche.cache(file_cache):
with SingleNodeExecutor() as executor:
fleche.wrap_executor(executor)
future = executor.submit(
heavy_computation, 5,
resource_dict={"cores": 4}, # goes to SingleNodeExecutor.submit
)
assert future.result() == 125
.. note::
:func:`~fleche.wrap_executor` mutates the instance it is given; it does
not return a proxy. The call is idempotent: wrapping an already-wrapped
executor a second time is a no-op.
When to prefer each pattern
---------------------------
.. list-table::
:header-rows: 1
:widths: 22 26 26 26
* -
- :class:`~fleche.BoundWrapper`
- Future pass-through
- :func:`~fleche.wrap_executor`
* - Who owns the executor?
- Caller
- The decorated function
- Caller
* - Call-site change
- Wrap callable: ``executor.submit(BoundWrapper.bind(func), ...)``
- None (decorated function returns a ``Future``)
- Patch executor once, then ``executor.submit(func, ...)``
* - Cache hit behaviour
- Always submits; worker does the cache lookup
- Returns cached result directly (no ``Future`` created)
- Returns an already-completed ``Future`` without contacting the worker
* - Works across processes
- Yes, with file/SQL storage
- Yes, if the wrapping function owns the executor
- Yes, with file/SQL storage
* - Best for
- Fine-grained control; sharing a bound callable across modules
- Wrapping expensive async-style code inside a decorated helper
- Transparently caching existing executor-based pipelines
The ``isolate`` Flag
--------------------
The ``@fleche(isolate=True)`` option creates a temporary working directory for
each function invocation to prevent file-system conflicts when the wrapped
function writes to the current directory. This is useful for parallel
execution of functions that produce side-effect files.
.. warning::
``os.chdir()`` is **process-wide and not thread-safe**. The ``isolate``
flag should only be combined with process-based executors, not with
``ThreadPoolExecutor``.
Quick Reference
---------------
.. list-table::
:header-rows: 1
:widths: 30 15 25 30
* - Executor
- Works?
- Context inherited?
- Recommended pattern
* - ``ThreadPoolExecutor``
- Yes
- No (manual)
- :func:`~fleche.wrap_executor` (simplest), or ``BoundWrapper.bind(func)``,
or future pass-through
* - ``ProcessPoolExecutor``
- Yes
- No (separate process)
- File/SQL storage + :func:`~fleche.wrap_executor` (or ``BoundWrapper``)
* - ``executorlib.SingleNodeExecutor``
- Yes
- No (separate process)
- File/SQL storage + :func:`~fleche.wrap_executor` (or ``BoundWrapper``)
* - Function returns ``Future`` directly
- Yes
- Yes (same thread/context)
- Future pass-through (function returns its own ``Future``)
Known Limitations
-----------------
Cache Stampede (Thundering Herd)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Fleche does **not** protect against cache stampedes. When multiple workers
(threads or processes) call the same fleche-decorated function with identical
arguments at the same time and the result is not yet cached, all of them will
experience a cache miss simultaneously. Each worker then independently
computes the function and writes the result — wasting redundant work and
potentially producing inconsistent call metadata (e.g. ``Runtime`` values will
differ between the duplicate records).
The root cause is that there is no reservation mechanism between the cache
miss check and the cache write in ``wrapper.py``. A correct fix would require
pre-allocating a "pending" slot in the cache so that other workers can detect
the in-flight computation and wait, rather than starting their own.
Implementing this correctly is non-trivial: it requires key-scoped locking,
timeout handling for stale allocations (e.g. if the computing worker crashes),
and cross-process coordination for file- and SQL-backed stores. This is
currently out of scope.
.. warning::
If you submit many workers with the same arguments before the first result
is cached, all workers will compute the function. The last writer wins for
the stored value, but **no data is lost or corrupted** — subsequent calls
will hit the cache normally.
**Practical workarounds:**
- Pre-warm the cache sequentially before launching parallel workers.
- Partition work so each worker handles a disjoint set of arguments.