import logging
from dataclasses import dataclass
from numbers import Number
from abc import ABC, abstractmethod
from typing import Iterable, Any, Callable
from ..digest import digest, Digest, DIGEST_LENGTH
from ..call import Call, QueryCall
[docs]
logger = logging.getLogger("fleche.storage")
[docs]
class SaveError(Exception):
pass
[docs]
class AmbiguousDigestError(ValueError):
pass
[docs]
class KeyManagement(ABC):
"""Abstract base providing key-management helpers for any keyed storage.
Subclasses must implement ``list``, ``_evict``, and ``_contains``.
The concrete helpers ``evict``, ``contains``, ``expand``, and ``shrink``
are implemented here once and inherited by all storage classes.
"""
@abstractmethod
[docs]
def list(self) -> Iterable[Digest]: ...
@abstractmethod
[docs]
def _evict(self, key: Digest) -> None: ...
@abstractmethod
[docs]
def _contains(self, key: Digest) -> bool: ...
[docs]
def evict(self, key: Digest | str) -> None:
"""Removes the entry corresponding to the key from the storage."""
if len(key) < DIGEST_LENGTH:
key = self.expand(key)
else:
key = Digest(key)
self._evict(key)
[docs]
def contains(self, key: Digest | str) -> bool:
if len(key) < DIGEST_LENGTH:
try:
key = self.expand(key)
except KeyError:
return False
else:
key = Digest(key)
return self._contains(key)
[docs]
def expand(self, key: Digest | str) -> Digest:
"""Expands a short-hand digest to the full length one."""
if len(key) >= DIGEST_LENGTH:
return Digest(str(key))
if len(key) < 4:
raise KeyError(key)
matches = sorted([k for k in self.list() if k.startswith(key)])
if not matches:
raise KeyError(key)
if len(matches) > 1:
# find longest common prefix of the first two matches to find where they diverge
m1, m2 = matches[0], matches[1]
for i, (c1, c2) in enumerate(zip(m1, m2)):
if c1 != c2:
break
else:
i = min(len(m1), len(m2))
raise AmbiguousDigestError(
f"Short digest {key} is ambiguous; need at least {i+1} characters."
)
return Digest(matches[0])
[docs]
def shrink(self, key: Digest | str) -> Digest:
"""Find the shortest substring that is still an unambiguous reference to the same value."""
for ln in range(4, len(key)):
try:
self.expand(key[:ln])
return Digest(key[:ln])
except AmbiguousDigestError:
continue
raise AmbiguousDigestError(
f"Digest {key} cannot be shrunk without becoming ambiguous!"
)
[docs]
class StorageBackend(KeyManagement):
"""Primitive backend interface for key-value storage.
Backends implement the low-level ``put``/``get``/``_evict``/``list``
operations. Higher-level classes (:class:`ValueMixin`, :class:`CallMixin`)
add domain-specific logic on top.
"""
@abstractmethod
[docs]
def put(self, value: Any, key: Digest) -> Digest: ...
@abstractmethod
[docs]
def get(self, key: Digest) -> Any: ...
[docs]
def _contains(self, key: Digest) -> bool:
try:
self.get(key)
return True
except KeyError:
return False
[docs]
class ValueStorage(KeyManagement):
"""Abstract domain interface for value storage."""
@abstractmethod
[docs]
def save(self, value: Any, key: Digest | None = None) -> Digest: ...
@abstractmethod
[docs]
def load(self, key: Digest | str) -> Any: ...
[docs]
class ValueMixin(ValueStorage, StorageBackend):
"""Bridges :class:`ValueStorage` with :class:`StorageBackend` primitives.
Implements ``save`` and ``load`` using ``put`` and ``get``.
Concrete classes inherit from this and a :class:`StorageBackend`
implementation to get a fully functional value storage.
"""
[docs]
def save(self, value: Any, key: Digest | None = None) -> Digest:
if key is None:
key = digest(value)
logger.debug("Saving value with key %s", key)
return self.put(value, key)
[docs]
def load(self, key: Digest | str) -> Any:
if len(key) < DIGEST_LENGTH:
key = self.expand(key)
else:
key = Digest(key)
logger.debug("Loading value with key %s", key)
return self.get(key)
[docs]
class Digested(ABC):
@abstractmethod
[docs]
def underlying(self):
"""Return plain underlying value, ie. list/dict/etc of nested values or their partial digests"""
# mess with our hash to ensure that we are referentially transparent with respect to the underlying list.
# For the replacement of the 'real' list with the 'digested' list to be invisible to caches, they must hash to the
# same values.
[docs]
def __digest__(self):
return digest(self.underlying())
@abstractmethod
[docs]
def mend(self, storage: 'DestructuringMixin'): ...
@classmethod
@abstractmethod
[docs]
def sunder(cls, intern: Callable[[Any], tuple[Any, int | float]], value: Any): ...
@staticmethod
[docs]
def get(storage, key):
if isinstance(key, Digest):
return storage.get(key)
else:
return key
@dataclass
[docs]
class DigestedIterable(Digested):
[docs]
def underlying(self):
return self.items
[docs]
def mend(self, storage: 'DestructuringMixin') -> list | tuple:
return type(self.items)(map(lambda v: self.get(storage, v), self.items))
@classmethod
[docs]
def sunder(cls, intern: Callable[[Any], tuple[Any, int | float]], value: list | tuple):
children, depths = zip(*(intern(v) for v in value))
depth = 1 + max(depths)
items = type(value)(children)
if all(not isinstance(r, Digest) for r in items):
# intern did do anything to our children because we're out of depth, return them verbatim
return items, depth
return cls(items), depth
@dataclass
[docs]
class DigestedDict(Digested):
[docs]
def underlying(self):
return self.items
[docs]
def mend(self, storage: 'DestructuringMixin') -> dict:
return {self.get(storage, k): self.get(storage, v)
for k, v in self.items.items()}
@classmethod
[docs]
def sunder(cls, intern: Callable[[Any], tuple[Any, int | float]], value: dict):
kk, k_depths = zip(*(intern(k) for k in value))
vv, v_depths = zip(*(intern(v) for v in value.values()))
depth = 1 + max(max(k_depths), max(v_depths))
items = dict(zip(kk, vv))
if all(not isinstance(r, Digest) for r in (*items.keys(), *items.values())):
# intern did not do anything to our children because we're out of depth, return them verbatim
return items, depth
return cls(items), depth
[docs]
class DestructuringMixin(StorageBackend):
"""Mixin that recursively destructures collections on save/load.
Place before a concrete :class:`StorageBackend` in the MRO to add
destructuring behavior. Lists, tuples, and dicts are broken apart so each
element is stored independently; on load the original structure is
reassembled.
Example::
class DestructuringMemory(ValueMixin, DestructuringMixin, Memory):
pass
dm = DestructuringMemory(storage={})
key = dm.save([1, [2, 3]])
assert dm.load(key) == [1, [2, 3]]
"""
[docs]
remaining_depth: int = 0
[docs]
def _intern_rec(self, value: Any, key: Digest | None = None) -> tuple[Any, int | float]:
"""Post-order traversal: recurse to leaves, decide inline-vs-store on the way back up.
Returns ``(result, depth)`` where *result* is the plain value when ``depth < remaining_depth``
(the element is inlined in its parent's :class:`Digested` wrapper) or a :class:`Digest` when
the element was written to storage separately. Every node in the structure is visited exactly
once (O(n)), unlike a separate depth-counting pass.
"""
if isinstance(value, Digest):
return value, -1
depth = float("inf")
match value:
case Number() | str() | bytes():
depth = 0
# treat exactly like scalars, but guard syntax gets a bit weird if we try to join
# technically this violates our recursion of 1 + max children depth, but I just can't see a use for
# destructuring the empty container
case dict() | list() | tuple() if not value:
depth = 0
case list() | tuple():
value, depth = DigestedIterable.sunder(self._intern_rec, value)
case dict():
value, depth = DigestedDict.sunder(self._intern_rec, value)
if depth < self.remaining_depth:
return value, depth
return super().put(value, key or digest(value)), depth
[docs]
def put(self, value: Any, key: Digest) -> Digest:
match value:
case list() | tuple() | dict() if not value:
return super().put(value, key)
case list() | tuple() | dict():
value_or_digest, depth = self._intern_rec(value, key)
# if given value is nominally not deep enough to be destructured/saved during recursion
# we do it here manually as the recursion base case
if depth < self.remaining_depth:
return super().put(value_or_digest, key)
else:
return value_or_digest
case _:
return super().put(value, key)
[docs]
def get(self, key: Digest | Any) -> Any:
value = super().get(key)
match value:
case Digested():
return value.mend(self)
case _:
return value
[docs]
class CallStorage(KeyManagement):
"""Abstract domain interface for call storage."""
@abstractmethod
[docs]
def save(self, call: Call) -> Digest: ...
@abstractmethod
[docs]
def load(self, key: Digest | str) -> Call: ...
@abstractmethod
[docs]
def query(self, template: QueryCall) -> Iterable[Call]: ...
[docs]
class CallMixin(CallStorage, StorageBackend):
"""Bridges :class:`CallStorage` with :class:`StorageBackend` primitives.
Implements ``save``, ``load``, and ``query`` using ``put`` and ``get``,
deriving the storage key from the call's lookup key. ``transform`` is
inherited from :class:`CallStorage`.
Concrete classes inherit from this and a :class:`StorageBackend`
implementation to get a fully functional call storage.
"""
[docs]
def save(self, call: Call) -> Digest:
key = call.to_lookup_key()
logger.debug("Saving call %s", key)
if self.contains(key):
self.evict(key)
return self.put(call, key)
[docs]
def load(self, key: Digest | str) -> Call:
if len(key) < DIGEST_LENGTH:
key = self.expand(key)
else:
key = Digest(key)
logger.debug("Loading call with key %s", key)
return self.get(key)
[docs]
def query(self, template: QueryCall) -> Iterable[Call]:
"""Find cached calls that 'match' the template.
Returns all calls where the given arguments, results or metadata match exactly the stored ones. Values may be
given either as they are or as :class:`Digest`.
Args:
template (Call): specification for calls to return; use `None` as wildcard.
Returns:
Iterable[Call]: an iterable over all matching call objects
"""
def none_or_equal(a, b):
return a is None or digest(a) == digest(b)
for key in self.list():
call = self.load(key)
if template.matches(call):
yield call