Parallel Execution
Fleche-decorated functions work with Python’s concurrent.futures executors
and other process-/thread-based parallelism libraries. This page explains the
interaction between fleche’s context-variable state and the different executor
models, and shows recommended patterns for each.
Background: How Fleche Tracks State
Fleche stores the active cache and metadata in Python
contextvars: specifically, two ContextVar instances, one for the cache
and one for the metadata. Context managers like fleche.cache(...) and
fleche.meta(...) set these variables for the current context.
This design is inherently thread-safe: each logical execution context gets its own value. However, whether a new thread or process inherits the parent’s context depends on how it is spawned.
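As a rough illustration of this mechanism, the sketch below sets a ContextVar from a context manager using only the standard library. The names _active_cache, use_cache, and current_cache are hypothetical stand-ins and are not taken from fleche's internals.

```python
import contextvars
from contextlib import contextmanager

# Hypothetical stand-in for the ContextVar that holds the active cache.
_active_cache = contextvars.ContextVar("active_cache", default="default-cache")


@contextmanager
def use_cache(cache):
    """Set the active cache for the duration of the with-block."""
    token = _active_cache.set(cache)
    try:
        yield cache
    finally:
        # Restore whatever value was active before the block.
        _active_cache.reset(token)


def current_cache():
    return _active_cache.get()


before = current_cache()
with use_cache("my-cache"):
    inside = current_cache()
    with use_cache("nested-cache"):
        nested = current_cache()
    after_nested = current_cache()
after = current_cache()
```

Resetting with the token, rather than setting a fixed value, is what lets nested blocks unwind to the enclosing value.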
ThreadPoolExecutor
Threads spawned by concurrent.futures.ThreadPoolExecutor do not
automatically inherit ContextVar values from the submitting thread. A
fleche-decorated function submitted to a thread pool will therefore see the
default cache rather than one set via with fleche.cache(...).
Propagating context explicitly
Use contextvars.copy_context() to snapshot the current context and
ctx.run() to execute the function inside that snapshot:
import concurrent.futures
import contextvars

import fleche
from fleche.caches import Cache
from fleche.storage import Memory

@fleche.fleche
def compute(x):
    return x ** 2

my_cache = Cache(Memory({}), Memory({}))

with fleche.cache(my_cache):
    ctx = contextvars.copy_context()

with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(ctx.run, compute, 42)
    result = future.result()  # 1764, cached in my_cache
Results are written to my_cache because the worker thread runs inside the
copied context.
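The same behaviour can be reproduced without fleche. The sketch below uses a plain ContextVar named active_cache (a hypothetical stand-in for the cache variable) and compares a bare submit with one routed through ctx.run().

```python
import concurrent.futures
import contextvars

# Hypothetical stand-in for the ContextVar that holds the active cache.
active_cache = contextvars.ContextVar("active_cache", default="default-cache")


def read_cache():
    return active_cache.get()


def run_both():
    active_cache.set("my-cache")
    ctx = contextvars.copy_context()  # snapshot that includes "my-cache"
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        # Bare submit: on a standard CPython build the worker thread
        # starts from its own empty context and sees the default.
        bare = executor.submit(read_cache).result()
        # Routed through the snapshot: the worker sees "my-cache".
        propagated = executor.submit(ctx.run, read_cache).result()
    return bare, propagated


# Run inside a fresh context so the set() above does not leak to the caller.
bare, propagated = contextvars.Context().run(run_both)
```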
Tip
If you submit many tasks, call contextvars.copy_context() once per task.
Copying is O(1), and a single Context object cannot be entered by more than
one thread at a time: a second concurrent ctx.run() on the same snapshot
raises RuntimeError.
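When several tasks run concurrently, keep in mind that a Context object can be entered by only one thread at a time. The standard-library sketch below (the variable label and the helper functions are illustrative names) gives every task its own copy, then shows the RuntimeError raised when one snapshot is entered twice at once.

```python
import concurrent.futures
import contextvars
import threading

label = contextvars.ContextVar("label", default="default")


def read_label(i):
    return f"{label.get()}-{i}"


label.set("custom")

# One copy per task: each worker thread enters its own Context object.
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    futures = [
        executor.submit(contextvars.copy_context().run, read_label, i)
        for i in range(4)
    ]
    results = [f.result() for f in futures]

# Entering one shared Context while another thread is still inside it fails.
shared = contextvars.copy_context()
entered = threading.Event()
release = threading.Event()


def hold():
    entered.set()
    release.wait(timeout=5)


reuse_error = None
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    holder = executor.submit(shared.run, hold)
    entered.wait(timeout=5)
    try:
        shared.run(read_label, 0)  # second concurrent entry
    except RuntimeError as exc:
        reuse_error = exc
    finally:
        release.set()
    holder.result()
```

Because copy_context() is O(1), taking a copy per task costs very little.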
ProcessPoolExecutor
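Worker processes are independent OS processes with their own Python
interpreter. Changes to ContextVar values are never shared between the parent
and its workers. With the spawn and forkserver start methods, workers start
from the default; with fork, a worker sees at most a copy of the state that
existed when it was forked, which is best not relied on.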
In-memory caches are therefore not shared across processes. A result computed in a worker is stored in the worker’s own ephemeral cache and is lost when the worker exits.
Using file- or SQL-backed storage
To share cached results between the parent and worker processes, point both sides at the same file- or SQL-backed storage:
import concurrent.futures

import fleche
from fleche.caches import Cache
from fleche.storage.pickle_file import PickleFile

@fleche.fleche
def heavy_computation(x):
    return x ** 3

def worker(x, values_dir, calls_dir):
    """Each worker sets up a cache pointing at the shared directory."""
    values = PickleFile.with_pickle(root=values_dir)
    calls = PickleFile.with_pickle(root=calls_dir)
    with fleche.cache(Cache(values, calls)):
        return heavy_computation(x)

# Shared directories (could also be SQL connection strings)
values_dir = "/tmp/fleche_values"
calls_dir = "/tmp/fleche_calls"

# The guard keeps workers from re-running this block under the spawn start method.
if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = [executor.submit(worker, i, values_dir, calls_dir)
                   for i in range(8)]
        results = [f.result() for f in futures]

    # Parent can now read the cached results
    parent_cache = Cache(
        PickleFile.with_pickle(root=values_dir),
        PickleFile.with_pickle(root=calls_dir),
    )
    assert parent_cache.contains(heavy_computation.digest(3))
Fleche’s file-based storage uses lock files to coordinate concurrent writes, so multiple workers can safely write to the same directory.
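For readers unfamiliar with lock files, the sketch below shows the general technique with the standard library only. It is not fleche's implementation, which may differ. The helper names lock_file and increment are hypothetical, and threads stand in for worker processes to keep the example short.

```python
import os
import tempfile
import threading
import time
from contextlib import contextmanager


@contextmanager
def lock_file(path, poll=0.005, timeout=10.0):
    """Hold an exclusive lock by atomically creating `path`."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # O_EXCL makes creation fail if the lock file already exists.
            fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
            break
        except FileExistsError:
            if time.monotonic() > deadline:
                raise TimeoutError(f"could not acquire {path}")
            time.sleep(poll)
    try:
        yield
    finally:
        os.close(fd)
        os.remove(path)


def increment(counter_path, times):
    """Read-modify-write a counter file, guarded by the lock file."""
    for _ in range(times):
        with lock_file(counter_path + ".lock"):
            with open(counter_path) as fh:
                value = int(fh.read())
            with open(counter_path, "w") as fh:
                fh.write(str(value + 1))


with tempfile.TemporaryDirectory() as root:
    counter_path = os.path.join(root, "counter.txt")
    with open(counter_path, "w") as fh:
        fh.write("0")
    threads = [threading.Thread(target=increment, args=(counter_path, 5))
               for _ in range(8)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    with open(counter_path) as fh:
        final_count = int(fh.read())
```

Without the lock, two writers could read the same value and one increment would be lost; with it, all 40 increments are recorded.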
Note
With the standard-library ProcessPoolExecutor (which uses pickle),
fleche-decorated functions must be defined at module level. Lambdas and
locally-defined functions cannot be pickled and fail with a PicklingError
(or, on some Python versions, an AttributeError).
Third-party executor libraries that use dill or cloudpickle for
serialisation — such as executorlib
or parsl — do not have this restriction and
can submit locally-defined functions directly.
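The pickle restriction can be checked directly, without starting an executor. In the sketch below, try_pickle and make_local are illustrative helpers, and json.dumps stands in for any function that is importable by name.

```python
import json
import pickle


def try_pickle(obj):
    """Return None on success, or the exception raised by pickle."""
    try:
        pickle.dumps(obj)
        return None
    except (pickle.PicklingError, AttributeError) as exc:
        return exc


def make_local():
    def local_square(x):
        return x ** 2
    return local_square


module_level_error = try_pickle(json.dumps)   # importable by name
lambda_error = try_pickle(lambda x: x ** 2)   # no importable name
local_error = try_pickle(make_local())        # defined inside a function
```

pickle stores functions by reference (module plus qualified name), which is why only functions reachable by import survive the round trip.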
BoundWrapper — Freezing State for Workers
fleche.state.BoundWrapper captures the current cache and metadata into
a picklable callable, so workers automatically use the correct state without
manual setup:
import concurrent.futures

import fleche
from fleche.caches import Cache
from fleche.state import BoundWrapper
from fleche.storage import Memory

@fleche.fleche
def predict(x):
    return x * 2

# Memory storage keeps the example short. Each worker receives a pickled copy,
# so swap in file- or SQL-backed storage when results must be shared.
file_cache = Cache(Memory({}), Memory({}))

if __name__ == "__main__":
    with fleche.cache(file_cache):
        bound_predict = BoundWrapper.bind(predict)

    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = [executor.submit(bound_predict, i) for i in range(4)]
        results = [f.result() for f in futures]
BoundWrapper.bind(func) snapshots the active cache and metadata at bind
time. When called — even in a different process after pickling — the bound
wrapper restores that state before invoking the function.
This also works for plain functions that call fleche-decorated functions internally: the bound state propagates to all nested fleche calls.
@fleche.fleche
def step_a(x):
    return x + 1

def pipeline(x):
    """Plain function that uses fleche-decorated helpers."""
    return step_a(x) * 3

if __name__ == "__main__":
    with fleche.cache(file_cache):
        bound_pipeline = BoundWrapper.bind(pipeline)

    # In a worker process, step_a will use file_cache
    with concurrent.futures.ProcessPoolExecutor() as executor:
        future = executor.submit(bound_pipeline, 10)
        assert future.result() == 33  # (10 + 1) * 3
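To make the bind-time snapshot more concrete, here is a minimal sketch of the idea using a plain ContextVar. BoundSketch and active_cache are hypothetical names; this is not the BoundWrapper source, only one way such a wrapper could work. A call from an empty Context stands in for a worker that starts from the defaults.

```python
import contextvars

# Hypothetical stand-in for the ContextVar that holds the active cache.
active_cache = contextvars.ContextVar("active_cache", default="default-cache")


class BoundSketch:
    """Capture the active cache at bind time and restore it on each call."""

    def __init__(self, func, cache):
        # Plain attributes only, so an instance can be pickled whenever
        # func and cache themselves are picklable.
        self.func = func
        self.cache = cache

    @classmethod
    def bind(cls, func):
        return cls(func, active_cache.get())

    def __call__(self, *args, **kwargs):
        token = active_cache.set(self.cache)
        try:
            return self.func(*args, **kwargs)
        finally:
            active_cache.reset(token)


def step(x):
    # Nested calls read the restored state as well.
    return f"{active_cache.get()}:{x + 1}"


def pipeline(x):
    return step(x)


def bind_pipeline():
    active_cache.set("file-cache")
    return BoundSketch.bind(pipeline)


# Bind inside one context, then call from empty ones.
bound = contextvars.Context().run(bind_pipeline)
unbound_result = contextvars.Context().run(pipeline, 10)
bound_result = contextvars.Context().run(bound, 10)
```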
executorlib
executorlib provides executors that
follow the same concurrent.futures interface but manage HPC resources
(SLURM, MPI, etc.). Because executorlib.SingleNodeExecutor spawns worker
processes, the same rules as ProcessPoolExecutor apply:
In-memory caches are not shared with workers.
Use file- or SQL-backed storage and set it up explicitly in each worker, or use
BoundWrapper with a file-backed cache.
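from executorlib import SingleNodeExecutor

import fleche
from fleche.state import BoundWrapper

# file_cache and heavy_computation are defined as in the earlier examples
with fleche.cache(file_cache):
    bound = BoundWrapper.bind(heavy_computation)

with SingleNodeExecutor() as executor:
    future = executor.submit(bound, 42)
    result = future.result()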
The isolate Flag
The @fleche(isolate=True) option creates a temporary working directory for
each function invocation to prevent file-system conflicts when the wrapped
function writes to the current directory. This is useful for parallel
execution of functions that produce side-effect files.
Warning
os.chdir() changes the working directory for the whole process, so it is
not thread-safe. Combine the isolate flag only with process-based
executors, not with ThreadPoolExecutor.
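The general technique behind per-call isolation can be sketched with tempfile and os.chdir(). The helper isolated_cwd below is hypothetical and is not fleche's implementation. It also shows why the approach is unsafe with threads: the directory change applies to the whole process.

```python
import os
import tempfile
from contextlib import contextmanager


@contextmanager
def isolated_cwd():
    """Run the with-block inside a fresh temporary working directory."""
    previous = os.getcwd()
    with tempfile.TemporaryDirectory() as tmp:
        os.chdir(tmp)  # affects every thread in this process
        try:
            yield tmp
        finally:
            # Leave the directory before it is removed.
            os.chdir(previous)


def write_side_effect_file():
    # Writes relative to the current working directory.
    with open("output.txt", "w") as fh:
        fh.write("done")


start_dir = os.getcwd()
with isolated_cwd() as tmp:
    write_side_effect_file()
    created_in_tmp = os.path.exists(os.path.join(tmp, "output.txt"))
end_dir = os.getcwd()
```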
Quick Reference
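| Executor | Works? | Context inherited? | Recommended pattern |
|---|---|---|---|
| ThreadPoolExecutor | Yes | No (manual) | contextvars.copy_context() + ctx.run() |
| ProcessPoolExecutor | Yes | No (separate process) | File/SQL storage or BoundWrapper |
| executorlib | Yes | No (separate process) | File/SQL storage or BoundWrapper |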