calute.audit.collector
Audit event collectors for buffering, persisting, and fanning-out events.
Collectors implement a simple two-method protocol, emit() and flush(), defined by AuditCollector.
Three concrete implementations ship with this module:
- InMemoryCollector: a thread-safe in-memory list with filtering and snapshot capabilities.
- JSONLSinkCollector: streams each event as a single newline-delimited JSON line to a file or IO stream.
- CompositeCollector: fans out every event to an arbitrary number of child collectors.
Example
Wiring an in-memory collector and a JSONL file sink together:
from calute.audit.collector import (
    CompositeCollector,
    InMemoryCollector,
    JSONLSinkCollector,
)

mem = InMemoryCollector()
jsonl = JSONLSinkCollector("/tmp/audit.jsonl")
composite = CompositeCollector([mem, jsonl])

composite.emit(some_event)  # some_event: any AuditEvent instance
composite.flush()
jsonl.close()  # release the file handle the sink opened
- class calute.audit.collector.AuditCollector(*args, **kwargs)
Bases: Protocol
Protocol that all audit collectors must satisfy.
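Any object that implements emit() and flush() is considered a valid audit collector. The protocol is decorated with runtime_checkable() so that isinstance checks work at runtime. These checks only confirm that both methods exist. They do not confirm that the signatures match.
Example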
Implementing a custom collector:
from calute.audit.collector import AuditCollector
from calute.audit.events import AuditEvent

class MyCollector:
    def emit(self, event: AuditEvent) -> None:
        print(event.to_json())

    def flush(self) -> None:
        pass

assert isinstance(MyCollector(), AuditCollector)
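The following is a minimal sketch of how runtime_checkable behaves for a protocol shaped like AuditCollector. It uses only the standard typing module instead of importing calute. The names CollectorLike, PrintCollector, EmitOnly and WrongSignature are hypothetical. The sketch shows that the check is structural and shallow: a missing method fails the check, but a method with the wrong signature still passes.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class CollectorLike(Protocol):
    """Hypothetical stand-in mirroring the two-method AuditCollector protocol."""

    def emit(self, event: object) -> None: ...

    def flush(self) -> None: ...


class PrintCollector:
    # Satisfies the protocol structurally; no inheritance is needed.
    def emit(self, event: object) -> None:
        print(event)

    def flush(self) -> None:
        pass


class EmitOnly:
    # Has no flush(), so it does not satisfy the protocol.
    def emit(self, event: object) -> None:
        pass


class WrongSignature:
    # Both names exist but emit() takes no event argument.
    # Runtime isinstance checks only look at attribute presence.
    def emit(self) -> None:
        pass

    def flush(self) -> None:
        pass


print(isinstance(PrintCollector(), CollectorLike))  # True
print(isinstance(EmitOnly(), CollectorLike))        # False
print(isinstance(WrongSignature(), CollectorLike))  # True
```

Because of this, a static type checker is still the reliable way to catch signature mismatches in custom collectors.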
- emit(event: AuditEvent) -> None
Accept a single audit event and process or store it.
- Parameters
event – The audit event to collect.
- class calute.audit.collector.CompositeCollector(collectors: collections.abc.Sequence[calute.audit.collector.AuditCollector | calute.audit.collector.InMemoryCollector | calute.audit.collector.JSONLSinkCollector | calute.audit.collector.CompositeCollector] | None = None)
Bases: object
Fans out every emitted event to multiple child collectors.
This collector acts as a multiplexer: each call to emit() forwards the event to every registered child, and flush() flushes all children in sequence. New children can be added at any time via add().
- _collectors
Internal list of child collectors that receive forwarded events.
- Parameters
collectors – An optional initial sequence of child collectors. When None (the default), the composite starts empty.
Example
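from calute.audit.collector import CompositeCollector, InMemoryCollector, JSONLSinkCollector

mem = InMemoryCollector()
jsonl = JSONLSinkCollector("/tmp/audit.jsonl")
composite = CompositeCollector([mem, jsonl])
composite.emit(some_event)  # some_event: any AuditEvent instance
assert len(mem) == 1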
- add(collector: calute.audit.collector.AuditCollector | calute.audit.collector.InMemoryCollector | calute.audit.collector.JSONLSinkCollector) -> None
Append a collector to the fan-out list.
- Parameters
collector – The child collector to register. It must implement the AuditCollector protocol (i.e. provide emit and flush methods).
- emit(event: AuditEvent) -> None
Forward the event to every registered child collector.
Children are called in registration order. If a child raises an exception, subsequent children will not receive the event.
- Parameters
event – The audit event to broadcast.
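The fan-out rules above can be illustrated with a small standalone sketch. It assumes a plain loop with no exception handling. FanOut, ListCollector and FailingCollector are hypothetical names, not calute classes. The sketch shows registration order, add(), sequential flush(), and how a raising child stops delivery to the children registered after it.

```python
from typing import Any, Protocol


class Collector(Protocol):
    # Hypothetical stand-in for the AuditCollector protocol.
    def emit(self, event: Any) -> None: ...
    def flush(self) -> None: ...


class ListCollector:
    """Hypothetical child that records events and counts flushes."""

    def __init__(self) -> None:
        self.events: list[Any] = []
        self.flushed = 0

    def emit(self, event: Any) -> None:
        self.events.append(event)

    def flush(self) -> None:
        self.flushed += 1


class FailingCollector:
    """Hypothetical child that always raises on emit()."""

    def emit(self, event: Any) -> None:
        raise RuntimeError("sink unavailable")

    def flush(self) -> None:
        pass


class FanOut:
    """Sketch of the fan-out behaviour described for CompositeCollector."""

    def __init__(self, collectors=None) -> None:
        # Copy the initial sequence; None means start empty.
        self._collectors: list[Collector] = list(collectors or [])

    def add(self, collector: Collector) -> None:
        self._collectors.append(collector)

    def emit(self, event: Any) -> None:
        # Registration order, no try/except: an exception from one child
        # propagates and later children never see the event.
        for child in self._collectors:
            child.emit(event)

    def flush(self) -> None:
        for child in self._collectors:
            child.flush()


first, last = ListCollector(), ListCollector()
fan = FanOut([first])
fan.add(FailingCollector())
fan.add(last)

try:
    fan.emit("evt-1")
except RuntimeError:
    pass

fan.flush()
print(first.events, last.events)  # ['evt-1'] []
```

If a deployment needs every sink to receive every event even when one sink fails, a wrapper child that catches and logs its own errors is one way to get that under these semantics.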
- class calute.audit.collector.InMemoryCollector
Bases: object
Thread-safe in-memory audit event buffer.
Events are stored in insertion order and can be retrieved, filtered, or cleared at any time. All public methods acquire an internal threading.Lock to guarantee safe concurrent access.
- _lock
Internal threading lock for thread-safe access.
- _events
Internal list storing collected audit events.
Example
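from calute.audit.collector import InMemoryCollector
from calute.audit.events import TurnStartEvent  # assumed import location

collector = InMemoryCollector()
collector.emit(TurnStartEvent(agent_id="a1", prompt_preview="Hi"))
assert len(collector) == 1
events = collector.get_events()  # snapshot copy
collector.clear()
assert len(collector) == 0
assert len(events) == 1  # the snapshot is unaffected by clear()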
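The next sketch shows the lock-guarded buffer pattern described above. It assumes a single threading.Lock wrapped around every read and write of a plain list. LockedBuffer is a hypothetical class, not the calute implementation. Four threads emit concurrently, and the final count shows that no events are lost.

```python
import threading
from typing import Any


class LockedBuffer:
    """Hypothetical sketch of a lock-guarded, insertion-ordered event list."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._events: list[Any] = []

    def emit(self, event: Any) -> None:
        # Every mutation happens while holding the lock.
        with self._lock:
            self._events.append(event)

    def clear(self) -> None:
        with self._lock:
            self._events.clear()

    def __len__(self) -> int:
        with self._lock:
            return len(self._events)


buf = LockedBuffer()


def produce(worker: int) -> None:
    for i in range(1000):
        buf.emit((worker, i))


threads = [threading.Thread(target=produce, args=(w,)) for w in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

total = len(buf)
print(total)     # 4000
buf.clear()
print(len(buf))  # 0
```

In CPython a bare list.append is already atomic. The lock matters once an operation spans more than one step, such as copying or filtering the list while other threads keep appending.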
- clear() -> None
Discard all collected events from the internal buffer.
After this call, get_events() will return an empty list and len(self) will return 0.
- emit(event: AuditEvent) -> None
Append an audit event to the in-memory buffer.
The event is appended under the internal lock to ensure thread-safe writes from concurrent producers.
- Parameters
event – The audit event to store.
- flush() -> None
No-op for in-memory storage.
Provided for protocol compatibility with AuditCollector.
- get_events() -> list[calute.audit.events.AuditEvent]
Return a shallow-copy snapshot of all collected events.
The returned list is a copy, so callers may mutate it without affecting the internal buffer.
- Returns
A new list containing all events in insertion order.
- Return type
list[AuditEvent]
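A "shallow-copy snapshot" means two things: the returned list is a new object, but the events inside it are the same objects held by the buffer. The sketch below assumes the snapshot is taken with something like list(self._events) under the lock. Event and SnapshotBuffer are hypothetical names used only for illustration.

```python
import threading
from dataclasses import dataclass


@dataclass
class Event:
    # Hypothetical mutable event used only for this illustration.
    event_type: str
    note: str = ""


class SnapshotBuffer:
    """Hypothetical sketch of a get_events()-style shallow-copy snapshot."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._events: list[Event] = []

    def emit(self, event: Event) -> None:
        with self._lock:
            self._events.append(event)

    def get_events(self) -> list[Event]:
        # list(...) builds a new list object holding the same event objects.
        with self._lock:
            return list(self._events)


buf = SnapshotBuffer()
buf.emit(Event("turn_start"))

snapshot = buf.get_events()
snapshot.append(Event("injected"))  # does not touch the internal buffer
snapshot[0].note = "edited"         # but the event object itself is shared

print(len(buf.get_events()))     # 1
print(buf.get_events()[0].note)  # edited
```

Callers that need to modify events without affecting other readers should copy the events themselves, not just the list.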
- get_events_by_type(event_type: str) -> list[calute.audit.events.AuditEvent]
Return events whose event_type matches the given string.
- Parameters
event_type – The event type tag to filter on (e.g. "turn_start", "tool_call_attempt").
- Returns
A filtered list of matching events in insertion order.
- Return type
list[AuditEvent]
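The filter can be pictured as a single order-preserving pass over the events. This sketch assumes matching is exact, case-sensitive string equality on event_type; the documentation above does not state this explicitly. Event and events_by_type are hypothetical names.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Event:
    # Hypothetical event carrying an event_type tag, as AuditEvent does.
    event_type: str
    agent_id: str


def events_by_type(events: list[Event], event_type: str) -> list[Event]:
    """Sketch of get_events_by_type(): exact string match, order preserved."""
    return [e for e in events if e.event_type == event_type]


log = [
    Event("turn_start", "a1"),
    Event("tool_call_attempt", "a1"),
    Event("turn_start", "a2"),
]

print([e.agent_id for e in events_by_type(log, "turn_start")])  # ['a1', 'a2']
print(events_by_type(log, "TURN_START"))                        # []
```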
- class calute.audit.collector.JSONLSinkCollector(sink: Union[str, Path, IO[str]])
Bases: object
Writes each event as a single JSON line to a file or IO stream.
Each call to emit() serializes the event via to_dict() and writes it as a single newline-terminated JSON object. All writes are protected by an internal threading.Lock.
When a filesystem path is provided, the file is opened in append mode so that existing audit logs are preserved across restarts.
- _lock
Internal threading lock for thread-safe writes.
- _owns_stream
Whether this collector owns (and should close) the underlying IO stream.
- _stream
The writable IO stream that receives JSON lines.
- Parameters
sink – Either a filesystem path (str or pathlib.Path) or an already-open writable text IO stream. When a path is given, the file is opened in append mode with UTF-8 encoding.
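The following sketch shows why append mode preserves earlier logs. It assumes each event has already been turned into a dict, as to_dict() would do. It writes one json.dumps line per event under a lock, then simulates two separate runs against the same file. JsonLinesFile is a hypothetical class, and the file is a throwaway temporary path.

```python
import json
import os
import tempfile
import threading
from typing import Any


class JsonLinesFile:
    """Hypothetical sketch: append-mode JSON Lines writer guarded by a lock."""

    def __init__(self, path: str) -> None:
        self._lock = threading.Lock()
        # Append mode keeps whatever earlier runs already wrote.
        self._stream = open(path, "a", encoding="utf-8")

    def emit(self, record: dict[str, Any]) -> None:
        # One JSON object per line, terminated by a newline.
        line = json.dumps(record, default=str) + "\n"
        with self._lock:
            self._stream.write(line)

    def close(self) -> None:
        with self._lock:
            self._stream.flush()
            self._stream.close()


path = os.path.join(tempfile.mkdtemp(), "audit.jsonl")

# Two simulated process runs writing to the same file.
for run in (1, 2):
    sink = JsonLinesFile(path)
    sink.emit({"event_type": "turn_start", "run": run})
    sink.close()

with open(path, encoding="utf-8") as fh:
    lines = fh.read().splitlines()

print([json.loads(line)["run"] for line in lines])  # [1, 2]
```

Opening with "w" instead would truncate the file on every start, so only the last run's events would survive.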
Example
Writing to a file:
from calute.audit.collector import JSONLSinkCollector

collector = JSONLSinkCollector("/tmp/audit.jsonl")
collector.emit(some_event)  # some_event: any AuditEvent instance
collector.flush()
collector.close()
Writing to an in-memory stream:
import io

from calute.audit.collector import JSONLSinkCollector

buf = io.StringIO()
collector = JSONLSinkCollector(buf)
collector.emit(some_event)  # some_event: any AuditEvent instance
print(buf.getvalue())
- close() -> None
Flush pending writes and close the stream if owned.
If this collector opened the stream itself (i.e. a path was passed to the constructor), the stream is closed after flushing. Externally supplied streams are flushed but left open so that the caller retains control of the stream lifecycle.
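The ownership rule can be sketched as a single flag set in the constructor. This assumes, as the sink parameter description suggests, that a str or path-like argument means "open it myself" and anything else means "borrowed stream". StreamSink is a hypothetical class; only its _owns_stream and _stream names echo the attributes listed above.

```python
import io
import os
import tempfile
import threading


class StreamSink:
    """Hypothetical sketch of the owned-versus-borrowed stream rule."""

    def __init__(self, sink) -> None:
        self._lock = threading.Lock()
        if isinstance(sink, (str, os.PathLike)):
            # A path was given: this object opens the file, so it must close it.
            self._stream = open(sink, "a", encoding="utf-8")
            self._owns_stream = True
        else:
            # A caller-supplied stream is borrowed and never closed here.
            self._stream = sink
            self._owns_stream = False

    def write(self, text: str) -> None:
        with self._lock:
            self._stream.write(text)

    def close(self) -> None:
        with self._lock:
            self._stream.flush()
            if self._owns_stream:
                self._stream.close()


buf = io.StringIO()
borrowed = StreamSink(buf)
borrowed.write('{"event_type": "turn_start"}\n')
borrowed.close()
print(buf.closed)      # False
print(buf.getvalue())  # the caller can still read the stream

owned = StreamSink(os.path.join(tempfile.mkdtemp(), "audit.jsonl"))
owned.write('{"event_type": "turn_start"}\n')
owned.close()
print(owned._stream.closed)  # True
```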
- emit(event: AuditEvent) -> None
Serialize the event to a JSON line and write it to the stream.
Non-serializable values in the event dict are coerced to strings via the default=str JSON encoder fallback.
- Parameters
event – The audit event to serialize and write.
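The default=str fallback is standard json.dumps behaviour. For any object the encoder cannot handle, it calls default and encodes the returned value. The sketch uses a hypothetical event payload containing a datetime and a path, and only the standard library.

```python
import json
from datetime import datetime
from pathlib import PurePosixPath

# Hypothetical event payload with values json cannot encode natively.
record = {
    "event_type": "turn_start",
    "when": datetime(2024, 1, 1, 12, 30),
    "cwd": PurePosixPath("/tmp/work"),
}

# Without a fallback, json.dumps raises TypeError on the datetime.
try:
    json.dumps(record)
except TypeError as exc:
    print("plain dumps failed:", exc)

# default=str is called for each unsupported object; its result is encoded.
line = json.dumps(record, default=str)
print(line)
# {"event_type": "turn_start", "when": "2024-01-01 12:30:00", "cwd": "/tmp/work"}
```

The coercion is one-way. Reading the line back yields plain strings, so consumers of the JSONL log must re-parse values such as timestamps themselves.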