Source code for fleche.storage.base

import logging
from dataclasses import dataclass
from numbers import Number

from abc import ABC, abstractmethod
from typing import Iterable, Any, Callable, Self

from ..digest import digest, Digest, DIGEST_LENGTH
from ..call import Call, QueryCall

# Module-level logger used by the storages in this file for debug messages
# on save and load.
logger = logging.getLogger("fleche.storage")
class SaveError(Exception):
    """Error type for failed save operations.

    NOTE(review): nothing in this module raises it; presumably it is raised
    by concrete storage implementations -- confirm against callers.
    """
class AmbiguousDigestError(ValueError):
    """Raised when an abbreviated digest does not identify exactly one entry."""
class StorageBase(ABC):
    """Shared functionality between value and call storages.

    Subclasses provide :meth:`list`, :meth:`_evict` and :meth:`_contains`.
    The public methods accept either a full-length digest or an abbreviated
    prefix (at least 4 characters) and resolve it before delegating to the
    underscore-prefixed hooks.
    """

    @abstractmethod
    def list(self) -> Iterable[Digest]:
        """Return the digests of the entries held by this storage."""
        ...

    def evict(self, key: Digest | str) -> None:
        """Removes the entry corresponding to the key from the storage.

        Args:
            key: full digest, or a short prefix that is resolved through
                :meth:`expand` first.

        Raises:
            KeyError: if a short key matches nothing (from :meth:`expand`).
            AmbiguousDigestError: if a short key matches several entries.
        """
        full_key = Digest(key) if len(key) >= DIGEST_LENGTH else self.expand(key)
        self._evict(full_key)

    @abstractmethod
    def _evict(self, key: Digest) -> None:
        """Remove the entry stored under the full-length ``key``."""
        ...

    def expand(self, key: Digest | str) -> Digest:
        """Expands a short-hand digest to the full length one.

        Args:
            key: a digest or digest prefix.  Keys of at least
                ``DIGEST_LENGTH`` characters are returned as-is (wrapped in
                :class:`Digest`) without consulting the storage.

        Returns:
            The single stored digest that starts with ``key``.

        Raises:
            KeyError: if ``key`` is shorter than 4 characters or matches no
                stored digest.
            AmbiguousDigestError: if more than one stored digest starts with
                ``key``.
        """
        if len(key) >= DIGEST_LENGTH:
            return Digest(str(key))
        if len(key) < 4:
            raise KeyError(key)

        candidates = sorted(k for k in self.list() if k.startswith(key))
        if not candidates:
            raise KeyError(key)
        if len(candidates) == 1:
            return Digest(candidates[0])

        # Position where the first two (sorted) candidates diverge; if one is
        # a prefix of the other, fall back to the shorter length.
        first, second = candidates[0], candidates[1]
        diverge_at = next(
            (pos for pos, (a, b) in enumerate(zip(first, second)) if a != b),
            min(len(first), len(second)),
        )
        raise AmbiguousDigestError(
            f"Short digest {key} is ambiguous; need at least {diverge_at + 1} characters."
        )

    def shrink(self, key: Digest | str) -> Digest:
        """Find the shortest prefix that is still an unambiguous reference to the same value.

        Prefixes of length 4 up to ``len(key) - 1`` are tried in increasing
        order; the first one that :meth:`expand` resolves is returned.

        Raises:
            KeyError: if a tried prefix matches no stored digest (propagated
                from :meth:`expand`).
            AmbiguousDigestError: if every proper prefix is ambiguous.
        """
        for length in range(4, len(key)):
            prefix = key[:length]
            try:
                self.expand(prefix)
            except AmbiguousDigestError:
                continue
            else:
                return Digest(prefix)
        raise AmbiguousDigestError(
            f"Digest {key} cannot be shrunk without becoming ambigious!"
        )

    def contains(self, key: Digest | str) -> bool:
        """Report whether ``key`` (full or abbreviated) refers to a stored entry.

        A short key that matches nothing yields ``False``; an ambiguous short
        key still raises :class:`AmbiguousDigestError` from :meth:`expand`.
        """
        if len(key) >= DIGEST_LENGTH:
            return self._contains(Digest(key))
        try:
            full_key = self.expand(key)
        except KeyError:
            return False
        return self._contains(full_key)

    @abstractmethod
    def _contains(self, key: Digest) -> bool:
        """Report whether an entry exists under the full-length ``key``."""
        ...
