Add controlled automation pipeline

This commit is contained in:
saymrwulf 2026-04-27 18:12:47 +02:00
parent 70beecd615
commit 27f27992c3
10 changed files with 478 additions and 46 deletions

View file

@ -21,6 +21,14 @@ The first implementation is deliberately conservative:
`./scripts/ratchet` is the cockpit. It tells you exactly what to do next.
If the cockpit is in cooldown and you want the app to wait until the earliest next action, run:
```bash
./scripts/ratchet pipeline
```
It prints the exact monitor-only plan and asks `yes/no` before doing anything.
For a 6-hour monitoring session:
```bash

View file

@ -48,6 +48,31 @@ Post-watch cooldown means:
3. The run report is the evidence artifact.
4. The next planned touch is a later fresh sample, usually `./scripts/ratchet once`.
During cooldown, the cockpit shows:
1. A progress bar.
2. The earliest next action time.
3. The remaining minutes.
## Controlled Automation
If you do not want to babysit the cooldown manually, run:
```bash
./scripts/ratchet pipeline
```
The pipeline first prints a proposal like:
```text
I am going to: wait until this time, run one fresh sample, print the cockpit, then stop.
Are you OK with this? Type yes or no.
```
It only runs after you type `yes`.
It is still monitor-only. It never places, changes, or cancels Braiins orders.
## Research Pathway
The cockpit has two different time horizons:

View file

@ -29,6 +29,23 @@ Prints the cockpit: current state, exact next operator action, interpretation, a
PYTHONPATH=src ./.venv/bin/python -m braiins_ratchet.cli next
```
## `pipeline`
Prints a monitor-only automation proposal and asks for `yes` or `no`.
Example behavior during post-watch cooldown:
```text
I am going to: Wait for post-watch cooldown, then refresh once.
Are you OK with this? Type yes or no.
```
If approved, it waits until the earliest next action time, runs one fresh monitor cycle, prints the cockpit, and stops.
```bash
PYTHONPATH=src ./.venv/bin/python -m braiins_ratchet.cli pipeline
```
## `init-db`
Creates `data/ratchet.sqlite` if it does not exist.

View file

