# Copyright 2025 The EasyDeL/Calute Author @erfanzar (Erfan Zare Chavoshi).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Dynamic prompt-based execution for Cortex framework.
This module provides dynamic task creation and execution capabilities for the
Cortex framework. It enables building workflows from natural language prompts
without pre-defining task structures, making it easier to create flexible
and adaptive multi-agent systems.
Key features:
- Dynamic task creation from natural language prompts
- Automatic task chaining with context passing
- Streaming execution support
- Integration with TaskCreator for intelligent task decomposition
- Flexible agent assignment strategies
The module provides two main classes:
- DynamicTaskBuilder: Utility for creating tasks from prompts
- DynamicCortex: Extended Cortex with dynamic execution capabilities
Typical usage example:
# Create dynamic cortex with agents
cortex = create_dynamic_cortex(
agents=[analyst, writer],
llm=my_llm,
process=ProcessType.SEQUENTIAL
)
# Execute a single prompt
result = cortex.execute_prompt("Analyze the sales data", agent="Data Analyst")
# Or create and execute tasks from a complex prompt
result = cortex.execute_with_task_creation(
prompt="Research AI trends and write a blog post about them"
)
"""
import threading
from collections.abc import Callable
from typing import Any
from calute.cortex.agents.memory_integration import CortexMemory
from calute.memory.compat import MemoryType
from ...core.streamer_buffer import StreamerBuffer
from ...llms import BaseLLM
from ..agents.agent import CortexAgent
from ..core.enums import ProcessType
from .cortex import Cortex, MemoryConfig
from .task import CortexTask
from .task_creator import TaskCreationPlan, TaskCreator
[docs]class DynamicTaskBuilder:
"""Utility for creating tasks dynamically from prompts.
DynamicTaskBuilder provides static methods for converting natural language
prompts into CortexTask objects. It supports single task creation and
chaining multiple prompts into a sequence of dependent tasks.
This class is designed as a stateless utility and should be used through
its static methods rather than instantiation.
Example:
# Create a single task
task = DynamicTaskBuilder.from_prompt(
"Analyze the quarterly report",
agent=analyst_agent
)
# Create a chain of tasks
tasks = DynamicTaskBuilder.chain_prompts(
prompts=["Research topic", "Write draft", "Review and edit"],
agents=[researcher, writer, editor]
)
"""
[docs] @staticmethod
def from_prompt(
prompt: str,
agent: CortexAgent | None = None,
expected_output: str = "Complete the requested task",
tools: list | None = None,
**task_kwargs,
) -> CortexTask:
"""Create a CortexTask dynamically from a natural language prompt.
Converts a simple text prompt into a fully configured CortexTask
instance. The prompt becomes the task description, and optional
parameters allow customization of agent assignment, expected output,
and available tools.
Args:
prompt: The natural language instruction or prompt describing
what the task should accomplish. Becomes the task's
``description`` field.
agent: Optional CortexAgent to assign to this task. If None,
the agent can be assigned later before execution.
expected_output: Description of what successful task completion
looks like. Defaults to "Complete the requested task".
tools: Optional list of tool instances to make available for
this specific task. Defaults to an empty list.
**task_kwargs: Additional keyword arguments passed directly to
the CortexTask constructor (e.g., ``importance``,
``max_retries``, ``human_feedback``).
Returns:
A new CortexTask instance configured with the provided prompt
as its description and the specified parameters.
Example:
>>> task = DynamicTaskBuilder.from_prompt(
... "Analyze the quarterly sales data",
... agent=analyst_agent,
... expected_output="Summary report with key metrics",
... importance=0.8
... )
"""
return CortexTask(
description=prompt, expected_output=expected_output, agent=agent, tools=tools or [], **task_kwargs
)
[docs] @staticmethod
def chain_prompts(
prompts: list[str], agents: list[CortexAgent] | None = None, use_context: bool = True
) -> list[CortexTask]:
"""Create a chain of dependent tasks from a list of prompts.
Converts multiple text prompts into a sequence of CortexTask instances
with optional context dependencies, where each task can receive the
output of the previous task as context. Agents are assigned in a
round-robin fashion if fewer agents than prompts are provided.
Args:
prompts: List of natural language prompt strings, each defining
one task in the chain. Tasks are created in the order given.
agents: Optional list of CortexAgent instances to assign to tasks.
Agents are matched to tasks by index. If fewer agents than
prompts, agents cycle (wrapping via modulo). If None, all
tasks are created without agent assignments.
use_context: Whether each task should receive the output of the
immediately preceding task as context. When True, a context
dependency is set on the previous task. Defaults to True.
Returns:
List of CortexTask instances in execution order. If ``use_context``
is True, each task (except the first) has a context dependency
on the preceding task.
Example:
>>> tasks = DynamicTaskBuilder.chain_prompts(
... prompts=["Research the topic", "Write a draft", "Edit and polish"],
... agents=[researcher, writer, editor],
... use_context=True
... )
>>> len(tasks)
3
"""
tasks = []
for i, prompt in enumerate(prompts):
agent = None
if agents:
agent = agents[i % len(agents)]
task = CortexTask(
description=prompt,
expected_output="Complete the requested task and provide detailed output",
agent=agent,
context=tasks[-1:] if use_context and tasks else None,
)
tasks.append(task)
return tasks
[docs]class DynamicCortex(Cortex):
"""Extended Cortex with dynamic prompt execution capabilities.
DynamicCortex extends the base Cortex class with methods for creating
and executing tasks dynamically from natural language prompts. This
enables more flexible workflows where tasks don't need to be pre-defined,
and the system can adapt to user inputs at runtime.
Key capabilities:
- Execute single prompts with automatic agent selection
- Create tasks from complex objectives using TaskCreator
- Chain multiple prompts with automatic context passing
- Stream responses for real-time feedback
- Launch interactive UI for prompt execution
Attributes:
task_creator: Optional TaskCreator instance for intelligent task
decomposition. Initialized lazily when first needed.
Inherits all attributes from Cortex base class including:
agents, tasks, llm, process, memory, verbose, etc.
Example:
cortex = DynamicCortex(
agents=[researcher, writer],
tasks=[], # Can be empty initially
llm=my_llm
)
# Execute a simple prompt
result = cortex.execute_prompt("Summarize this article")
# Or use task creation for complex objectives
result = cortex.execute_with_task_creation(
prompt="Create a marketing plan for our new product"
)
"""
def __init__(
self,
agents: list[CortexAgent],
tasks: list[CortexTask],
llm: BaseLLM,
process: ProcessType = ProcessType.SEQUENTIAL,
manager_agent: CortexAgent | None = None,
memory_type: MemoryType = MemoryType.SHORT_TERM,
verbose: bool = True,
max_iterations: int = 10,
model: str = "gpt-4",
memory: CortexMemory | None = None,
memory_config: MemoryConfig | None = None,
reinvoke_after_function: bool = True,
enable_calute_memory: bool = False,
cortex_name: str = "CorTex",
):
"""Initialize DynamicCortex with optional TaskCreator.
Creates a new DynamicCortex instance with all capabilities of the
base Cortex class plus dynamic task creation and execution methods.
Args:
agents: List of CortexAgent instances available for task execution.
tasks: Initial list of tasks (can be empty for dynamic execution).
llm: BaseLLM instance for language model interactions.
process: Execution process type (SEQUENTIAL, PARALLEL, or HIERARCHICAL).
Defaults to SEQUENTIAL.
manager_agent: Optional agent for hierarchical process management.
memory_type: Type of memory to use for context. Defaults to SHORT_TERM.
verbose: Whether to enable detailed logging output. Defaults to True.
max_iterations: Maximum retry attempts for failed tasks. Defaults to 10.
model: Default model identifier for agents. Defaults to "gpt-4".
memory: Optional CortexMemory instance for persistent context.
memory_config: Optional MemoryConfig for memory settings.
reinvoke_after_function: Whether to reinvoke LLM after tool calls.
Defaults to True.
enable_calute_memory: Whether to enable Calute-level memory.
Defaults to False.
cortex_name: Name identifier for this Cortex instance.
Defaults to "CorTex".
"""
super().__init__(
agents=agents,
tasks=tasks,
llm=llm,
process=process,
manager_agent=manager_agent,
memory_type=memory_type,
verbose=verbose,
max_iterations=max_iterations,
model=model,
memory=memory,
memory_config=memory_config,
reinvoke_after_function=reinvoke_after_function,
enable_calute_memory=enable_calute_memory,
cortex_name=cortex_name,
)
self.task_creator = None
[docs] def create_tasks_from_prompt(
self,
prompt: str,
background: str | None = None,
auto_assign: bool = True,
stream: bool = False,
stream_callback: Callable[[Any], None] | None = None,
) -> TaskCreationPlan | tuple[TaskCreationPlan, list[CortexTask]]:
"""Create tasks dynamically from a natural language prompt using TaskCreator.
Lazily initializes a TaskCreator instance and uses it to analyze the
provided objective, generating a structured task breakdown. If
``auto_assign`` is True and agents are available, the created tasks
are automatically assigned to appropriate agents and stored on this
DynamicCortex instance.
Args:
prompt: The natural language objective or goal to break down
into actionable tasks.
background: Optional approach or contextual information to guide
the task creation strategy. Helps the creator understand
preferred methodology.
auto_assign: Whether to automatically assign agents to the
generated tasks based on role matching. Defaults to True.
stream: Whether to stream the LLM response during task creation
for real-time feedback. Defaults to False.
stream_callback: Optional callback function invoked with each
streamed chunk during creation. Only used when ``stream=True``.
Returns:
If ``auto_assign`` is True and agents produce CortexTask instances:
Tuple of (TaskCreationPlan, list[CortexTask]) where tasks are
also stored on ``self.tasks``.
Otherwise:
The TaskCreationPlan alone.
Side Effects:
- Initializes ``self.task_creator`` on first call.
- Updates ``self.tasks`` with created CortexTask instances when
auto_assign produces tasks.
"""
if not self.task_creator:
self.task_creator = TaskCreator(verbose=self.verbose, llm=self.llm, auto_assign_agents=auto_assign)
result = self.task_creator.create_tasks_from_prompt(
prompt=prompt,
background=background,
available_agents=self.agents if auto_assign else None,
stream=stream,
stream_callback=stream_callback,
)
if isinstance(result, tuple):
plan, cortex_tasks = result
self.tasks = cortex_tasks
return plan, cortex_tasks
else:
return result
[docs] def execute_with_task_creation(
self,
prompt: str,
inputs: dict[str, Any] | None = None,
background: str | None = None,
process: ProcessType | None = None,
stream: bool = False,
stream_callback: Callable[[Any], None] | None = None,
) -> Any:
"""Create tasks from a prompt and execute them immediately.
Combines task creation and execution into a single operation. First
generates a structured task plan from the given prompt, then executes
all tasks using the Cortex's kickoff mechanism. Optionally overrides
the default process type for this execution.
Args:
prompt: The natural language objective to accomplish. Will be
decomposed into tasks by the TaskCreator.
inputs: Optional dictionary of input values to interpolate into
agent and task templates before execution.
background: Optional approach or context string to guide the
task creation strategy.
process: Optional ProcessType override for this execution only.
The original process type is restored after execution.
If None, uses the DynamicCortex's default process.
stream: Whether to stream execution output for real-time feedback.
Defaults to False.
stream_callback: Optional callback function invoked with each
streamed chunk. Only used when ``stream=True``.
Returns:
The execution result from ``cortex.kickoff()``. Type depends on
whether streaming is enabled (CortexOutput or streaming result).
Side Effects:
- Creates and stores tasks on ``self.tasks``.
- Temporarily overrides ``self.process`` if ``process`` is provided.
"""
_plan, cortex_tasks = self.create_tasks_from_prompt(
prompt=prompt,
background=background,
auto_assign=True,
stream=False,
)
self.tasks = cortex_tasks
if process:
original_process = self.process
self.process = process
try:
if stream:
result = self.kickoff(inputs=inputs, use_streaming=True, stream_callback=stream_callback)
else:
result = self.kickoff(inputs=inputs)
finally:
if process:
self.process = original_process
return result
[docs] def execute_prompt(
self,
prompt: str,
agent: CortexAgent | str | None = None,
stream: bool = False,
stream_callback: Callable[[Any], None] | None = None,
streamer_buffer: StreamerBuffer | None = None,
) -> str | tuple[StreamerBuffer, threading.Thread]:
"""Execute a single prompt with a specific or auto-selected agent.
Creates a temporary task from the prompt, assigns it to the specified
agent (or auto-selects one), and executes it. Supports both blocking
and streaming execution modes.
Args:
prompt: The natural language prompt to execute as a task.
agent: The agent to execute the prompt. Accepts:
- A CortexAgent instance to use directly.
- A string matching an agent's role name (case-insensitive).
- None to use the first available agent in the Cortex.
stream: Whether to stream the response for real-time output.
Defaults to False (blocking execution).
stream_callback: Optional callback function invoked with each
streamed chunk. Used when ``stream=True``.
streamer_buffer: Optional pre-existing StreamerBuffer to use for
streaming. If None and ``stream=True``, a new buffer is created.
Returns:
If ``stream=False``: The agent's response as a string.
If ``stream=True``: Tuple of (StreamerBuffer, Thread) for
asynchronous consumption of the streaming response.
Raises:
ValueError: If no matching agent is found for the given
agent name or if no agents are configured.
"""
target_agent = None
if isinstance(agent, str):
for a in self.agents:
if a.role.lower() == agent.lower():
target_agent = a
break
elif isinstance(agent, CortexAgent):
target_agent = agent
else:
target_agent = self.agents[0] if self.agents else None
if not target_agent:
raise ValueError(f"No agent found for: {agent}")
task = DynamicTaskBuilder.from_prompt(prompt, target_agent)
self.tasks = [task]
if stream:
buffer_was_none = streamer_buffer is None
if streamer_buffer is None:
streamer_buffer = StreamerBuffer()
def execute_with_stream() -> None:
"""Execute the target agent's task in a background thread, writing output to the streamer buffer."""
try:
if stream_callback:
result = target_agent.execute_stream(task_description=prompt, callback=stream_callback)
else:
result = target_agent.execute(
task_description=prompt, streamer_buffer=streamer_buffer, use_thread=False
)
if hasattr(streamer_buffer, "result_holder"):
streamer_buffer.result_holder = [result]
finally:
if buffer_was_none:
streamer_buffer.close()
thread = threading.Thread(target=execute_with_stream, daemon=True)
thread.start()
return streamer_buffer, thread
else:
result = self.kickoff()
return result.raw_output
[docs] def execute_prompts(
self,
prompts: list[str] | dict[str, str],
process: ProcessType | None = None,
stream: bool = False,
stream_callback: Callable[[Any], None] | None = None,
streamer_buffer: StreamerBuffer | None = None,
) -> dict[str, str] | str | tuple[StreamerBuffer, threading.Thread]:
"""Execute multiple prompts with automatic agent assignment.
Supports two input formats: a list of prompts (chained with automatic
agent assignment) or a dictionary mapping agent roles to their prompts
(explicit assignment). Optionally overrides the process type.
Args:
prompts: Either:
- ``list[str]``: List of prompts to chain in sequence.
Agents are assigned round-robin from ``self.agents``.
Context is passed between tasks if process is SEQUENTIAL.
- ``dict[str, str]``: Mapping of agent role names to prompts.
Each prompt is assigned to the agent with the matching role.
process: Optional ProcessType override for this execution.
The original process type is restored after execution.
If None, uses the DynamicCortex's default.
stream: Whether to stream responses for real-time output.
Defaults to False.
stream_callback: Optional callback invoked with each streamed chunk.
streamer_buffer: Optional pre-existing StreamerBuffer for streaming.
If None and ``stream=True``, a new buffer is created.
Returns:
Depends on input format and streaming mode:
- ``stream=False`` with ``dict``: Dict mapping agent roles to response strings.
- ``stream=False`` with ``list``: The final output string (raw_output).
- ``stream=True``: Tuple of (StreamerBuffer, Thread) for async consumption.
Raises:
ValueError: If a dict key does not match any agent's role name.
"""
if isinstance(prompts, dict):
tasks = []
for agent_role, prompt in prompts.items():
agent = None
for a in self.agents:
if a.role.lower() == agent_role.lower():
agent = a
break
if not agent:
raise ValueError(f"Agent not found: {agent_role}")
task = DynamicTaskBuilder.from_prompt(prompt, agent)
tasks.append(task)
else:
tasks = DynamicTaskBuilder.chain_prompts(
prompts, self.agents, use_context=(process == ProcessType.SEQUENTIAL)
)
self.tasks = tasks
if process:
original_process = self.process
self.process = process
if stream:
if streamer_buffer is None:
streamer_buffer = StreamerBuffer()
buffer, thread = self.kickoff(use_streaming=True, stream_callback=stream_callback)
if process:
self.process = original_process
return buffer, thread
else:
result = self.kickoff(use_streaming=False)
if process:
self.process = original_process
if isinstance(prompts, dict):
outputs = {}
for i, (role, _prompt) in enumerate(prompts.items()):
if i < len(result.task_outputs):
outputs[role] = result.task_outputs[i].output
return outputs
else:
return result.raw_output
[docs] def create_ui(self):
"""Create and launch an interactive UI for prompt execution.
Launches a user interface application that allows interactive
prompt execution with this DynamicCortex instance. The UI provides
a convenient way to experiment with prompts and view agent responses.
Returns:
The result of launch_application, typically a UI application
instance or handle.
Note:
Requires the calute.ui module to be available. The UI runs
with this DynamicCortex instance as the execution backend.
"""
from calute.ui import launch_application
return launch_application(executor=self)
[docs]def create_dynamic_cortex(
agents: list[CortexAgent], llm: BaseLLM, process: ProcessType = ProcessType.SEQUENTIAL, **cortex_kwargs
) -> DynamicCortex:
"""Create a DynamicCortex instance for flexible prompt-based execution.
Factory function that creates a DynamicCortex pre-configured with the
given agents and LLM, starting with an empty task list. The resulting
instance supports dynamic task creation and execution from natural
language prompts at runtime.
Args:
agents: List of CortexAgent instances to register with the Cortex.
These agents will be available for task assignment and execution.
llm: BaseLLM instance to use for all language model interactions
across agents and the orchestrator.
process: Default execution strategy for the Cortex. Determines how
tasks are coordinated (e.g., sequential, parallel).
Defaults to ProcessType.SEQUENTIAL.
**cortex_kwargs: Additional keyword arguments passed directly to the
DynamicCortex constructor (e.g., ``verbose``, ``memory_config``,
``max_iterations``, ``cortex_name``).
Returns:
A new DynamicCortex instance with an empty task list, ready for
dynamic prompt execution via ``execute_prompt()``,
``execute_prompts()``, or ``execute_with_task_creation()``.
Example:
>>> cortex = create_dynamic_cortex(
... agents=[researcher, writer],
... llm=my_llm,
... process=ProcessType.SEQUENTIAL,
... verbose=True
... )
>>> result = cortex.execute_prompt("Summarize the report")
"""
return DynamicCortex(agents=agents, tasks=[], llm=llm, process=process, **cortex_kwargs)