calute.cortex.orchestration.task

Task definition for the Cortex framework.

This module provides the CortexTask class and related data structures for defining and executing tasks within the Cortex multi-agent orchestration framework. Tasks represent units of work that can be assigned to agents, with support for features like output validation, chaining, retries, and memory integration.

The module includes:
  • CortexTask: Main task class for defining executable work units

  • CortexTaskOutput: Rich output container with execution metadata

  • ChainLink: Task chaining and conditional routing

  • TaskValidationError: Custom exception for validation failures

Key features:
  • Output validation with Pydantic models (JSON and XML parsing)

  • Task chaining with conditional routing

  • Automatic retry with configurable conditions

  • Human input/feedback integration

  • Memory persistence for results

  • Security configurations and tool restrictions

  • Context compression and priority weighting

  • Streaming execution support

  • Dependency management between tasks

Typical usage example:

    from calute.cortex.orchestration.task import CortexTask
    from calute.cortex.agent import CortexAgent

    agent = CortexAgent(
        role="Data Analyst",
        goal="Analyze data",
        backstory="Expert analyst",
    )

    task = CortexTask(
        description="Analyze quarterly sales data",
        expected_output="Summary report with key insights",
        agent=agent,
        max_retries=3,
        importance=0.8,
    )

    result = task.execute()
    print(result.output)

class calute.cortex.orchestration.task.ChainLink

Bases: object

Represents a link in a task chain for conditional task routing.

ChainLink enables creating conditional workflows where the next task to execute depends on the output of the current task. It supports both success and fallback paths, allowing for branching task execution.

condition#

Optional callable that takes the task output string and returns True if the next_task should execute, False for fallback. If None, next_task is always executed.

Type

collections.abc.Callable[[str], bool] | None

next_task#

The task to execute if condition returns True or is None.

Type

calute.cortex.orchestration.task.CortexTask | None

fallback_task#

The task to execute if condition returns False.

Type

calute.cortex.orchestration.task.CortexTask | None

Example

>>> def check_success(output: str) -> bool:
...     return "success" in output.lower()
>>> chain = ChainLink(
...     condition=check_success,
...     next_task=process_results_task,
...     fallback_task=handle_error_task
... )
condition: collections.abc.Callable[[str], bool] | None = None#
fallback_task: calute.cortex.orchestration.task.CortexTask | None = None#
next_task: calute.cortex.orchestration.task.CortexTask | None = None#
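
The routing rule described for condition, next_task, and fallback_task can be sketched in a few lines. The helper below is a hypothetical illustration (select_next is not part of calute), and plain strings stand in for CortexTask instances so the snippet runs on its own. How Cortex itself walks a chain is not shown in this reference.

```python
from collections.abc import Callable
from typing import Optional, TypeVar

T = TypeVar("T")


def select_next(
    output: str,
    condition: Optional[Callable[[str], bool]],
    next_task: Optional[T],
    fallback_task: Optional[T],
) -> Optional[T]:
    """Pick the follow-up task using the documented ChainLink semantics."""
    # No condition: next_task is always chosen.
    if condition is None:
        return next_task
    # Condition present: True selects next_task, False selects fallback_task.
    return next_task if condition(output) else fallback_task


def check_success(output: str) -> bool:
    return "success" in output.lower()


# Strings stand in for CortexTask objects in this sketch.
print(select_next("Run finished: SUCCESS", check_success, "process", "handle_error"))
print(select_next("Run failed", check_success, "process", "handle_error"))
print(select_next("anything", None, "process", "handle_error"))
```
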
class calute.cortex.orchestration.task.CortexTask(description: str, expected_output: str, agent: CortexAgent | None = None, tools: list[CortexTool] = <factory>, context: list[CortexTask] | None = None, output_file: str | None = None, human_feedback: bool = False, chain: ChainLink | None = None, max_retries: int = 3, memory: CortexMemory | None = None, save_to_memory: bool = True, importance: float = 0.5, output_json: type[BaseModel] | None = None, output_pydantic: type[BaseModel] | None = None, create_directory: bool = True, async_execution: bool = False, callback: Callable | None = None, pre_execution_callback: Callable | None = None, error_callback: Callable | None = None, human_input: bool = False, human_input_prompt: str | None = None, input_validator: Callable | None = None, security_config: dict | None = None, tool_restrictions: list[str] | None = None, allow_dangerous_tools: bool = False, prompt_context: str | None = None, context_priority: dict[str, float] = <factory>, context_compression: bool = False, max_context_length: int = 10000, dependencies: list[CortexTask] = <factory>, conditional_execution: Callable | None = None, retry_conditions: list[Callable] = <factory>, timeout_behavior: Literal['fail', 'continue', 'return_partial'] = 'fail', timeout: int | None = None, _output: str | None = None, _execution_stats: dict = <factory>, _start_time: float = 0.0, _original_description: str | None = None, _original_expected_output: str | None = None, _original_output_file: str | None = None, _original_prompt_context: str | None = None)[source]#

Bases: object

Task definition for execution within the Cortex framework.

CortexTask represents a unit of work to be executed by a CortexAgent. It provides comprehensive configuration options for task execution including output validation, retry logic, human interaction, security controls, and context management.

description#

Detailed description of what the task should accomplish.

Type

str

expected_output#

Description of what successful output should look like.

Type

str

agent#

The CortexAgent assigned to execute this task.

Type

CortexAgent | None

tools#

List of CortexTool instances available for this specific task.

Type

list[CortexTool]

context#

List of prerequisite tasks whose outputs provide context.

Type

list[CortexTask] | None

output_file#

Optional file path to save the task output.

Type

str | None

human_feedback#

Whether to request human feedback after execution.

Type

bool

chain#

Optional ChainLink for conditional task routing.

Type

ChainLink | None

max_retries#

Maximum retry attempts on failure (default: 3).

Type

int

memory#

CortexMemory instance for persisting task results.

Type

CortexMemory | None

save_to_memory#

Whether to automatically save results to memory.

Type

bool

importance#

Task importance score from 0.0 to 1.0 for prioritization.

Type

float

output_json#

Pydantic model class for JSON output validation.

Type

type[BaseModel] | None

output_pydantic#

Pydantic model class for structured output parsing.

Type

type[BaseModel] | None

create_directory#

Whether to create output directory if missing.

Type

bool

async_execution#

Whether to execute asynchronously (reserved).

Type

bool

callback#

Function called after successful task completion.

Type

Callable | None

pre_execution_callback#

Function called before task execution starts.

Type

Callable | None

error_callback#

Function called when an error occurs.

Type

Callable | None

human_input#

Whether to prompt for human input during execution.

Type

bool

human_input_prompt#

Custom prompt for human input requests.

Type

str | None

input_validator#

Function to validate human input before accepting.

Type

Callable | None

security_config#

Dictionary of security-related configuration options.

Type

dict | None

tool_restrictions#

List of allowed tool names (restricts agent tools).

Type

list[str] | None

allow_dangerous_tools#

Whether to permit dangerous tool usage.

Type

bool

prompt_context#

Additional context string to include in the prompt.

Type

str | None

context_priority#

Priority weights for context sources (higher = more important).

Type

dict[str, float]

context_compression#

Whether to compress context when exceeding limits.

Type

bool

max_context_length#

Maximum character length for context (default: 10000).

Type

int

dependencies#

List of tasks that must complete before this task.

Type

list[CortexTask]

conditional_execution#

Function that returns whether to execute the task.

Type

Callable | None

retry_conditions#

List of functions determining retry eligibility.

Type

list[Callable]

timeout_behavior#

Action on timeout: 'fail', 'continue', or 'return_partial'.

Type

Literal['fail', 'continue', 'return_partial']

timeout#

Maximum execution time in seconds before timeout.

Type

int | None

Private Attributes:

  • _output: Cached output string from last execution.

  • _execution_stats: Dictionary tracking execution statistics.

  • _start_time: Timestamp when execution started.

  • _original_description: Original description before interpolation.

  • _original_expected_output: Original expected_output before interpolation.

  • _original_output_file: Original output_file before interpolation.

  • _original_prompt_context: Original prompt_context before interpolation.

add_context(tasks: list[calute.cortex.orchestration.task.CortexTask] | calute.cortex.orchestration.task.CortexTask)[source]#

Add context tasks whose outputs will be available during execution.

Parameters

tasks – A single CortexTask or list of CortexTasks to add as context. The outputs of these tasks will be included when building the execution prompt.

add_dependency(task: CortexTask)[source]#

Add a task dependency that must complete before this task.

Parameters

task – The CortexTask that must complete before this task executes.

Note

Duplicate dependencies are ignored.

add_retry_condition(condition: Callable)[source]#

Add a custom retry condition function.

Parameters

condition – A callable that takes (error, retry_count, task) and returns True if retry should be attempted, False otherwise.

Note

Duplicate conditions are ignored.
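
A retry condition is an ordinary callable with the (error, retry_count, task) signature described above. The sketch below is one possible condition; the set of "transient" error types and the cutoff of two attempts are assumptions made for illustration, not calute defaults.

```python
from typing import Any

# Error types treated as transient in this sketch (an assumption, not a calute list).
TRANSIENT_ERRORS = (TimeoutError, ConnectionError)


def retry_on_transient(error: Exception, retry_count: int, task: Any) -> bool:
    """Retry only transient failures, and only while retry_count is below 2."""
    if retry_count >= 2:
        return False
    return isinstance(error, TRANSIENT_ERRORS)


# The condition can be exercised without a real task object.
print(retry_on_transient(TimeoutError("slow upstream"), 0, task=None))
print(retry_on_transient(ValueError("bad input"), 0, task=None))
print(retry_on_transient(TimeoutError("slow upstream"), 2, task=None))
```

With a real task this would be registered as task.add_retry_condition(retry_on_transient). How several registered conditions are combined is not stated in this reference.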

agent: CortexAgent | None = None#
allow_dangerous_tools: bool = False#
async_execution: bool = False#
callback: Callable | None = None#
chain: ChainLink | None = None#
conditional_execution: Callable | None = None#
context: list[CortexTask] | None = None#
context_compression: bool = False#
context_priority: dict[str, float]#
create_directory: bool = True#
create_ui()[source]#

Launch a UI application for interacting with this task.

Creates and launches an interactive user interface that allows users to execute and monitor this task visually.

Returns

The launched UI application instance.

dependencies: list[CortexTask]#
description: str#
error_callback: Callable | None = None#
execute(context_outputs: list[str] | None = None, use_streaming: bool = False, stream_callback: collections.abc.Callable[[Any], None] | None = None) → calute.cortex.orchestration.task.CortexTaskOutput | tuple[calute.core.streamer_buffer.StreamerBuffer, threading.Thread][source]#

Execute the task with full Cortex feature support and optional streaming.

Main execution method for CortexTask. Handles the complete task lifecycle including dependency checking, tool restriction enforcement, callback invocation, context building, output validation, retry logic, human interaction, memory persistence, and file output.

Parameters
  • context_outputs – Optional list of output strings from previously completed tasks to include as context. These are combined with any additional context sources (human input, prompt_context) and prioritized according to context_priority weights.

  • use_streaming – If True, executes in a background thread with real-time streaming output. Returns immediately with a buffer/thread tuple. Defaults to False (blocking execution).

  • stream_callback – Optional callback function invoked with each streaming chunk. Only used when use_streaming=True.

Returns

If use_streaming=False: A CortexTaskOutput containing the result string, execution metrics, validation results, and metadata. If use_streaming=True: A tuple of (StreamerBuffer, Thread) for asynchronous consumption of the streaming response.

Return type

calute.cortex.orchestration.task.CortexTaskOutput | tuple[calute.core.streamer_buffer.StreamerBuffer, threading.Thread]

Raises
  • ValueError – If no agent is assigned or dependencies are not satisfied.

  • TimeoutError – If execution exceeds the configured timeout and timeout_behavior is “fail”.

  • TaskValidationError – If output validation fails after all retries.

  • Exception – If task fails after all retry attempts and timeout_behavior is not “return_partial”.

Side Effects:
  • Invokes pre_execution_callback before execution.

  • Invokes callback after successful completion.

  • Invokes error_callback on failure.

  • Saves results to memory if save_to_memory is True.

  • Writes output to output_file if configured.

  • Creates output directories if create_directory is True.

  • Applies tool restrictions to the agent if configured.

  • Requests human feedback if human_feedback is enabled.
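
Because execute() returns either a single output object or a (buffer, thread) tuple, calling code that supports both modes has to branch on the result. The helper below is a hypothetical sketch of that branch; split_execute_result is not part of calute, and a string and a list stand in for CortexTaskOutput and StreamerBuffer, whose own APIs are not covered here.

```python
import threading
from typing import Any, Optional


def split_execute_result(
    result: Any,
) -> tuple[Optional[Any], Optional[Any], Optional[threading.Thread]]:
    """Return (task_output, buffer, thread); unused slots are None."""
    if isinstance(result, tuple):
        # Streaming mode: execute() returned (StreamerBuffer, Thread).
        buffer, thread = result
        return None, buffer, thread
    # Blocking mode: execute() returned a CortexTaskOutput.
    return result, None, None


# Stand-ins: a string for CortexTaskOutput, a list for StreamerBuffer.
worker = threading.Thread(target=lambda: None)
print(split_execute_result("final answer"))
output, buffer, thread = split_execute_result((["chunk"], worker))
print(output, buffer, thread is worker)
```
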

expected_output: str#
get_execution_stats() → dict[source]#

Get a copy of execution statistics from the last run.

Returns

Dictionary containing used_tools, tools_errors, delegations, and retry_count from the most recent execution.
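
The returned dictionary lends itself to simple derived metrics. The sketch below computes a tool error rate from a dictionary shaped like the one described above; the sample values are made up, and it assumes that failed invocations are counted in used_tools, which this reference does not confirm.

```python
def tool_error_rate(stats: dict) -> float:
    """Fraction of tool invocations that errored; 0.0 when no tools ran."""
    used = stats.get("used_tools", 0)
    errors = stats.get("tools_errors", 0)
    return errors / used if used else 0.0


# Hypothetical values shaped like the documented return dictionary.
sample_stats = {"used_tools": 8, "tools_errors": 2, "delegations": 1, "retry_count": 0}
print(f"tool error rate: {tool_error_rate(sample_stats):.0%}")
```
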

human_feedback: bool = False#
human_input: bool = False#
human_input_prompt: str | None = None#
importance: float = 0.5#
input_validator: Callable | None = None#
interpolate_inputs(inputs: dict[str, Any]) → None[source]#

Interpolate inputs into the task’s description, expected output, and other fields.

This method replaces template variables (e.g., {variable_name}) in the task’s attributes with values from the provided inputs dictionary. Original values are preserved for potential re-interpolation.

Parameters

inputs – Dictionary mapping template variables to their values. Supported value types are strings, integers, floats, bools, and serializable dicts/lists.

Side Effects:

Updates description, expected_output, output_file, and prompt_context with interpolated values. Stores original values if not already saved.

Example

>>> task = CortexTask(
...     description="Analyze {topic} trends in {year}",
...     expected_output="Report on {topic} developments"
... )
>>> task.interpolate_inputs({"topic": "AI", "year": 2025})
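
The substitution itself can be pictured with a small standalone function. This is a sketch of the general technique only: interpolate is a hypothetical helper, and both the JSON rendering of dicts and lists and the choice to leave unknown placeholders untouched are assumptions rather than documented calute behavior.

```python
import json
import re
from typing import Any

PLACEHOLDER = re.compile(r"\{(\w+)\}")


def interpolate(template: str, inputs: dict[str, Any]) -> str:
    """Replace {name} placeholders that have a matching key in inputs."""

    def substitute(match: re.Match) -> str:
        key = match.group(1)
        if key not in inputs:
            # Unknown placeholders are left untouched (a choice of this sketch).
            return match.group(0)
        value = inputs[key]
        # Dicts and lists are rendered as JSON; scalars via str().
        return json.dumps(value) if isinstance(value, (dict, list)) else str(value)

    return PLACEHOLDER.sub(substitute, template)


original = "Analyze {topic} trends in {year}"
print(interpolate(original, {"topic": "AI", "year": 2025}))
# The template is kept, so it can be interpolated again with new inputs.
print(interpolate(original, {"topic": "robotics", "year": 2026}))
```

Keeping the original template around, as the _original_* attributes do, is what makes the second call possible.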
max_context_length: int = 10000#
max_retries: int = 3#
memory: CortexMemory | None = None#
property output: str | None#

Get the cached output from the last task execution.

Returns

The output string from the most recent execute() call, or None if the task has not been executed.

output_file: str | None = None#
output_json: type[BaseModel] | None = None#
output_pydantic: type[BaseModel] | None = None#
pre_execution_callback: Callable | None = None#
prompt_context: str | None = None#
remove_dependency(task: CortexTask)[source]#

Remove a task from the dependencies list.

Parameters

task – The CortexTask to remove from dependencies.

Note

Does nothing if the task is not in dependencies.

reset_stats()[source]#

Reset execution statistics to initial values.

Clears all counters for tool usage, errors, delegations, and retries. Should be called before re-executing a task if fresh statistics are needed.

retry_conditions: list[Callable]#
save_to_memory: bool = True#
security_config: dict | None = None#
set_callback(callback_type: Literal['pre_execution', 'post_execution', 'error'], callback: Callable) → None[source]#

Set a callback function for task lifecycle events.

Parameters
  • callback_type – One of 'pre_execution', 'post_execution', or 'error'.

  • callback – The function to call at the specified lifecycle point.

Raises

ValueError – If callback_type is not a recognized type.
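
The three callback types map naturally onto a small dictionary of callables. The arguments Cortex passes to each callback are not specified in this reference, so the hypothetical make_recorder factory below accepts any arguments; the calls at the end only simulate a successful run.

```python
from typing import Any, Callable


def make_recorder(events: list[str], label: str) -> Callable[..., None]:
    """Build a callback that logs its label and tolerates any arguments."""

    def record(*args: Any, **kwargs: Any) -> None:
        events.append(label)

    return record


events: list[str] = []
callbacks = {
    "pre_execution": make_recorder(events, "started"),
    "post_execution": make_recorder(events, "finished"),
    "error": make_recorder(events, "failed"),
}

# Simulate a successful run; a real task would invoke these itself.
callbacks["pre_execution"]()
callbacks["post_execution"]("some output")
print(events)
```

With a real task, each entry would be registered by looping over callbacks.items() and calling task.set_callback(callback_type, callback).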

set_json_output_model(model: type[pydantic.main.BaseModel])[source]#

Set a Pydantic model for JSON output extraction and validation.

Parameters

model – A Pydantic BaseModel subclass. Output will be parsed as JSON and validated against this model.
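
Parsing output as JSON usually starts with locating the JSON payload inside free-form model text. The sketch below shows only that step, using the standard library. extract_json_object is a hypothetical helper rather than calute's parser, and the final key check is a deliberately simple stand-in for the Pydantic validation that this method configures.

```python
import json
from typing import Any


def extract_json_object(text: str) -> dict[str, Any]:
    """Decode the first JSON object found in text, or raise ValueError."""
    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            value, _ = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            value = None
        if isinstance(value, dict):
            return value
        # Not a valid object here; try the next opening brace.
        start = text.find("{", start + 1)
    raise ValueError("no JSON object found in output")


raw = 'Here is the report:\n{"title": "Q3 sales", "score": 0.8}\nDone.'
data = extract_json_object(raw)
# Stand-in for model validation: check the expected keys are present.
missing = {"title", "score"} - data.keys()
print(data, missing)
```
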

set_security_config(**config)[source]#

Update security configuration with provided options.

Parameters

**config – Key-value pairs to merge into security_config.

timeout: int | None = None#
timeout_behavior: Literal['fail', 'continue', 'return_partial'] = 'fail'#
tool_restrictions: list[str] | None = None#
tools: list[CortexTool]#
validate_output_with_model(model: type[pydantic.main.BaseModel])[source]#

Set a Pydantic model for output validation.

Parameters

model – A Pydantic BaseModel subclass for validating output.

class calute.cortex.orchestration.task.CortexTaskOutput(task: CortexTask, output: str, agent: CortexAgent, timestamp: float = <factory>, raw_output: str | None = None, token_usage: dict[str, int] = <factory>, validation_results: dict[str, bool] = <factory>, pydantic_output: object | None = None, json_dict: dict | None = None, execution_time: float = 0.0, used_tools: int = 0, tools_errors: int = 0, delegations: int = 0, retry_count: int = 0, execution_metadata: dict = <factory>, performance_metrics: dict = <factory>)[source]#

Bases: object

Enhanced output container from a completed task with rich metadata.

CortexTaskOutput encapsulates the result of task execution along with comprehensive metadata about the execution process, including timing, validation results, tool usage statistics, and performance metrics.

task#

Reference to the CortexTask that produced this output.

Type

CortexTask

output#

The primary output string from task execution.

Type

str

agent#

The CortexAgent that executed the task.

Type

CortexAgent

timestamp#

Unix timestamp when the output was generated.

Type

float

raw_output#

Unprocessed output before any formatting or validation.

Type

str | None

token_usage#

Dictionary tracking token consumption (e.g., prompt_tokens, completion_tokens, total_tokens).

Type

dict[str, int]

validation_results#

Dictionary of validation check results. Keys are validation type names, values are boolean pass/fail or error strings.

Type

dict[str, bool]

pydantic_output#

Parsed and validated Pydantic model instance if output_json or output_pydantic was specified on the task.

Type

object | None

json_dict#

Dictionary representation if output was valid JSON.

Type

dict | None

execution_time#

Total execution duration in seconds.

Type

float

used_tools#

Count of tool invocations during execution.

Type

int

tools_errors#

Count of tool invocation errors encountered.

Type

int

delegations#

Number of delegations to other agents.

Type

int

retry_count#

Number of retry attempts before success.

Type

int

execution_metadata#

Additional metadata about execution context (e.g., had_human_input, security_applied, validation_applied).

Type

dict

performance_metrics#

Aggregated performance statistics (e.g., avg_execution_time, total_retries).

Type

dict

agent: CortexAgent#
delegations: int = 0#
execution_metadata: dict#
execution_time: float = 0.0#
json_dict: dict | None = None#
output: str#
performance_metrics: dict#
pydantic_output: object | None = None#
raw_output: str | None = None#
retry_count: int = 0#
property summary: str#

Get a truncated summary of the output.

Returns

The first 200 characters of the output with ellipsis if truncated, or the full output if shorter than 200 characters.
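
The truncation rule can be sketched as a standalone function. summarize is a hypothetical stand-in for the property; the three-dot ellipsis and the treatment of an output of exactly 200 characters as untruncated are assumptions, since the description above does not pin them down.

```python
def summarize(output: str, limit: int = 200) -> str:
    """Return output unchanged if it fits, else the first `limit` chars plus '...'."""
    if len(output) <= limit:
        return output
    return output[:limit] + "..."


print(summarize("Short result"))
print(len(summarize("x" * 250)))
```
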

task: CortexTask#
timestamp: float#
to_dict() → dict[source]#

Convert the task output to a dictionary representation.

Returns

Dictionary containing all task output data including task details, agent information, execution metrics, and validation results. Suitable for serialization or logging purposes.

token_usage: dict[str, int]#
tools_errors: int = 0#
used_tools: int = 0#
validation_results: dict[str, bool]#
exception calute.cortex.orchestration.task.TaskValidationError(message: str)[source]#

Bases: Exception

Exception raised when task output validation fails.

This exception is raised when a task’s output fails to validate against the specified Pydantic model or JSON schema, after all retry attempts have been exhausted.

message#

Detailed description of the validation failure, including which validation rules failed and any parsing errors encountered.