# Source code for calute.cortex.orchestration.task

# Copyright 2025 The EasyDeL/Calute Author @erfanzar (Erfan Zare Chavoshi).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""Task definition for Cortex framework.

This module provides the CortexTask class and related data structures for defining
and executing tasks within the Cortex multi-agent orchestration framework. Tasks
represent units of work that can be assigned to agents, with support for features
like output validation, chaining, retries, and memory integration.

The module includes:
- CortexTask: Main task class for defining executable work units
- CortexTaskOutput: Rich output container with execution metadata
- ChainLink: Task chaining and conditional routing
- TaskValidationError: Custom exception for validation failures

Key features:
- Output validation with Pydantic models (JSON and XML parsing)
- Task chaining with conditional routing
- Automatic retry with configurable conditions
- Human input/feedback integration
- Memory persistence for results
- Security configurations and tool restrictions
- Context compression and priority weighting
- Streaming execution support
- Dependency management between tasks

Typical usage example:
    from calute.cortex.task import CortexTask
    from calute.cortex.agent import CortexAgent

    agent = CortexAgent(
        role="Data Analyst",
        goal="Analyze data",
        backstory="Expert analyst"
    )

    task = CortexTask(
        description="Analyze quarterly sales data",
        expected_output="Summary report with key insights",
        agent=agent,
        max_retries=3,
        importance=0.8
    )

    result = task.execute()
    print(result.output)
