import bisect
import contextlib
import logging
from abc import ABC, abstractmethod
from enum import StrEnum
from typing import Iterable, Any, Callable, Sequence, overload
from ..digest import digest, Digest, DIGEST_LENGTH
from ..call import DigestedCall, QueryCall
[docs]
# Module-level logger; the value and call mixins in this module emit their
# debug traces ("Saving ...", "Loading ...") through it.
logger = logging.getLogger("fleche.storage")
[docs]
class SaveError(Exception):
    """Exception type for failures while saving to storage.

    NOTE(review): nothing in this part of the module raises it; presumably
    concrete backends do — confirm against callers.
    """
[docs]
class AmbiguousDigestError(ValueError):
    """Signals that a digest prefix cannot be used unambiguously.

    Raised when a shortened digest matches more than one stored key, and when
    a stored key cannot be shortened without colliding with another one.
    """
[docs]
class Intent(StrEnum):
"""Describes the kind of operation being performed on storage.
Mixins may use this to choose between exclusive and shared locks.
"""
[docs]
def _longest_common_prefix_length(s1: str, s2: str) -> int:
for i, (c1, c2) in enumerate(zip(s1, s2)):
if c1 != c2:
return i
return min(len(s1), len(s2))
[docs]
def _resolve_prefix(key: str, candidates: list[Digest]) -> Digest:
    """Map the prefix *key* onto the single digest it identifies.

    *candidates* is expected to hold at most two entries: the two
    lexicographically smallest stored keys that start with *key*. Fetching
    them cheaply is the caller's job (for example a ``LIKE … LIMIT 2``
    query on SQL backends).

    Raises:
        KeyError: *candidates* is empty, i.e. nothing matches the prefix.
        AmbiguousDigestError: more than one candidate was supplied; the
            message states how many characters are needed to tell the
            first two apart.
    """
    if candidates:
        if len(candidates) == 1:
            return candidates[0]
        first, second = candidates[0], candidates[1]
        shared = _longest_common_prefix_length(first, second)
        raise AmbiguousDigestError(
            f"Short digest {key} is ambiguous; need at least {shared + 1} characters."
        )
    raise KeyError(key)
[docs]
class OperationContext(ABC):
    """Smallest common base exposing the :meth:`_operation_context` hook.

    :class:`KeyManagement` (storage layer) and
    :class:`~fleche.caches.BaseCache` (cache layer) both derive from it, which
    lets the thread-safety mixins
    (:class:`~fleche.storage.thread_safe.SerializingMixin`,
    :class:`~fleche.storage.thread_safe.PerKeyLockMixin`) plug into either
    layer without being written twice.
    """

    @contextlib.contextmanager
    def _operation_context(self, key: Digest | str, *, intent: Intent = Intent.WRITE):
        """Wrap a single operation on ``key`` in an operation-scoped resource.

        This base version does nothing and simply yields. Mixins override it
        to provide whatever must live for the duration of the operation, such
        as a threading lock, a SQLAlchemy session, an open file handle or a
        decompression stream.

        ``key`` is passed in so an override can either ignore it (one global
        resource) or use it to select a per-key resource (a striped lock
        table, a key-specific file handle, ...).

        ``intent`` states what kind of operation is running, so that mixins
        can pick between shared and exclusive locking. :attr:`Intent.WRITE`
        is the default and, at present, the only value defined.

        **Stacking mixins**: delegate to ``super()`` so each mixin in the MRO
        wraps the operation in turn::

            @contextlib.contextmanager
            def _operation_context(self, key, *, intent=Intent.WRITE):
                with self._lock:  # this mixin's resource
                    with super()._operation_context(key, intent=intent):
                        yield
        """
        yield
