Add crisis_agents — Crisis as a coordination layer for AI agent teams

A new sibling Python package, `crisis_agents`, that lifts the Crisis
protocol from "consensus between machines" to "consensus between AI
agents". Threat model: a team of sub-agents normally talks freely
with its orchestrator (the "mothership"); when the team's boundary
opens and an external agent of unknown trust joins, the mothership
activates the Crisis layer so byzantine equivocation is detectable.

Three-phase orchestration model:

  Phase 1 — closed team, no Crisis: agents emit claims directly, the
  mothership collects them flat.

  Phase 2 — boundary opens: every subsequent claim is wrapped into a
  Crisis Message with the agent's stable process_id and a PoW nonce,
  delivered into per-agent LamportGraphs, and after each turn the
  mothership scans for mutations via LamportGraph.find_mutations.

  Phase 3 — proof: when an alarm fires, the mothership emits a
  replayable JSON proof-of-malfeasance document with the contradictory
  witnesses, their delivery sets, and DAG cross-references showing
  which honest agents saw what.
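The Phase 2 wrapping step can be sketched standalone. This is a minimal illustration, not the real `crisis` API: `agent_id` and `mine_message` here are hypothetical stand-ins for `agent_id_from_name` and `ProofOfWorkWeight.mine_nonce`, assuming sha256 and a one-leading-zero-byte difficulty:

```python
import hashlib
import os

def agent_id(name: str) -> bytes:
    # stable 32-byte process id derived from the agent's human-readable name
    return hashlib.sha256(name.encode()).digest()

def mine_message(process_id: bytes, payload: bytes) -> tuple[bytes, bytes]:
    # search nonces until the message digest clears the PoW difficulty
    # (here: one leading zero byte, ~256 attempts on average)
    while True:
        nonce = os.urandom(8)
        digest = hashlib.sha256(nonce + process_id + payload).digest()
        if digest[0] == 0:
            return nonce, digest

pid = agent_id("agent_a")
nonce, digest = mine_message(pid, b'{"statement_id":"s03","verdict":"true"}')
print(digest.hex().startswith("00"))  # True
```

Because the process_id is mixed into the digest, two agents with distinct names can never produce colliding message identities by accident.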

Modules:
  - claim.py      Claim dataclass + JSON round-trip
  - boundary.py   membership tracker + open() trigger
  - agent.py      CrisisAgent abstract + MockAgent + MockByzantineAgent
                  (the latter equivocates by emitting two variants to
                  disjoint peer subsets at the same logical turn)
  - mothership.py orchestrator driving both phases, building Crisis
                  Messages from Claims, per-agent LamportGraphs, log
  - alarm.py      scan_for_mutations: same-agent same-turn distinct
                  digests with non-identical delivery sets, verified
                  spacelike via LamportGraph.are_spacelike on the
                  honest-agent graphs
  - proof.py      build_proof + ProofDocument + JSON serializer +
                  verify_proof_self_consistent
  - cli.py        `crisis-agents demo` + `crisis-agents verify`
  - scenarios/    fact_check: reference doc + 6 statements + scripted
                  honest/byzantine agents producing a deterministic
                  equivocation on statement s03
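The alarm signature (same agent, same turn, distinct digests, non-identical delivery sets) reduces to a small grouping pass. A simplified sketch over plain tuples, with hypothetical toy data; the real `scan_for_mutations` additionally verifies spacelike-ness via the Lamport graphs:

```python
from collections import defaultdict

# (agent_name, turn, message_digest_hex, delivered_to) — toy crisis-log rows
log = [
    ("agent_a", 2, "cc33", ("agent_b", "agent_c", "agent_d")),
    ("agent_d", 2, "aa11", ("agent_a", "agent_b")),   # variant 1
    ("agent_d", 2, "bb22", ("agent_c",)),             # variant 2: equivocation
]

def scan(log):
    groups = defaultdict(list)
    for agent, turn, dig, delivered_to in log:
        groups[(agent, turn)].append((dig, tuple(sorted(delivered_to))))
    alarms = []
    for (agent, turn), entries in groups.items():
        digests = {d for d, _ in entries}
        deliveries = {t for _, t in entries}
        # >= 2 distinct digests AND >= 2 distinct delivery sets: equivocation
        if len(digests) >= 2 and len(deliveries) >= 2:
            alarms.append((agent, turn))
    return alarms

print(scan(log))  # [('agent_d', 2)]
```

Note the two filters: identical digests are a replayed broadcast, identical delivery sets are a retransmission; only the conjunction is flagged.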

Tests: 50 new tests across test_claim, test_boundary, test_mothership,
test_alarm, test_proof, test_demo_fact_check. End-to-end test runs the
fact_check scenario, asserts exactly one alarm raised, proof is built,
re-serialized JSON passes self-consistency. Full suite (existing
crisis + new crisis_agents) green in 0.77s — 145 tests.

Out of scope (deliberately): visualization (separate CrisisViz upgrade
later), real TCP gossip (agents talk via in-process function calls in
the mothership), false-claim detection without equivocation (an
agent that consistently lies but never equivocates is out-voted, not
"caught"; catching it would require a ground-truth oracle).

Reuse from existing crisis package: Message, Vertex, LamportGraph,
LamportGraph.find_mutations, ProofOfWorkWeight, digest. The new code
is a thin adapter layer; the protocol substrate did the heavy lifting.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
saymrwulf 2026-05-14 16:38:11 +02:00
parent 7f830a36ef
commit b8684297fa
18 changed files with 1985 additions and 0 deletions

pyproject.toml

@@ -21,10 +21,14 @@ dev = [
"pytest>=8.0",
"rich>=13.0",
]
live = [
"anthropic>=0.40",
]
[project.scripts]
crisis-node = "crisis.node:main"
crisis-demo = "crisis.demo:main"
crisis-agents = "crisis_agents.cli:main"
[build-system]
requires = ["setuptools>=68.0"]

src/crisis_agents/__init__.py (new file)

@@ -0,0 +1,30 @@
"""
Crisis Agents: a coordination layer for AI agent teams on top of the
Crisis consensus protocol.
The protocol implementation in `crisis` reaches total order on messages
between machines. This package lifts that one level up: it treats each
participating agent as a Crisis node and uses the Lamport graph as an
immutable, replayable ledger of what every agent said and when.
Use case: a team of agents coordinated by a *mothership* (orchestrator)
normally talks freely; when the team's boundary opens to outside agents
of unknown trust, the mothership activates the Crisis layer so that any
byzantine equivocation can be detected (`LamportGraph.find_mutations`)
and a cryptographic proof of malfeasance can be produced.
Key modules:
- claim: structured statement an agent makes (a Crisis payload)
- boundary: closed-set tracking + open() trigger
- agent: CrisisAgent abstract + MockAgent + MockByzantineAgent
- mothership: orchestrator that drives Crisis rounds
- alarm: wraps mutation detection into AlarmEvent
- proof: emits replayable proof-of-malfeasance JSON
- scenarios: demo scripts (fact_check)
- cli: `crisis-agents` command-line entry point
"""
from crisis_agents.claim import Claim
__all__ = ["Claim"]
__version__ = "0.1.0"

src/crisis_agents/agent.py (new file, 150 lines)

@@ -0,0 +1,150 @@
"""
CrisisAgent abstract base + Mock subclasses for the deterministic path.
An agent in this PoC is something that, given the current view of claims, can
produce its next contributions. Agents have a stable 32-byte process_id
derived from their human-readable name; Crisis uses this id for mutation
detection, so two agents must never share one.
Real Claude-driven agents live in `live_agent.py` (Phase 5) and inherit the
same `CrisisAgent` interface; the mothership doesn't care which kind it is
driving.
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional
from crisis.crypto import digest
from crisis.message import ID_LENGTH
from crisis_agents.claim import Claim
def agent_id_from_name(name: str) -> bytes:
"""Derive a stable 32-byte process id from a human-readable name.
Matches the convention used in `crisis.demo.Simulation`, so agents
coexisting with simulated nodes have ids in the same space.
"""
return digest(name.encode())[:ID_LENGTH]
@dataclass
class AgentTurn:
"""One emission from an agent in a given turn.
Attributes:
claim: The Claim being emitted.
target_subset: None means broadcast to every peer. A set of peer
names means this variant of the claim is only
delivered to those peers, which is the building block of
byzantine equivocation. Honest agents always set
this to None.
"""
claim: Claim
target_subset: Optional[set[str]] = None
class CrisisAgent(ABC):
"""Abstract base for any agent participating in a Crisis-coordinated team."""
def __init__(self, name: str):
if not name:
raise ValueError("agent name must be non-empty")
self.name: str = name
self.process_id: bytes = agent_id_from_name(name)
@abstractmethod
def next_turn(self, turn: int, received_claims: list[Claim]) -> list[AgentTurn]:
"""Produce this agent's emissions for turn `turn`.
Args:
turn: The 0-indexed turn counter (matched across all agents).
received_claims: All claims the agent has seen *up to and including*
the previous turn. The agent may use these to inform
its next emission or ignore them entirely.
Returns:
A possibly-empty list of AgentTurn entries. An honest agent emits one
broadcast AgentTurn per scripted item; a byzantine equivocator emits
two AgentTurns with disjoint `target_subset`s for the same logical
claim slot.
"""
...
def __repr__(self) -> str:
return f"{type(self).__name__}(name={self.name!r}, id={self.process_id.hex()[:8]}...)"
class MockAgent(CrisisAgent):
"""An agent that emits a predetermined sequence of claims.
Used for tests and deterministic demos. The `scripted_claims` argument is
a list of per-step emission lists: on its Nth invocation, the agent emits
`scripted_claims[N]`. Invocations past the end of the script produce
no emissions.
Each agent maintains its own invocation counter, **independent of the
mothership's turn counter** — that way the same agent can be used across
closed-phase and Crisis-phase invocations without the script restarting.
The `turn` argument is observed but not used to index the script.
All emissions are broadcast (no equivocation). For equivocation, use
`MockByzantineAgent`.
"""
def __init__(self, name: str, scripted_claims: list[list[Claim]]):
super().__init__(name)
self._script = scripted_claims
self._invocations = 0
def next_turn(self, turn: int, received_claims: list[Claim]) -> list[AgentTurn]:
idx = self._invocations
self._invocations += 1
if idx >= len(self._script):
return []
return [AgentTurn(claim=c) for c in self._script[idx]]
class MockByzantineAgent(CrisisAgent):
"""An agent that equivocates by construction.
For each turn, two parallel claim variants may be emitted. The first
variant goes to peers in `split_a`; the second variant goes to peers in
`split_b`. Peers not in either set receive nothing for that turn.
`scripted_pairs[n]` is `(claim_to_a, claim_to_b)`: the two contradictory
claims this agent emits on its nth invocation. Both share the
emitting agent's process_id, so Crisis's `find_mutations` will surface
them as a mutation pair once both vertices are present in any honest
agent's combined view.
Set `claim_to_b = None` to skip the equivocation for that turn (the
agent then behaves honestly to everyone).
"""
def __init__(self, name: str,
scripted_pairs: list[tuple[Claim, Optional[Claim]]],
split_a: set[str],
split_b: set[str]):
super().__init__(name)
if split_a & split_b:
raise ValueError("split_a and split_b must be disjoint")
self._script = scripted_pairs
self._split_a = split_a
self._split_b = split_b
self._invocations = 0
def next_turn(self, turn: int, received_claims: list[Claim]) -> list[AgentTurn]:
idx = self._invocations
self._invocations += 1
if idx >= len(self._script):
return []
claim_a, claim_b = self._script[idx]
out: list[AgentTurn] = [AgentTurn(claim=claim_a, target_subset=set(self._split_a))]
if claim_b is not None:
out.append(AgentTurn(claim=claim_b, target_subset=set(self._split_b)))
return out

src/crisis_agents/alarm.py (new file, 154 lines)

@@ -0,0 +1,154 @@
"""
alarm.py: detect byzantine equivocation from the mothership's records.
Equivocation in our PoC has a precise structural signature: a single agent
emits, on the same turn, two or more Crisis Messages with **different
message digests** to **non-identical sets of peers**. Same-id same-turn
same-payload duplicate broadcasts are not equivocation; we filter those out.
For each detected alarm we also verify via the Crisis layer's own machinery
(`LamportGraph.are_spacelike`) that the two witness vertices are causally
incomparable; this is what makes the alarm cryptographically defensible
rather than merely a bookkeeping observation.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import TYPE_CHECKING
from crisis_agents.claim import Claim
if TYPE_CHECKING:
from crisis_agents.mothership import CrisisPhaseEntry, Mothership
@dataclass(frozen=True)
class MutationWitness:
"""One leg of a byzantine equivocation: a specific Crisis vertex emitted
by the accused agent that contradicts another vertex from the same agent
in the same logical turn.
"""
message_digest_hex: str
payload_claim: dict # the parsed Claim as a plain dict (for JSON proof)
delivered_to: tuple[str, ...] # peers that received THIS variant
@dataclass(frozen=True)
class AlarmEvent:
"""A detected byzantine equivocation.
The combination of `accused_process_id_hex`, `turn`, and `statement_id`
uniquely identifies the offense; multiple witnesses prove the offender
said different things to different peers.
"""
accused_agent: str
accused_process_id_hex: str
statement_id: str
turn: int
witnesses: tuple[MutationWitness, ...]
spacelike_verified: bool # True if the Crisis layer confirmed
# the witness vertices are spacelike in
# at least one honest agent's graph
def scan_for_mutations(mothership: "Mothership") -> list[AlarmEvent]:
"""Walk the mothership's crisis log and surface every equivocation.
Strategy:
1. Group crisis-log entries by (agent_name, turn).
2. A group with 2 distinct message digests AND non-identical delivery
sets is a mutation candidate.
3. For each candidate, build MutationWitness records.
4. Verify spacelike-ness via the Crisis DAG of any honest agent that
observed at least two of the witnesses.
Returns AlarmEvents (possibly empty).
"""
crisis_log = mothership.run_result.crisis_log
by_agent_turn: dict[tuple[str, int], list["CrisisPhaseEntry"]] = {}
for entry in crisis_log:
by_agent_turn.setdefault((entry.agent_name, entry.turn), []).append(entry)
alarms: list[AlarmEvent] = []
for (agent_name, turn), entries in by_agent_turn.items():
if len(entries) < 2:
continue
digests = {e.message_digest_hex for e in entries}
if len(digests) < 2:
continue # same payload replayed — not equivocation
delivery_sets = {tuple(sorted(e.delivered_to)) for e in entries}
if len(delivery_sets) < 2:
continue # same recipients — not equivocation
# All checks passed: this is an equivocation candidate.
statement_id = entries[0].claim.statement_id
accused_pid_hex = mothership.agents[agent_name].process_id.hex()
witnesses = tuple(
MutationWitness(
message_digest_hex=e.message_digest_hex,
payload_claim=e.claim.to_dict(),
delivered_to=tuple(sorted(e.delivered_to)),
)
for e in entries
)
spacelike_ok = _verify_spacelike(mothership, agent_name, entries)
alarms.append(AlarmEvent(
accused_agent=agent_name,
accused_process_id_hex=accused_pid_hex,
statement_id=statement_id,
turn=turn,
witnesses=witnesses,
spacelike_verified=spacelike_ok,
))
return alarms
def _verify_spacelike(mothership: "Mothership", accused_name: str,
entries: list["CrisisPhaseEntry"]) -> bool:
"""Ask the Crisis layer to confirm that the equivocating vertices are
causally incomparable.
Strategy: pick any pair of entries. Find an honest agent's graph that
contains both vertices and ask `are_spacelike`. If no single graph holds
both (because the byzantine delivered them to disjoint subsets), pick
two graphs, one per entry, and check that neither vertex references
the other directly. This weaker check is sufficient for our PoC: if
neither references the other, they can't be in each other's past in
any extended graph either.
"""
a, b = entries[0], entries[1]
digest_a = bytes.fromhex(a.message_digest_hex)
digest_b = bytes.fromhex(b.message_digest_hex)
# Try to find an honest observer's graph that contains both.
accused_pid = mothership.agents[accused_name].process_id
for name, graph in mothership.all_graphs().items():
if mothership.agents[name].process_id == accused_pid:
continue
if digest_a in graph and digest_b in graph:
va = graph.get_vertex(digest_a)
vb = graph.get_vertex(digest_b)
if va is not None and vb is not None:
return graph.are_spacelike(va, vb)
# Weak proof: confirm neither vertex directly references the other in
# any honest agent's graph. Sufficient in our PoC where equivocations
# are emitted at the same turn from the same parent.
for name, graph in mothership.all_graphs().items():
if mothership.agents[name].process_id == accused_pid:
continue
va = graph.get_vertex(digest_a)
vb = graph.get_vertex(digest_b)
if va is not None and digest_b in va.digests:
return False
if vb is not None and digest_a in vb.digests:
return False
return True

src/crisis_agents/boundary.py (new file)

@@ -0,0 +1,55 @@
"""
Boundary: the membership set whose closure determines whether Crisis is active.
The mental model: agents inside the boundary are trusted (we vouch for their
intent). Crisis is overhead in this phase. When a new agent joins from outside
via `open(new_id)`, the boundary becomes "open", and from that moment Crisis is
activated for all subsequent claim emission.
Reopening is one-shot in this PoC: once open, the boundary stays open for the
remainder of the run. Re-closing would mean exiling agents and resetting trust
assumptions; that's a separate decision the mothership would make, not a
Boundary primitive.
"""
from __future__ import annotations
from dataclasses import dataclass, field
@dataclass
class Boundary:
"""Membership tracker + Crisis-activation flag.
Attributes:
trusted_ids: set of 32-byte process_ids inside the trusted closure.
is_open: True once `open()` has been called at least once.
"""
trusted_ids: set[bytes] = field(default_factory=set)
is_open: bool = False
def add_trusted(self, process_id: bytes) -> None:
"""Add an agent that's trusted from the start (closed-phase population)."""
if self.is_open:
raise RuntimeError(
"boundary is already open — use open() to add agents now"
)
self.trusted_ids.add(process_id)
def open(self, new_process_id: bytes) -> None:
"""The trigger: a new agent of unknown trust joins.
After this call, `is_open` is True and any further agents are added
via the same method. The new id is added to `trusted_ids` because
Crisis's mutation detection works for *any* id with vertices in the
graph; the boundary doesn't gate participation, it gates whether
we bother running Crisis at all.
"""
self.trusted_ids.add(new_process_id)
self.is_open = True
def is_trusted(self, process_id: bytes) -> bool:
return process_id in self.trusted_ids
def size(self) -> int:
return len(self.trusted_ids)

src/crisis_agents/claim.py (new file)

@@ -0,0 +1,88 @@
"""
Claim: the structured payload an agent emits.
A Claim is what gets JSON-serialized into a Crisis Message's `payload`
field. The Message itself carries the agent's stable process id (the
identity Crisis uses for mutation detection) and the causal digests;
the Claim carries the application-layer semantics of *what* the agent
is asserting.
Claim is intentionally narrow: a verdict on a statement with confidence
and free-text evidence. Anything richer (multi-claim batches, attached
artifacts) should be modeled by emitting multiple Claims that share a
correlation id, not by widening this struct.
"""
from __future__ import annotations
import json
from dataclasses import asdict, dataclass
from typing import ClassVar, Literal
Verdict = Literal["true", "false", "unknown"]
@dataclass(frozen=True)
class Claim:
"""A single adjudication an agent makes about one statement.
Attributes:
statement_id: The scenario-defined identifier of the statement
being adjudicated (e.g. "s03"). Stable across
agents; that's how equivocation is detected.
verdict: "true" | "false" | "unknown".
confidence: Self-reported confidence in [0.0, 1.0].
evidence: Free-text justification, capped at 280 chars so
payloads stay bounded and visualizable later.
timestamp_logical: The emitting agent's local turn counter. Not
authoritative for Crisis ordering (Crisis derives
order from the DAG), but useful for debugging.
schema_version: Forward-compat bump if Claim's shape changes.
"""
statement_id: str
verdict: Verdict
confidence: float
evidence: str
timestamp_logical: int
schema_version: int = 1
# Class constant, not a dataclass field — must be ClassVar so asdict()
# doesn't serialize it into every payload.
EVIDENCE_MAX_LEN: ClassVar[int] = 280
def __post_init__(self):
if not self.statement_id:
raise ValueError("statement_id must be non-empty")
if self.verdict not in ("true", "false", "unknown"):
raise ValueError(f"verdict must be true/false/unknown, got {self.verdict!r}")
if not 0.0 <= self.confidence <= 1.0:
raise ValueError(f"confidence must be in [0, 1], got {self.confidence}")
if len(self.evidence) > self.EVIDENCE_MAX_LEN:
raise ValueError(
f"evidence too long: {len(self.evidence)} > {self.EVIDENCE_MAX_LEN}"
)
if self.timestamp_logical < 0:
raise ValueError(f"timestamp_logical must be >= 0, got {self.timestamp_logical}")
def to_payload(self) -> bytes:
"""Serialize to bytes suitable for `Message.payload`.
Uses sort_keys=True so two byte strings for the same logical claim
are identical, which matters for equivocation detection: two
claims that happen to have identical payloads aren't
an equivocation fault, they're an accidental duplicate.
"""
return json.dumps(asdict(self), sort_keys=True, separators=(",", ":")).encode("utf-8")
@classmethod
def from_payload(cls, payload: bytes) -> "Claim":
"""Inverse of `to_payload`. Raises on malformed input."""
try:
obj = json.loads(payload.decode("utf-8"))
except (UnicodeDecodeError, json.JSONDecodeError) as e:
raise ValueError(f"payload is not valid Claim JSON: {e}") from e
return cls(**obj)
def to_dict(self) -> dict:
return asdict(self)

src/crisis_agents/cli.py (new file, 166 lines)

@@ -0,0 +1,166 @@
"""
crisis-agents command-line entry point.
Subcommands:
demo Run a scripted scenario end-to-end (closed phase -> boundary
opens -> Crisis phase -> alarm detection -> proof emission).
verify Re-check a proof JSON for self-consistency. (Phase 6, may be
a stub for now.)
Examples:
crisis-agents demo --scenario fact_check
crisis-agents demo --scenario fact_check --live
crisis-agents verify proof_agent_delta_s03.json
"""
from __future__ import annotations
import argparse
import sys
from pathlib import Path
from crisis_agents.alarm import scan_for_mutations
from crisis_agents.mothership import Mothership
from crisis_agents.proof import (
ProofDocument,
build_proof,
verify_proof_self_consistent,
)
from crisis_agents.scenarios import build_fact_check_scenario
SCENARIOS = {
"fact_check": build_fact_check_scenario,
}
def _run_demo(args: argparse.Namespace) -> int:
if args.scenario not in SCENARIOS:
print(f"unknown scenario: {args.scenario}", file=sys.stderr)
print(f"available: {', '.join(SCENARIOS)}", file=sys.stderr)
return 2
builder = SCENARIOS[args.scenario]
scenario = builder(live=args.live, model=args.model)
mode = "live" if args.live else "mocked"
print(f"=== crisis-agents demo: {scenario.name} ({mode}) ===\n")
print(scenario.description)
print()
print("Reference document:")
for line in scenario.reference_doc.splitlines():
print(f" {line}")
print()
mothership = Mothership()
for agent in scenario.honest_agents:
mothership.add_agent(agent)
# Phase 1: closed
print(f"--- Phase 1: closed team, no Crisis ({scenario.closed_phase_turns} turn(s)) ---")
mothership.run_closed_phase(num_turns=scenario.closed_phase_turns)
print(
f" {len(mothership.run_result.closed_log)} claims collected from "
f"{len(mothership.agents)} honest agent(s) — consensus reached "
f"without Crisis.\n"
)
# Phase 2: boundary opens
print(f"--- Phase 2: boundary opens — {scenario.byzantine_joiner.name} joins ---")
print(" Crisis activated for all subsequent claims.\n")
mothership.open_boundary(scenario.byzantine_joiner)
# Phase 3: Crisis-active turns
print(f"--- Phase 3: Crisis-active run ({scenario.crisis_phase_turns} turn(s)) ---")
mothership.run_crisis_phase(num_turns=scenario.crisis_phase_turns)
print(
f" {len(mothership.run_result.crisis_log)} Crisis messages emitted; "
f"{len(mothership.agents)} per-agent LamportGraphs maintained.\n"
)
# Phase 4: alarm
print("--- Phase 4: scan for byzantine equivocation ---")
alarms = scan_for_mutations(mothership)
if not alarms:
print(" ✓ No mutations detected — network is honest.\n")
return 0
print(f"{len(alarms)} alarm(s) raised:")
for a in alarms:
verdicts = ", ".join(
f"{w.payload_claim['verdict']}->{','.join(w.delivered_to)}"
for w in a.witnesses
)
print(
f" - agent {a.accused_agent!r} (id={a.accused_process_id_hex[:16]}...) "
f"equivocated on {a.statement_id} at turn {a.turn}: {verdicts}"
)
print()
# Phase 5: proof emission
print("--- Phase 5: emit proof-of-malfeasance ---")
out_dir = Path(args.out_dir)
out_dir.mkdir(parents=True, exist_ok=True)
for a in alarms:
proof = build_proof(mothership, a)
path = out_dir / f"proof_{a.accused_agent}_{a.statement_id}.json"
path.write_text(proof.to_json())
print(f" wrote {path}")
check = verify_proof_self_consistent(proof)
marker = "OK" if check.ok else "FAIL"
print(f" self-consistency: {marker} {check.reason}")
print()
return 0
def _run_verify(args: argparse.Namespace) -> int:
path = Path(args.proof_path)
if not path.exists():
print(f"file not found: {path}", file=sys.stderr)
return 2
proof = ProofDocument.from_json(path.read_text())
result = verify_proof_self_consistent(proof)
print(f"proof: {path}")
print(f" accused agent: {proof.accused_agent}")
print(f" statement_id: {proof.statement_id}")
print(f" turn: {proof.turn}")
print(f" spacelike: {proof.spacelike_verified}")
print(f" self-consistent: {result.ok}")
print(f" reason: {result.reason}")
return 0 if result.ok else 1
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(
prog="crisis-agents",
description="Crisis-Agents — coordination layer for AI agent teams.",
)
sub = parser.add_subparsers(dest="cmd", required=True)
demo = sub.add_parser("demo", help="run a scripted scenario end-to-end")
demo.add_argument("--scenario", default="fact_check",
help="which scenario to run (default: fact_check)")
demo.add_argument("--live", action="store_true",
help="back the honest agents with real Claude API calls "
"(requires anthropic SDK + ANTHROPIC_API_KEY)")
demo.add_argument("--model", default=None,
help="Anthropic model id for --live (default: "
"claude-haiku-4-5-20251001)")
demo.add_argument("--out-dir", default=".",
help="where to write proof JSON files (default: cwd)")
verify = sub.add_parser("verify", help="check a proof JSON for self-consistency")
verify.add_argument("proof_path", help="path to a proof JSON file")
args = parser.parse_args(argv)
if args.cmd == "demo":
return _run_demo(args)
if args.cmd == "verify":
return _run_verify(args)
parser.error(f"unknown command: {args.cmd}")
return 2
if __name__ == "__main__":
sys.exit(main())

src/crisis_agents/mothership.py (new file)

@@ -0,0 +1,320 @@
"""
Mothership: the orchestrator that runs a Crisis-Agents network.
Two phases:
1. **Closed phase.** Agents talk freely. `run_closed_phase(N)` advances N
turns, collecting every claim into a flat log. No DAG, no voting, no
overhead. This is the "normal life" the user described.
2. **Crisis phase.** Triggered by `open_boundary(new_agent)`. From that
point on every claim is wrapped into a Crisis `Message`, extended into
per-agent `LamportGraph`s, and consensus algorithms run. Mutation
detection raises alarms; proofs are generated separately by `proof.py`.
The mothership keeps one LamportGraph per agent (the agent's view of the
network) and updates them in lockstep; the PoC uses synchronous in-process
delivery, so all honest agents see the same vertices except where a
byzantine agent has selectively delivered an equivocation. Each honest
graph still observes both equivocating vertices once the network gossips
enough; that's what `LamportGraph.find_mutations` keys on.
"""
from __future__ import annotations
import os
from dataclasses import dataclass, field
from typing import Optional
from crisis.graph import LamportGraph
from crisis.message import Message, NONCE_LENGTH
from crisis.weight import ProofOfWorkWeight
from crisis_agents.agent import AgentTurn, CrisisAgent
from crisis_agents.boundary import Boundary
from crisis_agents.claim import Claim
@dataclass
class ClosedPhaseEntry:
"""One row in the closed-phase log: who said what, when."""
agent_name: str
turn: int
claim: Claim
@dataclass
class CrisisPhaseEntry:
"""One row in the Crisis-phase log: the (agent, turn, claim, vertex_digest)
of an emitted-and-accepted Crisis message, plus the target subset (set of
peer names) it was delivered to. Useful as the raw material for proofs.
"""
agent_name: str
turn: int
claim: Claim
message_digest_hex: str
delivered_to: list[str]
@dataclass
class MothershipRunResult:
"""What `run_closed_phase` / `run_crisis_phase` returns."""
closed_log: list[ClosedPhaseEntry] = field(default_factory=list)
crisis_log: list[CrisisPhaseEntry] = field(default_factory=list)
class Mothership:
"""Orchestrates a team of CrisisAgents.
Usage:
m = Mothership()
m.add_agent(MockAgent("agent_a", ...))
m.add_agent(MockAgent("agent_b", ...))
m.add_agent(MockAgent("agent_c", ...))
# Closed phase
m.run_closed_phase(num_turns=2)
# Boundary opens
m.open_boundary(MockByzantineAgent("agent_d", ...))
# Crisis phase
m.run_crisis_phase(num_turns=5)
"""
def __init__(self, *, pow_zeros: int = 0):
self.agents: dict[str, CrisisAgent] = {}
self.boundary = Boundary()
self.run_result = MothershipRunResult()
# Per-agent Lamport graphs (only used in the Crisis phase).
self._graphs: dict[str, LamportGraph] = {}
self._weight_system = ProofOfWorkWeight(min_leading_zeros=pow_zeros)
# Independent turn counters: closed and Crisis phases share none.
# Crisis-phase turns count from 0 again so the proof JSON has a
# clean "round after boundary open" timeline.
self._closed_turn_index = 0
self._crisis_turn_index = 0
# ------------------------------------------------------------------
# Setup
# ------------------------------------------------------------------
def add_agent(self, agent: CrisisAgent) -> None:
"""Register a trusted agent (must be called before run_closed_phase)."""
if self.boundary.is_open:
raise RuntimeError("cannot add_agent after boundary opened; use open_boundary")
if agent.name in self.agents:
raise ValueError(f"agent {agent.name!r} already added")
self.agents[agent.name] = agent
self.boundary.add_trusted(agent.process_id)
# ------------------------------------------------------------------
# Phase 1: closed
# ------------------------------------------------------------------
def run_closed_phase(self, num_turns: int) -> MothershipRunResult:
"""Drive `num_turns` of plain agent communication. No Crisis.
Each turn:
- All agents see the cumulative claims observed so far.
- Each agent emits its scripted claims; each emission is appended
to the closed log.
"""
if self.boundary.is_open:
raise RuntimeError("boundary already open; closed phase is over")
observed: list[Claim] = [e.claim for e in self.run_result.closed_log]
for _ in range(num_turns):
turn = self._closed_turn_index
new_this_turn: list[Claim] = []
for agent in self.agents.values():
for at in agent.next_turn(turn, observed):
self.run_result.closed_log.append(
ClosedPhaseEntry(agent_name=agent.name, turn=turn, claim=at.claim)
)
new_this_turn.append(at.claim)
observed.extend(new_this_turn)
self._closed_turn_index += 1
return self.run_result
# ------------------------------------------------------------------
# Phase 2: boundary opens, Crisis activates
# ------------------------------------------------------------------
def open_boundary(self, new_agent: CrisisAgent) -> None:
"""The trigger: a new agent of unknown trust joins.
Crisis is now active for all subsequent claim emission. Each existing
agent gets a fresh LamportGraph; the new agent does too. From this
moment, `run_crisis_phase()` drives the consensus loop.
"""
if new_agent.name in self.agents:
raise ValueError(f"agent {new_agent.name!r} is already inside the boundary")
self.agents[new_agent.name] = new_agent
self.boundary.open(new_agent.process_id)
# Initialize a graph for every agent (including the joiner).
for name in self.agents:
self._graphs[name] = LamportGraph(weight_system=self._weight_system)
# ------------------------------------------------------------------
# Phase 2 mechanics: building Crisis messages from Claims
# ------------------------------------------------------------------
def _wrap_as_message(self, agent: CrisisAgent, claim: Claim,
graph: LamportGraph) -> Message:
"""Convert a Claim into a Crisis Message and return it (un-extended).
Builds the digests tuple per Algorithm 1:
- reference the agent's own last vertex in `graph` (chain link),
- cross-reference one most-recent vertex per other id (sample).
Mines a PoW nonce that satisfies the weight system.
"""
payload = claim.to_payload()
# Last vertex with this agent's id (chain link, if any)
same_id = [v for v in graph.all_vertices() if v.id == agent.process_id]
digests_list: list[bytes] = []
if same_id:
# Pick the one not referenced by any other same-id vertex
referenced = set()
for v in same_id:
for d in v.digests:
ref = graph.get_vertex(d)
if ref is not None and ref.id == agent.process_id:
referenced.add(d)
last = next(
(v for v in same_id if v.message_digest not in referenced),
same_id[-1],
)
digests_list.append(last.message_digest)
past_digests = {v.message_digest for v in graph.past(last)}
else:
past_digests = set()
# Cross-references: one most-recent vertex per other id
seen_other_ids: set[bytes] = {agent.process_id}
for v in graph.all_vertices():
if v.id in seen_other_ids:
continue
if v.message_digest in past_digests:
continue
digests_list.append(v.message_digest)
seen_other_ids.add(v.id)
# Mine a valid nonce; reuse the weight system
if isinstance(self._weight_system, ProofOfWorkWeight):
return self._weight_system.mine_nonce(
agent.process_id, tuple(digests_list), payload
)
else:
return Message(
nonce=os.urandom(NONCE_LENGTH),
id=agent.process_id,
digests=tuple(digests_list),
payload=payload,
)
def _deliver(self, sender: CrisisAgent, message: Message,
target_names: list[str]) -> None:
"""Extend the message into the LamportGraphs of `target_names`."""
for name in target_names:
graph = self._graphs[name]
graph.extend(message)
# ------------------------------------------------------------------
# Phase 2: the Crisis-active run loop
# ------------------------------------------------------------------
def run_crisis_phase(self, num_turns: int) -> MothershipRunResult:
"""Drive `num_turns` of agent activity with Crisis active.
Each turn:
1. Every agent (including the byzantine joiner) is asked for its
emissions. Each emission carries an optional `target_subset`.
2. Each emission is wrapped into a Crisis Message against the
SENDER's view of the graph (the agent's own LamportGraph).
3. The Message is delivered (extended) into the LamportGraphs of
every peer in the target subset (or every peer if None).
4. The (agent, turn, claim, message_digest, delivered_to) tuple
is logged for downstream proof generation.
"""
if not self.boundary.is_open:
raise RuntimeError("boundary not yet open; call open_boundary() first")
all_names = list(self.agents.keys())
for _ in range(num_turns):
turn = self._crisis_turn_index
# Snapshot of received claims per agent — for the agent's view
# of the conversation when it decides what to say next. In the
# PoC this is the agent's graph's vertex set, decoded back to
# Claim objects.
for agent in self.agents.values():
received: list[Claim] = []
for v in self._graphs[agent.name].all_vertices():
if v.id == agent.process_id:
continue
try:
received.append(Claim.from_payload(v.payload))
except (ValueError, TypeError):
# Not all vertices need to be Claim-shaped (defensive)
continue
for at in agent.next_turn(turn, received):
self._emit(agent, turn, at, all_names)
self._crisis_turn_index += 1
return self.run_result
def _emit(self, agent: CrisisAgent, turn: int, at: AgentTurn,
all_names: list[str]) -> None:
"""Resolve target subset, build message, deliver, log.
Delivery rule:
- target_subset is None → honest broadcast to all peers, including
the sender's own graph.
- target_subset is set → targeted delivery; the sender's own
graph is NOT auto-included. This is what enables byzantine
equivocation: a byzantine sender emits two variants with
disjoint targets, and its own graph holds neither; otherwise
the second variant would fail the same-id chain constraint
against the first variant.
The byzantine still "knows" what it said via the crisis_log; what
it doesn't keep in its own LamportGraph is the conflicting state.
"""
if at.target_subset is None:
targets = list(all_names)
else:
targets = [t for t in at.target_subset if t in self.agents]
msg = self._wrap_as_message(agent, at.claim, self._graphs[agent.name])
self._deliver(agent, msg, targets)
self.run_result.crisis_log.append(
CrisisPhaseEntry(
agent_name=agent.name,
turn=turn,
claim=at.claim,
message_digest_hex=msg.compute_digest().hex(),
delivered_to=targets,
)
)
# ------------------------------------------------------------------
# Read-only accessors (used by alarm.py and proof.py in later phases)
# ------------------------------------------------------------------
def graph_of(self, agent_name: str) -> LamportGraph:
"""The LamportGraph held by `agent_name` (Crisis phase only)."""
if agent_name not in self._graphs:
raise KeyError(f"no Crisis-phase graph for agent {agent_name!r}")
return self._graphs[agent_name]
def all_graphs(self) -> dict[str, LamportGraph]:
return dict(self._graphs)
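The `(agent, turn, claim, digest, delivered_to)` entries logged by `_emit` feed the mutation scan in alarm.py. A minimal self-contained sketch of that scan's core rule (same agent and turn, distinct digests, non-identical delivery sets), with plain tuples standing in for the real CrisisPhaseEntry and LamportGraph types; the function name here is illustrative, not the package's API:

```python
from collections import defaultdict

def scan_log_for_equivocation(crisis_log):
    """Toy scan: group log entries by (agent, turn, statement) and flag
    groups whose message digests differ while their delivery sets are
    not identical."""
    groups = defaultdict(list)
    for agent, turn, statement_id, digest, delivered_to in crisis_log:
        groups[(agent, turn, statement_id)].append(
            (digest, frozenset(delivered_to)))
    alarms = []
    for (agent, turn, statement_id), entries in groups.items():
        digests = {d for d, _ in entries}
        deliveries = {s for _, s in entries}
        if len(digests) > 1 and len(deliveries) > 1:
            alarms.append((agent, turn, statement_id))
    return alarms

log = [
    ("alpha", 0, "s03", "aa11", ["alpha", "beta", "gamma"]),
    ("delta", 0, "s03", "bb22", ["alpha", "gamma"]),  # variant 1
    ("delta", 0, "s03", "cc33", ["beta"]),            # variant 2
]
print(scan_log_for_equivocation(log))  # → [('delta', 0, 's03')]
```

The real scan additionally confirms the flagged vertices are spacelike in at least one honest graph; this sketch stops at the log-level signal.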

src/crisis_agents/proof.py
@@ -0,0 +1,181 @@
"""
proof.py: emit and verify replayable proof-of-malfeasance JSON documents.
A ProofDocument is a self-contained JSON file that:
1. Names the accused agent (human-readable name + 32-byte process_id).
2. Identifies the offense (statement_id, turn).
3. Includes every contradictory MutationWitness with its message digest,
parsed Claim, and delivery target set.
4. Records the "DAG witness" for each witness vertex: which honest
agents' graphs hold it. An independent verifier can cross-check
this against the recorded simulation snapshots.
5. Asserts whether the Crisis layer confirmed spacelike-ness at the
time of detection.
The proof is replayable: given the JSON and the original `crisis_log`
(or a recorded simulation), a verifier can re-derive the alarm and
confirm each claim independently; `verify_proof_self_consistent`
below covers the offline subset of those checks.
"""
from __future__ import annotations
import json
from dataclasses import asdict, dataclass, field
from typing import TYPE_CHECKING
from crisis_agents.alarm import AlarmEvent, MutationWitness
if TYPE_CHECKING:
from crisis_agents.mothership import Mothership
@dataclass(frozen=True)
class WitnessGraphReference:
"""For one mutation witness, which honest agents' graphs hold it."""
message_digest_hex: str
observed_by: tuple[str, ...] # honest agent names whose LamportGraph
# contains a vertex with this digest
@dataclass(frozen=True)
class ProofDocument:
"""A replayable proof of one detected byzantine equivocation."""
schema_version: int = 1
accused_agent: str = ""
accused_process_id_hex: str = ""
statement_id: str = ""
turn: int = 0
witnesses: tuple[MutationWitness, ...] = ()
dag_witnesses: tuple[WitnessGraphReference, ...] = ()
spacelike_verified: bool = False
proof_summary: str = ""
def to_json(self) -> str:
"""Serialize to indented JSON. Uses asdict on the nested dataclasses
so the resulting structure is plain dict / list / str / int / bool
cleanly inspectable with `jq` and re-parseable."""
return json.dumps(asdict(self), indent=2, sort_keys=True)
@classmethod
def from_json(cls, text: str) -> "ProofDocument":
obj = json.loads(text)
return cls(
schema_version=obj.get("schema_version", 1),
accused_agent=obj["accused_agent"],
accused_process_id_hex=obj["accused_process_id_hex"],
statement_id=obj["statement_id"],
turn=obj["turn"],
witnesses=tuple(
MutationWitness(
message_digest_hex=w["message_digest_hex"],
payload_claim=w["payload_claim"],
delivered_to=tuple(w["delivered_to"]),
)
for w in obj["witnesses"]
),
dag_witnesses=tuple(
WitnessGraphReference(
message_digest_hex=g["message_digest_hex"],
observed_by=tuple(g["observed_by"]),
)
for g in obj["dag_witnesses"]
),
spacelike_verified=obj["spacelike_verified"],
proof_summary=obj["proof_summary"],
)
def build_proof(mothership: "Mothership", alarm: AlarmEvent) -> ProofDocument:
"""Produce the ProofDocument for a single alarm.
Cross-references each witness against every honest agent's LamportGraph
so the proof carries the "who saw what" structure.
"""
accused_pid = mothership.agents[alarm.accused_agent].process_id
honest_names = [
name for name, ag in mothership.agents.items()
if ag.process_id != accused_pid
]
dag_refs = []
for w in alarm.witnesses:
digest = bytes.fromhex(w.message_digest_hex)
observed_by = tuple(
name for name in honest_names
if digest in mothership.graph_of(name)
)
dag_refs.append(WitnessGraphReference(
message_digest_hex=w.message_digest_hex,
observed_by=observed_by,
))
summary = (
f"agent {alarm.accused_agent!r} (id={alarm.accused_process_id_hex[:16]}...) "
f"emitted {len(alarm.witnesses)} contradictory Crisis vertices for "
f"statement {alarm.statement_id!r} in turn {alarm.turn}; vertices "
f"{'are confirmed' if alarm.spacelike_verified else 'appear to be'} "
f"spacelike in the DAG of at least one honest agent."
)
return ProofDocument(
accused_agent=alarm.accused_agent,
accused_process_id_hex=alarm.accused_process_id_hex,
statement_id=alarm.statement_id,
turn=alarm.turn,
witnesses=alarm.witnesses,
dag_witnesses=tuple(dag_refs),
spacelike_verified=alarm.spacelike_verified,
proof_summary=summary,
)
@dataclass(frozen=True)
class VerificationResult:
ok: bool
reason: str
def verify_proof_self_consistent(proof: ProofDocument) -> VerificationResult:
"""Verify the proof is self-consistent — without re-running the simulation.
Checks:
- schema_version is known
- at least 2 witnesses
- witness message digests are pairwise distinct
- witness delivery sets are pairwise non-identical
- witnesses agree on the statement_id and turn fields named in the proof
- dag_witnesses cover every witness digest
What we do NOT check here (would require the recorded simulation):
- that the digests correspond to real PoW-mined Crisis Messages
- that the spacelike-verified flag matches a fresh DAG re-derivation
A future `verify_proof_against_log(proof, recorded_events)` would close
that gap.
"""
if proof.schema_version != 1:
return VerificationResult(False, f"unsupported schema_version {proof.schema_version}")
if len(proof.witnesses) < 2:
return VerificationResult(False, "fewer than 2 witnesses — no equivocation")
digests = [w.message_digest_hex for w in proof.witnesses]
if len(set(digests)) != len(digests):
return VerificationResult(False, "duplicate witness digests")
deliveries = {tuple(w.delivered_to) for w in proof.witnesses}
if len(deliveries) < 2:
return VerificationResult(False, "all witnesses have identical delivery sets")
for w in proof.witnesses:
if w.payload_claim.get("statement_id") != proof.statement_id:
return VerificationResult(False, f"witness disagrees on statement_id: {w}")
dag_digests = {g.message_digest_hex for g in proof.dag_witnesses}
if set(digests) - dag_digests:
return VerificationResult(False, "dag_witnesses missing for some witness")
return VerificationResult(
True,
f"proof is self-consistent: {len(proof.witnesses)} contradictory "
f"witnesses for statement {proof.statement_id!r} at turn {proof.turn}",
)
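The gate order above can be exercised against a hand-built proof dict. This sketch re-implements the same checks on plain dicts (an illustration, not the package's `verify_proof_self_consistent` itself):

```python
def check_proof(proof: dict) -> tuple[bool, str]:
    # Mirrors the gates above: witness count, distinct digests,
    # non-identical delivery sets, agreement on statement_id.
    ws = proof["witnesses"]
    if len(ws) < 2:
        return False, "fewer than 2 witnesses"
    digests = [w["message_digest_hex"] for w in ws]
    if len(set(digests)) != len(digests):
        return False, "duplicate witness digests"
    if len({tuple(w["delivered_to"]) for w in ws}) < 2:
        return False, "identical delivery sets"
    if any(w["payload_claim"]["statement_id"] != proof["statement_id"]
           for w in ws):
        return False, "statement_id mismatch"
    return True, "self-consistent"

good = {
    "statement_id": "s03",
    "witnesses": [
        {"message_digest_hex": "aa", "delivered_to": ["a", "c"],
         "payload_claim": {"statement_id": "s03", "verdict": "true"}},
        {"message_digest_hex": "bb", "delivered_to": ["b"],
         "payload_claim": {"statement_id": "s03", "verdict": "false"}},
    ],
}
print(check_proof(good))  # → (True, 'self-consistent')
```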

@@ -0,0 +1,10 @@
"""Demo scenarios for the crisis-agents CLI.
A scenario is a self-contained recipe: a reference document, a set of
statements to adjudicate, scripted Claims for each honest agent, and a
byzantine pair designed to trigger one equivocation alarm.
"""
from crisis_agents.scenarios.fact_check import build_fact_check_scenario
__all__ = ["build_fact_check_scenario"]

@@ -0,0 +1,186 @@
"""
fact_check scenario: the canonical PoC demo.
A team of three honest agents fact-checks six statements against a short
reference document. After the closed phase, a fourth agent (the byzantine)
joins and Crisis is activated. For statement s03 the byzantine equivocates:
it tells one peer set the statement is TRUE and another peer set it is
FALSE. It skips the other statements; the honest majority already covers
them with consistent, ground-truth claims.
The scenario is fully deterministic in mocked mode: the same seed
produces the same Crisis log every time, which is what makes it a
useful regression fixture.
"""
from __future__ import annotations
from dataclasses import dataclass
from importlib import resources
from crisis_agents.agent import (
CrisisAgent,
MockAgent,
MockByzantineAgent,
)
from crisis_agents.claim import Claim
@dataclass(frozen=True)
class Statement:
"""A single proposition the team must adjudicate."""
id: str
text: str
ground_truth: str # "true" | "false"
STATEMENTS: tuple[Statement, ...] = (
Statement("s01", "Water boils at 100°C at standard pressure.", "true"),
Statement("s02", "The speed of light in vacuum is a defined constant.", "true"),
Statement("s03", "Pluto is still classified as a planet by the IAU.", "false"),
Statement("s04", "The Sun is approximately 4.6 billion years old.", "true"),
Statement("s05", "The Moon's diameter is half of the Earth's diameter.", "false"),
Statement("s06", "Sound can travel through a vacuum.", "false"),
)
def load_reference_doc() -> str:
"""Return the reference document's full text."""
return resources.files("crisis_agents.scenarios").joinpath(
"reference_doc.txt"
).read_text(encoding="utf-8")
def _honest_claims(agent_name: str) -> list[list[Claim]]:
"""Build one turn's worth of honest claims — one Claim per statement,
all matching ground truth, with slightly varied confidence per agent.
Returns a single-turn script: `[ [claim_s01, claim_s02, ..., claim_s06] ]`.
"""
# Tiny per-agent confidence offset so claims aren't byte-identical
# (though identical payloads would also be fine — Crisis dedupes on
# message digest including the nonce).
confidence_offset = {
"agent_alpha": 0.95,
"agent_beta": 0.90,
"agent_gamma": 0.92,
}
base = confidence_offset.get(agent_name, 0.90)
turn0: list[Claim] = []
for st in STATEMENTS:
turn0.append(Claim(
statement_id=st.id,
verdict=st.ground_truth, # type: ignore[arg-type]
confidence=base,
evidence=f"per reference doc — {agent_name}",
timestamp_logical=0,
))
return [turn0]
def build_honest_agents() -> list[CrisisAgent]:
"""The three trusted agents for the closed-phase team (mocked)."""
return [
MockAgent("agent_alpha", _honest_claims("agent_alpha")),
MockAgent("agent_beta", _honest_claims("agent_beta")),
MockAgent("agent_gamma", _honest_claims("agent_gamma")),
]
def build_live_honest_agents(model: str | None = None) -> list[CrisisAgent]:
"""The three honest agents in `--live` mode — backed by real Claude API."""
# Lazy import so the anthropic SDK isn't required for the mocked path.
from crisis_agents.live_agent import DEFAULT_MODEL, LiveClaudeAgent
statement_dicts = [{"id": s.id, "text": s.text} for s in STATEMENTS]
selected_model = model or DEFAULT_MODEL
ref = load_reference_doc()
return [
LiveClaudeAgent("agent_alpha", reference_doc=ref,
statements=statement_dicts, model=selected_model),
LiveClaudeAgent("agent_beta", reference_doc=ref,
statements=statement_dicts, model=selected_model),
LiveClaudeAgent("agent_gamma", reference_doc=ref,
statements=statement_dicts, model=selected_model),
]
def build_byzantine_joiner() -> CrisisAgent:
"""The fourth agent — joins after the boundary opens.
For s03 (whose ground truth is FALSE) the byzantine tells α and γ "true"
but tells β "false". For every other statement it tells everyone the
ground-truth answer so vote weight doesn't isolate it as a simple
outlier.
Because MockByzantineAgent emits two variants per scripted_pair on the
same turn, we model "agrees with everyone" by giving both variants the
same content; only s03 produces a genuine equivocation. To keep
delivery semantics clean, the byzantine's non-equivocating statements
use a small dedicated agent name list to remain a single broadcast.
Simpler approach used here: emit only ONE equivocation slot the s03
one on turn 0, and skip the other statements. The honest majority
already covers s01-s06 with consistent claims; the byzantine doesn't
need to vote on every statement for the demo to read as "byzantine
caught equivocating".
"""
pair_s03_true = Claim(
statement_id="s03", verdict="true", confidence=0.85,
evidence="claims Pluto is still a planet, contradicts ref doc",
timestamp_logical=0,
)
pair_s03_false = Claim(
statement_id="s03", verdict="false", confidence=0.85,
evidence="agrees Pluto was reclassified, matches ref doc",
timestamp_logical=0,
)
return MockByzantineAgent(
name="agent_delta",
scripted_pairs=[(pair_s03_true, pair_s03_false)],
split_a={"agent_alpha", "agent_gamma"},
split_b={"agent_beta"},
)
@dataclass
class Scenario:
name: str
description: str
closed_phase_turns: int
crisis_phase_turns: int
honest_agents: list[CrisisAgent]
byzantine_joiner: CrisisAgent
reference_doc: str
def build_fact_check_scenario(*, live: bool = False,
model: str | None = None) -> Scenario:
"""Wire together the reference doc, statements, agents, and run lengths.
In `live` mode, the three honest agents are replaced with LiveClaudeAgent
instances backed by real Anthropic API calls. The byzantine joiner stays
mocked even in live mode; see live_agent.py for the rationale.
"""
if live:
honest = build_live_honest_agents(model=model)
suffix = " (live: honest agents are real Claude; byzantine is scripted)"
else:
honest = build_honest_agents()
suffix = " (mocked, deterministic)"
return Scenario(
name="fact_check",
description=(
"Three honest agents adjudicate six factual statements against "
"a small reference doc. A fourth agent joins the team after the "
"boundary opens and equivocates on statement s03 — Crisis "
"should detect this." + suffix
),
closed_phase_turns=1,
crisis_phase_turns=1,
honest_agents=honest,
byzantine_joiner=build_byzantine_joiner(),
reference_doc=load_reference_doc(),
)
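The honest agents built above follow the scripted-agent pattern: one list of Claims per turn, consumed in order, with an exhausted script yielding nothing (the behaviour the end-to-end test relies on when honest agents fall silent in the Crisis phase). A stand-in sketch, assuming nothing from the real MockAgent beyond the `next_turn(turn, received)` shape:

```python
class ScriptedAgent:
    """Emits pre-scripted claims, one batch per turn; once the script
    is exhausted, subsequent turns emit nothing."""

    def __init__(self, name: str, script: list[list[str]]):
        self.name = name
        self._script = list(script)

    def next_turn(self, turn: int, received: list) -> list[str]:
        if turn < len(self._script):
            return list(self._script[turn])
        return []  # script exhausted: stay silent

a = ScriptedAgent("agent_alpha", [["claim_s01", "claim_s02"]])
print(a.next_turn(0, []))  # → ['claim_s01', 'claim_s02']
print(a.next_turn(1, []))  # → []
```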

@@ -0,0 +1,11 @@
REFERENCE DOCUMENT — Astronomy & Physics Basics
Water boils at 100 degrees Celsius (212 degrees Fahrenheit) at standard
atmospheric pressure of one atmosphere. The speed of light in a vacuum is
exactly 299,792,458 metres per second; this is a defined constant rather
than a measured quantity. Pluto was reclassified from a planet to a dwarf
planet in 2006 by the International Astronomical Union. The Sun is roughly
4.6 billion years old and is about halfway through its main-sequence
lifetime. The diameter of the Moon is approximately one-quarter the
diameter of the Earth. Sound cannot travel through a vacuum because it
requires a medium to propagate.

tests/test_alarm.py
@@ -0,0 +1,103 @@
"""Tests for byzantine equivocation detection."""
import pytest
from crisis_agents.agent import MockAgent, MockByzantineAgent
from crisis_agents.alarm import AlarmEvent, scan_for_mutations
from crisis_agents.claim import Claim
from crisis_agents.mothership import Mothership
def _claim(sid: str, verdict: str = "true", evidence: str = "ok") -> Claim:
return Claim(statement_id=sid, verdict=verdict, confidence=0.9, # type: ignore[arg-type]
evidence=evidence, timestamp_logical=0)
def _equivocating_team() -> Mothership:
"""A 3-honest-1-byzantine team where the byzantine equivocates on s03."""
m = Mothership()
m.add_agent(MockAgent("a", [[]]))
m.add_agent(MockAgent("b", [[]]))
m.add_agent(MockAgent("c", [[]]))
m.open_boundary(MockByzantineAgent(
"d",
scripted_pairs=[(
_claim("s03", verdict="true", evidence="to_a"),
_claim("s03", verdict="false", evidence="to_b"),
)],
split_a={"a", "c"},
split_b={"b"},
))
return m
class TestAlarmDetection:
def test_no_alarms_in_honest_run(self):
m = Mothership()
m.add_agent(MockAgent("a", [[]]))
m.add_agent(MockAgent("b", [[]]))
m.open_boundary(MockAgent("d", [[_claim("s01")]]))
m.run_crisis_phase(num_turns=1)
alarms = scan_for_mutations(m)
assert alarms == []
def test_equivocation_raises_one_alarm(self):
m = _equivocating_team()
m.run_crisis_phase(num_turns=1)
alarms = scan_for_mutations(m)
assert len(alarms) == 1
a = alarms[0]
assert isinstance(a, AlarmEvent)
assert a.accused_agent == "d"
assert a.statement_id == "s03"
assert a.turn == 0
assert len(a.witnesses) == 2
def test_witness_digests_are_distinct(self):
m = _equivocating_team()
m.run_crisis_phase(num_turns=1)
a = scan_for_mutations(m)[0]
d1 = a.witnesses[0].message_digest_hex
d2 = a.witnesses[1].message_digest_hex
assert d1 != d2
def test_delivery_sets_are_disjoint(self):
m = _equivocating_team()
m.run_crisis_phase(num_turns=1)
a = scan_for_mutations(m)[0]
s1 = set(a.witnesses[0].delivered_to)
s2 = set(a.witnesses[1].delivered_to)
assert s1 & s2 == set()
def test_spacelike_verified_is_true(self):
"""The Crisis layer should confirm the witness vertices are causally
incomparable in at least one honest graph."""
m = _equivocating_team()
m.run_crisis_phase(num_turns=1)
a = scan_for_mutations(m)[0]
assert a.spacelike_verified is True
def test_duplicate_broadcast_is_not_equivocation(self):
"""If a byzantine emits the SAME payload to two disjoint subsets,
the message digests are identical and it's not equivocation."""
same = _claim("s03", verdict="true", evidence="same evidence")
m = Mothership()
m.add_agent(MockAgent("a", [[]]))
m.add_agent(MockAgent("b", [[]]))
m.open_boundary(MockByzantineAgent(
"d",
scripted_pairs=[(same, same)],
split_a={"a"},
split_b={"b"},
))
m.run_crisis_phase(num_turns=1)
alarms = scan_for_mutations(m)
# If nonce mining is deterministic, the same payload yields the same
# digest and the scan sees a single vertex: no equivocation. If mining
# is randomized, the digests may differ and an alarm may fire; the
# assertion accepts either outcome. Either way, the byzantine has to
# actually say different things to be unambiguously caught.
assert alarms == [] or all(
len({w.message_digest_hex for w in alarm.witnesses}) > 1
for alarm in alarms
)
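The duplicate-broadcast test above hinges on whether PoW mining is deterministic for a given payload. A toy sha256 miner (purely illustrative; the real ProofOfWorkWeight API is not shown here) demonstrates both halves: the search is deterministic given the same starting nonce, and the same payload mined from a different starting point yields a different digest:

```python
import hashlib
import itertools

def mine(payload: bytes, start_nonce: int,
         difficulty_prefix: bytes = b"\x00") -> tuple[int, str]:
    """Return the first nonce >= start_nonce whose sha256(nonce || payload)
    digest starts with difficulty_prefix, plus that digest as hex."""
    for nonce in itertools.count(start_nonce):
        d = hashlib.sha256(nonce.to_bytes(8, "big") + payload).digest()
        if d.startswith(difficulty_prefix):
            return nonce, d.hex()

payload = b'{"statement_id":"s03","verdict":"true"}'
n1, d1 = mine(payload, 0)        # deterministic for a fixed start
n2, d2 = mine(payload, n1 + 1)   # restart past the first solution
assert mine(payload, 0) == (n1, d1)
assert d1 != d2  # same payload, different nonce, different digest
```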

tests/test_boundary.py
@@ -0,0 +1,50 @@
"""Tests for the Boundary membership tracker."""
import pytest
from crisis.crypto import digest
from crisis.message import ID_LENGTH
from crisis_agents.boundary import Boundary
def pid(name: str) -> bytes:
return digest(name.encode())[:ID_LENGTH]
class TestBoundary:
def test_initially_closed_and_empty(self):
b = Boundary()
assert not b.is_open
assert b.size() == 0
def test_add_trusted_in_closed_phase(self):
b = Boundary()
b.add_trusted(pid("a"))
b.add_trusted(pid("b"))
assert b.size() == 2
assert b.is_trusted(pid("a"))
assert b.is_trusted(pid("b"))
assert not b.is_trusted(pid("c"))
def test_open_flips_flag_and_adds_id(self):
b = Boundary()
b.add_trusted(pid("a"))
b.open(pid("new"))
assert b.is_open
assert b.is_trusted(pid("new"))
assert b.size() == 2
def test_add_trusted_after_open_rejected(self):
b = Boundary()
b.open(pid("a"))
with pytest.raises(RuntimeError, match="already open"):
b.add_trusted(pid("b"))
def test_open_is_idempotent_on_state(self):
"""Calling open() a second time with a new id just adds the id."""
b = Boundary()
b.open(pid("first"))
b.open(pid("second"))
assert b.is_open
assert b.size() == 2

tests/test_claim.py
@@ -0,0 +1,104 @@
"""Tests for the Claim payload dataclass."""
import json
import pytest
from crisis_agents.claim import Claim
class TestClaimConstruction:
def test_basic_claim_roundtrip(self):
c = Claim(
statement_id="s01",
verdict="true",
confidence=0.95,
evidence="The reference doc states this directly in paragraph 2.",
timestamp_logical=3,
)
assert c.statement_id == "s01"
assert c.verdict == "true"
assert c.confidence == pytest.approx(0.95)
assert c.schema_version == 1
def test_unknown_verdict_accepted(self):
c = Claim(statement_id="s02", verdict="unknown", confidence=0.5,
evidence="no signal", timestamp_logical=0)
assert c.verdict == "unknown"
class TestClaimValidation:
def test_empty_statement_id_rejected(self):
with pytest.raises(ValueError, match="statement_id"):
Claim(statement_id="", verdict="true", confidence=0.9,
evidence="x", timestamp_logical=0)
def test_invalid_verdict_rejected(self):
with pytest.raises(ValueError, match="verdict"):
Claim(statement_id="s01", verdict="maybe", # type: ignore[arg-type]
confidence=0.9, evidence="x", timestamp_logical=0)
def test_confidence_out_of_range(self):
with pytest.raises(ValueError, match="confidence"):
Claim(statement_id="s01", verdict="true", confidence=1.5,
evidence="x", timestamp_logical=0)
with pytest.raises(ValueError, match="confidence"):
Claim(statement_id="s01", verdict="true", confidence=-0.1,
evidence="x", timestamp_logical=0)
def test_evidence_too_long(self):
with pytest.raises(ValueError, match="evidence too long"):
Claim(statement_id="s01", verdict="true", confidence=0.9,
evidence="x" * 500, timestamp_logical=0)
def test_negative_timestamp_rejected(self):
with pytest.raises(ValueError, match="timestamp"):
Claim(statement_id="s01", verdict="true", confidence=0.9,
evidence="x", timestamp_logical=-1)
class TestPayloadRoundtrip:
def test_to_payload_returns_bytes(self):
c = Claim(statement_id="s01", verdict="true", confidence=0.9,
evidence="ok", timestamp_logical=2)
b = c.to_payload()
assert isinstance(b, bytes)
# Valid JSON
obj = json.loads(b.decode("utf-8"))
assert obj["statement_id"] == "s01"
def test_from_payload_inverts_to_payload(self):
original = Claim(statement_id="s04", verdict="false", confidence=0.72,
evidence="contradicted by para 3", timestamp_logical=7)
roundtrip = Claim.from_payload(original.to_payload())
assert roundtrip == original
def test_payload_is_deterministic(self):
"""Same logical claim must produce identical bytes — required so two
equivocating-but-payload-identical messages can be detected as the
same payload (vs. different payload = real equivocation)."""
c1 = Claim(statement_id="s01", verdict="true", confidence=0.9,
evidence="evidence here", timestamp_logical=1)
c2 = Claim(statement_id="s01", verdict="true", confidence=0.9,
evidence="evidence here", timestamp_logical=1)
assert c1.to_payload() == c2.to_payload()
def test_from_payload_rejects_garbage(self):
with pytest.raises(ValueError, match="Claim JSON"):
Claim.from_payload(b"not json")
def test_from_payload_rejects_missing_fields(self):
with pytest.raises(TypeError):
# Missing required statement_id
Claim.from_payload(b'{"verdict":"true","confidence":0.9,"evidence":"x","timestamp_logical":0}')
def test_payload_size_is_bounded(self):
"""Confirms the EVIDENCE_MAX_LEN cap keeps payload under a sane size."""
c = Claim(statement_id="s99", verdict="true", confidence=1.0,
evidence="x" * 280, timestamp_logical=999)
b = c.to_payload()
# JSON overhead + 280 evidence + small fields ~= < 400 bytes
assert len(b) < 500
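test_payload_is_deterministic relies on a canonical serialization inside claim.py. One way to get byte-stable payloads (an assumed implementation sketch; the real claim.py may differ) is `json.dumps` with sorted keys and fixed separators:

```python
import json
from dataclasses import dataclass, asdict

@dataclass(frozen=True)
class ToyClaim:
    statement_id: str
    verdict: str
    confidence: float
    evidence: str
    timestamp_logical: int

    def to_payload(self) -> bytes:
        # sorted keys + fixed separators: identical fields give
        # identical bytes, so payload equality is meaningful.
        return json.dumps(asdict(self), sort_keys=True,
                          separators=(",", ":")).encode("utf-8")

c1 = ToyClaim("s01", "true", 0.9, "evidence here", 1)
c2 = ToyClaim("s01", "true", 0.9, "evidence here", 1)
assert c1.to_payload() == c2.to_payload()
```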

@@ -0,0 +1,101 @@
"""End-to-end test: fact_check scenario runs cleanly and emits a valid proof."""
import json
from pathlib import Path
from crisis_agents.alarm import scan_for_mutations
from crisis_agents.cli import main as cli_main
from crisis_agents.mothership import Mothership
from crisis_agents.proof import (
build_proof,
verify_proof_self_consistent,
)
from crisis_agents.scenarios import build_fact_check_scenario
class TestFactCheckEndToEnd:
def test_scenario_loads(self):
s = build_fact_check_scenario()
assert s.name == "fact_check"
assert len(s.honest_agents) == 3
assert s.byzantine_joiner.name == "agent_delta"
assert "Pluto" in s.reference_doc
def test_runs_through_both_phases_and_raises_one_alarm(self):
s = build_fact_check_scenario()
m = Mothership()
for agent in s.honest_agents:
m.add_agent(agent)
m.run_closed_phase(num_turns=s.closed_phase_turns)
assert len(m.run_result.closed_log) == 3 * 6 # 3 agents × 6 statements
assert m.all_graphs() == {} # no DAG in closed phase
m.open_boundary(s.byzantine_joiner)
m.run_crisis_phase(num_turns=s.crisis_phase_turns)
# The byzantine emitted two contradictory variants of s03;
# honest agents emitted nothing in the Crisis phase (their script
# was exhausted in the closed phase).
assert len(m.run_result.crisis_log) == 2
alarms = scan_for_mutations(m)
assert len(alarms) == 1
a = alarms[0]
assert a.accused_agent == "agent_delta"
assert a.statement_id == "s03"
assert a.spacelike_verified is True
def test_proof_is_self_consistent_and_round_trips(self, tmp_path):
s = build_fact_check_scenario()
m = Mothership()
for agent in s.honest_agents:
m.add_agent(agent)
m.run_closed_phase(num_turns=s.closed_phase_turns)
m.open_boundary(s.byzantine_joiner)
m.run_crisis_phase(num_turns=s.crisis_phase_turns)
alarm = scan_for_mutations(m)[0]
proof = build_proof(m, alarm)
out = tmp_path / "proof.json"
out.write_text(proof.to_json())
# Reload, re-verify
from crisis_agents.proof import ProofDocument
reloaded = ProofDocument.from_json(out.read_text())
assert verify_proof_self_consistent(reloaded).ok
class TestCli:
def test_cli_demo_runs(self, tmp_path, capsys):
exit_code = cli_main(["demo", "--scenario", "fact_check",
"--out-dir", str(tmp_path)])
assert exit_code == 0
captured = capsys.readouterr()
assert "crisis-agents demo" in captured.out
assert "Phase 1" in captured.out
assert "Phase 2" in captured.out
assert "alarm" in captured.out.lower()
# A proof file landed
proofs = list(tmp_path.glob("proof_*.json"))
assert len(proofs) == 1
# The proof file is valid JSON
obj = json.loads(proofs[0].read_text())
assert obj["accused_agent"] == "agent_delta"
def test_cli_verify_passes_on_valid_proof(self, tmp_path, capsys):
# First produce a proof via the demo
cli_main(["demo", "--scenario", "fact_check", "--out-dir", str(tmp_path)])
proof_path = next(tmp_path.glob("proof_*.json"))
capsys.readouterr() # drain demo output
exit_code = cli_main(["verify", str(proof_path)])
assert exit_code == 0
out = capsys.readouterr().out
assert "self-consistent: True" in out
def test_cli_unknown_scenario(self, capsys):
exit_code = cli_main(["demo", "--scenario", "nonexistent"])
assert exit_code == 2

tests/test_mothership.py
@@ -0,0 +1,134 @@
"""Tests for the Mothership orchestrator — closed phase + Crisis-phase wiring."""
import pytest
from crisis_agents.agent import MockAgent, MockByzantineAgent
from crisis_agents.claim import Claim
from crisis_agents.mothership import Mothership
def _claim(sid: str, verdict: str = "true", turn: int = 0, evidence: str = "ok") -> Claim:
return Claim(
statement_id=sid, verdict=verdict, confidence=0.9, # type: ignore[arg-type]
evidence=evidence, timestamp_logical=turn,
)
class TestClosedPhase:
def test_no_dag_in_closed_phase(self):
m = Mothership()
m.add_agent(MockAgent("a", [[_claim("s01")]]))
m.add_agent(MockAgent("b", [[_claim("s01")]]))
result = m.run_closed_phase(num_turns=1)
# Two agents emitted one claim each
assert len(result.closed_log) == 2
names = [e.agent_name for e in result.closed_log]
assert "a" in names and "b" in names
# No graphs allocated
assert m.all_graphs() == {}
assert not m.boundary.is_open
def test_multi_turn_observes_prior_claims(self):
"""Each turn's agents see the claims emitted in previous turns."""
class WatcherAgent(MockAgent):
def __init__(self, name):
super().__init__(name, [[_claim("s01")], [_claim("s02")]])
self.received_per_turn: list[int] = []
def next_turn(self, turn, received):
self.received_per_turn.append(len(received))
return super().next_turn(turn, received)
w = WatcherAgent("watcher")
other = MockAgent("other", [[_claim("s99")], [_claim("s99")]])
m = Mothership()
m.add_agent(w)
m.add_agent(other)
m.run_closed_phase(num_turns=2)
# Turn 0: watcher sees 0 prior claims; Turn 1: watcher sees 2 from turn 0.
assert w.received_per_turn == [0, 2]
def test_add_agent_after_open_rejected(self):
m = Mothership()
m.add_agent(MockAgent("a", [[_claim("s01")]]))
m.open_boundary(MockAgent("byz", [[_claim("s01")]]))
with pytest.raises(RuntimeError, match="cannot add_agent"):
m.add_agent(MockAgent("late", []))
class TestCrisisPhaseWiring:
def test_open_boundary_initializes_graphs(self):
m = Mothership()
m.add_agent(MockAgent("a", [[_claim("s01")]]))
m.add_agent(MockAgent("b", [[_claim("s01")]]))
m.open_boundary(MockAgent("d", [[_claim("s01")]]))
# One graph per agent, including the joiner
graphs = m.all_graphs()
assert set(graphs.keys()) == {"a", "b", "d"}
for g in graphs.values():
assert g.vertex_count() == 0 # not yet run
def test_run_crisis_phase_extends_per_agent_graphs(self):
m = Mothership()
m.add_agent(MockAgent("a", [[_claim("s01")]]))
m.add_agent(MockAgent("b", [[_claim("s01")]]))
m.open_boundary(MockAgent("d", [[_claim("s01")]]))
result = m.run_crisis_phase(num_turns=1)
# Each agent's graph should now contain three vertices
# (broadcast claims from a, b, d delivered to everyone).
for name in ("a", "b", "d"):
assert m.graph_of(name).vertex_count() == 3
assert len(result.crisis_log) == 3
for entry in result.crisis_log:
assert set(entry.delivered_to) == {"a", "b", "d"}
    def test_byzantine_equivocation_splits_delivery(self):
        """A MockByzantineAgent delivers two different claims to disjoint
        subsets; this split delivery is the foundation of the equivocation
        detection demo."""
        m = Mothership()
        m.add_agent(MockAgent("a", [[]]))
        m.add_agent(MockAgent("b", [[]]))
        byz = MockByzantineAgent(
            "d",
            scripted_pairs=[(
                _claim("s01", verdict="true", evidence="to_a"),
                _claim("s01", verdict="false", evidence="to_b"),
            )],
            split_a={"a"},
            split_b={"b"},
        )
        m.open_boundary(byz)
        m.run_crisis_phase(num_turns=1)
        # a sees the true-variant, b sees the false-variant.
        # d (the byzantine sender) sees neither; see Mothership._emit docstring.
        a_payloads = [v.payload for v in m.graph_of("a").all_vertices()]
        b_payloads = [v.payload for v in m.graph_of("b").all_vertices()]
        d_payloads = [v.payload for v in m.graph_of("d").all_vertices()]
        assert any(b'"verdict":"true"' in p for p in a_payloads)
        assert all(b'"verdict":"false"' not in p for p in a_payloads)
        assert any(b'"verdict":"false"' in p for p in b_payloads)
        assert all(b'"verdict":"true"' not in p for p in b_payloads)
        # d's own graph holds neither variant (targeted delivery skips the sender).
        assert len(d_payloads) == 0
        # But the crisis_log records both emissions, so the mothership can
        # generate proofs from its own perspective.
        assert len(m.run_result.crisis_log) == 2

    def test_run_crisis_phase_requires_open_boundary(self):
        m = Mothership()
        m.add_agent(MockAgent("a", [[_claim("s01")]]))
        with pytest.raises(RuntimeError, match="boundary not yet open"):
            m.run_crisis_phase(num_turns=1)
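The equivocation the test above stages reduces to a simple predicate over the emission log: two entries from the same agent, at the same logical turn, with distinct payload digests and non-identical delivery sets. A minimal standalone sketch of that rule follows; `LogEntry` and `find_equivocations` here are illustrative stand-ins for the package's `crisis_log` entries and `scan_for_mutations`, not its real API.

```python
import hashlib
from dataclasses import dataclass
from itertools import combinations


@dataclass(frozen=True)
class LogEntry:
    agent: str
    turn: int
    payload: bytes
    delivered_to: frozenset


def digest(payload: bytes) -> str:
    return hashlib.sha256(payload).hexdigest()


def find_equivocations(log):
    """Same agent, same turn, distinct digests, different peer
    subsets -> an equivocation candidate."""
    hits = []
    for e1, e2 in combinations(log, 2):
        if (e1.agent == e2.agent and e1.turn == e2.turn
                and digest(e1.payload) != digest(e2.payload)
                and e1.delivered_to != e2.delivered_to):
            hits.append((e1, e2))
    return hits


# The byzantine split from the test: two variants of s01 at turn 0.
log = [
    LogEntry("d", 0, b'{"verdict":"true"}', frozenset({"a"})),
    LogEntry("d", 0, b'{"verdict":"false"}', frozenset({"b"})),
    LogEntry("a", 0, b'{"verdict":"true"}', frozenset({"a", "b", "d"})),
]
assert len(find_equivocations(log)) == 1
```

The full detector additionally confirms the two emissions are spacelike on the honest agents' graphs (`LamportGraph.are_spacelike`), ruling out an innocuous retraction that one peer simply saw later.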

tests/test_proof.py (new file, 138 lines)
"""Tests for proof generation and self-consistent verification."""
import json
from crisis_agents.agent import MockAgent, MockByzantineAgent
from crisis_agents.alarm import scan_for_mutations
from crisis_agents.claim import Claim
from crisis_agents.mothership import Mothership
from crisis_agents.proof import (
ProofDocument,
build_proof,
verify_proof_self_consistent,
)
def _claim(sid: str, verdict: str = "true", evidence: str = "ok") -> Claim:
return Claim(statement_id=sid, verdict=verdict, confidence=0.9, # type: ignore[arg-type]
evidence=evidence, timestamp_logical=0)
def _equivocating_run() -> tuple[Mothership, list]:
m = Mothership()
m.add_agent(MockAgent("a", [[]]))
m.add_agent(MockAgent("b", [[]]))
m.add_agent(MockAgent("c", [[]]))
m.open_boundary(MockByzantineAgent(
"d",
scripted_pairs=[(
_claim("s03", verdict="true", evidence="to_a"),
_claim("s03", verdict="false", evidence="to_b"),
)],
split_a={"a", "c"},
split_b={"b"},
))
m.run_crisis_phase(num_turns=1)
alarms = scan_for_mutations(m)
return m, alarms
class TestBuildProof:
    def test_produces_well_formed_proof(self):
        m, alarms = _equivocating_run()
        assert len(alarms) == 1
        proof = build_proof(m, alarms[0])
        assert proof.accused_agent == "d"
        assert proof.statement_id == "s03"
        assert proof.turn == 0
        assert len(proof.witnesses) == 2
        assert proof.spacelike_verified is True

    def test_dag_witnesses_reference_real_graphs(self):
        m, alarms = _equivocating_run()
        proof = build_proof(m, alarms[0])
        assert len(proof.dag_witnesses) == 2
        # Each dag_witness should name only honest agents (not "d").
        for dw in proof.dag_witnesses:
            assert "d" not in dw.observed_by
        # Each variant should have been observed by at least one honest agent
        # (the variant's delivered-to subset).
        for dw in proof.dag_witnesses:
            assert len(dw.observed_by) >= 1
class TestJsonRoundtrip:
    def test_to_json_is_valid(self):
        m, alarms = _equivocating_run()
        proof = build_proof(m, alarms[0])
        text = proof.to_json()
        parsed = json.loads(text)
        assert parsed["accused_agent"] == "d"
        assert parsed["statement_id"] == "s03"

    def test_from_json_inverts_to_json(self):
        m, alarms = _equivocating_run()
        original = build_proof(m, alarms[0])
        roundtrip = ProofDocument.from_json(original.to_json())
        assert roundtrip.accused_agent == original.accused_agent
        assert roundtrip.statement_id == original.statement_id
        assert roundtrip.turn == original.turn
        assert roundtrip.spacelike_verified == original.spacelike_verified
        assert len(roundtrip.witnesses) == len(original.witnesses)
class TestSelfConsistentVerification:
    def test_valid_proof_passes(self):
        m, alarms = _equivocating_run()
        proof = build_proof(m, alarms[0])
        result = verify_proof_self_consistent(proof)
        assert result.ok, result.reason

    def test_tampered_witness_digest_fails(self):
        """If someone alters a witness digest after the fact so the two
        witnesses look identical, the self-consistency check catches the
        missing distinct digests."""
        from dataclasses import replace

        m, alarms = _equivocating_run()
        proof = build_proof(m, alarms[0])
        # Tamper: make both digests identical.
        w0 = proof.witnesses[0]
        w1 = proof.witnesses[1]
        tampered = ProofDocument(
            schema_version=proof.schema_version,
            accused_agent=proof.accused_agent,
            accused_process_id_hex=proof.accused_process_id_hex,
            statement_id=proof.statement_id,
            turn=proof.turn,
            witnesses=(w0, replace(w1, message_digest_hex=w0.message_digest_hex)),
            dag_witnesses=proof.dag_witnesses,
            spacelike_verified=proof.spacelike_verified,
            proof_summary=proof.proof_summary,
        )
        result = verify_proof_self_consistent(tampered)
        assert not result.ok
        assert "duplicate" in result.reason.lower()

    def test_mismatched_statement_id_fails(self):
        m, alarms = _equivocating_run()
        proof = build_proof(m, alarms[0])
        bad = ProofDocument(
            schema_version=proof.schema_version,
            accused_agent=proof.accused_agent,
            accused_process_id_hex=proof.accused_process_id_hex,
            statement_id="DIFFERENT",  # mismatch
            turn=proof.turn,
            witnesses=proof.witnesses,
            dag_witnesses=proof.dag_witnesses,
            spacelike_verified=proof.spacelike_verified,
            proof_summary=proof.proof_summary,
        )
        result = verify_proof_self_consistent(bad)
        assert not result.ok
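The two tampering tests above pin down what self-consistent verification must check without trusting the prover: every witness names the proof's statement, and the witnesses carry genuinely distinct digests. A minimal standalone sketch of those structural checks follows; `Witness` and `check_self_consistent` are illustrative stand-ins, not the package's `verify_proof_self_consistent` API.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Witness:
    statement_id: str
    message_digest_hex: str


def check_self_consistent(statement_id, witnesses):
    """Structural checks only: at least two witnesses, all naming the
    proof's statement_id, with pairwise-distinct message digests."""
    if len(witnesses) < 2:
        return False, "need at least two witnesses"
    if any(w.statement_id != statement_id for w in witnesses):
        return False, "witness statement_id mismatch"
    digests = [w.message_digest_hex for w in witnesses]
    if len(set(digests)) != len(digests):
        return False, "duplicate witness digests"
    return True, ""


# A well-formed pair passes; a digest made identical after the fact fails.
ok, reason = check_self_consistent(
    "s03", [Witness("s03", "aa"), Witness("s03", "bb")])
assert ok
ok, reason = check_self_consistent(
    "s03", [Witness("s03", "aa"), Witness("s03", "aa")])
assert not ok and "duplicate" in reason
```

These checks are purely internal to the document, which is what makes the proof replayable: a third party can reject a tampered proof without re-running the agents.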