class Storage(StorageBase):
    """Abstract base class for defining storage mechanisms."""

    def save(self, value: Any, key: Digest | None = None) -> Digest:
        """Store ``value`` and return the result of :meth:`_save`.

        Args:
            value: object to store.
            key: digest to store it under; computed with ``digest(value)``
                when omitted.
        """
        effective_key = digest(value) if key is None else key
        logger.debug("Saving value with key %s", effective_key)
        return self._save(value, effective_key)

    @abstractmethod
    def _save(self, value: Any, key: Digest) -> Digest:
        """Write ``value`` under the full-length ``key``."""
        ...

    def load(self, key: Digest | str) -> Any:
        """Return the value stored under ``key``.

        Keys shorter than ``DIGEST_LENGTH`` are resolved via ``expand`` first,
        which may raise ``KeyError`` or ``AmbiguousDigestError``.
        """
        full_key = Digest(key) if len(key) >= DIGEST_LENGTH else self.expand(key)
        logger.debug("Loading value with key %s", full_key)
        return self._load(full_key)

    @abstractmethod
    def _load(self, key: Digest) -> Any:
        """Read the value stored under the full-length ``key``."""
        ...

    def _contains(self, key: Digest) -> bool:
        """Default membership test: attempt a load and treat ``KeyError`` as absent."""
        try:
            self._load(key)
        except KeyError:
            return False
        else:
            return True
class Digested(ABC):
    """Interface for wrappers around a container whose children may be digests.

    Implementations expose the wrapped container via :meth:`underlying`,
    rebuild the original structure with :meth:`mend`, and are produced from a
    plain container by :meth:`sunder`.
    """

    @abstractmethod
    def underlying(self):
        """Return plain underlying value, ie. list/dict/etc of nested values or their partial digests"""

    # Digest exactly like the wrapped container so that we stay referentially
    # transparent with respect to it: for the replacement of the 'real'
    # container with the 'digested' one to be invisible to caches, both must
    # digest to the same value.
    def __digest__(self):
        """Return the digest of the wrapped container."""
        plain = self.underlying()
        return digest(plain)

    @abstractmethod
    def mend(self, storage: 'DestructuringMixin'):
        """Rebuild the original container, resolving children through ``storage``."""
        ...

    @classmethod
    @abstractmethod
    def sunder(cls, intern: Callable[[Any], tuple[Any, int | float]], value: Any):
        """Apply ``intern`` to the children of ``value``; return ``(result, depth)``."""
        ...
@dataclass
class DigestedIterable(Digested):
    """:class:`Digested` wrapper for a list or tuple."""

    # Elements of the sequence; each is either a plain value or a Digest
    # referring to a separately stored element.
    items: list | tuple

    def underlying(self):
        """Return the wrapped list/tuple."""
        return self.items

    def mend(self, storage: 'DestructuringMixin') -> list | tuple:
        """Rebuild the sequence by passing every element through ``storage._load``.

        The result has the same container type as ``items``.
        """
        return type(self.items)(map(storage._load, self.items))

    @classmethod
    def sunder(cls, intern: Callable[[Any], tuple[Any, int | float]], value: list | tuple):
        """Intern every element of ``value`` and wrap the result if needed.

        Args:
            intern: callable mapping an element to ``(result, depth)``.
            value: the list or tuple to take apart.

        Returns:
            ``(result, depth)``.  ``depth`` is one more than the deepest
            child; ``result`` is a plain container of ``type(value)`` when no
            child was replaced by a :class:`Digest`, otherwise an instance of
            this class.  An empty ``value`` is returned unchanged with depth 0.
        """
        if not value:
            # Nothing to intern; unpacking an empty zip() below would raise
            # ValueError.  Depth 0 treats an empty container like a scalar.
            return value, 0
        children, depths = zip(*(intern(v) for v in value))
        depth = 1 + max(depths)
        items = type(value)(children)
        if all(not isinstance(r, Digest) for r in items):
            # intern did not do anything to our children because we're out of
            # depth, return them verbatim
            return items, depth
        return cls(items), depth
@dataclass
class DigestedDict(Digested):
    """:class:`Digested` wrapper for a dict."""

    # Mapping whose keys and values are each either a plain value or a Digest
    # referring to a separately stored object.
    items: dict

    def underlying(self):
        """Return the wrapped dict."""
        return self.items

    def mend(self, storage: 'DestructuringMixin') -> dict:
        """Rebuild the dict by passing every key and value through ``storage._load``."""
        return {storage._load(k): storage._load(v) for k, v in self.items.items()}

    @classmethod
    def sunder(cls, intern: Callable[[Any], tuple[Any, int | float]], value: dict):
        """Intern every key and value of ``value`` and wrap the result if needed.

        All keys are interned first, then all values.

        Args:
            intern: callable mapping a key or value to ``(result, depth)``.
            value: the dict to take apart.

        Returns:
            ``(result, depth)``.  ``depth`` is one more than the deepest key
            or value; ``result`` is a plain dict when nothing was replaced by
            a :class:`Digest`, otherwise an instance of this class.  An empty
            ``value`` is returned unchanged with depth 0.
        """
        if not value:
            # Nothing to intern; unpacking an empty zip() below would raise
            # ValueError.  Depth 0 treats an empty container like a scalar.
            return value, 0
        kk, k_depths = zip(*(intern(k) for k in value))
        vv, v_depths = zip(*(intern(v) for v in value.values()))
        depth = 1 + max(max(k_depths), max(v_depths))
        items = dict(zip(kk, vv))
        if all(not isinstance(r, Digest) for r in (*items.keys(), *items.values())):
            # intern did not do anything to our children because we're out of
            # depth, return them verbatim
            return items, depth
        return cls(items), depth
