Source code for calute.audit.collector

# Copyright 2025 The EasyDeL/Calute Author @erfanzar (Erfan Zare Chavoshi).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Audit event collectors for buffering, persisting, and fanning-out events.

Collectors implement a simple two-method protocol defined by
:class:`AuditCollector`:

    * :meth:`~AuditCollector.emit` -- accept a single audit event.
    * :meth:`~AuditCollector.flush` -- flush any buffered state to the
      backing store.

Three concrete implementations ship with this module:

    * :class:`InMemoryCollector` -- thread-safe in-memory list with
      filtering and snapshot capabilities.
    * :class:`JSONLSinkCollector` -- streams each event as a single
      newline-delimited JSON line to a file or IO stream.
    * :class:`CompositeCollector` -- fans out every event to an
      arbitrary number of child collectors.

Example:
    Wiring an in-memory collector and a JSONL file sink together::

        from calute.audit.collector import (
            CompositeCollector,
            InMemoryCollector,
            JSONLSinkCollector,
        )

        mem = InMemoryCollector()
        jsonl = JSONLSinkCollector("/tmp/audit.jsonl")
        composite = CompositeCollector([mem, jsonl])
        composite.emit(some_event)
        composite.flush()
"""

from __future__ import annotations

import json
import threading
from collections.abc import Sequence
from pathlib import Path
from typing import IO, Any, Protocol, runtime_checkable

from .events import AuditEvent


@runtime_checkable
class AuditCollector(Protocol):
    """Structural interface shared by every audit collector.

    A collector is any object exposing an :meth:`emit` method and a
    :meth:`flush` method; no inheritance from this class is required.
    Because the protocol is wrapped in :func:`~typing.runtime_checkable`,
    ``isinstance(obj, AuditCollector)`` can be used at runtime. Such a
    check only verifies that both methods exist, not their signatures.

    Example:
        A minimal custom collector::

            class MyCollector:
                def emit(self, event: AuditEvent) -> None:
                    print(event.to_json())

                def flush(self) -> None:
                    pass

            assert isinstance(MyCollector(), AuditCollector)
    """

    def emit(self, event: AuditEvent) -> None:
        """Receive one audit event and store or process it.

        Args:
            event: The audit event handed to the collector.
        """
        ...

    def flush(self) -> None:
        """Push any buffered state out to the collector's backing store.

        Collectors that write to IO streams or network sockets should make
        sure all pending data is written. Purely in-memory collectors may
        implement this as a no-op.
        """
        ...


class InMemoryCollector:
    """Audit event buffer held in memory and guarded by a lock.

    Events are kept in the order they were emitted. Every public method
    takes the internal :class:`threading.Lock` before touching the buffer,
    so producers and readers on different threads can share one instance.

    Attributes:
        _lock: Lock serializing all access to ``_events``.
        _events: The list of collected audit events, oldest first.

    Example:
        ::

            collector = InMemoryCollector()
            collector.emit(TurnStartEvent(agent_id="a1", prompt_preview="Hi"))
            assert len(collector) == 1
            events = collector.get_events()
            collector.clear()
            assert len(collector) == 0
    """

    def __init__(self) -> None:
        """Create an empty buffer together with its guarding lock."""
        self._events: list[AuditEvent] = []
        self._lock = threading.Lock()

    def emit(self, event: AuditEvent) -> None:
        """Add one audit event to the end of the buffer.

        Args:
            event: The audit event to store.
        """
        with self._lock:
            self._events.append(event)

    def flush(self) -> None:
        """Do nothing; present only to satisfy :class:`AuditCollector`."""
        return None

    def get_events(self) -> list[AuditEvent]:
        """Return a snapshot of every collected event.

        The result is a new list (a shallow copy), so mutating it leaves
        the internal buffer untouched.

        Returns:
            list[AuditEvent]: All events in insertion order.
        """
        with self._lock:
            snapshot = self._events.copy()
        return snapshot

    def get_events_by_type(self, event_type: str) -> list[AuditEvent]:
        """Return the events whose ``event_type`` equals the given tag.

        Args:
            event_type: The event type tag to match (e.g. ``"turn_start"``,
                ``"tool_call_attempt"``).

        Returns:
            list[AuditEvent]: Matching events in insertion order; empty
            when nothing matches.
        """
        matches: list[AuditEvent] = []
        with self._lock:
            for candidate in self._events:
                if candidate.event_type == event_type:
                    matches.append(candidate)
        return matches

    def clear(self) -> None:
        """Drop every collected event.

        Afterwards :meth:`get_events` returns an empty list and
        ``len(self)`` is ``0``.
        """
        with self._lock:
            del self._events[:]

    def __len__(self) -> int:
        """Return how many events are currently buffered.

        Returns:
            int: The event count.
        """
        with self._lock:
            count = len(self._events)
        return count


class JSONLSinkCollector:
    """Writes each event as a single JSON line to a file or IO stream.

    Each call to :meth:`emit` serializes the event via its ``to_dict()``
    method and writes it as one newline-terminated JSON object. Every
    operation on the underlying stream (write, flush, close) happens
    while holding an internal :class:`threading.Lock`.

    When a filesystem path is provided, the file is opened in **append**
    mode so that existing audit logs are preserved across restarts. The
    collector can be used as a context manager, in which case
    :meth:`close` is called on exit.

    Attributes:
        _lock: Internal threading lock guarding all stream operations.
        _owns_stream: Whether this collector opened (and should close)
            the underlying IO stream.
        _stream: The writable text stream that receives JSON lines.

    Args:
        sink: Either a filesystem path (``str`` or ``pathlib.Path``) or an
            already-open writable text IO stream. When a path is given the
            file is opened in append mode with UTF-8 encoding.

    Example:
        Writing to a file::

            with JSONLSinkCollector("/tmp/audit.jsonl") as collector:
                collector.emit(some_event)

        Writing to an in-memory stream::

            import io

            buf = io.StringIO()
            collector = JSONLSinkCollector(buf)
            collector.emit(some_event)
            print(buf.getvalue())
    """

    def __init__(self, sink: str | Path | IO[str]) -> None:
        """Initialize the JSONL sink collector.

        Args:
            sink: A filesystem path (``str`` or ``pathlib.Path``) to open
                in append mode, or an already-open writable text IO stream.

        Raises:
            OSError: If ``sink`` is a path and the file cannot be opened.
        """
        self._lock = threading.Lock()
        self._stream: IO[str]
        if isinstance(sink, (str, Path)):
            # We opened it, so we are responsible for closing it later.
            self._stream = open(sink, "a", encoding="utf-8")
            self._owns_stream = True
        else:
            # Caller-supplied stream: its lifecycle stays with the caller.
            self._stream = sink
            self._owns_stream = False

    def __enter__(self) -> JSONLSinkCollector:
        """Return the collector itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type: object, exc: object, tb: object) -> None:
        """Close the collector on leaving the ``with`` block.

        Returns ``None`` so an exception raised inside the block keeps
        propagating.
        """
        self.close()

    def emit(self, event: AuditEvent) -> None:
        """Serialize the event to a JSON line and write it to the stream.

        Values in the event dict that the JSON encoder cannot handle are
        converted with ``str`` via the ``default=str`` fallback.

        Args:
            event: The audit event to serialize and write.
        """
        # Serialize before taking the lock so only the write is serialized
        # across threads.
        line = json.dumps(event.to_dict(), default=str) + "\n"
        with self._lock:
            self._stream.write(line)

    def flush(self) -> None:
        """Flush the underlying IO stream while holding the lock."""
        with self._lock:
            self._stream.flush()

    def close(self) -> None:
        """Flush pending writes and close the stream if owned.

        If this collector opened the stream itself (i.e. a path was passed
        to the constructor), the stream is closed after flushing, even when
        the flush raises. Externally-supplied streams are flushed but left
        open so that the caller retains control of the stream lifecycle.

        Calling :meth:`close` again after an owned stream has been closed
        is a no-op.
        """
        # Flush and close under one lock acquisition so no write from
        # another thread can land between the two steps. The stream is
        # flushed directly because the lock is not re-entrant.
        with self._lock:
            if self._owns_stream and self._stream.closed:
                return
            try:
                self._stream.flush()
            finally:
                if self._owns_stream:
                    self._stream.close()


