# Copyright 2025 The EasyDeL/Calute Author @erfanzar (Erfan Zare Chavoshi).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Session storage backends and high-level session manager.
Provides an abstract SessionStore protocol with two concrete implementations
(in-memory and file-based) and a SessionManager for session lifecycle management.
"""
from __future__ import annotations
import json
import logging
import threading
import typing as tp
import uuid
from abc import ABC, abstractmethod
from datetime import UTC, datetime
from pathlib import Path
from .models import AgentTransitionRecord, SessionRecord, TurnRecord
logger = logging.getLogger(__name__)
[docs]class SessionStore(ABC):
"""Abstract base class for session storage backends.
All concrete session stores must implement the four CRUD methods defined
here: :meth:`save_session`, :meth:`load_session`, :meth:`list_sessions`,
and :meth:`delete_session`. Implementations are expected to be
thread-safe.
"""
[docs] @abstractmethod
def save_session(self, session: SessionRecord) -> None:
"""Persist a session record.
Args:
session: The session record to save.
"""
[docs] @abstractmethod
def load_session(self, session_id: str) -> SessionRecord | None:
"""Load a session record by ID.
Args:
session_id: The unique session identifier.
Returns:
The session record, or None if not found.
"""
[docs] @abstractmethod
def list_sessions(self, workspace_id: str | None = None) -> list[str]:
"""List session IDs, optionally filtered by workspace.
Args:
workspace_id: If provided, only return sessions in this workspace.
Returns:
List of session ID strings.
"""
[docs] @abstractmethod
def delete_session(self, session_id: str) -> bool:
"""Delete a session record.
Args:
session_id: The unique session identifier.
Returns:
True if a session was deleted, False if not found.
"""
[docs]class InMemorySessionStore(SessionStore):
"""Thread-safe in-memory session store backed by a dictionary.
Sessions are kept in a plain ``dict`` protected by a :class:`threading.Lock`.
Data does not survive process restarts; use :class:`FileSessionStore` for
persistent storage.
Attributes:
_sessions: Internal dictionary mapping session IDs to records.
_lock: Threading lock for safe concurrent access.
Example:
>>> store = InMemorySessionStore()
>>> store.list_sessions()
[]
"""
def __init__(self) -> None:
"""Initialise an empty in-memory session store with a threading lock."""
self._sessions: dict[str, SessionRecord] = {}
self._lock = threading.Lock()
[docs] def save_session(self, session: SessionRecord) -> None:
"""Save or overwrite a session record in memory.
If a session with the same ``session_id`` already exists, it is
silently replaced.
Args:
session: The session record to store.
"""
with self._lock:
self._sessions[session.session_id] = session
[docs] def load_session(self, session_id: str) -> SessionRecord | None:
"""Load a session record from memory by its identifier.
Args:
session_id: The unique session identifier to look up.
Returns:
The matching ``SessionRecord``, or ``None`` if no session with
the given ID exists.
"""
with self._lock:
return self._sessions.get(session_id)
[docs] def list_sessions(self, workspace_id: str | None = None) -> list[str]:
"""List session IDs stored in memory, optionally filtered by workspace.
Args:
workspace_id: When provided, only session IDs belonging to this
workspace are returned. When ``None``, all session IDs are
returned.
Returns:
A list of session ID strings.
"""
with self._lock:
if workspace_id is None:
return list(self._sessions.keys())
return [sid for sid, s in self._sessions.items() if s.workspace_id == workspace_id]
[docs] def delete_session(self, session_id: str) -> bool:
"""Delete a session record from memory.
Args:
session_id: The unique session identifier to delete.
Returns:
``True`` if a session was found and deleted, ``False`` if no
session with the given ID existed.
"""
with self._lock:
if session_id in self._sessions:
del self._sessions[session_id]
return True
return False
[docs]class FileSessionStore(SessionStore):
"""File-backed session store using JSON files.
Each session is persisted as an individual JSON file on disk. The
directory layout is determined by whether the session has a workspace ID:
Directory layout::
{base_dir}/{session_id}.json -- workspace_id is None
{base_dir}/{workspace_id}/{session_id}.json -- workspace_id is set
Thread-safe via an internal :class:`threading.Lock`.
Attributes:
_base_dir: Root directory for session JSON files.
_lock: Threading lock for safe concurrent access.
Example:
>>> import tempfile
>>> store = FileSessionStore(tempfile.mkdtemp())
>>> store.list_sessions()
[]
"""
def __init__(self, base_dir: str | Path) -> None:
"""Initialise the file session store.
Creates the *base_dir* directory (and any parents) if it does not
already exist.
Args:
base_dir: Root directory path where session JSON files will be
stored. Accepts a string or :class:`~pathlib.Path`.
"""
self._base_dir = Path(base_dir)
self._base_dir.mkdir(parents=True, exist_ok=True)
self._lock = threading.Lock()
def _session_path(self, session: SessionRecord) -> Path:
"""Resolve the file path for a given session record.
Sessions with a ``workspace_id`` are nested under a workspace
subdirectory; sessions without one are stored directly under
*base_dir*.
Args:
session: The session record whose file path is needed.
Returns:
The :class:`~pathlib.Path` where the session JSON file should
be written.
"""
if session.workspace_id:
directory = self._base_dir / session.workspace_id
else:
directory = self._base_dir
return directory / f"{session.session_id}.json"
def _find_session_path(self, session_id: str) -> Path | None:
"""Find the file path for a session ID by searching the directory tree.
First checks for a flat file at ``{base_dir}/{session_id}.json``, then
searches one level of workspace subdirectories.
Args:
session_id: The session identifier to locate on disk.
Returns:
The :class:`~pathlib.Path` to the JSON file if found, otherwise
``None``.
"""
flat = self._base_dir / f"{session_id}.json"
if flat.exists():
return flat
for child in self._base_dir.iterdir():
if child.is_dir():
candidate = child / f"{session_id}.json"
if candidate.exists():
return candidate
return None
[docs] def save_session(self, session: SessionRecord) -> None:
"""Save a session record as a JSON file on disk.
Creates the parent directory (including workspace subdirectories)
if it does not already exist. Existing files are overwritten.
Args:
session: The session record to persist.
"""
with self._lock:
path = self._session_path(session)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(session.to_dict(), indent=2), encoding="utf-8")
[docs] def load_session(self, session_id: str) -> SessionRecord | None:
"""Load a session record from a JSON file on disk.
Searches the directory tree for a matching JSON file, reads it,
and deserializes it into a ``SessionRecord``.
Args:
session_id: The unique session identifier to load.
Returns:
The deserialized ``SessionRecord``, or ``None`` if no matching
file is found.
"""
with self._lock:
path = self._find_session_path(session_id)
if path is None:
return None
data = json.loads(path.read_text(encoding="utf-8"))
return SessionRecord.from_dict(data)
[docs] def list_sessions(self, workspace_id: str | None = None) -> list[str]:
"""List session IDs by scanning JSON files on disk.
When *workspace_id* is provided, only the corresponding workspace
subdirectory is scanned. Otherwise, both the flat base directory
and all workspace subdirectories are scanned.
Args:
workspace_id: When provided, restricts the scan to the
``{base_dir}/{workspace_id}/`` subdirectory only. When
``None``, all JSON files across all directories are included.
Returns:
A list of session ID strings (JSON file stems).
"""
with self._lock:
results: list[str] = []
if workspace_id is not None:
search_dir = self._base_dir / workspace_id
if not search_dir.is_dir():
return []
for f in search_dir.glob("*.json"):
results.append(f.stem)
else:
for f in self._base_dir.glob("*.json"):
results.append(f.stem)
for child in self._base_dir.iterdir():
if child.is_dir():
for f in child.glob("*.json"):
results.append(f.stem)
return results
[docs] def delete_session(self, session_id: str) -> bool:
"""Delete a session JSON file from disk.
Searches the directory tree for the matching file and removes it.
Args:
session_id: The unique session identifier to delete.
Returns:
``True`` if the file was found and deleted, ``False`` if no
matching file existed.
"""
with self._lock:
path = self._find_session_path(session_id)
if path is None:
return False
path.unlink()
return True
[docs]class SessionManager:
"""High-level API for session lifecycle management.
Wraps a :class:`SessionStore` to provide convenient methods for creating,
recording turns, recording agent transitions, and ending sessions. Each
mutation automatically updates the ``updated_at`` timestamp and persists
the session back to the store.
Attributes:
_store: The underlying session store backend used for persistence.
Example:
>>> store = InMemorySessionStore()
>>> manager = SessionManager(store)
>>> session = manager.start_session(agent_id="default")
>>> session.agent_id
'default'
"""
def __init__(self, store: SessionStore) -> None:
"""Initialise the session manager with a storage backend.
Args:
store: The :class:`SessionStore` implementation to delegate
persistence operations to.
"""
self._store = store
@property
def store(self) -> SessionStore:
"""The underlying session store."""
return self._store
[docs] def start_session(
self,
workspace_id: str | None = None,
agent_id: str | None = None,
*,
session_id: str | None = None,
metadata: dict[str, tp.Any] | None = None,
) -> SessionRecord:
"""Create and persist a new session.
Args:
workspace_id: Workspace to associate the session with.
agent_id: Initial agent for the session.
session_id: Explicit session ID; auto-generated if omitted.
metadata: Optional metadata dict.
Returns:
The newly created SessionRecord.
"""
now = datetime.now(UTC).isoformat()
session = SessionRecord(
session_id=session_id or uuid.uuid4().hex,
workspace_id=workspace_id,
created_at=now,
updated_at=now,
agent_id=agent_id,
metadata=metadata or {},
)
self._store.save_session(session)
logger.debug("Started session %s", session.session_id)
return session
[docs] def record_turn(self, session_id: str, turn: TurnRecord) -> None:
"""Append a turn record to an existing session.
Args:
session_id: The session to append to.
turn: The turn record to add.
Raises:
ValueError: If the session does not exist.
"""
session = self._store.load_session(session_id)
if session is None:
raise ValueError(f"Session not found: {session_id}")
session.turns.append(turn)
session.updated_at = datetime.now(UTC).isoformat()
self._store.save_session(session)
[docs] def record_agent_transition(self, session_id: str, transition: AgentTransitionRecord) -> None:
"""Record an agent transition in a session.
Args:
session_id: The session to record the transition in.
transition: The transition record.
Raises:
ValueError: If the session does not exist.
"""
session = self._store.load_session(session_id)
if session is None:
raise ValueError(f"Session not found: {session_id}")
session.agent_transitions.append(transition)
session.updated_at = datetime.now(UTC).isoformat()
self._store.save_session(session)
[docs] def end_session(self, session_id: str) -> None:
"""Mark a session as ended by updating its timestamp.
Args:
session_id: The session to end.
Raises:
ValueError: If the session does not exist.
"""
session = self._store.load_session(session_id)
if session is None:
raise ValueError(f"Session not found: {session_id}")
session.updated_at = datetime.now(UTC).isoformat()
session.metadata["ended"] = True
self._store.save_session(session)
logger.debug("Ended session %s", session_id)
[docs] def get_session(self, session_id: str) -> SessionRecord | None:
"""Retrieve a session record.
Args:
session_id: The session to retrieve.
Returns:
The session record, or None if not found.
"""
return self._store.load_session(session_id)
[docs] def list_sessions(self, workspace_id: str | None = None) -> list[str]:
"""List session IDs.
Args:
workspace_id: If provided, filter by workspace.
Returns:
List of session ID strings.
"""
return self._store.list_sessions(workspace_id)