chore: make primary branch Go-only; move Python line to python-legacy

This commit is contained in:
saymrwulf 2026-02-13 18:53:06 +01:00
parent bea52820ce
commit 3e71cebfcf
14 changed files with 31 additions and 5739 deletions

105
README.md
View file

@ -1,96 +1,53 @@
# Puncture: Zero-Trust Cloud-Wide Forward Secrecy
# Puncture (Go)
Python implementation of puncturable encryption (PE) over a GGM tree, with:
Go implementation of a puncturable-key system (GGM tree) with:
- a **master app** (key management + provider management + asset encryption mapping)
- a **secondary app** (read-only live mirror with password auth and kill-switch login)
- primary macOS app/server for key derivation, puncturing, providers, and asset encryption/decryption
- iOS emergency companion app for remote provider-level puncture
## Core cryptographic model
## Current architecture
- 256-bit master seed root.
- HMAC-SHA256 left/right derivation for GGM child nodes.
- Non-sequential puncture with minimal co-path replacement.
- Tag schema: `[7 bits provider_id] | [25 bits file_time_id]`.
- Active-state model stores only active prefix nodes (not per-file keys).
- Immediate zeroization of replaced node material on puncture.
- Puncture log export/import (`list[str]` of bit strings).
- `goapp/cmd/server`: headless web server (`:9122`)
- `goapp/cmd/desktop`: macOS desktop app (embedded webview + local server)
- `goapp/internal/crypto`: GGM puncturable key manager
- `goapp/internal/app`: provider/key/asset state machine
- `goapp/internal/server`: HTTP API + web UI
- `goapp/ios/EmergencyPuncture`: native iOS app
## Setup (venv)
## Run primary server
```bash
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
cd goapp
go run ./cmd/server --host 0.0.0.0 --port 9122
```
## Run
Open `http://127.0.0.1:9122`.
Master app on `9122`:
## Build macOS app + installer
```bash
python -m puncture.web_app --host 0.0.0.0 --port 9122
cd goapp
./packaging/macos/build_dmg.sh
```
Secondary app on `9222`:
Artifacts:
```bash
python -m puncture.view_app --host 0.0.0.0 --port 9222
```
- `goapp/dist/Puncture.app`
- `goapp/dist/Puncture.dmg`
## Master app (`:9122`) workflow
## Persistence
- `/`: main puncture lab (derive/puncture, history, active frontier roots view).
- `/providers`: add/edit/delete providers.
- `/assets`: pick a cleartext file from asset root, select provider/key, encrypt, and register mapping.
Frontier/puncture/providers/assets are persisted locally and survive restarts.
Default desktop paths:
### Asset behavior
- assets: `~/Library/Application Support/PunctureGo/assets`
- state: `~/Library/Application Support/PunctureGo/state.json`
- Ciphertext is written in the **same folder** as the cleartext file.
- Decryption writes recovered cleartext back into the same folder tree (versioned filenames).
- One cleartext file can have multiple provider-key mappings.
- One provider-key can encrypt multiple files.
- After key puncture:
- affected mappings are shown in **red** (`blocked by puncture`)
- if the same cleartext file still has another accessible mapping, that mapping **glows**
## iOS companion
## Secondary app (`:9222`) behavior
- Xcode project: `goapp/ios/EmergencyPuncture/EmergencyPuncture.xcodeproj`
- iPhone install guide: `goapp/ios/INSTALL_IPHONE.md`
- Password-gated access.
- Live read-only mirror from master (`GET /api/live/state`).
- No share setup and no independent derivation state.
- Kill-switch login format:
- normal login: `<password>`
- kill-switch login: `<password><provider_id>`
- example: `puncture-view42` punctures provider `42` immediately on master.
## Legacy Python line
## Environment variables
Master app:
- `PUNCTURE_PORT` (default `9122`)
- `PUNCTURE_ASSET_ROOT` (default `<cwd>/assets`)
- `PUNCTURE_REMOTE_TOKEN` (optional; required for remote puncture endpoint)
- `PUNCTURE_VIEW_SYNC_KEY` (optional signing key for `/api/view-bundle`)
Secondary app:
- `PUNCTURE_VIEW_PORT` (default `9222`)
- `PUNCTURE_MASTER_URL` (default `http://127.0.0.1:9122`)
- `PUNCTURE_SECONDARY_PASSWORD` (default `puncture-view`)
- `PUNCTURE_SECONDARY_SECRET` (Flask session secret)
- `PUNCTURE_REMOTE_TOKEN` (optional; sent as `X-Puncture-Token`)
## API quick reference
- `GET /api/state` (master full state)
- `GET /api/live/state` (master live data for secondary app)
- `POST /api/remote/puncture-provider` (master remote provider kill endpoint)
- `GET /api/export` / `POST /api/import`
- `POST /api/puncture-log`
- `GET /api/view-bundle` (legacy signed/unsigned viewer bundle export)
## Tests
```bash
pytest -q
```
The previous Python implementation is preserved in branch `python-legacy`.

View file

@ -1,21 +0,0 @@
from .key_manager import (
PATH_BITS,
PROVIDER_BITS,
RESOURCE_BITS,
PuncturableKeyManager,
Tag,
binary_path_to_tag,
provider_id_to_prefix,
tag_to_binary_path,
)
__all__ = [
"PATH_BITS",
"PROVIDER_BITS",
"RESOURCE_BITS",
"PuncturableKeyManager",
"Tag",
"binary_path_to_tag",
"provider_id_to_prefix",
"tag_to_binary_path",
]

View file

