# Copyright 2025 The EasyDeL/Calute Author @erfanzar (Erfan Zare Chavoshi).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Anthropic Claude LLM provider implementation.
This module provides integration with Anthropic's Claude API for the Calute
framework. It implements the BaseLLM interface to offer synchronous and
asynchronous completion generation, streaming responses, and function/tool
call support for Claude models.
The implementation uses httpx for HTTP communication with the Anthropic API,
providing efficient async operations and streaming support. It handles
message format conversion from OpenAI-style to Anthropic's expected format.
Features:
- Support for all Claude 3.x and 4.x models (Opus, Sonnet, Haiku)
- Streaming responses with Server-Sent Events (SSE) parsing
- Tool/function call support with structured output parsing
- Automatic message format conversion from OpenAI-style to Anthropic format
- Context length metadata for supported models
- Async context manager support for proper resource cleanup
Supported models include:
- claude-3-opus-20240229 (200K context)
- claude-3-sonnet-20240229 (200K context)
- claude-3-haiku-20240307 (200K context)
- claude-3-5-sonnet-20240620 (200K context)
- claude-3-5-haiku-20241022 (200K context)
- claude-opus-4-20250514 (200K context)
- claude-sonnet-4-20250514 (200K context)
Typical usage example:
from calute.llms.anthropic import AnthropicLLM
from calute.llms.base import LLMConfig
config = LLMConfig(
model="claude-3-opus-20240229",
temperature=0.7,
max_tokens=4096,
api_key="your-api-key"
)
async with AnthropicLLM(config) as llm:
response = await llm.generate_completion("Hello, Claude!")
content = llm.extract_content(response)
print(content)
Note:
Requires the httpx library for HTTP communication.
Install with: pip install httpx
"""
from __future__ import annotations
import json
import os
from collections.abc import AsyncIterator, Callable, Iterator
from typing import Any
from .base import BaseLLM, LLMConfig
ANTHROPIC_CONTEXT_LENGTHS = {
"claude-3-opus": 200000,
"claude-3-sonnet": 200000,
"claude-3-haiku": 200000,
"claude-3-5-sonnet": 200000,
"claude-3-5-haiku": 200000,
"claude-opus-4": 200000,
"claude-sonnet-4": 200000,
}
try:
import httpx
HAS_HTTPX = True
except ImportError:
HAS_HTTPX = False
httpx = None
[docs]class AnthropicLLM(BaseLLM):
"""Anthropic Claude LLM provider implementation.
AnthropicLLM provides integration with Anthropic's Claude API, implementing
the BaseLLM interface for seamless integration with the Calute framework.
It supports all Claude 3.x and 4.x model variants with features including
streaming responses, tool/function calling, and automatic message format
conversion.
This implementation uses httpx for async HTTP communication and handles
the conversion between OpenAI-style message formats and Anthropic's
expected format. System messages are automatically merged into the first
user message as Anthropic requires.
Attributes:
config: LLMConfig instance containing provider configuration.
version: Anthropic API version string (e.g., "2023-06-01").
client: httpx.AsyncClient instance for making API requests.
Example:
# Using with explicit config
config = LLMConfig(
model="claude-3-opus-20240229",
temperature=0.5,
max_tokens=4096,
api_key="sk-ant-..."
)
async with AnthropicLLM(config) as llm:
response = await llm.generate_completion("Hello!")
print(llm.extract_content(response))
# Using with kwargs
llm = AnthropicLLM(model="claude-3-haiku-20240307", api_key="sk-ant-...")
response = await llm.generate_completion("What is 2+2?")
await llm.close()
Note:
The API key can be provided via config, kwargs, or the
ANTHROPIC_API_KEY environment variable.
"""
def __init__(self, config: LLMConfig | None = None, version: str = "2023-06-01", **kwargs):
"""Initialize the Anthropic Claude LLM provider.
Creates a new AnthropicLLM instance configured for the Claude API.
If no config is provided, a default configuration is created using
the provided keyword arguments with sensible defaults.
Args:
config: LLM configuration object. If None, a default config is
created using kwargs. Defaults to claude-3-opus-20240229 model.
version: Anthropic API version header value. Different versions
may have different features and behavior. Defaults to
"2023-06-01" which is a stable API version.
**kwargs: Additional configuration parameters when config is None:
- model: Model identifier (default: "claude-3-opus-20240229")
- api_key: API key for authentication
- base_url: Base URL for API (default: "https://api.anthropic.com")
- temperature, max_tokens, top_p, etc.
Raises:
ImportError: If the httpx library is not installed.
Side Effects:
- Stores configuration in self.config
- Sets self.version to the API version
- Calls _initialize_client() to set up the HTTP client
"""
if not HAS_HTTPX:
raise ImportError("httpx library required for Anthropic. Install with: pip install httpx")
if config is None:
config = LLMConfig(
model=kwargs.pop("model", "claude-3-opus-20240229"),
api_key=kwargs.pop("api_key", None),
base_url=kwargs.pop("base_url", "https://api.anthropic.com"),
**kwargs,
)
self.version = version
self.client = None
super().__init__(config)
def _initialize_client(self) -> None:
"""Initialize the Anthropic HTTP client.
Sets up the httpx.AsyncClient with appropriate headers for Anthropic
API authentication and versioning. The client is configured with the
base URL from config and appropriate timeouts.
The API key is retrieved from the config, or falls back to the
ANTHROPIC_API_KEY environment variable if not explicitly provided.
Raises:
ValueError: If no API key is provided via config or environment.
Side Effects:
- Creates and stores httpx.AsyncClient in self.client
- Calls _auto_fetch_model_info() to populate model metadata
"""
api_key = self.config.api_key or os.getenv("ANTHROPIC_API_KEY")
if not api_key:
raise ValueError("Anthropic API key not provided")
self.client = httpx.AsyncClient(
base_url=self.config.base_url,
headers={
"anthropic-version": self.version,
"x-api-key": api_key,
"content-type": "application/json",
},
timeout=self.config.timeout,
)
self._auto_fetch_model_info()
[docs] async def generate_completion(
self,
prompt: str | list[dict[str, str]],
model: str | None = None,
temperature: float | None = None,
max_tokens: int | None = None,
top_p: float | None = None,
stop: list[str] | None = None,
stream: bool | None = None,
**kwargs,
) -> Any:
"""Generate a completion using the Anthropic Claude API.
Sends a prompt to the Claude API and returns the model's response.
Supports both simple string prompts and OpenAI-style message lists.
When using message lists, the method automatically converts them to
Anthropic's expected format, handling system messages appropriately.
Args:
prompt: The input prompt. Can be either:
- A string: Converted to a single user message
- A list of message dicts with 'role' and 'content' keys,
following OpenAI's chat format. Roles can be 'user',
'assistant', or 'system'.
model: Model identifier to use, overriding config.model.
Example: "claude-3-opus-20240229"
temperature: Sampling temperature (0.0-1.0), overriding config.
Higher values produce more random outputs.
max_tokens: Maximum tokens to generate, overriding config.
Claude requires this parameter (unlike some other providers).
top_p: Nucleus sampling parameter, overriding config.
Only applied if different from the default 0.95.
stop: List of stop sequences that will halt generation,
overriding config.stop. Called "stop_sequences" in Anthropic API.
stream: Whether to stream the response, overriding config.stream.
If True, returns an async iterator of chunks.
**kwargs: Additional Anthropic-specific parameters passed directly
to the API, such as:
- tools: List of tool definitions for function calling
- tool_choice: How to select tools
- metadata: Request metadata
Returns:
If stream is False: A dictionary containing the API response with:
- id: Unique message ID
- type: "message"
- role: "assistant"
- content: List of content blocks (text, tool_use)
- model: Model used
- stop_reason: Why generation stopped
- usage: Token usage statistics
If stream is True: An AsyncIterator yielding SSE event dicts.
Raises:
RuntimeError: If the API request fails due to network errors,
authentication issues, or API errors.
Example:
# Simple string prompt
response = await llm.generate_completion("What is Python?")
# Message list with system prompt
messages = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Hello!"}
]
response = await llm.generate_completion(messages)
# With streaming
async for chunk in await llm.generate_completion("Hi", stream=True):
print(chunk)
"""
if isinstance(prompt, str):
messages = [{"role": "user", "content": prompt}]
else:
messages = self._convert_messages(prompt)
payload = {
"model": model or self.config.model,
"messages": messages,
"max_tokens": max_tokens if max_tokens is not None else self.config.max_tokens,
"temperature": temperature if temperature is not None else self.config.temperature,
}
if top_p is not None or self.config.top_p != 0.95:
payload["top_p"] = top_p if top_p is not None else self.config.top_p
if stop or self.config.stop:
payload["stop_sequences"] = stop or self.config.stop
payload.update(kwargs)
payload.update(self.config.extra_params)
use_stream = stream if stream is not None else self.config.stream
try:
if use_stream:
return self._stream_completion(payload)
else:
response = await self.client.post("/v1/messages", json=payload)
response.raise_for_status()
return response.json()
except httpx.HTTPError as e:
raise RuntimeError(f"Anthropic API request failed: {e}") from e
async def _stream_completion(self, payload: dict) -> AsyncIterator[dict]:
"""Stream completion chunks from the Anthropic API.
Internal method that handles streaming HTTP responses from the
Anthropic Messages API. Uses Server-Sent Events (SSE) format to
parse incremental response chunks.
The method streams the response line by line, parsing SSE data
events and yielding parsed JSON chunks. The "[DONE]" sentinel
is filtered out from the output.
Args:
payload: Request payload dictionary to send to the API.
The "stream" key is automatically set to True.
Yields:
dict: Parsed JSON event data from each SSE data line.
Event types include:
- message_start: Initial message metadata
- content_block_start: Start of a content block
- content_block_delta: Incremental content update
- content_block_stop: End of a content block
- message_delta: Message-level updates (stop_reason, usage)
- message_stop: End of the message stream
Raises:
httpx.HTTPError: If the HTTP request fails or returns an error.
Note:
This is an internal method called by generate_completion()
when streaming is enabled. Use generate_completion(stream=True)
instead of calling this directly.
"""
payload["stream"] = True
async with self.client.stream("POST", "/v1/messages", json=payload) as response:
response.raise_for_status()
async for line in response.aiter_lines():
if line.startswith("data: "):
data = line[6:]
if data != "[DONE]":
yield json.loads(data)
def _convert_messages(self, messages: list[dict[str, str]]) -> list[dict[str, str]]:
"""Convert OpenAI-style messages to Anthropic format.
Transforms a list of messages from OpenAI's chat format to Anthropic's
expected format. The key difference is that Anthropic doesn't support
a separate "system" role - system messages must be prepended to the
first user message.
The conversion handles:
- Extracting system messages and merging them with the first user message
- Filtering out messages with unsupported roles
- Preserving the order of user and assistant messages
Args:
messages: List of message dictionaries in OpenAI format, each with:
- role: "user", "assistant", or "system"
- content: The message text
Returns:
List of message dictionaries in Anthropic format, containing only
"user" and "assistant" roles. System content is prepended to the
first user message with a double newline separator.
Example:
>>> messages = [
... {"role": "system", "content": "You are helpful."},
... {"role": "user", "content": "Hello!"}
... ]
>>> converted = llm._convert_messages(messages)
>>> # Returns: [{"role": "user", "content": "You are helpful.\\n\\nHello!"}]
Note:
If there's a system message but no user message, or if the first
non-system message is from the assistant, the system content is
inserted as a user message at the beginning.
"""
converted = []
system_content = None
for msg in messages:
role = msg.get("role", "user")
content = msg.get("content", "")
if role == "system":
system_content = content
else:
if role in ["user", "assistant"]:
converted.append({"role": role, "content": content})
if system_content and converted:
if converted[0]["role"] == "user":
converted[0]["content"] = f"{system_content}\n\n{converted[0]['content']}"
else:
converted.insert(0, {"role": "user", "content": system_content})
return converted
[docs] async def process_streaming_response(
self,
response: Any,
callback: Callable[[str, Any], None],
) -> str:
"""Process a streaming response from the Anthropic API.
Iterates through streaming response events and accumulates text content.
For each content delta event, the callback is invoked with the new text
and the raw event data.
This method is useful for real-time display of streaming output while
also collecting the complete response.
Args:
response: An async iterator of streaming events from
generate_completion(stream=True). Events are dictionaries
with a "type" field indicating the event type.
callback: A function called for each text chunk received.
Signature: callback(text: str, raw_chunk: dict)
- text: The incremental text content
- raw_chunk: The raw event dictionary
Returns:
The complete accumulated text content from all content_block_delta
events in the stream.
Example:
def on_chunk(text, chunk):
print(text, end="", flush=True)
response = await llm.generate_completion("Tell me a story", stream=True)
full_text = await llm.process_streaming_response(response, on_chunk)
print(f"\\nTotal: {len(full_text)} chars")
"""
accumulated_content = ""
async for chunk in response:
if chunk.get("type") == "content_block_delta":
delta = chunk.get("delta", {})
if text := delta.get("text"):
accumulated_content += text
callback(text, chunk)
return accumulated_content
[docs] def stream_completion(
self,
response: Any,
agent: Any | None = None,
) -> Iterator[dict[str, Any]]:
"""Stream completion chunks with function call detection.
Processes a synchronous streaming response from Anthropic, yielding
standardized chunk dictionaries that include accumulated content and
detected function/tool calls. This method provides a unified interface
for handling streaming responses across different providers.
The method tracks:
- Incremental text content from content_block_delta events
- Accumulated content across the entire stream
- Tool use blocks for function calling
- Stream completion (message_stop event)
Args:
response: A synchronous iterator of streaming events from
the Anthropic API. Each event is either a dictionary or
an object with a "type" attribute.
agent: Optional agent instance for advanced function detection.
Currently reserved for future use.
Yields:
dict: Standardized chunk information with keys:
- content: Text content in this chunk (str or None)
- buffered_content: All text accumulated so far (str)
- function_calls: List of detected function calls (list)
- tool_calls: Raw tool call data (None for Anthropic)
- raw_chunk: The original event data
- is_final: True if this is the final chunk (bool)
Function call format in function_calls list:
{
"id": "tool_use_id",
"name": "function_name",
"arguments": '{"arg": "value"}' # JSON string
}
Example:
for chunk in llm.stream_completion(response):
if chunk["content"]:
print(chunk["content"], end="")
if chunk["is_final"]:
for call in chunk["function_calls"]:
print(f"Function: {call['name']}")
"""
buffered_content = ""
function_calls = []
for event in response:
chunk_data = {
"content": None,
"buffered_content": buffered_content,
"function_calls": [],
"tool_calls": None,
"raw_chunk": event,
"is_final": False,
}
event_type = event.get("type") if isinstance(event, dict) else getattr(event, "type", None)
if event_type == "content_block_delta":
delta = event.get("delta", {}) if isinstance(event, dict) else getattr(event, "delta", {})
text = delta.get("text", "") if isinstance(delta, dict) else getattr(delta, "text", "")
if text:
buffered_content += text
chunk_data["content"] = text
chunk_data["buffered_content"] = buffered_content
elif event_type == "message_stop":
chunk_data["is_final"] = True
chunk_data["function_calls"] = function_calls
elif event_type == "tool_use":
name = event.get("name") if isinstance(event, dict) else getattr(event, "name", None)
input_data = event.get("input") if isinstance(event, dict) else getattr(event, "input", None)
if name:
function_calls.append(
{
"id": event.get("id") if isinstance(event, dict) else getattr(event, "id", None),
"name": name,
"arguments": json.dumps(input_data) if input_data else "",
}
)
yield chunk_data
[docs] async def astream_completion(
self,
response: Any,
agent: Any | None = None,
) -> AsyncIterator[dict[str, Any]]:
"""Async stream completion chunks with function call detection.
Asynchronous version of stream_completion() that processes streaming
responses from the Anthropic API. Yields standardized chunk dictionaries
containing accumulated content and detected function/tool calls.
This method is the preferred way to handle streaming responses in
async contexts, providing the same unified interface as the synchronous
stream_completion() method.
The method tracks:
- Incremental text content from content_block_delta events
- Accumulated content across the entire stream
- Tool use blocks for function calling
- Stream completion (message_stop event)
Args:
response: An async iterator of streaming events from
generate_completion(stream=True). Each event is either a
dictionary or an object with a "type" attribute.
agent: Optional agent instance for advanced function detection.
Currently reserved for future use.
Yields:
dict: Standardized chunk information with keys:
- content: Text content in this chunk (str or None)
- buffered_content: All text accumulated so far (str)
- function_calls: List of detected function calls (list)
- tool_calls: Raw tool call data (None for Anthropic)
- raw_chunk: The original event data
- is_final: True if this is the final chunk (bool)
Function call format in function_calls list:
{
"id": "tool_use_id",
"name": "function_name",
"arguments": '{"arg": "value"}' # JSON string
}
Example:
response = await llm.generate_completion("Hello", stream=True)
async for chunk in llm.astream_completion(response):
if chunk["content"]:
print(chunk["content"], end="", flush=True)
if chunk["is_final"] and chunk["function_calls"]:
print("\\nFunction calls detected!")
"""
buffered_content = ""
function_calls = []
async for event in response:
chunk_data = {
"content": None,
"buffered_content": buffered_content,
"function_calls": [],
"tool_calls": None,
"raw_chunk": event,
"is_final": False,
}
event_type = event.get("type") if isinstance(event, dict) else getattr(event, "type", None)
if event_type == "content_block_delta":
delta = event.get("delta", {}) if isinstance(event, dict) else getattr(event, "delta", {})
text = delta.get("text", "") if isinstance(delta, dict) else getattr(delta, "text", "")
if text:
buffered_content += text
chunk_data["content"] = text
chunk_data["buffered_content"] = buffered_content
elif event_type == "message_stop":
chunk_data["is_final"] = True
chunk_data["function_calls"] = function_calls
elif event_type == "tool_use":
name = event.get("name") if isinstance(event, dict) else getattr(event, "name", None)
input_data = event.get("input") if isinstance(event, dict) else getattr(event, "input", None)
if name:
function_calls.append(
{
"id": event.get("id") if isinstance(event, dict) else getattr(event, "id", None),
"name": name,
"arguments": json.dumps(input_data) if input_data else "",
}
)
yield chunk_data
[docs] def fetch_model_info(self) -> dict[str, Any]:
"""Fetch model metadata from known Anthropic model specifications.
Since Anthropic does not provide a public API endpoint for querying
model capabilities, this method uses a local mapping of known model
context lengths (ANTHROPIC_CONTEXT_LENGTHS). The model is matched
by prefix to handle versioned model names.
The context length information is useful for:
- Token counting and context window management
- Preventing context overflow errors
- Optimizing prompt construction
Returns:
Dictionary with model metadata. Contains:
- max_model_len: Maximum context length in tokens (int)
Returns an empty dictionary if the model name doesn't match
any known prefixes.
Example:
>>> llm = AnthropicLLM(model="claude-3-opus-20240229")
>>> info = llm.fetch_model_info()
>>> print(info) # {"max_model_len": 200000}
Note:
This method is called automatically during client initialization
via _auto_fetch_model_info(). The result is stored in
config.model_metadata and config.max_model_len.
"""
model = self.config.model
for prefix, context_len in ANTHROPIC_CONTEXT_LENGTHS.items():
if model.startswith(prefix):
return {"max_model_len": context_len}
return {}
[docs] async def close(self) -> None:
"""Close the HTTP client and release resources.
Properly closes the httpx.AsyncClient connection pool. This method
should be called when done using the LLM provider to prevent resource
leaks. It is called automatically when using the provider as an async
context manager.
This method is safe to call multiple times - it checks for client
existence before attempting to close.
Example:
llm = AnthropicLLM(config)
try:
response = await llm.generate_completion("Hello")
finally:
await llm.close()
# Or preferably, use context manager:
async with AnthropicLLM(config) as llm:
response = await llm.generate_completion("Hello")
"""
if self.client:
await self.client.aclose()