{ "cells": [ { "cell_type": "markdown", "id": "c5cac4f4", "metadata": {}, "source": [ "# Concurrent Execution with Fleche\n", "\n", "This notebook shows how to use `@fleche`-decorated functions with Python's three executor types using **three complementary APIs**:\n", "\n", "| API | What it does | When to use |\n", "|-----|--------------|-------------|\n", "| `BoundWrapper.bind(func)` | Freezes the current cache/metadata into a picklable callable | You want explicit control; you pass the bound callable around |\n", "| **Future pass-through** | A decorated function returns a `Future`; fleche caches on completion | The decorated function *is* the one that submits work |\n", "| `wrap_executor(executor)` | Monkey-patches `submit`: binds on miss, short-circuits on hit | Simplest call site; avoids submit/serialise on cached inputs |\n", "\n", "| Executor | Required storage | Works with |\n", "|---|---|---|\n", "| `ThreadPoolExecutor` | Any (Memory or file) | All three |\n", "| `ProcessPoolExecutor` | **Persistent** (file / SQL) | `BoundWrapper`, `wrap_executor` |\n", "| `SingleNodeExecutor` (executorlib) | **Persistent** (file / SQL) | `BoundWrapper`, `wrap_executor` |" ] }, { "cell_type": "code", "id": "3157c6b5", "metadata": {}, "execution_count": null, "outputs": [], "source": [ "import tempfile\n", "import os\n", "from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, Future\n", "\n", "from fleche import fleche, cache, BoundWrapper, wrap_executor\n", "from fleche.caches import Cache\n", "from fleche.storage.memory import ValueMemory, CallMemory\n", "from fleche.storage.pickle_file import ValuePickleFile, CallPickleFile" ] }, { "cell_type": "markdown", "id": "5d911614", "metadata": {}, "source": [ "## 1. `BoundWrapper.bind` — explicit state capture\n", "\n", "`BoundWrapper.bind(func)` captures the active cache and metadata at bind time and restores them on every call, including inside worker threads and processes. 
The returned object is a regular picklable callable — you can pass it to any executor, store it in a dict, or hand it to another module." ] }, { "cell_type": "markdown", "id": "e1051643", "metadata": {}, "source": [ "### 1a. ThreadPoolExecutor" ] }, { "cell_type": "code", "id": "eba0bf4d", "metadata": {}, "execution_count": null, "outputs": [], "source": [ "@fleche\n", "def add(x, y):\n", "    return x + y\n", "\n", "\n", "my_cache = Cache(ValueMemory({}), CallMemory({}))\n", "\n", "with cache(my_cache):\n", "    with ThreadPoolExecutor(max_workers=2) as pool:\n", "        futures = [pool.submit(BoundWrapper.bind(add), x, x + 1) for x in range(4)]\n", "        results = [f.result() for f in futures]\n", "\n", "print(\"Results:\", results)  # [1, 3, 5, 7]\n", "print(\"In my_cache:\", my_cache.contains(add.fleche.digest(0, 1)))  # True" ] }, { "cell_type": "markdown", "id": "394dbb85", "metadata": {}, "source": [ "### 1b. Passing a bound callable around\n", "\n", "Because the bound callable is a regular Python object, you can bind it once and pass it to other modules, helper functions, or executor pools without re-specifying the cache. The captured state travels with the callable:" ] }, { "cell_type": "code", "id": "4a88effb", "metadata": {}, "execution_count": null, "outputs": [], "source": [ "my_cache = Cache(ValueMemory({}), CallMemory({}))\n", "\n", "with cache(my_cache):\n", "    bound_add = BoundWrapper.bind(add)  # bind once, reuse anywhere\n", "\n", "# bound_add carries my_cache — the cache context manager is no longer needed at the call site\n", "with ThreadPoolExecutor(max_workers=2) as pool:\n", "    futures = [pool.submit(bound_add, x, x + 1) for x in range(4)]\n", "    results = [f.result() for f in futures]\n", "\n", "print(\"Results:\", results)  # [1, 3, 5, 7]\n", "print(\"In my_cache:\", my_cache.contains(add.fleche.digest(0, 1)))  # True" ] }, { "cell_type": "markdown", "id": "8e339bd3", "metadata": {}, "source": [ "### 1c. ProcessPoolExecutor\n", "\n", "Worker processes run in separate Python interpreters — an in-memory cache set in the parent is **not visible** inside workers. Use a **file-** or **SQL-backed** backend so that results written by workers are visible to the parent process.\n", "\n", "`ProcessPoolExecutor` also pickles every callable it sends to a worker. With the `spawn` start method (the default on Windows and macOS), functions defined directly in a notebook usually cannot be re-imported inside the workers, so on those platforms you may need to move the decorated functions into an importable module." ] }, { "cell_type": "code", "id": "0375884b", "metadata": {}, "execution_count": null, "outputs": [], "source": [ "@fleche\n", "def expensive(x):\n", "    return x ** 3\n", "\n", "\n", "with tempfile.TemporaryDirectory() as tmpdir:\n", "    shared_cache = Cache(\n", "        ValuePickleFile.with_pickle(root=os.path.join(tmpdir, \"values\")),\n", "        CallPickleFile.with_pickle(root=os.path.join(tmpdir, \"calls\")),\n", "    )\n", "\n", "    with cache(shared_cache):\n", "        with ProcessPoolExecutor(max_workers=2) as pool:\n", "            results = list(pool.map(BoundWrapper.bind(expensive), range(5)))\n", "\n", "    print(\"Results:\", results)  # [0, 1, 8, 27, 64]\n", "    print(\"Cached:\", shared_cache.contains(expensive.fleche.digest(3)))  # True" ] }, { "cell_type": "markdown", "id": "6a8ac14b", "metadata": {}, "source": [ "### 1d. executorlib.SingleNodeExecutor\n", "\n", "[executorlib](https://github.com/pyiron/executorlib) provides executors that follow the same `concurrent.futures` interface but manage HPC resources. Its `SingleNodeExecutor` uses process-based workers with the same isolation semantics as `ProcessPoolExecutor`, so the same file-backed cache pattern applies.\n", "\n", "> **Note:** `executorlib` is an optional dependency. Install it with `pip install executorlib`."
] }, { "cell_type": "code", "id": "f28fd4ad", "metadata": {}, "execution_count": null, "outputs": [], "source": [ "try:\n", "    from executorlib import SingleNodeExecutor\n", "\n", "    @fleche\n", "    def cube_sne(x):\n", "        return x ** 3\n", "\n", "    with tempfile.TemporaryDirectory() as tmpdir:\n", "        shared_cache = Cache(\n", "            ValuePickleFile.with_pickle(root=os.path.join(tmpdir, \"values\")),\n", "            CallPickleFile.with_pickle(root=os.path.join(tmpdir, \"calls\")),\n", "        )\n", "\n", "        with cache(shared_cache):\n", "            with SingleNodeExecutor() as executor:\n", "                futures = [executor.submit(BoundWrapper.bind(cube_sne), x) for x in range(5)]\n", "                results = [f.result() for f in futures]\n", "\n", "        print(\"Results:\", results)  # [0, 1, 8, 27, 64]\n", "        print(\"In shared_cache:\", shared_cache.contains(cube_sne.fleche.digest(3)))  # True\n", "except ImportError:\n", "    print(\"executorlib not installed; skipping SingleNodeExecutor example.\")" ] }, { "cell_type": "markdown", "id": "ebb1f3f8", "metadata": {}, "source": [ "---\n", "## 2. Future pass-through — cache async-style return values\n", "\n", "A fleche-decorated function can return a `concurrent.futures.Future` directly. Fleche detects this, passes the future through to the caller unchanged, and **caches the result once the future completes** via a done-callback.\n", "\n", "This is the right pattern when the decorated function *is* the one that submits work: its caller can still call `.result()` on the future, and a second call with the same arguments skips the executor entirely and returns the cached value synchronously."
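, "\n", "\n", "To make the mechanism concrete, the next cell sketches the same pass-through idea with the standard library alone. It is **not** fleche's implementation: the `cache_futures` decorator and the plain-dict `sketch_store` are hypothetical stand-ins for a fleche-decorated function and its cache backend, and the sketch keys calls on positional arguments only." ] }, { "cell_type": "code", "id": "9c1e4a7b", "metadata": {}, "execution_count": null, "outputs": [], "source": [ "import functools\n", "from concurrent.futures import Future, ThreadPoolExecutor\n", "\n", "# Hypothetical cache backend: a plain dict keyed by (function name, args).\n", "sketch_store = {}\n", "\n", "\n", "def cache_futures(func):\n", "    \"\"\"Hypothetical decorator illustrating Future pass-through (not fleche's code).\"\"\"\n", "\n", "    @functools.wraps(func)\n", "    def wrapper(*args):\n", "        key = (func.__name__, args)\n", "        if key in sketch_store:\n", "            return sketch_store[key]  # hit: plain value, no Future is created\n", "        result = func(*args)\n", "        if isinstance(result, Future):\n", "            # Miss that returned a Future: pass it back unchanged, store on completion.\n", "            def _store(fut):\n", "                if not fut.cancelled() and fut.exception() is None:\n", "                    sketch_store[key] = fut.result()\n", "\n", "            result.add_done_callback(_store)\n", "        else:\n", "            sketch_store[key] = result\n", "        return result\n", "\n", "    return wrapper\n", "\n", "\n", "with ThreadPoolExecutor(max_workers=2) as pool:\n", "\n", "    @cache_futures\n", "    def square_sketch(x):\n", "        return pool.submit(lambda: x ** 2)\n", "\n", "    first = square_sketch(4)\n", "    print(\"First call:\", type(first).__name__, first.result())  # First call: Future 16\n", "\n", "# Leaving the `with` block joins the worker threads, so the done-callback has run.\n", "print(\"Second call:\", square_sketch(4))  # Second call: 16"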
] }, { "cell_type": "code", "id": "0b6f9de2", "metadata": {}, "execution_count": null, "outputs": [], "source": [ "_thread_pool = ThreadPoolExecutor(max_workers=2)\n", "\n", "\n", "@fleche\n", "def square_async(x):\n", "    \"\"\"Submits work and returns a Future; fleche caches on completion.\"\"\"\n", "    return _thread_pool.submit(lambda: x ** 2)\n", "\n", "\n", "my_cache = Cache(ValueMemory({}), CallMemory({}))\n", "\n", "with cache(my_cache):\n", "    fut = square_async(4)\n", "    print(\"First call returns a Future:\", isinstance(fut, Future))  # True\n", "    print(\"Result:\", fut.result())  # 16\n", "\n", "    # concurrent.futures may wake result() before running done-callbacks, so join the\n", "    # pool's worker first to be sure fleche's callback has written the cache entry.\n", "    _thread_pool.shutdown(wait=True)\n", "    print(\"Cached:\", my_cache.contains(square_async.fleche.digest(4)))  # True\n", "\n", "    # Second call: no executor submission, cached value returned directly.\n", "    result = square_async(4)\n", "    print(\"Second call is the cached value, not a Future:\", result)  # 16" ] }, { "cell_type": "markdown", "id": "64942686", "metadata": {}, "source": [ "When a cached value already exists, the decorated function is **not** invoked and **no** Future is created — you simply get the cached result back. If you need the result to always behave like a Future at the call site, wrap it with an already-completed Future at that point, or use `wrap_executor` below instead." ] }, { "cell_type": "markdown", "id": "77a5d329", "metadata": {}, "source": [ "---\n", "## 3. `wrap_executor` — transparent submit interception\n", "\n", "`fleche.wrap_executor(executor)` monkey-patches `executor.submit` on an existing instance so that fleche-decorated functions are handled automatically. It does three things per `submit`:\n", "\n", "1. **Non-fleche callables** pass straight through to the original `submit`.\n", "2. **Cache hits** return an already-completed `Future` — the executor is never touched, saving submit/serialise overhead.\n", "3. **Cache misses** are bound via `BoundWrapper.bind(...)` so the parent's cache and metadata travel with the call into the worker.\n", "\n", "No subclassing, no proxy — it replaces the `submit` attribute on the instance and returns the same instance." ] }, { "cell_type": "markdown", "id": "71e759d4", "metadata": {}, "source": [ "### 3a. Thread-pool: cache hits skip the executor\n", "\n", "With `wrap_executor` in place, repeat submissions with the same arguments never enter the executor: on the second pass, every future is already done before we ask for its result." ] }, { "cell_type": "code", "id": "fb5528cc", "metadata": {}, "execution_count": null, "outputs": [], "source": [ "@fleche\n", "def triple(x):\n", "    return x * 3\n", "\n", "\n", "my_cache = Cache(ValueMemory({}), CallMemory({}))\n", "\n", "with cache(my_cache):\n", "    # First pass: cache is cold, work runs in workers.\n", "    with ThreadPoolExecutor(max_workers=2) as pool:\n", "        wrap_executor(pool)\n", "        futures = [pool.submit(triple, x) for x in range(4)]\n", "        results = [f.result() for f in futures]\n", "\n", "    print(\"First-pass results:\", results)  # [0, 3, 6, 9]\n", "\n", "    # Second pass: every submission is already cached.\n", "    with ThreadPoolExecutor(max_workers=2) as pool:\n", "        wrap_executor(pool)\n", "        futures = [pool.submit(triple, x) for x in range(4)]\n", "        print(\"All already done?\", all(f.done() for f in futures))  # True\n", "        print(\"Cached results:\", [f.result() for f in futures])  # [0, 3, 6, 9]" ] }, { "cell_type": "markdown", "id": "ccbf01e0", "metadata": {}, "source": [ "### 3b. Process pool with file-backed cache\n", "\n", "`wrap_executor` composes with the file-backed cache pattern, so users can submit fleche-decorated functions directly without needing to remember `BoundWrapper.bind` at every call site."
] }, { "cell_type": "code", "id": "9d4292dc", "metadata": {}, "execution_count": null, "outputs": [], "source": [ "with tempfile.TemporaryDirectory() as tmpdir:\n", "    shared_cache = Cache(\n", "        ValuePickleFile.with_pickle(root=os.path.join(tmpdir, \"values\")),\n", "        CallPickleFile.with_pickle(root=os.path.join(tmpdir, \"calls\")),\n", "    )\n", "\n", "    with cache(shared_cache):\n", "        with ProcessPoolExecutor(max_workers=2) as pool:\n", "            wrap_executor(pool)\n", "            futures = [pool.submit(expensive, i) for i in range(5)]\n", "            results = [f.result() for f in futures]\n", "\n", "    print(\"Results:\", results)  # [0, 1, 8, 27, 64]\n", "    print(\"Cached:\", shared_cache.contains(expensive.fleche.digest(3)))  # True" ] }, { "cell_type": "markdown", "id": "045d8559", "metadata": {}, "source": [ "### 3c. Executor-specific kwargs are split automatically\n", "\n", "Some executors (e.g. executorlib's `resource_dict`, dask's `resources=`) declare their own keyword-only parameters on `submit`. `wrap_executor` inspects `submit` with `inspect.signature`, forwards those parameters to `submit`, and binds everything else into the function payload:\n", "\n", "```python\n", "with cache(shared_cache):\n", "    with SingleNodeExecutor() as executor:\n", "        wrap_executor(executor)\n", "        future = executor.submit(\n", "            expensive, 5,                # 5 goes to expensive\n", "            resource_dict={\"cores\": 4},  # goes to SingleNodeExecutor.submit\n", "        )\n", "        future.result()                  # 125, i.e. expensive(5)\n", "```\n", "\n", "Plain stdlib executors (`ThreadPoolExecutor`, `ProcessPoolExecutor`) have no keyword-only parameters on `submit`, so every keyword argument is forwarded to the function."
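, "\n", "\n", "A minimal sketch of that split, using only the standard library, is shown in the next cell. It is **not** fleche's actual code: `split_submit_kwargs` and `HypotheticalExecutor` are hypothetical names, and the stand-in class merely mimics a `submit` that declares a keyword-only `resource_dict` parameter." ] }, { "cell_type": "code", "id": "5b8d2f6e", "metadata": {}, "execution_count": null, "outputs": [], "source": [ "import inspect\n", "from concurrent.futures import ThreadPoolExecutor\n", "\n", "\n", "def split_submit_kwargs(submit, kwargs):\n", "    \"\"\"Hypothetical helper: route kwargs that `submit` declares keyword-only to the executor.\"\"\"\n", "    params = inspect.signature(submit).parameters\n", "    executor_names = {\n", "        name for name, p in params.items() if p.kind is inspect.Parameter.KEYWORD_ONLY\n", "    }\n", "    executor_kwargs = {k: v for k, v in kwargs.items() if k in executor_names}\n", "    function_kwargs = {k: v for k, v in kwargs.items() if k not in executor_names}\n", "    return executor_kwargs, function_kwargs\n", "\n", "\n", "class HypotheticalExecutor:\n", "    \"\"\"Stand-in whose submit declares a keyword-only resource_dict parameter.\"\"\"\n", "\n", "    def submit(self, fn, /, *args, resource_dict=None, **kwargs):\n", "        raise NotImplementedError  # only the signature matters for this sketch\n", "\n", "\n", "executor_kwargs, function_kwargs = split_submit_kwargs(\n", "    HypotheticalExecutor().submit, {\"resource_dict\": {\"cores\": 4}, \"scale\": 2}\n", ")\n", "print(executor_kwargs)  # {'resource_dict': {'cores': 4}}\n", "print(function_kwargs)  # {'scale': 2}\n", "\n", "# Stdlib submit has no keyword-only parameters, so every kwarg stays with the function.\n", "print(split_submit_kwargs(ThreadPoolExecutor.submit, {\"scale\": 2}))  # ({}, {'scale': 2})"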
] }, { "cell_type": "markdown", "id": "7d76b646", "metadata": {}, "source": [ "---\n", "## Summary\n", "\n", "```\n", "BoundWrapper.bind(func)   ✅ explicit; pass bound callable anywhere\n", "    with cache(...): pool.submit(BoundWrapper.bind(func), ...)\n", "\n", "Future pass-through       ✅ cache async-style return values\n", "    @fleche\n", "    def f(x): return pool.submit(...)\n", "\n", "wrap_executor(pool)       ✅ transparent; cache hits never submit\n", "    with cache(...):\n", "        wrap_executor(pool)\n", "        pool.submit(f, x)   ← ordinary submit\n", "```\n", "\n", "**Storage reference**\n", "\n", "| Backend | Cross-process sharing |\n", "|---|---|\n", "| `Memory` | ❌ Ephemeral — process-local only |\n", "| `PickleFile` | ✅ Persistent — shared via filesystem |\n", "| `Sql` | ✅ Persistent — shared via database |\n", "| `BagOfHolding` | ✅ Persistent — shared via HDF5 file |" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "name": "python", "version": "3.11.0" } }, "nbformat": 4, "nbformat_minor": 5 }