class LiveClaudeAgent(CrisisAgent):
    """A CrisisAgent backed by one real Claude API invocation per turn.

    On each `next_turn()`:
      1. Render a structured prompt with the reference doc, the statements
         still awaiting a verdict, and the most recent peer claims.
      2. Call the Anthropic Messages API.
      3. Parse the response as a JSON array of Claim-shaped objects.
      4. Keep the claims that answer a pending statement, wrap them into
         AgentTurns and return.

    Errors during parsing fall back to emitting nothing for that turn — the
    agent stays alive but contributes nothing this round, which is more
    forgiving than crashing the whole demo. Exceptions raised by the API
    client itself are NOT caught here and propagate to the caller.
    """

    # Upper bound on generated tokens per API call.
    _MAX_TOKENS = 2048
    # How many of the most recent peer claims are echoed into the prompt.
    _MAX_PEER_CLAIMS = 12

    def __init__(self,
                 name: str,
                 *,
                 reference_doc: str,
                 statements: list[dict],
                 model: str = DEFAULT_MODEL,
                 client=None,
                 system_prompt: Optional[str] = None):
        """
        Args:
            name: Stable agent name (drives the Crisis process_id).
            reference_doc: Full reference document text passed in every prompt.
            statements: [{"id": "s01", "text": "..."}] — what to adjudicate.
            model: Anthropic model id. Default: claude-haiku-4-5.
            client: Optional pre-built `anthropic.Anthropic()`. If None,
                constructed lazily on first use (requires ANTHROPIC_API_KEY).
            system_prompt: Optional override. If None (or empty), the default
                honest fact-checking system prompt is used.
        """
        super().__init__(name)
        self._reference_doc = reference_doc
        self._statements = statements
        self._model = model
        self._client = client
        self._system_prompt = system_prompt or self._default_system_prompt()
        # Number of next_turn() calls so far; used as the logical timestamp.
        self._invocations = 0
        # Statement ids this agent has already emitted a claim for.
        self._already_adjudicated: set[str] = set()

    @staticmethod
    def _default_system_prompt() -> str:
        """Return the built-in honest fact-checker system prompt."""
        return textwrap.dedent("""
            You are one of several AI agents on a fact-checking team. You read a
            reference document and adjudicate factual statements about it. You
            answer honestly based on the reference doc alone — you do not invoke
            outside knowledge.

            For every statement you have not yet adjudicated this run, you output
            one JSON object with this exact schema:
              {
                "statement_id": "...",  # the id of the statement
                "verdict": "true" | "false" | "unknown",
                "confidence": 0.0..1.0,
                "evidence": "short justification grounded in the reference doc"
              }

            You output a JSON array of these objects, nothing else — no prose
            around it, no markdown fences, no preamble. Evidence must be at most
            280 characters.
        """).strip()

    def _get_client(self):
        """Return the API client, building it on first use.

        The anthropic SDK is imported lazily so it isn't a hard dependency.

        Raises:
            RuntimeError: if the SDK is not installed, or ANTHROPIC_API_KEY is
                missing/empty in the environment.
        """
        if self._client is not None:
            return self._client
        try:
            import anthropic  # type: ignore[import-not-found]
        except ImportError as e:
            raise RuntimeError(
                "live mode requires the anthropic SDK: pip install -e \".[live]\""
            ) from e
        if not os.environ.get("ANTHROPIC_API_KEY"):
            raise RuntimeError(
                "live mode requires ANTHROPIC_API_KEY in the environment"
            )
        self._client = anthropic.Anthropic()
        return self._client

    def next_turn(self, turn: int, received_claims: list[Claim]) -> list[AgentTurn]:
        """Issue one API call, parse it, and return the new claims as AgentTurns.

        Args:
            turn: Turn number supplied by the caller. Not read by this
                implementation; claim timestamps derive from the agent's own
                invocation counter.
            received_claims: Peer claims observed so far; the most recent ones
                are included in the prompt.

        Returns:
            One AgentTurn per pending statement the model returned a valid
            claim for. Empty when nothing is pending (no API call is made) or
            when the response could not be parsed.
        """
        self._invocations += 1

        # Which statements still need a verdict from me?
        pending = [s for s in self._statements
                   if s["id"] not in self._already_adjudicated]
        if not pending:
            return []

        user_message = self._render_user_message(pending, received_claims)

        response = self._get_client().messages.create(
            model=self._model,
            max_tokens=self._MAX_TOKENS,
            system=self._system_prompt,
            messages=[{"role": "user", "content": user_message}],
        )

        # Only text blocks carry the JSON payload; ignore any other block type.
        text = "".join(
            block.text for block in response.content
            if getattr(block, "type", None) == "text"
        )

        # Accept at most one claim per statement we actually asked about.
        # Ids the model invented, left empty, or repeated are dropped so they
        # never reach the peers or pollute the adjudicated set.
        open_ids = {s["id"] for s in pending}
        out: list[AgentTurn] = []
        for claim in self._parse_response(text):
            if claim.statement_id not in open_ids:
                continue
            open_ids.discard(claim.statement_id)
            self._already_adjudicated.add(claim.statement_id)
            out.append(AgentTurn(claim=claim))
        return out

    def _render_user_message(self, pending_statements: list[dict],
                             received_claims: list[Claim]) -> str:
        """Build the user prompt: reference doc, pending statements, peer claims.

        The message is assembled line by line rather than by dedenting an
        interpolated template: interpolated multi-line values (the reference
        doc, several statements) carry their own indentation and would defeat
        `textwrap.dedent`, leaving the section headings indented.
        """
        statements_block = "\n".join(
            f" {s['id']}: {s['text']}" for s in pending_statements
        )

        limit = self._MAX_PEER_CLAIMS
        # Guard the slice: a limit of 0 must mean "none", not "all" ([-0:]).
        recent = received_claims[-limit:] if limit > 0 else []
        if recent:
            peer_block = "\n".join(
                f" - {c.statement_id}: peer claims {c.verdict!r} "
                f"(conf {c.confidence:.2f}) — {c.evidence}"
                for c in recent
            )
        else:
            peer_block = " (no peer claims yet — you're going first)"

        sections = [
            "=== REFERENCE DOCUMENT ===",
            self._reference_doc,
            "",
            "=== STATEMENTS TO ADJUDICATE ===",
            statements_block,
            "",
            "=== CLAIMS FROM PEERS SO FAR ===",
            peer_block,
            "",
            "Output your verdicts now as a JSON array.",
        ]
        return "\n".join(sections) + "\n"

    @staticmethod
    def _load_json_array(text: str) -> Optional[list]:
        """Return the JSON array found in `text`, or None if there is none.

        Tries, in order: the text with any leading/trailing markdown fence
        removed, then the outermost `[` ... `]` span (covers prose placed
        around the array, or an array wrapped inside an object).
        """
        body = text.strip()

        # Strip markdown fences if Claude added them despite instructions.
        if body.startswith("```"):
            lines = body.splitlines()[1:]
            if lines and lines[-1].startswith("```"):
                lines = lines[:-1]
            body = "\n".join(lines).strip()

        candidates = [body]
        start, end = body.find("["), body.rfind("]")
        if 0 <= start < end:
            candidates.append(body[start:end + 1])

        for candidate in candidates:
            try:
                data = json.loads(candidate)
            except json.JSONDecodeError:
                continue
            if isinstance(data, list):
                return data
        return None

    def _parse_response(self, text: str) -> list[Claim]:
        """Tolerantly extract Claims from the model's response text.

        Items that are not JSON objects, or that Claim rejects, are skipped
        individually. Returns an empty list if no JSON array can be found.
        """
        data = self._load_json_array(text)
        if data is None:
            return []

        claims: list[Claim] = []
        for item in data:
            if not isinstance(item, dict):
                continue
            try:
                claims.append(Claim(
                    statement_id=str(item.get("statement_id", "")),
                    verdict=item.get("verdict", "unknown"),
                    confidence=float(item.get("confidence", 0.5)),
                    evidence=str(item.get("evidence", ""))[:Claim.EVIDENCE_MAX_LEN],
                    # next_turn() increments the counter before parsing, so
                    # the first invocation stamps its claims with 0.
                    timestamp_logical=self._invocations - 1,
                ))
            except (ValueError, TypeError, OverflowError):
                # OverflowError: float() on an integer too large for a double.
                continue
        return claims