@ -1,283 +0,0 @@
"""GGM-tree based puncturable key manager.
This module implements a 32-bit tag space mapped as:
[7 bits provider_id] | [25 bits file/time_id]
"""
from __future__ import annotations
import hashlib
import hmac
import json
import os
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional
PROVIDER_BITS = 7
RESOURCE_BITS = 25
PATH_BITS = PROVIDER_BITS + RESOURCE_BITS
KEY_SIZE_BYTES = 32
def _zeroize(buf: bytearray) -> None:
for i in range(len(buf)):
buf[i] = 0
def _derive_child(parent_key: bytes | bytearray, bit: str) -> bytearray:
if bit not in {"0", "1"}:
raise ValueError("bit must be '0' or '1'")
marker = b"\x00" if bit == "0" else b"\x01"
child = hmac.new(bytes(parent_key), b"GGM" + marker, hashlib.sha256).digest()
return bytearray(child)
def _validate_binary_path(binary_path: str, expected_length: int = PATH_BITS) -> None:
if len(binary_path) != expected_length:
raise ValueError(f"binary_path must be {expected_length} bits")
if any(c not in "01" for c in binary_path):
raise ValueError("binary_path must contain only '0' or '1'")
def _validate_binary_prefix(binary_prefix: str, min_len: int = 1, max_len: int = PATH_BITS) -> None:
if not (min_len <= len(binary_prefix) <= max_len):
raise ValueError(f"binary_prefix must be between {min_len} and {max_len} bits")
if any(c not in "01" for c in binary_prefix):
raise ValueError("binary_prefix must contain only '0' or '1'")
@dataclass(frozen=True)
class Tag:
provider_id: int
file_time_id: int
def to_binary_path(self) -> str:
return tag_to_binary_path(self.provider_id, self.file_time_id)
def tag_to_binary_path(provider_id: int, file_time_id: int) -> str:
if not (0 <= provider_id < (1 << PROVIDER_BITS)):
raise ValueError(f"provider_id must be in [0, {1 << PROVIDER_BITS})")
if not (0 <= file_time_id < (1 << RESOURCE_BITS)):
raise ValueError(f"file_time_id must be in [0, {1 << RESOURCE_BITS})")
value = (provider_id << RESOURCE_BITS) | file_time_id
return f"{value:0{PATH_BITS}b}"
def provider_id_to_prefix(provider_id: int) -> str:
if not (0 <= provider_id < (1 << PROVIDER_BITS)):
raise ValueError(f"provider_id must be in [0, {1 << PROVIDER_BITS})")
return f"{provider_id:0{PROVIDER_BITS}b}"
def binary_path_to_tag(binary_path: str) -> Tag:
_validate_binary_path(binary_path)
value = int(binary_path, 2)
file_time_mask = (1 << RESOURCE_BITS) - 1
provider_id = value >> RESOURCE_BITS
file_time_id = value & file_time_mask
return Tag(provider_id=provider_id, file_time_id=file_time_id)
class PuncturableKeyManager:
"""Forward-secret key manager based on puncturable GGM keys.
State model:
- The manager stores only a prefix-free set of active nodes (prefix -> seed).
- A puncture operation removes one covering ancestor node and replaces it with the
minimal set of sibling/co-path nodes needed to preserve all other leaves.
"""
def __init__(self, master_seed: bytes):
if len(master_seed) != KEY_SIZE_BYTES:
raise ValueError(f"master_seed must be {KEY_SIZE_BYTES} bytes")
self._active_nodes: Dict[str, bytearray] = {"": bytearray(master_seed)}
self._puncture_log: List[str] = []
self._punctured_paths: set[str] = set()
self._punctured_prefixes: set[str] = set()
@staticmethod
def generate_master_seed() -> bytes:
return os.urandom(KEY_SIZE_BYTES)
@property
def active_node_count(self) -> int:
return len(self._active_nodes)
def active_prefixes(self) -> List[str]:
return sorted(self._active_nodes.keys(), key=lambda p: (len(p), p))
def puncture_log(self) -> List[str]:
return list(self._puncture_log)
def export_puncture_log_json(self) -> str:
return json.dumps(self._puncture_log)
def export_state(self) -> dict:
return {
"active_nodes": {prefix: bytes(seed).hex() for prefix, seed in self._active_nodes.items()},
"puncture_log": list(self._puncture_log),
}
@classmethod
def from_state(cls, state: dict) -> "PuncturableKeyManager":
# Seed is a placeholder; state fully replaces active nodes.
manager = cls(master_seed=b"\x00" * KEY_SIZE_BYTES)
active_nodes = state.get("active_nodes")
if not isinstance(active_nodes, dict):
raise ValueError("state['active_nodes'] must be a dict")
rebuilt_nodes: Dict[str, bytearray] = {}
for prefix, hex_seed in active_nodes.items():
if any(c not in "01" for c in prefix):
raise ValueError("active node prefixes must be binary strings")
seed = bytes.fromhex(hex_seed)
if len(seed) != KEY_SIZE_BYTES:
raise ValueError("active node seed must be 32 bytes")
rebuilt_nodes[prefix] = bytearray(seed)
manager._active_nodes = rebuilt_nodes
puncture_log = state.get("puncture_log", [])
if not isinstance(puncture_log, list):
raise ValueError("state['puncture_log'] must be a list")
for bitstring in puncture_log:
_validate_binary_prefix(bitstring, min_len=1, max_len=PATH_BITS)
manager._puncture_log = list(puncture_log)
manager._punctured_paths = {b for b in puncture_log if len(b) == PATH_BITS}
manager._punctured_prefixes = {b for b in puncture_log if len(b) < PATH_BITS}
return manager
def _find_covering_prefix(self, binary_path: str) -> Optional[str]:
for depth in range(len(binary_path), -1, -1):
prefix = binary_path[:depth]
if prefix in self._active_nodes:
return prefix
return None
def get_key_for_tag(self, binary_path: str) -> Optional[bytes]:
_validate_binary_path(binary_path)
cover = self._find_covering_prefix(binary_path)
if cover is None:
return None
key: bytes | bytearray = self._active_nodes[cover]
for bit in binary_path[len(cover) :]:
key = _derive_child(key, bit)
return bytes(key)
def get_key_for_provider_resource(self, provider_id: int, file_time_id: int) -> Optional[bytes]:
return self.get_key_for_tag(tag_to_binary_path(provider_id, file_time_id))
def puncture(self, binary_path: str) -> bool:
"""Puncture a path using minimal co-path replacement.
Returns:
True if a new puncture was applied.
False if the path was already punctured / inaccessible.
"""
_validate_binary_path(binary_path)
if binary_path in self._punctured_paths:
return False
if any(binary_path.startswith(prefix) for prefix in self._punctured_prefixes):
return False
cover = self._find_covering_prefix(binary_path)
if cover is None:
# Already inaccessible due to earlier punctures.
self._punctured_paths.add(binary_path)
self._puncture_log.append(binary_path)
return False
current_key = self._active_nodes.pop(cover)
for depth in range(len(cover), PATH_BITS):
bit = binary_path[depth]
sibling_bit = "1" if bit == "0" else "0"
sibling_key = _derive_child(current_key, sibling_bit)
sibling_prefix = binary_path[:depth] + sibling_bit
self._active_nodes[sibling_prefix] = sibling_key
selected_key = _derive_child(current_key, bit)
_zeroize(current_key)
current_key = selected_key
# Current leaf key has been punctured; zeroize immediately.
_zeroize(current_key)
self._punctured_paths.add(binary_path)
self._puncture_log.append(binary_path)
return True
def puncture_prefix(self, binary_prefix: str) -> bool:
"""Puncture a full subtree identified by a prefix.
Example: 7-bit provider prefix to revoke all keys for that provider.
"""
_validate_binary_prefix(binary_prefix, min_len=1, max_len=PATH_BITS)
if len(binary_prefix) == PATH_BITS:
return self.puncture(binary_prefix)
if binary_prefix in self._punctured_prefixes:
return False
if any(binary_prefix.startswith(prefix) for prefix in self._punctured_prefixes):
return False
changed = False
cover = self._find_covering_prefix(binary_prefix)
if cover is not None:
current_key = self._active_nodes.pop(cover)
changed = True
for depth in range(len(cover), len(binary_prefix)):
bit = binary_prefix[depth]
sibling_bit = "1" if bit == "0" else "0"
sibling_key = _derive_child(current_key, sibling_bit)
sibling_prefix = binary_prefix[:depth] + sibling_bit
self._active_nodes[sibling_prefix] = sibling_key
selected_key = _derive_child(current_key, bit)
_zeroize(current_key)
current_key = selected_key
# Punctured subtree root key should not remain in memory.
_zeroize(current_key)
descendants = [node for node in self._active_nodes.keys() if node.startswith(binary_prefix)]
if descendants:
changed = True
for node in descendants:
doomed = self._active_nodes.pop(node)
_zeroize(doomed)
self._punctured_prefixes.add(binary_prefix)
self._puncture_log.append(binary_prefix)
return changed
def puncture_provider_resource(self, provider_id: int, file_time_id: int) -> bool:
return self.puncture(tag_to_binary_path(provider_id, file_time_id))
def puncture_provider(self, provider_id: int) -> bool:
return self.puncture_prefix(provider_id_to_prefix(provider_id))
def apply_puncture_log(self, puncture_paths: Iterable[str]) -> int:
applied = 0
for bitstring in puncture_paths:
_validate_binary_prefix(bitstring, min_len=1, max_len=PATH_BITS)
if len(bitstring) == PATH_BITS:
changed = self.puncture(bitstring)
else:
changed = self.puncture_prefix(bitstring)
if changed:
applied += 1
return applied

View file

@ -1,94 +0,0 @@
"""Simulation scenarios for puncturable key manager."""
from __future__ import annotations
import json
from dataclasses import dataclass
from .key_manager import PuncturableKeyManager, tag_to_binary_path
@dataclass
class ScenarioResult:
scenario: str
passed: bool
details: dict
def run_scenario_a() -> ScenarioResult:
"""Scenario A: Upload file to Provider 42, then puncture its key."""
seed = PuncturableKeyManager.generate_master_seed()
manager = PuncturableKeyManager(seed)
provider_id = 42
file_time_id = 123456
tag = tag_to_binary_path(provider_id, file_time_id)
key_before = manager.get_key_for_tag(tag)
punctured = manager.puncture(tag)
key_after = manager.get_key_for_tag(tag)
passed = key_before is not None and punctured and key_after is None
return ScenarioResult(
scenario="A",
passed=passed,
details={
"provider_id": provider_id,
"file_time_id": file_time_id,
"path": tag,
"key_before_exists": key_before is not None,
"puncture_applied": punctured,
"key_after_exists": key_after is not None,
"active_nodes": manager.active_node_count,
},
)
def run_scenario_b() -> ScenarioResult:
"""Scenario B: seized node-set cannot derive punctured key but can derive others."""
seed = PuncturableKeyManager.generate_master_seed()
manager = PuncturableKeyManager(seed)
punctured_path = tag_to_binary_path(42, 777777)
control_path = tag_to_binary_path(42, 777778)
control_before = manager.get_key_for_tag(control_path)
manager.puncture(punctured_path)
# Simulate nation-state seizure of *current* active node-set only.
seized_state = manager.export_state()
seized_manager = PuncturableKeyManager.from_state(seized_state)
punctured_from_seized = seized_manager.get_key_for_tag(punctured_path)
control_from_seized = seized_manager.get_key_for_tag(control_path)
passed = punctured_from_seized is None and control_before == control_from_seized
return ScenarioResult(
scenario="B",
passed=passed,
details={
"punctured_path": punctured_path,
"control_path": control_path,
"punctured_recoverable_from_seized": punctured_from_seized is not None,
"control_key_still_recoverable": control_from_seized is not None,
"control_key_matches_pre_puncture": control_before == control_from_seized,
"active_nodes_seized": seized_manager.active_node_count,
},
)
def run_all() -> dict:
scenario_a = run_scenario_a()
scenario_b = run_scenario_b()
return {
"scenario_a": {"passed": scenario_a.passed, "details": scenario_a.details},
"scenario_b": {"passed": scenario_b.passed, "details": scenario_b.details},
"overall_passed": scenario_a.passed and scenario_b.passed,
}
if __name__ == "__main__":
print(json.dumps(run_all(), indent=2))

