Concurrent Execution with Fleche
This notebook shows how to use @fleche-decorated functions with Python’s three executor types using three complementary APIs:
| API | What it does | When to use |
|---|---|---|
| BoundWrapper.bind | Freezes the current cache/metadata into a picklable callable | You want explicit control; you pass the bound callable around |
| Future pass-through | A decorated function returns a Future | The decorated function is the one that submits work |
| wrap_executor | Monkey-patches executor.submit | Simplest call site; avoids submit/serialise on cached inputs |

| Executor | Required storage | Works with |
|---|---|---|
| ThreadPoolExecutor | Any (Memory or file) | All three |
| ProcessPoolExecutor | Persistent (file / SQL) | |
| executorlib.SingleNodeExecutor | Persistent (file / SQL) | |
[ ]:
import tempfile
import os
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, Future
from fleche import fleche, cache, BoundWrapper, wrap_executor
from fleche.caches import Cache
from fleche.storage.memory import ValueMemory, CallMemory
from fleche.storage.pickle_file import ValuePickleFile, CallPickleFile
1. BoundWrapper.bind — explicit state capture
BoundWrapper.bind(func) captures the active cache and metadata at bind time and restores them on every call, including inside worker threads and processes. The returned object is a regular picklable callable — you can pass it to any executor, store it in a dict, or hand it to another module.
1a. ThreadPoolExecutor
[ ]:
@fleche
def add(x, y):
    return x + y

my_cache = Cache(ValueMemory({}), CallMemory({}))
with cache(my_cache):
    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(BoundWrapper.bind(add), x, x + 1) for x in range(4)]
        results = [f.result() for f in futures]

print("Results:", results)  # [1, 3, 5, 7]
print("In my_cache:", my_cache.contains(add.fleche.digest(0, 1)))  # True
1b. Passing a bound callable around
Because the bound callable is a regular Python object, you can bind it once and pass it to other modules, helper functions, or executor pools without re-specifying the cache. The captured state travels with the callable:
[ ]:
my_cache = Cache(ValueMemory({}), CallMemory({}))
with cache(my_cache):
    bound_add = BoundWrapper.bind(add)  # bind once, reuse anywhere

# bound_add carries my_cache, so the cache context manager is no longer needed at the call site
with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(bound_add, x, x + 1) for x in range(4)]
    results = [f.result() for f in futures]

print("Results:", results)  # [1, 3, 5, 7]
print("In my_cache:", my_cache.contains(add.fleche.digest(0, 1)))  # True
1c. ProcessPoolExecutor
Worker processes run in separate Python interpreters — an in-memory cache set in the parent is not visible inside workers. Use a file- or SQL-backed backend so that results written by workers are visible to the parent process.
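The isolation problem can be illustrated without starting any processes. In the sketch below a pickle round-trip imitates what crosses a process boundary. `FileStoreSketch` is a hypothetical one-file-per-key store written for this illustration, not one of fleche's storage classes.

```python
import os
import pickle
import tempfile


class FileStoreSketch:
    """Hypothetical key/value store that keeps one pickle file per key."""

    def __init__(self, root):
        self.root = root
        os.makedirs(root, exist_ok=True)

    def _path(self, key):
        return os.path.join(self.root, f"{key}.pkl")

    def put(self, key, value):
        with open(self._path(key), "wb") as fh:
            pickle.dump(value, fh)

    def contains(self, key):
        return os.path.exists(self._path(key))

    def get(self, key):
        with open(self._path(key), "rb") as fh:
            return pickle.load(fh)


# In-memory storage: the "worker" receives a pickled copy of the dict,
# so its write never reaches the parent's object.
parent_memory = {}
worker_memory = pickle.loads(pickle.dumps(parent_memory))
worker_memory["cube-3"] = 27
parent_sees_memory_result = "cube-3" in parent_memory

# File-backed storage: only the directory path crosses the boundary,
# and both sides read and write the same files.
with tempfile.TemporaryDirectory() as tmpdir:
    parent_store = FileStoreSketch(tmpdir)
    worker_root = pickle.loads(pickle.dumps(parent_store.root))
    worker_store = FileStoreSketch(worker_root)
    worker_store.put("cube-3", 27)
    parent_sees_file_result = parent_store.contains("cube-3")
    value_read_by_parent = parent_store.get("cube-3")

print(parent_sees_memory_result, parent_sees_file_result, value_read_by_parent)
```

The in-memory write is lost to the parent, while the file-backed write is visible, which is why the process-based examples use a persistent backend.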
[ ]:
@fleche
def expensive(x):
    return x ** 3

with tempfile.TemporaryDirectory() as tmpdir:
    shared_cache = Cache(
        ValuePickleFile.with_pickle(root=os.path.join(tmpdir, "values")),
        CallPickleFile.with_pickle(root=os.path.join(tmpdir, "calls")),
    )
    with cache(shared_cache):
        with ProcessPoolExecutor(max_workers=2) as pool:
            results = list(pool.map(BoundWrapper.bind(expensive), range(5)))

    print("Results:", results)  # [0, 1, 8, 27, 64]
    print("Cached:", shared_cache.contains(expensive.fleche.digest(3)))  # True
1d. executorlib.SingleNodeExecutor
executorlib provides executors that follow the same concurrent.futures interface but manage HPC resources. Its SingleNodeExecutor uses process-based workers with the same isolation semantics as ProcessPoolExecutor, so the same file-backed cache pattern applies.
Note: executorlib is an optional dependency. Install it with pip install executorlib.
[ ]:
try:
    from executorlib import SingleNodeExecutor

    @fleche
    def cube_sne(x):
        return x ** 3

    with tempfile.TemporaryDirectory() as tmpdir:
        shared_cache = Cache(
            ValuePickleFile.with_pickle(root=os.path.join(tmpdir, "values")),
            CallPickleFile.with_pickle(root=os.path.join(tmpdir, "calls")),
        )
        with cache(shared_cache):
            with SingleNodeExecutor() as executor:
                futures = [executor.submit(BoundWrapper.bind(cube_sne), x) for x in range(5)]
                results = [f.result() for f in futures]

        print("Results:", results)  # [0, 1, 8, 27, 64]
        print("In shared_cache:", shared_cache.contains(cube_sne.fleche.digest(3)))  # True
except ImportError:
    print("executorlib not installed; skipping SingleNodeExecutor example.")
2. Future pass-through — cache async-style return values
A fleche-decorated function can return a concurrent.futures.Future directly. Fleche detects this, passes the future through to the caller unchanged, and caches the result once the future completes via a done-callback.
This is the right pattern when the decorated function is the one that submits work: its caller can still .result() the future, and a second call with the same arguments skips the executor entirely and returns the cached value synchronously.
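The pass-through behaviour described above can be approximated with the standard library alone. The decorator below is a hypothetical sketch (`cache_future_results` is not a fleche name) that uses an in-memory dict and positional arguments only. It shows the general done-callback technique, not how fleche actually implements it.

```python
import functools
from concurrent.futures import Future, ThreadPoolExecutor


def cache_future_results(func):
    """Hypothetical decorator: pass Futures through, cache them on completion."""
    store = {}

    @functools.wraps(func)
    def wrapper(*args):
        if args in store:
            return store[args]  # cache hit: a plain value, no Future
        out = func(*args)
        if isinstance(out, Future):
            def record(done, key=args):
                # Only successful results are cached in this sketch.
                if not done.cancelled() and done.exception() is None:
                    store[key] = done.result()
            out.add_done_callback(record)
        else:
            store[args] = out
        return out  # the caller receives the original object unchanged

    wrapper.store = store
    return wrapper


pool = ThreadPoolExecutor(max_workers=1)


@cache_future_results
def square_later(x):
    return pool.submit(pow, x, 2)


first = square_later(4)
first_value = first.result()

# Done-callbacks run in the worker thread; joining the pool guarantees
# the callback has finished before the cache is consulted again.
pool.shutdown(wait=True)

second = square_later(4)  # served from the store; the pool is already closed
print(first_value, second)
```

Because the pool is shut down before the second call, that call can only succeed if no new submission happens, which demonstrates the "skips the executor entirely" behaviour.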
[ ]:
_thread_pool = ThreadPoolExecutor(max_workers=2)

@fleche
def square_async(x):
    """Submits work and returns a Future; fleche caches on completion."""
    return _thread_pool.submit(lambda: x ** 2)

my_cache = Cache(ValueMemory({}), CallMemory({}))
with cache(my_cache):
    fut = square_async(4)
    print("First call returns a Future:", isinstance(fut, Future))  # True
    print("Result:", fut.result())  # 16
    print("Cached:", my_cache.contains(square_async.fleche.digest(4)))  # True

    # Second call: no executor submission, cached value returned directly.
    result = square_async(4)
    print("Second call is the cached value, not a Future:", result)  # 16
When a cached value already exists, the decorated function is not invoked and no Future is created — you simply get the cached result back. If you need the result to always behave like a Future at the call site, wrap it with an already-completed Future at that point, or use wrap_executor below instead.
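One way to wrap a cached value in an already-completed Future is sketched below. `as_future` is a hypothetical helper, not part of fleche. The standard-library documentation describes `Future.set_result` as intended for executor implementations and tests, so treat this as a convenience under that caveat.

```python
from concurrent.futures import Future


def as_future(value):
    """Return value unchanged if it is a Future, else wrap it in a completed one."""
    if isinstance(value, Future):
        return value
    done = Future()
    done.set_result(value)
    return done


cached = 16  # e.g. what a cache hit returned
fut = as_future(cached)
print(fut.done(), fut.result())
```

With this helper the call site can always call `.result()`, whether the decorated function returned a live Future or a cached value.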
3. wrap_executor — transparent submit interception
fleche.wrap_executor(executor) monkey-patches executor.submit on an existing instance so that fleche-decorated functions are handled automatically. It does three things per submit:
- Non-fleche callables pass straight through to the original submit.
- Cache hits return an already-completed Future; the executor is never touched, saving submit/serialise overhead.
- Cache misses are bound via BoundWrapper.bind(...) so the parent's cache and metadata travel with the call into the worker.
No subclassing, no proxy — it replaces the submit attribute on the instance and returns the same instance.
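The instance-level patching technique can be sketched with the standard library. Everything here is hypothetical: `patch_submit_sketch`, the `cacheable` attribute (a stand-in for "is fleche-decorated") and the dict store are assumptions. On a miss the sketch simply records the result with a done-callback, which is a simplification of the bind step the text describes.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def patch_submit_sketch(executor, store):
    """Hypothetical: replace executor.submit on this instance only."""
    original_submit = executor.submit

    def submit(fn, *args, **kwargs):
        if not getattr(fn, "cacheable", False):
            return original_submit(fn, *args, **kwargs)  # case 1: pass through
        key = (fn.__qualname__, args, tuple(sorted(kwargs.items())))
        if key in store:
            hit = Future()                               # case 2: cache hit
            hit.set_result(store[key])
            return hit
        future = original_submit(fn, *args, **kwargs)    # case 3: cache miss

        def record(done):
            if not done.cancelled() and done.exception() is None:
                store[key] = done.result()

        future.add_done_callback(record)
        return future

    executor.submit = submit  # shadows the method on the instance
    return executor           # same object, no subclass or proxy


def triple(x):
    return x * 3


triple.cacheable = True  # stand-in marker for a decorated function
store = {}

with ThreadPoolExecutor(max_workers=2) as pool:
    same_instance = patch_submit_sketch(pool, store) is pool
    first_pass = [pool.submit(triple, x).result() for x in range(4)]

with ThreadPoolExecutor(max_workers=2) as pool:
    patch_submit_sketch(pool, store)
    futures = [pool.submit(triple, x) for x in range(4)]
    all_done_immediately = all(f.done() for f in futures)
    second_pass = [f.result() for f in futures]

print(same_instance, all_done_immediately, second_pass)
```

Assigning to `executor.submit` only affects that one instance; other executors of the same class keep the original method.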
3a. Thread-pool: cache hits skip the executor
With wrap_executor in place, repeat submissions with the same arguments never enter the executor. The second future is already done before we ask for its result.
[ ]:
@fleche
def triple(x):
    return x * 3

my_cache = Cache(ValueMemory({}), CallMemory({}))
with cache(my_cache):
    # First pass: cache is cold, work runs in workers.
    with ThreadPoolExecutor(max_workers=2) as pool:
        wrap_executor(pool)
        futures = [pool.submit(triple, x) for x in range(4)]
        results = [f.result() for f in futures]
    print("First-pass results:", results)  # [0, 3, 6, 9]

    # Second pass: every submission is already cached.
    with ThreadPoolExecutor(max_workers=2) as pool:
        wrap_executor(pool)
        futures = [pool.submit(triple, x) for x in range(4)]
        print("All already done?", all(f.done() for f in futures))  # True
        print("Cached results:", [f.result() for f in futures])
3b. Process pool with file-backed cache
wrap_executor composes with the file-backed cache pattern, so users can submit fleche-decorated functions directly without needing to remember BoundWrapper.bind at every call site.
[ ]:
with tempfile.TemporaryDirectory() as tmpdir:
    shared_cache = Cache(
        ValuePickleFile.with_pickle(root=os.path.join(tmpdir, "values")),
        CallPickleFile.with_pickle(root=os.path.join(tmpdir, "calls")),
    )
    with cache(shared_cache):
        with ProcessPoolExecutor(max_workers=2) as pool:
            wrap_executor(pool)
            futures = [pool.submit(expensive, i) for i in range(5)]
            results = [f.result() for f in futures]

    print("Results:", results)  # [0, 1, 8, 27, 64]
    print("Cached:", shared_cache.contains(expensive.fleche.digest(3)))  # True
3c. Executor-specific kwargs are split automatically
Some executors declare their own keyword-only parameters on submit (e.g. executorlib's resource_dict, dask's resources=). wrap_executor inspects the submit signature with inspect.signature, forwards those parameters to submit, and binds the remaining arguments into the function payload:
    with cache(shared_cache):
        with SingleNodeExecutor() as executor:
            wrap_executor(executor)
            future = executor.submit(
                expensive, 5,                # 5 goes to expensive
                resource_dict={"cores": 4},  # goes to SingleNodeExecutor.submit
            )
            future.result()
Plain stdlib executors (ThreadPoolExecutor, ProcessPoolExecutor) have no keyword-only parameters on submit, so everything is forwarded to the function as expected.
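A signature-based split of this kind might look like the following sketch. `split_kwargs` and `hpc_submit` are hypothetical names invented for illustration; only `inspect.signature` and `ThreadPoolExecutor.submit` are real standard-library objects, and fleche's own logic may differ in detail.

```python
import inspect
from concurrent.futures import ThreadPoolExecutor


def split_kwargs(submit, kwargs):
    """Split kwargs into (executor_kwargs, function_kwargs)."""
    params = inspect.signature(submit).parameters
    keyword_only = {
        name
        for name, param in params.items()
        if param.kind is inspect.Parameter.KEYWORD_ONLY
    }
    executor_kwargs = {k: v for k, v in kwargs.items() if k in keyword_only}
    function_kwargs = {k: v for k, v in kwargs.items() if k not in keyword_only}
    return executor_kwargs, function_kwargs


# Hypothetical submit signature with one executor-specific keyword-only parameter.
def hpc_submit(fn, /, *args, resource_dict=None, **kwargs):
    raise NotImplementedError("signature only; never called in this sketch")


call_kwargs = {"resource_dict": {"cores": 4}, "scale": 2}
hpc_split = split_kwargs(hpc_submit, call_kwargs)
stdlib_split = split_kwargs(ThreadPoolExecutor.submit, call_kwargs)

print(hpc_split)
print(stdlib_split)
```

For the hypothetical `hpc_submit`, `resource_dict` is routed to the executor and `scale` to the function. For the stdlib `submit`, which has no keyword-only parameters, both keywords end up in the function's share.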
Summary
BoundWrapper.bind(func)      ✅ explicit; pass bound callable anywhere
    with cache(...): pool.submit(BoundWrapper.bind(func), ...)

Future pass-through          ✅ cache async-style return values
    @fleche
    def f(x): return pool.submit(...)

wrap_executor(pool)          ✅ transparent; cache hits never submit
    with cache(...):
        wrap_executor(pool)
        pool.submit(f, x)    ← ordinary submit
Storage reference
| Backend | Cross-process sharing |
|---|---|
| Memory (ValueMemory / CallMemory) | ❌ Ephemeral: process-local only |
| Pickle file (ValuePickleFile / CallPickleFile) | ✅ Persistent: shared via filesystem |
| SQL | ✅ Persistent: shared via database |
| HDF5 | ✅ Persistent: shared via HDF5 file |