crisis/tests/test_alarm.py
saymrwulf 0976239ebd crisis_agents: drop the wall-clock, drive asynchronously to quiescence
The previous driver imposed a synchronous turn-counted clock that the
Crisis paper explicitly forbids — Crisis is supposed to work in
asynchronous P2P networks, with any synchronicity being virtual and
derived inside the consensus algorithm from the DAG structure, not
imposed externally by a coordinator. This commit removes the wall clock.

What changed in the engine:

  - `Mothership.run_crisis_phase(num_turns, gossip_rounds_per_turn)`
    is replaced by `run_until_quiescent(max_steps=200)`. The loop
    interleaves three concerns on each iteration — emissions, gossip,
    and alarm emissions — until none make progress. Termination is by
    quiescence, not by a fixed turn count. `max_steps` is a safety
    bound (loop-iteration cap), not an exposed clock.

  - `Mothership.run_closed_phase(num_turns)` becomes
    `run_closed_phase(max_steps=50)`. Same quiescence model — the
    closed-phase conversation runs until no agent has more to say.

  - Agents grew `pending_alarm_claims()`: each agent checks its own
    graph for un-alarmed mutations and produces AlarmClaims directly.
    The driver loop calls this every iteration, so alarms emit and
    propagate in the same loop as regular emissions and gossip — no
    separate "alarm phase."

  - `Mothership.emit_alarms_from_detectors()` and the explicit
    `run_gossip_round()` step are no longer needed by callers; both
    are subsumed by the async loop. `run_gossip_round()` stays as a
    helper but tests no longer call it externally.

What changed in the agent interface:

  - `CrisisAgent.next_turn(turn, received_claims)` becomes
    `try_emit()` — no arguments. Agents in an async network don't see
    a global tick. They decide based on their own internal state.

  - `CrisisAgent.observe(claim)` is the new optional callback the
    closed-phase loop uses to feed context into agents that care
    (overridden by LiveClaudeAgent to populate its prompt buffer).

  - `pending_alarm_claims()` is idempotent: an internal
    `_already_alarmed` set tracks claims this agent has emitted, so
    the loop calls it every step without flooding the network with
    duplicate alarms.

What changed in the dataclass schema:

  - `AlarmClaim.detected_at_turn` -> `emitted_at_step`. The word
    "turn" implies a global clock; "step" is a per-agent sequence
    number used only for log ordering — local, not networked.

  - `ClosedPhaseEntry.turn` and `CrisisPhaseEntry.turn` -> `step`.
    Same rename, same reasoning.

  - `Scenario.closed_phase_turns` and `Scenario.crisis_phase_turns`
    are gone. The scenario no longer prescribes how many turns; it
    just provides agents and lets the async loop run them out.

What changed in the CLI:

  - Phase 3 reports "drove to quiescence in N step(s)" with a
    breakdown of regular emissions / gossip transfers / alarm
    emissions, instead of "ran N turns".

  - `QuiescenceReport` (new dataclass) carries the run statistics
    back from `run_until_quiescent`/`run_closed_phase` — steps taken,
    emissions made, gossip transfers, alarm claims emitted, plus
    whether termination was via quiescence or max-step cap.

New regression tests (`test_async_quiescence.py`):

  - `test_run_until_quiescent_terminates`: the loop must exit.
  - `test_two_runs_produce_identical_final_state`: determinism check —
    if anything in the loop depended on real wall time, this would
    fail.
  - `test_max_steps_bound_caps_runtime`: setting max_steps=1 exits
    immediately and `QuiescenceReport.reached_quiescence` reflects
    reality.
  - `test_no_turn_argument_exposed_to_agents`: introspects
    `CrisisAgent.try_emit` signature; fails if anyone re-adds a
    `turn` parameter.
  - `test_no_turn_field_on_alarmclaim`: introspects the dataclass
    fields; fails if `detected_at_turn` reappears.
  - `test_alarms_propagate_through_async_loop_alone`: the loop alone
    (no manual emit_alarms / run_gossip_round) ratifies an alarm.
  - `test_quiescence_report_counts_match_logs`: sanity check that
    the report's emission count equals the crisis log length.

