Track manual Braiins exposure

This commit is contained in:
saymrwulf 2026-04-27 18:45:34 +02:00
parent d8e8113c59
commit dae66daa60
14 changed files with 536 additions and 8 deletions

View file

@ -37,6 +37,18 @@ For the durable forever lifecycle supervisor:
It persists lifecycle state in `data/ratchet.sqlite`. If the process crashes or the Mac reboots, start the same command again and it resumes from SQLite.
When you manually place a Braiins bid, record the exposure so the supervisor blocks new experiments:
```bash
./scripts/ratchet position open --description "Braiins order abc" --maturity-hours 72
```
Close it only when finished:
```bash
./scripts/ratchet position close POSITION_ID
```
For the native macOS SwiftUI shell:
```bash

View file

@ -98,6 +98,32 @@ Use this to inspect persisted state without starting the loop:
./scripts/ratchet supervise --status
```
## Manual Braiins Exposure
If you manually start a Braiins bid, record it immediately:
```bash
./scripts/ratchet position open --description "Braiins order abc, 0.0001 BTC, 3h canary" --maturity-hours 72
```
While a manual position is active:
1. The cockpit says `HOLD`.
2. The supervisor blocks new watch experiments.
3. Restarting the app keeps the manual exposure state.
List positions:
```bash
./scripts/ratchet position list
```
When the Braiins/OCEAN exposure is truly finished:
```bash
./scripts/ratchet position close POSITION_ID
```
## Native Mac App
The native SwiftUI shell is in:
@ -115,6 +141,8 @@ swift run BraiinsRatchetMac
The app is a native cockpit over the same durable Python lifecycle engine.
The app includes controls to record and close manual exposure, but the same rule applies: it never places Braiins orders.
## Research Pathway
The cockpit has two different time horizons:

View file

@ -62,6 +62,28 @@ PYTHONPATH=src ./.venv/bin/python -m braiins_ratchet.cli supervise --status
Crash/reboot contract: start `./scripts/ratchet supervise` again and it resumes from persisted SQLite lifecycle state.
## `position`
Records manually executed Braiins exposure so the lifecycle supervisor can hold state across days/weeks.
Open a manual position:
```bash
PYTHONPATH=src ./.venv/bin/python -m braiins_ratchet.cli position open --description "Braiins order abc" --maturity-hours 72
```
List positions:
```bash
PYTHONPATH=src ./.venv/bin/python -m braiins_ratchet.cli position list
```
Close a finished position:
```bash
PYTHONPATH=src ./.venv/bin/python -m braiins_ratchet.cli position close 1
```
## `init-db`
Creates `data/ratchet.sqlite` if it does not exist.

View file

