From 3e71cebfcf670937873fe246041d5fbfdd4de4ac Mon Sep 17 00:00:00 2001 From: saymrwulf Date: Fri, 13 Feb 2026 18:53:06 +0100 Subject: [PATCH] chore: make primary branch Go-only; move Python line to python-legacy --- README.md | 105 +- puncture/__init__.py | 21 - puncture/key_manager.py | 283 --- puncture/simulation.py | 94 - puncture/view_app.py | 477 ---- puncture/view_sync.py | 135 - puncture/web_app.py | 3762 ---------------------------- pyproject.toml | 14 - requirements.txt | 2 - tests/test_asset_helpers_and_ui.py | 450 ---- tests/test_master_asset_mapping.py | 157 -- tests/test_puncture_manager.py | 90 - tests/test_view_app_policy.py | 92 - tests/test_view_sync.py | 88 - 14 files changed, 31 insertions(+), 5739 deletions(-) delete mode 100644 puncture/__init__.py delete mode 100644 puncture/key_manager.py delete mode 100644 puncture/simulation.py delete mode 100644 puncture/view_app.py delete mode 100644 puncture/view_sync.py delete mode 100644 puncture/web_app.py delete mode 100644 pyproject.toml delete mode 100644 requirements.txt delete mode 100644 tests/test_asset_helpers_and_ui.py delete mode 100644 tests/test_master_asset_mapping.py delete mode 100644 tests/test_puncture_manager.py delete mode 100644 tests/test_view_app_policy.py delete mode 100644 tests/test_view_sync.py diff --git a/README.md b/README.md index d24e8ee..a54edb4 100644 --- a/README.md +++ b/README.md @@ -1,96 +1,53 @@ -# Puncture: Zero-Trust Cloud-Wide Forward Secrecy +# Puncture (Go) -Python implementation of puncturable encryption (PE) over a GGM tree, with: +Go implementation of a puncturable-key system (GGM tree) with: -- a **master app** (key management + provider management + asset encryption mapping) -- a **secondary app** (read-only live mirror with password auth and kill-switch login) +- primary macOS app/server for key derivation, puncturing, providers, and asset encryption/decryption +- iOS emergency companion app for remote provider-level puncture -## Core cryptographic model +## Current architecture -- 256-bit master seed root. -- HMAC-SHA256 left/right derivation for GGM child nodes. -- Non-sequential puncture with minimal co-path replacement. -- Tag schema: `[7 bits provider_id] | [25 bits file_time_id]`. -- Active-state model stores only active prefix nodes (not per-file keys). -- Immediate zeroization of replaced node material on puncture. -- Puncture log export/import (`list[str]` of bit strings). +- `goapp/cmd/server`: headless web server (`:9122`) +- `goapp/cmd/desktop`: macOS desktop app (embedded webview + local server) +- `goapp/internal/crypto`: GGM puncturable key manager +- `goapp/internal/app`: provider/key/asset state machine +- `goapp/internal/server`: HTTP API + web UI +- `goapp/ios/EmergencyPuncture`: native iOS app -## Setup (venv) +## Run primary server ```bash -python3 -m venv .venv -source .venv/bin/activate -pip install -r requirements.txt +cd goapp +go run ./cmd/server --host 0.0.0.0 --port 9122 ``` -## Run +Open `http://127.0.0.1:9122`. -Master app on `9122`: +## Build macOS app + installer ```bash -python -m puncture.web_app --host 0.0.0.0 --port 9122 +cd goapp +./packaging/macos/build_dmg.sh ``` -Secondary app on `9222`: +Artifacts: -```bash -python -m puncture.view_app --host 0.0.0.0 --port 9222 -``` +- `goapp/dist/Puncture.app` +- `goapp/dist/Puncture.dmg` -## Master app (`:9122`) workflow +## Persistence -- `/`: main puncture lab (derive/puncture, history, active frontier roots view). -- `/providers`: add/edit/delete providers. -- `/assets`: pick a cleartext file from asset root, select provider/key, encrypt, and register mapping. +Frontier/puncture/providers/assets are persisted locally and survive restarts. +Default desktop paths: -### Asset behavior +- assets: `~/Library/Application Support/PunctureGo/assets` +- state: `~/Library/Application Support/PunctureGo/state.json` -- Ciphertext is written in the **same folder** as the cleartext file. -- Decryption writes recovered cleartext back into the same folder tree (versioned filenames). -- One cleartext file can have multiple provider-key mappings. -- One provider-key can encrypt multiple files. -- After key puncture: - - affected mappings are shown in **red** (`blocked by puncture`) - - if the same cleartext file still has another accessible mapping, that mapping **glows** +## iOS companion -## Secondary app (`:9222`) behavior +- Xcode project: `goapp/ios/EmergencyPuncture/EmergencyPuncture.xcodeproj` +- iPhone install guide: `goapp/ios/INSTALL_IPHONE.md` -- Password-gated access. -- Live read-only mirror from master (`GET /api/live/state`). -- No share setup and no independent derivation state. -- Kill-switch login format: - - normal login: `` - - kill-switch login: `` - - example: `puncture-view42` punctures provider `42` immediately on master. +## Legacy Python line -## Environment variables - -Master app: - -- `PUNCTURE_PORT` (default `9122`) -- `PUNCTURE_ASSET_ROOT` (default `/assets`) -- `PUNCTURE_REMOTE_TOKEN` (optional; required for remote puncture endpoint) -- `PUNCTURE_VIEW_SYNC_KEY` (optional signing key for `/api/view-bundle`) - -Secondary app: - -- `PUNCTURE_VIEW_PORT` (default `9222`) -- `PUNCTURE_MASTER_URL` (default `http://127.0.0.1:9122`) -- `PUNCTURE_SECONDARY_PASSWORD` (default `puncture-view`) -- `PUNCTURE_SECONDARY_SECRET` (Flask session secret) -- `PUNCTURE_REMOTE_TOKEN` (optional; sent as `X-Puncture-Token`) - -## API quick reference - -- `GET /api/state` (master full state) -- `GET /api/live/state` (master live data for secondary app) -- `POST /api/remote/puncture-provider` (master remote provider kill endpoint) -- `GET /api/export` / `POST /api/import` -- `POST /api/puncture-log` -- `GET /api/view-bundle` (legacy signed/unsigned viewer bundle export) - -## Tests - -```bash -pytest -q -``` +The previous Python implementation is preserved in branch `python-legacy`. diff --git a/puncture/__init__.py b/puncture/__init__.py deleted file mode 100644 index 35f11e9..0000000 --- a/puncture/__init__.py +++ /dev/null @@ -1,21 +0,0 @@ -from .key_manager import ( - PATH_BITS, - PROVIDER_BITS, - RESOURCE_BITS, - PuncturableKeyManager, - Tag, - binary_path_to_tag, - provider_id_to_prefix, - tag_to_binary_path, -) - -__all__ = [ - "PATH_BITS", - "PROVIDER_BITS", - "RESOURCE_BITS", - "PuncturableKeyManager", - "Tag", - "binary_path_to_tag", - "provider_id_to_prefix", - "tag_to_binary_path", -] diff --git a/puncture/key_manager.py b/puncture/key_manager.py deleted file mode 100644 index 84d00bc..0000000 --- a/puncture/key_manager.py +++ /dev/null @@ -1,283 +0,0 @@ -"""GGM-tree based puncturable key manager. - -This module implements a 32-bit tag space mapped as: -[7 bits provider_id] | [25 bits file/time_id] -""" - -from __future__ import annotations - -import hashlib -import hmac -import json -import os -from dataclasses import dataclass -from typing import Dict, Iterable, List, Optional - - -PROVIDER_BITS = 7 -RESOURCE_BITS = 25 -PATH_BITS = PROVIDER_BITS + RESOURCE_BITS -KEY_SIZE_BYTES = 32 - - -def _zeroize(buf: bytearray) -> None: - for i in range(len(buf)): - buf[i] = 0 - - -def _derive_child(parent_key: bytes | bytearray, bit: str) -> bytearray: - if bit not in {"0", "1"}: - raise ValueError("bit must be '0' or '1'") - marker = b"\x00" if bit == "0" else b"\x01" - child = hmac.new(bytes(parent_key), b"GGM" + marker, hashlib.sha256).digest() - return bytearray(child) - - -def _validate_binary_path(binary_path: str, expected_length: int = PATH_BITS) -> None: - if len(binary_path) != expected_length: - raise ValueError(f"binary_path must be {expected_length} bits") - if any(c not in "01" for c in binary_path): - raise ValueError("binary_path must contain only '0' or '1'") - - -def _validate_binary_prefix(binary_prefix: str, min_len: int = 1, max_len: int = PATH_BITS) -> None: - if not (min_len <= len(binary_prefix) <= max_len): - raise ValueError(f"binary_prefix must be between {min_len} and {max_len} bits") - if any(c not in "01" for c in binary_prefix): - raise ValueError("binary_prefix must contain only '0' or '1'") - - -@dataclass(frozen=True) -class Tag: - provider_id: int - file_time_id: int - - def to_binary_path(self) -> str: - return tag_to_binary_path(self.provider_id, self.file_time_id) - - -def tag_to_binary_path(provider_id: int, file_time_id: int) -> str: - if not (0 <= provider_id < (1 << PROVIDER_BITS)): - raise ValueError(f"provider_id must be in [0, {1 << PROVIDER_BITS})") - if not (0 <= file_time_id < (1 << RESOURCE_BITS)): - raise ValueError(f"file_time_id must be in [0, {1 << RESOURCE_BITS})") - - value = (provider_id << RESOURCE_BITS) | file_time_id - return f"{value:0{PATH_BITS}b}" - - -def provider_id_to_prefix(provider_id: int) -> str: - if not (0 <= provider_id < (1 << PROVIDER_BITS)): - raise ValueError(f"provider_id must be in [0, {1 << PROVIDER_BITS})") - return f"{provider_id:0{PROVIDER_BITS}b}" - - -def binary_path_to_tag(binary_path: str) -> Tag: - _validate_binary_path(binary_path) - value = int(binary_path, 2) - file_time_mask = (1 << RESOURCE_BITS) - 1 - provider_id = value >> RESOURCE_BITS - file_time_id = value & file_time_mask - return Tag(provider_id=provider_id, file_time_id=file_time_id) - - -class PuncturableKeyManager: - """Forward-secret key manager based on puncturable GGM keys. - - State model: - - The manager stores only a prefix-free set of active nodes (prefix -> seed). - - A puncture operation removes one covering ancestor node and replaces it with the - minimal set of sibling/co-path nodes needed to preserve all other leaves. - """ - - def __init__(self, master_seed: bytes): - if len(master_seed) != KEY_SIZE_BYTES: - raise ValueError(f"master_seed must be {KEY_SIZE_BYTES} bytes") - self._active_nodes: Dict[str, bytearray] = {"": bytearray(master_seed)} - self._puncture_log: List[str] = [] - self._punctured_paths: set[str] = set() - self._punctured_prefixes: set[str] = set() - - @staticmethod - def generate_master_seed() -> bytes: - return os.urandom(KEY_SIZE_BYTES) - - @property - def active_node_count(self) -> int: - return len(self._active_nodes) - - def active_prefixes(self) -> List[str]: - return sorted(self._active_nodes.keys(), key=lambda p: (len(p), p)) - - def puncture_log(self) -> List[str]: - return list(self._puncture_log) - - def export_puncture_log_json(self) -> str: - return json.dumps(self._puncture_log) - - def export_state(self) -> dict: - return { - "active_nodes": {prefix: bytes(seed).hex() for prefix, seed in self._active_nodes.items()}, - "puncture_log": list(self._puncture_log), - } - - @classmethod - def from_state(cls, state: dict) -> "PuncturableKeyManager": - # Seed is a placeholder; state fully replaces active nodes. - manager = cls(master_seed=b"\x00" * KEY_SIZE_BYTES) - - active_nodes = state.get("active_nodes") - if not isinstance(active_nodes, dict): - raise ValueError("state['active_nodes'] must be a dict") - - rebuilt_nodes: Dict[str, bytearray] = {} - for prefix, hex_seed in active_nodes.items(): - if any(c not in "01" for c in prefix): - raise ValueError("active node prefixes must be binary strings") - seed = bytes.fromhex(hex_seed) - if len(seed) != KEY_SIZE_BYTES: - raise ValueError("active node seed must be 32 bytes") - rebuilt_nodes[prefix] = bytearray(seed) - - manager._active_nodes = rebuilt_nodes - - puncture_log = state.get("puncture_log", []) - if not isinstance(puncture_log, list): - raise ValueError("state['puncture_log'] must be a list") - for bitstring in puncture_log: - _validate_binary_prefix(bitstring, min_len=1, max_len=PATH_BITS) - manager._puncture_log = list(puncture_log) - manager._punctured_paths = {b for b in puncture_log if len(b) == PATH_BITS} - manager._punctured_prefixes = {b for b in puncture_log if len(b) < PATH_BITS} - return manager - - def _find_covering_prefix(self, binary_path: str) -> Optional[str]: - for depth in range(len(binary_path), -1, -1): - prefix = binary_path[:depth] - if prefix in self._active_nodes: - return prefix - return None - - def get_key_for_tag(self, binary_path: str) -> Optional[bytes]: - _validate_binary_path(binary_path) - - cover = self._find_covering_prefix(binary_path) - if cover is None: - return None - - key: bytes | bytearray = self._active_nodes[cover] - for bit in binary_path[len(cover) :]: - key = _derive_child(key, bit) - return bytes(key) - - def get_key_for_provider_resource(self, provider_id: int, file_time_id: int) -> Optional[bytes]: - return self.get_key_for_tag(tag_to_binary_path(provider_id, file_time_id)) - - def puncture(self, binary_path: str) -> bool: - """Puncture a path using minimal co-path replacement. - - Returns: - True if a new puncture was applied. - False if the path was already punctured / inaccessible. - """ - - _validate_binary_path(binary_path) - - if binary_path in self._punctured_paths: - return False - if any(binary_path.startswith(prefix) for prefix in self._punctured_prefixes): - return False - - cover = self._find_covering_prefix(binary_path) - if cover is None: - # Already inaccessible due to earlier punctures. - self._punctured_paths.add(binary_path) - self._puncture_log.append(binary_path) - return False - - current_key = self._active_nodes.pop(cover) - - for depth in range(len(cover), PATH_BITS): - bit = binary_path[depth] - sibling_bit = "1" if bit == "0" else "0" - - sibling_key = _derive_child(current_key, sibling_bit) - sibling_prefix = binary_path[:depth] + sibling_bit - self._active_nodes[sibling_prefix] = sibling_key - - selected_key = _derive_child(current_key, bit) - _zeroize(current_key) - current_key = selected_key - - # Current leaf key has been punctured; zeroize immediately. - _zeroize(current_key) - - self._punctured_paths.add(binary_path) - self._puncture_log.append(binary_path) - return True - - def puncture_prefix(self, binary_prefix: str) -> bool: - """Puncture a full subtree identified by a prefix. - - Example: 7-bit provider prefix to revoke all keys for that provider. - """ - - _validate_binary_prefix(binary_prefix, min_len=1, max_len=PATH_BITS) - if len(binary_prefix) == PATH_BITS: - return self.puncture(binary_prefix) - - if binary_prefix in self._punctured_prefixes: - return False - if any(binary_prefix.startswith(prefix) for prefix in self._punctured_prefixes): - return False - - changed = False - - cover = self._find_covering_prefix(binary_prefix) - if cover is not None: - current_key = self._active_nodes.pop(cover) - changed = True - - for depth in range(len(cover), len(binary_prefix)): - bit = binary_prefix[depth] - sibling_bit = "1" if bit == "0" else "0" - - sibling_key = _derive_child(current_key, sibling_bit) - sibling_prefix = binary_prefix[:depth] + sibling_bit - self._active_nodes[sibling_prefix] = sibling_key - - selected_key = _derive_child(current_key, bit) - _zeroize(current_key) - current_key = selected_key - - # Punctured subtree root key should not remain in memory. - _zeroize(current_key) - - descendants = [node for node in self._active_nodes.keys() if node.startswith(binary_prefix)] - if descendants: - changed = True - for node in descendants: - doomed = self._active_nodes.pop(node) - _zeroize(doomed) - - self._punctured_prefixes.add(binary_prefix) - self._puncture_log.append(binary_prefix) - return changed - - def puncture_provider_resource(self, provider_id: int, file_time_id: int) -> bool: - return self.puncture(tag_to_binary_path(provider_id, file_time_id)) - - def puncture_provider(self, provider_id: int) -> bool: - return self.puncture_prefix(provider_id_to_prefix(provider_id)) - - def apply_puncture_log(self, puncture_paths: Iterable[str]) -> int: - applied = 0 - for bitstring in puncture_paths: - _validate_binary_prefix(bitstring, min_len=1, max_len=PATH_BITS) - if len(bitstring) == PATH_BITS: - changed = self.puncture(bitstring) - else: - changed = self.puncture_prefix(bitstring) - if changed: - applied += 1 - return applied diff --git a/puncture/simulation.py b/puncture/simulation.py deleted file mode 100644 index f0864c8..0000000 --- a/puncture/simulation.py +++ /dev/null @@ -1,94 +0,0 @@ -"""Simulation scenarios for puncturable key manager.""" - -from __future__ import annotations - -import json -from dataclasses import dataclass - -from .key_manager import PuncturableKeyManager, tag_to_binary_path - - -@dataclass -class ScenarioResult: - scenario: str - passed: bool - details: dict - - -def run_scenario_a() -> ScenarioResult: - """Scenario A: Upload file to Provider 42, then puncture its key.""" - - seed = PuncturableKeyManager.generate_master_seed() - manager = PuncturableKeyManager(seed) - - provider_id = 42 - file_time_id = 123456 - tag = tag_to_binary_path(provider_id, file_time_id) - - key_before = manager.get_key_for_tag(tag) - punctured = manager.puncture(tag) - key_after = manager.get_key_for_tag(tag) - - passed = key_before is not None and punctured and key_after is None - return ScenarioResult( - scenario="A", - passed=passed, - details={ - "provider_id": provider_id, - "file_time_id": file_time_id, - "path": tag, - "key_before_exists": key_before is not None, - "puncture_applied": punctured, - "key_after_exists": key_after is not None, - "active_nodes": manager.active_node_count, - }, - ) - - -def run_scenario_b() -> ScenarioResult: - """Scenario B: seized node-set cannot derive punctured key but can derive others.""" - - seed = PuncturableKeyManager.generate_master_seed() - manager = PuncturableKeyManager(seed) - - punctured_path = tag_to_binary_path(42, 777777) - control_path = tag_to_binary_path(42, 777778) - - control_before = manager.get_key_for_tag(control_path) - manager.puncture(punctured_path) - - # Simulate nation-state seizure of *current* active node-set only. - seized_state = manager.export_state() - seized_manager = PuncturableKeyManager.from_state(seized_state) - - punctured_from_seized = seized_manager.get_key_for_tag(punctured_path) - control_from_seized = seized_manager.get_key_for_tag(control_path) - - passed = punctured_from_seized is None and control_before == control_from_seized - return ScenarioResult( - scenario="B", - passed=passed, - details={ - "punctured_path": punctured_path, - "control_path": control_path, - "punctured_recoverable_from_seized": punctured_from_seized is not None, - "control_key_still_recoverable": control_from_seized is not None, - "control_key_matches_pre_puncture": control_before == control_from_seized, - "active_nodes_seized": seized_manager.active_node_count, - }, - ) - - -def run_all() -> dict: - scenario_a = run_scenario_a() - scenario_b = run_scenario_b() - - return { - "scenario_a": {"passed": scenario_a.passed, "details": scenario_a.details}, - "scenario_b": {"passed": scenario_b.passed, "details": scenario_b.details}, - "overall_passed": scenario_a.passed and scenario_b.passed, - } - - -if __name__ == "__main__": - print(json.dumps(run_all(), indent=2)) diff --git a/puncture/view_app.py b/puncture/view_app.py deleted file mode 100644 index 5cf7d07..0000000 --- a/puncture/view_app.py +++ /dev/null @@ -1,477 +0,0 @@ -"""Secondary read-only live view app with password auth and kill-switch login.""" - -from __future__ import annotations - -import argparse -import json -import os -import re -import urllib.error -import urllib.request -from typing import Any, Dict, Optional - -from flask import Flask, redirect, render_template_string, request, session, url_for - - -def _master_url() -> str: - return os.getenv("PUNCTURE_MASTER_URL", "http://127.0.0.1:9122").rstrip("/") - - -def _master_token() -> str: - return os.getenv("PUNCTURE_REMOTE_TOKEN", "").strip() - - -def _viewer_password() -> str: - return os.getenv("PUNCTURE_SECONDARY_PASSWORD", "puncture-view") - - -def _build_request(path: str, *, method: str = "GET", payload: Optional[dict] = None) -> urllib.request.Request: - url = _master_url() + path - headers = {"Content-Type": "application/json"} - token = _master_token() - if token: - headers["X-Puncture-Token"] = token - - data = None - if payload is not None: - data = json.dumps(payload).encode("utf-8") - - return urllib.request.Request(url, method=method, data=data, headers=headers) - - -def _fetch_master_state() -> Dict[str, Any]: - req = _build_request("/api/live/state") - with urllib.request.urlopen(req, timeout=8) as resp: - return json.loads(resp.read().decode("utf-8")) - - -def _remote_puncture_provider(provider_id: int) -> Dict[str, Any]: - req = _build_request( - "/api/remote/puncture-provider", - method="POST", - payload={"provider_id": provider_id}, - ) - with urllib.request.urlopen(req, timeout=8) as resp: - return json.loads(resp.read().decode("utf-8")) - - -def _parse_kill_switch(raw_password: str, base_password: str) -> Optional[int]: - if raw_password == base_password: - return None - if not raw_password.startswith(base_password): - return None - - suffix = raw_password[len(base_password) :] - if not re.fullmatch(r"\d{1,3}", suffix or ""): - return None - - provider_id = int(suffix) - if not (0 <= provider_id <= 127): - return None - return provider_id - - -def create_app() -> Flask: - app = Flask(__name__) - app.secret_key = os.getenv("PUNCTURE_SECONDARY_SECRET", "puncture-secondary-secret") - - def _is_auth() -> bool: - return bool(session.get("auth_ok")) - - def _set_notice(tone: str, message: str) -> None: - session["notice"] = {"tone": tone, "message": message} - - def _pull_notice() -> Optional[Dict[str, Any]]: - return session.pop("notice", None) - - @app.get("/login") - def login_page() -> str: - if _is_auth(): - return redirect(url_for("dashboard")) - notice = _pull_notice() - return render_template_string(LOGIN_HTML, notice=notice, master_url=_master_url()) - - @app.post("/login") - def login_submit() -> Any: - entered = request.form.get("password", "") - base = _viewer_password() - - if entered == base: - session["auth_ok"] = True - _set_notice("success", "Authenticated.") - return redirect(url_for("dashboard")) - - kill_provider = _parse_kill_switch(entered, base) - if kill_provider is not None: - try: - result = _remote_puncture_provider(kill_provider) - if not result.get("ok"): - raise ValueError(result.get("error", "kill switch request failed")) - session["auth_ok"] = True - _set_notice( - "warn", - ( - f"Kill switch activated for provider {kill_provider}. " - "All keys under this provider were punctured on master." - ), - ) - return redirect(url_for("dashboard")) - except Exception as exc: - _set_notice("danger", f"Kill switch failed: {exc}") - return redirect(url_for("login_page")) - - _set_notice("danger", "Authentication failed.") - return redirect(url_for("login_page")) - - @app.post("/logout") - def logout() -> Any: - session.clear() - _set_notice("info", "Logged out.") - return redirect(url_for("login_page")) - - @app.get("/") - def dashboard() -> str: - if not _is_auth(): - return redirect(url_for("login_page")) - - notice = _pull_notice() - try: - live = _fetch_master_state() - providers = live.get("providers", []) - key_journal = live.get("key_journal", []) - assets = live.get("assets", {}) - - return render_template_string( - DASHBOARD_HTML, - notice=notice, - fetch_error=None, - generated_at=live.get("generated_at"), - master_url=_master_url(), - provider_count=len(providers), - key_count=len(key_journal), - mapping_count=int(assets.get("mapping_count", 0)), - blocked_count=int(assets.get("blocked_count", 0)), - providers=providers, - key_journal=key_journal, - asset_files=assets.get("asset_files", []), - key_cards=assets.get("key_cards", []), - ) - except Exception as exc: - return render_template_string( - DASHBOARD_HTML, - notice=notice, - fetch_error=str(exc), - generated_at=None, - master_url=_master_url(), - provider_count=0, - key_count=0, - mapping_count=0, - blocked_count=0, - providers=[], - key_journal=[], - asset_files=[], - key_cards=[], - ) - - @app.get("/api/state") - def api_state() -> Dict[str, Any]: - if not _is_auth(): - return {"ok": False, "error": "unauthenticated"}, 401 - live = _fetch_master_state() - return {"ok": True, "live": live} - - return app - - -def main() -> None: - parser = argparse.ArgumentParser(description="Run puncture secondary live-view app") - parser.add_argument("--host", default=os.getenv("PUNCTURE_VIEW_HOST", "0.0.0.0")) - parser.add_argument("--port", type=int, default=int(os.getenv("PUNCTURE_VIEW_PORT", "9222"))) - args = parser.parse_args() - - app = create_app() - app.run(host=args.host, port=args.port, debug=False) - - -LOGIN_HTML = """ - - - - - - Secondary Access - - - -
-

