Source code for calute.mcp.client

# Copyright 2025 The EasyDeL/Calute Author @erfanzar (Erfan Zare Chavoshi).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""MCP client implementation for connecting to MCP servers.

This module provides the core MCP client functionality for Calute,
including:
- Connection management for multiple transport types (STDIO, SSE, Streamable HTTP)
- JSON-RPC 2.0 message handling for the MCP protocol
- Tool discovery and invocation
- Resource reading and prompt fetching
- Session lifecycle management

The client supports both subprocess-based (STDIO) communication for local
MCP servers and HTTP-based transports for remote servers. SSE and Streamable
HTTP transports require the optional `mcp` SDK package.
"""

import asyncio
import json
import subprocess
from contextlib import AsyncExitStack
from typing import Any

from ..logging.console import get_logger
from .types import MCPPrompt, MCPResource, MCPServerConfig, MCPTool, MCPTransportType

_MCP_SDK_AVAILABLE: bool | None = None


def _check_mcp_sdk() -> bool:
    """Check if the MCP SDK is installed.

    This function lazily checks for the presence of the `mcp` package,
    caching the result for subsequent calls.

    Returns:
        True if the MCP SDK is installed, False otherwise.
    """
    global _MCP_SDK_AVAILABLE
    if _MCP_SDK_AVAILABLE is None:
        try:
            import mcp  # noqa: F401

            _MCP_SDK_AVAILABLE = True
        except ImportError:
            _MCP_SDK_AVAILABLE = False
    return _MCP_SDK_AVAILABLE


[docs]class MCPClient: """Client for connecting to and interacting with MCP servers. This client supports multiple transports for communicating with MCP servers: - STDIO: Local subprocess communication (for npx, uvx style servers) - SSE: Server-Sent Events over HTTP (legacy 2024-11-05 protocol) - STREAMABLE_HTTP: Streamable HTTP transport (recommended for 2025+) Note: SSE and STREAMABLE_HTTP transports require the optional `mcp` package. Install with: pip install calute[mcp] Attributes: config: MCP server configuration process: Subprocess for stdio transport (if applicable) session_id: Session identifier for this connection tools: Available tools from the server resources: Available resources from the server prompts: Available prompts from the server """ _request_id_counter: int = 0 def __init__(self, config: MCPServerConfig): """Initialize MCP client with server configuration. Sets up the client in a disconnected state with empty capability lists. Call ``connect()`` to establish a connection to the MCP server and discover its capabilities. Args: config: Configuration for the MCP server, specifying the transport type, command/URL, and optional environment variables or headers. Example: >>> config = MCPServerConfig(name="fs", command="npx", args=["-y", "@mcp/server-fs"]) >>> client = MCPClient(config) >>> connected = await client.connect() """ self.config = config self.process: subprocess.Popen | None = None self.session_id: str | None = None self.connected = False self.logger = get_logger() self.tools: list[MCPTool] = [] self.resources: list[MCPResource] = [] self.prompts: list[MCPPrompt] = [] self._session: Any = None self._exit_stack: AsyncExitStack | None = None def _next_request_id(self) -> int: """Generate unique JSON-RPC request ID. Uses a class-level counter that is thread-safe via Python's GIL. Each call returns a monotonically increasing integer. Returns: A unique integer ID for the JSON-RPC request. """ MCPClient._request_id_counter += 1 return MCPClient._request_id_counter
[docs] async def connect(self) -> bool: """Connect to the MCP server. Establishes a connection using the transport type specified in the configuration. Handles deprecated transport aliases (HTTP -> SSE, WEBSOCKET -> STREAMABLE_HTTP) for backward compatibility. Returns: True if connection successful, False otherwise. Note: After successful connection, use `disconnect()` to clean up resources. """ try: transport = self.config.transport if self.config.url and self.config.url.startswith(("ws://", "wss://")): self.logger.error(f"WebSocket MCP transport is not implemented for {self.config.name}") return False if transport == MCPTransportType.HTTP: transport = MCPTransportType.SSE elif transport == MCPTransportType.WEBSOCKET: transport = MCPTransportType.STREAMABLE_HTTP if transport == MCPTransportType.STDIO: return await self._connect_stdio() elif transport == MCPTransportType.SSE: return await self._connect_sse() elif transport == MCPTransportType.STREAMABLE_HTTP: return await self._connect_streamable_http() else: self.logger.error(f"Unsupported transport type: {self.config.transport}") return False except Exception as e: self.logger.error(f"Failed to connect to MCP server {self.config.name}: {e}") return False
async def _connect_stdio(self) -> bool: """Connect using stdio transport. Spawns the MCP server as a subprocess and communicates via stdin/stdout. This is the standard transport for local MCP servers invoked via npx, uvx, or similar package runners. The connection process: 1. Spawns the subprocess with configured command and arguments 2. Sends JSON-RPC initialize request 3. Waits for and validates the server response 4. Sends initialized notification 5. Discovers server capabilities (tools, resources, prompts) Returns: True if connection successful, False otherwise. """ if not self.config.command: self.logger.error(f"No command specified for stdio MCP server {self.config.name}") return False try: env = None if self.config.env: import os env = os.environ.copy() env.update(self.config.env) self.logger.info(f"Starting MCP server: {self.config.command} {' '.join(self.config.args)}") self.process = subprocess.Popen( [self.config.command, *self.config.args], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env, text=True, ) await asyncio.sleep(0.5) if self.process.poll() is not None: stderr_output = "" if self.process.stderr: stderr_output = await asyncio.to_thread(self.process.stderr.read) self.logger.error(f"MCP server process failed to start. Exit code: {self.process.returncode}") self.logger.error(f"stderr output: {stderr_output}") return False init_request = { "jsonrpc": "2.0", "method": "initialize", "params": { "protocolVersion": "2024-11-05", "capabilities": {}, "clientInfo": {"name": "Calute", "version": "0.1.2"}, }, "id": self._next_request_id(), } self._write_message(init_request) response = await self._read_message() if response and response.get("result"): initialized_notification = {"jsonrpc": "2.0", "method": "notifications/initialized"} self._write_message(initialized_notification) self.session_id = str(id(self)) self.connected = True self.logger.debug(f"Connected to MCP server {self.config.name}") await self._discover_capabilities() return True elif response and response.get("error"): self.logger.error(f"MCP server returned error: {response['error']}") return False else: self.logger.error(f"No valid response from MCP server {self.config.name}") return False except FileNotFoundError: self.logger.error(f"Command not found: {self.config.command}. Make sure it's installed and in PATH.") return False except Exception as e: self.logger.error(f"Failed to connect via stdio: {e}") if self.process: self.logger.error(f"Process poll: {self.process.poll()}") return False async def _connect_sse(self) -> bool: """Connect using SSE transport (legacy HTTP+SSE protocol). This transport uses Server-Sent Events over HTTP, which was the standard for MCP protocol version 2024-11-05. For newer deployments, prefer STREAMABLE_HTTP transport. Returns: True if connection successful, False otherwise. Raises: ImportError: If the MCP SDK is not installed. Note: Requires the optional `mcp` package: pip install calute[mcp] """ if not _check_mcp_sdk(): raise ImportError("SSE transport requires the MCP SDK. Install with: pip install calute[mcp]") if not self.config.url: self.logger.error(f"No URL specified for SSE MCP server {self.config.name}") return False try: from mcp import ClientSession from mcp.client.sse import sse_client self._exit_stack = AsyncExitStack() await self._exit_stack.__aenter__() sse_transport = await self._exit_stack.enter_async_context( sse_client( url=self.config.url, headers=self.config.headers, timeout=self.config.timeout, sse_read_timeout=self.config.sse_read_timeout, ) ) read_stream, write_stream = sse_transport self._session = await self._exit_stack.enter_async_context(ClientSession(read_stream, write_stream)) await self._session.initialize() self.session_id = str(id(self)) self.connected = True self.logger.debug(f"Connected to MCP server {self.config.name} via SSE") await self._discover_capabilities_sdk() return True except Exception as e: self.logger.error(f"Failed to connect via SSE: {e}") if self._exit_stack: await self._exit_stack.__aexit__(None, None, None) self._exit_stack = None return False async def _connect_streamable_http(self) -> bool: """Connect using Streamable HTTP transport (recommended for 2025+). This is the recommended transport for new MCP deployments. It uses standard HTTP with streaming support for bidirectional communication. Returns: True if connection successful, False otherwise. Raises: ImportError: If the MCP SDK is not installed. Note: Requires the optional `mcp` package: pip install calute[mcp] """ if not _check_mcp_sdk(): raise ImportError("Streamable HTTP transport requires the MCP SDK. Install with: pip install calute[mcp]") if not self.config.url: self.logger.error(f"No URL specified for Streamable HTTP MCP server {self.config.name}") return False try: from mcp import ClientSession from mcp.client.streamable_http import streamablehttp_client self._exit_stack = AsyncExitStack() await self._exit_stack.__aenter__() http_transport = await self._exit_stack.enter_async_context( streamablehttp_client( url=self.config.url, headers=self.config.headers, timeout=self.config.timeout, ) ) read_stream, write_stream, get_session_id = http_transport self._session = await self._exit_stack.enter_async_context(ClientSession(read_stream, write_stream)) await self._session.initialize() session_id = get_session_id() self.session_id = session_id if session_id else str(id(self)) self.connected = True self.logger.debug(f"Connected to MCP server {self.config.name} via Streamable HTTP") await self._discover_capabilities_sdk() return True except Exception as e: self.logger.error(f"Failed to connect via Streamable HTTP: {e}") if self._exit_stack: await self._exit_stack.__aexit__(None, None, None) self._exit_stack = None return False async def _discover_capabilities_sdk(self) -> None: """Discover tools, resources, and prompts using the SDK session. Queries the MCP server for available capabilities and populates the `tools`, `resources`, and `prompts` attributes. This method is used for SSE and Streamable HTTP transports that use the MCP SDK. Failures in discovering individual capability types are logged but do not cause the entire operation to fail. """ if not self._session: return try: tools_result = await self._session.list_tools() self.tools = [ MCPTool( name=tool.name, description=tool.description or "", input_schema=tool.inputSchema if hasattr(tool, "inputSchema") else {}, server_name=self.config.name, ) for tool in tools_result.tools ] self.logger.info(f"Discovered {len(self.tools)} tools from {self.config.name}") except Exception as e: self.logger.debug(f"Failed to list tools: {e}") try: resources_result = await self._session.list_resources() self.resources = [ MCPResource( uri=resource.uri, name=resource.name or "", description=resource.description or "", mime_type=resource.mimeType if hasattr(resource, "mimeType") else None, server_name=self.config.name, ) for resource in resources_result.resources ] self.logger.info(f"Discovered {len(self.resources)} resources from {self.config.name}") except Exception as e: self.logger.debug(f"Failed to list resources: {e}") try: prompts_result = await self._session.list_prompts() self.prompts = [ MCPPrompt( name=prompt.name, description=prompt.description or "", arguments=prompt.arguments if hasattr(prompt, "arguments") else [], server_name=self.config.name, ) for prompt in prompts_result.prompts ] self.logger.info(f"Discovered {len(self.prompts)} prompts from {self.config.name}") except Exception as e: self.logger.debug(f"Failed to list prompts: {e}") def _write_message(self, message: dict[str, Any]) -> None: """Write a message to the MCP server via stdio. Serializes the message to JSON and writes it to the subprocess stdin, followed by a newline character. Args: message: Dictionary containing the JSON-RPC message to send. Raises: RuntimeError: If the MCP server process is not available. """ if not self.process or not self.process.stdin: raise RuntimeError("MCP server process not available") json_str = json.dumps(message) self.process.stdin.write(json_str + "\n") self.process.stdin.flush() async def _read_message(self) -> dict[str, Any] | None: """Read a message from the MCP server via stdio. Reads a line from the subprocess stdout and parses it as JSON. Uses asyncio to avoid blocking the event loop while waiting. Returns: Parsed JSON message as a dictionary, or None if no message was received or parsing failed. """ if not self.process or not self.process.stdout: return None try: line = await asyncio.wait_for( asyncio.create_task(asyncio.to_thread(self.process.stdout.readline)), timeout=10.0 ) if line: line_str = line.strip() if line_str: return json.loads(line_str) return None except TimeoutError: self.logger.error( f"Timeout reading from MCP server {self.config.name}. Server may not be running or configured correctly." ) if self.process.poll() is not None: stderr_output = "" if self.process.stderr: stderr_output = await asyncio.to_thread(self.process.stderr.read) self.logger.error(f"MCP server process exited. stderr: {stderr_output}") return None except json.JSONDecodeError as e: self.logger.error(f"Failed to parse MCP response: {e}") return None async def _discover_capabilities(self) -> None: """Discover tools, resources, and prompts from the server. Sends JSON-RPC requests to list available tools, resources, and prompts from the MCP server. Populates the `tools`, `resources`, and `prompts` attributes with the discovered capabilities. This method is used for STDIO transport connections. """ tools_request = {"jsonrpc": "2.0", "method": "tools/list", "params": {}, "id": self._next_request_id()} self._write_message(tools_request) tools_response = await self._read_message() if tools_response and tools_response.get("result"): tools_data = tools_response["result"].get("tools", []) self.tools = [ MCPTool( name=tool["name"], description=tool.get("description", ""), input_schema=tool.get("inputSchema", {}), server_name=self.config.name, ) for tool in tools_data ] self.logger.info(f"Discovered {len(self.tools)} tools from {self.config.name}") resources_request = {"jsonrpc": "2.0", "method": "resources/list", "params": {}, "id": self._next_request_id()} self._write_message(resources_request) resources_response = await self._read_message() if resources_response and resources_response.get("result"): resources_data = resources_response["result"].get("resources", []) self.resources = [ MCPResource( uri=resource["uri"], name=resource.get("name", ""), description=resource.get("description", ""), mime_type=resource.get("mimeType"), server_name=self.config.name, ) for resource in resources_data ] self.logger.info(f"Discovered {len(self.resources)} resources from {self.config.name}") prompts_request = {"jsonrpc": "2.0", "method": "prompts/list", "params": {}, "id": self._next_request_id()} self._write_message(prompts_request) prompts_response = await self._read_message() if prompts_response and prompts_response.get("result"): prompts_data = prompts_response["result"].get("prompts", []) self.prompts = [ MCPPrompt( name=prompt["name"], description=prompt.get("description", ""), arguments=prompt.get("arguments", []), server_name=self.config.name, ) for prompt in prompts_data ] self.logger.info(f"Discovered {len(self.prompts)} prompts from {self.config.name}")
[docs] async def call_tool(self, tool_name: str, arguments: dict[str, Any]) -> Any: """Call a tool on the MCP server. Invokes a tool by name with the provided arguments. Uses the SDK session for SSE/Streamable HTTP transports or JSON-RPC for STDIO transport. Args: tool_name: Name of the tool to call. arguments: Dictionary of arguments to pass to the tool. Returns: Tool execution result content. Raises: RuntimeError: If not connected to the server or tool call fails. """ if not self.connected: raise RuntimeError(f"Not connected to MCP server {self.config.name}") if self._session: result = await self._session.call_tool(tool_name, arguments) return result.content request = { "jsonrpc": "2.0", "method": "tools/call", "params": {"name": tool_name, "arguments": arguments}, "id": self._next_request_id(), } self._write_message(request) response = await self._read_message() if response and response.get("result"): return response["result"].get("content", []) elif response and response.get("error"): raise RuntimeError(f"MCP tool call error: {response['error']}") else: raise RuntimeError("Invalid response from MCP server")
[docs] async def read_resource(self, uri: str) -> Any: """Read a resource from the MCP server. Fetches the content of a resource identified by its URI. Uses the SDK session for SSE/Streamable HTTP transports or JSON-RPC for STDIO transport. Args: uri: Resource URI to read. Returns: Resource content as returned by the server. Raises: RuntimeError: If not connected to the server or resource read fails. """ if not self.connected: raise RuntimeError(f"Not connected to MCP server {self.config.name}") if self._session: result = await self._session.read_resource(uri) return result.contents request = {"jsonrpc": "2.0", "method": "resources/read", "params": {"uri": uri}, "id": self._next_request_id()} self._write_message(request) response = await self._read_message() if response and response.get("result"): return response["result"].get("contents", []) elif response and response.get("error"): raise RuntimeError(f"MCP resource read error: {response['error']}") else: raise RuntimeError("Invalid response from MCP server")
[docs] async def get_prompt(self, name: str, arguments: dict[str, Any] | None = None) -> str: """Get a prompt from the MCP server. Retrieves and renders a prompt template with the provided arguments. Uses the SDK session for SSE/Streamable HTTP transports or JSON-RPC for STDIO transport. Args: name: Name of the prompt to retrieve. arguments: Optional dictionary of arguments for prompt rendering. Returns: Rendered prompt text as a string. Raises: RuntimeError: If not connected to the server or prompt fetch fails. """ if not self.connected: raise RuntimeError(f"Not connected to MCP server {self.config.name}") if self._session: result = await self._session.get_prompt(name, arguments or {}) messages = result.messages if messages: content = messages[0].content if hasattr(content, "text"): return content.text return str(content) return "" request = { "jsonrpc": "2.0", "method": "prompts/get", "params": {"name": name, "arguments": arguments or {}}, "id": self._next_request_id(), } self._write_message(request) response = await self._read_message() if response and response.get("result"): messages = response["result"].get("messages", []) if messages: return messages[0].get("content", {}).get("text", "") return "" elif response and response.get("error"): raise RuntimeError(f"MCP prompt get error: {response['error']}") else: raise RuntimeError("Invalid response from MCP server")
[docs] async def disconnect(self) -> None: """Disconnect from the MCP server. Gracefully closes the connection and cleans up resources. For SDK-based transports (SSE, Streamable HTTP), closes the async exit stack. For STDIO transport, terminates the subprocess. This method is safe to call multiple times. """ if self._exit_stack: try: await self._exit_stack.__aexit__(None, None, None) except Exception as e: self.logger.error(f"Error closing MCP session: {e}") self._exit_stack = None self._session = None if self.process: try: self.process.terminate() self.process.wait(timeout=5) except subprocess.TimeoutExpired: self.process.kill() except Exception as e: self.logger.error(f"Error disconnecting from MCP server: {e}") self.connected = False self.session_id = None self.logger.info(f"Disconnected from MCP server {self.config.name}")
def __del__(self): """Cleanup subprocess on object deletion. Ensures the subprocess is terminated if still running when the client object is garbage collected. This is a fallback; prefer using `disconnect()` for explicit cleanup. """ if self.process and self.process.poll() is None: try: self.process.terminate() except Exception: pass