# Copyright 2025 The EasyDeL/Calute Author @erfanzar (Erfan Zare Chavoshi).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Ollama local LLM provider implementation.

This module provides integration with Ollama for running large language models
locally. Ollama is an open-source tool for running LLMs on your own machine,
supporting models like Llama, Mistral, CodeLlama, and many others.

The module handles:
    - HTTP communication with the Ollama server via async httpx
    - Both chat-style (/api/chat) and generate-style (/api/generate) endpoints
    - Streaming response processing with callback support
    - Automatic model metadata fetching (context length, parameters)
    - Configurable timeout for long-running generations

Supported models include:
    - llama2, llama3 (default: llama2)
    - mistral, mixtral
    - codellama
    - phi, phi3
    - Any model available in your local Ollama installation

Typical usage example:

    from calute.llms.ollama import OllamaLLM
    from calute.llms.base import LLMConfig

    # Ensure Ollama is running locally (ollama serve)
    config = LLMConfig(
        model="llama3",
        base_url="http://localhost:11434",
        temperature=0.7,
        max_tokens=2048,
    )

    async with OllamaLLM(config) as llm:
        response = await llm.generate_completion("Explain recursion")
        content = llm.extract_content(response)
        print(content)

Note:
    Requires the httpx package to be installed:
        pip install httpx

    Also requires Ollama to be installed and running:
        https://ollama.ai
