# Copyright 2025 The EasyDeL/Calute Author @erfanzar (Erfan Zare Chavoshi).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Persistent PTY session management for operator tooling.
Provides :class:`PTYSessionManager`, which creates and manages
pseudo-terminal (PTY) backed subprocess sessions. Each session is
identified by a unique ``session_id`` and can be written to, read from,
interrupted, or closed independently.
"""
from __future__ import annotations

import codecs
import contextlib
import os
import pty
import select
import signal
import subprocess
import time
import typing as tp
import uuid
from dataclasses import dataclass


@dataclass
class PTYSession:
    """Live PTY-backed subprocess session.

    Plain record tying a spawned process to the master side of its
    pseudo-terminal; it carries no behaviour of its own.

    Attributes:
        session_id: Unique identifier for this session, generated by
            :class:`PTYSessionManager`.
        process: The underlying :class:`subprocess.Popen` object that
            owns the child process.
        master_fd: File descriptor for the master side of the PTY pair.
            Used for reading output and writing input.
        command: The original shell command that was launched.
        workdir: Absolute path of the working directory the session was
            started in.
    """

    session_id: str
    process: subprocess.Popen[str]
    master_fd: int
    command: str
    workdir: str


class PTYSessionManager:
    """Manage persistent PTY-backed shell sessions.

    Maintains a registry of active :class:`PTYSession` instances keyed
    by ``session_id``. Sessions can be created, written to, listed,
    and closed.

    Attributes:
        _sessions: Internal mapping from ``session_id`` to the
            corresponding :class:`PTYSession`.
    """

    def __init__(self) -> None:
        """Initialise the manager with an empty session registry."""
        self._sessions: dict[str, PTYSession] = {}

    def create_session(
        self,
        cmd: str,
        *,
        workdir: str | None = None,
        env: dict[str, str] | None = None,
        login: bool = True,
        yield_time_ms: int = 1000,
        max_output_chars: int = 4000,
    ) -> dict[str, tp.Any]:
        """Start a PTY session and return its initial output.

        Creates a new pseudo-terminal pair, spawns a shell process with
        the given command, and captures initial output up to the
        specified limits.

        Args:
            cmd: Shell command to execute inside the PTY.
            workdir: Working directory for the new session. When
                ``None``, the current process working directory is used.
            env: Additional environment variables merged on top of the
                current process environment.
            login: When ``True``, the shell is started with login
                semantics (``-l`` flag) for bash and zsh so that
                profile files are sourced.
            yield_time_ms: Maximum time in milliseconds to wait for
                initial output before returning.
            max_output_chars: Maximum number of characters to capture
                from the initial output.

        Returns:
            A dictionary containing:

            - ``session_id``: Unique identifier for the new session.
            - ``command``: The shell command that was launched.
            - ``workdir``: Resolved working directory path.
            - ``stdout``: Initial output captured from the PTY.
            - ``running``: ``True`` if the process is still alive.
            - ``exit_code``: Process exit code, or ``None`` if still
              running.

        Raises:
            OSError: Propagated from :func:`pty.openpty` or
                :class:`subprocess.Popen` when the PTY or the shell
                cannot be created. Both PTY descriptors are closed
                before the error propagates.
        """
        resolved_workdir = os.path.abspath(workdir or os.getcwd())
        shell = os.environ.get("SHELL", "/bin/sh")
        shell_args = [shell]
        # Only bash and zsh receive ``-l``; other shells are started as-is.
        if login and os.path.basename(shell).endswith(("zsh", "bash")):
            shell_args.append("-l")
        shell_args.extend(["-c", cmd])

        master_fd, slave_fd = pty.openpty()
        try:
            process = subprocess.Popen(
                shell_args,
                stdin=slave_fd,
                stdout=slave_fd,
                stderr=slave_fd,
                cwd=resolved_workdir,
                env={**os.environ, **(env or {})},
                text=True,
                # New session => the child leads its own process group, so
                # its pid doubles as the group id used for signalling.
                start_new_session=True,
                close_fds=True,
            )
            os.set_blocking(master_fd, False)
        except BaseException:
            # Spawn failed: nobody will ever own the master side, so release it.
            os.close(master_fd)
            raise
        finally:
            # The parent never uses the slave side; the child holds its own copy.
            os.close(slave_fd)

        session_id = f"pty_{uuid.uuid4().hex[:10]}"
        self._sessions[session_id] = PTYSession(
            session_id=session_id,
            process=process,
            master_fd=master_fd,
            command=cmd,
            workdir=resolved_workdir,
        )
        output = self._read_output(
            session_id,
            yield_time_ms=yield_time_ms,
            max_output_chars=max_output_chars,
        )
        running, exit_code = self._status(process)
        return {
            "session_id": session_id,
            "command": cmd,
            "workdir": resolved_workdir,
            "stdout": output,
            "running": running,
            "exit_code": exit_code,
        }

    def write(
        self,
        session_id: str,
        *,
        chars: str = "",
        close_stdin: bool = False,
        interrupt: bool = False,
        yield_time_ms: int = 1000,
        max_output_chars: int = 4000,
    ) -> dict[str, tp.Any]:
        """Write to a PTY session and read back incremental output.

        Sends characters, an EOF signal, or an interrupt to the running
        process and then collects whatever output the process produces
        within the specified time window.

        Args:
            session_id: Identifier of the target PTY session.
            chars: Text to write to the process stdin. Include ``\\n``
                to simulate pressing Enter.
            close_stdin: When ``True``, send an EOF (``Ctrl-D``) to the
                process after writing any provided characters.
            interrupt: When ``True``, send ``SIGINT`` to the process
                group before reading output.
            yield_time_ms: Maximum time in milliseconds to wait for
                output after sending input. The same budget bounds how
                long the call waits for the PTY to accept ``chars``.
            max_output_chars: Maximum number of output characters to
                collect.

        Returns:
            A dictionary containing:

            - ``session_id``: The session identifier.
            - ``stdout``: Incremental output captured since the write.
            - ``running``: ``True`` if the process is still alive.
            - ``exit_code``: Process exit code, or ``None`` if still
              running.

        Raises:
            ValueError: If the ``session_id`` is not recognised.
            OSError: If ``chars`` cannot be written to the PTY
                (:class:`BlockingIOError` when the PTY does not accept
                the input within the time budget).
        """
        session = self._require_session(session_id)
        if interrupt and session.process.poll() is None:
            self._signal_group(session.process, signal.SIGINT)
        if chars:
            self._write_all(
                session.master_fd,
                chars.encode(),
                timeout_s=max(yield_time_ms, 0) / 1000,
            )
        if close_stdin:
            # Best effort: the far side may already be gone.
            with contextlib.suppress(OSError):
                os.write(session.master_fd, b"\x04")
        output = self._read_output(
            session_id,
            yield_time_ms=yield_time_ms,
            max_output_chars=max_output_chars,
        )
        running, exit_code = self._status(session.process)
        return {
            "session_id": session_id,
            "stdout": output,
            "running": running,
            "exit_code": exit_code,
        }

    def close(self, session_id: str) -> dict[str, tp.Any]:
        """Terminate and forget a PTY session.

        Attempts a graceful ``SIGTERM`` first, escalating to ``SIGKILL``
        if the process does not exit within two seconds, and waits for
        the killed process so that its exit code is available. Signals
        are sent to the session's process group. The master file
        descriptor is closed and the session is removed from the
        registry even if termination raises.

        Args:
            session_id: Identifier of the PTY session to close.

        Returns:
            A dictionary containing:

            - ``session_id``: The session identifier.
            - ``closed``: Always ``True``.
            - ``exit_code``: Final process exit code.

        Raises:
            ValueError: If the ``session_id`` is not recognised.
        """
        session = self._require_session(session_id)
        process = session.process
        try:
            if process.poll() is None:
                self._signal_group(process, signal.SIGTERM)
                try:
                    process.wait(timeout=2)
                except subprocess.TimeoutExpired:
                    self._signal_group(process, signal.SIGKILL)
                    # Reap the child; otherwise it lingers as a zombie and
                    # poll() below may still report None.
                    process.wait()
        finally:
            with contextlib.suppress(OSError):
                os.close(session.master_fd)
            self._sessions.pop(session_id, None)
        return {"session_id": session_id, "closed": True, "exit_code": process.poll()}

    def list_sessions(self) -> list[dict[str, tp.Any]]:
        """Return summaries for current PTY sessions.

        Returns:
            A list of dictionaries ordered by ``session_id``, each
            containing the ``session_id``, original ``command``,
            ``workdir``, ``running`` status, and ``exit_code`` of a
            tracked session.
        """
        summaries: list[dict[str, tp.Any]] = []
        for sid in sorted(self._sessions):
            session = self._sessions[sid]
            running, exit_code = self._status(session.process)
            summaries.append(
                {
                    "session_id": sid,
                    "command": session.command,
                    "workdir": session.workdir,
                    "running": running,
                    "exit_code": exit_code,
                }
            )
        return summaries

    @staticmethod
    def _status(process: subprocess.Popen[str]) -> tuple[bool, int | None]:
        """Poll *process* once and return ``(running, exit_code)``.

        A single poll keeps the two values consistent with each other.
        """
        exit_code = process.poll()
        return exit_code is None, exit_code

    @staticmethod
    def _signal_group(process: subprocess.Popen[str], sig: int) -> None:
        """Send *sig* to the process group led by *process*.

        Sessions are spawned with ``start_new_session=True``, so the
        child's pid is also its process-group id. A group that has
        already disappeared is ignored.
        """
        with contextlib.suppress(ProcessLookupError):
            os.killpg(process.pid, sig)

    @staticmethod
    def _write_all(fd: int, payload: bytes, *, timeout_s: float) -> None:
        """Write all of *payload* to the non-blocking descriptor *fd*.

        Short writes are retried with the unwritten tail. When the
        descriptor would block, waits for it to become writable for at
        most *timeout_s* seconds in total.

        Raises:
            BlockingIOError: If the descriptor still does not accept
                data once the time budget is spent.
            OSError: Any other error reported by :func:`os.write`.
        """
        pending = memoryview(payload)
        deadline = time.monotonic() + max(timeout_s, 0.0)
        while pending:
            try:
                written = os.write(fd, pending)
            except BlockingIOError:
                wait_s = deadline - time.monotonic()
                if wait_s <= 0:
                    raise
                select.select([], [fd], [], wait_s)
                continue
            pending = pending[written:]

    def _read_output(self, session_id: str, *, yield_time_ms: int, max_output_chars: int) -> str:
        """Read available output from a PTY session within time and size limits.

        Uses :func:`select.select` to poll the master file descriptor
        and reads in chunks until either the time budget or character
        budget is exhausted, or no more data is available.

        Args:
            session_id: Identifier of the session to read from.
            yield_time_ms: Maximum time in milliseconds to spend
                reading, measured on a monotonic clock.
            max_output_chars: Maximum total characters to accumulate.

        Returns:
            The concatenated output text, decoded as UTF-8 with
            ``errors="replace"`` to handle non-UTF-8 bytes gracefully.
            Multi-byte characters split across two reads within one
            call are decoded intact.

        Raises:
            ValueError: If the ``session_id`` is not recognised.
        """
        session = self._require_session(session_id)
        deadline = time.monotonic() + max(yield_time_ms, 0) / 1000
        # Incremental decoding carries a partial multi-byte sequence over to
        # the next chunk instead of turning it into replacement characters.
        decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
        pieces: list[str] = []
        remaining = max_output_chars
        while remaining > 0:
            timeout = max(0.0, deadline - time.monotonic())
            ready, _, _ = select.select([session.master_fd], [], [], timeout)
            if not ready:
                break
            try:
                data = os.read(session.master_fd, min(remaining, 4096))
            except OSError:
                # Covers BlockingIOError as well as the read error raised
                # once the far side of the PTY is gone.
                break
            if not data:
                break
            text = decoder.decode(data)
            pieces.append(text)
            remaining -= len(text)
            if time.monotonic() >= deadline:
                break
        # Bytes already consumed from the fd must not be dropped: flush any
        # incomplete trailing sequence (it becomes a replacement character).
        pieces.append(decoder.decode(b"", final=True))
        return "".join(pieces)[: max(max_output_chars, 0)]

    def _require_session(self, session_id: str) -> PTYSession:
        """Return a tracked PTY session or raise.

        Args:
            session_id: Identifier of the session to look up.

        Returns:
            The :class:`PTYSession` instance.

        Raises:
            ValueError: If no session with the given ID exists.
        """
        try:
            return self._sessions[session_id]
        except KeyError:
            raise ValueError(f"PTY session not found: {session_id}") from None