@ -50,6 +50,14 @@ The ledger is the main artifact. It says what was tested, how long it ran, what
After a completed watch, the cockpit should not immediately recommend another identical watch. It enters post-watch cooldown, which means the current stage is complete and the next useful operator touch is a later fresh sample.
During cooldown, the cockpit prints a progress bar, remaining minutes, and the earliest next action time. To let the app wait and perform the next monitor-only refresh after explicit approval, run:
```bash
./scripts/ratchet pipeline
```
The pipeline prints its plan and asks for `yes` or `no` before doing anything. It never places Braiins orders.
If a run already happened before automatic bookkeeping was available, reconstruct it from the stored SQLite snapshots:
```bash

View file

@ -13,6 +13,7 @@ Commands:
setup Create the local .venv and initialize the local database.
once Fetch one fresh sample, then print the cockpit.
watch [hours] Run repeated monitor cycles for N hours. Default: 6.
pipeline Propose timed automation, then ask yes/no.
report Print the latest stored report without fetching new data.
experiments Print the Karpathy-style experiment ledger.
retro SINCE [UNTIL] Write a retroactive report from stored snapshots.
@ -26,6 +27,7 @@ Examples:
./scripts/ratchet setup
./scripts/ratchet once
./scripts/ratchet watch 6
./scripts/ratchet pipeline
./scripts/ratchet report
./scripts/ratchet experiments
./scripts/ratchet retro 2026-04-25T19:08:00+00:00 2026-04-25T21:05:00+00:00
@ -105,6 +107,10 @@ cmd_report() {
run_python -m braiins_ratchet.cli report
}
cmd_pipeline() {
run_python -m braiins_ratchet.cli pipeline "$@"
}
cmd_experiments() {
run_python -m braiins_ratchet.cli experiments
}
@ -145,6 +151,7 @@ main() {
setup) cmd_setup "$@" ;;
once) cmd_once "$@" ;;
watch) cmd_watch "$@" ;;
pipeline|auto) cmd_pipeline "$@" ;;
report) cmd_report "$@" ;;
experiments) cmd_experiments "$@" ;;
retro) cmd_retro "$@" ;;

View file

@ -0,0 +1,125 @@
from __future__ import annotations
from dataclasses import dataclass
from .guidance import OperatorState, get_operator_state
@dataclass(frozen=True)
class AutomationPlan:
kind: str
title: str
steps: list[str]
wait_seconds: int = 0
@property
def needs_confirmation(self) -> bool:
return self.kind not in {"no_action", "external_wait"}
def build_automation_plan(conn) -> AutomationPlan:
state = get_operator_state(conn)
return build_automation_plan_from_state(state)
def build_automation_plan_from_state(state: OperatorState) -> AutomationPlan:
if state.active_watch:
return AutomationPlan(
kind="external_wait",
title="No new automation will start because a watch is already running.",
steps=[
"Wait for the running watch terminal to finish.",
"After it finishes, run ./scripts/ratchet or ./scripts/ratchet pipeline again.",
],
)
if state.completed_watch and state.action == "manual_canary":
return AutomationPlan(
kind="wait_then_once",
title="Wait for post-watch cooldown, then refresh once.",
wait_seconds=state.completed_watch.remaining_minutes * 60,
steps=[
f"Wait until {state.completed_watch.earliest_action_local}.",
"Run one fresh monitor cycle.",
"Print the cockpit.",
"Stop. It will not start another watch automatically.",
],
)
if not state.has_ocean or not state.has_market:
return AutomationPlan(
kind="once_now",
title="Collect the first fresh sample.",
steps=[
"Run one fresh monitor cycle.",
"Print the cockpit.",
"Stop.",
],
)
if not state.is_fresh:
return AutomationPlan(
kind="once_now",
title="Refresh stale market data.",
steps=[
"Run one fresh monitor cycle.",
"Print the cockpit.",
"Stop.",
],
)
if state.action == "manual_canary":
return AutomationPlan(
kind="watch_2h",
title="Run one bounded passive watch.",
steps=[
"Run a 2-hour watch.",
"Collect one public/OCEAN sample every 5 minutes.",
"Write the experiment ledger and run report.",
"Print the cockpit.",
"Stop. It will not place orders.",
],
)
if state.action == "manual_bid":
return AutomationPlan(
kind="report_only",
title="Show the full report for manual review.",
steps=[
"Print the full report.",
"Stop. Any Braiins action remains manual.",
],
)
return AutomationPlan(
kind="no_action",
title="No automation is useful right now.",
steps=[
"Do not bid.",
"Do not start a watch automatically.",
],
)
def render_automation_plan(plan: AutomationPlan) -> str:
lines = [
"Automation Proposal",
"",
f"I am going to: {plan.title}",
"",
"Planned steps:",
]
lines.extend(f" {index}. {step}" for index, step in enumerate(plan.steps, start=1))
lines.extend(
[
"",
"Safety:",
" This pipeline is monitor-only.",
" It never places, changes, or cancels Braiins orders.",
]
)
if plan.needs_confirmation:
lines.extend(["", "Are you OK with this? Type yes or no."])
else:
lines.extend(["", "No confirmation needed because no automated action will run."])
return "\n".join(lines)

View file

@ -6,6 +6,7 @@ from pathlib import Path
import sys
import time
from .automation import build_automation_plan, render_automation_plan
from .braiins import BraiinsPublicClient, market_snapshot_from_json_file
from .config import load_config
from .experiments import (
@ -147,6 +148,50 @@ def cmd_next(_: argparse.Namespace) -> int:
return 0
def cmd_pipeline(args: argparse.Namespace) -> int:
config = load_config(Path(args.config) if args.config else None)
with connect() as conn:
init_db(conn)
plan = build_automation_plan(conn)
print(render_automation_plan(plan))
if not plan.needs_confirmation:
return 0
if not args.yes:
answer = input("> ").strip().lower()
if answer not in {"y", "yes"}:
print("Automation cancelled. No action was taken.")
return 0
if plan.kind == "wait_then_once":
_wait_with_progress(plan.wait_seconds)
_run_one_fresh_cycle(config)
_print_cockpit()
return 0
if plan.kind == "once_now":
_run_one_fresh_cycle(config)
_print_cockpit()
return 0
if plan.kind == "watch_2h":
result = cmd_watch(
argparse.Namespace(
config=args.config,
cycles=24,
interval_seconds=300,
hypothesis="automation pipeline: bounded passive watch",
)
)
_print_cockpit()
return result
if plan.kind == "report_only":
with connect() as conn:
init_db(conn)
print(build_text_report(conn))
return 0
print("Automation plan had no executable action.")
return 0
def cmd_experiments(_: argparse.Namespace) -> int:
if not EXPERIMENT_LOG.exists():
print("No experiment log yet. Run ./scripts/ratchet watch 2.")
@ -208,6 +253,29 @@ def _proposal_json(proposal: object) -> str:
return json.dumps(proposal, default=default, indent=2)
def _run_one_fresh_cycle(config: object) -> None:
with connect() as conn:
run_cycle(conn, config)
def _print_cockpit() -> None:
with connect() as conn:
init_db(conn)
print(build_operator_cockpit(conn))
def _wait_with_progress(wait_seconds: int) -> None:
remaining = max(0, wait_seconds)
if remaining == 0:
return
print(f"Waiting {remaining // 60} minute(s) before the next allowed action.")
while remaining > 0:
sleep_for = min(60, remaining)
time.sleep(sleep_for)
remaining -= sleep_for
print(f"Timer: {remaining // 60} minute(s) remaining.")
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(prog="braiins-ratchet")
sub = parser.add_subparsers(required=True)
@ -255,6 +323,11 @@ def build_parser() -> argparse.ArgumentParser:
next_step = sub.add_parser("next", help="print exactly what the operator should do next")
next_step.set_defaults(func=cmd_next)
pipeline = sub.add_parser("pipeline", help="propose and confirm the next automation step")
pipeline.add_argument("--config")
pipeline.add_argument("--yes", action="store_true", help="accept the printed plan without prompting")
pipeline.set_defaults(func=cmd_pipeline)
experiments = sub.add_parser("experiments", help="print the Karpathy-style experiment log")
experiments.set_defaults(func=cmd_experiments)

View file

@ -1,5 +1,6 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import UTC, datetime
import json
import os
@ -12,59 +13,100 @@ from .storage import latest_market_snapshot, latest_ocean_snapshot, latest_propo
POST_WATCH_COOLDOWN_MINUTES = 360
def build_operator_cockpit(conn) -> str:
@dataclass(frozen=True)
class CompletedWatch:
report_path: str
age_minutes: int
remaining_minutes: int
cooldown_minutes: int
earliest_action_utc: str
earliest_action_local: str
@dataclass(frozen=True)
class OperatorState:
has_ocean: bool
has_market: bool
action: str | None
active_watch: str | None
completed_watch: CompletedWatch | None
is_fresh: bool
freshness_minutes: int | None
latest_report: str | None
running_runs: list[str]
latest_ocean_timestamp: str | None
latest_market_timestamp: str | None
def get_operator_state(conn) -> OperatorState:
ocean = latest_ocean_snapshot(conn)
market = latest_market_snapshot(conn)
proposal = latest_proposal(conn)
latest_report = _latest_report()
running_runs = _running_runs()
active_watch = _active_watch()
completed_watch = _recent_completed_watch(latest_report, market.timestamp_utc if market else None)
freshness = _freshness_minutes(market.timestamp_utc if market else None)
is_fresh = freshness is not None and freshness <= 30
return OperatorState(
has_ocean=ocean is not None,
has_market=market is not None,
action=proposal.action if proposal else None,
active_watch=_active_watch(),
completed_watch=_recent_completed_watch(latest_report, market.timestamp_utc if market else None),
is_fresh=freshness is not None and freshness <= 30,
freshness_minutes=freshness,
latest_report=latest_report,
running_runs=_running_runs(),
latest_ocean_timestamp=ocean.timestamp_utc if ocean else None,
latest_market_timestamp=market.timestamp_utc if market else None,
)
def build_operator_cockpit(conn) -> str:
state = get_operator_state(conn)
lines = [
"Braiins Ratchet Cockpit",
"",
"Situation",
f" Database: {'ready' if ocean or market or proposal else 'empty'}",
f" Latest OCEAN sample: {ocean.timestamp_utc if ocean else 'none'}",
f" Latest Braiins sample: {market.timestamp_utc if market else 'none'}",
f" Braiins sample freshness: {_freshness_text(freshness)}",
f" Latest strategy action: {proposal.action if proposal else 'none'}",
f" Latest run report: {latest_report or 'none yet'}",
f" Database: {'ready' if state.has_ocean or state.has_market or state.action else 'empty'}",
f" Latest OCEAN sample: {state.latest_ocean_timestamp or 'none'}",
f" Latest Braiins sample: {state.latest_market_timestamp or 'none'}",
f" Braiins sample freshness: {_freshness_text(state.freshness_minutes)}",
f" Latest strategy action: {state.action or 'none'}",
f" Latest run report: {state.latest_report or 'none yet'}",
f" Experiment ledger: {EXPERIMENT_LOG.relative_to(REPORTS_DIR.parent) if EXPERIMENT_LOG.exists() else 'none yet'}",
f" Active watch: {active_watch or 'none detected'}",
f" Research stage: {_research_stage(active_watch, completed_watch)}",
f" Active watch: {state.active_watch or 'none detected'}",
f" Research stage: {_research_stage(state.active_watch, state.completed_watch)}",
]
if running_runs:
lines.append(f" Ledger has unfinished run markers: {', '.join(running_runs)}")
if state.completed_watch:
lines.extend(_cooldown_status_lines(state.completed_watch))
if state.running_runs:
lines.append(f" Ledger has unfinished run markers: {', '.join(state.running_runs)}")
lines.extend(["", "DO THIS NOW"])
lines.extend(
_do_this_now(
active_watch=active_watch,
completed_watch=completed_watch,
has_ocean=ocean is not None,
has_market=market is not None,
is_fresh=is_fresh,
action=proposal.action if proposal else None,
active_watch=state.active_watch,
completed_watch=state.completed_watch,
has_ocean=state.has_ocean,
has_market=state.has_market,
is_fresh=state.is_fresh,
action=state.action,
)
)
lines.extend(["", "Ratchet Pathway Forecast"])
lines.extend(
_pathway_forecast(
active_watch=active_watch,
completed_watch=completed_watch,
has_ocean=ocean is not None,
has_market=market is not None,
is_fresh=is_fresh,
action=proposal.action if proposal else None,
active_watch=state.active_watch,
completed_watch=state.completed_watch,
has_ocean=state.has_ocean,
has_market=state.has_market,
is_fresh=state.is_fresh,
action=state.action,
)
)
lines.extend(["", "How To Interpret The Current Action"])
lines.extend(_action_explanation(proposal.action if proposal else None))
lines.extend(_action_explanation(state.action))
lines.extend(["", "Ratchet Rule"])
lines.extend(
[
@ -80,6 +122,7 @@ def build_operator_cockpit(conn) -> str:
" ./scripts/ratchet next # read this cockpit",
" ./scripts/ratchet once # fetch one fresh sample and report",
" ./scripts/ratchet watch 2 # run a bounded 2-hour experiment",
" ./scripts/ratchet pipeline # propose automation, then ask yes/no",
" ./scripts/ratchet experiments # read the experiment ledger",
" ./scripts/ratchet report # read the latest raw human report",
]
@ -89,7 +132,7 @@ def build_operator_cockpit(conn) -> str:
def _do_this_now(
active_watch: str | None,
completed_watch: tuple[str, int] | None,
completed_watch: CompletedWatch | None,
has_ocean: bool,
has_market: bool,
is_fresh: bool,
@ -105,14 +148,14 @@ def _do_this_now(
]
if completed_watch and action == "manual_canary":
report_path, age_minutes = completed_watch
remaining = max(0, POST_WATCH_COOLDOWN_MINUTES - age_minutes)
return [
" STOP.",
f" The latest 2-hour watch already finished {age_minutes} minutes ago.",
f" Report written: {report_path}",
f" The latest 2-hour watch already finished {completed_watch.age_minutes} minutes ago.",
f" Report written: {completed_watch.report_path}",
" Do not start another identical watch now.",
f" Next planned operator touch: after about {remaining} minutes, run exactly:",
f" Earliest next action: {completed_watch.earliest_action_local}",
f" Time remaining: {completed_watch.remaining_minutes} minutes.",
" At or after that time, run exactly:",
" ./scripts/ratchet once",
" Reason: this ratchet stage is complete; repeating it immediately would be loop-chasing, not research.",
]
@ -160,7 +203,7 @@ def _do_this_now(
def _pathway_forecast(
active_watch: str | None,
completed_watch: tuple[str, int] | None,
completed_watch: CompletedWatch | None,
has_ocean: bool,
has_market: bool,
is_fresh: bool,
@ -175,10 +218,9 @@ def _pathway_forecast(
]
if completed_watch and action == "manual_canary":
report_path, _age_minutes = completed_watch
return [
" Planning probabilities are workflow estimates, not profit probabilities.",
f" Immediate, certain: stop this stage and keep {report_path} as the evidence artifact.",
f" Immediate, certain: stop this stage and keep {completed_watch.report_path} as the evidence artifact.",
" Midterm, likely: after cooldown, refresh with one once command and compare against this report.",
" Longterm, possible: adjust exactly one knob only if repeated reports show the same pattern.",
]
@ -258,7 +300,7 @@ def _latest_report() -> str | None:
return str(reports[0].relative_to(REPORTS_DIR.parent))
def _recent_completed_watch(latest_report: str | None, latest_market_timestamp: str | None) -> tuple[str, int] | None:
def _recent_completed_watch(latest_report: str | None, latest_market_timestamp: str | None) -> CompletedWatch | None:
if latest_report is None:
return None
report_path = REPORTS_DIR.parent / latest_report
@ -271,19 +313,50 @@ def _recent_completed_watch(latest_report: str | None, latest_market_timestamp:
age_minutes = max(0, int(age_seconds // 60))
if age_minutes > POST_WATCH_COOLDOWN_MINUTES:
return None
return latest_report, age_minutes
remaining_minutes = max(0, POST_WATCH_COOLDOWN_MINUTES - age_minutes)
earliest_action = datetime.fromtimestamp(report_path.stat().st_mtime, UTC)
earliest_action = earliest_action.replace(microsecond=0)
earliest_action = earliest_action.timestamp() + (POST_WATCH_COOLDOWN_MINUTES * 60)
earliest_action_utc = datetime.fromtimestamp(earliest_action, UTC)
earliest_action_local = earliest_action_utc.astimezone()
return CompletedWatch(
report_path=latest_report,
age_minutes=age_minutes,
remaining_minutes=remaining_minutes,
cooldown_minutes=POST_WATCH_COOLDOWN_MINUTES,
earliest_action_utc=earliest_action_utc.isoformat(timespec="seconds"),
earliest_action_local=earliest_action_local.isoformat(timespec="seconds"),
)
def _research_stage(active_watch: str | None, completed_watch: tuple[str, int] | None) -> str:
def _research_stage(active_watch: str | None, completed_watch: CompletedWatch | None) -> str:
if active_watch:
return "watch running"
if completed_watch:
report_path, age_minutes = completed_watch
remaining = max(0, POST_WATCH_COOLDOWN_MINUTES - age_minutes)
return f"post-watch cooldown ({remaining} minutes left, report {report_path})"
return (
"post-watch cooldown "
f"({completed_watch.remaining_minutes} minutes left, report {completed_watch.report_path})"
)
return "ready"
def _cooldown_status_lines(completed_watch: CompletedWatch) -> list[str]:
elapsed = min(completed_watch.cooldown_minutes, completed_watch.age_minutes)
percent = int((elapsed / completed_watch.cooldown_minutes) * 100)
return [
f" Cooldown progress: {_progress_bar(elapsed, completed_watch.cooldown_minutes)} {percent}%",
f" Earliest next action: {completed_watch.earliest_action_local}",
f" Cooldown remaining: {completed_watch.remaining_minutes} minutes",
]
def _progress_bar(elapsed: int, total: int, width: int = 20) -> str:
if total <= 0:
return "[" + ("#" * width) + "]"
filled = min(width, max(0, int(round((elapsed / total) * width))))
return "[" + ("#" * filled) + ("-" * (width - filled)) + "]"
def _running_runs() -> list[str]:
if not EXPERIMENT_LOG.exists():
return []

70
tests/test_automation.py Normal file
View file

@ -0,0 +1,70 @@
import unittest
from braiins_ratchet.automation import build_automation_plan_from_state, render_automation_plan
from braiins_ratchet.guidance import CompletedWatch, OperatorState
class AutomationTests(unittest.TestCase):
def test_completed_watch_plan_waits_then_refreshes_once(self) -> None:
plan = build_automation_plan_from_state(
_state(
action="manual_canary",
completed_watch=_completed_watch(remaining_minutes=42),
)
)
self.assertEqual(plan.kind, "wait_then_once")
self.assertEqual(plan.wait_seconds, 42 * 60)
rendered = render_automation_plan(plan)
self.assertIn("Wait until 2026-04-28T00:00:00+02:00.", rendered)
self.assertIn("Are you OK with this? Type yes or no.", rendered)
def test_manual_canary_plan_runs_one_bounded_watch(self) -> None:
plan = build_automation_plan_from_state(_state(action="manual_canary"))
self.assertEqual(plan.kind, "watch_2h")
rendered = render_automation_plan(plan)
self.assertIn("Run a 2-hour watch.", rendered)
self.assertIn("It never places", rendered)
def test_active_watch_plan_does_not_prompt(self) -> None:
plan = build_automation_plan_from_state(_state(active_watch="run pid=1"))
self.assertEqual(plan.kind, "external_wait")
self.assertFalse(plan.needs_confirmation)
def _state(
*,
action: str | None = None,
active_watch: str | None = None,
completed_watch: CompletedWatch | None = None,
) -> OperatorState:
return OperatorState(
has_ocean=True,
has_market=True,
action=action,
active_watch=active_watch,
completed_watch=completed_watch,
is_fresh=True,
freshness_minutes=0,
latest_report="reports/run-example.md",
running_runs=[],
latest_ocean_timestamp="2026-04-27T12:00:00+00:00",
latest_market_timestamp="2026-04-27T12:00:00+00:00",
)
def _completed_watch(remaining_minutes: int) -> CompletedWatch:
return CompletedWatch(
report_path="reports/run-example.md",
age_minutes=360 - remaining_minutes,
remaining_minutes=remaining_minutes,
cooldown_minutes=360,
earliest_action_utc="2026-04-27T22:00:00+00:00",
earliest_action_local="2026-04-28T00:00:00+02:00",
)
if __name__ == "__main__":
unittest.main()

View file

@ -3,7 +3,13 @@ from datetime import UTC, datetime
import sqlite3
import unittest
from braiins_ratchet.guidance import _do_this_now, _pathway_forecast, build_operator_cockpit
from braiins_ratchet.guidance import (
CompletedWatch,
_cooldown_status_lines,
_do_this_now,
_pathway_forecast,
build_operator_cockpit,
)
from braiins_ratchet.models import CandidateOrder, MarketSnapshot, OceanSnapshot, StrategyProposal
from braiins_ratchet.storage import init_db, save_market_snapshot, save_ocean_snapshot, save_proposal
@ -83,7 +89,7 @@ class GuidanceTests(unittest.TestCase):
def test_recent_completed_watch_stops_identical_watch_loop(self) -> None:
lines = _do_this_now(
active_watch=None,
completed_watch=("reports/run-example.md", 4),
completed_watch=_completed_watch(age_minutes=4),
has_ocean=True,
has_market=True,
is_fresh=True,
@ -99,7 +105,7 @@ class GuidanceTests(unittest.TestCase):
def test_recent_completed_watch_forecast_enters_cooldown(self) -> None:
lines = _pathway_forecast(
active_watch=None,
completed_watch=("reports/run-example.md", 4),
completed_watch=_completed_watch(age_minutes=4),
has_ocean=True,
has_market=True,
is_fresh=True,
@ -111,6 +117,26 @@ class GuidanceTests(unittest.TestCase):
self.assertIn("Immediate, certain: stop this stage", text)
self.assertIn("Midterm, likely: after cooldown", text)
def test_cooldown_status_includes_timer_and_progress_bar(self) -> None:
lines = _cooldown_status_lines(_completed_watch(age_minutes=90))
text = "\n".join(lines)
self.assertIn("Cooldown progress: [#####---------------] 25%", text)
self.assertIn("Earliest next action: 2026-04-28T00:00:00+02:00", text)
self.assertIn("Cooldown remaining: 270 minutes", text)
def _completed_watch(age_minutes: int) -> CompletedWatch:
return CompletedWatch(
report_path="reports/run-example.md",
age_minutes=age_minutes,
remaining_minutes=360 - age_minutes,
cooldown_minutes=360,
earliest_action_utc="2026-04-27T22:00:00+00:00",
earliest_action_local="2026-04-28T00:00:00+02:00",
)
if __name__ == "__main__":
unittest.main()