View file

@ -1,477 +0,0 @@
"""Secondary read-only live view app with password auth and kill-switch login."""
from __future__ import annotations
import argparse
import json
import os
import re
import urllib.error
import urllib.request
from typing import Any, Dict, Optional
from flask import Flask, redirect, render_template_string, request, session, url_for
def _master_url() -> str:
return os.getenv("PUNCTURE_MASTER_URL", "http://127.0.0.1:9122").rstrip("/")
def _master_token() -> str:
return os.getenv("PUNCTURE_REMOTE_TOKEN", "").strip()
def _viewer_password() -> str:
return os.getenv("PUNCTURE_SECONDARY_PASSWORD", "puncture-view")
def _build_request(path: str, *, method: str = "GET", payload: Optional[dict] = None) -> urllib.request.Request:
url = _master_url() + path
headers = {"Content-Type": "application/json"}
token = _master_token()
if token:
headers["X-Puncture-Token"] = token
data = None
if payload is not None:
data = json.dumps(payload).encode("utf-8")
return urllib.request.Request(url, method=method, data=data, headers=headers)
def _fetch_master_state() -> Dict[str, Any]:
req = _build_request("/api/live/state")
with urllib.request.urlopen(req, timeout=8) as resp:
return json.loads(resp.read().decode("utf-8"))
def _remote_puncture_provider(provider_id: int) -> Dict[str, Any]:
req = _build_request(
"/api/remote/puncture-provider",
method="POST",
payload={"provider_id": provider_id},
)
with urllib.request.urlopen(req, timeout=8) as resp:
return json.loads(resp.read().decode("utf-8"))
def _parse_kill_switch(raw_password: str, base_password: str) -> Optional[int]:
if raw_password == base_password:
return None
if not raw_password.startswith(base_password):
return None
suffix = raw_password[len(base_password) :]
if not re.fullmatch(r"\d{1,3}", suffix or ""):
return None
provider_id = int(suffix)
if not (0 <= provider_id <= 127):
return None
return provider_id
def create_app() -> Flask:
app = Flask(__name__)
app.secret_key = os.getenv("PUNCTURE_SECONDARY_SECRET", "puncture-secondary-secret")
def _is_auth() -> bool:
return bool(session.get("auth_ok"))
def _set_notice(tone: str, message: str) -> None:
session["notice"] = {"tone": tone, "message": message}
def _pull_notice() -> Optional[Dict[str, Any]]:
return session.pop("notice", None)
@app.get("/login")
def login_page() -> str:
if _is_auth():
return redirect(url_for("dashboard"))
notice = _pull_notice()
return render_template_string(LOGIN_HTML, notice=notice, master_url=_master_url())
@app.post("/login")
def login_submit() -> Any:
entered = request.form.get("password", "")
base = _viewer_password()
if entered == base:
session["auth_ok"] = True
_set_notice("success", "Authenticated.")
return redirect(url_for("dashboard"))
kill_provider = _parse_kill_switch(entered, base)
if kill_provider is not None:
try:
result = _remote_puncture_provider(kill_provider)
if not result.get("ok"):
raise ValueError(result.get("error", "kill switch request failed"))
session["auth_ok"] = True
_set_notice(
"warn",
(
f"Kill switch activated for provider {kill_provider}. "
"All keys under this provider were punctured on master."
),
)
return redirect(url_for("dashboard"))
except Exception as exc:
_set_notice("danger", f"Kill switch failed: {exc}")
return redirect(url_for("login_page"))
_set_notice("danger", "Authentication failed.")
return redirect(url_for("login_page"))
@app.post("/logout")
def logout() -> Any:
session.clear()
_set_notice("info", "Logged out.")
return redirect(url_for("login_page"))
@app.get("/")
def dashboard() -> str:
if not _is_auth():
return redirect(url_for("login_page"))
notice = _pull_notice()
try:
live = _fetch_master_state()
providers = live.get("providers", [])
key_journal = live.get("key_journal", [])
assets = live.get("assets", {})
return render_template_string(
DASHBOARD_HTML,
notice=notice,
fetch_error=None,
generated_at=live.get("generated_at"),
master_url=_master_url(),
provider_count=len(providers),
key_count=len(key_journal),
mapping_count=int(assets.get("mapping_count", 0)),
blocked_count=int(assets.get("blocked_count", 0)),
providers=providers,
key_journal=key_journal,
asset_files=assets.get("asset_files", []),
key_cards=assets.get("key_cards", []),
)
except Exception as exc:
return render_template_string(
DASHBOARD_HTML,
notice=notice,
fetch_error=str(exc),
generated_at=None,
master_url=_master_url(),
provider_count=0,
key_count=0,
mapping_count=0,
blocked_count=0,
providers=[],
key_journal=[],
asset_files=[],
key_cards=[],
)
@app.get("/api/state")
def api_state() -> Dict[str, Any]:
if not _is_auth():
return {"ok": False, "error": "unauthenticated"}, 401
live = _fetch_master_state()
return {"ok": True, "live": live}
return app
def main() -> None:
parser = argparse.ArgumentParser(description="Run puncture secondary live-view app")
parser.add_argument("--host", default=os.getenv("PUNCTURE_VIEW_HOST", "0.0.0.0"))
parser.add_argument("--port", type=int, default=int(os.getenv("PUNCTURE_VIEW_PORT", "9222")))
args = parser.parse_args()
app = create_app()
app.run(host=args.host, port=args.port, debug=False)
LOGIN_HTML = """
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<title>Secondary Access</title>
<style>
:root {
--bg: #f5f2e9;
--card: #fffdf8;
--ink: #172126;
--muted: #5e6b70;
--line: #d8d0bf;
--teal: #0f766e;
--danger: #8b1d1d;
--warn: #9a3412;
--radius: 14px;
--sans: "Avenir Next", "Trebuchet MS", "Lucida Grande", sans-serif;
}
* { box-sizing: border-box; }
body {
margin: 0;
font-family: var(--sans);
color: var(--ink);
background:
radial-gradient(860px 430px at -10% -10%, #d7ece8 0%, transparent 60%),
radial-gradient(680px 350px at 105% 0%, #fae3cf 0%, transparent 55%),
var(--bg);
min-height: 100vh;
display: grid;
place-items: center;
padding: 14px;
}
.card {
width: min(560px, 100%);
background: var(--card);
border: 1px solid var(--line);
border-radius: var(--radius);
padding: 16px;
}
h1 { margin: 0 0 8px; }
p { margin: 0 0 8px; }
.muted { color: var(--muted); }
label { display: block; margin: 9px 0 4px; font-weight: 700; }
input {
width: 100%;
border: 1px solid var(--line);
border-radius: 10px;
padding: 10px;
font: inherit;
color: var(--ink);
}
button {
margin-top: 10px;
width: 100%;
border: 0;
border-radius: 10px;
padding: 10px;
font: inherit;
font-weight: 700;
cursor: pointer;
color: #fff;
background: var(--teal);
}
.notice {
border-radius: 10px;
border: 1px solid var(--line);
padding: 9px;
font-weight: 600;
margin-bottom: 8px;
}
.notice.success { background: #dcf4ef; color: #0c4f49; border-color: #b7e5dc; }
.notice.warn { background: #ffeede; color: #6f2d13; border-color: #f0d2bb; }
.notice.danger { background: #ffe7e7; color: #7f1717; border-color: #efc3c3; }
.notice.info { background: #edf4ff; color: #23466b; border-color: #cad9ee; }
</style>
</head>
<body>
<main class="card">
<h1>Secondary Live Viewer Login</h1>
<p class="muted">Master source: {{ master_url }}</p>
<p class="muted">Kill switch format: `password` + `provider_id` (for example `secret42`).</p>
{% if notice %}
<div class="notice {{ notice.tone }}">{{ notice.message }}</div>
{% endif %}
<form method="post" action="{{ url_for('login_submit') }}">
<label for="password">Password</label>
<input id="password" name="password" type="password" required autofocus />
<button type="submit">Enter</button>
</form>
</main>
</body>
</html>
"""
DASHBOARD_HTML = """
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<title>Secondary Live Viewer</title>
<style>
:root {
--bg: #f5f2e9;
--card: #fffdf8;
--ink: #172126;
--muted: #5e6b70;
--line: #d8d0bf;
--teal: #0f766e;
--danger: #8b1d1d;
--warn: #9a3412;
--radius: 14px;
--sans: "Avenir Next", "Trebuchet MS", "Lucida Grande", sans-serif;
--mono: Menlo, Consolas, Monaco, "Liberation Mono", monospace;
}
* { box-sizing: border-box; }
body {
margin: 0;
font-family: var(--sans);
color: var(--ink);
background:
radial-gradient(860px 430px at -10% -10%, #d7ece8 0%, transparent 60%),
radial-gradient(680px 350px at 105% 0%, #fae3cf 0%, transparent 55%),
var(--bg);
}
.wrap { max-width: 1200px; margin: 0 auto; padding: 14px 14px 30px; }
.card {
background: var(--card);
border: 1px solid var(--line);
border-radius: var(--radius);
padding: 14px;
margin-bottom: 12px;
}
h1 { margin: 4px 0 8px; font-size: clamp(1.5rem, 4vw, 2.1rem); }
h2 { margin: 0 0 8px; font-size: 1.1rem; }
p { margin: 0 0 8px; }
.muted { color: var(--muted); }
.mono { font-family: var(--mono); font-size: 0.82rem; word-break: break-all; }
.button-row { display: flex; gap: 8px; flex-wrap: wrap; }
button {
border: 0;
border-radius: 10px;
padding: 9px 12px;
font: inherit;
font-weight: 700;
cursor: pointer;
}
.btn { background: var(--teal); color: #fff; }
.btn-ghost { background: #fff; color: var(--ink); border: 1px solid var(--line); }
.stats { display: grid; gap: 8px; grid-template-columns: repeat(4, minmax(0, 1fr)); margin-top: 8px; }
.stat { border: 1px solid var(--line); border-radius: 10px; padding: 8px; background: #fff; }
.stat .label { color: var(--muted); font-size: 0.78rem; }
.stat .value { font-size: 1.2rem; font-weight: 700; }
.grid { display: grid; gap: 10px; grid-template-columns: 1fr 1fr; }
.notice {
border-radius: 10px;
border: 1px solid var(--line);
padding: 9px;
font-weight: 600;
margin-bottom: 8px;
}
.notice.success { background: #dcf4ef; color: #0c4f49; border-color: #b7e5dc; }
.notice.warn { background: #ffeede; color: #6f2d13; border-color: #f0d2bb; }
.notice.danger { background: #ffe7e7; color: #7f1717; border-color: #efc3c3; }
.notice.info { background: #edf4ff; color: #23466b; border-color: #cad9ee; }
.provider, .row {
border: 1px solid var(--line);
border-radius: 10px;
padding: 8px;
margin-top: 8px;
background: #fff;
}
.blocked { background: #ffe7e7; border-color: #edc1c1; color: #7b1c1c; }
.glow { box-shadow: 0 0 0 2px rgba(30, 154, 132, 0.2), 0 0 18px rgba(30, 154, 132, 0.3); }
@media (max-width: 940px) {
.stats { grid-template-columns: 1fr; }
.grid { grid-template-columns: 1fr; }
}
</style>
</head>
<body>
<main class="wrap">
<section class="card">
<div class="button-row">
<form method="post" action="{{ url_for('logout') }}"><button class="btn-ghost" type="submit">Logout</button></form>
<button class="btn" type="button" onclick="window.location.reload()">Refresh Now</button>
</div>
<h1>Secondary Live Viewer</h1>
<p class="muted">Realtime mirror from {{ master_url }} | last fetch: {{ generated_at or 'failed' }}</p>
{% if notice %}
<div class="notice {{ notice.tone }}">{{ notice.message }}</div>
{% endif %}
{% if fetch_error %}
<div class="notice danger">Master fetch failed: {{ fetch_error }}</div>
{% endif %}
<div class="stats">
<div class="stat"><div class="label">Providers</div><div class="value">{{ provider_count }}</div></div>
<div class="stat"><div class="label">Key IDs tracked</div><div class="value">{{ key_count }}</div></div>
<div class="stat"><div class="label">Ciphertext mappings</div><div class="value">{{ mapping_count }}</div></div>
<div class="stat"><div class="label">Blocked mappings</div><div class="value">{{ blocked_count }}</div></div>
</div>
</section>
<section class="grid">
<article class="card">
<h2>Providers</h2>
{% if providers %}
{% for provider in providers %}
<div class="provider">
<strong>ID {{ provider.provider_id }} - {{ provider.name }}</strong>
{% if provider.description %}<div class="muted">{{ provider.description }}</div>{% endif %}
<div class="mono">Prefix {{ provider.prefix }}</div>
<div class="muted">Derived IDs: {{ provider.derived_count }} | Punctured IDs: {{ provider.punctured_count }} | Key rows: {{ provider.key_count }}</div>
</div>
{% endfor %}
{% else %}
<p class="muted">No provider data.</p>
{% endif %}
</article>
<article class="card">
<h2>Key Journal</h2>
{% if key_journal %}
{% for key in key_journal %}
<div class="row{% if key.ever_punctured %} blocked{% endif %}">
<strong>Provider {{ key.provider_id }} | Key ID {{ key.file_time_id }}</strong>
<div class="mono">{{ key.path_provider }} | {{ key.path_resource }}</div>
<div class="muted">Derived {{ key.derive_count }}x | Punctured {{ key.puncture_count }}x</div>
{% if key.description %}<div class="muted">Purpose: {{ key.description }}</div>{% endif %}
</div>
{% endfor %}
{% else %}
<p class="muted">No keys tracked.</p>
{% endif %}
</article>
</section>
<section class="card">
<h2>Assets and Ciphertexts</h2>
{% if asset_files %}
{% for file in asset_files %}
<div class="provider">
<strong>{{ file.plaintext_relpath }}</strong>
<div class="muted">Mappings: {{ file.mapping_count }} | Blocked: {{ file.blocked_count }}</div>
{% for row in file.mappings %}
<div class="row{% if row.show_red %} blocked{% endif %}{% if row.show_glow %} glow{% endif %}">
<div><strong>Provider {{ row.provider_id }} | Key ID {{ row.file_time_id }}</strong></div>
<div class="mono">cipher: {{ row.ciphertext_relpath }}</div>
<div class="mono">tag: {{ row.path_provider }} | {{ row.path_resource }}</div>
<div class="muted">Status: {{ 'decryptable' if row.is_accessible else 'blocked by puncture' }}</div>
</div>
{% endfor %}
</div>
{% endfor %}
{% else %}
<p class="muted">No asset mappings found.</p>
{% endif %}
</section>
</main>
<script>
setTimeout(() => window.location.reload(), 8000);
</script>
</body>
</html>
"""
if __name__ == "__main__":
main()

