Source code for fleche.storage.memory

from dataclasses import dataclass
from typing import Any, Iterable

from .base import ValueMixin, CallMixin, StorageBackend
from .destructuring import DestructuringMixin
from .thread_safe import PerKeyLockMixin
from ..digest import Digest
from copy import deepcopy


@dataclass(frozen=True)
[docs] class MemoryBackend(StorageBackend): """ A concrete implementation of Storage that stores values in an in-memory dictionary. """
[docs] storage: dict[Digest, Any]
# Each storage instance is its own world: hash on identity so that frozen # dataclass subclasses can serve as WeakKeyDictionary keys in PerKeyLockMixin # without the unhashable dict field causing a TypeError.
[docs] __hash__ = object.__hash__
[docs] def list(self) -> Iterable[Digest]: return tuple(self.storage.keys())
[docs] def put(self, value: Any, key: Digest) -> Digest: self.storage[key] = deepcopy(value) return key
[docs] def get(self, key: Digest) -> Any: return deepcopy(self.storage[key])
[docs] def _contains(self, key: Digest) -> bool: return key in self.storage
[docs] def _evict(self, key: Digest) -> None: self.storage.pop(key, None)
@dataclass(frozen=True)
[docs] class ValueMemory(PerKeyLockMixin, DestructuringMixin, ValueMixin, MemoryBackend):
[docs] __hash__ = object.__hash__
@dataclass(frozen=True)
[docs] class CallMemory(PerKeyLockMixin, CallMixin, MemoryBackend):
[docs] __hash__ = object.__hash__