Source code for fleche.storage.base

import logging
from dataclasses import dataclass

from abc import ABC, abstractmethod
from typing import Iterable, Any, Callable

from ..digest import digest, Digest, DIGEST_LENGTH
from ..call import Call, QueryCall

# Logger used by the storage classes in this module for debug tracing of
# save/load operations.
logger = logging.getLogger("fleche.storage")
class SaveError(Exception):
    """Error type for a failed save.

    NOTE(review): not raised anywhere in this module; presumably raised by
    concrete storage implementations -- confirm against callers.
    """
class AmbiguousDigestError(ValueError):
    """A short-hand digest matches more than one stored digest."""
class StorageBase(ABC):
    """Shared functionality between value and call storages.

    Subclasses implement :meth:`list`, :meth:`_evict` and :meth:`_contains`.
    The public methods accept either a full digest or a short-hand prefix of
    one and resolve it to a full :class:`Digest` before delegating.
    """

    @abstractmethod
    def list(self) -> Iterable[Digest]:
        """Iterate over the digests of the entries held by this storage."""
        ...

    def evict(self, key: Digest | str) -> None:
        """Removes the entry corresponding to the key from the storage.

        Args:
            key: a full digest or a short-hand prefix of one.

        Raises:
            KeyError: if a short-hand key is too short or matches nothing.
            AmbiguousDigestError: if a short-hand key matches several digests.
        """
        if len(key) < DIGEST_LENGTH:
            key = self.expand(key)
        else:
            key = Digest(key)
        self._evict(key)

    @abstractmethod
    def _evict(self, key: Digest) -> None:
        """Remove the entry stored under the full digest ``key``."""
        ...

    def expand(self, key: Digest | str) -> Digest:
        """Expands a short-hand digest to the full length one.

        A key that is already at least ``DIGEST_LENGTH`` characters long is
        returned as a :class:`Digest` without consulting the storage.

        Args:
            key: a full digest or a prefix of at least 4 characters.

        Returns:
            The single stored digest that starts with ``key``.

        Raises:
            KeyError: if ``key`` is shorter than 4 characters or no stored
                digest starts with it.
            AmbiguousDigestError: if more than one stored digest starts with
                ``key``.
        """
        if len(key) >= DIGEST_LENGTH:
            return Digest(str(key))
        if len(key) < 4:
            raise KeyError(key)
        matches = sorted(k for k in self.list() if k.startswith(key))
        if not matches:
            raise KeyError(key)
        if len(matches) > 1:
            # Index at which the first two (sorted) matches diverge; falls
            # back to the shorter length if one is a prefix of the other.
            first, second = matches[0], matches[1]
            diverge = next(
                (i for i, (a, b) in enumerate(zip(first, second)) if a != b),
                min(len(first), len(second)),
            )
            raise AmbiguousDigestError(
                f"Short digest {key} is ambiguous; need at least {diverge + 1} characters."
            )
        return Digest(matches[0])

    def shrink(self, key: Digest | str) -> Digest:
        """Find the shortest prefix that is still an unambiguous reference to the same value.

        Prefixes are tried from 4 characters upwards; the full ``key`` itself
        is never returned.

        Args:
            key: the digest to shorten.

        Returns:
            The shortest prefix of ``key`` that :meth:`expand` resolves to
            the entry ``key`` refers to.

        Raises:
            KeyError: if no stored digest matches a prefix of ``key``, or the
                only stored digest matching a prefix is not the one ``key``
                refers to (i.e. ``key`` itself is not stored).
            AmbiguousDigestError: if every proper prefix of ``key`` is
                ambiguous.
        """
        for length in range(4, len(key)):
            prefix = key[:length]
            try:
                resolved = self.expand(prefix)
            except AmbiguousDigestError:
                continue
            if not resolved.startswith(key):
                # The prefix is unique, but it identifies a different entry:
                # returning it would silently point at another value.
                raise KeyError(key)
            return Digest(prefix)
        raise AmbiguousDigestError(
            f"Digest {key} cannot be shrunk without becoming ambigious!"
        )

    def contains(self, key: Digest | str) -> bool:
        """Return whether ``key`` refers to an entry in the storage.

        A short-hand key that cannot be expanded because nothing matches (or
        it is too short) yields ``False``.

        Raises:
            AmbiguousDigestError: if a short-hand key matches several digests.
        """
        if len(key) < DIGEST_LENGTH:
            try:
                key = self.expand(key)
            except KeyError:
                return False
        else:
            key = Digest(key)
        return self._contains(key)

    @abstractmethod
    def _contains(self, key: Digest) -> bool:
        """Return whether an entry is stored under the full digest ``key``."""
        ...
class Storage(StorageBase):
    """Abstract base class for defining storage mechanisms."""

    def save(self, value: Any, key: Digest | None = None) -> Digest:
        """Store ``value`` and return the result of :meth:`_save`.

        Args:
            value: the object to store.
            key: digest to store it under; computed with ``digest(value)``
                when omitted.
        """
        effective_key = digest(value) if key is None else key
        logger.debug("Saving value with key %s", effective_key)
        return self._save(value, effective_key)

    @abstractmethod
    def _save(self, value: Any, key: Digest) -> Digest:
        """Persist ``value`` under the full digest ``key``."""
        ...

    def load(self, key: Digest | str) -> Any:
        """Return the value stored under ``key``.

        Args:
            key: a full digest or a short-hand prefix, which is expanded
                with :meth:`expand` first.
        """
        full_key = Digest(key) if len(key) >= DIGEST_LENGTH else self.expand(key)
        logger.debug("Loading value with key %s", full_key)
        return self._load(full_key)

    @abstractmethod
    def _load(self, key: Digest) -> Any:
        """Fetch the value stored under the full digest ``key``."""
        ...

    def _contains(self, key: Digest) -> bool:
        """Default membership test: try to load and treat ``KeyError`` as absent."""
        try:
            self._load(key)
        except KeyError:
            return False
        return True