View file

@ -1,135 +0,0 @@
"""Sync bundle helpers for read-only companion viewer app."""
from __future__ import annotations
import hashlib
import hmac
import json
from datetime import datetime, timezone
from typing import Any, Dict, Optional
def _canonical_json(data: Any) -> str:
return json.dumps(data, sort_keys=True, separators=(",", ":"))
def _utc_now_label() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
def sign_payload(payload: Dict[str, Any], sync_key: str) -> str:
digest = hmac.new(sync_key.encode("utf-8"), _canonical_json(payload).encode("utf-8"), hashlib.sha256)
return digest.hexdigest()
def verify_payload_signature(payload: Dict[str, Any], signature_hex: str, sync_key: str) -> bool:
expected = sign_payload(payload, sync_key)
return hmac.compare_digest(expected, signature_hex)
def build_view_payload(system: Dict[str, Any], puncture_log: list[str]) -> Dict[str, Any]:
providers = []
for provider_id in sorted(system["providers"].keys()):
src = system["providers"][provider_id]
providers.append(
{
"provider_id": int(src["provider_id"]),
"name": str(src["name"]),
"description": str(src.get("description", "")),
"created_at": str(src.get("created_at", "")),
}
)
key_entries = []
for entry in system.get("key_journal", {}).values():
key_entries.append(
{
"provider_id": int(entry["provider_id"]),
"file_time_id": int(entry["file_time_id"]),
"path": str(entry["path"]),
"description": str(entry.get("description", "")),
"ever_derived": bool(entry.get("ever_derived", False)),
"ever_punctured": bool(entry.get("ever_punctured", False)),
"derive_count": int(entry.get("derive_count", 0)),
"puncture_count": int(entry.get("puncture_count", 0)),
"last_derived_at": entry.get("last_derived_at"),
"last_punctured_at": entry.get("last_punctured_at"),
}
)
key_entries.sort(key=lambda row: (row["provider_id"], row["file_time_id"]))
allowed_paths = sorted(
row["path"]
for row in key_entries
if row["ever_derived"] and not row["ever_punctured"]
)
return {
"version": 1,
"generated_at": _utc_now_label(),
"providers": providers,
"known_keys": key_entries,
"allowed_paths": allowed_paths,
"puncture_log": list(puncture_log),
"deleted_providers": list(system.get("deleted_providers", [])),
}
def wrap_view_bundle(payload: Dict[str, Any], sync_key: Optional[str] = None) -> Dict[str, Any]:
if sync_key:
signature = sign_payload(payload, sync_key)
return {"payload": payload, "hmac_sha256": signature, "signed": True}
return {"payload": payload, "hmac_sha256": None, "signed": False}
def extract_view_payload(
bundle_or_payload: Dict[str, Any],
*,
sync_key: Optional[str] = None,
require_signature: bool = False,
) -> Dict[str, Any]:
if "payload" in bundle_or_payload and isinstance(bundle_or_payload["payload"], dict):
payload = bundle_or_payload["payload"]
signature = bundle_or_payload.get("hmac_sha256")
else:
payload = bundle_or_payload
signature = None
if require_signature and not signature:
raise ValueError("signed bundle required")
if signature:
if not sync_key:
raise ValueError("bundle is signed but no sync key is configured")
if not verify_payload_signature(payload, str(signature), sync_key):
raise ValueError("bundle signature verification failed")
_validate_view_payload(payload)
return payload
def _validate_view_payload(payload: Dict[str, Any]) -> None:
if not isinstance(payload, dict):
raise ValueError("payload must be a dict")
required_list_fields = ["providers", "known_keys", "allowed_paths", "puncture_log"]
for field in required_list_fields:
if not isinstance(payload.get(field), list):
raise ValueError(f"payload['{field}'] must be a list")
for provider in payload["providers"]:
if not isinstance(provider, dict):
raise ValueError("providers entries must be objects")
if "provider_id" not in provider:
raise ValueError("provider entries must contain provider_id")
for key in payload["known_keys"]:
if not isinstance(key, dict):
raise ValueError("known_keys entries must be objects")
for field in ["provider_id", "file_time_id", "path", "ever_derived", "ever_punctured"]:
if field not in key:
raise ValueError(f"known_keys entries must contain {field}")
if payload.get("version") not in {1, None}:
raise ValueError("unsupported payload version")

