calute.api_server.cortex_completion_service

Cortex completion service for handling multi-agent orchestration via API.

This module provides the Cortex-based completion service infrastructure, including:

  • Multi-agent orchestration for complex task execution

  • Task mode with dynamic task creation and agent assignment

  • Instruction mode for direct prompt execution

  • Streaming and non-streaming response generation

  • Integration with DynamicCortex for sophisticated agent workflows

The service supports both sequential and parallel execution strategies, with configurable process types and real-time streaming of execution events.

class calute.api_server.cortex_completion_service.CortexCompletionService(llm: BaseLLM, agents: list[CortexAgent] | None = None, use_universal_agent: bool = True, verbose: bool = True)

Bases: object

Service for handling Cortex-based chat completions with multi-agent orchestration.

Provides advanced multi-agent orchestration capabilities through the Cortex system. Supports two primary execution modes:

  • Task mode: Uses TaskCreator to dynamically decompose prompts into discrete tasks, assign agents to each task, and execute them through DynamicCortex. Activated when the model name contains "task".

  • Instruction mode: Directly executes prompts through DynamicCortex without task decomposition. Used as the default mode.

Both modes support sequential, parallel, and hierarchical process types, as well as streaming and non-streaming response generation.
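The mode selection described above can be sketched as a simple substring check on the model name. This is an illustrative sketch, not the service's actual code: the helper name `select_mode` is hypothetical, and lowercasing the name before the check is an assumption, since the documentation only says the name "contains" the word.

```python
from typing import Optional


def select_mode(model_name: Optional[str]) -> str:
    """Return "task" or "instruction" for a given model name (hypothetical helper)."""
    # Assumption: the match is case-insensitive; the docs only state that
    # task mode is activated when the model name contains "task".
    if model_name and "task" in model_name.lower():
        return "task"
    # Instruction mode is the documented default.
    return "instruction"


task_choice = select_mode("cortex-task")
default_choice = select_mode("cortex")
```

Under these assumptions a name such as "cortex-task" selects task mode, while "cortex" or a missing model name falls back to instruction mode.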

llm

The BaseLLM instance used for agent interactions.

verbose

Flag indicating whether verbose logging is enabled.

logger

Logger instance for verbose output (None if disabled).

agents

List of CortexAgent instances available for task execution.

universal_agent

UniversalAgent instance for fallback handling, or None if disabled.

task_creator

TaskCreator instance for dynamic task generation with automatic agent assignment.

Example

>>> from calute.api_server.cortex_completion_service import CortexCompletionService
>>> service = CortexCompletionService(llm=my_llm, agents=[agent1, agent2])

async create_completion(messages: MessagesHistory, request: ChatCompletionRequest) → ChatCompletionResponse

Create a non-streaming Cortex completion.

Extracts the task configuration from the request, determines the latest user prompt, and executes either task mode or instruction mode accordingly. The result is wrapped in a ChatCompletionResponse with word-count-based usage estimates.

Parameters
  • messages – The MessagesHistory containing the conversation context to process.

  • request – The ChatCompletionRequest containing the model name (used to determine execution mode) and optional metadata for fine-grained configuration.

Returns

A ChatCompletionResponse with a single choice containing the Cortex execution result as assistant content, estimated usage information (based on word counts), and finish reason "stop". The model field defaults to "cortex" if no model name is specified in the request.
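A word-count-based usage estimate of the kind mentioned above might look like the following sketch. The helper name `estimate_usage` and the key names (`prompt_tokens`, `completion_tokens`, `total_tokens`) are assumptions borrowed from the common chat-completion usage layout; the service's real field names and counting rules may differ.

```python
def estimate_usage(prompt: str, completion: str) -> dict:
    """Approximate token usage by counting whitespace-separated words."""
    # Assumption: one word counts as one token. This is a rough estimate,
    # not a tokenizer-accurate count.
    prompt_tokens = len(prompt.split())
    completion_tokens = len(completion.split())
    return {
        "prompt_tokens": prompt_tokens,
        "completion_tokens": completion_tokens,
        "total_tokens": prompt_tokens + completion_tokens,
    }


usage = estimate_usage("Summarize the report", "The report covers three topics")
```

Because the figures come from word counts, they are best treated as approximate and may diverge noticeably from the tokens actually consumed by the underlying LLM.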

async create_streaming_completion(messages: MessagesHistory, request: ChatCompletionRequest) → AsyncIterator[str]

Create a streaming Cortex completion with real-time event updates.

Runs the Cortex system in a background daemon thread, reads events from a StreamerBuffer, and yields them as SSE-formatted strings. The stream carries the following event types:

  • StreamChunk: Content delta with optional tool call information.

  • FunctionDetection: Notification that functions are being detected.

  • FunctionCallsExtracted: List of functions identified for execution.

  • FunctionExecutionStart: Start signal for a specific function.

  • FunctionExecutionComplete: Completion signal with result or error.

  • AgentSwitch: Notification of agent delegation with reason.

  • ReinvokeSignal: Signal that the agent is being reinvoked.

  • Completion: Final task completion signal with execution stats.

Each event is serialized as a ChatCompletionStreamResponse JSON object with optional metadata. The stream terminates with a final finish_reason="stop" chunk followed by "data: [DONE]".
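The thread-plus-buffer pattern described here can be illustrated with standard-library pieces. In this sketch a `queue.Queue` stands in for StreamerBuffer, `_worker` stands in for the Cortex run, and the `_END` marker and the chunk layout are hypothetical; none of these names come from the calute code base.

```python
import asyncio
import json
import queue
import threading
from typing import AsyncIterator

_END = object()  # hypothetical end-of-stream marker for this sketch


def _worker(buffer: queue.Queue) -> None:
    # Stand-in for the Cortex run: emit a few content deltas, then signal the end.
    for piece in ("Hello", ", ", "world"):
        buffer.put({"choices": [{"delta": {"content": piece}, "finish_reason": None}]})
    buffer.put(_END)


async def stream_sse() -> AsyncIterator[str]:
    buffer: queue.Queue = queue.Queue()
    # A daemon thread does the blocking work without keeping the process alive.
    threading.Thread(target=_worker, args=(buffer,), daemon=True).start()
    loop = asyncio.get_running_loop()
    while True:
        # The blocking get() runs in the default executor so the event loop stays free.
        event = await loop.run_in_executor(None, buffer.get)
        if event is _END:
            break
        yield f"data: {json.dumps(event)}\n\n"
    # Final chunk with finish_reason "stop", then the [DONE] sentinel.
    final = {"choices": [{"delta": {}, "finish_reason": "stop"}]}
    yield f"data: {json.dumps(final)}\n\n"
    yield "data: [DONE]\n\n"


async def collect() -> list:
    return [chunk async for chunk in stream_sse()]
```

Calling `asyncio.run(collect())` gathers the full stream into a list, which is convenient for inspecting the chunk order in a test.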

Parameters
  • messages – The MessagesHistory containing the conversation context to process.

  • request – The ChatCompletionRequest containing the model name (used to determine execution mode) and optional metadata for fine-grained configuration.

Yields

SSE-formatted strings ("data: {json}\n\n") containing streaming response chunks. Each chunk may include content deltas and metadata about execution events. The stream ends with a "data: [DONE]\n\n" sentinel.
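On the consuming side, the yielded strings can be decoded with a small parser. The sketch below assumes only the framing documented above ("data: {json}\n\n" chunks ending with a "data: [DONE]\n\n" sentinel); the function name `iter_sse_payloads` and the sample payloads are hypothetical.

```python
import json
from typing import Iterable, Iterator


def iter_sse_payloads(chunks: Iterable[str]) -> Iterator[dict]:
    """Yield the decoded JSON payload of each SSE chunk until [DONE] is seen."""
    prefix = "data: "
    for chunk in chunks:
        for line in chunk.splitlines():
            if not line.startswith(prefix):
                continue  # skip blank separator lines and anything that is not a data line
            payload = line[len(prefix):]
            if payload == "[DONE]":
                return  # sentinel: stop without yielding it
            yield json.loads(payload)


# Hypothetical sample stream, shaped like the strings described above.
sample = [
    'data: {"choices": [{"delta": {"content": "Hi"}}]}\n\n',
    'data: {"choices": [{"delta": {}, "finish_reason": "stop"}]}\n\n',
    "data: [DONE]\n\n",
]
payloads = list(iter_sse_payloads(sample))
```

Anything that arrives after the sentinel is ignored, which matches the documented contract that "data: [DONE]" ends the stream.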

calute.api_server.cortex_completion_service.DONE_TAG = '/["DONE"]/'

Sentinel tag used to signal the end of a Cortex streaming response.