[docs]
class KeyManagement(OperationContext):
    """Abstract base providing key-management helpers for any keyed storage.

    Subclasses must implement ``list``, ``_evict``, and ``_contains``.
    The concrete helpers ``evict``, ``contains``, ``expand``, and ``shrink``
    are implemented here once and inherited by all storage classes.

    Every public operation enters :meth:`_operation_context` around the
    compound work it performs, so mixins can inject an operation-scoped
    resource (e.g. a threading lock, a SQLAlchemy session, a file handle)
    without overriding every method individually. Public helpers call each
    other while a context is already open (``evict`` -> ``expand``), so the
    context is entered more than once for the same key within one operation.
    """

    # Shortest digest prefix that ``expand`` accepts and that ``shrink``
    # produces. Kept in one place because both directions must agree.
    _MIN_PREFIX_LENGTH = 4

    @abstractmethod
    def list(self) -> Iterable[Digest]:
        """Return an iterable over the digests of all stored entries."""
        ...

    @abstractmethod
    def _evict(self, key: Digest) -> None:
        """Remove the entry stored under the full-length *key*."""
        ...

    @abstractmethod
    def _contains(self, key: Digest) -> bool:
        """Report whether an entry exists for the full-length *key*."""
        ...

    def evict(self, key: Digest | str) -> None:
        """Removes the entry corresponding to the key from the storage.

        *key* may be a full digest or a shortened prefix.

        Raises:
            KeyError: a shortened *key* matches nothing or is shorter than
                the minimum prefix length.
            AmbiguousDigestError: a shortened *key* matches several entries.
        """
        with self._operation_context(key):
            self._evict(self._normalize_key(key))

    def contains(self, key: Digest | str) -> bool:
        """Report whether *key* (full digest or prefix) refers to an entry.

        A prefix that cannot be expanded because nothing matches it yields
        ``False``. An ambiguous prefix is not swallowed: the
        :class:`AmbiguousDigestError` raised while expanding it propagates.
        """
        with self._operation_context(key):
            try:
                full_key = self._normalize_key(key)
            except KeyError:
                return False
            return self._contains(full_key)

    def expand(self, key: Digest | str) -> Digest:
        """Expands a short-hand digest to the full length one.

        A *key* that already has full length is returned as a
        :class:`Digest` without consulting the storage.

        Raises:
            KeyError: *key* is shorter than the minimum prefix length or no
                stored key starts with it.
            AmbiguousDigestError: more than one stored key starts with *key*.
        """
        with self._operation_context(key):
            if len(key) >= DIGEST_LENGTH:
                return Digest(str(key))
            if len(key) < self._MIN_PREFIX_LENGTH:
                raise KeyError(key)
            # Only the two smallest matches are needed to tell "unique"
            # from "ambiguous" and to report the distinguishing length.
            candidates = sorted(k for k in self.list() if k.startswith(key))
            return _resolve_prefix(str(key), candidates[:2])

    @overload
    def shrink(self, key: Digest | str, /) -> Digest: ...

    @overload
    def shrink(self, key: Digest | str, /, *keys: Digest | str) -> "tuple[Digest, ...]": ...

    def shrink(self, *keys: Digest | str) -> "Digest | tuple[Digest, ...]":
        """Find the shortest substring(s) that unambiguously reference each key.

        With a single key, returns one :class:`Digest`. With multiple keys,
        returns a tuple of :class:`Digest` in the same order as the inputs;
        the batched form fetches ``list()`` once instead of per-key, which
        matters on backends where listing is expensive (e.g. SQL, filesystem).

        Raises:
            TypeError: called without any key.
            KeyError: one of the keys is not stored (exact match required).
            AmbiguousDigestError: one of the keys cannot be shortened.
        """
        if not keys:
            raise TypeError("shrink() requires at least one key")
        # Enter _operation_context for each key so subclasses with locks
        # (e.g. PerKeyLockMixin) still observe the read. The list() snapshot
        # is taken once inside the combined context.
        # NOTE(review): contexts are entered in caller order; with per-key
        # locks, two concurrent calls passing overlapping keys in different
        # orders might block each other — confirm against the lock mixins.
        with contextlib.ExitStack() as stack:
            for k in keys:
                stack.enter_context(self._operation_context(k))
            sorted_all = sorted(self.list())
            out = tuple(self._shrink_one(k, sorted_all) for k in keys)
        return out[0] if len(keys) == 1 else out

    def _shrink_one(self, key: "Digest | str", sorted_all: Sequence[str]) -> Digest:
        """Shorten one stored *key* against the sorted snapshot *sorted_all*.

        Raises:
            KeyError: *key* is not an element of *sorted_all*.
            AmbiguousDigestError: the shortest unambiguous prefix would be
                the whole key (or longer), so nothing can be cut off.
        """
        # Correctness: in a sorted key list, the longest prefix any *other*
        # stored key shares with `key` is the LCP with one of `key`'s two
        # immediate sorted neighbours. Any third key sharing a longer prefix
        # would have to sort strictly between `key` and that neighbour,
        # contradicting "immediate neighbour". Therefore
        # `max(lcp_left, lcp_right) + 1` is the shortest unambiguous length.
        s_key = str(key)
        i = bisect.bisect_left(sorted_all, s_key)
        if i == len(sorted_all) or sorted_all[i] != s_key:
            raise KeyError(key)
        lcp_left = (
            _longest_common_prefix_length(s_key, sorted_all[i - 1]) if i > 0 else 0
        )
        lcp_right = (
            _longest_common_prefix_length(s_key, sorted_all[i + 1])
            if i + 1 < len(sorted_all)
            else 0
        )
        # Never go below the minimum length that expand() would accept back.
        n = max(self._MIN_PREFIX_LENGTH, max(lcp_left, lcp_right) + 1)
        if n >= len(s_key):
            raise AmbiguousDigestError(
                f"Digest {key} cannot be shrunk without becoming ambiguous!"
            )
        return Digest(s_key[:n])

    def _normalize_key(self, key: Digest | str) -> Digest:
        """Expand a short digest prefix to a full key, or wrap a full key as Digest."""
        if len(key) < DIGEST_LENGTH:
            return self.expand(key)
        return Digest(key)