class Digested(ABC):
    """Base for wrappers whose digest is the digest of the value they wrap."""

    @abstractmethod
    def underlying(self):
        """Return the wrapped value that the digest is computed from."""
        ...

    # Mess with our hash to ensure that we are referentially transparent with
    # respect to the underlying value.  For the replacement of the 'real'
    # container with the 'digested' one to be invisible to caches, both must
    # hash to the same value.
    def __digest__(self):
        wrapped = self.underlying()
        return digest(wrapped)
@dataclass
class DigestedIterable(Digested):
    """Wrapper around an iterable; its digest is that of ``items``."""

    # The wrapped iterable, returned unchanged by underlying().
    items: Iterable

    def underlying(self):
        """Return the wrapped iterable."""
        return self.items
@dataclass
class DigestedDict(Digested):
    """Wrapper around a dict; its digest is that of ``items``."""

    # The wrapped mapping, returned unchanged by underlying().
    items: dict

    def underlying(self):
        """Return the wrapped dict."""
        return self.items
@dataclass
[docs] class DestructuringStorage(Storage):
[docs] storage: Storage
[docs] def _save(self, value: Any, key: Digest) -> Digest: if isinstance(value, Digest): return value match value: case list() | tuple(): return self.storage.save( DigestedIterable(type(value)(self.save(v) for v in value)) ) case dict(): return self.storage.save( DigestedDict({self.save(k): self.save(v) for k, v in value.items()}) ) case _: return self.storage.save(value, key)
[docs] def _load(self, key: Digest) -> Any: value = self.storage.load(key) match value: case DigestedIterable(items=items): return type(items)(self.load(v) for v in items) case DigestedDict(items=items): return {self.load(k): self.load(v) for k, v in items.items()} case _: return value
[docs] def _contains(self, key: Digest) -> bool: return self.storage.contains(key)
[docs] def _evict(self, key: Digest) -> None: self.storage.evict(key)
[docs] def list(self) -> Iterable[Digest]: return self.storage.list()
class CallStorage(StorageBase):
    """Special storage for saving :class:`Call` instances."""

    def save(self, call: Call) -> Digest:
        """Store ``call`` under its lookup key, replacing any existing entry.

        An entry already stored under the same lookup key is evicted before
        the call is handed to :meth:`_save`.

        Returns:
            The digest returned by :meth:`_save`.
        """
        key = call.to_lookup_key()
        logger.debug("Saving call %s", key)
        if self.contains(str(key)):
            self.evict(str(key))
        return self._save(call)

    @abstractmethod
    def _save(self, call: Call) -> Digest:
        """Persist ``call`` and return the digest it is stored under."""
        ...

    def load(self, key: Digest | str) -> Call:
        """Return the call stored under ``key``.

        Args:
            key: a full digest or a short-hand prefix, which is expanded
                with :meth:`expand` first.
        """
        if len(key) < DIGEST_LENGTH:
            key = self.expand(key)
        else:
            key = Digest(key)
        logger.debug("Loading call with key %s", key)
        return self._load(key)

    @abstractmethod
    def _load(self, key: Digest) -> Call:
        """Fetch the call stored under the full digest ``key``."""
        ...

    def transform(self, func: Callable[[Call], Call] | None = None) -> None:
        """
        Applies a transformation function to all Call objects in the storage.

        Args:
            func (Callable[[Call], Call] | None): A function that takes a Call and returns a transformed Call.
                If None, the identity function is used (useful for re-calculating keys).
        """
        # Snapshot the keys first: the loop saves and evicts entries.
        for k in list(self.list()):
            try:
                call = self.load(k)
            except KeyError:
                # Entry is no longer loadable under this key; skip it.
                continue
            new_call = func(call) if func is not None else call
            new_key = new_call.to_lookup_key()
            self.save(new_call)
            if new_key != k:
                # The call moved to a different key; drop the stale entry.
                self.evict(k)

    def query(self, template: QueryCall) -> Iterable[Call]:
        """Find cached calls that 'match' the template.

        Returns all calls where the given arguments, results or metadata match exactly the stored ones.  Values may be
        given either as they are or as :class:`Digest`.  The comparison itself is delegated to
        ``template.matches``.

        Args:
            template (QueryCall): specification for calls to return; use `None` as wildcard.

        Returns:
            Iterable[Call]: an iterable over all matching call objects
        """
        for key in self.list():
            call = self.load(key)
            if template.matches(call):
                yield call

    def _contains(self, key: Digest) -> bool:
        """Default membership test: try to load and treat ``KeyError`` as absent."""
        try:
            self._load(key)
            return True
        except KeyError:
            return False
@dataclass(frozen=True, slots=True)
class CallStorageAdapter(CallStorage):
    """Implement a CallStorage from a generic Storage."""

    # Generic storage that every operation is delegated to.
    storage: Storage

    def _save(self, call: Call) -> Digest:
        """Save ``call`` in the wrapped storage under its lookup key."""
        lookup_key = call.to_lookup_key()
        return self.storage.save(call, lookup_key)

    def _load(self, key: Digest) -> Call:
        """Load the call stored under ``key`` from the wrapped storage."""
        return self.storage.load(key)

    def _contains(self, key: Digest) -> bool:
        """Ask the wrapped storage whether ``key`` is present."""
        return self.storage.contains(key)

    def _evict(self, key: Digest) -> None:
        """Remove ``key`` from the wrapped storage."""
        self.storage.evict(key)

    def list(self) -> Iterable[Digest]:
        """Return the digests listed by the wrapped storage."""
        return self.storage.list()