calute.executors
Function execution and agent orchestration system.
This module provides the core execution infrastructure for Calute, including:
- Function registry and management
- Agent orchestration and switching
- Function execution with various strategies (sequential, parallel, pipeline)
- Retry policies and error handling
- Execution metrics and monitoring
- Enhanced versions with additional features
The module supports both synchronous and asynchronous function execution, timeout management, and sophisticated error recovery mechanisms.
- class calute.executors.AgentOrchestrator
Bases: object
Orchestrates multiple agents and handles switching logic.
Manages a collection of agents, their functions, and the logic for switching between agents based on various triggers.
- agents
Dictionary of registered agents by ID.
- function_registry
Registry of all available functions.
- switch_triggers
Dictionary of trigger handlers for agent switching.
- current_agent_id
ID of the currently active agent.
- execution_history
History of agent switches and executions.
- get_current_agent() → Agent
Get the currently active agent.
- Returns
The currently active Agent instance.
- Raises
ValueError – If no agent is currently active.
- register_agent(agent: Agent) → None
Register an agent with the orchestrator.
- Parameters
agent – The agent instance to register.
- Returns
None
- register_switch_trigger(trigger: AgentSwitchTrigger, handler: Callable) → None
Register a custom switch trigger handler.
- Parameters
trigger – The trigger type to register.
handler – The callable handler for this trigger.
- Returns
None
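The registration and lookup behaviour described above can be modelled with a small stand-in. This is a minimal sketch, not calute's implementation: `SketchAgent` and `SketchOrchestrator` are hypothetical names, triggers are plain strings rather than `AgentSwitchTrigger` members, and making the first registered agent the active one is an assumption.

```python
from dataclasses import dataclass, field
from typing import Callable, Optional


@dataclass
class SketchAgent:
    """Hypothetical stand-in for calute's Agent; only an id is modelled."""
    id: str


@dataclass
class SketchOrchestrator:
    """Illustrative model of the documented attributes, not the real class."""
    agents: dict = field(default_factory=dict)
    switch_triggers: dict = field(default_factory=dict)
    current_agent_id: Optional[str] = None

    def register_agent(self, agent: SketchAgent) -> None:
        self.agents[agent.id] = agent
        # Assumption: the first registered agent becomes the active one.
        if self.current_agent_id is None:
            self.current_agent_id = agent.id

    def register_switch_trigger(self, trigger: str, handler: Callable) -> None:
        # One handler per trigger; a later registration replaces an earlier one.
        self.switch_triggers[trigger] = handler

    def get_current_agent(self) -> SketchAgent:
        # Mirrors the documented contract: ValueError when nothing is active.
        if self.current_agent_id not in self.agents:
            raise ValueError("No agent is currently active")
        return self.agents[self.current_agent_id]


orchestrator = SketchOrchestrator()
orchestrator.register_agent(SketchAgent(id="planner"))
orchestrator.register_switch_trigger("explicit", lambda context: "planner")
active = orchestrator.get_current_agent()
```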
- class calute.executors.EnhancedAgentOrchestrator(max_agents: int = 100, enable_metrics: bool = True)
Bases: object
Enhanced orchestrator with better error handling and monitoring.
- register_switch_trigger(trigger: AgentSwitchTrigger, handler: Callable)
Register a custom switch trigger handler.
- class calute.executors.EnhancedFunctionExecutor(orchestrator: EnhancedAgentOrchestrator, default_timeout: float = 30.0, retry_policy: calute.executors.RetryPolicy | None = None, max_concurrent_executions: int = 10)
Bases: object
Enhanced function executor with timeout, retry, and error handling.
- batch_execution() → AsyncGenerator[EnhancedFunctionExecutor, None]
Async context manager for a batch execution session with guaranteed cleanup.
- async execute_function_calls(calls: list[RequestFunctionCall], strategy: FunctionCallStrategy = FunctionCallStrategy.SEQUENTIAL, context_variables: dict | None = None, agent: Agent | None = None) → list[RequestFunctionCall]
Execute multiple function calls with specified strategy.
- async execute_single_call(call: RequestFunctionCall, context_variables: dict | None = None, agent: Agent | None = None) → RequestFunctionCall
Execute a single function call with full error handling.
- async execute_with_retry(func: Callable, arguments: dict, timeout: float | None = None, retry_policy: calute.executors.RetryPolicy | None = None) → Any
Execute a function with automatic retries on FunctionExecutionError.
- Parameters
func – The callable to execute.
arguments – Keyword arguments to pass to the function.
timeout – Per-attempt timeout; passed to execute_with_timeout.
retry_policy – Override the instance-level retry policy for this call.
- Returns
The function’s return value.
- Raises
CaluteTimeoutError – Propagated immediately without retrying.
FunctionExecutionError – Re-raised after all retry attempts are exhausted.
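The retry contract above (retry on execution errors, let timeouts through untouched) can be sketched as follows. This is an illustration under stated assumptions rather than the library's code: `SketchTimeoutError` and `SketchExecutionError` stand in for `CaluteTimeoutError` and `FunctionExecutionError`, `retry_sketch` is a hypothetical helper, the callable is assumed to be a coroutine function, and `max_retries` is assumed to count retries after the first attempt.

```python
import asyncio


class SketchTimeoutError(Exception):
    """Stand-in for CaluteTimeoutError."""


class SketchExecutionError(Exception):
    """Stand-in for FunctionExecutionError."""


async def retry_sketch(func, arguments, max_retries=3, delay=0.0):
    """Call func(**arguments), retrying only on SketchExecutionError."""
    last_error = None
    # Assumption: one initial attempt plus up to max_retries retries.
    for attempt in range(max_retries + 1):
        try:
            return await func(**arguments)
        except SketchTimeoutError:
            raise  # timeouts propagate immediately, without retrying
        except SketchExecutionError as exc:
            last_error = exc
            if attempt < max_retries:
                await asyncio.sleep(delay)
    # All attempts failed: re-raise the last execution error.
    raise last_error


attempts = []


async def flaky(x):
    """Fails twice, then succeeds, to exercise the retry loop."""
    attempts.append(x)
    if len(attempts) < 3:
        raise SketchExecutionError("transient failure")
    return x * 2


result = asyncio.run(retry_sketch(flaky, {"x": 21}))
```

Here `flaky` succeeds on its third attempt, so `result` is the doubled input and `attempts` holds three entries.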
- async execute_with_timeout(func: Callable, arguments: dict, timeout: float | None = None) → Any
Execute a function with a timeout, wrapping exceptions in framework types.
- Parameters
func – The callable to execute.
arguments – Keyword arguments passed to the function.
timeout – Maximum execution time in seconds; defaults to self.default_timeout.
- Returns
The function’s return value.
- Raises
CaluteTimeoutError – If execution exceeds the timeout.
FunctionExecutionError – If the function raises any other exception.
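A timeout wrapper of this shape is commonly built on `asyncio.wait_for`. The sketch below shows that pattern only; whether calute uses it internally is not stated here. `timeout_sketch` and the two exception classes are hypothetical stand-ins, and the callable is assumed to be a coroutine function.

```python
import asyncio


class SketchTimeoutError(Exception):
    """Stand-in for CaluteTimeoutError."""


class SketchExecutionError(Exception):
    """Stand-in for FunctionExecutionError."""


async def timeout_sketch(func, arguments, timeout=None, default_timeout=30.0):
    """Await func(**arguments) under a time limit, wrapping failures."""
    # Fall back to the default when no per-call timeout is given.
    limit = default_timeout if timeout is None else timeout
    try:
        return await asyncio.wait_for(func(**arguments), timeout=limit)
    except asyncio.TimeoutError as exc:
        raise SketchTimeoutError(f"timed out after {limit}s") from exc
    except Exception as exc:
        # Any other failure is wrapped in the framework-style error type.
        raise SketchExecutionError(str(exc)) from exc


async def add(a, b):
    return a + b


total = asyncio.run(timeout_sketch(add, {"a": 1, "b": 2}))
```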
- class calute.executors.EnhancedFunctionRegistry
Bases: object
Enhanced registry with validation and metadata management.
- get_metrics(func_name: str) → calute.executors.ExecutionMetrics | None
Get execution metrics for a function.
- register(func: Callable, agent_id: str, metadata: dict | None = None, validator: Optional[Callable] = None) → None
Register a function with optional argument validator.
- Parameters
func – The callable to register.
agent_id – ID of the owning agent.
metadata – Optional metadata dictionary for the function.
validator – Optional callable that receives the arguments dict and raises on invalid input.
- validate_arguments(func_name: str, arguments: dict) → None
Validate that all required parameters are present and pass the custom validator.
- Parameters
func_name – Name of the registered function to validate against.
arguments – Argument dictionary to check.
- Raises
ValidationError – If the function is not registered, a required parameter is missing, or the custom validator rejects the arguments.
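The three failure cases listed above can be illustrated with `inspect.signature` from the standard library. This is a sketch under assumptions, not the registry's actual logic: `validate_sketch` and `SketchValidationError` are hypothetical, and the registry and validators are modelled as plain dictionaries keyed by function name.

```python
import inspect


class SketchValidationError(Exception):
    """Stand-in for the ValidationError named above."""


def validate_sketch(registry, validators, func_name, arguments):
    """Raise SketchValidationError if arguments do not fit func_name."""
    if func_name not in registry:
        raise SketchValidationError(f"{func_name!r} is not registered")

    signature = inspect.signature(registry[func_name])
    for name, param in signature.parameters.items():
        # *args and **kwargs never count as required parameters.
        if param.kind in (inspect.Parameter.VAR_POSITIONAL,
                          inspect.Parameter.VAR_KEYWORD):
            continue
        # A parameter with no default must be supplied by the caller.
        if param.default is inspect.Parameter.empty and name not in arguments:
            raise SketchValidationError(f"missing required parameter {name!r}")

    validator = validators.get(func_name)
    if validator is not None:
        try:
            validator(arguments)
        except Exception as exc:
            raise SketchValidationError(str(exc)) from exc


def resize(width, height=100):
    return width, height


def positive_width(arguments):
    if arguments["width"] <= 0:
        raise ValueError("width must be positive")


registry = {"resize": resize}
validators = {"resize": positive_width}
validate_sketch(registry, validators, "resize", {"width": 640})  # passes
```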
- class calute.executors.ExecutionMetrics(total_calls: int = 0, successful_calls: int = 0, failed_calls: int = 0, timeout_calls: int = 0, total_duration: float = 0.0, average_duration: float = 0.0, max_duration: float = 0.0, min_duration: float = inf)
Bases: object
Metrics for function execution.
Tracks aggregate statistics across all recorded executions: total, successful, failed, and timed-out call counts, plus total, average, minimum, and maximum duration.
- average_duration: float = 0.0
- failed_calls: int = 0
- max_duration: float = 0.0
- min_duration: float = inf
- record_execution(duration: float, status: ExecutionStatus) → None
Record a single execution result into the running metrics.
- Parameters
duration – Wall-clock time for the execution in seconds.
status – The terminal status of the execution.
- successful_calls: int = 0
- timeout_calls: int = 0
- total_calls: int = 0
- total_duration: float = 0.0
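One plausible way to keep these fields consistent is to update every aggregate on each recorded execution. The sketch below mirrors the documented field names and defaults, but the update logic is an assumption, and `SketchStatus` is a hypothetical stand-in for `ExecutionStatus`, whose members are not listed here.

```python
from dataclasses import dataclass
from enum import Enum


class SketchStatus(Enum):
    """Hypothetical stand-in for ExecutionStatus."""
    SUCCESS = "success"
    FAILURE = "failure"
    TIMEOUT = "timeout"


@dataclass
class MetricsSketch:
    total_calls: int = 0
    successful_calls: int = 0
    failed_calls: int = 0
    timeout_calls: int = 0
    total_duration: float = 0.0
    average_duration: float = 0.0
    max_duration: float = 0.0
    min_duration: float = float("inf")

    def record_execution(self, duration: float, status: SketchStatus) -> None:
        # Update the counters and duration totals first.
        self.total_calls += 1
        self.total_duration += duration
        # Derive the running statistics from the updated totals.
        self.average_duration = self.total_duration / self.total_calls
        self.max_duration = max(self.max_duration, duration)
        self.min_duration = min(self.min_duration, duration)
        # Assumption: each status increments exactly one outcome counter.
        if status is SketchStatus.SUCCESS:
            self.successful_calls += 1
        elif status is SketchStatus.TIMEOUT:
            self.timeout_calls += 1
        else:
            self.failed_calls += 1


metrics = MetricsSketch()
metrics.record_execution(1.0, SketchStatus.SUCCESS)
metrics.record_execution(3.0, SketchStatus.TIMEOUT)
```

Starting `min_duration` at infinity means the first recorded duration always replaces it, which matches the documented default of `inf`.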
- class calute.executors.FunctionExecutionHistory(executions: list[calute.types.function_execution_types.RequestFunctionCall] = <factory>, execution_map: dict[str, calute.types.function_execution_types.RequestFunctionCall] = <factory>)
Bases: object
History of function executions and their results.
- add_execution(call: RequestFunctionCall) → None
Add a completed function call to the history, indexing it by ID and name.
- as_context_dict() → dict
Convert execution history to a context dictionary suitable for prompt generation.
- execution_map: dict[str, calute.types.function_execution_types.RequestFunctionCall]
- executions: list[calute.types.function_execution_types.RequestFunctionCall]
- get_by_id(call_id: str) → calute.types.function_execution_types.RequestFunctionCall | None
Return the function call matching call_id, or None if not found.
- get_by_name(name: str) → calute.types.function_execution_types.RequestFunctionCall | None
Return the most recently recorded function call with the given name, or None.
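The lookup semantics above can be sketched with an ordered list plus a dictionary. This is illustrative only: `SketchCall` is a hypothetical stand-in for `RequestFunctionCall` with just `id` and `name` fields, and keying `execution_map` by ID while scanning the list for name lookups is an assumption about how the index is organised.

```python
from dataclasses import dataclass, field


@dataclass
class SketchCall:
    """Hypothetical stand-in for RequestFunctionCall."""
    id: str
    name: str
    result: object = None


@dataclass
class HistorySketch:
    executions: list = field(default_factory=list)
    execution_map: dict = field(default_factory=dict)

    def add_execution(self, call: SketchCall) -> None:
        # Keep insertion order in the list and an ID index in the map.
        self.executions.append(call)
        self.execution_map[call.id] = call

    def get_by_id(self, call_id: str):
        return self.execution_map.get(call_id)

    def get_by_name(self, name: str):
        # Scan newest-first so the most recent match wins.
        for call in reversed(self.executions):
            if call.name == name:
                return call
        return None


history = HistorySketch()
history.add_execution(SketchCall(id="c1", name="search", result="first"))
history.add_execution(SketchCall(id="c2", name="search", result="second"))
latest = history.get_by_name("search")
```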
- class calute.executors.FunctionExecutor(orchestrator: AgentOrchestrator)
Bases: object
Handles function execution with various strategies.
- async execute_function_calls(calls: list[RequestFunctionCall], strategy: FunctionCallStrategy = FunctionCallStrategy.SEQUENTIAL, context_variables: dict | None = None, agent: Agent | None = None, runtime_features_state: RuntimeFeaturesState | None = None, loop_detector: LoopDetector | None = None) → list[RequestFunctionCall]
Execute function calls using the specified strategy.
- Parameters
calls – List of function calls to execute.
strategy – Execution strategy (SEQUENTIAL, PARALLEL, PIPELINE, CONDITIONAL).
context_variables – Optional context variables to pass to functions.
agent – Optional agent instance for function lookup.
runtime_features_state – Optional runtime features for policy/hooks/audit.
loop_detector – Optional loop detector to guard against repetitive tool calls.
- Returns
List of RequestFunctionCall instances with populated results and statuses.
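The difference between the SEQUENTIAL and PARALLEL strategies can be illustrated with plain asyncio. This sketch assumes nothing about calute's internals: `run_sequential` and `run_parallel` are hypothetical helpers, calls are modelled as `(coroutine_function, kwargs)` pairs rather than `RequestFunctionCall` objects, and PIPELINE and CONDITIONAL are left out because their semantics are not described here.

```python
import asyncio


async def run_sequential(calls):
    """Await each call in order; the next starts only after the previous ends."""
    results = []
    for func, kwargs in calls:
        results.append(await func(**kwargs))
    return results


async def run_parallel(calls):
    """Start all calls together; results keep the order of the input list."""
    return await asyncio.gather(*(func(**kwargs) for func, kwargs in calls))


async def demo():
    finished = []  # records the order in which calls complete

    async def work(tag, delay):
        await asyncio.sleep(delay)
        finished.append(tag)
        return tag

    calls = [
        (work, {"tag": "slow", "delay": 0.05}),
        (work, {"tag": "fast", "delay": 0.0}),
    ]

    sequential_results = await run_sequential(calls)
    sequential_order = list(finished)

    finished.clear()
    parallel_results = await run_parallel(calls)
    parallel_order = list(finished)

    return sequential_results, sequential_order, parallel_results, parallel_order


seq_results, seq_order, par_results, par_order = asyncio.run(demo())
```

In both cases the returned results follow the input order; only the completion order differs, since the parallel run lets the fast call finish before the slow one.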
- class calute.executors.FunctionRegistry
Bases: object
Registry for managing functions across agents.
Maintains a central registry of all functions available in the system, tracking which agent owns each function and associated metadata.
- _functions
Dictionary mapping function names to callable functions.
- _function_agents
Dictionary mapping function names to agent IDs.
- _function_metadata
Dictionary mapping function names to metadata.
- get_function(name: str) → tuple[Optional[Callable], str | None]
Get function and its associated agent.
- Parameters
name – Name of the function to retrieve.
- Returns
Tuple of (function, agent_id) or (None, None) if not found.
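The three dictionaries and the `(None, None)` fallback can be modelled as below. This is a minimal sketch rather than the real class: `RegistrySketch` is a hypothetical name, and its `register` method is an assumption added only so the lookup has something to find.

```python
class RegistrySketch:
    """Illustrative model of the three documented dictionaries."""

    def __init__(self):
        self._functions = {}          # name -> callable
        self._function_agents = {}    # name -> owning agent ID
        self._function_metadata = {}  # name -> metadata dict

    def register(self, func, agent_id, metadata=None):
        # Hypothetical helper: index the function under its own __name__.
        name = func.__name__
        self._functions[name] = func
        self._function_agents[name] = agent_id
        self._function_metadata[name] = metadata or {}

    def get_function(self, name):
        func = self._functions.get(name)
        if func is None:
            return None, None
        return func, self._function_agents[name]


def greet(who):
    return f"hello {who}"


registry = RegistrySketch()
registry.register(greet, agent_id="assistant")

func, owner = registry.get_function("greet")
missing = registry.get_function("does_not_exist")
```

Returning a pair in both cases lets callers unpack the result unconditionally and then test the first element for `None`.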
- class calute.executors.RetryPolicy(max_retries: int = 3, initial_delay: float = 1.0, max_delay: float = 60.0, exponential_base: float = 2.0, jitter: bool = True)
Bases: object
Configurable retry policy for function execution.
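The parameter names suggest capped exponential backoff with optional jitter. The sketch below shows how such a policy commonly turns an attempt number into a delay. The formula, the jitter range, and the `delay_for` method are assumptions for illustration; only the field names and defaults come from the signature above.

```python
import random
from dataclasses import dataclass


@dataclass
class RetryPolicySketch:
    max_retries: int = 3
    initial_delay: float = 1.0
    max_delay: float = 60.0
    exponential_base: float = 2.0
    jitter: bool = True

    def delay_for(self, attempt: int) -> float:
        """Hypothetical helper: seconds to wait before retry number `attempt` (0-based)."""
        # Grow the delay geometrically, then cap it at max_delay.
        delay = min(self.initial_delay * self.exponential_base ** attempt,
                    self.max_delay)
        if self.jitter:
            # Assumption: scale by a random factor in [0.5, 1.0] to spread retries.
            delay *= random.uniform(0.5, 1.0)
        return delay


policy = RetryPolicySketch(jitter=False)
delays = [policy.delay_for(attempt) for attempt in range(policy.max_retries)]
capped = policy.delay_for(10)
```

With jitter disabled the delays are deterministic, which makes the doubling and the `max_delay` cap easy to inspect.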
- calute.executors.add_depth(x, ep=False)