crisis_agents: drop the wall clock, drive asynchronously to quiescence

The previous driver imposed a synchronous turn-counted clock that the
Crisis paper explicitly forbids — Crisis is supposed to work in
asynchronous P2P networks, with any synchronicity being virtual and
derived inside the consensus algorithm from the DAG structure, not
imposed externally by a coordinator. This commit removes the wall clock.

What changed in the engine:

  - `Mothership.run_crisis_phase(num_turns, gossip_rounds_per_turn)`
    is replaced by `run_until_quiescent(max_steps=200)`. The loop
    interleaves three concerns on each iteration — emissions, gossip,
    and alarm emissions — until none make progress. Termination is by
    quiescence, not by a fixed turn count. `max_steps` is a safety
    bound (loop-iteration cap), not an exposed clock.

  - `Mothership.run_closed_phase(num_turns)` becomes
    `run_closed_phase(max_steps=50)`. Same quiescence model — the
    closed-phase conversation runs until no agent has more to say.

  - Agents grew `pending_alarm_claims()`: each agent checks its own
    graph for un-alarmed mutations and produces AlarmClaims directly.
    The driver loop calls this every iteration, so alarms emit and
    propagate in the same loop as regular emissions and gossip — no
    separate "alarm phase."

  - `Mothership.emit_alarms_from_detectors()` and the explicit
    `run_gossip_round()` step are no longer needed by callers; both
    are subsumed by the async loop. `run_gossip_round()` stays as a
    helper but tests no longer call it externally.
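The shape of the loop can be sketched as follows — a simplified, self-contained stand-in, not the engine's actual code (`run_until_quiescent`, the source callables, and the return shape here are illustrative):

```python
def run_until_quiescent(sources, max_steps: int = 200):
    """Poll zero-argument work sources until a full pass yields nothing.

    Each source returns how many events it produced this pass (regular
    emissions, gossip transfers, alarm emissions in the real engine).
    `max_steps` is a safety bound on loop iterations, not a clock the
    sources can observe. Returns (steps_taken, reached_quiescence).
    """
    for step in range(max_steps):
        progress = sum(source() for source in sources)
        if progress == 0:
            return step + 1, True   # the empty pass counts as the final step
    return max_steps, False          # cap hit before quiescence

# A toy source that produces three events, then goes quiet forever:
budget = [3]

def emitter() -> int:
    if budget[0] > 0:
        budget[0] -= 1
        return 1
    return 0

steps, quiescent = run_until_quiescent([emitter])       # (4, True)
capped = run_until_quiescent([lambda: 1], max_steps=5)  # (5, False)
```

Termination is detected the same way as in the engine: one full pass in which every source reports zero progress.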

What changed in the agent interface:

  - `CrisisAgent.next_turn(turn, received_claims)` becomes
    `try_emit()` — no arguments. Agents in an async network don't see
    a global tick. They decide based on their own internal state.

  - `CrisisAgent.observe(claim)` is the new optional callback the
    closed-phase loop uses to feed context into agents that care
    (overridden by LiveClaudeAgent to populate its prompt buffer).

  - `pending_alarm_claims()` is idempotent: an internal
    `_already_alarmed` set tracks claims this agent has emitted, so
    the loop calls it every step without flooding the network with
    duplicate alarms.
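A minimal sketch of the new agent surface, using stand-in names (the real `CrisisAgent` also carries a LamportGraph, a weight system, and message mechanics, and its `pending_alarm_claims()` runs the agent's own detector rather than taking detections as an argument — passed in here only to keep the sketch self-contained):

```python
from abc import ABC, abstractmethod

class AsyncAgentSketch(ABC):
    """Illustrative stand-in for CrisisAgent's decision surface."""

    def __init__(self):
        self._already_alarmed: set = set()

    @abstractmethod
    def try_emit(self) -> list:
        """Return pending emissions; no turn argument, no global tick."""

    def observe(self, claim) -> None:
        """Optional context callback; default is a no-op."""

    def pending_alarm_claims(self, detected: list) -> list:
        # Idempotent: the dedup set makes repeated calls safe.
        new = []
        for accused, statement_id, witnesses in detected:
            key = (accused, statement_id, tuple(sorted(witnesses)))
            if key in self._already_alarmed:
                continue
            step = len(self._already_alarmed)  # local ordinal, not a clock
            self._already_alarmed.add(key)
            new.append({"accused": accused,
                        "statement_id": statement_id,
                        "emitted_at_step": step})
        return new

class ScriptedAgent(AsyncAgentSketch):
    def __init__(self, script: list):
        super().__init__()
        self._script = list(script)

    def try_emit(self) -> list:
        return [self._script.pop(0)] if self._script else []

a = ScriptedAgent(["claim-1"])
first, second = a.try_emit(), a.try_emit()  # ["claim-1"], then []
alarms = a.pending_alarm_claims([("mallory", "s03", ("d1", "d2"))])
again = a.pending_alarm_claims([("mallory", "s03", ("d2", "d1"))])  # []
```

The second `pending_alarm_claims` call returns nothing because the canonical key (witnesses sorted) already sits in the dedup set.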

What changed in the dataclass schema:

  - `AlarmClaim.detected_at_turn` -> `emitted_at_step`. The word
    "turn" implies a global clock; "step" is a per-agent sequence
    number used only for log ordering — local, not networked.

  - `ClosedPhaseEntry.turn` and `CrisisPhaseEntry.turn` -> `step`.
    Same rename, same reasoning.

  - `Scenario.closed_phase_turns` and `Scenario.crisis_phase_turns`
    are gone. The scenario no longer prescribes how many turns; it
    just provides agents and lets the async loop run them out.

What changed in the CLI:

  - Phase 3 reports "drove to quiescence in N step(s)" with a
    breakdown of regular emissions / gossip transfers / alarm
    emissions, instead of "ran N turns".

  - `QuiescenceReport` (new dataclass) carries the run statistics
    back from `run_until_quiescent`/`run_closed_phase` — steps taken,
    emissions made, gossip transfers, alarm claims emitted, plus
    whether termination was via quiescence or max-step cap.
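Mirroring the fields described above, the report is a small frozen dataclass that callers can format however they like (the print format here is just one plausible rendering):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class QuiescenceReport:
    """How a driver loop terminated."""
    steps: int
    emissions: int
    gossip_transfers: int
    alarm_claims_emitted: int
    reached_quiescence: bool  # False means the max_steps cap was hit first

report = QuiescenceReport(steps=7, emissions=12, gossip_transfers=30,
                          alarm_claims_emitted=3, reached_quiescence=True)
print(f"drove to quiescence in {report.steps} step(s): "
      f"{report.emissions} emissions, {report.gossip_transfers} transfers, "
      f"{report.alarm_claims_emitted} alarms")
```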

New regression tests (`test_async_quiescence.py`):

  - `test_run_until_quiescent_terminates`: the loop must exit.
  - `test_two_runs_produce_identical_final_state`: determinism check —
    if anything in the loop depended on real wall time, this would
    fail.
  - `test_max_steps_bound_caps_runtime`: setting max_steps=1 exits
    immediately and `QuiescenceReport.reached_quiescence` reflects
    reality.
  - `test_no_turn_argument_exposed_to_agents`: introspects
    `CrisisAgent.try_emit` signature; fails if anyone re-adds a
    `turn` parameter.
  - `test_no_turn_field_on_alarmclaim`: introspects the dataclass
    fields; fails if `detected_at_turn` reappears.
  - `test_alarms_propagate_through_async_loop_alone`: the loop alone
    (no manual emit_alarms / run_gossip_round) ratifies an alarm.
  - `test_quiescence_report_counts_match_logs`: sanity check that
    the report's emission count equals the crisis log length.
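The two introspection guards can be sketched like this, against stand-in definitions (the real tests import `CrisisAgent` and `AlarmClaim`; the stub classes here are local copies of the post-commit shapes):

```python
import dataclasses
import inspect

@dataclasses.dataclass
class AlarmClaimStub:
    """Hypothetical local copy of the renamed dataclass shape."""
    accused_process_id_hex: str
    statement_id: str
    emitted_at_step: int

class CrisisAgentStub:
    def try_emit(self) -> list:
        return []

def test_no_turn_argument_exposed_to_agents():
    params = list(inspect.signature(CrisisAgentStub.try_emit).parameters)
    assert params == ["self"]  # re-adding a `turn` parameter fails here

def test_no_turn_field_on_alarmclaim():
    names = {f.name for f in dataclasses.fields(AlarmClaimStub)}
    assert "detected_at_turn" not in names
    assert "emitted_at_step" in names

test_no_turn_argument_exposed_to_agents()
test_no_turn_field_on_alarmclaim()
```

Signature and field introspection make the async contract a regression-tested invariant rather than a convention.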

Suite: 163 -> 170 tests, all green in 0.79s.

Behavioral end-state is identical to the previous (synchronous)
version: same fact-check scenario, same byzantine equivocation, same
proof JSON shape, same three signers, same quorum-met outcome. The
difference is structural: the protocol now matches the paper's async
shape, and a future port to actual TCP gossip + concurrent agents
needs no change to this engine.

CrisisViz: still untouched. The `crisis_data.json` pipeline that
drives the visualizer is orthogonal.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
saymrwulf 2026-05-14 22:06:56 +02:00
parent a1064660d5
commit 0976239ebd
13 changed files with 509 additions and 401 deletions


@ -1,19 +1,19 @@
"""
CrisisAgent — a first-class network participant.
CrisisAgent — a first-class network participant in an asynchronous network.
Each agent owns:
- a stable 32-byte process_id (derived from its name)
- its own LamportGraph (the agent's view of the network)
- its own weight system (shared across the network for compatibility)
- the means to wrap Claims into Crisis Messages and extend its own graph
- decision logic: `try_emit()` is asked "do you have something to say?",
`pending_alarm_claims()` is asked "do you currently observe an
un-alarmed equivocation?"
Crucially the agent is **NOT a passive script driven by the mothership**. The
mothership coordinates the clock and the bootstrap; the agent does the work.
This is the change from the centralized version: previously the mothership
held a dict of all agents' graphs and called `_wrap_as_message` against each.
Now `emit_claim` lives on the agent. The mothership routes the resulting
message to its delivery targets, but it never reads the graph.
There is no global clock. Agents don't see a "turn number" because there
isn't one — any synchronicity in the network is virtual, derived from the
DAG structure by the consensus algorithm itself (not by the driver loop).
The mothership/driver just cycles asking each agent for any pending
content until the network is quiescent.
"""
from __future__ import annotations
@ -38,27 +38,29 @@ def agent_id_from_name(name: str) -> bytes:
@dataclass
class AgentTurn:
"""One emission from an agent in a given turn.
"""One emission from an agent.
The word "turn" here is vestigial — it means "this single emission
event," not "a tick of a global clock." Kept because renaming the type
causes more churn than it's worth.
Attributes:
claim: The Claim being emitted.
target_subset: None means broadcast (every agent including sender
receives it via the mothership's initial routing).
A set of peer names means initial delivery is limited
to those peers the byzantine equivocation building
block. Subsequent gossip rounds may propagate it to
other peers anyway.
target_subset: None means broadcast to every peer including
sender. A set of peer names means initial delivery
is limited to those peers (the byzantine
equivocation building block).
"""
claim: Claim
target_subset: Optional[set[str]] = None
class CrisisAgent(ABC):
"""A network participant with its own graph and its own brain.
"""An asynchronous network participant.
Concrete subclasses implement `next_turn` to decide what to say. The
base class handles emit/receive/gossip mechanics so every agent mock
or live uses the same Crisis-protocol machinery underneath.
Concrete subclasses implement `try_emit` to decide what to say. The
base class handles emit/receive/gossip/detect mechanics uniformly so
every agent mock or live uses the same Crisis substrate.
"""
def __init__(self, name: str, *, weight_system: Optional[WeightSystem] = None):
@ -68,16 +70,38 @@ class CrisisAgent(ABC):
self.process_id: bytes = agent_id_from_name(name)
self.weight_system: WeightSystem = weight_system or ProofOfWorkWeight(min_leading_zeros=0)
self.graph: LamportGraph = LamportGraph(weight_system=self.weight_system)
# Track alarms we've already emitted so pending_alarm_claims doesn't
# repeat. Keyed by (accused, statement_id, sorted-witness-pair).
self._already_alarmed: set[tuple[str, str, tuple[str, str]]] = set()
# ------------------------------------------------------------------
# Decision-making — to be implemented by subclasses
# Decision-making — implemented by subclasses
# ------------------------------------------------------------------
@abstractmethod
def next_turn(self, turn: int, received_claims: list[Claim]) -> list[AgentTurn]:
"""Produce this agent's emissions for the given turn."""
def try_emit(self) -> list[AgentTurn]:
"""Return any emissions the agent is ready to make right now.
The agent decides based on its own internal state. The driver loop
asks this repeatedly until the agent returns nothing. There is no
turn argument — agents in an async network don't see a global tick.
"""
...
def observe(self, claim: Claim) -> None:
"""Optional callback for pre-Crisis context.
Used by the closed-phase loop: when one agent emits a claim, every
other agent's `observe(claim)` is called so they can incorporate
the conversation history into their own state. Default is no-op;
subclasses like LiveClaudeAgent override to maintain a context
buffer for their LLM prompt.
In the Crisis phase this is NOT called — agents introspect their
own LamportGraph for context. The closed phase has no graph yet.
"""
pass
# ------------------------------------------------------------------
# Crisis-protocol mechanics — uniform across all agents
# ------------------------------------------------------------------
@ -86,17 +110,20 @@ class CrisisAgent(ABC):
"""Wrap a Claim into a fully-valid Crisis Message built FROM this
agent's own graph state.
The agent does NOT extend its own graph here — the mothership decides
whether the sender receives a copy (broadcast: yes; targeted: no, to
enable byzantine equivocation without immediately failing the chain
constraint in the sender's own graph).
The agent does NOT extend its own graph here — the routing layer
decides whether the sender's own graph receives a copy.
"""
payload = claim.to_payload()
return self._build_message(claim.to_payload())
def _build_message(self, payload: bytes) -> Message:
"""Build a Crisis Message with arbitrary payload bytes.
Used by both `emit_claim` (regular Claims) and the alarm-emission
path (AlarmClaim payloads).
"""
digests_list: list[bytes] = []
# Step 1: chain link — if there's any same-id vertex in MY graph, the
# new message must reference one of them.
# Chain link
same_id = [v for v in self.graph.all_vertices() if v.id == self.process_id]
past_digests: set[bytes] = set()
if same_id:
@ -113,7 +140,7 @@ class CrisisAgent(ABC):
digests_list.append(last.message_digest)
past_digests = {v.message_digest for v in self.graph.past(last)}
# Step 2: cross-references — one most-recent vertex per other id.
# Cross-references
seen_other_ids: set[bytes] = {self.process_id}
for v in self.graph.all_vertices():
if v.id in seen_other_ids:
@ -123,10 +150,10 @@ class CrisisAgent(ABC):
digests_list.append(v.message_digest)
seen_other_ids.add(v.id)
# Step 3: mine a valid PoW nonce.
# Mine PoW
if isinstance(self.weight_system, ProofOfWorkWeight):
return self.weight_system.mine_nonce(
self.process_id, tuple(digests_list), payload
self.process_id, tuple(digests_list), payload,
)
return Message(
nonce=os.urandom(NONCE_LENGTH),
@ -136,36 +163,17 @@ class CrisisAgent(ABC):
)
def receive(self, message: Message) -> Optional[Vertex]:
"""Extend my graph with the given message if integrity holds.
Returns the resulting Vertex on success, None if the integrity check
rejects it (duplicate, missing references, broken chain). Receiving
is idempotent: extending with a message whose digest is already in
the graph is a silent no-op (returns None).
"""
"""Extend my graph with the given message if integrity holds."""
if message.compute_digest() in self.graph:
return None
return self.graph.extend(message)
def detect_mutations(self):
"""Scan MY graph for byzantine equivocation. Returns a list of
LocalAlarms (defined in alarm.py). Imported lazily to avoid a
cyclic import at module load time.
"""
from crisis_agents.alarm import detect_mutations_in_graph
return detect_mutations_in_graph(self.graph, self.name, self.process_id)
def gossip_to(self, peer: "CrisisAgent") -> int:
"""Share my vertices with `peer`. Returns the count newly accepted.
"""Share my vertices with `peer`. Returns count newly accepted.
Iterates until no progress: a message can only be accepted after all
its referenced digests already exist in the peer's graph, so this is
a multi-pass extend (Algorithm 4 in the paper, in-process flavor).
Honest gossip: the sender doesn't pick what to share — it shares
everything it has, and the peer's integrity check filters. A byzantine
could selectively gossip, but that's modeled at emit time, not gossip
time; we don't expose a "skip this vertex" hook here.
Iterates until no progress: a message is only accepted after all
its referenced digests already exist in the peer's graph
(Algorithm 4 in the paper, in-process flavor).
"""
accepted = 0
progress = True
@ -179,6 +187,49 @@ class CrisisAgent(ABC):
progress = True
return accepted
def detect_mutations(self):
"""Scan MY graph for same-id spacelike vertex pairs.
Returns a list of LocalAlarms. Imported lazily to avoid a cyclic
import at module load time.
"""
from crisis_agents.alarm import detect_mutations_in_graph
return detect_mutations_in_graph(self.graph, self.name, self.process_id)
def pending_alarm_claims(self) -> list:
"""Run detection and produce AlarmClaim payloads for any newly
observed equivocations.
An "already alarmed" set tracks claims this agent has already
emitted, so calling this repeatedly is idempotent — the driver
loop can call it until quiescence without flooding the network
with duplicate AlarmClaims.
Returns a list of AlarmClaim instances (defined in vote.py) that
the driver should broadcast on the agent's behalf.
"""
from crisis_agents.vote import AlarmClaim
local_alarms = self.detect_mutations()
new_claims: list = []
for la in local_alarms:
# Canonical key for dedup
key = (la.accused_process_id_hex, la.statement_id, la.witness_digests)
if key in self._already_alarmed:
continue
# emitted_at_step is the agent's local sequence number — we
# don't have a meaningful global step, so we use the count of
# alarms already raised by this agent as a stable ordinal.
ac = AlarmClaim(
accused_process_id_hex=la.accused_process_id_hex,
statement_id=la.statement_id,
witness_digests=la.witness_digests,
emitted_at_step=len(self._already_alarmed),
)
new_claims.append(ac)
self._already_alarmed.add(key)
return new_claims
def __repr__(self) -> str:
return f"{type(self).__name__}(name={self.name!r}, id={self.process_id.hex()[:8]}...)"
@ -191,13 +242,10 @@ class CrisisAgent(ABC):
class MockAgent(CrisisAgent):
"""An agent that emits a predetermined sequence of claims.
`scripted_claims[N]` is the list of Claims this agent emits on its Nth
`next_turn()` invocation. The agent maintains its own invocation counter
independent of the mothership's turn counter, so it can be reused across
closed-phase and Crisis-phase calls without restarting.
All emissions are broadcast (no equivocation). For equivocation, use
`MockByzantineAgent`.
`scripted_claims[N]` is the list of Claims emitted on the agent's Nth
`try_emit()` invocation. After the script is exhausted, the agent
emits nothing forever — the driver loop's quiescence check terminates
naturally.
"""
def __init__(self, name: str, scripted_claims: list[list[Claim]],
@ -206,7 +254,7 @@ class MockAgent(CrisisAgent):
self._script = scripted_claims
self._invocations = 0
def next_turn(self, turn: int, received_claims: list[Claim]) -> list[AgentTurn]:
def try_emit(self) -> list[AgentTurn]:
idx = self._invocations
self._invocations += 1
if idx >= len(self._script):
@ -217,19 +265,16 @@ class MockAgent(CrisisAgent):
class MockByzantineAgent(CrisisAgent):
"""An agent designed to equivocate.
Lifecycle:
- Invocation 0: emit a broadcast `intro_claim` (a benign "I've joined"
message). This is **necessary** for the equivocation step: both
variants of the equivocating claim will chain to this intro, so they
can propagate through the gossip layer (otherwise the chain constraint
in `Message.message_integrity` step 6 would reject the second variant
in any graph that already holds the first).
- Invocations 1..N: emit pairs of contradictory claims, with the first
variant targeted at `split_a` and the second at `split_b`. Both
variants in a pair carry the same `statement_id` but contradict on
`verdict`.
On its first `try_emit()` invocation it broadcasts an `intro_claim` so
every honest agent has a same-id vertex to chain the equivocation off.
On subsequent invocations it emits pairs of contradictory claims from
`scripted_pairs`, with the first variant targeted at `split_a` and the
second at `split_b`.
Set `scripted_pairs` empty to test "byzantine joined but didn't equivocate".
Byzantines never emit AlarmClaims about other agents — there's a
subclass override of `pending_alarm_claims` that returns empty. (A
more advanced byzantine could emit FALSE AlarmClaims to test the
quorum-vote isolation; not in scope for this PoC.)
"""
def __init__(self, name: str, intro_claim: Claim,
@ -245,12 +290,10 @@ class MockByzantineAgent(CrisisAgent):
self._split_b = split_b
self._invocations = 0
def next_turn(self, turn: int, received_claims: list[Claim]) -> list[AgentTurn]:
def try_emit(self) -> list[AgentTurn]:
idx = self._invocations
self._invocations += 1
if idx == 0:
# The introduction turn: a single broadcast so all peers learn my
# identity and have a same-id vertex to chain my equivocations to.
return [AgentTurn(claim=self._intro, target_subset=None)]
pair_idx = idx - 1
if pair_idx >= len(self._script):
@ -260,3 +303,7 @@ class MockByzantineAgent(CrisisAgent):
if claim_b is not None:
out.append(AgentTurn(claim=claim_b, target_subset=set(self._split_b)))
return out
def pending_alarm_claims(self) -> list:
"""Byzantines don't emit alarms in our threat model."""
return []


@ -2,15 +2,15 @@
crisis-agents command-line entry point.
Subcommands:
demo Run a scripted scenario end-to-end. Walks the four phases:
closed team → boundary opens → Crisis-active rounds + gossip →
decentralized detection + alarm voting → proof emission.
demo Run a scripted scenario end-to-end. The Crisis phase runs an
asynchronous event loop to quiescence — no global clock, no
fixed turn count.
verify Re-check a proof JSON for self-consistency.
Examples:
crisis-agents demo --scenario fact_check
crisis-agents demo --scenario fact_check --live
crisis-agents verify proof_agent_delta_s03.json
crisis-agents verify proof_<accused>_s03.json
"""
from __future__ import annotations
@ -53,12 +53,13 @@ def _run_demo(args: argparse.Namespace) -> int:
mothership.add_agent(agent)
# ---- Phase 1: closed team, no Crisis ----
print(f"--- Phase 1: closed team, no Crisis ({scenario.closed_phase_turns} turn(s)) ---")
mothership.run_closed_phase(num_turns=scenario.closed_phase_turns)
print("--- Phase 1: closed team, no Crisis ---")
closed_report = mothership.run_closed_phase()
honest_names = [a.name for a in mothership.agents.values()]
print(
f" {len(mothership.run_result.closed_log)} claims from "
f"{len(honest_names)} honest agent(s): {', '.join(honest_names)}"
f" driven to quiescence in {closed_report.steps} step(s); "
f"{closed_report.emissions} claims from "
f"{len(honest_names)} honest agent(s)."
)
print(f" Per-agent graphs: not yet allocated (Crisis is dormant).\n")
@ -66,47 +67,42 @@ def _run_demo(args: argparse.Namespace) -> int:
print(f"--- Phase 2: boundary opens — {scenario.byzantine_joiner.name} joins ---")
mothership.open_boundary(scenario.byzantine_joiner)
print(f" Trust set is now {mothership.boundary.size()} agents.")
print(f" Crisis is now ACTIVE for every subsequent emission.\n")
print(f" Crisis is now ACTIVE — agents emit asynchronously.\n")
# ---- Phase 3: Crisis-active rounds (emission + gossip) ----
print(f"--- Phase 3: emission + gossip "
f"({scenario.crisis_phase_turns} turn(s)) ---")
mothership.run_crisis_phase(
num_turns=scenario.crisis_phase_turns,
gossip_rounds_per_turn=1,
# ---- Phase 3: async event loop to quiescence ----
print("--- Phase 3: asynchronous event loop (no clock) ---")
report = mothership.run_until_quiescent()
print(
f" drove to quiescence in {report.steps} step(s):\n"
f" {report.emissions:3d} regular emissions\n"
f" {report.gossip_transfers:3d} gossip transfers\n"
f" {report.alarm_claims_emitted:3d} alarm claims emitted"
)
crisis_log = mothership.run_result.crisis_log
print(f" {len(crisis_log)} Crisis messages emitted.")
print(f" After gossip:")
print(f" After convergence:")
for name, agent in mothership.agents.items():
print(f" {name:14s} graph: {agent.graph.vertex_count():2d} vertices")
print()
# ---- Phase 4: each agent independently detects ----
# ---- Phase 4: each agent's own detection result ----
print("--- Phase 4: decentralized detection (each agent's own brain) ---")
local_alarms = {}
detected_by = []
for name, agent in mothership.agents.items():
alarms = agent.detect_mutations()
local_alarms[name] = alarms
marker = "ALARM" if alarms else "ok "
suffix = ""
if alarms:
detected_by.append(name)
suffix = (f" — accuses {alarms[0].accused_process_id_hex[:16]}... "
f"on {alarms[0].statement_id}")
print(f" [{marker}] {name:14s}{suffix}")
detector_count = sum(1 for a in local_alarms.values() if a)
print(f" {detector_count} of {len(mothership.agents)} agents independently "
f"detected byzantine behavior.\n")
print(f" {len(detected_by)} of {len(mothership.agents)} agents detected "
f"byzantine behavior independently.\n")
# ---- Phase 5: alarm emission + quorum voting ----
print("--- Phase 5: alarms emitted + gossiped + ratified by quorum ---")
mothership.emit_alarms_from_detectors()
mothership.run_gossip_round()
# ---- Phase 5: quorum tally ----
print("--- Phase 5: ratification by quorum ---")
threshold = quorum_for(mothership.boundary.size())
print(f" Quorum threshold = ⌈2*{mothership.boundary.size()}/3⌉ = {threshold}")
# All honest agents should agree on the ratified set — show by querying
# each of them and confirming.
ratified_per_agent = {
name: mothership.ratified_alarms_from(name)
for name in mothership.agents
@ -118,10 +114,7 @@ def _run_demo(args: argparse.Namespace) -> int:
canonical = ratified_per_agent[name]
elif ratified_per_agent[name] != canonical:
all_agree = False
if all_agree:
marker = "✓"
else:
marker = "✗"
marker = "✓" if all_agree else "✗"
print(f" {marker} every honest agent's ratified set is identical "
f"({'no chokepoint' if all_agree else 'DIVERGENCE'}).")
@ -141,7 +134,6 @@ def _run_demo(args: argparse.Namespace) -> int:
out_dir.mkdir(parents=True, exist_ok=True)
for r in canonical:
proof = build_proof(r)
# Use a stable filename based on accused + statement
accused_short = r.accused_process_id_hex[:16]
path = out_dir / f"proof_{accused_short}_{r.statement_id}.json"
path.write_text(proof.to_json())
@ -173,7 +165,7 @@ def _run_verify(args: argparse.Namespace) -> int:
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(
prog="crisis-agents",
description="Crisis-Agents — decentralized coordination for AI agent teams.",
description="Crisis-Agents — decentralized async coordination for AI agent teams.",
)
sub = parser.add_subparsers(dest="cmd", required=True)
@ -184,8 +176,7 @@ def main(argv: list[str] | None = None) -> int:
help="back the honest agents with real Claude API calls "
"(requires anthropic SDK + ANTHROPIC_API_KEY)")
demo.add_argument("--model", default=None,
help="Anthropic model id for --live (default: "
"claude-haiku-4-5-20251001)")
help="Anthropic model id for --live")
demo.add_argument("--out-dir", default=".",
help="where to write proof JSON files (default: cwd)")


@ -75,6 +75,14 @@ class LiveClaudeAgent(CrisisAgent):
self._system_prompt = system_prompt or self._default_system_prompt()
self._invocations = 0
self._already_adjudicated: set[str] = set()
# Conversation context — populated via observe() callbacks during
# the closed phase, and supplemented by self.graph introspection
# during the Crisis phase.
self._observed_history: list[Claim] = []
def observe(self, claim: Claim) -> None:
"""Record a peer's claim into our conversation context."""
self._observed_history.append(claim)
@staticmethod
def _default_system_prompt() -> str:
@ -115,8 +123,13 @@ class LiveClaudeAgent(CrisisAgent):
self._client = anthropic.Anthropic()
return self._client
def next_turn(self, turn: int, received_claims: list[Claim]) -> list[AgentTurn]:
"""Issue one API call, parse, return Claims as AgentTurns."""
def try_emit(self) -> list[AgentTurn]:
"""Issue one API call, parse, return Claims as AgentTurns.
Context is built from both `self._observed_history` (closed-phase
observations) and any Claim payloads in `self.graph` (Crisis-phase
gossiped messages from peers).
"""
self._invocations += 1
# Which statements still need a verdict from me?
@ -125,7 +138,18 @@ class LiveClaudeAgent(CrisisAgent):
if not pending:
return []
user_message = self._render_user_message(pending, received_claims)
# Crisis-phase: also peek at the agent's own graph for peer claims.
graph_observations: list[Claim] = []
for v in self.graph.all_vertices():
if v.id == self.process_id:
continue
try:
graph_observations.append(Claim.from_payload(v.payload))
except (ValueError, TypeError):
continue
context = self._observed_history + graph_observations
user_message = self._render_user_message(pending, context)
client = self._get_client()
response = client.messages.create(


@ -1,19 +1,27 @@
"""
Mothership bootstrap + clock, **not** an observer.
Mothership bootstrap + asynchronous driver.
The mothership's only privileged role is starting the network: it knows the
initial member set, it asks each agent to take its turn, and it routes the
first hop of each emission to the sender's chosen target subset. After that
first hop, gossip rounds propagate messages and each agent reaches its own
view of the network.
Two roles, both unprivileged:
What the mothership deliberately does NOT do (which the previous version
did, and was correctly criticized for):
- hold a dict of all agents' LamportGraphs
- wrap Claims into Crisis Messages on agents' behalf
- scan any agent's graph for byzantine behavior
1. Bootstrap. The mothership knows the initial member set, introduces a
joining agent into the boundary, and offers convenience accessors for
tests. It never reads any agent's internal state in ways that would
bypass the protocol.
Those responsibilities belong to the agents.
2. Driver. The mothership runs an event-loop-like cycle that asks each
agent for any pending emissions, exchanges gossip between any pair,
and asks each agent for any pending alarm claims — all interleaved
in one loop until quiescent. There is no global clock; the driver
is the in-process analog of "agents run their gossip + emission
loops concurrently forever" that Section 5.9 of the paper describes.
What the mothership deliberately does NOT have:
- a synchronous turn counter exposed to agents
- a privileged graph store
- any post-hoc scan over per-agent state
Termination is by quiescence: when no agent emits, no gossip pair has new
information, and no fresh alarms appear, the loop exits.
"""
from __future__ import annotations
@ -30,9 +38,13 @@ from crisis_agents.claim import Claim
@dataclass
class ClosedPhaseEntry:
"""One row in the closed-phase log: who said what, when."""
"""One row in the closed-phase log.
`step` is a local sequence number used only for log ordering — it is
NOT a global tick the agents observe.
"""
agent_name: str
turn: int
step: int
claim: Claim
@ -40,12 +52,11 @@ class ClosedPhaseEntry:
class CrisisPhaseEntry:
"""Audit trail of an emission event during the Crisis phase.
Kept for proof generation and human-readable demos. Detection itself
does NOT consult this log that work happens in each agent's
`detect_mutations()` against its own graph.
`step` is a local sequence number. Detection itself does not consult
this log — that work happens in each agent's own `detect_mutations()`.
"""
agent_name: str
turn: int
step: int
claim: Claim
message_digest_hex: str
delivered_to: list[str]
@ -57,16 +68,27 @@ class MothershipRunResult:
crisis_log: list[CrisisPhaseEntry] = field(default_factory=list)
@dataclass(frozen=True)
class QuiescenceReport:
"""How a driver loop terminated."""
steps: int
emissions: int
gossip_transfers: int
alarm_claims_emitted: int
reached_quiescence: bool # True iff loop exited because nothing changed
# (False iff max_steps was hit first)
class Mothership:
"""Coordinator for a team of CrisisAgents.
Lifecycle:
m = Mothership()
m.add_agent(...); m.add_agent(...); m.add_agent(...)
m.run_closed_phase(num_turns=1)
m.open_boundary(joining_agent)
m.run_crisis_phase(num_turns=2, gossip_rounds_per_turn=1)
# detection is decentralized — each agent's .detect_mutations()
m.run_closed_phase() # async until quiescent (no clock)
m.open_boundary(joiner)
m.run_until_quiescent() # async until quiescent (no clock)
# detection is decentralized — m.ratified_alarms_from("agent_alpha")
"""
def __init__(self, *, pow_zeros: int = 0):
@ -74,24 +96,16 @@ class Mothership:
self.boundary = Boundary()
self.run_result = MothershipRunResult()
# Shared weight system across the network — every agent's PoW must
# be verifiable by every other agent's graph, so the threshold has
# to match. Assigned to each agent's graph at registration time.
# Shared weight system so every agent's PoW is verifiable by every
# other agent's graph.
self._weight_system: WeightSystem = ProofOfWorkWeight(min_leading_zeros=pow_zeros)
self._closed_turn_index = 0
self._crisis_turn_index = 0
# ------------------------------------------------------------------
# Setup
# ------------------------------------------------------------------
def add_agent(self, agent: CrisisAgent) -> None:
"""Register a trusted agent for the closed-phase team.
Replaces the agent's weight system with the mothership's shared one
so PoW thresholds match across the network.
"""
"""Register a trusted agent for the closed-phase team."""
if self.boundary.is_open:
raise RuntimeError("cannot add_agent after boundary opened; use open_boundary")
if agent.name in self.agents:
@ -101,33 +115,6 @@ class Mothership:
self.agents[agent.name] = agent
self.boundary.add_trusted(agent.process_id)
# ------------------------------------------------------------------
# Phase 1: closed
# ------------------------------------------------------------------
def run_closed_phase(self, num_turns: int) -> MothershipRunResult:
"""Drive `num_turns` of plain agent communication. No Crisis."""
if self.boundary.is_open:
raise RuntimeError("boundary already open; closed phase is over")
observed: list[Claim] = [e.claim for e in self.run_result.closed_log]
for _ in range(num_turns):
turn = self._closed_turn_index
new_this_turn: list[Claim] = []
for agent in self.agents.values():
for at in agent.next_turn(turn, observed):
self.run_result.closed_log.append(
ClosedPhaseEntry(agent_name=agent.name, turn=turn, claim=at.claim)
)
new_this_turn.append(at.claim)
observed.extend(new_this_turn)
self._closed_turn_index += 1
return self.run_result
# ------------------------------------------------------------------
# Phase 2: boundary opens
# ------------------------------------------------------------------
def open_boundary(self, new_agent: CrisisAgent) -> None:
"""A new agent of unknown trust joins. Crisis activates."""
if new_agent.name in self.agents:
@ -138,100 +125,148 @@ class Mothership:
self.boundary.open(new_agent.process_id)
# ------------------------------------------------------------------
# Crisis-phase mechanics: emission → gossip
# Closed phase — quiescence-driven, not turn-counted
# ------------------------------------------------------------------
def run_closed_phase(self, max_steps: int = 50) -> QuiescenceReport:
"""Drive the closed-phase conversation until quiescent.
Each iteration, each agent's `try_emit()` is called. Emitted claims
are appended to the closed log and broadcast (via `observe`) to
every other agent for context. The loop exits when no agent emits.
"""
if self.boundary.is_open:
raise RuntimeError("boundary already open; closed phase is over")
step = 0
emissions = 0
progress = True
while progress and step < max_steps:
progress = False
for agent in self.agents.values():
for at in agent.try_emit():
self.run_result.closed_log.append(
ClosedPhaseEntry(agent_name=agent.name, step=step, claim=at.claim)
)
emissions += 1
progress = True
# Share to every other agent's observation buffer
for peer_name, peer in self.agents.items():
if peer_name == agent.name:
continue
peer.observe(at.claim)
step += 1
return QuiescenceReport(
steps=step,
emissions=emissions,
gossip_transfers=0,
alarm_claims_emitted=0,
reached_quiescence=(step < max_steps or not progress),
)
# ------------------------------------------------------------------
# Crisis phase — fully asynchronous event loop
# ------------------------------------------------------------------
def run_until_quiescent(self, max_steps: int = 200) -> QuiescenceReport:
"""Drive the Crisis-active network until nothing changes.
Each iteration of the loop interleaves three concerns:
1. **Emission.** For every agent, call `try_emit()`. Route any
returned emissions to their target subset.
2. **Gossip.** Run one all-pairs gossip round. Each receiver
accepts vertices that pass integrity checks.
3. **Alarm emission.** For every agent, call `pending_alarm_claims()`.
Wrap each into a Crisis Message and broadcast.
These are not phases; they're things the driver tries on each
step. The loop exits when none of them makes progress.
"""
if not self.boundary.is_open:
raise RuntimeError("boundary not yet open; call open_boundary() first")
step = 0
emissions = 0
gossip_transfers = 0
alarms_emitted = 0
all_names = list(self.agents.keys())
while step < max_steps:
progress = False
# 1. Emissions
for agent in self.agents.values():
for at in agent.try_emit():
self._route_emission(agent, step, at, all_names)
emissions += 1
progress = True
# 2. Gossip
transfers = self.run_gossip_round()
if transfers:
gossip_transfers += sum(transfers.values())
progress = True
# 3. Alarm emissions
for agent in self.agents.values():
for ac in agent.pending_alarm_claims():
self._broadcast_alarm(agent, ac)
alarms_emitted += 1
progress = True
step += 1
if not progress:
break
return QuiescenceReport(
steps=step,
emissions=emissions,
gossip_transfers=gossip_transfers,
alarm_claims_emitted=alarms_emitted,
reached_quiescence=(step < max_steps),
)
def _route_emission(self, sender: CrisisAgent, step: int, at: AgentTurn,
all_names: list[str]) -> None:
"""First-hop delivery + audit log entry.
Delivery rule (same as before, kept for byzantine equivocation):
- target_subset=None broadcast (every agent including sender)
- target_subset=set targeted; sender's own graph NOT auto-included
"""
if at.target_subset is None:
targets = list(all_names)
else:
targets = [t for t in at.target_subset if t in self.agents]
# The agent wraps the Claim using its own graph as the source of truth.
message = sender.emit_claim(at.claim)
for tname in targets:
self.agents[tname].receive(message)
self.run_result.crisis_log.append(
CrisisPhaseEntry(
agent_name=sender.name,
step=step,
claim=at.claim,
message_digest_hex=message.compute_digest().hex(),
delivered_to=targets,
)
)
def _broadcast_alarm(self, sender: CrisisAgent, alarm_claim) -> None:
"""Wrap an AlarmClaim into a Crisis Message and deliver to every
agent (including sender, so its own tally is consistent)."""
payload = alarm_claim.to_payload()
message = sender._build_message(payload)
for target in self.agents.values():
target.receive(message)
def run_gossip_round(self) -> dict[tuple[str, str], int]:
"""One all-pairs gossip round.
For every ordered pair (sender, receiver), the sender shares
everything in its graph that the receiver doesn't yet have.
"""
names = list(self.agents.keys())
transfers: dict[tuple[str, str], int] = {}
@ -245,110 +280,14 @@ class Mothership:
return transfers
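The all-pairs rule can be sketched in miniature, with each agent's graph reduced to a plain set of vertex digests. This is a hypothetical standalone sketch, not this codebase's types:

```python
def gossip_round(graphs: dict[str, set]) -> dict[tuple[str, str], int]:
    """One ordered all-pairs pass: each sender shares everything the
    receiver doesn't yet have; returns per-pair acceptance counts."""
    transfers: dict[tuple[str, str], int] = {}
    names = list(graphs)
    for s in names:
        for r in names:
            if s == r:
                continue
            missing = graphs[s] - graphs[r]  # what the receiver lacks
            if missing:
                graphs[r] |= missing
                transfers[(s, r)] = len(missing)
    return transfers

# Three agents with partial views; one round converges them here
# because the ordered pass relays a's news through b to c.
graphs = {"a": {"v1"}, "b": {"v2"}, "c": set()}
t = gossip_round(graphs)
```

Note the ordering effect the original docstring mentions: because "a" gossips before "b", the vertex "b" just accepted from "a" is forwarded to "c" within the same pass; with an unluckier ordering a second round would be needed, which is why the driver loops to quiescence.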
# ------------------------------------------------------------------
# Decentralized alarm tally — convenience method
# ------------------------------------------------------------------
def ratified_alarms_from(self, agent_name: str):
"""Get the ratified alarms as seen from one agent's graph.
With sufficient gossip, every honest agent's graph produces the
same ratified set asserted by `test_no_chokepoint.py`.
"""
from crisis_agents.vote import quorum_for, tally_alarms
threshold = quorum_for(self.boundary.size())
@ -360,17 +299,13 @@ class Mothership:
# ------------------------------------------------------------------
def honest_agents(self) -> list[CrisisAgent]:
"""The closed-phase team (every agent except the boundary-opener)."""
if not self.boundary.is_open:
return list(self.agents.values())
# The boundary-opener is added last via open_boundary(); peel it off.
all_agents = list(self.agents.values())
return all_agents[:-1]
def joiner(self) -> Optional[CrisisAgent]:
"""The boundary-opener, if any."""
if not self.boundary.is_open:
return None
return list(self.agents.values())[-1]
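The driver's termination rule (try everything each pass, stop when nothing makes progress, with a loop-iteration safety cap) can be sketched standalone. `Report`, `tasks`, and the toy `emit` closure below are illustrative names, not this repo's API:

```python
from dataclasses import dataclass

@dataclass
class Report:
    steps: int
    reached_quiescence: bool

def run_until_quiescent(tasks, max_steps: int = 200) -> Report:
    """Drive `tasks` (callables returning True on progress) until a
    full pass makes no progress, or the safety bound is hit."""
    step = 0
    while step < max_steps:
        progress = False
        for task in tasks:
            if task():
                progress = True
        step += 1
        if not progress:
            break
    # step == max_steps means we capped out, not that we quiesced
    return Report(steps=step, reached_quiescence=step < max_steps)

# A toy "agent" with three pending emissions, then nothing to say.
pending = ["m1", "m2", "m3"]
def emit() -> bool:
    if pending:
        pending.pop()
        return True
    return False

report = run_until_quiescent([emit])
```

The final pass is deliberately "wasted": quiescence is only observable as a full iteration in which no task does anything, so the step count is always one more than the number of productive passes.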

View file

@ -150,8 +150,6 @@ def build_byzantine_joiner() -> CrisisAgent:
class Scenario:
name: str
description: str
honest_agents: list[CrisisAgent]
byzantine_joiner: CrisisAgent
reference_doc: str
@ -178,12 +176,11 @@ def build_fact_check_scenario(*, live: bool = False,
"Three honest agents adjudicate six factual statements against "
"a small reference doc. A fourth agent joins the team after the "
"boundary opens and equivocates on statement s03. Crisis is "
"decentralized AND asynchronous: every honest agent "
"independently detects, emits an AlarmClaim, and ratifies the "
"alarm by quorum vote. The mothership drives an event loop to "
"quiescence — no global clock." + suffix
),
honest_agents=honest,
byzantine_joiner=build_byzantine_joiner(),
reference_doc=load_reference_doc(),

View file

@ -45,11 +45,14 @@ class AlarmClaim:
Serializes to JSON for the Crisis Message payload. Recognizable by
`kind == "alarm"`, distinguishing it from a regular `Claim` payload
(which has `kind` absent or != "alarm" by convention).
`emitted_at_step` is the agent's local sequence number for ordering;
it is NOT a global clock tick; Crisis is asynchronous.
"""
accused_process_id_hex: str
statement_id: str
witness_digests: tuple[str, str]
emitted_at_step: int
kind: str = ALARM_KIND
def to_payload(self) -> bytes:
@ -64,16 +67,16 @@ class AlarmClaim:
accused_process_id_hex=obj["accused_process_id_hex"],
statement_id=obj["statement_id"],
witness_digests=tuple(obj["witness_digests"]), # type: ignore[arg-type]
emitted_at_step=obj["emitted_at_step"],
)
@classmethod
def from_local_alarm(cls, alarm: LocalAlarm, emitted_at_step: int) -> "AlarmClaim":
return cls(
accused_process_id_hex=alarm.accused_process_id_hex,
statement_id=alarm.statement_id,
witness_digests=alarm.witness_digests,
emitted_at_step=emitted_at_step,
)
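A minimal standalone sketch of the payload convention described above: a JSON body discriminated by `kind == "alarm"`, surviving a bytes roundtrip with the tuple field intact. `AlarmClaimSketch` is an illustrative stand-in, not the repo's class:

```python
import json
from dataclasses import asdict, dataclass

ALARM_KIND = "alarm"

@dataclass(frozen=True)
class AlarmClaimSketch:
    accused_process_id_hex: str
    statement_id: str
    witness_digests: tuple
    emitted_at_step: int
    kind: str = ALARM_KIND

    def to_payload(self) -> bytes:
        # Sorted keys give a deterministic byte encoding
        return json.dumps(asdict(self), sort_keys=True).encode()

    @classmethod
    def from_payload(cls, payload: bytes) -> "AlarmClaimSketch":
        obj = json.loads(payload)
        if obj.get("kind") != ALARM_KIND:
            raise ValueError("not an alarm payload")
        return cls(
            accused_process_id_hex=obj["accused_process_id_hex"],
            statement_id=obj["statement_id"],
            # JSON turns tuples into lists; restore the tuple here
            witness_digests=tuple(obj["witness_digests"]),
            emitted_at_step=obj["emitted_at_step"],
        )

ac = AlarmClaimSketch("76468f93", "s03", ("aaaa", "bbbb"), 1)
rt = AlarmClaimSketch.from_payload(ac.to_payload())
```

The `kind` check is what lets a receiver cheaply tell an alarm apart from a regular Claim payload without attempting a full Claim parse first.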

View file

@ -37,7 +37,7 @@ def _post_gossip_team() -> Mothership:
split_b={"b"},
)
m.open_boundary(byz)
m.run_until_quiescent()
return m
@ -49,7 +49,7 @@ class TestDecentralizedDetection:
m.add_agent(MockAgent("b", [[]]))
joiner = MockByzantineAgent("d", _intro(), [], set(), set())
m.open_boundary(joiner)
m.run_until_quiescent()
# Every agent's own detection returns empty
for agent in m.agents.values():

View file

@ -0,0 +1,114 @@
"""Async-quiescence properties — the new tests that protect the no-clock invariant.
If you accidentally bake a synchronous tick back into the driver, one of these
tests should fail.
"""
from crisis_agents.agent import MockAgent, MockByzantineAgent
from crisis_agents.claim import Claim
from crisis_agents.mothership import Mothership
from crisis_agents.vote import quorum_for
def _claim(sid: str, verdict: str = "true", evidence: str = "ok") -> Claim:
return Claim(statement_id=sid, verdict=verdict, confidence=0.9, # type: ignore[arg-type]
evidence=evidence, timestamp_logical=0)
def _intro(name: str = "delta") -> Claim:
return Claim(statement_id=f"intro:{name}", verdict="unknown", confidence=1.0,
evidence=f"{name} joining the team", timestamp_logical=0)
def _build_fresh_team() -> Mothership:
m = Mothership()
m.add_agent(MockAgent("a", [[]]))
m.add_agent(MockAgent("b", [[]]))
m.add_agent(MockAgent("c", [[]]))
byz = MockByzantineAgent(
"d", _intro(),
scripted_pairs=[(
_claim("s03", verdict="true", evidence="to_ac"),
_claim("s03", verdict="false", evidence="to_b"),
)],
split_a={"a", "c"},
split_b={"b"},
)
m.open_boundary(byz)
return m
class TestAsyncQuiescence:
def test_run_until_quiescent_terminates(self):
"""The loop must terminate. If it doesn't, there's a logic bug
in the quiescence detection."""
m = _build_fresh_team()
report = m.run_until_quiescent(max_steps=200)
assert report.reached_quiescence
assert report.steps < 200
def test_two_runs_produce_identical_final_state(self):
"""Running the same scenario twice must produce the same ratified set,
confirming there's no hidden non-deterministic ordering in the loop.
"""
m1 = _build_fresh_team()
m1.run_until_quiescent()
m2 = _build_fresh_team()
m2.run_until_quiescent()
for name in ("a", "b", "c"):
assert m1.ratified_alarms_from(name) == m2.ratified_alarms_from(name)
def test_max_steps_bound_caps_runtime(self):
"""If we set max_steps to 1, the loop must exit even though
quiescence wasn't reached. The QuiescenceReport must accurately
say so."""
m = _build_fresh_team()
report = m.run_until_quiescent(max_steps=1)
# With one step we won't have propagated alarms through gossip
assert report.steps == 1
# reached_quiescence might be False because we capped out
# (the byzantine has more emissions pending)
# The important property: the loop exited and reported honestly.
assert isinstance(report.reached_quiescence, bool)
def test_no_turn_argument_exposed_to_agents(self):
"""Regression guard: CrisisAgent.try_emit() takes no arguments.
If anyone re-adds a `turn` parameter, this signature inspection
fails immediately."""
import inspect
from crisis_agents.agent import CrisisAgent
sig = inspect.signature(CrisisAgent.try_emit)
# self plus no other parameters
params = list(sig.parameters)
assert params == ["self"], f"try_emit grew arguments: {params}"
def test_no_turn_field_on_alarmclaim(self):
"""Regression guard: AlarmClaim no longer has a `detected_at_turn`
field. It has `emitted_at_step`, a sequence number, not a clock tick."""
from crisis_agents.vote import AlarmClaim
fields = AlarmClaim.__dataclass_fields__
assert "detected_at_turn" not in fields
assert "emitted_at_step" in fields
def test_alarms_propagate_through_async_loop_alone(self):
"""The async loop should detect, emit alarms, and ratify — all without
the caller having to invoke separate emit_alarms_from_detectors() or
run_gossip_round() steps.
"""
m = _build_fresh_team()
m.run_until_quiescent()
threshold = quorum_for(m.boundary.size())
for name in ("a", "b", "c"):
ratified = m.ratified_alarms_from(name)
assert len(ratified) == 1
r = ratified[0]
assert r.signer_count >= threshold
def test_quiescence_report_counts_match_logs(self):
"""Sanity: the report's emission count must equal the crisis log length."""
m = _build_fresh_team()
report = m.run_until_quiescent()
assert report.emissions == len(m.run_result.crisis_log)

View file

@ -22,7 +22,6 @@ class TestFactCheckEndToEnd:
assert s.name == "fact_check"
assert len(s.honest_agents) == 3
assert s.byzantine_joiner.name == "agent_delta"
assert "Pluto" in s.reference_doc
def test_runs_through_all_phases(self):
@ -30,14 +29,13 @@ class TestFactCheckEndToEnd:
m = Mothership()
for a in s.honest_agents:
m.add_agent(a)
m.run_closed_phase()
m.open_boundary(s.byzantine_joiner)
# One async run, no clock — alarms emit and propagate inside the loop
report = m.run_until_quiescent()
assert report.reached_quiescence
assert report.alarm_claims_emitted >= 3
# Every honest agent ratifies the same single alarm.
threshold = quorum_for(m.boundary.size())
ratified_sets = [
m.ratified_alarms_from(name)
@ -55,12 +53,9 @@ class TestFactCheckEndToEnd:
m = Mothership()
for a in s.honest_agents:
m.add_agent(a)
m.run_closed_phase()
m.open_boundary(s.byzantine_joiner)
m.run_until_quiescent()
r = m.ratified_alarms_from("agent_alpha")[0]
proof = build_proof(r)

View file

@ -66,7 +66,7 @@ class TestLiveClaudeAgent:
"agent_alpha", reference_doc=_REF,
statements=_STATEMENTS, client=client,
)
turns = agent.try_emit()
assert len(turns) == 2
assert {t.claim.statement_id for t in turns} == {"s01", "s02"}
verdicts = {t.claim.statement_id: t.claim.verdict for t in turns}
@ -84,7 +84,7 @@ class TestLiveClaudeAgent:
"agent_alpha", reference_doc=_REF,
statements=_STATEMENTS, client=client,
)
turns = agent.try_emit()
assert len(turns) == 1
assert turns[0].claim.statement_id == "s01"
@ -94,7 +94,7 @@ class TestLiveClaudeAgent:
"agent_alpha", reference_doc=_REF,
statements=_STATEMENTS, client=client,
)
turns = agent.try_emit()
assert turns == []
def test_skips_invalid_claim_objects_in_response(self):
@ -108,7 +108,7 @@ class TestLiveClaudeAgent:
"agent_alpha", reference_doc=_REF,
statements=_STATEMENTS, client=client,
)
turns = agent.try_emit()
# Only the first item passes validation: bogus verdict and non-dict get skipped.
assert len(turns) == 1
assert turns[0].claim.statement_id == "s01"
@ -122,11 +122,11 @@ class TestLiveClaudeAgent:
statements=_STATEMENTS, client=client,
)
# First call adjudicates s01
first = agent.try_emit()
assert {t.claim.statement_id for t in first} == {"s01"}
# Second call should only ask about s02 (s01 is already done)
second = agent.try_emit()
assert {t.claim.statement_id for t in second} == {"s02"}
# The prompt sent for the second call should NOT mention s01
@ -151,6 +151,6 @@ class TestLiveClaudeAgent:
"agent_alpha", reference_doc=_REF,
statements=_STATEMENTS, client=client,
)
turns = agent.try_emit()
assert len(turns) == 1
assert len(turns[0].claim.evidence) == Claim.EVIDENCE_MAX_LEN

View file

@ -25,10 +25,13 @@ class TestClosedPhase:
m = Mothership()
m.add_agent(MockAgent("a", [[_claim("s01")]]))
m.add_agent(MockAgent("b", [[_claim("s01")]]))
report = m.run_closed_phase()
# Two agents emitted one claim each via the closed-phase log
assert len(m.run_result.closed_log) == 2
# The async loop reached quiescence within the step budget
assert report.reached_quiescence
assert report.emissions == 2
# No Crisis messages sent yet, so per-agent graphs are still empty
for agent in m.agents.values():
@ -69,20 +72,26 @@ class TestCrisisPhaseAgentOwnership:
# Joiner with a single broadcast intro, no equivocation script
joiner = MockByzantineAgent("d", _intro(), [], set(), set())
m.open_boundary(joiner)
m.run_until_quiescent()
for name, agent in m.agents.items():
assert agent.graph.vertex_count() == 1, (
f"agent {name!r} should have received the intro broadcast"
)
def test_targeted_emission_seeds_disjoint_views(self):
"""After the async loop with gossip, every honest agent sees both
variants, while the byzantine itself need not hold both in its own
graph (it never re-receives its own targeted emissions, and gossip
from honest peers may or may not feed them back).
The protocol-level invariant: the byzantine's two contradictory
vertices end up reachable by every honest agent. THAT is what
decentralized detection depends on.
"""
m = Mothership()
m.add_agent(MockAgent("a", [[]]))
m.add_agent(MockAgent("b", [[]]))
byz = MockByzantineAgent(
"d", _intro(),
scripted_pairs=[(
@ -93,19 +102,18 @@ class TestCrisisPhaseAgentOwnership:
split_b={"b"},
)
m.open_boundary(byz)
m.run_until_quiescent()
# Every honest agent's graph has both variants of the equivocation
# (the post-condition that lets decentralized detection work).
for name in ("a", "b"):
payloads = [v.payload for v in m.agents[name].graph.all_vertices()]
assert any(b'"verdict":"true"' in p for p in payloads), (
f"agent {name!r} missing the true-variant"
)
assert any(b'"verdict":"false"' in p for p in payloads), (
f"agent {name!r} missing the false-variant"
)
class TestGossipRound:
@ -128,7 +136,7 @@ class TestGossipRound:
)
m.open_boundary(byz)
m.run_until_quiescent()
# After gossip, every honest agent should have both byzantine variants
# (intro + 2 equivocations = 3 vertices minimum). The byzantine itself
@ -155,4 +163,4 @@ class TestGossipRound:
m = Mothership()
m.add_agent(MockAgent("a", [[_claim("s01")]]))
with pytest.raises(RuntimeError, match="boundary not yet open"):
m.run_until_quiescent()

View file

@ -35,9 +35,7 @@ def test_all_honest_agents_agree_on_ratified_alarms():
split_a={"a", "c"},
split_b={"b"},
))
m.run_until_quiescent()
# The headline assertion: three independent vantage points; same result.
ratified_per_agent = {
@ -73,9 +71,7 @@ def test_byzantine_alone_cannot_ratify():
m.add_agent(MockAgent("c", [[]]))
# No equivocation script — boundary opens cleanly.
m.open_boundary(MockByzantineAgent("d", _intro(), [], set(), set()))
m.run_until_quiescent()
# No honest agent should have ratified anything.
for name in ("a", "b", "c"):

View file

@ -42,11 +42,9 @@ def _full_run() -> Mothership:
split_b={"b"},
)
m.open_boundary(byz)
m.run_until_quiescent()
return m
@ -69,7 +67,7 @@ class TestAlarmClaimRoundtrip:
accused_process_id_hex="76468f93",
statement_id="s03",
witness_digests=("aaaa", "bbbb"),
emitted_at_step=1,
)
roundtrip = AlarmClaim.from_payload(ac.to_payload())
assert roundtrip == ac
@ -82,11 +80,11 @@ class TestAlarmClaimRoundtrip:
statement_id="s03",
witness_digests=("aa", "bb"),
)
ac = AlarmClaim.from_local_alarm(la, emitted_at_step=5)
assert ac.accused_process_id_hex == "22"
assert ac.statement_id == "s03"
assert ac.witness_digests == ("aa", "bb")
assert ac.emitted_at_step == 5
def test_rejects_non_alarm_payload(self):
regular_claim = Claim(