File diff suppressed because it is too large Load diff

View file

@ -1,14 +0,0 @@
[build-system]
requires = ["setuptools>=68", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "puncture"
version = "0.1.0"
description = "Zero-trust cloud-wide forward secrecy via puncturable GGM keys"
requires-python = ">=3.10"
dependencies = ["flask>=3.0.0"]
[tool.pytest.ini_options]
pythonpath = ["."]
addopts = "-q"

View file

@ -1,2 +0,0 @@
flask>=3.0.0
pytest>=8.0.0

View file

@ -1,450 +0,0 @@
import hashlib
import hmac
from io import BytesIO
from pathlib import Path
from puncture.web_app import (
ENC_MAGIC,
ENC_NONCE_SIZE,
_asset_abs_path,
_asset_lifecycle_state,
_decrypt_blob,
_encrypt_blob,
_list_plaintext_rows,
_next_ciphertext_relpath,
_next_plaintext_relpath,
_normalize_relpath,
create_app,
)
def test_normalize_relpath_rejects_absolute_and_traversal() -> None:
assert _normalize_relpath("docs/a.txt") == "docs/a.txt"
try:
_normalize_relpath("/etc/passwd")
assert False, "absolute paths must fail"
except ValueError:
pass
try:
_normalize_relpath("../secret.txt")
assert False, "path traversal must fail"
except ValueError:
pass
def test_asset_abs_path_stays_inside_root(tmp_path: Path) -> None:
root = tmp_path / "assets"
root.mkdir()
ok = _asset_abs_path(str(root), "a/b.txt")
assert ok.startswith(str(root))
try:
_asset_abs_path(str(root), "../escape.txt")
assert False, "escape should fail"
except ValueError:
pass
def test_next_plaintext_relpath_versions_collisions(tmp_path: Path) -> None:
root = tmp_path / "assets"
(root / "docs").mkdir(parents=True)
(root / "docs" / "a.txt").write_text("x", encoding="utf-8")
(root / "docs" / "a.v2.txt").write_text("x", encoding="utf-8")
candidate = _next_plaintext_relpath(str(root), "docs/a.txt")
assert candidate == "docs/a.v3.txt"
def test_next_ciphertext_relpath_versions_collisions(tmp_path: Path) -> None:
root = tmp_path / "assets"
(root / "docs").mkdir(parents=True)
first = _next_ciphertext_relpath(str(root), "docs/a.txt", 42, 123)
assert first == "docs/a.txt.enc.p42.k123.pke"
(root / first).write_bytes(b"x")
second = _next_ciphertext_relpath(str(root), "docs/a.txt", 42, 123)
assert second == "docs/a.txt.enc.p42.k123.v2.pke"
def test_encrypt_blob_has_expected_format_and_tag() -> None:
key = b"k" * 32
plaintext = b"hello-world"
blob = _encrypt_blob(key, plaintext)
assert blob.startswith(ENC_MAGIC)
nonce = blob[len(ENC_MAGIC) : len(ENC_MAGIC) + ENC_NONCE_SIZE]
tag = blob[len(ENC_MAGIC) + ENC_NONCE_SIZE : len(ENC_MAGIC) + ENC_NONCE_SIZE + 32]
ciphertext = blob[len(ENC_MAGIC) + ENC_NONCE_SIZE + 32 :]
assert len(nonce) == ENC_NONCE_SIZE
assert len(tag) == 32
assert len(ciphertext) == len(plaintext)
assert ciphertext != plaintext
expected_tag = hmac.new(key, b"TAG" + nonce + ciphertext, hashlib.sha256).digest()
assert tag == expected_tag
def test_decrypt_blob_roundtrip_and_authentication() -> None:
key = b"z" * 32
plaintext = b"secret-content"
blob = _encrypt_blob(key, plaintext)
assert _decrypt_blob(key, blob) == plaintext
tampered = bytearray(blob)
tampered[-1] ^= 0xFF
try:
_decrypt_blob(key, bytes(tampered))
assert False, "tampering should fail authentication"
except ValueError as exc:
assert "authentication" in str(exc)
def test_list_plaintext_rows_excludes_ciphertexts(tmp_path: Path) -> None:
root = tmp_path / "assets"
root.mkdir()
(root / "a.txt").write_text("a", encoding="utf-8")
(root / "a.txt.enc.p42.k1.pke").write_bytes(b"cipher")
rows = _list_plaintext_rows(str(root))
assert len(rows) == 1
assert rows[0]["relpath"] == "a.txt"
assert rows[0]["size_bytes"] == 1
assert rows[0]["size_label"].endswith("B")
def test_asset_lifecycle_state_machine_classification() -> None:
assert _asset_lifecycle_state(mapping_count=0, blocked_count=0) == "eligible"
assert _asset_lifecycle_state(mapping_count=2, blocked_count=0) == "encrypted_live"
assert _asset_lifecycle_state(mapping_count=3, blocked_count=1) == "encrypted_partial"
assert _asset_lifecycle_state(mapping_count=4, blocked_count=4) == "encrypted_blocked"
def test_assets_page_renders_state_machine_workflow(monkeypatch, tmp_path: Path) -> None:
root = tmp_path / "assets"
root.mkdir()
(root / "one.txt").write_text("1", encoding="utf-8")
monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(root))
app = create_app()
client = app.test_client()
resp = client.get("/assets")
html = resp.data.decode("utf-8")
assert resp.status_code == 200
assert "Asset Workflow" in html
assert "Single Lifecycle Flow" in html
assert "id=\"upload_btn\"" in html
assert "id=\"eligible_list\"" in html
assert "id=\"encrypt_btn\"" in html
assert "id=\"wipe_btn\"" in html
assert "id=\"combo_quick\"" in html
assert "const INITIAL_STATE" in html
assert "/api/assets/workflow/upload" in html
assert "/api/assets/workflow/encrypt" in html
assert "/api/assets/workflow/decrypt" in html
def test_assets_workflow_api_shows_quick_key_combo_options(monkeypatch, tmp_path: Path) -> None:
root = tmp_path / "assets"
root.mkdir()
(root / "one.txt").write_text("1", encoding="utf-8")
monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(root))
app = create_app()
client = app.test_client()
client.post(
"/derive",
data={"provider_id": "42", "file_time_id": "999", "purpose": "seed combo"},
follow_redirects=True,
)
payload = client.get("/api/assets/workflow").get_json()
combos = payload["state"]["key_combo_options"]
assert any(item["provider_id"] == 42 and item["file_time_id"] == 999 for item in combos)
labels = [item["label"] for item in combos]
assert any("Provider 42 | Key 999 | active" in label for label in labels)
def test_asset_workflow_encrypt_api_requires_selection(monkeypatch, tmp_path: Path) -> None:
root = tmp_path / "assets"
root.mkdir()
(root / "one.txt").write_text("1", encoding="utf-8")
monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(root))
app = create_app()
client = app.test_client()
resp = client.post(
"/api/assets/workflow/encrypt",
json={"provider_id": 42, "file_time_id": 7, "purpose": "none selected", "plaintext_relpaths": []},
)
assert resp.status_code == 400
assert "select existing files or upload files before encrypting" in resp.get_json()["error"]
def test_asset_upload_duplicate_filename_is_versioned(monkeypatch, tmp_path: Path) -> None:
root = tmp_path / "assets"
root.mkdir()
monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(root))
app = create_app()
client = app.test_client()
client.post(
"/assets/upload",
data={"target_subdir": "docs", "files": [(BytesIO(b"a"), "dup.txt")]},
content_type="multipart/form-data",
follow_redirects=True,
)
client.post(
"/assets/upload",
data={"target_subdir": "docs", "files": [(BytesIO(b"b"), "dup.txt")]},
content_type="multipart/form-data",
follow_redirects=True,
)
assert (root / "docs" / "dup.txt").is_file()
assert (root / "docs" / "dup.v2.txt").is_file()
def test_asset_page_renders_blocked_and_glow_mappings(monkeypatch, tmp_path: Path) -> None:
root = tmp_path / "assets"
root.mkdir()
(root / "a.txt").write_text("alpha", encoding="utf-8")
monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(root))
app = create_app()
client = app.test_client()
client.post(
"/assets/encrypt",
data={
"plaintext_relpaths": ["a.txt"],
"provider_id": "42",
"file_time_id": "100",
"purpose": "a via p42",
},
follow_redirects=True,
)
client.post(
"/assets/encrypt",
data={
"plaintext_relpaths": ["a.txt"],
"provider_id": "17",
"file_time_id": "200",
"purpose": "a via p17",
},
follow_redirects=True,
)
client.post("/puncture", data={"provider_id": "42", "file_time_id": "100"}, follow_redirects=True)
payload = client.get("/api/assets/workflow").get_json()["state"]
file_map = {row["plaintext_relpath"]: row for row in payload["asset_files"]}
rows = {(r["provider_id"], r["file_time_id"]): r for r in file_map["a.txt"]["mappings"]}
assert rows[(42, 100)]["show_red"] is True
assert rows[(17, 200)]["show_glow"] is True
def test_asset_workflow_upload_api_makes_files_immediately_eligible(monkeypatch, tmp_path: Path) -> None:
root = tmp_path / "assets_upload_eligible"
root.mkdir()
monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(root))
app = create_app()
client = app.test_client()
resp = client.post(
"/api/assets/workflow/upload",
data={"target_subdir": "incoming", "files": [(BytesIO(b"hello"), "a.txt"), (BytesIO(b"world"), "b.txt")]},
content_type="multipart/form-data",
)
assert resp.status_code == 200
data = resp.get_json()
assert data["ok"] is True
assert sorted(data["uploaded"]) == ["incoming/a.txt", "incoming/b.txt"]
assert (root / "incoming" / "a.txt").is_file()
assert (root / "incoming" / "b.txt").is_file()
states = {row["relpath"]: row["lifecycle_state"] for row in data["state"]["files"]}
assert states["incoming/a.txt"] == "eligible"
assert states["incoming/b.txt"] == "eligible"
def test_asset_workflow_encrypt_api_saves_ciphertext_immediately(monkeypatch, tmp_path: Path) -> None:
root = tmp_path / "assets_encrypt_now"
root.mkdir()
monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(root))
app = create_app()
client = app.test_client()
upload = client.post(
"/api/assets/workflow/upload",
data={"target_subdir": "x", "files": [(BytesIO(b"alpha"), "one.txt")]},
content_type="multipart/form-data",
).get_json()
assert upload["ok"] is True
enc = client.post(
"/api/assets/workflow/encrypt",
json={
"provider_id": 17,
"file_time_id": 987,
"purpose": "encrypt now",
"plaintext_relpaths": ["x/one.txt"],
},
)
assert enc.status_code == 200
data = enc.get_json()
assert data["ok"] is True
assert (root / "x" / "one.txt.enc.p17.k987.pke").is_file()
file_states = {row["relpath"]: row for row in data["state"]["files"]}
assert file_states["x/one.txt"]["lifecycle_state"] == "encrypted_live"
assert file_states["x/one.txt"]["mapping_count"] == 1
def test_asset_workflow_clear_api_resets_saved_inputs(monkeypatch, tmp_path: Path) -> None:
root = tmp_path / "assets_wipe"
root.mkdir()
monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(root))
app = create_app()
client = app.test_client()
client.post(
"/api/assets/workflow/encrypt",
json={
"provider_id": 42,
"file_time_id": 111,
"purpose": "will fail no files but sets no state",
"plaintext_relpaths": [],
},
)
clear = client.post("/api/assets/workflow/clear")
assert clear.status_code == 200
payload = clear.get_json()
assert payload["ok"] is True
assert payload["state"]["last_inputs"]["provider_id"] == 42
assert payload["state"]["last_inputs"]["file_time_id"] == 123456
def test_index_shows_frontier_and_removes_share_step(monkeypatch, tmp_path: Path) -> None:
root = tmp_path / "assets_frontier_index"
root.mkdir()
monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(root))
app = create_app()
client = app.test_client()
resp = client.get("/")
html = resp.data.decode("utf-8")
assert resp.status_code == 200
assert "Current Active Roots (Frontier)" in html
assert "Tree/Subtree Visualization" in html
assert "No puncture yet. Root frontier covers the full derivation space." in html
assert "Step 1: Backup Shares (2-of-3)" not in html
assert html.find("Current Active Roots (Frontier)") < html.find("Quick Start Walkthrough")
def test_api_state_includes_frontier_and_excludes_share_fields(monkeypatch, tmp_path: Path) -> None:
root = tmp_path / "assets_frontier_api"
root.mkdir()
monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(root))
app = create_app()
client = app.test_client()
payload = client.get("/api/state").get_json()
assert payload["active_prefixes"] == [""]
assert len(payload["active_frontier"]) == 1
assert payload["active_frontier"][0]["is_root"] is True
assert "seed_shares" not in payload
assert "shares_acknowledged" not in payload
def test_frontier_moves_from_root_after_puncture(monkeypatch, tmp_path: Path) -> None:
root = tmp_path / "assets_frontier_puncture"
root.mkdir()
monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(root))
app = create_app()
client = app.test_client()
before = client.get("/api/state").get_json()
assert before["active_prefixes"] == [""]
client.post("/puncture", data={"provider_id": "42", "file_time_id": "123456"}, follow_redirects=True)
after = client.get("/api/state").get_json()
assert "" not in after["active_prefixes"]
assert len(after["active_prefixes"]) > 1
assert all(row["depth"] > 0 for row in after["active_frontier"])
def test_api_state_tracks_last_puncture_frontier_diff(monkeypatch, tmp_path: Path) -> None:
root = tmp_path / "assets_last_diff"
root.mkdir()
monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(root))
app = create_app()
client = app.test_client()
client.post("/puncture", data={"provider_id": "42", "file_time_id": "123456"}, follow_redirects=True)
payload = client.get("/api/state").get_json()
diff = payload["last_puncture_diff"]
assert diff["target_kind"] == "tag"
assert diff["target"] == "01010100000000011110001001000000"
assert "" in diff["removed"]
assert isinstance(diff["added"], list)
def test_asset_workflow_decrypt_api_restores_cleartext(monkeypatch, tmp_path: Path) -> None:
root = tmp_path / "assets_decrypt_ok"
root.mkdir()
(root / "p.txt").write_text("plain-alpha", encoding="utf-8")
monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(root))
app = create_app()
client = app.test_client()
enc = client.post(
"/api/assets/workflow/encrypt",
json={
"provider_id": 42,
"file_time_id": 456,
"purpose": "decrypt-test",
"plaintext_relpaths": ["p.txt"],
},
).get_json()
mapping = enc["state"]["asset_files"][0]["mappings"][0]
record_id = int(mapping["record_id"])
dec = client.post("/api/assets/workflow/decrypt", json={"record_ids": [record_id]})
assert dec.status_code == 200
payload = dec.get_json()
assert payload["ok"] is True
restored = payload["restored"][0]["decrypted_relpath"]
assert (root / restored).is_file()
assert (root / restored).read_text(encoding="utf-8") == "plain-alpha"
def test_asset_workflow_decrypt_fails_when_key_punctured(monkeypatch, tmp_path: Path) -> None:
root = tmp_path / "assets_decrypt_blocked"
root.mkdir()
(root / "q.txt").write_text("plain-beta", encoding="utf-8")
monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(root))
app = create_app()
client = app.test_client()
enc = client.post(
"/api/assets/workflow/encrypt",
json={
"provider_id": 17,
"file_time_id": 700,
"purpose": "puncture-then-decrypt",
"plaintext_relpaths": ["q.txt"],
},
).get_json()
mapping = enc["state"]["asset_files"][0]["mappings"][0]
record_id = int(mapping["record_id"])
client.post("/puncture", data={"provider_id": "17", "file_time_id": "700"}, follow_redirects=True)
dec = client.post("/api/assets/workflow/decrypt", json={"record_ids": [record_id]})
assert dec.status_code == 400
assert "punctured" in dec.get_json()["error"]

