# Source code for calute.operators.pty

# Copyright 2025 The EasyDeL/Calute Author @erfanzar (Erfan Zare Chavoshi).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Persistent PTY session management for operator tooling.

Provides :class:`PTYSessionManager`, which creates and manages
pseudo-terminal (PTY) backed subprocess sessions.  Each session is
identified by a unique ``session_id`` and can be written to, read from,
interrupted, or closed independently.
"""

from __future__ import annotations

import codecs
import os
import pty
import select
import signal
import subprocess
import time
import typing as tp
import uuid
from dataclasses import dataclass


@dataclass
class PTYSession:
    """Record describing one live subprocess attached to a pseudo-terminal.

    Attributes:
        session_id: Unique identifier of the session; generated by
            :class:`PTYSessionManager`.
        process: The :class:`subprocess.Popen` handle that owns the child
            process.
        master_fd: File descriptor of the master end of the PTY pair.
            Output is read from it and input is written to it.
        command: The shell command the session was launched with.
        workdir: Absolute path of the directory the session was started in.
    """

    # Identity of the session inside the manager's registry.
    session_id: str
    # Child process handle and the master side of its PTY.
    process: subprocess.Popen[str]
    master_fd: int
    # Launch parameters, kept for reporting.
    command: str
    workdir: str
class PTYSessionManager:
    """Manage persistent PTY-backed shell sessions.

    Maintains a registry of active :class:`PTYSession` instances keyed by
    ``session_id``. Sessions can be created, written to, listed, and closed.

    Attributes:
        _sessions: Internal mapping from ``session_id`` to the corresponding
            :class:`PTYSession`.
        _decoders: Internal mapping from ``session_id`` to the incremental
            UTF-8 decoder used for that session's output, so a multi-byte
            character split across two reads is decoded correctly.
    """

    # Upper bound on the number of bytes requested from the PTY per read.
    _READ_CHUNK_BYTES = 4096
    # Seconds to wait for the child after SIGTERM, and again after SIGKILL.
    _STOP_TIMEOUT_SECONDS = 2

    def __init__(self) -> None:
        """Initialise the manager with an empty session registry."""
        self._sessions: dict[str, PTYSession] = {}
        self._decoders: dict[str, codecs.IncrementalDecoder] = {}

    def create_session(
        self,
        cmd: str,
        *,
        workdir: str | None = None,
        env: dict[str, str] | None = None,
        login: bool = True,
        yield_time_ms: int = 1000,
        max_output_chars: int = 4000,
    ) -> dict[str, tp.Any]:
        """Start a PTY session and return its initial output.

        Creates a new pseudo-terminal pair, spawns a shell process with the
        given command, and captures initial output up to the specified limits.

        Args:
            cmd: Shell command to execute inside the PTY.
            workdir: Working directory for the new session. When ``None``,
                the current process working directory is used.
            env: Additional environment variables merged on top of the
                current process environment.
            login: When ``True``, the shell is started with login semantics
                (``-l`` flag) for bash and zsh so that profile files are
                sourced.
            yield_time_ms: Maximum time in milliseconds to wait for initial
                output before returning.
            max_output_chars: Maximum number of characters to capture from
                the initial output.

        Returns:
            A dictionary containing:

            - ``session_id``: Unique identifier for the new session.
            - ``command``: The shell command that was launched.
            - ``workdir``: Resolved working directory path.
            - ``stdout``: Initial output captured from the PTY.
            - ``running``: ``True`` if the process is still alive.
            - ``exit_code``: Process exit code, or ``None`` if still running.

        Raises:
            OSError: If the PTY cannot be allocated or the shell cannot be
                started (for example when ``workdir`` does not exist). No
                file descriptors are leaked in that case.
        """
        resolved_workdir = os.path.abspath(workdir or os.getcwd())
        shell = os.environ.get("SHELL", "/bin/sh")
        shell_args = [shell]
        if login and os.path.basename(shell).endswith(("zsh", "bash")):
            shell_args.append("-l")
        shell_args.extend(["-c", cmd])

        master_fd, slave_fd = pty.openpty()
        try:
            process = subprocess.Popen(
                shell_args,
                stdin=slave_fd,
                stdout=slave_fd,
                stderr=slave_fd,
                cwd=resolved_workdir,
                env={**os.environ, **(env or {})},
                text=True,
                start_new_session=True,
                close_fds=True,
            )
        except BaseException:
            # The spawn failed, so nobody will ever own the master side.
            os.close(master_fd)
            raise
        finally:
            # The child holds its own copy of the slave side (or was never
            # started); the parent's copy is not needed either way.
            os.close(slave_fd)

        os.set_blocking(master_fd, False)
        session_id = f"pty_{uuid.uuid4().hex[:10]}"
        self._sessions[session_id] = PTYSession(
            session_id=session_id,
            process=process,
            master_fd=master_fd,
            command=cmd,
            workdir=resolved_workdir,
        )
        output = self._read_output(session_id, yield_time_ms=yield_time_ms, max_output_chars=max_output_chars)
        # Poll once so that ``running`` and ``exit_code`` cannot disagree.
        exit_code = process.poll()
        return {
            "session_id": session_id,
            "command": cmd,
            "workdir": resolved_workdir,
            "stdout": output,
            "running": exit_code is None,
            "exit_code": exit_code,
        }

    def write(
        self,
        session_id: str,
        *,
        chars: str = "",
        close_stdin: bool = False,
        interrupt: bool = False,
        yield_time_ms: int = 1000,
        max_output_chars: int = 4000,
    ) -> dict[str, tp.Any]:
        """Write to a PTY session and read back incremental output.

        Sends characters, an EOF signal, or an interrupt to the running
        process and then collects whatever output the process produces
        within the specified time window.

        Args:
            session_id: Identifier of the target PTY session.
            chars: Text to write to the process stdin. Include ``\\n`` to
                simulate pressing Enter.
            close_stdin: When ``True``, send an EOF (``Ctrl-D``) to the
                process after writing any provided characters.
            interrupt: When ``True``, send ``SIGINT`` to the process group
                before reading output.
            yield_time_ms: Maximum time in milliseconds to wait for output
                after sending input.
            max_output_chars: Maximum number of output characters to collect.

        Returns:
            A dictionary containing:

            - ``session_id``: The session identifier.
            - ``stdout``: Incremental output captured since the write.
            - ``running``: ``True`` if the process is still alive.
            - ``exit_code``: Process exit code, or ``None`` if still running.

        Raises:
            ValueError: If the ``session_id`` is not recognised.
            OSError: If ``chars`` cannot be written to the PTY, including
                :class:`BlockingIOError` when the PTY input buffer is full
                before all of ``chars`` has been accepted.
        """
        session = self._require_session(session_id)
        if interrupt and session.process.poll() is None:
            os.killpg(session.process.pid, signal.SIGINT)
        if chars:
            self._write_all(session.master_fd, chars.encode())
        if close_stdin:
            # Best effort: the other side may already be gone.
            try:
                os.write(session.master_fd, b"\x04")
            except OSError:
                pass
        output = self._read_output(session_id, yield_time_ms=yield_time_ms, max_output_chars=max_output_chars)
        # Poll once so that ``running`` and ``exit_code`` cannot disagree.
        exit_code = session.process.poll()
        return {
            "session_id": session_id,
            "stdout": output,
            "running": exit_code is None,
            "exit_code": exit_code,
        }

    def close(self, session_id: str) -> dict[str, tp.Any]:
        """Terminate and forget a PTY session.

        Attempts a graceful ``SIGTERM`` first, escalating to ``SIGKILL`` if
        the process does not exit within two seconds, and then reaps the
        killed process. The master file descriptor is closed and the session
        is removed from the registry even if stopping the process fails.

        Args:
            session_id: Identifier of the PTY session to close.

        Returns:
            A dictionary containing:

            - ``session_id``: The session identifier.
            - ``closed``: Always ``True``.
            - ``exit_code``: Final process exit code, or ``None`` in the
              unlikely case that the process could not be reaped even
              after ``SIGKILL``.

        Raises:
            ValueError: If the ``session_id`` is not recognised.
        """
        session = self._require_session(session_id)
        process = session.process
        try:
            if process.poll() is None:
                process.terminate()
                try:
                    process.wait(timeout=self._STOP_TIMEOUT_SECONDS)
                except subprocess.TimeoutExpired:
                    process.kill()
                    # Reap the killed child so it does not linger as a
                    # zombie and so the reported exit code is final.
                    try:
                        process.wait(timeout=self._STOP_TIMEOUT_SECONDS)
                    except subprocess.TimeoutExpired:
                        pass
        finally:
            try:
                os.close(session.master_fd)
            except OSError:
                pass
            self._sessions.pop(session_id, None)
            self._decoders.pop(session_id, None)
        return {"session_id": session_id, "closed": True, "exit_code": process.poll()}

    def list_sessions(self) -> list[dict[str, tp.Any]]:
        """Return summaries for current PTY sessions.

        Returns:
            A list of dictionaries ordered by ``session_id``, each containing
            the ``session_id``, original ``command``, ``workdir``,
            ``running`` status, and ``exit_code`` of a tracked session.
        """
        summaries: list[dict[str, tp.Any]] = []
        for sid, session in sorted(self._sessions.items()):
            # Poll once so that ``running`` and ``exit_code`` cannot disagree.
            exit_code = session.process.poll()
            summaries.append(
                {
                    "session_id": sid,
                    "command": session.command,
                    "workdir": session.workdir,
                    "running": exit_code is None,
                    "exit_code": exit_code,
                }
            )
        return summaries

    @staticmethod
    def _write_all(fd: int, payload: bytes) -> None:
        """Write all of ``payload`` to ``fd``, retrying after short writes.

        Args:
            fd: Non-blocking file descriptor to write to.
            payload: Bytes to send.

        Raises:
            BlockingIOError: If the descriptor stops accepting data before
                the whole payload has been written.
            OSError: Any other error reported by :func:`os.write`.
        """
        view = memoryview(payload)
        while view:
            written = os.write(fd, view)
            if written <= 0:
                # Guard against spinning forever on a zero-length write.
                raise BlockingIOError("PTY input buffer is full")
            view = view[written:]

    def _read_output(self, session_id: str, *, yield_time_ms: int, max_output_chars: int) -> str:
        """Read available output from a PTY session within time and size limits.

        Uses :func:`select.select` to poll the master file descriptor and
        reads in chunks until either the time budget or character budget is
        exhausted, or no more data is available. Bytes are decoded with a
        per-session incremental decoder, so a multi-byte character that is
        split across two reads (or two calls) is not corrupted.

        Args:
            session_id: Identifier of the session to read from.
            yield_time_ms: Maximum time in milliseconds to spend reading.
            max_output_chars: Maximum total characters to accumulate.

        Returns:
            The concatenated output text, decoded as UTF-8 with
            ``errors="replace"`` to handle non-UTF-8 bytes gracefully. Its
            length never exceeds ``max_output_chars``.

        Raises:
            ValueError: If the ``session_id`` is not recognised.
        """
        session = self._require_session(session_id)
        decoder = self._decoders.get(session_id)
        if decoder is None:
            decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
            self._decoders[session_id] = decoder

        # A monotonic clock keeps the budget immune to wall-clock jumps.
        deadline = time.monotonic() + max(yield_time_ms, 0) / 1000
        chunks: list[str] = []
        remaining = max_output_chars
        stream_ended = False
        budget_blocked = False
        while remaining > 0:
            # Bytes buffered from an unfinished character can surface as one
            # extra replacement character on the next decode; keep a slot
            # free for it so the character budget is never exceeded.
            reserve = 1 if decoder.getstate()[0] else 0
            read_size = min(remaining - reserve, self._READ_CHUNK_BYTES)
            if read_size <= 0:
                budget_blocked = True
                break
            timeout = max(0.0, deadline - time.monotonic())
            ready, _, _ = select.select([session.master_fd], [], [], timeout)
            if not ready:
                break
            try:
                data = os.read(session.master_fd, read_size)
            except BlockingIOError:
                # Nothing to read right now; more output may still follow.
                break
            except OSError:
                # Typically raised once the child side of the PTY is closed.
                stream_ended = True
                break
            if not data:
                stream_ended = True
                break
            text = decoder.decode(data)
            chunks.append(text)
            remaining -= len(text)
            if time.monotonic() >= deadline:
                break

        # Flush a dangling partial character when no more bytes can complete
        # it, or when it alone blocks a call that has produced nothing (which
        # would otherwise repeat forever for a budget of one character).
        if stream_ended or (budget_blocked and remaining == max_output_chars):
            chunks.append(decoder.decode(b"", final=True))
        return "".join(chunks)

    def _require_session(self, session_id: str) -> PTYSession:
        """Return a tracked PTY session or raise.

        Args:
            session_id: Identifier of the session to look up.

        Returns:
            The :class:`PTYSession` instance.

        Raises:
            ValueError: If no session with the given ID exists.
        """
        try:
            return self._sessions[session_id]
        except KeyError:
            raise ValueError(f"PTY session not found: {session_id}") from None