"""

from __future__ import annotations

import json
import os
import threading
import time
from collections.abc import Callable
from dataclasses import dataclass, field
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal

from pydantic import BaseModel, ValidationError

from ...core.streamer_buffer import StreamerBuffer
from ...logging.console import log_error, log_retry, log_task_complete, log_task_start, log_warning
from ..core.string_utils import interpolate_inputs
from ..core.tool import CortexTool

if TYPE_CHECKING:
    from ..agents.agent import CortexAgent
    from ..agents.memory_integration import CortexMemory


class TaskValidationError(Exception):
    """Exception raised when task output validation fails.

    Raised by ``CortexTask.execute`` when the produced output does not validate
    against the configured Pydantic model(s) and no retry attempts remain.

    Attributes:
        message: Detailed description of the validation failure, including
            which validation rules failed and any parsing errors encountered.
    """

    def __init__(self, message: str):
        """Initialize the validation error.

        Args:
            message: Description of the validation failure.
        """
        # Keep the text on a named attribute as well as in ``args`` so callers
        # can read ``err.message`` without relying on ``str(err)``.
        self.message = message
        super().__init__(self.message)
@dataclass
class CortexTaskOutput:
    """Enhanced output container from a completed task with rich metadata.

    CortexTaskOutput encapsulates the result of task execution along with
    metadata about the execution process: timing, validation results, tool
    usage counters, and performance metrics.

    Attributes:
        task: Reference to the CortexTask that produced this output.
        output: The primary output string from task execution.
        agent: The CortexAgent that executed the task.
        timestamp: Unix timestamp associated with the output. Defaults to the
            time the instance is created.
        raw_output: Unprocessed output before any formatting or validation.
        token_usage: Dictionary tracking token consumption (e.g., prompt_tokens,
            completion_tokens, total_tokens).
        validation_results: Dictionary of validation check results. Values are
            booleans for pass/fail entries and strings for descriptive entries
            such as error messages.
        pydantic_output: Parsed and validated Pydantic model instance if
            output_json or output_pydantic was specified on the task.
        json_dict: Parsed representation if the output was valid JSON.
        execution_time: Total execution duration in seconds.
        used_tools: Count of tool invocations during execution.
        tools_errors: Count of tool invocation errors encountered.
        delegations: Number of delegations to other agents.
        retry_count: Number of retry attempts before success.
        execution_metadata: Additional metadata about execution context
            (e.g., had_human_input, security_applied, validation_applied).
        performance_metrics: Aggregated performance statistics
            (e.g., avg_execution_time, total_retries).
    """

    task: CortexTask
    output: str
    agent: CortexAgent
    timestamp: float = field(default_factory=time.time)
    raw_output: str | None = None
    token_usage: dict[str, int] = field(default_factory=dict)
    # Values are not only booleans: error messages and extraction details are
    # stored here as strings alongside the pass/fail flags.
    validation_results: dict[str, Any] = field(default_factory=dict)
    pydantic_output: object | None = None
    json_dict: dict | None = None
    execution_time: float = 0.0
    used_tools: int = 0
    tools_errors: int = 0
    delegations: int = 0
    retry_count: int = 0
    execution_metadata: dict = field(default_factory=dict)
    performance_metrics: dict = field(default_factory=dict)

    @property
    def summary(self) -> str:
        """Get a truncated summary of the output.

        Returns:
            The first 200 characters of the output followed by an ellipsis if
            the output is longer than 200 characters, otherwise the full output.
        """
        if len(self.output) > 200:
            return self.output[:200] + "..."
        return self.output

    def to_dict(self) -> dict:
        """Convert the task output to a dictionary representation.

        Returns:
            Dictionary with the task description and expected output, the
            agent role, the output text, timing, counters, validation results
            and metadata. ``raw_output``, ``pydantic_output`` and ``json_dict``
            are not included.
        """
        return {
            "task_description": self.task.description,
            "expected_output": self.task.expected_output,
            "actual_output": self.output,
            "agent_role": self.agent.role,
            "timestamp": self.timestamp,
            "execution_time": self.execution_time,
            "token_usage": self.token_usage,
            "validation_results": self.validation_results,
            "used_tools": self.used_tools,
            "tools_errors": self.tools_errors,
            "delegations": self.delegations,
            "retry_count": self.retry_count,
            "execution_metadata": self.execution_metadata,
            "performance_metrics": self.performance_metrics,
        }

    def __str__(self) -> str:
        """Return a formatted string showing agent, task, output, and execution time.

        Returns:
            Multi-line string with agent role, task description (truncated to
            256 characters), output summary, and execution duration.
        """
        description = self.task.description
        # Only mark the description as truncated when it really was, matching
        # what ``summary`` does for the output text.
        if len(description) > 256:
            description = description[:256] + "..."
        lines = (
            "Task Output:",
            f"    Agent: {self.agent.role}",
            f"    Task: {description}",
            f"    Output: {self.summary}",
            f"    Execution Time: {self.execution_time:.2f}s",
        )
        return "\n".join(lines) + "\n"

    def __repr__(self) -> str:
        """Return a concise developer-facing representation of the output.

        Returns:
            String showing agent role and total output length.
        """
        return f"CortexTaskOutput(agent={self.agent.role}, output_length={len(self.output)})"
[docs]@dataclass class CortexTask: """Task definition for execution within the Cortex framework. CortexTask represents a unit of work to be executed by a CortexAgent. It provides comprehensive configuration options for task execution including output validation, retry logic, human interaction, security controls, and context management. Attributes: description: Detailed description of what the task should accomplish. expected_output: Description of what successful output should look like. agent: The CortexAgent assigned to execute this task. tools: List of CortexTool instances available for this specific task. context: List of prerequisite tasks whose outputs provide context. output_file: Optional file path to save the task output. human_feedback: Whether to request human feedback after execution. chain: Optional ChainLink for conditional task routing. max_retries: Maximum retry attempts on failure (default: 3). memory: CortexMemory instance for persisting task results. save_to_memory: Whether to automatically save results to memory. importance: Task importance score from 0.0 to 1.0 for prioritization. output_json: Pydantic model class for JSON output validation. output_pydantic: Pydantic model class for structured output parsing. create_directory: Whether to create output directory if missing. async_execution: Whether to execute asynchronously (reserved). callback: Function called after successful task completion. pre_execution_callback: Function called before task execution starts. error_callback: Function called when an error occurs. human_input: Whether to prompt for human input during execution. human_input_prompt: Custom prompt for human input requests. input_validator: Function to validate human input before accepting. security_config: Dictionary of security-related configuration options. tool_restrictions: List of allowed tool names (restricts agent tools). allow_dangerous_tools: Whether to permit dangerous tool usage. 
prompt_context: Additional context string to include in the prompt. context_priority: Priority weights for context sources (higher = more important). context_compression: Whether to compress context when exceeding limits. max_context_length: Maximum character length for context (default: 10000). dependencies: List of tasks that must complete before this task. conditional_execution: Function that returns whether to execute the task. retry_conditions: List of functions determining retry eligibility. timeout_behavior: Action on timeout - 'fail', 'continue', or 'return_partial'. timeout: Maximum execution time in seconds before timeout. Private Attributes: _output: Cached output string from last execution. _execution_stats: Dictionary tracking execution statistics. _start_time: Timestamp when execution started. _original_description: Original description before interpolation. _original_expected_output: Original expected_output before interpolation. _original_output_file: Original output_file before interpolation. _original_prompt_context: Original prompt_context before interpolation. 
""" description: str expected_output: str agent: CortexAgent | None = None tools: list[CortexTool] = field(default_factory=list) context: list[CortexTask] | None = None output_file: str | None = None human_feedback: bool = False chain: ChainLink | None = None max_retries: int = 3 memory: CortexMemory | None = None save_to_memory: bool = True importance: float = 0.5 output_json: type[BaseModel] | None = None output_pydantic: type[BaseModel] | None = None create_directory: bool = True async_execution: bool = False callback: Callable | None = None pre_execution_callback: Callable | None = None error_callback: Callable | None = None human_input: bool = False human_input_prompt: str | None = None input_validator: Callable | None = None security_config: dict | None = None tool_restrictions: list[str] | None = None allow_dangerous_tools: bool = False prompt_context: str | None = None context_priority: dict[str, float] = field(default_factory=dict) context_compression: bool = False max_context_length: int = 10000 dependencies: list[CortexTask] = field(default_factory=list) conditional_execution: Callable | None = None retry_conditions: list[Callable] = field(default_factory=list) timeout_behavior: Literal["fail", "continue", "return_partial"] = "fail" timeout: int | None = None _output: str | None = None _execution_stats: dict = field(default_factory=dict) _start_time: float = 0.0 _original_description: str | None = None _original_expected_output: str | None = None _original_output_file: str | None = None _original_prompt_context: str | None = None
[docs] def interpolate_inputs(self, inputs: dict[str, Any]) -> None: """ Interpolate inputs into the task's description, expected output, and other fields. This method replaces template variables (e.g., {variable_name}) in the task's attributes with values from the provided inputs dictionary. Original values are preserved for potential re-interpolation. Args: inputs: Dictionary mapping template variables to their values. Supported value types are strings, integers, floats, bools, and serializable dicts/lists. Side Effects: Updates description, expected_output, output_file, and prompt_context with interpolated values. Stores original values if not already saved. Example: >>> task = CortexTask( ... description="Analyze {topic} trends in {year}", ... expected_output="Report on {topic} developments" ... ) >>> task.interpolate_inputs({"topic": "AI", "year": 2025}) """ if self._original_description is None: self._original_description = self.description if self._original_expected_output is None: self._original_expected_output = self.expected_output if self.output_file is not None and self._original_output_file is None: self._original_output_file = self.output_file if self.prompt_context is not None and self._original_prompt_context is None: self._original_prompt_context = self.prompt_context if inputs: self.description = interpolate_inputs(input_string=self._original_description, inputs=inputs) self.expected_output = interpolate_inputs(input_string=self._original_expected_output, inputs=inputs) if self.output_file: self.output_file = interpolate_inputs(input_string=self._original_output_file, inputs=inputs) if self.prompt_context: self.prompt_context = interpolate_inputs(input_string=self._original_prompt_context, inputs=inputs)
def _extract_json_from_output(self, output: str) -> str | None: """Extract JSON from output that may contain thinking process or other text. Attempts multiple extraction strategies to find valid JSON: 1. JSON within markdown code blocks (```json or ```) 2. Regex patterns for nested JSON objects 3. Character-by-character brace matching Args: output: The raw output string potentially containing JSON. Returns: The extracted JSON string if found and valid, None otherwise. """ import re json_patterns = [ r"```json\s*(\{.*?\})\s*```", r"```\s*(\{.*?\})\s*```", r"(\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\})", ] for pattern in json_patterns: matches = re.findall(pattern, output, re.DOTALL) for match in matches: try: json.loads(match) return match except json.JSONDecodeError: continue brace_level = 0 start_idx = -1 for i, char in enumerate(output): if char == "{": if brace_level == 0: start_idx = i brace_level += 1 elif char == "}": brace_level -= 1 if brace_level == 0 and start_idx != -1: potential_json = output[start_idx : i + 1] try: json.loads(potential_json) return potential_json except json.JSONDecodeError: continue return None def _extract_xml_content(self, output: str, tag: str) -> str | None: """Extract content from XML tags in the output. Args: output: The raw output string containing XML. tag: The XML tag name to extract content from. Returns: The content between the specified tags (stripped of whitespace), or None if the tag is not found. """ import re pattern = f"<{tag}>(.*?)</{tag}>" match = re.search(pattern, output, re.DOTALL) return match.group(1).strip() if match else None def _parse_xml_to_dict(self, xml_content: str) -> dict: """Parse XML content to dictionary for Pydantic validation. Recursively converts XML elements to a nested dictionary structure. Handles repeated elements by converting them to lists. Args: xml_content: The XML string to parse. Returns: Dictionary representation of the XML structure. Returns empty dict if parsing fails. 
""" import xml.etree.ElementTree as ET try: root = ET.fromstring(f"<root>{xml_content}</root>") def xml_to_dict(element) -> dict: """Recursively convert an XML element tree to a nested dictionary. Args: element: An xml.etree.ElementTree.Element to convert. Returns: Dictionary representation of the element's children. Repeated tags are converted to lists. """ result = {} for child in element: if len(child) > 0: if child.tag in result: if not isinstance(result[child.tag], list): result[child.tag] = [result[child.tag]] result[child.tag].append(xml_to_dict(child)) else: result[child.tag] = xml_to_dict(child) else: if child.tag in result: if not isinstance(result[child.tag], list): result[child.tag] = [result[child.tag]] result[child.tag].append(child.text or "") else: result[child.tag] = child.text or "" return result return xml_to_dict(root) except ET.ParseError: return {} def _validate_output(self, output: str) -> tuple[bool, Any, dict]: """Validate task output against configured Pydantic models. Supports multiple extraction strategies for finding structured data within unstructured output, including JSON and XML parsing. Args: output: The raw output string to validate. Returns: Tuple of (validation_passed, pydantic_output, validation_results): - validation_passed: True if all validations succeeded. - pydantic_output: Parsed Pydantic model instance if successful. - validation_results: Dictionary with validation details and any errors. """ validation_results = {} pydantic_output = None if self.output_json: json_data = None try: json_data = json.loads(output) validation_results["extraction_method"] = "direct_json" except json.JSONDecodeError: extracted_json = self._extract_json_from_output(output) if extracted_json: try: json_data = json.loads(extracted_json) validation_results["extraction_method"] = "extracted_json" validation_results["extracted_content"] = ( extracted_json[:200] + "..." 
if len(extracted_json) > 200 else extracted_json ) except json.JSONDecodeError: pass if not json_data: xml_content = self._extract_xml_content(output, "json") or self._extract_xml_content( output, "output" ) if xml_content: try: json_data = json.loads(xml_content) validation_results["extraction_method"] = "xml_json" except json.JSONDecodeError: dict_data = self._parse_xml_to_dict(xml_content) if dict_data: json_data = dict_data validation_results["extraction_method"] = "xml_dict" if json_data: try: pydantic_output = self.output_json.parse_obj(json_data) validation_results["output_json"] = True except ValidationError as e: validation_results["output_json"] = False validation_results["output_json_error"] = str(e) else: validation_results["output_json"] = False validation_results["output_json_error"] = "No valid JSON found in output" if self.output_pydantic: try: pydantic_output = self.output_pydantic.parse_raw(output) validation_results["output_pydantic"] = True except ValidationError as e: validation_results["output_pydantic"] = False validation_results["output_pydantic_error"] = str(e) validation_passed = all( result for key, result in validation_results.items() if not key.endswith("_error") and key not in ["extraction_method", "extracted_content"] ) return validation_passed, pydantic_output, validation_results def _execute_callback(self, callback: Callable | None, *args, **kwargs): """Safely execute a callback function with error handling. If the callback raises an exception and an error_callback is configured, the error is passed to the error_callback. Otherwise, the error is printed. Args: callback: The callback function to execute, or None to skip. *args: Positional arguments to pass to the callback. **kwargs: Keyword arguments to pass to the callback. Returns: The return value of the callback, or None if callback is None or if an exception occurred. 
""" if callback and callable(callback): try: return callback(*args, **kwargs) except Exception as e: if self.error_callback: self._execute_callback(self.error_callback, e, self) else: print(f"Callback error: {e}") def _check_dependencies(self) -> bool: """Check if all task dependencies have been completed. Returns: True if all dependencies have output (are completed), False if any dependency is still pending. """ for dep in self.dependencies: if not dep.output: return False return True def _should_retry(self, error: Exception, retry_count: int) -> bool: """Determine if the task should be retried after an error. Checks against max_retries limit and evaluates any custom retry condition functions that have been configured. Args: error: The exception that caused the failure. retry_count: Current number of retry attempts made. Returns: True if retry should be attempted, False otherwise. """ if retry_count >= self.max_retries: return False for condition in self.retry_conditions: try: if not condition(error, retry_count, self): return False except Exception: continue return True def _get_human_input(self) -> str: """Prompt for and collect human input with optional validation. Displays the configured prompt and waits for user input. If an input_validator is configured, continues prompting until valid input is received. Returns: The validated user input string. """ prompt = self.human_input_prompt or "Please provide input for this task: " while True: user_input = input(f"\n🤔 {prompt}") if self.input_validator: try: if self.input_validator(user_input): return user_input else: print("❌ Input validation failed. Please try again.") continue except Exception as e: print(f"❌ Input validation error: {e}. Please try again.") continue return user_input def _create_output_directory(self): """Create the output file's parent directory if it doesn't exist. Only creates directories if both output_file is specified and create_directory is True. 
Creates all intermediate directories as needed (like mkdir -p). """ if self.output_file and self.create_directory: output_path = Path(self.output_file) output_path.parent.mkdir(parents=True, exist_ok=True) def _apply_tool_restrictions(self): """Apply tool restrictions to the assigned agent. Filters the agent's available tools to only include those specified in tool_restrictions. This provides security control over which tools can be used during task execution. Side Effects: Modifies the agent's tools list to only include allowed tools. """ if not self.tool_restrictions: return if self.agent: allowed_tools = [] for tool in self.agent.tools: tool_name = tool.__class__.__name__ if tool_name in self.tool_restrictions: allowed_tools.append(tool) self.agent.tools = allowed_tools def _build_enhanced_context(self, context: str, context_outputs: list[str] | None, human_input: str) -> str: """Build enhanced context with priority weighting and optional compression. Combines context from multiple sources (previous task outputs, base context, human input) and orders them by priority. Optionally compresses the result if it exceeds max_context_length. Args: context: Base context string from previous outputs. context_outputs: List of output strings from context tasks. human_input: Optional human input to include in context. Returns: Combined context string, sorted by priority and optionally compressed. 
""" context_parts = [] if context_outputs: for i, ctx_output in enumerate(context_outputs): priority = self.context_priority.get(f"context_{i}", 1.0) if priority > 0: context_parts.append((ctx_output, priority)) if context: priority = self.context_priority.get("base_context", 1.0) context_parts.append((context, priority)) if human_input: priority = self.context_priority.get("human_input", 1.5) context_parts.append((f"Human Input: {human_input}", priority)) context_parts.sort(key=lambda x: x[1], reverse=True) final_context = "\n\n".join([part[0] for part in context_parts]) if self.context_compression and len(final_context) > self.max_context_length: compressed_parts = [] current_length = 0 for ctx, _priority in context_parts: if current_length + len(ctx) <= self.max_context_length: compressed_parts.append(ctx) current_length += len(ctx) else: remaining = self.max_context_length - current_length if remaining > 100: compressed_parts.append(ctx[:remaining] + "...") break final_context = "\n\n".join(compressed_parts) return final_context def _create_empty_output(self, reason: str) -> CortexTaskOutput: """Create an empty output for tasks that were skipped. Used when a task is not executed (e.g., conditional execution returned False or dependencies weren't met). Args: reason: Description of why the task was skipped. Returns: CortexTaskOutput with the reason as output and skipped=True in execution_metadata. """ return CortexTaskOutput( task=self, output=reason, agent=self.agent, timestamp=time.time(), execution_metadata={"skipped": True, "reason": reason}, )
[docs] def execute( self, context_outputs: list[str] | None = None, use_streaming: bool = False, stream_callback: Callable[[Any], None] | None = None, ) -> CortexTaskOutput | tuple[StreamerBuffer, threading.Thread]: """Execute the task with full Cortex feature support and optional streaming. Main execution method for CortexTask. Handles the complete task lifecycle including dependency checking, tool restriction enforcement, callback invocation, context building, output validation, retry logic, human interaction, memory persistence, and file output. Args: context_outputs: Optional list of output strings from previously completed tasks to include as context. These are combined with any additional context sources (human input, prompt_context) and prioritized according to ``context_priority`` weights. use_streaming: If True, executes in a background thread with real-time streaming output. Returns immediately with a buffer/thread tuple. Defaults to False (blocking execution). stream_callback: Optional callback function invoked with each streaming chunk. Only used when ``use_streaming=True``. Returns: If ``use_streaming=False``: A CortexTaskOutput containing the result string, execution metrics, validation results, and metadata. If ``use_streaming=True``: A tuple of (StreamerBuffer, Thread) for asynchronous consumption of the streaming response. Raises: ValueError: If no agent is assigned or dependencies are not satisfied. TimeoutError: If execution exceeds the configured ``timeout`` and ``timeout_behavior`` is "fail". TaskValidationError: If output validation fails after all retries. Exception: If task fails after all retry attempts and ``timeout_behavior`` is not "return_partial". Side Effects: - Invokes ``pre_execution_callback`` before execution. - Invokes ``callback`` after successful completion. - Invokes ``error_callback`` on failure. - Saves results to memory if ``save_to_memory`` is True. - Writes output to ``output_file`` if configured. 
- Creates output directories if ``create_directory`` is True. - Applies tool restrictions to the agent if configured. - Requests human feedback if ``human_feedback`` is enabled. """ if not self.agent: raise ValueError("Task must have an assigned agent") if self.dependencies and not self._check_dependencies(): raise ValueError("Task dependencies not satisfied") log_task_start( self.description[:50] + "..." if len(self.description) > 50 else self.description, self.agent.role ) self._apply_tool_restrictions() self._execute_callback(self.pre_execution_callback, self) self._create_output_directory() for tool in self.tools: if tool not in self.agent.tools: self.agent.tools.append(tool) context = "" if context_outputs: context = "\n\n".join(context_outputs) retries = 0 last_error = None start_time = time.time() self._start_time = start_time self._execution_stats = { "used_tools": 0, "tools_errors": 0, "delegations": 0, "retry_count": 0, } while retries <= self.max_retries: try: if self.timeout and (time.time() - start_time) > self.timeout: if self.timeout_behavior == "fail": raise TimeoutError(f"Task timeout after {self.timeout}s") elif self.timeout_behavior == "continue": break if self.conditional_execution and not self.conditional_execution(self): return self._create_empty_output("Conditional execution failed") human_input_text = "" if self.human_input: human_input_text = self._get_human_input() enhanced_context = self._build_enhanced_context(context, context_outputs, human_input_text) task_prompt = f"{self.description}\n\nExpected Output: {self.expected_output}" if self.prompt_context: task_prompt += f"\n\nAdditional Context: {self.prompt_context}" if hasattr(self.agent, "_generate_format_guidance") and (self.output_json or self.output_pydantic): output_model = self.output_json or self.output_pydantic format_guidance = self.agent._generate_format_guidance(output_model) if format_guidance: task_prompt += format_guidance initial_delegations = getattr(self.agent, 
"_delegation_count", 0) if use_streaming: buffer, thread = self.agent.execute( task_description=task_prompt, context=enhanced_context, use_thread=True, ) if stream_callback: def process_stream() -> None: """Consume streaming chunks from the buffer and forward each to the callback.""" for chunk in buffer.stream(): # noqa stream_callback(chunk) callback_thread = threading.Thread(target=process_stream, daemon=True) callback_thread.start() buffer.task = self # type: ignore buffer.agent = self.agent # type: ignore return buffer, thread if self.agent.allow_delegation: result = self.agent.execute_with_delegation(task_description=task_prompt, context=enhanced_context) else: result = self.agent.execute(task_description=task_prompt, context=enhanced_context) final_delegations = getattr(self.agent, "_delegation_count", 0) self._execution_stats["delegations"] = final_delegations - initial_delegations validation_passed = True pydantic_output = None validation_results = {} if self.output_json or self.output_pydantic: validation_passed, pydantic_output, validation_results = self._validate_output(result) if not validation_passed: if retries < self.max_retries: retries += 1 self._execution_stats["retry_count"] = retries error_details = [] for key, value in validation_results.items(): if key.endswith("_error"): error_details.append(f"{key}: {value}") error_msg = f"Output validation failed (attempt {retries}/{self.max_retries}): {'; '.join(error_details)}" log_retry(retries, self.max_retries, error_msg) continue else: raise TaskValidationError(f"Output validation failed: {validation_results}") self._output = result if self.output_file: with open(self.output_file, "w") as f: f.write(result) if self.human_feedback and os.getenv("ALLOW_HUMAN_FEEDBACK", "0") == "1": feedback = input("\n💭 Please provide feedback on this output (or press Enter to accept): ") if feedback: result = self.agent.execute( task_description=( f"Revise the following based on feedback:\n{result}\n\nFeedback: 
{feedback}" ), context=enhanced_context, ) self._output = result if self.save_to_memory and self.memory: self.memory.save_task_result( task_description=self.description, result=result, agent_role=self.agent.role, importance=self.importance, task_metadata={ "expected_output": self.expected_output[:100] if self.expected_output else "", "tools_used": [tool.__class__.__name__ for tool in self.tools], "had_context": bool(context_outputs), "had_human_input": self.human_input, "validation_applied": bool(self.output_json or self.output_pydantic), }, ) execution_time = time.time() - start_time task_output = CortexTaskOutput( task=self, output=result, agent=self.agent, timestamp=start_time, raw_output=result, execution_time=execution_time, used_tools=self._execution_stats.get("used_tools", 0), tools_errors=self._execution_stats.get("tools_errors", 0), delegations=self._execution_stats.get("delegations", 0), retry_count=self._execution_stats.get("retry_count", 0), validation_results=validation_results, pydantic_output=pydantic_output, execution_metadata={ "had_human_input": self.human_input, "had_dependencies": bool(self.dependencies), "security_applied": bool(self.tool_restrictions), "validation_applied": bool(self.output_json or self.output_pydantic), }, performance_metrics={"avg_execution_time": execution_time, "total_retries": retries}, ) try: task_output.json_dict = json.loads(result) except (json.JSONDecodeError, TypeError): pass self._execute_callback(self.callback, task_output, self) log_task_complete( self.description[:50] + "..." 
if len(self.description) > 50 else self.description, execution_time ) return task_output except Exception as e: last_error = e self._execution_stats["tools_errors"] += 1 self._execute_callback(self.error_callback, e, self) if self._should_retry(e, retries): retries += 1 self._execution_stats["retry_count"] = retries log_retry(retries, self.max_retries, str(e)) if retries < self.max_retries: log_warning("Retrying in 5 seconds...") time.sleep(5) else: log_error(f"Task failed after all retries: {e!s}") break execution_time = time.time() - start_time failure_output = CortexTaskOutput( task=self, output=f"Task failed after {retries} retries: {last_error}", agent=self.agent, timestamp=start_time, execution_time=execution_time, retry_count=retries, execution_metadata={"failed": True, "last_error": str(last_error)}, ) if self.timeout_behavior == "return_partial": return failure_output else: raise Exception(f"Task failed after {retries} retries: {last_error}")
[docs] def get_execution_stats(self) -> dict: """Get a copy of execution statistics from the last run. Returns: Dictionary containing used_tools, tools_errors, delegations, and retry_count from the most recent execution. """ return self._execution_stats.copy()
[docs] def reset_stats(self): """Reset execution statistics to initial values. Clears all counters for tool usage, errors, delegations, and retries. Should be called before re-executing a task if fresh statistics are needed. """ self._execution_stats = { "used_tools": 0, "tools_errors": 0, "delegations": 0, "retry_count": 0, }
@property
def output(self) -> str | None:
    """Expose the result cached on this task.

    Returns:
        Whatever is currently stored in the task's cached output slot:
        the result string of the latest execution, or None when nothing
        has been stored yet.
    """
    cached_result = self._output
    return cached_result