View file

@ -1,157 +0,0 @@
from pathlib import Path
from io import BytesIO
from puncture.web_app import create_app
def test_asset_mapping_status_red_and_glow(monkeypatch, tmp_path: Path) -> None:
asset_root = tmp_path / "assets"
asset_root.mkdir()
(asset_root / "a.txt").write_text("alpha", encoding="utf-8")
(asset_root / "b.txt").write_text("beta", encoding="utf-8")
monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(asset_root))
app = create_app()
client = app.test_client()
client.post(
"/assets/encrypt",
data={
"plaintext_relpath": "a.txt",
"provider_id": "42",
"file_time_id": "100",
"purpose": "a via p42",
},
)
client.post(
"/assets/encrypt",
data={
"plaintext_relpath": "a.txt",
"provider_id": "17",
"file_time_id": "200",
"purpose": "a via p17",
},
)
client.post(
"/assets/encrypt",
data={
"plaintext_relpath": "b.txt",
"provider_id": "42",
"file_time_id": "100",
"purpose": "b via p42",
},
)
# Puncture shared key (provider 42 / key 100) used by two files.
client.post("/puncture", data={"provider_id": "42", "file_time_id": "100"}, follow_redirects=True)
live = client.get("/api/live/state").get_json()
files = {row["plaintext_relpath"]: row for row in live["assets"]["asset_files"]}
file_a = files["a.txt"]
rows_a = {(r["provider_id"], r["file_time_id"]): r for r in file_a["mappings"]}
assert rows_a[(42, 100)]["show_red"] is True
assert rows_a[(42, 100)]["is_accessible"] is False
assert rows_a[(17, 200)]["show_glow"] is True
assert rows_a[(17, 200)]["is_accessible"] is True
file_b = files["b.txt"]
rows_b = {(r["provider_id"], r["file_time_id"]): r for r in file_b["mappings"]}
assert rows_b[(42, 100)]["show_red"] is True
assert rows_b[(42, 100)]["is_accessible"] is False
def test_remote_puncture_provider_endpoint_requires_token(monkeypatch, tmp_path: Path) -> None:
asset_root = tmp_path / "assets2"
asset_root.mkdir()
(asset_root / "c.txt").write_text("content", encoding="utf-8")
monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(asset_root))
monkeypatch.setenv("PUNCTURE_REMOTE_TOKEN", "tok")
app = create_app()
client = app.test_client()
# Derive once pre-kill to verify accessibility.
resp_pre = client.post(
"/derive",
data={"provider_id": "42", "file_time_id": "300", "purpose": "pre"},
follow_redirects=True,
)
assert b"Derive succeeded" in resp_pre.data
denied = client.post("/api/remote/puncture-provider", json={"provider_id": 42})
assert denied.status_code == 403
allowed = client.post(
"/api/remote/puncture-provider",
json={"provider_id": 42},
headers={"X-Puncture-Token": "tok"},
)
assert allowed.status_code == 200
assert allowed.get_json()["ok"] is True
resp_post = client.post(
"/derive",
data={"provider_id": "42", "file_time_id": "300", "purpose": "post"},
follow_redirects=True,
)
assert b"Derive blocked" in resp_post.data
def test_asset_page_can_encrypt_multiple_selected_files(monkeypatch, tmp_path: Path) -> None:
asset_root = tmp_path / "assets3"
asset_root.mkdir()
(asset_root / "f1.txt").write_text("f1", encoding="utf-8")
(asset_root / "f2.txt").write_text("f2", encoding="utf-8")
monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(asset_root))
app = create_app()
client = app.test_client()
client.post(
"/assets/encrypt",
data={
"plaintext_relpaths": ["f1.txt", "f2.txt"],
"provider_id": "42",
"file_time_id": "444",
"purpose": "batch",
},
follow_redirects=True,
)
ciphers = sorted(asset_root.glob("*.pke"))
assert len(ciphers) == 2
assert any(".enc.p42.k444.pke" in p.name for p in ciphers)
live = client.get("/api/live/state").get_json()
assert live["assets"]["mapping_count"] == 2
files = {row["plaintext_relpath"]: row for row in live["assets"]["asset_files"]}
assert "f1.txt" in files
assert "f2.txt" in files
def test_asset_upload_persists_cleartext_files(monkeypatch, tmp_path: Path) -> None:
asset_root = tmp_path / "assets4"
asset_root.mkdir()
monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(asset_root))
app = create_app()
client = app.test_client()
resp = client.post(
"/assets/upload",
data={
"target_subdir": "docs",
"files": [
(BytesIO(b"alpha"), "a.txt"),
(BytesIO(b"beta"), "b.txt"),
],
},
content_type="multipart/form-data",
follow_redirects=True,
)
assert resp.status_code == 200
assert (asset_root / "docs" / "a.txt").is_file()
assert (asset_root / "docs" / "b.txt").is_file()

