From f9b8f3457f9c2dfed55d1842d9abf3554898c3d6 Mon Sep 17 00:00:00 2001 From: saymrwulf Date: Sat, 4 Apr 2026 17:39:15 +0200 Subject: [PATCH] =?UTF-8?q?Initial=20commit:=20autoresearch-quantum=20?= =?UTF-8?q?=E2=80=94=20automated=20magic-state=20preparation=20ratchet?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Karpathy-style autoresearch engine for encoded magic-state preparation on the [[4,2,2]] quantum error-detecting code using Qiskit Aer simulation. Five-rung progressive search: baseline -> stability -> transfer -> factory -> Rosenfeld. Smart challenger generation (neighbor walk + random combo + lesson-guided). Machine-readable lesson feedback with per-dimension effects, interaction detection, and cross-rung propagation. Factory throughput scoring. Resumable execution. 21 tests, all passing. --- .gitignore | 6 + README.md | 264 +++++++ THE_STORY.md | 682 ++++++++++++++++++ configs/rungs/rung1.yaml | 73 ++ configs/rungs/rung2.yaml | 73 ++ configs/rungs/rung3.yaml | 77 ++ configs/rungs/rung4.yaml | 73 ++ configs/rungs/rung5.yaml | 71 ++ pyproject.toml | 39 + src/autoresearch_quantum/__init__.py | 6 + src/autoresearch_quantum/__main__.py | 5 + src/autoresearch_quantum/cli.py | 184 +++++ src/autoresearch_quantum/codes/__init__.py | 1 + .../codes/four_two_two.py | 78 ++ src/autoresearch_quantum/config.py | 82 +++ .../execution/__init__.py | 1 + .../execution/analysis.py | 119 +++ .../execution/backends.py | 66 ++ .../execution/hardware.py | 121 ++++ src/autoresearch_quantum/execution/local.py | 178 +++++ .../execution/transfer.py | 62 ++ .../execution/transpile.py | 48 ++ .../experiments/__init__.py | 1 + .../experiments/encoded_magic_state.py | 163 +++++ src/autoresearch_quantum/lessons/__init__.py | 1 + src/autoresearch_quantum/lessons/extractor.py | 152 ++++ src/autoresearch_quantum/lessons/feedback.py | 199 +++++ src/autoresearch_quantum/models.py | 262 +++++++ .../persistence/__init__.py | 1 + 
src/autoresearch_quantum/persistence/store.py | 135 ++++ src/autoresearch_quantum/ratchet/__init__.py | 1 + src/autoresearch_quantum/ratchet/runner.py | 441 +++++++++++ src/autoresearch_quantum/scoring/__init__.py | 1 + src/autoresearch_quantum/scoring/score.py | 150 ++++ src/autoresearch_quantum/search/__init__.py | 1 + .../search/challengers.py | 52 ++ src/autoresearch_quantum/search/strategies.py | 277 +++++++ tests/test_harness.py | 459 ++++++++++++ 38 files changed, 4605 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 THE_STORY.md create mode 100644 configs/rungs/rung1.yaml create mode 100644 configs/rungs/rung2.yaml create mode 100644 configs/rungs/rung3.yaml create mode 100644 configs/rungs/rung4.yaml create mode 100644 configs/rungs/rung5.yaml create mode 100644 pyproject.toml create mode 100644 src/autoresearch_quantum/__init__.py create mode 100644 src/autoresearch_quantum/__main__.py create mode 100644 src/autoresearch_quantum/cli.py create mode 100644 src/autoresearch_quantum/codes/__init__.py create mode 100644 src/autoresearch_quantum/codes/four_two_two.py create mode 100644 src/autoresearch_quantum/config.py create mode 100644 src/autoresearch_quantum/execution/__init__.py create mode 100644 src/autoresearch_quantum/execution/analysis.py create mode 100644 src/autoresearch_quantum/execution/backends.py create mode 100644 src/autoresearch_quantum/execution/hardware.py create mode 100644 src/autoresearch_quantum/execution/local.py create mode 100644 src/autoresearch_quantum/execution/transfer.py create mode 100644 src/autoresearch_quantum/execution/transpile.py create mode 100644 src/autoresearch_quantum/experiments/__init__.py create mode 100644 src/autoresearch_quantum/experiments/encoded_magic_state.py create mode 100644 src/autoresearch_quantum/lessons/__init__.py create mode 100644 src/autoresearch_quantum/lessons/extractor.py create mode 100644 src/autoresearch_quantum/lessons/feedback.py create mode 
100644 src/autoresearch_quantum/models.py create mode 100644 src/autoresearch_quantum/persistence/__init__.py create mode 100644 src/autoresearch_quantum/persistence/store.py create mode 100644 src/autoresearch_quantum/ratchet/__init__.py create mode 100644 src/autoresearch_quantum/ratchet/runner.py create mode 100644 src/autoresearch_quantum/scoring/__init__.py create mode 100644 src/autoresearch_quantum/scoring/score.py create mode 100644 src/autoresearch_quantum/search/__init__.py create mode 100644 src/autoresearch_quantum/search/challengers.py create mode 100644 src/autoresearch_quantum/search/strategies.py create mode 100644 tests/test_harness.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..98905e2 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +.venv/ +__pycache__/ +.pytest_cache/ +*.pyc +*.egg-info/ +data/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..45b17d0 --- /dev/null +++ b/README.md @@ -0,0 +1,264 @@ +# Autoresearch Quantum + +`autoresearch-quantum` is a Python research harness for a Karpathy-style autoresearch ratchet in quantum experiments: + +- keep an incumbent experiment +- generate challenger experiments +- screen challengers on a cheap tier +- promote only justified challengers to an expensive tier +- replace the incumbent only when the challenger wins on the final criterion +- log every ratchet step +- extract a transferable lesson at the end of each rung + +The first built-in experiment family targets encoded magic-state preparation in the `[[4,2,2]]` code with Qiskit. The framework is designed so the `[[4,2,2]]` rung is not the destination. It is the first rung in a ladder that shifts from best-circuit hunting toward reusable design rules for larger encoded workflows. 
+ +## Project Tree + +```text +autoresearch-quantum/ +├── configs/ +│ └── rungs/ +│ ├── rung1.yaml +│ ├── rung2.yaml +│ ├── rung3.yaml +│ └── rung4.yaml +├── src/ +│ └── autoresearch_quantum/ +│ ├── cli.py +│ ├── config.py +│ ├── models.py +│ ├── codes/ +│ │ └── four_two_two.py +│ ├── experiments/ +│ │ └── encoded_magic_state.py +│ ├── execution/ +│ │ ├── analysis.py +│ │ ├── backends.py +│ │ ├── hardware.py +│ │ ├── local.py +│ │ └── transpile.py +│ ├── lessons/ +│ │ └── extractor.py +│ ├── persistence/ +│ │ └── store.py +│ ├── ratchet/ +│ │ └── runner.py +│ ├── scoring/ +│ │ └── score.py +│ └── search/ +│ └── challengers.py +├── tests/ +├── pyproject.toml +└── README.md +``` + +## Scientific Framing + +### What is optimized + +The harness optimizes an **experiment**, not just a circuit. A spec includes: + +- logical magic-seed construction +- encoder realization +- verification strategy +- postselection rule +- ancilla strategy +- transpilation choices +- backend target and noise proxy +- shot and repeat allocation + +### What is measured + +The default score is: + +```text +score = (usable_magic_quality * acceptance_rate) / total_cost +``` + +with a configurable `usable_magic_quality` assembled from: + +- noisy encoded fidelity proxy +- logical magic witness +- codespace survival / postselection success +- stability under repeated noisy evaluation +- spectator logical alignment + +and a configurable `total_cost` assembled from: + +- two-qubit gate count +- transpiled depth +- total shots consumed +- runtime proxy +- hardware queue proxy + +### Cheap tier vs expensive tier + +Cheap tier: + +- backend-aware transpilation +- noisy Aer evaluation +- density-matrix fidelity when a backend-derived noise model is available +- repeated local runs for stability scoring + +Expensive tier: + +- IBM Runtime execution through `SamplerV2` +- only used when enabled and when cheap-tier promotion thresholds are met +- isolated behind 
[`hardware.py`](/Users/oho/GitClone/CodexProjects/autoresearch-quantum/src/autoresearch_quantum/execution/hardware.py) + +## Built-In `[[4,2,2]]` Experiment + +The built-in experiment prepares an encoded logical T-state on one logical qubit of the `[[4,2,2]]` code while keeping the spectator logical qubit in `|0⟩`. The code utilities live in [`four_two_two.py`](/Users/oho/GitClone/CodexProjects/autoresearch-quantum/src/autoresearch_quantum/codes/four_two_two.py). + +The harness evaluates: + +- acceptance under optional `ZZZZ` and `XXXX` stabilizer checks +- logical `X` and `Y` witnesses for the encoded magic state +- spectator logical `Z` +- compiled cost after transpilation to a chosen backend target + +This keeps the core scientific distinction explicit: + +- a circuit can be locally good for `[[4,2,2]]` +- a rule is only valuable if it keeps helping across new backends or new rungs + +## Installation + +Create an isolated environment in the project root and install the package: + +```bash +python3 -m venv .venv +. .venv/bin/activate +pip install -e '.[dev]' +``` + +For the optional IBM hardware path: + +```bash +pip install -e '.[hardware,dev]' +``` + +If you want the CLI without installing editable mode, use `PYTHONPATH=src`. + +## How To Run + +### 1. Run a single local experiment + +Use the rung config bootstrap incumbent as-is: + +```bash +PYTHONPATH=src .venv/bin/python -m autoresearch_quantum run-experiment \ + --config configs/rungs/rung1.yaml \ + --store-dir data/demo +``` + +Override individual experiment fields: + +```bash +PYTHONPATH=src .venv/bin/python -m autoresearch_quantum run-experiment \ + --config configs/rungs/rung1.yaml \ + --store-dir data/demo \ + --set verification=z_only \ + --set postselection=z_only \ + --set ancilla_strategy=reused_single +``` + +### 2. 
Run one ratchet step + +```bash +PYTHONPATH=src .venv/bin/python -m autoresearch_quantum run-step \ + --config configs/rungs/rung1.yaml \ + --store-dir data/demo +``` + +This will: + +- load or bootstrap the incumbent +- generate neighbor challengers from the rung search space +- evaluate every challenger on the cheap tier +- promote only margin-beating challengers if hardware is enabled +- log the step and update the incumbent pointer if a challenger wins + +### 3. Run one full rung + +```bash +PYTHONPATH=src .venv/bin/python -m autoresearch_quantum run-rung \ + --config configs/rungs/rung1.yaml \ + --store-dir data/demo +``` + +Artifacts are persisted under `data/demo/rung_/`: + +- `experiments/*.json` +- `ratchet_steps/*.json` +- `incumbent.json` +- `lesson.json` +- `lesson.md` + +### 4. Run a multi-rung ratchet campaign + +```bash +PYTHONPATH=src .venv/bin/python -m autoresearch_quantum run-ratchet \ + --config configs/rungs/rung1.yaml \ + --config configs/rungs/rung2.yaml \ + --config configs/rungs/rung3.yaml \ + --config configs/rungs/rung4.yaml \ + --store-dir data/campaign +``` + +### 5. Run an optional hardware-backed confirmation + +First install the hardware extra and make IBM credentials available in the usual `qiskit-ibm-runtime` way. The simplest path is to export: + +```bash +export QISKIT_IBM_TOKEN=... +``` + +Then enable the hardware tier in the rung config by setting `tier_policy.enable_hardware: true` and optionally `hardware.backend_name: ibm_brisbane`. + +Run: + +```bash +PYTHONPATH=src .venv/bin/python -m autoresearch_quantum run-step \ + --config configs/rungs/rung1.yaml \ + --store-dir data/hardware \ + --hardware +``` + +Only challengers that beat the incumbent cheap-tier score by `tier_policy.cheap_margin` are promoted. + +## Extending The Ladder + +The intended progression is: + +1. `rung1.yaml` + baseline `[[4,2,2]]` encoded magic-state preparation +2. `rung2.yaml` + same code with stronger stability and backend-awareness +3. 
`rung3.yaml` + transfer across backend families +4. `rung4.yaml` + factory-style cost pressure + +To add a new rung: + +- create a new YAML in `configs/rungs/` +- narrow the challenger space to the specific next question +- tune cheap and expensive score weights for that rung +- keep the lesson document as the real product + +To add a new experiment family: + +- implement a new builder under `src/autoresearch_quantum/experiments/` +- define the target state, witness operators, verification flow, and logging metadata +- route the ratchet to that experiment family through config or a new CLI selector + +## Notes On Interpretation + +This harness is explicit about proxy vs confirmation: + +- cheap-tier fidelity and witness numbers are local proxies +- hardware runs are scarce and should be treated as confirmation +- the most important artifact of each rung is the lesson, not just the incumbent ID + +That is the intended ratchet: better experiment plus better search rule. diff --git a/THE_STORY.md b/THE_STORY.md new file mode 100644 index 0000000..2bf3f50 --- /dev/null +++ b/THE_STORY.md @@ -0,0 +1,682 @@ +# The Story of autoresearch-quantum + +## What this system does, in one paragraph + +This is a machine that discovers, by itself, the best way to prepare an +encoded magic state on the [[4,2,2]] quantum error-detecting code. You give +it a starting recipe and a search space of alternatives. It runs hundreds of +simulated quantum experiments, scores them, learns which choices help and +which choices hurt, narrows the search, and climbs to the best recipe it can +find -- then hands you a written lesson explaining what it learned and why. +The entire loop -- propose, evaluate, compare, learn, repeat -- runs without +human intervention. That is the "auto" in autoresearch. + + +--- + + +## Part 1: The quantum computing problem + +### 1.1 What is a magic state? 
+ +Fault-tolerant quantum computers need a special ingredient called a **magic +state** to perform the T gate -- the non-Clifford gate that makes quantum +computation universal. You cannot create this state using Clifford operations +alone, so you prepare a noisy approximation and then **distill** it into a +high-fidelity copy. The preparation step is the bottleneck: if your raw magic +states are junk, distillation is expensive or impossible. + +### 1.2 What is the [[4,2,2]] code? + +The [[4,2,2]] code is the smallest quantum error-detecting code. It uses 4 +physical qubits to encode 2 logical qubits. It cannot correct errors, but it +can *detect* them: if an error flips one qubit, the code's stabilizers +(XXXX and ZZZZ) flag it, and you can throw the shot away. This +**postselection** raises quality at the cost of throughput. + +The code has two logical qubits. We use one to carry the magic state and the +other as a **spectator** -- an untouched qubit whose Z-measurement tells us +whether the encoding process corrupted the logical subspace. + +### 1.3 What knobs does this system turn? 
+ +An experiment recipe (called an `ExperimentSpec`) has ~15 tuneable dimensions: + +| Dimension | What it controls | Example values | +|---|---|---| +| `seed_style` | How the raw T-state is prepared on qubit 0 | `h_p`, `ry_rz`, `u_magic` | +| `encoder_style` | How the 4-qubit encoding circuit is built | `cx_chain`, `cz_compiled` | +| `verification` | Which stabilizers are measured before readout | `both`, `z_only`, `x_only`, `none` | +| `postselection` | Which syndrome outcomes cause a shot to be discarded | `all_measured`, `z_only`, `none` | +| `ancilla_strategy` | Whether verification uses 1 reused or 2 dedicated ancillas | `dedicated_pair`, `reused_single` | +| `optimization_level` | Qiskit transpiler aggressiveness | 1, 2, 3 | +| `layout_method` | Physical qubit placement algorithm | `sabre`, `dense` | +| `routing_method` | SWAP insertion algorithm | `sabre`, `basic` | +| `target_backend` | Which IBM device topology to compile for | `fake_brisbane`, `fake_kyoto`, ... | +| `shots` | Samples per circuit | 256 -- 4096 | + +The question the system answers: **Which combination of these choices gives +the highest-quality encoded magic states at the lowest cost?** + +### 1.4 How is each experiment evaluated? + +For each `ExperimentSpec`, the executor: + +1. **Builds four circuits** (`encoded_magic_state.py`): + - `acceptance` -- measures all data qubits in the Z basis after + verification, to compute the postselection acceptance rate. + - `logical_x` -- rotates into the X basis before measurement, to get + `` on the magic-carrying logical qubit. + - `logical_y` -- rotates into the Y basis, to get ``. + - `spectator_z` -- measures the spectator logical qubit in Z, to get + ``. + +2. **Transpiles** them for the target backend's coupling map and basis gates. + +3. **Simulates** them on Qiskit Aer with the backend's calibrated noise model, + repeating the configured number of times with independent random seeds. + +4. 
**Postselects**: for each shot, checks the syndrome register. Shots where + the stabiliser flagged an error are discarded. What remains is the + postselected ensemble. + +5. **Computes metrics** from the postselected data: + + | Metric | Formula | What it measures | + |---|---|---| + | `logical_magic_witness` | `((1 + (X_L + Y_L)/sqrt(2)) / 2) * ((1 + Z_spectator) / 2)` | Magic-state quality, penalised if spectator is disturbed | + | `acceptance_rate` | `accepted_shots / total_shots` | Throughput (what fraction survives postselection) | + | `stability_score` | `1 - pstdev(repeat_scores) / mean(repeat_scores)` | Consistency across independent repeat runs | + | `noisy_encoded_fidelity` | `Tr(rho_noisy \| target>1 changed field (the + defining property of multi-axis mutation). +- LessonGuided respects "fix" rules: when told to fix `seed_style=ry_rz`, + every generated challenger has that value. +- The composite generator stays within the budget cap. + +### Claim 5: The lesson system extracts correct prefer/avoid/fix rules. + +**Tests**: `test_extract_search_rules_prefer_and_avoid`, +`test_narrow_search_space_removes_avoided`, +`test_build_lesson_feedback_end_to_end` + +Given synthetic experiment records where `z_only` scores 0.80--0.85 and +`both` scores 0.50--0.55, the extractor must emit a "prefer z_only" and +"avoid both" rule. `narrow_search_space` must actually remove avoided values +and constrain fixed dimensions. + +### Claim 6: The factory score function computes throughput metrics. + +**Tests**: `test_factory_throughput_score_produces_metrics`, +`test_score_registry_has_factory` + +Given known input metrics (acceptance 0.70, witness 0.80), verifies that +`factory_throughput_score` produces a positive score, attaches +`factory_metrics` to the `extra` dict, and that `accepted_states_per_shot` +equals the input acceptance rate. + +### Claim 7: Transfer evaluation runs the same spec across backends. 
+ +**Test**: `test_transfer_evaluator_runs_across_backends` + +Runs a transfer evaluation on a single backend (for speed) and checks that a +`TransferReport` is returned with a positive transfer score and the correct +backend key in `per_backend_scores`. + +### Claim 8: Progress and feedback survive serialisation round-trips. + +**Tests**: `test_save_and_load_progress`, +`test_save_and_load_lesson_feedback` + +Writes a `RungProgress` / `LessonFeedback` to disk via the store, reads it +back, and verifies all fields match. If the JSON schema or the +deserialisation logic drifts, this catches it. + +### Claim 9: A full rung saves progress and produces both lesson types. + +**Tests**: `test_run_rung_saves_progress`, +`test_run_rung_returns_lesson_and_feedback` + +Runs a complete rung (bootstrap + steps + lesson extraction) and checks that +`progress.json` exists and is marked `completed`, and that the return value +includes both a human-readable `RungLesson` and a machine-readable +`LessonFeedback`. + +### Claim 10: Multi-rung ratchet propagates winners and accumulates lessons. + +**Test**: `test_run_ratchet_propagates_winner` + +Runs a two-rung ratchet and checks that: +- Both rungs produce (lesson, feedback) tuples. +- `harness._accumulated_lessons` contains entries from both rungs, proving + that rung 2 had access to rung 1's rules when generating challengers. + +### Claim 11: Different specs get different simulator seeds. + +**Test**: `test_different_specs_get_different_seeds` + +The old code used `seed_simulator = 11_000 + repeat_index`, meaning every +spec got the same random stream. The new code hashes the spec's fingerprint +into the seed. This test creates two specs that differ only in `verification` +and checks that their computed seeds are different. + + +--- + + +## Part 4: The file map + +``` +autoresearch-quantum/ + configs/rungs/ + rung1.yaml Baseline: what recipe works at all? + rung2.yaml Stability: does it hold under noise variation? 
+ rung3.yaml Transfer: does it work on other devices? + rung4.yaml Factory: what maximises throughput per cost? + rung5.yaml Rosenfeld: which heuristics are load-bearing? + + src/autoresearch_quantum/ + models.py Every data structure in one file + config.py YAML -> RungConfig parser + cli.py Entry point: run-experiment, run-step, run-rung, + run-ratchet, run-transfer + + codes/ + four_two_two.py The [[4,2,2]] code: stabilizers, logical ops, + encoder circuits, magic seed gates + + experiments/ + encoded_magic_state.py Builds the four-circuit measurement bundle + + execution/ + local.py LocalCheapExecutor: Aer noise simulation + hardware.py IBMHardwareExecutor: real-device SamplerV2 + transfer.py TransferEvaluator: same spec across N backends + analysis.py Postselection, eigenvalues, witness formula + backends.py Backend resolution (fake_* or IBM runtime) + transpile.py Transpilation, gate counting, runtime estimates + + scoring/ + score.py weighted_acceptance_cost + factory_throughput + + search/ + challengers.py GeneratedChallenger, neighbor generation, dedup + strategies.py NeighborWalk, RandomCombo, LessonGuided, + CompositeGenerator + + lessons/ + extractor.py Human-readable RungLesson + machine LessonFeedback + feedback.py SearchRule extraction, interaction detection, + search space narrowing + + ratchet/ + runner.py AutoresearchHarness: the orchestrator + + persistence/ + store.py JSON file store: experiments, steps, progress, + lessons, feedback, propagated specs + + tests/ + test_harness.py 21 tests covering every subsystem + + data/ Output directory (created at runtime) + default/ + rung_1/ + experiments/ One JSON per evaluated spec + ratchet_steps/ One JSON per step + incumbent.json Current best + progress.json Resumability checkpoint + lesson.json Machine-readable lesson + lesson.md Human-readable narrative + lesson_feedback.json SearchRules for the next rung + rung_2/ + propagated_spec.json Winner carried from rung 1 + ... 
+``` + + +--- + + +## Part 5: How to use it without Claude + +You do not need an AI to run this system or to make progress with its +output. Everything below runs in your terminal. + +### 5.1 Setup + +```bash +cd autoresearch-quantum +python -m venv .venv +source .venv/bin/activate +pip install -e ".[dev]" +``` + +### 5.2 Run a single experiment + +```bash +python -m autoresearch_quantum run-experiment \ + --config configs/rungs/rung1.yaml \ + --set verification=z_only \ + --set seed_style=ry_rz +``` + +This prints a JSON result with the score, failure mode, and experiment ID. +The full record is saved to `data/default/rung_1/experiments/`. + +### 5.3 Run one ratchet step + +```bash +python -m autoresearch_quantum run-step \ + --config configs/rungs/rung1.yaml +``` + +This bootstraps an incumbent (if none exists), generates challengers, evaluates +them, promotes the best, and saves the step record. Run it again and it +generates *new* challengers (never repeating), with a new incumbent if one was +found. + +### 5.4 Run a full rung + +```bash +python -m autoresearch_quantum run-rung \ + --config configs/rungs/rung1.yaml +``` + +Runs up to `step_budget` steps (default 3), stopping early if patience runs +out. Produces `data/default/rung_1/lesson.md` -- read this file. It tells you +what helped, what hurt, what seems invariant, and what to test next. + +### 5.5 Run the full five-rung ratchet + +```bash +python -m autoresearch_quantum run-ratchet \ + --config configs/rungs/rung1.yaml \ + --config configs/rungs/rung2.yaml \ + --config configs/rungs/rung3.yaml \ + --config configs/rungs/rung4.yaml \ + --config configs/rungs/rung5.yaml +``` + +This is the full pipeline. Each rung's winner is automatically propagated to +the next rung. Each rung's lessons narrow the search space for the next. +When it finishes, you have five lesson files and a final optimised recipe. 
+ +### 5.6 Run a transfer evaluation + +```bash +python -m autoresearch_quantum run-transfer \ + --config configs/rungs/rung3.yaml \ + --backends fake_brisbane fake_kyoto fake_sherbrooke +``` + +Tests a single spec across multiple backend noise models. The output tells you +the per-backend scores and the pessimistic transfer score. + +### 5.7 Reading the output + +After a ratchet run, the most valuable artefacts are: + +| File | What to do with it | +|---|---| +| `rung_N/lesson.md` | Read it. It is a structured report. The "What Helped" section tells you which settings to keep. The "What Hurt" section tells you what to stop trying. | +| `rung_N/lesson_feedback.json` | This is the machine-readable version. Open it and look at the `rules` array. Each rule has an `action` (prefer/avoid/fix), a `dimension`, a `value`, a `confidence` (0--1), and a `reason`. | +| `rung_N/incumbent.json` | Contains the `experiment_id` of the current best spec. Load the corresponding file from `experiments/` to see its full spec and scores. | +| `rung_N/propagated_spec.json` | The spec that was carried forward from the previous rung. Compare it with the YAML bootstrap to see what the system changed. | +| `rung_N/progress.json` | If the run was interrupted, this tells you where it left off. Just re-run the same command to resume. | + +### 5.8 Making manual progress with the artefacts + +The system is designed so that you can interleave human intuition with +automated search: + +1. **Read the lesson.** If rung 1 says `verification=z_only` consistently + helps, you now know something about the physics: X-stabiliser checking + adds gate cost without enough quality payoff at this noise level. + +2. **Edit the YAML.** Remove values that the lesson says to avoid. Add new + values you want to explore. Change the weights if you care more about + throughput than fidelity. Save the file and re-run. + +3. 
**Run single experiments.** If you have a specific hypothesis + ("What if `approximation_degree=0.95` helps?"), test it directly with + `run-experiment --set approximation_degree=0.95`. The result is saved to + the store and will be included in the next lesson extraction. + +4. **Resume interrupted runs.** If your laptop dies mid-rung, just re-run the + same command. Progress is checkpointed after every step. + +5. **Compare across rungs.** Open `rung_1/lesson_feedback.json` and + `rung_3/lesson_feedback.json` side by side. Rules that appear in both with + high confidence are load-bearing. Rules that appear in rung 1 but vanish by + rung 3 were artefacts of the initial noise model. + +6. **Feed results to a new search.** Copy the `best_spec_fields` from + `lesson_feedback.json` into a new YAML config as the bootstrap incumbent. + Define a tighter search space around the winning region. Run another rung. + You are now doing what the system does in `run_ratchet` -- but with human + judgement about what to explore next. + +### 5.9 Running the tests + +```bash +python -m pytest tests/ -v +``` + +All 21 tests should pass. They take about 13 seconds. If a test fails after +you edit a YAML config, the most likely cause is that you introduced a +dimension value that does not correspond to an implemented code path (e.g., +`encoder_style: "rzz_lattice"` does not exist in `four_two_two.py`). + + +--- + + +## Part 6: What this system does NOT do (yet) + +- **It does not run on real quantum hardware by default.** The + `IBMHardwareExecutor` exists and is wired up, but `enable_hardware: false` + in every config. Set it to `true` and provide credentials via the + `QISKIT_IBM_TOKEN` environment variable to use real devices. + +- **It does not do distillation.** Rung 5 (Rosenfeld Direction) identifies + which heuristics matter for factory-style workflows, but it does not + actually build a distillation circuit. That is the next project. 
+ +- **It does not use LLMs in the loop.** The "auto" is algorithmic + (statistical rule extraction + guided search), not generative. There is no + GPT/Claude call inside the ratchet loop. The intelligence is in the + `SearchRule` extraction, the `CompositeGenerator` budget allocation, and + the cross-rung propagation logic. + +- **It does not visualise results.** There is no dashboard. The output is + JSON and Markdown. You read it, or you write a script to plot it. + +- **It does not parallelise evaluations.** Each experiment runs sequentially. + On a machine with multiple cores, you could shard the challenger set across + processes, but that is not implemented. + + +--- + + +## Part 7: Architecture diagram + +``` + configs/rungs/rung1-5.yaml + | + v + +---------+---------+ + | AutoresearchHarness | + | (ratchet/runner.py) | + +---+-----+-----+---+ + | | | + +------------+ | +------------+ + | | | + v v v + CompositeGenerator LocalCheapExecutor ResearchStore + (search/strategies.py) (execution/local.py) (persistence/store.py) + | | | + +----------+------+ | +--------+--------+ + | | | | | | | + v v v v v v v + Neighbor Random Lesson build_circuit save_ save_ save_ + Walk Combo Guided _bundle() exp step progress + | | + v v + LessonFeedback AerSimulator + (lessons/ + noise model + feedback.py) + postselection + + witness + + scoring +``` + +The data flows in a circle: + +``` + Evaluate --> Score --> Compare --> Learn --> Narrow --> Generate --> Evaluate +``` + +That circle is the ratchet step. Each rung runs it multiple times. Each +ratchet runs multiple rungs. The lessons tighten the circle with every pass. + + +--- + +*This document was written on 2026-04-04 to describe the system as built. +The code is the ground truth. 
If this document contradicts the code, the +code is correct.* diff --git a/configs/rungs/rung1.yaml b/configs/rungs/rung1.yaml new file mode 100644 index 0000000..f9800d1 --- /dev/null +++ b/configs/rungs/rung1.yaml @@ -0,0 +1,73 @@ +rung: 1 +name: "[[4,2,2]] Encoded Magic-State Preparation" +description: "Baseline ratchet over preparation, verification, and postselection choices for the smallest encoded magic-state experiment." +objective: "Maximize acceptance-weighted encoded magic quality for [[4,2,2]] T-state preparation on a backend-aware cheap tier." + +bootstrap_incumbent: + seed_style: h_p + encoder_style: cx_chain + verification: both + postselection: all_measured + ancilla_strategy: dedicated_pair + optimization_level: 2 + layout_method: sabre + routing_method: sabre + approximation_degree: 1.0 + target_backend: fake_brisbane + noise_backend: fake_brisbane + shots: 512 + repeats: 2 + notes: "Bootstrap incumbent for encoded T-state preparation." + +search_space: + max_challengers_per_step: 8 + dimensions: + seed_style: [h_p, ry_rz, u_magic] + encoder_style: [cx_chain, cz_compiled] + verification: [both, z_only, x_only] + postselection: [all_measured, z_only, none] + ancilla_strategy: [dedicated_pair, reused_single] + optimization_level: [1, 2, 3] + +tier_policy: + cheap_margin: 0.002 + confirmation_margin: 0.0 + cheap_shots: 512 + expensive_shots: 1024 + cheap_repeats: 2 + expensive_repeats: 1 + promote_top_k: 2 + enable_hardware: false + confirm_incumbent_on_hardware: true + hardware_budget: 1 + +score: + name: weighted_acceptance_cost + base_cost: 1.0 + cheap_quality: + ideal_fidelity: 0.10 + noisy_fidelity: 0.40 + logical_witness: 0.25 + codespace_rate: 0.15 + stability_score: 0.05 + spectator_alignment: 0.05 + expensive_quality: + logical_witness: 0.55 + codespace_rate: 0.15 + stability_score: 0.20 + spectator_alignment: 0.10 + cost_weights: + two_qubit_count: 0.08 + depth: 0.01 + shot_count: 0.00020 + runtime_estimate: 0.015 + queue_cost_proxy: 0.30 + 
+step_budget: 3 +patience: 2 + +hardware: + backend_name: + channel: + instance: + token_env_var: QISKIT_IBM_TOKEN diff --git a/configs/rungs/rung2.yaml b/configs/rungs/rung2.yaml new file mode 100644 index 0000000..f6aa2cd --- /dev/null +++ b/configs/rungs/rung2.yaml @@ -0,0 +1,73 @@ +rung: 2 +name: "Backend-Aware Stability Rung" +description: "Same [[4,2,2]] task, but with repeated cheap-tier runs, backend variation, and stronger stability pressure." +objective: "Favor experiment settings that hold score under calibration-like backend changes and repeated noisy evaluation." + +bootstrap_incumbent: + seed_style: h_p + encoder_style: cx_chain + verification: both + postselection: all_measured + ancilla_strategy: dedicated_pair + optimization_level: 3 + layout_method: sabre + routing_method: sabre + approximation_degree: 1.0 + target_backend: fake_kyoto + noise_backend: fake_kyoto + shots: 768 + repeats: 3 + notes: "Stability-focused bootstrap incumbent." + +search_space: + max_challengers_per_step: 8 + dimensions: + target_backend: [fake_kyoto, fake_brisbane, fake_sherbrooke] + noise_backend: [fake_kyoto, fake_brisbane, fake_sherbrooke] + verification: [both, z_only] + postselection: [all_measured, z_only] + optimization_level: [1, 2, 3] + layout_method: [sabre, dense] + routing_method: [sabre, basic] + +tier_policy: + cheap_margin: 0.001 + confirmation_margin: 0.0 + cheap_shots: 768 + expensive_shots: 1536 + cheap_repeats: 3 + expensive_repeats: 1 + promote_top_k: 2 + enable_hardware: false + confirm_incumbent_on_hardware: true + hardware_budget: 1 + +score: + name: weighted_acceptance_cost + base_cost: 1.0 + cheap_quality: + noisy_fidelity: 0.30 + logical_witness: 0.25 + codespace_rate: 0.20 + stability_score: 0.20 + spectator_alignment: 0.05 + expensive_quality: + logical_witness: 0.45 + codespace_rate: 0.15 + stability_score: 0.30 + spectator_alignment: 0.10 + cost_weights: + two_qubit_count: 0.06 + depth: 0.01 + shot_count: 0.00025 + runtime_estimate: 0.02 + 
queue_cost_proxy: 0.35 + +step_budget: 3 +patience: 2 + +hardware: + backend_name: + channel: + instance: + token_env_var: QISKIT_IBM_TOKEN diff --git a/configs/rungs/rung3.yaml b/configs/rungs/rung3.yaml new file mode 100644 index 0000000..91c0d6b --- /dev/null +++ b/configs/rungs/rung3.yaml @@ -0,0 +1,77 @@ +rung: 3 +name: "Transfer Test Rung" +description: "Keep only the strongest principles from the first two rungs and test them across multiple backend targets." +objective: "Measure which [[4,2,2]] heuristics transfer across backend families rather than overfitting a single noise profile." + +bootstrap_incumbent: + seed_style: ry_rz + encoder_style: cx_chain + verification: z_only + postselection: z_only + ancilla_strategy: reused_single + optimization_level: 3 + layout_method: sabre + routing_method: sabre + approximation_degree: 1.0 + target_backend: fake_sherbrooke + noise_backend: fake_sherbrooke + shots: 768 + repeats: 3 + notes: "Transfer-focused incumbent." + +search_space: + max_challengers_per_step: 6 + dimensions: + target_backend: [fake_sherbrooke, fake_brisbane, fake_kyoto] + noise_backend: [fake_sherbrooke, fake_brisbane, fake_kyoto] + seed_style: [ry_rz, h_p] + verification: [z_only, both] + postselection: [z_only, all_measured] + ancilla_strategy: [reused_single, dedicated_pair] + +tier_policy: + cheap_margin: 0.001 + confirmation_margin: 0.0 + cheap_shots: 768 + expensive_shots: 1536 + cheap_repeats: 3 + expensive_repeats: 1 + promote_top_k: 2 + enable_hardware: false + confirm_incumbent_on_hardware: true + hardware_budget: 1 + +score: + name: weighted_acceptance_cost + base_cost: 1.0 + cheap_quality: + noisy_fidelity: 0.20 + logical_witness: 0.30 + codespace_rate: 0.20 + stability_score: 0.20 + spectator_alignment: 0.10 + expensive_quality: + logical_witness: 0.50 + codespace_rate: 0.15 + stability_score: 0.25 + spectator_alignment: 0.10 + cost_weights: + two_qubit_count: 0.05 + depth: 0.01 + shot_count: 0.00025 + runtime_estimate: 0.02 + 
queue_cost_proxy: 0.40 + +step_budget: 3 +patience: 2 + +transfer_backends: + - fake_sherbrooke + - fake_brisbane + - fake_kyoto + +hardware: + backend_name: + channel: + instance: + token_env_var: QISKIT_IBM_TOKEN diff --git a/configs/rungs/rung4.yaml b/configs/rungs/rung4.yaml new file mode 100644 index 0000000..98e2a90 --- /dev/null +++ b/configs/rungs/rung4.yaml @@ -0,0 +1,73 @@ +rung: 4 +name: "Factory-Style Cost Rung" +description: "Shift the scalar score away from best-state chasing toward accepted states per cost proxy." +objective: "Optimize accepted encoded magic states per unit cost, using circuit suite cost as a first factory-style proxy." + +bootstrap_incumbent: + seed_style: ry_rz + encoder_style: cx_chain + verification: z_only + postselection: z_only + ancilla_strategy: reused_single + optimization_level: 3 + layout_method: dense + routing_method: basic + approximation_degree: 1.0 + target_backend: fake_brisbane + noise_backend: fake_brisbane + shots: 384 + repeats: 2 + notes: "Throughput-oriented incumbent." 
+ +search_space: + max_challengers_per_step: 6 + dimensions: + verification: [z_only, both, none] + postselection: [z_only, all_measured, none] + ancilla_strategy: [reused_single, dedicated_pair] + optimization_level: [2, 3] + layout_method: [dense, sabre] + routing_method: [basic, sabre] + shots: [256, 384, 512] + +tier_policy: + cheap_margin: 0.001 + confirmation_margin: 0.0 + cheap_shots: 384 + expensive_shots: 1024 + cheap_repeats: 2 + expensive_repeats: 1 + promote_top_k: 1 + enable_hardware: false + confirm_incumbent_on_hardware: true + hardware_budget: 1 + +score: + name: factory_throughput + base_cost: 1.0 + cheap_quality: + noisy_fidelity: 0.15 + logical_witness: 0.25 + codespace_rate: 0.20 + stability_score: 0.10 + spectator_alignment: 0.10 + expensive_quality: + logical_witness: 0.40 + codespace_rate: 0.20 + stability_score: 0.10 + spectator_alignment: 0.10 + cost_weights: + two_qubit_count: 0.10 + depth: 0.02 + shot_count: 0.00040 + runtime_estimate: 0.03 + queue_cost_proxy: 0.50 + +step_budget: 3 +patience: 2 + +hardware: + backend_name: + channel: + instance: + token_env_var: QISKIT_IBM_TOKEN diff --git a/configs/rungs/rung5.yaml b/configs/rungs/rung5.yaml new file mode 100644 index 0000000..2dc8b8b --- /dev/null +++ b/configs/rungs/rung5.yaml @@ -0,0 +1,71 @@ +rung: 5 +name: "Rosenfeld Direction" +description: "Identify which heuristics matter for cultivation/distillation workflows. Narrowest search space; only proven dimensions survive." +objective: "Determine which preparation, verification, and transpilation choices are load-bearing under factory-realistic conditions, as a precursor to distillation pipeline integration." 
+ +bootstrap_incumbent: + seed_style: ry_rz + encoder_style: cx_chain + verification: z_only + postselection: z_only + ancilla_strategy: reused_single + optimization_level: 3 + layout_method: dense + routing_method: basic + approximation_degree: 1.0 + target_backend: fake_brisbane + noise_backend: fake_brisbane + shots: 512 + repeats: 3 + notes: "Rosenfeld-direction incumbent — propagated from rung 4 winner." + +search_space: + max_challengers_per_step: 4 + dimensions: + verification: [z_only, both] + postselection: [z_only, all_measured] + ancilla_strategy: [reused_single, dedicated_pair] + optimization_level: [2, 3] + shots: [384, 512, 768] + +tier_policy: + cheap_margin: 0.0005 + confirmation_margin: 0.0 + cheap_shots: 512 + expensive_shots: 2048 + cheap_repeats: 3 + expensive_repeats: 2 + promote_top_k: 1 + enable_hardware: false + confirm_incumbent_on_hardware: true + hardware_budget: 1 + +score: + name: factory_throughput + base_cost: 1.0 + cheap_quality: + noisy_fidelity: 0.10 + logical_witness: 0.35 + codespace_rate: 0.20 + stability_score: 0.25 + spectator_alignment: 0.10 + expensive_quality: + logical_witness: 0.40 + codespace_rate: 0.20 + stability_score: 0.30 + spectator_alignment: 0.10 + cost_weights: + two_qubit_count: 0.12 + depth: 0.02 + shot_count: 0.00050 + runtime_estimate: 0.04 + queue_cost_proxy: 0.60 + +step_budget: 4 +patience: 3 + +hardware: + backend_name: + channel: + instance: + token_env_var: QISKIT_IBM_TOKEN diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..aa9d4d8 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,39 @@ +[build-system] +requires = ["setuptools>=68", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "autoresearch-quantum" +version = "0.1.0" +description = "Karpathy-style autoresearch harness for encoded magic-state preparation experiments." 
+readme = "README.md" +requires-python = ">=3.11" +authors = [ + { name = "Codex" } +] +dependencies = [ + "qiskit>=2.3,<3", + "qiskit-aer>=0.17,<0.18", + "pyyaml>=6,<7", +] + +[project.optional-dependencies] +hardware = [ + "qiskit-ibm-runtime>=0.46,<0.47", +] +dev = [ + "pytest>=9,<10", +] + +[project.scripts] +autoresearch-quantum = "autoresearch_quantum.cli:main" + +[tool.setuptools] +package-dir = {"" = "src"} + +[tool.setuptools.packages.find] +where = ["src"] + +[tool.pytest.ini_options] +pythonpath = ["src"] +testpaths = ["tests"] diff --git a/src/autoresearch_quantum/__init__.py b/src/autoresearch_quantum/__init__.py new file mode 100644 index 0000000..2f181a7 --- /dev/null +++ b/src/autoresearch_quantum/__init__.py @@ -0,0 +1,6 @@ +"""Autoresearch harness for encoded magic-state preparation.""" + +from .config import load_rung_config +from .models import ExperimentSpec + +__all__ = ["ExperimentSpec", "load_rung_config"] diff --git a/src/autoresearch_quantum/__main__.py b/src/autoresearch_quantum/__main__.py new file mode 100644 index 0000000..a049ad7 --- /dev/null +++ b/src/autoresearch_quantum/__main__.py @@ -0,0 +1,5 @@ +from .cli import main + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/src/autoresearch_quantum/cli.py b/src/autoresearch_quantum/cli.py new file mode 100644 index 0000000..8316b55 --- /dev/null +++ b/src/autoresearch_quantum/cli.py @@ -0,0 +1,184 @@ +from __future__ import annotations + +import argparse +import json +from dataclasses import asdict, is_dataclass +from pathlib import Path +from typing import Any + +from .config import load_rung_config +from .models import ExperimentSpec +from .persistence.store import ResearchStore +from .ratchet.runner import AutoresearchHarness + + +def _parse_override(value: str) -> tuple[str, Any]: + key, raw = value.split("=", 1) + if raw.lower() in {"true", "false"}: + return key, raw.lower() == "true" + if raw.isdigit(): + return key, int(raw) + try: + return key, float(raw) 
+ except ValueError: + pass + if raw.startswith("[") and raw.endswith("]"): + return key, json.loads(raw) + return key, raw + + +def _build_spec_from_config(config_path: Path, overrides: list[str]) -> tuple[Any, ExperimentSpec]: + rung_config = load_rung_config(config_path) + spec = rung_config.bootstrap_incumbent + update_payload = dict(_parse_override(item) for item in overrides) + if update_payload: + spec = spec.with_updates(**update_payload) + return rung_config, spec + + +def _print_json(payload: Any) -> None: + def _default(value: Any) -> Any: + if is_dataclass(value): + return asdict(value) + return str(value) + + print(json.dumps(payload, indent=2, default=_default)) + + +def _add_store_arg(parser: argparse.ArgumentParser) -> None: + parser.add_argument("--store-dir", default="data/default", help="Persistent result store directory.") + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(description="Quantum autoresearch ratchet CLI") + _add_store_arg(parser) + + subparsers = parser.add_subparsers(dest="command", required=True) + + experiment = subparsers.add_parser("run-experiment", help="Run one local experiment.") + _add_store_arg(experiment) + experiment.add_argument("--config", required=True) + experiment.add_argument("--set", action="append", default=[], help="Override spec fields: key=value") + experiment.add_argument("--hardware", action="store_true", help="Also run hardware confirmation if enabled.") + + challenger_set = subparsers.add_parser("run-challenger-set", help="Evaluate one challenger neighborhood.") + _add_store_arg(challenger_set) + challenger_set.add_argument("--config", required=True) + + step = subparsers.add_parser("run-step", help="Run one ratchet step.") + _add_store_arg(step) + step.add_argument("--config", required=True) + step.add_argument("--hardware", action="store_true") + + rung = subparsers.add_parser("run-rung", help="Run a full rung.") + _add_store_arg(rung) + rung.add_argument("--config", 
required=True) + rung.add_argument("--hardware", action="store_true") + + ratchet = subparsers.add_parser("run-ratchet", help="Run multiple rung configs in order.") + _add_store_arg(ratchet) + ratchet.add_argument("--config", action="append", required=True) + ratchet.add_argument("--hardware", action="store_true") + + transfer = subparsers.add_parser("run-transfer", help="Evaluate a spec across multiple backends.") + _add_store_arg(transfer) + transfer.add_argument("--config", required=True) + transfer.add_argument("--set", action="append", default=[], help="Override spec fields: key=value") + transfer.add_argument("--backends", nargs="+", help="Backend names to evaluate on (overrides config).") + + return parser + + +def main(argv: list[str] | None = None) -> int: + parser = build_parser() + args = parser.parse_args(argv) + + store = ResearchStore(args.store_dir) + harness = AutoresearchHarness(store) + + if args.command == "run-experiment": + rung_config, spec = _build_spec_from_config(Path(args.config), args.set) + record = harness.run_single_experiment( + spec, + rung_config, + promote_to_hardware=bool(args.hardware), + ) + _print_json( + { + "experiment_id": record.experiment_id, + "score": record.final_score, + "cheap_score": record.cheap_result.score, + "expensive_score": record.expensive_result.score if record.expensive_result else None, + "failure_mode": record.best_result.metrics.dominant_failure_mode, + } + ) + return 0 + + if args.command == "run-challenger-set": + rung_config = load_rung_config(args.config) + records = harness.run_challenger_set(rung_config) + _print_json( + [ + { + "experiment_id": record.experiment_id, + "mutation": record.mutation_note, + "cheap_score": record.cheap_result.score, + } + for record in records + ] + ) + return 0 + + if args.command == "run-step": + rung_config = load_rung_config(args.config) + step = harness.run_ratchet_step(rung_config, allow_hardware=bool(args.hardware)) + _print_json(step) + return 0 + + if 
args.command == "run-rung": + rung_config = load_rung_config(args.config) + steps, lesson, feedback = harness.run_rung(rung_config, allow_hardware=bool(args.hardware)) + _print_json({ + "steps": steps, + "lesson_path": str(store.rung_dir(rung_config.rung) / "lesson.md"), + "lesson": lesson, + "feedback_rules": len(feedback.rules), + "narrowed_dimensions": feedback.narrowed_dimensions, + }) + return 0 + + if args.command == "run-ratchet": + configs = [load_rung_config(path) for path in args.config] + results = harness.run_ratchet(configs, allow_hardware=bool(args.hardware)) + _print_json([ + { + "rung": lesson.rung, + "lesson": lesson, + "feedback_rules": len(feedback.rules), + } + for lesson, feedback in results + ]) + return 0 + + if args.command == "run-transfer": + from .execution.transfer import TransferEvaluator + rung_config, spec = _build_spec_from_config(Path(args.config), getattr(args, "set", [])) + backends = args.backends or rung_config.transfer_backends + if not backends: + print("Error: No backends specified. 
Use --backends or add transfer_backends to config.") + return 1 + evaluator = TransferEvaluator(harness.local_executor) + report = evaluator.evaluate_across_backends(spec, backends, rung_config) + _print_json({ + "spec_fingerprint": spec.fingerprint(), + "transfer_score": report.transfer_score, + "mean_score": report.mean_score, + "min_score": report.min_score, + "max_score": report.max_score, + "std_score": report.std_score, + "per_backend_scores": report.per_backend_scores, + }) + return 0 + + parser.error(f"Unknown command: {args.command}") + return 2 diff --git a/src/autoresearch_quantum/codes/__init__.py b/src/autoresearch_quantum/codes/__init__.py new file mode 100644 index 0000000..eb73076 --- /dev/null +++ b/src/autoresearch_quantum/codes/__init__.py @@ -0,0 +1 @@ +"""Code-specific utilities.""" diff --git a/src/autoresearch_quantum/codes/four_two_two.py b/src/autoresearch_quantum/codes/four_two_two.py new file mode 100644 index 0000000..bc4651f --- /dev/null +++ b/src/autoresearch_quantum/codes/four_two_two.py @@ -0,0 +1,78 @@ +from __future__ import annotations + +from math import pi + +from qiskit import QuantumCircuit +from qiskit.quantum_info import SparsePauliOp, Statevector + + +DATA_QUBITS = 4 +MAGIC_PREP_QUBIT = 0 +SPECTATOR_LOGICAL_QUBIT = 1 +STABILIZERS = { + "z_stabilizer": SparsePauliOp.from_list([("ZZZZ", 1.0)]), + "x_stabilizer": SparsePauliOp.from_list([("XXXX", 1.0)]), +} +MEASUREMENT_OPERATORS = { + "logical_x": {0: "X", 2: "X"}, + "logical_y": {0: "Y", 1: "Z", 2: "X"}, + "spectator_z": {1: "Z", 2: "Z"}, +} + + +def apply_magic_seed(circuit: QuantumCircuit, qubit: int, style: str) -> None: + if style == "h_p": + circuit.h(qubit) + circuit.p(pi / 4, qubit) + return + if style == "ry_rz": + circuit.ry(pi / 2, qubit) + circuit.rz(pi / 4, qubit) + return + if style == "u_magic": + circuit.u(pi / 2, 0.0, pi / 4, qubit) + return + raise ValueError(f"Unsupported seed style: {style}") + + +def build_encoder(style: str = "cx_chain") -> 
QuantumCircuit: + circuit = QuantumCircuit(DATA_QUBITS, name=f"encoder_{style}") + if style == "cx_chain": + circuit.cx(0, 2) + circuit.cx(1, 0) + circuit.h(3) + circuit.cx(3, 0) + circuit.cx(3, 1) + circuit.cx(3, 2) + return circuit + if style == "cz_compiled": + circuit.h(2) + circuit.cz(0, 2) + circuit.h(2) + circuit.h(0) + circuit.cz(1, 0) + circuit.h(0) + circuit.h(3) + circuit.h(0) + circuit.cz(3, 0) + circuit.h(0) + circuit.h(1) + circuit.cz(3, 1) + circuit.h(1) + circuit.h(2) + circuit.cz(3, 2) + circuit.h(2) + return circuit + raise ValueError(f"Unsupported encoder style: {style}") + + +def build_preparation_circuit(seed_style: str = "h_p", encoder_style: str = "cx_chain") -> QuantumCircuit: + circuit = QuantumCircuit(DATA_QUBITS, name="prep_422_magic") + apply_magic_seed(circuit, MAGIC_PREP_QUBIT, seed_style) + circuit.compose(build_encoder(encoder_style), qubits=range(DATA_QUBITS), inplace=True) + return circuit + + +def encoded_magic_statevector() -> Statevector: + return Statevector.from_instruction(build_preparation_circuit()) + diff --git a/src/autoresearch_quantum/config.py b/src/autoresearch_quantum/config.py new file mode 100644 index 0000000..a07aa1e --- /dev/null +++ b/src/autoresearch_quantum/config.py @@ -0,0 +1,82 @@ +from __future__ import annotations + +from pathlib import Path +from typing import Any, Mapping + +import yaml + +from .models import ( + CostWeights, + ExperimentSpec, + HardwareConfig, + QualityWeights, + RungConfig, + ScoreConfig, + SearchSpaceConfig, + TierPolicyConfig, +) + + +def _quality_weights(data: Mapping[str, Any] | None) -> QualityWeights: + return QualityWeights(**dict(data or {})) + + +def _cost_weights(data: Mapping[str, Any] | None) -> CostWeights: + return CostWeights(**dict(data or {})) + + +def _score_config(data: Mapping[str, Any] | None) -> ScoreConfig: + payload = dict(data or {}) + return ScoreConfig( + name=payload.get("name", "weighted_acceptance_cost"), + 
cheap_quality=_quality_weights(payload.get("cheap_quality")), + expensive_quality=_quality_weights(payload.get("expensive_quality")), + cost_weights=_cost_weights(payload.get("cost_weights")), + base_cost=float(payload.get("base_cost", 1.0)), + ) + + +def _search_space_config(data: Mapping[str, Any] | None) -> SearchSpaceConfig: + payload = dict(data or {}) + return SearchSpaceConfig( + dimensions=dict(payload.get("dimensions", {})), + max_challengers_per_step=int(payload.get("max_challengers_per_step", 8)), + ) + + +def _tier_policy_config(data: Mapping[str, Any] | None) -> TierPolicyConfig: + return TierPolicyConfig(**dict(data or {})) + + +def _hardware_config(data: Mapping[str, Any] | None) -> HardwareConfig: + return HardwareConfig(**dict(data or {})) + + +def _experiment_spec(rung: int, data: Mapping[str, Any]) -> ExperimentSpec: + payload = dict(data) + payload["rung"] = rung + if "initial_layout" in payload and payload["initial_layout"] is not None: + payload["initial_layout"] = tuple(payload["initial_layout"]) + return ExperimentSpec(**payload) + + +def load_rung_config(path: str | Path) -> RungConfig: + config_path = Path(path) + with config_path.open("r", encoding="utf-8") as handle: + payload = yaml.safe_load(handle) + + rung = int(payload["rung"]) + return RungConfig( + rung=rung, + name=str(payload["name"]), + description=str(payload["description"]), + objective=str(payload["objective"]), + bootstrap_incumbent=_experiment_spec(rung, payload["bootstrap_incumbent"]), + search_space=_search_space_config(payload.get("search_space")), + tier_policy=_tier_policy_config(payload.get("tier_policy")), + score=_score_config(payload.get("score")), + step_budget=int(payload.get("step_budget", 3)), + patience=int(payload.get("patience", 2)), + hardware=_hardware_config(payload.get("hardware")), + transfer_backends=list(payload.get("transfer_backends", [])), + ) diff --git a/src/autoresearch_quantum/execution/__init__.py 
b/src/autoresearch_quantum/execution/__init__.py new file mode 100644 index 0000000..38c1723 --- /dev/null +++ b/src/autoresearch_quantum/execution/__init__.py @@ -0,0 +1 @@ +"""Execution backends and analyzers.""" diff --git a/src/autoresearch_quantum/execution/analysis.py b/src/autoresearch_quantum/execution/analysis.py new file mode 100644 index 0000000..477e891 --- /dev/null +++ b/src/autoresearch_quantum/execution/analysis.py @@ -0,0 +1,119 @@ +from __future__ import annotations + +from collections import Counter +from math import sqrt +from statistics import fmean, pstdev +from typing import Any, Iterable + + +def local_memory_records(memory: list[str], creg_names: list[str]) -> list[dict[str, str]]: + records: list[dict[str, str]] = [] + ordered_names = list(reversed(creg_names)) + for shot in memory: + parts = shot.split(" ") + records.append(dict(zip(ordered_names, parts, strict=True))) + return records + + +def sampler_memory_records(bitstrings_by_register: dict[str, list[str]]) -> list[dict[str, str]]: + first_key = next(iter(bitstrings_by_register), None) + if first_key is None: + return [] + shots = len(bitstrings_by_register[first_key]) + records: list[dict[str, str]] = [] + for shot_index in range(shots): + records.append( + {name: bitstrings[shot_index] for name, bitstrings in bitstrings_by_register.items()} + ) + return records + + +def syndrome_outcomes(syndrome_bits: str, syndrome_labels: list[str]) -> dict[str, int]: + if not syndrome_labels: + return {} + least_significant_first = syndrome_bits[::-1] + return { + label: int(bit) + for label, bit in zip(syndrome_labels, least_significant_first, strict=True) + } + + +def postselection_passes(postselection: str, syndrome_labels: list[str], syndrome_bits: str) -> bool: + if postselection == "none" or not syndrome_labels: + return True + outcomes = syndrome_outcomes(syndrome_bits, syndrome_labels) + if postselection == "all_measured": + return all(bit == 0 for bit in outcomes.values()) + if 
postselection == "z_only": + return outcomes.get("z_stabilizer", 0) == 0 + if postselection == "x_only": + return outcomes.get("x_stabilizer", 0) == 0 + raise ValueError(f"Unsupported postselection rule: {postselection}") + + +def operator_eigenvalue(data_bits: str, measured_qubits: Iterable[int]) -> int: + least_significant_first = data_bits[::-1] + parity = sum(least_significant_first[index] == "1" for index in measured_qubits) + return 1 if parity % 2 == 0 else -1 + + +def summarize_context( + records: list[dict[str, str]], + syndrome_labels: list[str], + postselection: str, + operator: dict[int, str] | None = None, +) -> dict[str, Any]: + total_shots = len(records) + syndrome_counter: Counter[str] = Counter() + raw_data_counter: Counter[str] = Counter() + accepted_counter: Counter[str] = Counter() + accepted_values: list[int] = [] + accepted = 0 + + for shot in records: + syndrome_bits = shot.get("syndrome", "") + data_bits = shot.get("readout", "") + syndrome_counter[syndrome_bits] += 1 + raw_data_counter[data_bits] += 1 + passes = postselection_passes(postselection, syndrome_labels, syndrome_bits) + if not passes: + continue + accepted += 1 + accepted_counter[data_bits] += 1 + if operator is not None: + accepted_values.append(operator_eigenvalue(data_bits, operator.keys())) + + acceptance_rate = accepted / total_shots if total_shots else 0.0 + expectation = ( + sum(accepted_values) / len(accepted_values) + if accepted_values + else 0.0 + ) + return { + "total_shots": total_shots, + "accepted_shots": accepted, + "acceptance_rate": acceptance_rate, + "expectation": expectation, + "syndrome_counts": dict(syndrome_counter), + "raw_data_counts": dict(raw_data_counter), + "accepted_data_counts": dict(accepted_counter), + } + + +def logical_magic_witness(logical_x: float, logical_y: float, spectator_z: float) -> float: + witness = (1.0 + ((logical_x + logical_y) / sqrt(2.0))) / 2.0 + spectator_alignment = (1.0 + spectator_z) / 2.0 + value = witness * 
spectator_alignment + return max(0.0, min(1.0, value)) + + +def stability_score(values: list[float]) -> float: + if not values: + return 0.0 + if len(values) == 1: + return 1.0 + mean_value = fmean(values) + if abs(mean_value) < 1e-9: + return 0.0 + variation = pstdev(values) + return max(0.0, min(1.0, 1.0 - (variation / max(abs(mean_value), 1e-9)))) diff --git a/src/autoresearch_quantum/execution/backends.py b/src/autoresearch_quantum/execution/backends.py new file mode 100644 index 0000000..1a82677 --- /dev/null +++ b/src/autoresearch_quantum/execution/backends.py @@ -0,0 +1,66 @@ +from __future__ import annotations + +import os +from functools import lru_cache +from typing import Any + +from qiskit.providers.backend import BackendV2 + +from ..models import HardwareConfig + +try: + from qiskit_ibm_runtime import QiskitRuntimeService + from qiskit_ibm_runtime.fake_provider import FakeProviderForBackendV2 +except ImportError: # pragma: no cover - exercised only when hardware extras missing + QiskitRuntimeService = None + FakeProviderForBackendV2 = None + + +@lru_cache(maxsize=1) +def _fake_provider() -> Any: + if FakeProviderForBackendV2 is None: + raise RuntimeError("qiskit-ibm-runtime is required for fake backends.") + return FakeProviderForBackendV2() + + +def resolve_backend(name: str, hardware: HardwareConfig | None = None) -> BackendV2: + if name.startswith("fake_"): + return _fake_provider().backend(name) + + if QiskitRuntimeService is None: + raise RuntimeError( + "qiskit-ibm-runtime is not installed. Install the hardware extra to use IBM backends." 
+ ) + + service_kwargs: dict[str, Any] = {} + if hardware and hardware.channel: + service_kwargs["channel"] = hardware.channel + if hardware and hardware.instance: + service_kwargs["instance"] = hardware.instance + if hardware: + token = os.getenv(hardware.token_env_var) + if token: + service_kwargs["token"] = token + + service = QiskitRuntimeService(**service_kwargs) if service_kwargs else QiskitRuntimeService() + return service.backend(name) + + +def backend_metadata(backend: BackendV2) -> dict[str, Any]: + operation_names = [] + if getattr(backend, "operation_names", None): + operation_names = sorted(list(backend.operation_names)) + coupling_map = getattr(backend, "coupling_map", None) + if coupling_map is None: + coupling_edges = 0 + elif hasattr(coupling_map, "get_edges"): + coupling_edges = len(coupling_map.get_edges()) + else: + coupling_edges = len(coupling_map) + + return { + "name": backend.name, + "num_qubits": getattr(backend, "num_qubits", None), + "operation_names": operation_names, + "coupling_edges": coupling_edges, + } diff --git a/src/autoresearch_quantum/execution/hardware.py b/src/autoresearch_quantum/execution/hardware.py new file mode 100644 index 0000000..10fc031 --- /dev/null +++ b/src/autoresearch_quantum/execution/hardware.py @@ -0,0 +1,121 @@ +from __future__ import annotations + +from statistics import fmean + +from ..codes.four_two_two import MEASUREMENT_OPERATORS +from ..experiments.encoded_magic_state import build_circuit_bundle +from ..models import EvaluationMetrics, ExperimentSpec, RungConfig, TierResult +from ..scoring.score import score_metrics +from .analysis import logical_magic_witness, sampler_memory_records, stability_score, summarize_context +from .backends import backend_metadata, resolve_backend +from .transpile import circuit_metadata, count_two_qubit_gates, runtime_estimate, transpile_circuits + +try: + from qiskit_ibm_runtime import SamplerV2 +except ImportError: # pragma: no cover - exercised only when hardware extras 
missing + SamplerV2 = None + + +class IBMHardwareExecutor: + def evaluate(self, spec: ExperimentSpec, rung_config: RungConfig) -> TierResult: + if SamplerV2 is None: + raise RuntimeError( + "qiskit-ibm-runtime is not installed. Install the hardware extra to enable IBM execution." + ) + + backend_name = rung_config.hardware.backend_name or spec.target_backend + backend = resolve_backend(backend_name, rung_config.hardware) + bundle = build_circuit_bundle(spec) + context_names = ["acceptance", *bundle.witness_circuits.keys()] + raw_circuits = [bundle.acceptance, *bundle.witness_circuits.values()] + transpiled_contexts = transpile_circuits(raw_circuits, spec, backend) + circuits_by_name = dict(zip(context_names, transpiled_contexts, strict=True)) + + shots = rung_config.tier_policy.expensive_shots + repeats = rung_config.tier_policy.expensive_repeats + sampler = SamplerV2(mode=backend) + + aggregate: dict[str, list[dict[str, object]]] = {name: [] for name in context_names} + repeat_scores: list[float] = [] + notes: list[str] = [] + + for _ in range(repeats): + result = sampler.run(list(circuits_by_name.values()), shots=shots).result() + for context_name, pub_result, circuit in zip( + context_names, + result, + circuits_by_name.values(), + strict=True, + ): + records = sampler_memory_records( + { + name: bit_array.get_bitstrings() + for name, bit_array in pub_result.data.items() + } + ) + summary = summarize_context( + records, + syndrome_labels=list(circuit.metadata.get("syndrome_labels", [])), + postselection=str(circuit.metadata.get("postselection", "none")), + operator=MEASUREMENT_OPERATORS.get(context_name), + ) + aggregate[context_name].append(summary) + + x_value = float(aggregate["logical_x"][-1]["expectation"]) + y_value = float(aggregate["logical_y"][-1]["expectation"]) + spectator = float(aggregate["spectator_z"][-1]["expectation"]) + acceptance = float(aggregate["acceptance"][-1]["acceptance_rate"]) + repeat_scores.append(logical_magic_witness(x_value, 
y_value, spectator) * acceptance) + + acceptance_rate = fmean(float(item["acceptance_rate"]) for item in aggregate["acceptance"]) + logical_x = fmean(float(item["expectation"]) for item in aggregate["logical_x"]) + logical_y = fmean(float(item["expectation"]) for item in aggregate["logical_y"]) + spectator_z = fmean(float(item["expectation"]) for item in aggregate["spectator_z"]) + + metrics = EvaluationMetrics( + logical_magic_witness=logical_magic_witness(logical_x, logical_y, spectator_z), + acceptance_rate=acceptance_rate, + codespace_rate=fmean( + float(item["acceptance_rate"]) + for summaries in aggregate.values() + for item in summaries + ), + spectator_logical_z=spectator_z, + logical_x=logical_x, + logical_y=logical_y, + stability_score=stability_score(repeat_scores), + two_qubit_count=sum(count_two_qubit_gates(circuit) for circuit in circuits_by_name.values()), + depth=max(circuit.depth() for circuit in circuits_by_name.values()), + shot_count=shots * repeats * len(circuits_by_name), + runtime_estimate=sum(runtime_estimate(circuit) for circuit in circuits_by_name.values()), + queue_cost_proxy=1.0, + transpile_metadata={ + name: circuit_metadata(circuit, spec) for name, circuit in circuits_by_name.items() + }, + backend_metadata={"target_backend": backend_metadata(backend)}, + ) + metrics.dominant_failure_mode = ( + "hardware drift sensitivity" + if (metrics.stability_score or 1.0) < 0.75 + else "hardware confirmation run" + ) + + score, quality, _ = score_metrics(metrics, "expensive", rung_config.score) + notes.append( + f"Hardware-tier confirmation used backend {backend.name} with {shots} shots x {repeats} repeats." 
+ ) + return TierResult( + tier="expensive", + score=score, + quality_estimate=quality, + metrics=metrics, + counts_summary={ + name: { + "mean_acceptance_rate": fmean(float(item["acceptance_rate"]) for item in summaries), + "mean_expectation": fmean(float(item["expectation"]) for item in summaries), + "latest": summaries[-1], + } + for name, summaries in aggregate.items() + }, + notes=notes, + ) diff --git a/src/autoresearch_quantum/execution/local.py b/src/autoresearch_quantum/execution/local.py new file mode 100644 index 0000000..9c072b1 --- /dev/null +++ b/src/autoresearch_quantum/execution/local.py @@ -0,0 +1,178 @@ +from __future__ import annotations + +import hashlib +from math import fsum +from statistics import fmean + +from qiskit.quantum_info import Statevector, state_fidelity +from qiskit_aer import AerSimulator +from qiskit_aer.noise import NoiseModel + +from ..codes.four_two_two import MEASUREMENT_OPERATORS +from ..experiments.encoded_magic_state import build_circuit_bundle +from ..models import EvaluationMetrics, ExperimentSpec, RungConfig, TierResult +from ..scoring.score import score_metrics +from .analysis import ( + local_memory_records, + logical_magic_witness, + stability_score, + summarize_context, +) +from .backends import backend_metadata, resolve_backend +from .transpile import circuit_metadata, count_two_qubit_gates, runtime_estimate, transpile_circuits + + +def _dominant_failure_mode(metrics: EvaluationMetrics) -> str: + if metrics.acceptance_rate < 0.45: + return "postselection collapse" + if metrics.logical_magic_witness is not None and metrics.logical_magic_witness < 0.65: + return "logical witness erosion" + if metrics.stability_score is not None and metrics.stability_score < 0.75: + return "noise sensitivity" + if metrics.two_qubit_count > 60 or metrics.depth > 120: + return "transpile cost explosion" + return "residual coherent/noisy error" + + +class LocalCheapExecutor: + def evaluate(self, spec: ExperimentSpec, rung_config: 
RungConfig) -> TierResult: + bundle = build_circuit_bundle(spec) + target_backend = resolve_backend(spec.target_backend, rung_config.hardware) + noise_backend_name = spec.noise_backend or spec.target_backend + noise_backend = resolve_backend(noise_backend_name, rung_config.hardware) + + transpiled_prep = transpile_circuits([bundle.prep], spec, target_backend)[0] + context_names = ["acceptance", *bundle.witness_circuits.keys()] + raw_circuits = [bundle.acceptance, *bundle.witness_circuits.values()] + transpiled_contexts = transpile_circuits(raw_circuits, spec, target_backend) + circuits_by_name = dict(zip(context_names, transpiled_contexts, strict=True)) + + ideal_fidelity = state_fidelity(Statevector.from_instruction(bundle.prep), bundle.target_state) + noisy_fidelity = None + shot_simulator: AerSimulator + density_simulator: AerSimulator | None = None + notes: list[str] = [] + + try: + noise_model = NoiseModel.from_backend(noise_backend) + shot_simulator = AerSimulator( + noise_model=noise_model, + basis_gates=noise_model.basis_gates, + coupling_map=getattr(noise_backend, "coupling_map", None), + ) + density_simulator = AerSimulator( + method="density_matrix", + noise_model=noise_model, + basis_gates=noise_model.basis_gates, + coupling_map=getattr(noise_backend, "coupling_map", None), + ) + except Exception as exc: # pragma: no cover - depends on backend capabilities + notes.append(f"Noise model unavailable, falling back to ideal simulation: {exc}") + shot_simulator = AerSimulator() + + if density_simulator is not None: + density_circuit = transpiled_prep.copy() + density_circuit.save_density_matrix() + density_result = density_simulator.run(density_circuit).result() + noisy_density = density_result.data(0)["density_matrix"] + noisy_fidelity = state_fidelity(noisy_density, bundle.target_state) + + repeats = spec.repeats or rung_config.tier_policy.cheap_repeats + shots = spec.shots or rung_config.tier_policy.cheap_shots + repeat_scores: list[float] = [] + 
aggregate: dict[str, list[dict[str, object]]] = {name: [] for name in context_names} + + for repeat_index in range(repeats): + for context_name, circuit in circuits_by_name.items(): + result = shot_simulator.run( + circuit, + shots=shots, + memory=True, + seed_simulator=int( + hashlib.sha256( + f"{spec.fingerprint()}-{repeat_index}".encode() + ).hexdigest()[:8], + 16, + ), + ).result() + memory = result.get_memory(circuit) + records = local_memory_records(memory, [creg.name for creg in circuit.cregs]) + operator = bundle.witness_circuits.get(context_name) + measurement_operator = None + if operator is not None: + measurement_operator = bundle.witness_circuits[context_name].metadata.get("operator") + summary = summarize_context( + records, + syndrome_labels=list(circuit.metadata.get("syndrome_labels", [])), + postselection=str(circuit.metadata.get("postselection", "none")), + operator=MEASUREMENT_OPERATORS.get(context_name), + ) + aggregate[context_name].append(summary) + + x_value = float(aggregate["logical_x"][-1]["expectation"]) + y_value = float(aggregate["logical_y"][-1]["expectation"]) + spectator = float(aggregate["spectator_z"][-1]["expectation"]) + acceptance = float(aggregate["acceptance"][-1]["acceptance_rate"]) + repeat_scores.append(logical_magic_witness(x_value, y_value, spectator) * acceptance) + + acceptance_rate = fmean(float(item["acceptance_rate"]) for item in aggregate["acceptance"]) + logical_x = fmean(float(item["expectation"]) for item in aggregate["logical_x"]) + logical_y = fmean(float(item["expectation"]) for item in aggregate["logical_y"]) + spectator_z = fmean(float(item["expectation"]) for item in aggregate["spectator_z"]) + witness = logical_magic_witness(logical_x, logical_y, spectator_z) + codespace_rate = fmean( + [ + float(item["acceptance_rate"]) + for summaries in aggregate.values() + for item in summaries + ] + ) + + total_two_qubit = sum(count_two_qubit_gates(circuit) for circuit in circuits_by_name.values()) + max_depth = 
max(circuit.depth() for circuit in circuits_by_name.values()) + total_runtime = fsum(runtime_estimate(circuit) for circuit in circuits_by_name.values()) + + metrics = EvaluationMetrics( + ideal_encoded_fidelity=ideal_fidelity, + noisy_encoded_fidelity=noisy_fidelity if noisy_fidelity is not None else ideal_fidelity, + logical_magic_witness=witness, + acceptance_rate=acceptance_rate, + codespace_rate=codespace_rate, + spectator_logical_z=spectator_z, + logical_x=logical_x, + logical_y=logical_y, + stability_score=stability_score(repeat_scores), + two_qubit_count=total_two_qubit, + depth=max_depth, + shot_count=shots * repeats * len(circuits_by_name), + runtime_estimate=total_runtime, + queue_cost_proxy=0.0, + transpile_metadata={ + name: circuit_metadata(circuit, spec) for name, circuit in circuits_by_name.items() + }, + backend_metadata={ + "target_backend": backend_metadata(target_backend), + "noise_backend": backend_metadata(noise_backend), + }, + ) + metrics.dominant_failure_mode = _dominant_failure_mode(metrics) + + score, quality, _ = score_metrics(metrics, "cheap", rung_config.score) + counts_summary = { + name: { + "mean_acceptance_rate": fmean(float(item["acceptance_rate"]) for item in summaries), + "mean_expectation": fmean(float(item["expectation"]) for item in summaries), + "latest": summaries[-1], + } + for name, summaries in aggregate.items() + } + notes.append(f"Cheap-tier proxy used {shots} shots x {repeats} repeats over {len(circuits_by_name)} circuits.") + + return TierResult( + tier="cheap", + score=score, + quality_estimate=quality, + metrics=metrics, + counts_summary=counts_summary, + notes=notes, + ) diff --git a/src/autoresearch_quantum/execution/transfer.py b/src/autoresearch_quantum/execution/transfer.py new file mode 100644 index 0000000..5b9074a --- /dev/null +++ b/src/autoresearch_quantum/execution/transfer.py @@ -0,0 +1,62 @@ +from __future__ import annotations + +import logging +from dataclasses import replace +from statistics import 
fmean, stdev + +from ..models import ExperimentSpec, RungConfig, TransferReport +from .local import LocalCheapExecutor + +logger = logging.getLogger(__name__) + + +class TransferEvaluator: + """Evaluate a single spec across multiple backend noise models. + + The transfer_score is the minimum across backends (pessimistic), + which prevents overfitting to a single noise profile. + """ + + def __init__(self, executor: LocalCheapExecutor | None = None) -> None: + self.executor = executor or LocalCheapExecutor() + + def evaluate_across_backends( + self, + spec: ExperimentSpec, + backends: list[str], + rung_config: RungConfig, + ) -> TransferReport: + per_backend_scores: dict[str, float] = {} + per_backend_metrics = {} + + for backend_name in backends: + backend_spec = spec.with_updates( + target_backend=backend_name, + noise_backend=backend_name, + ) + result = self.executor.evaluate(backend_spec, rung_config) + per_backend_scores[backend_name] = result.score + per_backend_metrics[backend_name] = result.metrics + logger.info( + "Transfer eval: spec %s on %s -> score %.4f", + spec.fingerprint(), + backend_name, + result.score, + ) + + scores = list(per_backend_scores.values()) + mean_s = fmean(scores) + min_s = min(scores) + max_s = max(scores) + std_s = stdev(scores) if len(scores) > 1 else 0.0 + + return TransferReport( + spec=spec, + per_backend_scores=per_backend_scores, + per_backend_metrics=per_backend_metrics, + mean_score=mean_s, + min_score=min_s, + max_score=max_s, + std_score=std_s, + transfer_score=min_s, # pessimistic + ) diff --git a/src/autoresearch_quantum/execution/transpile.py b/src/autoresearch_quantum/execution/transpile.py new file mode 100644 index 0000000..6f6e5a3 --- /dev/null +++ b/src/autoresearch_quantum/execution/transpile.py @@ -0,0 +1,48 @@ +from __future__ import annotations + +from typing import Any + +from qiskit import QuantumCircuit, transpile +from qiskit.providers.backend import BackendV2 + +from ..models import ExperimentSpec + + 
+def transpile_circuits( + circuits: list[QuantumCircuit], + spec: ExperimentSpec, + backend: BackendV2, +) -> list[QuantumCircuit]: + transpiled = transpile( + circuits, + backend=backend, + optimization_level=spec.optimization_level, + layout_method=spec.layout_method, + routing_method=spec.routing_method, + approximation_degree=spec.approximation_degree, + initial_layout=list(spec.initial_layout) if spec.initial_layout else None, + ) + if isinstance(transpiled, QuantumCircuit): + return [transpiled] + return list(transpiled) + + +def count_two_qubit_gates(circuit: QuantumCircuit) -> int: + return sum(1 for instruction in circuit.data if instruction.operation.num_qubits == 2) + + +def runtime_estimate(circuit: QuantumCircuit) -> float: + resets = sum(1 for instruction in circuit.data if instruction.operation.name == "reset") + return float(circuit.depth() + (3 * count_two_qubit_gates(circuit)) + (5 * resets)) + + +def circuit_metadata(circuit: QuantumCircuit, spec: ExperimentSpec) -> dict[str, Any]: + return { + "optimization_level": spec.optimization_level, + "layout_method": spec.layout_method, + "routing_method": spec.routing_method, + "approximation_degree": spec.approximation_degree, + "depth": circuit.depth(), + "size": circuit.size(), + "two_qubit_count": count_two_qubit_gates(circuit), + } diff --git a/src/autoresearch_quantum/experiments/__init__.py b/src/autoresearch_quantum/experiments/__init__.py new file mode 100644 index 0000000..e0af801 --- /dev/null +++ b/src/autoresearch_quantum/experiments/__init__.py @@ -0,0 +1 @@ +"""Experiment builders.""" diff --git a/src/autoresearch_quantum/experiments/encoded_magic_state.py b/src/autoresearch_quantum/experiments/encoded_magic_state.py new file mode 100644 index 0000000..e51b14b --- /dev/null +++ b/src/autoresearch_quantum/experiments/encoded_magic_state.py @@ -0,0 +1,163 @@ +from __future__ import annotations + +from dataclasses import dataclass + +from qiskit import ClassicalRegister, QuantumCircuit, 
QuantumRegister +from qiskit.quantum_info import Statevector + +from ..codes.four_two_two import DATA_QUBITS, MEASUREMENT_OPERATORS, build_preparation_circuit +from ..models import ExperimentSpec + + +@dataclass(frozen=True) +class MeasurementCircuitBundle: + prep: QuantumCircuit + acceptance: QuantumCircuit + witness_circuits: dict[str, QuantumCircuit] + target_state: Statevector + + +def _verification_checks(spec: ExperimentSpec) -> list[str]: + if spec.verification == "none": + return [] + if spec.verification == "z_only": + return ["z_stabilizer"] + if spec.verification == "x_only": + return ["x_stabilizer"] + if spec.verification == "both": + return ["z_stabilizer", "x_stabilizer"] + raise ValueError(f"Unsupported verification mode: {spec.verification}") + + +def _ancilla_count(spec: ExperimentSpec, checks: list[str]) -> int: + if not checks: + return 0 + if spec.ancilla_strategy == "dedicated_pair": + return len(checks) + if spec.ancilla_strategy == "reused_single": + return 1 + raise ValueError(f"Unsupported ancilla strategy: {spec.ancilla_strategy}") + + +def _add_z_check(circuit: QuantumCircuit, ancilla: int, data_qubits: list[int]) -> None: + for qubit in data_qubits: + circuit.cx(qubit, ancilla) + + +def _add_x_check(circuit: QuantumCircuit, ancilla: int, data_qubits: list[int]) -> None: + circuit.h(ancilla) + for qubit in data_qubits: + circuit.cx(ancilla, qubit) + circuit.h(ancilla) + + +def _measure_operator(circuit: QuantumCircuit, data_qubits: list[int], operator: dict[int, str]) -> None: + for qubit in data_qubits: + basis = operator.get(qubit, "Z") + if basis == "X": + circuit.h(qubit) + elif basis == "Y": + circuit.sdg(qubit) + circuit.h(qubit) + elif basis == "Z": + continue + else: + raise ValueError(f"Unsupported basis: {basis}") + + +def _attach_verification( + circuit: QuantumCircuit, + spec: ExperimentSpec, + data_qubits: list[int], + ancilla_qubits: list[int], + syndrome_bits: list[int], +) -> list[str]: + checks = 
_verification_checks(spec) + labels: list[str] = [] + if not checks: + return labels + + if spec.ancilla_strategy == "dedicated_pair": + for ancilla_qubit, syndrome_bit, label in zip(ancilla_qubits, syndrome_bits, checks, strict=True): + if label == "z_stabilizer": + _add_z_check(circuit, ancilla_qubit, data_qubits) + else: + _add_x_check(circuit, ancilla_qubit, data_qubits) + circuit.measure(ancilla_qubit, syndrome_bit) + labels.append(label) + return labels + + ancilla_qubit = ancilla_qubits[0] + for syndrome_bit, label in zip(syndrome_bits, checks, strict=True): + if label == "z_stabilizer": + _add_z_check(circuit, ancilla_qubit, data_qubits) + else: + _add_x_check(circuit, ancilla_qubit, data_qubits) + circuit.measure(ancilla_qubit, syndrome_bit) + labels.append(label) + if label != checks[-1]: + circuit.reset(ancilla_qubit) + return labels + + +def _base_circuit(spec: ExperimentSpec, context_name: str, operator: dict[int, str] | None) -> QuantumCircuit: + checks = _verification_checks(spec) + ancilla_count = _ancilla_count(spec, checks) + syndrome_bits = len(checks) + + data = QuantumRegister(DATA_QUBITS, "data") + ancilla = QuantumRegister(ancilla_count, "anc") if ancilla_count else None + syndrome = ClassicalRegister(syndrome_bits, "syndrome") if syndrome_bits else None + readout = ClassicalRegister(DATA_QUBITS, "readout") + + registers = [data] + if ancilla is not None: + registers.append(ancilla) + if syndrome is not None: + registers.append(syndrome) + registers.append(readout) + + circuit = QuantumCircuit(*registers, name=context_name) + circuit.compose( + build_preparation_circuit(spec.seed_style, spec.encoder_style), + qubits=list(range(DATA_QUBITS)), + inplace=True, + ) + + syndrome_labels: list[str] = [] + if ancilla is not None and syndrome is not None: + syndrome_labels = _attach_verification( + circuit, + spec, + data_qubits=list(range(DATA_QUBITS)), + ancilla_qubits=list(range(DATA_QUBITS, DATA_QUBITS + ancilla_count)), + 
syndrome_bits=list(range(syndrome_bits)), + ) + + if operator is not None: + _measure_operator(circuit, list(range(DATA_QUBITS)), operator) + + circuit.measure(data, readout) + circuit.metadata = { + "context": context_name, + "syndrome_labels": syndrome_labels, + "postselection": spec.postselection, + "logical_operator": operator, + } + return circuit + + +def build_circuit_bundle(spec: ExperimentSpec) -> MeasurementCircuitBundle: + prep = build_preparation_circuit(spec.seed_style, spec.encoder_style) + witness_circuits = { + name: _base_circuit(spec, name, operator) + for name, operator in MEASUREMENT_OPERATORS.items() + } + acceptance = _base_circuit(spec, "acceptance", operator=None) + target_state = Statevector.from_instruction(build_preparation_circuit()) + return MeasurementCircuitBundle( + prep=prep, + acceptance=acceptance, + witness_circuits=witness_circuits, + target_state=target_state, + ) diff --git a/src/autoresearch_quantum/lessons/__init__.py b/src/autoresearch_quantum/lessons/__init__.py new file mode 100644 index 0000000..08fb833 --- /dev/null +++ b/src/autoresearch_quantum/lessons/__init__.py @@ -0,0 +1 @@ +"""Lesson extraction.""" diff --git a/src/autoresearch_quantum/lessons/extractor.py b/src/autoresearch_quantum/lessons/extractor.py new file mode 100644 index 0000000..b465237 --- /dev/null +++ b/src/autoresearch_quantum/lessons/extractor.py @@ -0,0 +1,152 @@ +from __future__ import annotations + +from collections import defaultdict +from statistics import fmean +from typing import Any + +from ..models import LessonFeedback, RungConfig, RungLesson +from .feedback import build_lesson_feedback + + +def _record_score(record: dict[str, Any]) -> float: + return float(record.get("final_score", 0.0)) + + +def extract_rung_lesson( + rung_config: RungConfig, + experiment_records: list[dict[str, Any]], + ratchet_steps: list[dict[str, Any]], +) -> tuple[RungLesson, LessonFeedback]: + if not experiment_records: + empty = "No experiments were recorded for 
this rung." + empty_lesson = RungLesson( + rung=rung_config.rung, + name=rung_config.name, + objective=rung_config.objective, + what_helped=[empty], + what_hurt=[empty], + what_seems_invariant=[empty], + what_seems_hardware_specific=[empty], + what_should_be_tested_next=[empty], + what_should_be_promoted_to_next_rung=[empty], + what_should_be_discarded=[empty], + narrative=empty, + ) + empty_feedback = LessonFeedback( + rung=rung_config.rung, + rules=[], + narrowed_dimensions={}, + best_spec_fields={}, + ) + return empty_lesson, empty_feedback + + overall_mean = fmean(_record_score(record) for record in experiment_records) + top_records = sorted(experiment_records, key=_record_score, reverse=True)[: min(3, len(experiment_records))] + + value_effects: list[tuple[float, str, Any, int]] = [] + hardware_deltas: list[tuple[float, str, Any]] = [] + for dimension in rung_config.search_space.dimensions: + grouped: dict[Any, list[dict[str, Any]]] = defaultdict(list) + for record in experiment_records: + grouped[record["spec"][dimension]].append(record) + for value, records in grouped.items(): + mean_score = fmean(_record_score(record) for record in records) + value_effects.append((mean_score - overall_mean, dimension, value, len(records))) + + hardware_scores = [ + float(record["expensive_result"]["score"]) - float(record["cheap_result"]["score"]) + for record in records + if record.get("expensive_result") + ] + if hardware_scores: + hardware_deltas.append((fmean(hardware_scores), dimension, value)) + + helped = [ + f"{dimension}={value} improved mean score by {delta:+.4f} over {samples} runs." + for delta, dimension, value, samples in sorted(value_effects, reverse=True)[:3] + ] + hurt = [ + f"{dimension}={value} hurt mean score by {delta:+.4f} over {samples} runs." 
+ for delta, dimension, value, samples in sorted(value_effects)[:3] + ] + + invariants: list[str] = [] + for dimension in rung_config.search_space.dimensions: + values = {record["spec"][dimension] for record in top_records} + if len(values) == 1: + value = next(iter(values)) + invariants.append(f"Top-ranked experiments consistently kept {dimension}={value}.") + + hardware_specific = [ + f"{dimension}={value} shifted hardware score by {delta:+.4f} relative to cheap-tier screening." + for delta, dimension, value in sorted(hardware_deltas, key=lambda item: abs(item[0]), reverse=True)[:3] + ] or ["No hardware-specific divergence was observed in this rung."] + + explored_values = { + dimension: {record["spec"][dimension] for record in experiment_records} + for dimension in rung_config.search_space.dimensions + } + should_test_next = [] + for dimension, values in rung_config.search_space.dimensions.items(): + remaining = [value for value in values if value not in explored_values[dimension]] + if remaining: + should_test_next.append(f"Probe remaining {dimension} values: {remaining}.") + if not should_test_next: + should_test_next.append( + "Lift the best settings into a new experiment family or backend target for transfer testing." 
+ ) + + step_lessons = [step["distilled_lesson"] for step in ratchet_steps[-3:] if step.get("distilled_lesson")] + promoted = step_lessons or ["Carry forward the best incumbent settings as priors for the next rung."] + discarded = [ + entry + for entry in hurt + if "over 1 runs" not in entry + ] or ["No setting is discarded yet; collect more evidence before pruning."] + + narrative_lines = [ + f"# Rung {rung_config.rung}: {rung_config.name}", + "", + f"Objective: {rung_config.objective}", + "", + "## What Helped", + *[f"- {item}" for item in helped], + "", + "## What Hurt", + *[f"- {item}" for item in hurt], + "", + "## Invariants", + *[f"- {item}" for item in invariants or ['No invariant emerged strongly enough yet.']], + "", + "## Hardware-Specific Effects", + *[f"- {item}" for item in hardware_specific], + "", + "## Next Tests", + *[f"- {item}" for item in should_test_next], + "", + "## Promote Forward", + *[f"- {item}" for item in promoted], + "", + "## Discard", + *[f"- {item}" for item in discarded], + ] + + lesson = RungLesson( + rung=rung_config.rung, + name=rung_config.name, + objective=rung_config.objective, + what_helped=helped, + what_hurt=hurt, + what_seems_invariant=invariants or ["No invariant emerged strongly enough yet."], + what_seems_hardware_specific=hardware_specific, + what_should_be_tested_next=should_test_next, + what_should_be_promoted_to_next_rung=promoted, + what_should_be_discarded=discarded, + narrative="\n".join(narrative_lines), + ) + feedback = build_lesson_feedback( + rung_config.rung, + experiment_records, + rung_config.search_space, + ) + return lesson, feedback diff --git a/src/autoresearch_quantum/lessons/feedback.py b/src/autoresearch_quantum/lessons/feedback.py new file mode 100644 index 0000000..6fc095b --- /dev/null +++ b/src/autoresearch_quantum/lessons/feedback.py @@ -0,0 +1,199 @@ +from __future__ import annotations + +from collections import defaultdict +from itertools import combinations +from statistics import fmean, 
stdev
from typing import Any

from ..models import LessonFeedback, SearchRule, SearchSpaceConfig


def _record_score(record: dict[str, Any]) -> float:
    """Final score of a persisted experiment record (0.0 when absent)."""
    return float(record.get("final_score", 0.0))


def extract_search_rules(
    experiment_records: list[dict[str, Any]],
    search_space: SearchSpaceConfig,
    min_samples: int = 2,
    effect_threshold: float = 0.005,
) -> list[SearchRule]:
    """Extract machine-readable search rules from experiment data.

    Produces prefer/avoid directives from per-dimension mean effects, fix
    directives from values shared by every top-K experiment, and pairwise
    prefer/avoid directives where a joint effect departs from the sum of
    the marginal effects.
    """
    if not experiment_records:
        return []

    overall_mean = fmean(_record_score(record) for record in experiment_records)
    record_count = len(experiment_records)
    dim_names = list(search_space.dimensions.keys())
    rules: list[SearchRule] = []

    # --- Per-dimension marginal effects -> prefer / avoid ---
    for dim in dim_names:
        scores_by_value: dict[Any, list[float]] = defaultdict(list)
        for record in experiment_records:
            value = record["spec"].get(dim)
            if value is not None:
                scores_by_value[value].append(_record_score(record))

        for value, scores in scores_by_value.items():
            if len(scores) < min_samples:
                continue
            mean_score = fmean(scores)
            delta = mean_score - overall_mean
            if abs(delta) <= effect_threshold:
                continue  # Effect too small to act on.
            confidence = min(1.0, len(scores) / max(record_count, 1))
            if delta > 0:
                action, direction = "prefer", "above"
            else:
                action, direction = "avoid", "below"
            rules.append(SearchRule(
                dimension=dim,
                action=action,
                value=value,
                confidence=confidence,
                reason=f"mean score {mean_score:.4f} is {delta:+.4f} {direction} overall mean ({len(scores)} samples)",
            ))

    # --- Fix directives: a value shared by every top-K experiment ---
    top_k = min(5, max(3, record_count // 3))
    top_records = sorted(experiment_records, key=_record_score, reverse=True)[:top_k]
    if len(top_records) >= 3:
        for dim in dim_names:
            values_in_top = {record["spec"].get(dim) for record in top_records}
            if len(values_in_top) != 1:
                continue
            fixed_value = next(iter(values_in_top))
            # Guard: the shared value must also beat the alternatives on average.
            scores_with = [
                _record_score(record) for record in experiment_records
                if record["spec"].get(dim) == fixed_value
            ]
            scores_without = [
                _record_score(record) for record in experiment_records
                if record["spec"].get(dim) != fixed_value
            ]
            if scores_without and fmean(scores_with) > fmean(scores_without):
                rules.append(SearchRule(
                    dimension=dim,
                    action="fix",
                    value=fixed_value,
                    confidence=min(1.0, len(top_records) / record_count),
                    reason=f"all top-{len(top_records)} experiments use {dim}={fixed_value}",
                ))

    # --- Pairwise interactions: joint effect vs sum of marginal effects ---
    for dim_a, dim_b in combinations(dim_names, 2):
        joint: dict[tuple[Any, Any], list[float]] = defaultdict(list)
        for record in experiment_records:
            cell = (record["spec"].get(dim_a), record["spec"].get(dim_b))
            joint[cell].append(_record_score(record))

        # Skip pairs without any sufficiently-sampled joint cell.
        if all(len(cell_scores) < min_samples for cell_scores in joint.values()):
            continue

        marginal_a: dict[Any, float] = {}
        marginal_b: dict[Any, float] = {}
        for dim, marginals in ((dim_a, marginal_a), (dim_b, marginal_b)):
            per_value: dict[Any, list[float]] = defaultdict(list)
            for record in experiment_records:
                per_value[record["spec"].get(dim)].append(_record_score(record))
            for value, scores in per_value.items():
                marginals[value] = fmean(scores) - overall_mean

        for (value_a, value_b), cell_scores in joint.items():
            if len(cell_scores) < min_samples:
                continue
            joint_effect = fmean(cell_scores) - overall_mean
            expected_additive = marginal_a.get(value_a, 0.0) + marginal_b.get(value_b, 0.0)
            interaction = joint_effect - expected_additive
            if abs(interaction) <= effect_threshold * 2:
                continue
            rules.append(SearchRule(
                dimension=f"{dim_a}+{dim_b}",
                action="prefer" if interaction > 0 else "avoid",
                value=(value_a, value_b),
                confidence=min(1.0, len(cell_scores) / record_count),
                reason=(
                    f"interaction effect {interaction:+.4f} "
                    f"(joint={joint_effect:+.4f}, expected_additive={expected_additive:+.4f})"
                ),
            ))

    return rules


def narrow_search_space(
    search_space: SearchSpaceConfig,
    rules: list[SearchRule],
    min_values_per_dim: int = 2,
) -> SearchSpaceConfig:
    """Prune the search space according to the extracted rules.

    "fix" rules (confidence >= 0.4) pin a dimension to a single value;
    "avoid" rules (confidence >= 0.3) drop values, but never below
    *min_values_per_dim* survivors per dimension. Interaction rules
    (composite "a+b" dimensions) are ignored for narrowing.
    """
    avoid_map: dict[str, set[Any]] = defaultdict(set)
    fix_map: dict[str, Any] = {}
    for rule in rules:
        if "+" in str(rule.dimension):
            continue
        if rule.action == "avoid" and rule.confidence >= 0.3:
            avoid_map[rule.dimension].add(rule.value)
        elif rule.action == "fix" and rule.confidence >= 0.4:
            fix_map[rule.dimension] = rule.value

    new_dims: dict[str, list[Any]] = {}
    for dim, values in search_space.dimensions.items():
        if dim in fix_map and fix_map[dim] in values:
            new_dims[dim] = [fix_map[dim]]
            continue
        survivors = list(values)
        if dim in avoid_map:
            filtered = [value for value in values if value not in avoid_map[dim]]
            # Keep the full list when pruning would be too aggressive.
            if len(filtered) >= min_values_per_dim:
                survivors = filtered
        new_dims[dim] = survivors

    return SearchSpaceConfig(
        dimensions=new_dims,
        max_challengers_per_step=search_space.max_challengers_per_step,
    )


def build_lesson_feedback(
    rung: int,
    experiment_records: list[dict[str, Any]],
    search_space: SearchSpaceConfig,
) -> LessonFeedback:
    """Build a complete LessonFeedback from experiment data."""
    rules = extract_search_rules(experiment_records, search_space)
    narrowed = narrow_search_space(search_space, rules)

    # Seed future challengers from the highest-scoring spec seen so far.
    best_spec_fields: dict[str, Any] = {}
    if experiment_records:
        best_record = max(experiment_records, key=_record_score)
        best_spec_fields = dict(best_record.get("spec", {}))

    return LessonFeedback(
        rung=rung,
        rules=rules,
        narrowed_dimensions=narrowed.dimensions,
        best_spec_fields=best_spec_fields,
    )
| None = None + shots: int = 2048 + repeats: int = 3 + notes: str = "" + + def with_updates(self, **changes: Any) -> "ExperimentSpec": + if "initial_layout" in changes and isinstance(changes["initial_layout"], list): + changes["initial_layout"] = tuple(changes["initial_layout"]) + return replace(self, **changes) + + def identity_payload(self) -> str: + payload = asdict(self) + return repr(payload) + + def fingerprint(self) -> str: + return short_hash(self.identity_payload()) + + +@dataclass(frozen=True) +class QualityWeights: + ideal_fidelity: float = 0.0 + noisy_fidelity: float = 0.0 + logical_witness: float = 0.0 + codespace_rate: float = 0.0 + stability_score: float = 0.0 + spectator_alignment: float = 0.0 + + +@dataclass(frozen=True) +class CostWeights: + two_qubit_count: float = 0.08 + depth: float = 0.01 + shot_count: float = 0.00015 + runtime_estimate: float = 0.02 + queue_cost_proxy: float = 0.3 + + +@dataclass(frozen=True) +class ScoreConfig: + name: str = "weighted_acceptance_cost" + cheap_quality: QualityWeights = field(default_factory=QualityWeights) + expensive_quality: QualityWeights = field(default_factory=QualityWeights) + cost_weights: CostWeights = field(default_factory=CostWeights) + base_cost: float = 1.0 + + +@dataclass(frozen=True) +class SearchSpaceConfig: + dimensions: dict[str, list[Any]] = field(default_factory=dict) + max_challengers_per_step: int = 8 + + +@dataclass(frozen=True) +class TierPolicyConfig: + cheap_margin: float = 0.01 + confirmation_margin: float = 0.0 + cheap_shots: int = 2048 + expensive_shots: int = 4096 + cheap_repeats: int = 3 + expensive_repeats: int = 2 + noisy_simulator: str = "aer" + promote_top_k: int = 2 + enable_hardware: bool = False + confirm_incumbent_on_hardware: bool = True + hardware_budget: int = 0 + + +@dataclass(frozen=True) +class HardwareConfig: + backend_name: str | None = None + channel: str | None = None + instance: str | None = None + token_env_var: str = "QISKIT_IBM_TOKEN" + + 
@dataclass(frozen=True)
class RungConfig:
    """Full configuration for one rung of the progressive search ladder."""
    rung: int
    name: str
    description: str
    objective: str
    # Spec used to seed the rung when no incumbent exists yet.
    bootstrap_incumbent: ExperimentSpec
    search_space: SearchSpaceConfig
    tier_policy: TierPolicyConfig
    score: ScoreConfig
    # Maximum ratchet steps before the rung ends regardless of progress.
    step_budget: int = 3
    # Consecutive no-improvement steps tolerated before early stop.
    patience: int = 2
    hardware: HardwareConfig = field(default_factory=HardwareConfig)
    transfer_backends: list[str] = field(default_factory=list)


@dataclass
class EvaluationMetrics:
    """Raw measurement outputs for one evaluated spec; None = not measured."""
    ideal_encoded_fidelity: float | None = None
    noisy_encoded_fidelity: float | None = None
    logical_magic_witness: float | None = None
    acceptance_rate: float = 1.0
    codespace_rate: float | None = None
    spectator_logical_z: float | None = None
    logical_x: float | None = None
    logical_y: float | None = None
    stability_score: float | None = None
    # Circuit-size / cost inputs consumed by the scoring functions.
    two_qubit_count: int = 0
    depth: int = 0
    shot_count: int = 0
    runtime_estimate: float = 0.0
    queue_cost_proxy: float = 0.0
    # Filled in as a side effect by the scorer (see scoring/score.py).
    total_cost: float = 0.0
    dominant_failure_mode: str = "unclassified"
    transpile_metadata: dict[str, Any] = field(default_factory=dict)
    backend_metadata: dict[str, Any] = field(default_factory=dict)
    extra: dict[str, Any] = field(default_factory=dict)


@dataclass
class TierResult:
    """Score + metrics from evaluating a spec at one tier (cheap/expensive)."""
    tier: str
    score: float
    quality_estimate: float
    metrics: EvaluationMetrics
    counts_summary: dict[str, Any] = field(default_factory=dict)
    notes: list[str] = field(default_factory=list)
    created_at: str = field(default_factory=utc_timestamp)


@dataclass
class ExperimentRecord:
    """One evaluated experiment: spec, tier results, and ratchet bookkeeping."""
    experiment_id: str
    rung: int
    role: str
    parent_incumbent_id: str | None
    mutation_note: str
    spec: ExperimentSpec
    cheap_result: TierResult
    expensive_result: TierResult | None = None
    final_score: float = 0.0
    promoted_to_expensive: bool = False
    became_incumbent: bool = False
    created_at: str = field(default_factory=utc_timestamp)

    @property
    def best_result(self) -> TierResult:
        # Prefer the expensive (hardware-confirmed) result when present.
        return self.expensive_result or self.cheap_result


@dataclass
class
RatchetStepRecord:
    """Audit trail for one ratchet step: who competed, who won, and why."""
    step_index: int
    rung: int
    incumbent_before_id: str
    challengers_tested: list[str]
    promoted_challengers: list[str]
    winner_id: str
    winning_margin: float
    cheap_tier_justification: str
    expensive_tier_result: str
    distilled_lesson: str
    created_at: str = field(default_factory=utc_timestamp)


@dataclass
class RungLesson:
    """Human-readable summary distilled at the end of a rung."""
    rung: int
    name: str
    objective: str
    what_helped: list[str]
    what_hurt: list[str]
    what_seems_invariant: list[str]
    what_seems_hardware_specific: list[str]
    what_should_be_tested_next: list[str]
    what_should_be_promoted_to_next_rung: list[str]
    what_should_be_discarded: list[str]
    narrative: str
    created_at: str = field(default_factory=utc_timestamp)


@dataclass(frozen=True)
class SearchRule:
    """Machine-readable directive extracted from lesson analysis."""
    dimension: str
    action: str  # "prefer", "avoid", "fix"
    value: Any
    confidence: float  # 0.0–1.0, based on sample proportion
    reason: str


@dataclass(frozen=True)
class LessonFeedback:
    """Machine-readable counterpart to RungLesson for search guidance."""
    rung: int
    rules: list[SearchRule]
    narrowed_dimensions: dict[str, list[Any]]
    best_spec_fields: dict[str, Any]
    transfer_scores: dict[str, float] = field(default_factory=dict)


@dataclass
class TransferReport:
    """Cross-backend evaluation results for a single spec."""
    spec: ExperimentSpec
    per_backend_scores: dict[str, float]
    per_backend_metrics: dict[str, EvaluationMetrics]
    mean_score: float
    min_score: float
    max_score: float
    std_score: float
    transfer_score: float  # pessimistic = min(scores)


@dataclass
class FactoryMetrics:
    """Factory-style throughput metrics attached to EvaluationMetrics.extra."""
    accepted_states_per_shot: float
    logical_error_per_accepted: float
    accepted_per_unit_cost: float
    quality_yield: float
    cost_per_accepted: float
    throughput_proxy: float


@dataclass
class RungProgress:
    """Resumability state for a
rung execution."""
    rung: int
    steps_completed: int
    patience_remaining: int
    current_incumbent_id: str
    completed: bool = False


def generate_experiment_id(spec: ExperimentSpec, role: str) -> str:
    """Deterministic id combining rung, role, and the spec's content hash."""
    return f"r{spec.rung}-{role}-{spec.fingerprint()}"
diff --git a/src/autoresearch_quantum/persistence/__init__.py b/src/autoresearch_quantum/persistence/__init__.py
new file mode 100644
index 0000000..575470d
--- /dev/null
+++ b/src/autoresearch_quantum/persistence/__init__.py
@@ -0,0 +1 @@
"""Persistence for experiment records and lessons."""
diff --git a/src/autoresearch_quantum/persistence/store.py b/src/autoresearch_quantum/persistence/store.py
new file mode 100644
index 0000000..4dfd7c6
--- /dev/null
+++ b/src/autoresearch_quantum/persistence/store.py
@@ -0,0 +1,135 @@
from __future__ import annotations

import json
from dataclasses import asdict
from pathlib import Path
from typing import Any

from ..models import (
    ExperimentRecord,
    ExperimentSpec,
    LessonFeedback,
    RatchetStepRecord,
    RungLesson,
    RungProgress,
    SearchRule,
)


class ResearchStore:
    """Filesystem-backed store: one directory per rung, one JSON file per artefact."""

    def __init__(self, root: str | Path) -> None:
        self.root = Path(root)
        self.root.mkdir(parents=True, exist_ok=True)

    def rung_dir(self, rung: int) -> Path:
        """Directory for a rung's artefacts; created on first access."""
        path = self.root / f"rung_{rung}"
        path.mkdir(parents=True, exist_ok=True)
        return path

    def experiment_dir(self, rung: int) -> Path:
        """Subdirectory holding one JSON file per ExperimentRecord."""
        path = self.rung_dir(rung) / "experiments"
        path.mkdir(parents=True, exist_ok=True)
        return path

    def ratchet_dir(self, rung: int) -> Path:
        """Subdirectory holding one JSON file per ratchet step."""
        path = self.rung_dir(rung) / "ratchet_steps"
        path.mkdir(parents=True, exist_ok=True)
        return path

    def _write_json(self, path: Path, payload: dict[str, Any]) -> None:
        # sort_keys keeps files diff-friendly across runs.
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(json.dumps(payload, indent=2, sort_keys=True), encoding="utf-8")

    def save_experiment(self, record: ExperimentRecord) -> Path:
        """Persist a record under its experiment_id; returns the written path."""
        path = self.experiment_dir(record.rung) / f"{record.experiment_id}.json"
        self._write_json(path, asdict(record))
        return path

    def load_experiment(self, rung: int, experiment_id: str) -> dict[str, Any]:
        """Load one experiment as a raw dict (caller deserializes)."""
        path = self.experiment_dir(rung) / f"{experiment_id}.json"
        return json.loads(path.read_text(encoding="utf-8"))

    def list_experiments(self, rung: int) -> list[dict[str, Any]]:
        """All experiments for a rung, sorted by filename for determinism."""
        return [
            json.loads(path.read_text(encoding="utf-8"))
            for path in sorted(self.experiment_dir(rung).glob("*.json"))
        ]

    def save_ratchet_step(self, step: RatchetStepRecord) -> Path:
        # Zero-padded index keeps lexical ordering == chronological ordering.
        path = self.ratchet_dir(step.rung) / f"step_{step.step_index:04d}.json"
        self._write_json(path, asdict(step))
        return path

    def list_ratchet_steps(self, rung: int) -> list[dict[str, Any]]:
        return [
            json.loads(path.read_text(encoding="utf-8"))
            for path in sorted(self.ratchet_dir(rung).glob("*.json"))
        ]

    def set_incumbent(self, rung: int, experiment_id: str) -> Path:
        """Record which experiment currently holds the incumbent slot."""
        path = self.rung_dir(rung) / "incumbent.json"
        self._write_json(path, {"experiment_id": experiment_id})
        return path

    def load_incumbent_id(self, rung: int) -> str | None:
        """Current incumbent's id, or None if the rung has none yet."""
        path = self.rung_dir(rung) / "incumbent.json"
        if not path.exists():
            return None
        payload = json.loads(path.read_text(encoding="utf-8"))
        return str(payload["experiment_id"])

    def save_lesson(self, lesson: RungLesson) -> Path:
        """Write both the structured lesson (JSON) and its narrative (Markdown)."""
        json_path = self.rung_dir(lesson.rung) / "lesson.json"
        md_path = self.rung_dir(lesson.rung) / "lesson.md"
        self._write_json(json_path, asdict(lesson))
        md_path.write_text(lesson.narrative, encoding="utf-8")
        return json_path

    def save_lesson_feedback(self, feedback: LessonFeedback) -> Path:
        """Persist the machine-readable feedback; rules serialized field-by-field."""
        path = self.rung_dir(feedback.rung) / "lesson_feedback.json"
        payload = {
            "rung": feedback.rung,
            "rules": [asdict(r) for r in feedback.rules],
            "narrowed_dimensions": feedback.narrowed_dimensions,
            "best_spec_fields": feedback.best_spec_fields,
            "transfer_scores": feedback.transfer_scores,
        }
        self._write_json(path, payload)
        return path

    def load_lesson_feedback(self, rung: int) -> LessonFeedback | None:
        """Inverse of save_lesson_feedback; None if the rung never produced one."""
        path = self.rung_dir(rung) / "lesson_feedback.json"
        if not path.exists():
            return None
        data = json.loads(path.read_text(encoding="utf-8"))
        rules = [SearchRule(**r) for r in data.get("rules", [])]
        return LessonFeedback(
            rung=data["rung"],
            rules=rules,
            narrowed_dimensions=data.get("narrowed_dimensions", {}),
            best_spec_fields=data.get("best_spec_fields", {}),
            transfer_scores=data.get("transfer_scores", {}),
        )

    def save_progress(self, progress: RungProgress) -> Path:
        """Checkpoint resumability state after each ratchet step."""
        path = self.rung_dir(progress.rung) / "progress.json"
        self._write_json(path, asdict(progress))
        return path

    def load_progress(self, rung: int) -> RungProgress | None:
        path = self.rung_dir(rung) / "progress.json"
        if not path.exists():
            return None
        data = json.loads(path.read_text(encoding="utf-8"))
        return RungProgress(**data)

    def save_propagated_spec(self, rung: int, spec: ExperimentSpec) -> Path:
        """Record the spec carried over from the previous rung, for traceability."""
        path = self.rung_dir(rung) / "propagated_spec.json"
        self._write_json(path, asdict(spec))
        return path

    def load_propagated_spec(self, rung: int) -> dict[str, Any] | None:
        path = self.rung_dir(rung) / "propagated_spec.json"
        if not path.exists():
            return None
        return json.loads(path.read_text(encoding="utf-8"))
diff --git a/src/autoresearch_quantum/ratchet/__init__.py b/src/autoresearch_quantum/ratchet/__init__.py
new file mode 100644
index 0000000..8218429
--- /dev/null
+++ b/src/autoresearch_quantum/ratchet/__init__.py
@@ -0,0 +1 @@
"""Ratchet orchestration."""
diff --git a/src/autoresearch_quantum/ratchet/runner.py b/src/autoresearch_quantum/ratchet/runner.py
new file mode 100644
index 0000000..cc8e52e
--- /dev/null
+++ b/src/autoresearch_quantum/ratchet/runner.py
@@ -0,0 +1,441 @@
from __future__ import annotations

import json
import logging
from dataclasses import asdict, replace
from typing import Any

from ..execution.local import LocalCheapExecutor
from ..lessons.extractor import extract_rung_lesson
from ..models import (
    EvaluationMetrics,
    ExperimentRecord,
    ExperimentSpec,
    LessonFeedback,
    RatchetStepRecord,
    RungConfig,
    RungProgress,
    TierResult,
    generate_experiment_id,
)
from ..persistence.store import ResearchStore
from ..search.challengers import GeneratedChallenger, mutation_summary
from ..search.strategies import CompositeGenerator, default_composite

logger = logging.getLogger(__name__)


def _from_dict_spec(payload: dict[str, Any]) -> ExperimentSpec:
    """Rebuild an ExperimentSpec from JSON, restoring the layout tuple."""
    if payload.get("initial_layout") is not None:
        # Copy before mutating: the caller may still hold the original dict.
        payload = dict(payload)
        payload["initial_layout"] = tuple(payload["initial_layout"])
    return ExperimentSpec(**payload)


def _record_from_json(payload: dict[str, Any]) -> ExperimentRecord:
    """Deserialize a stored experiment; missing optional fields get defaults."""
    cheap = _tier_result_from_dict(payload["cheap_result"])
    expensive = _tier_result_from_dict(payload["expensive_result"]) if payload.get("expensive_result") else None
    return ExperimentRecord(
        experiment_id=payload["experiment_id"],
        rung=int(payload["rung"]),
        role=payload["role"],
        parent_incumbent_id=payload.get("parent_incumbent_id"),
        mutation_note=payload.get("mutation_note", ""),
        spec=_from_dict_spec(payload["spec"]),
        cheap_result=cheap,  # type: ignore[arg-type]
        expensive_result=expensive,  # type: ignore[arg-type]
        final_score=float(payload.get("final_score", 0.0)),
        promoted_to_expensive=bool(payload.get("promoted_to_expensive", False)),
        became_incumbent=bool(payload.get("became_incumbent", False)),
        created_at=payload.get("created_at", ""),
    )


def _metrics_from_dict(payload: dict[str, Any]) -> EvaluationMetrics:
    """Thin wrapper kept for symmetry with the other _*_from_dict helpers."""
    return EvaluationMetrics(**payload)


def _tier_result_from_dict(payload: dict[str, Any]) -> TierResult:
    """Deserialize a TierResult, coercing numeric fields defensively."""
    return TierResult(
        tier=payload["tier"],
        score=float(payload["score"]),
        quality_estimate=float(payload["quality_estimate"]),
        metrics=_metrics_from_dict(payload["metrics"]),
        counts_summary=dict(payload.get("counts_summary", {})),
        notes=list(payload.get("notes", [])),
        created_at=payload.get("created_at", ""),
    )


class AutoresearchHarness:
    """Orchestrates incumbent/challenger evaluation across rungs of the ratchet."""

    def __init__(self, store: ResearchStore) -> None:
        self.store = store
        self.local_executor = LocalCheapExecutor()
        self._hardware_executor: Any = None  # Lazy-loaded
        # In-memory fingerprints, merged with on-disk history each step.
        self._experiment_history: set[str] = set()
        self._accumulated_lessons: list[LessonFeedback] = []

    @property
    def hardware_executor(self) -> Any:
        # Import deferred so the qiskit-ibm dependency is only needed when
        # hardware evaluation is actually requested.
        if self._hardware_executor is None:
            from ..execution.hardware import IBMHardwareExecutor
            self._hardware_executor = IBMHardwareExecutor()
        return self._hardware_executor

    def _build_history(self, rung: int) -> set[str]:
        """Collect fingerprints of all experiments already tried in this rung."""
        experiments = self.store.list_experiments(rung)
        return {
            ExperimentSpec(**{
                k: tuple(v) if k == "initial_layout" and isinstance(v, list) else v
                for k, v in exp["spec"].items()
            }).fingerprint()
            for exp in experiments
        }

    def _get_challenger_generator(self) -> CompositeGenerator:
        # Lesson-guided strategies are only enabled once a lesson exists.
        return default_composite(has_lessons=bool(self._accumulated_lessons))

    def _evaluate_record(
        self,
        spec: ExperimentSpec,
        rung_config: RungConfig,
        role: str,
        parent_incumbent_id: str | None,
        mutation_note: str,
        promote_to_hardware: bool = False,
    ) -> ExperimentRecord:
        """Run the cheap tier (and optionally hardware), persist, and return the record."""
        cheap_result = self.local_executor.evaluate(spec, rung_config)
        record = ExperimentRecord(
            experiment_id=generate_experiment_id(spec, role),
            rung=spec.rung,
            role=role,
            parent_incumbent_id=parent_incumbent_id,
            mutation_note=mutation_note,
            spec=spec,
            cheap_result=cheap_result,
            final_score=cheap_result.score,
        )
        if promote_to_hardware and rung_config.tier_policy.enable_hardware:
            # Hardware score replaces the cheap score as the final score.
            expensive_result = self.hardware_executor.evaluate(spec, rung_config)
            record.expensive_result = expensive_result
            record.promoted_to_expensive = True
            record.final_score = expensive_result.score
        self.store.save_experiment(record)
        self._experiment_history.add(spec.fingerprint())
        return record

    def _load_incumbent(self, rung: int) -> ExperimentRecord | None:
        """Reload the persisted incumbent record, or None if unset."""
        experiment_id = self.store.load_incumbent_id(rung)
        if experiment_id is None:
            return None
        payload = self.store.load_experiment(rung, experiment_id)
        return _record_from_json(payload)

    def ensure_incumbent(self, rung_config: RungConfig) -> ExperimentRecord:
        """Return the current incumbent, bootstrapping one on first call."""
        incumbent = self._load_incumbent(rung_config.rung)
        if incumbent is not None:
            return incumbent
        incumbent = self._evaluate_record(
            rung_config.bootstrap_incumbent,
            rung_config,
            role="incumbent",
            parent_incumbent_id=None,
            mutation_note="bootstrap incumbent",
            promote_to_hardware=False,
        )
        # Re-save with the incumbent flag set, then register it.
        incumbent.became_incumbent = True
        self.store.save_experiment(incumbent)
        self.store.set_incumbent(rung_config.rung, incumbent.experiment_id)
        return incumbent

    def run_single_experiment(
        self,
        spec: ExperimentSpec,
        rung_config: RungConfig,
        role: str = "challenger",
        parent_incumbent_id: str | None = None,
        mutation_note: str = "direct run",
        promote_to_hardware: bool = False,
    ) -> ExperimentRecord:
        """Public one-off evaluation entry point (used by the CLI)."""
        return self._evaluate_record(
            spec,
            rung_config,
            role=role,
            parent_incumbent_id=parent_incumbent_id,
            mutation_note=mutation_note,
            promote_to_hardware=promote_to_hardware,
        )

    def run_challenger_set(self, rung_config: RungConfig) -> list[ExperimentRecord]:
        """Generate and evaluate one batch of challengers without ratcheting."""
        incumbent = self.ensure_incumbent(rung_config)
        history = self._build_history(rung_config.rung) | self._experiment_history
        generator = self._get_challenger_generator()
        challengers = generator.generate(
            incumbent.spec,
            rung_config.search_space,
            history,
            self._accumulated_lessons,
        )
        records: list[ExperimentRecord] = []
        for challenger in challengers:
            records.append(
                self._evaluate_record(
                    challenger.spec,
                    rung_config,
                    role="challenger",
                    parent_incumbent_id=incumbent.experiment_id,
                    mutation_note=challenger.mutation_note,
                    promote_to_hardware=False,
                )
            )
        return records

    def run_ratchet_step(self, rung_config: RungConfig, allow_hardware: bool = False) -> RatchetStepRecord:
        """One ratchet step: generate challengers, promote, pick a winner, record it."""
        incumbent = self.ensure_incumbent(rung_config)
        history = self._build_history(rung_config.rung) | self._experiment_history
        generator = self._get_challenger_generator()
        challengers = generator.generate(
            incumbent.spec,
            rung_config.search_space,
            history,
            self._accumulated_lessons,
        )

        challenger_records: list[ExperimentRecord] = []
        for challenger in challengers:
            challenger_records.append(
                self._evaluate_record(
                    challenger.spec,
                    rung_config,
                    role="challenger",
                    parent_incumbent_id=incumbent.experiment_id,
                    mutation_note=challenger.mutation_note,
                    promote_to_hardware=False,
                )
            )

        if not challenger_records:
            logger.info("No new challengers generated (search space exhausted for rung %d)", rung_config.rung)

        # Promotion: best challengers that beat the incumbent by the cheap margin.
        incumbent_cheap = incumbent.cheap_result.score
        promoted = [
            record
            for record in sorted(
                challenger_records,
                key=lambda item: item.cheap_result.score,
                reverse=True,
            )
            if record.cheap_result.score > (incumbent_cheap + rung_config.tier_policy.cheap_margin)
        ][: rung_config.tier_policy.promote_top_k]

        expensive_tier_result = "Hardware tier disabled."
        if (
            allow_hardware
            and rung_config.tier_policy.enable_hardware
            and promoted
        ):
            # Confirm the incumbent on hardware first so the comparison is
            # like-for-like (hardware score vs hardware score).
            if rung_config.tier_policy.confirm_incumbent_on_hardware and not incumbent.promoted_to_expensive:
                incumbent = self._evaluate_record(
                    incumbent.spec,
                    rung_config,
                    role=incumbent.role,
                    parent_incumbent_id=incumbent.parent_incumbent_id,
                    mutation_note=incumbent.mutation_note,
                    promote_to_hardware=True,
                )
                incumbent.became_incumbent = True
                self.store.save_experiment(incumbent)
                self.store.set_incumbent(rung_config.rung, incumbent.experiment_id)

            # hardware_budget == 0 means "no cap" here (falsy -> len(promoted)).
            promoted = [
                self._evaluate_record(
                    record.spec,
                    rung_config,
                    role=record.role,
                    parent_incumbent_id=record.parent_incumbent_id,
                    mutation_note=record.mutation_note,
                    promote_to_hardware=True,
                )
                for record in promoted[: rung_config.tier_policy.hardware_budget or len(promoted)]
            ]
            expensive_tier_result = (
                f"Promoted {len(promoted)} challengers to hardware confirmation on "
                f"{rung_config.hardware.backend_name or rung_config.bootstrap_incumbent.target_backend}."
            )

        # NOTE(review): when nothing was promoted, ALL challengers compete on
        # cheap-tier final_score; when some were promoted, only those compete.
        candidates = [incumbent, *promoted] if promoted else [incumbent, *challenger_records]
        winner = max(candidates, key=lambda record: record.final_score)
        winning_margin = winner.final_score - incumbent.final_score
        if winner.experiment_id != incumbent.experiment_id and winning_margin > rung_config.tier_policy.confirmation_margin:
            winner = replace(winner, became_incumbent=True)
            self.store.save_experiment(winner)
            self.store.set_incumbent(rung_config.rung, winner.experiment_id)

        cheap_tier_justification = (
            "Promoted challengers beat the incumbent on cheap-tier score by at least "
            f"{rung_config.tier_policy.cheap_margin:.4f}."
            if promoted
            else "No challenger cleared the cheap-tier promotion margin."
        )
        distilled_lesson = self._distill_lesson(incumbent, winner, promoted)

        step = RatchetStepRecord(
            step_index=len(self.store.list_ratchet_steps(rung_config.rung)) + 1,
            rung=rung_config.rung,
            incumbent_before_id=incumbent.experiment_id,
            challengers_tested=[record.experiment_id for record in challenger_records],
            promoted_challengers=[record.experiment_id for record in promoted],
            winner_id=winner.experiment_id,
            winning_margin=winning_margin,
            cheap_tier_justification=cheap_tier_justification,
            expensive_tier_result=expensive_tier_result,
            distilled_lesson=distilled_lesson,
        )
        self.store.save_ratchet_step(step)
        return step

    def run_rung(
        self,
        rung_config: RungConfig,
        allow_hardware: bool = False,
    ) -> tuple[list[RatchetStepRecord], Any, LessonFeedback]:
        """Run ratchet steps until budget or patience is exhausted, then distil a lesson."""
        # Check for resumable progress
        progress = self.store.load_progress(rung_config.rung)
        if progress and not progress.completed:
            steps_done = progress.steps_completed
            patience_left = progress.patience_remaining
            baseline_incumbent = progress.current_incumbent_id
            logger.info(
                "Resuming rung %d from step %d (patience=%d)",
                rung_config.rung, steps_done, patience_left,
            )
        else:
            steps_done = 0
            patience_left = rung_config.patience
            baseline_incumbent = self.ensure_incumbent(rung_config).experiment_id

        steps: list[RatchetStepRecord] = []

        for step_idx in range(steps_done, rung_config.step_budget):
            step = self.run_ratchet_step(rung_config, allow_hardware=allow_hardware)
            steps.append(step)

            # Patience decrements only when the incumbent survives unchanged.
            if step.winner_id == baseline_incumbent:
                patience_left -= 1
            else:
                baseline_incumbent = step.winner_id
                patience_left = rung_config.patience

            # Save progress after each step
            self.store.save_progress(RungProgress(
                rung=rung_config.rung,
                steps_completed=step_idx + 1,
                patience_remaining=patience_left,
                current_incumbent_id=baseline_incumbent,
                completed=False,
            ))

            if patience_left <= 0:
                break

        # Mark rung as completed
        self.store.save_progress(RungProgress(
            rung=rung_config.rung,
            steps_completed=steps_done + len(steps),
            patience_remaining=patience_left,
            current_incumbent_id=baseline_incumbent,
            completed=True,
        ))

        lesson, feedback = extract_rung_lesson(
            rung_config,
            self.store.list_experiments(rung_config.rung),
            self.store.list_ratchet_steps(rung_config.rung),
        )
        self.store.save_lesson(lesson)
        self.store.save_lesson_feedback(feedback)
        self._accumulated_lessons.append(feedback)

        return steps, lesson, feedback

    def run_ratchet(
        self,
        rung_configs: list[RungConfig],
        allow_hardware: bool = False,
    ) -> list[tuple[Any, LessonFeedback]]:
        """Run multiple rungs in sequence, propagating winners and lessons."""
        results: list[tuple[Any, LessonFeedback]] = []
        self._accumulated_lessons = []

        for i, rung_config in enumerate(rung_configs):
            # Propagate winner from previous rung as bootstrap
            if i > 0 and results:
                prev_feedback = results[-1][1]
                if prev_feedback.best_spec_fields:
                    propagated_spec = self._propagate_spec(
                        prev_feedback.best_spec_fields,
                        rung_config,
                    )
                    rung_config = replace(
                        rung_config,
                        bootstrap_incumbent=propagated_spec,
                    )
                    logger.info(
                        "Propagated winner from rung %d -> rung %d bootstrap",
                        rung_configs[i - 1].rung,
                        rung_config.rung,
                    )
                    # Save propagated spec for traceability
                    self.store.save_propagated_spec(rung_config.rung, propagated_spec)

                # Narrow search space based on accumulated lessons
                if prev_feedback.narrowed_dimensions:
                    # Local import — presumably avoids a circular import; verify.
                    from ..lessons.feedback import narrow_search_space
                    narrowed = narrow_search_space(
                        rung_config.search_space,
                        [r for fb in self._accumulated_lessons for r in fb.rules],
                    )
                    rung_config = replace(rung_config, search_space=narrowed)

            steps, lesson, feedback = self.run_rung(rung_config, allow_hardware=allow_hardware)
            results.append((lesson, feedback))

        return results

    def _propagate_spec(
        self,
        best_fields: dict[str, Any],
        target_config: RungConfig,
    ) -> ExperimentSpec:
        """Build a new ExperimentSpec for the next rung from previous winner fields."""
        target_spec = target_config.bootstrap_incumbent
        # Only override fields that exist in ExperimentSpec and are in the best_fields
        valid_fields = set(ExperimentSpec.__dataclass_fields__.keys())
        updates: dict[str, Any] = {}
        for key, value in best_fields.items():
            if key in valid_fields and key != "rung":
                updates[key] = value
        # Override rung to match the target
        updates["rung"] = target_config.rung
        return target_spec.with_updates(**updates)

    def _distill_lesson(
        self,
        incumbent: ExperimentRecord,
        winner: ExperimentRecord,
        promoted: list[ExperimentRecord],
    ) -> str:
        """Produce the one-line human-readable summary stored on the step record."""
        if winner.experiment_id == incumbent.experiment_id:
            return (
                "No ratchet this step: the incumbent remained best because challengers failed to "
                f"overcome {incumbent.best_result.metrics.dominant_failure_mode}."
            )

        change_note = mutation_summary(incumbent.spec, winner.spec)
        confirmation = "hardware-confirmed" if winner.promoted_to_expensive else "cheap-tier"
        return (
            f"{change_note} became the new incumbent on {confirmation} score. "
            f"It improved final score by {winner.final_score - incumbent.final_score:+.4f}; "
            f"{len(promoted)} challengers were strong enough to justify promotion."
+ ) diff --git a/src/autoresearch_quantum/scoring/__init__.py b/src/autoresearch_quantum/scoring/__init__.py new file mode 100644 index 0000000..97a4a3c --- /dev/null +++ b/src/autoresearch_quantum/scoring/__init__.py @@ -0,0 +1 @@ +"""Scalar scoring utilities.""" diff --git a/src/autoresearch_quantum/scoring/score.py b/src/autoresearch_quantum/scoring/score.py new file mode 100644 index 0000000..98bddad --- /dev/null +++ b/src/autoresearch_quantum/scoring/score.py @@ -0,0 +1,150 @@ +from __future__ import annotations + +from typing import Callable + +from ..models import EvaluationMetrics, FactoryMetrics, QualityWeights, ScoreConfig + + +def _clamp(value: float | None) -> float | None: + if value is None: + return None + return max(0.0, min(1.0, value)) + + +def _quality_components(metrics: EvaluationMetrics, weights: QualityWeights) -> dict[str, float | None]: + spectator_alignment = None + if metrics.spectator_logical_z is not None: + spectator_alignment = (1.0 + metrics.spectator_logical_z) / 2.0 + return { + "ideal_fidelity": _clamp(metrics.ideal_encoded_fidelity), + "noisy_fidelity": _clamp(metrics.noisy_encoded_fidelity), + "logical_witness": _clamp(metrics.logical_magic_witness), + "codespace_rate": _clamp(metrics.codespace_rate), + "stability_score": _clamp(metrics.stability_score), + "spectator_alignment": _clamp(spectator_alignment), + } + + +def weighted_acceptance_cost( + metrics: EvaluationMetrics, + tier: str, + config: ScoreConfig, +) -> tuple[float, float, float]: + weights = config.cheap_quality if tier == "cheap" else config.expensive_quality + values = _quality_components(metrics, weights) + weight_map = { + "ideal_fidelity": weights.ideal_fidelity, + "noisy_fidelity": weights.noisy_fidelity, + "logical_witness": weights.logical_witness, + "codespace_rate": weights.codespace_rate, + "stability_score": weights.stability_score, + "spectator_alignment": weights.spectator_alignment, + } + weighted_sum = 0.0 + total_weight = 0.0 + for key, weight in 
weight_map.items(): + value = values[key] + if weight <= 0 or value is None: + continue + weighted_sum += weight * value + total_weight += weight + + quality = weighted_sum / total_weight if total_weight else 0.0 + cost = ( + config.base_cost + + (config.cost_weights.two_qubit_count * metrics.two_qubit_count) + + (config.cost_weights.depth * metrics.depth) + + (config.cost_weights.shot_count * metrics.shot_count) + + (config.cost_weights.runtime_estimate * metrics.runtime_estimate) + + (config.cost_weights.queue_cost_proxy * metrics.queue_cost_proxy) + ) + metrics.total_cost = cost + score = (quality * metrics.acceptance_rate) / max(cost, 1e-9) + return score, quality, cost + + +def factory_throughput_score( + metrics: EvaluationMetrics, + tier: str, + config: ScoreConfig, +) -> tuple[float, float, float]: + """Score optimised for accepted magic states per unit cost. + + Computes FactoryMetrics as a side-effect attached to metrics.extra. + """ + weights = config.cheap_quality if tier == "cheap" else config.expensive_quality + values = _quality_components(metrics, weights) + weight_map = { + "ideal_fidelity": weights.ideal_fidelity, + "noisy_fidelity": weights.noisy_fidelity, + "logical_witness": weights.logical_witness, + "codespace_rate": weights.codespace_rate, + "stability_score": weights.stability_score, + "spectator_alignment": weights.spectator_alignment, + } + weighted_sum = 0.0 + total_weight = 0.0 + for key, weight in weight_map.items(): + value = values[key] + if weight <= 0 or value is None: + continue + weighted_sum += weight * value + total_weight += weight + + quality = weighted_sum / total_weight if total_weight else 0.0 + + # Cost with heavier penalty + cost = ( + config.base_cost + + (config.cost_weights.two_qubit_count * metrics.two_qubit_count * 1.5) + + (config.cost_weights.depth * metrics.depth * 1.5) + + (config.cost_weights.shot_count * metrics.shot_count) + + (config.cost_weights.runtime_estimate * metrics.runtime_estimate) + + 
(config.cost_weights.queue_cost_proxy * metrics.queue_cost_proxy) + ) + metrics.total_cost = cost + + # Factory-specific metrics + acceptance = metrics.acceptance_rate + witness = metrics.logical_magic_witness or 0.0 + logical_error = max(0.0, 1.0 - witness) + accepted_per_shot = acceptance + accepted_per_cost = acceptance / max(cost, 1e-9) + cost_per_accepted = cost / max(acceptance, 1e-9) + quality_yield = quality * acceptance + throughput_proxy = acceptance * witness / max(cost, 1e-9) + + factory = FactoryMetrics( + accepted_states_per_shot=accepted_per_shot, + logical_error_per_accepted=logical_error, + accepted_per_unit_cost=accepted_per_cost, + quality_yield=quality_yield, + cost_per_accepted=cost_per_accepted, + throughput_proxy=throughput_proxy, + ) + metrics.extra["factory_metrics"] = { + "accepted_states_per_shot": factory.accepted_states_per_shot, + "logical_error_per_accepted": factory.logical_error_per_accepted, + "accepted_per_unit_cost": factory.accepted_per_unit_cost, + "quality_yield": factory.quality_yield, + "cost_per_accepted": factory.cost_per_accepted, + "throughput_proxy": factory.throughput_proxy, + } + + # Score = throughput proxy (acceptance * witness / cost) + score = throughput_proxy + return score, quality, cost + + +SCORE_REGISTRY: dict[str, Callable[[EvaluationMetrics, str, ScoreConfig], tuple[float, float, float]]] = { + "weighted_acceptance_cost": weighted_acceptance_cost, + "factory_throughput": factory_throughput_score, +} + + +def score_metrics(metrics: EvaluationMetrics, tier: str, config: ScoreConfig) -> tuple[float, float, float]: + try: + score_fn = SCORE_REGISTRY[config.name] + except KeyError as exc: + raise ValueError(f"Unknown score function: {config.name}") from exc + return score_fn(metrics, tier, config) diff --git a/src/autoresearch_quantum/search/__init__.py b/src/autoresearch_quantum/search/__init__.py new file mode 100644 index 0000000..75f42a7 --- /dev/null +++ b/src/autoresearch_quantum/search/__init__.py @@ -0,0 
+1 @@ +"""Challenger generation.""" diff --git a/src/autoresearch_quantum/search/challengers.py b/src/autoresearch_quantum/search/challengers.py new file mode 100644 index 0000000..ee8dce8 --- /dev/null +++ b/src/autoresearch_quantum/search/challengers.py @@ -0,0 +1,52 @@ +from __future__ import annotations + +from dataclasses import dataclass + +from ..models import ExperimentSpec, SearchSpaceConfig + + +@dataclass(frozen=True) +class GeneratedChallenger: + spec: ExperimentSpec + mutation_note: str + + +def generate_neighbor_challengers( + incumbent: ExperimentSpec, + search_space: SearchSpaceConfig, + history: set[str] | None = None, +) -> list[GeneratedChallenger]: + challengers: list[GeneratedChallenger] = [] + seen: set[str] = set(history or set()) + + for field_name, values in search_space.dimensions.items(): + current = getattr(incumbent, field_name) + for value in values: + normalized = tuple(value) if field_name == "initial_layout" and isinstance(value, list) else value + if normalized == current: + continue + candidate = incumbent.with_updates(**{field_name: normalized}) + fingerprint = candidate.fingerprint() + if fingerprint in seen: + continue + seen.add(fingerprint) + challengers.append( + GeneratedChallenger( + spec=candidate, + mutation_note=f"{field_name}: {current} -> {normalized}", + ) + ) + if len(challengers) >= search_space.max_challengers_per_step: + return challengers + + return challengers + + +def mutation_summary(parent: ExperimentSpec, child: ExperimentSpec) -> str: + changes: list[str] = [] + for field_name in parent.__dataclass_fields__: + if getattr(parent, field_name) != getattr(child, field_name): + changes.append( + f"{field_name}: {getattr(parent, field_name)} -> {getattr(child, field_name)}" + ) + return ", ".join(changes) if changes else "no mutation" diff --git a/src/autoresearch_quantum/search/strategies.py b/src/autoresearch_quantum/search/strategies.py new file mode 100644 index 0000000..c9b9941 --- /dev/null +++ 
b/src/autoresearch_quantum/search/strategies.py
@@ -0,0 +1,277 @@
from __future__ import annotations

import random
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any

from ..models import ExperimentSpec, LessonFeedback, SearchRule, SearchSpaceConfig
from .challengers import GeneratedChallenger


class ChallengerStrategy(ABC):
    """Interface for challenger generators; implementations must be stateless per call."""

    @abstractmethod
    def generate(
        self,
        incumbent: ExperimentSpec,
        search_space: SearchSpaceConfig,
        history: set[str],
        lessons: list[LessonFeedback] | None = None,
    ) -> list[GeneratedChallenger]:
        ...


class NeighborWalk(ChallengerStrategy):
    """Single-axis perturbation — the original Codex strategy, kept as baseline."""

    def generate(
        self,
        incumbent: ExperimentSpec,
        search_space: SearchSpaceConfig,
        history: set[str],
        lessons: list[LessonFeedback] | None = None,
    ) -> list[GeneratedChallenger]:
        """One candidate per alternative value on each single dimension,
        deduplicated against history, capped at max_challengers_per_step."""
        challengers: list[GeneratedChallenger] = []
        seen = set(history)

        for field_name, values in search_space.dimensions.items():
            current = getattr(incumbent, field_name)
            for value in values:
                # YAML gives layouts as lists; specs store tuples.
                normalized = tuple(value) if field_name == "initial_layout" and isinstance(value, list) else value
                if normalized == current:
                    continue
                candidate = incumbent.with_updates(**{field_name: normalized})
                fp = candidate.fingerprint()
                if fp in seen:
                    continue
                seen.add(fp)
                challengers.append(
                    GeneratedChallenger(
                        spec=candidate,
                        mutation_note=f"neighbor: {field_name}: {current} -> {normalized}",
                    )
                )
                if len(challengers) >= search_space.max_challengers_per_step:
                    return challengers
        return challengers


class RandomCombo(ChallengerStrategy):
    """Pick 1–3 random dimensions and mutate them simultaneously."""

    def __init__(self, num_candidates: int = 6, max_mutations: int = 3) -> None:
        self.num_candidates = num_candidates
        self.max_mutations = max_mutations

    def generate(
        self,
        incumbent: ExperimentSpec,
        search_space: SearchSpaceConfig,
        history: set[str],
        lessons: list[LessonFeedback] | None = None,
    ) -> list[GeneratedChallenger]:
        """Sample random multi-dimension mutations; bounded retries prevent
        spinning forever on an exhausted search space."""
        challengers: list[GeneratedChallenger] = []
        seen = set(history)
        dim_names = list(search_space.dimensions.keys())
        if not dim_names:
            return challengers

        attempts = 0
        max_attempts = self.num_candidates * 5

        while len(challengers) < self.num_candidates and attempts < max_attempts:
            attempts += 1
            n_dims = random.randint(1, min(self.max_mutations, len(dim_names)))
            chosen_dims = random.sample(dim_names, n_dims)
            updates: dict[str, Any] = {}
            mutation_parts: list[str] = []

            for dim in chosen_dims:
                values = search_space.dimensions[dim]
                current = getattr(incumbent, dim)
                # NOTE(review): list-vs-tuple mismatch means a layout equal to
                # the current one may survive this filter; the fingerprint
                # dedup below still drops the resulting no-op — confirm.
                alternatives = [v for v in values if v != current]
                if not alternatives:
                    continue
                new_val = random.choice(alternatives)
                if dim == "initial_layout" and isinstance(new_val, list):
                    new_val = tuple(new_val)
                updates[dim] = new_val
                mutation_parts.append(f"{dim}: {current} -> {new_val}")

            if not updates:
                continue

            candidate = incumbent.with_updates(**updates)
            fp = candidate.fingerprint()
            if fp in seen:
                continue
            seen.add(fp)
            challengers.append(
                GeneratedChallenger(
                    spec=candidate,
                    mutation_note=f"combo: {', '.join(mutation_parts)}",
                )
            )

        return challengers


class LessonGuided(ChallengerStrategy):
    """Use SearchRules from lessons to bias generation toward promising regions."""

    def __init__(self, num_candidates: int = 4) -> None:
        self.num_candidates = num_candidates

    def generate(
        self,
        incumbent: ExperimentSpec,
        search_space: SearchSpaceConfig,
        history: set[str],
        lessons: list[LessonFeedback] | None = None,
    ) -> list[GeneratedChallenger]:
        """Translate accumulated fix/prefer/avoid rules into biased mutations."""
        if not lessons:
            return []

        # Collect all rules across rungs
        all_rules: list[SearchRule] = []
        for feedback in lessons:
            all_rules.extend(feedback.rules)

        if not all_rules:
            return []

        # Build preference/avoidance maps
        prefer: dict[str, list[tuple[Any, float]]] =
class LessonGuided(ChallengerStrategy):
    """Use SearchRules from lessons to bias generation toward promising regions."""

    def __init__(self, num_candidates: int = 4) -> None:
        self.num_candidates = num_candidates

    def generate(
        self,
        incumbent: ExperimentSpec,
        search_space: SearchSpaceConfig,
        history: set[str],
        lessons: list[LessonFeedback] | None = None,
    ) -> list[GeneratedChallenger]:
        if not lessons:
            return []

        # Pool rules from every rung's feedback.
        all_rules: list[SearchRule] = []
        for feedback in lessons:
            all_rules.extend(feedback.rules)
        if not all_rules:
            return []

        # Index rules by action.  "fix" uses last-writer-wins across rungs.
        prefer: dict[str, list[tuple[Any, float]]] = {}
        avoid: dict[str, set[Any]] = {}
        fix: dict[str, Any] = {}
        for rule in all_rules:
            if rule.action == "prefer":
                prefer.setdefault(rule.dimension, []).append((rule.value, rule.confidence))
            elif rule.action == "avoid":
                avoid.setdefault(rule.dimension, set()).add(rule.value)
            elif rule.action == "fix":
                fix[rule.dimension] = rule.value

        challengers: list[GeneratedChallenger] = []
        seen = set(history)
        # Fix: honour the per-step budget as well as num_candidates so
        # CompositeGenerator's proportional allocation actually applies here.
        target = min(self.num_candidates, search_space.max_challengers_per_step)

        # Bounded retries: duplicates and empty-update draws consume attempts.
        for _ in range(target * 3):
            if len(challengers) >= target:
                break

            updates: dict[str, Any] = {}
            mutation_parts: list[str] = []

            # Apply "fix" rules first — they pin a dimension outright.
            for dim, value in fix.items():
                if dim in search_space.dimensions:
                    current = getattr(incumbent, dim)
                    if value != current:
                        normalized = tuple(value) if dim == "initial_layout" and isinstance(value, list) else value
                        updates[dim] = normalized
                        mutation_parts.append(f"fix({dim}): {current} -> {normalized}")

            # Then apply "prefer" rules probabilistically (fixed dims excluded).
            for dim, preferences in prefer.items():
                if dim in fix or dim not in search_space.dimensions:
                    continue
                current = getattr(incumbent, dim)
                avoided = avoid.get(dim, set())
                candidates = [(v, c) for v, c in preferences if v != current and v not in avoided]
                if not candidates and random.random() < 0.5:
                    # No usable preference: half the time, explore any
                    # non-avoided alternative so the strategy can still move.
                    all_vals = [v for v in search_space.dimensions[dim] if v != current and v not in avoided]
                    if all_vals:
                        val = random.choice(all_vals)
                        normalized = tuple(val) if dim == "initial_layout" and isinstance(val, list) else val
                        updates[dim] = normalized
                        mutation_parts.append(f"explore({dim}): {current} -> {normalized}")
                elif candidates:
                    # Confidence-weighted roulette-wheel pick among preferences.
                    total = sum(c for _, c in candidates)
                    r = random.random() * total
                    cumulative = 0.0
                    chosen = candidates[0][0]
                    for val, conf in candidates:
                        cumulative += conf
                        if r <= cumulative:
                            chosen = val
                            break
                    normalized = tuple(chosen) if dim == "initial_layout" and isinstance(chosen, list) else chosen
                    updates[dim] = normalized
                    mutation_parts.append(f"guided({dim}): {current} -> {normalized}")

            if not updates:
                continue

            candidate = incumbent.with_updates(**updates)
            fp = candidate.fingerprint()
            if fp in seen:
                continue
            seen.add(fp)
            challengers.append(
                GeneratedChallenger(
                    spec=candidate,
                    mutation_note=f"lesson: {', '.join(mutation_parts)}",
                )
            )

        return challengers


@dataclass
class StrategyWeight:
    """A strategy paired with its share of the per-step challenger budget."""

    strategy: ChallengerStrategy
    weight: float  # relative (unnormalized) share of the budget


class CompositeGenerator(ChallengerStrategy):
    """Weighted combination of multiple strategies. Allocates budget proportionally."""

    def __init__(self, strategies: list[StrategyWeight]) -> None:
        self.strategies = strategies

    def generate(
        self,
        incumbent: ExperimentSpec,
        search_space: SearchSpaceConfig,
        history: set[str],
        lessons: list[LessonFeedback] | None = None,
    ) -> list[GeneratedChallenger]:
        total_weight = sum(sw.weight for sw in self.strategies)
        # Robustness fix: an empty strategy list (or all-zero weights) would
        # previously raise ZeroDivisionError in the allocation below.
        if total_weight <= 0:
            return []

        budget = search_space.max_challengers_per_step
        all_challengers: list[GeneratedChallenger] = []
        seen = set(history)

        for sw in self.strategies:
            # Each strategy gets at least one slot so low weights still explore.
            allocation = max(1, int(budget * sw.weight / total_weight))
            sub_space = SearchSpaceConfig(
                dimensions=search_space.dimensions,
                max_challengers_per_step=allocation,
            )
            # Pass the running `seen` set so later strategies skip earlier picks.
            new_challengers = sw.strategy.generate(incumbent, sub_space, seen, lessons)
            for c in new_challengers:
                fp = c.spec.fingerprint()
                if fp not in seen:
                    seen.add(fp)
                    all_challengers.append(c)
                    if len(all_challengers) >= budget:
                        return all_challengers

        return all_challengers


def default_composite(has_lessons: bool = False) -> CompositeGenerator:
    """Build the default composite generator with sensible weights.

    With lessons available, budget splits 0.4/0.3/0.3 across neighbor walk,
    random combo, and lesson-guided search; without lessons, the lesson slot's
    budget is folded into a larger random-combo exploration share.
    """
    strategies: list[StrategyWeight] = [
        StrategyWeight(NeighborWalk(), weight=0.4),
        StrategyWeight(RandomCombo(), weight=0.3),
    ]
    if has_lessons:
        strategies.append(StrategyWeight(LessonGuided(), weight=0.3))
    else:
        # Without lessons, give more budget to exploration.
        strategies[1] = StrategyWeight(RandomCombo(num_candidates=8), weight=0.6)
    return CompositeGenerator(strategies)
strategies.append(StrategyWeight(LessonGuided(), weight=0.3)) + else: + # Without lessons, give more budget to exploration + strategies[1] = StrategyWeight(RandomCombo(num_candidates=8), weight=0.6) + return CompositeGenerator(strategies) diff --git a/tests/test_harness.py b/tests/test_harness.py new file mode 100644 index 0000000..7359d2e --- /dev/null +++ b/tests/test_harness.py @@ -0,0 +1,459 @@ +from __future__ import annotations + +from pathlib import Path + +from qiskit.quantum_info import Statevector + +from autoresearch_quantum.codes.four_two_two import STABILIZERS, encoded_magic_statevector +from autoresearch_quantum.experiments.encoded_magic_state import build_circuit_bundle +from autoresearch_quantum.execution.local import LocalCheapExecutor +from autoresearch_quantum.execution.transfer import TransferEvaluator +from autoresearch_quantum.lessons.feedback import ( + build_lesson_feedback, + extract_search_rules, + narrow_search_space, +) +from autoresearch_quantum.models import ( + CostWeights, + ExperimentSpec, + FactoryMetrics, + HardwareConfig, + LessonFeedback, + QualityWeights, + RungConfig, + RungProgress, + ScoreConfig, + SearchRule, + SearchSpaceConfig, + TierPolicyConfig, + TransferReport, +) +from autoresearch_quantum.persistence.store import ResearchStore +from autoresearch_quantum.ratchet.runner import AutoresearchHarness +from autoresearch_quantum.scoring.score import factory_throughput_score, score_metrics +from autoresearch_quantum.search.challengers import generate_neighbor_challengers +from autoresearch_quantum.search.strategies import ( + CompositeGenerator, + LessonGuided, + NeighborWalk, + RandomCombo, + StrategyWeight, + default_composite, +) + + +def _test_rung(search_dimensions: dict[str, list[object]] | None = None) -> RungConfig: + spec = ExperimentSpec( + rung=1, + target_backend="fake_brisbane", + noise_backend="fake_brisbane", + shots=64, + repeats=1, + ) + return RungConfig( + rung=1, + name="test", + description="test rung", 
+ objective="test objective", + bootstrap_incumbent=spec, + search_space=SearchSpaceConfig( + dimensions=search_dimensions or {"verification": ["both", "z_only"]}, + max_challengers_per_step=4, + ), + tier_policy=TierPolicyConfig( + cheap_margin=0.0, + confirmation_margin=0.0, + cheap_shots=64, + expensive_shots=128, + cheap_repeats=1, + expensive_repeats=1, + promote_top_k=1, + enable_hardware=False, + confirm_incumbent_on_hardware=False, + hardware_budget=0, + ), + score=ScoreConfig( + cheap_quality=QualityWeights( + ideal_fidelity=0.2, + noisy_fidelity=0.3, + logical_witness=0.3, + codespace_rate=0.1, + stability_score=0.05, + spectator_alignment=0.05, + ), + expensive_quality=QualityWeights( + logical_witness=0.6, + codespace_rate=0.2, + stability_score=0.1, + spectator_alignment=0.1, + ), + cost_weights=CostWeights( + two_qubit_count=0.05, + depth=0.01, + shot_count=0.0001, + runtime_estimate=0.01, + queue_cost_proxy=0.0, + ), + ), + step_budget=1, + patience=1, + hardware=HardwareConfig(), + ) + + +# ── Original tests ────────────────────────────────────────────────────────── + +def test_encoded_target_state_satisfies_stabilizers() -> None: + state = encoded_magic_statevector() + assert isinstance(state, Statevector) + for stabilizer in STABILIZERS.values(): + expectation = state.expectation_value(stabilizer) + assert abs(expectation - 1.0) < 1e-8 + + +def test_circuit_bundle_contains_expected_contexts() -> None: + bundle = build_circuit_bundle(ExperimentSpec(rung=1)) + assert set(bundle.witness_circuits) == {"logical_x", "logical_y", "spectator_z"} + for name, circuit in bundle.witness_circuits.items(): + assert circuit.metadata["context"] == name + assert "logical_operator" in circuit.metadata + assert bundle.acceptance.metadata["context"] == "acceptance" + + +def test_local_executor_produces_score() -> None: + rung = _test_rung() + result = LocalCheapExecutor().evaluate(rung.bootstrap_incumbent, rung) + assert result.score > 0.0 + assert 0.0 <= 
result.metrics.acceptance_rate <= 1.0 + assert 0.0 <= (result.metrics.logical_magic_witness or 0.0) <= 1.0 + + +def test_neighbor_challengers_mutate_single_dimension() -> None: + incumbent = ExperimentSpec(rung=1) + search_space = SearchSpaceConfig( + dimensions={ + "verification": ["both", "z_only"], + "seed_style": ["h_p", "ry_rz"], + }, + max_challengers_per_step=8, + ) + challengers = generate_neighbor_challengers(incumbent, search_space) + assert len(challengers) == 2 + for challenger in challengers: + changed_fields = [ + field_name + for field_name in incumbent.__dataclass_fields__ + if getattr(incumbent, field_name) != getattr(challenger.spec, field_name) + ] + assert len(changed_fields) == 1 + + +def test_ratchet_step_persists_incumbent_and_step(tmp_path: Path) -> None: + rung = _test_rung({"verification": ["both", "z_only"], "postselection": ["all_measured", "z_only"]}) + harness = AutoresearchHarness(ResearchStore(tmp_path)) + step = harness.run_ratchet_step(rung, allow_hardware=False) + assert step.step_index == 1 + assert (tmp_path / "rung_1" / "incumbent.json").exists() + assert list((tmp_path / "rung_1" / "ratchet_steps").glob("*.json")) + + +# ── New tests: challenger strategies ──────────────────────────────────────── + +def test_neighbor_walk_respects_history() -> None: + incumbent = ExperimentSpec(rung=1) + search_space = SearchSpaceConfig( + dimensions={"verification": ["both", "z_only"], "seed_style": ["h_p", "ry_rz"]}, + max_challengers_per_step=8, + ) + # First pass: get all challengers + all_challengers = generate_neighbor_challengers(incumbent, search_space) + fps = {c.spec.fingerprint() for c in all_challengers} + # Second pass with history: should get nothing new + new_challengers = generate_neighbor_challengers(incumbent, search_space, history=fps) + assert len(new_challengers) == 0 + + +def test_random_combo_generates_multi_axis_mutations() -> None: + incumbent = ExperimentSpec(rung=1) + search_space = SearchSpaceConfig( + dimensions={ 
+ "verification": ["both", "z_only", "x_only"], + "seed_style": ["h_p", "ry_rz", "u_magic"], + "optimization_level": [1, 2, 3], + }, + max_challengers_per_step=10, + ) + strategy = RandomCombo(num_candidates=10, max_mutations=3) + challengers = strategy.generate(incumbent, search_space, set()) + assert len(challengers) > 0 + # At least one challenger should mutate multiple dimensions + multi_axis = [ + c for c in challengers + if sum( + 1 for f in incumbent.__dataclass_fields__ + if getattr(incumbent, f) != getattr(c.spec, f) + ) > 1 + ] + # Probabilistic, but with 10 candidates and 3 dims it's extremely likely + assert len(multi_axis) > 0 + + +def test_lesson_guided_uses_rules() -> None: + incumbent = ExperimentSpec(rung=1) + search_space = SearchSpaceConfig( + dimensions={ + "verification": ["both", "z_only", "x_only"], + "seed_style": ["h_p", "ry_rz", "u_magic"], + }, + max_challengers_per_step=8, + ) + feedback = LessonFeedback( + rung=1, + rules=[ + SearchRule("verification", "prefer", "z_only", 0.8, "top performer"), + SearchRule("seed_style", "avoid", "h_p", 0.6, "consistently poor"), + SearchRule("seed_style", "fix", "ry_rz", 0.9, "all top-K use this"), + ], + narrowed_dimensions={}, + best_spec_fields={}, + ) + strategy = LessonGuided(num_candidates=6) + challengers = strategy.generate(incumbent, search_space, set(), [feedback]) + assert len(challengers) > 0 + # All challengers should have seed_style fixed to ry_rz (from fix rule) + for c in challengers: + assert c.spec.seed_style == "ry_rz" + + +def test_composite_generator_combines_strategies() -> None: + incumbent = ExperimentSpec(rung=1) + search_space = SearchSpaceConfig( + dimensions={ + "verification": ["both", "z_only", "x_only"], + "seed_style": ["h_p", "ry_rz", "u_magic"], + "optimization_level": [1, 2, 3], + }, + max_challengers_per_step=8, + ) + composite = default_composite(has_lessons=False) + challengers = composite.generate(incumbent, search_space, set()) + assert len(challengers) > 0 + 
assert len(challengers) <= 8 + + +# ── New tests: lesson feedback ─────��──────────────────────────────────────── + +def test_extract_search_rules_prefer_and_avoid() -> None: + search_space = SearchSpaceConfig( + dimensions={"verification": ["both", "z_only"]}, + max_challengers_per_step=4, + ) + records = [ + {"spec": {"verification": "z_only"}, "final_score": 0.8}, + {"spec": {"verification": "z_only"}, "final_score": 0.85}, + {"spec": {"verification": "z_only"}, "final_score": 0.82}, + {"spec": {"verification": "both"}, "final_score": 0.5}, + {"spec": {"verification": "both"}, "final_score": 0.55}, + {"spec": {"verification": "both"}, "final_score": 0.52}, + ] + rules = extract_search_rules(records, search_space) + actions = {(r.dimension, r.action, r.value) for r in rules} + assert ("verification", "prefer", "z_only") in actions + assert ("verification", "avoid", "both") in actions + + +def test_narrow_search_space_removes_avoided() -> None: + search_space = SearchSpaceConfig( + dimensions={ + "verification": ["both", "z_only", "x_only"], + "seed_style": ["h_p", "ry_rz", "u_magic"], + }, + max_challengers_per_step=8, + ) + rules = [ + SearchRule("verification", "avoid", "x_only", 0.5, "poor"), + SearchRule("seed_style", "fix", "ry_rz", 0.6, "best"), + ] + narrowed = narrow_search_space(search_space, rules) + assert "x_only" not in narrowed.dimensions["verification"] + assert narrowed.dimensions["seed_style"] == ["ry_rz"] + + +def test_build_lesson_feedback_end_to_end() -> None: + search_space = SearchSpaceConfig( + dimensions={"verification": ["both", "z_only"]}, + max_challengers_per_step=4, + ) + records = [ + {"spec": {"verification": "z_only"}, "final_score": 0.8}, + {"spec": {"verification": "z_only"}, "final_score": 0.85}, + {"spec": {"verification": "both"}, "final_score": 0.5}, + {"spec": {"verification": "both"}, "final_score": 0.55}, + ] + feedback = build_lesson_feedback(1, records, search_space) + assert feedback.rung == 1 + assert 
len(feedback.rules) > 0 + assert feedback.best_spec_fields["verification"] == "z_only" + + +# ── New tests: factory score ──────────────────────────────────────────────── + +def test_factory_throughput_score_produces_metrics() -> None: + from autoresearch_quantum.models import EvaluationMetrics + metrics = EvaluationMetrics( + ideal_encoded_fidelity=0.95, + noisy_encoded_fidelity=0.85, + logical_magic_witness=0.80, + acceptance_rate=0.70, + codespace_rate=0.65, + stability_score=0.90, + two_qubit_count=30, + depth=50, + shot_count=1024, + ) + config = ScoreConfig( + name="factory_throughput", + cheap_quality=QualityWeights( + noisy_fidelity=0.3, + logical_witness=0.4, + codespace_rate=0.2, + stability_score=0.1, + ), + ) + score, quality, cost = factory_throughput_score(metrics, "cheap", config) + assert score > 0.0 + assert quality > 0.0 + assert cost > 0.0 + assert "factory_metrics" in metrics.extra + fm = metrics.extra["factory_metrics"] + assert fm["accepted_states_per_shot"] == 0.70 + assert fm["throughput_proxy"] > 0.0 + + +def test_score_registry_has_factory() -> None: + from autoresearch_quantum.scoring.score import SCORE_REGISTRY + assert "factory_throughput" in SCORE_REGISTRY + + +# ── New tests: transfer evaluation ──────────────���─────────────────────────── + +def test_transfer_evaluator_runs_across_backends() -> None: + rung = _test_rung() + evaluator = TransferEvaluator() + report = evaluator.evaluate_across_backends( + rung.bootstrap_incumbent, + ["fake_brisbane"], # Use single backend for speed + rung, + ) + assert isinstance(report, TransferReport) + assert report.transfer_score > 0.0 + assert "fake_brisbane" in report.per_backend_scores + + +# ── New tests: persistence (progress, feedback) ───��──────────────────────── + +def test_save_and_load_progress(tmp_path: Path) -> None: + store = ResearchStore(tmp_path) + progress = RungProgress( + rung=1, + steps_completed=2, + patience_remaining=1, + current_incumbent_id="r1-incumbent-abc123", + 
completed=False, + ) + store.save_progress(progress) + loaded = store.load_progress(1) + assert loaded is not None + assert loaded.steps_completed == 2 + assert loaded.current_incumbent_id == "r1-incumbent-abc123" + assert not loaded.completed + + +def test_save_and_load_lesson_feedback(tmp_path: Path) -> None: + store = ResearchStore(tmp_path) + feedback = LessonFeedback( + rung=1, + rules=[SearchRule("verification", "prefer", "z_only", 0.8, "good")], + narrowed_dimensions={"verification": ["z_only"]}, + best_spec_fields={"verification": "z_only"}, + ) + store.save_lesson_feedback(feedback) + loaded = store.load_lesson_feedback(1) + assert loaded is not None + assert len(loaded.rules) == 1 + assert loaded.rules[0].dimension == "verification" + assert loaded.rules[0].action == "prefer" + + +# ── New tests: resumability in harness ────────────────────────────────────── + +def test_run_rung_saves_progress(tmp_path: Path) -> None: + rung = _test_rung({"verification": ["both", "z_only"]}) + store = ResearchStore(tmp_path) + harness = AutoresearchHarness(store) + steps, lesson, feedback = harness.run_rung(rung, allow_hardware=False) + assert len(steps) >= 1 + progress = store.load_progress(1) + assert progress is not None + assert progress.completed + + +def test_run_rung_returns_lesson_and_feedback(tmp_path: Path) -> None: + rung = _test_rung({"verification": ["both", "z_only"]}) + harness = AutoresearchHarness(ResearchStore(tmp_path)) + steps, lesson, feedback = harness.run_rung(rung, allow_hardware=False) + assert lesson.rung == 1 + assert isinstance(feedback, LessonFeedback) + assert feedback.rung == 1 + + +# ── New tests: cross-rung propagation ──────��──────────────────────────────── + +def test_run_ratchet_propagates_winner(tmp_path: Path) -> None: + rung1 = _test_rung({"verification": ["both", "z_only"]}) + rung2_spec = ExperimentSpec( + rung=2, + target_backend="fake_brisbane", + noise_backend="fake_brisbane", + shots=64, + repeats=1, + ) + rung2 = RungConfig( 
+ rung=2, + name="test rung 2", + description="test rung 2", + objective="test objective 2", + bootstrap_incumbent=rung2_spec, + search_space=SearchSpaceConfig( + dimensions={"verification": ["both", "z_only"]}, + max_challengers_per_step=2, + ), + tier_policy=rung1.tier_policy, + score=rung1.score, + step_budget=1, + patience=1, + hardware=HardwareConfig(), + ) + + store = ResearchStore(tmp_path) + harness = AutoresearchHarness(store) + results = harness.run_ratchet([rung1, rung2], allow_hardware=False) + assert len(results) == 2 + # Both should have lesson + feedback + for lesson, feedback in results: + assert lesson is not None + assert isinstance(feedback, LessonFeedback) + # Accumulated lessons should have entries from both rungs + assert len(harness._accumulated_lessons) == 2 + + +# ── New tests: seed determinism fix ───────────────────────────────────────── + +def test_different_specs_get_different_seeds() -> None: + """Two specs with different fingerprints should produce different seeds.""" + import hashlib + spec_a = ExperimentSpec(rung=1, verification="both") + spec_b = ExperimentSpec(rung=1, verification="z_only") + seed_a = int(hashlib.sha256(f"{spec_a.fingerprint()}-0".encode()).hexdigest()[:8], 16) + seed_b = int(hashlib.sha256(f"{spec_b.fingerprint()}-0".encode()).hexdigest()[:8], 16) + assert seed_a != seed_b