def add_dependency(self, task: CortexTask):
    """Register a task that has to finish before this one runs.

    Args:
        task: The CortexTask to add as a prerequisite.

    Note:
        A task that is already listed is not added a second time.
    """
    if task in self.dependencies:
        return
    self.dependencies.append(task)
def remove_dependency(self, task: CortexTask):
    """Drop a task from this task's list of prerequisites.

    Args:
        task: The CortexTask that should no longer be a dependency.

    Note:
        Asking to remove a task that is not listed is a no-op.
    """
    prerequisites = self.dependencies
    if task not in prerequisites:
        return
    prerequisites.remove(task)
def add_context(self, tasks: list[CortexTask] | CortexTask):
    """Add context tasks whose outputs will be available during execution.

    Args:
        tasks: A single CortexTask, or a list (a tuple is accepted as
            well) of CortexTasks to add as context. Entries are appended
            in the order given; existing context entries are kept.
    """
    # Only a lone task is wrapped. Tuples count as batches: wrapping them
    # would store the tuple itself as a single, bogus context entry.
    if not isinstance(tasks, (list, tuple)):
        tasks = [tasks]

    # The context list is created lazily on first use.
    if self.context is None:
        self.context = []

    # One bulk extend instead of a per-item append loop; an append loop
    # would never terminate if `tasks` were the context list itself.
    self.context.extend(tasks)
