calute.core.streamer_buffer

Streaming buffer utilities for response handling.

This module provides buffering infrastructure for streaming responses in Calute, including:

- Thread-safe queue-based buffering for streaming data
- Support for both synchronous and asynchronous consumption
- Graceful shutdown and cleanup mechanisms
- Debug logging for troubleshooting streaming issues

The StreamerBuffer class enables efficient streaming of responses from background threads or async tasks to the main consumer, with proper lifecycle management and error handling.

class calute.core.streamer_buffer.StreamerBuffer(maxsize: int = 0)

Bases: object

Thread-safe buffer for streaming responses with put/get interface.

Provides a queue-based buffering mechanism for streaming responses from background threads or async tasks. Supports both blocking and non-blocking access patterns, and handles graceful shutdown via a kill signal.

thread

Optional thread running the streaming producer.

task

Optional asyncio task for async streaming operations.

result_holder

Optional list to store the final result.

exception_holder

Optional list to store exceptions during streaming.

get_result

Optional callable to retrieve the final result synchronously.

aget_result

Optional callable to retrieve the final result asynchronously.
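The attributes above suggest a producer running on a background thread whose outcome is recorded for later retrieval. The following is a minimal sketch of how such wiring might look, not Calute's actual implementation: `SketchBuffer`, `run_in_background`, `produce_chunks`, and the `get_result(timeout=...)` signature are all hypothetical names and assumptions introduced for illustration.

```python
import queue
import threading


class SketchBuffer:
    """Hypothetical stand-in; NOT the real StreamerBuffer."""

    def __init__(self):
        self._queue = queue.Queue()
        self.thread = None
        self.result_holder = None
        self.exception_holder = None
        self.get_result = None


def run_in_background(buffer, produce):
    """Run produce(buffer) on a thread and record its result or exception."""
    result_holder = [None]
    exception_holder = [None]

    def worker():
        try:
            result_holder[0] = produce(buffer)
        except Exception as exc:
            # Keep the error so the consumer can re-raise it later.
            exception_holder[0] = exc
        finally:
            # Assumed convention: None marks the end of the current stream.
            buffer._queue.put(None)

    thread = threading.Thread(target=worker, daemon=True)

    def get_result(timeout=None):
        # Wait for the producer, then surface its error or final value.
        thread.join(timeout)
        if exception_holder[0] is not None:
            raise exception_holder[0]
        return result_holder[0]

    buffer.thread = thread
    buffer.result_holder = result_holder
    buffer.exception_holder = exception_holder
    buffer.get_result = get_result
    thread.start()
    return buffer


def produce_chunks(buffer):
    """Hypothetical producer: emit two chunks, return the full text."""
    for text in ("Hel", "lo"):
        buffer._queue.put(text)
    return "Hello"


buf = run_in_background(SketchBuffer(), produce_chunks)
final = buf.get_result(timeout=5)
```

Single-element lists are used as holders here so the worker closure can write a value that the caller reads after the thread finishes.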

close() -> None

Permanently close the buffer.

Marks the buffer as closed and sends a kill signal to terminate any active stream consumers. This operation is thread-safe and idempotent: calling close() more than once has no additional effect.

Returns

None
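One common way to get a thread-safe, idempotent close is to guard a flag with a lock and enqueue the kill signal only on the first call. The sketch below illustrates that pattern under stated assumptions: `ClosableBuffer` is a hypothetical stand-in, and the `KILL_TAG` object defined here is a placeholder, since the real sentinel's value is not shown in this reference.

```python
import queue
import threading

# Placeholder sentinel for illustration; the real KILL_TAG may differ.
KILL_TAG = object()


class ClosableBuffer:
    """Hypothetical stand-in showing an idempotent close()."""

    def __init__(self):
        self._queue = queue.Queue()
        self._lock = threading.Lock()
        self._closed = False

    @property
    def closed(self):
        return self._closed

    def close(self):
        with self._lock:
            if self._closed:
                return  # Already closed: do nothing.
            self._closed = True
        # Only the first caller reaches this point, so exactly one
        # kill signal is enqueued no matter how often close() is called.
        self._queue.put(KILL_TAG)


buf = ClosableBuffer()
buf.close()
buf.close()  # Second call has no additional effect.
```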

property closed: bool

Whether the buffer has been closed.

Returns

True if the buffer has been permanently closed, False otherwise.

get(timeout: float | None = None) -> calute.types.function_execution_types.StreamChunk | calute.types.function_execution_types.FunctionDetection | calute.types.function_execution_types.FunctionCallsExtracted | calute.types.function_execution_types.FunctionExecutionStart | calute.types.function_execution_types.FunctionExecutionComplete | calute.types.function_execution_types.AgentSwitch | calute.types.function_execution_types.Completion | calute.types.function_execution_types.ReinvokeSignal | None

Get an item from the buffer.

Retrieves and removes an item from the internal queue. Blocks until an item is available or the timeout expires.

Parameters

timeout – Maximum time to wait, in seconds. None blocks indefinitely.

Returns

The streaming response item, or None if the timeout expired without an item becoming available.
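The timeout behavior described above maps naturally onto the standard library's `queue.Queue.get`, which raises `queue.Empty` when the timeout expires. A minimal sketch, assuming the buffer wraps a `queue.Queue` (the helper name `get_or_none` is hypothetical):

```python
import queue


def get_or_none(q, timeout=None):
    """Return the next item, or None if nothing arrives within `timeout`."""
    try:
        # Blocks until an item is available or the timeout expires.
        return q.get(timeout=timeout)
    except queue.Empty:
        return None


q = queue.Queue()
q.put("chunk")

first = get_or_none(q, timeout=0.1)   # An item is waiting.
second = get_or_none(q, timeout=0.1)  # Queue is empty, so this times out.
```

Since put() also accepts None as an end-of-stream marker, a None return from get() may be ambiguous if that marker is passed through unchanged; checking `closed` alongside the return value is one way a caller could tell the two cases apart.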

maybe_finish(arg: Any) -> None

Conditionally close the buffer based on completion state.

Closes the buffer if the provided argument is None and a Completion item has been previously encountered during streaming. This enables automatic cleanup when the stream has naturally completed.

Parameters

arg – The argument to check. If None and a completion was seen, the buffer will be closed.

Returns

None
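The rule above amounts to a two-part condition: the argument must be None, and a Completion must already have been seen. A minimal sketch of that logic follows; `FinishTracker` and the `saw_completion` flag are hypothetical names, and the real class may track completion differently.

```python
class FinishTracker:
    """Hypothetical stand-in showing the maybe_finish() condition."""

    def __init__(self):
        self.closed = False
        self.saw_completion = False  # Assumed flag; set when a Completion is seen.

    def close(self):
        self.closed = True

    def maybe_finish(self, arg):
        # Close only when both conditions hold.
        if arg is None and self.saw_completion:
            self.close()


tracker = FinishTracker()

tracker.maybe_finish(None)          # No completion seen yet: stays open.
closed_before = tracker.closed

tracker.saw_completion = True
tracker.maybe_finish("not none")    # Argument is not None: stays open.
closed_with_value = tracker.closed

tracker.maybe_finish(None)          # Both conditions hold: closes.
closed_after = tracker.closed
```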

put(item: calute.types.function_execution_types.StreamChunk | calute.types.function_execution_types.FunctionDetection | calute.types.function_execution_types.FunctionCallsExtracted | calute.types.function_execution_types.FunctionExecutionStart | calute.types.function_execution_types.FunctionExecutionComplete | calute.types.function_execution_types.AgentSwitch | calute.types.function_execution_types.Completion | calute.types.function_execution_types.ReinvokeSignal | None) -> None

Put an item into the buffer.

Adds a streaming response item to the internal queue. If the buffer is closed, the item will be dropped (with a warning if debug mode is enabled).

Parameters

item – The streaming response to buffer (None signals end of current stream).

Returns

None
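The drop-when-closed behavior can be sketched as a guard at the top of put(). This is an illustration only: `DroppingBuffer`, the `debug` constructor flag, and the logger name are assumptions, as this reference does not show how debug mode is enabled.

```python
import logging
import queue

logger = logging.getLogger("streamer_sketch")  # Hypothetical logger name.


class DroppingBuffer:
    """Hypothetical stand-in showing put() on a closed buffer."""

    def __init__(self, maxsize=0, debug=False):
        self._queue = queue.Queue(maxsize=maxsize)
        self._closed = False
        self._debug = debug  # Assumed way of enabling debug mode.

    def close(self):
        self._closed = True

    def put(self, item):
        if self._closed:
            # Closed buffers silently drop items; warn only in debug mode.
            if self._debug:
                logger.warning("buffer closed; dropping %r", item)
            return
        self._queue.put(item)


buf = DroppingBuffer(debug=True)
buf.put("kept")
buf.close()
buf.put("dropped")  # Not enqueued; logs a warning because debug=True.
```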

stream() -> Generator[calute.types.function_execution_types.StreamChunk | calute.types.function_execution_types.FunctionDetection | calute.types.function_execution_types.FunctionCallsExtracted | calute.types.function_execution_types.FunctionExecutionStart | calute.types.function_execution_types.FunctionExecutionComplete | calute.types.function_execution_types.AgentSwitch | calute.types.function_execution_types.Completion | calute.types.function_execution_types.ReinvokeSignal, None, None]

Generator that yields all items from buffer until terminated.

Continuously retrieves items from the buffer and yields them until a kill signal (KILL_TAG) is received. Automatically tracks when a Completion item is encountered to support graceful shutdown.

Yields

StreamingResponseType – Streaming response items from the buffer.
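The consume-until-sentinel loop described above can be sketched with a plain generator over a `queue.Queue`. Everything here is a stand-in under stated assumptions: `KILL_TAG` is a placeholder object, `Completion` is a simplified dataclass rather than Calute's type, and the `state` dictionary is a hypothetical substitute for whatever the real class uses to remember that a completion was seen.

```python
import queue
import threading
from dataclasses import dataclass

# Placeholder sentinel for illustration; the real KILL_TAG may differ.
KILL_TAG = object()


@dataclass
class Completion:
    """Simplified stand-in for Calute's Completion type."""
    content: str


def stream(q, state):
    """Yield items from q until KILL_TAG arrives, noting any Completion."""
    while True:
        item = q.get()  # Blocks until the producer supplies something.
        if item is KILL_TAG:
            break
        if isinstance(item, Completion):
            state["saw_completion"] = True
        yield item


q = queue.Queue()
state = {"saw_completion": False}


def producer():
    # Emit two text chunks, a completion marker, then the kill signal.
    q.put("Hel")
    q.put("lo")
    q.put(Completion("Hello"))
    q.put(KILL_TAG)


threading.Thread(target=producer, daemon=True).start()
items = list(stream(q, state))
```

Comparing against the sentinel with `is` rather than `==` avoids accidental matches with items that define their own equality.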