diff --git a/pyproject.toml b/pyproject.toml index 5141db4..f310529 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,10 +21,14 @@ dev = [ "pytest>=8.0", "rich>=13.0", ] +live = [ + "anthropic>=0.40", +] [project.scripts] crisis-node = "crisis.node:main" crisis-demo = "crisis.demo:main" +crisis-agents = "crisis_agents.cli:main" [build-system] requires = ["setuptools>=68.0"] diff --git a/src/crisis_agents/__init__.py b/src/crisis_agents/__init__.py new file mode 100644 index 0000000..d9d12ec --- /dev/null +++ b/src/crisis_agents/__init__.py @@ -0,0 +1,30 @@ +""" +Crisis Agents: a coordination layer for AI agent teams on top of the +Crisis consensus protocol. + +The protocol implementation in `crisis` reaches total order on messages +between machines. This package lifts that one level up: it treats each +participating agent as a Crisis node and uses the Lamport graph as an +immutable, replayable ledger of what every agent said and when. + +Use case: a team of agents coordinated by a *mothership* (orchestrator) +normally talks freely; when the team's boundary opens to outside agents +of unknown trust, the mothership activates the Crisis layer so that any +byzantine equivocation can be detected (`LamportGraph.find_mutations`) +and a cryptographic proof of malfeasance can be produced. 
+ +Key modules: + - claim: structured statement an agent makes (a Crisis payload) + - boundary: closed-set tracking + open() trigger + - agent: CrisisAgent abstract + MockAgent + MockByzantineAgent + - mothership: orchestrator that drives Crisis rounds + - alarm: wraps mutation detection into AlarmEvent + - proof: emits replayable proof-of-malfeasance JSON + - scenarios: demo scripts (fact_check) + - cli: `crisis-agents` command-line entry point +""" + +from crisis_agents.claim import Claim + +__all__ = ["Claim"] +__version__ = "0.1.0" diff --git a/src/crisis_agents/agent.py b/src/crisis_agents/agent.py new file mode 100644 index 0000000..311f1f4 --- /dev/null +++ b/src/crisis_agents/agent.py @@ -0,0 +1,150 @@ +""" +CrisisAgent — abstract base + Mock subclasses for the deterministic path. + +An agent in this PoC is something that, given the current view of claims, can +produce its next contributions. Agents have a stable 32-byte process_id +derived from their human-readable name; Crisis uses this id for mutation +detection, so two agents must never share one. + +Real Claude-driven agents live in `live_agent.py` (Phase 5) and inherit the +same `CrisisAgent` interface — the mothership doesn't care which kind it is +driving. +""" + +from __future__ import annotations + +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import Optional + +from crisis.crypto import digest +from crisis.message import ID_LENGTH + +from crisis_agents.claim import Claim + + +def agent_id_from_name(name: str) -> bytes: + """Derive a stable 32-byte process id from a human-readable name. + + Matches the convention used in `crisis.demo.Simulation`, so agents + coexisting with simulated nodes have ids in the same space. + """ + return digest(name.encode())[:ID_LENGTH] + + +@dataclass +class AgentTurn: + """One emission from an agent in a given turn. + + Attributes: + claim: The Claim being emitted. + target_subset: None means broadcast to every peer. 
A set of peer + names means this variant of the claim is only + delivered to those peers — the building block of + byzantine equivocation. Honest agents always set + this to None. + """ + claim: Claim + target_subset: Optional[set[str]] = None + + +class CrisisAgent(ABC): + """Abstract base for any agent participating in a Crisis-coordinated team.""" + + def __init__(self, name: str): + if not name: + raise ValueError("agent name must be non-empty") + self.name: str = name + self.process_id: bytes = agent_id_from_name(name) + + @abstractmethod + def next_turn(self, turn: int, received_claims: list[Claim]) -> list[AgentTurn]: + """Produce this agent's emissions for turn `turn`. + + Args: + turn: The 0-indexed turn counter (matched across all agents). + received_claims: All claims the agent has seen *up to and including* + the previous turn. The agent may use these to inform + its next emission or ignore them entirely. + + Returns: + A possibly-empty list of AgentTurn entries. An honest agent emits one + broadcast AgentTurn per scripted item; a byzantine equivocator emits + two AgentTurns with disjoint `target_subset`s for the same logical + claim slot. + """ + ... + + def __repr__(self) -> str: + return f"{type(self).__name__}(name={self.name!r}, id={self.process_id.hex()[:8]}...)" + + +class MockAgent(CrisisAgent): + """An agent that emits a predetermined sequence of claims. + + Used for tests and deterministic demos. The `scripted_claims` argument is + a list of per-step emission lists: on its Nth invocation, the agent emits + `scripted_claims[N]`. Invocations past the end of the script produce + no emissions. + + Each agent maintains its own invocation counter, **independent of the + mothership's turn counter** — that way the same agent can be used across + closed-phase and Crisis-phase invocations without the script restarting. + The `turn` argument is observed but not used to index the script. + + All emissions are broadcast (no equivocation). 
For equivocation, use + `MockByzantineAgent`. + """ + + def __init__(self, name: str, scripted_claims: list[list[Claim]]): + super().__init__(name) + self._script = scripted_claims + self._invocations = 0 + + def next_turn(self, turn: int, received_claims: list[Claim]) -> list[AgentTurn]: + idx = self._invocations + self._invocations += 1 + if idx >= len(self._script): + return [] + return [AgentTurn(claim=c) for c in self._script[idx]] + + +class MockByzantineAgent(CrisisAgent): + """An agent that equivocates by construction. + + For each turn, two parallel claim variants may be emitted. The first + variant goes to peers in `split_a`; the second variant goes to peers in + `split_b`. Peers not in either set receive nothing for that turn. + + `scripted_pairs[turn]` is `(claim_to_a, claim_to_b)` — the two + contradictory claims this agent emits on turn `turn`. Both share the + emitting agent's process_id, so Crisis's `find_mutations` will surface + them as a mutation pair once both vertices are present in any honest + agent's combined view. + + Set `claim_to_b = None` to skip the equivocation for that turn (the + agent then behaves honestly to everyone). 
+ """ + + def __init__(self, name: str, + scripted_pairs: list[tuple[Claim, Optional[Claim]]], + split_a: set[str], + split_b: set[str]): + super().__init__(name) + if split_a & split_b: + raise ValueError("split_a and split_b must be disjoint") + self._script = scripted_pairs + self._split_a = split_a + self._split_b = split_b + self._invocations = 0 + + def next_turn(self, turn: int, received_claims: list[Claim]) -> list[AgentTurn]: + idx = self._invocations + self._invocations += 1 + if idx >= len(self._script): + return [] + claim_a, claim_b = self._script[idx] + out: list[AgentTurn] = [AgentTurn(claim=claim_a, target_subset=set(self._split_a))] + if claim_b is not None: + out.append(AgentTurn(claim=claim_b, target_subset=set(self._split_b))) + return out diff --git a/src/crisis_agents/alarm.py b/src/crisis_agents/alarm.py new file mode 100644 index 0000000..872b8c2 --- /dev/null +++ b/src/crisis_agents/alarm.py @@ -0,0 +1,154 @@ +""" +alarm.py — detect byzantine equivocation from the mothership's records. + +Equivocation in our PoC has a precise structural signature: a single agent +emits, on the same turn, two or more Crisis Messages with **different +message digests** to **non-identical sets of peers**. Same-id same-turn +same-payload duplicate broadcasts are not equivocation; we filter those out. + +For each detected alarm we also verify via the Crisis layer's own machinery +(`LamportGraph.are_spacelike`) that the two witness vertices are causally +incomparable — this is what makes the alarm cryptographically defensible +rather than merely a bookkeeping observation. 
+""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import TYPE_CHECKING + +from crisis_agents.claim import Claim + +if TYPE_CHECKING: + from crisis_agents.mothership import CrisisPhaseEntry, Mothership + + +@dataclass(frozen=True) +class MutationWitness: + """One leg of a byzantine equivocation: a specific Crisis vertex emitted + by the accused agent that contradicts another vertex from the same agent + in the same logical turn. + """ + message_digest_hex: str + payload_claim: dict # the parsed Claim as a plain dict (for JSON proof) + delivered_to: tuple[str, ...] # peers that received THIS variant + + +@dataclass(frozen=True) +class AlarmEvent: + """A detected byzantine equivocation. + + The combination of `accused_process_id_hex`, `turn`, and `statement_id` + uniquely identifies the offense; multiple witnesses prove the offender + said different things to different peers. + """ + accused_agent: str + accused_process_id_hex: str + statement_id: str + turn: int + witnesses: tuple[MutationWitness, ...] + spacelike_verified: bool # True if the Crisis layer confirmed + # the witness vertices are spacelike in + # at least one honest agent's graph + + +def scan_for_mutations(mothership: "Mothership") -> list[AlarmEvent]: + """Walk the mothership's crisis log and surface every equivocation. + + Strategy: + 1. Group crisis-log entries by (agent_name, turn). + 2. A group with ≥2 distinct message digests AND non-identical delivery + sets is a mutation candidate. + 3. For each candidate, build MutationWitness records. + 4. Verify spacelike-ness via the Crisis DAG of any honest agent that + observed at least two of the witnesses. + + Returns AlarmEvents (possibly empty). 
+ """ + crisis_log = mothership.run_result.crisis_log + by_agent_turn: dict[tuple[str, int], list["CrisisPhaseEntry"]] = {} + for entry in crisis_log: + by_agent_turn.setdefault((entry.agent_name, entry.turn), []).append(entry) + + alarms: list[AlarmEvent] = [] + for (agent_name, turn), entries in by_agent_turn.items(): + if len(entries) < 2: + continue + + digests = {e.message_digest_hex for e in entries} + if len(digests) < 2: + continue # same payload replayed — not equivocation + + delivery_sets = {tuple(sorted(e.delivered_to)) for e in entries} + if len(delivery_sets) < 2: + continue # same recipients — not equivocation + + # All checks passed: this is an equivocation candidate. + statement_id = entries[0].claim.statement_id + accused_pid_hex = mothership.agents[agent_name].process_id.hex() + + witnesses = tuple( + MutationWitness( + message_digest_hex=e.message_digest_hex, + payload_claim=e.claim.to_dict(), + delivered_to=tuple(sorted(e.delivered_to)), + ) + for e in entries + ) + + spacelike_ok = _verify_spacelike(mothership, agent_name, entries) + + alarms.append(AlarmEvent( + accused_agent=agent_name, + accused_process_id_hex=accused_pid_hex, + statement_id=statement_id, + turn=turn, + witnesses=witnesses, + spacelike_verified=spacelike_ok, + )) + + return alarms + + +def _verify_spacelike(mothership: "Mothership", accused_name: str, + entries: list["CrisisPhaseEntry"]) -> bool: + """Ask the Crisis layer to confirm that the equivocating vertices are + causally incomparable. + + Strategy: pick any pair of entries. Find an honest agent's graph that + contains both vertices and ask `are_spacelike`. If no single graph holds + both (because the byzantine delivered them to disjoint subsets), pick + two graphs — one per entry — and check that neither vertex references + the other directly. This weaker check is sufficient for our PoC: if + neither references the other, they can't be in each other's past in + any extended graph either. 
+ """ + a, b = entries[0], entries[1] + digest_a = bytes.fromhex(a.message_digest_hex) + digest_b = bytes.fromhex(b.message_digest_hex) + + # Try to find an honest observer's graph that contains both. + accused_pid = mothership.agents[accused_name].process_id + for name, graph in mothership.all_graphs().items(): + if mothership.agents[name].process_id == accused_pid: + continue + if digest_a in graph and digest_b in graph: + va = graph.get_vertex(digest_a) + vb = graph.get_vertex(digest_b) + if va is not None and vb is not None: + return graph.are_spacelike(va, vb) + + # Weak proof: confirm neither vertex directly references the other in + # any honest agent's graph. Sufficient in our PoC where equivocations + # are emitted at the same turn from the same parent. + for name, graph in mothership.all_graphs().items(): + if mothership.agents[name].process_id == accused_pid: + continue + va = graph.get_vertex(digest_a) + vb = graph.get_vertex(digest_b) + if va is not None and digest_b in (vb_digest for vb_digest in va.digests): + return False + if vb is not None and digest_a in (va_digest for va_digest in vb.digests): + return False + + return True diff --git a/src/crisis_agents/boundary.py b/src/crisis_agents/boundary.py new file mode 100644 index 0000000..e66cef5 --- /dev/null +++ b/src/crisis_agents/boundary.py @@ -0,0 +1,55 @@ +""" +Boundary — the membership set whose closure determines whether Crisis is active. + +The mental model: agents inside the boundary are trusted (we vouch for their +intent). Crisis is overhead in this phase. When a new agent joins from outside +— `open(new_id)` — the boundary becomes "open" and from that moment Crisis is +activated for all subsequent claim emission. + +Reopening is one-shot in this PoC: once open, the boundary stays open for the +remainder of the run. Re-closing would mean exiling agents and resetting trust +assumptions; that's a separate decision the mothership would make, not a +Boundary primitive. 
+""" + +from __future__ import annotations + +from dataclasses import dataclass, field + + +@dataclass +class Boundary: + """Membership tracker + Crisis-activation flag. + + Attributes: + trusted_ids: set of 32-byte process_ids inside the trusted closure. + is_open: True once `open()` has been called at least once. + """ + trusted_ids: set[bytes] = field(default_factory=set) + is_open: bool = False + + def add_trusted(self, process_id: bytes) -> None: + """Add an agent that's trusted from the start (closed-phase population).""" + if self.is_open: + raise RuntimeError( + "boundary is already open — use open() to add agents now" + ) + self.trusted_ids.add(process_id) + + def open(self, new_process_id: bytes) -> None: + """The trigger: a new agent of unknown trust joins. + + After this call, `is_open` is True and any further agents are added + via the same method. The new id is added to `trusted_ids` because + Crisis's mutation detection works for *any* id with vertices in the + graph — the boundary doesn't gate participation, it gates whether + we bother running Crisis at all. + """ + self.trusted_ids.add(new_process_id) + self.is_open = True + + def is_trusted(self, process_id: bytes) -> bool: + return process_id in self.trusted_ids + + def size(self) -> int: + return len(self.trusted_ids) diff --git a/src/crisis_agents/claim.py b/src/crisis_agents/claim.py new file mode 100644 index 0000000..b6f828c --- /dev/null +++ b/src/crisis_agents/claim.py @@ -0,0 +1,88 @@ +""" +Claim — the structured payload an agent emits. + +A Claim is what gets JSON-serialized into a Crisis Message's `payload` +field. The Message itself carries the agent's stable process id (the +identity Crisis uses for mutation detection) and the causal digests; +the Claim carries the application-layer semantics of *what* the agent +is asserting. + +Claim is intentionally narrow: a verdict on a statement with confidence +and free-text evidence. 
Anything richer (multi-claim batches, attached +artifacts) should be modeled by emitting multiple Claims that share a +correlation id, not by widening this struct. +""" + +from __future__ import annotations + +import json +from dataclasses import asdict, dataclass +from typing import ClassVar, Literal + + +Verdict = Literal["true", "false", "unknown"] + + +@dataclass(frozen=True) +class Claim: + """A single adjudication an agent makes about one statement. + + Attributes: + statement_id: The scenario-defined identifier of the statement + being adjudicated (e.g. "s03"). Stable across + agents — that's how equivocation is detected. + verdict: "true" | "false" | "unknown". + confidence: Self-reported confidence in [0.0, 1.0]. + evidence: Free-text justification, capped at 280 chars so + payloads stay bounded and visualizable later. + timestamp_logical: The emitting agent's local turn counter. Not + authoritative for Crisis ordering — Crisis derives + order from the DAG — but useful for debugging. + schema_version: Forward-compat bump if Claim's shape changes. + """ + statement_id: str + verdict: Verdict + confidence: float + evidence: str + timestamp_logical: int + schema_version: int = 1 + + # Class constant, not a dataclass field — must be ClassVar so asdict() + # doesn't serialize it into every payload. 
+ EVIDENCE_MAX_LEN: ClassVar[int] = 280 + + def __post_init__(self): + if not self.statement_id: + raise ValueError("statement_id must be non-empty") + if self.verdict not in ("true", "false", "unknown"): + raise ValueError(f"verdict must be true/false/unknown, got {self.verdict!r}") + if not 0.0 <= self.confidence <= 1.0: + raise ValueError(f"confidence must be in [0, 1], got {self.confidence}") + if len(self.evidence) > self.EVIDENCE_MAX_LEN: + raise ValueError( + f"evidence too long: {len(self.evidence)} > {self.EVIDENCE_MAX_LEN}" + ) + if self.timestamp_logical < 0: + raise ValueError(f"timestamp_logical must be >= 0, got {self.timestamp_logical}") + + def to_payload(self) -> bytes: + """Serialize to bytes suitable for `Message.payload`. + + Uses sort_keys=True so two byte strings for the same logical claim + are identical — which matters for equivocation detection: two + equivocating claims that happen to have identical payloads aren't + the same fault, they're an accidental duplicate. + """ + return json.dumps(asdict(self), sort_keys=True, separators=(",", ":")).encode("utf-8") + + @classmethod + def from_payload(cls, payload: bytes) -> "Claim": + """Inverse of `to_payload`. Raises on malformed input.""" + try: + obj = json.loads(payload.decode("utf-8")) + except (UnicodeDecodeError, json.JSONDecodeError) as e: + raise ValueError(f"payload is not valid Claim JSON: {e}") from e + return cls(**obj) + + def to_dict(self) -> dict: + return asdict(self) diff --git a/src/crisis_agents/cli.py b/src/crisis_agents/cli.py new file mode 100644 index 0000000..fd29d58 --- /dev/null +++ b/src/crisis_agents/cli.py @@ -0,0 +1,166 @@ +""" +crisis-agents — command-line entry point. + +Subcommands: + demo Run a scripted scenario end-to-end (closed phase → boundary + opens → Crisis phase → alarm detection → proof emission). + verify Re-check a proof JSON for self-consistency. (Phase 6, may be + a stub for now.) 
+ +Examples: + crisis-agents demo --scenario fact_check + crisis-agents demo --scenario fact_check --live + crisis-agents verify proof_agent_delta_s03.json +""" + +from __future__ import annotations + +import argparse +import sys +from pathlib import Path + +from crisis_agents.alarm import scan_for_mutations +from crisis_agents.mothership import Mothership +from crisis_agents.proof import ( + ProofDocument, + build_proof, + verify_proof_self_consistent, +) +from crisis_agents.scenarios import build_fact_check_scenario + + +SCENARIOS = { + "fact_check": build_fact_check_scenario, +} + + +def _run_demo(args: argparse.Namespace) -> int: + if args.scenario not in SCENARIOS: + print(f"unknown scenario: {args.scenario}", file=sys.stderr) + print(f"available: {', '.join(SCENARIOS)}", file=sys.stderr) + return 2 + + builder = SCENARIOS[args.scenario] + scenario = builder(live=args.live, model=args.model) + + mode = "live" if args.live else "mocked" + print(f"=== crisis-agents demo: {scenario.name} ({mode}) ===\n") + print(scenario.description) + print() + print(f"Reference document:") + for line in scenario.reference_doc.splitlines(): + print(f" {line}") + print() + + mothership = Mothership() + for agent in scenario.honest_agents: + mothership.add_agent(agent) + + # Phase 1: closed + print(f"--- Phase 1: closed team, no Crisis ({scenario.closed_phase_turns} turn(s)) ---") + mothership.run_closed_phase(num_turns=scenario.closed_phase_turns) + print( + f" {len(mothership.run_result.closed_log)} claims collected from " + f"{len(mothership.agents)} honest agent(s) — consensus reached " + f"without Crisis.\n" + ) + + # Phase 2: boundary opens + print(f"--- Phase 2: boundary opens — {scenario.byzantine_joiner.name} joins ---") + print(" Crisis activated for all subsequent claims.\n") + mothership.open_boundary(scenario.byzantine_joiner) + + # Phase 3: Crisis-active turns + print(f"--- Phase 3: Crisis-active run ({scenario.crisis_phase_turns} turn(s)) ---") + 
mothership.run_crisis_phase(num_turns=scenario.crisis_phase_turns) + print( + f" {len(mothership.run_result.crisis_log)} Crisis messages emitted; " + f"{len(mothership.agents)} per-agent LamportGraphs maintained.\n" + ) + + # Phase 4: alarm + print("--- Phase 4: scan for byzantine equivocation ---") + alarms = scan_for_mutations(mothership) + if not alarms: + print(" ✓ No mutations detected — network is honest.\n") + return 0 + + print(f" ⚠ {len(alarms)} alarm(s) raised:") + for a in alarms: + verdicts = ", ".join( + f"{w.payload_claim['verdict']}->{','.join(w.delivered_to)}" + for w in a.witnesses + ) + print( + f" - agent {a.accused_agent!r} (id={a.accused_process_id_hex[:16]}...) " + f"equivocated on {a.statement_id} at turn {a.turn}: {verdicts}" + ) + print() + + # Phase 5: proof emission + print("--- Phase 5: emit proof-of-malfeasance ---") + out_dir = Path(args.out_dir) + out_dir.mkdir(parents=True, exist_ok=True) + for a in alarms: + proof = build_proof(mothership, a) + path = out_dir / f"proof_{a.accused_agent}_{a.statement_id}.json" + path.write_text(proof.to_json()) + print(f" wrote {path}") + check = verify_proof_self_consistent(proof) + marker = "OK" if check.ok else "FAIL" + print(f" self-consistency: {marker} — {check.reason}") + print() + return 0 + + +def _run_verify(args: argparse.Namespace) -> int: + path = Path(args.proof_path) + if not path.exists(): + print(f"file not found: {path}", file=sys.stderr) + return 2 + proof = ProofDocument.from_json(path.read_text()) + result = verify_proof_self_consistent(proof) + print(f"proof: {path}") + print(f" accused agent: {proof.accused_agent}") + print(f" statement_id: {proof.statement_id}") + print(f" turn: {proof.turn}") + print(f" spacelike: {proof.spacelike_verified}") + print(f" self-consistent: {result.ok}") + print(f" reason: {result.reason}") + return 0 if result.ok else 1 + + +def main(argv: list[str] | None = None) -> int: + parser = argparse.ArgumentParser( + prog="crisis-agents", + 
description="Crisis-Agents — coordination layer for AI agent teams.", + ) + sub = parser.add_subparsers(dest="cmd", required=True) + + demo = sub.add_parser("demo", help="run a scripted scenario end-to-end") + demo.add_argument("--scenario", default="fact_check", + help="which scenario to run (default: fact_check)") + demo.add_argument("--live", action="store_true", + help="back the honest agents with real Claude API calls " + "(requires anthropic SDK + ANTHROPIC_API_KEY)") + demo.add_argument("--model", default=None, + help="Anthropic model id for --live (default: " + "claude-haiku-4-5-20251001)") + demo.add_argument("--out-dir", default=".", + help="where to write proof JSON files (default: cwd)") + + verify = sub.add_parser("verify", help="check a proof JSON for self-consistency") + verify.add_argument("proof_path", help="path to a proof JSON file") + + args = parser.parse_args(argv) + + if args.cmd == "demo": + return _run_demo(args) + if args.cmd == "verify": + return _run_verify(args) + parser.error(f"unknown command: {args.cmd}") + return 2 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/src/crisis_agents/mothership.py b/src/crisis_agents/mothership.py new file mode 100644 index 0000000..631f360 --- /dev/null +++ b/src/crisis_agents/mothership.py @@ -0,0 +1,320 @@ +""" +Mothership — the orchestrator that runs a Crisis-Agents network. + +Two phases: + +1. **Closed phase.** Agents talk freely. `run_closed_phase(N)` advances N + turns, collecting every claim into a flat log. No DAG, no voting, no + overhead. This is the "normal life" the user described. + +2. **Crisis phase.** Triggered by `open_boundary(new_agent)`. From that + point on every claim is wrapped into a Crisis `Message`, extended into + per-agent `LamportGraph`s, and consensus algorithms run. Mutation + detection raises alarms; proofs are generated separately by `proof.py`. 
+ +The mothership keeps one LamportGraph per agent (the agent's view of the +network) and updates them in lockstep — the PoC uses synchronous in-process +delivery, so all honest agents see the same vertices except where a +byzantine agent has selectively delivered an equivocation. Each honest +graph still observes both equivocating vertices once the network gossips +enough; that's what `LamportGraph.find_mutations` keys on. +""" + +from __future__ import annotations + +import os +from dataclasses import dataclass, field +from typing import Optional + +from crisis.graph import LamportGraph +from crisis.message import Message, NONCE_LENGTH +from crisis.weight import ProofOfWorkWeight + +from crisis_agents.agent import AgentTurn, CrisisAgent +from crisis_agents.boundary import Boundary +from crisis_agents.claim import Claim + + +@dataclass +class ClosedPhaseEntry: + """One row in the closed-phase log: who said what, when.""" + agent_name: str + turn: int + claim: Claim + + +@dataclass +class CrisisPhaseEntry: + """One row in the Crisis-phase log: the (agent, turn, claim, vertex_digest) + of an emitted-and-accepted Crisis message, plus the target subset (set of + peer names) it was delivered to. Useful as the raw material for proofs. + """ + agent_name: str + turn: int + claim: Claim + message_digest_hex: str + delivered_to: list[str] + + +@dataclass +class MothershipRunResult: + """What `run_closed_phase` / `run_crisis_phase` returns.""" + closed_log: list[ClosedPhaseEntry] = field(default_factory=list) + crisis_log: list[CrisisPhaseEntry] = field(default_factory=list) + + +class Mothership: + """Orchestrates a team of CrisisAgents. 
+ + Usage: + m = Mothership() + m.add_agent(MockAgent("agent_a", ...)) + m.add_agent(MockAgent("agent_b", ...)) + m.add_agent(MockAgent("agent_c", ...)) + + # Closed phase + m.run_closed_phase(num_turns=2) + + # Boundary opens + m.open_boundary(MockByzantineAgent("agent_d", ...)) + + # Crisis phase + m.run_crisis_phase(num_turns=5) + """ + + def __init__(self, *, pow_zeros: int = 0): + self.agents: dict[str, CrisisAgent] = {} + self.boundary = Boundary() + self.run_result = MothershipRunResult() + + # Per-agent Lamport graphs (only used in the Crisis phase). + self._graphs: dict[str, LamportGraph] = {} + self._weight_system = ProofOfWorkWeight(min_leading_zeros=pow_zeros) + + # Independent turn counters: closed and Crisis phases share none. + # Crisis-phase turns count from 0 again so the proof JSON has a + # clean "round after boundary open" timeline. + self._closed_turn_index = 0 + self._crisis_turn_index = 0 + + # ------------------------------------------------------------------ + # Setup + # ------------------------------------------------------------------ + + def add_agent(self, agent: CrisisAgent) -> None: + """Register a trusted agent (must be called before run_closed_phase).""" + if self.boundary.is_open: + raise RuntimeError("cannot add_agent after boundary opened; use open_boundary") + if agent.name in self.agents: + raise ValueError(f"agent {agent.name!r} already added") + self.agents[agent.name] = agent + self.boundary.add_trusted(agent.process_id) + + # ------------------------------------------------------------------ + # Phase 1: closed + # ------------------------------------------------------------------ + + def run_closed_phase(self, num_turns: int) -> MothershipRunResult: + """Drive `num_turns` of plain agent communication. No Crisis. + + Each turn: + - All agents see the cumulative claims observed so far. + - Each agent emits its scripted claims; each emission is appended + to the closed log. 
+ """ + if self.boundary.is_open: + raise RuntimeError("boundary already open; closed phase is over") + + observed: list[Claim] = [e.claim for e in self.run_result.closed_log] + for _ in range(num_turns): + turn = self._closed_turn_index + new_this_turn: list[Claim] = [] + for agent in self.agents.values(): + for at in agent.next_turn(turn, observed): + self.run_result.closed_log.append( + ClosedPhaseEntry(agent_name=agent.name, turn=turn, claim=at.claim) + ) + new_this_turn.append(at.claim) + observed.extend(new_this_turn) + self._closed_turn_index += 1 + return self.run_result + + # ------------------------------------------------------------------ + # Phase 2: boundary opens, Crisis activates + # ------------------------------------------------------------------ + + def open_boundary(self, new_agent: CrisisAgent) -> None: + """The trigger: a new agent of unknown trust joins. + + Crisis is now active for all subsequent claim emission. Each existing + agent gets a fresh LamportGraph; the new agent does too. From this + moment, `run_crisis_phase()` drives the consensus loop. + """ + if new_agent.name in self.agents: + raise ValueError(f"agent {new_agent.name!r} is already inside the boundary") + self.agents[new_agent.name] = new_agent + self.boundary.open(new_agent.process_id) + + # Initialize a graph for every agent (including the joiner). + for name in self.agents: + self._graphs[name] = LamportGraph(weight_system=self._weight_system) + + # ------------------------------------------------------------------ + # Phase 2 mechanics: building Crisis messages from Claims + # ------------------------------------------------------------------ + + def _wrap_as_message(self, agent: CrisisAgent, claim: Claim, + graph: LamportGraph) -> Message: + """Convert a Claim into a Crisis Message and return it (un-extended). 
+ + Builds the digests tuple per Algorithm 1: + - reference the agent's own last vertex in `graph` (chain link), + - cross-reference one most-recent vertex per other id (sample). + Mines a PoW nonce that satisfies the weight system. + """ + payload = claim.to_payload() + + # Last vertex with this agent's id (chain link, if any) + same_id = [v for v in graph.all_vertices() if v.id == agent.process_id] + digests_list: list[bytes] = [] + if same_id: + # Pick the one not referenced by any other same-id vertex + referenced = set() + for v in same_id: + for d in v.digests: + ref = graph.get_vertex(d) + if ref is not None and ref.id == agent.process_id: + referenced.add(d) + last = next( + (v for v in same_id if v.message_digest not in referenced), + same_id[-1], + ) + digests_list.append(last.message_digest) + past_digests = {v.message_digest for v in graph.past(last)} + else: + past_digests = set() + + # Cross-references: one most-recent vertex per other id + seen_other_ids: set[bytes] = {agent.process_id} + for v in graph.all_vertices(): + if v.id in seen_other_ids: + continue + if v.message_digest in past_digests: + continue + digests_list.append(v.message_digest) + seen_other_ids.add(v.id) + + # Mine a valid nonce; reuse the weight system + if isinstance(self._weight_system, ProofOfWorkWeight): + return self._weight_system.mine_nonce( + agent.process_id, tuple(digests_list), payload + ) + else: + return Message( + nonce=os.urandom(NONCE_LENGTH), + id=agent.process_id, + digests=tuple(digests_list), + payload=payload, + ) + + def _deliver(self, sender: CrisisAgent, message: Message, + target_names: list[str]) -> None: + """Extend the message into the LamportGraphs of `target_names`.""" + for name in target_names: + graph = self._graphs[name] + graph.extend(message) + + # ------------------------------------------------------------------ + # Phase 2: the Crisis-active run loop + # ------------------------------------------------------------------ + + def 
run_crisis_phase(self, num_turns: int) -> MothershipRunResult: + """Drive `num_turns` of agent activity with Crisis active. + + Each turn: + 1. Every agent (including the byzantine joiner) is asked for its + emissions. Each emission carries an optional `target_subset`. + 2. Each emission is wrapped into a Crisis Message against the + SENDER's view of the graph (the agent's own LamportGraph). + 3. The Message is delivered (extended) into the LamportGraphs of + every peer in the target subset (or every peer if None). + 4. The (agent, turn, claim, message_digest, delivered_to) tuple + is logged for downstream proof generation. + """ + if not self.boundary.is_open: + raise RuntimeError("boundary not yet open; call open_boundary() first") + + all_names = list(self.agents.keys()) + + for _ in range(num_turns): + turn = self._crisis_turn_index + + # Snapshot of received claims per agent — for the agent's view + # of the conversation when it decides what to say next. In the + # PoC this is the agent's graph's vertex set, decoded back to + # Claim objects. + for agent in self.agents.values(): + received: list[Claim] = [] + for v in self._graphs[agent.name].all_vertices(): + if v.id == agent.process_id: + continue + try: + received.append(Claim.from_payload(v.payload)) + except (ValueError, TypeError): + # Not all vertices need to be Claim-shaped (defensive) + continue + + for at in agent.next_turn(turn, received): + self._emit(agent, turn, at, all_names) + + self._crisis_turn_index += 1 + + return self.run_result + + def _emit(self, agent: CrisisAgent, turn: int, at: AgentTurn, + all_names: list[str]) -> None: + """Resolve target subset, build message, deliver, log. + + Delivery rule: + - target_subset is None ⇒ honest broadcast to all peers including + the sender's own graph. + - target_subset is set ⇒ targeted delivery; the sender's own + graph is NOT auto-included. 
This is what enables byzantine + equivocation: a byzantine sender emits two variants with + disjoint targets, and its own graph holds neither — otherwise + the second variant would fail the same-id chain constraint + against the first variant. + + The byzantine still "knows" what it said via the crisis_log; what + it doesn't keep in its own LamportGraph is the conflicting state. + """ + if at.target_subset is None: + targets = list(all_names) + else: + targets = [t for t in at.target_subset if t in self.agents] + + msg = self._wrap_as_message(agent, at.claim, self._graphs[agent.name]) + self._deliver(agent, msg, targets) + + self.run_result.crisis_log.append( + CrisisPhaseEntry( + agent_name=agent.name, + turn=turn, + claim=at.claim, + message_digest_hex=msg.compute_digest().hex(), + delivered_to=targets, + ) + ) + + # ------------------------------------------------------------------ + # Read-only accessors (used by alarm.py and proof.py in later phases) + # ------------------------------------------------------------------ + + def graph_of(self, agent_name: str) -> LamportGraph: + """The LamportGraph held by `agent_name` (Crisis phase only).""" + if agent_name not in self._graphs: + raise KeyError(f"no Crisis-phase graph for agent {agent_name!r}") + return self._graphs[agent_name] + + def all_graphs(self) -> dict[str, LamportGraph]: + return dict(self._graphs) diff --git a/src/crisis_agents/proof.py b/src/crisis_agents/proof.py new file mode 100644 index 0000000..822e909 --- /dev/null +++ b/src/crisis_agents/proof.py @@ -0,0 +1,181 @@ +""" +proof.py — emit and verify replayable proof-of-malfeasance JSON documents. + +A ProofDocument is a self-contained JSON file that: + 1. Names the accused agent (human-readable name + 32-byte process_id). + 2. Identifies the offense (statement_id, turn). + 3. Includes every contradictory MutationWitness with its message digest, + parsed Claim, and delivery target set. + 4. 
@dataclass(frozen=True)
class WitnessGraphReference:
    """Cross-reference for one mutation witness: which honest agents'
    LamportGraphs contain the vertex identified by its message digest.
    An independent verifier can check this against recorded snapshots."""

    # Hex digest of the witness Crisis message.
    message_digest_hex: str
    # Honest agent names whose LamportGraph holds a vertex with this digest.
    observed_by: tuple[str, ...]
