Parallel Execution
==================
Fleche-decorated functions work with Python's ``concurrent.futures`` executors
and other process-/thread-based parallelism libraries. This page explains the
interaction between fleche's context-variable state and the different executor
models, and shows recommended patterns for each.
.. contents:: On this page
:local:
:depth: 2
Background: How Fleche Tracks State
------------------------------------
Fleche stores the active cache and metadata in Python
:mod:`contextvars` — specifically two ``ContextVar`` instances for the cache
and for metadata. Context managers like ``fleche.cache(...)`` and
``fleche.meta(...)`` set these variables for the current context.
This design is inherently **thread-safe**: each logical execution context gets
its own value. However, whether a new thread or process *inherits* the
parent's context depends on how it is spawned.
ThreadPoolExecutor
------------------
Threads spawned by :class:`concurrent.futures.ThreadPoolExecutor` do **not**
automatically inherit ``ContextVar`` values from the submitting thread. A
fleche-decorated function submitted to a thread pool will therefore see the
*default* cache rather than one set via ``with fleche.cache(...)``.
Propagating context explicitly
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Use :func:`contextvars.copy_context` to snapshot the current context and
``ctx.run()`` to execute the function inside that snapshot:
.. code-block:: python
import contextvars
import concurrent.futures
import fleche
from fleche.caches import Cache
from fleche.storage import Memory
@fleche.fleche
def compute(x):
return x ** 2
my_cache = Cache(Memory({}), Memory({}))
with fleche.cache(my_cache):
ctx = contextvars.copy_context()
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(ctx.run, compute, 42)
result = future.result() # 1764, cached in my_cache
Results are written to ``my_cache`` because the worker thread runs inside the
copied context.
.. tip::
If you submit many tasks, create **one** context snapshot and reuse it for
every ``submit`` call — the snapshot is a lightweight, read-copy-on-write
object.
ProcessPoolExecutor
-------------------
Worker processes are independent OS processes with their own Python
interpreter. ``ContextVar`` values set in the parent are **never** visible in
workers — they always start from the default.
In-memory caches are therefore **not shared** across processes. A result
computed in a worker is stored in the worker's own ephemeral cache and is lost
when the worker exits.
Using file- or SQL-backed storage
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
To share cached results between the parent and worker processes, point both
sides at the same **file-** or **SQL-backed** storage:
.. code-block:: python
import concurrent.futures
import fleche
from fleche.caches import Cache
from fleche.storage.pickle_file import PickleFile
@fleche.fleche
def heavy_computation(x):
return x ** 3
def worker(x, values_dir, calls_dir):
"""Each worker sets up a cache pointing at the shared directory."""
values = PickleFile.with_pickle(root=values_dir)
calls = PickleFile.with_pickle(root=calls_dir)
with fleche.cache(Cache(values, calls)):
return heavy_computation(x)
# Shared directories (could also be SQL connection strings)
values_dir = "/tmp/fleche_values"
calls_dir = "/tmp/fleche_calls"
with concurrent.futures.ProcessPoolExecutor() as executor:
futures = [executor.submit(worker, i, values_dir, calls_dir)
for i in range(8)]
results = [f.result() for f in futures]
# Parent can now read the cached results
parent_cache = Cache(
PickleFile.with_pickle(root=values_dir),
PickleFile.with_pickle(root=calls_dir),
)
assert parent_cache.contains(heavy_computation.digest(3))
Fleche's file-based storage uses lock files to coordinate concurrent writes, so
multiple workers can safely write to the same directory.
.. note::
With the standard-library ``ProcessPoolExecutor`` (which uses ``pickle``),
fleche-decorated functions must be defined at **module level**. Lambdas and
locally-defined functions will raise a ``PicklingError``.
Third-party executor libraries that use ``dill`` or ``cloudpickle`` for
serialisation — such as `executorlib `_
or `parsl `_ — do not have this restriction and
can submit locally-defined functions directly.
BoundWrapper — Freezing State for Workers
-----------------------------------------
:class:`fleche.state.BoundWrapper` captures the current cache and metadata into
a picklable callable, so workers automatically use the correct state without
manual setup:
.. code-block:: python
import concurrent.futures
from fleche.state import BoundWrapper
from fleche.caches import Cache
from fleche.storage import Memory
import fleche
@fleche.fleche
def predict(x):
return x * 2
file_cache = Cache(Memory({}), Memory({}))
with fleche.cache(file_cache):
bound_predict = BoundWrapper.bind(predict)
with concurrent.futures.ProcessPoolExecutor() as executor:
futures = [executor.submit(bound_predict, i) for i in range(4)]
results = [f.result() for f in futures]
``BoundWrapper.bind(func)`` snapshots the active cache and metadata at bind
time. When called — even in a different process after pickling — the bound
wrapper restores that state before invoking the function.
This also works for **plain functions that call fleche-decorated functions
internally**: the bound state propagates to all nested fleche calls.
.. code-block:: python
@fleche.fleche
def step_a(x):
return x + 1
def pipeline(x):
"""Plain function that uses fleche-decorated helpers."""
return step_a(x) * 3
with fleche.cache(file_cache):
bound_pipeline = BoundWrapper.bind(pipeline)
# In a worker process, step_a will use file_cache
with concurrent.futures.ProcessPoolExecutor() as executor:
future = executor.submit(bound_pipeline, 10)
assert future.result() == 33 # (10 + 1) * 3
executorlib
-----------
`executorlib `_ provides executors that
follow the same ``concurrent.futures`` interface but manage HPC resources
(SLURM, MPI, etc.). Because ``executorlib.SingleNodeExecutor`` spawns worker
processes, the same rules as ``ProcessPoolExecutor`` apply:
- In-memory caches are **not** shared with workers.
- Use **file-** or **SQL-backed** storage and set it up explicitly in each
worker, or use :class:`~fleche.state.BoundWrapper` with a file-backed cache.
.. code-block:: python
from executorlib import SingleNodeExecutor
with fleche.cache(file_cache):
bound = BoundWrapper.bind(heavy_computation)
with SingleNodeExecutor() as executor:
future = executor.submit(bound, 42)
result = future.result()
The ``isolate`` Flag
--------------------
The ``@fleche(isolate=True)`` option creates a temporary working directory for
each function invocation to prevent file-system conflicts when the wrapped
function writes to the current directory. This is useful for parallel
execution of functions that produce side-effect files.
.. warning::
``os.chdir()`` is **process-wide and not thread-safe**. The ``isolate``
flag should only be combined with process-based executors, not with
``ThreadPoolExecutor``.
Quick Reference
---------------
.. list-table::
:header-rows: 1
:widths: 30 15 25 30
* - Executor
- Works?
- Context inherited?
- Recommended pattern
* - ``ThreadPoolExecutor``
- Yes
- No (manual)
- ``ctx.run()`` with ``copy_context()``
* - ``ProcessPoolExecutor``
- Yes
- No (separate process)
- File/SQL storage or ``BoundWrapper``
* - ``executorlib.SingleNodeExecutor``
- Yes
- No (separate process)
- File/SQL storage or ``BoundWrapper``