Concurrent Execution with Fleche

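This notebook shows how to use @fleche-decorated functions with three executor types (the standard library’s ThreadPoolExecutor and ProcessPoolExecutor, plus executorlib’s SingleNodeExecutor) through three complementary APIs: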

| API | What it does | When to use |
| --- | --- | --- |
| BoundWrapper.bind(func) | Freezes the current cache/metadata into a picklable callable | You want explicit control; you pass the bound callable around |
| Future pass-through | A decorated function returns a Future; fleche caches on completion | The decorated function is the one that submits work |
| wrap_executor(executor) | Monkey-patches submit: binds on miss, short-circuits on hit | Simplest call site; avoids submit/serialise on cached inputs |

| Executor | Required storage | Works with |
| --- | --- | --- |
| ThreadPoolExecutor | Any (Memory or file) | All three |
| ProcessPoolExecutor | Persistent (file / SQL) | BoundWrapper, wrap_executor |
| SingleNodeExecutor (executorlib) | Persistent (file / SQL) | BoundWrapper, wrap_executor |

[ ]:
import tempfile
import os
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, Future

from fleche import fleche, cache, BoundWrapper, wrap_executor
from fleche.caches import Cache
from fleche.storage.memory import ValueMemory, CallMemory
from fleche.storage.pickle_file import ValuePickleFile, CallPickleFile

1. BoundWrapper.bind — explicit state capture

BoundWrapper.bind(func) captures the active cache and metadata at bind time and restores them on every call, including inside worker threads and processes. The returned object is a regular picklable callable — you can pass it to any executor, store it in a dict, or hand it to another module.
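
To make the idea of bind-time capture concrete, here is a minimal stdlib-only sketch. It is not fleche's implementation: the names _active_cache, lookup_root, and Bound are hypothetical stand-ins, and the "cache" is just a string held in a context variable. The point is only that state read once at bind time can be restored around every later call, whichever thread runs it.

```python
import contextvars
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for a library's "active cache" context (not fleche internals).
_active_cache = contextvars.ContextVar("active_cache", default=None)


def lookup_root():
    """Return whatever cache location is active in the calling context."""
    return _active_cache.get()


class Bound:
    """Callable that remembers the cache that was active when bind() ran."""

    def __init__(self, func, cache_root):
        self.func = func
        self.cache_root = cache_root

    @classmethod
    def bind(cls, func):
        # Capture the state once, at bind time.
        return cls(func, _active_cache.get())

    def __call__(self, *args, **kwargs):
        # Restore the captured state for the duration of the call only.
        token = _active_cache.set(self.cache_root)
        try:
            return self.func(*args, **kwargs)
        finally:
            _active_cache.reset(token)


token = _active_cache.set("/tmp/values")   # illustrative path, never opened
bound = Bound.bind(lookup_root)
_active_cache.reset(token)

outside = lookup_root()                    # context no longer active here
inside = bound()                           # captured state is restored
with ThreadPoolExecutor(max_workers=1) as pool:
    in_worker = pool.submit(bound).result()
```

Because the bound object holds only a function reference and plain data, it can be stored or handed to other code like any other value.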

1a. ThreadPoolExecutor

[ ]:
@fleche
def add(x, y):
    return x + y


my_cache = Cache(ValueMemory({}), CallMemory({}))

with cache(my_cache):
    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(BoundWrapper.bind(add), x, x + 1) for x in range(4)]
        results = [f.result() for f in futures]

print("Results:", results)                                    # [1, 3, 5, 7]
print("In my_cache:", my_cache.contains(add.fleche.digest(0, 1)))   # True

1b. Passing a bound callable around

Because the bound callable is a regular Python object, you can bind it once and pass it to other modules, helper functions, or executor pools without re-specifying the cache. The captured state travels with the callable:

[ ]:
my_cache = Cache(ValueMemory({}), CallMemory({}))

with cache(my_cache):
    bound_add = BoundWrapper.bind(add)   # bind once, reuse anywhere

# bound_add carries my_cache — the cache context manager is no longer needed at the call site
with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(bound_add, x, x + 1) for x in range(4)]
    results = [f.result() for f in futures]

print("Results:", results)                                    # [1, 3, 5, 7]
print("In my_cache:", my_cache.contains(add.fleche.digest(0, 1)))   # True

1c. ProcessPoolExecutor

Worker processes run in separate Python interpreters, so an in-memory cache set in the parent is not visible inside workers. Use file- or SQL-backed storage so that results written by workers are visible to the parent process.

[ ]:
@fleche
def expensive(x):
    return x ** 3


with tempfile.TemporaryDirectory() as tmpdir:
    shared_cache = Cache(
        ValuePickleFile.with_pickle(root=os.path.join(tmpdir, "values")),
        CallPickleFile.with_pickle(root=os.path.join(tmpdir, "calls")),
    )

    with cache(shared_cache):
        with ProcessPoolExecutor(max_workers=2) as pool:
            results = list(pool.map(BoundWrapper.bind(expensive), range(5)))

    print("Results:", results)                                      # [0, 1, 8, 27, 64]
    print("Cached:", shared_cache.contains(expensive.fleche.digest(3)))    # True
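
The reason in-memory storage cannot be shared across processes can be illustrated without starting any workers. The sketch below is an assumption-laden simplification: ship_to_worker is a hypothetical helper that simulates the pickle round trip an argument goes through on its way to a worker process, and the "stores" are a plain dict and a plain directory path rather than fleche storage classes.

```python
import os
import pickle
import tempfile


def ship_to_worker(obj):
    """Simulate crossing a process boundary: the worker gets a pickled copy."""
    return pickle.loads(pickle.dumps(obj))


# In-memory store: the worker writes into its own copy of the dict.
memory_store = {}
worker_copy = ship_to_worker(memory_store)
worker_copy["expensive(3)"] = 27
memory_visible = "expensive(3)" in memory_store

# File-backed store: only the path is copied; the data itself lives on disk.
with tempfile.TemporaryDirectory() as root:
    worker_root = ship_to_worker(root)
    with open(os.path.join(worker_root, "expensive_3.pkl"), "wb") as fh:
        pickle.dump(27, fh)                      # written "by the worker"
    with open(os.path.join(root, "expensive_3.pkl"), "rb") as fh:
        file_value = pickle.load(fh)             # read "by the parent"
```

Here memory_visible ends up False while file_value is read back successfully, which mirrors why the process-pool examples use file-backed storage.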

1d. executorlib.SingleNodeExecutor

executorlib provides executors that follow the same concurrent.futures interface but manage HPC resources. Its SingleNodeExecutor uses process-based workers with the same isolation semantics as ProcessPoolExecutor, so the same file-backed cache pattern applies.

Note: executorlib is an optional dependency. Install it with pip install executorlib.

[ ]:
try:
    from executorlib import SingleNodeExecutor

    @fleche
    def cube_sne(x):
        return x ** 3

    with tempfile.TemporaryDirectory() as tmpdir:
        shared_cache = Cache(
            ValuePickleFile.with_pickle(root=os.path.join(tmpdir, "values")),
            CallPickleFile.with_pickle(root=os.path.join(tmpdir, "calls")),
        )

        with cache(shared_cache):
            with SingleNodeExecutor() as executor:
                futures = [executor.submit(BoundWrapper.bind(cube_sne), x) for x in range(5)]
                results = [f.result() for f in futures]

        print("Results:", results)                                              # [0, 1, 8, 27, 64]
        print("In shared_cache:", shared_cache.contains(cube_sne.fleche.digest(3)))    # True
except ImportError:
    print("executorlib not installed; skipping SingleNodeExecutor example.")

2. Future pass-through — cache async-style return values

A fleche-decorated function can return a concurrent.futures.Future directly. Fleche detects this, passes the future through to the caller unchanged, and caches the result once the future completes via a done-callback.

This is the right pattern when the decorated function is the one that submits work: its caller can still call .result() on the future, and a second call with the same arguments skips the executor entirely and returns the cached value synchronously.

[ ]:
_thread_pool = ThreadPoolExecutor(max_workers=2)


@fleche
def square_async(x):
    """Submits work and returns a Future; fleche caches on completion."""
    return _thread_pool.submit(lambda: x ** 2)


my_cache = Cache(ValueMemory({}), CallMemory({}))

with cache(my_cache):
    fut = square_async(4)
    print("First call returns a Future:", isinstance(fut, Future))     # True
    print("Result:", fut.result())                                      # 16
    print("Cached:", my_cache.contains(square_async.fleche.digest(4)))         # True

    # Second call: no executor submission, cached value returned directly.
    result = square_async(4)
    print("Second call is the cached value, not a Future:", result)    # 16
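
The general mechanism of caching a Future's result through a done-callback can be sketched with the standard library alone. This is an illustration under stated assumptions, not fleche's code: cache_on_completion is a hypothetical decorator, the store is a plain dict keyed on positional arguments, and failed or cancelled futures are simply not cached.

```python
import functools
from concurrent.futures import Future, ThreadPoolExecutor


def cache_on_completion(store):
    """Hypothetical decorator: pass Futures through, cache their results later."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args):
            if args in store:
                return store[args]               # hit: plain value, no Future
            result = func(*args)
            if isinstance(result, Future):
                def _record(done):
                    # Only successful results are stored.
                    if done.cancelled() or done.exception() is not None:
                        return
                    store[args] = done.result()

                result.add_done_callback(_record)
            else:
                store[args] = result
            return result                        # the Future itself, unchanged

        return wrapper

    return decorator


store = {}
with ThreadPoolExecutor(max_workers=1) as pool:

    @cache_on_completion(store)
    def square_later(x):
        return pool.submit(pow, x, 2)

    first = square_later(4)
    first_value = first.result()

# The pool has shut down, so the worker has finished running the callback.
second = square_later(4)
```

In concurrent.futures, a done-callback can run slightly after .result() has returned in the waiting thread, which is why this sketch inspects the store only after the pool has shut down.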

When a cached value already exists, the decorated function is not invoked and no Future is created — you simply get the cached result back. If you need the result to always behave like a Future at the call site, wrap it with an already-completed Future at that point, or use wrap_executor below instead.
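
Wrapping a plain value in an already-completed Future needs only the standard library. The helper name as_future below is hypothetical; it is a small convenience sketch, not part of fleche.

```python
from concurrent.futures import Future


def as_future(value):
    """Return value unchanged if it is a Future, else wrap it in a finished one."""
    if isinstance(value, Future):
        return value
    done = Future()
    done.set_result(value)
    return done


wrapped = as_future(16)            # e.g. a cached value from a second call
wrapped_value = wrapped.result()   # returns immediately
```

With this in place the call site can always call .result(), whether the decorated function returned a live Future or a cached value.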


3. wrap_executor — transparent submit interception

fleche.wrap_executor(executor) monkey-patches executor.submit on an existing instance so that fleche-decorated functions are handled automatically. Each submit call falls into one of three cases:

  1. Non-fleche callables pass straight through to the original submit.

  2. Cache hits return an already-completed Future — the executor is never touched, saving submit/serialise overhead.

  3. Cache misses are bound via BoundWrapper.bind(...) so the parent’s cache and metadata travel with the call into the worker.

No subclassing, no proxy — it replaces the submit attribute on the instance and returns the same instance.
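
The three cases can be sketched as an instance-level patch of submit. This is a simplified stand-in rather than the library's code: memoised, patch_submit, and the _memoised marker attribute are hypothetical, the cache is a plain dict, and the "bind on miss" step is reduced to a closure that stores the result.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def memoised(func):
    """Hypothetical marker decorator standing in for a caching decorator."""
    func._memoised = True
    return func


def patch_submit(executor, store):
    """Replace executor.submit on this one instance and return the same instance."""
    original_submit = executor.submit

    def submit(fn, /, *args, **kwargs):
        # Case 1: unmarked callables pass straight through.
        if not getattr(fn, "_memoised", False):
            return original_submit(fn, *args, **kwargs)
        key = (fn.__qualname__, args, tuple(sorted(kwargs.items())))
        # Case 2: cache hit, so hand back an already-completed Future.
        if key in store:
            done = Future()
            done.set_result(store[key])
            return done

        # Case 3: cache miss, so run in the executor and record the result.
        def run_and_store():
            value = fn(*args, **kwargs)
            store[key] = value
            return value

        return original_submit(run_and_store)

    executor.submit = submit
    return executor


@memoised
def triple(x):
    return x * 3


store = {}
with ThreadPoolExecutor(max_workers=2) as pool:
    same = patch_submit(pool, store)
    cold_value = pool.submit(triple, 2).result()   # miss: runs in a worker
    warm = pool.submit(triple, 2)                  # hit: never reaches the pool
    warm_done = warm.done()
    plain_value = pool.submit(len, "abc").result() # unmarked: passes through
```

Patching the instance attribute leaves the executor class and any other executor instances untouched, which matches the "no subclassing, no proxy" description above.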

3a. Thread-pool: cache hits skip the executor

With wrap_executor in place, repeat submissions with the same arguments never enter the executor. The second future is already done before we ask for its result.

[ ]:
@fleche
def triple(x):
    return x * 3


my_cache = Cache(ValueMemory({}), CallMemory({}))

with cache(my_cache):
    # First pass: cache is cold, work runs in workers.
    with ThreadPoolExecutor(max_workers=2) as pool:
        wrap_executor(pool)
        futures = [pool.submit(triple, x) for x in range(4)]
        results = [f.result() for f in futures]

    print("First-pass results:", results)                              # [0, 3, 6, 9]

    # Second pass: every submission is already cached.
    with ThreadPoolExecutor(max_workers=2) as pool:
        wrap_executor(pool)
        futures = [pool.submit(triple, x) for x in range(4)]
        print("All already done?", all(f.done() for f in futures))    # True
        print("Cached results:", [f.result() for f in futures])

3b. Process pool with file-backed cache

wrap_executor composes with the file-backed cache pattern, so users can submit fleche-decorated functions directly without needing to remember BoundWrapper.bind at every call site.

[ ]:
with tempfile.TemporaryDirectory() as tmpdir:
    shared_cache = Cache(
        ValuePickleFile.with_pickle(root=os.path.join(tmpdir, "values")),
        CallPickleFile.with_pickle(root=os.path.join(tmpdir, "calls")),
    )

    with cache(shared_cache):
        with ProcessPoolExecutor(max_workers=2) as pool:
            wrap_executor(pool)
            futures = [pool.submit(expensive, i) for i in range(5)]
            results = [f.result() for f in futures]

    print("Results:", results)                                         # [0, 1, 8, 27, 64]
    print("Cached:", shared_cache.contains(expensive.fleche.digest(3)))       # True

3c. Executor-specific kwargs are split automatically

Some executors declare their own keyword-only parameters on submit (e.g. executorlib’s resource_dict, dask’s resources=). wrap_executor inspects the signature with inspect.signature and forwards those keywords to submit while binding the rest into the function payload:

# Illustrative only: assumes a live shared_cache and an imported SingleNodeExecutor.
with cache(shared_cache):
    with SingleNodeExecutor() as executor:
        wrap_executor(executor)
        future = executor.submit(
            expensive, 5,                 # goes to expensive
            resource_dict={"cores": 4},   # goes to SingleNodeExecutor.submit
        )
        future.result()

Plain stdlib executors (ThreadPoolExecutor, ProcessPoolExecutor) have no keyword-only parameters on submit, so everything is forwarded to the function as expected.
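
A signature-based split of this kind can be approximated as follows. The function split_kwargs and the class ResourceExecutor are hypothetical illustrations; only inspect.signature and the stdlib submit signature are real, and the library's actual splitting rules may differ.

```python
import inspect
from concurrent.futures import ThreadPoolExecutor


def split_kwargs(submit, kwargs):
    """Separate kwargs into those submit declares keyword-only and the rest."""
    params = inspect.signature(submit).parameters
    executor_names = {
        name
        for name, param in params.items()
        if param.kind is inspect.Parameter.KEYWORD_ONLY
    }
    for_submit = {k: v for k, v in kwargs.items() if k in executor_names}
    for_func = {k: v for k, v in kwargs.items() if k not in executor_names}
    return for_submit, for_func


class ResourceExecutor:
    """Hypothetical executor whose submit takes a keyword-only resource_dict."""

    def submit(self, fn, /, *args, resource_dict=None, **kwargs):
        raise NotImplementedError("signature-only stand-in")


call_kwargs = {"resource_dict": {"cores": 4}, "scale": 2}

# An executor with its own keyword-only parameter keeps that keyword.
for_submit, for_func = split_kwargs(ResourceExecutor().submit, call_kwargs)

# The stdlib signature has no keyword-only parameters, so nothing is held back.
stdlib_submit, stdlib_func = split_kwargs(ThreadPoolExecutor.submit, call_kwargs)
```

One consequence of this design is that a decorated function cannot itself take a keyword argument with the same name as one of the executor's keyword-only parameters, since that name would be routed to submit.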


Summary

BoundWrapper.bind(func)                    ✅ explicit; pass bound callable anywhere
    with cache(...): pool.submit(BoundWrapper.bind(func), ...)

Future pass-through                        ✅ cache async-style return values
    @fleche
    def f(x): return pool.submit(...)

wrap_executor(pool)                        ✅ transparent; cache hits never submit
    with cache(...):
        wrap_executor(pool)
        pool.submit(f, x)                   ← ordinary submit

Storage reference

| Backend | Cross-process sharing |
| --- | --- |
| Memory | ❌ Ephemeral: process-local only |
| PickleFile | ✅ Persistent: shared via filesystem |
| Sql | ✅ Persistent: shared via database |
| BagOfHolding | ✅ Persistent: shared via HDF5 file |