Parallel Execution

Fleche-decorated functions work with Python’s concurrent.futures executors and other process-/thread-based parallelism libraries. This page explains the recommended patterns for each executor type.

ThreadPoolExecutor

Call BoundWrapper.bind(func) inside a with fleche.cache(...) block. bind captures the active cache at bind time and restores it on every call, including calls made inside worker threads, so all tasks write to the same store:

import concurrent.futures
import fleche
from fleche import BoundWrapper
from fleche.caches import Cache
from fleche.storage.memory import ValueMemory, CallMemory

@fleche.fleche
def compute(x):
    return x ** 2

my_cache = Cache(ValueMemory({}), CallMemory({}))

with fleche.cache(my_cache):
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(BoundWrapper.bind(compute), i) for i in range(4)]
        results = [f.result() for f in futures]  # all cached in my_cache
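
The bind-and-restore idea can be sketched with the standard library alone. This is only an illustration, not fleche's implementation: it assumes the active cache is tracked in something like a contextvars.ContextVar, and the names active_cache, bind and run_in_worker are hypothetical. Unlike BoundWrapper, the closure below is not picklable, so it only suits threads.

```python
import concurrent.futures
import contextvars

# Hypothetical stand-in for "the active cache"; not part of fleche.
active_cache = contextvars.ContextVar("active_cache", default=None)


def current_cache_name():
    """Report which cache the calling thread currently sees."""
    return active_cache.get()


def bind(func):
    """Capture the value active right now and restore it around every call."""
    captured = active_cache.get()

    def bound(*args, **kwargs):
        token = active_cache.set(captured)   # restore the captured state
        try:
            return func(*args, **kwargs)
        finally:
            active_cache.reset(token)        # leave the worker as we found it

    return bound


def run_in_worker(func):
    """Run func in a worker thread and return its result."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        return executor.submit(func).result()


# Bind while a cache is active, then deactivate it again.
token = active_cache.set("my_cache")
try:
    bound_name = bind(current_cache_name)
finally:
    active_cache.reset(token)

# The bound callable still sees "my_cache" inside a worker thread,
# even though no cache is active at the call site any more.
print(run_in_worker(bound_name))
```

Capturing at bind time rather than at call time is what lets the callable be created in one place and executed in another.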

Passing a bound function around

Because BoundWrapper is a regular picklable callable, you can bind it once and pass it anywhere — to multiple executor pools, helper modules, or across function calls. The captured state travels with the callable:

with fleche.cache(my_cache):
    bound_compute = BoundWrapper.bind(compute)   # bind once

# bound_compute carries my_cache — no cache context manager needed at the call site
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [executor.submit(bound_compute, i) for i in range(4)]
    results = [f.result() for f in futures]

ProcessPoolExecutor

Worker processes are independent OS processes with their own Python interpreter. In-memory caches are not shared across processes — a result computed in a worker is stored in the worker’s own ephemeral cache and is lost when the worker exits.

To share cached results, point both the parent and workers at the same file- or SQL-backed storage. BoundWrapper.bind(func) embeds the cache configuration into the callable so no manual worker-wrapper function is needed:

import concurrent.futures
import os
import tempfile
import fleche
from fleche import BoundWrapper
from fleche.caches import Cache
from fleche.storage.pickle_file import ValuePickleFile, CallPickleFile

@fleche.fleche
def heavy_computation(x):
    return x ** 3

with tempfile.TemporaryDirectory() as tmpdir:
    file_cache = Cache(
        ValuePickleFile.with_pickle(root=os.path.join(tmpdir, "values")),
        CallPickleFile.with_pickle(root=os.path.join(tmpdir, "calls")),
    )

    with fleche.cache(file_cache):
        with concurrent.futures.ProcessPoolExecutor() as executor:
            futures = [executor.submit(BoundWrapper.bind(heavy_computation), i)
                       for i in range(8)]
            results = [f.result() for f in futures]

    assert file_cache.contains(heavy_computation.digest(3))

Fleche’s file-based storage uses lock files to coordinate concurrent writes, so multiple workers can safely write to the same directory.
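
For readers unfamiliar with lock files, the general technique can be sketched as follows. This is not fleche's implementation; lock_file and locked_increment are hypothetical helpers, and the sketch assumes a POSIX-like file system where exclusive file creation is atomic.

```python
import contextlib
import os
import time


@contextlib.contextmanager
def lock_file(path, poll_interval=0.01, timeout=5.0):
    """Hold an exclusive lock represented by the existence of `path`."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # O_CREAT | O_EXCL fails if the file already exists,
            # so only one caller can create it at a time.
            fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
            break
        except FileExistsError:
            if time.monotonic() >= deadline:
                raise TimeoutError(f"could not acquire {path}")
            time.sleep(poll_interval)        # someone else holds the lock
    try:
        yield
    finally:
        os.close(fd)
        os.remove(path)                      # release the lock


def locked_increment(counter_path):
    """Read-modify-write a counter file while holding its lock file."""
    with lock_file(counter_path + ".lock"):
        try:
            with open(counter_path) as fh:
                value = int(fh.read())
        except FileNotFoundError:
            value = 0
        with open(counter_path, "w") as fh:
            fh.write(str(value + 1))
```

A lock of this kind serialises writers, but it does not by itself stop two workers from computing the same result (see Known Limitations).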

Note

With the standard-library ProcessPoolExecutor (which uses pickle), fleche-decorated functions must be defined at module level. Submitting a lambda or a locally-defined function fails at pickling time, with a PicklingError or an AttributeError depending on the Python version.

Third-party executor libraries that use dill or cloudpickle for serialisation — such as executorlib or parsl — do not have this restriction and can submit locally-defined functions directly.
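
A quick way to check whether a callable will survive the standard pickle round trip before submitting it is sketched below. The helper names can_pickle and make_local_function are illustrative and not part of fleche.

```python
import pickle


def can_pickle(obj):
    """Return True if the standard pickle module can serialise `obj`."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        # The exact exception type varies between Python versions.
        return False
    return True


def make_local_function():
    """Return a function defined inside another function."""
    def local_square(x):
        return x ** 2
    return local_square


if __name__ == "__main__":
    print(can_pickle(len))                    # importable by name
    print(can_pickle(lambda x: x ** 2))       # lambda: no importable name
    print(can_pickle(make_local_function()))  # local function
```

pickle stores functions by reference (module plus qualified name), which is why only functions that can be imported by name are accepted.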

BoundWrapper — Freezing State for Workers

fleche.BoundWrapper captures the current cache and metadata into a picklable callable, so workers automatically use the correct state without manual setup.

This also works for plain functions that call fleche-decorated functions internally: the bound state propagates to all nested fleche calls.

@fleche.fleche
def step_a(x):
    return x + 1

def pipeline(x):
    """Plain function that uses fleche-decorated helpers."""
    return step_a(x) * 3

with fleche.cache(file_cache):
    bound_pipeline = BoundWrapper.bind(pipeline)

# In a worker process, step_a will use file_cache
with concurrent.futures.ProcessPoolExecutor() as executor:
    future = executor.submit(bound_pipeline, 10)
    assert future.result() == 33  # (10 + 1) * 3

executorlib

executorlib provides executors that follow the same concurrent.futures interface but manage HPC resources (SLURM, MPI, etc.). Because executorlib.SingleNodeExecutor spawns worker processes, the same rules as ProcessPoolExecutor apply:

  • In-memory caches are not shared with workers.

  • Use file- or SQL-backed storage with BoundWrapper.

from executorlib import SingleNodeExecutor

with fleche.cache(file_cache):
    with SingleNodeExecutor() as executor:
        futures = [executor.submit(BoundWrapper.bind(heavy_computation), i)
                   for i in range(4)]
        results = [f.result() for f in futures]

The isolate Flag

The @fleche(isolate=True) option creates a temporary working directory for each function invocation to prevent file-system conflicts when the wrapped function writes to the current directory. This is useful for parallel execution of functions that produce side-effect files.

Warning

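The isolate flag relies on changing the working directory, and os.chdir() is process-wide and not thread-safe: a change made in one thread affects every other thread in the same process. Combine isolate only with process-based executors, not with ThreadPoolExecutor.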
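
The process-wide nature of the working directory can be seen with the standard library alone. The sketch below does not use fleche; it only shows that a chdir performed in one thread is visible from another. The function name is illustrative.

```python
import os
import tempfile
import threading


def chdir_in_thread_affects_main():
    """Return True if a chdir made in a worker thread is visible in the caller."""
    original = os.getcwd()
    with tempfile.TemporaryDirectory() as tmpdir:
        worker = threading.Thread(target=os.chdir, args=(tmpdir,))
        worker.start()
        worker.join()
        try:
            # realpath resolves symlinked temp directories before comparing.
            return os.path.realpath(os.getcwd()) == os.path.realpath(tmpdir)
        finally:
            os.chdir(original)  # restore before the temp directory is removed


if __name__ == "__main__":
    print(chdir_in_thread_affects_main())
```

Two threads that each switch into their own temporary directory would therefore overwrite each other's working directory, which is the conflict the warning describes.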

Quick Reference

Executor                        Works?  Context inherited?     Recommended pattern
------------------------------  ------  ---------------------  -------------------------------
ThreadPoolExecutor              Yes     No (manual)            BoundWrapper.bind(func)
ProcessPoolExecutor             Yes     No (separate process)  File/SQL storage + BoundWrapper
executorlib.SingleNodeExecutor  Yes     No (separate process)  File/SQL storage + BoundWrapper

Known Limitations

Cache Stampede (Thundering Herd)

Fleche does not protect against cache stampedes. When multiple workers (threads or processes) call the same fleche-decorated function with identical arguments at the same time and the result is not yet cached, all of them experience a cache miss simultaneously. Each worker then computes the function independently and writes the result, which duplicates work and can produce inconsistent call metadata (e.g. Runtime values will differ between the duplicate records).
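
The effect can be reproduced with a plain dictionary standing in for the cache. This sketch does not use fleche; naive_cached_square and the barrier are illustrative devices. The barrier inside slow_square only releases once every worker is computing at the same time, which makes the overlap deterministic for the demonstration.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

N_WORKERS = 4
cache = {}            # naive stand-in for a cache, not fleche
compute_calls = []    # one entry per real computation
all_computing = threading.Barrier(N_WORKERS)


def slow_square(x):
    compute_calls.append(x)
    # Wait until all workers are inside the computation at once.
    all_computing.wait(timeout=5)
    return x ** 2


def naive_cached_square(x):
    if x in cache:             # cache-miss check
        return cache[x]
    result = slow_square(x)    # nothing reserves the key in between
    cache[x] = result          # cache write (last writer wins)
    return result


with ThreadPoolExecutor(max_workers=N_WORKERS) as executor:
    results = list(executor.map(naive_cached_square, [3] * N_WORKERS))

print(len(compute_calls), results)
```

All four workers miss, all four compute, and the stored value is simply overwritten with the same result.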

The root cause is that there is no reservation mechanism between the cache miss check and the cache write in wrapper.py. A correct fix would require pre-allocating a “pending” slot in the cache so that other workers can detect the in-flight computation and wait, rather than starting their own. Implementing this correctly is non-trivial: it requires key-scoped locking, timeout handling for stale allocations (e.g. if the computing worker crashes), and cross-process coordination for file- and SQL-backed stores. This is currently out of scope.
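
To make the "pending slot" idea concrete, here is a minimal in-process sketch. It is not part of fleche and deliberately leaves out the hard parts named above: it covers threads in a single process only, with no timeout handling for stale slots and no cross-process coordination. The class name SingleFlight is hypothetical.

```python
import threading
from concurrent.futures import Future


class SingleFlight:
    """Let one caller per key compute while the others wait for its result."""

    def __init__(self):
        self._lock = threading.Lock()
        self._inflight = {}   # key -> Future for a computation in progress
        self._results = {}    # key -> finished value

    def get(self, key, compute):
        with self._lock:
            if key in self._results:
                return self._results[key]
            future = self._inflight.get(key)
            owner = future is None
            if owner:
                # Reserve the key so later callers can see the pending work.
                future = Future()
                self._inflight[key] = future

        if not owner:
            return future.result()   # wait for the owner to finish

        try:
            value = compute(key)
        except BaseException as exc:
            with self._lock:
                del self._inflight[key]
            future.set_exception(exc)   # waiters see the same failure
            raise
        with self._lock:
            self._results[key] = value
            del self._inflight[key]
        future.set_result(value)
        return value
```

The reservation happens under the same lock as the miss check, which closes the gap between checking and writing for threads that share this object.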

Warning

If you submit many tasks with the same arguments before the first result is cached, every worker will compute the function. The last writer wins for the stored value, but no data is lost or corrupted, and subsequent calls will hit the cache normally.

Practical workarounds:

  • Pre-warm the cache sequentially before launching parallel workers.

  • Partition work so each worker handles a disjoint set of arguments.
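
One simple way to get disjoint arguments is to deduplicate them before submitting. The helper below is an illustrative sketch rather than a fleche API; it assumes the arguments are hashable and uses a thread pool only to keep the example self-contained.

```python
from concurrent.futures import ThreadPoolExecutor


def map_unique(func, args, max_workers=4):
    """Apply func once per distinct argument and fan the results back out."""
    unique_args = list(dict.fromkeys(args))   # drop duplicates, keep order
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        computed = dict(zip(unique_args, executor.map(func, unique_args)))
    # Rebuild the result list in the caller's original order.
    return [computed[a] for a in args]
```

Because each distinct argument is submitted exactly once, no two workers can race on the same cache key within a single call.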