Source code for calute.memory.long_term_memory

# Copyright 2025 The EasyDeL/Calute Author @erfanzar (Erfan Zare Chavoshi).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""Long-term memory implementation with persistence and semantic search."""

from datetime import datetime, timedelta
from typing import Any

from .base import Memory, MemoryItem
from .storage import RAGStorage, SQLiteStorage


class LongTermMemory(Memory):
    """Long-term memory with persistence and semantic search.

    Designed for storing important information over extended periods.
    Supports both keyword-based and semantic (vector similarity) search
    depending on the storage backend. Automatically cleans up expired or
    low-importance memories when the item limit is reached.

    Attributes:
        retention_days: Number of days a memory item is retained before it
            becomes eligible for automatic cleanup.
        storage: The underlying :class:`MemoryStorage` backend used for
            persistence. May be a :class:`SQLiteStorage`,
            :class:`RAGStorage`, or any compatible implementation.

    Example:
        >>> from calute.memory import LongTermMemory
        >>> ltm = LongTermMemory(retention_days=90, max_items=500)
        >>> item = ltm.save("Project deadline is March 15", importance=0.9)
        >>> results = ltm.search("deadline")
    """

    def __init__(
        self,
        storage: Any | None = None,
        enable_embeddings: bool = True,
        db_path: str | None = None,
        max_items: int = 10000,
        retention_days: int = 365,
    ) -> None:
        """Initialize long-term memory with persistence and optional embeddings.

        When no ``storage`` is provided, a default backend is constructed:
        :class:`SQLiteStorage` (at ``db_path`` or the default path) optionally
        wrapped in :class:`RAGStorage` when ``enable_embeddings`` is ``True``.
        On initialisation, any previously persisted items (keys prefixed with
        ``ltm_``) are loaded from the storage backend into memory.

        Args:
            storage: Pre-configured :class:`MemoryStorage` backend. When
                ``None``, a new SQLite-backed storage is created.
            enable_embeddings: Whether to wrap the base storage in
                :class:`RAGStorage` for semantic search capability. Only
                affects backend construction when ``storage`` is ``None``;
                the flag is always forwarded to the base class.
            db_path: File path for the SQLite database. Only used when
                ``storage`` is ``None``.
            max_items: Maximum number of items to retain. When reached,
                :meth:`_cleanup_old_memories` is invoked before saving.
            retention_days: Number of days after which a memory is eligible
                for automatic removal during cleanup.
        """
        if storage is None:
            base_storage = SQLiteStorage(db_path) if db_path else SQLiteStorage()
            storage = RAGStorage(base_storage) if enable_embeddings else base_storage

        super().__init__(storage=storage, max_items=max_items, enable_embeddings=enable_embeddings)
        self.retention_days = retention_days
        self._load_from_storage()

    def _load_from_storage(self) -> None:
        """Load existing memory items from the storage backend on initialisation.

        Scans all keys with the ``ltm_`` prefix, deserialises each entry via
        :meth:`MemoryItem.from_dict`, and populates both :attr:`_items` and
        :attr:`_index`. Keys whose stored payload is empty are skipped.
        """
        if not self.storage:
            return

        for key in self.storage.list_keys("ltm_"):
            data = self.storage.load(key)
            if data:
                item = MemoryItem.from_dict(data)
                self._items.append(item)
                self._index[item.memory_id] = item

    def save(
        self,
        content: str,
        metadata: dict[str, Any] | None = None,
        agent_id: str | None = None,
        user_id: str | None = None,
        conversation_id: str | None = None,
        importance: float = 0.5,
        **kwargs,
    ) -> MemoryItem:
        """Save a new item to long-term memory with importance scoring.

        Creates a :class:`MemoryItem` with ``memory_type="long_term"``, stores
        it in both the in-memory index and the persistent storage backend (if
        configured). If the item limit has been reached,
        :meth:`_cleanup_old_memories` is called first to free space.

        Args:
            content: Text content to store.
            metadata: Optional key-value metadata. The mapping is copied, so
                the caller's dict is never modified. An ``"importance"`` key
                is added automatically from the ``importance`` parameter.
            agent_id: Identifier of the agent that created this memory.
            user_id: Identifier of the user associated with this memory.
            conversation_id: Identifier of the conversation context.
            importance: Importance weight (0.0--1.0) used for ranking and
                cleanup decisions.
            **kwargs: Extra key-value pairs merged into ``metadata``.

        Returns:
            The newly created :class:`MemoryItem`.
        """
        # Copy so the "importance"/kwargs entries do not leak into the
        # dictionary object owned by the caller.
        metadata = dict(metadata) if metadata else {}
        metadata["importance"] = importance
        metadata.update(kwargs)

        item = MemoryItem(
            content=content,
            memory_type="long_term",
            metadata=metadata,
            agent_id=agent_id,
            user_id=user_id,
            conversation_id=conversation_id,
        )

        if self.max_items and len(self._items) >= self.max_items:
            self._cleanup_old_memories()

        self._items.append(item)
        self._index[item.memory_id] = item

        if self.storage:
            self.storage.save(f"ltm_{item.memory_id}", item.to_dict())

        return item

    def search(
        self,
        query: str,
        limit: int = 10,
        filters: dict[str, Any] | None = None,
        use_semantic: bool = True,
        **kwargs,
    ) -> list[MemoryItem]:
        """Search long-term memory using semantic similarity or keyword matching.

        When the storage backend is a :class:`RAGStorage` instance and
        ``use_semantic`` is ``True``, performs vector-similarity search.
        Otherwise, falls back to keyword matching with a composite scoring
        formula that blends text relevance (50 %), recency (30 %), and
        importance (20 %). Because recency and importance contribute
        independently of text relevance, an item without any keyword overlap
        can still receive a positive score and be returned.

        Returned items have their ``access_count`` incremented and
        ``last_accessed`` timestamp updated as a side-effect. These updates
        are applied in memory only; this method does not write them back to
        the storage backend.

        Args:
            query: Natural-language or keyword search query string.
            limit: Maximum number of results to return. Values ``<= 0``
                yield an empty list.
            filters: Optional key-value criteria for narrowing results.
                Checked against both item attributes and metadata.
            use_semantic: When ``True`` and a :class:`RAGStorage` backend is
                available, performs vector-based semantic search.
            **kwargs: Additional keyword arguments (currently unused).

        Returns:
            List of :class:`MemoryItem` instances with at most ``limit``
            entries. Semantic results keep the order returned by the
            backend; keyword results are sorted by descending score.
        """
        if limit <= 0:
            return []

        now = datetime.now()

        if use_semantic and isinstance(self.storage, RAGStorage):
            # Over-fetch so that prefix/filter rejections still leave enough
            # candidates to fill ``limit``.
            results = self.storage.search_similar(query, limit=limit * 2)
            memories: list[MemoryItem] = []
            for key, similarity, data in results:
                if not key.startswith("ltm_"):
                    continue
                loaded = MemoryItem.from_dict(data)
                # Prefer the live in-memory object so access statistics land
                # on the tracked item instead of a throwaway copy.
                item = self._index.get(loaded.memory_id, loaded)
                item.relevance_score = similarity
                if filters and not self._matches_filters(item, filters):
                    continue
                item.access_count += 1
                item.last_accessed = now
                memories.append(item)
                if len(memories) >= limit:
                    break
            return memories

        query_lower = query.lower()
        matches: list[MemoryItem] = []
        for item in self._items:
            if filters and not self._matches_filters(item, filters):
                continue

            relevance = self._calculate_relevance(item.content, query_lower)
            recency_score = max(0, self._recency_score(item.timestamp, now))
            importance = item.metadata.get("importance", 0.5)
            item.relevance_score = relevance * 0.5 + recency_score * 0.3 + importance * 0.2

            if item.relevance_score > 0:
                item.access_count += 1
                item.last_accessed = now
                matches.append(item)

        matches.sort(key=lambda x: x.relevance_score, reverse=True)
        return matches[:limit]

    def retrieve(
        self,
        memory_id: str | None = None,
        filters: dict[str, Any] | None = None,
        limit: int = 10,
    ) -> MemoryItem | list[MemoryItem] | None:
        """Retrieve specific memories by ID or filter criteria.

        When ``memory_id`` is provided, returns the specific item, updates
        its access statistics and persists them to the storage backend (if
        configured). Otherwise, walks the stored items in order and returns
        those matching ``filters`` (all items when ``filters`` is empty);
        their access statistics are updated in memory only.

        Args:
            memory_id: Specific memory ID to retrieve.
            filters: Filter criteria to match against memory attributes and
                metadata.
            limit: Maximum number of items to return when using filters.
                Values ``<= 0`` yield an empty list.

        Returns:
            Single :class:`MemoryItem` if ``memory_id`` is provided and
            found, ``None`` if it is provided but not found, or a list of
            :class:`MemoryItem` when retrieving by filters.
        """
        if memory_id:
            item = self._index.get(memory_id)
            if item:
                item.access_count += 1
                item.last_accessed = datetime.now()
                if self.storage:
                    self.storage.save(f"ltm_{memory_id}", item.to_dict())
            return item

        results: list[MemoryItem] = []
        if limit <= 0:
            return results

        for item in self._items:
            if filters and not self._matches_filters(item, filters):
                continue
            item.access_count += 1
            item.last_accessed = datetime.now()
            results.append(item)
            if len(results) >= limit:
                break

        return results

    def update(self, memory_id: str, updates: dict[str, Any]) -> bool:
        """Update a memory item with new values.

        Only keys that already exist as attributes on the item are applied;
        unknown keys are ignored. Persists the item to the storage backend
        if one is configured.

        Args:
            memory_id: ID of the memory item to update.
            updates: Dictionary of field names and new values to apply.

        Returns:
            ``True`` if the item exists (and was updated), ``False`` if
            ``memory_id`` is not found.
        """
        if memory_id not in self._index:
            return False

        item = self._index[memory_id]
        for key, value in updates.items():
            if hasattr(item, key):
                setattr(item, key, value)

        if self.storage:
            self.storage.save(f"ltm_{memory_id}", item.to_dict())

        return True

    def delete(self, memory_id: str | None = None, filters: dict[str, Any] | None = None) -> int:
        """Delete memory items by ID or filter criteria.

        Removes items from both memory and the storage backend if
        configured. ``memory_id`` takes precedence: when it is given,
        ``filters`` is not consulted.

        Args:
            memory_id: Specific memory ID to delete.
            filters: Filter criteria to match items for deletion.

        Returns:
            Number of items deleted.
        """
        if memory_id:
            item = self._index.get(memory_id)
            return self._remove_items([item]) if item is not None else 0

        if filters:
            return self._remove_items([item for item in self._items if self._matches_filters(item, filters)])

        return 0

    def clear(self) -> None:
        """Clear all long-term memories.

        Deletes every ``ltm_``-prefixed key from the storage backend (if
        configured) and empties the in-memory list and index. This
        operation permanently deletes all stored memories.
        """
        if self.storage:
            # Materialise the key listing first so deletions cannot disturb
            # an iterator/view handed out by the backend.
            for key in list(self.storage.list_keys("ltm_")):
                self.storage.delete(key)

        self._items.clear()
        self._index.clear()

    def _remove_items(self, items: list[MemoryItem]) -> int:
        """Remove the given items from memory, the index and storage.

        Args:
            items: Items to drop. Matching is done on ``memory_id``.

        Returns:
            Number of items passed in for removal.
        """
        if not items:
            return 0

        doomed_ids = {item.memory_id for item in items}
        # Slice-assign so the list object itself is preserved, and filter in
        # a single pass instead of calling ``list.remove`` per item.
        self._items[:] = [item for item in self._items if item.memory_id not in doomed_ids]
        for memory_id in doomed_ids:
            self._index.pop(memory_id, None)
            if self.storage:
                self.storage.delete(f"ltm_{memory_id}")
        return len(items)

    def _recency_score(self, timestamp: datetime, now: datetime) -> float:
        """Return the unclamped recency of ``timestamp`` relative to ``now``.

        The score is ``1`` for an item created today and decreases linearly,
        reaching ``0`` at :attr:`retention_days` and going negative beyond
        it. A non-positive :attr:`retention_days` yields ``0.0`` rather than
        dividing by zero.

        Args:
            timestamp: Creation time of the item.
            now: Reference time to measure age against.

        Returns:
            Recency score; callers clamp it when a non-negative value is
            required.
        """
        if self.retention_days <= 0:
            return 0.0
        return 1 - (now - timestamp).days / self.retention_days

    def _cleanup_old_memories(self) -> None:
        """Remove expired or low-importance memories to free capacity.

        Applies a three-stage cleanup strategy:

        1. Select items older than :attr:`retention_days`.
        2. Select items with importance < 0.3 **and** access count < 2.
        3. If the above selected fewer than 20 % of current items, rank all
           items by a composite score (importance 30 %, normalised access
           count 30 %, recency 40 %) and add the lowest-ranked ones until
           20 % of the items (at least one) are selected.

        Selected items are removed from memory and from the storage backend
        if one is configured. The relative order of surviving items is
        left unchanged.
        """
        if not self._items:
            return

        now = datetime.now()
        cutoff_date = now - timedelta(days=self.retention_days)

        to_remove = [
            item
            for item in self._items
            if item.timestamp < cutoff_date
            or (item.metadata.get("importance", 0.5) < 0.3 and item.access_count < 2)
        ]

        threshold = len(self._items) * 0.2
        if len(to_remove) < threshold:
            # Always free at least one slot; int(threshold) is 0 for small
            # stores, which would otherwise let the store grow unbounded.
            target = max(1, int(threshold))
            selected_ids = {item.memory_id for item in to_remove}

            def retention_score(item: MemoryItem) -> float:
                return (
                    item.metadata.get("importance", 0.5) * 0.3
                    + (item.access_count / 100) * 0.3
                    + self._recency_score(item.timestamp, now) * 0.4
                )

            # ``sorted`` leaves ``self._items`` in its original order.
            for item in sorted(self._items, key=retention_score):
                if len(to_remove) >= target:
                    break
                if item.memory_id not in selected_ids:
                    selected_ids.add(item.memory_id)
                    to_remove.append(item)

        self._remove_items(to_remove)

    def _matches_filters(self, item: MemoryItem, filters: dict[str, Any]) -> bool:
        """Check if item matches all filter criteria.

        Each key is looked up first as an attribute of ``item`` and then in
        ``item.metadata``; a key found in neither fails the match. Supports
        callable filter values for custom comparisons (e.g.,
        ``{"importance": lambda x: x >= 0.8}``).

        Args:
            item: Memory item to check.
            filters: Dictionary of field names to required values or
                callables.

        Returns:
            ``True`` if item matches all filters, ``False`` otherwise.
        """
        for key, value in filters.items():
            if hasattr(item, key):
                actual = getattr(item, key)
            elif key in item.metadata:
                actual = item.metadata[key]
            else:
                return False

            if callable(value):
                if not value(actual):
                    return False
            elif actual != value:
                return False

        return True

    def _calculate_relevance(self, content: str, query: str) -> float:
        """Calculate keyword-based relevance score.

        Returns ``1.0`` when the whole query occurs as a substring of the
        lowercased content; otherwise the fraction of whitespace-separated
        query words that occur as substrings.

        Args:
            content: Content string to search within.
            query: Query string (should be lowercase).

        Returns:
            Relevance score between 0.0 and 1.0.
        """
        content_lower = content.lower()

        if query in content_lower:
            return 1.0

        query_words = query.split()
        if query_words:
            matching = sum(1 for word in query_words if word in content_lower)
            return matching / len(query_words)

        return 0.0

    def consolidate(self, merge_similar: bool = True, similarity_threshold: float = 0.8) -> str:
        """Consolidate memories by merging similar entries and producing a summary.

        Optionally merges entries with high word overlap, then groups the
        remaining memories by conversation ID (falling back to agent ID,
        then ``"general"``) and lists up to five entries per group, ordered
        by importance and then timestamp, both descending.

        Args:
            merge_similar: Whether to merge entries with similar content.
            similarity_threshold: Word overlap ratio to consider entries
                similar (0-1).

        Returns:
            Formatted string summary of consolidated long-term memory
            contents.
        """
        if not self._items:
            return "No long-term memories available."

        if merge_similar:
            self._merge_similar_memories(similarity_threshold)

        grouped: dict[str, list[MemoryItem]] = {}
        for item in self._items:
            key = item.conversation_id or item.agent_id or "general"
            grouped.setdefault(key, []).append(item)

        summary = ["Long-term memory summary:"]
        for key, items in grouped.items():
            items.sort(key=lambda x: (x.metadata.get("importance", 0.5), x.timestamp), reverse=True)
            summary.append(f"\n{key.title()}:")
            for item in items[:5]:
                importance = item.metadata.get("importance", 0.5)
                access_info = f"(importance: {importance:.1f}, accessed: {item.access_count}x)"
                summary.append(f" - {item.content[:150]} {access_info}")

        return "\n".join(summary)

    def _merge_similar_memories(self, threshold: float = 0.8):
        """Merge memories with similar content to reduce redundancy.

        Compares lowercase word sets between pairs of items and merges those
        whose overlap (shared words divided by the larger set's size) meets
        the threshold. The item with the longer content is kept; it
        receives the combined access count and the higher importance. Kept
        items are re-saved and discarded items deleted from the storage
        backend if one is configured.

        Args:
            threshold: Minimum word overlap ratio to trigger a merge (0-1).
        """
        if len(self._items) < 2:
            return

        merged_ids: set[str] = set()
        discarded: list[MemoryItem] = []
        survivors: dict[str, MemoryItem] = {}

        for i, item_a in enumerate(self._items):
            if item_a.memory_id in merged_ids:
                continue
            words_a = set(item_a.content.lower().split())
            if not words_a:
                continue

            for item_b in self._items[i + 1 :]:
                if item_b.memory_id in merged_ids:
                    continue
                words_b = set(item_b.content.lower().split())
                if not words_b:
                    continue

                overlap = len(words_a & words_b) / max(len(words_a), len(words_b))
                if overlap < threshold:
                    continue

                if len(item_a.content) >= len(item_b.content):
                    keep, discard = item_a, item_b
                else:
                    keep, discard = item_b, item_a

                keep.access_count += discard.access_count
                keep.metadata["importance"] = max(
                    keep.metadata.get("importance", 0.5),
                    discard.metadata.get("importance", 0.5),
                )
                merged_ids.add(discard.memory_id)
                discarded.append(discard)
                survivors[keep.memory_id] = keep

                if discard is item_a:
                    # item_a no longer exists; merging further items into it
                    # would silently drop their content and statistics.
                    break

        if not discarded:
            return

        if self.storage:
            # Persist the merged statistics of items that ultimately survived.
            for memory_id, item in survivors.items():
                if memory_id not in merged_ids:
                    self.storage.save(f"ltm_{memory_id}", item.to_dict())

        self._remove_items(discarded)