# Source code for calute.audit.emitter

# Copyright 2025 The EasyDeL/Calute Author @erfanzar (Erfan Zare Chavoshi).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""High-level audit emitter with convenience methods.

:class:`AuditEmitter` wraps an :class:`~calute.audit.collector.AuditCollector`
and exposes one dedicated ``emit_*`` method for every event type defined in
:mod:`calute.audit.events`. This keeps call-sites in the executor and
response loop clean -- callers never need to construct event dataclasses
directly.

The emitter is fully thread-safe: an internal :class:`threading.Lock`
serializes all writes to the underlying collector, and an optional
``session_id`` is automatically stamped onto every event.

Example:
    ::

        from calute.audit import AuditEmitter, InMemoryCollector

        collector = InMemoryCollector()
        emitter = AuditEmitter(collector=collector, session_id="sess-01")
        tid = emitter.emit_turn_start(agent_id="agent-1", prompt="Hello")
        emitter.emit_tool_call_attempt("web_search", args='{"q":"hi"}', turn_id=tid)
        emitter.emit_turn_end(agent_id="agent-1", turn_id=tid, fc_count=1)
        emitter.flush()
"""

from __future__ import annotations

import threading
import uuid
from typing import Any

from .collector import AuditCollector, InMemoryCollector
from .events import (
    AuditEvent,
    ErrorEvent,
    HookMutationEvent,
    SandboxDecisionEvent,
    ToolCallAttemptEvent,
    ToolCallCompleteEvent,
    ToolCallFailureEvent,
    ToolLoopBlockEvent,
    ToolLoopWarningEvent,
    ToolPolicyDecisionEvent,
    TurnEndEvent,
    TurnStartEvent,
)


def _generate_turn_id() -> str:
    """Generate a compact random turn identifier.

    Produces a 12-character hexadecimal string derived from a UUID4,
    providing sufficient uniqueness for correlating events within a
    single session while remaining compact enough for log readability.

    Returns:
        str: A 12-character lowercase hexadecimal string
            (e.g. ``"a1b2c3d4e5f6"``).
    """
    return uuid.uuid4().hex[:12]


class AuditEmitter:
    """Thread-safe emitter that converts method calls into typed audit events.

    The emitter provides a dedicated ``emit_*`` convenience method for each
    event type so that call-sites do not need to import or construct event
    dataclasses directly. Every event is stamped with the emitter's
    ``session_id`` before being forwarded to the underlying collector.

    Attributes:
        _collector: The audit collector that receives forwarded events.
        _session_id: Optional session-level identifier stamped onto every
            event.
        _lock: Internal threading lock serializing calls into the collector.

    Args:
        collector: The audit collector that receives all emitted events.
            Defaults to a new :class:`InMemoryCollector` when ``None``.
        session_id: An optional session identifier that is stamped onto
            every event emitted through this instance.

    Example:
        ::

            from calute.audit import AuditEmitter, InMemoryCollector

            collector = InMemoryCollector()
            emitter = AuditEmitter(collector=collector, session_id="s1")
            tid = emitter.emit_turn_start(agent_id="a1", prompt="Hi")
            emitter.emit_turn_end(agent_id="a1", turn_id=tid)
            assert len(collector) == 2
    """

    # Maximum number of characters kept in any ``*_preview`` event field.
    _PREVIEW_LIMIT = 200

    def __init__(
        self,
        collector: AuditCollector | InMemoryCollector | None = None,
        session_id: str | None = None,
    ) -> None:
        """Initialize the audit emitter.

        Args:
            collector: The audit collector that receives all emitted events.
                When ``None``, an :class:`InMemoryCollector` is created
                automatically.
            session_id: An optional session identifier stamped onto every
                emitted event.
        """
        self._collector: Any = collector if collector is not None else InMemoryCollector()
        self._session_id = session_id
        self._lock = threading.Lock()

    @property
    def collector(self) -> Any:
        """Return the underlying audit collector instance.

        Returns:
            The collector that this emitter forwards events to.
        """
        return self._collector

    @property
    def session_id(self) -> str | None:
        """Return the session identifier stamped onto every event.

        Returns:
            str | None: The session ID, or ``None`` if not set.
        """
        return self._session_id

    @classmethod
    def _preview(cls, text: str | None) -> str:
        """Return at most ``_PREVIEW_LIMIT`` leading characters of ``text``.

        Args:
            text: The full text to shorten. Any falsy value (``None`` or an
                empty string) yields an empty preview.

        Returns:
            str: The truncated text, or ``""`` when ``text`` is falsy.
        """
        return text[: cls._PREVIEW_LIMIT] if text else ""

    def _emit(self, event: AuditEvent) -> None:
        """Stamp the session ID and forward the event to the collector.

        This is the internal dispatch point used by all public ``emit_*``
        methods. It overwrites the event's ``session_id`` field with the
        emitter's session ID and then calls the collector's ``emit`` while
        holding the internal lock.

        Args:
            event: The fully-constructed audit event to emit.
        """
        # The event was just built by the calling emit_* method, so it can be
        # stamped before taking the lock; only the collector call is guarded.
        event.session_id = self._session_id
        with self._lock:
            self._collector.emit(event)

    def emit_turn_start(
        self,
        agent_id: str | None = None,
        turn_id: str | None = None,
        prompt: str = "",
    ) -> str:
        """Emit a :class:`TurnStartEvent` and return the turn identifier.

        If ``turn_id`` is ``None`` or empty, a new random 12-hex-character
        ID is generated via :func:`_generate_turn_id`. The prompt is
        truncated to 200 characters for the ``prompt_preview`` field.

        Args:
            agent_id: Optional identifier of the agent starting the turn.
            turn_id: Optional pre-assigned turn identifier. When ``None``
                or an empty string, a new one is generated automatically.
            prompt: The full user prompt text. Only the first 200
                characters are stored in the event.

        Returns:
            str: The turn identifier (either the provided ``turn_id`` or
                the auto-generated one).
        """
        # ``or`` (rather than an ``is None`` check) deliberately replaces an
        # empty string as well, so every turn gets a non-empty identifier.
        tid = turn_id or _generate_turn_id()
        self._emit(
            TurnStartEvent(
                agent_id=agent_id,
                turn_id=tid,
                prompt_preview=self._preview(prompt),
            )
        )
        return tid

    def emit_turn_end(
        self,
        agent_id: str | None = None,
        turn_id: str | None = None,
        content: str = "",
        fc_count: int = 0,
    ) -> None:
        """Emit a :class:`TurnEndEvent` recording turn completion details.

        The assistant's response content is truncated to 200 characters
        for the ``content_preview`` field.

        Args:
            agent_id: Optional identifier of the agent that completed the
                turn.
            turn_id: Optional turn identifier correlating this end event
                with its corresponding :class:`TurnStartEvent`.
            content: The full assistant response text. Only the first 200
                characters are stored in the event.
            fc_count: The total number of function / tool calls that were
                executed during this turn.
        """
        self._emit(
            TurnEndEvent(
                agent_id=agent_id,
                turn_id=turn_id,
                content_preview=self._preview(content),
                function_calls_count=fc_count,
            )
        )

    def emit_tool_call_attempt(
        self,
        tool_name: str,
        args: str = "",
        agent_id: str | None = None,
        turn_id: str | None = None,
    ) -> None:
        """Emit a :class:`ToolCallAttemptEvent` before a tool is dispatched.

        Should be called immediately before the tool executor runs the
        tool, so that the audit trail records the intent even if the tool
        subsequently fails.

        Args:
            tool_name: The registered name of the tool about to be invoked.
            args: A string representation of the tool arguments. Only the
                first 200 characters are stored in the event.
            agent_id: Optional identifier of the agent making the call.
            turn_id: Optional turn identifier for event correlation.
        """
        self._emit(
            ToolCallAttemptEvent(
                tool_name=tool_name,
                arguments_preview=self._preview(args),
                agent_id=agent_id,
                turn_id=turn_id,
            )
        )

    def emit_tool_call_complete(
        self,
        tool_name: str,
        status: str = "success",
        duration_ms: float = 0.0,
        result: str = "",
        agent_id: str | None = None,
        turn_id: str | None = None,
    ) -> None:
        """Emit a :class:`ToolCallCompleteEvent` after a tool returns.

        Should be called immediately after a successful tool execution.
        The result text is truncated to 200 characters for the
        ``result_preview`` field.

        Args:
            tool_name: The registered name of the tool that completed.
            status: Completion status string, typically ``"success"``.
            duration_ms: Wall-clock execution time of the tool in
                milliseconds.
            result: The full string representation of the tool result.
                Only the first 200 characters are stored.
            agent_id: Optional identifier of the agent that invoked the
                tool.
            turn_id: Optional turn identifier for event correlation.
        """
        self._emit(
            ToolCallCompleteEvent(
                tool_name=tool_name,
                status=status,
                duration_ms=duration_ms,
                result_preview=self._preview(result),
                agent_id=agent_id,
                turn_id=turn_id,
            )
        )

    def emit_tool_call_failure(
        self,
        tool_name: str,
        error_type: str = "",
        error_msg: str = "",
        agent_id: str | None = None,
        turn_id: str | None = None,
    ) -> None:
        """Emit a :class:`ToolCallFailureEvent` when a tool raises an error.

        This should be called in the exception handler of the tool
        executor, capturing the exception class name and message for the
        audit trail.

        Args:
            tool_name: The registered name of the tool that failed.
            error_type: A short error classifier, typically the exception
                class name (e.g. ``"ValueError"``).
            error_msg: The human-readable error description or stringified
                exception message.
            agent_id: Optional identifier of the agent that invoked the
                tool.
            turn_id: Optional turn identifier for event correlation.
        """
        self._emit(
            ToolCallFailureEvent(
                tool_name=tool_name,
                error_type=error_type,
                error_message=error_msg,
                agent_id=agent_id,
                turn_id=turn_id,
            )
        )

    def emit_tool_policy_decision(
        self,
        tool_name: str,
        agent_id: str | None = None,
        action: str = "",
        source: str = "",
        turn_id: str | None = None,
    ) -> None:
        """Emit a :class:`ToolPolicyDecisionEvent` for a policy verdict.

        Records whether the tool-policy engine allowed or denied a given
        tool invocation, along with the policy rule that produced the
        decision.

        Args:
            tool_name: The registered name of the tool under evaluation.
            agent_id: Optional identifier of the agent requesting the tool
                call.
            action: The policy verdict, typically ``"allow"`` or
                ``"deny"``.
            source: An identifier for the policy rule or configuration
                that produced the decision.
            turn_id: Optional turn identifier for event correlation.
        """
        self._emit(
            ToolPolicyDecisionEvent(
                tool_name=tool_name,
                agent_id=agent_id,
                action=action,
                policy_source=source,
                turn_id=turn_id,
            )
        )

    def emit_tool_loop_warning(
        self,
        tool_name: str,
        pattern: str = "",
        severity: str = "warning",
        count: int = 0,
        agent_id: str | None = None,
        turn_id: str | None = None,
    ) -> None:
        """Emit a :class:`ToolLoopWarningEvent` for a suspected call loop.

        This is a soft warning -- the tool call is still dispatched, but
        the event alerts downstream consumers that the agent may be stuck
        in a repetitive invocation pattern.

        Args:
            tool_name: The registered name of the tool involved in the
                suspected loop.
            pattern: A short descriptor of the detected repetitive pattern
                (e.g. ``"same_args_3x"``).
            severity: The severity qualifier assigned by the loop detector
                (e.g. ``"warning"``, ``"critical"``).
            count: The number of consecutive or recent calls that matched
                the loop pattern.
            agent_id: Optional identifier of the agent exhibiting the loop
                behavior.
            turn_id: Optional turn identifier for event correlation.
        """
        self._emit(
            ToolLoopWarningEvent(
                tool_name=tool_name,
                pattern=pattern,
                severity_level=severity,
                call_count=count,
                agent_id=agent_id,
                turn_id=turn_id,
            )
        )

    def emit_tool_loop_block(
        self,
        tool_name: str,
        pattern: str = "",
        count: int = 0,
        agent_id: str | None = None,
        turn_id: str | None = None,
    ) -> None:
        """Emit a :class:`ToolLoopBlockEvent` when a loop causes a hard block.

        Unlike :meth:`emit_tool_loop_warning`, this indicates that the
        loop detector has **prevented** the tool call from executing.

        Args:
            tool_name: The registered name of the tool that was blocked.
            pattern: A short descriptor of the repetitive pattern that
                triggered the block (e.g. ``"same_args_5x"``).
            count: The number of consecutive or recent calls that matched
                the loop pattern before the block was imposed.
            agent_id: Optional identifier of the agent exhibiting the loop
                behavior.
            turn_id: Optional turn identifier for event correlation.
        """
        self._emit(
            ToolLoopBlockEvent(
                tool_name=tool_name,
                pattern=pattern,
                call_count=count,
                agent_id=agent_id,
                turn_id=turn_id,
            )
        )

    def emit_sandbox_decision(
        self,
        tool_name: str,
        context: str = "",
        reason: str = "",
        backend_type: str = "",
        agent_id: str | None = None,
        turn_id: str | None = None,
    ) -> None:
        """Emit a :class:`SandboxDecisionEvent` for a sandbox routing choice.

        Records which execution backend the sandbox router selected for a
        given tool call, along with the context and reasoning behind the
        decision.

        Args:
            tool_name: The registered name of the tool being routed.
            context: A short description of the execution context that
                influenced the routing decision.
            reason: A human-readable explanation of why the particular
                backend was chosen.
            backend_type: The identifier of the selected sandbox backend
                (e.g. ``"local"``, ``"docker"``, ``"subprocess"``).
            agent_id: Optional identifier of the agent whose tool call is
                being routed.
            turn_id: Optional turn identifier for event correlation.
        """
        self._emit(
            SandboxDecisionEvent(
                tool_name=tool_name,
                context=context,
                reason=reason,
                backend_type=backend_type,
                agent_id=agent_id,
                turn_id=turn_id,
            )
        )

    def emit_hook_mutation(
        self,
        hook_name: str,
        tool_name: str = "",
        agent_id: str | None = None,
        field: str = "",
        turn_id: str | None = None,
    ) -> None:
        """Emit a :class:`HookMutationEvent` when a hook alters data.

        Records which hook mutated which field of a tool call or its
        result, providing an audit trail for data transformations applied
        outside the tool's own logic.

        Args:
            hook_name: The identifier of the hook that performed the
                mutation (e.g. ``"sanitize_output"``).
            tool_name: The registered name of the tool whose call or
                result was mutated.
            agent_id: Optional identifier of the agent whose pipeline
                includes the hook.
            field: The specific field that was changed (e.g.
                ``"arguments"``, ``"result"``).
            turn_id: Optional turn identifier for event correlation.
        """
        self._emit(
            HookMutationEvent(
                hook_name=hook_name,
                tool_name=tool_name,
                agent_id=agent_id,
                mutated_field=field,
                turn_id=turn_id,
            )
        )

    def emit_error(
        self,
        error_type: str = "",
        error_msg: str = "",
        context: str = "",
        agent_id: str | None = None,
        turn_id: str | None = None,
    ) -> None:
        """Emit a generic :class:`ErrorEvent` for non-tool-specific errors.

        Use this for infrastructure-level or agent-level errors that
        cannot be attributed to a single tool invocation (e.g. LLM API
        failures, serialization errors). For tool-specific failures,
        prefer :meth:`emit_tool_call_failure`.

        Args:
            error_type: A short error classifier, typically the exception
                class name (e.g. ``"RuntimeError"``).
            error_msg: The human-readable error description or stringified
                exception message.
            context: Additional context about where or why the error
                occurred (e.g. ``"during response parsing"``).
            agent_id: Optional identifier of the agent that encountered
                the error.
            turn_id: Optional turn identifier for event correlation.
        """
        self._emit(
            ErrorEvent(
                error_type=error_type,
                error_message=error_msg,
                error_context=context,
                agent_id=agent_id,
                turn_id=turn_id,
            )
        )

    def flush(self) -> None:
        """Flush the underlying collector's buffered state.

        Acquires the internal lock and delegates to the collector's
        :meth:`~calute.audit.collector.AuditCollector.flush` method, so a
        flush never interleaves with an in-progress ``emit`` issued through
        this emitter.
        """
        with self._lock:
            self._collector.flush()