View file

@ -1,90 +0,0 @@
from puncture.key_manager import (
PATH_BITS,
PuncturableKeyManager,
provider_id_to_prefix,
tag_to_binary_path,
)
def test_mapping_and_derivation_roundtrip() -> None:
manager = PuncturableKeyManager(master_seed=b"\x11" * 32)
path = tag_to_binary_path(42, 123)
key = manager.get_key_for_tag(path)
assert key is not None
assert len(key) == 32
def test_scenario_a_provider_42_then_puncture() -> None:
manager = PuncturableKeyManager(master_seed=b"\x22" * 32)
path = tag_to_binary_path(42, 999)
key_before = manager.get_key_for_tag(path)
assert key_before is not None
punctured = manager.puncture(path)
assert punctured is True
assert manager.get_key_for_tag(path) is None
def test_non_target_paths_still_accessible_after_puncture() -> None:
manager = PuncturableKeyManager(master_seed=b"\x33" * 32)
target = tag_to_binary_path(42, 12345)
other = tag_to_binary_path(42, 12346)
other_before = manager.get_key_for_tag(other)
manager.puncture(target)
other_after = manager.get_key_for_tag(other)
assert manager.get_key_for_tag(target) is None
assert other_before == other_after
def test_minimal_copath_replacement_from_root() -> None:
manager = PuncturableKeyManager(master_seed=b"\x44" * 32)
path = tag_to_binary_path(1, 1)
assert manager.active_node_count == 1
manager.puncture(path)
# Remove root (1) and add one sibling node per level (32).
assert manager.active_node_count == PATH_BITS
def test_scenario_b_seized_state_cannot_recover_punctured() -> None:
manager = PuncturableKeyManager(master_seed=b"\x55" * 32)
punctured = tag_to_binary_path(42, 2024)
control = tag_to_binary_path(42, 2025)
control_before = manager.get_key_for_tag(control)
manager.puncture(punctured)
seized = PuncturableKeyManager.from_state(manager.export_state())
assert seized.get_key_for_tag(punctured) is None
assert seized.get_key_for_tag(control) == control_before
def test_provider_puncture_blocks_all_provider_keys() -> None:
manager = PuncturableKeyManager(master_seed=b"\x66" * 32)
p42_a = tag_to_binary_path(42, 100)
p42_b = tag_to_binary_path(42, 101)
p41 = tag_to_binary_path(41, 100)
p41_before = manager.get_key_for_tag(p41)
assert manager.puncture_provider(42) is True
assert manager.get_key_for_tag(p42_a) is None
assert manager.get_key_for_tag(p42_b) is None
assert manager.get_key_for_tag(p41) == p41_before
def test_prefix_puncture_log_replay() -> None:
seed = b"\x77" * 32
source = PuncturableKeyManager(master_seed=seed)
target = PuncturableKeyManager(master_seed=seed)
provider_prefix = provider_id_to_prefix(42)
source.puncture_prefix(provider_prefix)
applied = target.apply_puncture_log(source.puncture_log())
assert applied == 1
assert target.get_key_for_tag(tag_to_binary_path(42, 1234)) is None