Secondary Live Viewer Login

-

Master source: {{ master_url }}

-

Kill switch format: `password` + `provider_id` (for example `secret42`).

- - {% if notice %} -
{{ notice.message }}
- {% endif %} - -
- - - -
-
- - -""" - - -DASHBOARD_HTML = """ - - - - - - Secondary Live Viewer - - - -
-
-
-
- -
-

Secondary Live Viewer

-

Realtime mirror from {{ master_url }} | last fetch: {{ generated_at or 'failed' }}

- {% if notice %} -
{{ notice.message }}
- {% endif %} - {% if fetch_error %} -
Master fetch failed: {{ fetch_error }}
- {% endif %} -
-
Providers
{{ provider_count }}
-
Key IDs tracked
{{ key_count }}
-
Ciphertext mappings
{{ mapping_count }}
-
Blocked mappings
{{ blocked_count }}
-
-
- -
-
-

Providers

- {% if providers %} - {% for provider in providers %} -
- ID {{ provider.provider_id }} - {{ provider.name }} - {% if provider.description %}
{{ provider.description }}
{% endif %} -
Prefix {{ provider.prefix }}
-
Derived IDs: {{ provider.derived_count }} | Punctured IDs: {{ provider.punctured_count }} | Key rows: {{ provider.key_count }}
-
- {% endfor %} - {% else %} -

No provider data.

- {% endif %} -
- -
-

Key Journal

- {% if key_journal %} - {% for key in key_journal %} -
- Provider {{ key.provider_id }} | Key ID {{ key.file_time_id }} -
{{ key.path_provider }} | {{ key.path_resource }}
-
Derived {{ key.derive_count }}x | Punctured {{ key.puncture_count }}x
- {% if key.description %}
Purpose: {{ key.description }}
{% endif %} -
- {% endfor %} - {% else %} -

No keys tracked.

- {% endif %} -
-
- -
-

Assets and Ciphertexts

- {% if asset_files %} - {% for file in asset_files %} -
- {{ file.plaintext_relpath }} -
Mappings: {{ file.mapping_count }} | Blocked: {{ file.blocked_count }}
- {% for row in file.mappings %} -
-
Provider {{ row.provider_id }} | Key ID {{ row.file_time_id }}
-
cipher: {{ row.ciphertext_relpath }}
-
tag: {{ row.path_provider }} | {{ row.path_resource }}
-
Status: {{ 'decryptable' if row.is_accessible else 'blocked by puncture' }}
-
- {% endfor %} -
- {% endfor %} - {% else %} -

No asset mappings found.

