calute.cortex.orchestration.task#
Task definition for Cortex framework.
This module provides the CortexTask class and related data structures for defining and executing tasks within the Cortex multi-agent orchestration framework. Tasks represent units of work that can be assigned to agents, with support for features like output validation, chaining, retries, and memory integration.
The module includes:
- CortexTask: Main task class for defining executable work units
- CortexTaskOutput: Rich output container with execution metadata
- ChainLink: Task chaining and conditional routing
- TaskValidationError: Custom exception for validation failures
Key features:
- Output validation with Pydantic models (JSON and XML parsing)
- Task chaining with conditional routing
- Automatic retry with configurable conditions
- Human input/feedback integration
- Memory persistence for results
- Security configurations and tool restrictions
- Context compression and priority weighting
- Streaming execution support
- Dependency management between tasks
Typical usage example:

    from calute.cortex.task import CortexTask
    from calute.cortex.agent import CortexAgent

    agent = CortexAgent(
        role="Data Analyst",
        goal="Analyze data",
        backstory="Expert analyst",
    )

    task = CortexTask(
        description="Analyze quarterly sales data",
        expected_output="Summary report with key insights",
        agent=agent,
        max_retries=3,
        importance=0.8,
    )

    result = task.execute()
    print(result.output)
- class calute.cortex.orchestration.task.ChainLink(condition: collections.abc.Callable[[str], bool] | None = None, next_task: calute.cortex.orchestration.task.CortexTask | None = None, fallback_task: calute.cortex.orchestration.task.CortexTask | None = None)[source]#
Bases: object
Represents a link in a task chain for conditional task routing.
ChainLink enables creating conditional workflows where the next task to execute depends on the output of the current task. It supports both success and fallback paths, allowing for branching task execution.
- condition#
Optional callable that takes the task output string and returns True if the next_task should execute, False for fallback. If None, next_task is always executed.
- Type
collections.abc.Callable[[str], bool] | None
- next_task#
The task to execute if condition returns True or is None.
- Type
CortexTask | None
- fallback_task#
The task to execute if condition returns False.
- Type
CortexTask | None
Example
>>> def check_success(output: str) -> bool:
...     return "success" in output.lower()
>>> chain = ChainLink(
...     condition=check_success,
...     next_task=process_results_task,
...     fallback_task=handle_error_task
... )
- condition: collections.abc.Callable[[str], bool] | None = None#
- fallback_task: calute.cortex.orchestration.task.CortexTask | None = None#
- next_task: calute.cortex.orchestration.task.CortexTask | None = None#
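The routing rule described by `condition`, `next_task`, and `fallback_task` can be sketched as below. This is a minimal illustration under stated assumptions, not the framework's code: `LinkSketch` and `resolve_next` are hypothetical names, and tasks are represented by plain strings so the snippet runs without calute installed.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class LinkSketch:
    """Hypothetical stand-in mirroring the three documented ChainLink fields."""

    condition: Optional[Callable[[str], bool]] = None
    next_task: Optional[str] = None      # a CortexTask in the real class
    fallback_task: Optional[str] = None  # a CortexTask in the real class


def resolve_next(link: LinkSketch, output: str) -> Optional[str]:
    """Pick the follow-up task for a given output, per the documented rule."""
    # No condition: next_task is always chosen.
    if link.condition is None:
        return link.next_task
    # Condition present: True selects next_task, False selects fallback_task.
    return link.next_task if link.condition(output) else link.fallback_task


link = LinkSketch(
    condition=lambda out: "success" in out.lower(),
    next_task="process_results",
    fallback_task="handle_error",
)
```

With this stand-in, an output containing "success" in any letter case routes to `next_task`, and any other output routes to `fallback_task`.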
- class calute.cortex.orchestration.task.CortexTask(description: str, expected_output: str, agent: CortexAgent | None = None, tools: list[CortexTool] = <factory>, context: list[CortexTask] | None = None, output_file: str | None = None, human_feedback: bool = False, chain: ChainLink | None = None, max_retries: int = 3, memory: CortexMemory | None = None, save_to_memory: bool = True, importance: float = 0.5, output_json: type[BaseModel] | None = None, output_pydantic: type[BaseModel] | None = None, create_directory: bool = True, async_execution: bool = False, callback: Callable | None = None, pre_execution_callback: Callable | None = None, error_callback: Callable | None = None, human_input: bool = False, human_input_prompt: str | None = None, input_validator: Callable | None = None, security_config: dict | None = None, tool_restrictions: list[str] | None = None, allow_dangerous_tools: bool = False, prompt_context: str | None = None, context_priority: dict[str, float] = <factory>, context_compression: bool = False, max_context_length: int = 10000, dependencies: list[CortexTask] = <factory>, conditional_execution: Callable | None = None, retry_conditions: list[Callable] = <factory>, timeout_behavior: Literal['fail', 'continue', 'return_partial'] = 'fail', timeout: int | None = None, _output: str | None = None, _execution_stats: dict = <factory>, _start_time: float = 0.0, _original_description: str | None = None, _original_expected_output: str | None = None, _original_output_file: str | None = None, _original_prompt_context: str | None = None)[source]#
Bases: object
Task definition for execution within the Cortex framework.
CortexTask represents a unit of work to be executed by a CortexAgent. It provides comprehensive configuration options for task execution including output validation, retry logic, human interaction, security controls, and context management.
- description#
Detailed description of what the task should accomplish.
- Type
str
- expected_output#
Description of what successful output should look like.
- Type
str
- agent#
The CortexAgent assigned to execute this task.
- Type
CortexAgent | None
- tools#
List of CortexTool instances available for this specific task.
- Type
list[CortexTool]
- context#
List of prerequisite tasks whose outputs provide context.
- Type
list[CortexTask] | None
- output_file#
Optional file path to save the task output.
- Type
str | None
- human_feedback#
Whether to request human feedback after execution.
- Type
bool
- max_retries#
Maximum retry attempts on failure (default: 3).
- Type
int
- memory#
CortexMemory instance for persisting task results.
- Type
CortexMemory | None
- save_to_memory#
Whether to automatically save results to memory.
- Type
bool
- importance#
Task importance score from 0.0 to 1.0 for prioritization.
- Type
float
- output_json#
Pydantic model class for JSON output validation.
- Type
type[BaseModel] | None
- output_pydantic#
Pydantic model class for structured output parsing.
- Type
type[BaseModel] | None
- create_directory#
Whether to create output directory if missing.
- Type
bool
- async_execution#
Whether to execute asynchronously (reserved).
- Type
bool
- callback#
Function called after successful task completion.
- Type
Callable | None
- pre_execution_callback#
Function called before task execution starts.
- Type
Callable | None
- error_callback#
Function called when an error occurs.
- Type
Callable | None
- human_input#
Whether to prompt for human input during execution.
- Type
bool
- human_input_prompt#
Custom prompt for human input requests.
- Type
str | None
- input_validator#
Function to validate human input before accepting.
- Type
Callable | None
- security_config#
Dictionary of security-related configuration options.
- Type
dict | None
- tool_restrictions#
List of allowed tool names (restricts agent tools).
- Type
list[str] | None
- allow_dangerous_tools#
Whether to permit dangerous tool usage.
- Type
bool
- prompt_context#
Additional context string to include in the prompt.
- Type
str | None
- context_priority#
Priority weights for context sources (higher = more important).
- Type
dict[str, float]
- context_compression#
Whether to compress context when exceeding limits.
- Type
bool
- max_context_length#
Maximum character length for context (default: 10000).
- Type
int
- dependencies#
List of tasks that must complete before this task.
- Type
list[CortexTask]
- conditional_execution#
Function that returns whether to execute the task.
- Type
Callable | None
- retry_conditions#
List of functions determining retry eligibility.
- Type
list[Callable]
- timeout_behavior#
Action on timeout - ‘fail’, ‘continue’, or ‘return_partial’.
- Type
Literal['fail', 'continue', 'return_partial']
- timeout#
Maximum execution time in seconds before timeout.
- Type
int | None
- Private Attributes:
_output: Cached output string from last execution.
_execution_stats: Dictionary tracking execution statistics.
_start_time: Timestamp when execution started.
_original_description: Original description before interpolation.
_original_expected_output: Original expected_output before interpolation.
_original_output_file: Original output_file before interpolation.
_original_prompt_context: Original prompt_context before interpolation.
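How `context_priority` and `max_context_length` might interact can be sketched as follows. This page does not describe the actual prioritization or compression algorithm, so everything here is an assumption: the function name `build_context`, the source names used as keys, the default weight of 0.0, and plain truncation as the way to enforce the limit.

```python
def build_context(sources: dict[str, str],
                  priority: dict[str, float],
                  max_length: int = 10000) -> str:
    """Join context sources by descending weight, cut to max_length characters."""
    # Sources without an explicit weight get 0.0 (an assumption of this sketch).
    ordered = sorted(sources, key=lambda name: priority.get(name, 0.0), reverse=True)
    joined = "\n".join(sources[name] for name in ordered)
    # Plain truncation stands in for whatever compression the framework applies.
    return joined[:max_length]


context = build_context(
    {"prior_outputs": "AAAA", "human_input": "BBBB"},
    {"human_input": 1.0, "prior_outputs": 0.2},
    max_length=6,
)
```

Higher-weighted sources come first, so they are the last to be cut when the limit is reached.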
- add_context(tasks: list[calute.cortex.orchestration.task.CortexTask] | calute.cortex.orchestration.task.CortexTask)[source]#
Add context tasks whose outputs will be available during execution.
- Parameters
tasks – A single CortexTask or list of CortexTasks to add as context. The outputs of these tasks will be included when building the execution prompt.
- add_dependency(task: CortexTask)[source]#
Add a task dependency that must complete before this task.
- Parameters
task – The CortexTask that must complete before this task executes.
Note
Duplicate dependencies are ignored.
- add_retry_condition(condition: Callable)[source]#
Add a custom retry condition function.
- Parameters
condition – A callable that takes (error, retry_count, task) and returns True if retry should be attempted, False otherwise.
Note
Duplicate conditions are ignored.
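A retry condition with the documented `(error, retry_count, task)` signature could look like the sketch below. The loop around it is only an illustration: `run_with_retries` is a hypothetical helper, and treating a single approving condition as sufficient is an assumption, since this page does not say how multiple conditions are combined.

```python
from typing import Any, Callable


def retry_on_transient(error: Exception, retry_count: int, task: Any) -> bool:
    """Retry only for connection/timeout problems, and at most twice."""
    return isinstance(error, (ConnectionError, TimeoutError)) and retry_count < 2


def run_with_retries(fn: Callable[[], str],
                     conditions: list[Callable[[Exception, int, Any], bool]],
                     max_retries: int = 3,
                     task: Any = None) -> str:
    """Call fn, retrying while attempts remain and any condition approves."""
    retry_count = 0
    while True:
        try:
            return fn()
        except Exception as error:
            # Assumption: one approving condition is enough to retry.
            allowed = any(cond(error, retry_count, task) for cond in conditions)
            if retry_count >= max_retries or not allowed:
                raise
            retry_count += 1
```

On a real task the condition would be registered with `task.add_retry_condition(retry_on_transient)`.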
- agent: CortexAgent | None = None#
- allow_dangerous_tools: bool = False#
- async_execution: bool = False#
- callback: Callable | None = None#
- conditional_execution: Callable | None = None#
- context: list[CortexTask] | None = None#
- context_compression: bool = False#
- context_priority: dict[str, float]#
- create_directory: bool = True#
- create_ui()[source]#
Launch a UI application for interacting with this task.
Creates and launches an interactive user interface that allows users to execute and monitor this task visually.
- Returns
The launched UI application instance.
- dependencies: list[CortexTask]#
- description: str#
- error_callback: Callable | None = None#
- execute(context_outputs: list[str] | None = None, use_streaming: bool = False, stream_callback: collections.abc.Callable[[Any], None] | None = None) -> calute.cortex.orchestration.task.CortexTaskOutput | tuple[calute.core.streamer_buffer.StreamerBuffer, threading.Thread][source]#
Execute the task with full Cortex feature support and optional streaming.
Main execution method for CortexTask. Handles the complete task lifecycle including dependency checking, tool restriction enforcement, callback invocation, context building, output validation, retry logic, human interaction, memory persistence, and file output.
- Parameters
context_outputs – Optional list of output strings from previously completed tasks to include as context. These are combined with any additional context sources (human input, prompt_context) and prioritized according to context_priority weights.
use_streaming – If True, executes in a background thread with real-time streaming output. Returns immediately with a buffer/thread tuple. Defaults to False (blocking execution).
stream_callback – Optional callback function invoked with each streaming chunk. Only used when use_streaming=True.
- Returns
If use_streaming=False: A CortexTaskOutput containing the result string, execution metrics, validation results, and metadata.
If use_streaming=True: A tuple of (StreamerBuffer, Thread) for asynchronous consumption of the streaming response.
- Raises
ValueError – If no agent is assigned or dependencies are not satisfied.
TimeoutError – If execution exceeds the configured timeout and timeout_behavior is "fail".
TaskValidationError – If output validation fails after all retries.
Exception – If task fails after all retry attempts and timeout_behavior is not "return_partial".
- Side Effects:
Invokes pre_execution_callback before execution.
Invokes callback after successful completion.
Invokes error_callback on failure.
Saves results to memory if save_to_memory is True.
Writes output to output_file if configured.
Creates output directories if create_directory is True.
Applies tool restrictions to the agent if configured.
Requests human feedback if human_feedback is enabled.
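The streaming mode returns immediately with a buffer/thread pair while a background thread produces the output. The sketch below illustrates that pattern with the standard library only. It is not the framework's implementation: the interface of `StreamerBuffer` is not shown on this page, so a `queue.Queue` with an end-of-stream sentinel stands in for it, and `start_streaming` and `collect` are hypothetical helpers.

```python
import queue
import threading
from typing import Callable, Optional

_DONE = object()  # sentinel marking the end of the stream


def start_streaming(chunks: list[str],
                    stream_callback: Optional[Callable[[str], None]] = None,
                    ) -> tuple[queue.Queue, threading.Thread]:
    """Produce chunks on a background thread and return (buffer, thread) at once."""
    buffer: queue.Queue = queue.Queue()

    def worker() -> None:
        for chunk in chunks:
            buffer.put(chunk)
            # The callback sees every chunk as it is produced.
            if stream_callback is not None:
                stream_callback(chunk)
        buffer.put(_DONE)

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    return buffer, thread


def collect(buffer: queue.Queue) -> str:
    """Drain the buffer until the sentinel arrives and join the chunks."""
    pieces = []
    while (item := buffer.get()) is not _DONE:
        pieces.append(item)
    return "".join(pieces)


seen: list[str] = []
buffer, thread = start_streaming(["Hello", ", ", "world"], stream_callback=seen.append)
text = collect(buffer)
thread.join()
```

Callers of `execute()` need to branch on `use_streaming`, because the blocking mode returns a `CortexTaskOutput` rather than a tuple.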
- expected_output: str#
- get_execution_stats() -> dict[source]#
Get a copy of execution statistics from the last run.
- Returns
Dictionary containing used_tools, tools_errors, delegations, and retry_count from the most recent execution.
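Because the method returns a copy, callers can modify the result without affecting the task's internal counters. A minimal sketch of that behavior, assuming a shallow copy and using `StatsSketch` as a hypothetical stand-in class:

```python
import copy


class StatsSketch:
    """Hypothetical holder for the four documented counters."""

    def __init__(self) -> None:
        self._execution_stats = {
            "used_tools": 0,
            "tools_errors": 0,
            "delegations": 0,
            "retry_count": 0,
        }

    def get_execution_stats(self) -> dict:
        # Return a copy so callers cannot mutate the internal counters.
        return copy.copy(self._execution_stats)


holder = StatsSketch()
snapshot = holder.get_execution_stats()
snapshot["used_tools"] = 99  # only the snapshot changes
```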
- human_feedback: bool = False#
- human_input: bool = False#
- human_input_prompt: str | None = None#
- importance: float = 0.5#
- input_validator: Callable | None = None#
- interpolate_inputs(inputs: dict[str, Any]) -> None[source]#
Interpolate inputs into the task’s description, expected output, and other fields.
This method replaces template variables (e.g., {variable_name}) in the task’s attributes with values from the provided inputs dictionary. Original values are preserved for potential re-interpolation.
- Parameters
inputs – Dictionary mapping template variables to their values. Supported value types are strings, integers, floats, bools, and serializable dicts/lists.
- Side Effects:
Updates description, expected_output, output_file, and prompt_context with interpolated values. Stores original values if not already saved.
Example
>>> task = CortexTask(
...     description="Analyze {topic} trends in {year}",
...     expected_output="Report on {topic} developments"
... )
>>> task.interpolate_inputs({"topic": "AI", "year": 2025})
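Preserving the original template is what makes repeated interpolation possible: each call formats from the untouched text rather than from the previous result. The sketch below shows the idea for a single field. `TemplateSketch` is a hypothetical stand-in, and the use of `str.format` is an assumption; the real method may substitute values differently, for example when serializing dicts or lists.

```python
class TemplateSketch:
    """Hypothetical stand-in that keeps the original template for re-use."""

    def __init__(self, description: str) -> None:
        self.description = description
        self._original_description = None

    def interpolate_inputs(self, inputs: dict) -> None:
        # Save the untouched template once, then always format from it.
        if self._original_description is None:
            self._original_description = self.description
        self.description = self._original_description.format(**inputs)


task = TemplateSketch("Analyze {topic} trends in {year}")
task.interpolate_inputs({"topic": "AI", "year": 2025})
first = task.description
task.interpolate_inputs({"topic": "robotics", "year": 2026})
second = task.description
```

In this sketch a placeholder with no matching key raises `KeyError`; how the framework handles missing keys is not stated here.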
- max_context_length: int = 10000#
- max_retries: int = 3#
- memory: CortexMemory | None = None#
- property output: str | None#
Get the cached output from the last task execution.
- Returns
The output string from the most recent execute() call, or None if the task has not been executed.
- output_file: str | None = None#
- output_json: type[BaseModel] | None = None#
- output_pydantic: type[BaseModel] | None = None#
- pre_execution_callback: Callable | None = None#
- prompt_context: str | None = None#
- remove_dependency(task: CortexTask)[source]#
Remove a task from the dependencies list.
- Parameters
task – The CortexTask to remove from dependencies.
Note
Does nothing if the task is not in dependencies.
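The notes on add_dependency and remove_dependency describe idempotent list operations. A minimal sketch of those semantics, with `DepsSketch` as a hypothetical stand-in that exposes only the dependency list:

```python
class DepsSketch:
    """Hypothetical stand-in exposing only the dependency list."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.dependencies: list = []

    def add_dependency(self, task: "DepsSketch") -> None:
        # Duplicates are ignored, as the note on add_dependency states.
        if task not in self.dependencies:
            self.dependencies.append(task)

    def remove_dependency(self, task: "DepsSketch") -> None:
        # Removing a task that is not present is a no-op.
        if task in self.dependencies:
            self.dependencies.remove(task)


load = DepsSketch("load")
report = DepsSketch("report")
report.add_dependency(load)
report.add_dependency(load)  # second call has no effect
```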
- reset_stats()[source]#
Reset execution statistics to initial values.
Clears all counters for tool usage, errors, delegations, and retries. Should be called before re-executing a task if fresh statistics are needed.
- retry_conditions: list[Callable]#
- save_to_memory: bool = True#
- security_config: dict | None = None#
- set_callback(callback_type: Literal['pre_execution', 'post_execution', 'error'], callback: Callable) -> None[source]#
Set a callback function for task lifecycle events.
- Parameters
callback_type – One of ‘pre_execution’, ‘post_execution’, or ‘error’.
callback – The function to call at the specified lifecycle point.
- Raises
ValueError – If callback_type is not a recognized type.
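One plausible way to implement this dispatch is a lookup table from callback_type to attribute name. The mapping below is an assumption inferred from the attribute descriptions (in particular that 'post_execution' sets `callback`), and `CallbackSketch` is a hypothetical stand-in class.

```python
from typing import Callable, Optional


class CallbackSketch:
    """Hypothetical stand-in with the three documented callback attributes."""

    # Assumed mapping from callback_type to attribute name.
    _SLOTS = {
        "pre_execution": "pre_execution_callback",
        "post_execution": "callback",
        "error": "error_callback",
    }

    def __init__(self) -> None:
        self.pre_execution_callback: Optional[Callable] = None
        self.callback: Optional[Callable] = None
        self.error_callback: Optional[Callable] = None

    def set_callback(self, callback_type: str, callback: Callable) -> None:
        # Unrecognized types are rejected, matching the documented ValueError.
        if callback_type not in self._SLOTS:
            raise ValueError(f"Unknown callback type: {callback_type!r}")
        setattr(self, self._SLOTS[callback_type], callback)


sketch = CallbackSketch()
sketch.set_callback("post_execution", print)
```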
- set_json_output_model(model: type[pydantic.main.BaseModel])[source]#
Set a Pydantic model for JSON output extraction and validation.
- Parameters
model – A Pydantic BaseModel subclass. Output will be parsed as JSON and validated against this model.
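The parse-then-validate flow can be illustrated without third-party dependencies. In the sketch below a dataclass stands in for the Pydantic model and `ValidationErrorSketch` stands in for TaskValidationError; both, along with `parse_json_output` and the `Report` fields, are hypothetical. A real Pydantic model would also check and coerce field types, which this sketch does not attempt.

```python
import json
from dataclasses import dataclass, fields


class ValidationErrorSketch(Exception):
    """Hypothetical stand-in for TaskValidationError."""


@dataclass
class Report:
    """Stand-in for a Pydantic model; only field names are checked here."""

    title: str
    score: float


def parse_json_output(raw: str, model: type) -> object:
    """Decode raw as JSON and build model from it, or raise on any mismatch."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValidationErrorSketch(f"Output is not valid JSON: {exc}") from exc
    if not isinstance(data, dict):
        raise ValidationErrorSketch("Expected a JSON object")
    expected = {f.name for f in fields(model)}
    if set(data) != expected:
        raise ValidationErrorSketch(
            f"Expected keys {sorted(expected)}, got {sorted(data)}"
        )
    return model(**data)


report = parse_json_output('{"title": "Q3", "score": 0.9}', Report)
```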
- set_security_config(**config)[source]#
Update security configuration with provided options.
- Parameters
**config – Key-value pairs to merge into security_config.
- timeout: int | None = None#
- timeout_behavior: Literal['fail', 'continue', 'return_partial'] = 'fail'#
- tool_restrictions: list[str] | None = None#
- tools: list[CortexTool]#
- class calute.cortex.orchestration.task.CortexTaskOutput(task: CortexTask, output: str, agent: CortexAgent, timestamp: float = <factory>, raw_output: str | None = None, token_usage: dict[str, int] = <factory>, validation_results: dict[str, bool] = <factory>, pydantic_output: object | None = None, json_dict: dict | None = None, execution_time: float = 0.0, used_tools: int = 0, tools_errors: int = 0, delegations: int = 0, retry_count: int = 0, execution_metadata: dict = <factory>, performance_metrics: dict = <factory>)[source]#
Bases: object
Enhanced output container from a completed task with rich metadata.
CortexTaskOutput encapsulates the result of task execution along with comprehensive metadata about the execution process, including timing, validation results, tool usage statistics, and performance metrics.
- task#
Reference to the CortexTask that produced this output.
- Type
CortexTask
- output#
The primary output string from task execution.
- Type
str
- agent#
The CortexAgent that executed the task.
- Type
CortexAgent
- timestamp#
Unix timestamp when the output was generated.
- Type
float
- raw_output#
Unprocessed output before any formatting or validation.
- Type
str | None
- token_usage#
Dictionary tracking token consumption (e.g., prompt_tokens, completion_tokens, total_tokens).
- Type
dict[str, int]
- validation_results#
Dictionary of validation check results. Keys are validation type names, values are boolean pass/fail or error strings.
- Type
dict[str, bool]
- pydantic_output#
Parsed and validated Pydantic model instance if output_json or output_pydantic was specified on the task.
- Type
object | None
- json_dict#
Dictionary representation if output was valid JSON.
- Type
dict | None
- execution_time#
Total execution duration in seconds.
- Type
float
- used_tools#
Count of tool invocations during execution.
- Type
int
- tools_errors#
Count of tool invocation errors encountered.
- Type
int
- delegations#
Number of delegations to other agents.
- Type
int
- retry_count#
Number of retry attempts before success.
- Type
int
- execution_metadata#
Additional metadata about execution context (e.g., had_human_input, security_applied, validation_applied).
- Type
dict
- performance_metrics#
Aggregated performance statistics (e.g., avg_execution_time, total_retries).
- Type
dict
- agent: CortexAgent#
- delegations: int = 0#
- execution_metadata: dict#
- execution_time: float = 0.0#
- json_dict: dict | None = None#
- output: str#
- performance_metrics: dict#
- pydantic_output: object | None = None#
- raw_output: str | None = None#
- retry_count: int = 0#
- property summary: str#
Get a truncated summary of the output.
- Returns
The first 200 characters of the output with ellipsis if truncated, or the full output if shorter than 200 characters.
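The truncation rule can be sketched as a small function. Two details are assumptions, since the description leaves them open: an output of exactly 200 characters is returned unchanged, and the ellipsis is three ASCII dots. The name `summarize` is hypothetical.

```python
def summarize(output: str, limit: int = 200) -> str:
    """Return output unchanged if it fits, else its first `limit` chars plus '...'."""
    if len(output) <= limit:
        return output
    return output[:limit] + "..."


short = summarize("All checks passed.")
long = summarize("x" * 250)
```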
- task: CortexTask#
- timestamp: float#
- to_dict() -> dict[source]#
Convert the task output to a dictionary representation.
- Returns
Dictionary containing all task output data including task details, agent information, execution metrics, and validation results. Suitable for serialization or logging purposes.
- token_usage: dict[str, int]#
- tools_errors: int = 0#
- used_tools: int = 0#
- validation_results: dict[str, bool]#
- exception calute.cortex.orchestration.task.TaskValidationError(message: str)[source]#
Bases: Exception
Exception raised when task output validation fails.
This exception is raised when a task’s output fails to validate against the specified Pydantic model or JSON schema, after all retry attempts have been exhausted.
- message#
Detailed description of the validation failure, including which validation rules failed and any parsing errors encountered.