Source code for calute.extensions.dependency

# Copyright 2025 The EasyDeL/Calute Author @erfanzar (Erfan Zare Chavoshi).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""Dependency resolution for Calute plugins and skills.

Provides version constraint parsing, dependency specification, and
topological sorting with circular dependency detection for local
plugin/skill dependency graphs.
"""

from __future__ import annotations

import re
from dataclasses import dataclass, field


def _parse_version_tuple(version: str, *, pad: bool = True) -> tuple[int, ...]:
    """Parse a version string into a tuple of integers.

    Splits a dotted version string (e.g., ``'1.2.3'``) on ``'.'`` and converts
    each segment to an integer. Non-numeric trailing segments are silently
    dropped.  When *pad* is ``True`` the tuple is zero-padded to at least
    three elements so that ``'1.2'`` becomes ``(1, 2, 0)``.

    Args:
        version: A dotted version string such as ``'1.2.3'`` or ``'2.0'``.
        pad: If ``True`` (the default), pad the result with zeros so it
            contains at least three elements.

    Returns:
        A tuple of integers representing the parsed version components.

    Example:
        >>> _parse_version_tuple("1.2.3")
        (1, 2, 3)
        >>> _parse_version_tuple("1.2", pad=False)
        (1, 2)
    """
    parts = []
    for part in version.strip().split("."):
        try:
            parts.append(int(part))
        except ValueError:
            break
    if pad:
        while len(parts) < 3:
            parts.append(0)
    return tuple(parts)


[docs]class VersionConstraint: """Parse and evaluate PEP 440-style version constraint strings. A ``VersionConstraint`` encapsulates one or more comparison operators applied to version strings. Supported operators are ``==``, ``!=``, ``>=``, ``<=``, ``>``, ``<``, and ``~=`` (compatible release). Multiple constraints can be combined with commas (e.g., ``">=1.0,<2.0"``), in which case *all* constraints must be satisfied. Attributes: raw: The original constraint string passed to the constructor. Example: >>> vc = VersionConstraint(">=1.0.0") >>> vc.satisfies("1.2.3") True >>> vc = VersionConstraint(">=1.0,<2.0") >>> vc.satisfies("2.0.0") False >>> vc = VersionConstraint("~=1.2") >>> vc.satisfies("1.3.0") True """ _OP_PATTERN = re.compile(r"^\s*(~=|==|!=|>=|<=|>|<)\s*(.+)\s*$") def __init__(self, constraint_str: str) -> None: """Initialize a VersionConstraint by parsing the constraint string. The constraint string is split on commas and each segment is parsed into an ``(operator, version_tuple, raw_parts)`` triple stored in ``self._constraints``. If a segment contains no recognized operator the ``==`` operator is assumed. Args: constraint_str: A version constraint expression such as ``">=1.0.0"``, ``">=1.0,<2.0"``, or ``"~=1.2"``. An empty string produces a constraint that matches any version. """ self.raw = constraint_str.strip() self._constraints: list[tuple[str, tuple[int, ...], int]] = [] if not self.raw: return for part in self.raw.split(","): part = part.strip() if not part: continue m = self._OP_PATTERN.match(part) if m: op = m.group(1) raw_ver = _parse_version_tuple(m.group(2), pad=False) ver = _parse_version_tuple(m.group(2)) self._constraints.append((op, ver, len(raw_ver))) else: raw_ver = _parse_version_tuple(part, pad=False) self._constraints.append(("==", _parse_version_tuple(part), len(raw_ver)))
[docs] def satisfies(self, version: str) -> bool: """Check whether *version* satisfies every constraint. If no constraints were parsed (e.g., the constraint string was empty), any version is considered satisfying. Args: version: A dotted version string to test (e.g., ``"1.2.3"``). Returns: ``True`` if *version* satisfies all individual constraints, ``False`` otherwise. """ if not self._constraints: return True ver = _parse_version_tuple(version) for op, target, raw_parts in self._constraints: if not self._check(op, ver, target, raw_parts): return False return True
@staticmethod def _check(op: str, ver: tuple[int, ...], target: tuple[int, ...], raw_parts: int = 3) -> bool: """Evaluate a single version comparison operation. For the ``~=`` (compatible release) operator the check ensures that *ver* is at least *target* and shares the same prefix up to the second-to-last specified segment (e.g., ``~=1.2`` allows ``>=1.2,<2.0``). Args: op: The comparison operator string (one of ``==``, ``!=``, ``>=``, ``<=``, ``>``, ``<``, ``~=``). ver: The version being tested, as a tuple of ints. target: The constraint version to compare against. raw_parts: The number of segments originally specified in the constraint (before zero-padding). Used by ``~=`` to determine the prefix length. Returns: ``True`` if the comparison holds, ``False`` otherwise. Returns ``None`` implicitly if *op* is unrecognized (should not happen in normal use). """ if op == "==": return ver == target if op == "!=": return ver != target if op == ">=": return ver >= target if op == "<=": return ver <= target if op == ">": return ver > target if op == "<": return ver < target if op == "~=": if ver < target: return False raw_target = list(target[:raw_parts]) prefix = raw_target[:-1] if prefix: prefix[-1] += 1 upper = tuple(prefix) + (0,) * (len(target) - len(prefix)) return ver < upper return True def __repr__(self) -> str: """Return a developer-friendly string representation. Returns: A string of the form ``VersionConstraint('<raw>')``. """ return f"VersionConstraint({self.raw!r})"
[docs]@dataclass class DependencySpec: """A single dependency requirement consisting of a name and optional version constraint. Attributes: name: The identifier of the required dependency (e.g., ``"my_plugin"``). version_constraint: An optional version constraint string such as ``">=1.0.0"`` or ``"~=2.1"``. ``None`` means any version is acceptable. Example: >>> spec = DependencySpec(name="my_plugin", version_constraint=">=1.0") >>> spec.to_version_constraint().satisfies("1.5.0") True """ name: str version_constraint: str | None = None
[docs] def to_version_constraint(self) -> VersionConstraint: """Convert the version constraint string into a ``VersionConstraint`` object. Returns: A ``VersionConstraint`` parsed from :attr:`version_constraint`. If :attr:`version_constraint` is ``None``, an unconstrained (always-satisfied) ``VersionConstraint`` is returned. """ return VersionConstraint(self.version_constraint or "")
[docs]def parse_dependency(dep_str: str) -> DependencySpec: """Parse a dependency string into a ``DependencySpec``. The string is expected to start with an alphanumeric-plus-hyphens/underscores name, optionally followed by a version constraint expression. If no constraint is present, ``version_constraint`` on the returned spec will be ``None``. Args: dep_str: A dependency declaration such as ``"my_plugin>=1.0.0"`` or plain ``"my_plugin"``. Returns: A ``DependencySpec`` with the parsed ``name`` and optional ``version_constraint``. Example: >>> parse_dependency("my_plugin>=1.0.0") DependencySpec(name='my_plugin', version_constraint='>=1.0.0') >>> parse_dependency("my_plugin") DependencySpec(name='my_plugin', version_constraint=None) """ dep_str = dep_str.strip() m = re.match(r"^([A-Za-z0-9_\-]+)(.*)", dep_str) if m: name = m.group(1) constraint = m.group(2).strip() return DependencySpec(name=name, version_constraint=constraint if constraint else None) return DependencySpec(name=dep_str, version_constraint=None)
[docs]@dataclass class ResolveResult: """Result of a dependency resolution pass. Attributes: satisfied: ``True`` when all requirements are met with no missing dependencies and no version conflicts. missing: Names of required packages that were not found in the available set. conflicts: Human-readable descriptions of version mismatches (e.g., ``"foo: requires >=2.0, found 1.3.0"``). resolution_order: An ordered list of package names representing a safe loading sequence (populated when topological sorting is performed separately). """ satisfied: bool missing: list[str] = field(default_factory=list) conflicts: list[str] = field(default_factory=list) resolution_order: list[str] = field(default_factory=list)
[docs]class CircularDependencyError(Exception): """Raised when a circular dependency is detected during topological sorting. Attributes: cycle: An ordered list of node names that form the cycle. The first and last elements are identical, illustrating the loop (e.g., ``["A", "B", "C", "A"]``). """ def __init__(self, cycle: list[str]) -> None: """Initialize with the detected dependency cycle path. Args: cycle: The list of node names forming the cycle. The last element should repeat the first to close the loop. """ self.cycle = cycle super().__init__(f"Circular dependency detected: {' -> '.join(cycle)}")
[docs]class DependencyResolver: """Resolve a dependency graph, detecting missing deps, version conflicts, and circular dependencies, and produce a topological load order. The resolver provides two main operations: * :meth:`resolve` -- checks that a flat list of ``DependencySpec`` requirements are met by a set of available packages with known versions. * :meth:`topological_sort` -- orders nodes in a directed acyclic graph so that every dependency appears before the nodes that depend on it. Example: >>> resolver = DependencyResolver() >>> available = {"A": "1.0.0", "B": "2.0.0"} >>> reqs = [DependencySpec("A", ">=1.0")] >>> result = resolver.resolve(available, reqs) >>> result.satisfied True """
[docs] def resolve( self, available: dict[str, str], requirements: list[DependencySpec], ) -> ResolveResult: """Check that all *requirements* are met by *available* packages. Args: available: Mapping of package name to version string. requirements: List of dependency specifications to check. Returns: A ResolveResult with satisfaction status and details. """ missing: list[str] = [] conflicts: list[str] = [] for req in requirements: if req.name not in available: missing.append(req.name) elif req.version_constraint: vc = req.to_version_constraint() actual_version = available[req.name] if not vc.satisfies(actual_version): conflicts.append(f"{req.name}: requires {req.version_constraint}, found {actual_version}") satisfied = not missing and not conflicts return ResolveResult( satisfied=satisfied, missing=missing, conflicts=conflicts, resolution_order=[], )
[docs] def topological_sort( self, graph: dict[str, list[str]], ) -> list[str]: """Topological sort of a dependency graph. Args: graph: Mapping of node -> list of nodes it depends on. Returns: List of node names in dependency-safe order (dependencies first). Raises: CircularDependencyError: If a cycle is detected. """ state: dict[str, int] = {node: 0 for node in graph} order: list[str] = [] path: list[str] = [] def visit(node: str) -> None: """Recursively visit a node using depth-first search. Uses a tri-colour marking scheme (0 = unvisited, 1 = in-progress, 2 = finished) to detect back-edges that indicate cycles. Args: node: The graph node to visit. Raises: CircularDependencyError: If a back-edge (cycle) is detected. """ if state.get(node) == 2: return if state.get(node) == 1: cycle_start = path.index(node) cycle = [*path[cycle_start:], node] raise CircularDependencyError(cycle) state[node] = 1 path.append(node) for dep in graph.get(node, []): if dep in graph: visit(dep) path.pop() state[node] = 2 order.append(node) for node in sorted(graph.keys()): if state.get(node) == 0: visit(node) return order