@ -76,6 +76,22 @@ Check the persisted lifecycle state:
This is still monitor-only. Manual Braiins bids remain outside the app unless separately recorded.
## Manual Exposure Tracking
When you manually place a Braiins order, record it:
```bash
./scripts/ratchet position open --description "Braiins order abc" --maturity-hours 72
```
The supervisor then blocks new experiments while the position is active. This is the stateful bridge for real-money operations that can run for days or weeks.
Close it only when the exposure is truly finished:
```bash
./scripts/ratchet position close POSITION_ID
```
If a run already happened before automatic bookkeeping was available, reconstruct it from the stored SQLite snapshots:
```bash

View file

@ -16,6 +16,7 @@ swift run BraiinsRatchetMac
- Native macOS SwiftUI cockpit.
- Liquid-glass-inspired material panels.
- Buttons for cockpit, lifecycle status, automation proposal, and full report.
- Manual exposure recording and closing controls.
- Monitor-only. It never places Braiins orders.
## Product Direction

View file

@ -14,6 +14,9 @@ struct BraiinsRatchetApp: App {
struct ContentView: View {
@State private var output = "Press Refresh Cockpit."
@State private var isRunning = false
@State private var manualDescription = ""
@State private var maturityHours = "72"
@State private var closePositionId = ""
var body: some View {
ZStack {
@ -31,6 +34,7 @@ struct ContentView: View {
VStack(alignment: .leading, spacing: 22) {
header
controls
manualExposureControls
outputPanel
}
.padding(30)
@ -62,6 +66,9 @@ struct ContentView: View {
glassButton("Automation Plan") {
Task { await runRatchet(["pipeline"], input: "no\n") }
}
glassButton("Manual Positions") {
Task { await runRatchet(["position", "list"]) }
}
glassButton("Full Report") {
Task { await runRatchet(["report"]) }
}
@ -73,6 +80,62 @@ struct ContentView: View {
}
}
private var manualExposureControls: some View {
VStack(alignment: .leading, spacing: 10) {
Text("Manual Braiins Exposure")
.font(.system(size: 15, weight: .bold, design: .rounded))
.foregroundStyle(.white.opacity(0.82))
HStack(spacing: 10) {
TextField("Description, e.g. Braiins order abc 0.0001 BTC", text: $manualDescription)
.textFieldStyle(.plain)
.padding(10)
.background(.thinMaterial, in: RoundedRectangle(cornerRadius: 12))
.foregroundStyle(.white)
TextField("Hours", text: $maturityHours)
.textFieldStyle(.plain)
.frame(width: 72)
.padding(10)
.background(.thinMaterial, in: RoundedRectangle(cornerRadius: 12))
.foregroundStyle(.white)
glassButton("Record Exposure") {
let description = manualDescription.trimmingCharacters(in: .whitespacesAndNewlines)
let hours = maturityHours.trimmingCharacters(in: .whitespacesAndNewlines)
guard !description.isEmpty else {
output = "Enter a manual exposure description first."
return
}
Task {
await runRatchet([
"position", "open",
"--description", description,
"--maturity-hours", hours.isEmpty ? "72" : hours
])
}
}
TextField("ID", text: $closePositionId)
.textFieldStyle(.plain)
.frame(width: 54)
.padding(10)
.background(.thinMaterial, in: RoundedRectangle(cornerRadius: 12))
.foregroundStyle(.white)
glassButton("Close Exposure") {
let positionId = closePositionId.trimmingCharacters(in: .whitespacesAndNewlines)
guard !positionId.isEmpty else {
output = "Enter a manual position ID first."
return
}
Task { await runRatchet(["position", "close", positionId]) }
}
}
}
.padding(16)
.background(.ultraThinMaterial, in: RoundedRectangle(cornerRadius: 22, style: .continuous))
.overlay(
RoundedRectangle(cornerRadius: 22, style: .continuous)
.stroke(.white.opacity(0.14), lineWidth: 1)
)
}
private var outputPanel: some View {
ScrollView {
Text(output)

View file

@ -15,6 +15,7 @@ Commands:
watch [hours] Run repeated monitor cycles for N hours. Default: 6.
pipeline Propose timed automation, then ask yes/no.
supervise Run the durable forever lifecycle supervisor.
position Record/list/close manually executed Braiins exposure.
report Print the latest stored report without fetching new data.
experiments Print the Karpathy-style experiment ledger.
retro SINCE [UNTIL] Write a retroactive report from stored snapshots.
@ -30,6 +31,7 @@ Examples:
./scripts/ratchet watch 6
./scripts/ratchet pipeline
./scripts/ratchet supervise
./scripts/ratchet position list
./scripts/ratchet report
./scripts/ratchet experiments
./scripts/ratchet retro 2026-04-25T19:08:00+00:00 2026-04-25T21:05:00+00:00
@ -117,6 +119,10 @@ cmd_supervise() {
run_python -m braiins_ratchet.cli supervise "$@"
}
cmd_position() {
run_python -m braiins_ratchet.cli position "$@"
}
cmd_experiments() {
run_python -m braiins_ratchet.cli experiments
}
@ -159,6 +165,7 @@ main() {
watch) cmd_watch "$@" ;;
pipeline|auto) cmd_pipeline "$@" ;;
supervise|daemon) cmd_supervise "$@" ;;
position|positions) cmd_position "$@" ;;
report) cmd_report "$@" ;;
experiments) cmd_experiments "$@" ;;
retro) cmd_retro "$@" ;;

View file

@ -14,7 +14,7 @@ class AutomationPlan:
@property
def needs_confirmation(self) -> bool:
return self.kind not in {"no_action", "external_wait"}
return self.kind not in {"no_action", "external_wait", "manual_exposure_hold"}
def build_automation_plan(conn) -> AutomationPlan:
@ -33,6 +33,17 @@ def build_automation_plan_from_state(state: OperatorState) -> AutomationPlan:
],
)
if state.active_manual_positions:
return AutomationPlan(
kind="manual_exposure_hold",
title="Hold because manual Braiins exposure is active.",
steps=[
"Do not start a new watch.",
"Keep lifecycle supervision focused on the active manual position.",
"Close the manual position explicitly when it is truly finished.",
],
)
if state.completed_watch and state.action == "manual_canary":
return AutomationPlan(
kind="wait_then_once",

View file

@ -1,6 +1,7 @@
from __future__ import annotations
import argparse
from datetime import UTC, datetime, timedelta
import json
from pathlib import Path
import sys
@ -17,7 +18,14 @@ from .experiments import (
write_retro_report,
)
from .guidance import build_operator_cockpit
from .lifecycle import render_lifecycle_status, render_supervisor_plan, run_supervisor
from .lifecycle import (
close_manual_position,
open_manual_position,
render_lifecycle_status,
render_manual_positions,
render_supervisor_plan,
run_supervisor,
)
from .monitor import run_cycle
from .ocean import fetch_snapshot
from .report import build_text_report
@ -209,6 +217,45 @@ def cmd_supervise(args: argparse.Namespace) -> int:
return run_supervisor(config, once=args.once)
def cmd_position_open(args: argparse.Namespace) -> int:
expected_maturity_utc = args.expected_maturity_utc
if expected_maturity_utc is None and args.maturity_hours is not None:
expected = datetime.now(UTC) + timedelta(hours=args.maturity_hours)
expected_maturity_utc = expected.isoformat(timespec="seconds")
payload = {}
if args.payload_json:
payload = json.loads(args.payload_json)
with connect() as conn:
init_db(conn)
position_id = open_manual_position(
conn,
venue=args.venue,
description=args.description,
expected_maturity_utc=expected_maturity_utc,
payload=payload,
)
print(f"manual_position_opened: #{position_id}")
print("Supervisor will hold new experiments while this position is active.")
return 0
def cmd_position_close(args: argparse.Namespace) -> int:
with connect() as conn:
init_db(conn)
closed = close_manual_position(conn, args.position_id)
if not closed:
raise SystemExit(f"no active manual position found with id {args.position_id}")
print(f"manual_position_closed: #{args.position_id}")
return 0
def cmd_position_list(_: argparse.Namespace) -> int:
with connect() as conn:
init_db(conn)
print(render_manual_positions(conn))
return 0
def cmd_experiments(_: argparse.Namespace) -> int:
if not EXPERIMENT_LOG.exists():
print("No experiment log yet. Run ./scripts/ratchet watch 2.")
@ -352,6 +399,24 @@ def build_parser() -> argparse.ArgumentParser:
supervise.add_argument("--status", action="store_true", help="print persisted lifecycle status")
supervise.set_defaults(func=cmd_supervise)
position = sub.add_parser("position", help="record manually executed Braiins exposure")
position_sub = position.add_subparsers(required=True)
position_open = position_sub.add_parser("open", help="record an active manual position")
position_open.add_argument("--venue", default="braiins")
position_open.add_argument("--description", required=True)
position_open.add_argument("--maturity-hours", type=int)
position_open.add_argument("--expected-maturity-utc")
position_open.add_argument("--payload-json")
position_open.set_defaults(func=cmd_position_open)
position_close = position_sub.add_parser("close", help="mark a manual position closed")
position_close.add_argument("position_id", type=int)
position_close.set_defaults(func=cmd_position_close)
position_list = position_sub.add_parser("list", help="list manual positions")
position_list.set_defaults(func=cmd_position_list)
experiments = sub.add_parser("experiments", help="print the Karpathy-style experiment log")
experiments.set_defaults(func=cmd_experiments)

View file

@ -36,6 +36,7 @@ class OperatorState:
running_runs: list[str]
latest_ocean_timestamp: str | None
latest_market_timestamp: str | None
active_manual_positions: list[str]
def get_operator_state(conn) -> OperatorState:
@ -56,6 +57,7 @@ def get_operator_state(conn) -> OperatorState:
running_runs=_running_runs(),
latest_ocean_timestamp=ocean.timestamp_utc if ocean else None,
latest_market_timestamp=market.timestamp_utc if market else None,
active_manual_positions=_active_manual_positions(conn),
)
@ -74,6 +76,7 @@ def build_operator_cockpit(conn) -> str:
f" Latest run report: {state.latest_report or 'none yet'}",
f" Experiment ledger: {EXPERIMENT_LOG.relative_to(REPORTS_DIR.parent) if EXPERIMENT_LOG.exists() else 'none yet'}",
f" Active watch: {state.active_watch or 'none detected'}",
f" Active manual exposure: {_manual_exposure_text(state.active_manual_positions)}",
f" Research stage: {_research_stage(state.active_watch, state.completed_watch)}",
]
@ -87,6 +90,7 @@ def build_operator_cockpit(conn) -> str:
lines.extend(
_do_this_now(
active_watch=state.active_watch,
active_manual_positions=state.active_manual_positions,
completed_watch=state.completed_watch,
has_ocean=state.has_ocean,
has_market=state.has_market,
@ -98,6 +102,7 @@ def build_operator_cockpit(conn) -> str:
lines.extend(
_pathway_forecast(
active_watch=state.active_watch,
active_manual_positions=state.active_manual_positions,
completed_watch=state.completed_watch,
has_ocean=state.has_ocean,
has_market=state.has_market,
@ -132,6 +137,7 @@ def build_operator_cockpit(conn) -> str:
def _do_this_now(
active_watch: str | None,
active_manual_positions: list[str],
completed_watch: CompletedWatch | None,
has_ocean: bool,
has_market: bool,
@ -147,6 +153,16 @@ def _do_this_now(
" ./scripts/ratchet",
]
if active_manual_positions:
return [
" HOLD.",
" Manual Braiins exposure is active. Do not start new watch experiments.",
" Keep supervisor running or check status with:",
" ./scripts/ratchet supervise --status",
" When the Braiins position is really finished, close it with:",
" ./scripts/ratchet position close POSITION_ID",
]
if completed_watch and action == "manual_canary":
return [
" STOP.",
@ -203,6 +219,7 @@ def _do_this_now(
def _pathway_forecast(
active_watch: str | None,
active_manual_positions: list[str],
completed_watch: CompletedWatch | None,
has_ocean: bool,
has_market: bool,
@ -217,6 +234,14 @@ def _pathway_forecast(
" Longterm, possible: adjust one strategy knob if the report says the run taught us something.",
]
if active_manual_positions:
return [
" Planning probabilities are workflow estimates, not profit probabilities.",
" Immediate, certain: supervise the active manual exposure; workload is observation only.",
" Midterm, likely: close the position manually when Braiins/OCEAN state confirms it is done.",
" Longterm, possible: resume passive ratchet experiments only after exposure is closed.",
]
if completed_watch and action == "manual_canary":
return [
" Planning probabilities are workflow estimates, not profit probabilities.",
@ -300,6 +325,30 @@ def _latest_report() -> str | None:
return str(reports[0].relative_to(REPORTS_DIR.parent))
def _active_manual_positions(conn) -> list[str]:
try:
rows = conn.execute(
"""
SELECT id, venue, description, expected_maturity_utc
FROM manual_positions
WHERE status = 'active'
ORDER BY id DESC
"""
).fetchall()
except Exception:
return []
return [
f"#{row[0]} {row[1]} {row[2]} maturity={row[3] or 'unknown'}"
for row in rows
]
def _manual_exposure_text(positions: list[str]) -> str:
if not positions:
return "none recorded"
return "; ".join(positions)
def _recent_completed_watch(latest_report: str | None, latest_market_timestamp: str | None) -> CompletedWatch | None:
if latest_report is None:
return None

View file

@ -24,6 +24,18 @@ class LifecycleStatus:
message: str
@dataclass(frozen=True)
class ManualPosition:
id: int
opened_utc: str
closed_utc: str | None
status: str
venue: str
description: str
expected_maturity_utc: str | None
payload_json: str
def init_lifecycle_db(conn) -> None:
init_db(conn)
conn.executescript(
@ -68,6 +80,7 @@ def get_lifecycle_status(conn) -> LifecycleStatus:
def render_lifecycle_status(conn) -> str:
status = get_lifecycle_status(conn)
positions = list_manual_positions(conn, status="active")
lines = [
"Braiins Ratchet Lifecycle",
"",
@ -79,6 +92,13 @@ def render_lifecycle_status(conn) -> str:
if status.next_action_utc:
remaining = _seconds_until(status.next_action_utc)
lines.append(f"Countdown: {_format_duration(remaining)}")
if positions:
lines.extend(["", "Active Manual Exposure"])
for position in positions:
lines.append(
f" #{position.id} {position.venue}: {position.description} "
f"(expected maturity: {position.expected_maturity_utc or 'unknown'})"
)
return "\n".join(lines)
@ -92,10 +112,11 @@ def render_supervisor_plan() -> str:
"Loop:",
" 1. Resume persisted lifecycle state from data/ratchet.sqlite.",
" 2. If cooldown is active, wait until the persisted next-action time.",
" 3. Run one 2-hour passive watch when the lifecycle is ready.",
" 4. Write the experiment ledger and run report.",
" 5. Enter post-watch cooldown.",
" 6. Repeat forever until you stop the process.",
" 3. If manual Braiins exposure is active, hold and do not start new experiments.",
" 4. Run one 2-hour passive watch when the lifecycle is ready.",
" 5. Write the experiment ledger and run report.",
" 6. Enter post-watch cooldown.",
" 7. Repeat forever until you stop the process.",
"",
"Crash/reboot behavior:",
" Restart ./scripts/ratchet supervise and it resumes from SQLite.",
@ -118,6 +139,13 @@ def run_supervisor(config: AppConfig, *, once: bool = False) -> int:
while True:
with connect() as conn:
init_lifecycle_db(conn)
active_positions = list_manual_positions(conn, status="active")
if active_positions:
_handle_manual_exposure(conn, active_positions)
if once:
return 0
time.sleep(60)
continue
state = _read_state(conn)
phase = state.get("phase", "idle")
next_action_utc = state.get("next_action_utc")
@ -199,6 +227,165 @@ def _run_watch_stage(config: AppConfig) -> str:
return experiment.run_id
def open_manual_position(
conn,
*,
venue: str,
description: str,
expected_maturity_utc: str | None,
payload: dict[str, object] | None = None,
) -> int:
init_lifecycle_db(conn)
opened = datetime.now(UTC).isoformat(timespec="seconds")
cursor = conn.execute(
"""
INSERT INTO manual_positions (
opened_utc, closed_utc, status, venue, description,
expected_maturity_utc, payload_json
)
VALUES (?, NULL, 'active', ?, ?, ?, ?)
""",
(
opened,
venue,
description,
expected_maturity_utc,
json.dumps(payload or {}, sort_keys=True),
),
)
position_id = int(cursor.lastrowid)
_write_state(
conn,
{
"phase": "manual_exposure_active",
"next_action_utc": expected_maturity_utc or "",
"last_run_id": _read_state(conn).get("last_run_id", ""),
"message": f"manual exposure active: position #{position_id}",
},
)
_record_event(
conn,
"manual_position_opened",
{"position_id": position_id, "venue": venue, "description": description},
)
return position_id
def close_manual_position(conn, position_id: int) -> bool:
init_lifecycle_db(conn)
closed = datetime.now(UTC).isoformat(timespec="seconds")
cursor = conn.execute(
"""
UPDATE manual_positions
SET status = 'closed', closed_utc = ?
WHERE id = ? AND status = 'active'
""",
(closed, position_id),
)
if cursor.rowcount == 0:
conn.commit()
return False
_record_event(conn, "manual_position_closed", {"position_id": position_id})
if not list_manual_positions(conn, status="active"):
state = _read_state(conn)
_write_state(
conn,
{
"phase": "idle",
"next_action_utc": "",
"last_run_id": state.get("last_run_id", ""),
"message": "manual exposure closed; lifecycle ready",
},
)
return True
def list_manual_positions(conn, *, status: str | None = None) -> list[ManualPosition]:
init_lifecycle_db(conn)
where = "WHERE status = ?" if status else ""
params: tuple[object, ...] = (status,) if status else ()
rows = conn.execute(
f"""
SELECT id, opened_utc, closed_utc, status, venue, description,
expected_maturity_utc, payload_json
FROM manual_positions
{where}
ORDER BY id DESC
""",
params,
).fetchall()
return [
ManualPosition(
id=int(row[0]),
opened_utc=row[1],
closed_utc=row[2],
status=row[3],
venue=row[4],
description=row[5],
expected_maturity_utc=row[6],
payload_json=row[7],
)
for row in rows
]
def render_manual_positions(conn) -> str:
positions = list_manual_positions(conn)
if not positions:
return "Manual Positions\n\nNo manual positions recorded."
lines = ["Manual Positions", ""]
for position in positions:
lines.extend(
[
f"#{position.id} {position.status} {position.venue}",
f" opened_utc: {position.opened_utc}",
f" closed_utc: {position.closed_utc or 'n/a'}",
f" expected_maturity_utc: {position.expected_maturity_utc or 'n/a'}",
f" description: {position.description}",
]
)
return "\n".join(lines)
def _handle_manual_exposure(conn, positions: list[ManualPosition]) -> None:
next_maturity = _earliest_maturity(positions)
now = datetime.now(UTC)
if next_maturity and next_maturity > now:
phase = "manual_exposure_active"
message = "manual Braiins exposure active; supervisor will not start new experiments"
next_action = next_maturity.isoformat(timespec="seconds")
else:
phase = "manual_exposure_review"
message = "manual exposure needs review or explicit close before lifecycle resumes"
next_action = ""
state = _read_state(conn)
_write_state(
conn,
{
"phase": phase,
"next_action_utc": next_action,
"last_run_id": state.get("last_run_id", ""),
"message": message,
},
)
_record_event(
conn,
"manual_exposure_hold",
{"active_position_ids": [position.id for position in positions], "phase": phase},
)
print(render_lifecycle_status(conn), flush=True)
def _earliest_maturity(positions: list[ManualPosition]) -> datetime | None:
maturities = [
_parse_utc(position.expected_maturity_utc)
for position in positions
if position.expected_maturity_utc
]
parsed = [maturity for maturity in maturities if maturity is not None]
return min(parsed) if parsed else None
def _read_state(conn) -> dict[str, str]:
rows = conn.execute("SELECT key, value FROM lifecycle_state").fetchall()
return {row[0]: row[1] for row in rows}
@ -226,13 +413,22 @@ def _record_event(conn, event_type: str, payload: dict[str, object]) -> None:
def _seconds_until(timestamp_utc: str) -> int:
target = _parse_utc(timestamp_utc)
if target is None:
return 0
return max(0, int((target - datetime.now(UTC)).total_seconds()))
def _parse_utc(timestamp_utc: str | None) -> datetime | None:
if not timestamp_utc:
return None
try:
target = datetime.fromisoformat(timestamp_utc.replace("Z", "+00:00"))
except ValueError:
return 0
return None
if target.tzinfo is None:
target = target.replace(tzinfo=UTC)
return max(0, int((target.astimezone(UTC) - datetime.now(UTC)).total_seconds()))
return target.astimezone(UTC)
def _sleep_with_progress(seconds: int) -> None:

View file

@ -33,12 +33,22 @@ class AutomationTests(unittest.TestCase):
self.assertEqual(plan.kind, "external_wait")
self.assertFalse(plan.needs_confirmation)
def test_manual_exposure_hold_does_not_prompt(self) -> None:
plan = build_automation_plan_from_state(
_state(active_manual_positions=["#1 braiins long order"])
)
self.assertEqual(plan.kind, "manual_exposure_hold")
self.assertFalse(plan.needs_confirmation)
self.assertIn("manual Braiins exposure", render_automation_plan(plan))
def _state(
*,
action: str | None = None,
active_watch: str | None = None,
completed_watch: CompletedWatch | None = None,
active_manual_positions: list[str] | None = None,
) -> OperatorState:
return OperatorState(
has_ocean=True,
@ -52,6 +62,7 @@ def _state(
running_runs=[],
latest_ocean_timestamp="2026-04-27T12:00:00+00:00",
latest_market_timestamp="2026-04-27T12:00:00+00:00",
active_manual_positions=active_manual_positions or [],
)

View file

@ -74,6 +74,7 @@ class GuidanceTests(unittest.TestCase):
def test_stale_market_data_routes_operator_to_once(self) -> None:
lines = _do_this_now(
active_watch=None,
active_manual_positions=[],
completed_watch=None,
has_ocean=True,
has_market=True,
@ -89,6 +90,7 @@ class GuidanceTests(unittest.TestCase):
def test_recent_completed_watch_stops_identical_watch_loop(self) -> None:
lines = _do_this_now(
active_watch=None,
active_manual_positions=[],
completed_watch=_completed_watch(age_minutes=4),
has_ocean=True,
has_market=True,
@ -105,6 +107,7 @@ class GuidanceTests(unittest.TestCase):
def test_recent_completed_watch_forecast_enters_cooldown(self) -> None:
lines = _pathway_forecast(
active_watch=None,
active_manual_positions=[],
completed_watch=_completed_watch(age_minutes=4),
has_ocean=True,
has_market=True,
@ -117,6 +120,23 @@ class GuidanceTests(unittest.TestCase):
self.assertIn("Immediate, certain: stop this stage", text)
self.assertIn("Midterm, likely: after cooldown", text)
def test_active_manual_exposure_blocks_new_experiments(self) -> None:
lines = _do_this_now(
active_watch=None,
active_manual_positions=["#1 braiins long order"],
completed_watch=None,
has_ocean=True,
has_market=True,
is_fresh=True,
action="manual_canary",
)
text = "\n".join(lines)
self.assertIn("HOLD.", text)
self.assertIn("Manual Braiins exposure is active", text)
self.assertIn("position close POSITION_ID", text)
def test_cooldown_status_includes_timer_and_progress_bar(self) -> None:
lines = _cooldown_status_lines(_completed_watch(age_minutes=90))

View file

@ -3,8 +3,12 @@ import sqlite3
import unittest
from braiins_ratchet.lifecycle import (
close_manual_position,
get_lifecycle_status,
init_lifecycle_db,
list_manual_positions,
open_manual_position,
render_manual_positions,
render_lifecycle_status,
render_supervisor_plan,
)
@ -44,6 +48,29 @@ class LifecycleTests(unittest.TestCase):
self.assertIn("Restart ./scripts/ratchet supervise", text)
self.assertIn("never places", text)
def test_manual_position_open_blocks_lifecycle_until_closed(self) -> None:
conn = sqlite3.connect(":memory:")
init_lifecycle_db(conn)
position_id = open_manual_position(
conn,
venue="braiins",
description="manual long bid",
expected_maturity_utc="2026-04-30T00:00:00+00:00",
payload={"spend_btc": "0.0001"},
)
status = get_lifecycle_status(conn)
active = list_manual_positions(conn, status="active")
self.assertEqual(status.phase, "manual_exposure_active")
self.assertEqual(len(active), 1)
self.assertEqual(active[0].id, position_id)
self.assertIn("manual long bid", render_manual_positions(conn))
self.assertTrue(close_manual_position(conn, position_id))
self.assertEqual(list_manual_positions(conn, status="active"), [])
self.assertEqual(get_lifecycle_status(conn).phase, "idle")
if __name__ == "__main__":
unittest.main()