import logging
from dataclasses import dataclass
from numbers import Number
from abc import ABC, abstractmethod
from typing import Iterable, Any, Callable, Self
from ..digest import digest, Digest, DIGEST_LENGTH
from ..call import Call, QueryCall
[docs]
logger = logging.getLogger("fleche.storage")
[docs]
class SaveError(Exception):
pass
[docs]
class AmbiguousDigestError(ValueError):
pass
[docs]
class StorageBase(ABC):
"""Shared functionality between value and call storages."""
@abstractmethod
[docs]
def list(self) -> Iterable[Digest]: ...
[docs]
def evict(self, key: Digest | str) -> None:
"""Removes the entry corresponding to the key from the storage."""
if len(key) < DIGEST_LENGTH:
key = self.expand(key)
else:
key = Digest(key)
self._evict(key)
@abstractmethod
[docs]
def _evict(self, key: Digest) -> None: ...
[docs]
def expand(self, key: Digest | str) -> Digest:
"""Expands a short-hand digest to the full length one."""
if len(key) >= DIGEST_LENGTH:
return Digest(str(key))
if len(key) < 4:
raise KeyError(key)
matches = sorted([k for k in self.list() if k.startswith(key)])
if not matches:
raise KeyError(key)
if len(matches) > 1:
# find longest common prefix of the first two matches to find where they diverge
m1, m2 = matches[0], matches[1]
for i, (c1, c2) in enumerate(zip(m1, m2)):
if c1 != c2:
break
else:
i = min(len(m1), len(m2))
raise AmbiguousDigestError(
f"Short digest {key} is ambiguous; need at least {i+1} characters."
)
return Digest(matches[0])
[docs]
def shrink(self, key: Digest | str) -> Digest:
"""Find the shortest substring that is still an unambigious reference to the same value."""
for ln in range(4, len(key)):
try:
self.expand(key[:ln])
return Digest(key[:ln])
except AmbiguousDigestError:
continue
raise AmbiguousDigestError(
f"Digest {key} cannot be shrunk without becoming ambigious!"
)
[docs]
def contains(self, key: Digest | str) -> bool:
if len(key) < DIGEST_LENGTH:
try:
key = self.expand(key)
except KeyError:
return False
else:
key = Digest(key)
return self._contains(key)
@abstractmethod
[docs]
def _contains(self, key: Digest) -> bool: ...
[docs]
class Storage(StorageBase):
"""Abstract base class for defining storage mechanisms."""
[docs]
def save(self, value: Any, key: Digest | None = None) -> Digest:
if key is None:
key = digest(value)
logger.debug("Saving value with key %s", key)
return self._save(value, key)
@abstractmethod
[docs]
def _save(self, value: Any, key: Digest) -> Digest: ...
[docs]
def load(self, key: Digest | str) -> Any:
if len(key) < DIGEST_LENGTH:
key = self.expand(key)
else:
key = Digest(key)
logger.debug("Loading value with key %s", key)
return self._load(key)
@abstractmethod
[docs]
def _load(self, key: Digest) -> Any: ...
[docs]
def _contains(self, key: Digest) -> bool:
try:
self._load(key)
return True
except KeyError:
return False
[docs]
class Digested(ABC):
@abstractmethod
[docs]
def underlying(self):
"""Return plain underlying value, ie. list/dict/etc of nested values or their partial digests"""
# mess with our hash to ensure that we are referentially transparent with respect to the underlying list.
# For the replacement of the 'real' list with the 'digested' list to be invisible to caches, they must hash to the
# same values.
[docs]
def __digest__(self):
return digest(self.underlying())
@abstractmethod
[docs]
def mend(self, storage: 'DestructuringMixin'): ...
@classmethod
@abstractmethod
[docs]
def sunder(cls, intern: Callable[[Any], tuple[Any, int | float]], value: Any): ...
@dataclass
[docs]
class DigestedIterable(Digested):
[docs]
def underlying(self):
return self.items
[docs]
def mend(self, storage: 'DestructuringMixin') -> list | tuple:
return type(self.items)(map(storage._load, self.items))
@classmethod
[docs]
def sunder(cls, intern: Callable[[Any], tuple[Any, int | float]], value: list | tuple):
children, depths = zip(*(intern(v) for v in value))
depth = 1 + max(depths)
items = type(value)(children)
if all(not isinstance(r, Digest) for r in items):
# intern did do anything to our children because we're out of depth, return them verbatim
return items, depth
return cls(items), depth
@dataclass
[docs]
class DigestedDict(Digested):
[docs]
def underlying(self):
return self.items
[docs]
def mend(self, storage: 'DestructuringMixin') -> dict:
return {storage._load(k): storage._load(v) for k, v in self.items.items()}
@classmethod
[docs]
def sunder(cls, intern: Callable[[Any], tuple[Any, int | float]], value: dict):
kk, k_depths = zip(*(intern(k) for k in value))
vv, v_depths = zip(*(intern(v) for v in value.values()))
depth = 1 + max(max(k_depths), max(v_depths))
items = dict(zip(kk, vv))
if all(not isinstance(r, Digest) for r in (*items.keys(), *items.values())):
# intern did do anything to our children because we're out of depth, return them verbatim
return items, depth
return cls(items), depth
[docs]
class DestructuringMixin(Storage):
"""Mixin that recursively destructures collections on save/load.
Place before a concrete :class:`Storage` in the MRO to add destructuring
behavior. Lists, tuples, and dicts are broken apart so each element is
stored independently; on load the original structure is reassembled.
Example::
class DestructuringMemory(DestructuringMixin, Memory):
pass
# ``storage`` here is the backing dict required by ``Memory``, not a
# Storage instance.
dm = DestructuringMemory(storage={})
key = dm.save([1, [2, 3]])
assert dm.load(key) == [1, [2, 3]]
"""
[docs]
remaining_depth: int = 0
[docs]
def _intern_rec(self, value: Any) -> tuple[Any, int | float]:
"""Post-order traversal: recurse to leaves, decide inline-vs-store on the way back up.
Returns ``(result, depth)`` where *result* is the plain value when ``depth < remaining_depth``
(the element is inlined in its parent's :class:`Digested` wrapper) or a :class:`Digest` when
the element was written to storage separately. Every node in the structure is visited exactly
once (O(n)), unlike a separate depth-counting pass.
"""
depth = float("inf")
match value:
case Number() | str() | bytes():
depth = 0
# treat exactly like scalars, but guard syntax gets a bit weird if we try to join
# technically this violates our recursion of 1 + max children depth, but I just can't see a use for
# destructuring the empty container
case dict() | list() | tuple() if not value:
depth = 0
case list() | tuple():
value, depth = DigestedIterable.sunder(self._intern_rec, value)
case dict():
value, depth = DigestedDict.sunder(self._intern_rec, value)
if depth < self.remaining_depth:
return value, depth
return super()._save(value, digest(value)), depth
[docs]
def _save(self, value: Any, key: Digest) -> Digest:
if isinstance(value, Digest):
return value
match value:
case list() | tuple() if not value:
return super()._save(value, key)
case list() | tuple():
children, _ = zip(*(self._intern_rec(v) for v in value))
items = type(value)(children)
if not any(isinstance(r, Digest) for r in items):
return super()._save(items, key)
return super()._save(DigestedIterable(items), key)
case dict() if not value:
return super()._save(value, key)
case dict():
kk, _ = zip(*(self._intern_rec(k) for k in value))
vv, _ = zip(*(self._intern_rec(v) for v in value.values()))
items = dict(zip(kk, vv))
if not any(isinstance(r, Digest) for r in (*items.keys(), *items.values())):
return super()._save(items, key)
return super()._save(DigestedDict(items), key)
case _:
return super()._save(value, key)
[docs]
def _load(self, key: Digest | Any) -> Any:
if not isinstance(key, Digest):
return key # passing through an actual value from Digested.mend
value = super()._load(key)
match value:
case Digested():
return value.mend(self)
case _:
return value
@dataclass(frozen=True)
[docs]
class _DelegatingStorage(Storage):
"""Storage that delegates all operations to a wrapped storage instance."""
[docs]
def _save(self, value: Any, key: Digest) -> Digest:
return self.storage.save(value, key)
[docs]
def _load(self, key: Digest) -> Any:
return self.storage.load(key)
[docs]
def _contains(self, key: Digest) -> bool:
return self.storage.contains(key)
[docs]
def _evict(self, key: Digest) -> None:
return self.storage.evict(key)
[docs]
def list(self) -> Iterable[Digest]:
return self.storage.list()
@dataclass(frozen=True)
[docs]
class DestructuringStorage(DestructuringMixin, _DelegatingStorage):
"""Storage wrapper that recursively destructures collections.
This is a convenience class combining :class:`DestructuringMixin` with
delegation to a wrapped storage. Prefer using :class:`DestructuringMixin`
directly as a mixin with a concrete storage class when possible.
"""
[docs]
remaining_depth: int = 0
[docs]
def __post_init__(self):
if isinstance(self.storage, DestructuringMixin):
raise ValueError("DestructuringStorage cannot wrap a storage that already uses DestructuringMixin")
[docs]
class CallStorage(StorageBase):
"""Special storage for saving :class:`Call` instances."""
[docs]
def save(self, call: Call) -> Digest:
key = call.to_lookup_key()
logger.debug("Saving call %s", key)
if self.contains(str(key)):
self.evict(str(key))
return self._save(call)
@abstractmethod
[docs]
def _save(self, call: Call) -> Digest: ...
[docs]
def load(self, key: str | Digest) -> Call:
if len(key) < DIGEST_LENGTH:
key = self.expand(key)
else:
key = Digest(key)
logger.debug("Loading call with key %s", key)
return self._load(key)
@abstractmethod
[docs]
def _load(self, key: Digest) -> Call: ...
[docs]
def query(self, template: QueryCall) -> Iterable[Call]:
"""Find cached calls that 'match' the template.
Returns all calls where the given arguments, results or metadata match exactly the stored ones. Values may be
given either as they are or as :class:`Digest`.
Args:
template (Call): specification for calls to return; use `None` as wildcard.
Returns:
Iterable[Call]: an iterable over all matching call objects
"""
def none_or_equal(a, b):
return a is None or digest(a) == digest(b)
for key in self.list():
call = self.load(key)
if template.matches(call):
yield call
[docs]
def _contains(self, key: Digest) -> bool:
try:
self._load(key)
return True
except KeyError:
return False
@dataclass(frozen=True, slots=True)
[docs]
class CallStorageAdapter(CallStorage):
"""Implement a CallStorage from a generic Storage."""
[docs]
def _save(self, call: Call) -> Digest:
return self.storage.save(call, call.to_lookup_key())
[docs]
def _load(self, key: Digest) -> Call:
return self.storage.load(key)
[docs]
def _contains(self, key: Digest) -> bool:
return self.storage.contains(key)
[docs]
def _evict(self, key: Digest) -> None:
self.storage.evict(key)
[docs]
def list(self) -> Iterable[Digest]:
return self.storage.list()