Source code for calute.memory.compat

# Copyright 2025 The EasyDeL/Calute Author @erfanzar (Erfan Zare Chavoshi).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Backward-compatible memory API for legacy Calute callers.

Provides :class:`MemoryType`, :class:`MemoryEntry`, and :class:`MemoryStore`
which mirror the older public surface so that existing code continues to work
without modification.
"""

from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import StrEnum
from typing import Any

from .base import MemoryItem
from .contextual_memory import ContextualMemory
from .storage import SQLiteStorage


[docs]class MemoryType(StrEnum): """Legacy memory type enum. Each value names a category of memory that the :class:`MemoryStore` maintains separately. """ SHORT_TERM = "short_term" LONG_TERM = "long_term" EPISODIC = "episodic" SEMANTIC = "semantic" WORKING = "working" PROCEDURAL = "procedural"
@dataclass
class MemoryEntry(MemoryItem):
    """Memory record in the shape the legacy API and its tests expect.

    Adds the legacy-only fields ``context``, ``importance_score`` and ``tags``
    on top of :class:`~calute.memory.base.MemoryItem`. After construction the
    values are copied into ``metadata`` so code that only reads ``metadata``
    still sees them.

    Attributes:
        memory_type: Memory category; a :class:`MemoryType` member is stored
            as its string value (default: SHORT_TERM).
        context: Free-form context dictionary attached to the entry.
        importance_score: Importance weight of the entry (default 0.5).
        tags: String labels attached to the entry.
    """

    memory_type: str | MemoryType = MemoryType.SHORT_TERM
    context: dict[str, Any] = field(default_factory=dict)
    importance_score: float = 0.5
    tags: list[str] = field(default_factory=list)

    def __post_init__(self) -> None:
        """Store memory_type as a string and mirror legacy fields into metadata."""
        kind = self.memory_type
        if isinstance(kind, MemoryType):
            self.memory_type = kind.value

        # Work on a private copy so a metadata dict shared with the caller is
        # never mutated.
        mirrored = self.metadata = dict(self.metadata)

        # Empty context / tags are deliberately left out of metadata.
        if self.context:
            mirrored["context"] = dict(self.context)
        if self.tags:
            mirrored["tags"] = list(self.tags)

        # Importance is always mirrored, under the key "importance".
        mirrored["importance"] = self.importance_score

    def to_dict(self) -> dict[str, Any]:
        """Serialise the entry, including the legacy-only fields.

        Returns:
            The dictionary produced by :meth:`MemoryItem.to_dict`, with the
            ``memory_type``, ``context``, ``importance_score`` and ``tags``
            keys set from this entry. ``context`` and ``tags`` are copies.
        """
        payload = super().to_dict()
        payload.update(
            memory_type=self.memory_type,
            context=dict(self.context),
            importance_score=self.importance_score,
            tags=list(self.tags),
        )
        return payload
class MemoryStore(ContextualMemory):
    """Compatibility wrapper preserving the legacy memory store surface.

    The canonical state is ``self.memories``: one list of
    :class:`MemoryEntry` per :class:`MemoryType`. Every change to those
    buckets is mirrored into the underlying
    :class:`~calute.memory.contextual_memory.ContextualMemory` sub-stores so
    that legacy callers using :meth:`add_memory` / :meth:`retrieve_memories`
    etc. continue to work.
    """

    def __init__(
        self,
        max_short_term: int = 100,
        max_working: int = 10,
        max_long_term: int = 10000,
        enable_vector_search: bool = False,
        embedding_dimension: int = 768,
        enable_persistence: bool = False,
        persistence_path: str | None = None,
        cache_size: int = 100,
        memory_type: MemoryType | None = None,
    ) -> None:
        """Initialise the legacy-compatible memory store.

        Args:
            max_short_term: Maximum entries in the short-term bucket.
            max_working: Maximum entries in the working-memory bucket.
            max_long_term: Maximum entries in the long-term bucket.
            enable_vector_search: Stored on the instance; not otherwise used here.
            embedding_dimension: Stored on the instance; not otherwise used here.
            enable_persistence: Whether to enable SQLite persistence.
            persistence_path: File path for the SQLite database.
            cache_size: Stored on the instance; not otherwise used here.
            memory_type: Default :class:`MemoryType`, kept as
                ``default_memory_type`` (falls back to SHORT_TERM).
        """
        import os

        # SQLite persistence needs all three: the flag, a path, and the
        # WRITE_MEMORY=1 environment variable. Otherwise nothing touches disk.
        write_memory = os.environ.get("WRITE_MEMORY", "0") == "1"
        storage = None
        if enable_persistence and persistence_path and write_memory:
            storage = SQLiteStorage(persistence_path)

        # NOTE(review): promotion_threshold / importance_threshold are fixed
        # values handed to ContextualMemory; their meaning is defined there.
        super().__init__(
            short_term_capacity=max_short_term,
            long_term_storage=storage,
            promotion_threshold=3,
            importance_threshold=0.7,
        )

        self.max_short_term = max_short_term
        self.max_working = max_working
        self.max_long_term = max_long_term
        self.enable_vector_search = enable_vector_search
        self.embedding_dimension = embedding_dimension
        self.cache_size = cache_size
        self.default_memory_type = memory_type or MemoryType.SHORT_TERM
        # One bucket per memory type; this is the source of truth.
        self.memories: dict[MemoryType, list[MemoryEntry]] = {memory_kind: [] for memory_kind in MemoryType}

    def add_memory(
        self,
        content: str,
        memory_type: MemoryType,
        agent_id: str,
        context: dict[str, Any] | None = None,
        importance_score: float = 0.5,
        tags: list[str] | None = None,
        **kwargs: Any,
    ) -> MemoryEntry:
        """Create and store a new memory entry.

        Appends the entry to its per-type bucket, mirrors it into the
        underlying stores, and then enforces the bucket's capacity limit.

        Args:
            content: Text content to store.
            memory_type: Which bucket to place the entry in.
            agent_id: Identifier of the creating agent.
            context: Optional context dictionary attached to the entry.
            importance_score: Importance weight attached to the entry.
            tags: Optional list of string tags for filtering.
            **kwargs: Additional fields passed to :class:`MemoryEntry`. A
                ``timestamp`` key overrides the default of ``datetime.now()``.

        Returns:
            The newly created :class:`MemoryEntry`.
        """
        entry = MemoryEntry(
            content=content,
            timestamp=kwargs.pop("timestamp", datetime.now()),
            memory_type=memory_type,
            agent_id=agent_id,
            context=context or {},
            importance_score=importance_score,
            tags=list(tags or []),
            **kwargs,
        )
        self.memories[memory_type].append(entry)
        self._sync_underlying_stores(entry, memory_type)
        self._enforce_limit(memory_type)
        return entry

    def retrieve_memories(
        self,
        memory_types: list[MemoryType] | None = None,
        agent_id: str | None = None,
        tags: list[str] | None = None,
        limit: int = 10,
        min_importance: float = 0.0,
        query_embedding: object = None,
        memory_type: MemoryType | None = None,
    ) -> list[MemoryEntry]:
        """Retrieve entries matching the given criteria across memory types.

        Args:
            memory_types: Explicit list of types to search. ``None`` or an
                empty list falls back to ``memory_type``, then to all types.
            agent_id: If truthy, only return entries for this agent.
            tags: If truthy, only return entries carrying at least one of them.
            limit: Maximum number of entries to return.
            min_importance: Minimum importance_score threshold (inclusive).
            query_embedding: Accepted for API compatibility but not used.
            memory_type: Single type shortcut (used when memory_types is empty).

        Returns:
            List of matching :class:`MemoryEntry` instances, newest first.
        """
        del query_embedding
        selected_types = memory_types or ([memory_type] if memory_type is not None else list(MemoryType))

        results: list[MemoryEntry] = []
        for selected_type in selected_types:
            for entry in self.memories[selected_type]:
                # Falsy filters (None, "", []) mean "do not filter".
                if agent_id and entry.agent_id != agent_id:
                    continue
                if tags and not any(tag in entry.tags for tag in tags):
                    continue
                if entry.importance_score < min_importance:
                    continue
                results.append(entry)

        results.sort(key=lambda item: item.timestamp, reverse=True)
        return results[:limit]

    def retrieve_recent(self, minutes_ago: int = 60) -> list[MemoryEntry]:
        """Return all entries created within the last *minutes_ago* minutes.

        Args:
            minutes_ago: Look-back window in minutes.

        Returns:
            List of :class:`MemoryEntry` instances, newest first.
        """
        cutoff = datetime.now() - timedelta(minutes=minutes_ago)
        recent = [entry for entries in self.memories.values() for entry in entries if entry.timestamp >= cutoff]
        recent.sort(key=lambda item: item.timestamp, reverse=True)
        return recent

    def clear_memories(
        self,
        memory_type: MemoryType | None = None,
        agent_id: str | None = None,
    ) -> None:
        """Clear stored entries, optionally scoped to a type and/or agent.

        Args:
            memory_type: If provided, only clear this type's bucket.
            agent_id: If provided, only remove entries belonging to this agent.
        """
        target_types = list(MemoryType) if memory_type is None else [memory_type]
        for selected_type in target_types:
            if agent_id is None:
                self.memories[selected_type] = []
            else:
                self.memories[selected_type] = [
                    entry for entry in self.memories[selected_type] if entry.agent_id != agent_id
                ]
        # The buckets changed in bulk, so re-derive the mirrored stores.
        self._rebuild_underlying_stores()

    def consolidate_memories(
        self,
        agent_id: str | None = None,
        merge_similar: bool = True,
        threshold: float = 0.7,
    ) -> str:
        """Promote high-importance memories to long-term and return a summary.

        Walks the short-term, working and episodic buckets and moves every
        entry with ``importance_score >= threshold`` (and matching
        ``agent_id``, when given) into the long-term bucket. The long-term
        capacity limit is applied afterwards, so promotion cannot grow the
        bucket beyond ``max_long_term``. A text summary of up to 20 of the
        newest matching entries is then produced.

        Args:
            agent_id: If provided, only consider entries for this agent.
            merge_similar: Accepted for API compatibility; merging is not performed.
            threshold: Minimum importance score for promotion and for the
                "Important facts" section of the summary.

        Returns:
            Formatted text summary, or an empty string if no entries match.
        """
        del merge_similar

        long_term_bucket = self.memories[MemoryType.LONG_TERM]
        for source_type in (MemoryType.SHORT_TERM, MemoryType.WORKING, MemoryType.EPISODIC):
            retained: list[MemoryEntry] = []
            for entry in self.memories[source_type]:
                if entry.importance_score >= threshold and (agent_id is None or entry.agent_id == agent_id):
                    long_term_bucket.append(entry)
                else:
                    retained.append(entry)
            self.memories[source_type] = retained

        # Promotion appends straight to the bucket, so apply the same capacity
        # rule add_memory uses (oldest entries are evicted first).
        self._enforce_limit(MemoryType.LONG_TERM)
        # Source buckets changed regardless of trimming; refresh the mirrors.
        self._rebuild_underlying_stores()

        relevant = self.retrieve_memories(agent_id=agent_id, limit=20)
        if not relevant:
            return ""

        summary_parts: list[str] = []
        important = [memory for memory in relevant if memory.importance_score >= threshold]
        recent = relevant[:5]

        if important:
            summary_parts.append("Important facts:")
            for memory in important[:5]:
                summary_parts.append(f"- [{memory.importance_score:.1f}] {memory.content}")

        if recent:
            summary_parts.append("\nRecent context:")
            for memory in recent:
                # Skip anything that qualified for the "Important facts" pool.
                if memory not in important:
                    summary_parts.append(f"- {memory.content}")

        return "\n".join(summary_parts)

    def get_statistics(self) -> dict[str, Any]:
        """Return aggregate statistics including the legacy total_memories count.

        Returns:
            Dictionary extending the base statistics with ``total_memories``
            (sum across all type buckets) and ``cache_hit_rate`` (always 0.0).
        """
        stats = super().get_statistics()
        stats["total_memories"] = sum(len(entries) for entries in self.memories.values())
        stats["cache_hit_rate"] = 0.0
        return stats

    def _enforce_limit(self, memory_type: MemoryType) -> None:
        """Trim the named bucket if it exceeds its configured capacity.

        Only the short-term, working and long-term buckets have a limit; other
        types are left untouched. The oldest entries (front of the list) are
        dropped, and the mirrored stores are rebuilt when anything is removed.

        Args:
            memory_type: The bucket to check and trim.
        """
        capacity_by_type = {
            MemoryType.SHORT_TERM: self.max_short_term,
            MemoryType.WORKING: self.max_working,
            MemoryType.LONG_TERM: self.max_long_term,
        }
        limit = capacity_by_type.get(memory_type)
        if limit is None:
            return

        bucket = self.memories[memory_type]
        overflow = len(bucket) - limit
        if overflow > 0:
            del bucket[0:overflow]
            self._rebuild_underlying_stores()

    def _sync_underlying_stores(self, entry: MemoryEntry, memory_type: MemoryType) -> None:
        """Mirror a single entry into the appropriate ContextualMemory sub-stores.

        Long-term, semantic, and procedural entries go into ``long_term``;
        all others go into ``short_term``. Every entry is also recorded in
        this object's own ``_items`` / ``_index``.

        Args:
            entry: The entry to mirror.
            memory_type: The source bucket for this entry.
        """
        if memory_type in {MemoryType.LONG_TERM, MemoryType.SEMANTIC, MemoryType.PROCEDURAL}:
            target = self.long_term
        else:
            target = self.short_term
        target._items.append(entry)
        target._index[entry.memory_id] = entry

        self._items.append(entry)
        self._index[entry.memory_id] = entry

    def _rebuild_underlying_stores(self) -> None:
        """Rebuild the ContextualMemory sub-stores from the canonical buckets.

        Resets ``_items`` / ``_index`` on the short-term store, the long-term
        store and this object, then re-syncs every entry from
        ``self.memories``. Called after any bulk-deletion operation.
        """
        self.short_term._items = []
        self.short_term._index = {}
        self.long_term._items = []
        self.long_term._index = {}
        self._items = []
        self._index = {}
        for memory_type, entries in self.memories.items():
            for entry in entries:
                self._sync_underlying_stores(entry, memory_type)