import logging
from dataclasses import dataclass
from abc import ABC, abstractmethod
from typing import Iterable, Any, Callable
from ..digest import digest, Digest, DIGEST_LENGTH
from ..call import Call, QueryCall
[docs]
logger = logging.getLogger("fleche.storage")
[docs]
class SaveError(Exception):
pass
[docs]
class AmbiguousDigestError(ValueError):
pass
[docs]
class StorageBase(ABC):
"""Shared functionality between value and call storages."""
@abstractmethod
[docs]
def list(self) -> Iterable[Digest]: ...
[docs]
def evict(self, key: Digest | str) -> None:
"""Removes the entry corresponding to the key from the storage."""
if len(key) < DIGEST_LENGTH:
key = self.expand(key)
else:
key = Digest(key)
self._evict(key)
@abstractmethod
[docs]
def _evict(self, key: Digest) -> None: ...
[docs]
def expand(self, key: Digest | str) -> Digest:
"""Expands a short-hand digest to the full length one."""
if len(key) >= DIGEST_LENGTH:
return Digest(str(key))
if len(key) < 4:
raise KeyError(key)
matches = sorted([k for k in self.list() if k.startswith(key)])
if not matches:
raise KeyError(key)
if len(matches) > 1:
# find longest common prefix of the first two matches to find where they diverge
m1, m2 = matches[0], matches[1]
for i, (c1, c2) in enumerate(zip(m1, m2)):
if c1 != c2:
break
else:
i = min(len(m1), len(m2))
raise AmbiguousDigestError(
f"Short digest {key} is ambiguous; need at least {i+1} characters."
)
return Digest(matches[0])
[docs]
def shrink(self, key: Digest | str) -> Digest:
"""Find the shortest substring that is still an unambigious reference to the same value."""
for ln in range(4, len(key)):
try:
self.expand(key[:ln])
return Digest(key[:ln])
except AmbiguousDigestError:
continue
raise AmbiguousDigestError(
f"Digest {key} cannot be shrunk without becoming ambigious!"
)
[docs]
def contains(self, key: Digest | str) -> bool:
if len(key) < DIGEST_LENGTH:
try:
key = self.expand(key)
except KeyError:
return False
else:
key = Digest(key)
return self._contains(key)
@abstractmethod
[docs]
def _contains(self, key: Digest) -> bool: ...
[docs]
class Storage(StorageBase):
"""Abstract base class for defining storage mechanisms."""
[docs]
def save(self, value: Any, key: Digest | None = None) -> Digest:
if key is None:
key = digest(value)
logger.debug("Saving value with key %s", key)
return self._save(value, key)
@abstractmethod
[docs]
def _save(self, value: Any, key: Digest) -> Digest: ...
[docs]
def load(self, key: Digest | str) -> Any:
if len(key) < DIGEST_LENGTH:
key = self.expand(key)
else:
key = Digest(key)
logger.debug("Loading value with key %s", key)
return self._load(key)
@abstractmethod
[docs]
def _load(self, key: Digest) -> Any: ...
[docs]
def _contains(self, key: Digest) -> bool:
try:
self._load(key)
return True
except KeyError:
return False
[docs]
class Digested(ABC):
@abstractmethod
[docs]
def underlying(self): ...
# mess with our hash to ensure that we are referentially transparent with respect to the underlying list.
# For the replacement of the 'real' list with the 'digested' list to be invisible to caches, they must hash to the
# same values.
[docs]
def __digest__(self):
return digest(self.underlying())
@dataclass
[docs]
class DigestedIterable(Digested):
[docs]
def underlying(self):
return self.items
@dataclass
[docs]
class DigestedDict(Digested):
[docs]
def underlying(self):
return self.items
@dataclass
[docs]
class DestructuringStorage(Storage):
[docs]
def __post_init__(self):
if isinstance(self.storage, DestructuringStorage):
raise ValueError("DestructuringStorage cannot wrap another DestructuringStorage")
[docs]
def _save(self, value: Any, key: Digest) -> Digest:
if isinstance(value, Digest):
return value
match value:
case list() | tuple():
return self.storage.save(
DigestedIterable(type(value)(self.save(v) for v in value))
)
case dict():
return self.storage.save(
DigestedDict({self.save(k): self.save(v) for k, v in value.items()})
)
case _:
return self.storage.save(value, key)
[docs]
def _load(self, key: Digest) -> Any:
value = self.storage.load(key)
match value:
case DigestedIterable(items=items):
return type(items)(self.load(v) for v in items)
case DigestedDict(items=items):
return {self.load(k): self.load(v) for k, v in items.items()}
case _:
return value
[docs]
def _contains(self, key: Digest) -> bool:
return self.storage.contains(key)
[docs]
def _evict(self, key: Digest) -> None:
self.storage.evict(key)
[docs]
def list(self) -> Iterable[Digest]:
return self.storage.list()
[docs]
class CallStorage(StorageBase):
"""Special storage for saving :class:`Call` instances."""
[docs]
def save(self, call: Call) -> Digest:
key = call.to_lookup_key()
logger.debug("Saving call %s", key)
if self.contains(str(key)):
self.evict(str(key))
return self._save(call)
@abstractmethod
[docs]
def _save(self, call: Call) -> Digest: ...
[docs]
def load(self, key: str | Digest) -> Call:
if len(key) < DIGEST_LENGTH:
key = self.expand(key)
else:
key = Digest(key)
logger.debug("Loading call with key %s", key)
return self._load(key)
@abstractmethod
[docs]
def _load(self, key: Digest) -> Call: ...
[docs]
def query(self, template: QueryCall) -> Iterable[Call]:
"""Find cached calls that 'match' the template.
Returns all calls where the given arguments, results or metadata match exactly the stored ones. Values may be
given either as they are or as :class:`Digest`.
Args:
template (Call): specification for calls to return; use `None` as wildcard.
Returns:
Iterable[Call]: an iterable over all matching call objects
"""
def none_or_equal(a, b):
return a is None or digest(a) == digest(b)
for key in self.list():
call = self.load(key)
if template.matches(call):
yield call
[docs]
def _contains(self, key: Digest) -> bool:
try:
self._load(key)
return True
except KeyError:
return False
@dataclass(frozen=True, slots=True)
[docs]
class CallStorageAdapter(CallStorage):
"""Implement a CallStorage from a generic Storage."""
[docs]
def _save(self, call: Call) -> Digest:
return self.storage.save(call, call.to_lookup_key())
[docs]
def _load(self, key: Digest) -> Call:
return self.storage.load(key)
[docs]
def _contains(self, key: Digest) -> bool:
return self.storage.contains(key)
[docs]
def _evict(self, key: Digest) -> None:
self.storage.evict(key)
[docs]
def list(self) -> Iterable[Digest]:
return self.storage.list()