- {% endif %} -
-
- - - - -""" - - -if __name__ == "__main__": - main() diff --git a/puncture/view_sync.py b/puncture/view_sync.py deleted file mode 100644 index 0c2f8c7..0000000 --- a/puncture/view_sync.py +++ /dev/null @@ -1,135 +0,0 @@ -"""Sync bundle helpers for read-only companion viewer app.""" - -from __future__ import annotations - -import hashlib -import hmac -import json -from datetime import datetime, timezone -from typing import Any, Dict, Optional - - -def _canonical_json(data: Any) -> str: - return json.dumps(data, sort_keys=True, separators=(",", ":")) - - -def _utc_now_label() -> str: - return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC") - - -def sign_payload(payload: Dict[str, Any], sync_key: str) -> str: - digest = hmac.new(sync_key.encode("utf-8"), _canonical_json(payload).encode("utf-8"), hashlib.sha256) - return digest.hexdigest() - - -def verify_payload_signature(payload: Dict[str, Any], signature_hex: str, sync_key: str) -> bool: - expected = sign_payload(payload, sync_key) - return hmac.compare_digest(expected, signature_hex) - - -def build_view_payload(system: Dict[str, Any], puncture_log: list[str]) -> Dict[str, Any]: - providers = [] - for provider_id in sorted(system["providers"].keys()): - src = system["providers"][provider_id] - providers.append( - { - "provider_id": int(src["provider_id"]), - "name": str(src["name"]), - "description": str(src.get("description", "")), - "created_at": str(src.get("created_at", "")), - } - ) - - key_entries = [] - for entry in system.get("key_journal", {}).values(): - key_entries.append( - { - "provider_id": int(entry["provider_id"]), - "file_time_id": int(entry["file_time_id"]), - "path": str(entry["path"]), - "description": str(entry.get("description", "")), - "ever_derived": bool(entry.get("ever_derived", False)), - "ever_punctured": bool(entry.get("ever_punctured", False)), - "derive_count": int(entry.get("derive_count", 0)), - "puncture_count": int(entry.get("puncture_count", 0)), - "last_derived_at": entry.get("last_derived_at"), - "last_punctured_at": entry.get("last_punctured_at"), - } - ) - - key_entries.sort(key=lambda row: (row["provider_id"], row["file_time_id"])) - - allowed_paths = sorted( - row["path"] - for row in key_entries - if row["ever_derived"] and not row["ever_punctured"] - ) - - return { - "version": 1, - "generated_at": _utc_now_label(), - "providers": providers, - "known_keys": key_entries, - "allowed_paths": allowed_paths, - "puncture_log": list(puncture_log), - "deleted_providers": list(system.get("deleted_providers", [])), - } - - -def wrap_view_bundle(payload: Dict[str, Any], sync_key: Optional[str] = None) -> Dict[str, Any]: - if sync_key: - signature = sign_payload(payload, sync_key) - return {"payload": payload, "hmac_sha256": signature, "signed": True} - return {"payload": payload, "hmac_sha256": None, "signed": False} - - -def extract_view_payload( - bundle_or_payload: Dict[str, Any], - *, - sync_key: Optional[str] = None, - require_signature: bool = False, -) -> Dict[str, Any]: - if "payload" in bundle_or_payload and isinstance(bundle_or_payload["payload"], dict): - payload = bundle_or_payload["payload"] - signature = bundle_or_payload.get("hmac_sha256") - else: - payload = bundle_or_payload - signature = None - - if require_signature and not signature: - raise ValueError("signed bundle required") - - if signature: - if not sync_key: - raise ValueError("bundle is signed but no sync key is configured") - if not verify_payload_signature(payload, str(signature), sync_key): - raise ValueError("bundle signature verification failed") - - _validate_view_payload(payload) - return payload - - -def _validate_view_payload(payload: Dict[str, Any]) -> None: - if not isinstance(payload, dict): - raise ValueError("payload must be a dict") - - required_list_fields = ["providers", "known_keys", "allowed_paths", "puncture_log"] - for field in required_list_fields: - if not isinstance(payload.get(field), list): - raise ValueError(f"payload['{field}'] must be a list") - - for provider in payload["providers"]: - if not isinstance(provider, dict): - raise ValueError("providers entries must be objects") - if "provider_id" not in provider: - raise ValueError("provider entries must contain provider_id") - - for key in payload["known_keys"]: - if not isinstance(key, dict): - raise ValueError("known_keys entries must be objects") - for field in ["provider_id", "file_time_id", "path", "ever_derived", "ever_punctured"]: - if field not in key: - raise ValueError(f"known_keys entries must contain {field}") - - if payload.get("version") not in {1, None}: - raise ValueError("unsupported payload version") diff --git a/puncture/web_app.py b/puncture/web_app.py deleted file mode 100644 index b60183f..0000000 --- a/puncture/web_app.py +++ /dev/null @@ -1,3762 +0,0 @@ -"""Beginner-friendly web UI for puncturable key management. - -Run with: - python -m puncture.web_app --port 9122 -Then open from iPhone browser using your machine IP and port 9122. -""" - -from __future__ import annotations - -import argparse -import hashlib -import hmac -import os -from datetime import datetime, timezone -from typing import Any, Dict, Optional - -from flask import Flask, redirect, render_template_string, request, url_for -from werkzeug.utils import secure_filename - -from .key_manager import PATH_BITS, PuncturableKeyManager, provider_id_to_prefix, tag_to_binary_path -from .view_sync import build_view_payload, wrap_view_bundle - - -ENC_MAGIC = b"PKE1" -ENC_NONCE_SIZE = 16 -ENC_TAG_SIZE = 32 -TREE_VIEW_DEPTH = 7 - - -def _sort_prefixes(prefixes: list[str]) -> list[str]: - return sorted(prefixes, key=lambda item: (len(item), item)) - - -def _utc_now_label() -> str: - return datetime.now(timezone.utc).strftime("%H:%M:%S UTC") - - -def _default_last_action() -> Dict[str, Any]: - return { - "tone": "info", - "title": "Welcome", - "body": ( - "This lab helps you derive a key, puncture it, and verify that the same key cannot be derived again. " - "Start by deriving a key for a Provider ID + File/Time ID." - ), - "provider_id": None, - "file_time_id": None, - "path": None, - "path_provider": None, - "path_resource": None, - "key_hex": None, - "key_description": None, - } - - -def _split_path_bits(path: Optional[str]) -> tuple[Optional[str], Optional[str]]: - if not path: - return None, None - return path[:7], path[7:] - - -def _set_last_action( - system: Dict[str, Any], - *, - tone: str, - title: str, - body: str, - provider_id: Optional[int] = None, - file_time_id: Optional[int] = None, - path: Optional[str] = None, - key_hex: Optional[str] = None, - key_description: Optional[str] = None, -) -> None: - path_provider, path_resource = _split_path_bits(path) - system["last_action"] = { - "tone": tone, - "title": title, - "body": body, - "provider_id": provider_id, - "file_time_id": file_time_id, - "path": path, - "path_provider": path_provider, - "path_resource": path_resource, - "key_hex": key_hex, - "key_description": key_description, - } - - -def _record_history( - system: Dict[str, Any], - *, - action: str, - status: str, - summary: str, - provider_id: Optional[int] = None, - file_time_id: Optional[int] = None, - path: Optional[str] = None, -) -> None: - history = system["history"] - history.insert( - 0, - { - "time": _utc_now_label(), - "action": action, - "status": status, - "summary": summary, - "provider_id": provider_id, - "file_time_id": file_time_id, - "path": path, - }, - ) - del history[24:] - - -def _default_providers() -> Dict[int, Dict[str, Any]]: - created_at = _utc_now_label() - return { - 42: { - "provider_id": 42, - "name": "Provider 42 (Demo)", - "description": "Default provider used in Scenario A walkthrough.", - "created_at": created_at, - }, - 17: { - "provider_id": 17, - "name": "Northwind Cloud", - "description": "Example provider entry. Edit or delete as needed.", - "created_at": created_at, - }, - 88: { - "provider_id": 88, - "name": "Blue Harbor Storage", - "description": "Example provider entry. Edit or delete as needed.", - "created_at": created_at, - }, - } - - -def _ensure_key_entry( - system: Dict[str, Any], - *, - provider_id: int, - file_time_id: int, - path: Optional[str] = None, -) -> Dict[str, Any]: - if path is None: - path = tag_to_binary_path(provider_id, file_time_id) - - journal = system["key_journal"] - entry = journal.get(path) - if entry is None: - path_provider, path_resource = _split_path_bits(path) - entry = { - "provider_id": provider_id, - "file_time_id": file_time_id, - "path": path, - "path_provider": path_provider, - "path_resource": path_resource, - "description": "", - "ever_derived": False, - "ever_punctured": False, - "derive_count": 0, - "puncture_count": 0, - "last_derived_at": None, - "last_punctured_at": None, - } - journal[path] = entry - return entry - - -def _touch_key_derive( - system: Dict[str, Any], - *, - provider_id: int, - file_time_id: int, - path: str, - description: str, -) -> Dict[str, Any]: - entry = _ensure_key_entry(system, provider_id=provider_id, file_time_id=file_time_id, path=path) - if description: - entry["description"] = description - entry["ever_derived"] = True - entry["derive_count"] += 1 - entry["last_derived_at"] = _utc_now_label() - return entry - - -def _touch_key_puncture( - system: Dict[str, Any], - *, - provider_id: int, - file_time_id: int, - path: str, - applied: bool, -) -> Dict[str, Any]: - entry = _ensure_key_entry(system, provider_id=provider_id, file_time_id=file_time_id, path=path) - entry["ever_punctured"] = True - if applied: - entry["puncture_count"] += 1 - entry["last_punctured_at"] = _utc_now_label() - return entry - - -def _asset_root_dir() -> str: - root = os.getenv("PUNCTURE_ASSET_ROOT", os.path.join(os.getcwd(), "assets")) - abs_root = os.path.abspath(root) - os.makedirs(abs_root, exist_ok=True) - return abs_root - - -def _normalize_relpath(rel_path: str) -> str: - if not rel_path: - raise ValueError("relative file path is required") - if os.path.isabs(rel_path): - raise ValueError("absolute paths are not allowed") - - normalized = os.path.normpath(rel_path).replace("\\", "/") - if normalized.startswith("../") or normalized == "..": - raise ValueError("path traversal is not allowed") - return normalized - - -def _asset_abs_path(asset_root: str, rel_path: str) -> str: - rel = _normalize_relpath(rel_path) - abs_path = os.path.abspath(os.path.join(asset_root, rel)) - if not abs_path.startswith(asset_root + os.sep) and abs_path != asset_root: - raise ValueError("file path escapes asset root") - return abs_path - - -def _list_plaintext_files(asset_root: str) -> list[str]: - files: list[str] = [] - for base, dirs, filenames in os.walk(asset_root): - dirs[:] = [d for d in dirs if not d.startswith(".")] - for name in filenames: - if name.endswith(".pke"): - continue - rel = os.path.relpath(os.path.join(base, name), asset_root) - files.append(rel.replace(os.sep, "/")) - return sorted(files) - - -def _format_bytes(count: int) -> str: - units = ["B", "KB", "MB", "GB"] - value = float(count) - idx = 0 - while value >= 1024.0 and idx < len(units) - 1: - value /= 1024.0 - idx += 1 - if idx == 0: - return f"{int(value)} {units[idx]}" - return f"{value:.1f} {units[idx]}" - - -def _list_plaintext_rows(asset_root: str) -> list[Dict[str, Any]]: - rows: list[Dict[str, Any]] = [] - for rel in _list_plaintext_files(asset_root): - abs_path = _asset_abs_path(asset_root, rel) - try: - size = os.path.getsize(abs_path) - except OSError: - size = 0 - try: - modified_at = datetime.fromtimestamp(os.path.getmtime(abs_path), timezone.utc).strftime( - "%Y-%m-%d %H:%M UTC" - ) - except OSError: - modified_at = "unknown" - rows.append( - { - "relpath": rel, - "size_bytes": size, - "size_label": _format_bytes(size), - "modified_at": modified_at, - } - ) - return rows - - -def _asset_lifecycle_state(mapping_count: int, blocked_count: int) -> str: - """Deterministic lifecycle state machine for a cleartext asset.""" - if mapping_count <= 0: - return "eligible" - if blocked_count <= 0: - return "encrypted_live" - if blocked_count < mapping_count: - return "encrypted_partial" - return "encrypted_blocked" - - -def _asset_lifecycle_label(state: str) -> str: - labels = { - "eligible": "Eligible", - "encrypted_live": "Encrypted (live)", - "encrypted_partial": "Encrypted (partially blocked)", - "encrypted_blocked": "Encrypted (fully blocked)", - } - return labels.get(state, state) - - -def _stream_xor(key: bytes, nonce: bytes, data: bytes) -> bytes: - out = bytearray(len(data)) - offset = 0 - counter = 0 - while offset < len(data): - block = hmac.new( - key, - b"ENC" + nonce + counter.to_bytes(8, byteorder="big"), - hashlib.sha256, - ).digest() - chunk = data[offset : offset + len(block)] - for i, value in enumerate(chunk): - out[offset + i] = value ^ block[i] - offset += len(chunk) - counter += 1 - return bytes(out) - - -def _encrypt_blob(key: bytes, plaintext: bytes) -> bytes: - nonce = os.urandom(ENC_NONCE_SIZE) - ciphertext = _stream_xor(key, nonce, plaintext) - tag = hmac.new(key, b"TAG" + nonce + ciphertext, hashlib.sha256).digest() - return ENC_MAGIC + nonce + tag + ciphertext - - -def _decrypt_blob(key: bytes, encrypted_blob: bytes) -> bytes: - minimum = len(ENC_MAGIC) + ENC_NONCE_SIZE + ENC_TAG_SIZE - if len(encrypted_blob) < minimum: - raise ValueError("ciphertext is too short") - if not encrypted_blob.startswith(ENC_MAGIC): - raise ValueError("ciphertext header mismatch") - - nonce_start = len(ENC_MAGIC) - nonce_end = nonce_start + ENC_NONCE_SIZE - tag_end = nonce_end + ENC_TAG_SIZE - nonce = encrypted_blob[nonce_start:nonce_end] - tag = encrypted_blob[nonce_end:tag_end] - ciphertext = encrypted_blob[tag_end:] - - expected_tag = hmac.new(key, b"TAG" + nonce + ciphertext, hashlib.sha256).digest() - if not hmac.compare_digest(tag, expected_tag): - raise ValueError("ciphertext authentication failed") - return _stream_xor(key, nonce, ciphertext) - - -def _next_ciphertext_relpath(asset_root: str, plaintext_relpath: str, provider_id: int, file_time_id: int) -> str: - rel = _normalize_relpath(plaintext_relpath) - directory = os.path.dirname(rel) - filename = os.path.basename(rel) - stem = f"{filename}.enc.p{provider_id}.k{file_time_id}" - - idx = 1 - while True: - suffix = ".pke" if idx == 1 else f".v{idx}.pke" - candidate_name = stem + suffix - candidate_rel = os.path.join(directory, candidate_name) if directory else candidate_name - candidate_abs = _asset_abs_path(asset_root, candidate_rel) - if not os.path.exists(candidate_abs): - return candidate_rel.replace(os.sep, "/") - idx += 1 - - -def _next_plaintext_relpath(asset_root: str, desired_relpath: str) -> str: - rel = _normalize_relpath(desired_relpath) - directory = os.path.dirname(rel) - filename = os.path.basename(rel) - stem, ext = os.path.splitext(filename) - - idx = 1 - while True: - candidate_name = filename if idx == 1 else f"{stem}.v{idx}{ext}" - candidate_rel = os.path.join(directory, candidate_name) if directory else candidate_name - candidate_abs = _asset_abs_path(asset_root, candidate_rel) - if not os.path.exists(candidate_abs): - return candidate_rel.replace(os.sep, "/") - idx += 1 - - -def _next_decrypted_relpath(asset_root: str, plaintext_relpath: str, provider_id: int, file_time_id: int) -> str: - rel = _normalize_relpath(plaintext_relpath) - target = f"{rel}.dec.p{provider_id}.k{file_time_id}" - return _next_plaintext_relpath(asset_root, target) - - -def _new_system() -> Dict[str, Any]: - seed = PuncturableKeyManager.generate_master_seed() - manager = PuncturableKeyManager(seed) - return { - "manager": manager, - "history": [], - "last_inputs": {"provider_id": 42, "file_time_id": 123456, "purpose": "Demo key for provider onboarding"}, - "last_action": _default_last_action(), - "providers": _default_providers(), - "deleted_providers": [], - "providers_notice": None, - "key_journal": {}, - "last_puncture_diff": None, - "asset_root": _asset_root_dir(), - "asset_records": [], - "asset_notice": None, - } - - -def _active_frontier_rows(manager: PuncturableKeyManager) -> list[Dict[str, Any]]: - rows: list[Dict[str, Any]] = [] - for prefix in manager.active_prefixes(): - depth = len(prefix) - wildcard_bits = PATH_BITS - depth - provider_bits = prefix[:7] if depth >= 1 else "" - resource_bits = prefix[7:] if depth > 7 else "" - rows.append( - { - "prefix": prefix, - "depth": depth, - "wildcard_bits": wildcard_bits, - "is_root": depth == 0, - "provider_bits": provider_bits, - "resource_bits": resource_bits, - "coverage_label": ( - "Covers the entire 32-bit tag space (all providers, all file/time IDs)." - if depth == 0 - else f"Covers all tags with this prefix; {wildcard_bits} wildcard bit(s) remain." - ), - } - ) - return rows - - -def _prefix_intersects_active(prefix: str, active_prefixes: list[str]) -> bool: - for frontier in active_prefixes: - if frontier.startswith(prefix) or prefix.startswith(frontier): - return True - return False - - -def _project_prefixes(prefixes: list[str], max_depth: int) -> set[str]: - return {item if len(item) <= max_depth else item[:max_depth] for item in prefixes} - - -def _derived_prefixes(system: Dict[str, Any], max_depth: int) -> set[str]: - prefixes: set[str] = set() - for entry in system["key_journal"].values(): - if not entry.get("ever_derived"): - continue - path = str(entry["path"]) - stop = min(max_depth, len(path)) - for depth in range(stop + 1): - prefixes.add(path[:depth]) - return prefixes - - -def _set_last_puncture_diff( - system: Dict[str, Any], - *, - before_frontier: list[str], - after_frontier: list[str], - target_bitstring: str, - target_kind: str, -) -> None: - before_set = set(before_frontier) - after_set = set(after_frontier) - removed = _sort_prefixes([prefix for prefix in before_frontier if prefix not in after_set]) - added = _sort_prefixes([prefix for prefix in after_frontier if prefix not in before_set]) - system["last_puncture_diff"] = { - "time": _utc_now_label(), - "target": target_bitstring, - "target_kind": target_kind, - "removed": removed, - "added": added, - } - - -def _node_x(prefix: str, *, depth: int, slot_width: float, margin_x: float) -> float: - if not prefix: - leaf_slots = 1 << depth - return margin_x + (leaf_slots * slot_width) / 2.0 - - left_index = int(prefix, 2) * (1 << (depth - len(prefix))) - span = 1 << (depth - len(prefix)) - center_index = left_index + span / 2.0 - return margin_x + center_index * slot_width - - -def _tree_visualization_bundle(system: Dict[str, Any], manager: PuncturableKeyManager) -> Dict[str, Any]: - depth = TREE_VIEW_DEPTH - active_prefixes = manager.active_prefixes() - derived_prefixes = _derived_prefixes(system, depth) - - frontier_exact = {prefix for prefix in active_prefixes if len(prefix) <= depth} - frontier_proxy = {prefix[:depth] for prefix in active_prefixes if len(prefix) > depth} - - last_diff = system.get("last_puncture_diff") or {} - removed_raw = list(last_diff.get("removed", [])) - removed_exact = {prefix for prefix in removed_raw if len(prefix) <= depth} - removed_proxy = {prefix[:depth] for prefix in removed_raw if len(prefix) > depth} - - slot_width = 22.0 - level_height = 86.0 - margin_x = 26.0 - margin_top = 34.0 - leaf_slots = 1 << depth - width = int(margin_x * 2 + leaf_slots * slot_width) - height = int(margin_top + depth * level_height + 64) - - node_status: Dict[str, str] = {} - node_title: Dict[str, str] = {} - for level in range(depth + 1): - for idx in range(1 << level): - prefix = "" if level == 0 else format(idx, f"0{level}b") - possible = _prefix_intersects_active(prefix, active_prefixes) - - if prefix in removed_exact: - status = "removed" - elif level == depth and prefix in removed_proxy: - status = "removed_proxy" - elif prefix in frontier_exact: - status = "frontier" - elif level == depth and prefix in frontier_proxy: - status = "frontier_proxy" - elif not possible: - status = "blocked" - elif prefix in derived_prefixes: - status = "derived" - else: - status = "possible" - - node_status[prefix] = status - - if prefix == "": - label = "seed root" - else: - label = f"prefix {prefix}" - if status == "frontier": - label += " (current frontier)" - elif status == "frontier_proxy": - label += " (frontier continues below visible depth)" - elif status == "removed": - label += " (deleted frontier from last puncture)" - elif status == "removed_proxy": - label += " (deleted frontier below visible depth)" - elif status == "blocked": - label += " (future derivation impossible)" - elif status == "derived": - label += " (contains previously derived path)" - else: - label += " (future derivation still possible)" - node_title[prefix] = label - - edges_svg: list[str] = [] - for level in range(depth): - for idx in range(1 << level): - parent = "" if level == 0 else format(idx, f"0{level}b") - px = _node_x(parent, depth=depth, slot_width=slot_width, margin_x=margin_x) - py = margin_top + level * level_height - for bit in ("0", "1"): - child = parent + bit - cx = _node_x(child, depth=depth, slot_width=slot_width, margin_x=margin_x) - cy = margin_top + (level + 1) * level_height - child_status = node_status[child] - if child_status in {"removed", "removed_proxy"}: - edge_class = "edge-removed" - elif child_status == "blocked": - edge_class = "edge-blocked" - else: - edge_class = "edge-live" - edges_svg.append( - f'' - ) - - nodes_svg: list[str] = [] - for level in range(depth + 1): - radius = 10 if level == 0 else 7.5 - for idx in range(1 << level): - prefix = "" if level == 0 else format(idx, f"0{level}b") - x = _node_x(prefix, depth=depth, slot_width=slot_width, margin_x=margin_x) - y = margin_top + level * level_height - node_class = f"node-{node_status[prefix]}" - title = node_title[prefix] - nodes_svg.append( - ( - f'' - f"{title}" - "" - ) - ) - - current_frontier_count = sum(1 for status in node_status.values() if status in {"frontier", "frontier_proxy"}) - blocked_count = sum(1 for status in node_status.values() if status == "blocked") - removed_count = sum(1 for status in node_status.values() if status in {"removed", "removed_proxy"}) - - svg = ( - f'' - "" - + "".join(edges_svg) - + "".join(nodes_svg) - + "" - ) - - return { - "svg": svg, - "depth": depth, - "current_frontier_count": current_frontier_count, - "blocked_count": blocked_count, - "removed_count": removed_count, - "last_puncture": last_diff or None, - } - - -HTML = """ - - - - - - Puncture Lab - - - -
-
-

