Layer 1
The Arbiter-level band
This is the primary governance integration point. The governed_process_agent_call() function wraps every agent dispatch. It sits between "Bedrock selects a tool" and "SQS message is sent."
def governed_process_agent_call(agents_config, orchestration, agent_name, agent_input, agent_use_id):
"""
Control-surface band: governance evaluation before every agent dispatch.
Sits between 'Bedrock selects a tool' and 'SQS message is sent'.
"""
# Always increment metrics on dispatch attempt (even in bypass mode)
if GOVERNANCE_BYPASS:
increment_agent_invocation(agent_name)
return process_agent_call(agents_config, orchestration, agent_name, agent_input, agent_use_id)
try:
from governance.engine import GovernanceEngine
from governance.hierarchy import load_governance_state
from governance.ledger import write_finding
from governance.models import DispatchRequest, ArbitrationDecision
except ImportError as e:
print(f"Governance layer not available ({e}), falling back to ungoverned dispatch")
increment_agent_invocation(agent_name)
return process_agent_call(agents_config, orchestration, agent_name, agent_input, agent_use_id)
# Load governance state (cached per container)
authority_units, contracts, case_law, constitutional_layers = load_governance_state()
engine = GovernanceEngine(authority_units, contracts, case_law, constitutional_layers)
# Resolve domain from agent config
agent_cfg = next((a for a in agents_config.get('agents', []) if a['name'] == agent_name), None)
domain = agent_cfg.get('domain', 'default') if agent_cfg else 'unknown'
# Build dispatch context from DynamoDB (agent metrics, workflow state)
dispatch_context = build_dispatch_context(agent_name, orchestration["workflowId"], orchestration)
# Build dispatch request
request = DispatchRequest(
requesting_agent_id="arbiter",
target_agent_id=agent_name,
action_type="invoke_agent",
domain=domain,
workflow_id=orchestration["workflowId"],
agent_use_id=agent_use_id,
context=dispatch_context,
agent_input=agent_input,
)
# Evaluate (deterministic, no LLM)
finding = engine.evaluate(request)
# Write governance record (mandatory — before dispatch or denial)
try:
write_finding(finding)
except Exception as e:
print(f"GOVERNANCE LEDGER WRITE FAILED: {e} — halting dispatch")
return None
# Act on decision
if finding.decision == ArbitrationDecision.PERMIT:
increment_agent_invocation(agent_name)
return process_agent_call(agents_config, orchestration, agent_name, agent_input, agent_use_id)
elif finding.decision == ArbitrationDecision.DENY:
increment_agent_deny(agent_name)
orchestration['_deny_count'] = orchestration.get('_deny_count', 0) + 1
return {"denied": True, "reason": finding.reason, "finding_id": finding.finding_id}
elif finding.decision in (ArbitrationDecision.ESCALATE, ArbitrationDecision.HALT):
_route_escalation(finding)
orchestration['_escalate_count'] = orchestration.get('_escalate_count', 0) + 1
return {"escalated": True, "reason": finding.reason, "finding_id": finding.finding_id}
Every toolUse in the model's response is routed through this function. The call site in invoke_agents_from_conversation() shows the integration:
def invoke_agents_from_conversation(orchestration, agents_config):
for content in output_message.get('content', []):
if 'toolUse' in content:
tool_use = content['toolUse']
# Every tool use passes through the governance band
result = governed_process_agent_call(
agents_config,
orchestration,
tool_use['name'],
tool_use['input'],
tool_use['toolUseId']
)
Graceful Degradation
The governance imports are inside a try/except ImportError. If the governance Lambda Layer is not deployed, the system falls back to ungoverned dispatch with a log message. This means you can deploy the Arbiter first and add the governance layer later without breaking the existing flow.