# Copyright 2025 The EasyDeL/Calute Author @erfanzar (Erfan Zare Chavoshi).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""System and environment tools for interacting with the operating system.
This module provides a collection of system tools for agents to interact
with the operating system and environment. It includes:
- System information retrieval (OS, CPU, memory, disk, network)
- Process management and monitoring
- File system operations (copy, move, delete, search)
- Environment variable management
- Temporary file and directory handling
All tools provide comprehensive error handling and return structured
dictionaries with operation results and error information.
Example:
>>> info = SystemInfo.static_call(info_type="cpu")
>>> print(f"CPU cores: {info['cpu']['logical_cores']}")
"""
from __future__ import annotations
import os
import platform
import shutil
import subprocess
import tempfile
from pathlib import Path
from typing import Any
import psutil # type:ignore
from ..types import AgentBaseFn
class SystemInfo(AgentBaseFn):
    """Get system and environment information.

    Collects operating-system details, CPU metrics, memory usage, disk
    space and network interface addresses. OS-level facts come from the
    ``platform`` module; everything else is read through ``psutil``.

    Example:
        >>> result = SystemInfo.static_call(info_type="memory")
        >>> print(f"Available: {result['memory']['available_gb']} GB")
    """

    @staticmethod
    def static_call(
        info_type: str = "all",
        **context_variables,
    ) -> dict[str, Any]:
        """Get system information.

        Args:
            info_type: Category to retrieve. One of:
                - "all": every category below
                - "os": operating system and Python version
                - "cpu": core counts, usage and frequency
                - "memory": RAM usage and availability
                - "disk": usage of "/" plus a per-partition list
                - "network": hostname and interface addresses
            **context_variables: Additional context passed from the agent
                (unused here).

        Returns:
            Dictionary keyed by the requested categories:
                - os: system, release, version, machine, processor,
                  python_version, hostname
                - cpu: physical_cores, logical_cores, usage_percent,
                  frequency (dict, or None when unavailable)
                - memory: total, available, used, percent, total_gb,
                  available_gb
                - disk: total, used, free, percent, total_gb, free_gb,
                  partitions
                - network: hostname, interfaces
            For an unrecognised ``info_type`` a dictionary with a single
            "error" key is returned.

        Note:
            The "cpu" category samples usage with
            ``psutil.cpu_percent(interval=1)``, so requesting "cpu" or
            "all" blocks for about one second.
        """
        valid_types = ("all", "os", "cpu", "memory", "disk", "network")
        if info_type not in valid_types:
            # Match the error style of the sibling tools instead of
            # silently handing back an empty dictionary.
            return {"error": f"Unknown info_type: {info_type}"}

        gib = 1024**3
        result: dict[str, Any] = {}

        if info_type in ("all", "os"):
            result["os"] = {
                "system": platform.system(),
                "release": platform.release(),
                "version": platform.version(),
                "machine": platform.machine(),
                "processor": platform.processor(),
                "python_version": platform.python_version(),
                "hostname": platform.node(),
            }

        if info_type in ("all", "cpu"):
            # Query the frequency once. Some platforms cannot report it and
            # raise instead of returning None; treat that as "unavailable"
            # rather than failing the whole request.
            try:
                freq = psutil.cpu_freq()
            except (AttributeError, NotImplementedError, OSError):
                freq = None
            result["cpu"] = {
                "physical_cores": psutil.cpu_count(logical=False),
                "logical_cores": psutil.cpu_count(logical=True),
                "usage_percent": psutil.cpu_percent(interval=1),
                "frequency": freq._asdict() if freq else None,
            }

        if info_type in ("all", "memory"):
            mem = psutil.virtual_memory()
            result["memory"] = {
                "total": mem.total,
                "available": mem.available,
                "used": mem.used,
                "percent": mem.percent,
                "total_gb": round(mem.total / gib, 2),
                "available_gb": round(mem.available / gib, 2),
            }

        if info_type in ("all", "disk"):
            disk = psutil.disk_usage("/")
            result["disk"] = {
                "total": disk.total,
                "used": disk.used,
                "free": disk.free,
                "percent": disk.percent,
                "total_gb": round(disk.total / gib, 2),
                "free_gb": round(disk.free / gib, 2),
            }
            partitions = []
            for partition in psutil.disk_partitions():
                try:
                    usage = psutil.disk_usage(partition.mountpoint)
                except OSError:
                    # Best effort: unreadable mount points (permission
                    # denied, drive not ready, ...) are left out of the list.
                    continue
                partitions.append(
                    {
                        "device": partition.device,
                        "mountpoint": partition.mountpoint,
                        "fstype": partition.fstype,
                        "total_gb": round(usage.total / gib, 2),
                        "used_percent": usage.percent,
                    }
                )
            result["disk"]["partitions"] = partitions

        if info_type in ("all", "network"):
            interfaces = [
                {
                    "name": interface,
                    "addresses": [
                        {
                            "family": str(addr.family),
                            "address": addr.address,
                            "netmask": addr.netmask,
                        }
                        for addr in addresses
                    ],
                }
                for interface, addresses in psutil.net_if_addrs().items()
            ]
            result["network"] = {
                "hostname": platform.node(),
                "interfaces": interfaces,
            }

        return result
class ProcessManager(AgentBaseFn):
    """Manage and monitor system processes.

    Supports listing, finding, inspecting and terminating processes via
    ``psutil``, and running a shell command via ``subprocess``.
    Termination first asks the process to exit and falls back to a force
    kill if it has not exited within five seconds.

    Example:
        >>> result = ProcessManager.static_call(
        ...     operation="find",
        ...     process_name="python"
        ... )
        >>> print(f"Found {result['count']} Python processes")
    """

    @staticmethod
    def static_call(
        operation: str,
        process_name: str | None = None,
        pid: int | None = None,
        command: str | None = None,
        limit: int = 20,
        **context_variables,
    ) -> dict[str, Any]:
        """Manage system processes.

        Args:
            operation: Operation to perform:
                - "list": List processes sorted by CPU usage (descending)
                - "find": Find processes whose name contains a substring
                - "info": Get detailed process information by PID
                - "kill": Terminate a process by PID
                - "run": Execute a shell command
            process_name: Case-insensitive substring to match against
                process names (for find operation).
            pid: Process ID (for info and kill operations).
            command: Shell command to run (for run operation).
            limit: Maximum number of processes returned by "list"
                (default: 20). Negative values are treated as 0.
            **context_variables: Additional context passed from the agent
                (unused here).

        Returns:
            Dictionary containing operation-specific results:
                - list: processes (pid, name, cpu_percent, memory_percent),
                  total_count
                - find: found (pid, name, cmdline), count
                - info: info dict with pid, name, status, created,
                  cpu_percent, memory_percent, memory_info, num_threads,
                  cmdline
                - kill: status ("terminated" or "killed"), pid, name, message
                - run: completed, returncode, stdout, stderr (each stream
                  truncated to 5000 characters)
                - error: Error message if the operation failed
        """
        result: dict[str, Any] = {}

        if operation == "list":
            processes = []
            for proc in psutil.process_iter(["pid", "name", "cpu_percent", "memory_percent"]):
                try:
                    info = proc.info
                    # psutil substitutes None for attributes it was not
                    # allowed to read, so memory_percent may be None.
                    mem_pct = info["memory_percent"]
                    processes.append(
                        {
                            "pid": info["pid"],
                            "name": info["name"],
                            "cpu_percent": info["cpu_percent"],
                            "memory_percent": round(mem_pct, 2) if mem_pct is not None else None,
                        }
                    )
                except (psutil.AccessDenied, psutil.NoSuchProcess):
                    pass
            processes.sort(key=lambda x: x["cpu_percent"] or 0, reverse=True)
            # Clamp so a negative limit cannot slice from the wrong end.
            result["processes"] = processes[: max(limit, 0)]
            result["total_count"] = len(processes)

        elif operation == "find":
            if not process_name:
                return {"error": "process_name required for find operation"}
            needle = process_name.lower()
            found = []
            for proc in psutil.process_iter(["pid", "name", "cmdline"]):
                try:
                    info = proc.info
                    # The name may be None when access was denied.
                    name = info["name"] or ""
                    if needle in name.lower():
                        cmdline = info["cmdline"]
                        found.append(
                            {
                                "pid": info["pid"],
                                "name": info["name"],
                                "cmdline": " ".join(cmdline) if cmdline else "",
                            }
                        )
                except (psutil.AccessDenied, psutil.NoSuchProcess):
                    pass
            result["found"] = found
            result["count"] = len(found)

        elif operation == "info":
            # Compare against None: PID 0 is falsy but is a real PID on
            # some platforms.
            if pid is None:
                return {"error": "pid required for info operation"}
            try:
                proc = psutil.Process(pid)
                result["info"] = {
                    "pid": proc.pid,
                    "name": proc.name(),
                    "status": proc.status(),
                    "created": proc.create_time(),
                    "cpu_percent": proc.cpu_percent(),
                    "memory_percent": proc.memory_percent(),
                    "memory_info": proc.memory_info()._asdict(),
                    "num_threads": proc.num_threads(),
                    "cmdline": " ".join(proc.cmdline()),
                }
            except psutil.NoSuchProcess:
                return {"error": f"No process with PID {pid}"}
            except Exception as e:
                return {"error": f"Failed to get process info: {e!s}"}

        elif operation == "run":
            if not command:
                return {"error": "command required for run operation"}
            try:
                # SECURITY: the command string is handed to the shell
                # unmodified (shell=True). Whoever controls `command`
                # controls the shell; this is the documented purpose of
                # the "run" operation, so it is flagged rather than changed.
                process = subprocess.run(
                    command,
                    shell=True,
                    capture_output=True,
                    text=True,
                    timeout=30,
                )
                result["completed"] = True
                result["returncode"] = process.returncode
                result["stdout"] = process.stdout[:5000]
                result["stderr"] = process.stderr[:5000]
            except subprocess.TimeoutExpired:
                return {"error": "Command timed out after 30 seconds"}
            except Exception as e:
                return {"error": f"Failed to run command: {e!s}"}

        elif operation == "kill":
            if pid is None:
                return {"error": "pid required for kill operation"}
            try:
                proc = psutil.Process(pid)
                proc_name = proc.name()
                proc.terminate()
                try:
                    proc.wait(timeout=5)
                    result["status"] = "terminated"
                except psutil.TimeoutExpired:
                    # Still alive after the grace period: force it.
                    proc.kill()
                    result["status"] = "killed"
                result["pid"] = pid
                result["name"] = proc_name
                result["message"] = f"Process {proc_name} (PID {pid}) has been stopped"
            except psutil.NoSuchProcess:
                return {"error": f"No process with PID {pid}"}
            except psutil.AccessDenied:
                return {"error": f"Access denied to kill process {pid}"}
            except Exception as e:
                return {"error": f"Failed to kill process: {e!s}"}

        else:
            return {"error": f"Unknown operation: {operation}"}

        return result
class EnvironmentManager(AgentBaseFn):
    """Manage environment variables and settings.

    Gets, sets, lists and removes entries of ``os.environ``. Listing
    either filters by a key prefix or falls back to a fixed set of
    commonly inspected variables.

    Example:
        >>> result = EnvironmentManager.static_call(
        ...     operation="get",
        ...     key="PATH"
        ... )
        >>> print(result["value"])
    """

    @staticmethod
    def static_call(
        operation: str,
        key: str | None = None,
        value: str | None = None,
        **context_variables,
    ) -> dict[str, Any]:
        """Manage environment variables.

        Args:
            operation: Operation to perform:
                - "get": Get value of specific variable
                - "set": Set a variable to a value
                - "list": List environment variables (filtered or common)
                - "remove": Remove an environment variable
            key: Environment variable name (required for get/set/remove).
                For list, used as a prefix filter; when omitted, only a
                fixed set of common variables is returned.
            value: Value to set (required for set operation); it is
                converted with ``str()`` before being stored.
            **context_variables: Additional context passed from the agent
                (unused here).

        Returns:
            Dictionary containing operation-specific results:
                - get: key, value (None if unset), exists
                - set: success, key, value
                - list: environment dict, count
                - remove: success and removed, or success=False and error
                - error: Error message if the operation failed

        Note:
            All changes go through ``os.environ`` of the running
            interpreter.
        """
        result: dict[str, Any] = {}

        if operation == "get":
            if not key:
                return {"error": "key required for get operation"}
            current = os.environ.get(key)
            result["key"] = key
            result["value"] = current
            result["exists"] = current is not None

        elif operation == "set":
            if not key or value is None:
                return {"error": "key and value required for set operation"}
            text = str(value)
            try:
                os.environ[key] = text
            except (ValueError, OSError) as e:
                # Illegal names/values (e.g. "=" in the key, embedded NUL)
                # are reported as an error dict like every other failure.
                return {"error": f"Failed to set environment variable: {e!s}"}
            result["success"] = True
            result["key"] = key
            result["value"] = text

        elif operation == "list":
            if key:
                env_vars = {k: v for k, v in os.environ.items() if k.startswith(key)}
            else:
                important_keys = [
                    "PATH",
                    "HOME",
                    "USER",
                    "SHELL",
                    "LANG",
                    "PWD",
                    "PYTHON",
                    "VIRTUAL_ENV",
                    "CONDA_DEFAULT_ENV",
                    "JAVA_HOME",
                    "NODE_ENV",
                    "GOPATH",
                ]
                env_vars = {k: os.environ[k] for k in important_keys if k in os.environ}
            result["environment"] = env_vars
            result["count"] = len(env_vars)

        elif operation == "remove":
            if not key:
                return {"error": "key required for remove operation"}
            # Check membership first: deleting goes through unsetenv before
            # the lookup, so an absent key must not reach `del`.
            if key in os.environ:
                del os.environ[key]
                result["success"] = True
                result["removed"] = key
            else:
                result["success"] = False
                result["error"] = f"Variable {key} not found"

        else:
            return {"error": f"Unknown operation: {operation}"}

        return result
class TempFileManager(AgentBaseFn):
    """Create and manage temporary files and directories.

    Creates temporary files or directories through ``tempfile`` (with an
    optional prefix/suffix and initial content) and offers a "cleanup"
    operation that removes entries named ``calute_*`` from the system
    temporary directory.

    Example:
        >>> result = TempFileManager.static_call(
        ...     operation="create_file",
        ...     content="temporary data",
        ...     suffix=".txt"
        ... )
        >>> print(result["path"])
    """

    @staticmethod
    def static_call(
        operation: str,
        content: str | None = None,
        suffix: str | None = None,
        prefix: str | None = None,
        cleanup: bool = True,
        **context_variables,
    ) -> dict[str, Any]:
        """Manage temporary files and directories.

        Args:
            operation: Operation to perform:
                - "create_file": Create a temporary file
                - "create_dir": Create a temporary directory
                - "cleanup": Remove every "calute_*" entry in the temp dir
            content: Text written to the new file; when empty or None the
                file is created empty.
            suffix: File extension/suffix (e.g., ".txt", ".json").
            prefix: Filename prefix (default: "calute_").
            cleanup: When False, a "note" entry is added to the result
                saying the file/directory will persist. This method does
                not register any automatic deletion itself; created
                entries are removed only by the "cleanup" operation.
            **context_variables: Additional context passed from the agent
                (unused here).

        Returns:
            Dictionary containing operation-specific results:
                - create_file: path, exists, size (plus note if
                  cleanup=False)
                - create_dir: path, exists (plus note if cleanup=False)
                - cleanup: temp_dir, found, deleted, failed, deleted_count
                - error: Error message if the operation failed

        Note:
            The cleanup operation matches purely on the "calute_" name
            prefix, so entries created with a custom ``prefix`` are not
            removed by it.
        """
        result: dict[str, Any] = {}

        if operation == "create_file":
            try:
                fd, path = tempfile.mkstemp(suffix=suffix, prefix=prefix or "calute_")
                if content:
                    # fdopen takes ownership of the descriptor; the with
                    # block closes it.
                    with os.fdopen(fd, "w") as f:
                        f.write(content)
                else:
                    os.close(fd)
                result["path"] = path
                result["exists"] = os.path.exists(path)
                result["size"] = os.path.getsize(path)
                if not cleanup:
                    result["note"] = "File will persist after program exit"
            except Exception as e:
                return {"error": f"Failed to create temp file: {e!s}"}

        elif operation == "create_dir":
            try:
                path = tempfile.mkdtemp(suffix=suffix, prefix=prefix or "calute_")
                result["path"] = path
                result["exists"] = os.path.exists(path)
                if not cleanup:
                    result["note"] = "Directory will persist after program exit"
            except Exception as e:
                return {"error": f"Failed to create temp directory: {e!s}"}

        elif operation == "cleanup":
            temp_dir = tempfile.gettempdir()
            result["temp_dir"] = temp_dir
            try:
                # Snapshot the listing before deleting anything from it.
                entries = list(Path(temp_dir).iterdir())
            except OSError as e:
                return {"error": f"Failed to list temp directory: {e!s}"}

            calute_temps = []
            deleted = []
            failed = []
            for item in entries:
                if not item.name.startswith("calute_"):
                    continue
                calute_temps.append(str(item))
                try:
                    # is_dir() follows symlinks, but rmtree refuses them:
                    # remove a symlink itself, never the tree it points at.
                    if item.is_dir() and not item.is_symlink():
                        shutil.rmtree(item)
                    else:
                        item.unlink()
                    deleted.append(str(item))
                except Exception as e:
                    failed.append({"path": str(item), "error": str(e)})
            result["found"] = calute_temps
            result["deleted"] = deleted
            result["failed"] = failed
            result["deleted_count"] = len(deleted)

        else:
            return {"error": f"Unknown operation: {operation}"}

        return result