calute.executors


Function execution and agent orchestration system.

This module provides the core execution infrastructure for Calute, including:
  • Function registry and management

  • Agent orchestration and switching

  • Function execution with various strategies (sequential, parallel, pipeline)

  • Retry policies and error handling

  • Execution metrics and monitoring

  • Enhanced variants of the orchestrator, executor, and registry that add validation, timeouts, retries, and metrics

The module supports both synchronous and asynchronous function execution, timeout management, and error recovery through configurable retry policies.

class calute.executors.AgentOrchestrator

Bases: object

Orchestrates multiple agents and handles switching logic.

Manages a collection of agents, their functions, and the logic for switching between agents based on various triggers.

agents

Dictionary of registered agents by ID.

function_registry

Registry of all available functions.

switch_triggers

Dictionary of trigger handlers for agent switching.

current_agent_id

ID of the currently active agent.

execution_history

History of agent switches and executions.

get_current_agent() -> Agent

Get the currently active agent.

Returns

The currently active Agent instance.

Raises

ValueError – If no agent is currently active.

register_agent(agent: Agent) -> None

Register an agent with the orchestrator.

Parameters

agent – The agent instance to register.

Returns

None

register_switch_trigger(trigger: AgentSwitchTrigger, handler: Callable) -> None

Register a custom switch trigger handler.

Parameters
  • trigger – The trigger type to register.

  • handler – The callable handler for this trigger.

Returns

None

should_switch_agent(context: dict) -> str | None

Determine if agent switching is needed.

Parameters

context – The current execution context.

Returns

The ID of the target agent if switching is needed, None otherwise.

switch_agent(target_agent_id: str, reason: str | None = None) -> None

Switch to a different agent.

Parameters
  • target_agent_id – ID of the agent to switch to.

  • reason – Optional reason for the switch.

Returns

None

Raises

ValueError – If the target agent is not found.
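
The registration and switching flow documented for AgentOrchestrator can be modeled in a few lines. This is a standalone sketch and not Calute's implementation: the class name SketchOrchestrator, the string trigger keys, the handler signature (context, agents, current_agent_id), and the rule that the first registered agent becomes active are all assumptions made for illustration.

```python
from typing import Callable, Optional


class SketchOrchestrator:
    """Toy model of agent registration and trigger-based switching (hypothetical)."""

    def __init__(self) -> None:
        self.agents: dict = {}
        self.switch_triggers: dict = {}
        self.current_agent_id: Optional[str] = None
        self.execution_history: list = []

    def register_agent(self, agent_id: str, agent: object) -> None:
        self.agents[agent_id] = agent
        # Assumption: the first registered agent becomes the active one.
        if self.current_agent_id is None:
            self.current_agent_id = agent_id

    def register_switch_trigger(self, trigger: str, handler: Callable) -> None:
        self.switch_triggers[trigger] = handler

    def should_switch_agent(self, context: dict) -> Optional[str]:
        # Ask each handler in turn; the first target that differs from the
        # current agent wins.
        for handler in self.switch_triggers.values():
            target = handler(context, self.agents, self.current_agent_id)
            if target is not None and target != self.current_agent_id:
                return target
        return None

    def switch_agent(self, target_agent_id: str, reason: Optional[str] = None) -> None:
        if target_agent_id not in self.agents:
            raise ValueError(f"Agent {target_agent_id!r} not found")
        # Record the switch before changing the active agent.
        self.execution_history.append(
            {"from": self.current_agent_id, "to": target_agent_id, "reason": reason}
        )
        self.current_agent_id = target_agent_id

    def get_current_agent(self) -> object:
        if self.current_agent_id is None:
            raise ValueError("No agent is currently active")
        return self.agents[self.current_agent_id]


def route_on_topic(context, agents, current_id):
    """Hypothetical handler: pick the agent named in context['topic'], if registered."""
    topic = context.get("topic")
    return topic if topic in agents else None
```

A caller would typically check should_switch_agent(context) after each step and call switch_agent only when it returns a non-None ID.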

class calute.executors.EnhancedAgentOrchestrator(max_agents: int = 100, enable_metrics: bool = True)

Bases: object

Enhanced orchestrator with better error handling and monitoring.

async register_agent(agent: Agent)

Register an agent with validation.

register_switch_trigger(trigger: AgentSwitchTrigger, handler: Callable)

Register a custom switch trigger handler.

should_switch_agent(context: dict) -> str | None

Determine if agent switching is needed.

async switch_agent(target_agent_id: str, reason: str | None = None)

Switch to a different agent with validation.

class calute.executors.EnhancedFunctionExecutor(orchestrator: EnhancedAgentOrchestrator, default_timeout: float = 30.0, retry_policy: calute.executors.RetryPolicy | None = None, max_concurrent_executions: int = 10)

Bases: object

Enhanced function executor with timeout, retry, and error handling.
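
The timeout-plus-retry pattern this class describes can be illustrated with plain asyncio. The sketch below does not use Calute: SketchExecutionError, SketchTimeoutError, run_with_timeout, and run_with_retry are hypothetical names, and the choices to run synchronous callables in a worker thread and to sleep a fixed delay between attempts are assumptions, not documented behavior.

```python
import asyncio
import inspect
from typing import Any, Callable


class SketchExecutionError(Exception):
    """Hypothetical stand-in for a framework execution error type."""


class SketchTimeoutError(SketchExecutionError):
    """Hypothetical stand-in for a framework timeout error type."""


async def run_with_timeout(func: Callable, arguments: dict, timeout: float) -> Any:
    """Run func(**arguments) under a deadline, wrapping failures in sketch error types."""
    try:
        if inspect.iscoroutinefunction(func):
            return await asyncio.wait_for(func(**arguments), timeout=timeout)
        # Sync callables run in a worker thread so the event loop stays responsive.
        # Note: a timed-out thread is not interrupted; only the wait is abandoned.
        loop = asyncio.get_running_loop()
        future = loop.run_in_executor(None, lambda: func(**arguments))
        return await asyncio.wait_for(future, timeout=timeout)
    except asyncio.TimeoutError as exc:
        raise SketchTimeoutError(f"timed out after {timeout}s") from exc
    except SketchExecutionError:
        raise
    except Exception as exc:
        raise SketchExecutionError(str(exc)) from exc


async def run_with_retry(
    func: Callable,
    arguments: dict,
    timeout: float = 30.0,
    max_retries: int = 3,
    delay: float = 0.0,
) -> Any:
    """Retry up to max_retries extra times when a SketchExecutionError is raised."""
    for attempt in range(max_retries + 1):
        try:
            return await run_with_timeout(func, arguments, timeout)
        except SketchExecutionError:
            if attempt == max_retries:
                raise  # Retries exhausted: surface the last error.
            await asyncio.sleep(delay)
```

Applying the timeout per attempt, rather than across all attempts, matches the "Per-attempt timeout" wording used for execute_with_retry in this reference.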

batch_execution() -> AsyncGenerator[EnhancedFunctionExecutor, None]

Async context manager for a batch execution session with guaranteed cleanup.

async execute_function_calls(calls: list[RequestFunctionCall], strategy: FunctionCallStrategy = FunctionCallStrategy.SEQUENTIAL, context_variables: dict | None = None, agent: Agent | None = None) -> list[RequestFunctionCall]

Execute multiple function calls with specified strategy.

async execute_single_call(call: RequestFunctionCall, context_variables: dict | None = None, agent: Agent | None = None) -> RequestFunctionCall

Execute a single function call with full error handling.

async execute_with_retry(func: Callable, arguments: dict, timeout: float | None = None, retry_policy: calute.executors.RetryPolicy | None = None) -> Any

Execute a function with automatic retries on FunctionExecutionError.

Parameters
  • func – The callable to execute.

  • arguments – Keyword arguments to pass to the function.

  • timeout – Per-attempt timeout; passed to execute_with_timeout.

  • retry_policy – Override the instance-level retry policy for this call.

Returns

The function’s return value.

Raises

async execute_with_timeout(func: Callable, arguments: dict, timeout: float | None = None) -> Any

Execute a function with a timeout, wrapping exceptions in framework types.

Parameters
  • func – The callable to execute.

  • arguments – Keyword arguments passed to the function.

  • timeout – Maximum execution time in seconds; defaults to self.default_timeout.

Returns

The function’s return value.

Raises

class calute.executors.EnhancedFunctionRegistry

Bases: object

Enhanced registry with validation and metadata management.

get_metrics(func_name: str) -> calute.executors.ExecutionMetrics | None

Get execution metrics for a function.

register(func: Callable, agent_id: str, metadata: dict | None = None, validator: Optional[Callable] = None) -> None

Register a function with optional argument validator.

Parameters
  • func – The callable to register.

  • agent_id – ID of the owning agent.

  • metadata – Optional metadata dictionary for the function.

  • validator – Optional callable that receives the arguments dict and raises on invalid input.

validate_arguments(func_name: str, arguments: dict) -> None

Validate that all required parameters are present and pass the custom validator.

Parameters
  • func_name – Name of the registered function to validate against.

  • arguments – Argument dictionary to check.

Raises

ValidationError – If the function is not registered, a required parameter is missing, or the custom validator rejects the arguments.
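
The two checks described for validate_arguments (required parameters present, then a custom validator) can be sketched with the standard inspect module. This is an illustration only: SketchValidationError and check_arguments are hypothetical names, and deriving "required" from parameters without defaults is an assumption about how the check works.

```python
import inspect
from typing import Callable, Optional


class SketchValidationError(Exception):
    """Hypothetical stand-in for the ValidationError named above."""


def check_arguments(
    func: Callable, arguments: dict, validator: Optional[Callable] = None
) -> None:
    """Raise SketchValidationError if a required parameter is missing
    or the custom validator rejects the arguments."""
    for name, param in inspect.signature(func).parameters.items():
        # *args and **kwargs never count as required.
        if param.kind in (param.VAR_POSITIONAL, param.VAR_KEYWORD):
            continue
        # Assumption: a parameter is required when it has no default value.
        if param.default is inspect.Parameter.empty and name not in arguments:
            raise SketchValidationError(f"missing required parameter: {name}")
    if validator is not None:
        try:
            validator(arguments)  # The validator signals rejection by raising.
        except Exception as exc:
            raise SketchValidationError(f"validator rejected arguments: {exc}") from exc
```

Running the structural check before the custom validator means the validator can assume every required key is present.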

class calute.executors.ExecutionMetrics(total_calls: int = 0, successful_calls: int = 0, failed_calls: int = 0, timeout_calls: int = 0, total_duration: float = 0.0, average_duration: float = 0.0, max_duration: float = 0.0, min_duration: float = inf)

Bases: object

Metrics for function execution.

Tracks aggregate statistics across all recorded executions including success/failure counts, call counts, and duration statistics.

average_duration: float = 0.0

failed_calls: int = 0

max_duration: float = 0.0

min_duration: float = inf

record_execution(duration: float, status: ExecutionStatus) -> None

Record a single execution result into the running metrics.

Parameters
  • duration – Wall-clock time for the execution in seconds.

  • status – The terminal status of the execution.

successful_calls: int = 0

timeout_calls: int = 0

total_calls: int = 0

total_duration: float = 0.0
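
How record_execution might fold one result into these fields is sketched below. The field names and defaults mirror the signature above, but the update logic is an assumption: SketchStatus and its member names are hypothetical stand-ins for ExecutionStatus, and counting timeouts separately from failures is a guess rather than documented behavior.

```python
from dataclasses import dataclass
from enum import Enum


class SketchStatus(Enum):
    """Hypothetical stand-in for ExecutionStatus; member names are assumed."""

    SUCCESS = "success"
    FAILURE = "failure"
    TIMEOUT = "timeout"


@dataclass
class SketchMetrics:
    total_calls: int = 0
    successful_calls: int = 0
    failed_calls: int = 0
    timeout_calls: int = 0
    total_duration: float = 0.0
    average_duration: float = 0.0
    max_duration: float = 0.0
    min_duration: float = float("inf")

    def record_execution(self, duration: float, status: SketchStatus) -> None:
        # Update the aggregate counters and duration statistics.
        self.total_calls += 1
        self.total_duration += duration
        self.average_duration = self.total_duration / self.total_calls
        self.max_duration = max(self.max_duration, duration)
        self.min_duration = min(self.min_duration, duration)
        # Assumption: each call lands in exactly one status bucket.
        if status is SketchStatus.SUCCESS:
            self.successful_calls += 1
        elif status is SketchStatus.TIMEOUT:
            self.timeout_calls += 1
        else:
            self.failed_calls += 1
```

Starting min_duration at inf lets the first recorded duration become the minimum without a special case.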

class calute.executors.FunctionExecutionHistory(executions: list[calute.types.function_execution_types.RequestFunctionCall] = <factory>, execution_map: dict[str, calute.types.function_execution_types.RequestFunctionCall] = <factory>)

Bases: object

History of function executions and their results.

add_execution(call: RequestFunctionCall) -> None

Add a completed function call to the history, indexing it by ID and name.

as_context_dict() -> dict

Convert execution history to a context dictionary suitable for prompt generation.

execution_map: dict[str, calute.types.function_execution_types.RequestFunctionCall]

executions: list[calute.types.function_execution_types.RequestFunctionCall]

get_by_id(call_id: str) -> calute.types.function_execution_types.RequestFunctionCall | None

Return the function call matching call_id, or None if not found.

get_by_name(name: str) -> calute.types.function_execution_types.RequestFunctionCall | None

Return the most recently recorded function call with the given name, or None.

get_successful_results() -> dict[str, Any]

Return a mapping of function_name to result for all successful calls.

class calute.executors.FunctionExecutor(orchestrator: AgentOrchestrator)

Bases: object

Handles function execution with various strategies.

async execute_function_calls(calls: list[RequestFunctionCall], strategy: FunctionCallStrategy = FunctionCallStrategy.SEQUENTIAL, context_variables: dict | None = None, agent: Agent | None = None, runtime_features_state: RuntimeFeaturesState | None = None, loop_detector: LoopDetector | None = None) -> list[RequestFunctionCall]

Execute function calls using the specified strategy.

Parameters
  • calls – List of function calls to execute.

  • strategy – Execution strategy (SEQUENTIAL, PARALLEL, PIPELINE, CONDITIONAL).

  • context_variables – Optional context variables to pass to functions.

  • agent – Optional agent instance for function lookup.

  • runtime_features_state – Optional runtime features for policy/hooks/audit.

  • loop_detector – Optional loop detector to guard against repetitive tool calls.

Returns

List of RequestFunctionCall instances with populated results and statuses.
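
The difference between the execution strategies is easiest to see with plain coroutines. The sketch below is not Calute code: run_sequential, run_parallel, and run_pipeline are hypothetical helpers, and the pipeline behavior shown (each stage receives the previous stage's output) is an assumption about what PIPELINE means. CONDITIONAL is omitted because this reference does not describe its semantics.

```python
import asyncio


async def run_sequential(tasks: list) -> list:
    """Await each zero-argument coroutine function in order.

    A later task starts only after the previous one has finished.
    """
    results = []
    for task in tasks:
        results.append(await task())
    return results


async def run_parallel(tasks: list) -> list:
    """Start all tasks concurrently; results are returned in input order."""
    return await asyncio.gather(*(task() for task in tasks))


async def run_pipeline(stages: list, initial):
    """Feed each stage's output into the next stage (assumed PIPELINE semantics)."""
    value = initial
    for stage in stages:
        value = await stage(value)
    return value
```

Note that asyncio.gather preserves input order in its result list even when the tasks finish in a different order, so callers can match results to calls by position.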

class calute.executors.FunctionRegistry

Bases: object

Registry for managing functions across agents.

Maintains a central registry of all functions available in the system, tracking which agent owns each function and associated metadata.

_functions

Dictionary mapping function names to callable functions.

_function_agents

Dictionary mapping function names to agent IDs.

_function_metadata

Dictionary mapping function names to metadata.

get_function(name: str) -> tuple[Optional[Callable], str | None]

Get function and its associated agent.

Parameters

name – Name of the function to retrieve.

Returns

Tuple of (function, agent_id) or (None, None) if not found.

get_functions_by_agent(agent_id: str) -> list[Callable]

Get all functions for a specific agent.

Parameters

agent_id – ID of the agent.

Returns

List of functions registered to the agent.

register(func: Callable, agent_id: str, metadata: dict | None = None)

Register a function with the registry.

Parameters
  • func – The callable function to register.

  • agent_id – ID of the agent that owns this function.

  • metadata – Optional metadata about the function.
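
The three dictionaries listed for FunctionRegistry suggest a simple name-keyed layout. The following standalone sketch shows one way the documented methods could fit together. SketchRegistry is a hypothetical class, and keying functions by their __name__ is an assumption, since this reference does not say how names are derived.

```python
from typing import Callable, Optional


class SketchRegistry:
    """Toy name-keyed function registry (hypothetical, not Calute's class)."""

    def __init__(self) -> None:
        self._functions: dict = {}
        self._function_agents: dict = {}
        self._function_metadata: dict = {}

    def register(
        self, func: Callable, agent_id: str, metadata: Optional[dict] = None
    ) -> None:
        name = func.__name__  # Assumption: functions are keyed by __name__.
        self._functions[name] = func
        self._function_agents[name] = agent_id
        self._function_metadata[name] = metadata or {}

    def get_function(self, name: str) -> tuple:
        # Returns (None, None) when the name is unknown.
        return self._functions.get(name), self._function_agents.get(name)

    def get_functions_by_agent(self, agent_id: str) -> list:
        return [
            func
            for name, func in self._functions.items()
            if self._function_agents[name] == agent_id
        ]
```

With name-based keys, registering a second function with the same name replaces the first, so two agents cannot both own a function called, say, search in this sketch.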

class calute.executors.RetryPolicy(max_retries: int = 3, initial_delay: float = 1.0, max_delay: float = 60.0, exponential_base: float = 2.0, jitter: bool = True)

Bases: object

Configurable retry policy for function execution.

get_delay(attempt: int) -> float

Calculate delay for a given retry attempt.
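
The constructor parameters point to exponential backoff with optional jitter, although this reference does not state the formula. The sketch below assumes delay = min(initial_delay * exponential_base ** attempt, max_delay) with a zero-based attempt index, and assumes jitter scales the delay by a random factor between 0.5 and 1.0. SketchRetryPolicy is a hypothetical class, and the real get_delay may differ on any of these points.

```python
import random
from dataclasses import dataclass


@dataclass
class SketchRetryPolicy:
    max_retries: int = 3
    initial_delay: float = 1.0
    max_delay: float = 60.0
    exponential_base: float = 2.0
    jitter: bool = True

    def get_delay(self, attempt: int) -> float:
        # Exponential growth from initial_delay, capped at max_delay.
        # Assumption: attempt is zero-based, so the first retry waits initial_delay.
        delay = min(self.initial_delay * self.exponential_base ** attempt, self.max_delay)
        if self.jitter:
            # Assumption: scale by a random factor in [0.5, 1.0] so that many
            # clients retrying at once do not all wake at the same moment.
            delay *= random.uniform(0.5, 1.0)
        return delay
```

With jitter disabled and the default parameters, this sketch yields delays of 1, 2, 4, and 8 seconds for attempts 0 through 3, and never more than 60 seconds.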

calute.executors.add_depth(x, ep=False)