Harden forever watch recovery

This commit is contained in:
saymrwulf 2026-04-29 12:58:33 +02:00
parent 69bd3ea420
commit 5a47a0bdab
5 changed files with 356 additions and 30 deletions

View file

@ -23,6 +23,7 @@ from .guidance import build_operator_cockpit, get_operator_state
from .lifecycle import (
close_manual_position,
open_manual_position,
recover_stale_active_watch,
render_lifecycle_status,
render_manual_positions,
render_supervisor_plan,
@ -42,6 +43,7 @@ from .storage import (
save_proposal,
)
from .strategy import propose
from .watch_loop import run_watch_loop
def cmd_init_db(_: argparse.Namespace) -> int:
@ -108,21 +110,31 @@ def cmd_watch(args: argparse.Namespace) -> int:
print(f"experiment: {experiment.run_id}")
with connect() as conn:
status = "completed"
return_code = 0
try:
for index in range(args.cycles):
result = run_cycle(conn, config)
print(
f"cycle {index + 1}/{args.cycles}: "
f"{result.proposal.action} - {result.proposal.reason}"
)
if index + 1 < args.cycles:
time.sleep(args.interval_seconds)
except KeyboardInterrupt:
status = "interrupted"
summary = run_watch_loop(
conn,
config,
planned_cycles=args.cycles,
interval_seconds=args.interval_seconds,
on_cycle=lambda index, total, result: print(
f"cycle {index}/{total}: {result.proposal.action} - {result.proposal.reason}",
flush=True,
),
on_failure=lambda index, total, exc, consecutive: print(
f"cycle {index}/{total}: transient_error - {type(exc).__name__}: {exc} "
f"(consecutive failures: {consecutive}/3)",
flush=True,
),
sleep=time.sleep,
)
if summary.status == "interrupted":
return_code = 130
print("interrupted: writing partial experiment report before exit")
elif summary.failed_cycles:
print(
f"watch degraded: {summary.failed_cycles} failed cycle(s), "
f"{summary.successful_cycles} successful cycle(s); writing {summary.status} report"
)
report_path = finish_experiment(
conn,
experiment.run_id,
@ -130,7 +142,7 @@ def cmd_watch(args: argparse.Namespace) -> int:
args.cycles,
args.interval_seconds,
args.hypothesis,
status=status,
status=summary.status,
)
print(f"experiment_report: {report_path}")
return return_code
@ -156,6 +168,7 @@ def cmd_report(args: argparse.Namespace) -> int:
def cmd_next(_: argparse.Namespace) -> int:
    """CLI handler for `next`: print the operator cockpit and return exit code 0.

    The parsed-args parameter is unused (the command takes no options).
    """
    with connect() as conn:
        init_db(conn)
        # Heal any watch left behind by a crashed engine before rendering
        # guidance, so the cockpit never reports a phantom "active" watch.
        recover_stale_active_watch(conn)
        print(build_operator_cockpit(conn))
        return 0
@ -164,6 +177,7 @@ def cmd_app_state(_: argparse.Namespace) -> int:
config = load_config(None)
with connect() as conn:
init_db(conn)
recover_stale_active_watch(conn)
operator_state = get_operator_state(conn)
automation_plan = build_automation_plan(conn)
payload = {

View file

@ -3,17 +3,19 @@ from __future__ import annotations
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
import json
import os
import time
from .config import AppConfig
from .experiments import finish_experiment, start_experiment
from .experiments import ACTIVE_WATCH, finish_experiment, start_experiment
from .guidance import POST_WATCH_COOLDOWN_MINUTES, build_operator_cockpit, get_operator_state
from .monitor import run_cycle
from .storage import connect, init_db
from .watch_loop import run_watch_loop
DEFAULT_WATCH_CYCLES = 24
DEFAULT_INTERVAL_SECONDS = 300
MAX_CONSECUTIVE_CYCLE_FAILURES = 3
@dataclass(frozen=True)
@ -134,11 +136,13 @@ def render_supervisor_plan() -> str:
def run_supervisor(config: AppConfig, *, once: bool = False) -> int:
with connect() as conn:
init_lifecycle_db(conn)
recover_stale_active_watch(conn)
_record_event(conn, "supervisor_started", {"once": once})
while True:
with connect() as conn:
init_lifecycle_db(conn)
recover_stale_active_watch(conn)
active_positions = list_manual_positions(conn, status="active")
if active_positions:
_handle_manual_exposure(conn, active_positions)
@ -188,6 +192,52 @@ def run_supervisor(config: AppConfig, *, once: bool = False) -> int:
return 0
def recover_stale_active_watch(conn) -> str | None:
    """Finalize a watch whose engine process died before its final bookkeeping.

    Reads the on-disk active-watch marker; if it records a pid that is no
    longer running (or no usable pid at all), writes a recovery experiment
    report, moves the lifecycle state into cooldown, and records an audit
    event. Returns the report path, or None when there is nothing to recover.

    NOTE(review): presumably finish_experiment also clears the ACTIVE_WATCH
    marker, otherwise this would re-fire on every call — confirm.
    """
    init_lifecycle_db(conn)
    payload = _read_active_watch_payload()
    if payload is None:
        # No marker file (or unparseable JSON): nothing to recover.
        return None
    pid = payload.get("pid")
    if isinstance(pid, int) and _pid_exists(pid):
        # The recorded engine process is still alive; do not touch its watch.
        return None
    # Fall back to safe defaults for any field the crashed writer omitted.
    run_id = str(payload.get("run_id") or "recovered-watch")
    started_utc = str(payload.get("started_utc") or datetime.now(UTC).isoformat(timespec="seconds"))
    planned_cycles = _safe_int(payload.get("planned_cycles"), DEFAULT_WATCH_CYCLES)
    interval_seconds = _safe_int(payload.get("interval_seconds"), DEFAULT_INTERVAL_SECONDS)
    report_path = finish_experiment(
        conn,
        run_id,
        started_utc,
        planned_cycles,
        interval_seconds,
        "recovered stale watch after the monitor engine stopped before final bookkeeping",
        status="recovered_after_crash",
    )
    # Apply the same cooldown as a normal watch completion so the operator
    # cadence is preserved after a crash.
    next_action = datetime.now(UTC) + timedelta(minutes=POST_WATCH_COOLDOWN_MINUTES)
    _write_state(
        conn,
        {
            "phase": "cooldown",
            "next_action_utc": next_action.isoformat(timespec="seconds"),
            "last_run_id": run_id,
            "message": "watch recovered after engine crash; partial report written; cooldown active",
        },
    )
    _record_event(
        conn,
        "watch_recovered_after_crash",
        {
            "run_id": run_id,
            "stale_pid": pid,
            "report": report_path,
            "next_action_utc": next_action.isoformat(timespec="seconds"),
        },
    )
    return report_path
def _run_watch_stage(config: AppConfig) -> str:
experiment = start_experiment(
DEFAULT_WATCH_CYCLES,
@ -207,20 +257,31 @@ def _run_watch_stage(config: AppConfig) -> str:
)
_record_event(conn, "watch_started", {"run_id": experiment.run_id})
status = "completed"
try:
for index in range(DEFAULT_WATCH_CYCLES):
result = run_cycle(conn, config)
print(
f"cycle {index + 1}/{DEFAULT_WATCH_CYCLES}: "
f"{result.proposal.action} - {result.proposal.reason}",
flush=True,
)
if index + 1 < DEFAULT_WATCH_CYCLES:
time.sleep(DEFAULT_INTERVAL_SECONDS)
except KeyboardInterrupt:
status = "interrupted"
summary = run_watch_loop(
conn,
config,
planned_cycles=DEFAULT_WATCH_CYCLES,
interval_seconds=DEFAULT_INTERVAL_SECONDS,
max_consecutive_failures=MAX_CONSECUTIVE_CYCLE_FAILURES,
on_cycle=_print_cycle_result,
on_failure=lambda index, total, exc, consecutive: _record_cycle_failure(
conn,
experiment.run_id,
index,
total,
exc,
consecutive,
),
sleep=time.sleep,
)
if summary.status == "interrupted":
print("interrupted: writing partial experiment report before exit", flush=True)
elif summary.failed_cycles:
print(
f"watch degraded: {summary.failed_cycles} failed cycle(s), "
f"{summary.successful_cycles} successful cycle(s); writing {summary.status} report",
flush=True,
)
report_path = finish_experiment(
conn,
experiment.run_id,
@ -228,12 +289,58 @@ def _run_watch_stage(config: AppConfig) -> str:
DEFAULT_WATCH_CYCLES,
DEFAULT_INTERVAL_SECONDS,
"forever supervisor: bounded passive watch stage",
status=status,
status=summary.status,
)
_record_event(
conn,
"watch_report_written",
{
"run_id": experiment.run_id,
"report": report_path,
"status": summary.status,
"successful_cycles": summary.successful_cycles,
"failed_cycles": summary.failed_cycles,
"last_error": summary.last_error,
},
)
_record_event(conn, "watch_report_written", {"run_id": experiment.run_id, "report": report_path})
return experiment.run_id
def _print_cycle_result(index: int, total: int, result) -> None:
print(
f"cycle {index}/{total}: "
f"{result.proposal.action} - {result.proposal.reason}",
flush=True,
)
def _record_cycle_failure(
    conn,
    run_id: str,
    index: int,
    total: int,
    exc: Exception,
    consecutive_failures: int,
) -> None:
    """Log a failed watch cycle to stdout and persist it as a lifecycle event."""
    error_text = f"{type(exc).__name__}: {exc}"
    budget = f"{consecutive_failures}/{MAX_CONSECUTIVE_CYCLE_FAILURES}"
    print(
        f"cycle {index}/{total}: transient_error - {error_text} "
        f"(consecutive failures: {budget})",
        flush=True,
    )
    event_payload = {
        "run_id": run_id,
        "cycle": index,
        "planned_cycles": total,
        "consecutive_failures": consecutive_failures,
        "error": error_text,
    }
    _record_event(conn, "watch_cycle_failed", event_payload)
def _sync_recent_watch_cooldown(conn) -> int:
operator_state = get_operator_state(conn)
completed = operator_state.completed_watch
@ -427,6 +534,31 @@ def _read_state(conn) -> dict[str, str]:
return {row[0]: row[1] for row in rows}
def _read_active_watch_payload() -> dict[str, object] | None:
    """Return the parsed active-watch marker, or None when absent or unusable.

    Fixes two crash paths in the original:
    - only FileNotFoundError was caught, so a PermissionError or any other
      OSError while reading the marker would crash recovery (and with it the
      cockpit/status commands that call it); all OSErrors now mean "no marker".
    - valid JSON that is not an object (e.g. a list or a bare string) was
      returned as-is and made the caller's payload.get(...) raise
      AttributeError; non-dict payloads are now rejected.
    """
    try:
        raw = ACTIVE_WATCH.read_text(encoding="utf-8")
    except OSError:
        # Missing file is the normal "no active watch" case; other OS errors
        # are treated the same so best-effort recovery never raises here.
        return None
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError:
        return None
    return payload if isinstance(payload, dict) else None
def _safe_int(value: object, default: int) -> int:
try:
parsed = int(value)
except (TypeError, ValueError):
return default
return parsed if parsed > 0 else default
def _pid_exists(pid: int) -> bool:
try:
os.kill(pid, 0)
except ProcessLookupError:
return False
except PermissionError:
return True
return True
def _write_state(conn, values: dict[str, str]) -> None:
conn.execute("DELETE FROM lifecycle_state")
for key, value in values.items():

View file

@ -0,0 +1,85 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Callable
import time
from .config import AppConfig
from .monitor import CycleResult, run_cycle
@dataclass(frozen=True)
class WatchLoopSummary:
    """Immutable outcome of one bounded watch run."""

    # one of: "completed" | "partial" | "partial_failed" | "failed" | "interrupted"
    status: str
    # cycles requested by the caller
    planned_cycles: int
    # cycles where run_cycle returned normally
    successful_cycles: int
    # cycles where run_cycle raised a non-fatal exception
    failed_cycles: int
    # loop ended before planned_cycles (interrupt or consecutive-failure cap)
    stopped_early: bool
    # "ExcName: message" of the most recent failure, if any
    last_error: str | None


# Callback signatures; cycle indices are 1-based.
CycleCallback = Callable[[int, int, "CycleResult"], None]
FailureCallback = Callable[[int, int, Exception, int], None]
SleepCallback = Callable[[int], None]


def run_watch_loop(
    conn,
    config: AppConfig,
    *,
    planned_cycles: int,
    interval_seconds: int,
    max_consecutive_failures: int = 3,
    on_cycle: CycleCallback | None = None,
    on_failure: FailureCallback | None = None,
    sleep: SleepCallback = time.sleep,
) -> WatchLoopSummary:
    """Run up to *planned_cycles* monitor cycles, surviving transient failures.

    Each cycle calls run_cycle(conn, config). A raised Exception counts as a
    failed cycle (reported via *on_failure*); *max_consecutive_failures*
    failures in a row stop the loop early. KeyboardInterrupt stops the loop
    and yields status "interrupted". Between cycles (including after a failed
    one, so transient errors get a retry) the injected *sleep* is called with
    *interval_seconds*. Returns a WatchLoopSummary describing the run.

    Fix vs. original: the per-failure status assignment inside the except
    branch was a dead store (always overwritten by the final normalization);
    the status is now computed exactly once after the loop.
    """
    successful_cycles = 0
    failed_cycles = 0
    consecutive_failures = 0
    last_error: str | None = None
    interrupted = False
    stopped_early = False
    for index in range(planned_cycles):
        try:
            result = run_cycle(conn, config)
        except KeyboardInterrupt:
            interrupted = True
            stopped_early = True
            break
        except Exception as exc:
            failed_cycles += 1
            consecutive_failures += 1
            last_error = f"{type(exc).__name__}: {exc}"
            if on_failure:
                on_failure(index + 1, planned_cycles, exc, consecutive_failures)
            if consecutive_failures >= max_consecutive_failures:
                stopped_early = True
                break
        else:
            successful_cycles += 1
            consecutive_failures = 0
            if on_cycle:
                on_cycle(index + 1, planned_cycles, result)
        if index + 1 < planned_cycles:
            sleep(interval_seconds)
    if interrupted:
        status = "interrupted"
    elif failed_cycles and successful_cycles:
        # Mixed outcome: "partial_failed" when the failure cap tripped,
        # plain "partial" when the loop ran to its planned end.
        status = "partial_failed" if stopped_early else "partial"
    elif failed_cycles:
        status = "failed"
    else:
        status = "completed"
    return WatchLoopSummary(
        status=status,
        planned_cycles=planned_cycles,
        successful_cycles=successful_cycles,
        failed_cycles=failed_cycles,
        stopped_early=stopped_early,
        last_error=last_error,
    )

View file

@ -10,6 +10,7 @@ from braiins_ratchet.lifecycle import (
init_lifecycle_db,
list_manual_positions,
open_manual_position,
recover_stale_active_watch,
render_manual_positions,
render_lifecycle_status,
render_supervisor_plan,
@ -93,6 +94,32 @@ class LifecycleTests(unittest.TestCase):
self.assertEqual(status.next_action_utc, "2026-04-28T15:41:51+00:00")
self.assertIn("recent watch report", status.message)
def test_recover_stale_active_watch_writes_partial_report_and_cooldown(self) -> None:
    """A marker pointing at a dead pid yields a recovery report and cooldown state."""
    conn = sqlite3.connect(":memory:")
    init_lifecycle_db(conn)
    # Stand-in for the ACTIVE_WATCH Path object: only read_text() is exercised.
    active_watch = SimpleNamespace(
        read_text=lambda encoding: (
            '{"pid": 123456, "run_id": "run-crashed", '
            '"started_utc": "2026-04-29T08:48:06+00:00", '
            '"planned_cycles": 24, "interval_seconds": 300}'
        )
    )
    with (
        patch("braiins_ratchet.lifecycle.ACTIVE_WATCH", active_watch),
        # Force the "engine process is gone" branch regardless of host pids.
        patch("braiins_ratchet.lifecycle._pid_exists", return_value=False),
        # Stub report writing so no experiment/report I/O happens in the test.
        patch("braiins_ratchet.lifecycle.finish_experiment", return_value="reports/run-crashed.md") as finish,
    ):
        report = recover_stale_active_watch(conn)
    status = get_lifecycle_status(conn)
    self.assertEqual(report, "reports/run-crashed.md")
    self.assertEqual(status.phase, "cooldown")
    self.assertEqual(status.last_run_id, "run-crashed")
    self.assertIn("watch recovered after engine crash", status.message)
    # The report must be finalized exactly once, with the crash-recovery status.
    finish.assert_called_once()
    self.assertEqual(finish.call_args.kwargs["status"], "recovered_after_crash")
if __name__ == "__main__":
unittest.main()

68
tests/test_watch_loop.py Normal file
View file

@ -0,0 +1,68 @@
import sqlite3
import unittest
from types import SimpleNamespace
from unittest.mock import patch
from braiins_ratchet.watch_loop import run_watch_loop
class WatchLoopTests(unittest.TestCase):
    """Exercises run_watch_loop's partial-failure, stop-early, and interrupt paths."""

    def test_single_network_failure_creates_partial_report_not_crash(self) -> None:
        db = sqlite3.connect(":memory:")
        good_cycle = SimpleNamespace(proposal=SimpleNamespace(action="manual_canary", reason="ok"))
        observed_failures: list[str] = []
        # First cycle succeeds, second raises like a flaky upstream HTTP call.
        cycle_outcomes = [good_cycle, RuntimeError("HTTP 520")]
        with patch("braiins_ratchet.watch_loop.run_cycle", side_effect=cycle_outcomes):
            summary = run_watch_loop(
                db,
                SimpleNamespace(),
                planned_cycles=2,
                interval_seconds=1,
                on_failure=lambda *_args: observed_failures.append("failed"),
                sleep=lambda _seconds: None,
            )
        self.assertEqual(summary.status, "partial")
        self.assertEqual(summary.successful_cycles, 1)
        self.assertEqual(summary.failed_cycles, 1)
        self.assertFalse(summary.stopped_early)
        self.assertEqual(observed_failures, ["failed"])

    def test_consecutive_failures_stop_early(self) -> None:
        db = sqlite3.connect(":memory:")
        # Every cycle fails; the loop must give up after the configured cap.
        with patch("braiins_ratchet.watch_loop.run_cycle", side_effect=RuntimeError("offline")):
            summary = run_watch_loop(
                db,
                SimpleNamespace(),
                planned_cycles=24,
                interval_seconds=1,
                max_consecutive_failures=3,
                sleep=lambda _seconds: None,
            )
        self.assertTrue(summary.stopped_early)
        self.assertEqual(summary.status, "failed")
        self.assertEqual(summary.successful_cycles, 0)
        self.assertEqual(summary.failed_cycles, 3)

    def test_keyboard_interrupt_returns_interrupted_summary(self) -> None:
        db = sqlite3.connect(":memory:")
        # Ctrl-C on the very first cycle: no cycles counted, clean summary.
        with patch("braiins_ratchet.watch_loop.run_cycle", side_effect=KeyboardInterrupt):
            summary = run_watch_loop(
                db,
                SimpleNamespace(),
                planned_cycles=24,
                interval_seconds=1,
                sleep=lambda _seconds: None,
            )
        self.assertTrue(summary.stopped_early)
        self.assertEqual(summary.status, "interrupted")
        self.assertEqual(summary.successful_cycles, 0)
        self.assertEqual(summary.failed_cycles, 0)
if __name__ == "__main__":
unittest.main()