Parallel Execution ================== Fleche-decorated functions work with Python's ``concurrent.futures`` executors and other process-/thread-based parallelism libraries. This page explains the recommended patterns for each executor type. .. contents:: On this page :local: :depth: 2 ThreadPoolExecutor ------------------ Use :class:`~fleche.BoundWrapper` with a ``with cache(...)`` block. ``BoundWrapper.bind(func)`` captures the active cache at bind time and restores it on every call — including inside worker threads — so all tasks write to the same store: .. code-block:: python import concurrent.futures import fleche from fleche import BoundWrapper from fleche.caches import Cache from fleche.storage.memory import ValueMemory, CallMemory @fleche.fleche def compute(x): return x ** 2 my_cache = Cache(ValueMemory({}), CallMemory({})) with fleche.cache(my_cache): with concurrent.futures.ThreadPoolExecutor() as executor: futures = [executor.submit(BoundWrapper.bind(compute), i) for i in range(4)] results = [f.result() for f in futures] # all cached in my_cache Passing a bound function around ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Because :class:`~fleche.BoundWrapper` is a regular picklable callable, you can bind it once and pass it anywhere — to multiple executor pools, helper modules, or across function calls. The captured state travels with the callable: .. code-block:: python with fleche.cache(my_cache): bound_compute = BoundWrapper.bind(compute) # bind once # bound_compute carries my_cache — no cache context manager needed at the call site with concurrent.futures.ThreadPoolExecutor() as executor: futures = [executor.submit(bound_compute, i) for i in range(4)] results = [f.result() for f in futures] ProcessPoolExecutor ------------------- Worker processes are independent OS processes with their own Python interpreter. In-memory caches are **not shared** across processes — a result computed in a worker is stored in the worker's own ephemeral cache and is lost when the worker exits. To share cached results, point both the parent and workers at the same **file-** or **SQL-backed** storage. ``BoundWrapper.bind(func)`` embeds the cache configuration into the callable so no manual worker-wrapper function is needed: .. code-block:: python import concurrent.futures import tempfile, os import fleche from fleche import BoundWrapper from fleche.caches import Cache from fleche.storage.pickle_file import ValuePickleFile, CallPickleFile @fleche.fleche def heavy_computation(x): return x ** 3 with tempfile.TemporaryDirectory() as tmpdir: file_cache = Cache( ValuePickleFile.with_pickle(root=os.path.join(tmpdir, "values")), CallPickleFile.with_pickle(root=os.path.join(tmpdir, "calls")), ) with fleche.cache(file_cache): with concurrent.futures.ProcessPoolExecutor() as executor: futures = [executor.submit(BoundWrapper.bind(heavy_computation), i) for i in range(8)] results = [f.result() for f in futures] assert file_cache.contains(heavy_computation.digest(3)) Fleche's file-based storage uses lock files to coordinate concurrent writes, so multiple workers can safely write to the same directory. .. note:: With the standard-library ``ProcessPoolExecutor`` (which uses ``pickle``), fleche-decorated functions must be defined at **module level**. Lambdas and locally-defined functions will raise a ``PicklingError``. Third-party executor libraries that use ``dill`` or ``cloudpickle`` for serialisation — such as `executorlib `_ or `parsl `_ — do not have this restriction and can submit locally-defined functions directly. BoundWrapper — Freezing State for Workers ----------------------------------------- :class:`fleche.BoundWrapper` captures the current cache and metadata into a picklable callable, so workers automatically use the correct state without manual setup. This also works for **plain functions that call fleche-decorated functions internally**: the bound state propagates to all nested fleche calls. .. code-block:: python @fleche.fleche def step_a(x): return x + 1 def pipeline(x): """Plain function that uses fleche-decorated helpers.""" return step_a(x) * 3 with fleche.cache(file_cache): bound_pipeline = BoundWrapper.bind(pipeline) # In a worker process, step_a will use file_cache with concurrent.futures.ProcessPoolExecutor() as executor: future = executor.submit(bound_pipeline, 10) assert future.result() == 33 # (10 + 1) * 3 executorlib ----------- `executorlib `_ provides executors that follow the same ``concurrent.futures`` interface but manage HPC resources (SLURM, MPI, etc.). Because ``executorlib.SingleNodeExecutor`` spawns worker processes, the same rules as ``ProcessPoolExecutor`` apply: - In-memory caches are **not** shared with workers. - Use **file-** or **SQL-backed** storage with :class:`~fleche.BoundWrapper`. .. code-block:: python from executorlib import SingleNodeExecutor with fleche.cache(file_cache): with SingleNodeExecutor() as executor: futures = [executor.submit(BoundWrapper.bind(heavy_computation), i) for i in range(4)] results = [f.result() for f in futures] The ``isolate`` Flag -------------------- The ``@fleche(isolate=True)`` option creates a temporary working directory for each function invocation to prevent file-system conflicts when the wrapped function writes to the current directory. This is useful for parallel execution of functions that produce side-effect files. .. warning:: ``os.chdir()`` is **process-wide and not thread-safe**. The ``isolate`` flag should only be combined with process-based executors, not with ``ThreadPoolExecutor``. Quick Reference --------------- .. list-table:: :header-rows: 1 :widths: 30 15 25 30 * - Executor - Works? - Context inherited? - Recommended pattern * - ``ThreadPoolExecutor`` - Yes - No (manual) - ``BoundWrapper.bind(func)`` * - ``ProcessPoolExecutor`` - Yes - No (separate process) - File/SQL storage + ``BoundWrapper`` * - ``executorlib.SingleNodeExecutor`` - Yes - No (separate process) - File/SQL storage + ``BoundWrapper`` Known Limitations ----------------- Cache Stampede (Thundering Herd) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Fleche does **not** protect against cache stampedes. When multiple workers (threads or processes) call the same fleche-decorated function with identical arguments at the same time and the result is not yet cached, all of them will experience a cache miss simultaneously. Each worker then independently computes the function and writes the result — wasting redundant work and potentially producing inconsistent call metadata (e.g. ``Runtime`` values will differ between the duplicate records). The root cause is that there is no reservation mechanism between the cache miss check and the cache write in ``wrapper.py``. A correct fix would require pre-allocating a "pending" slot in the cache so that other workers can detect the in-flight computation and wait, rather than starting their own. Implementing this correctly is non-trivial: it requires key-scoped locking, timeout handling for stale allocations (e.g. if the computing worker crashes), and cross-process coordination for file- and SQL-backed stores. This is currently out of scope. .. warning:: If you submit many workers with the same arguments before the first result is cached, all workers will compute the function. The last writer wins for the stored value, but **no data is lost or corrupted** — subsequent calls will hit the cache normally. **Practical workarounds:** - Pre-warm the cache sequentially before launching parallel workers. - Partition work so each worker handles a disjoint set of arguments.