Source code for calute.context.compaction_strategies

# Copyright 2025 The EasyDeL/Calute Author @erfanzar (Erfan Zare Chavoshi).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""Context compaction strategies for managing conversation history.

This module provides various strategies for compacting conversation
history when context length exceeds token limits. Each strategy
implements a different approach to reducing context size while
preserving relevant information.

The strategies range from simple truncation to intelligent
summarization using LLM capabilities, allowing for flexible
context management based on requirements and available resources.

Key Components:
    - BaseCompactionStrategy: Abstract base class defining the interface
    - SummarizationStrategy: LLM-based conversation summarization
    - SlidingWindowStrategy: Recent message retention with window
    - PriorityBasedStrategy: Importance-based message selection
    - SummarizationStrategy: LLM-based summarization
    - TruncateStrategy: Simple truncation for emergency cases

Example:
    >>> from calute.context import get_compaction_strategy
    >>> from calute.types import CompactionStrategy
    >>> strategy = get_compaction_strategy(
    ...     strategy=CompactionStrategy.SLIDING_WINDOW,
    ...     target_tokens=4000,
    ...     model="gpt-4"
    ... )
    >>> compacted, stats = strategy.compact(messages)
"""

from abc import ABC, abstractmethod
from collections.abc import Callable
from typing import Any

from ..types.function_execution_types import CompactionStrategy
from .token_counter import SmartTokenCounter


[docs]class BaseCompactionStrategy(ABC): """Base class for context compaction strategies. Provides the foundational interface and common functionality for all compaction strategies. Subclasses must implement the compact() method to define their specific compaction logic. Attributes: target_tokens: Target number of tokens after compaction. model: Model name for accurate token counting. preserve_system: Whether to preserve system messages during compaction. preserve_recent: Number of recent messages to always preserve. token_counter: SmartTokenCounter instance for token counting. """ def __init__( self, target_tokens: int, model: str = "gpt-4", preserve_system: bool = True, preserve_recent: int = 3, ): """Initialize the compaction strategy. Args: target_tokens: Target number of tokens after compaction model: Model name for token counting preserve_system: Whether to preserve system messages preserve_recent: Number of recent messages to preserve """ self.target_tokens = target_tokens self.model = model self.preserve_system = preserve_system self.preserve_recent = preserve_recent self.token_counter = SmartTokenCounter(model=model)
[docs] @abstractmethod def compact( self, messages: list[dict[str, str]], metadata: dict[str, Any] | None = None, ) -> tuple[list[dict[str, str]], dict[str, Any]]: """Compact the message history. Args: messages: List of message dictionaries metadata: Optional metadata about messages Returns: Tuple of (compacted_messages, compaction_stats) """ pass
def _separate_messages( self, messages: list[dict[str, str]] ) -> tuple[list[dict[str, str]], list[dict[str, str]], list[dict[str, str]]]: """Separate messages into system, preserved, and compactable. Args: messages: List of all messages Returns: Tuple of (system_messages, preserved_messages, compactable_messages) """ system_messages = [] preserved_messages = [] compactable_messages = [] for msg in messages: if msg.get("role") == "system" and self.preserve_system: system_messages.append(msg) break non_system = [m for m in messages if m.get("role") != "system"] if self.preserve_recent > 0 and len(non_system) > self.preserve_recent: preserved_messages = non_system[-self.preserve_recent :] compactable_messages = non_system[: -self.preserve_recent] else: preserved_messages = non_system compactable_messages = [] return system_messages, preserved_messages, compactable_messages
[docs]class SummarizationStrategy(BaseCompactionStrategy): """Compaction strategy that uses LLM to summarize older messages. This strategy leverages an LLM client to intelligently summarize older portions of conversation history, creating a condensed representation that preserves key information while reducing token count significantly. Attributes: llm_client: LLM client instance for generating summaries. compaction_agent: Optional compaction agent for advanced summarization. """ def __init__(self, llm_client: Any | None = None, **kwargs): """Initialize summarization strategy. Args: llm_client: LLM client for generating summaries **kwargs: Arguments for base class """ super().__init__(**kwargs) self.llm_client = llm_client self.compaction_agent = None if llm_client: from ..agents.compaction_agent import create_compaction_agent self.compaction_agent = create_compaction_agent(llm_client, target_length="concise")
[docs] def compact( self, messages: list[dict[str, str]], metadata: dict[str, Any] | None = None, ) -> tuple[list[dict[str, str]], dict[str, Any]]: """Compact messages using summarization. Args: messages: List of message dictionaries metadata: Optional metadata Returns: Compacted messages and statistics """ system_msgs, preserved_msgs, compactable_msgs = self._separate_messages(messages) stats = { "original_count": len(messages), "strategy": "summarization", } if not compactable_msgs and len(preserved_msgs) == 1: single_msg = preserved_msgs[0] content = single_msg.get("content", "") if self.compaction_agent and len(content) > 500: try: summary = self.compaction_agent.summarize_context(content) compacted = [*system_msgs, {"role": single_msg.get("role", "user"), "content": summary}] stats["compacted_count"] = len(compacted) stats["summary_created"] = True stats["messages_summarized"] = 1 return compacted, stats except Exception as e: print(f"Error summarizing single message: {e}") if not compactable_msgs: stats["compacted_count"] = len(messages) stats["summary_created"] = False return messages, stats if self.compaction_agent: compacted = self.compaction_agent.summarize_messages(messages=messages, preserve_recent=self.preserve_recent) stats["compacted_count"] = len(compacted) stats["summary_created"] = True stats["messages_summarized"] = len(compactable_msgs) return compacted, stats else: conversation_text = self._format_conversation(compactable_msgs) summary = self._generate_summary(conversation_text) summary_message = {"role": "system", "content": f"[Previous conversation summary]\n{summary}"} compacted = [*system_msgs, summary_message, *preserved_msgs] stats["compacted_count"] = len(compacted) stats["summary_created"] = True stats["messages_summarized"] = len(compactable_msgs) return compacted, stats
def _format_conversation(self, messages: list[dict[str, str]]) -> str: """Format messages as conversation text for summarization. Args: messages: List of message dictionaries to format. Returns: Formatted conversation text with role prefixes. """ lines = [] for msg in messages: role = msg.get("role", "unknown").capitalize() content = msg.get("content", "") lines.append(f"{role}: {content}") return "\n\n".join(lines) def _generate_summary(self, conversation: str) -> str: """Generate summary using LLM or fallback method. Falls back to a simple truncation-based summary if no LLM client is available. Args: conversation: Formatted conversation text to summarize. Returns: Summarized version of the conversation. """ if self.llm_client: try: import asyncio prompt = ( "Summarize the following conversation concisely. " "Preserve key facts, decisions, and outcomes. " "Remove redundant information.\n\n" f"CONVERSATION:\n{conversation}\n\nSUMMARY:" ) try: loop = asyncio.get_event_loop() if loop.is_running(): from ..core.utils import run_sync response = run_sync( self.llm_client.generate_completion( prompt=prompt, temperature=0.3, max_tokens=1024, stream=False ) ) else: response = loop.run_until_complete( self.llm_client.generate_completion( prompt=prompt, temperature=0.3, max_tokens=1024, stream=False ) ) except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) response = loop.run_until_complete( self.llm_client.generate_completion( prompt=prompt, temperature=0.3, max_tokens=1024, stream=False ) ) if hasattr(self.llm_client, "extract_content"): return self.llm_client.extract_content(response) elif hasattr(response, "choices") and response.choices: return response.choices[0].message.content elif isinstance(response, str): return response return str(response) except Exception: pass lines = conversation.split("\n") if len(lines) > 10: summary_parts = ["Earlier discussion covered:", *lines[:5], "...", "Recent points:", *lines[-5:]] return "\n".join(summary_parts) return conversation
[docs]class SlidingWindowStrategy(BaseCompactionStrategy): """Compaction strategy that keeps only recent messages. Implements a sliding window approach where older messages are progressively removed to stay within token limits, while always preserving the most recent messages for context continuity. This strategy is efficient and doesn't require an LLM client, making it suitable for cost-sensitive applications. """
[docs] def compact( self, messages: list[dict[str, str]], metadata: dict[str, Any] | None = None, ) -> tuple[list[dict[str, str]], dict[str, Any]]: """Compact messages using sliding window. Args: messages: List of message dictionaries metadata: Optional metadata Returns: Compacted messages and statistics """ stats = { "original_count": len(messages), "strategy": "sliding_window", } system_msgs = [m for m in messages if m.get("role") == "system"] non_system = [m for m in messages if m.get("role") != "system"] compacted = system_msgs.copy() if self.preserve_system else [] if self.preserve_recent > 0 and len(non_system) > 0: recent_to_keep = min(self.preserve_recent, len(non_system)) recent_messages = non_system[-recent_to_keep:] remaining_messages = non_system[:-recent_to_keep] if recent_to_keep < len(non_system) else [] else: recent_messages = [] remaining_messages = non_system test_compacted = system_msgs.copy() if self.preserve_system else [] test_compacted.extend(recent_messages) tokens_used = self.token_counter.count_tokens(test_compacted) if tokens_used > self.target_tokens: compacted = system_msgs.copy() if self.preserve_system else [] for msg in recent_messages: content = msg.get("content", "") if len(content) > 500: truncated_msg = msg.copy() truncated_msg["content"] = content[:500] + "... [truncated for context limit]" compacted.append(truncated_msg) else: compacted.append(msg) tokens_used = self.token_counter.count_tokens(compacted) else: compacted = test_compacted messages_to_add = [] for msg in reversed(remaining_messages): msg_tokens = self.token_counter.count_tokens([msg]) if tokens_used + msg_tokens <= self.target_tokens: messages_to_add.insert(0, msg) tokens_used += msg_tokens else: break if messages_to_add: insert_pos = len(system_msgs) if self.preserve_system else 0 compacted[insert_pos:insert_pos] = messages_to_add stats["compacted_count"] = len(compacted) stats["messages_removed"] = len(messages) - len(compacted) stats["final_tokens"] = tokens_used return compacted, stats
[docs]class PriorityBasedStrategy(BaseCompactionStrategy): """Compaction strategy based on message priority and importance. Scores messages based on their importance and retains high-priority messages while removing lower-priority ones. This allows for more intelligent compaction that preserves critical conversation elements. Attributes: priority_scorer: Callable that scores message priority (0-1). """ def __init__(self, priority_scorer: Callable | None = None, **kwargs): """Initialize priority-based strategy. Args: priority_scorer: Function to score message priority **kwargs: Arguments for base class """ super().__init__(**kwargs) self.priority_scorer = priority_scorer or self._default_scorer
[docs] def compact( self, messages: list[dict[str, str]], metadata: dict[str, Any] | None = None, ) -> tuple[list[dict[str, str]], dict[str, Any]]: """Compact messages based on priority. Args: messages: List of message dictionaries metadata: Optional metadata with priority info Returns: Compacted messages and statistics """ stats = { "original_count": len(messages), "strategy": "priority_based", } system_msgs, preserved_msgs, compactable_msgs = self._separate_messages(messages) if not compactable_msgs: stats["compacted_count"] = len(messages) return messages, stats scored_messages = [(msg, self.priority_scorer(msg, i, metadata)) for i, msg in enumerate(compactable_msgs)] scored_messages.sort(key=lambda x: x[1], reverse=True) compacted = system_msgs.copy() tokens_used = self.token_counter.count_tokens(compacted) kept_messages = [] for msg, _score in scored_messages: msg_tokens = self.token_counter.count_tokens([msg]) if tokens_used + msg_tokens <= self.target_tokens: kept_messages.append(msg) tokens_used += msg_tokens original_order = {id(msg): i for i, msg in enumerate(compactable_msgs)} kept_messages.sort(key=lambda m: original_order.get(id(m), float("inf"))) compacted.extend(kept_messages) compacted.extend(preserved_msgs) stats["compacted_count"] = len(compacted) stats["messages_removed"] = len(messages) - len(compacted) stats["final_tokens"] = tokens_used return compacted, stats
def _default_scorer(self, message: dict[str, str], index: int, metadata: dict[str, Any] | None) -> float: """Default message priority scorer. Args: message: Message to score index: Message index metadata: Optional metadata Returns: Priority score (0-1) """ score = 0.5 if message.get("role") == "system": score += 0.3 if "function_call" in message or "tool_calls" in message: score += 0.2 content_length = len(message.get("content", "")) if content_length > 500: score += 0.1 recency_bonus = 0.1 * (index / 100) score += min(recency_bonus, 0.1) return min(score, 1.0)
[docs]class TruncateStrategy(BaseCompactionStrategy): """Simple truncation strategy for emergency compaction. Provides a straightforward truncation approach that removes older messages and truncates long message content. This is the simplest and fastest strategy, suitable when more sophisticated approaches are not needed or available. Does not require an LLM client and has minimal computational overhead, making it ideal for resource-constrained situations. """
[docs] def compact( self, messages: list[dict[str, str]], metadata: dict[str, Any] | None = None, ) -> tuple[list[dict[str, str]], dict[str, Any]]: """Compact messages using simple truncation. Args: messages: List of message dictionaries metadata: Optional metadata Returns: Compacted messages and statistics """ stats = { "original_count": len(messages), "strategy": "truncate", } system_msgs, preserved_msgs, compactable_msgs = self._separate_messages(messages) current_tokens = self.token_counter.count_tokens(messages) tokens_to_save = max(0, current_tokens - self.target_tokens) if tokens_to_save > 0: compacted = [] compacted.extend(system_msgs) for msg in preserved_msgs: content = msg.get("content", "") if len(content) > 1000: truncated_msg = msg.copy() truncated_msg["content"] = content[:1000] + "... [truncated]" compacted.append(truncated_msg) else: compacted.append(msg) if compactable_msgs: tokens_used = self.token_counter.count_tokens(compacted) tokens_available = self.target_tokens - tokens_used if tokens_available > 100: summary = f"[Previous {len(compactable_msgs)} messages truncated. " if compactable_msgs: last_content = compactable_msgs[-1].get("content", "")[:200] summary += f"Last message preview: {last_content}...]" compacted.append({"role": "system", "content": summary}) else: compacted = system_msgs + compactable_msgs + preserved_msgs stats["compacted_count"] = len(compacted) stats["messages_removed"] = len(messages) - len(compacted) return compacted, stats
[docs]def get_compaction_strategy( strategy: CompactionStrategy, target_tokens: int, model: str = "gpt-4", llm_client: Any | None = None, **kwargs ) -> BaseCompactionStrategy: """Factory function to get a compaction strategy. Args: strategy: The compaction strategy enum target_tokens: Target number of tokens model: Model name for token counting llm_client: Optional LLM client **kwargs: Additional strategy-specific arguments Returns: Compaction strategy instance """ strategy_map = { CompactionStrategy.SUMMARIZE: SummarizationStrategy, CompactionStrategy.SLIDING_WINDOW: SlidingWindowStrategy, CompactionStrategy.PRIORITY_BASED: PriorityBasedStrategy, CompactionStrategy.TRUNCATE: TruncateStrategy, } strategy_class = strategy_map.get(strategy, SummarizationStrategy) if strategy == CompactionStrategy.SUMMARIZE: return strategy_class(llm_client=llm_client, target_tokens=target_tokens, model=model, **kwargs) else: return strategy_class(target_tokens=target_tokens, model=model, **kwargs)