# Copyright 2025 The EasyDeL/Calute Author @erfanzar (Erfan Zare Chavoshi).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tool policy enforcement layer for Calute.
Provides configurable allow/deny policies for tool execution at both
global and per-agent levels. Policies are evaluated before any tool
call is dispatched, blocking unauthorized calls with a clear error.
Design:
- Global policy applies to all agents unless overridden.
- Per-agent policy takes precedence over global for that agent.
- An explicit allow-list means only those tools are permitted.
- An explicit deny-list means all tools *except* those are permitted.
- If both allow and deny are set, allow takes precedence (intersection).
- Optional tools require explicit opt-in via the allow list.
"""
from __future__ import annotations
import logging
import typing as tp
from dataclasses import dataclass, field
from enum import Enum
logger = logging.getLogger(__name__)
[docs]class PolicyAction(Enum):
"""Result of a policy evaluation.
Represents the two possible outcomes when a tool invocation is checked
against a :class:`ToolPolicy`.
Attributes:
ALLOW: The tool invocation is permitted by the policy.
DENY: The tool invocation is blocked by the policy.
Example:
>>> action = PolicyAction.ALLOW
>>> action.value
'allow'
"""
ALLOW = "allow"
DENY = "deny"
[docs]class PolicyEngine:
"""Evaluates tool policies at global and per-agent level.
The engine holds a *global_policy* that applies to every agent and an
optional dict of *agent_policies* keyed by agent ID. Per-agent policies
fully override the global policy for that agent (no merging).
Listeners can be registered to observe every policy check, which is
useful for audit logging or metrics collection.
Attributes:
global_policy: The default :class:`ToolPolicy` applied when no
per-agent policy matches.
agent_policies: Mapping of agent ID to :class:`ToolPolicy`. When an
agent ID matches a key in this dict, that policy is used instead
of the global policy.
Example:
>>> engine = PolicyEngine(
... global_policy=ToolPolicy(deny={"execute_shell"}),
... )
>>> engine.check("execute_shell", agent_id="coder")
PolicyAction.DENY
>>> engine.set_agent_policy("coder", ToolPolicy(allow={"execute_shell"}))
>>> engine.check("execute_shell", agent_id="coder")
PolicyAction.ALLOW
"""
def __init__(
self,
global_policy: ToolPolicy | None = None,
agent_policies: dict[str, ToolPolicy] | None = None,
) -> None:
"""Initialise the policy engine.
Args:
global_policy: The default policy applied to all agents that do
not have a per-agent override. Defaults to an empty
:class:`ToolPolicy` (allow all).
agent_policies: Optional mapping of agent IDs to their specific
policies. Per-agent policies fully replace (not merge with)
the global policy for that agent.
"""
self.global_policy = global_policy or ToolPolicy()
self.agent_policies: dict[str, ToolPolicy] = agent_policies or {}
self._listeners: list[tp.Callable[[str, str | None, PolicyAction], None]] = []
[docs] def set_global_policy(self, policy: ToolPolicy) -> None:
"""Replace the global policy applied to all agents without a per-agent override.
Args:
policy: The new :class:`ToolPolicy` to use as the global default.
"""
self.global_policy = policy
[docs] def set_agent_policy(self, agent_id: str, policy: ToolPolicy) -> None:
"""Set or replace the per-agent policy for a specific agent.
When set, this policy fully overrides the global policy for the
given agent (no merging occurs).
Args:
agent_id: The unique identifier of the agent.
policy: The :class:`ToolPolicy` to assign to this agent.
"""
self.agent_policies[agent_id] = policy
[docs] def remove_agent_policy(self, agent_id: str) -> None:
"""Remove a per-agent policy so the agent falls back to the global policy.
If no per-agent policy exists for the given agent, this is a no-op.
Args:
agent_id: The unique identifier of the agent whose policy
should be removed.
"""
self.agent_policies.pop(agent_id, None)
[docs] def add_listener(self, callback: tp.Callable[[str, str | None, PolicyAction], None]) -> None:
"""Register a listener that is notified on every policy check.
Listeners are called synchronously after each policy evaluation.
If a listener raises an exception, the error is logged as a warning
and the remaining listeners are still invoked.
Args:
callback: A callable that receives ``(tool_name, agent_id, action)``
where *tool_name* is the tool being checked, *agent_id* is
the optional agent identifier, and *action* is the resulting
:class:`PolicyAction`.
"""
self._listeners.append(callback)
[docs] def check(self, tool_name: str, agent_id: str | None = None) -> PolicyAction:
"""Check whether a tool invocation is allowed for a given agent.
Resolves the applicable policy (per-agent if available, otherwise
global), evaluates it, notifies all registered listeners, and logs
denied actions at INFO level.
Args:
tool_name: The name of the tool to check.
agent_id: Optional identifier of the agent requesting the tool.
When ``None``, only the global policy is consulted.
Returns:
:attr:`PolicyAction.ALLOW` if the tool is permitted, or
:attr:`PolicyAction.DENY` if it is blocked.
"""
policy = self.agent_policies.get(agent_id) if agent_id else None
if policy is None:
policy = self.global_policy
action = policy.evaluate(tool_name)
for listener in self._listeners:
try:
listener(tool_name, agent_id, action)
except Exception:
logger.warning("Policy listener error", exc_info=True)
if action == PolicyAction.DENY:
logger.info("Policy DENIED tool=%s agent=%s", tool_name, agent_id)
return action
[docs] def enforce(self, tool_name: str, agent_id: str | None = None) -> None:
"""Check a tool invocation and raise on denial.
This is a convenience wrapper around :meth:`check` that raises a
:class:`ToolPolicyViolation` when the policy decision is DENY,
making it suitable for use in enforcement points where a blocked
tool should halt execution.
Args:
tool_name: The name of the tool to check.
agent_id: Optional identifier of the agent requesting the tool.
Raises:
ToolPolicyViolation: If the tool is denied by the applicable
policy.
"""
action = self.check(tool_name, agent_id)
if action == PolicyAction.DENY:
raise ToolPolicyViolation(tool_name, agent_id)