def set_callback(
    self,
    callback_type: Literal["pre_execution", "post_execution", "error"],
    callback: Callable,
) -> None:
    """Install a callback for one of the task's lifecycle events.

    Args:
        callback_type: Which event to hook: 'pre_execution',
            'post_execution' or 'error'.
        callback: The callable to store for that event.

    Raises:
        ValueError: If callback_type is none of the recognized names.
    """
    # Lifecycle event name -> attribute that stores its callback.
    targets = (
        ("pre_execution", "pre_execution_callback"),
        ("post_execution", "callback"),
        ("error", "error_callback"),
    )
    for event_name, attribute in targets:
        if callback_type == event_name:
            setattr(self, attribute, callback)
            return
    raise ValueError(f"Unknown callback type: {callback_type}")
def add_retry_condition(self, condition: Callable):
    """Register an extra predicate consulted when deciding on a retry.

    Args:
        condition: A callable taking (error, retry_count, task) that
            returns True when another attempt should be made and False
            otherwise.

    Note:
        A condition that is already registered is not added again.
    """
    if condition in self.retry_conditions:
        return
    self.retry_conditions.append(condition)
def set_security_config(self, **config):
    """Merge the given options into the task's security configuration.

    Args:
        **config: Option names and values to write into security_config.
            Keys that are already present are overwritten.
    """
    settings = self.security_config
    if settings is None:
        # No configuration yet: start from an empty mapping.
        settings = {}
        self.security_config = settings
    settings.update(config)
def validate_output_with_model(self, model: type[BaseModel]):
    """Choose the Pydantic model used to validate this task's output.

    Args:
        model: The Pydantic BaseModel subclass to store as the task's
            output_pydantic setting. Any previous value is replaced.
    """
    self.output_pydantic = model
def set_json_output_model(self, model: type[BaseModel]):
    """Choose the Pydantic model used for JSON output handling.

    Args:
        model: The Pydantic BaseModel subclass to store as the task's
            output_json setting; the task output is meant to be parsed
            as JSON and validated against it. Any previous value is
            replaced.
    """
    self.output_json = model
def create_ui(self):
    """Open the interactive UI application with this task as executor.

    Returns:
        Whatever calute.ui.launch_application returns when called with
        this task passed as its executor argument.
    """
    # The UI module is imported only when this method is called.
    from calute.ui import launch_application as start_ui

    application = start_ui(executor=self)
    return application