[docs]
class StorageBackend(KeyManagement):
    """Low-level key-value interface that concrete backends fill in.

    A backend supplies the primitive ``put``/``get``/``_evict``/``list``
    operations; the higher-level :class:`ValueMixin` and :class:`CallMixin`
    layer domain-specific behaviour on top of them.
    """

    @abstractmethod
    def put(self, value: Any, key: Digest) -> Digest:
        """Store *value* under *key* and return a :class:`Digest`."""
        ...

    @abstractmethod
    def get(self, key: Digest) -> Any:
        """Return the value stored under *key*.

        The default :meth:`_contains` relies on this raising ``KeyError``
        for a key that is not stored.
        """
        ...

    def _contains(self, key: Digest) -> bool:
        """Default membership test: probe :meth:`get` and watch for ``KeyError``."""
        with self._operation_context(key):
            try:
                self.get(key)
            except KeyError:
                return False
            return True
[docs]
class ValueStorage(KeyManagement):
    """Domain-level contract for storing and retrieving plain values."""

    @abstractmethod
    def save(self, value: Any, key: Digest | None = None) -> Digest:
        """Store *value*, optionally under an explicit *key*, and return a :class:`Digest`.

        How a missing *key* is chosen is up to the implementation
        (:class:`ValueMixin` computes it with ``digest(value)``).
        """
        ...

    @abstractmethod
    def load(self, key: Digest | str) -> Any:
        """Return the value referenced by *key* (a :class:`Digest` or plain string)."""
        ...
[docs]
class ValueMixin(ValueStorage, StorageBackend):
    """Implements the :class:`ValueStorage` contract on top of backend primitives.

    ``save`` and ``load`` are expressed through ``put`` and ``get``. A
    concrete value storage is obtained by combining this mixin with a
    :class:`StorageBackend` implementation.
    """

    def save(self, value: Any, key: Digest | None = None) -> Digest:
        """Store *value* via ``put`` and return what ``put`` returns.

        When *key* is ``None`` the key is computed with ``digest(value)``.
        """
        storage_key = digest(value) if key is None else key
        with self._operation_context(storage_key):
            logger.debug("Saving value with key %s", storage_key)
            return self.put(value, storage_key)

    def load(self, key: Digest | str) -> Any:
        """Fetch the value for *key*, which may be a full digest or a prefix.

        The operation context is entered with the key exactly as given;
        prefix expansion happens inside it.
        """
        with self._operation_context(key):
            full_key = self._normalize_key(key)
            logger.debug("Loading value with key %s", full_key)
            return self.get(full_key)
[docs]
class CallStorage(KeyManagement):
    """Domain-level contract for storing, retrieving and querying calls."""

    @abstractmethod
    def save(self, call: DigestedCall) -> Digest:
        """Store *call* and return a :class:`Digest` for it."""
        ...

    @abstractmethod
    def load(self, key: Digest | str) -> DigestedCall:
        """Return the call referenced by *key* (a :class:`Digest` or plain string)."""
        ...

    @abstractmethod
    def query(self, template: QueryCall) -> Iterable[DigestedCall]:
        """Return the stored calls selected by *template*."""
        ...
[docs]
class CallMixin(CallStorage, StorageBackend):
    """Implements the :class:`CallStorage` contract on top of backend primitives.

    ``save``, ``load`` and ``query`` are expressed through ``put`` and
    ``get``; the storage key of a call is its lookup key
    (``call.to_lookup_key()``).

    A concrete call storage is obtained by combining this mixin with a
    :class:`StorageBackend` implementation.
    """

    def save(self, call: DigestedCall) -> Digest:
        """Store *call* under its lookup key, replacing any existing entry.

        An entry already present under that key is evicted before the new
        one is written with ``put``.
        """
        lookup_key = call.to_lookup_key()
        with self._operation_context(lookup_key):
            logger.debug("Saving call %s", lookup_key)
            if self.contains(lookup_key):
                self.evict(lookup_key)
            return self.put(call, lookup_key)

    def load(self, key: Digest | str) -> DigestedCall:
        """Fetch the call for *key*, which may be a full digest or a prefix."""
        with self._operation_context(key):
            full_key = self._normalize_key(key)
            logger.debug("Loading call with key %s", full_key)
            return self.get(full_key)

    def query(self, template: QueryCall) -> Iterable[DigestedCall]:
        """Find cached calls that 'match' the template.

        Yields every stored call whose arguments, results or metadata match
        exactly the ones given in the template. Values may be given either
        as they are or as :class:`Digest`.

        This is a generator: every key from ``list()`` is loaded and tested
        with ``template.matches`` lazily, as the result is iterated.

        Args:
            template (QueryCall): specification for calls to return; use
                `None` as wildcard.

        Returns:
            Iterable[DigestedCall]: an iterable over all matching digested
            call objects
        """
        for stored_key in self.list():
            candidate = self.load(stored_key)
            if template.matches(candidate):
                yield candidate