Zero-Trust Cloud-Wide Forward Secrecy

-

Puncture Lab: human-readable key revocation

-

- You can derive a key for a cloud file, puncture it, and prove that the same tag is permanently blocked while other tags keep working. -

- -
-
-
Active tree nodes
-
{{ active_nodes }}
-
Stored prefixes, not per-file keys
-
-
-
Punctures logged
-
{{ puncture_count }}
-
Irreversible void events in this singleton
-
-
-
Walkthrough progress
-
{{ progress.done }}/3
-
Newbie mode checklist
-
-
-
- -
-

Current Active Roots (Frontier)

-
-

- This panel shows the only root prefixes currently active in memory. - It does not show key material or seed bytes. -

-

After puncture, the single root is replaced by a minimal set of subtree roots.

-
-
- {% for row in active_frontier %} -
-
- {{ 'ROOT' if row.is_root else ('Prefix ' ~ row.prefix) }} - Depth {{ row.depth }} / 32 -
-
- {% if row.is_root %} - ROOT - {% else %} - {{ row.provider_bits }} - {% if row.resource_bits %} - |{{ row.resource_bits }} - {% endif %} - {% endif %} -
-

{{ row.coverage_label }}

-
- {% endfor %} -
-
- -
-

Tree/Subtree Visualization

-

- Projection of the first {{ tree_viz.depth }} bits of the 32-bit tree. Root starts as frontier; - green nodes are derivable future space; amber marks branches that already had derivations. -

-
-
-
Visible frontier nodes
-
{{ tree_viz.current_frontier_count }}
-
-
-
Blocked projected nodes
-
{{ tree_viz.blocked_count }}
-
-
-
Removed frontier (last puncture)
-
{{ tree_viz.removed_count }}
-
-
-
{{ tree_viz.svg | safe }}
-
- Current frontier - Future derivable - Already derived branch - Now impossible - Deleted frontier (last puncture) -
- {% if tree_viz.last_puncture %} - - {% else %} - - {% endif %} -
- -
-

Quick Start Walkthrough

-
    -
  1. Derive a key for any Provider ID + File/Time ID.
  2. -
  3. Puncture that same pair to revoke future access.
  4. -
  5. Derive again and confirm the key is now inaccessible.
  6. -
-
- -
- -
-
-

Step 1 and 2: Key Workbench

-

Range: provider `0..127`, file/time `0..33,554,431`.

- -
- -
- -
-
- -
- - - - - - - - - -
- -
-
- -
- - - - - - -
- -
-
- -
- Same values -> derive, then puncture, then derive again. The second derive should be blocked. -
-
-
-

What "Root" Means Here

-

- The frontier panel is structural state only: - which prefixes currently hold active subtree roots. -

-

- Before any puncture there is one active root (`ROOT`). After puncture you will see multiple subtree prefixes. - Any missing prefix region is permanently void. -

-
-
- -
-

Latest Result

-
{{ last_action.title }}
-
-

{{ last_action.body }}

- {% if last_action.provider_id is not none %} -

Provider {{ last_action.provider_id }}, File/Time {{ last_action.file_time_id }}

- {% endif %} - {% if last_action.path %} -
- Tag bits: - {{ last_action.path_provider }} - | - {{ last_action.path_resource }} -
(7 bits provider | 25 bits file/time)
-
- {% endif %} - {% if last_action.key_hex %} - {% if last_action.key_description %} -

Purpose: {{ last_action.key_description }}

- {% endif %} - - -
- -
- {% endif %} -
-
- -
-
-

Puncture Log (Audit)

-

Audit list of punctured bit-strings applied in this singleton system.

- -
- -
-
- -
-

Activity Timeline

-
    - {% if history %} - {% for item in history %} -
  • -
    {{ item.time }} | {{ item.action }} | {{ item.status }}
    -
    {{ item.summary }}
    - {% if item.path %} -
    {{ item.path }}
    - {% endif %} -
  • - {% endfor %} - {% else %} -
  • No actions yet. Start with "Derive Key".
  • - {% endif %} -
-
-
- -
-

Before You Leave This Page

-
- This interface is a local demonstration tool. It is intentionally educational and does not include user auth, - encrypted persistence, or hardened production deployment controls. -
-
-
- -
-
-
-
- - - - -""" - - -PROVIDERS_HTML = """ - - - - - - Provider Manager - - - -
-
- -

Provider Manager

-

- Add, edit, and delete cloud providers. Deleting a provider revokes access to its entire 7-bit subtree by prefix puncture. -

-
-
-
Active providers
-
{{ active_count }}
-
-
-
Deleted providers
-
{{ deleted_count }}
-
-
-
- - {% if notice %} -
{{ notice.message }}
- {% endif %} - -
-

Add Provider

-
-
-
- - -
-
- - -
-
- - -
- -
-
-
- -
-

Edit Or Delete Providers

-

Editing changes metadata only. Provider ID is immutable to preserve key mapping.

- {% if providers %} - {% for provider in providers %} -
-
- {{ provider.name }} - ID {{ provider.provider_id }} | Prefix {{ provider.prefix }} -
- -
- - - - - -
- -
-
- -
-

Key ID Journal

-

Visualization of key IDs seen for this provider. Only keys used in this lab appear.

-
-
-
Tracked IDs
-
{{ provider.key_count }}
-
-
-
Ever Derived
-
{{ provider.derived_count }}
-
-
-
Ever Punctured
-
{{ provider.punctured_count }}
-
-
- -
- Derived IDs - {% if provider.derived_ids %} - {% for file_id in provider.derived_ids %} - {{ file_id }} - {% endfor %} - {% else %} - none - {% endif %} -
- -
- Punctured IDs - {% if provider.punctured_ids %} - {% for file_id in provider.punctured_ids %} - {{ file_id }} - {% endfor %} - {% else %} - none - {% endif %} -
- - {% if provider.key_rows %} - {% for key in provider.key_rows %} -
-
-
File/Time ID {{ key.file_time_id }}
-
- {% if key.ever_derived %}Derived{% endif %} - {% if key.ever_punctured %}Punctured{% endif %} -
-
-
{{ key.path_provider }} | {{ key.path_resource }}
-
Derived count: {{ key.derive_count }} | Puncture count: {{ key.puncture_count }}
-
Last derived: {{ key.last_derived_at or 'never' }} | Last punctured: {{ key.last_punctured_at or 'never' }}
-
- - - - -
- -
-
-
- {% endfor %} - {% else %} -

No key IDs tracked for this provider yet.

- {% endif %} -
- -
- -
- -
-
-
- {% endfor %} - {% else %} -

No active providers. Add one above.

- {% endif %} -
- -
-

Deleted Providers (Audit)

- {% if deleted_providers %} - {% for item in deleted_providers %} -
-
ID {{ item.provider_id }} - {{ item.name }} at {{ item.deleted_at }}
-
Prefix puncture: {{ item.prefix }} | Structural change: {{ 'yes' if item.applied else 'no' }}
-
- {% endfor %} - {% else %} -

No providers deleted yet.

- {% endif %} -
-
- - - -""" - - -ASSETS_HTML = """ - - - - - - Asset Workflow - - - -
-
- -

Asset Workflow

-

State machine lifecycle: upload -> eligible -> encrypted (live/partial/blocked). Deselect anytime before encryption.

-

Each ciphertext mapping now includes direct decryption to filesystem when its key is still accessible.

-

-
-
Cleartext Files
0
-
Ciphertext Mappings
0
-
Blocked Mappings
0
-
Glow Mappings
0
-
-
- -
{% if initial_notice %}{{ initial_notice.message }}{% endif %}
- -
-

Single Lifecycle Flow

-
-
-

1) Choose Files

- - - - -
- - Choose files to enable upload. -
- -

2) Eligible Files

-
- - -
-
-
- 0 selected - 0 eligible -
-
- -
-

3) Encrypt Selection

- - - - - - - - - - - - -
- - -
-

Ciphertexts are saved to filesystem immediately after encryption.

-
-
-
- -
-

Asset Lifecycle States

-
-
- -
-
-

Asset-Centric Mappings

-
-
-
-

Key-Centric Usage

