calute.audit.collector

Audit event collectors for buffering, persisting, and fanning-out events.

Collectors implement a simple two-method protocol defined by AuditCollector:

  • emit() – accept a single audit event.

  • flush() – flush any buffered state to the backing store.

Three concrete implementations ship with this module:

  • InMemoryCollector – thread-safe in-memory list with filtering and snapshot capabilities.

  • JSONLSinkCollector – writes each event as one newline-terminated JSON object (JSON Lines) to a file or IO stream.

  • CompositeCollector – fans out every event to an arbitrary number of child collectors.

Example

Wiring an in-memory collector and a JSONL file sink together:

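from calute.audit.collector import (
    CompositeCollector,
    InMemoryCollector,
    JSONLSinkCollector,
)

mem = InMemoryCollector()
jsonl = JSONLSinkCollector("/tmp/audit.jsonl")  # file is opened in append mode
composite = CompositeCollector([mem, jsonl])
composite.emit(some_event)  # some_event: any AuditEvent instance
composite.flush()
jsonl.close()  # the sink opened the file itself, so release the handle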
class calute.audit.collector.AuditCollector(*args, **kwargs)

Bases: Protocol

Protocol that all audit collectors must satisfy.

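Any object that implements emit() and flush() is considered a valid audit collector; no inheritance is required. The protocol is decorated with runtime_checkable() so that isinstance() checks work at runtime. Such checks only verify that both methods exist, not that their signatures match.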

Example

Implementing a custom collector:

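from calute.audit.collector import AuditCollector
from calute.audit.events import AuditEvent

class MyCollector:
    def emit(self, event: AuditEvent) -> None:
        print(event.to_json())

    def flush(self) -> None:
        pass  # nothing is buffered, so there is nothing to flush

assert isinstance(MyCollector(), AuditCollector)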
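Because the runtime check is structural, it only looks for attributes named emit and flush and never inspects their parameters. The sketch below reproduces this with a stand-in protocol, `CollectorLike` (a hypothetical name, not part of calute), built from the standard library's typing.Protocol so that it runs without calute installed.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class CollectorLike(Protocol):
    """Hypothetical stand-in mirroring the AuditCollector protocol."""

    def emit(self, event: Any) -> None: ...

    def flush(self) -> None: ...


class PrintCollector:
    # No inheritance needed: having both method names is enough.
    def emit(self, event: Any) -> None:
        print(event)

    def flush(self) -> None:
        pass


class WrongSignature:
    # Still passes isinstance(): runtime checks ignore parameter lists.
    def emit(self) -> None:
        pass

    def flush(self, force: bool) -> None:
        pass


class MissingFlush:
    # Fails isinstance(): flush is absent.
    def emit(self, event: Any) -> None:
        pass


ok = isinstance(PrintCollector(), CollectorLike)
wrong_sig = isinstance(WrongSignature(), CollectorLike)
missing = isinstance(MissingFlush(), CollectorLike)
```

Static type checkers such as mypy do compare method signatures against a protocol, so the isinstance() check is best treated as a coarse runtime guard.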
emit(event: AuditEvent) → None

Accept a single audit event and process or store it.

Parameters

event – The audit event to collect.

flush() → None

Flush any buffered state to the collector’s backing store.

Implementations that write to IO streams or network sockets should ensure all pending data is written. In-memory collectors may treat this as a no-op.
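To illustrate this contract, here is a hypothetical collector, `BatchingCollector` (not part of calute), that holds events in memory and writes them to a text stream only when flush() is called. Events are plain dicts here so the sketch stays self-contained.

```python
import io
import json
from typing import Any, TextIO


class BatchingCollector:
    """Hypothetical collector that defers all writes until flush()."""

    def __init__(self, stream: TextIO) -> None:
        self._stream = stream
        self._pending: list[dict[str, Any]] = []

    def emit(self, event: dict[str, Any]) -> None:
        # Buffer only; nothing reaches the stream yet.
        self._pending.append(event)

    def flush(self) -> None:
        # Write every pending event as a JSON line, then flush the stream itself.
        for event in self._pending:
            self._stream.write(json.dumps(event) + "\n")
        self._pending.clear()
        self._stream.flush()


buf = io.StringIO()
collector = BatchingCollector(buf)
collector.emit({"event_type": "turn_start"})
before_flush = buf.getvalue()
collector.flush()
after_flush = buf.getvalue()
```

If the process exits before flush() runs, the buffered events in this sketch are lost, which is why collectors holding pending state should be flushed during shutdown.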

class calute.audit.collector.CompositeCollector(collectors: collections.abc.Sequence[calute.audit.collector.AuditCollector | calute.audit.collector.InMemoryCollector | calute.audit.collector.JSONLSinkCollector | calute.audit.collector.CompositeCollector] | None = None)

Bases: object

Fans out every emitted event to multiple child collectors.

This collector acts as a multiplexer: each call to emit() forwards the event to every registered child, and flush() flushes all children in sequence. New children can be added at any time via add().
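A minimal sketch of this multiplexing behavior, assuming only the emit()/flush() protocol. `FanOut` and `ListCollector` are hypothetical stand-ins rather than calute classes, and the real CompositeCollector may differ in details.

```python
from typing import Any, Iterable, Optional


class ListCollector:
    """Hypothetical child that records what it receives."""

    def __init__(self) -> None:
        self.events: list[Any] = []
        self.flush_count = 0

    def emit(self, event: Any) -> None:
        self.events.append(event)

    def flush(self) -> None:
        self.flush_count += 1


class FanOut:
    """Hypothetical multiplexer mirroring CompositeCollector's documented behavior."""

    def __init__(self, collectors: Optional[Iterable[Any]] = None) -> None:
        # Copy into a private list so later add() calls never touch the caller's sequence.
        self._collectors = list(collectors) if collectors is not None else []

    def add(self, collector: Any) -> None:
        self._collectors.append(collector)

    def emit(self, event: Any) -> None:
        # Forward to every child in registration order.
        for child in self._collectors:
            child.emit(event)

    def flush(self) -> None:
        for child in self._collectors:
            child.flush()


a, b = ListCollector(), ListCollector()
fan = FanOut([a])
fan.add(b)  # children can be registered after construction
fan.emit("evt-1")
fan.flush()
```

Because FanOut itself provides emit() and flush(), composites can be nested, which is consistent with the constructor's type hint accepting CompositeCollector children.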

_collectors

Internal list of child collectors that receive forwarded events.

Parameters

collectors – An optional initial sequence of child collectors. When None (the default), the composite starts empty.

Example

mem = InMemoryCollector()
jsonl = JSONLSinkCollector("/tmp/audit.jsonl")
composite = CompositeCollector([mem, jsonl])
composite.emit(some_event)  # some_event: any AuditEvent instance
assert len(mem) == 1
add(collector: calute.audit.collector.AuditCollector | calute.audit.collector.InMemoryCollector | calute.audit.collector.JSONLSinkCollector) → None

Append a collector to the fan-out list.

Parameters

collector – The child collector to register. It must implement the AuditCollector protocol (i.e. provide emit and flush methods).

emit(event: AuditEvent) → None

Forward the event to every registered child collector.

Children are called in registration order. If a child raises an exception, subsequent children will not receive the event.
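The sketch below demonstrates this stop-on-first-failure behavior with a plain loop over hypothetical children. It also shows one way a caller might isolate a fragile child: wrapping it in a hypothetical `IsolatingCollector` that logs and suppresses the child's exceptions. None of these classes are part of calute; the wrapper is only a suggested pattern, and suppressing errors trades visibility for availability.

```python
import logging
from typing import Any

logger = logging.getLogger("audit.sketch")


class Recorder:
    """Hypothetical well-behaved child."""

    def __init__(self) -> None:
        self.events: list[Any] = []

    def emit(self, event: Any) -> None:
        self.events.append(event)

    def flush(self) -> None:
        pass


class Exploding:
    """Hypothetical child whose backing store is unavailable."""

    def emit(self, event: Any) -> None:
        raise RuntimeError("sink unavailable")

    def flush(self) -> None:
        raise RuntimeError("sink unavailable")


class IsolatingCollector:
    """Hypothetical wrapper: logs and suppresses the inner child's exceptions."""

    def __init__(self, inner: Any) -> None:
        self._inner = inner

    def emit(self, event: Any) -> None:
        try:
            self._inner.emit(event)
        except Exception:
            logger.exception("audit collector %r failed in emit()", self._inner)

    def flush(self) -> None:
        try:
            self._inner.flush()
        except Exception:
            logger.exception("audit collector %r failed in flush()", self._inner)


def fan_out(children: list[Any], event: Any) -> None:
    # Same loop shape as described above: the first exception aborts the loop.
    for child in children:
        child.emit(event)


after_failure = Recorder()
try:
    fan_out([Exploding(), after_failure], "evt")
except RuntimeError:
    pass  # the later child never saw the event

isolated_after = Recorder()
fan_out([IsolatingCollector(Exploding()), isolated_after], "evt")
```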

Parameters

event – The audit event to broadcast.

flush() → None

Flush every registered child collector in registration order.

Each child’s flush() is called sequentially. If a child raises an exception, subsequent children will not be flushed.

class calute.audit.collector.InMemoryCollector

Bases: object

Thread-safe in-memory audit event buffer.

Events are stored in insertion order and can be retrieved, filtered, or cleared at any time. All public methods acquire an internal threading.Lock to guarantee safe concurrent access.
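A minimal, standard-library-only sketch of such a lock-protected buffer, written against hypothetical `Event` objects that carry an `event_type` string. `LockedBuffer` and `Event` are illustrative names and do not reproduce calute's implementation.

```python
import threading
from dataclasses import dataclass


@dataclass(frozen=True)
class Event:
    """Hypothetical stand-in for an AuditEvent."""

    event_type: str
    payload: str = ""


class LockedBuffer:
    """Hypothetical thread-safe, insertion-ordered event list."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._events: list[Event] = []

    def emit(self, event: Event) -> None:
        with self._lock:
            self._events.append(event)

    def flush(self) -> None:
        pass  # nothing to persist

    def get_events(self) -> list[Event]:
        with self._lock:
            return list(self._events)  # shallow-copy snapshot

    def get_events_by_type(self, event_type: str) -> list[Event]:
        with self._lock:
            return [e for e in self._events if e.event_type == event_type]

    def clear(self) -> None:
        with self._lock:
            self._events.clear()

    def __len__(self) -> int:
        with self._lock:
            return len(self._events)


buffer = LockedBuffer()


def produce(n: int) -> None:
    # Alternate between two event types from each producer thread.
    for i in range(n):
        buffer.emit(Event("turn_start" if i % 2 == 0 else "tool_call_attempt"))


threads = [threading.Thread(target=produce, args=(100,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

total = len(buffer)
starts = len(buffer.get_events_by_type("turn_start"))
```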

_lock

Internal threading lock for thread-safe access.

_events

Internal list storing collected audit events.

Example

collector = InMemoryCollector()
collector.emit(TurnStartEvent(agent_id="a1", prompt_preview="Hi"))
assert len(collector) == 1
events = collector.get_events()
collector.clear()
assert len(collector) == 0
clear() → None

Discard all collected events from the internal buffer.

After this call, get_events() will return an empty list and len(self) will return 0.

emit(event: AuditEvent) → None

Append an audit event to the in-memory buffer.

The event is appended under the internal lock to ensure thread-safe writes from concurrent producers.

Parameters

event – The audit event to store.

flush() → None

No-op for in-memory storage.

Provided for protocol compatibility with AuditCollector.

get_events() → list[calute.audit.events.AuditEvent]

Return a shallow-copy snapshot of all collected events.

The returned list is a copy, so callers may mutate it without affecting the internal buffer.
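A shallow copy means the list is new but the event objects inside it are shared. The stand-alone sketch below shows the distinction with plain lists; `Event` is a hypothetical mutable stand-in, and real audit events may or may not be mutable.

```python
from dataclasses import dataclass


@dataclass
class Event:
    """Hypothetical mutable stand-in for an audit event."""

    event_type: str


internal = [Event("turn_start"), Event("tool_call_attempt")]

snapshot = list(internal)  # shallow copy, like the documented snapshot
snapshot.append(Event("extra"))  # changes the copy only
snapshot.pop(0)  # changes the copy only

shared = list(internal)
shared[0].event_type = "changed"  # mutates an object the internal list also holds
```

Callers can therefore reorder or trim the returned list freely but should treat the events themselves as read-only.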

Returns

A new list containing all events in insertion order.

Return type

list[AuditEvent]

get_events_by_type(event_type: str) → list[calute.audit.events.AuditEvent]

Return events whose event_type matches the given string.

Parameters

event_type – The event type tag to filter on (e.g. "turn_start", "tool_call_attempt").

Returns

A filtered list of matching events in insertion order.

Return type

list[AuditEvent]

class calute.audit.collector.JSONLSinkCollector(sink: Union[str, Path, IO[str]])

Bases: object

Writes each event as a single JSON line to a file or IO stream.

Each call to emit() serializes the event via to_dict() and writes it as a single newline-terminated JSON object. All writes are protected by an internal threading.Lock.

When a filesystem path is provided, the file is opened in append mode so that existing audit logs are preserved across restarts.
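The sketch below approximates this behavior with the standard library: a path is opened with open(path, "a", encoding="utf-8") and marked as owned, while a supplied stream is used as-is and never closed. `JsonLinesSink` and `DictEvent` are hypothetical names (DictEvent only mimics an event exposing to_dict()); calute's actual implementation may differ.

```python
import io
import json
import tempfile
import threading
from pathlib import Path
from typing import IO, Any, Union


class DictEvent:
    """Hypothetical event exposing to_dict(), like AuditEvent."""

    def __init__(self, **fields: Any) -> None:
        self._fields = fields

    def to_dict(self) -> dict[str, Any]:
        return dict(self._fields)


class JsonLinesSink:
    """Hypothetical JSONL sink that tracks whether it owns its stream."""

    def __init__(self, sink: Union[str, Path, IO[str]]) -> None:
        self._lock = threading.Lock()
        if isinstance(sink, (str, Path)):
            # Append mode keeps existing log lines across restarts.
            self._stream: IO[str] = open(sink, "a", encoding="utf-8")
            self._owns_stream = True
        else:
            self._stream = sink
            self._owns_stream = False

    def emit(self, event: DictEvent) -> None:
        line = json.dumps(event.to_dict(), default=str)
        with self._lock:
            self._stream.write(line + "\n")

    def flush(self) -> None:
        with self._lock:
            self._stream.flush()

    def close(self) -> None:
        with self._lock:
            self._stream.flush()
            if self._owns_stream:
                self._stream.close()


# Caller-owned stream: close() flushes but leaves it open.
buf = io.StringIO()
sink = JsonLinesSink(buf)
sink.emit(DictEvent(event_type="turn_start", agent_id="a1"))
sink.close()
lines = buf.getvalue().splitlines()

# Owned file: two "restarts" append to the same log.
with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "audit.jsonl"
    for run in (1, 2):
        s = JsonLinesSink(path)
        s.emit(DictEvent(run=run))
        s.close()
    file_lines = path.read_text(encoding="utf-8").splitlines()
```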

_lock

Internal threading lock for thread-safe writes.

_owns_stream

Whether this collector owns (and should close) the underlying IO stream.

_stream

The writable IO stream that receives JSON lines.

Parameters

sink – Either a filesystem path (str or pathlib.Path) or an already-open writable text IO stream. When a path is given the file is opened in append mode with UTF-8 encoding.

Example

Writing to a file:

collector = JSONLSinkCollector("/tmp/audit.jsonl")
collector.emit(some_event)
collector.flush()
collector.close()

Writing to an in-memory stream:

import io
buf = io.StringIO()
collector = JSONLSinkCollector(buf)
collector.emit(some_event)
print(buf.getvalue())
close() → None

Flush pending writes and close the stream if owned.

If this collector opened the stream itself (i.e. a path was passed to the constructor), the stream is closed after flushing. Externally-supplied streams are flushed but left open so that the caller retains control of the stream lifecycle.

emit(event: AuditEvent) → None

Serialize the event to a JSON line and write it to the stream.

Non-serializable values in the event dict are coerced to strings via the default=str JSON encoder fallback.
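For example, datetime and pathlib path values are not JSON-serializable on their own; with default=str, json.dumps falls back to calling str() on them. A standard-library illustration (the field names are made up for the example):

```python
import json
from datetime import datetime, timezone
from pathlib import PurePosixPath

record = {
    "event_type": "tool_call_attempt",
    "timestamp": datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc),
    "cwd": PurePosixPath("/srv/app"),
}

try:
    json.dumps(record)
    strict_failed = False
except TypeError:
    strict_failed = True  # datetime is not serializable without a fallback

line = json.dumps(record, default=str)  # str() is applied to unsupported values
```

The trade-off is that such values come back as plain strings when the line is parsed again, so consumers must re-parse timestamps and paths themselves.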

Parameters

event – The audit event to serialize and write.

flush() → None

Flush the underlying IO stream.

Ensures that all buffered data has been written to the stream’s backing store (e.g. the operating system’s file buffer).