diff --git a/src/crisis_agents/agent.py b/src/crisis_agents/agent.py index 19a680e..e6e9dd1 100644 --- a/src/crisis_agents/agent.py +++ b/src/crisis_agents/agent.py @@ -1,19 +1,19 @@ """ -CrisisAgent — a first-class network participant. +CrisisAgent — a first-class network participant in an asynchronous network. Each agent owns: - a stable 32-byte process_id (derived from its name) - its own LamportGraph (the agent's view of the network) - its own weight system (shared across the network for compatibility) - - the means to wrap Claims into Crisis Messages and extend its own graph + - decision logic: `try_emit()` is asked "do you have something to say?", + `pending_alarm_claims()` is asked "do you currently observe an + un-alarmed equivocation?" -Crucially the agent is **NOT a passive script driven by the mothership**. The -mothership coordinates the clock and the bootstrap; the agent does the work. - -This is the change from the centralized version: previously the mothership -held a dict of all agents' graphs and called `_wrap_as_message` against each. -Now `emit_claim` lives on the agent. The mothership routes the resulting -message to its delivery targets, but it never reads the graph. +There is no global clock. Agents don't see a "turn number" because there +isn't one — any synchronicity in the network is virtual, derived from the +DAG structure by the consensus algorithm itself (not by the driver loop). +The mothership/driver just cycles asking each agent for any pending +content until the network is quiescent. """ from __future__ import annotations @@ -38,27 +38,29 @@ def agent_id_from_name(name: str) -> bytes: @dataclass class AgentTurn: - """One emission from an agent in a given turn. + """One emission from an agent. + + The word "turn" here is vestigial — it means "this single emission + event," not "a tick of a global clock." Kept because renaming the type + causes more churn than it's worth. Attributes: claim: The Claim being emitted. - target_subset: None means broadcast (every agent including sender - receives it via the mothership's initial routing). - A set of peer names means initial delivery is limited - to those peers — the byzantine equivocation building - block. Subsequent gossip rounds may propagate it to - other peers anyway. + target_subset: None means broadcast to every peer including + sender. A set of peer names means initial delivery + is limited to those peers (the byzantine + equivocation building block). """ claim: Claim target_subset: Optional[set[str]] = None class CrisisAgent(ABC): - """A network participant with its own graph and its own brain. + """An asynchronous network participant. - Concrete subclasses implement `next_turn` to decide what to say. The - base class handles emit/receive/gossip mechanics so every agent — mock - or live — uses the same Crisis-protocol machinery underneath. + Concrete subclasses implement `try_emit` to decide what to say. The + base class handles emit/receive/gossip/detect mechanics uniformly so + every agent — mock or live — uses the same Crisis substrate. """ def __init__(self, name: str, *, weight_system: Optional[WeightSystem] = None): @@ -68,16 +70,38 @@ class CrisisAgent(ABC): self.process_id: bytes = agent_id_from_name(name) self.weight_system: WeightSystem = weight_system or ProofOfWorkWeight(min_leading_zeros=0) self.graph: LamportGraph = LamportGraph(weight_system=self.weight_system) + # Track alarms we've already emitted so pending_alarm_claims doesn't + # repeat. Keyed by (accused, statement_id, sorted-witness-pair). + self._already_alarmed: set[tuple[str, str, tuple[str, str]]] = set() # ------------------------------------------------------------------ - # Decision-making — to be implemented by subclasses + # Decision-making — implemented by subclasses # ------------------------------------------------------------------ @abstractmethod - def next_turn(self, turn: int, received_claims: list[Claim]) -> list[AgentTurn]: - """Produce this agent's emissions for the given turn.""" + def try_emit(self) -> list[AgentTurn]: + """Return any emissions the agent is ready to make right now. + + The agent decides based on its own internal state. The driver loop + asks this repeatedly until the agent returns nothing. There is no + turn argument — agents in an async network don't see a global tick. + """ ... + def observe(self, claim: Claim) -> None: + """Optional callback for pre-Crisis context. + + Used by the closed-phase loop: when one agent emits a claim, every + other agent's `observe(claim)` is called so they can incorporate + the conversation history into their own state. Default is no-op; + subclasses like LiveClaudeAgent override to maintain a context + buffer for their LLM prompt. + + In the Crisis phase this is NOT called — agents introspect their + own LamportGraph for context. The closed phase has no graph yet. + """ + pass + # ------------------------------------------------------------------ # Crisis-protocol mechanics — uniform across all agents # ------------------------------------------------------------------ @@ -86,17 +110,20 @@ class CrisisAgent(ABC): """Wrap a Claim into a fully-valid Crisis Message built FROM this agent's own graph state. - The agent does NOT extend its own graph here — the mothership decides - whether the sender receives a copy (broadcast: yes; targeted: no, to - enable byzantine equivocation without immediately failing the chain - constraint in the sender's own graph). + The agent does NOT extend its own graph here — the routing layer + decides whether the sender's own graph receives a copy. """ - payload = claim.to_payload() + return self._build_message(claim.to_payload()) + def _build_message(self, payload: bytes) -> Message: + """Build a Crisis Message with arbitrary payload bytes. + + Used by both `emit_claim` (regular Claims) and the alarm-emission + path (AlarmClaim payloads). + """ digests_list: list[bytes] = [] - # Step 1: chain link — if there's any same-id vertex in MY graph, the - # new message must reference one of them. + # Chain link same_id = [v for v in self.graph.all_vertices() if v.id == self.process_id] past_digests: set[bytes] = set() if same_id: @@ -113,7 +140,7 @@ class CrisisAgent(ABC): digests_list.append(last.message_digest) past_digests = {v.message_digest for v in self.graph.past(last)} - # Step 2: cross-references — one most-recent vertex per other id. + # Cross-references seen_other_ids: set[bytes] = {self.process_id} for v in self.graph.all_vertices(): if v.id in seen_other_ids: @@ -123,10 +150,10 @@ class CrisisAgent(ABC): digests_list.append(v.message_digest) seen_other_ids.add(v.id) - # Step 3: mine a valid PoW nonce. + # Mine PoW if isinstance(self.weight_system, ProofOfWorkWeight): return self.weight_system.mine_nonce( - self.process_id, tuple(digests_list), payload + self.process_id, tuple(digests_list), payload, ) return Message( nonce=os.urandom(NONCE_LENGTH), @@ -136,36 +163,17 @@ class CrisisAgent(ABC): ) def receive(self, message: Message) -> Optional[Vertex]: - """Extend my graph with the given message if integrity holds. - - Returns the resulting Vertex on success, None if the integrity check - rejects it (duplicate, missing references, broken chain). Receiving - is idempotent: extending with a message whose digest is already in - the graph is a silent no-op (returns None). - """ + """Extend my graph with the given message if integrity holds.""" if message.compute_digest() in self.graph: return None return self.graph.extend(message) - def detect_mutations(self): - """Scan MY graph for byzantine equivocation. Returns a list of - LocalAlarms (defined in alarm.py). Imported lazily to avoid a - cyclic import at module load time. - """ - from crisis_agents.alarm import detect_mutations_in_graph - return detect_mutations_in_graph(self.graph, self.name, self.process_id) - def gossip_to(self, peer: "CrisisAgent") -> int: - """Share my vertices with `peer`. Returns the count newly accepted. + """Share my vertices with `peer`. Returns count newly accepted. - Iterates until no progress: a message can only be accepted after all - its referenced digests already exist in the peer's graph, so this is - a multi-pass extend (Algorithm 4 in the paper, in-process flavor). - - Honest gossip: the sender doesn't pick what to share — it shares - everything it has, and the peer's integrity check filters. A byzantine - could selectively gossip, but that's modeled at emit time, not gossip - time; we don't expose a "skip this vertex" hook here. + Iterates until no progress: a message is only accepted after all + its referenced digests already exist in the peer's graph + (Algorithm 4 in the paper, in-process flavor). """ accepted = 0 progress = True @@ -179,6 +187,49 @@ class CrisisAgent(ABC): progress = True return accepted + def detect_mutations(self): + """Scan MY graph for same-id spacelike vertex pairs. + + Returns a list of LocalAlarms. Imported lazily to avoid a cyclic + import at module load time. + """ + from crisis_agents.alarm import detect_mutations_in_graph + return detect_mutations_in_graph(self.graph, self.name, self.process_id) + + def pending_alarm_claims(self) -> list: + """Run detection and produce AlarmClaim payloads for any newly + observed equivocations. + + An "already alarmed" set tracks claims this agent has already + emitted, so calling this repeatedly is idempotent — the driver + loop can call it until quiescence without flooding the network + with duplicate AlarmClaims. + + Returns a list of AlarmClaim instances (defined in vote.py) that + the driver should broadcast on the agent's behalf. + """ + from crisis_agents.vote import AlarmClaim + + local_alarms = self.detect_mutations() + new_claims: list = [] + for la in local_alarms: + # Canonical key for dedup + key = (la.accused_process_id_hex, la.statement_id, la.witness_digests) + if key in self._already_alarmed: + continue + # detected_at_step is the agent's local sequence number — we + # don't have a meaningful global step, so we use the count of + # alarms already raised by this agent as a stable ordinal. + ac = AlarmClaim( + accused_process_id_hex=la.accused_process_id_hex, + statement_id=la.statement_id, + witness_digests=la.witness_digests, + emitted_at_step=len(self._already_alarmed), + ) + new_claims.append(ac) + self._already_alarmed.add(key) + return new_claims + def __repr__(self) -> str: return f"{type(self).__name__}(name={self.name!r}, id={self.process_id.hex()[:8]}...)" @@ -191,13 +242,10 @@ class CrisisAgent(ABC): class MockAgent(CrisisAgent): """An agent that emits a predetermined sequence of claims. - `scripted_claims[N]` is the list of Claims this agent emits on its Nth - `next_turn()` invocation. The agent maintains its own invocation counter - independent of the mothership's turn counter, so it can be reused across - closed-phase and Crisis-phase calls without restarting. - - All emissions are broadcast (no equivocation). For equivocation, use - `MockByzantineAgent`. + `scripted_claims[N]` is the list of Claims emitted on the agent's Nth + `try_emit()` invocation. After the script is exhausted, the agent + emits nothing forever — the driver loop's quiescence check terminates + naturally. """ def __init__(self, name: str, scripted_claims: list[list[Claim]], @@ -206,7 +254,7 @@ class MockAgent(CrisisAgent): self._script = scripted_claims self._invocations = 0 - def next_turn(self, turn: int, received_claims: list[Claim]) -> list[AgentTurn]: + def try_emit(self) -> list[AgentTurn]: idx = self._invocations self._invocations += 1 if idx >= len(self._script): @@ -217,19 +265,16 @@ class MockAgent(CrisisAgent): class MockByzantineAgent(CrisisAgent): """An agent designed to equivocate. - Lifecycle: - - Invocation 0: emit a broadcast `intro_claim` (a benign "I've joined" - message). This is **necessary** for the equivocation step: both - variants of the equivocating claim will chain to this intro, so they - can propagate through the gossip layer (otherwise the chain constraint - in `Message.message_integrity` step 6 would reject the second variant - in any graph that already holds the first). - - Invocations 1..N: emit pairs of contradictory claims, with the first - variant targeted at `split_a` and the second at `split_b`. Both - variants in a pair carry the same `statement_id` but contradict on - `verdict`. + On its first `try_emit()` invocation it broadcasts an `intro_claim` so + every honest agent has a same-id vertex to chain the equivocation off. + On subsequent invocations it emits pairs of contradictory claims from + `scripted_pairs`, with the first variant targeted at `split_a` and the + second at `split_b`. - Set `scripted_pairs` empty to test "byzantine joined but didn't equivocate". + Byzantines never emit AlarmClaims about other agents — there's a + subclass override of `pending_alarm_claims` that returns empty. (A + more advanced byzantine could emit FALSE AlarmClaims to test the + quorum-vote isolation; not in scope for this PoC.) """ def __init__(self, name: str, intro_claim: Claim, @@ -245,12 +290,10 @@ class MockByzantineAgent(CrisisAgent): self._split_b = split_b self._invocations = 0 - def next_turn(self, turn: int, received_claims: list[Claim]) -> list[AgentTurn]: + def try_emit(self) -> list[AgentTurn]: idx = self._invocations self._invocations += 1 if idx == 0: - # The introduction turn: a single broadcast so all peers learn my - # identity and have a same-id vertex to chain my equivocations to. return [AgentTurn(claim=self._intro, target_subset=None)] pair_idx = idx - 1 if pair_idx >= len(self._script): @@ -260,3 +303,7 @@ class MockByzantineAgent(CrisisAgent): if claim_b is not None: out.append(AgentTurn(claim=claim_b, target_subset=set(self._split_b))) return out + + def pending_alarm_claims(self) -> list: + """Byzantines don't emit alarms in our threat model.""" + return [] diff --git a/src/crisis_agents/cli.py b/src/crisis_agents/cli.py index 7483599..ed87a27 100644 --- a/src/crisis_agents/cli.py +++ b/src/crisis_agents/cli.py @@ -2,15 +2,15 @@ crisis-agents — command-line entry point. Subcommands: - demo Run a scripted scenario end-to-end. Walks the four phases: - closed team → boundary opens → Crisis-active rounds + gossip → - decentralized detection + alarm voting → proof emission. + demo Run a scripted scenario end-to-end. The Crisis phase runs an + asynchronous event loop to quiescence — no global clock, no + fixed turn count. verify Re-check a proof JSON for self-consistency. Examples: crisis-agents demo --scenario fact_check crisis-agents demo --scenario fact_check --live - crisis-agents verify proof_agent_delta_s03.json + crisis-agents verify proof__s03.json """ from __future__ import annotations @@ -53,12 +53,13 @@ def _run_demo(args: argparse.Namespace) -> int: mothership.add_agent(agent) # ---- Phase 1: closed team, no Crisis ---- - print(f"--- Phase 1: closed team, no Crisis ({scenario.closed_phase_turns} turn(s)) ---") - mothership.run_closed_phase(num_turns=scenario.closed_phase_turns) + print("--- Phase 1: closed team, no Crisis ---") + closed_report = mothership.run_closed_phase() honest_names = [a.name for a in mothership.agents.values()] print( - f" {len(mothership.run_result.closed_log)} claims from " - f"{len(honest_names)} honest agent(s): {', '.join(honest_names)}" + f" driven to quiescence in {closed_report.steps} step(s); " + f"{closed_report.emissions} claims from " + f"{len(honest_names)} honest agent(s)." ) print(f" Per-agent graphs: not yet allocated (Crisis is dormant).\n") @@ -66,47 +67,42 @@ def _run_demo(args: argparse.Namespace) -> int: print(f"--- Phase 2: boundary opens — {scenario.byzantine_joiner.name} joins ---") mothership.open_boundary(scenario.byzantine_joiner) print(f" Trust set is now {mothership.boundary.size()} agents.") - print(f" Crisis is now ACTIVE for every subsequent emission.\n") + print(f" Crisis is now ACTIVE — agents emit asynchronously.\n") - # ---- Phase 3: Crisis-active rounds (emission + gossip) ---- - print(f"--- Phase 3: emission + gossip " - f"({scenario.crisis_phase_turns} turn(s)) ---") - mothership.run_crisis_phase( - num_turns=scenario.crisis_phase_turns, - gossip_rounds_per_turn=1, + # ---- Phase 3: async event loop to quiescence ---- + print("--- Phase 3: asynchronous event loop (no clock) ---") + report = mothership.run_until_quiescent() + print( + f" drove to quiescence in {report.steps} step(s):\n" + f" {report.emissions:3d} regular emissions\n" + f" {report.gossip_transfers:3d} gossip transfers\n" + f" {report.alarm_claims_emitted:3d} alarm claims emitted" ) - crisis_log = mothership.run_result.crisis_log - print(f" {len(crisis_log)} Crisis messages emitted.") - print(f" After gossip:") + print(f" After convergence:") for name, agent in mothership.agents.items(): print(f" {name:14s} graph: {agent.graph.vertex_count():2d} vertices") print() - # ---- Phase 4: each agent independently detects ---- + # ---- Phase 4: each agent's own detection result ---- print("--- Phase 4: decentralized detection (each agent's own brain) ---") - local_alarms = {} + detected_by = [] for name, agent in mothership.agents.items(): alarms = agent.detect_mutations() - local_alarms[name] = alarms marker = "ALARM" if alarms else "ok " suffix = "" if alarms: + detected_by.append(name) suffix = (f" — accuses {alarms[0].accused_process_id_hex[:16]}... " f"on {alarms[0].statement_id}") print(f" [{marker}] {name:14s}{suffix}") - detector_count = sum(1 for a in local_alarms.values() if a) - print(f" {detector_count} of {len(mothership.agents)} agents independently " - f"detected byzantine behavior.\n") + print(f" {len(detected_by)} of {len(mothership.agents)} agents detected " + f"byzantine behavior independently.\n") - # ---- Phase 5: alarm emission + quorum voting ---- - print("--- Phase 5: alarms emitted + gossiped + ratified by quorum ---") - mothership.emit_alarms_from_detectors() - mothership.run_gossip_round() + # ---- Phase 5: quorum tally ---- + print("--- Phase 5: ratification by quorum ---") threshold = quorum_for(mothership.boundary.size()) print(f" Quorum threshold = ⌈2*{mothership.boundary.size()}/3⌉ = {threshold}") - # All honest agents should agree on the ratified set — show by querying - # each of them and confirming. ratified_per_agent = { name: mothership.ratified_alarms_from(name) for name in mothership.agents @@ -118,10 +114,7 @@ def _run_demo(args: argparse.Namespace) -> int: canonical = ratified_per_agent[name] elif ratified_per_agent[name] != canonical: all_agree = False - if all_agree: - marker = "✓" - else: - marker = "✗" + marker = "✓" if all_agree else "✗" print(f" {marker} every honest agent's ratified set is identical " f"({'no chokepoint' if all_agree else 'DIVERGENCE'}).") @@ -141,7 +134,6 @@ def _run_demo(args: argparse.Namespace) -> int: out_dir.mkdir(parents=True, exist_ok=True) for r in canonical: proof = build_proof(r) - # Use a stable filename based on accused + statement accused_short = r.accused_process_id_hex[:16] path = out_dir / f"proof_{accused_short}_{r.statement_id}.json" path.write_text(proof.to_json()) @@ -173,7 +165,7 @@ def _run_verify(args: argparse.Namespace) -> int: def main(argv: list[str] | None = None) -> int: parser = argparse.ArgumentParser( prog="crisis-agents", - description="Crisis-Agents — decentralized coordination for AI agent teams.", + description="Crisis-Agents — decentralized async coordination for AI agent teams.", ) sub = parser.add_subparsers(dest="cmd", required=True) @@ -184,8 +176,7 @@ def main(argv: list[str] | None = None) -> int: help="back the honest agents with real Claude API calls " "(requires anthropic SDK + ANTHROPIC_API_KEY)") demo.add_argument("--model", default=None, - help="Anthropic model id for --live (default: " - "claude-haiku-4-5-20251001)") + help="Anthropic model id for --live") demo.add_argument("--out-dir", default=".", help="where to write proof JSON files (default: cwd)") diff --git a/src/crisis_agents/live_agent.py b/src/crisis_agents/live_agent.py index fc188a9..653c226 100644 --- a/src/crisis_agents/live_agent.py +++ b/src/crisis_agents/live_agent.py @@ -75,6 +75,14 @@ class LiveClaudeAgent(CrisisAgent): self._system_prompt = system_prompt or self._default_system_prompt() self._invocations = 0 self._already_adjudicated: set[str] = set() + # Conversation context — populated via observe() callbacks during + # the closed phase, and supplemented by self.graph introspection + # during the Crisis phase. + self._observed_history: list[Claim] = [] + + def observe(self, claim: Claim) -> None: + """Record a peer's claim into our conversation context.""" + self._observed_history.append(claim) @staticmethod def _default_system_prompt() -> str: @@ -115,8 +123,13 @@ class LiveClaudeAgent(CrisisAgent): self._client = anthropic.Anthropic() return self._client - def next_turn(self, turn: int, received_claims: list[Claim]) -> list[AgentTurn]: - """Issue one API call, parse, return Claims as AgentTurns.""" + def try_emit(self) -> list[AgentTurn]: + """Issue one API call, parse, return Claims as AgentTurns. + + Context is built from both `self._observed_history` (closed-phase + observations) and any Claim payloads in `self.graph` (Crisis-phase + gossiped messages from peers). + """ self._invocations += 1 # Which statements still need a verdict from me? @@ -125,7 +138,18 @@ class LiveClaudeAgent(CrisisAgent): if not pending: return [] - user_message = self._render_user_message(pending, received_claims) + # Crisis-phase: also peek at the agent's own graph for peer claims. + graph_observations: list[Claim] = [] + for v in self.graph.all_vertices(): + if v.id == self.process_id: + continue + try: + graph_observations.append(Claim.from_payload(v.payload)) + except (ValueError, TypeError): + continue + context = self._observed_history + graph_observations + + user_message = self._render_user_message(pending, context) client = self._get_client() response = client.messages.create( diff --git a/src/crisis_agents/mothership.py b/src/crisis_agents/mothership.py index 59fdf3b..575cf0d 100644 --- a/src/crisis_agents/mothership.py +++ b/src/crisis_agents/mothership.py @@ -1,19 +1,27 @@ """ -Mothership — bootstrap + clock, **not** an observer. +Mothership — bootstrap + asynchronous driver. -The mothership's only privileged role is starting the network: it knows the -initial member set, it asks each agent to take its turn, and it routes the -first hop of each emission to the sender's chosen target subset. After that -first hop, gossip rounds propagate messages and each agent reaches its own -view of the network. +Two roles, both unprivileged: -What the mothership deliberately does NOT do (which the previous version -did, and was correctly criticized for): - - hold a dict of all agents' LamportGraphs - - wrap Claims into Crisis Messages on agents' behalf - - scan any agent's graph for byzantine behavior + 1. Bootstrap. The mothership knows the initial member set, introduces a + joining agent into the boundary, and offers convenience accessors for + tests. It never reads any agent's internal state in ways that would + bypass the protocol. -Those responsibilities belong to the agents. + 2. Driver. The mothership runs an event-loop-like cycle that asks each + agent for any pending emissions, exchanges gossip between any pair, + and asks each agent for any pending alarm claims — all interleaved + in one loop until quiescent. There is no global clock; the driver + is the in-process analog of "agents run their gossip + emission + loops concurrently forever" that Section 5.9 of the paper describes. + +What the mothership deliberately does NOT have: + - a synchronous turn counter exposed to agents + - a privileged graph store + - any post-hoc scan over per-agent state + +Termination is by quiescence: when no agent emits, no gossip pair has new +information, and no fresh alarms appear, the loop exits. """ from __future__ import annotations @@ -30,9 +38,13 @@ from crisis_agents.claim import Claim @dataclass class ClosedPhaseEntry: - """One row in the closed-phase log: who said what, when.""" + """One row in the closed-phase log. + + `step` is a local sequence number used only for log ordering — it is + NOT a global tick the agents observe. + """ agent_name: str - turn: int + step: int claim: Claim @@ -40,12 +52,11 @@ class ClosedPhaseEntry: class CrisisPhaseEntry: """Audit trail of an emission event during the Crisis phase. - Kept for proof generation and human-readable demos. Detection itself - does NOT consult this log — that work happens in each agent's - `detect_mutations()` against its own graph. + `step` is a local sequence number. Detection itself does not consult + this log — that work happens in each agent's own `detect_mutations()`. """ agent_name: str - turn: int + step: int claim: Claim message_digest_hex: str delivered_to: list[str] @@ -57,16 +68,27 @@ class MothershipRunResult: crisis_log: list[CrisisPhaseEntry] = field(default_factory=list) +@dataclass(frozen=True) +class QuiescenceReport: + """How a driver loop terminated.""" + steps: int + emissions: int + gossip_transfers: int + alarm_claims_emitted: int + reached_quiescence: bool # True iff loop exited because nothing changed + # (False iff max_steps was hit first) + + class Mothership: """Coordinator for a team of CrisisAgents. Lifecycle: m = Mothership() m.add_agent(...); m.add_agent(...); m.add_agent(...) - m.run_closed_phase(num_turns=1) - m.open_boundary(joining_agent) - m.run_crisis_phase(num_turns=2, gossip_rounds_per_turn=1) - # detection is decentralized — each agent's .detect_mutations() + m.run_closed_phase() # async until quiescent (no clock) + m.open_boundary(joiner) + m.run_until_quiescent() # async until quiescent (no clock) + # detection is decentralized — m.ratified_alarms_from("agent_alpha") """ def __init__(self, *, pow_zeros: int = 0): @@ -74,24 +96,16 @@ class Mothership: self.boundary = Boundary() self.run_result = MothershipRunResult() - # Shared weight system across the network — every agent's PoW must - # be verifiable by every other agent's graph, so the threshold has - # to match. Assigned to each agent's graph at registration time. + # Shared weight system so every agent's PoW is verifiable by every + # other agent's graph. self._weight_system: WeightSystem = ProofOfWorkWeight(min_leading_zeros=pow_zeros) - self._closed_turn_index = 0 - self._crisis_turn_index = 0 - # ------------------------------------------------------------------ # Setup # ------------------------------------------------------------------ def add_agent(self, agent: CrisisAgent) -> None: - """Register a trusted agent for the closed-phase team. - - Replaces the agent's weight system with the mothership's shared one - so PoW thresholds match across the network. - """ + """Register a trusted agent for the closed-phase team.""" if self.boundary.is_open: raise RuntimeError("cannot add_agent after boundary opened; use open_boundary") if agent.name in self.agents: @@ -101,33 +115,6 @@ class Mothership: self.agents[agent.name] = agent self.boundary.add_trusted(agent.process_id) - # ------------------------------------------------------------------ - # Phase 1: closed - # ------------------------------------------------------------------ - - def run_closed_phase(self, num_turns: int) -> MothershipRunResult: - """Drive `num_turns` of plain agent communication. No Crisis.""" - if self.boundary.is_open: - raise RuntimeError("boundary already open; closed phase is over") - - observed: list[Claim] = [e.claim for e in self.run_result.closed_log] - for _ in range(num_turns): - turn = self._closed_turn_index - new_this_turn: list[Claim] = [] - for agent in self.agents.values(): - for at in agent.next_turn(turn, observed): - self.run_result.closed_log.append( - ClosedPhaseEntry(agent_name=agent.name, turn=turn, claim=at.claim) - ) - new_this_turn.append(at.claim) - observed.extend(new_this_turn) - self._closed_turn_index += 1 - return self.run_result - - # ------------------------------------------------------------------ - # Phase 2: boundary opens - # ------------------------------------------------------------------ - def open_boundary(self, new_agent: CrisisAgent) -> None: """A new agent of unknown trust joins. Crisis activates.""" if new_agent.name in self.agents: @@ -138,100 +125,148 @@ class Mothership: self.boundary.open(new_agent.process_id) # ------------------------------------------------------------------ - # Crisis-phase mechanics: emission → gossip + # Closed phase — quiescence-driven, not turn-counted # ------------------------------------------------------------------ - def _crisis_received_view(self, agent: CrisisAgent) -> list[Claim]: - """Decode every non-self vertex in `agent`'s graph back to Claim form. + def run_closed_phase(self, max_steps: int = 50) -> QuiescenceReport: + """Drive the closed-phase conversation until quiescent. - Used to populate the `received_claims` argument of `next_turn()` so - each agent sees what it has actually observed (not what the mothership - observed — they may differ if gossip has been partial). + Each iteration, each agent's `try_emit()` is called. Emitted claims + are appended to the closed log and broadcast (via `observe`) to + every other agent for context. The loop exits when no agent emits. """ - out: list[Claim] = [] - for v in agent.graph.all_vertices(): - if v.id == agent.process_id: - continue - try: - out.append(Claim.from_payload(v.payload)) - except (ValueError, TypeError): - continue # non-Claim payloads (e.g. AlarmClaim — phase 23) - return out + if self.boundary.is_open: + raise RuntimeError("boundary already open; closed phase is over") - def run_crisis_phase(self, num_turns: int, - *, gossip_rounds_per_turn: int = 1) -> MothershipRunResult: - """Drive `num_turns` of Crisis-active activity. + step = 0 + emissions = 0 + progress = True + while progress and step < max_steps: + progress = False + for agent in self.agents.values(): + for at in agent.try_emit(): + self.run_result.closed_log.append( + ClosedPhaseEntry(agent_name=agent.name, step=step, claim=at.claim) + ) + emissions += 1 + progress = True + # Share to every other agent's observation buffer + for peer_name, peer in self.agents.items(): + if peer_name == agent.name: + continue + peer.observe(at.claim) + step += 1 - Each turn: - 1. Every agent's `next_turn()` runs; emissions are routed first-hop - to their declared target_subset (or broadcast to everyone). - 2. `gossip_rounds_per_turn` rounds of all-pairs gossip propagate - messages across the network. + return QuiescenceReport( + steps=step, + emissions=emissions, + gossip_transfers=0, + alarm_claims_emitted=0, + reached_quiescence=(step < max_steps or not progress), + ) + + # ------------------------------------------------------------------ + # Crisis phase — fully asynchronous event loop + # ------------------------------------------------------------------ + + def run_until_quiescent(self, max_steps: int = 200) -> QuiescenceReport: + """Drive the Crisis-active network until nothing changes. + + Each iteration of the loop interleaves three concerns: + + 1. **Emission.** For every agent, call `try_emit()`. Route any + returned emissions to their target subset. + 2. **Gossip.** Run one all-pairs gossip round. Each receiver + accepts vertices that pass integrity checks. + 3. **Alarm emission.** For every agent, call `pending_alarm_claims()`. + Wrap each into a Crisis Message and broadcast. + + These are not phases — they're things the driver tries on each + step. The loop exits when none of them makes progress. """ if not self.boundary.is_open: raise RuntimeError("boundary not yet open; call open_boundary() first") + step = 0 + emissions = 0 + gossip_transfers = 0 + alarms_emitted = 0 all_names = list(self.agents.keys()) - for _ in range(num_turns): - turn = self._crisis_turn_index + while step < max_steps: + progress = False - # (1) Emission phase — ask each agent what they want to say. - # The agent builds the Crisis Message from its own graph. - # The mothership only handles the first-hop routing. + # 1. Emissions for agent in self.agents.values(): - received = self._crisis_received_view(agent) - for at in agent.next_turn(turn, received): - self._route_emission(agent, turn, at, all_names) + for at in agent.try_emit(): + self._route_emission(agent, step, at, all_names) + emissions += 1 + progress = True - # (2) Gossip — each pair exchanges what they have until quiescent. - for _ in range(gossip_rounds_per_turn): - self.run_gossip_round() + # 2. Gossip + transfers = self.run_gossip_round() + if transfers: + gossip_transfers += sum(transfers.values()) + progress = True - self._crisis_turn_index += 1 + # 3. Alarm emissions + for agent in self.agents.values(): + for ac in agent.pending_alarm_claims(): + self._broadcast_alarm(agent, ac) + alarms_emitted += 1 + progress = True - return self.run_result + step += 1 + if not progress: + break - def _route_emission(self, sender: CrisisAgent, turn: int, at: AgentTurn, + return QuiescenceReport( + steps=step, + emissions=emissions, + gossip_transfers=gossip_transfers, + alarm_claims_emitted=alarms_emitted, + reached_quiescence=(step < max_steps), + ) + + def _route_emission(self, sender: CrisisAgent, step: int, at: AgentTurn, all_names: list[str]) -> None: """First-hop delivery + audit log entry. - Delivery rule (same as before — kept for byzantine equivocation): - - target_subset is None ⇒ broadcast (every agent including sender) - - target_subset is set ⇒ targeted; sender's own graph NOT auto-included + - target_subset=None ⇒ broadcast (every agent including sender) + - target_subset=set ⇒ targeted; sender's own graph NOT auto-included """ if at.target_subset is None: targets = list(all_names) else: targets = [t for t in at.target_subset if t in self.agents] - # The agent wraps the Claim using its own graph as the source of truth. message = sender.emit_claim(at.claim) - for tname in targets: self.agents[tname].receive(message) self.run_result.crisis_log.append( CrisisPhaseEntry( agent_name=sender.name, - turn=turn, + step=step, claim=at.claim, message_digest_hex=message.compute_digest().hex(), delivered_to=targets, ) ) + def _broadcast_alarm(self, sender: CrisisAgent, alarm_claim) -> None: + """Wrap an AlarmClaim into a Crisis Message and deliver to every + agent (including sender, so its own tally is consistent).""" + payload = alarm_claim.to_payload() + message = sender._build_message(payload) + for target in self.agents.values(): + target.receive(message) + def run_gossip_round(self) -> dict[tuple[str, str], int]: """One all-pairs gossip round. - For every ordered pair (sender, receiver), the sender shares everything - in its graph that the receiver doesn't yet have. Returns a dict mapping - (sender_name, receiver_name) -> number of newly-accepted vertices. - - Order matters mildly: if A -> B propagates new info to B that B then - re-emits to C, that's covered in this same round only if A appears - before B in the iteration. We loop until no progress to avoid edge - cases. In practice one ordered pass is usually enough. + For every ordered pair (sender, receiver), the sender shares + everything in its graph that the receiver doesn't yet have. """ names = list(self.agents.keys()) transfers: dict[tuple[str, str], int] = {} @@ -245,110 +280,14 @@ class Mothership: return transfers # ------------------------------------------------------------------ - # Decentralized alarm flow — orchestration only; the work is per-agent + # Decentralized alarm tally — convenience method # ------------------------------------------------------------------ - def emit_alarms_from_detectors(self, - *, accuse_self_ok: bool = False - ) -> dict[str, list]: - """Every agent independently runs `detect_mutations()` on its own - graph; any LocalAlarms it produces become AlarmClaims that the agent - emits into the gossip layer (broadcast). - - Returns a dict mapping agent_name -> list[LocalAlarm] (what each - agent independently found). Callers can use this for diagnostics - without ever having read into an agent's graph directly. - - The byzantine joiner will of course not emit alarms about itself. - If `accuse_self_ok` is False (the default), we additionally skip - any LocalAlarm whose `detector_process_id_hex` matches the - `accused_process_id_hex` — sanity guard against malformed cases. - """ - from crisis_agents.vote import AlarmClaim - - all_local: dict[str, list] = {} - for agent in self.agents.values(): - locals_for_agent = agent.detect_mutations() - if not accuse_self_ok: - locals_for_agent = [ - a for a in locals_for_agent - if a.detector_process_id_hex != a.accused_process_id_hex - ] - all_local[agent.name] = locals_for_agent - - for local in locals_for_agent: - alarm_claim = AlarmClaim.from_local_alarm( - local, detected_at_turn=self._crisis_turn_index, - ) - # Wrap the AlarmClaim's payload into a Crisis Message and - # broadcast it. We bypass the Claim/AgentTurn machinery - # because AlarmClaim is a different payload schema. - self._broadcast_alarm(agent, alarm_claim) - - return all_local - - def _broadcast_alarm(self, sender: CrisisAgent, alarm_claim) -> None: - """Wrap an AlarmClaim into a Crisis Message via the sender's - `emit_claim` machinery (re-using the digest-build + PoW path) - but with the AlarmClaim payload, and broadcast it to all peers - (including the sender so its own ratified set is consistent).""" - # We can't pass a non-Claim into emit_claim directly because emit_claim - # types its argument as Claim. Hack-around: build the Message manually - # using the same chain/cross-ref logic as emit_claim. Cleaner - # alternative is to refactor emit_claim to accept any payload-bytes - # producer; for now this duplication is minor. - import os - from crisis.message import Message, NONCE_LENGTH - from crisis.weight import ProofOfWorkWeight - - payload = alarm_claim.to_payload() - - digests_list: list[bytes] = [] - same_id = [v for v in sender.graph.all_vertices() if v.id == sender.process_id] - past_digests: set[bytes] = set() - if same_id: - referenced = set() - for v in same_id: - for d in v.digests: - ref = sender.graph.get_vertex(d) - if ref is not None and ref.id == sender.process_id: - referenced.add(d) - last = next( - (v for v in same_id if v.message_digest not in referenced), - same_id[-1], - ) - digests_list.append(last.message_digest) - past_digests = {v.message_digest for v in sender.graph.past(last)} - - seen_other_ids: set[bytes] = {sender.process_id} - for v in sender.graph.all_vertices(): - if v.id in seen_other_ids: - continue - if v.message_digest in past_digests: - continue - digests_list.append(v.message_digest) - seen_other_ids.add(v.id) - - if isinstance(sender.weight_system, ProofOfWorkWeight): - message = sender.weight_system.mine_nonce( - sender.process_id, tuple(digests_list), payload, - ) - else: - message = Message( - nonce=os.urandom(NONCE_LENGTH), - id=sender.process_id, - digests=tuple(digests_list), - payload=payload, - ) - - for target in self.agents.values(): - target.receive(message) - def ratified_alarms_from(self, agent_name: str): """Get the ratified alarms as seen from one agent's graph. - With sufficient gossip, every honest agent's graph produces the same - ratified set — and the test in test_no_chokepoint.py asserts that. + With sufficient gossip, every honest agent's graph produces the + same ratified set — asserted by `test_no_chokepoint.py`. """ from crisis_agents.vote import quorum_for, tally_alarms threshold = quorum_for(self.boundary.size()) @@ -360,17 +299,13 @@ class Mothership: # ------------------------------------------------------------------ def honest_agents(self) -> list[CrisisAgent]: - """The agents trusted at the start (closed-phase team) — i.e. every - agent except the boundary-opener. Use only as a demo aid; in a real - network the mothership doesn't reliably know who's honest.""" + """The closed-phase team (every agent except the boundary-opener).""" if not self.boundary.is_open: return list(self.agents.values()) - # The boundary-opener is added last via open_boundary(); peel it off. all_agents = list(self.agents.values()) return all_agents[:-1] def joiner(self) -> Optional[CrisisAgent]: - """The boundary-opener, if any.""" if not self.boundary.is_open: return None return list(self.agents.values())[-1] diff --git a/src/crisis_agents/scenarios/fact_check.py b/src/crisis_agents/scenarios/fact_check.py index 27fa0c8..f400d56 100644 --- a/src/crisis_agents/scenarios/fact_check.py +++ b/src/crisis_agents/scenarios/fact_check.py @@ -150,8 +150,6 @@ def build_byzantine_joiner() -> CrisisAgent: class Scenario: name: str description: str - closed_phase_turns: int - crisis_phase_turns: int honest_agents: list[CrisisAgent] byzantine_joiner: CrisisAgent reference_doc: str @@ -178,12 +176,11 @@ def build_fact_check_scenario(*, live: bool = False, "Three honest agents adjudicate six factual statements against " "a small reference doc. A fourth agent joins the team after the " "boundary opens and equivocates on statement s03. Crisis is " - "decentralized: every honest agent independently detects, emits " - "an AlarmClaim, and ratifies the alarm by quorum vote." + suffix + "decentralized AND asynchronous: every honest agent " + "independently detects, emits an AlarmClaim, and ratifies the " + "alarm by quorum vote. The mothership drives an event loop to " + "quiescence — no global clock." + suffix ), - closed_phase_turns=1, - # 2 Crisis turns: intro (turn 0) + equivocation (turn 1) - crisis_phase_turns=2, honest_agents=honest, byzantine_joiner=build_byzantine_joiner(), reference_doc=load_reference_doc(), diff --git a/src/crisis_agents/vote.py b/src/crisis_agents/vote.py index 32cdafd..f4bdfbf 100644 --- a/src/crisis_agents/vote.py +++ b/src/crisis_agents/vote.py @@ -45,11 +45,14 @@ class AlarmClaim: Serializes to JSON for the Crisis Message payload. Recognizable by `kind == "alarm"`, distinguishing it from a regular `Claim` payload (which has `kind` absent or != "alarm" by convention). + + `emitted_at_step` is the agent's local sequence number for ordering; + it is NOT a global clock tick — Crisis is asynchronous. """ accused_process_id_hex: str statement_id: str witness_digests: tuple[str, str] - detected_at_turn: int + emitted_at_step: int kind: str = ALARM_KIND def to_payload(self) -> bytes: @@ -64,16 +67,16 @@ class AlarmClaim: accused_process_id_hex=obj["accused_process_id_hex"], statement_id=obj["statement_id"], witness_digests=tuple(obj["witness_digests"]), # type: ignore[arg-type] - detected_at_turn=obj["detected_at_turn"], + emitted_at_step=obj["emitted_at_step"], ) @classmethod - def from_local_alarm(cls, alarm: LocalAlarm, detected_at_turn: int) -> "AlarmClaim": + def from_local_alarm(cls, alarm: LocalAlarm, emitted_at_step: int) -> "AlarmClaim": return cls( accused_process_id_hex=alarm.accused_process_id_hex, statement_id=alarm.statement_id, witness_digests=alarm.witness_digests, - detected_at_turn=detected_at_turn, + emitted_at_step=emitted_at_step, ) diff --git a/tests/test_alarm.py b/tests/test_alarm.py index 5c69fb0..d734bc2 100644 --- a/tests/test_alarm.py +++ b/tests/test_alarm.py @@ -37,7 +37,7 @@ def _post_gossip_team() -> Mothership: split_b={"b"}, ) m.open_boundary(byz) - m.run_crisis_phase(num_turns=2, gossip_rounds_per_turn=1) + m.run_until_quiescent() return m @@ -49,7 +49,7 @@ class TestDecentralizedDetection: m.add_agent(MockAgent("b", [[]])) joiner = MockByzantineAgent("d", _intro(), [], set(), set()) m.open_boundary(joiner) - m.run_crisis_phase(num_turns=1, gossip_rounds_per_turn=1) + m.run_until_quiescent() # Every agent's own detection returns empty for agent in m.agents.values(): diff --git a/tests/test_async_quiescence.py b/tests/test_async_quiescence.py new file mode 100644 index 0000000..704118e --- /dev/null +++ b/tests/test_async_quiescence.py @@ -0,0 +1,114 @@ +"""Async-quiescence properties — the new tests that protect the no-clock invariant. + +If you accidentally bake a synchronous tick back into the driver, one of these +tests should fail. +""" + +from crisis_agents.agent import MockAgent, MockByzantineAgent +from crisis_agents.claim import Claim +from crisis_agents.mothership import Mothership +from crisis_agents.vote import quorum_for + + +def _claim(sid: str, verdict: str = "true", evidence: str = "ok") -> Claim: + return Claim(statement_id=sid, verdict=verdict, confidence=0.9, # type: ignore[arg-type] + evidence=evidence, timestamp_logical=0) + + +def _intro(name: str = "delta") -> Claim: + return Claim(statement_id=f"intro:{name}", verdict="unknown", confidence=1.0, + evidence=f"{name} joining the team", timestamp_logical=0) + + +def _build_fresh_team() -> Mothership: + m = Mothership() + m.add_agent(MockAgent("a", [[]])) + m.add_agent(MockAgent("b", [[]])) + m.add_agent(MockAgent("c", [[]])) + byz = MockByzantineAgent( + "d", _intro(), + scripted_pairs=[( + _claim("s03", verdict="true", evidence="to_ac"), + _claim("s03", verdict="false", evidence="to_b"), + )], + split_a={"a", "c"}, + split_b={"b"}, + ) + m.open_boundary(byz) + return m + + +class TestAsyncQuiescence: + + def test_run_until_quiescent_terminates(self): + """The loop must terminate. If it doesn't, there's a logic bug + in the quiescence detection.""" + m = _build_fresh_team() + report = m.run_until_quiescent(max_steps=200) + assert report.reached_quiescence + assert report.steps < 200 + + def test_two_runs_produce_identical_final_state(self): + """Running the same scenario twice must produce the same ratified-set, + confirming there's no hidden non-deterministic ordering in the loop. + """ + m1 = _build_fresh_team() + m1.run_until_quiescent() + + m2 = _build_fresh_team() + m2.run_until_quiescent() + + for name in ("a", "b", "c"): + assert m1.ratified_alarms_from(name) == m2.ratified_alarms_from(name) + + def test_max_steps_bound_caps_runtime(self): + """If we set max_steps to 1, the loop must exit even though + quiescence wasn't reached. The QuiescenceReport must accurately + say so.""" + m = _build_fresh_team() + report = m.run_until_quiescent(max_steps=1) + # With one step we won't have propagated alarms through gossip + assert report.steps == 1 + # reached_quiescence might be False because we capped out + # (the byzantine has more emissions pending) + # The important property: the loop exited and reported honestly. + assert isinstance(report.reached_quiescence, bool) + + def test_no_turn_argument_exposed_to_agents(self): + """Regression guard: CrisisAgent.try_emit() takes no arguments. + If anyone re-adds a `turn` parameter, this fails at the type-check + level when MockAgent.try_emit is called.""" + import inspect + from crisis_agents.agent import CrisisAgent + sig = inspect.signature(CrisisAgent.try_emit) + # self plus no other parameters + params = list(sig.parameters) + assert params == ["self"], f"try_emit grew arguments: {params}" + + def test_no_turn_field_on_alarmclaim(self): + """Regression guard: AlarmClaim no longer has a `detected_at_turn` + field. It has `emitted_at_step` — a sequence number, not a clock tick.""" + from crisis_agents.vote import AlarmClaim + fields = AlarmClaim.__dataclass_fields__ + assert "detected_at_turn" not in fields + assert "emitted_at_step" in fields + + def test_alarms_propagate_through_async_loop_alone(self): + """The async loop should detect, emit alarms, and ratify — all without + the caller having to invoke separate emit_alarms_from_detectors() or + run_gossip_round() steps. + """ + m = _build_fresh_team() + m.run_until_quiescent() + threshold = quorum_for(m.boundary.size()) + for name in ("a", "b", "c"): + ratified = m.ratified_alarms_from(name) + assert len(ratified) == 1 + r = ratified[0] + assert r.signer_count >= threshold + + def test_quiescence_report_counts_match_logs(self): + """Sanity: the report's emission count must equal the crisis log length.""" + m = _build_fresh_team() + report = m.run_until_quiescent() + assert report.emissions == len(m.run_result.crisis_log) diff --git a/tests/test_demo_fact_check.py b/tests/test_demo_fact_check.py index aefdc9e..1e84b08 100644 --- a/tests/test_demo_fact_check.py +++ b/tests/test_demo_fact_check.py @@ -22,7 +22,6 @@ class TestFactCheckEndToEnd: assert s.name == "fact_check" assert len(s.honest_agents) == 3 assert s.byzantine_joiner.name == "agent_delta" - assert s.crisis_phase_turns == 2 # intro + equivocation assert "Pluto" in s.reference_doc def test_runs_through_all_phases(self): @@ -30,14 +29,13 @@ class TestFactCheckEndToEnd: m = Mothership() for a in s.honest_agents: m.add_agent(a) - m.run_closed_phase(num_turns=s.closed_phase_turns) + m.run_closed_phase() m.open_boundary(s.byzantine_joiner) - m.run_crisis_phase(num_turns=s.crisis_phase_turns, - gossip_rounds_per_turn=1) - m.emit_alarms_from_detectors() - m.run_gossip_round() + # One async run, no clock — alarms emit and propagate inside the loop + report = m.run_until_quiescent() + assert report.reached_quiescence + assert report.alarm_claims_emitted >= 3 - # Every honest agent ratifies the same single alarm. threshold = quorum_for(m.boundary.size()) ratified_sets = [ m.ratified_alarms_from(name) @@ -55,12 +53,9 @@ class TestFactCheckEndToEnd: m = Mothership() for a in s.honest_agents: m.add_agent(a) - m.run_closed_phase(num_turns=s.closed_phase_turns) + m.run_closed_phase() m.open_boundary(s.byzantine_joiner) - m.run_crisis_phase(num_turns=s.crisis_phase_turns, - gossip_rounds_per_turn=1) - m.emit_alarms_from_detectors() - m.run_gossip_round() + m.run_until_quiescent() r = m.ratified_alarms_from("agent_alpha")[0] proof = build_proof(r) diff --git a/tests/test_live_agent.py b/tests/test_live_agent.py index b43298b..2dd92f9 100644 --- a/tests/test_live_agent.py +++ b/tests/test_live_agent.py @@ -66,7 +66,7 @@ class TestLiveClaudeAgent: "agent_alpha", reference_doc=_REF, statements=_STATEMENTS, client=client, ) - turns = agent.next_turn(turn=0, received_claims=[]) + turns = agent.try_emit() assert len(turns) == 2 assert {t.claim.statement_id for t in turns} == {"s01", "s02"} verdicts = {t.claim.statement_id: t.claim.verdict for t in turns} @@ -84,7 +84,7 @@ class TestLiveClaudeAgent: "agent_alpha", reference_doc=_REF, statements=_STATEMENTS, client=client, ) - turns = agent.next_turn(turn=0, received_claims=[]) + turns = agent.try_emit() assert len(turns) == 1 assert turns[0].claim.statement_id == "s01" @@ -94,7 +94,7 @@ class TestLiveClaudeAgent: "agent_alpha", reference_doc=_REF, statements=_STATEMENTS, client=client, ) - turns = agent.next_turn(turn=0, received_claims=[]) + turns = agent.try_emit() assert turns == [] def test_skips_invalid_claim_objects_in_response(self): @@ -108,7 +108,7 @@ class TestLiveClaudeAgent: "agent_alpha", reference_doc=_REF, statements=_STATEMENTS, client=client, ) - turns = agent.next_turn(turn=0, received_claims=[]) + turns = agent.try_emit() # Only the first item passes validation: bogus verdict and non-dict get skipped. assert len(turns) == 1 assert turns[0].claim.statement_id == "s01" @@ -122,11 +122,11 @@ class TestLiveClaudeAgent: statements=_STATEMENTS, client=client, ) # First call adjudicates s01 - first = agent.next_turn(turn=0, received_claims=[]) + first = agent.try_emit() assert {t.claim.statement_id for t in first} == {"s01"} # Second call should only ask about s02 (s01 is already done) - second = agent.next_turn(turn=1, received_claims=[]) + second = agent.try_emit() assert {t.claim.statement_id for t in second} == {"s02"} # The prompt sent for the second call should NOT mention s01 @@ -151,6 +151,6 @@ class TestLiveClaudeAgent: "agent_alpha", reference_doc=_REF, statements=_STATEMENTS, client=client, ) - turns = agent.next_turn(turn=0, received_claims=[]) + turns = agent.try_emit() assert len(turns) == 1 assert len(turns[0].claim.evidence) == Claim.EVIDENCE_MAX_LEN diff --git a/tests/test_mothership.py b/tests/test_mothership.py index 11f6d48..a350a99 100644 --- a/tests/test_mothership.py +++ b/tests/test_mothership.py @@ -25,10 +25,13 @@ class TestClosedPhase: m = Mothership() m.add_agent(MockAgent("a", [[_claim("s01")]])) m.add_agent(MockAgent("b", [[_claim("s01")]])) - result = m.run_closed_phase(num_turns=1) + report = m.run_closed_phase() # Two agents emitted one claim each via the closed-phase log - assert len(result.closed_log) == 2 + assert len(m.run_result.closed_log) == 2 + # The async loop reached quiescence within the step budget + assert report.reached_quiescence + assert report.emissions == 2 # No Crisis messages sent yet, so per-agent graphs are still empty for agent in m.agents.values(): @@ -69,20 +72,26 @@ class TestCrisisPhaseAgentOwnership: # Joiner with a single broadcast intro, no equivocation script joiner = MockByzantineAgent("d", _intro(), [], set(), set()) m.open_boundary(joiner) - m.run_crisis_phase(num_turns=1, gossip_rounds_per_turn=0) + m.run_until_quiescent() for name, agent in m.agents.items(): assert agent.graph.vertex_count() == 1, ( f"agent {name!r} should have received the intro broadcast" ) - def test_targeted_emission_skips_non_targets(self): - """A target_subset emission only reaches its named peers.""" + def test_targeted_emission_seeds_disjoint_views(self): + """After the async loop with gossip, every honest agent sees both + variants — but the byzantine itself never has both in its own graph + (it never re-receives its own targeted emissions, and gossip from + honest peers may or may not feed them back). + + The protocol-level invariant: the byzantine's two contradictory + vertices end up reachable to every honest agent. THAT is what + decentralized detection depends on. + """ m = Mothership() m.add_agent(MockAgent("a", [[]])) m.add_agent(MockAgent("b", [[]])) - # Byzantine: emits intro to everyone (turn 0), then equivocation - # to {a} vs {b} (turn 1). byz = MockByzantineAgent( "d", _intro(), scripted_pairs=[( @@ -93,19 +102,18 @@ class TestCrisisPhaseAgentOwnership: split_b={"b"}, ) m.open_boundary(byz) - m.run_crisis_phase(num_turns=2, gossip_rounds_per_turn=0) + m.run_until_quiescent() - # a has: intro + variant-true; b has: intro + variant-false; d has: intro - graphs = {n: a.graph for n, a in m.agents.items()} - assert graphs["a"].vertex_count() == 2 - assert graphs["b"].vertex_count() == 2 - assert graphs["d"].vertex_count() == 1 # targeted emissions skip sender - - # The variant payloads are distinct between a and b - a_payloads = [v.payload for v in graphs["a"].all_vertices()] - b_payloads = [v.payload for v in graphs["b"].all_vertices()] - assert any(b'"verdict":"true"' in p for p in a_payloads) - assert any(b'"verdict":"false"' in p for p in b_payloads) + # Every honest agent's graph has both variants of the equivocation + # (the post-condition that lets decentralized detection work). + for name in ("a", "b"): + payloads = [v.payload for v in m.agents[name].graph.all_vertices()] + assert any(b'"verdict":"true"' in p for p in payloads), ( + f"agent {name!r} missing the true-variant" + ) + assert any(b'"verdict":"false"' in p for p in payloads), ( + f"agent {name!r} missing the false-variant" + ) class TestGossipRound: @@ -128,7 +136,7 @@ class TestGossipRound: ) m.open_boundary(byz) # Two turns (intro + equivocation), then gossip - m.run_crisis_phase(num_turns=2, gossip_rounds_per_turn=1) + m.run_until_quiescent() # After gossip, every honest agent should have both byzantine variants # (intro + 2 equivocations = 3 vertices minimum). The byzantine itself @@ -155,4 +163,4 @@ class TestGossipRound: m = Mothership() m.add_agent(MockAgent("a", [[_claim("s01")]])) with pytest.raises(RuntimeError, match="boundary not yet open"): - m.run_crisis_phase(num_turns=1) + m.run_until_quiescent() diff --git a/tests/test_no_chokepoint.py b/tests/test_no_chokepoint.py index 9fe861a..b0ab9a7 100644 --- a/tests/test_no_chokepoint.py +++ b/tests/test_no_chokepoint.py @@ -35,9 +35,7 @@ def test_all_honest_agents_agree_on_ratified_alarms(): split_a={"a", "c"}, split_b={"b"}, )) - m.run_crisis_phase(num_turns=2, gossip_rounds_per_turn=1) - m.emit_alarms_from_detectors() - m.run_gossip_round() + m.run_until_quiescent() # The headline assertion: three independent vantage points; same result. ratified_per_agent = { @@ -73,9 +71,7 @@ def test_byzantine_alone_cannot_ratify(): m.add_agent(MockAgent("c", [[]])) # No equivocation script — boundary opens cleanly. m.open_boundary(MockByzantineAgent("d", _intro(), [], set(), set())) - m.run_crisis_phase(num_turns=1, gossip_rounds_per_turn=1) - m.emit_alarms_from_detectors() - m.run_gossip_round() + m.run_until_quiescent() # No honest agent should have ratified anything. for name in ("a", "b", "c"): diff --git a/tests/test_vote.py b/tests/test_vote.py index 19df510..0b455ac 100644 --- a/tests/test_vote.py +++ b/tests/test_vote.py @@ -42,11 +42,9 @@ def _full_run() -> Mothership: split_b={"b"}, ) m.open_boundary(byz) - m.run_crisis_phase(num_turns=2, gossip_rounds_per_turn=1) + m.run_until_quiescent() # Honest agents emit AlarmClaims based on what they observed. - m.emit_alarms_from_detectors() # One more gossip round so every honest agent sees all AlarmClaims. - m.run_gossip_round() return m @@ -69,7 +67,7 @@ class TestAlarmClaimRoundtrip: accused_process_id_hex="76468f93", statement_id="s03", witness_digests=("aaaa", "bbbb"), - detected_at_turn=1, + emitted_at_step=1, ) roundtrip = AlarmClaim.from_payload(ac.to_payload()) assert roundtrip == ac @@ -82,11 +80,11 @@ class TestAlarmClaimRoundtrip: statement_id="s03", witness_digests=("aa", "bb"), ) - ac = AlarmClaim.from_local_alarm(la, detected_at_turn=5) + ac = AlarmClaim.from_local_alarm(la, emitted_at_step=5) assert ac.accused_process_id_hex == "22" assert ac.statement_id == "s03" assert ac.witness_digests == ("aa", "bb") - assert ac.detected_at_turn == 5 + assert ac.emitted_at_step == 5 def test_rejects_non_alarm_payload(self): regular_claim = Claim(