# Copyright 2025 The EasyDeL/Calute Author @erfanzar (Erfan Zare Chavoshi).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Data processing and manipulation tools for Calute agents.
This module provides a comprehensive set of data processing tools
for the Calute framework. It includes:
- JSON data processing with load, save, query, and validation operations
- CSV file processing with read, write, analyze, and convert capabilities
- Advanced text processing with statistics, extraction, and formatting
- Data format conversion between JSON, YAML, Base64, Hex, and hashes
- Date and time processing with parsing, formatting, and delta calculations
Each tool is implemented as a class inheriting from AgentBaseFn,
making them directly usable as agent tools for data manipulation tasks.
Example:
>>> processor = JSONProcessor()
>>> result = processor(operation="load", file_path="data.json")
>>> print(result["data"])
"""
from __future__ import annotations
import base64
import csv
import hashlib
import json
import re
from datetime import datetime, timedelta
from typing import Any
from ..types import AgentBaseFn
def _resolve_json_path(data: Any, query: str) -> Any:
    """Walk ``data`` along a dot-notation ``query`` and return the value found.

    Each dot-separated segment is either a plain key (``"user"``) or an
    optional key followed by one or more bracketed integer indexes
    (``"items[0]"``, ``"grid[1][2]"``, ``"[0]"``).

    Args:
        data: The container (typically nested dicts/lists) to walk.
        query: The dot-notation path.

    Returns:
        The value located at the end of the path.

    Raises:
        KeyError, IndexError, TypeError, ValueError: If a segment cannot be
            resolved against the current value, or an index is not an integer.
    """
    current = data
    for part in query.split("."):
        if "[" in part and "]" in part:
            key, _, tail = part.partition("[")
            if key:
                current = current[key]
            # Apply every bracketed index in order; taking only the first one
            # would silently return the wrong element for "grid[1][2]".
            for raw_index in re.findall(r"\[([^\]]*)\]", "[" + tail):
                current = current[int(raw_index)]
        else:
            current = current[part]
    return current


class JSONProcessor(AgentBaseFn):
    """JSON data processing and manipulation tool.

    Provides operations for loading, saving, validating, querying,
    and transforming JSON data. Supports both file-based and in-memory
    JSON operations with simple dot-notation queries.

    Supported operations:
        load: Load JSON data from a file.
        save: Save JSON data to a file.
        validate: Check if data is valid JSON.
        query: Extract data using dot-notation paths (e.g., "user.name").
        transform: Get metadata and formatted output of JSON data.
    """

    @staticmethod
    def static_call(
        operation: str,
        data: Any = None,
        file_path: str | None = None,
        query: str | None = None,
        pretty: bool = True,
        **context_variables,
    ) -> dict[str, Any]:
        """Process JSON data with various operations.

        Performs load, save, validate, query, or transform operations on
        JSON data. Supports both file-based and in-memory JSON manipulation.
        Failures are reported through an ``error`` key rather than raised.

        Args:
            operation: The operation to perform. Options:
                - "load": Load JSON from a file. Requires ``file_path``.
                - "save": Save data to a JSON file. Requires ``file_path``
                  and ``data``.
                - "validate": Check if ``data`` is valid JSON (accepts
                  both string and object inputs).
                - "query": Extract a value from ``data`` using dot-notation
                  paths (e.g., "user.name", "items[0].id", "grid[1][2]").
                  Requires ``query`` and ``data``.
                - "transform": Get metadata about ``data`` including type,
                  keys, length, and optionally pretty-printed output.
                  Requires ``data`` (empty containers are accepted).
            data: The JSON data to process. Can be a Python dict/list or
                a JSON string (for validate). Required for save, query,
                and transform operations.
            file_path: Path to the JSON file for load/save operations.
                Files are read and written as UTF-8.
            query: Dot-notation query path for data extraction. Supports
                bracket notation for array indexing, including chained
                indexes (e.g., "items[0]", "grid[1][2]").
            pretty: Whether to use indented formatting when saving or
                transforming JSON. Defaults to True.
            **context_variables: Runtime context from the agent (unused).

        Returns:
            A dictionary containing operation-specific results:
                For "load": data, success.
                For "save": success, file_path.
                For "validate": valid (bool), error (if invalid).
                For "query": result (extracted value).
                For "transform": keys, type, length, formatted (if pretty).
                - error (str): Error message if the operation failed.

        Example:
            >>> result = JSONProcessor.static_call("validate", data='{"key": 1}')
            >>> print(result["valid"])
            True
        """
        result: dict[str, Any] = {}

        if operation == "load":
            if not file_path:
                return {"error": "file_path required for load operation"}
            try:
                # Explicit UTF-8 keeps behaviour independent of the platform locale.
                with open(file_path, "r", encoding="utf-8") as f:
                    result["data"] = json.load(f)
                result["success"] = True
            except Exception as e:  # tool boundary: report instead of raising
                return {"error": f"Failed to load JSON: {e!s}"}

        elif operation == "save":
            if not file_path or data is None:
                return {"error": "file_path and data required for save operation"}
            try:
                with open(file_path, "w", encoding="utf-8") as f:
                    json.dump(data, f, indent=2 if pretty else None)
                result["success"] = True
                result["file_path"] = file_path
            except Exception as e:  # tool boundary: report instead of raising
                return {"error": f"Failed to save JSON: {e!s}"}

        elif operation == "validate":
            try:
                # Strings must parse as JSON; objects must be serialisable.
                if isinstance(data, str):
                    json.loads(data)
                else:
                    json.dumps(data)
                result["valid"] = True
            except Exception as e:
                result["valid"] = False
                result["error"] = str(e)

        elif operation == "query":
            if not query or data is None:
                return {"error": "query and data required for query operation"}
            try:
                result["result"] = _resolve_json_path(data, query)
            except Exception as e:  # missing key, bad index, wrong container type
                return {"error": f"Query failed: {e!s}"}

        elif operation == "transform":
            # Only a missing value is an error; {} / [] / 0 / "" are valid
            # JSON values and still get described.
            if data is None:
                return {"error": "data required for transform operation"}
            result["keys"] = list(data.keys()) if isinstance(data, dict) else None
            result["type"] = type(data).__name__
            result["length"] = len(data) if hasattr(data, "__len__") else None
            if pretty:
                try:
                    result["formatted"] = json.dumps(data, indent=2)
                except (TypeError, ValueError) as e:
                    return {"error": f"Failed to format JSON: {e!s}"}

        else:
            return {"error": f"Unknown operation: {operation}"}

        return result
class CSVProcessor(AgentBaseFn):
    """CSV data processing and manipulation tool.

    Provides operations for reading, writing, analyzing, and converting
    CSV files. Supports custom delimiters, headers, and row limits.

    Supported operations:
        read: Read CSV file into a list of dictionaries.
        write: Write list of dictionaries to a CSV file.
        analyze: Get statistics about a CSV file structure.
        convert: Convert CSV data to JSON format.
    """

    @staticmethod
    def static_call(
        operation: str,
        file_path: str | None = None,
        data: list[dict] | None = None,
        delimiter: str = ",",
        headers: list[str] | None = None,
        has_header: bool = True,
        max_rows: int | None = None,
        **context_variables,
    ) -> dict[str, Any]:
        """Process CSV data with various operations.

        Performs read, write, analyze, or convert operations on CSV files.
        Supports custom delimiters, header configuration, and row limits.
        Files are read and written as UTF-8. Failures are reported through
        an ``error`` key rather than raised.

        Args:
            operation: The operation to perform. Options:
                - "read": Read a CSV file into a list of dictionaries.
                  Requires ``file_path``.
                - "write": Write a list of dictionaries to a CSV file.
                  Requires ``file_path`` and ``data``.
                - "analyze": Get structural statistics about a CSV file
                  including row/column counts, headers, sample data, and
                  empty cell count. The first row is treated as the header.
                  Requires ``file_path``.
                - "convert": Convert a CSV file to a list of JSON-like
                  dictionaries. Requires ``file_path``.
            file_path: Path to the CSV file for read/write/analyze/convert.
            data: List of dictionaries to write. Each dict represents a row
                with column names as keys. Required for the "write" operation.
            delimiter: Column delimiter character. Defaults to ",".
            headers: Explicit column headers. For "write", used as fieldnames;
                if not provided, the union of keys across all rows is used
                in first-seen order. For "read" with ``has_header=False``,
                used as the column names.
            has_header: Whether the CSV file's first row is a header row.
                If False and no ``headers`` are provided, columns are
                auto-named as "col_0", "col_1", etc. Defaults to True.
                Only consulted by the "read" operation.
            max_rows: Maximum number of rows to read. None reads all rows;
                0 reads none.
            **context_variables: Runtime context from the agent (unused).

        Returns:
            A dictionary containing operation-specific results:
                For "read": data (list[dict]), count (int), columns (list[str]).
                For "write": success (bool), rows_written (int), file_path (str).
                For "analyze": total_rows, total_columns, headers, sample_data,
                    empty_cells.
                For "convert": json (list[dict]), count (int).
                - error (str): Error message if the operation failed.

        Example:
            >>> result = CSVProcessor.static_call("read", file_path="data.csv", max_rows=5)
            >>> print(result["count"])
            5
        """
        result: dict[str, Any] = {}

        if operation == "read":
            if not file_path:
                return {"error": "file_path required for read operation"}
            try:
                rows = []
                with open(file_path, "r", newline="", encoding="utf-8") as f:
                    fieldnames = None
                    if not has_header:
                        if headers:
                            fieldnames = headers
                        else:
                            # Let the csv module parse the first non-blank
                            # record so quoted fields that contain the
                            # delimiter are counted as a single column.
                            first_row = next(
                                (row for row in csv.reader(f, delimiter=delimiter) if row),
                                [],
                            )
                            fieldnames = [f"col_{i}" for i in range(len(first_row))]
                            f.seek(0)
                    reader = csv.DictReader(f, fieldnames=fieldnames, delimiter=delimiter)
                    for i, row in enumerate(reader):
                        # "is not None" so that max_rows=0 means "no rows"
                        # instead of being mistaken for "no limit".
                        if max_rows is not None and i >= max_rows:
                            break
                        rows.append(row)
                    # Report columns even when no data rows were returned.
                    if rows:
                        columns = list(rows[0].keys())
                    else:
                        columns = list(reader.fieldnames or [])
                result["data"] = rows
                result["count"] = len(rows)
                result["columns"] = columns
            except Exception as e:  # tool boundary: report instead of raising
                return {"error": f"Failed to read CSV: {e!s}"}

        elif operation == "write":
            if not file_path or not data:
                return {"error": "file_path and data required for write operation"}
            try:
                if not headers:
                    # Union of keys over all rows (first-seen order), so a
                    # later row with an extra key does not abort the write.
                    headers = list(dict.fromkeys(key for row in data for key in row))
                with open(file_path, "w", newline="", encoding="utf-8") as f:
                    writer = csv.DictWriter(f, fieldnames=headers, delimiter=delimiter)
                    writer.writeheader()
                    writer.writerows(data)
                result["success"] = True
                result["rows_written"] = len(data)
                result["file_path"] = file_path
            except Exception as e:  # tool boundary: report instead of raising
                return {"error": f"Failed to write CSV: {e!s}"}

        elif operation == "analyze":
            if not file_path:
                return {"error": "file_path required for analyze operation"}
            try:
                with open(file_path, "r", newline="", encoding="utf-8") as f:
                    rows = list(csv.reader(f, delimiter=delimiter))
                result["total_rows"] = len(rows)
                result["total_columns"] = len(rows[0]) if rows else 0
                if rows:
                    header_row, body = rows[0], rows[1:]
                    result["headers"] = header_row
                    # Up to five data rows after the header.
                    result["sample_data"] = body[:5]
                    result["empty_cells"] = sum(
                        1 for row in body for cell in row if not cell.strip()
                    )
            except Exception as e:  # tool boundary: report instead of raising
                return {"error": f"Failed to analyze CSV: {e!s}"}

        elif operation == "convert":
            if not file_path:
                return {"error": "file_path required for convert operation"}
            try:
                with open(file_path, "r", newline="", encoding="utf-8") as f:
                    rows = list(csv.DictReader(f, delimiter=delimiter))
                result["json"] = rows
                result["count"] = len(rows)
            except Exception as e:  # tool boundary: report instead of raising
                return {"error": f"Failed to convert CSV: {e!s}"}

        else:
            return {"error": f"Unknown operation: {operation}"}

        return result
class TextProcessor(AgentBaseFn):
    """Advanced text processing and manipulation tool.

    Provides operations for analyzing, cleaning, extracting patterns,
    replacing content, and formatting text. Supports regular expressions
    for pattern matching and extraction.

    Supported operations:
        stats: Get text statistics (length, word count, character frequency).
        clean: Remove extra whitespace and optionally matched patterns.
        extract: Extract patterns like emails, URLs, phone numbers, or custom regex.
        replace: Replace patterns in text using regex.
        split: Split text by pattern or whitespace.
        format: Apply formatting (title, upper, lower, sentence case).
    """

    @staticmethod
    def static_call(
        text: str,
        operation: str,
        pattern: str | None = None,
        replacement: str | None = None,
        case_sensitive: bool = True,
        **context_variables,
    ) -> dict[str, Any]:
        """Process text with various operations.

        Applies the specified text processing operation, ranging from
        statistical analysis to pattern-based extraction and formatting.
        Failures (non-string input, invalid regular expressions) are
        reported through an ``error`` key rather than raised.

        Args:
            text: The input text to process.
            operation: The operation to perform. Options:
                - "stats": Compute text statistics including length, word
                  count, line count, character frequency, and word frequency.
                - "clean": Collapse runs of whitespace and optionally remove
                  content matching ``pattern``.
                - "extract": Extract patterns from text. ``pattern`` can be
                  a named shortcut ("emails", "urls", "phones", "numbers")
                  or a custom regular expression.
                - "replace": Replace occurrences of ``pattern`` in text with
                  ``replacement``. Uses regex matching.
                - "split": Split text by ``pattern`` (regex) or by
                  whitespace if no pattern is given.
                - "format": Apply text formatting. ``pattern`` specifies the
                  format: "title", "upper", "lower", "sentence", or
                  "no_punctuation". Any other value leaves the text unchanged.
            pattern: Regex pattern or named shortcut for clean/extract/
                replace/split, or the format name for "format". Required
                for "extract" and "replace".
            replacement: Replacement string for the "replace" operation.
                Defaults to empty string if None.
            case_sensitive: Whether regex matching (clean, extract, replace,
                split) is case-sensitive. Defaults to True.
            **context_variables: Runtime context from the agent (unused).

        Returns:
            A dictionary containing operation-specific results:
                For "stats": length, words, lines, characters_no_spaces,
                    most_common_chars, most_common_words.
                For "clean": cleaned_text, original_length, cleaned_length.
                For "extract": matches (list[str]), count (int).
                For "replace": replaced_text, replacements_made (int).
                For "split": parts (list[str]), count (int).
                For "format": formatted_text (str).
                - error (str): Error message if the operation failed.

        Example:
            >>> result = TextProcessor.static_call("Hello World!", "stats")
            >>> print(result["words"])
            2
        """
        if not isinstance(text, str):
            return {"error": "text must be a string"}

        # Named shortcuts accepted by "extract" in place of a raw regex.
        named_patterns = {
            # "[A-Za-z]" rather than "[A-Z|a-z]": inside a character class
            # "|" is a literal pipe, not alternation.
            "emails": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b",
            "urls": r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+",
            "phones": r"[\+]?[(]?[0-9]{1,4}[)]?[-\s\.]?[(]?[0-9]{1,4}[)]?[-\s\.]?[0-9]{1,4}[-\s\.]?[0-9]{1,9}",
            # Fraction requires digits after the dot so a sentence-ending
            # "3." yields "3" instead of "3.".
            "numbers": r"-?\d+(?:\.\d+)?",
        }
        flags = 0 if case_sensitive else re.IGNORECASE
        result: dict[str, Any] = {}

        try:
            if operation == "stats":
                result["length"] = len(text)
                result["words"] = len(text.split())
                result["lines"] = len(text.splitlines())
                result["characters_no_spaces"] = len(
                    text.replace(" ", "").replace("\n", "").replace("\t", "")
                )

                # Letter frequencies (case-insensitive), top five.
                char_freq: dict[str, int] = {}
                for char in text.lower():
                    if char.isalpha():
                        char_freq[char] = char_freq.get(char, 0) + 1
                result["most_common_chars"] = sorted(
                    char_freq.items(), key=lambda item: item[1], reverse=True
                )[:5]

                # Word frequencies (case-insensitive), top ten.
                word_freq: dict[str, int] = {}
                for word in re.findall(r"\b\w+\b", text.lower()):
                    word_freq[word] = word_freq.get(word, 0) + 1
                result["most_common_words"] = sorted(
                    word_freq.items(), key=lambda item: item[1], reverse=True
                )[:10]

            elif operation == "clean":
                cleaned = re.sub(r"\s+", " ", text)
                if pattern:
                    cleaned = re.sub(pattern, "", cleaned, flags=flags)
                    # Removing a match can leave two spaces side by side.
                    cleaned = re.sub(r"\s+", " ", cleaned)
                cleaned = cleaned.strip()
                result["cleaned_text"] = cleaned
                result["original_length"] = len(text)
                result["cleaned_length"] = len(cleaned)

            elif operation == "extract":
                if not pattern:
                    return {"error": "pattern required for extract operation"}
                matches = re.findall(named_patterns.get(pattern, pattern), text, flags)
                result["matches"] = matches
                result["count"] = len(matches)

            elif operation == "replace":
                if not pattern:
                    return {"error": "pattern required for replace operation"}
                if replacement is None:
                    replacement = ""
                # subn returns the new text and the substitution count in a
                # single pass over the input.
                replaced, made = re.subn(pattern, replacement, text, flags=flags)
                result["replaced_text"] = replaced
                result["replacements_made"] = made

            elif operation == "split":
                parts = re.split(pattern, text, flags=flags) if pattern else text.split()
                result["parts"] = parts
                result["count"] = len(parts)

            elif operation == "format":
                formatted = text
                if pattern == "title":
                    formatted = text.title()
                elif pattern == "upper":
                    formatted = text.upper()
                elif pattern == "lower":
                    formatted = text.lower()
                elif pattern == "sentence":
                    formatted = ". ".join(s.capitalize() for s in text.split(". "))
                elif pattern == "no_punctuation":
                    formatted = re.sub(r"[^\w\s]", "", text)
                result["formatted_text"] = formatted

            else:
                return {"error": f"Unknown operation: {operation}"}
        except re.error as e:
            # A malformed user-supplied pattern or replacement template.
            return {"error": f"Invalid regular expression: {e!s}"}

        return result
class DataConverter(AgentBaseFn):
    """Convert data between different formats.

    Provides conversion between various data formats including
    JSON, YAML, Base64, hexadecimal, and cryptographic hashes.
    Supports bidirectional conversion where applicable.

    Supported formats:
        json: JSON string format.
        yaml: YAML format (requires PyYAML).
        base64: Base64 encoded string.
        hex: Hexadecimal string representation.
        hash: Generate MD5, SHA1, SHA256, and SHA512 hashes (output only).
    """

    @staticmethod
    def static_call(
        data: Any,
        from_format: str,
        to_format: str,
        encoding: str = "utf-8",
        **context_variables,
    ) -> dict[str, Any]:
        """Convert data between different formats.

        First parses the input data from the source format into an
        intermediate Python object, then serializes it to the target format.
        Failures are reported through an ``error`` key rather than raised.

        Args:
            data: Input data to convert. Can be a string (for json, yaml,
                base64, hex source formats) or a Python object (dict, list).
            from_format: Source format of the data. Options:
                - "json": JSON string or Python dict/list.
                - "yaml": YAML string or Python object. Requires PyYAML.
                - "base64": Base64-encoded string.
                - "hex": Hexadecimal-encoded string.
                Any other value passes ``data`` through unchanged.
            to_format: Target format to convert to. Options:
                - "json": Pretty-printed JSON string.
                - "yaml": YAML string. Requires PyYAML.
                - "base64": Base64-encoded string.
                - "hex": Hexadecimal string.
                - "hash": Dictionary of cryptographic hashes (MD5, SHA1,
                  SHA256, SHA512). Output only.
            encoding: Character encoding for encoding/decoding operations.
                Defaults to "utf-8". Base64/hex payloads that are not valid
                text in this encoding can still be converted to the
                byte-level targets (base64, hex, hash).
            **context_variables: Runtime context from the agent (unused).

        Returns:
            A dictionary containing:
                - output: The converted data in the target format. For
                  "hash" target, this is a dict with md5, sha1, sha256,
                  and sha512 hex digest strings.
                - success (bool): True if conversion succeeded.
                - error (str): Error message if the conversion failed.

        Example:
            >>> result = DataConverter.static_call(
            ...     '{"key": "value"}', from_format="json", to_format="base64"
            ... )
            >>> print(result["success"])
            True
        """
        # Targets that operate on raw bytes rather than on text/objects.
        byte_targets = ("base64", "hex", "hash")
        result: dict[str, Any] = {}

        try:
            # ---- Stage 1: source format -> intermediate value ----
            if from_format == "json":
                parsed_data = json.loads(data) if isinstance(data, str) else data
            elif from_format == "yaml":
                try:
                    import yaml
                except ImportError:
                    return {"error": "PyYAML required for YAML operations"}
                parsed_data = yaml.safe_load(data) if isinstance(data, str) else data
            elif from_format in ("base64", "hex"):
                if not isinstance(data, str):
                    label = "Base64" if from_format == "base64" else "Hex"
                    return {"error": f"{label} input must be string"}
                raw = base64.b64decode(data) if from_format == "base64" else bytes.fromhex(data)
                try:
                    parsed_data = raw.decode(encoding)
                except UnicodeDecodeError:
                    # Binary payloads are not text, but byte-level targets
                    # never need the text form; keep the raw bytes for them.
                    if to_format not in byte_targets:
                        raise
                    parsed_data = raw
            else:
                parsed_data = data

            # ---- Stage 2: intermediate value -> target format ----
            if to_format == "json":
                result["output"] = json.dumps(parsed_data, indent=2)
            elif to_format == "yaml":
                try:
                    import yaml
                except ImportError:
                    return {"error": "PyYAML required for YAML operations"}
                result["output"] = yaml.dump(parsed_data, default_flow_style=False)
            elif to_format in byte_targets:
                # Normalise to bytes: raw bytes as-is, text via ``encoding``,
                # anything else via its compact JSON serialisation.
                if isinstance(parsed_data, (bytes, bytearray)):
                    payload = bytes(parsed_data)
                elif isinstance(parsed_data, str):
                    payload = parsed_data.encode(encoding)
                else:
                    payload = json.dumps(parsed_data).encode(encoding)

                if to_format == "base64":
                    result["output"] = base64.b64encode(payload).decode("ascii")
                elif to_format == "hex":
                    result["output"] = payload.hex()
                else:
                    result["output"] = {
                        "md5": hashlib.md5(payload).hexdigest(),
                        "sha1": hashlib.sha1(payload).hexdigest(),
                        "sha256": hashlib.sha256(payload).hexdigest(),
                        "sha512": hashlib.sha512(payload).hexdigest(),
                    }
            else:
                return {"error": f"Unknown target format: {to_format}"}

            result["success"] = True
        except Exception as e:  # tool boundary: report instead of raising
            return {"error": f"Conversion failed: {e!s}"}

        return result
def _parse_iso_datetime(value: str) -> datetime:
    """Parse an ISO 8601 string, treating a ``Z`` suffix as UTC (``+00:00``).

    Raises:
        ValueError: If ``value`` is not a valid ISO 8601 date/time.
        TypeError, AttributeError: If ``value`` is not a string.
    """
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


class DateTimeProcessor(AgentBaseFn):
    """Date and time processing utilities.

    Provides operations for parsing, formatting, and manipulating
    dates and times. Supports multiple date formats and time delta
    calculations.

    Supported operations:
        now: Get current date and time in various formats.
        parse: Parse a date string into components.
        delta: Add or subtract time from a date.
        format: Format a date in various output styles.
    """

    @staticmethod
    def static_call(
        operation: str,
        date_string: str | None = None,
        format: str | None = None,  # noqa: A002
        timezone: str | None = None,
        delta_days: int = 0,
        delta_hours: int = 0,
        delta_minutes: int = 0,
        **context_variables,
    ) -> dict[str, Any]:
        """Process dates and times with various operations.

        Provides operations for getting the current time, parsing date
        strings, computing time deltas, and formatting dates in various
        output styles. Failures are reported through an ``error`` key
        rather than raised.

        Args:
            operation: The operation to perform. Options:
                - "now": Get current date and time in multiple formats.
                - "parse": Parse a date string into components. Tries
                  ``format`` first when given, then a list of common
                  formats (day-first is tried before month-first for
                  slash-separated dates), then ISO 8601, and finally
                  dateutil if it is installed.
                - "delta": Add or subtract time from a date. Uses
                  ``date_string`` as the base (defaults to now).
                - "format": Format a date in various output styles. Uses
                  ``date_string`` as input (defaults to now). If ``format``
                  is provided, uses it as a strftime pattern; otherwise
                  returns all common formats.
            date_string: Date string to parse, use as base for delta, or
                format. Expected to be in ISO format for delta/format
                operations. For parse, accepts many common formats. A
                trailing "Z" is interpreted as UTC by every operation.
            format: Explicit strftime/strptime format pattern. For "parse",
                used as the preferred parsing format. For "format", used as
                the output format pattern.
            timezone: Timezone name. Currently reserved for future use.
            delta_days: Number of days to add (positive) or subtract
                (negative) from the base date. Defaults to 0.
            delta_hours: Number of hours to add or subtract. Defaults to 0.
            delta_minutes: Number of minutes to add or subtract. Defaults to 0.
            **context_variables: Runtime context from the agent (unused).

        Returns:
            A dictionary containing operation-specific results:
                For "now": datetime (ISO), timestamp, formatted (dict with
                    date, time, datetime, iso, human keys).
                For "parse": parsed (ISO), timestamp, components (dict with
                    year, month, day, hour, minute, second, weekday).
                For "delta": original (ISO), new (ISO), delta (dict with
                    days, hours, minutes, total_seconds).
                For "format": formats (dict of format name to value) or
                    formatted (str) when a specific format is provided.
                - error (str): Error message if the operation failed.

        Example:
            >>> result = DateTimeProcessor.static_call("parse", date_string="2024-01-15")
            >>> print(result["components"]["weekday"])
            Monday
        """
        result: dict[str, Any] = {}

        if operation == "now":
            now = datetime.now()
            result["datetime"] = now.isoformat()
            result["timestamp"] = now.timestamp()
            result["formatted"] = {
                "date": now.strftime("%Y-%m-%d"),
                "time": now.strftime("%H:%M:%S"),
                "datetime": now.strftime("%Y-%m-%d %H:%M:%S"),
                "iso": now.isoformat(),
                "human": now.strftime("%B %d, %Y at %I:%M %p"),
            }

        elif operation == "parse":
            if not date_string:
                return {"error": "date_string required for parse operation"}
            try:
                formats = [
                    "%Y-%m-%d",
                    "%Y-%m-%d %H:%M:%S",
                    "%Y/%m/%d",
                    "%d/%m/%Y",
                    "%m/%d/%Y",
                    "%Y-%m-%dT%H:%M:%S",
                ]
                if format:
                    formats.insert(0, format)

                parsed_date = None
                for fmt in formats:
                    try:
                        parsed_date = datetime.strptime(date_string, fmt)
                        break
                    except (ValueError, TypeError):
                        continue

                if parsed_date is None:
                    # ISO 8601 fallback from the standard library. It covers
                    # UTC offsets and fractional seconds, and maps a trailing
                    # "Z" to UTC so the timestamp is not read as local time.
                    try:
                        parsed_date = _parse_iso_datetime(date_string)
                    except (ValueError, TypeError, AttributeError):
                        parsed_date = None

                if parsed_date is None:
                    # Last resort: the optional dateutil parser. A missing
                    # package is treated the same as an unparseable string.
                    try:
                        from dateutil import parser  # type:ignore

                        parsed_date = parser.parse(date_string)
                    except Exception:
                        return {"error": "Could not parse date string"}

                result["parsed"] = parsed_date.isoformat()
                result["timestamp"] = parsed_date.timestamp()
                result["components"] = {
                    "year": parsed_date.year,
                    "month": parsed_date.month,
                    "day": parsed_date.day,
                    "hour": parsed_date.hour,
                    "minute": parsed_date.minute,
                    "second": parsed_date.second,
                    "weekday": parsed_date.strftime("%A"),
                }
            except Exception as e:  # tool boundary: report instead of raising
                return {"error": f"Failed to parse date: {e!s}"}

        elif operation == "delta":
            base_date = datetime.now()
            if date_string:
                try:
                    base_date = _parse_iso_datetime(date_string)
                except (ValueError, TypeError, AttributeError):
                    return {"error": "Invalid date_string for delta operation"}
            try:
                delta = timedelta(days=delta_days, hours=delta_hours, minutes=delta_minutes)
                new_date = base_date + delta
            except (OverflowError, TypeError) as e:
                # Out-of-range result or non-numeric delta arguments.
                return {"error": f"Failed to apply delta: {e!s}"}
            result["original"] = base_date.isoformat()
            result["new"] = new_date.isoformat()
            result["delta"] = {
                "days": delta_days,
                "hours": delta_hours,
                "minutes": delta_minutes,
                "total_seconds": delta.total_seconds(),
            }

        elif operation == "format":
            if not date_string:
                date_string = datetime.now().isoformat()
            try:
                dt = _parse_iso_datetime(date_string)
                if not format:
                    result["formats"] = {
                        "iso": dt.isoformat(),
                        "date": dt.strftime("%Y-%m-%d"),
                        "time": dt.strftime("%H:%M:%S"),
                        "us": dt.strftime("%m/%d/%Y"),
                        "eu": dt.strftime("%d/%m/%Y"),
                        "human": dt.strftime("%B %d, %Y at %I:%M %p"),
                        "short": dt.strftime("%b %d, %Y"),
                        "timestamp": dt.timestamp(),
                    }
                else:
                    result["formatted"] = dt.strftime(format)
            except Exception as e:  # tool boundary: report instead of raising
                return {"error": f"Failed to format date: {e!s}"}

        else:
            return {"error": f"Unknown operation: {operation}"}

        return result