"""
from __future__ import annotations

import json
import logging
from collections.abc import AsyncIterator, Callable, Iterator
from typing import Any

from .base import BaseLLM, LLMConfig

try:
    import httpx

    HAS_HTTPX = True
except ImportError:
    HAS_HTTPX = False
    httpx = None
class OllamaLLM(BaseLLM):
    """Ollama local LLM provider implementation.

    OllamaLLM talks to an Ollama server over HTTP and supports both plain
    text generation and chat-style conversations. The endpoint is selected
    from the type of the prompt:

        - String prompts use /api/generate
        - Message lists use /api/chat

    Attributes:
        config: LLMConfig instance containing provider configuration.
        client: httpx.AsyncClient used for requests to the Ollama server.
            It is None until _initialize_client() has run.

    Example:
        # Basic usage with string prompt
        llm = OllamaLLM(model="llama3", base_url="http://localhost:11434")
        response = await llm.generate_completion("What is Python?")
        print(llm.extract_content(response))

        # Using with chat-style messages
        messages = [
            {"role": "system", "content": "You are a helpful coding assistant."},
            {"role": "user", "content": "Write a hello world in Python"},
        ]
        response = await llm.generate_completion(messages)

        # Streaming response
        response = await llm.generate_completion("Tell me a joke", stream=True)
        async for chunk in response:
            print(llm.extract_content(chunk), end="")

    Note:
        Ollama must be running locally (or remotely) before using this class.
        Start Ollama with: ollama serve
    """

    def __init__(self, config: LLMConfig | None = None, **kwargs):
        """Initialize the Ollama LLM provider.

        Args:
            config: LLM configuration object. If None, one is built from
                ``kwargs`` with these fallbacks:
                    - model: "llama2"
                    - base_url: "http://localhost:11434"
                    - timeout: 120.0 seconds
            **kwargs: Extra LLMConfig fields (e.g. temperature). They are
                only consulted when ``config`` is None; when a config
                object is passed they are ignored.

        Raises:
            ImportError: If the httpx package is not installed.

        Example:
            # Using explicit config
            config = LLMConfig(
                model="codellama",
                base_url="http://localhost:11434",
                temperature=0.5,
            )
            llm = OllamaLLM(config)

            # Using kwargs for convenience
            llm = OllamaLLM(model="mistral", temperature=0.7)

            # Remote Ollama server
            llm = OllamaLLM(
                model="llama3",
                base_url="http://remote-server:11434",
            )
        """
        if not HAS_HTTPX:
            raise ImportError("httpx library required for Ollama. Install with: pip install httpx")

        if config is None:
            config = LLMConfig(
                model=kwargs.pop("model", "llama2"),
                base_url=kwargs.pop("base_url", "http://localhost:11434"),
                timeout=kwargs.pop("timeout", 120.0),
                **kwargs,
            )

        # Define the attribute before the base initializer runs so it exists
        # even if client creation has not happened yet.
        self.client = None
        super().__init__(config)

    def _initialize_client(self) -> None:
        """Create the async HTTP client and fetch model metadata.

        Builds an httpx.AsyncClient from ``config.base_url`` and
        ``config.timeout`` and stores it in ``self.client``, then calls
        ``_auto_fetch_model_info()``.

        Note:
            This hook is presumably invoked by the BaseLLM initializer
            (not visible in this module) -- confirm against BaseLLM.
        """
        self.client = httpx.AsyncClient(
            base_url=self.config.base_url,
            timeout=self.config.timeout,
        )
        self._auto_fetch_model_info()

    async def generate_completion(
        self,
        prompt: str | list[dict[str, str]],
        model: str | None = None,
        temperature: float | None = None,
        max_tokens: int | None = None,
        top_p: float | None = None,
        stop: list[str] | None = None,
        stream: bool | None = None,
        **kwargs,
    ) -> Any:
        """Generate a completion using the Ollama API.

        The endpoint is chosen from the prompt type:
            - String prompts → /api/generate
            - Message lists → /api/chat

        Args:
            prompt: Either the prompt text, or a list of message dicts with
                'role' and 'content' keys for chat-style conversations.
            model: Optional model override. Uses config.model if not given.
            temperature: Sampling temperature override.
            max_tokens: Maximum number of tokens to generate (sent as
                Ollama's ``num_predict`` option).
            top_p: Nucleus sampling parameter override.
            stop: Stop sequences. Falls back to config.stop when empty.
            stream: Whether to stream. Falls back to config.stream when None.
            **kwargs: Additional request fields. ``options`` (a dict) is
                merged over the computed options; every other key is
                written directly into the top-level request payload.

        Returns:
            If not streaming: the decoded JSON response dict ('response'
            key for /api/generate, 'message' key for /api/chat).
            If streaming: an async iterator of chunk dicts. The HTTP request
            is only sent once iteration starts.

        Raises:
            RuntimeError: If a non-streaming request fails with an
                httpx.HTTPError (connection error, timeout, error status).
            httpx.HTTPError: In streaming mode, raised while iterating the
                returned async iterator; these errors are not wrapped.

        Example:
            # Simple text completion
            response = await llm.generate_completion("Write a haiku about coding")
            text = llm.extract_content(response)

            # Streaming response
            response = await llm.generate_completion("Tell a story", stream=True)
            async for chunk in response:
                print(llm.extract_content(chunk), end="", flush=True)

            # With custom Ollama options
            response = await llm.generate_completion(
                "Hello",
                options={"seed": 42, "repeat_penalty": 1.1},
            )
        """
        use_stream = stream if stream is not None else self.config.stream

        # Sampling options are identical for both endpoints.
        options: dict[str, Any] = {
            "temperature": temperature if temperature is not None else self.config.temperature,
            "top_p": top_p if top_p is not None else self.config.top_p,
            "num_predict": max_tokens if max_tokens is not None else self.config.max_tokens,
        }
        stop_sequences = stop or self.config.stop
        if stop_sequences:
            options["stop"] = stop_sequences
        if self.config.top_k:
            options["top_k"] = self.config.top_k

        # Caller-supplied options win over the computed defaults. A None or
        # empty value is ignored instead of crashing dict.update().
        extra_options = kwargs.pop("options", None)
        if extra_options:
            options.update(extra_options)

        payload: dict[str, Any] = {"model": model or self.config.model}
        if isinstance(prompt, list):
            endpoint = "/api/chat"
            payload["messages"] = prompt
        else:
            endpoint = "/api/generate"
            payload["prompt"] = prompt
        payload["stream"] = use_stream
        payload["options"] = options
        payload.update(kwargs)

        if use_stream:
            # Async generator: nothing is sent until the caller iterates, so
            # request errors cannot be intercepted here.
            return self._stream_completion(endpoint, payload)

        try:
            response = await self.client.post(endpoint, json=payload)
            response.raise_for_status()
            return response.json()
        except httpx.HTTPError as e:
            raise RuntimeError(f"Ollama API request failed: {e}") from e

    async def _stream_completion(self, endpoint: str, payload: dict) -> AsyncIterator[dict]:
        """Stream newline-delimited JSON chunks from an Ollama endpoint.

        Args:
            endpoint: API path, e.g. "/api/generate" or "/api/chat".
            payload: Request body sent as JSON.

        Yields:
            One dict per non-blank response line, decoded with json.loads.

        Raises:
            httpx.HTTPStatusError: If the server returns an error status.
            httpx.HTTPError: If the request fails for transport reasons.
            json.JSONDecodeError: If a response line is not valid JSON.
        """
        async with self.client.stream("POST", endpoint, json=payload) as response:
            response.raise_for_status()
            async for line in response.aiter_lines():
                # Skip blank/whitespace-only lines; json.loads would reject them.
                if line.strip():
                    yield json.loads(line)

    @staticmethod
    def _ollama_chunk_text(chunk: Any) -> str:
        """Return the text carried by one Ollama chunk, or "" if there is none.

        Chat chunks carry text under ``chunk["message"]["content"]``;
        generate chunks under ``chunk["response"]``. Non-dict chunks and a
        missing/None/non-dict ``message`` yield "".
        """
        if not isinstance(chunk, dict):
            return ""
        if "message" in chunk:
            message = chunk["message"]
            text = message.get("content", "") if isinstance(message, dict) else ""
        else:
            text = chunk.get("response", "")
        return text or ""

    @classmethod
    def _standardize_chunk(cls, chunk: Any, buffered_content: str) -> dict[str, Any]:
        """Wrap a raw Ollama chunk in the standardized streaming-chunk dict.

        Args:
            chunk: Raw chunk as yielded by the Ollama stream.
            buffered_content: Text accumulated from the previous chunks.

        Returns:
            Dict with keys content, buffered_content (previous text plus this
            chunk's text), function_calls, tool_calls, raw_chunk, is_final.
        """
        text = cls._ollama_chunk_text(chunk)
        return {
            "content": text or None,
            "buffered_content": buffered_content + text,
            "function_calls": [],
            "tool_calls": None,
            "raw_chunk": chunk,
            "is_final": isinstance(chunk, dict) and bool(chunk.get("done", False)),
        }

    async def process_streaming_response(
        self,
        response: Any,
        callback: Callable[[str, Any], None],
    ) -> str:
        """Consume a streaming response, invoking a callback per text chunk.

        Args:
            response: Async iterator yielding chunk dicts, e.g. the value
                returned by generate_completion(..., stream=True).
            callback: Called as ``callback(content, chunk)`` for every chunk
                that carries non-empty text. Chunks without text are skipped.

        Returns:
            All chunk texts concatenated in arrival order.

        Example:
            def on_chunk(content: str, chunk: dict) -> None:
                print(content, end="", flush=True)

            response = await llm.generate_completion("Tell a story", stream=True)
            full_text = await llm.process_streaming_response(response, on_chunk)
            print(f"\\nTotal length: {len(full_text)}")
        """
        pieces: list[str] = []
        async for chunk in response:
            text = self._ollama_chunk_text(chunk)
            if text:
                pieces.append(text)
                callback(text, chunk)
        return "".join(pieces)

    def stream_completion(
        self,
        response: Any,
        agent: Any | None = None,
    ) -> Iterator[dict[str, Any]]:
        """Convert a synchronous stream of Ollama chunks to standardized chunks.

        Args:
            response: A *synchronous* iterable of chunk dicts. The async
                iterator returned by generate_completion(..., stream=True)
                cannot be used here; pass it to astream_completion() instead.
            agent: Unused; accepted for interface compatibility.

        Yields:
            Dictionary containing streaming chunk information:
                - content (str | None): Text content in this chunk
                - buffered_content (str): Accumulated content so far
                - function_calls (list): Always an empty list
                - tool_calls (Any): Always None
                - raw_chunk (dict): The original Ollama chunk
                - is_final (bool): True when the chunk's "done" flag is truthy

        Example:
            chunks = [
                {"response": "Hel", "done": False},
                {"response": "lo", "done": True},
            ]
            for chunk in llm.stream_completion(chunks):
                if chunk["content"]:
                    print(chunk["content"], end="")
                if chunk["is_final"]:
                    print("\\n--- Generation complete ---")
        """
        buffered_content = ""
        for chunk in response:
            chunk_data = self._standardize_chunk(chunk, buffered_content)
            buffered_content = chunk_data["buffered_content"]
            yield chunk_data

    async def astream_completion(
        self,
        response: Any,
        agent: Any | None = None,
    ) -> AsyncIterator[dict[str, Any]]:
        """Convert an asynchronous stream of Ollama chunks to standardized chunks.

        Async counterpart of stream_completion().

        Args:
            response: Async iterator yielding chunk dicts, e.g. the value
                returned by generate_completion(..., stream=True).
            agent: Unused; accepted for interface compatibility.

        Yields:
            Dictionary containing streaming chunk information:
                - content (str | None): Text content in this chunk
                - buffered_content (str): Accumulated content so far
                - function_calls (list): Always an empty list
                - tool_calls (Any): Always None
                - raw_chunk (dict): The original Ollama chunk
                - is_final (bool): True when the chunk's "done" flag is truthy

        Example:
            response = await llm.generate_completion("Hello", stream=True)
            async for chunk in llm.astream_completion(response):
                if chunk["content"]:
                    print(chunk["content"], end="", flush=True)
        """
        buffered_content = ""
        async for chunk in response:
            chunk_data = self._standardize_chunk(chunk, buffered_content)
            buffered_content = chunk_data["buffered_content"]
            yield chunk_data

    @staticmethod
    def _context_length_from(model_info: dict[str, Any]) -> Any:
        """Find the context length inside an /api/show ``model_info`` mapping.

        Lookup order: the legacy keys ``context_length``,
        ``llama.context_length`` and ``num_ctx``; then
        ``<general.architecture>.context_length``; finally any key ending in
        ``.context_length``. Returns None when nothing truthy is found.
        """
        for key in ("context_length", "llama.context_length", "num_ctx"):
            value = model_info.get(key)
            if value:
                return value

        # Ollama prefixes the key with the model architecture (e.g.
        # "qwen2.context_length"), so resolve it through general.architecture.
        architecture = model_info.get("general.architecture")
        if architecture:
            value = model_info.get(f"{architecture}.context_length")
            if value:
                return value

        # Last resort: accept whichever architecture-prefixed key is present.
        for key, value in model_info.items():
            if isinstance(key, str) and key.endswith(".context_length") and value:
                return value
        return None

    def fetch_model_info(self) -> dict[str, Any]:
        """Fetch model metadata from the Ollama /api/show endpoint.

        Returns:
            A dictionary containing model metadata:
                - max_model_len: Context length in tokens, or None
                - parameter_size: Model parameter count (e.g., "7B"), or None
                - family: Model family (e.g., "llama"), or None
                - quantization_level: Quantization type (e.g., "Q4_0"), or None
            An empty dictionary if the server does not answer with HTTP 200
            or if anything goes wrong while querying or parsing.

        Note:
            This is a best-effort lookup that never raises; failures are
            logged at debug level. It uses a synchronous httpx.Client with a
            10 second timeout, so it blocks the caller until the server
            replies or the timeout expires.
        """
        try:
            with httpx.Client(base_url=self.config.base_url, timeout=10.0) as client:
                resp = client.post("/api/show", json={"name": self.config.model})
                if resp.status_code != 200:
                    return {}
                data = resp.json()

            # Treat explicit nulls like missing sections.
            model_info = data.get("model_info") or {}
            details = data.get("details") or {}
            return {
                "max_model_len": self._context_length_from(model_info),
                "parameter_size": details.get("parameter_size"),
                "family": details.get("family"),
                "quantization_level": details.get("quantization_level"),
            }
        except Exception:
            # Deliberately broad: metadata is optional and must not break
            # provider construction. Leave a trace for debugging.
            logging.getLogger(__name__).debug("Ollama model info lookup failed", exc_info=True)
            return {}

    async def close(self) -> None:
        """Close the async HTTP client, if one was created.

        Example:
            async with OllamaLLM(model="llama3") as llm:
                response = await llm.generate_completion("Hello")

        Note:
            Does nothing when ``self.client`` is None.
        """
        if self.client is not None:
            await self.client.aclose()
class LocalLLM(OllamaLLM):
    """Backward-compatible name for OllamaLLM.

    LocalLLM is a subclass of OllamaLLM that adds no attributes or methods.
    It exists so that code written against the older LocalLLM name keeps
    working. New code should prefer OllamaLLM.

    Example:
        # These behave the same:
        llm1 = LocalLLM(model="llama3")
        llm2 = OllamaLLM(model="llama3")

    See Also:
        OllamaLLM: The primary class for Ollama integration.
    """