class DestructuringMixin(Storage):
    """Mixin that recursively destructures collections on save/load.

    Place before a concrete :class:`Storage` in the MRO to add destructuring
    behavior.  Lists, tuples, and dicts are broken apart so each element is
    stored independently; on load the original structure is reassembled.

    Example::

        class DestructuringMemory(DestructuringMixin, Memory):
            pass

        # ``storage`` here is the backing dict required by ``Memory``, not a
        # Storage instance.
        dm = DestructuringMemory(storage={})
        key = dm.save([1, [2, 3]])
        assert dm.load(key) == [1, [2, 3]]
    """

    # Elements whose depth is strictly below this threshold are kept inline
    # in their parent instead of being written as separate entries.
    remaining_depth: int = 0

    def _intern_rec(self, value: Any) -> tuple[Any, int | float]:
        """Post-order traversal: recurse to leaves, decide inline-vs-store on the way back up.

        Returns ``(result, depth)`` where *result* is the plain value when
        ``depth < remaining_depth`` (the element is inlined in its parent's
        :class:`Digested` wrapper) or a :class:`Digest` when the element was
        written to storage separately.

        Every node in the structure is visited exactly once (O(n)), unlike a
        separate depth-counting pass.
        """
        if isinstance(value, (Number, str, bytes)):
            depth = 0
        elif isinstance(value, (dict, list, tuple)) and not value:
            # Empty containers are treated exactly like scalars.  Technically
            # this violates the "1 + max children depth" recursion, but there
            # is no use for destructuring the empty container.
            depth = 0
        elif isinstance(value, (list, tuple)):
            value, depth = DigestedIterable.sunder(self._intern_rec, value)
        elif isinstance(value, dict):
            value, depth = DigestedDict.sunder(self._intern_rec, value)
        else:
            # Any other type gets infinite depth, so it is never inlined.
            depth = float("inf")

        if depth < self.remaining_depth:
            return value, depth
        return super()._save(value, digest(value)), depth

    def _save(self, value: Any, key: Digest) -> Digest:
        """Store ``value`` under ``key``, destructuring non-empty collections.

        A :class:`Digest` passed as ``value`` is returned untouched.  For a
        non-empty list/tuple/dict the children are interned through
        :meth:`_intern_rec`; the top-level entry is then saved either as a
        plain container (no child became a digest) or as the matching
        :class:`Digested` wrapper.  Everything else is saved as-is.
        """
        if isinstance(value, Digest):
            return value
        if isinstance(value, (list, tuple, dict)) and value:
            wrapper = DigestedDict if isinstance(value, dict) else DigestedIterable
            # sunder() returns the plain container or the wrapper; the depth
            # is irrelevant here because the top level is always stored.
            value, _ = wrapper.sunder(self._intern_rec, value)
        return super()._save(value, key)

    def _load(self, key: Digest | Any) -> Any:
        """Load ``key`` and reassemble any :class:`Digested` wrapper found.

        Non-digest arguments are returned unchanged: :meth:`Digested.mend`
        passes inlined plain values through this method as well.
        """
        if not isinstance(key, Digest):
            return key  # passing through an actual value from Digested.mend
        stored = super()._load(key)
        return stored.mend(self) if isinstance(stored, Digested) else stored
@dataclass
class _DelegatingStorage(Storage):
    """Storage that delegates all operations to a wrapped storage instance."""

    # The wrapped storage; every hook below forwards to its public API.
    storage: Storage

    def _save(self, value: Any, key: Digest) -> Digest:
        """Forward to ``storage.save`` with the explicit key."""
        backend = self.storage
        return backend.save(value, key)

    def _load(self, key: Digest) -> Any:
        """Forward to ``storage.load``."""
        backend = self.storage
        return backend.load(key)

    def _contains(self, key: Digest) -> bool:
        """Forward to ``storage.contains``."""
        backend = self.storage
        return backend.contains(key)

    def _evict(self, key: Digest) -> None:
        """Forward to ``storage.evict``."""
        backend = self.storage
        return backend.evict(key)

    def list(self) -> Iterable[Digest]:
        """Forward to ``storage.list``."""
        backend = self.storage
        return backend.list()