-
-
-
-
- - - - -""" - - -def create_app() -> Flask: - app = Flask(__name__) - app.config["system"] = _new_system() - - def _system() -> Dict[str, Any]: - return app.config["system"] - - def _compute_progress(system: Dict[str, Any]) -> Dict[str, Any]: - history = system["history"] - derived_once = any(item["action"] == "derive" and item["status"] == "derived" for item in history) - punctured_once = any(item["action"] == "puncture" and item["status"] == "applied" for item in history) - verified_void = any(item["action"] == "derive" and item["status"] == "void" for item in history) - - done = sum([derived_once, punctured_once, verified_void]) - return { - "derived_once": derived_once, - "punctured_once": punctured_once, - "verified_void": verified_void, - "done": done, - "percent": int((done / 3) * 100), - } - - def _provider_rows(system: Dict[str, Any]) -> list[Dict[str, Any]]: - rows = [] - journal = system["key_journal"] - for provider_id in sorted(system["providers"].keys()): - row = dict(system["providers"][provider_id]) - row["prefix"] = provider_id_to_prefix(provider_id) - - keys = [] - for entry in journal.values(): - if entry["provider_id"] == provider_id: - keys.append(dict(entry)) - keys.sort(key=lambda item: item["file_time_id"]) - - row["key_rows"] = keys - row["keys"] = keys - row["key_count"] = len(keys) - row["derived_count"] = sum(1 for item in keys if item["ever_derived"]) - row["punctured_count"] = sum(1 for item in keys if item["ever_punctured"]) - row["derived_ids"] = [item["file_time_id"] for item in keys if item["ever_derived"]] - row["punctured_ids"] = [item["file_time_id"] for item in keys if item["ever_punctured"]] - rows.append(row) - return rows - - def _set_provider_notice(system: Dict[str, Any], tone: str, message: str) -> None: - system["providers_notice"] = {"tone": tone, "message": message} - - def _set_asset_notice(system: Dict[str, Any], tone: str, message: str) -> None: - system["asset_notice"] = {"tone": tone, "message": message} - - def _normalize_unique_relpaths(raw_relpaths: list[str]) -> list[str]: - normalized = [_normalize_relpath(path) for path in raw_relpaths if str(path).strip()] - if not normalized: - raise ValueError("select existing files or upload files before encrypting") - # de-duplicate while preserving order - return list(dict.fromkeys(normalized)) - - def _persist_uploaded_files(system: Dict[str, Any]) -> list[str]: - target_subdir = request.form.get("target_subdir", "").strip() - target_prefix = "" - if target_subdir: - target_prefix = _normalize_relpath(target_subdir).strip("/") - - files = request.files.getlist("files") - saved: list[str] = [] - for item in files: - filename = secure_filename(item.filename or "") - if not filename: - continue - desired_rel = os.path.join(target_prefix, filename) if target_prefix else filename - final_rel = _next_plaintext_relpath(system["asset_root"], desired_rel) - final_abs = _asset_abs_path(system["asset_root"], final_rel) - os.makedirs(os.path.dirname(final_abs), exist_ok=True) - item.save(final_abs) - saved.append(final_rel) - return saved - - def _encrypt_plaintext_relpaths( - system: Dict[str, Any], - *, - plaintext_relpaths: list[str], - provider_id: int, - file_time_id: int, - purpose: str, - ) -> Dict[str, Any]: - manager: PuncturableKeyManager = system["manager"] - path = tag_to_binary_path(provider_id, file_time_id) - key = manager.get_key_for_tag(path) - if key is None: - raise ValueError("selected key is punctured/inaccessible") - - _touch_key_derive( - system, - provider_id=provider_id, - file_time_id=file_time_id, - path=path, - description=purpose, - ) - - saved: list[tuple[str, str]] = [] - errors: list[str] = [] - for plaintext_relpath in plaintext_relpaths: - try: - plaintext_abs = _asset_abs_path(system["asset_root"], plaintext_relpath) - if not os.path.isfile(plaintext_abs): - raise ValueError("cleartext file does not exist") - - with open(plaintext_abs, "rb") as in_file: - plaintext = in_file.read() - - encrypted_blob = _encrypt_blob(key, plaintext) - ciphertext_relpath = _next_ciphertext_relpath( - system["asset_root"], - plaintext_relpath, - provider_id, - file_time_id, - ) - ciphertext_abs = _asset_abs_path(system["asset_root"], ciphertext_relpath) - os.makedirs(os.path.dirname(ciphertext_abs), exist_ok=True) - with open(ciphertext_abs, "wb") as out_file: - out_file.write(encrypted_blob) - - system["asset_records"].append( - { - "record_id": len(system["asset_records"]) + 1, - "plaintext_relpath": plaintext_relpath, - "ciphertext_relpath": ciphertext_relpath, - "provider_id": provider_id, - "file_time_id": file_time_id, - "path": path, - "purpose": purpose, - "created_at": _utc_now_label(), - "plaintext_size": len(plaintext), - "ciphertext_size": len(encrypted_blob), - "decrypt_count": 0, - "last_decrypted_at": None, - "last_decrypted_relpath": None, - } - ) - saved.append((plaintext_relpath, ciphertext_relpath)) - except Exception as item_exc: - errors.append(f"{plaintext_relpath}: {item_exc}") - - if not saved: - raise ValueError("; ".join(errors[:3]) or "no file could be encrypted") - - return {"path": path, "saved": saved, "errors": errors} - - def _decrypt_asset_records(system: Dict[str, Any], *, record_ids: list[int]) -> Dict[str, Any]: - manager: PuncturableKeyManager = system["manager"] - index: Dict[int, Dict[str, Any]] = { - int(item["record_id"]): item for item in system["asset_records"] if "record_id" in item - } - - restored: list[tuple[int, str, str]] = [] - errors: list[str] = [] - for record_id in record_ids: - row = index.get(record_id) - if row is None: - errors.append(f"record {record_id}: not found") - continue - - try: - key = manager.get_key_for_tag(str(row["path"])) - if key is None: - raise ValueError("key is punctured/inaccessible") - - ciphertext_abs = _asset_abs_path(system["asset_root"], str(row["ciphertext_relpath"])) - if not os.path.isfile(ciphertext_abs): - raise ValueError("ciphertext file missing from filesystem") - - with open(ciphertext_abs, "rb") as in_file: - blob = in_file.read() - - plaintext = _decrypt_blob(key, blob) - decrypted_relpath = _next_decrypted_relpath( - system["asset_root"], - str(row["plaintext_relpath"]), - int(row["provider_id"]), - int(row["file_time_id"]), - ) - decrypted_abs = _asset_abs_path(system["asset_root"], decrypted_relpath) - os.makedirs(os.path.dirname(decrypted_abs), exist_ok=True) - with open(decrypted_abs, "wb") as out_file: - out_file.write(plaintext) - - row["decrypt_count"] = int(row.get("decrypt_count", 0)) + 1 - row["last_decrypted_at"] = _utc_now_label() - row["last_decrypted_relpath"] = decrypted_relpath - - restored.append((record_id, str(row["ciphertext_relpath"]), decrypted_relpath)) - except Exception as item_exc: - errors.append(f"record {record_id}: {item_exc}") - - if not restored: - raise ValueError("; ".join(errors[:3]) or "no ciphertext could be decrypted") - return {"restored": restored, "errors": errors} - - def _key_combo_options(system: Dict[str, Any]) -> list[Dict[str, Any]]: - options: list[Dict[str, Any]] = [] - for row in sorted( - [dict(item) for item in system["key_journal"].values()], - key=lambda item: (item["provider_id"], item["file_time_id"]), - ): - status = "blocked" if row["ever_punctured"] else "active" - options.append( - { - "provider_id": row["provider_id"], - "file_time_id": row["file_time_id"], - "status": status, - "label": f"Provider {row['provider_id']} | Key {row['file_time_id']} | {status}", - } - ) - return options - - def _asset_workflow_snapshot(system: Dict[str, Any]) -> Dict[str, Any]: - dashboard = _asset_dashboard(system) - plain_rows = _list_plaintext_rows(system["asset_root"]) - providers = [ - {"provider_id": item["provider_id"], "name": item["name"]} - for item in _provider_rows(system) - ] - - mapped_by_relpath = {item["plaintext_relpath"]: item for item in dashboard["asset_files"]} - files = [] - for row in plain_rows: - mapped = mapped_by_relpath.get(row["relpath"]) - mapping_count = int(mapped["mapping_count"]) if mapped else 0 - blocked_count = int(mapped["blocked_count"]) if mapped else 0 - lifecycle_state = _asset_lifecycle_state(mapping_count, blocked_count) - files.append( - { - **row, - "mapping_count": mapping_count, - "blocked_count": blocked_count, - "lifecycle_state": lifecycle_state, - "lifecycle_label": _asset_lifecycle_label(lifecycle_state), - } - ) - - return { - "generated_at": _utc_now_label(), - "asset_root": system["asset_root"], - "stats": { - "cleartext_count": len(files), - "mapping_count": int(dashboard["mapping_count"]), - "blocked_count": int(dashboard["blocked_count"]), - "glow_count": int(dashboard["glow_count"]), - }, - "files": files, - "providers": providers, - "key_combo_options": _key_combo_options(system), - "last_inputs": dict(system["last_inputs"]), - "asset_files": dashboard["asset_files"], - "key_cards": dashboard["key_cards"], - } - - def _mark_known_provider_keys_punctured(system: Dict[str, Any], provider_id: int) -> int: - touched = 0 - stamp = _utc_now_label() - for entry in system["key_journal"].values(): - if int(entry["provider_id"]) != provider_id: - continue - if not entry["ever_punctured"]: - entry["puncture_count"] += 1 - entry["ever_punctured"] = True - entry["last_punctured_at"] = stamp - touched += 1 - return touched - - def _asset_dashboard(system: Dict[str, Any]) -> Dict[str, Any]: - manager: PuncturableKeyManager = system["manager"] - records = [dict(item) for item in system["asset_records"]] - - file_map: Dict[str, list[Dict[str, Any]]] = {} - key_map: Dict[str, Dict[str, Any]] = {} - blocked_total = 0 - glow_total = 0 - - for row in records: - path = row["path"] - row["is_accessible"] = manager.get_key_for_tag(path) is not None - row["path_provider"], row["path_resource"] = _split_path_bits(path) - if not row["is_accessible"]: - blocked_total += 1 - - file_map.setdefault(row["plaintext_relpath"], []).append(row) - - key_id = f"{row['provider_id']}:{row['file_time_id']}:{path}" - bucket = key_map.get(key_id) - if bucket is None: - bucket = { - "provider_id": row["provider_id"], - "file_time_id": row["file_time_id"], - "path": path, - "path_provider": row["path_provider"], - "path_resource": row["path_resource"], - "files": set(), - "is_accessible": row["is_accessible"], - } - key_map[key_id] = bucket - bucket["files"].add(row["plaintext_relpath"]) - bucket["is_accessible"] = bucket["is_accessible"] and row["is_accessible"] - - file_cards: list[Dict[str, Any]] = [] - for plaintext_relpath in sorted(file_map.keys()): - mappings = file_map[plaintext_relpath] - mappings.sort(key=lambda item: item["created_at"]) - blocked_count = sum(1 for item in mappings if not item["is_accessible"]) - for item in mappings: - item["show_red"] = not item["is_accessible"] - item["show_glow"] = item["is_accessible"] and blocked_count > 0 - if item["show_glow"]: - glow_total += 1 - - file_cards.append( - { - "plaintext_relpath": plaintext_relpath, - "mapping_count": len(mappings), - "blocked_count": blocked_count, - "mappings": mappings, - } - ) - - key_cards: list[Dict[str, Any]] = [] - for bucket in key_map.values(): - key_cards.append( - { - "provider_id": bucket["provider_id"], - "file_time_id": bucket["file_time_id"], - "path": bucket["path"], - "path_provider": bucket["path_provider"], - "path_resource": bucket["path_resource"], - "file_count": len(bucket["files"]), - "files": sorted(bucket["files"]), - "is_accessible": bucket["is_accessible"], - } - ) - key_cards.sort(key=lambda item: (item["provider_id"], item["file_time_id"])) - - return { - "asset_files": file_cards, - "key_cards": key_cards, - "mapping_count": len(records), - "blocked_count": blocked_total, - "glow_count": glow_total, - } - - def _remote_token_valid() -> bool: - configured = os.getenv("PUNCTURE_REMOTE_TOKEN", "").strip() - if not configured: - return True - supplied = request.headers.get("X-Puncture-Token", "") - return hmac.compare_digest(supplied, configured) - - @app.get("/") - def index() -> str: - sys = _system() - manager: PuncturableKeyManager = sys["manager"] - return render_template_string( - HTML, - tree_viz=_tree_visualization_bundle(sys, manager), - active_nodes=manager.active_node_count, - active_frontier=_active_frontier_rows(manager), - puncture_count=len(manager.puncture_log()), - puncture_log_json=manager.export_puncture_log_json(), - history=sys["history"], - progress=_compute_progress(sys), - last_action=sys["last_action"], - last_inputs=sys["last_inputs"], - ) - - @app.get("/assets") - def assets_page() -> str: - sys = _system() - snapshot = _asset_workflow_snapshot(sys) - return render_template_string( - ASSETS_HTML, - snapshot=snapshot, - initial_notice=sys["asset_notice"], - ) - - @app.get("/api/assets/workflow") - def api_assets_workflow() -> Dict[str, Any]: - sys = _system() - return {"ok": True, "state": _asset_workflow_snapshot(sys)} - - @app.post("/api/assets/workflow/upload") - def api_assets_workflow_upload() -> Dict[str, Any]: - sys = _system() - try: - saved = _persist_uploaded_files(sys) - if not saved: - raise ValueError("choose at least one file to upload") - - preview = ", ".join(saved[:3]) - extra = "" if len(saved) <= 3 else f" and {len(saved) - 3} more" - message = f"Uploaded {len(saved)} file(s): {preview}{extra}." - _set_asset_notice(sys, "success", message) - _record_history( - sys, - action="asset-upload", - status="uploaded", - summary=f"Uploaded {len(saved)} cleartext file(s) into asset root.", - ) - return { - "ok": True, - "uploaded": saved, - "message": message, - "state": _asset_workflow_snapshot(sys), - } - except Exception as exc: - message = f"Upload failed: {exc}" - _set_asset_notice(sys, "danger", message) - _record_history(sys, action="asset-upload", status="error", summary=message) - return {"ok": False, "error": str(exc), "state": _asset_workflow_snapshot(sys)}, 400 - - @app.post("/api/assets/workflow/clear") - def api_assets_workflow_clear() -> Dict[str, Any]: - sys = _system() - sys["last_inputs"] = {"provider_id": 42, "file_time_id": 123456, "purpose": ""} - message = "Cleared saved encryption decisions." - _set_asset_notice(sys, "info", message) - _record_history( - sys, - action="asset-decisions", - status="cleared", - summary="User cleared saved asset form decisions.", - ) - return {"ok": True, "message": message, "state": _asset_workflow_snapshot(sys)} - - @app.post("/api/assets/workflow/encrypt") - def api_assets_workflow_encrypt() -> Dict[str, Any]: - sys = _system() - payload = request.get_json(silent=True) or {} - try: - raw_relpaths = payload.get("plaintext_relpaths", []) - if not isinstance(raw_relpaths, list): - raise ValueError("plaintext_relpaths must be a list") - plaintext_relpaths = _normalize_unique_relpaths([str(item) for item in raw_relpaths]) - - provider_id = int(payload.get("provider_id")) - file_time_id = int(payload.get("file_time_id")) - purpose = str(payload.get("purpose", "")).strip() - sys["last_inputs"] = { - "provider_id": provider_id, - "file_time_id": file_time_id, - "purpose": purpose, - } - - result = _encrypt_plaintext_relpaths( - sys, - plaintext_relpaths=plaintext_relpaths, - provider_id=provider_id, - file_time_id=file_time_id, - purpose=purpose, - ) - saved = result["saved"] - errors = result["errors"] - preview = ", ".join(f"{plain} -> {cipher}" for plain, cipher in saved[:2]) - extra = "" if len(saved) <= 2 else f" and {len(saved) - 2} more" - - if errors: - message = ( - f"Encrypted {len(saved)} file(s) with provider={provider_id}, key_id={file_time_id}: " - f"{preview}{extra}. Failed: {len(errors)}." - ) - _set_asset_notice(sys, "warn", message) - else: - message = ( - f"Encrypted {len(saved)} file(s) with provider={provider_id}, key_id={file_time_id}: " - f"{preview}{extra}." - ) - _set_asset_notice(sys, "success", message) - - _record_history( - sys, - action="asset-encrypt", - status="encrypted", - summary=( - f"Encrypted {len(saved)} file(s) with provider={provider_id}, file_time={file_time_id}. " - + (f"Failures: {len(errors)}." if errors else "") - ), - provider_id=provider_id, - file_time_id=file_time_id, - path=result["path"], - ) - return { - "ok": True, - "saved": [{"plaintext_relpath": p, "ciphertext_relpath": c} for p, c in saved], - "errors": errors, - "message": message, - "state": _asset_workflow_snapshot(sys), - } - except Exception as exc: - message = f"Encryption failed: {exc}" - _set_asset_notice(sys, "danger", message) - _record_history(sys, action="asset-encrypt", status="error", summary=message) - return {"ok": False, "error": str(exc), "state": _asset_workflow_snapshot(sys)}, 400 - - @app.post("/api/assets/workflow/decrypt") - def api_assets_workflow_decrypt() -> Dict[str, Any]: - sys = _system() - payload = request.get_json(silent=True) or {} - try: - raw_ids = payload.get("record_ids", []) - if not isinstance(raw_ids, list): - raise ValueError("record_ids must be a list") - if not raw_ids: - raise ValueError("select at least one ciphertext mapping to decrypt") - - record_ids: list[int] = [] - for value in raw_ids: - record_ids.append(int(value)) - record_ids = list(dict.fromkeys(record_ids)) - - result = _decrypt_asset_records(sys, record_ids=record_ids) - restored = result["restored"] - errors = result["errors"] - preview = ", ".join(f"{src} -> {dst}" for _, src, dst in restored[:2]) - extra = "" if len(restored) <= 2 else f" and {len(restored) - 2} more" - - if errors: - message = f"Decrypted {len(restored)} mapping(s): {preview}{extra}. Failed: {len(errors)}." - _set_asset_notice(sys, "warn", message) - else: - message = f"Decrypted {len(restored)} mapping(s): {preview}{extra}." - _set_asset_notice(sys, "success", message) - - _record_history( - sys, - action="asset-decrypt", - status="decrypted", - summary=f"Decrypted {len(restored)} mapping(s)." + (f" Failures: {len(errors)}." if errors else ""), - ) - return { - "ok": True, - "restored": [ - {"record_id": record_id, "ciphertext_relpath": src, "decrypted_relpath": dst} - for record_id, src, dst in restored - ], - "errors": errors, - "message": message, - "state": _asset_workflow_snapshot(sys), - } - except Exception as exc: - message = f"Decryption failed: {exc}" - _set_asset_notice(sys, "danger", message) - _record_history(sys, action="asset-decrypt", status="error", summary=message) - return {"ok": False, "error": str(exc), "state": _asset_workflow_snapshot(sys)}, 400 - - @app.post("/assets/upload") - def asset_upload() -> Any: - sys = _system() - try: - saved = _persist_uploaded_files(sys) - if not saved: - raise ValueError("no files were selected") - - preview = ", ".join(saved[:3]) - extra = "" if len(saved) <= 3 else f" and {len(saved) - 3} more" - _set_asset_notice( - sys, - "success", - f"Uploaded {len(saved)} cleartext file(s): {preview}{extra}.", - ) - _record_history( - sys, - action="asset-upload", - status="uploaded", - summary=f"Uploaded {len(saved)} cleartext file(s) into asset root.", - ) - except Exception as exc: - _set_asset_notice(sys, "danger", f"Upload failed: {exc}") - _record_history(sys, action="asset-upload", status="error", summary=f"Upload failed: {exc}") - return redirect(url_for("assets_page")) - - @app.post("/assets/encrypt") - def asset_encrypt() -> Any: - sys = _system() - try: - operation = request.form.get("operation", "encrypt").strip().lower() - if operation == "wipe": - sys["last_inputs"] = { - "provider_id": 42, - "file_time_id": 123456, - "purpose": "", - } - _set_asset_notice(sys, "info", "Cleared saved form decisions for asset encryption.") - _record_history( - sys, - action="asset-decisions", - status="cleared", - summary="User cleared saved asset form decisions.", - ) - return redirect(url_for("assets_page")) - - uploaded_relpaths = _persist_uploaded_files(sys) - include_uploads = request.form.get("include_uploads") == "1" - - raw_relpaths = request.form.getlist("plaintext_relpaths") - if not raw_relpaths: - fallback = request.form.get("plaintext_relpath", "").strip() - if fallback: - raw_relpaths = [fallback] - if include_uploads: - raw_relpaths.extend(uploaded_relpaths) - plaintext_relpaths = _normalize_unique_relpaths(raw_relpaths) - provider_id = int(request.form["provider_id"]) - file_time_id = int(request.form["file_time_id"]) - purpose = request.form.get("purpose", "").strip() - sys["last_inputs"] = { - "provider_id": provider_id, - "file_time_id": file_time_id, - "purpose": purpose, - } - - result = _encrypt_plaintext_relpaths( - sys, - plaintext_relpaths=plaintext_relpaths, - provider_id=provider_id, - file_time_id=file_time_id, - purpose=purpose, - ) - saved = result["saved"] - errors = result["errors"] - - preview = ", ".join(f"{plain} -> {cipher}" for plain, cipher in saved[:2]) - extra = "" if len(saved) <= 2 else f" and {len(saved) - 2} more" - if errors: - _set_asset_notice( - sys, - "warn", - ( - f"Encrypted {len(saved)} file(s) with provider={provider_id}, key_id={file_time_id}: " - f"{preview}{extra}. Uploads saved: {len(uploaded_relpaths)}. Failed: {len(errors)}." - ), - ) - else: - _set_asset_notice( - sys, - "success", - ( - f"Encrypted {len(saved)} file(s) with provider={provider_id}, key_id={file_time_id}: " - f"{preview}{extra}. Uploads saved: {len(uploaded_relpaths)}." - ), - ) - _record_history( - sys, - action="asset-encrypt", - status="encrypted", - summary=( - f"Encrypted {len(saved)} file(s) with provider={provider_id}, file_time={file_time_id}. " - + f"Uploads saved: {len(uploaded_relpaths)}. " - + (f"Failures: {len(errors)}." if errors else "") - ), - provider_id=provider_id, - file_time_id=file_time_id, - path=result["path"], - ) - except Exception as exc: - _set_asset_notice(sys, "danger", f"Encryption failed: {exc}") - _record_history(sys, action="asset-encrypt", status="error", summary=f"Encryption failed: {exc}") - return redirect(url_for("assets_page")) - - @app.get("/providers") - def providers_page() -> str: - sys = _system() - return render_template_string( - PROVIDERS_HTML, - providers=_provider_rows(sys), - deleted_providers=sys["deleted_providers"], - active_count=len(sys["providers"]), - deleted_count=len(sys["deleted_providers"]), - notice=sys["providers_notice"], - ) - - @app.post("/providers/add") - def provider_add() -> Any: - sys = _system() - try: - provider_id = int(request.form["provider_id"]) - name = request.form["name"].strip() - description = request.form.get("description", "").strip() - - if not name: - raise ValueError("Display name is required.") - if provider_id in sys["providers"]: - raise ValueError(f"Provider ID {provider_id} already exists.") - - # Validation for provider ID bit range. - provider_id_to_prefix(provider_id) - - sys["providers"][provider_id] = { - "provider_id": provider_id, - "name": name, - "description": description, - "created_at": _utc_now_label(), - } - _set_provider_notice(sys, "success", f"Added provider {provider_id}: {name}.") - _record_history( - sys, - action="provider-add", - status="added", - summary=f"Added provider {provider_id} ({name}).", - ) - except Exception as exc: - _set_provider_notice(sys, "danger", f"Add failed: {exc}") - _record_history(sys, action="provider-add", status="error", summary=f"Add failed: {exc}") - return redirect(url_for("providers_page")) - - @app.post("/providers/edit") - def provider_edit() -> Any: - sys = _system() - try: - provider_id = int(request.form["provider_id"]) - name = request.form["name"].strip() - description = request.form.get("description", "").strip() - - if provider_id not in sys["providers"]: - raise ValueError(f"Provider ID {provider_id} does not exist.") - if not name: - raise ValueError("Display name is required.") - - sys["providers"][provider_id]["name"] = name - sys["providers"][provider_id]["description"] = description - _set_provider_notice(sys, "success", f"Updated provider {provider_id}.") - _record_history( - sys, - action="provider-edit", - status="updated", - summary=f"Updated provider {provider_id}.", - ) - except Exception as exc: - _set_provider_notice(sys, "danger", f"Edit failed: {exc}") - _record_history(sys, action="provider-edit", status="error", summary=f"Edit failed: {exc}") - return redirect(url_for("providers_page")) - - @app.post("/providers/key-note") - def provider_key_note_update() -> Any: - sys = _system() - try: - provider_id = int(request.form["provider_id"]) - file_time_id = int(request.form["file_time_id"]) - description = request.form.get("description", "").strip() - - if provider_id not in sys["providers"]: - raise ValueError(f"Provider ID {provider_id} does not exist.") - - path = tag_to_binary_path(provider_id, file_time_id) - entry = _ensure_key_entry(sys, provider_id=provider_id, file_time_id=file_time_id, path=path) - entry["description"] = description - - _set_provider_notice( - sys, - "success", - f"Saved key purpose for provider={provider_id}, file/time={file_time_id}.", - ) - _record_history( - sys, - action="key-note-edit", - status="updated", - summary=f"Updated key purpose for provider={provider_id}, file/time={file_time_id}.", - provider_id=provider_id, - file_time_id=file_time_id, - path=path, - ) - except Exception as exc: - _set_provider_notice(sys, "danger", f"Key note update failed: {exc}") - _record_history(sys, action="key-note-edit", status="error", summary=f"Key note update failed: {exc}") - return redirect(url_for("providers_page")) - - @app.post("/providers/delete") - def provider_delete() -> Any: - sys = _system() - manager: PuncturableKeyManager = sys["manager"] - try: - provider_id = int(request.form["provider_id"]) - provider = sys["providers"].pop(provider_id, None) - if provider is None: - raise ValueError(f"Provider ID {provider_id} does not exist.") - - prefix = provider_id_to_prefix(provider_id) - before_frontier = manager.active_prefixes() - applied = manager.puncture_prefix(prefix) - after_frontier = manager.active_prefixes() - _set_last_puncture_diff( - sys, - before_frontier=before_frontier, - after_frontier=after_frontier, - target_bitstring=prefix, - target_kind="provider-prefix", - ) - touched = _mark_known_provider_keys_punctured(sys, provider_id) - - sys["deleted_providers"].insert( - 0, - { - "provider_id": provider_id, - "name": provider["name"], - "prefix": prefix, - "deleted_at": _utc_now_label(), - "applied": applied, - }, - ) - del sys["deleted_providers"][32:] - - _set_provider_notice( - sys, - "warn", - ( - f"Deleted provider {provider_id}. Prefix {prefix} punctured across its full subtree; " - f"structural change: {'yes' if applied else 'no'}. " - f"Known key IDs marked punctured: {touched}." - ), - ) - _set_last_action( - sys, - tone="warn", - title="Provider deleted and subtree punctured", - body=( - f"Provider {provider_id} was removed from registry and its 7-bit prefix was punctured. " - "All keys under that provider are now inaccessible." - ), - provider_id=provider_id, - ) - _record_history( - sys, - action="provider-delete", - status="punctured" if applied else "already-inaccessible", - summary=f"Deleted provider {provider_id}; prefix puncture {prefix}.", - ) - except Exception as exc: - _set_provider_notice(sys, "danger", f"Delete failed: {exc}") - _record_history(sys, action="provider-delete", status="error", summary=f"Delete failed: {exc}") - return redirect(url_for("providers_page")) - - @app.post("/derive") - def derive() -> Any: - sys = _system() - manager: PuncturableKeyManager = sys["manager"] - try: - provider_id = int(request.form["provider_id"]) - file_time_id = int(request.form["file_time_id"]) - purpose = request.form.get("purpose", "").strip() - sys["last_inputs"] = {"provider_id": provider_id, "file_time_id": file_time_id, "purpose": purpose} - - path = tag_to_binary_path(provider_id, file_time_id) - key = manager.get_key_for_tag(path) - - if key is None: - body = ( - "No key is derivable for this tag. It was punctured earlier or excluded by previous punctures. " - "This is the expected forward-secrecy behavior." - ) - _set_last_action( - sys, - tone="warn", - title="Derive blocked: key is inaccessible", - body=body, - provider_id=provider_id, - file_time_id=file_time_id, - path=path, - ) - _record_history( - sys, - action="derive", - status="void", - summary=f"Derive blocked for provider={provider_id}, file={file_time_id}.", - provider_id=provider_id, - file_time_id=file_time_id, - path=path, - ) - else: - key_hex = key.hex() - entry = _touch_key_derive( - sys, - provider_id=provider_id, - file_time_id=file_time_id, - path=path, - description=purpose, - ) - body = ( - "Key derivation succeeded. Keep in mind this demo shows the key directly for learning. " - "In production, this should feed cryptographic operations without UI exposure." - ) - _set_last_action( - sys, - tone="success", - title="Derive succeeded", - body=body, - provider_id=provider_id, - file_time_id=file_time_id, - path=path, - key_hex=key_hex, - key_description=entry["description"] or None, - ) - _record_history( - sys, - action="derive", - status="derived", - summary=( - f"Derived key for provider={provider_id}, file={file_time_id}." - + (f" Purpose: {entry['description']}." if entry["description"] else "") - ), - provider_id=provider_id, - file_time_id=file_time_id, - path=path, - ) - - except Exception as exc: - _set_last_action( - sys, - tone="danger", - title="Input error", - body=str(exc), - ) - _record_history(sys, action="derive", status="error", summary=f"Derive failed: {exc}") - return redirect(url_for("index")) - - @app.post("/puncture") - def puncture() -> Any: - sys = _system() - manager: PuncturableKeyManager = sys["manager"] - try: - provider_id = int(request.form["provider_id"]) - file_time_id = int(request.form["file_time_id"]) - sys["last_inputs"] = { - "provider_id": provider_id, - "file_time_id": file_time_id, - "purpose": sys["last_inputs"].get("purpose", ""), - } - - path = tag_to_binary_path(provider_id, file_time_id) - before_frontier = manager.active_prefixes() - applied = manager.puncture(path) - after_frontier = manager.active_prefixes() - _set_last_puncture_diff( - sys, - before_frontier=before_frontier, - after_frontier=after_frontier, - target_bitstring=path, - target_kind="tag", - ) - entry = _touch_key_puncture( - sys, - provider_id=provider_id, - file_time_id=file_time_id, - path=path, - applied=applied, - ) - - if applied: - body = ( - "Puncture applied. This exact tag can no longer derive a key. " - "Other tags remain derivable through the co-path node set." - ) - _set_last_action( - sys, - tone="success", - title="Puncture succeeded", - body=body, - provider_id=provider_id, - file_time_id=file_time_id, - path=path, - key_description=entry["description"] or None, - ) - _record_history( - sys, - action="puncture", - status="applied", - summary=f"Punctured provider={provider_id}, file={file_time_id}.", - provider_id=provider_id, - file_time_id=file_time_id, - path=path, - ) - else: - body = ( - "No change was needed. This tag was already inaccessible, likely due to an earlier puncture." - ) - _set_last_action( - sys, - tone="warn", - title="Puncture no-op", - body=body, - provider_id=provider_id, - file_time_id=file_time_id, - path=path, - key_description=entry["description"] or None, - ) - _record_history( - sys, - action="puncture", - status="noop", - summary=f"No-op puncture for provider={provider_id}, file={file_time_id}.", - provider_id=provider_id, - file_time_id=file_time_id, - path=path, - ) - - except Exception as exc: - _set_last_action( - sys, - tone="danger", - title="Input error", - body=str(exc), - ) - _record_history(sys, action="puncture", status="error", summary=f"Puncture failed: {exc}") - return redirect(url_for("index")) - - @app.post("/demo/scenario-a") - def run_demo_a() -> Any: - sys = _system() - manager: PuncturableKeyManager = sys["manager"] - provider_id = 42 - file_time_id = 123456 - path = tag_to_binary_path(provider_id, file_time_id) - sys["last_inputs"] = { - "provider_id": provider_id, - "file_time_id": file_time_id, - "purpose": "Scenario A demonstration key", - } - - key_before = manager.get_key_for_tag(path) - if key_before is not None: - _touch_key_derive( - sys, - provider_id=provider_id, - file_time_id=file_time_id, - path=path, - description="Scenario A demonstration key", - ) - before_frontier = manager.active_prefixes() - punctured = manager.puncture(path) - after_frontier = manager.active_prefixes() - _set_last_puncture_diff( - sys, - before_frontier=before_frontier, - after_frontier=after_frontier, - target_bitstring=path, - target_kind="tag", - ) - _touch_key_puncture( - sys, - provider_id=provider_id, - file_time_id=file_time_id, - path=path, - applied=punctured, - ) - key_after = manager.get_key_for_tag(path) - - passed = key_before is not None and punctured and key_after is None - if passed: - _set_last_action( - sys, - tone="success", - title="Scenario A passed", - body=( - "Demo complete: key existed before puncture, puncture was applied, and the same key is now inaccessible." - ), - provider_id=provider_id, - file_time_id=file_time_id, - path=path, - ) - _record_history( - sys, - action="scenario-a", - status="passed", - summary="Auto-ran Scenario A successfully.", - provider_id=provider_id, - file_time_id=file_time_id, - path=path, - ) - else: - _set_last_action( - sys, - tone="danger", - title="Scenario A failed", - body="Unexpected result; check activity timeline and state consistency.", - provider_id=provider_id, - file_time_id=file_time_id, - path=path, - ) - _record_history( - sys, - action="scenario-a", - status="failed", - summary="Auto-ran Scenario A and it failed.", - provider_id=provider_id, - file_time_id=file_time_id, - path=path, - ) - - return redirect(url_for("index")) - - @app.post("/reset") - def reset() -> Any: - app.config["system"] = _new_system() - sys = _system() - _record_history(sys, action="system", status="reset", summary="Lab was reset with a fresh root state.") - return redirect(url_for("index")) - - @app.get("/api/state") - def api_state() -> Dict[str, Any]: - sys = _system() - manager: PuncturableKeyManager = sys["manager"] - dashboard = _asset_dashboard(sys) - key_journal_rows = sorted( - [dict(item) for item in sys["key_journal"].values()], - key=lambda row: (row["provider_id"], row["file_time_id"]), - ) - return { - "active_nodes": manager.active_node_count, - "active_prefixes": manager.active_prefixes(), - "active_frontier": _active_frontier_rows(manager), - "last_puncture_diff": sys["last_puncture_diff"], - "puncture_log": manager.puncture_log(), - "last_action": sys["last_action"], - "history": sys["history"], - "providers": list(_provider_rows(sys)), - "deleted_providers": sys["deleted_providers"], - "key_journal": key_journal_rows, - "asset_root": sys["asset_root"], - "assets": dashboard, - } - - @app.get("/api/export") - def api_export() -> Dict[str, Any]: - manager: PuncturableKeyManager = _system()["manager"] - return manager.export_state() - - @app.get("/api/view-bundle") - def api_view_bundle() -> Dict[str, Any]: - sys = _system() - manager: PuncturableKeyManager = sys["manager"] - payload = build_view_payload(sys, manager.puncture_log()) - sync_key = os.getenv("PUNCTURE_VIEW_SYNC_KEY", "").strip() or None - return wrap_view_bundle(payload, sync_key) - - @app.get("/api/live/state") - def api_live_state() -> Dict[str, Any]: - sys = _system() - manager: PuncturableKeyManager = sys["manager"] - dashboard = _asset_dashboard(sys) - key_journal_rows = sorted( - [dict(item) for item in sys["key_journal"].values()], - key=lambda row: (row["provider_id"], row["file_time_id"]), - ) - return { - "generated_at": _utc_now_label(), - "active_nodes": manager.active_node_count, - "active_prefixes": manager.active_prefixes(), - "active_frontier": _active_frontier_rows(manager), - "last_puncture_diff": sys["last_puncture_diff"], - "providers": _provider_rows(sys), - "key_journal": key_journal_rows, - "assets": dashboard, - "asset_root": sys["asset_root"], - } - - @app.post("/api/remote/puncture-provider") - def api_remote_puncture_provider() -> Dict[str, Any]: - if not _remote_token_valid(): - return {"ok": False, "error": "unauthorized"}, 403 - - sys = _system() - manager: PuncturableKeyManager = sys["manager"] - payload = request.get_json(silent=True) or {} - try: - raw_provider_id = payload.get("provider_id", request.form.get("provider_id")) - provider_id = int(raw_provider_id) - provider_id_to_prefix(provider_id) - - before_frontier = manager.active_prefixes() - applied = manager.puncture_provider(provider_id) - after_frontier = manager.active_prefixes() - provider_prefix = provider_id_to_prefix(provider_id) - _set_last_puncture_diff( - sys, - before_frontier=before_frontier, - after_frontier=after_frontier, - target_bitstring=provider_prefix, - target_kind="provider-prefix", - ) - touched = _mark_known_provider_keys_punctured(sys, provider_id) - - _set_last_action( - sys, - tone="warn", - title="Remote kill-switch puncture", - body=( - f"Remote command punctured provider {provider_id}. " - "All keys under this provider prefix are now blocked." - ), - provider_id=provider_id, - ) - _record_history( - sys, - action="remote-provider-puncture", - status="punctured" if applied else "already-inaccessible", - summary=f"Remote puncture on provider {provider_id}.", - provider_id=provider_id, - ) - - return { - "ok": True, - "provider_id": provider_id, - "applied": applied, - "known_key_rows_marked": touched, - "puncture_count": len(manager.puncture_log()), - } - except Exception as exc: - return {"ok": False, "error": str(exc)}, 400 - - @app.post("/api/import") - def api_import() -> Dict[str, Any]: - payload = request.get_json(force=True) - manager = PuncturableKeyManager.from_state(payload) - sys = _system() - sys["manager"] = manager - _set_last_action( - sys, - tone="info", - title="State imported", - body="Imported active-node state and puncture log.", - ) - _record_history(sys, action="system", status="imported", summary="Imported manager state via API.") - return {"ok": True, "active_nodes": manager.active_node_count} - - @app.post("/api/puncture-log") - def api_apply_puncture_log() -> Dict[str, Any]: - payload = request.get_json(force=True) - paths = payload.get("paths", []) - if not isinstance(paths, list): - return {"ok": False, "error": "paths must be a list"}, 400 - - manager: PuncturableKeyManager = _system()["manager"] - applied = manager.apply_puncture_log(paths) - return { - "ok": True, - "applied": applied, - "puncture_count": len(manager.puncture_log()), - "active_nodes": manager.active_node_count, - } - - return app - - -def main() -> None: - parser = argparse.ArgumentParser(description="Run puncture web app") - parser.add_argument("--host", default=os.getenv("PUNCTURE_HOST", "0.0.0.0")) - parser.add_argument("--port", type=int, default=int(os.getenv("PUNCTURE_PORT", "9122"))) - args = parser.parse_args() - - app = create_app() - app.run(host=args.host, port=args.port, debug=False) - - -if __name__ == "__main__": - main() diff --git a/pyproject.toml b/pyproject.toml deleted file mode 100644 index 2771935..0000000 --- a/pyproject.toml +++ /dev/null @@ -1,14 +0,0 @@ -[build-system] -requires = ["setuptools>=68", "wheel"] -build-backend = "setuptools.build_meta" - -[project] -name = "puncture" -version = "0.1.0" -description = "Zero-trust cloud-wide forward secrecy via puncturable GGM keys" -requires-python = ">=3.10" -dependencies = ["flask>=3.0.0"] - -[tool.pytest.ini_options] -pythonpath = ["."] -addopts = "-q" diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index bacdfe9..0000000 --- a/requirements.txt +++ /dev/null @@ -1,2 +0,0 @@ -flask>=3.0.0 -pytest>=8.0.0 diff --git a/tests/test_asset_helpers_and_ui.py b/tests/test_asset_helpers_and_ui.py deleted file mode 100644 index 297a473..0000000 --- a/tests/test_asset_helpers_and_ui.py +++ /dev/null @@ -1,450 +0,0 @@ -import hashlib -import hmac -from io import BytesIO -from pathlib import Path - -from puncture.web_app import ( - ENC_MAGIC, - ENC_NONCE_SIZE, - _asset_abs_path, - _asset_lifecycle_state, - _decrypt_blob, - _encrypt_blob, - _list_plaintext_rows, - _next_ciphertext_relpath, - _next_plaintext_relpath, - _normalize_relpath, - create_app, -) - - -def test_normalize_relpath_rejects_absolute_and_traversal() -> None: - assert _normalize_relpath("docs/a.txt") == "docs/a.txt" - - try: - _normalize_relpath("/etc/passwd") - assert False, "absolute paths must fail" - except ValueError: - pass - - try: - _normalize_relpath("../secret.txt") - assert False, "path traversal must fail" - except ValueError: - pass - - -def test_asset_abs_path_stays_inside_root(tmp_path: Path) -> None: - root = tmp_path / "assets" - root.mkdir() - - ok = _asset_abs_path(str(root), "a/b.txt") - assert ok.startswith(str(root)) - - try: - _asset_abs_path(str(root), "../escape.txt") - assert False, "escape should fail" - except ValueError: - pass - - -def test_next_plaintext_relpath_versions_collisions(tmp_path: Path) -> None: - root = tmp_path / "assets" - (root / "docs").mkdir(parents=True) - (root / "docs" / "a.txt").write_text("x", encoding="utf-8") - (root / "docs" / "a.v2.txt").write_text("x", encoding="utf-8") - - candidate = _next_plaintext_relpath(str(root), "docs/a.txt") - assert candidate == "docs/a.v3.txt" - - -def test_next_ciphertext_relpath_versions_collisions(tmp_path: Path) -> None: - root = tmp_path / "assets" - (root / "docs").mkdir(parents=True) - first = _next_ciphertext_relpath(str(root), "docs/a.txt", 42, 123) - assert first == "docs/a.txt.enc.p42.k123.pke" - (root / first).write_bytes(b"x") - second = _next_ciphertext_relpath(str(root), "docs/a.txt", 42, 123) - assert second == "docs/a.txt.enc.p42.k123.v2.pke" - - -def test_encrypt_blob_has_expected_format_and_tag() -> None: - key = b"k" * 32 - plaintext = b"hello-world" - blob = _encrypt_blob(key, plaintext) - - assert blob.startswith(ENC_MAGIC) - nonce = blob[len(ENC_MAGIC) : len(ENC_MAGIC) + ENC_NONCE_SIZE] - tag = blob[len(ENC_MAGIC) + ENC_NONCE_SIZE : len(ENC_MAGIC) + ENC_NONCE_SIZE + 32] - ciphertext = blob[len(ENC_MAGIC) + ENC_NONCE_SIZE + 32 :] - - assert len(nonce) == ENC_NONCE_SIZE - assert len(tag) == 32 - assert len(ciphertext) == len(plaintext) - assert ciphertext != plaintext - - expected_tag = hmac.new(key, b"TAG" + nonce + ciphertext, hashlib.sha256).digest() - assert tag == expected_tag - - -def test_decrypt_blob_roundtrip_and_authentication() -> None: - key = b"z" * 32 - plaintext = b"secret-content" - blob = _encrypt_blob(key, plaintext) - assert _decrypt_blob(key, blob) == plaintext - - tampered = bytearray(blob) - tampered[-1] ^= 0xFF - try: - _decrypt_blob(key, bytes(tampered)) - assert False, "tampering should fail authentication" - except ValueError as exc: - assert "authentication" in str(exc) - - -def test_list_plaintext_rows_excludes_ciphertexts(tmp_path: Path) -> None: - root = tmp_path / "assets" - root.mkdir() - (root / "a.txt").write_text("a", encoding="utf-8") - (root / "a.txt.enc.p42.k1.pke").write_bytes(b"cipher") - - rows = _list_plaintext_rows(str(root)) - assert len(rows) == 1 - assert rows[0]["relpath"] == "a.txt" - assert rows[0]["size_bytes"] == 1 - assert rows[0]["size_label"].endswith("B") - - -def test_asset_lifecycle_state_machine_classification() -> None: - assert _asset_lifecycle_state(mapping_count=0, blocked_count=0) == "eligible" - assert _asset_lifecycle_state(mapping_count=2, blocked_count=0) == "encrypted_live" - assert _asset_lifecycle_state(mapping_count=3, blocked_count=1) == "encrypted_partial" - assert _asset_lifecycle_state(mapping_count=4, blocked_count=4) == "encrypted_blocked" - - -def test_assets_page_renders_state_machine_workflow(monkeypatch, tmp_path: Path) -> None: - root = tmp_path / "assets" - root.mkdir() - (root / "one.txt").write_text("1", encoding="utf-8") - monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(root)) - - app = create_app() - client = app.test_client() - resp = client.get("/assets") - html = resp.data.decode("utf-8") - - assert resp.status_code == 200 - assert "Asset Workflow" in html - assert "Single Lifecycle Flow" in html - assert "id=\"upload_btn\"" in html - assert "id=\"eligible_list\"" in html - assert "id=\"encrypt_btn\"" in html - assert "id=\"wipe_btn\"" in html - assert "id=\"combo_quick\"" in html - assert "const INITIAL_STATE" in html - assert "/api/assets/workflow/upload" in html - assert "/api/assets/workflow/encrypt" in html - assert "/api/assets/workflow/decrypt" in html - - -def test_assets_workflow_api_shows_quick_key_combo_options(monkeypatch, tmp_path: Path) -> None: - root = tmp_path / "assets" - root.mkdir() - (root / "one.txt").write_text("1", encoding="utf-8") - monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(root)) - - app = create_app() - client = app.test_client() - client.post( - "/derive", - data={"provider_id": "42", "file_time_id": "999", "purpose": "seed combo"}, - follow_redirects=True, - ) - payload = client.get("/api/assets/workflow").get_json() - combos = payload["state"]["key_combo_options"] - assert any(item["provider_id"] == 42 and item["file_time_id"] == 999 for item in combos) - labels = [item["label"] for item in combos] - assert any("Provider 42 | Key 999 | active" in label for label in labels) - - -def test_asset_workflow_encrypt_api_requires_selection(monkeypatch, tmp_path: Path) -> None: - root = tmp_path / "assets" - root.mkdir() - (root / "one.txt").write_text("1", encoding="utf-8") - monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(root)) - - app = create_app() - client = app.test_client() - resp = client.post( - "/api/assets/workflow/encrypt", - json={"provider_id": 42, "file_time_id": 7, "purpose": "none selected", "plaintext_relpaths": []}, - ) - assert resp.status_code == 400 - assert "select existing files or upload files before encrypting" in resp.get_json()["error"] - - -def test_asset_upload_duplicate_filename_is_versioned(monkeypatch, tmp_path: Path) -> None: - root = tmp_path / "assets" - root.mkdir() - monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(root)) - - app = create_app() - client = app.test_client() - client.post( - "/assets/upload", - data={"target_subdir": "docs", "files": [(BytesIO(b"a"), "dup.txt")]}, - content_type="multipart/form-data", - follow_redirects=True, - ) - client.post( - "/assets/upload", - data={"target_subdir": "docs", "files": [(BytesIO(b"b"), "dup.txt")]}, - content_type="multipart/form-data", - follow_redirects=True, - ) - - assert (root / "docs" / "dup.txt").is_file() - assert (root / "docs" / "dup.v2.txt").is_file() - - -def test_asset_page_renders_blocked_and_glow_mappings(monkeypatch, tmp_path: Path) -> None: - root = tmp_path / "assets" - root.mkdir() - (root / "a.txt").write_text("alpha", encoding="utf-8") - monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(root)) - - app = create_app() - client = app.test_client() - client.post( - "/assets/encrypt", - data={ - "plaintext_relpaths": ["a.txt"], - "provider_id": "42", - "file_time_id": "100", - "purpose": "a via p42", - }, - follow_redirects=True, - ) - client.post( - "/assets/encrypt", - data={ - "plaintext_relpaths": ["a.txt"], - "provider_id": "17", - "file_time_id": "200", - "purpose": "a via p17", - }, - follow_redirects=True, - ) - client.post("/puncture", data={"provider_id": "42", "file_time_id": "100"}, follow_redirects=True) - - payload = client.get("/api/assets/workflow").get_json()["state"] - file_map = {row["plaintext_relpath"]: row for row in payload["asset_files"]} - rows = {(r["provider_id"], r["file_time_id"]): r for r in file_map["a.txt"]["mappings"]} - assert rows[(42, 100)]["show_red"] is True - assert rows[(17, 200)]["show_glow"] is True - - -def test_asset_workflow_upload_api_makes_files_immediately_eligible(monkeypatch, tmp_path: Path) -> None: - root = tmp_path / "assets_upload_eligible" - root.mkdir() - monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(root)) - - app = create_app() - client = app.test_client() - resp = client.post( - "/api/assets/workflow/upload", - data={"target_subdir": "incoming", "files": [(BytesIO(b"hello"), "a.txt"), (BytesIO(b"world"), "b.txt")]}, - content_type="multipart/form-data", - ) - assert resp.status_code == 200 - data = resp.get_json() - assert data["ok"] is True - assert sorted(data["uploaded"]) == ["incoming/a.txt", "incoming/b.txt"] - assert (root / "incoming" / "a.txt").is_file() - assert (root / "incoming" / "b.txt").is_file() - - states = {row["relpath"]: row["lifecycle_state"] for row in data["state"]["files"]} - assert states["incoming/a.txt"] == "eligible" - assert states["incoming/b.txt"] == "eligible" - - -def test_asset_workflow_encrypt_api_saves_ciphertext_immediately(monkeypatch, tmp_path: Path) -> None: - root = tmp_path / "assets_encrypt_now" - root.mkdir() - monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(root)) - - app = create_app() - client = app.test_client() - upload = client.post( - "/api/assets/workflow/upload", - data={"target_subdir": "x", "files": [(BytesIO(b"alpha"), "one.txt")]}, - content_type="multipart/form-data", - ).get_json() - assert upload["ok"] is True - - enc = client.post( - "/api/assets/workflow/encrypt", - json={ - "provider_id": 17, - "file_time_id": 987, - "purpose": "encrypt now", - "plaintext_relpaths": ["x/one.txt"], - }, - ) - assert enc.status_code == 200 - data = enc.get_json() - assert data["ok"] is True - assert (root / "x" / "one.txt.enc.p17.k987.pke").is_file() - - file_states = {row["relpath"]: row for row in data["state"]["files"]} - assert file_states["x/one.txt"]["lifecycle_state"] == "encrypted_live" - assert file_states["x/one.txt"]["mapping_count"] == 1 - - -def test_asset_workflow_clear_api_resets_saved_inputs(monkeypatch, tmp_path: Path) -> None: - root = tmp_path / "assets_wipe" - root.mkdir() - monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(root)) - - app = create_app() - client = app.test_client() - client.post( - "/api/assets/workflow/encrypt", - json={ - "provider_id": 42, - "file_time_id": 111, - "purpose": "will fail no files but sets no state", - "plaintext_relpaths": [], - }, - ) - clear = client.post("/api/assets/workflow/clear") - assert clear.status_code == 200 - payload = clear.get_json() - assert payload["ok"] is True - assert payload["state"]["last_inputs"]["provider_id"] == 42 - assert payload["state"]["last_inputs"]["file_time_id"] == 123456 - - -def test_index_shows_frontier_and_removes_share_step(monkeypatch, tmp_path: Path) -> None: - root = tmp_path / "assets_frontier_index" - root.mkdir() - monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(root)) - - app = create_app() - client = app.test_client() - resp = client.get("/") - html = resp.data.decode("utf-8") - - assert resp.status_code == 200 - assert "Current Active Roots (Frontier)" in html - assert "Tree/Subtree Visualization" in html - assert "No puncture yet. Root frontier covers the full derivation space." in html - assert "Step 1: Backup Shares (2-of-3)" not in html - assert html.find("Current Active Roots (Frontier)") < html.find("Quick Start Walkthrough") - - -def test_api_state_includes_frontier_and_excludes_share_fields(monkeypatch, tmp_path: Path) -> None: - root = tmp_path / "assets_frontier_api" - root.mkdir() - monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(root)) - - app = create_app() - client = app.test_client() - payload = client.get("/api/state").get_json() - - assert payload["active_prefixes"] == [""] - assert len(payload["active_frontier"]) == 1 - assert payload["active_frontier"][0]["is_root"] is True - assert "seed_shares" not in payload - assert "shares_acknowledged" not in payload - - -def test_frontier_moves_from_root_after_puncture(monkeypatch, tmp_path: Path) -> None: - root = tmp_path / "assets_frontier_puncture" - root.mkdir() - monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(root)) - - app = create_app() - client = app.test_client() - - before = client.get("/api/state").get_json() - assert before["active_prefixes"] == [""] - - client.post("/puncture", data={"provider_id": "42", "file_time_id": "123456"}, follow_redirects=True) - after = client.get("/api/state").get_json() - - assert "" not in after["active_prefixes"] - assert len(after["active_prefixes"]) > 1 - assert all(row["depth"] > 0 for row in after["active_frontier"]) - - -def test_api_state_tracks_last_puncture_frontier_diff(monkeypatch, tmp_path: Path) -> None: - root = tmp_path / "assets_last_diff" - root.mkdir() - monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(root)) - - app = create_app() - client = app.test_client() - client.post("/puncture", data={"provider_id": "42", "file_time_id": "123456"}, follow_redirects=True) - payload = client.get("/api/state").get_json() - - diff = payload["last_puncture_diff"] - assert diff["target_kind"] == "tag" - assert diff["target"] == "01010100000000011110001001000000" - assert "" in diff["removed"] - assert isinstance(diff["added"], list) - - -def test_asset_workflow_decrypt_api_restores_cleartext(monkeypatch, tmp_path: Path) -> None: - root = tmp_path / "assets_decrypt_ok" - root.mkdir() - (root / "p.txt").write_text("plain-alpha", encoding="utf-8") - monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(root)) - - app = create_app() - client = app.test_client() - enc = client.post( - "/api/assets/workflow/encrypt", - json={ - "provider_id": 42, - "file_time_id": 456, - "purpose": "decrypt-test", - "plaintext_relpaths": ["p.txt"], - }, - ).get_json() - mapping = enc["state"]["asset_files"][0]["mappings"][0] - record_id = int(mapping["record_id"]) - - dec = client.post("/api/assets/workflow/decrypt", json={"record_ids": [record_id]}) - assert dec.status_code == 200 - payload = dec.get_json() - assert payload["ok"] is True - restored = payload["restored"][0]["decrypted_relpath"] - assert (root / restored).is_file() - assert (root / restored).read_text(encoding="utf-8") == "plain-alpha" - - -def test_asset_workflow_decrypt_fails_when_key_punctured(monkeypatch, tmp_path: Path) -> None: - root = tmp_path / "assets_decrypt_blocked" - root.mkdir() - (root / "q.txt").write_text("plain-beta", encoding="utf-8") - monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(root)) - - app = create_app() - client = app.test_client() - enc = client.post( - "/api/assets/workflow/encrypt", - json={ - "provider_id": 17, - "file_time_id": 700, - "purpose": "puncture-then-decrypt", - "plaintext_relpaths": ["q.txt"], - }, - ).get_json() - mapping = enc["state"]["asset_files"][0]["mappings"][0] - record_id = int(mapping["record_id"]) - - client.post("/puncture", data={"provider_id": "17", "file_time_id": "700"}, follow_redirects=True) - dec = client.post("/api/assets/workflow/decrypt", json={"record_ids": [record_id]}) - assert dec.status_code == 400 - assert "punctured" in dec.get_json()["error"] diff --git a/tests/test_master_asset_mapping.py b/tests/test_master_asset_mapping.py deleted file mode 100644 index 4a71b79..0000000 --- a/tests/test_master_asset_mapping.py +++ /dev/null @@ -1,157 +0,0 @@ -from pathlib import Path -from io import BytesIO - -from puncture.web_app import create_app - - -def test_asset_mapping_status_red_and_glow(monkeypatch, tmp_path: Path) -> None: - asset_root = tmp_path / "assets" - asset_root.mkdir() - (asset_root / "a.txt").write_text("alpha", encoding="utf-8") - (asset_root / "b.txt").write_text("beta", encoding="utf-8") - - monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(asset_root)) - - app = create_app() - client = app.test_client() - - client.post( - "/assets/encrypt", - data={ - "plaintext_relpath": "a.txt", - "provider_id": "42", - "file_time_id": "100", - "purpose": "a via p42", - }, - ) - client.post( - "/assets/encrypt", - data={ - "plaintext_relpath": "a.txt", - "provider_id": "17", - "file_time_id": "200", - "purpose": "a via p17", - }, - ) - client.post( - "/assets/encrypt", - data={ - "plaintext_relpath": "b.txt", - "provider_id": "42", - "file_time_id": "100", - "purpose": "b via p42", - }, - ) - - # Puncture shared key (provider 42 / key 100) used by two files. - client.post("/puncture", data={"provider_id": "42", "file_time_id": "100"}, follow_redirects=True) - - live = client.get("/api/live/state").get_json() - files = {row["plaintext_relpath"]: row for row in live["assets"]["asset_files"]} - - file_a = files["a.txt"] - rows_a = {(r["provider_id"], r["file_time_id"]): r for r in file_a["mappings"]} - assert rows_a[(42, 100)]["show_red"] is True - assert rows_a[(42, 100)]["is_accessible"] is False - assert rows_a[(17, 200)]["show_glow"] is True - assert rows_a[(17, 200)]["is_accessible"] is True - - file_b = files["b.txt"] - rows_b = {(r["provider_id"], r["file_time_id"]): r for r in file_b["mappings"]} - assert rows_b[(42, 100)]["show_red"] is True - assert rows_b[(42, 100)]["is_accessible"] is False - - -def test_remote_puncture_provider_endpoint_requires_token(monkeypatch, tmp_path: Path) -> None: - asset_root = tmp_path / "assets2" - asset_root.mkdir() - (asset_root / "c.txt").write_text("content", encoding="utf-8") - - monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(asset_root)) - monkeypatch.setenv("PUNCTURE_REMOTE_TOKEN", "tok") - - app = create_app() - client = app.test_client() - - # Derive once pre-kill to verify accessibility. - resp_pre = client.post( - "/derive", - data={"provider_id": "42", "file_time_id": "300", "purpose": "pre"}, - follow_redirects=True, - ) - assert b"Derive succeeded" in resp_pre.data - - denied = client.post("/api/remote/puncture-provider", json={"provider_id": 42}) - assert denied.status_code == 403 - - allowed = client.post( - "/api/remote/puncture-provider", - json={"provider_id": 42}, - headers={"X-Puncture-Token": "tok"}, - ) - assert allowed.status_code == 200 - assert allowed.get_json()["ok"] is True - - resp_post = client.post( - "/derive", - data={"provider_id": "42", "file_time_id": "300", "purpose": "post"}, - follow_redirects=True, - ) - assert b"Derive blocked" in resp_post.data - - -def test_asset_page_can_encrypt_multiple_selected_files(monkeypatch, tmp_path: Path) -> None: - asset_root = tmp_path / "assets3" - asset_root.mkdir() - (asset_root / "f1.txt").write_text("f1", encoding="utf-8") - (asset_root / "f2.txt").write_text("f2", encoding="utf-8") - monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(asset_root)) - - app = create_app() - client = app.test_client() - - client.post( - "/assets/encrypt", - data={ - "plaintext_relpaths": ["f1.txt", "f2.txt"], - "provider_id": "42", - "file_time_id": "444", - "purpose": "batch", - }, - follow_redirects=True, - ) - - ciphers = sorted(asset_root.glob("*.pke")) - assert len(ciphers) == 2 - assert any(".enc.p42.k444.pke" in p.name for p in ciphers) - - live = client.get("/api/live/state").get_json() - assert live["assets"]["mapping_count"] == 2 - files = {row["plaintext_relpath"]: row for row in live["assets"]["asset_files"]} - assert "f1.txt" in files - assert "f2.txt" in files - - -def test_asset_upload_persists_cleartext_files(monkeypatch, tmp_path: Path) -> None: - asset_root = tmp_path / "assets4" - asset_root.mkdir() - monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(asset_root)) - - app = create_app() - client = app.test_client() - - resp = client.post( - "/assets/upload", - data={ - "target_subdir": "docs", - "files": [ - (BytesIO(b"alpha"), "a.txt"), - (BytesIO(b"beta"), "b.txt"), - ], - }, - content_type="multipart/form-data", - follow_redirects=True, - ) - assert resp.status_code == 200 - assert (asset_root / "docs" / "a.txt").is_file() - assert (asset_root / "docs" / "b.txt").is_file() diff --git a/tests/test_puncture_manager.py b/tests/test_puncture_manager.py deleted file mode 100644 index 0f0b95a..0000000 --- a/tests/test_puncture_manager.py +++ /dev/null @@ -1,90 +0,0 @@ -from puncture.key_manager import ( - PATH_BITS, - PuncturableKeyManager, - provider_id_to_prefix, - tag_to_binary_path, -) - - -def test_mapping_and_derivation_roundtrip() -> None: - manager = PuncturableKeyManager(master_seed=b"\x11" * 32) - path = tag_to_binary_path(42, 123) - key = manager.get_key_for_tag(path) - assert key is not None - assert len(key) == 32 - - -def test_scenario_a_provider_42_then_puncture() -> None: - manager = PuncturableKeyManager(master_seed=b"\x22" * 32) - path = tag_to_binary_path(42, 999) - - key_before = manager.get_key_for_tag(path) - assert key_before is not None - - punctured = manager.puncture(path) - assert punctured is True - assert manager.get_key_for_tag(path) is None - - -def test_non_target_paths_still_accessible_after_puncture() -> None: - manager = PuncturableKeyManager(master_seed=b"\x33" * 32) - target = tag_to_binary_path(42, 12345) - other = tag_to_binary_path(42, 12346) - - other_before = manager.get_key_for_tag(other) - manager.puncture(target) - other_after = manager.get_key_for_tag(other) - - assert manager.get_key_for_tag(target) is None - assert other_before == other_after - - -def test_minimal_copath_replacement_from_root() -> None: - manager = PuncturableKeyManager(master_seed=b"\x44" * 32) - path = tag_to_binary_path(1, 1) - assert manager.active_node_count == 1 - - manager.puncture(path) - - # Remove root (1) and add one sibling node per level (32). - assert manager.active_node_count == PATH_BITS - - -def test_scenario_b_seized_state_cannot_recover_punctured() -> None: - manager = PuncturableKeyManager(master_seed=b"\x55" * 32) - punctured = tag_to_binary_path(42, 2024) - control = tag_to_binary_path(42, 2025) - - control_before = manager.get_key_for_tag(control) - manager.puncture(punctured) - - seized = PuncturableKeyManager.from_state(manager.export_state()) - assert seized.get_key_for_tag(punctured) is None - assert seized.get_key_for_tag(control) == control_before - - -def test_provider_puncture_blocks_all_provider_keys() -> None: - manager = PuncturableKeyManager(master_seed=b"\x66" * 32) - p42_a = tag_to_binary_path(42, 100) - p42_b = tag_to_binary_path(42, 101) - p41 = tag_to_binary_path(41, 100) - - p41_before = manager.get_key_for_tag(p41) - assert manager.puncture_provider(42) is True - - assert manager.get_key_for_tag(p42_a) is None - assert manager.get_key_for_tag(p42_b) is None - assert manager.get_key_for_tag(p41) == p41_before - - -def test_prefix_puncture_log_replay() -> None: - seed = b"\x77" * 32 - source = PuncturableKeyManager(master_seed=seed) - target = PuncturableKeyManager(master_seed=seed) - - provider_prefix = provider_id_to_prefix(42) - source.puncture_prefix(provider_prefix) - - applied = target.apply_puncture_log(source.puncture_log()) - assert applied == 1 - assert target.get_key_for_tag(tag_to_binary_path(42, 1234)) is None diff --git a/tests/test_view_app_policy.py b/tests/test_view_app_policy.py deleted file mode 100644 index 743e478..0000000 --- a/tests/test_view_app_policy.py +++ /dev/null @@ -1,92 +0,0 @@ -from puncture import view_app - - -def _sample_live_state() -> dict: - return { - "generated_at": "2026-02-12 19:00:00 UTC", - "providers": [ - { - "provider_id": 42, - "name": "Provider 42", - "description": "Demo", - "prefix": "0101010", - "derived_count": 1, - "punctured_count": 0, - "key_count": 1, - } - ], - "key_journal": [ - { - "provider_id": 42, - "file_time_id": 1001, - "path_provider": "0101010", - "path_resource": "0000000000000001111101001", - "derive_count": 1, - "puncture_count": 0, - "description": "alpha", - "ever_punctured": False, - } - ], - "assets": { - "mapping_count": 1, - "blocked_count": 0, - "glow_count": 0, - "asset_files": [ - { - "plaintext_relpath": "docs/a.txt", - "mapping_count": 1, - "blocked_count": 0, - "mappings": [ - { - "provider_id": 42, - "file_time_id": 1001, - "ciphertext_relpath": "docs/a.txt.enc.p42.k1001.pke", - "path_provider": "0101010", - "path_resource": "0000000000000001111101001", - "is_accessible": True, - "show_red": False, - "show_glow": False, - } - ], - } - ], - "key_cards": [], - }, - } - - -def test_secondary_login_success(monkeypatch) -> None: - monkeypatch.setenv("PUNCTURE_SECONDARY_PASSWORD", "secret") - monkeypatch.setattr(view_app, "_fetch_master_state", lambda: _sample_live_state()) - - app = view_app.create_app() - client = app.test_client() - - resp = client.post("/login", data={"password": "secret"}, follow_redirects=True) - assert resp.status_code == 200 - assert b"Secondary Live Viewer" in resp.data - - api_resp = client.get("/api/state") - assert api_resp.status_code == 200 - assert api_resp.get_json()["ok"] is True - - -def test_secondary_kill_switch_password_triggers_remote_puncture(monkeypatch) -> None: - monkeypatch.setenv("PUNCTURE_SECONDARY_PASSWORD", "secret") - monkeypatch.setattr(view_app, "_fetch_master_state", lambda: _sample_live_state()) - - called: dict[str, int] = {} - - def _fake_remote(provider_id: int) -> dict: - called["provider_id"] = provider_id - return {"ok": True, "provider_id": provider_id} - - monkeypatch.setattr(view_app, "_remote_puncture_provider", _fake_remote) - - app = view_app.create_app() - client = app.test_client() - - resp = client.post("/login", data={"password": "secret42"}, follow_redirects=True) - assert resp.status_code == 200 - assert called["provider_id"] == 42 - assert b"Kill switch activated" in resp.data diff --git a/tests/test_view_sync.py b/tests/test_view_sync.py deleted file mode 100644 index 96738c5..0000000 --- a/tests/test_view_sync.py +++ /dev/null @@ -1,88 +0,0 @@ -from puncture.view_sync import ( - build_view_payload, - extract_view_payload, - sign_payload, - verify_payload_signature, - wrap_view_bundle, -) - - -def _sample_system() -> dict: - return { - "providers": { - 42: { - "provider_id": 42, - "name": "Provider 42", - "description": "Demo", - "created_at": "10:00:00 UTC", - } - }, - "key_journal": { - "01010100000000000000000000000001": { - "provider_id": 42, - "file_time_id": 1, - "path": "01010100000000000000000000000001", - "description": "active", - "ever_derived": True, - "ever_punctured": False, - "derive_count": 1, - "puncture_count": 0, - "last_derived_at": "10:01:00 UTC", - "last_punctured_at": None, - }, - "01010100000000000000000000000010": { - "provider_id": 42, - "file_time_id": 2, - "path": "01010100000000000000000000000010", - "description": "punctured", - "ever_derived": True, - "ever_punctured": True, - "derive_count": 1, - "puncture_count": 1, - "last_derived_at": "10:02:00 UTC", - "last_punctured_at": "10:03:00 UTC", - }, - "01010100000000000000000000000011": { - "provider_id": 42, - "file_time_id": 3, - "path": "01010100000000000000000000000011", - "description": "never derived", - "ever_derived": False, - "ever_punctured": False, - "derive_count": 0, - "puncture_count": 0, - "last_derived_at": None, - "last_punctured_at": None, - }, - }, - "deleted_providers": [], - } - - -def test_build_view_payload_allows_only_derived_non_punctured() -> None: - payload = build_view_payload(_sample_system(), puncture_log=["0101010"]) - assert payload["allowed_paths"] == ["01010100000000000000000000000001"] - assert payload["puncture_log"] == ["0101010"] - assert len(payload["known_keys"]) == 3 - - -def test_sign_and_verify_bundle() -> None: - payload = build_view_payload(_sample_system(), puncture_log=[]) - key = "sync-secret" - signature = sign_payload(payload, key) - assert verify_payload_signature(payload, signature, key) - - wrapped = wrap_view_bundle(payload, key) - extracted = extract_view_payload(wrapped, sync_key=key, require_signature=True) - assert extracted["allowed_paths"] == payload["allowed_paths"] - - -def test_extract_rejects_bad_signature() -> None: - payload = build_view_payload(_sample_system(), puncture_log=[]) - wrapped = {"payload": payload, "hmac_sha256": "deadbeef", "signed": True} - - try: - extract_view_payload(wrapped, sync_key="sync-secret", require_signature=True) - assert False, "expected signature verification failure" - except ValueError as exc: - assert "signature" in str(exc).lower()