Integration Pattern

The Control-Surface Band

Deploying the governance evaluation layer between agent reasoning and system execution. Two layers. One ledger. Every action evaluated before it can proceed.

2 Governance Layers
1 Unified Ledger
0 Bypass Without Audit

Your agent decides and executes in the same step

In a typical Bedrock agent flow, the model selects a tool and the tool executes. There is no structural point between "the model decided" and "the action happened" where governance can evaluate whether the action should be permitted.

The control-surface band creates that point. It sits between bedrock.converse() (the model selects a tool) and sqs.send_message() (the action dispatches). Every action passes through it. If the governance engine denies, the action never reaches the worker.

The Integration Seam
The governance engine evaluates before the SQS message is sent. If the engine denies, the agent is never invoked. The deny result feeds back into the Bedrock conversation.

Without the Band

  • Model selects tool
  • SQS message sent immediately
  • Worker executes
  • Log records what happened after the fact
Governance follows action

With the Band

  • Model selects tool
  • Governance engine evaluates authority scope
  • Finding recorded at evaluation time
  • Only on PERMIT: SQS message sent
Governance precedes action
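
The ordering in the "With the Band" list can be sketched as a small gate function. This is a minimal illustration rather than the repository's implementation: `gated_dispatch`, `evaluate`, `record`, and `dispatch` are hypothetical names standing in for the governance engine, the ledger write, and the SQS send.

```python
from typing import Any, Callable, Dict, Optional


def gated_dispatch(
    tool_use: Dict[str, Any],
    evaluate: Callable[[Dict[str, Any]], str],
    record: Callable[[Dict[str, Any], str], None],
    dispatch: Callable[[Dict[str, Any]], Any],
) -> Optional[Any]:
    """Evaluate, record, then dispatch only on PERMIT (hypothetical helper)."""
    decision = evaluate(tool_use)   # governance precedes action
    record(tool_use, decision)      # finding recorded at evaluation time
    if decision != "PERMIT":
        return None                 # the action never reaches the worker
    return dispatch(tool_use)       # stands in for sqs.send_message()
```

The point of the sketch is the order of the three calls: nothing downstream runs until the decision exists and has been recorded.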

Two layers, one ledger

The governance engine runs at two levels. At the Arbiter level, it evaluates authority scopes and composition contracts before every agent dispatch. At the Worker level, it intercepts every tool call within a worker agent. Both layers write to the same DynamoDB governance ledger.

Figure: Two-layer governance architecture. The Arbiter-level band evaluates before SQS dispatch; the Worker-level band evaluates before tool execution.

Layer 1

Arbiter-Level Band

Evaluates authority scopes, composition contracts, case law, and constitutional invariants before every SQS dispatch. The agent is never invoked if denied.

governed_process_agent_call()

Layer 2

Worker-Level Band

Intercepts every Strands SDK tool call within a worker agent via AgentToolHandler.preprocess(). Denied tools are blocked before execution.

GovernedToolHandler

The Arbiter-level band

This is the primary governance integration point. The governed_process_agent_call() function wraps every agent dispatch. It sits between "Bedrock selects a tool" and "SQS message is sent."

def governed_process_agent_call(agents_config, orchestration, agent_name, agent_input, agent_use_id):
    """
    Control-surface band: governance evaluation before every agent dispatch.
    Sits between 'Bedrock selects a tool' and 'SQS message is sent'.
    """
    # Always increment metrics on dispatch attempt (even in bypass mode)
    if GOVERNANCE_BYPASS:
        increment_agent_invocation(agent_name)
        return process_agent_call(agents_config, orchestration, agent_name, agent_input, agent_use_id)

    try:
        from governance.engine import GovernanceEngine
        from governance.hierarchy import load_governance_state
        from governance.ledger import write_finding
        from governance.models import DispatchRequest, ArbitrationDecision
    except ImportError as e:
        print(f"Governance layer not available ({e}), falling back to ungoverned dispatch")
        increment_agent_invocation(agent_name)
        return process_agent_call(agents_config, orchestration, agent_name, agent_input, agent_use_id)

    # Load governance state (cached per container)
    authority_units, contracts, case_law, constitutional_layers = load_governance_state()
    engine = GovernanceEngine(authority_units, contracts, case_law, constitutional_layers)

    # Resolve domain from agent config
    agent_cfg = next((a for a in agents_config.get('agents', []) if a['name'] == agent_name), None)
    domain = agent_cfg.get('domain', 'default') if agent_cfg else 'unknown'

    # Build dispatch context from DynamoDB (agent metrics, workflow state)
    dispatch_context = build_dispatch_context(agent_name, orchestration["workflowId"], orchestration)

    # Build dispatch request
    request = DispatchRequest(
        requesting_agent_id="arbiter",
        target_agent_id=agent_name,
        action_type="invoke_agent",
        domain=domain,
        workflow_id=orchestration["workflowId"],
        agent_use_id=agent_use_id,
        context=dispatch_context,
        agent_input=agent_input,
    )

    # Evaluate (deterministic, no LLM)
    finding = engine.evaluate(request)

    # Write governance record (mandatory — before dispatch or denial)
    try:
        write_finding(finding)
    except Exception as e:
        print(f"GOVERNANCE LEDGER WRITE FAILED: {e} — halting dispatch")
        return None

    # Act on decision
    if finding.decision == ArbitrationDecision.PERMIT:
        increment_agent_invocation(agent_name)
        return process_agent_call(agents_config, orchestration, agent_name, agent_input, agent_use_id)

    elif finding.decision == ArbitrationDecision.DENY:
        increment_agent_deny(agent_name)
        orchestration['_deny_count'] = orchestration.get('_deny_count', 0) + 1
        return {"denied": True, "reason": finding.reason, "finding_id": finding.finding_id}

    elif finding.decision in (ArbitrationDecision.ESCALATE, ArbitrationDecision.HALT):
        _route_escalation(finding)
        orchestration['_escalate_count'] = orchestration.get('_escalate_count', 0) + 1
        return {"escalated": True, "reason": finding.reason, "finding_id": finding.finding_id}

Every toolUse in the model's response is routed through this function. The call site in invoke_agents_from_conversation() shows the integration:

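def invoke_agents_from_conversation(orchestration, agents_config):
    # Excerpt: output_message (the model's response message) is obtained
    # earlier in the full function and is not shown here.
    for content in output_message.get('content', []):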
        if 'toolUse' in content:
            tool_use = content['toolUse']
            # Every tool use passes through the governance band
            result = governed_process_agent_call(
                agents_config,
                orchestration,
                tool_use['name'],
                tool_use['input'],
                tool_use['toolUseId']
            )

Graceful Degradation

The governance imports are inside a try/except ImportError. If the governance Lambda Layer is not deployed, the system falls back to ungoverned dispatch with a log message. This means you can deploy the Arbiter first and add the governance layer later without breaking the existing flow.

What makes scope conditions operational

Without runtime context, authority scopes evaluate against an empty dict. Conditions never match. Limits never fire. The build_dispatch_context() function populates DispatchRequest.context with real agent metrics and workflow state from DynamoDB.

def build_dispatch_context(agent_name, workflow_id, orchestration) -> dict:
    """
    Build the context dict for DispatchRequest.context.
    All values are deterministic facts — no LLM, no interpretation.
    This is what activates governance scope conditions and limits.
    """
    context = {}

    # Per-agent metrics from DynamoDB (atomic counters)
    metrics = load_agent_metrics(agent_name)
    if metrics:
        invocations = int(metrics.get('invocationCount', 0))
        denies = int(metrics.get('governanceDenyCount', 0))
        failures = int(metrics.get('failureCount', 0))
        successes = int(metrics.get('successCount', 0))
        total_duration = int(metrics.get('totalDurationMs', 0))

        context['agent_invocation_count'] = invocations
        context['agent_deny_rate_pct'] = (denies / max(invocations, 1)) * 100
        context['agent_failure_rate_pct'] = (failures / max(invocations, 1)) * 100
        context['agent_avg_duration_ms'] = total_duration / max(successes, 1)

    # Per-workflow state
    context['workflow_fabrication_pending'] = orchestration.get('pending_fabrication', False)
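    # Assumption: the orchestration dict tracks dispatched agents under 'agents_used';
    # the original excerpt uses agents_used without defining it.
    agents_used = set(orchestration.get('agents_used', []))
    context['workflow_agent_count'] = len(agents_used)  # distinct agents in this workflow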
    context['workflow_deny_count'] = orchestration.get('_deny_count', 0)
    context['workflow_escalate_count'] = orchestration.get('_escalate_count', 0)

    return context
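
As a worked example of the rate arithmetic in build_dispatch_context(), the sketch below isolates the percentage calculation. The helper name `rate_pct` is hypothetical; the formula is the one used for agent_deny_rate_pct and agent_failure_rate_pct.

```python
def rate_pct(count: int, total: int) -> float:
    """Percentage of `count` over `total`, guarding against division by zero."""
    return (count / max(total, 1)) * 100


# An agent with 12 invocations, 3 of them denied:
deny_rate = rate_pct(3, 12)    # 25.0, which is above a 20.0 limit

# A brand-new agent with no invocations yet:
fresh_rate = rate_pct(0, 0)    # 0.0, because max(0, 1) keeps the divisor at 1
```

The max(..., 1) guard means a fresh agent starts with a rate of zero rather than raising ZeroDivisionError on its first dispatch.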

With this context populated, authority scopes like these become operational:

# "This agent may be invoked no more than 1000 times"
AuthorityScope(
    decision_type="invoke_agent",
    domain="*",
    conditions={},
    limits={"agent_invocation_count": 1000}
)

# "Halt if the agent's deny rate exceeds 20%"
AuthorityScope(
    decision_type="invoke_agent",
    domain="payment",
    conditions={},
    limits={"agent_deny_rate_pct": 20.0}
)

# "Do not invoke during active fabrication"
AuthorityScope(
    decision_type="invoke_agent",
    domain="*",
    conditions={"workflow_fabrication_pending": False},
    limits={}
)
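
The engine's matching logic is not shown here, so the following is only a sketch of how conditions and limits such as those above could be checked against the context dict. It assumes equality matching for conditions, a strict "greater than" test for limits, and that a key absent from the context neither matches nor fires; `conditions_match` and `exceeded_limits` are hypothetical names.

```python
from typing import Any, Dict


def conditions_match(conditions: Dict[str, Any], context: Dict[str, Any]) -> bool:
    """True when every condition key is present in context with an equal value."""
    return all(
        key in context and context[key] == expected
        for key, expected in conditions.items()
    )


def exceeded_limits(limits: Dict[str, float], context: Dict[str, Any]) -> Dict[str, float]:
    """Return the limits whose context value is strictly above the ceiling."""
    return {
        key: ceiling
        for key, ceiling in limits.items()
        if key in context and context[key] > ceiling
    }


context = {"agent_deny_rate_pct": 25.0, "workflow_fabrication_pending": True}
fired = exceeded_limits({"agent_deny_rate_pct": 20.0}, context)
applies = conditions_match({"workflow_fabrication_pending": False}, context)
```

Under these assumptions an empty context behaves as the text describes: a non-empty condition never matches and no limit ever fires.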

State-Aware Governance
If any context key starts with unconfirmed_ and matches a governance-relevant condition, the engine triggers monotonic reduction: the scope narrows and the action halts. The system does not permit under uncertainty.

Three outcomes, all observable

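Every governance decision leads to one of three outcomes. Each produces a different downstream effect, and all three are observable by the model. HALT, the fourth decision value that appears in the code, is routed the same way as ESCALATE.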

PERMIT

Action Proceeds

The SQS message is sent. The worker agent is invoked. Agent invocation metrics are incremented. The governance finding records which authority unit permitted the action.

DENY

Action Blocked

No SQS message. The deny result (including the reason and finding ID) is returned to the Bedrock conversation as a tool result. The model sees the denial and can adjust.

ESCALATE

Human Review

No SQS message. The finding is published to an SNS topic for human review. The escalation result is returned to the Bedrock conversation. The model observes the escalation.

Escalation routing

def _route_escalation(finding):
    """Route a governance escalation to the configured SNS topic."""
    if not ESCALATION_TOPIC_ARN:
        print("ESCALATION_TOPIC_ARN not configured, escalation logged only")
        return
    sns_client.publish(
        TopicArn=ESCALATION_TOPIC_ARN,
        Message=json.dumps({
            'finding_id': finding.finding_id,
            'workflow_id': finding.workflow_id,
            'reason': finding.reason,
            'requesting_agent': finding.requesting_agent,
            'target_agent': finding.target_agent,
            'contract_evaluated': finding.contract_evaluated,
        }, default=str),
        # SNS subjects must be shorter than 100 characters, so cap the whole string
        Subject=f"Governance Escalation: {finding.reason}"[:99]
    )

Governance as Observable Environment

Deny and escalate results feed back into the Bedrock conversation as tool results. The model observes governance outcomes and adjusts its next action. Governance is not invisible to the agent. It is part of the observable environment.
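
To make the feedback path concrete, here is a minimal sketch of wrapping a deny or escalate dict as a tool result message in the shape the Bedrock Converse API expects. The function name `governance_tool_result` is hypothetical, and reporting blocked actions with status "error" is an assumption, not something the source confirms.

```python
from typing import Any, Dict


def governance_tool_result(tool_use_id: str, outcome: Dict[str, Any]) -> Dict[str, Any]:
    """Wrap a governance outcome as a Converse-style toolResult user message."""
    blocked = bool(outcome.get("denied") or outcome.get("escalated"))
    return {
        "role": "user",
        "content": [{
            "toolResult": {
                "toolUseId": tool_use_id,           # must echo the model's toolUseId
                "content": [{"json": outcome}],     # reason and finding_id stay visible
                "status": "error" if blocked else "success",
            }
        }],
    }


message = governance_tool_result(
    "tooluse-123",
    {"denied": True, "reason": "limit_exceeded", "finding_id": "f-1"},
)
```

Appending such a message to the conversation before the next converse() call is what lets the model see the reason and finding ID and choose a different action.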

Worker-level governance

Worker agents use the Strands SDK. The GovernedToolHandler subclasses AgentToolHandler and overrides preprocess() to intercept every tool call before execution. This is the fine-grained governance layer: which tools is this specific agent permitted to use?

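import os
import uuid

from strands.handlers.tool_handler import AgentToolHandler  # same path as in _inject_governance()
# ToolResult and the _write_finding() helper are defined elsewhere and not shown in this excerpt.

class GovernedToolHandler(AgentToolHandler):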
    """
    Tool handler that evaluates governance policy before every tool execution.
    If preprocess() returns a ToolResult, the tool call is short-circuited.
    If preprocess() returns None, the tool executes normally.
    """

    def __init__(self, tool_registry, agent_id, workflow_id, denied_tools=None):
        super().__init__(tool_registry)
        self.agent_id = agent_id
        self.workflow_id = workflow_id
        self.denied_tools = denied_tools or set()

        # Load from env var if not passed directly
        if not self.denied_tools:
            denied_env = os.environ.get('DENIED_TOOLS', '')
            self.denied_tools = set(t.strip() for t in denied_env.split(',') if t.strip())

    def preprocess(self, tool, tool_config, **kwargs):
        """
        Pre-execution governance check. Deterministic — no LLM.
        Returns a denial ToolResult if the tool is not permitted.
        Returns None to allow the tool call to proceed.
        """
        tool_name = tool.get('name', '')
        tool_use_id = tool.get('toolUseId', '')

        if tool_name in self.denied_tools:
            finding_id = str(uuid.uuid4())
            self._write_finding(finding_id, tool_name, 'deny',
                f'tool_denied:explicit_deny_list:{tool_name}')
            return ToolResult(
                toolUseId=tool_use_id,
                status='error',
                content=[{'text': f"Tool '{tool_name}' is not authorised. Finding: {finding_id}"}],
            )

        # Permit + governance record
        self._write_finding(str(uuid.uuid4()), tool_name, 'permit',
            'tool_permitted:no_constraints_violated')
        return None  # Proceed with execution

Both governance layers write to the same DynamoDB table. A single query by workflowId produces the complete governance trace: Arbiter-level dispatch decisions and Worker-level tool decisions in chronological order.
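
A sketch of that single query follows. Because findingId is the table key, it assumes a global secondary index keyed on workflowId; the index name `workflowId-index` and the function name are hypothetical. `table` is assumed to be a boto3 DynamoDB Table resource, and timestamps are assumed to be numeric strings, as the ledger code's str(finding.timestamp) suggests.

```python
from typing import Any, Dict, List


def load_governance_trace(
    table: Any,
    workflow_id: str,
    index_name: str = "workflowId-index",   # assumed index name
) -> List[Dict[str, Any]]:
    """Collect every finding for a workflow, oldest first."""
    kwargs: Dict[str, Any] = {
        "IndexName": index_name,
        "KeyConditionExpression": "workflowId = :w",
        "ExpressionAttributeValues": {":w": workflow_id},
    }
    items: List[Dict[str, Any]] = []
    while True:
        page = table.query(**kwargs)
        items.extend(page.get("Items", []))
        last_key = page.get("LastEvaluatedKey")
        if not last_key:
            break
        kwargs["ExclusiveStartKey"] = last_key   # continue from the previous page
    # The ledger stores timestamp as a string, so compare numerically
    return sorted(items, key=lambda item: float(item["timestamp"]))
```

The pagination loop matters for long workflows, since a single DynamoDB query response is capped in size and may return only part of the trace.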

Injecting governance into worker agents

Worker agents are loaded dynamically from DynamoDB configuration. They cannot be modified at import time. The governance handler is injected at runtime by patching the Strands module namespace before the agent code loads.

def _inject_governance(agent_name, workflow_id, denied_tools):
    """
    Patch the Strands AgentToolHandler so dynamically-loaded agents
    get governance-aware tool handling. Returns a restore function.
    """
    import strands.agent.agent as agent_module
    from strands.handlers.tool_handler import AgentToolHandler
    original_class = AgentToolHandler

    # Create a factory that returns GovernedToolHandler instances
    def governed_handler_factory(tool_registry):
        return GovernedToolHandler(
            tool_registry=tool_registry,
            agent_id=agent_name,
            workflow_id=workflow_id,
            denied_tools=denied_tools,
        )

    # Patch: when Agent.__init__ calls AgentToolHandler(tool_registry=...),
    # it gets our governed version instead
    agent_module.AgentToolHandler = governed_handler_factory

    def restore():
        agent_module.AgentToolHandler = original_class
    return restore

Why Monkey-Patching?
The Strands SDK creates the AgentToolHandler inside Agent.__init__(). Worker agents are loaded dynamically from source files stored in DynamoDB. We cannot subclass the handler before the agent is instantiated because we do not control the agent's import chain. The patch replaces the handler class before the agent loads, and the restore function cleans up after the agent completes.
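
The patch-and-restore discipline can be sketched generically as a context manager, so the original handler comes back even if the agent raises. This is an illustration of the pattern, not code from the repository: `patched_attribute` is a hypothetical helper, and a SimpleNamespace stands in for the real strands.agent.agent module.

```python
from contextlib import contextmanager
from types import SimpleNamespace


@contextmanager
def patched_attribute(target, name, replacement):
    """Temporarily replace target.<name>, restoring the original on exit."""
    original = getattr(target, name)
    setattr(target, name, replacement)
    try:
        yield
    finally:
        setattr(target, name, original)   # runs even if the agent raises


# Stand-in for strands.agent.agent; the real module is not imported here.
fake_module = SimpleNamespace(AgentToolHandler="original-handler")

with patched_attribute(fake_module, "AgentToolHandler", "governed-factory"):
    inside = fake_module.AgentToolHandler   # the governed factory is visible here

after = fake_module.AgentToolHandler        # the original is back in place
```

With _inject_governance() itself, the equivalent is to call the returned restore function in a finally block around the agent run. Because the patch changes a module attribute, it is visible to everything in the same process while it is active.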

Every evaluation produces a record

The governance record is produced at evaluation time as a structural byproduct, not reconstructed after the fact. If the ledger write fails, dispatch halts: the action neither proceeds nor receives a recorded denial. An action that produces no record is a governance failure.

def write_finding(finding: GovernanceFinding) -> None:
    """Write a governance finding to the ledger table. Raises on failure."""
    table_name = os.environ.get('GOVERNANCE_LEDGER_TABLE')
    if not table_name:
        raise RuntimeError("GOVERNANCE_LEDGER_TABLE not configured")

    table = _dynamodb.Table(table_name)
    table.put_item(Item={
        'findingId': finding.finding_id,        # UUID — write-once key
        'workflowId': finding.workflow_id,       # queryable — all findings for a workflow
        'timestamp': str(finding.timestamp),
        'decision': finding.decision.value,      # permit | deny | escalate | halt
        'requestingAgent': finding.requesting_agent,
        'targetAgent': finding.target_agent,
        'scopeEvaluated': finding.scope_evaluated or 'none',
        'contractEvaluated': finding.contract_evaluated or 'none',
        'reason': finding.reason,                # machine-readable cause string
        'escalationTarget': finding.escalation_target or 'none',
        'residualAuthorityDenial': finding.residual_authority_denial,
        'ttl': int(finding.timestamp) + (90 * 24 * 3600),  # 90-day retention
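    },
    # Enforce write-once at the table: reject the write if this findingId already exists
    ConditionExpression='attribute_not_exists(findingId)')

Write-Once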

put_item with UUID key. Never updated.

Mandatory

Failure to write halts dispatch entirely.

Unified

Both layers write to the same table.

TTL-Managed

90-day retention via DynamoDB TTL.

In governed_process_agent_call(), the ledger write failure explicitly halts dispatch:

    # Write governance record (mandatory — before dispatch or denial)
    try:
        write_finding(finding)
    except Exception as e:
        print(f"GOVERNANCE LEDGER WRITE FAILED: {e} — halting dispatch")
        return None  # No dispatch, no denial, no escalation — just halt

The bypass and how to exit it

You cannot govern a system that does not yet have authority units. The GOVERNANCE_BYPASS environment variable provides the bootstrapping escape hatch. It is an explicit, auditable decision, not a silent absence.

GOVERNANCE_BYPASS = os.environ.get('GOVERNANCE_BYPASS', 'false').lower() == 'true'

def governed_process_agent_call(agents_config, orchestration, agent_name, agent_input, agent_use_id):
    # Always increment metrics on dispatch attempt (even in bypass mode)
    if GOVERNANCE_BYPASS:
        increment_agent_invocation(agent_name)
        return process_agent_call(agents_config, orchestration, agent_name, agent_input, agent_use_id)

Exiting Bypass Mode

The recommended sequence:

  1. Deploy with GOVERNANCE_BYPASS=true
  2. Seed authority units and composition contracts via seedConfig
  3. Verify the governance state loads correctly (check Lambda logs for "Loaded N authority units")
  4. Set GOVERNANCE_BYPASS=false in the Lambda environment
  5. Monitor the governance ledger for PERMIT/DENY/ESCALATE findings

In bypass mode, metrics are still incremented. The system tracks agent invocations even when governance is not evaluating them. This means the context that governance needs (invocation counts, deny rates) is already populated when you activate the band.
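
For step 5 of the sequence above, a small tally over ledger items is often enough to confirm that the band is evaluating. The sketch below assumes items shaped like those written by write_finding(), with a lowercase `decision` attribute; `tally_decisions` is a hypothetical helper name.

```python
from collections import Counter
from typing import Dict, Iterable


def tally_decisions(findings: Iterable[Dict[str, str]]) -> Counter:
    """Count ledger findings by their 'decision' attribute."""
    return Counter(item.get("decision", "unknown") for item in findings)


sample = [{"decision": "permit"}, {"decision": "permit"}, {"decision": "deny"}]
counts = tally_decisions(sample)
```

A ledger that stays empty after bypass is switched off suggests the governed path is not being reached, which is worth checking before trusting the absence of DENY findings.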

Deploy and observe

# Clone the repository
git clone https://github.com/aws-samples/sample-agentic-fabric
cd sample-agentic-fabric/Arbiter/app

# Install and build
npm install
npm run build

# Deploy (starts in bypass mode)
ENVIRONMENT=dev npx cdk bootstrap --profile your-profile
ENVIRONMENT=dev npx cdk deploy --all --require-approval never --profile your-profile

# 1. Invoke an agent (governance bypass active)
#    Check CloudWatch — you should see "GOVERNANCE_BYPASS active"

# 2. Set GOVERNANCE_BYPASS=false in the Lambda environment
#    Redeploy or update the environment variable directly

# 3. Invoke an agent again
#    Check CloudWatch — you should see "Governance PERMIT: scope_match:..."
#    Check the governance ledger DynamoDB table — a finding should be written

# 4. Add a denied tool to a worker agent config
#    Invoke a workflow that uses that tool
#    Check CloudWatch — "Governance DENY tool 'tool-name'"
#    The agent receives the denial as a ToolResult error