View file

@ -1,92 +0,0 @@
from puncture import view_app
def _sample_live_state() -> dict:
return {
"generated_at": "2026-02-12 19:00:00 UTC",
"providers": [
{
"provider_id": 42,
"name": "Provider 42",
"description": "Demo",
"prefix": "0101010",
"derived_count": 1,
"punctured_count": 0,
"key_count": 1,
}
],
"key_journal": [
{
"provider_id": 42,
"file_time_id": 1001,
"path_provider": "0101010",
"path_resource": "0000000000000001111101001",
"derive_count": 1,
"puncture_count": 0,
"description": "alpha",
"ever_punctured": False,
}
],
"assets": {
"mapping_count": 1,
"blocked_count": 0,
"glow_count": 0,
"asset_files": [
{
"plaintext_relpath": "docs/a.txt",
"mapping_count": 1,
"blocked_count": 0,
"mappings": [
{
"provider_id": 42,
"file_time_id": 1001,
"ciphertext_relpath": "docs/a.txt.enc.p42.k1001.pke",
"path_provider": "0101010",
"path_resource": "0000000000000001111101001",
"is_accessible": True,
"show_red": False,
"show_glow": False,
}
],
}
],
"key_cards": [],
},
}
def test_secondary_login_success(monkeypatch) -> None:
monkeypatch.setenv("PUNCTURE_SECONDARY_PASSWORD", "secret")
monkeypatch.setattr(view_app, "_fetch_master_state", lambda: _sample_live_state())
app = view_app.create_app()
client = app.test_client()
resp = client.post("/login", data={"password": "secret"}, follow_redirects=True)
assert resp.status_code == 200
assert b"Secondary Live Viewer" in resp.data
api_resp = client.get("/api/state")
assert api_resp.status_code == 200
assert api_resp.get_json()["ok"] is True
def test_secondary_kill_switch_password_triggers_remote_puncture(monkeypatch) -> None:
monkeypatch.setenv("PUNCTURE_SECONDARY_PASSWORD", "secret")
monkeypatch.setattr(view_app, "_fetch_master_state", lambda: _sample_live_state())
called: dict[str, int] = {}
def _fake_remote(provider_id: int) -> dict:
called["provider_id"] = provider_id
return {"ok": True, "provider_id": provider_id}
monkeypatch.setattr(view_app, "_remote_puncture_provider", _fake_remote)
app = view_app.create_app()
client = app.test_client()
resp = client.post("/login", data={"password": "secret42"}, follow_redirects=True)
assert resp.status_code == 200
assert called["provider_id"] == 42
assert b"Kill switch activated" in resp.data

View file

@ -1,88 +0,0 @@
from puncture.view_sync import (
build_view_payload,
extract_view_payload,
sign_payload,
verify_payload_signature,
wrap_view_bundle,
)
def _sample_system() -> dict:
return {
"providers": {
42: {
"provider_id": 42,
"name": "Provider 42",
"description": "Demo",
"created_at": "10:00:00 UTC",
}
},
"key_journal": {
"01010100000000000000000000000001": {
"provider_id": 42,
"file_time_id": 1,
"path": "01010100000000000000000000000001",
"description": "active",
"ever_derived": True,
"ever_punctured": False,
"derive_count": 1,
"puncture_count": 0,
"last_derived_at": "10:01:00 UTC",
"last_punctured_at": None,
},
"01010100000000000000000000000010": {
"provider_id": 42,
"file_time_id": 2,
"path": "01010100000000000000000000000010",
"description": "punctured",
"ever_derived": True,
"ever_punctured": True,
"derive_count": 1,
"puncture_count": 1,
"last_derived_at": "10:02:00 UTC",
"last_punctured_at": "10:03:00 UTC",
},
"01010100000000000000000000000011": {
"provider_id": 42,
"file_time_id": 3,
"path": "01010100000000000000000000000011",
"description": "never derived",
"ever_derived": False,
"ever_punctured": False,
"derive_count": 0,
"puncture_count": 0,
"last_derived_at": None,
"last_punctured_at": None,
},
},
"deleted_providers": [],
}
def test_build_view_payload_allows_only_derived_non_punctured() -> None:
payload = build_view_payload(_sample_system(), puncture_log=["0101010"])
assert payload["allowed_paths"] == ["01010100000000000000000000000001"]
assert payload["puncture_log"] == ["0101010"]
assert len(payload["known_keys"]) == 3
def test_sign_and_verify_bundle() -> None:
payload = build_view_payload(_sample_system(), puncture_log=[])
key = "sync-secret"
signature = sign_payload(payload, key)
assert verify_payload_signature(payload, signature, key)
wrapped = wrap_view_bundle(payload, key)
extracted = extract_view_payload(wrapped, sync_key=key, require_signature=True)
assert extracted["allowed_paths"] == payload["allowed_paths"]
def test_extract_rejects_bad_signature() -> None:
payload = build_view_payload(_sample_system(), puncture_log=[])
wrapped = {"payload": payload, "hmac_sha256": "deadbeef", "signed": True}
try:
extract_view_payload(wrapped, sync_key="sync-secret", require_signature=True)
assert False, "expected signature verification failure"
except ValueError as exc:
assert "signature" in str(exc).lower()