Suite: 163 -> 170 tests, all green in 0.79s.

Behavioral end-state is identical to the previous (synchronous)
version: same fact-check scenario, same byzantine equivocation, same
proof JSON shape, same three signers, same quorum-met outcome. The
difference is structural: the protocol now matches the paper's async
shape, and a future port to actual TCP gossip + concurrent agents
needs no change to this engine.

CrisisViz: still untouched. The `crisis_data.json` pipeline that
drives the visualizer is orthogonal.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-14 22:06:56 +02:00

101 lines
3.8 KiB
Python

"""Tests for decentralized mutation detection.
Each agent's `detect_mutations()` is called on its own graph. There is no
mothership-side scan.
"""
from crisis_agents.agent import MockAgent, MockByzantineAgent
from crisis_agents.alarm import LocalAlarm, detect_mutations_in_graph
from crisis_agents.claim import Claim
from crisis_agents.mothership import Mothership
def _claim(sid: str, verdict: str = "true", evidence: str = "ok") -> Claim:
return Claim(statement_id=sid, verdict=verdict, confidence=0.9, # type: ignore[arg-type]
evidence=evidence, timestamp_logical=0)
def _intro(name: str = "delta") -> Claim:
return Claim(statement_id=f"intro:{name}", verdict="unknown", confidence=1.0,
evidence=f"{name} joining the team", timestamp_logical=0)
def _post_gossip_team() -> Mothership:
"""3 honest + 1 byzantine; equivocation; one gossip round so every
honest agent has both variants in its own graph."""
m = Mothership()
m.add_agent(MockAgent("a", [[]]))
m.add_agent(MockAgent("b", [[]]))
m.add_agent(MockAgent("c", [[]]))
byz = MockByzantineAgent(
"d", _intro(),
scripted_pairs=[(
_claim("s03", verdict="true", evidence="to_ac"),
_claim("s03", verdict="false", evidence="to_b"),
)],
split_a={"a", "c"},
split_b={"b"},
)
m.open_boundary(byz)
m.run_until_quiescent()
return m
class TestDecentralizedDetection:
def test_no_alarms_in_honest_run(self):
m = Mothership()
m.add_agent(MockAgent("a", [[]]))
m.add_agent(MockAgent("b", [[]]))
joiner = MockByzantineAgent("d", _intro(), [], set(), set())
m.open_boundary(joiner)
m.run_until_quiescent()
# Every agent's own detection returns empty
for agent in m.agents.values():
assert agent.detect_mutations() == []
def test_each_honest_agent_detects_the_same_mutation(self):
"""The key decentralization property."""
m = _post_gossip_team()
for name in ("a", "b", "c"):
alarms = m.agents[name].detect_mutations()
assert len(alarms) == 1
assert alarms[0].statement_id == "s03"
assert alarms[0].detector_name == name
# Both honest detectors agree on the canonical witness pair
assert alarms[0].witness_digests[0] != alarms[0].witness_digests[1]
def test_byzantine_does_not_detect_its_own_equivocation(self):
"""An agent never accuses itself."""
m = _post_gossip_team()
# The byzantine ended up with both equivocating variants in its
# own graph (via gossip-back from honest peers). Its detect should
# still return empty because it skips its own process id.
d_alarms = m.agents["d"].detect_mutations()
assert d_alarms == []
def test_all_honest_detectors_produce_canonical_witness_pairs(self):
"""Three independent detectors must agree on the witness digest pair
(sorted hex) so their AlarmClaims can be voted together."""
m = _post_gossip_team()
pairs = set()
for name in ("a", "b", "c"):
local = m.agents[name].detect_mutations()
assert len(local) == 1
pairs.add(local[0].witness_digests)
assert len(pairs) == 1, "detectors disagree on the witness pair"
class TestDirectDetectionFunction:
"""The function detect_mutations_in_graph is the heart of detection;
test it directly on a constructed graph too."""
def test_returns_LocalAlarm_instances(self):
m = _post_gossip_team()
alarms = detect_mutations_in_graph(
m.agents["a"].graph,
detector_name="a",
detector_process_id=m.agents["a"].process_id,
)
assert all(isinstance(a, LocalAlarm) for a in alarms)