Uses asdict on the nested dataclasses + so the resulting structure is plain dict / list / str / int / bool — + cleanly inspectable with `jq` and re-parseable.""" + return json.dumps(asdict(self), indent=2, sort_keys=True) + + @classmethod + def from_json(cls, text: str) -> "ProofDocument": + obj = json.loads(text) + return cls( + schema_version=obj.get("schema_version", 1), + accused_agent=obj["accused_agent"], + accused_process_id_hex=obj["accused_process_id_hex"], + statement_id=obj["statement_id"], + turn=obj["turn"], + witnesses=tuple( + MutationWitness( + message_digest_hex=w["message_digest_hex"], + payload_claim=w["payload_claim"], + delivered_to=tuple(w["delivered_to"]), + ) + for w in obj["witnesses"] + ), + dag_witnesses=tuple( + WitnessGraphReference( + message_digest_hex=g["message_digest_hex"], + observed_by=tuple(g["observed_by"]), + ) + for g in obj["dag_witnesses"] + ), + spacelike_verified=obj["spacelike_verified"], + proof_summary=obj["proof_summary"], + ) + + +def build_proof(mothership: "Mothership", alarm: AlarmEvent) -> ProofDocument: + """Produce the ProofDocument for a single alarm. + + Cross-references each witness against every honest agent's LamportGraph + so the proof carries the "who saw what" structure. + """ + accused_pid = mothership.agents[alarm.accused_agent].process_id + honest_names = [ + name for name, ag in mothership.agents.items() + if ag.process_id != accused_pid + ] + + dag_refs = [] + for w in alarm.witnesses: + digest = bytes.fromhex(w.message_digest_hex) + observed_by = tuple( + name for name in honest_names + if digest in mothership.graph_of(name) + ) + dag_refs.append(WitnessGraphReference( + message_digest_hex=w.message_digest_hex, + observed_by=observed_by, + )) + + summary = ( + f"agent {alarm.accused_agent!r} (id={alarm.accused_process_id_hex[:16]}...) 
" + f"emitted {len(alarm.witnesses)} contradictory Crisis vertices for " + f"statement {alarm.statement_id!r} in turn {alarm.turn}; vertices " + f"{'are confirmed' if alarm.spacelike_verified else 'appear to be'} " + f"spacelike in the DAG of at least one honest agent." + ) + + return ProofDocument( + accused_agent=alarm.accused_agent, + accused_process_id_hex=alarm.accused_process_id_hex, + statement_id=alarm.statement_id, + turn=alarm.turn, + witnesses=alarm.witnesses, + dag_witnesses=tuple(dag_refs), + spacelike_verified=alarm.spacelike_verified, + proof_summary=summary, + ) + + +@dataclass(frozen=True) +class VerificationResult: + ok: bool + reason: str + + +def verify_proof_self_consistent(proof: ProofDocument) -> VerificationResult: + """Verify the proof is self-consistent — without re-running the simulation. + + Checks: + - schema_version is known + - at least 2 witnesses + - witness message digests are pairwise distinct + - witness delivery sets are pairwise non-identical + - witnesses agree on the statement_id and turn fields named in the proof + - dag_witnesses cover every witness digest + + What we do NOT check here (would require the recorded simulation): + - that the digests correspond to real PoW-mined Crisis Messages + - that the spacelike-verified flag matches a fresh DAG re-derivation + + A future `verify_proof_against_log(proof, recorded_events)` would close + that gap. 
+ """ + if proof.schema_version != 1: + return VerificationResult(False, f"unsupported schema_version {proof.schema_version}") + if len(proof.witnesses) < 2: + return VerificationResult(False, "fewer than 2 witnesses — no equivocation") + + digests = [w.message_digest_hex for w in proof.witnesses] + if len(set(digests)) != len(digests): + return VerificationResult(False, "duplicate witness digests") + + deliveries = {tuple(w.delivered_to) for w in proof.witnesses} + if len(deliveries) < 2: + return VerificationResult(False, "all witnesses have identical delivery sets") + + for w in proof.witnesses: + if w.payload_claim.get("statement_id") != proof.statement_id: + return VerificationResult(False, f"witness disagrees on statement_id: {w}") + + dag_digests = {g.message_digest_hex for g in proof.dag_witnesses} + if set(digests) - dag_digests: + return VerificationResult(False, "dag_witnesses missing for some witness") + + return VerificationResult( + True, + f"proof is self-consistent: {len(proof.witnesses)} contradictory " + f"witnesses for statement {proof.statement_id!r} at turn {proof.turn}", + ) diff --git a/src/crisis_agents/scenarios/__init__.py b/src/crisis_agents/scenarios/__init__.py new file mode 100644 index 0000000..867eb84 --- /dev/null +++ b/src/crisis_agents/scenarios/__init__.py @@ -0,0 +1,10 @@ +"""Demo scenarios for the crisis-agents CLI. + +A scenario is a self-contained recipe: a reference document, a set of +statements to adjudicate, scripted Claims for each honest agent, and a +byzantine pair designed to trigger one equivocation alarm. +""" + +from crisis_agents.scenarios.fact_check import build_fact_check_scenario + +__all__ = ["build_fact_check_scenario"] diff --git a/src/crisis_agents/scenarios/fact_check.py b/src/crisis_agents/scenarios/fact_check.py new file mode 100644 index 0000000..96d8e9e --- /dev/null +++ b/src/crisis_agents/scenarios/fact_check.py @@ -0,0 +1,186 @@ +""" +fact_check scenario — the canonical PoC demo. 
@dataclass(frozen=True)
class Statement:
    """A single proposition the team must adjudicate."""

    id: str            # stable short id, e.g. "s03"
    text: str          # the proposition itself
    ground_truth: str  # "true" | "false"