class CompositeCollector:
    """Multiplexer that relays each event to a list of child collectors.

    :meth:`emit` hands the event to every registered child and
    :meth:`flush` flushes each child, both in registration order. Further
    children can be registered later through :meth:`add`.

    Attributes:
        _collectors: The child collectors that receive forwarded events.

    Args:
        collectors: An optional initial sequence of child collectors.
            When ``None`` (the default), the composite starts empty.

    Example:
        ::

            mem = InMemoryCollector()
            jsonl = JSONLSinkCollector("/tmp/audit.jsonl")
            composite = CompositeCollector([mem, jsonl])
            composite.emit(some_event)
            assert len(mem) == 1
    """

    def __init__(
        self,
        collectors: Sequence[AuditCollector | InMemoryCollector | JSONLSinkCollector | CompositeCollector]
        | None = None,
    ) -> None:
        """Set up the composite, optionally pre-registering children.

        Args:
            collectors: Child collectors to register immediately. The
                sequence is copied into a new list; ``None`` or an empty
                sequence yields a composite with no children.
        """
        children: list[Any] = list(collectors) if collectors else []
        self._collectors: list[Any] = children

    def add(self, collector: AuditCollector | InMemoryCollector | JSONLSinkCollector) -> None:
        """Register one more child at the end of the fan-out list.

        Args:
            collector: The child collector to register. It is expected to
                provide ``emit`` and ``flush`` methods, as described by
                :class:`AuditCollector`.
        """
        self._collectors.append(collector)

    def emit(self, event: AuditEvent) -> None:
        """Pass the event to each registered child in turn.

        Children are visited in registration order. An exception raised by
        a child propagates immediately, so children registered after it do
        **not** receive the event.

        Args:
            event: The audit event to broadcast.
        """
        for child in self._collectors:
            child.emit(event)

    def flush(self) -> None:
        """Flush each registered child in registration order.

        An exception raised by a child propagates immediately, so children
        registered after it are **not** flushed.
        """
        for child in self._collectors:
            child.flush()