@dataclass
class DestructuringStorage(DestructuringMixin, _DelegatingStorage):
    """Storage wrapper that recursively destructures collections.

    This is a convenience class combining :class:`DestructuringMixin` with
    delegation to a wrapped storage.  Prefer using :class:`DestructuringMixin`
    directly as a mixin with a concrete storage class when possible.
    """

    # Declared here so it is a dataclass field of this class; defaults to 0.
    remaining_depth: int = 0

    def __post_init__(self):
        """Reject wrapping a storage that itself uses :class:`DestructuringMixin`.

        Raises:
            ValueError: if ``storage`` is a :class:`DestructuringMixin` instance.
        """
        wrapped = self.storage
        if not isinstance(wrapped, DestructuringMixin):
            return
        raise ValueError("DestructuringStorage cannot wrap a storage that already uses DestructuringMixin")
class CallStorage(StorageBase):
    """Special storage for saving :class:`Call` instances."""

    def save(self, call: Call) -> Digest:
        """Store ``call`` under its lookup key, replacing any existing entry.

        An entry already present under ``call.to_lookup_key()`` is evicted
        before the new one is written via :meth:`_save`.
        """
        key = call.to_lookup_key()
        logger.debug("Saving call %s", key)
        if self.contains(str(key)):
            self.evict(str(key))
        return self._save(call)

    @abstractmethod
    def _save(self, call: Call) -> Digest:
        """Write ``call`` to the backing store and return its key."""
        ...

    def load(self, key: str | Digest) -> Call:
        """Return the call stored under ``key``.

        Keys shorter than ``DIGEST_LENGTH`` are resolved via ``expand`` first,
        which may raise ``KeyError`` or ``AmbiguousDigestError``.
        """
        if len(key) < DIGEST_LENGTH:
            key = self.expand(key)
        else:
            key = Digest(key)
        logger.debug("Loading call with key %s", key)
        return self._load(key)

    @abstractmethod
    def _load(self, key: Digest) -> Call:
        """Read the call stored under the full-length ``key``."""
        ...

    def transform(self, func: Callable[[Call], Call] | None = None) -> None:
        """
        Applies a transformation function to all Call objects in the storage.

        Args:
            func (Callable[[Call], Call] | None): A function that takes a Call and returns a transformed Call.
                If None, the identity function is used (useful for re-calculating keys).
        """
        # Snapshot the keys first: entries are saved and evicted while looping.
        for old_key in list(self.list()):
            try:
                call = self.load(old_key)
            except KeyError:
                # Entry is no longer loadable (e.g. already replaced); skip it.
                continue
            new_call = func(call) if func is not None else call
            new_key = new_call.to_lookup_key()
            self.save(new_call)
            if new_key != old_key:
                # The call moved to a different key; drop the stale entry.
                self.evict(old_key)

    def query(self, template: QueryCall) -> Iterable[Call]:
        """Find cached calls that 'match' the template.

        Returns all calls where the given arguments, results or metadata match exactly the stored ones.  Values may be
        given either as they are or as :class:`Digest`.

        Args:
            template (QueryCall): specification for calls to return; use `None` as wildcard.

        Returns:
            Iterable[Call]: an iterable over all matching call objects
        """
        for key in self.list():
            call = self.load(key)
            if template.matches(call):
                yield call

    def _contains(self, key: Digest) -> bool:
        """Default membership test: attempt a load and treat ``KeyError`` as absent."""
        try:
            self._load(key)
            return True
        except KeyError:
            return False
@dataclass(frozen=True, slots=True)
class CallStorageAdapter(CallStorage):
    """Implement a CallStorage from a generic Storage."""

    # The generic storage that holds the calls.
    storage: Storage

    def _save(self, call: Call) -> Digest:
        """Save ``call`` in the wrapped storage under its lookup key."""
        backend = self.storage
        return backend.save(call, call.to_lookup_key())

    def _load(self, key: Digest) -> Call:
        """Load the call stored under ``key`` from the wrapped storage."""
        backend = self.storage
        return backend.load(key)

    def _contains(self, key: Digest) -> bool:
        """Ask the wrapped storage whether ``key`` is present."""
        backend = self.storage
        return backend.contains(key)

    def _evict(self, key: Digest) -> None:
        """Remove ``key`` from the wrapped storage."""
        backend = self.storage
        backend.evict(key)

    def list(self) -> Iterable[Digest]:
        """Return the keys listed by the wrapped storage."""
        backend = self.storage
        return backend.list()