calute.cortex.agents.agent#
Agent definition for Cortex framework.
This module provides the CortexAgent class, which represents an intelligent agent within the Cortex framework for AI agent orchestration. CortexAgent enables autonomous task execution, delegation, tool usage, and memory integration for building complex multi-agent systems.
The agent supports features like: - Task execution with context and memory - Automatic delegation to other agents - Rate limiting and execution timeouts - Knowledge management and tool integration - Step-by-step callback mechanisms - Pydantic model output formatting
- Typical usage example:
- agent = CortexAgent(
role=”Data Analyst”, goal=”Analyze data and provide insights”, backstory=”Expert in statistical analysis”, model=”gpt-4”, tools=[analysis_tool], allow_delegation=True
) result = agent.execute(“Analyze sales trends”)
- class calute.cortex.agents.agent.CortexAgent(role: str, goal: str, backstory: str, model: str | None = None, instructions: str | None = None, tools: list[calute.cortex.core.tool.CortexTool | collections.abc.Callable | calute.types.tool_calls.Tool] = <factory>, max_iterations: int = 10, verbose: bool = True, allow_delegation: bool = False, temperature: float = 0.7, max_tokens: int = 2048, memory_enabled: bool = True, capabilities: list[calute.types.function_execution_types.AgentCapability] = <factory>, calute_instance: Calute | None = None, memory: ~typing.Optional[CortexMemory] = None, llm: calute.llms.base.BaseLLM | None = None, reinvoke_after_function: bool = True, cortex_instance: ~typing.Optional[Cortex] = None, max_execution_time: int | None = None, max_rpm: int | None = None, step_callback: collections.abc.Callable | None = None, config: dict = <factory>, knowledge: dict = <factory>, knowledge_sources: list = <factory>, auto_format_guidance: bool = True, output_format_preference: ~typing.Literal['xml', 'json'] = 'xml', auto_compact: bool = False, compact_threshold: float = 0.8, compact_target: float = 0.5, max_context_tokens: int | None = None, compaction_strategy: ~calute.types.function_execution_types.CompactionStrategy = CompactionStrategy.SUMMARIZE, preserve_system_prompt: bool = True, preserve_recent_messages: int = 5, _internal_agent: calute.types.agent_types.Agent | None = None, _auto_compact_agent: calute.agents.auto_compact_agent.AutoCompactAgent | None = None, _conversation_history: list[dict[str, str]] = <factory>, _messages_history: ~typing.Any = None, _template_engine: calute.cortex.core.templates.PromptTemplate | None = None, _delegation_count: int = 0, _times_executed: int = 0, _execution_times: list = <factory>, _last_rpm_window: float = 0, _rpm_requests: list = <factory>, _original_role: str | None = None, _original_goal: str | None = None, _original_backstory: str | None = None, _original_instructions: str | None = None)[source]#
Bases:
objectAgent with specific role, goal, and capabilities.
CortexAgent represents an autonomous AI agent that can execute tasks, use tools, delegate to other agents, and maintain context through memory. It serves as a wrapper around Calute’s Agent class with additional orchestration capabilities for the Cortex framework.
- role#
The agent’s role or title defining its specialization.
- Type
str
- goal#
The primary objective or purpose of the agent.
- Type
str
- backstory#
Background information providing context for the agent’s expertise.
- Type
str
- model#
Optional model identifier for the LLM to use (e.g., ‘gpt-4’).
- Type
str | None
- instructions#
Optional custom system instructions. Auto-generated if not provided.
- Type
str | None
- tools#
List of CortexTool instances the agent can use.
- max_iterations#
Maximum number of retry attempts for failed executions.
- Type
int
- verbose#
Whether to output detailed logging information.
- Type
bool
- allow_delegation#
Whether the agent can delegate tasks to other agents.
- Type
bool
- temperature#
LLM temperature parameter for response randomness (0.0-1.0).
- Type
float
- max_tokens#
Maximum tokens for LLM responses.
- Type
int
- memory_enabled#
Whether to use memory for context building.
- Type
bool
- capabilities#
List of special capabilities the agent possesses.
- memory#
Optional CortexMemory instance for persistent context.
- Type
Optional[CortexMemory]
- llm#
Optional BaseLLM instance for direct LLM access.
- Type
calute.llms.base.BaseLLM | None
- reinvoke_after_function#
Whether to reinvoke LLM after tool execution.
- Type
bool
- cortex_instance#
Reference to parent Cortex instance for multi-agent coordination.
- Type
Optional[Cortex]
- max_execution_time#
Optional timeout in seconds for task execution.
- Type
int | None
- max_rpm#
Optional rate limit for requests per minute.
- Type
int | None
- step_callback#
Optional callback function invoked at each execution step.
- Type
collections.abc.Callable | None
- config#
Dictionary for custom configuration parameters.
- Type
dict
- knowledge#
Dictionary storing agent-specific knowledge.
- Type
dict
- knowledge_sources#
List of knowledge source identifiers.
- Type
list
- auto_format_guidance#
Whether to auto-generate format instructions for Pydantic models.
- Type
bool
- output_format_preference#
Preferred output format (‘xml’ or ‘json’).
- Type
Literal[‘xml’, ‘json’]
- Private Attributes:
_internal_agent: Internal Calute Agent instance. _logger: Logger instance for verbose output. _template_engine: PromptTemplate engine for generating prompts. _delegation_count: Current delegation depth to prevent infinite recursion. _times_executed: Total number of task executions. _execution_times: List of execution durations for statistics. _last_rpm_window: Timestamp for rate limiting window. _rpm_requests: List of request timestamps for rate limiting.
- add_knowledge(key: str, value: str)[source]#
Add knowledge to the agent’s knowledge base.
Adds or updates a knowledge entry that will be included in the agent’s context during task execution.
- Parameters
key – Knowledge identifier or category.
value – Knowledge content or description.
- Side Effects:
Updates the knowledge dictionary.
Example
agent.add_knowledge(“company_policy”, “Always prioritize customer satisfaction”) agent.add_knowledge(“domain_expertise”, “Specialized in healthcare analytics”)
- add_knowledge_source(source: str)[source]#
Add a knowledge source.
Registers a knowledge source identifier that represents an external source of information available to the agent.
- Parameters
source – Knowledge source identifier (e.g., database name, API endpoint).
- Side Effects:
Appends to knowledge_sources list if not already present.
Note
Duplicate sources are automatically prevented.
- allow_delegation: bool = False#
- attach_mcp(mcp_servers: Any, server_names: list[str] | None = None) None[source]#
Attach MCP servers to this agent, connecting and adding their tools.
This method provides a convenient way to connect MCP servers and automatically add their tools to the agent’s function list. Works with both CortexAgent and its internal Calute Agent.
- Parameters
mcp_servers – Can be one of: - MCPManager: An existing MCP manager instance - MCPServerConfig: A single server config (will create manager and connect) - list[MCPServerConfig]: Multiple server configs (will create manager and connect all)
server_names – Optional list of server names to filter tools from. If None, adds tools from all servers in the manager.
Example
>>> >>> agent.attach_mcp(MCPServerConfig( ... name="filesystem", ... command="npx", ... args=["-y", "@modelcontextprotocol/server-filesystem", "/tmp"] ... )) >>> >>> >>> agent.attach_mcp([ ... MCPServerConfig(name="filesystem", ...), ... MCPServerConfig(name="sqlite", ...) ... ]) >>> >>> >>> manager = MCPManager() >>> await manager.add_server(config1) >>> await manager.add_server(config2) >>> agent.attach_mcp(manager, server_names=["filesystem"])
- auto_compact: bool = False#
- auto_format_guidance: bool = True#
- backstory: str#
- capabilities: list[calute.types.function_execution_types.AgentCapability]#
- check_context_usage() dict[str, Any] | None[source]#
Check current context usage.
- Returns
Dictionary with usage statistics or None if not enabled.
- compact_target: float = 0.5#
- compact_threshold: float = 0.8#
- compaction_strategy: CompactionStrategy = 'summarize'#
- config: dict#
- create_ui() Any[source]#
Create and launch a user interface for the agent.
Launches an interactive UI application that allows users to interact with this agent through a graphical interface.
- Returns
The UI application instance from calute.ui.
- delegate_task(task_description: str, context: str | None = None) str[source]#
Delegate a task to another agent.
Delegates a task to another agent in the Cortex system. Selects the most appropriate agent and passes the task with additional context about the delegation.
- Parameters
task_description – Description of the task to delegate.
context – Optional additional context for the delegated task.
- Returns
Result from the delegate agent, or error message if delegation fails.
- Return type
str
- Side Effects:
Increments delegation count
Logs delegation activity
Saves delegation to memory if enabled
Passes delegation count to delegate to maintain depth tracking
Note
Returns “Delegation not available” if delegation is disabled or no Cortex instance is available.
- execute(task_description: str, context: str | None = None, streamer_buffer: calute.core.streamer_buffer.StreamerBuffer | None = None, stream_callback: collections.abc.Callable[[Any], None] | None = None, use_thread: bool = False) str | tuple[calute.core.streamer_buffer.StreamerBuffer, threading.Thread][source]#
Execute a task using the agent with advanced controls.
Main method for task execution. Handles rate limiting, timeouts, memory integration, knowledge context, retries, and step callbacks. Streams responses from the LLM and processes function calls if configured.
- Parameters
task_description – Natural language description of the task to execute.
context – Optional additional context to include in the prompt.
streamer_buffer – Optional StreamerBuffer to receive streaming chunks.
stream_callback – Optional callback function for streaming responses.
use_thread – If True, executes in background thread and returns immediately with (buffer, thread) tuple. If False, blocks until complete.
- Returns
str containing the agent’s response. If use_thread=True: tuple of (StreamerBuffer, Thread) for async consumption.
- Return type
If use_thread=False
- Raises
ValueError – If the agent is not connected to a Cortex instance.
TimeoutError – If execution exceeds max_execution_time.
RuntimeError – If no response is received after max_iterations.
Exception – Re-raises any exceptions from the underlying LLM execution.
- Side Effects:
Records request for rate limiting
Updates execution statistics
Invokes step callbacks
Saves interaction to memory if enabled
Example
- result = agent.execute(
“Analyze the sales data”, context=”Focus on Q4 2024 trends”
)
- execute_stream(task_description: str, context: str | None = None, callback: collections.abc.Callable[[calute.types.function_execution_types.StreamChunk | calute.types.function_execution_types.FunctionDetection | calute.types.function_execution_types.FunctionCallsExtracted | calute.types.function_execution_types.FunctionExecutionStart | calute.types.function_execution_types.FunctionExecutionComplete | calute.types.function_execution_types.AgentSwitch | calute.types.function_execution_types.Completion | calute.types.function_execution_types.ReinvokeSignal], None] | None = None) str[source]#
Execute task with real-time streaming and optional callback.
Convenience method that executes a task in a background thread and processes streaming chunks through an optional callback while returning the final result.
- Parameters
task_description – Natural language description of the task to execute.
context – Optional additional context to include in the prompt.
callback – Optional function to call with each streaming chunk. If not provided, chunks are silently consumed.
- Returns
The final agent response after all streaming is complete.
- Return type
str
Example
- def print_chunk(chunk):
- if hasattr(chunk, ‘content’) and chunk.content:
print(chunk.content, end=’’, flush=True)
- result = agent.execute_stream(
“Write a poem”, callback=print_chunk
)
- execute_with_delegation(task_description: str, context: str | None = None) str[source]#
Execute task with automatic delegation if needed and allowed.
Executes a task with intelligent delegation support. First attempts to execute the task directly, then checks if delegation would be beneficial. If delegation occurs, combines both responses into a comprehensive final answer.
- Parameters
task_description – Description of the task to execute.
context – Optional additional context.
- Returns
Final response, potentially combining insights from multiple agents.
- Return type
str
Note
Automatically determines if delegation is needed based on initial response
Combines delegated results with initial response for comprehensive answer
Falls back to delegated result if combination fails
Returns initial result if delegation is not needed or not available
Example
- result = agent.execute_with_delegation(
“Create a marketing strategy with technical implementation details”
)
- property functions: list[collections.abc.Callable]#
Get the list of callable functions registered with the internal agent.
- Returns
List of callable functions available to the agent, or an empty list if the internal agent has not been initialized.
- get_compaction_stats() dict[str, Any] | None[source]#
Get auto-compaction statistics.
- Returns
Dictionary with compaction statistics or None if not enabled.
- get_config(key: str, default: Any = None) Any[source]#
Get configuration value.
Retrieves a configuration parameter value with optional default.
- Parameters
key – Configuration parameter name to retrieve.
default – Default value if key is not found.
- Returns
The configuration value for the key, or default if not found.
- get_execution_stats() dict[source]#
Get execution statistics.
Compiles comprehensive statistics about the agent’s execution history, including timing metrics and execution counts.
- Returns
- Statistics dictionary containing:
times_executed: Total number of executions
avg_execution_time: Average execution duration in seconds
total_execution_time: Sum of all execution times
min_execution_time: Fastest execution time
max_execution_time: Slowest execution time
recent_execution_times: Last 5 execution durations (if available)
- Return type
dict
Note
Returns zero values for timing metrics if no executions have occurred.
- get_rate_limit_status() dict[source]#
Get current rate limiting status.
Provides detailed information about the current rate limiting state, including requests made and remaining capacity.
- Returns
- Rate limit status containing:
rate_limited: Whether currently rate limited
max_rpm: Maximum requests per minute allowed (None if no limit)
current_requests: Number of requests in current 60-second window
requests_remaining: Available requests before hitting limit
- Return type
dict
Example
status = agent.get_rate_limit_status() if status[‘requests_remaining’] < 5:
print(“Approaching rate limit”)
- goal: str#
- instructions: str | None = None#
- interpolate_inputs(inputs: dict[str, Any]) None[source]#
Interpolate inputs into the agent’s role, goal, backstory, and instructions.
This method replaces template variables (e.g., {variable_name}) in the agent’s attributes with values from the provided inputs dictionary. Original values are preserved for potential re-interpolation.
- Parameters
inputs – Dictionary mapping template variables to their values. Supported value types are strings, integers, floats, bools, and serializable dicts/lists.
- Side Effects:
Updates role, goal, backstory, and instructions with interpolated values. Stores original values if not already saved.
Example
>>> agent = CortexAgent(role="Expert in {domain}", goal="Master {topic}") >>> agent.interpolate_inputs({"domain": "AI", "topic": "LLMs"})
- is_rate_limited() bool[source]#
Check if agent is currently rate limited.
Determines whether the agent would be rate limited if it attempted to make a request right now.
- Returns
True if currently rate limited, False otherwise.
- Return type
bool
Note
This is a read-only check that doesn’t modify rate limit state.
- knowledge: dict#
- knowledge_sources: list#
- llm: calute.llms.base.BaseLLM | None = None#
- max_context_tokens: int | None = None#
- max_execution_time: int | None = None#
- max_iterations: int = 10#
- max_rpm: int | None = None#
- max_tokens: int = 2048#
- memory: Optional[CortexMemory] = None#
- memory_enabled: bool = True#
- model: str | None = None#
- output_format_preference: Literal['xml', 'json'] = 'xml'#
- preserve_recent_messages: int = 5#
- preserve_system_prompt: bool = True#
- reinvoke_after_function: bool = True#
- reset_stats()[source]#
Reset execution statistics.
Clears all execution statistics and counters, returning the agent to a fresh state for metric tracking.
- Side Effects:
Resets _times_executed to 0
Clears _execution_times list
Clears _rpm_requests list
Resets _delegation_count to 0
- role: str#
- set_step_callback(callback: Callable)[source]#
Set step callback function.
Registers a callback function that will be invoked at each step of task execution, useful for monitoring and debugging.
- Parameters
callback – Callable that accepts a dict parameter containing step information. The dict includes fields like ‘step’, ‘agent’, ‘task’, etc.
Example
- def my_callback(step_info):
print(f”Step: {step_info[‘step’]} for agent: {step_info[‘agent’]}”)
agent.set_step_callback(my_callback)
- step_callback: collections.abc.Callable | None = None#
- temperature: float = 0.7#
- tools: list[calute.cortex.core.tool.CortexTool | collections.abc.Callable | calute.types.tool_calls.Tool]#
- update_config(key: str, value: Any) None[source]#
Update agent configuration.
Sets or updates a configuration parameter for the agent. Configuration can be used to store agent-specific settings or runtime parameters.
- Parameters
key – Configuration parameter name.
value – Configuration parameter value (any type).
- Side Effects:
Updates the config dictionary.
- verbose: bool = True#