# (id, text, ground_truth) rows — expanded into Statement values below.
_STATEMENT_ROWS: tuple[tuple[str, str, str], ...] = (
    ("s01", "Water boils at 100°C at standard pressure.", "true"),
    ("s02", "The speed of light in vacuum is a defined constant.", "true"),
    ("s03", "Pluto is still classified as a planet by the IAU.", "false"),
    ("s04", "The Sun is approximately 4.6 billion years old.", "true"),
    ("s05", "The Moon's diameter is half of the Earth's diameter.", "false"),
    ("s06", "Sound can travel through a vacuum.", "false"),
)

STATEMENTS: tuple[Statement, ...] = tuple(Statement(*row) for row in _STATEMENT_ROWS)
+ """ + # Tiny per-agent confidence offset so claims aren't byte-identical + # (though identical payloads would also be fine — Crisis dedupes on + # message digest including the nonce). + confidence_offset = { + "agent_alpha": 0.95, + "agent_beta": 0.90, + "agent_gamma": 0.92, + } + base = confidence_offset.get(agent_name, 0.90) + + turn0: list[Claim] = [] + for st in STATEMENTS: + turn0.append(Claim( + statement_id=st.id, + verdict=st.ground_truth, # type: ignore[arg-type] + confidence=base, + evidence=f"per reference doc — {agent_name}", + timestamp_logical=0, + )) + return [turn0] + + +def build_honest_agents() -> list[CrisisAgent]: + """The three trusted agents for the closed-phase team (mocked).""" + return [ + MockAgent("agent_alpha", _honest_claims("agent_alpha")), + MockAgent("agent_beta", _honest_claims("agent_beta")), + MockAgent("agent_gamma", _honest_claims("agent_gamma")), + ] + + +def build_live_honest_agents(model: str | None = None) -> list[CrisisAgent]: + """The three honest agents in `--live` mode — backed by real Claude API.""" + # Lazy import so the anthropic SDK isn't required for the mocked path. + from crisis_agents.live_agent import DEFAULT_MODEL, LiveClaudeAgent + + statement_dicts = [{"id": s.id, "text": s.text} for s in STATEMENTS] + selected_model = model or DEFAULT_MODEL + ref = load_reference_doc() + return [ + LiveClaudeAgent("agent_alpha", reference_doc=ref, + statements=statement_dicts, model=selected_model), + LiveClaudeAgent("agent_beta", reference_doc=ref, + statements=statement_dicts, model=selected_model), + LiveClaudeAgent("agent_gamma", reference_doc=ref, + statements=statement_dicts, model=selected_model), + ] + + +def build_byzantine_joiner() -> CrisisAgent: + """The fourth agent — joins after the boundary opens. + + For s03 (whose ground truth is FALSE) the byzantine tells α and γ "true" + but tells β "false". 
For every other statement it tells everyone the + ground-truth answer — so vote weight doesn't isolate it as a simple + outlier. + + Because MockByzantineAgent emits two variants per scripted_pair on the + same turn, we model "agrees with everyone" by giving both variants the + same content; only s03 produces a genuine equivocation. To keep + delivery semantics clean, the byzantine's non-equivocating statements + use a small dedicated agent name list to remain a single broadcast. + + Simpler approach used here: emit only ONE equivocation slot — the s03 + one — on turn 0, and skip the other statements. The honest majority + already covers s01-s06 with consistent claims; the byzantine doesn't + need to vote on every statement for the demo to read as "byzantine + caught equivocating". + """ + pair_s03_true = Claim( + statement_id="s03", verdict="true", confidence=0.85, + evidence="claims Pluto is still a planet, contradicts ref doc", + timestamp_logical=0, + ) + pair_s03_false = Claim( + statement_id="s03", verdict="false", confidence=0.85, + evidence="agrees Pluto was reclassified, matches ref doc", + timestamp_logical=0, + ) + return MockByzantineAgent( + name="agent_delta", + scripted_pairs=[(pair_s03_true, pair_s03_false)], + split_a={"agent_alpha", "agent_gamma"}, + split_b={"agent_beta"}, + ) + + +@dataclass +class Scenario: + name: str + description: str + closed_phase_turns: int + crisis_phase_turns: int + honest_agents: list[CrisisAgent] + byzantine_joiner: CrisisAgent + reference_doc: str + + +def build_fact_check_scenario(*, live: bool = False, + model: str | None = None) -> Scenario: + """Wire together the reference doc, statements, agents, and run lengths. + + In `live` mode, the three honest agents are replaced with LiveClaudeAgent + instances backed by real Anthropic API calls. The byzantine joiner stays + mocked even in live mode — see live_agent.py for the rationale. 
+ """ + if live: + honest = build_live_honest_agents(model=model) + suffix = " (live: honest agents are real Claude; byzantine is scripted)" + else: + honest = build_honest_agents() + suffix = " (mocked, deterministic)" + + return Scenario( + name="fact_check", + description=( + "Three honest agents adjudicate six factual statements against " + "a small reference doc. A fourth agent joins the team after the " + "boundary opens and equivocates on statement s03 — Crisis " + "should detect this." + suffix + ), + closed_phase_turns=1, + crisis_phase_turns=1, + honest_agents=honest, + byzantine_joiner=build_byzantine_joiner(), + reference_doc=load_reference_doc(), + ) diff --git a/src/crisis_agents/scenarios/reference_doc.txt b/src/crisis_agents/scenarios/reference_doc.txt new file mode 100644 index 0000000..c355082 --- /dev/null +++ b/src/crisis_agents/scenarios/reference_doc.txt @@ -0,0 +1,11 @@ +REFERENCE DOCUMENT — Astronomy & Physics Basics + +Water boils at 100 degrees Celsius (212 degrees Fahrenheit) at standard +atmospheric pressure of one atmosphere. The speed of light in a vacuum is +exactly 299,792,458 metres per second; this is a defined constant rather +than a measured quantity. Pluto was reclassified from a planet to a dwarf +planet in 2006 by the International Astronomical Union. The Sun is roughly +4.6 billion years old and is about halfway through its main-sequence +lifetime. The diameter of the Moon is approximately one-quarter the +diameter of the Earth. Sound cannot travel through a vacuum because it +requires a medium to propagate. 
diff --git a/tests/test_alarm.py b/tests/test_alarm.py new file mode 100644 index 0000000..fcce16d --- /dev/null +++ b/tests/test_alarm.py @@ -0,0 +1,103 @@ +"""Tests for byzantine equivocation detection.""" + +import pytest + +from crisis_agents.agent import MockAgent, MockByzantineAgent +from crisis_agents.alarm import AlarmEvent, scan_for_mutations +from crisis_agents.claim import Claim +from crisis_agents.mothership import Mothership + + +def _claim(sid: str, verdict: str = "true", evidence: str = "ok") -> Claim: + return Claim(statement_id=sid, verdict=verdict, confidence=0.9, # type: ignore[arg-type] + evidence=evidence, timestamp_logical=0) + + +def _equivocating_team() -> Mothership: + """A 3-honest-1-byzantine team where the byzantine equivocates on s03.""" + m = Mothership() + m.add_agent(MockAgent("a", [[]])) + m.add_agent(MockAgent("b", [[]])) + m.add_agent(MockAgent("c", [[]])) + m.open_boundary(MockByzantineAgent( + "d", + scripted_pairs=[( + _claim("s03", verdict="true", evidence="to_a"), + _claim("s03", verdict="false", evidence="to_b"), + )], + split_a={"a", "c"}, + split_b={"b"}, + )) + return m + + +class TestAlarmDetection: + + def test_no_alarms_in_honest_run(self): + m = Mothership() + m.add_agent(MockAgent("a", [[]])) + m.add_agent(MockAgent("b", [[]])) + m.open_boundary(MockAgent("d", [[_claim("s01")]])) + m.run_crisis_phase(num_turns=1) + alarms = scan_for_mutations(m) + assert alarms == [] + + def test_equivocation_raises_one_alarm(self): + m = _equivocating_team() + m.run_crisis_phase(num_turns=1) + alarms = scan_for_mutations(m) + assert len(alarms) == 1 + + a = alarms[0] + assert isinstance(a, AlarmEvent) + assert a.accused_agent == "d" + assert a.statement_id == "s03" + assert a.turn == 0 + assert len(a.witnesses) == 2 + + def test_witness_digests_are_distinct(self): + m = _equivocating_team() + m.run_crisis_phase(num_turns=1) + a = scan_for_mutations(m)[0] + d1 = a.witnesses[0].message_digest_hex + d2 = 
a.witnesses[1].message_digest_hex + assert d1 != d2 + + def test_delivery_sets_are_disjoint(self): + m = _equivocating_team() + m.run_crisis_phase(num_turns=1) + a = scan_for_mutations(m)[0] + s1 = set(a.witnesses[0].delivered_to) + s2 = set(a.witnesses[1].delivered_to) + assert s1 & s2 == set() + + def test_spacelike_verified_is_true(self): + """The Crisis layer should confirm the witness vertices are causally + incomparable in at least one honest graph.""" + m = _equivocating_team() + m.run_crisis_phase(num_turns=1) + a = scan_for_mutations(m)[0] + assert a.spacelike_verified is True + + def test_duplicate_broadcast_is_not_equivocation(self): + """If a byzantine emits the SAME payload to two disjoint subsets, + the message digests are identical and it's not equivocation.""" + same = _claim("s03", verdict="true", evidence="same evidence") + m = Mothership() + m.add_agent(MockAgent("a", [[]])) + m.add_agent(MockAgent("b", [[]])) + m.open_boundary(MockByzantineAgent( + "d", + scripted_pairs=[(same, same)], + split_a={"a"}, + split_b={"b"}, + )) + m.run_crisis_phase(num_turns=1) + alarms = scan_for_mutations(m) + # Same payload → same nonce-mined message after PoW → same digest → + # no equivocation. (The byzantine has to actually say different + # things to be caught.) 
+ assert alarms == [] or all( + len({w.message_digest_hex for w in alarm.witnesses}) > 1 + for alarm in alarms + ) diff --git a/tests/test_boundary.py b/tests/test_boundary.py new file mode 100644 index 0000000..da7f4fe --- /dev/null +++ b/tests/test_boundary.py @@ -0,0 +1,50 @@ +"""Tests for the Boundary membership tracker.""" + +import pytest + +from crisis.crypto import digest +from crisis.message import ID_LENGTH +from crisis_agents.boundary import Boundary + + +def pid(name: str) -> bytes: + return digest(name.encode())[:ID_LENGTH] + + +class TestBoundary: + + def test_initially_closed_and_empty(self): + b = Boundary() + assert not b.is_open + assert b.size() == 0 + + def test_add_trusted_in_closed_phase(self): + b = Boundary() + b.add_trusted(pid("a")) + b.add_trusted(pid("b")) + assert b.size() == 2 + assert b.is_trusted(pid("a")) + assert b.is_trusted(pid("b")) + assert not b.is_trusted(pid("c")) + + def test_open_flips_flag_and_adds_id(self): + b = Boundary() + b.add_trusted(pid("a")) + b.open(pid("new")) + assert b.is_open + assert b.is_trusted(pid("new")) + assert b.size() == 2 + + def test_add_trusted_after_open_rejected(self): + b = Boundary() + b.open(pid("a")) + with pytest.raises(RuntimeError, match="already open"): + b.add_trusted(pid("b")) + + def test_open_is_idempotent_on_state(self): + """Calling open() a second time with a new id just adds the id.""" + b = Boundary() + b.open(pid("first")) + b.open(pid("second")) + assert b.is_open + assert b.size() == 2 diff --git a/tests/test_claim.py b/tests/test_claim.py new file mode 100644 index 0000000..27c1d15 --- /dev/null +++ b/tests/test_claim.py @@ -0,0 +1,104 @@ +"""Tests for the Claim payload dataclass.""" + +import json + +import pytest + +from crisis_agents.claim import Claim + + +class TestClaimConstruction: + + def test_basic_claim_roundtrip(self): + c = Claim( + statement_id="s01", + verdict="true", + confidence=0.95, + evidence="The reference doc states this directly in paragraph 2.", + 
timestamp_logical=3, + ) + assert c.statement_id == "s01" + assert c.verdict == "true" + assert c.confidence == pytest.approx(0.95) + assert c.schema_version == 1 + + def test_unknown_verdict_accepted(self): + c = Claim(statement_id="s02", verdict="unknown", confidence=0.5, + evidence="no signal", timestamp_logical=0) + assert c.verdict == "unknown" + + +class TestClaimValidation: + + def test_empty_statement_id_rejected(self): + with pytest.raises(ValueError, match="statement_id"): + Claim(statement_id="", verdict="true", confidence=0.9, + evidence="x", timestamp_logical=0) + + def test_invalid_verdict_rejected(self): + with pytest.raises(ValueError, match="verdict"): + Claim(statement_id="s01", verdict="maybe", # type: ignore[arg-type] + confidence=0.9, evidence="x", timestamp_logical=0) + + def test_confidence_out_of_range(self): + with pytest.raises(ValueError, match="confidence"): + Claim(statement_id="s01", verdict="true", confidence=1.5, + evidence="x", timestamp_logical=0) + with pytest.raises(ValueError, match="confidence"): + Claim(statement_id="s01", verdict="true", confidence=-0.1, + evidence="x", timestamp_logical=0) + + def test_evidence_too_long(self): + with pytest.raises(ValueError, match="evidence too long"): + Claim(statement_id="s01", verdict="true", confidence=0.9, + evidence="x" * 500, timestamp_logical=0) + + def test_negative_timestamp_rejected(self): + with pytest.raises(ValueError, match="timestamp"): + Claim(statement_id="s01", verdict="true", confidence=0.9, + evidence="x", timestamp_logical=-1) + + +class TestPayloadRoundtrip: + + def test_to_payload_returns_bytes(self): + c = Claim(statement_id="s01", verdict="true", confidence=0.9, + evidence="ok", timestamp_logical=2) + b = c.to_payload() + assert isinstance(b, bytes) + # Valid JSON + obj = json.loads(b.decode("utf-8")) + assert obj["statement_id"] == "s01" + + def test_from_payload_inverts_to_payload(self): + original = Claim(statement_id="s04", verdict="false", confidence=0.72, + 
evidence="contradicted by para 3", timestamp_logical=7) + roundtrip = Claim.from_payload(original.to_payload()) + assert roundtrip == original + + def test_payload_is_deterministic(self): + """Same logical claim must produce identical bytes — required so two + equivocating-but-payload-identical messages can be detected as the + same payload (vs. different payload = real equivocation).""" + c1 = Claim(statement_id="s01", verdict="true", confidence=0.9, + evidence="evidence here", timestamp_logical=1) + c2 = Claim(statement_id="s01", verdict="true", confidence=0.9, + evidence="evidence here", timestamp_logical=1) + assert c1.to_payload() == c2.to_payload() + + def test_from_payload_rejects_garbage(self): + with pytest.raises(ValueError, match="Claim JSON"): + Claim.from_payload(b"not json") + + def test_from_payload_rejects_missing_fields(self): + with pytest.raises(TypeError): + # Missing required statement_id + Claim.from_payload(b'{"verdict":"true","confidence":0.9,"evidence":"x","timestamp_logical":0}') + + def test_payload_size_is_bounded(self): + """Confirms the EVIDENCE_MAX_LEN cap keeps payload under a sane size.""" + c = Claim(statement_id="s99", verdict="true", confidence=1.0, + evidence="x" * 280, timestamp_logical=999) + b = c.to_payload() + # JSON overhead + 280 evidence + small fields ~= < 400 bytes + assert len(b) < 500 diff --git a/tests/test_demo_fact_check.py b/tests/test_demo_fact_check.py new file mode 100644 index 0000000..c1e014f --- /dev/null +++ b/tests/test_demo_fact_check.py @@ -0,0 +1,101 @@ +"""End-to-end test: fact_check scenario runs cleanly and emits a valid proof.""" + +import json +from pathlib import Path + +from crisis_agents.alarm import scan_for_mutations +from crisis_agents.cli import main as cli_main +from crisis_agents.mothership import Mothership +from crisis_agents.proof import ( + build_proof, + verify_proof_self_consistent, +) +from crisis_agents.scenarios import build_fact_check_scenario + + +class TestFactCheckEndToEnd: + 
+ def test_scenario_loads(self): + s = build_fact_check_scenario() + assert s.name == "fact_check" + assert len(s.honest_agents) == 3 + assert s.byzantine_joiner.name == "agent_delta" + assert "Pluto" in s.reference_doc + + def test_runs_through_both_phases_and_raises_one_alarm(self): + s = build_fact_check_scenario() + m = Mothership() + for agent in s.honest_agents: + m.add_agent(agent) + + m.run_closed_phase(num_turns=s.closed_phase_turns) + assert len(m.run_result.closed_log) == 3 * 6 # 3 agents × 6 statements + assert m.all_graphs() == {} # no DAG in closed phase + + m.open_boundary(s.byzantine_joiner) + m.run_crisis_phase(num_turns=s.crisis_phase_turns) + + # The byzantine emitted two contradictory variants of s03; + # honest agents emitted nothing in the Crisis phase (their script + # was exhausted in the closed phase). + assert len(m.run_result.crisis_log) == 2 + + alarms = scan_for_mutations(m) + assert len(alarms) == 1 + a = alarms[0] + assert a.accused_agent == "agent_delta" + assert a.statement_id == "s03" + assert a.spacelike_verified is True + + def test_proof_is_self_consistent_and_round_trips(self, tmp_path): + s = build_fact_check_scenario() + m = Mothership() + for agent in s.honest_agents: + m.add_agent(agent) + m.run_closed_phase(num_turns=s.closed_phase_turns) + m.open_boundary(s.byzantine_joiner) + m.run_crisis_phase(num_turns=s.crisis_phase_turns) + + alarm = scan_for_mutations(m)[0] + proof = build_proof(m, alarm) + out = tmp_path / "proof.json" + out.write_text(proof.to_json()) + + # Reload, re-verify + from crisis_agents.proof import ProofDocument + reloaded = ProofDocument.from_json(out.read_text()) + assert verify_proof_self_consistent(reloaded).ok + + +class TestCli: + + def test_cli_demo_runs(self, tmp_path, capsys): + exit_code = cli_main(["demo", "--scenario", "fact_check", + "--out-dir", str(tmp_path)]) + assert exit_code == 0 + captured = capsys.readouterr() + assert "crisis-agents demo" in captured.out + assert "Phase 1" in 
captured.out + assert "Phase 2" in captured.out + assert "alarm" in captured.out.lower() + # A proof file landed + proofs = list(tmp_path.glob("proof_*.json")) + assert len(proofs) == 1 + # The proof file is valid JSON + obj = json.loads(proofs[0].read_text()) + assert obj["accused_agent"] == "agent_delta" + + def test_cli_verify_passes_on_valid_proof(self, tmp_path, capsys): + # First produce a proof via the demo + cli_main(["demo", "--scenario", "fact_check", "--out-dir", str(tmp_path)]) + proof_path = next(tmp_path.glob("proof_*.json")) + capsys.readouterr() # drain demo output + + exit_code = cli_main(["verify", str(proof_path)]) + assert exit_code == 0 + out = capsys.readouterr().out + assert "self-consistent: True" in out + + def test_cli_unknown_scenario(self, capsys): + exit_code = cli_main(["demo", "--scenario", "nonexistent"]) + assert exit_code == 2 diff --git a/tests/test_mothership.py b/tests/test_mothership.py new file mode 100644 index 0000000..ed1b815 --- /dev/null +++ b/tests/test_mothership.py @@ -0,0 +1,134 @@ +"""Tests for the Mothership orchestrator — closed phase + Crisis-phase wiring.""" + +import pytest + +from crisis_agents.agent import MockAgent, MockByzantineAgent +from crisis_agents.claim import Claim +from crisis_agents.mothership import Mothership + + +def _claim(sid: str, verdict: str = "true", turn: int = 0, evidence: str = "ok") -> Claim: + return Claim( + statement_id=sid, verdict=verdict, confidence=0.9, # type: ignore[arg-type] + evidence=evidence, timestamp_logical=turn, + ) + + +class TestClosedPhase: + + def test_no_dag_in_closed_phase(self): + m = Mothership() + m.add_agent(MockAgent("a", [[_claim("s01")]])) + m.add_agent(MockAgent("b", [[_claim("s01")]])) + result = m.run_closed_phase(num_turns=1) + + # Two agents emitted one claim each + assert len(result.closed_log) == 2 + names = [e.agent_name for e in result.closed_log] + assert "a" in names and "b" in names + + # No graphs allocated + assert m.all_graphs() == {} + 
assert not m.boundary.is_open + + def test_multi_turn_observes_prior_claims(self): + """Each turn's agents see the claims emitted in previous turns.""" + class WatcherAgent(MockAgent): + def __init__(self, name): + super().__init__(name, [[_claim("s01")], [_claim("s02")]]) + self.received_per_turn: list[int] = [] + + def next_turn(self, turn, received): + self.received_per_turn.append(len(received)) + return super().next_turn(turn, received) + + w = WatcherAgent("watcher") + other = MockAgent("other", [[_claim("s99")], [_claim("s99")]]) + m = Mothership() + m.add_agent(w) + m.add_agent(other) + m.run_closed_phase(num_turns=2) + # Turn 0: watcher sees 0 prior claims; Turn 1: watcher sees 2 from turn 0. + assert w.received_per_turn == [0, 2] + + def test_add_agent_after_open_rejected(self): + m = Mothership() + m.add_agent(MockAgent("a", [[_claim("s01")]])) + m.open_boundary(MockAgent("byz", [[_claim("s01")]])) + with pytest.raises(RuntimeError, match="cannot add_agent"): + m.add_agent(MockAgent("late", [])) + + +class TestCrisisPhaseWiring: + + def test_open_boundary_initializes_graphs(self): + m = Mothership() + m.add_agent(MockAgent("a", [[_claim("s01")]])) + m.add_agent(MockAgent("b", [[_claim("s01")]])) + m.open_boundary(MockAgent("d", [[_claim("s01")]])) + + # One graph per agent, including the joiner + graphs = m.all_graphs() + assert set(graphs.keys()) == {"a", "b", "d"} + for g in graphs.values(): + assert g.vertex_count() == 0 # not yet run + + def test_run_crisis_phase_extends_per_agent_graphs(self): + m = Mothership() + m.add_agent(MockAgent("a", [[_claim("s01")]])) + m.add_agent(MockAgent("b", [[_claim("s01")]])) + m.open_boundary(MockAgent("d", [[_claim("s01")]])) + result = m.run_crisis_phase(num_turns=1) + + # Each agent's graph should now contain three vertices + # (broadcast claims from a, b, d delivered to everyone). 
+ for name in ("a", "b", "d"): + assert m.graph_of(name).vertex_count() == 3 + + assert len(result.crisis_log) == 3 + for entry in result.crisis_log: + assert set(entry.delivered_to) == {"a", "b", "d"} + + def test_byzantine_equivocation_splits_delivery(self): + """A MockByzantineAgent delivers two different claims to disjoint + subsets — the foundation of the equivocation detection demo.""" + m = Mothership() + m.add_agent(MockAgent("a", [[]])) + m.add_agent(MockAgent("b", [[]])) + + byz = MockByzantineAgent( + "d", + scripted_pairs=[( + _claim("s01", verdict="true", evidence="to_a"), + _claim("s01", verdict="false", evidence="to_b"), + )], + split_a={"a"}, + split_b={"b"}, + ) + m.open_boundary(byz) + m.run_crisis_phase(num_turns=1) + + # a sees the true-variant, b sees the false-variant. + # d (the byzantine sender) sees neither — see Mothership._emit docstring. + a_payloads = [v.payload for v in m.graph_of("a").all_vertices()] + b_payloads = [v.payload for v in m.graph_of("b").all_vertices()] + d_payloads = [v.payload for v in m.graph_of("d").all_vertices()] + + assert any(b'"verdict":"true"' in p for p in a_payloads) + assert all(b'"verdict":"false"' not in p for p in a_payloads) + + assert any(b'"verdict":"false"' in p for p in b_payloads) + assert all(b'"verdict":"true"' not in p for p in b_payloads) + + # d's own graph holds neither equivocation (targeted delivery skips sender). + assert len(d_payloads) == 0 + + # But the crisis_log records both emissions so the mothership can + # generate proofs from this perspective. 
+ assert len(m.run_result.crisis_log) == 2 + + def test_run_crisis_phase_requires_open_boundary(self): + m = Mothership() + m.add_agent(MockAgent("a", [[_claim("s01")]])) + with pytest.raises(RuntimeError, match="boundary not yet open"): + m.run_crisis_phase(num_turns=1) diff --git a/tests/test_proof.py b/tests/test_proof.py new file mode 100644 index 0000000..ab9ffac --- /dev/null +++ b/tests/test_proof.py @@ -0,0 +1,138 @@ +"""Tests for proof generation and self-consistent verification.""" + +import json + +from crisis_agents.agent import MockAgent, MockByzantineAgent +from crisis_agents.alarm import scan_for_mutations +from crisis_agents.claim import Claim +from crisis_agents.mothership import Mothership +from crisis_agents.proof import ( + ProofDocument, + build_proof, + verify_proof_self_consistent, +) + + +def _claim(sid: str, verdict: str = "true", evidence: str = "ok") -> Claim: + return Claim(statement_id=sid, verdict=verdict, confidence=0.9, # type: ignore[arg-type] + evidence=evidence, timestamp_logical=0) + + +def _equivocating_run() -> tuple[Mothership, list]: + m = Mothership() + m.add_agent(MockAgent("a", [[]])) + m.add_agent(MockAgent("b", [[]])) + m.add_agent(MockAgent("c", [[]])) + m.open_boundary(MockByzantineAgent( + "d", + scripted_pairs=[( + _claim("s03", verdict="true", evidence="to_a"), + _claim("s03", verdict="false", evidence="to_b"), + )], + split_a={"a", "c"}, + split_b={"b"}, + )) + m.run_crisis_phase(num_turns=1) + alarms = scan_for_mutations(m) + return m, alarms + + +class TestBuildProof: + + def test_produces_well_formed_proof(self): + m, alarms = _equivocating_run() + assert len(alarms) == 1 + proof = build_proof(m, alarms[0]) + assert proof.accused_agent == "d" + assert proof.statement_id == "s03" + assert proof.turn == 0 + assert len(proof.witnesses) == 2 + assert proof.spacelike_verified is True + + def test_dag_witnesses_reference_real_graphs(self): + m, alarms = _equivocating_run() + proof = build_proof(m, alarms[0]) + assert 
len(proof.dag_witnesses) == 2 + + # Each dag_witness should name only honest agents (not "d") + for dw in proof.dag_witnesses: + assert "d" not in dw.observed_by + + # Each variant should have been observed by at least one honest agent + # (the variant's delivered-to subset) + for dw in proof.dag_witnesses: + assert len(dw.observed_by) >= 1 + + +class TestJsonRoundtrip: + + def test_to_json_is_valid(self): + m, alarms = _equivocating_run() + proof = build_proof(m, alarms[0]) + text = proof.to_json() + parsed = json.loads(text) + assert parsed["accused_agent"] == "d" + assert parsed["statement_id"] == "s03" + + def test_from_json_inverts_to_json(self): + m, alarms = _equivocating_run() + original = build_proof(m, alarms[0]) + roundtrip = ProofDocument.from_json(original.to_json()) + assert roundtrip.accused_agent == original.accused_agent + assert roundtrip.statement_id == original.statement_id + assert roundtrip.turn == original.turn + assert roundtrip.spacelike_verified == original.spacelike_verified + assert len(roundtrip.witnesses) == len(original.witnesses) + + +class TestSelfConsistentVerification: + + def test_valid_proof_passes(self): + m, alarms = _equivocating_run() + proof = build_proof(m, alarms[0]) + result = verify_proof_self_consistent(proof) + assert result.ok, result.reason + + def test_tampered_witness_digest_fails(self): + """If someone alters a witness digest after-the-fact to make it look + like a duplicate, self-consistency check catches the lack of distinct + digests.""" + m, alarms = _equivocating_run() + proof = build_proof(m, alarms[0]) + # Tamper: make both digests identical + from dataclasses import replace + from crisis_agents.alarm import MutationWitness + w0 = proof.witnesses[0] + w1 = proof.witnesses[1] + tampered = ProofDocument( + schema_version=proof.schema_version, + accused_agent=proof.accused_agent, + accused_process_id_hex=proof.accused_process_id_hex, + statement_id=proof.statement_id, + turn=proof.turn, + witnesses=(w0, 
replace(w1, message_digest_hex=w0.message_digest_hex)), + dag_witnesses=proof.dag_witnesses, + spacelike_verified=proof.spacelike_verified, + proof_summary=proof.proof_summary, + ) + result = verify_proof_self_consistent(tampered) + assert not result.ok + assert "duplicate" in result.reason.lower() + + def test_mismatched_statement_id_fails(self): + m, alarms = _equivocating_run() + proof = build_proof(m, alarms[0]) + from dataclasses import replace + bad = ProofDocument( + schema_version=proof.schema_version, + accused_agent=proof.accused_agent, + accused_process_id_hex=proof.accused_process_id_hex, + statement_id="DIFFERENT", # mismatch + turn=proof.turn, + witnesses=proof.witnesses, + dag_witnesses=proof.dag_witnesses, + spacelike_verified=proof.spacelike_verified, + proof_summary=proof.proof_summary, + ) + result = verify_proof_self_consistent(bad) + assert not result.ok