mirror of
https://github.com/saymrwulf/puncture.git
synced 2026-06-11 00:49:38 +00:00
450 lines
15 KiB
Python
450 lines
15 KiB
Python
import hashlib
|
|
import hmac
|
|
from io import BytesIO
|
|
from pathlib import Path
|
|
|
|
from puncture.web_app import (
|
|
ENC_MAGIC,
|
|
ENC_NONCE_SIZE,
|
|
_asset_abs_path,
|
|
_asset_lifecycle_state,
|
|
_decrypt_blob,
|
|
_encrypt_blob,
|
|
_list_plaintext_rows,
|
|
_next_ciphertext_relpath,
|
|
_next_plaintext_relpath,
|
|
_normalize_relpath,
|
|
create_app,
|
|
)
|
|
|
|
|
|
def test_normalize_relpath_rejects_absolute_and_traversal() -> None:
|
|
assert _normalize_relpath("docs/a.txt") == "docs/a.txt"
|
|
|
|
try:
|
|
_normalize_relpath("/etc/passwd")
|
|
assert False, "absolute paths must fail"
|
|
except ValueError:
|
|
pass
|
|
|
|
try:
|
|
_normalize_relpath("../secret.txt")
|
|
assert False, "path traversal must fail"
|
|
except ValueError:
|
|
pass
|
|
|
|
|
|
def test_asset_abs_path_stays_inside_root(tmp_path: Path) -> None:
|
|
root = tmp_path / "assets"
|
|
root.mkdir()
|
|
|
|
ok = _asset_abs_path(str(root), "a/b.txt")
|
|
assert ok.startswith(str(root))
|
|
|
|
try:
|
|
_asset_abs_path(str(root), "../escape.txt")
|
|
assert False, "escape should fail"
|
|
except ValueError:
|
|
pass
|
|
|
|
|
|
def test_next_plaintext_relpath_versions_collisions(tmp_path: Path) -> None:
|
|
root = tmp_path / "assets"
|
|
(root / "docs").mkdir(parents=True)
|
|
(root / "docs" / "a.txt").write_text("x", encoding="utf-8")
|
|
(root / "docs" / "a.v2.txt").write_text("x", encoding="utf-8")
|
|
|
|
candidate = _next_plaintext_relpath(str(root), "docs/a.txt")
|
|
assert candidate == "docs/a.v3.txt"
|
|
|
|
|
|
def test_next_ciphertext_relpath_versions_collisions(tmp_path: Path) -> None:
|
|
root = tmp_path / "assets"
|
|
(root / "docs").mkdir(parents=True)
|
|
first = _next_ciphertext_relpath(str(root), "docs/a.txt", 42, 123)
|
|
assert first == "docs/a.txt.enc.p42.k123.pke"
|
|
(root / first).write_bytes(b"x")
|
|
second = _next_ciphertext_relpath(str(root), "docs/a.txt", 42, 123)
|
|
assert second == "docs/a.txt.enc.p42.k123.v2.pke"
|
|
|
|
|
|
def test_encrypt_blob_has_expected_format_and_tag() -> None:
|
|
key = b"k" * 32
|
|
plaintext = b"hello-world"
|
|
blob = _encrypt_blob(key, plaintext)
|
|
|
|
assert blob.startswith(ENC_MAGIC)
|
|
nonce = blob[len(ENC_MAGIC) : len(ENC_MAGIC) + ENC_NONCE_SIZE]
|
|
tag = blob[len(ENC_MAGIC) + ENC_NONCE_SIZE : len(ENC_MAGIC) + ENC_NONCE_SIZE + 32]
|
|
ciphertext = blob[len(ENC_MAGIC) + ENC_NONCE_SIZE + 32 :]
|
|
|
|
assert len(nonce) == ENC_NONCE_SIZE
|
|
assert len(tag) == 32
|
|
assert len(ciphertext) == len(plaintext)
|
|
assert ciphertext != plaintext
|
|
|
|
expected_tag = hmac.new(key, b"TAG" + nonce + ciphertext, hashlib.sha256).digest()
|
|
assert tag == expected_tag
|
|
|
|
|
|
def test_decrypt_blob_roundtrip_and_authentication() -> None:
|
|
key = b"z" * 32
|
|
plaintext = b"secret-content"
|
|
blob = _encrypt_blob(key, plaintext)
|
|
assert _decrypt_blob(key, blob) == plaintext
|
|
|
|
tampered = bytearray(blob)
|
|
tampered[-1] ^= 0xFF
|
|
try:
|
|
_decrypt_blob(key, bytes(tampered))
|
|
assert False, "tampering should fail authentication"
|
|
except ValueError as exc:
|
|
assert "authentication" in str(exc)
|
|
|
|
|
|
def test_list_plaintext_rows_excludes_ciphertexts(tmp_path: Path) -> None:
|
|
root = tmp_path / "assets"
|
|
root.mkdir()
|
|
(root / "a.txt").write_text("a", encoding="utf-8")
|
|
(root / "a.txt.enc.p42.k1.pke").write_bytes(b"cipher")
|
|
|
|
rows = _list_plaintext_rows(str(root))
|
|
assert len(rows) == 1
|
|
assert rows[0]["relpath"] == "a.txt"
|
|
assert rows[0]["size_bytes"] == 1
|
|
assert rows[0]["size_label"].endswith("B")
|
|
|
|
|
|
def test_asset_lifecycle_state_machine_classification() -> None:
|
|
assert _asset_lifecycle_state(mapping_count=0, blocked_count=0) == "eligible"
|
|
assert _asset_lifecycle_state(mapping_count=2, blocked_count=0) == "encrypted_live"
|
|
assert _asset_lifecycle_state(mapping_count=3, blocked_count=1) == "encrypted_partial"
|
|
assert _asset_lifecycle_state(mapping_count=4, blocked_count=4) == "encrypted_blocked"
|
|
|
|
|
|
def test_assets_page_renders_state_machine_workflow(monkeypatch, tmp_path: Path) -> None:
|
|
root = tmp_path / "assets"
|
|
root.mkdir()
|
|
(root / "one.txt").write_text("1", encoding="utf-8")
|
|
monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(root))
|
|
|
|
app = create_app()
|
|
client = app.test_client()
|
|
resp = client.get("/assets")
|
|
html = resp.data.decode("utf-8")
|
|
|
|
assert resp.status_code == 200
|
|
assert "Asset Workflow" in html
|
|
assert "Single Lifecycle Flow" in html
|
|
assert "id=\"upload_btn\"" in html
|
|
assert "id=\"eligible_list\"" in html
|
|
assert "id=\"encrypt_btn\"" in html
|
|
assert "id=\"wipe_btn\"" in html
|
|
assert "id=\"combo_quick\"" in html
|
|
assert "const INITIAL_STATE" in html
|
|
assert "/api/assets/workflow/upload" in html
|
|
assert "/api/assets/workflow/encrypt" in html
|
|
assert "/api/assets/workflow/decrypt" in html
|
|
|
|
|
|
def test_assets_workflow_api_shows_quick_key_combo_options(monkeypatch, tmp_path: Path) -> None:
|
|
root = tmp_path / "assets"
|
|
root.mkdir()
|
|
(root / "one.txt").write_text("1", encoding="utf-8")
|
|
monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(root))
|
|
|
|
app = create_app()
|
|
client = app.test_client()
|
|
client.post(
|
|
"/derive",
|
|
data={"provider_id": "42", "file_time_id": "999", "purpose": "seed combo"},
|
|
follow_redirects=True,
|
|
)
|
|
payload = client.get("/api/assets/workflow").get_json()
|
|
combos = payload["state"]["key_combo_options"]
|
|
assert any(item["provider_id"] == 42 and item["file_time_id"] == 999 for item in combos)
|
|
labels = [item["label"] for item in combos]
|
|
assert any("Provider 42 | Key 999 | active" in label for label in labels)
|
|
|
|
|
|
def test_asset_workflow_encrypt_api_requires_selection(monkeypatch, tmp_path: Path) -> None:
|
|
root = tmp_path / "assets"
|
|
root.mkdir()
|
|
(root / "one.txt").write_text("1", encoding="utf-8")
|
|
monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(root))
|
|
|
|
app = create_app()
|
|
client = app.test_client()
|
|
resp = client.post(
|
|
"/api/assets/workflow/encrypt",
|
|
json={"provider_id": 42, "file_time_id": 7, "purpose": "none selected", "plaintext_relpaths": []},
|
|
)
|
|
assert resp.status_code == 400
|
|
assert "select existing files or upload files before encrypting" in resp.get_json()["error"]
|
|
|
|
|
|
def test_asset_upload_duplicate_filename_is_versioned(monkeypatch, tmp_path: Path) -> None:
|
|
root = tmp_path / "assets"
|
|
root.mkdir()
|
|
monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(root))
|
|
|
|
app = create_app()
|
|
client = app.test_client()
|
|
client.post(
|
|
"/assets/upload",
|
|
data={"target_subdir": "docs", "files": [(BytesIO(b"a"), "dup.txt")]},
|
|
content_type="multipart/form-data",
|
|
follow_redirects=True,
|
|
)
|
|
client.post(
|
|
"/assets/upload",
|
|
data={"target_subdir": "docs", "files": [(BytesIO(b"b"), "dup.txt")]},
|
|
content_type="multipart/form-data",
|
|
follow_redirects=True,
|
|
)
|
|
|
|
assert (root / "docs" / "dup.txt").is_file()
|
|
assert (root / "docs" / "dup.v2.txt").is_file()
|
|
|
|
|
|
def test_asset_page_renders_blocked_and_glow_mappings(monkeypatch, tmp_path: Path) -> None:
|
|
root = tmp_path / "assets"
|
|
root.mkdir()
|
|
(root / "a.txt").write_text("alpha", encoding="utf-8")
|
|
monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(root))
|
|
|
|
app = create_app()
|
|
client = app.test_client()
|
|
client.post(
|
|
"/assets/encrypt",
|
|
data={
|
|
"plaintext_relpaths": ["a.txt"],
|
|
"provider_id": "42",
|
|
"file_time_id": "100",
|
|
"purpose": "a via p42",
|
|
},
|
|
follow_redirects=True,
|
|
)
|
|
client.post(
|
|
"/assets/encrypt",
|
|
data={
|
|
"plaintext_relpaths": ["a.txt"],
|
|
"provider_id": "17",
|
|
"file_time_id": "200",
|
|
"purpose": "a via p17",
|
|
},
|
|
follow_redirects=True,
|
|
)
|
|
client.post("/puncture", data={"provider_id": "42", "file_time_id": "100"}, follow_redirects=True)
|
|
|
|
payload = client.get("/api/assets/workflow").get_json()["state"]
|
|
file_map = {row["plaintext_relpath"]: row for row in payload["asset_files"]}
|
|
rows = {(r["provider_id"], r["file_time_id"]): r for r in file_map["a.txt"]["mappings"]}
|
|
assert rows[(42, 100)]["show_red"] is True
|
|
assert rows[(17, 200)]["show_glow"] is True
|
|
|
|
|
|
def test_asset_workflow_upload_api_makes_files_immediately_eligible(monkeypatch, tmp_path: Path) -> None:
|
|
root = tmp_path / "assets_upload_eligible"
|
|
root.mkdir()
|
|
monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(root))
|
|
|
|
app = create_app()
|
|
client = app.test_client()
|
|
resp = client.post(
|
|
"/api/assets/workflow/upload",
|
|
data={"target_subdir": "incoming", "files": [(BytesIO(b"hello"), "a.txt"), (BytesIO(b"world"), "b.txt")]},
|
|
content_type="multipart/form-data",
|
|
)
|
|
assert resp.status_code == 200
|
|
data = resp.get_json()
|
|
assert data["ok"] is True
|
|
assert sorted(data["uploaded"]) == ["incoming/a.txt", "incoming/b.txt"]
|
|
assert (root / "incoming" / "a.txt").is_file()
|
|
assert (root / "incoming" / "b.txt").is_file()
|
|
|
|
states = {row["relpath"]: row["lifecycle_state"] for row in data["state"]["files"]}
|
|
assert states["incoming/a.txt"] == "eligible"
|
|
assert states["incoming/b.txt"] == "eligible"
|
|
|
|
|
|
def test_asset_workflow_encrypt_api_saves_ciphertext_immediately(monkeypatch, tmp_path: Path) -> None:
|
|
root = tmp_path / "assets_encrypt_now"
|
|
root.mkdir()
|
|
monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(root))
|
|
|
|
app = create_app()
|
|
client = app.test_client()
|
|
upload = client.post(
|
|
"/api/assets/workflow/upload",
|
|
data={"target_subdir": "x", "files": [(BytesIO(b"alpha"), "one.txt")]},
|
|
content_type="multipart/form-data",
|
|
).get_json()
|
|
assert upload["ok"] is True
|
|
|
|
enc = client.post(
|
|
"/api/assets/workflow/encrypt",
|
|
json={
|
|
"provider_id": 17,
|
|
"file_time_id": 987,
|
|
"purpose": "encrypt now",
|
|
"plaintext_relpaths": ["x/one.txt"],
|
|
},
|
|
)
|
|
assert enc.status_code == 200
|
|
data = enc.get_json()
|
|
assert data["ok"] is True
|
|
assert (root / "x" / "one.txt.enc.p17.k987.pke").is_file()
|
|
|
|
file_states = {row["relpath"]: row for row in data["state"]["files"]}
|
|
assert file_states["x/one.txt"]["lifecycle_state"] == "encrypted_live"
|
|
assert file_states["x/one.txt"]["mapping_count"] == 1
|
|
|
|
|
|
def test_asset_workflow_clear_api_resets_saved_inputs(monkeypatch, tmp_path: Path) -> None:
|
|
root = tmp_path / "assets_wipe"
|
|
root.mkdir()
|
|
monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(root))
|
|
|
|
app = create_app()
|
|
client = app.test_client()
|
|
client.post(
|
|
"/api/assets/workflow/encrypt",
|
|
json={
|
|
"provider_id": 42,
|
|
"file_time_id": 111,
|
|
"purpose": "will fail no files but sets no state",
|
|
"plaintext_relpaths": [],
|
|
},
|
|
)
|
|
clear = client.post("/api/assets/workflow/clear")
|
|
assert clear.status_code == 200
|
|
payload = clear.get_json()
|
|
assert payload["ok"] is True
|
|
assert payload["state"]["last_inputs"]["provider_id"] == 42
|
|
assert payload["state"]["last_inputs"]["file_time_id"] == 123456
|
|
|
|
|
|
def test_index_shows_frontier_and_removes_share_step(monkeypatch, tmp_path: Path) -> None:
|
|
root = tmp_path / "assets_frontier_index"
|
|
root.mkdir()
|
|
monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(root))
|
|
|
|
app = create_app()
|
|
client = app.test_client()
|
|
resp = client.get("/")
|
|
html = resp.data.decode("utf-8")
|
|
|
|
assert resp.status_code == 200
|
|
assert "Current Active Roots (Frontier)" in html
|
|
assert "Tree/Subtree Visualization" in html
|
|
assert "No puncture yet. Root frontier covers the full derivation space." in html
|
|
assert "Step 1: Backup Shares (2-of-3)" not in html
|
|
assert html.find("Current Active Roots (Frontier)") < html.find("Quick Start Walkthrough")
|
|
|
|
|
|
def test_api_state_includes_frontier_and_excludes_share_fields(monkeypatch, tmp_path: Path) -> None:
|
|
root = tmp_path / "assets_frontier_api"
|
|
root.mkdir()
|
|
monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(root))
|
|
|
|
app = create_app()
|
|
client = app.test_client()
|
|
payload = client.get("/api/state").get_json()
|
|
|
|
assert payload["active_prefixes"] == [""]
|
|
assert len(payload["active_frontier"]) == 1
|
|
assert payload["active_frontier"][0]["is_root"] is True
|
|
assert "seed_shares" not in payload
|
|
assert "shares_acknowledged" not in payload
|
|
|
|
|
|
def test_frontier_moves_from_root_after_puncture(monkeypatch, tmp_path: Path) -> None:
|
|
root = tmp_path / "assets_frontier_puncture"
|
|
root.mkdir()
|
|
monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(root))
|
|
|
|
app = create_app()
|
|
client = app.test_client()
|
|
|
|
before = client.get("/api/state").get_json()
|
|
assert before["active_prefixes"] == [""]
|
|
|
|
client.post("/puncture", data={"provider_id": "42", "file_time_id": "123456"}, follow_redirects=True)
|
|
after = client.get("/api/state").get_json()
|
|
|
|
assert "" not in after["active_prefixes"]
|
|
assert len(after["active_prefixes"]) > 1
|
|
assert all(row["depth"] > 0 for row in after["active_frontier"])
|
|
|
|
|
|
def test_api_state_tracks_last_puncture_frontier_diff(monkeypatch, tmp_path: Path) -> None:
|
|
root = tmp_path / "assets_last_diff"
|
|
root.mkdir()
|
|
monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(root))
|
|
|
|
app = create_app()
|
|
client = app.test_client()
|
|
client.post("/puncture", data={"provider_id": "42", "file_time_id": "123456"}, follow_redirects=True)
|
|
payload = client.get("/api/state").get_json()
|
|
|
|
diff = payload["last_puncture_diff"]
|
|
assert diff["target_kind"] == "tag"
|
|
assert diff["target"] == "01010100000000011110001001000000"
|
|
assert "" in diff["removed"]
|
|
assert isinstance(diff["added"], list)
|
|
|
|
|
|
def test_asset_workflow_decrypt_api_restores_cleartext(monkeypatch, tmp_path: Path) -> None:
|
|
root = tmp_path / "assets_decrypt_ok"
|
|
root.mkdir()
|
|
(root / "p.txt").write_text("plain-alpha", encoding="utf-8")
|
|
monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(root))
|
|
|
|
app = create_app()
|
|
client = app.test_client()
|
|
enc = client.post(
|
|
"/api/assets/workflow/encrypt",
|
|
json={
|
|
"provider_id": 42,
|
|
"file_time_id": 456,
|
|
"purpose": "decrypt-test",
|
|
"plaintext_relpaths": ["p.txt"],
|
|
},
|
|
).get_json()
|
|
mapping = enc["state"]["asset_files"][0]["mappings"][0]
|
|
record_id = int(mapping["record_id"])
|
|
|
|
dec = client.post("/api/assets/workflow/decrypt", json={"record_ids": [record_id]})
|
|
assert dec.status_code == 200
|
|
payload = dec.get_json()
|
|
assert payload["ok"] is True
|
|
restored = payload["restored"][0]["decrypted_relpath"]
|
|
assert (root / restored).is_file()
|
|
assert (root / restored).read_text(encoding="utf-8") == "plain-alpha"
|
|
|
|
|
|
def test_asset_workflow_decrypt_fails_when_key_punctured(monkeypatch, tmp_path: Path) -> None:
|
|
root = tmp_path / "assets_decrypt_blocked"
|
|
root.mkdir()
|
|
(root / "q.txt").write_text("plain-beta", encoding="utf-8")
|
|
monkeypatch.setenv("PUNCTURE_ASSET_ROOT", str(root))
|
|
|
|
app = create_app()
|
|
client = app.test_client()
|
|
enc = client.post(
|
|
"/api/assets/workflow/encrypt",
|
|
json={
|
|
"provider_id": 17,
|
|
"file_time_id": 700,
|
|
"purpose": "puncture-then-decrypt",
|
|
"plaintext_relpaths": ["q.txt"],
|
|
},
|
|
).get_json()
|
|
mapping = enc["state"]["asset_files"][0]["mappings"][0]
|
|
record_id = int(mapping["record_id"])
|
|
|
|
client.post("/puncture", data={"provider_id": "17", "file_time_id": "700"}, follow_redirects=True)
|
|
dec = client.post("/api/assets/workflow/decrypt", json={"record_ids": [record_id]})
|
|
assert dec.status_code == 400
|
|
assert "punctured" in dec.get_json()["error"]
|