+# --------------------------------------------------------------------------- + +@dataclass +class _FakeContentBlock: + type: str + text: str + + +@dataclass +class _FakeResponse: + content: list[_FakeContentBlock] + + +class _FakeAnthropicClient: + """Stand-in for anthropic.Anthropic that returns whatever JSON we hand it.""" + + def __init__(self, scripted_responses: list[str]): + self._responses = list(scripted_responses) + self.calls: list[dict[str, Any]] = [] + + # The real SDK exposes .messages.create; mirror that. + outer = self + + class _MessagesProxy: + def create(self_inner, **kwargs): + outer.calls.append(kwargs) + text = outer._responses.pop(0) if outer._responses else "[]" + return _FakeResponse(content=[_FakeContentBlock("text", text)]) + + self.messages = _MessagesProxy() + + +# --------------------------------------------------------------------------- +# The statements + reference doc fixture +# --------------------------------------------------------------------------- + +_STATEMENTS = [ + {"id": "s01", "text": "Water boils at 100C at standard pressure."}, + {"id": "s02", "text": "Pluto is still classified as a planet by the IAU."}, +] +_REF = "Water boils at 100C. Pluto was reclassified to a dwarf planet in 2006." 
class TestLiveClaudeAgent:
    """Behavioural tests for LiveClaudeAgent, driven by scripted fake responses."""

    @staticmethod
    def _build(*scripted):
        """Return (agent, fake_client) wired to replay the given raw responses."""
        fake = _FakeAnthropicClient(list(scripted))
        agent = LiveClaudeAgent(
            "agent_alpha", reference_doc=_REF,
            statements=_STATEMENTS, client=fake,
        )
        return agent, fake

    def test_parses_clean_json_response(self):
        payload = (
            '[{"statement_id":"s01","verdict":"true","confidence":0.95,"evidence":"per ref"},'
            ' {"statement_id":"s02","verdict":"false","confidence":0.9,"evidence":"per ref"}]'
        )
        agent, _ = self._build(payload)
        turns = agent.next_turn(turn=0, received_claims=[])
        assert len(turns) == 2
        assert {t.claim.statement_id for t in turns} == {"s01", "s02"}
        by_statement = {t.claim.statement_id: t.claim.verdict for t in turns}
        assert by_statement == {"s01": "true", "s02": "false"}

    def test_strips_markdown_fences(self):
        """Claude sometimes wraps JSON in ```json fences despite instructions."""
        payload = (
            "```json\n"
            '[{"statement_id":"s01","verdict":"true","confidence":0.9,"evidence":"ok"}]\n'
            "```\n"
        )
        agent, _ = self._build(payload)
        turns = agent.next_turn(turn=0, received_claims=[])
        assert len(turns) == 1
        assert turns[0].claim.statement_id == "s01"

    def test_returns_empty_on_malformed_response(self):
        agent, _ = self._build("not json at all")
        assert agent.next_turn(turn=0, received_claims=[]) == []

    def test_skips_invalid_claim_objects_in_response(self):
        payload = (
            '[{"statement_id":"s01","verdict":"true","confidence":0.9,"evidence":"ok"},'
            ' "not a dict",'
            ' {"statement_id":"s02","verdict":"bogus","confidence":0.5,"evidence":"x"}]'
        )
        agent, _ = self._build(payload)
        turns = agent.next_turn(turn=0, received_claims=[])
        # Only the first item passes validation: bogus verdict and non-dict get skipped.
        assert len(turns) == 1
        assert turns[0].claim.statement_id == "s01"

    def test_already_adjudicated_statements_are_skipped(self):
        agent, fake = self._build(
            '[{"statement_id":"s01","verdict":"true","confidence":0.9,"evidence":"ok"}]',
            '[{"statement_id":"s02","verdict":"false","confidence":0.9,"evidence":"ok"}]',
        )

        # First call adjudicates s01
        first = agent.next_turn(turn=0, received_claims=[])
        assert {t.claim.statement_id for t in first} == {"s01"}

        # Second call should only ask about s02 (s01 is already done)
        second = agent.next_turn(turn=1, received_claims=[])
        assert {t.claim.statement_id for t in second} == {"s02"}

        # The prompt sent for the second call should NOT mention s01
        prompt = fake.calls[1]["messages"][0]["content"]
        assert "s02:" in prompt
        # s01 was previously adjudicated; it should not appear in the
        # "STATEMENTS TO ADJUDICATE" block of the second prompt.
        after_header = prompt.split("=== STATEMENTS TO ADJUDICATE ===")[1]
        block_end = after_header.find("===")
        assert "s01:" not in after_header[:block_end]

    def test_evidence_length_is_truncated(self):
        long_evidence = "x" * 500
        payload = (
            f'[{{"statement_id":"s01","verdict":"true","confidence":0.9,'
            f'"evidence":"{long_evidence}"}}]'
        )
        agent, _ = self._build(payload)
        turns = agent.next_turn(turn=0, received_claims=[])
        assert len(turns) == 1
        assert len(turns[0].claim.evidence) == Claim.EVIDENCE_MAX_LEN