Parallel Execution
Fleche-decorated functions work with Python’s concurrent.futures executors
and other process-/thread-based parallelism libraries. This page explains the
recommended patterns for each executor type.
ThreadPoolExecutor
Use BoundWrapper together with a with fleche.cache(...) block.
BoundWrapper.bind(func) captures the active cache at bind time and
restores it on every call, including calls that run inside worker threads,
so all tasks write to the same store:
import concurrent.futures
import fleche
from fleche import BoundWrapper
from fleche.caches import Cache
from fleche.storage.memory import ValueMemory, CallMemory

@fleche.fleche
def compute(x):
    return x ** 2

my_cache = Cache(ValueMemory({}), CallMemory({}))

with fleche.cache(my_cache):
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(BoundWrapper.bind(compute), i) for i in range(4)]
        results = [f.result() for f in futures]  # all cached in my_cache
Passing a bound function around
Because BoundWrapper.bind returns a regular picklable callable, you can
bind a function once and pass the result anywhere: to multiple executor pools,
helper modules, or other functions. The captured state travels with the callable:
with fleche.cache(my_cache):
    bound_compute = BoundWrapper.bind(compute)  # bind once

# bound_compute carries my_cache; no cache context manager is needed at the call site
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [executor.submit(bound_compute, i) for i in range(4)]
    results = [f.result() for f in futures]
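Why can captured state "travel with the callable"? Any object whose state pickles cleanly can be serialized and rebuilt intact, which is also what lets the same bound object be shipped to a process pool (process pools pickle submitted callables). The sketch below uses a hypothetical BoundCall dataclass holding a function plus plain-data configuration; it illustrates the idea and is not BoundWrapper's real structure.

```python
import pickle
from dataclasses import dataclass, field
from typing import Any, Callable, Dict


@dataclass(frozen=True)
class BoundCall:
    """Hypothetical picklable callable: a function plus captured configuration."""

    func: Callable[..., Any]
    config: Dict[str, Any] = field(default_factory=dict)

    def __call__(self, *args, **kwargs):
        # Hand the captured configuration to the function explicitly;
        # a real wrapper would more likely install it as ambient state.
        return self.func(self.config, *args, **kwargs)


def tagged_square(config, x):
    """Module-level (hence picklable) function that reports which cache it saw."""
    return config["cache_name"], x * x


bound = BoundCall(tagged_square, {"cache_name": "my_cache"})

# Serialize and rebuild, as a process pool would do with a submitted callable.
clone = pickle.loads(pickle.dumps(bound))
```

The clone is a separate object with an equal copy of the configuration, while the function itself is pickled by reference to its module-level name.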
ProcessPoolExecutor
Worker processes are independent OS processes, each with its own Python interpreter. In-memory caches are not shared across processes: a result computed in a worker is stored in that worker’s own ephemeral cache and is lost when the worker exits.
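A quick way to see why an in-memory cache cannot be shared: ProcessPoolExecutor pickles each submitted callable and its arguments, so the worker only ever receives copies. The sketch below simulates that hand-off inside a single process with a pickle round-trip; simulate_worker and cube_into are hypothetical helpers, not part of fleche or concurrent.futures.

```python
import pickle


def cube_into(cache, x):
    """Compute x**3 and record it in the given in-memory cache."""
    cache[x] = x ** 3
    return cache[x]


def simulate_worker(func, *args):
    """Mimic a process pool: the worker operates on unpickled copies only."""
    func_copy, args_copy = pickle.loads(pickle.dumps((func, args)))
    return func_copy(*args_copy), args_copy


parent_cache = {}
result, worker_args = simulate_worker(cube_into, parent_cache, 3)
worker_cache = worker_args[0]  # the worker's copy received the entry...
# ...while parent_cache, the original dict, is still empty.
```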
To share cached results, point both the parent and workers at the same
file- or SQL-backed storage. BoundWrapper.bind(func) embeds the
cache configuration into the callable so no manual worker-wrapper function is
needed:
import concurrent.futures
import os
import tempfile

import fleche
from fleche import BoundWrapper
from fleche.caches import Cache
from fleche.storage.pickle_file import ValuePickleFile, CallPickleFile

@fleche.fleche
def heavy_computation(x):
    return x ** 3

with tempfile.TemporaryDirectory() as tmpdir:
    file_cache = Cache(
        ValuePickleFile.with_pickle(root=os.path.join(tmpdir, "values")),
        CallPickleFile.with_pickle(root=os.path.join(tmpdir, "calls")),
    )
    with fleche.cache(file_cache):
        with concurrent.futures.ProcessPoolExecutor() as executor:
            futures = [executor.submit(BoundWrapper.bind(heavy_computation), i)
                       for i in range(8)]
            results = [f.result() for f in futures]
    assert file_cache.contains(heavy_computation.digest(3))
Fleche’s file-based storage uses lock files to coordinate concurrent writes, so multiple workers can safely write to the same directory.
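The lock-file idea itself can be sketched with the standard library: os.open with O_CREAT | O_EXCL creates the lock file atomically, so only one writer proceeds at a time, and os.replace swaps in the finished value so readers never see a half-written file. LockedFileStore and its one-JSON-file-per-key layout are hypothetical and do not describe fleche's on-disk format. Threads stand in for worker processes here; the same O_EXCL lock also works across processes on a local filesystem.

```python
import concurrent.futures
import json
import os
import tempfile
import time


class LockedFileStore:
    """Toy key/value store: one JSON file per key, writes guarded by a lock file."""

    def __init__(self, root):
        self.root = root
        os.makedirs(root, exist_ok=True)

    def _path(self, key):
        return os.path.join(self.root, f"{key}.json")

    def put(self, key, value, timeout=5.0):
        lock_path = self._path(key) + ".lock"
        deadline = time.monotonic() + timeout
        while True:
            try:
                # O_EXCL makes creation atomic: exactly one writer wins the lock.
                fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
                break
            except FileExistsError:
                if time.monotonic() > deadline:
                    raise TimeoutError(f"could not acquire {lock_path}")
                time.sleep(0.01)
        try:
            tmp_path = self._path(key) + ".tmp"
            with open(tmp_path, "w") as fh:
                json.dump(value, fh)
            # Atomic rename: readers see either the old file or the new one.
            os.replace(tmp_path, self._path(key))
        finally:
            os.close(fd)
            os.remove(lock_path)

    def get(self, key, default=None):
        try:
            with open(self._path(key)) as fh:
                return json.load(fh)
        except FileNotFoundError:
            return default


with tempfile.TemporaryDirectory() as root:
    store = LockedFileStore(root)
    # Eight concurrent writers race on the same key; the lock serialises them.
    with concurrent.futures.ThreadPoolExecutor(max_workers=8) as pool:
        list(pool.map(lambda i: store.put("cube_3", i), range(8)))
    final_value = store.get("cube_3")  # whichever writer ran last
    missing_value = store.get("not_there")
    leftover_files = sorted(os.listdir(root))
```

Only the final value file remains afterwards: every lock and temporary file is cleaned up by the writer that created it.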
Note
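With the standard-library ProcessPoolExecutor (which uses pickle),
fleche-decorated functions must be defined at module level. Lambdas and
locally-defined functions cannot be pickled and fail with a PicklingError
(or, on some Python versions, an AttributeError such as "Can't pickle local object").
On platforms that start workers with the spawn method (the default on Windows and macOS),
the code that creates the executor must also sit under an if __name__ == "__main__": guard.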
Third-party executor libraries that use dill or cloudpickle for
serialisation — such as executorlib
or parsl — do not have this restriction and
can submit locally-defined functions directly.
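To check in advance whether a callable will survive the trip to a standard-library process pool, you can try pickling it yourself. The helper is_picklable below is hypothetical; because the exception raised for an unpicklable function differs between Python versions, it catches both pickle.PicklingError and AttributeError.

```python
import pickle


def module_level(x):
    return x ** 3


def make_local():
    def local(x):  # defined inside another function: pickled by name, so lookup fails
        return x ** 3
    return local


def is_picklable(obj):
    """Return True if pickle can serialize obj (functions are pickled by reference)."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError):
        return False
    return True


checks = {
    "module_level": is_picklable(module_level),
    "lambda": is_picklable(lambda x: x ** 3),
    "local": is_picklable(make_local()),
}
```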
BoundWrapper — Freezing State for Workers
fleche.BoundWrapper captures the current cache and metadata into
a picklable callable, so workers automatically use the correct state without
manual setup.
This also works for plain functions that call fleche-decorated functions internally: the bound state propagates to all nested fleche calls.
@fleche.fleche
def step_a(x):
    return x + 1

def pipeline(x):
    """Plain function that uses fleche-decorated helpers."""
    return step_a(x) * 3

# file_cache is the file-backed Cache from the ProcessPoolExecutor example;
# its directory must still exist when this code runs.
with fleche.cache(file_cache):
    bound_pipeline = BoundWrapper.bind(pipeline)
    # In a worker process, step_a will use file_cache
    with concurrent.futures.ProcessPoolExecutor() as executor:
        future = executor.submit(bound_pipeline, 10)
        assert future.result() == 33  # (10 + 1) * 3
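Within a single process, the "bound state reaches nested calls" behaviour can be sketched with contextvars.copy_context(): snapshot the context at bind time and run every call inside a copy of it, so any helper that reads the context variable, however deeply nested, sees the captured cache. The names active_cache, cached, and bind are hypothetical. This closure-based version is not picklable, so it illustrates the semantics rather than BoundWrapper's process-friendly implementation.

```python
import contextvars

# Hypothetical stand-in for the active cache (assumption, not fleche's API).
active_cache = contextvars.ContextVar("active_cache", default=None)


def cached(func):
    """Toy memoising decorator that uses whichever cache is active at call time."""

    def wrapper(x):
        cache = active_cache.get()
        if cache is None:  # no active cache: just compute
            return func(x)
        key = (func.__name__, x)
        if key not in cache:
            cache[key] = func(x)
        return cache[key]

    return wrapper


@cached
def step_a(x):
    return x + 1


def pipeline(x):
    """Plain function; only its helper is decorated."""
    return step_a(x) * 3


def bind(func):
    snapshot = contextvars.copy_context()  # taken at bind time

    def bound(*args, **kwargs):
        # Run in a fresh copy so concurrent calls never re-enter the same Context.
        return snapshot.copy().run(func, *args, **kwargs)

    return bound


cache = {}
token = active_cache.set(cache)    # plays the role of `with fleche.cache(...)`
bound_pipeline = bind(pipeline)
active_cache.reset(token)          # the cache is no longer active here...

bound_result = bound_pipeline(10)  # ...but the bound call still uses it
plain_result = pipeline(10)        # unbound call runs without any cache
```

Only the bound call records step_a's result; the plain call computes the same value but leaves the cache untouched.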
executorlib
executorlib provides executors that
follow the same concurrent.futures interface but manage HPC resources
(SLURM, MPI, etc.). Because executorlib.SingleNodeExecutor spawns worker
processes, the same rules as ProcessPoolExecutor apply:
- In-memory caches are not shared with workers.
- Use file- or SQL-backed storage with BoundWrapper.
from executorlib import SingleNodeExecutor

with fleche.cache(file_cache):
    with SingleNodeExecutor() as executor:
        futures = [executor.submit(BoundWrapper.bind(heavy_computation), i)
                   for i in range(4)]
        results = [f.result() for f in futures]
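Because these executors share the concurrent.futures submit/result interface, helper code can be written once against concurrent.futures.Executor and handed any of them. submit_all is a hypothetical helper; it is exercised with a ThreadPoolExecutor so it runs anywhere, on the assumption that a ProcessPoolExecutor or SingleNodeExecutor (given a picklable, module-level callable) could be passed in its place.

```python
import concurrent.futures
from typing import Any, Callable, Iterable, List


def submit_all(executor: concurrent.futures.Executor,
               fn: Callable[[Any], Any],
               args: Iterable[Any]) -> List[Any]:
    """Submit fn(arg) for every arg and return results in submission order."""
    futures = [executor.submit(fn, arg) for arg in args]
    return [f.result() for f in futures]


def cube(x):
    return x ** 3


with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    cubes = submit_all(executor, cube, range(4))
```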
The isolate Flag
The @fleche.fleche(isolate=True) option creates a temporary working directory for
each function invocation to prevent file-system conflicts when the wrapped
function writes to the current directory. This is useful for parallel
execution of functions that produce side-effect files.
Warning
os.chdir() is process-wide and not thread-safe. The isolate
flag should only be combined with process-based executors, not with
ThreadPoolExecutor.
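The per-call working directory can be sketched as a context manager that enters a fresh temporary directory and always changes back afterwards. isolated_cwd, run_isolated, and writes_output_file are hypothetical names, not fleche's implementation. Because the sketch calls os.chdir, it has exactly the process-wide, thread-unsafe behaviour described in the warning above.

```python
import contextlib
import os
import tempfile


@contextlib.contextmanager
def isolated_cwd():
    """Run the body inside a fresh temporary directory, then restore the old cwd."""
    previous = os.getcwd()
    with tempfile.TemporaryDirectory() as scratch:
        os.chdir(scratch)  # process-wide: affects every thread in this process
        try:
            yield scratch
        finally:
            os.chdir(previous)  # leave before the directory is deleted


def writes_output_file(x):
    """A function with a file side effect in the current directory."""
    with open("output.txt", "w") as fh:
        fh.write(f"result={x}")
    with open("output.txt") as fh:
        return fh.read()


def run_isolated(func, *args):
    with isolated_cwd():
        return func(*args)


start_dir = os.getcwd()
outputs = [run_isolated(writes_output_file, i) for i in range(3)]
```

Each call writes its own output.txt in a separate scratch directory, so the calls never see each other's files. On Python 3.11+, contextlib.chdir could replace the manual os.chdir/restore pair, with the same thread-safety caveat.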
Quick Reference
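| Executor | Works? | Context inherited? | Recommended pattern |
|---|---|---|---|
| ThreadPoolExecutor | Yes | No (manual) | BoundWrapper inside a with fleche.cache(...) block |
| ProcessPoolExecutor | Yes | No (separate process) | File/SQL storage + BoundWrapper |
| executorlib SingleNodeExecutor | Yes | No (separate process) | File/SQL storage + BoundWrapper |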
Known Limitations
Cache Stampede (Thundering Herd)
Fleche does not protect against cache stampedes. When multiple workers
(threads or processes) call the same fleche-decorated function with identical
arguments at the same time and the result is not yet cached, all of them will
experience a cache miss simultaneously. Each worker then independently
computes the function and writes the result — wasting redundant work and
potentially producing inconsistent call metadata (e.g. Runtime values will
differ between the duplicate records).
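The stampede is easy to reproduce with a naive check-compute-write cache. In the sketch below a threading.Barrier holds every worker between the cache check and the cache write, which forces the worst-case timing deterministically. naive_cached_square and expensive_square are hypothetical and only mimic the check-then-write pattern; they are not fleche's wrapper.

```python
import concurrent.futures
import threading

N_WORKERS = 4
cache = {}
compute_count = 0
count_lock = threading.Lock()
# All workers must reach this point before any continues; the timeout turns a
# would-be hang into a BrokenBarrierError if fewer workers ever arrive.
all_missed = threading.Barrier(N_WORKERS, timeout=10)


def expensive_square(x):
    global compute_count
    with count_lock:
        compute_count += 1
    return x * x


def naive_cached_square(x):
    if x in cache:               # 1. check: every worker misses
        return cache[x]
    all_missed.wait()            #    all workers are now between check and write
    value = expensive_square(x)  # 2. compute: once per worker, not once in total
    cache[x] = value             # 3. write: last writer wins
    return value


with concurrent.futures.ThreadPoolExecutor(max_workers=N_WORKERS) as pool:
    results = list(pool.map(naive_cached_square, [7] * N_WORKERS))
```

Every worker returns the right answer and the cache ends up with a single entry, yet the function ran four times.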
The root cause is that there is no reservation mechanism between the cache
miss check and the cache write in wrapper.py. A correct fix would require
pre-allocating a “pending” slot in the cache so that other workers can detect
the in-flight computation and wait, rather than starting their own.
Implementing this correctly is non-trivial: it requires key-scoped locking,
timeout handling for stale allocations (e.g. if the computing worker crashes),
and cross-process coordination for file- and SQL-backed stores. This is
currently out of scope.
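For threads inside a single process, the "pending slot" idea reduces to key-scoped locking plus a re-check once the lock is held (often called single-flight). The sketch below is hypothetical and deliberately omits what the text identifies as the hard parts: timeouts for stale allocations and coordination across processes for file- and SQL-backed stores.

```python
import concurrent.futures
import threading
import time


class SingleFlightCache:
    """Key-scoped locking for threads in ONE process (hypothetical sketch).

    Not covered: cross-process coordination and timeouts for stale reservations.
    """

    def __init__(self):
        self._values = {}
        self._key_locks = {}
        self._guard = threading.Lock()  # protects _key_locks and the counter
        self.computations = 0

    def _lock_for(self, key):
        with self._guard:
            return self._key_locks.setdefault(key, threading.Lock())

    def get_or_compute(self, key, compute):
        if key in self._values:          # fast path: already cached
            return self._values[key]
        with self._lock_for(key):        # the "reservation": one computer per key
            if key in self._values:      # waiters re-check after the computer finishes
                return self._values[key]
            value = compute()
            with self._guard:
                self.computations += 1
            self._values[key] = value
            return value


def slow_square(x):
    time.sleep(0.05)  # widen the window in which a stampede could happen
    return x * x


flight = SingleFlightCache()


def task(_):
    return flight.get_or_compute(7, lambda: slow_square(7))


with concurrent.futures.ThreadPoolExecutor(max_workers=8) as pool:
    results = list(pool.map(task, range(8)))
```

Eight concurrent requests for the same key trigger exactly one computation; the other seven wait on the key's lock and then read the stored value.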
Warning
If you submit many tasks with the same arguments before the first result is cached, every one of them will compute the function. The last writer wins for the stored value, but no data is lost or corrupted, and subsequent calls hit the cache normally.
Practical workarounds:
- Pre-warm the cache sequentially before launching parallel workers.
- Partition work so each worker handles a disjoint set of arguments.
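The partitioning workaround takes only a few lines of plain Python. The hypothetical helper partition_disjoint below drops duplicate argument values (which must be hashable) before dealing them out round-robin, so no two workers ever compute the same call; cube stands in for a fleche-decorated function.

```python
import concurrent.futures


def partition_disjoint(args, n_workers):
    """Deduplicate args (keeping first-seen order) and deal them round-robin."""
    unique = list(dict.fromkeys(args))
    return [unique[i::n_workers] for i in range(n_workers)]


def cube(x):
    return x ** 3


def run_chunk(chunk):
    """Each worker processes its own disjoint chunk sequentially."""
    return {x: cube(x) for x in chunk}


requested = [3, 1, 3, 2, 1, 5]
chunks = partition_disjoint(requested, 2)

computed = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
    for partial in pool.map(run_chunk, chunks):
        computed.update(partial)
```

The duplicate requests for 3 and 1 are computed once each, by exactly one worker.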