Initial commit: autoresearch-quantum — automated magic-state preparation ratchet

Karpathy-style autoresearch engine for encoded magic-state preparation
on the [[4,2,2]] quantum error-detecting code using Qiskit Aer simulation.

Five-rung progressive search: baseline -> stability -> transfer -> factory -> Rosenfeld.
Smart challenger generation (neighbor walk + random combo + lesson-guided).
Machine-readable lesson feedback with per-dimension effects, interaction detection,
and cross-rung propagation. Factory throughput scoring. Resumable execution.
21 tests, all passing.
saymrwulf 2026-04-04 17:39:15 +02:00
commit f9b8f3457f
38 changed files with 4605 additions and 0 deletions

.gitignore vendored Normal file

@@ -0,0 +1,6 @@
.venv/
__pycache__/
.pytest_cache/
*.pyc
*.egg-info/
data/

README.md Normal file

@@ -0,0 +1,264 @@
# Autoresearch Quantum
`autoresearch-quantum` is a Python research harness for a Karpathy-style autoresearch ratchet in quantum experiments:
- keep an incumbent experiment
- generate challenger experiments
- screen challengers on a cheap tier
- promote only justified challengers to an expensive tier
- replace the incumbent only when the challenger wins on the final criterion
- log every ratchet step
- extract a transferable lesson at the end of each rung
The first built-in experiment family targets encoded magic-state preparation in the `[[4,2,2]]` code with Qiskit. The framework is designed so the `[[4,2,2]]` rung is not the destination. It is the first rung in a ladder that shifts from best-circuit hunting toward reusable design rules for larger encoded workflows.
## Project Tree
```text
autoresearch-quantum/
├── configs/
│   └── rungs/
│       ├── rung1.yaml
│       ├── rung2.yaml
│       ├── rung3.yaml
│       ├── rung4.yaml
│       └── rung5.yaml
├── src/
│   └── autoresearch_quantum/
│       ├── cli.py
│       ├── config.py
│       ├── models.py
│       ├── codes/
│       │   └── four_two_two.py
│       ├── experiments/
│       │   └── encoded_magic_state.py
│       ├── execution/
│       │   ├── analysis.py
│       │   ├── backends.py
│       │   ├── hardware.py
│       │   ├── local.py
│       │   ├── transfer.py
│       │   └── transpile.py
│       ├── lessons/
│       │   ├── extractor.py
│       │   └── feedback.py
│       ├── persistence/
│       │   └── store.py
│       ├── ratchet/
│       │   └── runner.py
│       ├── scoring/
│       │   └── score.py
│       └── search/
│           ├── challengers.py
│           └── strategies.py
├── tests/
├── pyproject.toml
└── README.md
```
## Scientific Framing
### What is optimized
The harness optimizes an **experiment**, not just a circuit. A spec includes:
- logical magic-seed construction
- encoder realization
- verification strategy
- postselection rule
- ancilla strategy
- transpilation choices
- backend target and noise proxy
- shot and repeat allocation
### What is measured
The default score is:
```text
score = (usable_magic_quality * acceptance_rate) / total_cost
```
with a configurable `usable_magic_quality` assembled from:
- noisy encoded fidelity proxy
- logical magic witness
- codespace survival / postselection success
- stability under repeated noisy evaluation
- spectator logical alignment
and a configurable `total_cost` assembled from:
- two-qubit gate count
- transpiled depth
- total shots consumed
- runtime proxy
- hardware queue proxy
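A minimal sketch of how these pieces combine, using the metric and weight names from the rung YAMLs (the authoritative implementation is `scoring/score.py`):
```python
# Hedged sketch of the default score combination; field names follow the
# rung YAML `cheap_quality` and `cost_weights` sections.
def sketch_score(
    metrics: dict[str, float],
    quality_weights: dict[str, float],
    cost_weights: dict[str, float],
    base_cost: float = 1.0,
) -> float:
    # usable_magic_quality: weighted sum of the quality metrics above
    quality = sum(weight * metrics[name] for name, weight in quality_weights.items())
    # total_cost: base cost plus weighted cost proxies
    cost = base_cost + sum(weight * metrics[name] for name, weight in cost_weights.items())
    return (quality * metrics["acceptance_rate"]) / cost
```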
### Cheap tier vs expensive tier
Cheap tier:
- backend-aware transpilation
- noisy Aer evaluation
- density-matrix fidelity when a backend-derived noise model is available
- repeated local runs for stability scoring
Expensive tier:
- IBM Runtime execution through `SamplerV2`
- only used when enabled and when cheap-tier promotion thresholds are met
- isolated behind [`hardware.py`](src/autoresearch_quantum/execution/hardware.py)
## Built-In `[[4,2,2]]` Experiment
The built-in experiment prepares an encoded logical T-state on one logical qubit of the `[[4,2,2]]` code while keeping the spectator logical qubit in `|0⟩`. The code utilities live in [`four_two_two.py`](src/autoresearch_quantum/codes/four_two_two.py).
The harness evaluates:
- acceptance under optional `ZZZZ` and `XXXX` stabilizer checks
- logical `X` and `Y` witnesses for the encoded magic state
- spectator logical `Z`
- compiled cost after transpilation to a chosen backend target
This keeps the core scientific distinction explicit:
- a circuit can be locally good for `[[4,2,2]]`
- a rule is only valuable if it keeps helping across new backends or new rungs
## Installation
Create an isolated environment in the project root and install the package:
```bash
python3 -m venv .venv
. .venv/bin/activate
pip install -e '.[dev]'
```
For the optional IBM hardware path:
```bash
pip install -e '.[hardware,dev]'
```
If you want the CLI without installing editable mode, use `PYTHONPATH=src`.
## How To Run
### 1. Run a single local experiment
Use the bootstrap incumbent from the rung config as-is:
```bash
PYTHONPATH=src .venv/bin/python -m autoresearch_quantum run-experiment \
--config configs/rungs/rung1.yaml \
--store-dir data/demo
```
Override individual experiment fields:
```bash
PYTHONPATH=src .venv/bin/python -m autoresearch_quantum run-experiment \
--config configs/rungs/rung1.yaml \
--store-dir data/demo \
--set verification=z_only \
--set postselection=z_only \
--set ancilla_strategy=reused_single
```
### 2. Run one ratchet step
```bash
PYTHONPATH=src .venv/bin/python -m autoresearch_quantum run-step \
--config configs/rungs/rung1.yaml \
--store-dir data/demo
```
This will:
- load or bootstrap the incumbent
- generate neighbor challengers from the rung search space
- evaluate every challenger on the cheap tier
- promote only margin-beating challengers if hardware is enabled
- log the step and update the incumbent pointer if a challenger wins
### 3. Run one full rung
```bash
PYTHONPATH=src .venv/bin/python -m autoresearch_quantum run-rung \
--config configs/rungs/rung1.yaml \
--store-dir data/demo
```
Artifacts are persisted under `data/demo/rung_<n>/`:
- `experiments/*.json`
- `ratchet_steps/*.json`
- `incumbent.json`
- `lesson.json`
- `lesson.md`
### 4. Run a multi-rung ratchet campaign
```bash
PYTHONPATH=src .venv/bin/python -m autoresearch_quantum run-ratchet \
--config configs/rungs/rung1.yaml \
--config configs/rungs/rung2.yaml \
--config configs/rungs/rung3.yaml \
--config configs/rungs/rung4.yaml \
--config configs/rungs/rung5.yaml \
--store-dir data/campaign
```
### 5. Run an optional hardware-backed confirmation
First install the hardware extra and make IBM credentials available in the usual `qiskit-ibm-runtime` way. The simplest path is to export:
```bash
export QISKIT_IBM_TOKEN=...
```
Then enable the hardware tier in the rung config by setting `tier_policy.enable_hardware: true` and optionally `hardware.backend_name: ibm_brisbane`.
Run:
```bash
PYTHONPATH=src .venv/bin/python -m autoresearch_quantum run-step \
--config configs/rungs/rung1.yaml \
--store-dir data/hardware \
--hardware
```
Only challengers that beat the incumbent cheap-tier score by `tier_policy.cheap_margin` are promoted.
## Extending The Ladder
The intended progression is:
1. `rung1.yaml`
baseline `[[4,2,2]]` encoded magic-state preparation
2. `rung2.yaml`
same code with stronger stability and backend-awareness
3. `rung3.yaml`
transfer across backend families
4. `rung4.yaml`
factory-style cost pressure
5. `rung5.yaml`
Rosenfeld direction: which heuristics are load-bearing for distillation-style workflows
To add a new rung:
- create a new YAML in `configs/rungs/`
- narrow the challenger space to the specific next question
- tune cheap and expensive score weights for that rung
- keep the lesson document as the real product
To add a new experiment family:
- implement a new builder under `src/autoresearch_quantum/experiments/`
- define the target state, witness operators, verification flow, and logging metadata
- route the ratchet to that experiment family through config or a new CLI selector
## Notes On Interpretation
This harness is explicit about proxy vs confirmation:
- cheap-tier fidelity and witness numbers are local proxies
- hardware runs are scarce and should be treated as confirmation
- the most important artifact of each rung is the lesson, not just the incumbent ID
That is the intended ratchet: better experiment plus better search rule.

THE_STORY.md Normal file

@@ -0,0 +1,682 @@
# The Story of autoresearch-quantum
## What this system does, in one paragraph
This is a machine that discovers, by itself, the best way to prepare an
encoded magic state on the [[4,2,2]] quantum error-detecting code. You give
it a starting recipe and a search space of alternatives. It runs hundreds of
simulated quantum experiments, scores them, learns which choices help and
which choices hurt, narrows the search, and climbs to the best recipe it can
find -- then hands you a written lesson explaining what it learned and why.
The entire loop -- propose, evaluate, compare, learn, repeat -- runs without
human intervention. That is the "auto" in autoresearch.
---
## Part 1: The quantum computing problem
### 1.1 What is a magic state?
Fault-tolerant quantum computers need a special ingredient called a **magic
state** to perform the T gate -- the non-Clifford gate that makes quantum
computation universal. You cannot create this state using Clifford operations
alone, so you prepare a noisy approximation and then **distill** it into a
high-fidelity copy. The preparation step is the bottleneck: if your raw magic
states are junk, distillation is expensive or impossible.
### 1.2 What is the [[4,2,2]] code?
The [[4,2,2]] code is the smallest quantum error-detecting code. It uses 4
physical qubits to encode 2 logical qubits. It cannot correct errors, but it
can *detect* them: if an error flips one qubit, the code's stabilizers
(XXXX and ZZZZ) flag it, and you can throw the shot away. This
**postselection** raises quality at the cost of throughput.
Of the two logical qubits, we use one to carry the magic state and the
other as a **spectator** -- an untouched qubit whose Z-measurement tells us
whether the encoding process corrupted the logical subspace.
### 1.3 What knobs does this system turn?
An experiment recipe (called an `ExperimentSpec`) has ~15 tuneable dimensions:
| Dimension | What it controls | Example values |
|---|---|---|
| `seed_style` | How the raw T-state is prepared on qubit 0 | `h_p`, `ry_rz`, `u_magic` |
| `encoder_style` | How the 4-qubit encoding circuit is built | `cx_chain`, `cz_compiled` |
| `verification` | Which stabilizers are measured before readout | `both`, `z_only`, `x_only`, `none` |
| `postselection` | Which syndrome outcomes cause a shot to be discarded | `all_measured`, `z_only`, `none` |
| `ancilla_strategy` | Whether verification uses 1 reused or 2 dedicated ancillas | `dedicated_pair`, `reused_single` |
| `optimization_level` | Qiskit transpiler aggressiveness | 1, 2, 3 |
| `layout_method` | Physical qubit placement algorithm | `sabre`, `dense` |
| `routing_method` | SWAP insertion algorithm | `sabre`, `basic` |
| `target_backend` | Which IBM device topology to compile for | `fake_brisbane`, `fake_kyoto`, ... |
| `shots` | Samples per circuit | 256--4096 |
The question the system answers: **Which combination of these choices gives
the highest-quality encoded magic states at the lowest cost?**
### 1.4 How is each experiment evaluated?
For each `ExperimentSpec`, the executor:
1. **Builds four circuits** (`encoded_magic_state.py`):
- `acceptance` -- measures all data qubits in the Z basis after
verification, to compute the postselection acceptance rate.
- `logical_x` -- rotates into the X basis before measurement, to get
`<X_L>` on the magic-carrying logical qubit.
- `logical_y` -- rotates into the Y basis, to get `<Y_L>`.
- `spectator_z` -- measures the spectator logical qubit in Z, to get
`<Z_spectator>`.
2. **Transpiles** them for the target backend's coupling map and basis gates.
3. **Simulates** them on Qiskit Aer with the backend's calibrated noise model,
repeating the configured number of times with independent random seeds.
4. **Postselects**: for each shot, checks the syndrome register. Shots where
the stabilizer flagged an error are discarded. What remains is the
postselected ensemble.
5. **Computes metrics** from the postselected data:
| Metric | Formula | What it measures |
|---|---|---|
| `logical_magic_witness` | `((1 + (X_L + Y_L)/sqrt(2)) / 2) * ((1 + Z_spectator) / 2)` | Magic-state quality, penalised if spectator is disturbed |
| `acceptance_rate` | `accepted_shots / total_shots` | Throughput (what fraction survives postselection) |
| `stability_score` | `1 - pstdev(repeat_scores) / mean(repeat_scores)` | Consistency across independent repeat runs |
| `noisy_encoded_fidelity` | `Tr(rho_noisy \| target><target \|)` via density matrix simulation | How close the noisy state is to the ideal encoded T-state |
| `codespace_rate` | Mean acceptance across all four circuit types | Overall codespace survival |
| `two_qubit_count`, `depth` | From the transpiled circuits | Cost proxies |
6. **Scores** the experiment by combining these metrics into a single scalar:
```
score = (quality * acceptance_rate) / cost
```
where `quality` is a weighted sum of the metrics above (weights are
per-rung, configured in YAML) and `cost` accounts for gate count, depth,
shots, and estimated runtime.
---
## Part 2: The autoresearch engine (the meta layer)
This is a direct implementation of the **Karpathy autoresearch pattern**: an
automated loop that does what a diligent PhD student would do -- try things,
keep what works, learn why, zoom in, try harder things.
### 2.1 The ratchet metaphor
A ratchet is a mechanism that only moves forward. In this system:
- The **incumbent** is the best experiment found so far.
- Each **step**, the system generates **challengers** -- modified versions of
the incumbent -- evaluates them, and replaces the incumbent only if a
challenger beats it by a configured margin.
- The incumbent can only improve. It never regresses.
A **rung** is a complete search campaign: multiple ratchet steps, with a
patience counter that stops the rung early if the incumbent stops improving.
A **full ratchet** runs multiple rungs in sequence, each one asking a
progressively harder question.
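A minimal sketch of the forward-only rule, assuming challengers arrive pre-scored from the cheap tier (the real loop, with tier promotion and logging, lives in `ratchet/runner.py`):
```python
# Hedged sketch of one ratchet step: the incumbent is replaced only when a
# challenger clears it by the configured margin, so it can never regress.
def ratchet_step(
    incumbent: tuple[float, dict],
    challengers: list[tuple[float, dict]],
    margin: float,
) -> tuple[float, dict]:
    incumbent_score, _ = incumbent
    best = max(challengers, key=lambda pair: pair[0], default=None)
    if best is not None and best[0] > incumbent_score + margin:
        return best  # the ratchet clicks forward
    return incumbent  # otherwise: no change, never backward
```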
### 2.2 The five rungs
```
Rung 1: "What preparation recipe works at all?"
|
| winner propagates down
v
Rung 2: "Is it stable across noisy backends?"
|
| winner propagates down, search space narrows
v
Rung 3: "Does it transfer to other devices?"
|
| winner propagates down, search space narrows further
v
Rung 4: "What maximises throughput per cost?"
|
| winner propagates down, only proven dimensions survive
v
Rung 5: "Which heuristics are load-bearing for distillation?"
```
Each rung is a YAML file (`configs/rungs/rung1.yaml` through `rung5.yaml`)
that configures:
- What to search over (the dimension grid)
- How to score (which quality metrics matter most)
- How hard to search (step budget, patience, promotion rules)
- Where to start (bootstrap incumbent)
The key insight: **the output of the system is not just the best circuit**.
It is the best circuit *plus a machine-readable set of rules* about what
worked and why, formatted so the next rung (or the next human) can pick up
where the machine left off.
### 2.3 The search strategies
The original Codex implementation had a single strategy: change one knob at
a time and see if the score improves. This is local hill-climbing. It
plateaus after one pass through the neighbours.
The new system uses a **composite generator** that allocates its budget
across three strategies:
| Strategy | Weight | What it does |
|---|---|---|
| `NeighborWalk` | 40% | Classic single-axis perturbation. Reliable, no surprises. |
| `RandomCombo` | 30% | Picks 1--3 dimensions at random and mutates them simultaneously. Escapes local optima by making multi-axis jumps. |
| `LessonGuided` | 30% | Reads the `SearchRule` directives from previous rungs. Fixes dimensions that are proven. Avoids values that are proven bad. Samples preferred values with probability proportional to confidence. |
When no lessons exist yet (rung 1), `RandomCombo` gets 60% of the budget
to maximise early exploration.
Every generated candidate is checked against a **history set** of all
previously evaluated fingerprints. The system never wastes a slot evaluating
a spec it has already seen.
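A sketch of that guard, assuming specs expose the `fingerprint()` method used elsewhere in the codebase:
```python
# Hedged sketch of fingerprint deduplication: a candidate is only yielded
# (and recorded) if its fingerprint has never been evaluated before.
from typing import Iterator

def fresh_challengers(candidates: list, history: set[str]) -> Iterator:
    for spec in candidates:
        fingerprint = spec.fingerprint()  # stable digest of all spec fields
        if fingerprint not in history:
            history.add(fingerprint)
            yield spec
```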
### 2.4 The lesson feedback loop
After each rung completes, two artefacts are produced:
1. **RungLesson** (human-readable): a Markdown narrative that says things like
*"verification=z_only improved mean score by +0.0312 over 8 runs"* and
*"Consider probing remaining ancilla_strategy values."*
2. **LessonFeedback** (machine-readable): a list of `SearchRule` objects:
```
SearchRule(dimension="verification", action="prefer", value="z_only",
confidence=0.67, reason="mean score 0.1823 is +0.0312 above overall mean")
SearchRule(dimension="seed_style", action="fix", value="ry_rz",
confidence=0.60, reason="all top-3 experiments use seed_style=ry_rz")
SearchRule(dimension="verification+postselection", action="prefer",
value=("z_only", "z_only"), confidence=0.33,
reason="interaction effect +0.0089 (joint=+0.0401, expected_additive=+0.0312)")
```
The rules come from three analyses:
- **Per-dimension mean effects**: for each value of each dimension, compute
the mean score minus the overall mean. Positive = prefer, negative = avoid.
- **Fix detection**: if the top-K experiments all share a value, and that
value outperforms alternatives, emit a "fix" rule.
- **Interaction detection**: for each pair of dimensions, check whether the
joint effect exceeds the sum of the two marginal effects. If so, there is
a synergy (or conflict) between those two choices.
These rules feed directly into the `LessonGuided` strategy in the next rung.
They also feed into `narrow_search_space()`, which prunes "avoid" values and
constrains "fix" dimensions, physically shrinking the grid the next rung
searches over.
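For concreteness, a simplified sketch of the per-dimension effect analysis (the record layout here is hypothetical; the real extraction lives in `lessons/feedback.py`):
```python
# Hedged sketch: mean score per dimension value minus the overall mean.
# Positive effects back "prefer" rules, negative effects back "avoid" rules.
from statistics import fmean

def dimension_effects(records: list[dict], dimension: str) -> dict[str, float]:
    overall = fmean(record["score"] for record in records)
    by_value: dict[str, list[float]] = {}
    for record in records:
        by_value.setdefault(record["spec"][dimension], []).append(record["score"])
    return {value: fmean(scores) - overall for value, scores in by_value.items()}
```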
### 2.5 Cross-rung propagation
When `run_ratchet()` finishes rung N and begins rung N+1:
1. The **winner spec** from rung N becomes the bootstrap incumbent for rung N+1.
The human-written YAML bootstrap is overridden. (A `propagated_spec.json` is
saved for traceability.)
2. The **accumulated SearchRules** from all completed rungs are combined and
used to narrow the search space of rung N+1.
3. The `LessonGuided` strategy in rung N+1 has access to rules from *all*
previous rungs, not just the most recent one.
This is the "ratchet" in action across rungs: the system starts broad, learns
what matters, and zooms in.
### 2.6 The two scoring functions
| Score function | Used by | Formula | Optimises for |
|---|---|---|---|
| `weighted_acceptance_cost` | Rungs 1--3 | `(quality * acceptance) / cost` | Best magic-state quality at reasonable cost |
| `factory_throughput` | Rungs 4--5 | `(acceptance * witness) / cost` (heavier cost penalty) | Accepted states per unit cost, as a proxy for distillation factory yield |
The factory score also computes `FactoryMetrics` (accepted per shot, logical
error per accepted, cost per accepted, throughput proxy) and attaches them to
the experiment record for downstream analysis.
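A hedged sketch of how those fields might be derived; only `accepted_states_per_shot == acceptance_rate` is pinned down by the tests, the other formulas here are illustrative assumptions:
```python
# Illustrative sketch only -- the real derivation lives in scoring/score.py.
def factory_metrics(acceptance_rate: float, witness: float, cost: float) -> dict[str, float]:
    eps = 1e-9
    return {
        "accepted_states_per_shot": acceptance_rate,            # pinned by the tests
        "logical_error_per_accepted": 1.0 - witness,            # assumption
        "cost_per_accepted": cost / max(acceptance_rate, eps),  # assumption
        "throughput_proxy": acceptance_rate / max(cost, eps),   # assumption
    }
```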
### 2.7 Transfer evaluation
Rung 3 can optionally run in **transfer mode**: instead of searching over
backends as a dimension (which just finds the easiest backend), it evaluates
the *same spec* across multiple backends and scores it by the **minimum**
(pessimistic) score. A spec that scores 0.18 on Brisbane and 0.02 on Kyoto
gets a transfer score of 0.02, not 0.10. This prevents backend overfitting.
```
python -m autoresearch_quantum run-transfer \
--config configs/rungs/rung3.yaml \
--backends fake_brisbane fake_kyoto fake_sherbrooke
```
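The aggregation itself is simple; a sketch using the same report fields the CLI prints:
```python
# Hedged sketch of the transfer aggregation: the pessimistic minimum is the
# headline number, so one bad backend sinks the whole spec.
from statistics import fmean, pstdev

def summarize_transfer(per_backend_scores: dict[str, float]) -> dict[str, float]:
    values = list(per_backend_scores.values())
    return {
        "transfer_score": min(values),  # pessimistic aggregate
        "mean_score": fmean(values),
        "min_score": min(values),
        "max_score": max(values),
        "std_score": pstdev(values),
    }
```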
### 2.8 Resumability
Every ratchet step saves a `progress.json` checkpoint:
```json
{
"rung": 2,
"steps_completed": 2,
"patience_remaining": 1,
"current_incumbent_id": "r2-incumbent-a1b2c3d4e5",
"completed": false
}
```
If the process crashes or you Ctrl-C, re-running the same rung picks up from
the last completed step with the correct patience counter. No work is lost.
---
## Part 3: Claims and how the tests prove them
### Claim 1: The encoded state is a valid magic state in the [[4,2,2]] code.
**Test**: `test_encoded_target_state_satisfies_stabilizers`
Constructs the ideal encoded magic statevector and checks that both
stabilizers (XXXX and ZZZZ) have expectation value exactly 1.0. If the
encoding circuit were wrong, at least one stabilizer would not be +1.
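A sketch of what such a check looks like with the utilities from `codes/four_two_two.py` (the actual test body in `tests/test_harness.py` may differ):
```python
# Hedged sketch of the stabilizer assertion behind Claim 1.
from autoresearch_quantum.codes.four_two_two import (
    STABILIZERS,
    encoded_magic_statevector,
)

def test_encoded_target_state_satisfies_stabilizers():
    state = encoded_magic_statevector()
    for name, stabilizer in STABILIZERS.items():
        expectation = state.expectation_value(stabilizer).real
        assert abs(expectation - 1.0) < 1e-9, f"{name} is not +1"
```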
### Claim 2: The circuit bundle measures the right observables.
**Test**: `test_circuit_bundle_contains_expected_contexts`
Verifies that `build_circuit_bundle()` produces exactly the four expected
circuits (logical_x, logical_y, spectator_z, acceptance), each with correct
metadata. If a measurement basis rotation were missing or a circuit were
mislabelled, this catches it.
### Claim 3: Noisy simulation produces meaningful scores.
**Test**: `test_local_executor_produces_score`
Runs a full evaluation (build circuits, transpile, simulate with noise,
postselect, compute witness, score) and checks that the score is positive and
the acceptance rate and witness are in [0, 1]. This is an integration test of
the entire evaluation pipeline -- if any piece is broken, the score collapses.
### Claim 4: The challenger generator explores the search space correctly.
**Tests**: `test_neighbor_challengers_mutate_single_dimension`,
`test_neighbor_walk_respects_history`,
`test_random_combo_generates_multi_axis_mutations`,
`test_lesson_guided_uses_rules`,
`test_composite_generator_combines_strategies`
These verify:
- NeighborWalk changes exactly one field per challenger.
- Passing a history set of already-seen fingerprints produces zero
duplicates.
- RandomCombo produces at least one challenger with >1 changed field (the
defining property of multi-axis mutation).
- LessonGuided respects "fix" rules: when told to fix `seed_style=ry_rz`,
every generated challenger has that value.
- The composite generator stays within the budget cap.
### Claim 5: The lesson system extracts correct prefer/avoid/fix rules.
**Tests**: `test_extract_search_rules_prefer_and_avoid`,
`test_narrow_search_space_removes_avoided`,
`test_build_lesson_feedback_end_to_end`
Given synthetic experiment records where `z_only` scores 0.80--0.85 and
`both` scores 0.50--0.55, the extractor must emit a "prefer z_only" and
"avoid both" rule. `narrow_search_space` must actually remove avoided values
and constrain fixed dimensions.
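A simplified sketch of the narrowing behaviour those tests pin down (the real `narrow_search_space` lives in `lessons/feedback.py`):
```python
# Hedged sketch: "avoid" rules prune values, "fix" rules collapse a
# dimension to a single value.
def narrow_search_space(dimensions: dict[str, list], rules: list) -> dict[str, list]:
    narrowed = {dim: list(values) for dim, values in dimensions.items()}
    for rule in rules:
        values = narrowed.get(rule.dimension)
        if not values:
            continue
        if rule.action == "avoid" and rule.value in values and len(values) > 1:
            values.remove(rule.value)
        elif rule.action == "fix" and rule.value in values:
            narrowed[rule.dimension] = [rule.value]
    return narrowed
```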
### Claim 6: The factory score function computes throughput metrics.
**Tests**: `test_factory_throughput_score_produces_metrics`,
`test_score_registry_has_factory`
Given known input metrics (acceptance 0.70, witness 0.80), verifies that
`factory_throughput_score` produces a positive score, attaches
`factory_metrics` to the `extra` dict, and that `accepted_states_per_shot`
equals the input acceptance rate.
### Claim 7: Transfer evaluation runs the same spec across backends.
**Test**: `test_transfer_evaluator_runs_across_backends`
Runs a transfer evaluation on a single backend (for speed) and checks that a
`TransferReport` is returned with a positive transfer score and the correct
backend key in `per_backend_scores`.
### Claim 8: Progress and feedback survive serialisation round-trips.
**Tests**: `test_save_and_load_progress`,
`test_save_and_load_lesson_feedback`
Writes a `RungProgress` / `LessonFeedback` to disk via the store, reads it
back, and verifies all fields match. If the JSON schema or the
deserialisation logic drifts, this catches it.
### Claim 9: A full rung saves progress and produces both lesson types.
**Tests**: `test_run_rung_saves_progress`,
`test_run_rung_returns_lesson_and_feedback`
Runs a complete rung (bootstrap + steps + lesson extraction) and checks that
`progress.json` exists and is marked `completed`, and that the return value
includes both a human-readable `RungLesson` and a machine-readable
`LessonFeedback`.
### Claim 10: Multi-rung ratchet propagates winners and accumulates lessons.
**Test**: `test_run_ratchet_propagates_winner`
Runs a two-rung ratchet and checks that:
- Both rungs produce (lesson, feedback) tuples.
- `harness._accumulated_lessons` contains entries from both rungs, proving
that rung 2 had access to rung 1's rules when generating challengers.
### Claim 11: Different specs get different simulator seeds.
**Test**: `test_different_specs_get_different_seeds`
The old code used `seed_simulator = 11_000 + repeat_index`, meaning every
spec got the same random stream. The new code hashes the spec's fingerprint
into the seed. This test creates two specs that differ only in `verification`
and checks that their computed seeds are different.
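A hedged sketch of that fingerprint-derived seeding (the helper name is hypothetical; the real code lives in `execution/local.py`):
```python
# Illustrative sketch: hash the spec fingerprint into the simulator seed so
# that specs differing in any field draw from different random streams.
import hashlib

def simulator_seed(fingerprint: str, repeat_index: int) -> int:
    digest = hashlib.sha256(fingerprint.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") + repeat_index
```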
---
## Part 4: The file map
```
autoresearch-quantum/
configs/rungs/
rung1.yaml Baseline: what recipe works at all?
rung2.yaml Stability: does it hold under noise variation?
rung3.yaml Transfer: does it work on other devices?
rung4.yaml Factory: what maximises throughput per cost?
rung5.yaml Rosenfeld: which heuristics are load-bearing?
src/autoresearch_quantum/
models.py Every data structure in one file
config.py YAML -> RungConfig parser
cli.py Entry point: run-experiment, run-step, run-rung,
run-ratchet, run-transfer
codes/
four_two_two.py The [[4,2,2]] code: stabilizers, logical ops,
encoder circuits, magic seed gates
experiments/
encoded_magic_state.py Builds the four-circuit measurement bundle
execution/
local.py LocalCheapExecutor: Aer noise simulation
hardware.py IBMHardwareExecutor: real-device SamplerV2
transfer.py TransferEvaluator: same spec across N backends
analysis.py Postselection, eigenvalues, witness formula
backends.py Backend resolution (fake_* or IBM runtime)
transpile.py Transpilation, gate counting, runtime estimates
scoring/
score.py weighted_acceptance_cost + factory_throughput
search/
challengers.py GeneratedChallenger, neighbor generation, dedup
strategies.py NeighborWalk, RandomCombo, LessonGuided,
CompositeGenerator
lessons/
extractor.py Human-readable RungLesson + machine LessonFeedback
feedback.py SearchRule extraction, interaction detection,
search space narrowing
ratchet/
runner.py AutoresearchHarness: the orchestrator
persistence/
store.py JSON file store: experiments, steps, progress,
lessons, feedback, propagated specs
tests/
test_harness.py 21 tests covering every subsystem
data/ Output directory (created at runtime)
default/
rung_1/
experiments/ One JSON per evaluated spec
ratchet_steps/ One JSON per step
incumbent.json Current best
progress.json Resumability checkpoint
lesson.json Machine-readable lesson
lesson.md Human-readable narrative
lesson_feedback.json SearchRules for the next rung
rung_2/
propagated_spec.json Winner carried from rung 1
...
```
---
## Part 5: How to use it without Claude
You do not need an AI to run this system or to make progress with its
output. Everything below runs in your terminal.
### 5.1 Setup
```bash
cd autoresearch-quantum
python -m venv .venv
source .venv/bin/activate
pip install -e ".[dev]"
```
### 5.2 Run a single experiment
```bash
python -m autoresearch_quantum run-experiment \
--config configs/rungs/rung1.yaml \
--set verification=z_only \
--set seed_style=ry_rz
```
This prints a JSON result with the score, failure mode, and experiment ID.
The full record is saved to `data/default/rung_1/experiments/`.
### 5.3 Run one ratchet step
```bash
python -m autoresearch_quantum run-step \
--config configs/rungs/rung1.yaml
```
This bootstraps an incumbent (if none exists), generates challengers, evaluates
them, promotes the best, and saves the step record. Run it again and it
generates *new* challengers (never repeating), with a new incumbent if one was
found.
### 5.4 Run a full rung
```bash
python -m autoresearch_quantum run-rung \
--config configs/rungs/rung1.yaml
```
Runs up to `step_budget` steps (default 3), stopping early if patience runs
out. Produces `data/default/rung_1/lesson.md` -- read this file. It tells you
what helped, what hurt, what seems invariant, and what to test next.
### 5.5 Run the full five-rung ratchet
```bash
python -m autoresearch_quantum run-ratchet \
--config configs/rungs/rung1.yaml \
--config configs/rungs/rung2.yaml \
--config configs/rungs/rung3.yaml \
--config configs/rungs/rung4.yaml \
--config configs/rungs/rung5.yaml
```
This is the full pipeline. Each rung's winner is automatically propagated to
the next rung. Each rung's lessons narrow the search space for the next.
When it finishes, you have five lesson files and a final optimised recipe.
### 5.6 Run a transfer evaluation
```bash
python -m autoresearch_quantum run-transfer \
--config configs/rungs/rung3.yaml \
--backends fake_brisbane fake_kyoto fake_sherbrooke
```
Tests a single spec across multiple backend noise models. The output tells you
the per-backend scores and the pessimistic transfer score.
### 5.7 Reading the output
After a ratchet run, the most valuable artefacts are:
| File | What to do with it |
|---|---|
| `rung_N/lesson.md` | Read it. It is a structured report. The "What Helped" section tells you which settings to keep. The "What Hurt" section tells you what to stop trying. |
| `rung_N/lesson_feedback.json` | This is the machine-readable version. Open it and look at the `rules` array. Each rule has an `action` (prefer/avoid/fix), a `dimension`, a `value`, a `confidence` (0--1), and a `reason`. |
| `rung_N/incumbent.json` | Contains the `experiment_id` of the current best spec. Load the corresponding file from `experiments/` to see its full spec and scores. |
| `rung_N/propagated_spec.json` | The spec that was carried forward from the previous rung. Compare it with the YAML bootstrap to see what the system changed. |
| `rung_N/progress.json` | If the run was interrupted, this tells you where it left off. Just re-run the same command to resume. |
### 5.8 Making manual progress with the artefacts
The system is designed so that you can interleave human intuition with
automated search:
1. **Read the lesson.** If rung 1 says `verification=z_only` consistently
helps, you now know something about the physics: X-stabilizer checking
adds gate cost without enough quality payoff at this noise level.
2. **Edit the YAML.** Remove values that the lesson says to avoid. Add new
values you want to explore. Change the weights if you care more about
throughput than fidelity. Save the file and re-run.
3. **Run single experiments.** If you have a specific hypothesis
("What if `approximation_degree=0.95` helps?"), test it directly with
`run-experiment --set approximation_degree=0.95`. The result is saved to
the store and will be included in the next lesson extraction.
4. **Resume interrupted runs.** If your laptop dies mid-rung, just re-run the
same command. Progress is checkpointed after every step.
5. **Compare across rungs.** Open `rung_1/lesson_feedback.json` and
`rung_3/lesson_feedback.json` side by side. Rules that appear in both with
high confidence are load-bearing. Rules that appear in rung 1 but vanish by
rung 3 were artefacts of the initial noise model.
6. **Feed results to a new search.** Copy the `best_spec_fields` from
`lesson_feedback.json` into a new YAML config as the bootstrap incumbent.
Define a tighter search space around the winning region. Run another rung.
You are now doing what the system does in `run_ratchet` -- but with human
judgement about what to explore next.
### 5.9 Running the tests
```bash
python -m pytest tests/ -v
```
All 21 tests should pass. They take about 13 seconds. If a test fails after
you edit a YAML config, the most likely cause is that you introduced a
dimension value that does not correspond to an implemented code path (e.g.,
`encoder_style: "rzz_lattice"` does not exist in `four_two_two.py`).
---
## Part 6: What this system does NOT do (yet)
- **It does not run on real quantum hardware by default.** The
`IBMHardwareExecutor` exists and is wired up, but `enable_hardware: false`
in every config. Set it to `true` and provide credentials via the
`QISKIT_IBM_TOKEN` environment variable to use real devices.
- **It does not do distillation.** Rung 5 (Rosenfeld Direction) identifies
which heuristics matter for factory-style workflows, but it does not
actually build a distillation circuit. That is the next project.
- **It does not use LLMs in the loop.** The "auto" is algorithmic
(statistical rule extraction + guided search), not generative. There is no
GPT/Claude call inside the ratchet loop. The intelligence is in the
`SearchRule` extraction, the `CompositeGenerator` budget allocation, and
the cross-rung propagation logic.
- **It does not visualise results.** There is no dashboard. The output is
JSON and Markdown. You read it, or you write a script to plot it.
- **It does not parallelise evaluations.** Each experiment runs sequentially.
On a machine with multiple cores, you could shard the challenger set across
processes, but that is not implemented.
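A minimal sketch of that sharding idea, with `evaluate_spec` standing in for a hypothetical picklable evaluation function:
```python
# Hedged sketch only -- the repo itself evaluates challengers sequentially.
from concurrent.futures import ProcessPoolExecutor

def evaluate_in_parallel(specs: list, evaluate_spec, max_workers: int = 4) -> list:
    with ProcessPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(evaluate_spec, specs))
```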
---
## Part 7: Architecture diagram
```
configs/rungs/rung1-5.yaml
|
v
+---------+---------+
| AutoresearchHarness |
| (ratchet/runner.py) |
+---+-----+-----+---+
| | |
+------------+ | +------------+
| | |
v v v
CompositeGenerator LocalCheapExecutor ResearchStore
(search/strategies.py) (execution/local.py) (persistence/store.py)
| | |
+----------+------+ | +--------+--------+
| | | | | | |
v v v v v v v
Neighbor Random Lesson build_circuit save_ save_ save_
Walk Combo Guided _bundle() exp step progress
| |
v v
LessonFeedback AerSimulator
(lessons/ + noise model
feedback.py) + postselection
+ witness
+ scoring
```
The data flows in a circle:
```
Evaluate --> Score --> Compare --> Learn --> Narrow --> Generate --> Evaluate
```
That circle is the ratchet step. Each rung runs it multiple times. Each
ratchet runs multiple rungs. The lessons tighten the circle with every pass.
---
*This document was written on 2026-04-04 to describe the system as built.
The code is the ground truth. If this document contradicts the code, the
code is correct.*

configs/rungs/rung1.yaml Normal file

@@ -0,0 +1,73 @@
rung: 1
name: "[[4,2,2]] Encoded Magic-State Preparation"
description: "Baseline ratchet over preparation, verification, and postselection choices for the smallest encoded magic-state experiment."
objective: "Maximize acceptance-weighted encoded magic quality for [[4,2,2]] T-state preparation on a backend-aware cheap tier."
bootstrap_incumbent:
seed_style: h_p
encoder_style: cx_chain
verification: both
postselection: all_measured
ancilla_strategy: dedicated_pair
optimization_level: 2
layout_method: sabre
routing_method: sabre
approximation_degree: 1.0
target_backend: fake_brisbane
noise_backend: fake_brisbane
shots: 512
repeats: 2
notes: "Bootstrap incumbent for encoded T-state preparation."
search_space:
max_challengers_per_step: 8
dimensions:
seed_style: [h_p, ry_rz, u_magic]
encoder_style: [cx_chain, cz_compiled]
verification: [both, z_only, x_only]
postselection: [all_measured, z_only, none]
ancilla_strategy: [dedicated_pair, reused_single]
optimization_level: [1, 2, 3]
tier_policy:
cheap_margin: 0.002
confirmation_margin: 0.0
cheap_shots: 512
expensive_shots: 1024
cheap_repeats: 2
expensive_repeats: 1
promote_top_k: 2
enable_hardware: false
confirm_incumbent_on_hardware: true
hardware_budget: 1
score:
name: weighted_acceptance_cost
base_cost: 1.0
cheap_quality:
ideal_fidelity: 0.10
noisy_fidelity: 0.40
logical_witness: 0.25
codespace_rate: 0.15
stability_score: 0.05
spectator_alignment: 0.05
expensive_quality:
logical_witness: 0.55
codespace_rate: 0.15
stability_score: 0.20
spectator_alignment: 0.10
cost_weights:
two_qubit_count: 0.08
depth: 0.01
shot_count: 0.00020
runtime_estimate: 0.015
queue_cost_proxy: 0.30
step_budget: 3
patience: 2
hardware:
backend_name:
channel:
instance:
token_env_var: QISKIT_IBM_TOKEN

configs/rungs/rung2.yaml Normal file

@@ -0,0 +1,73 @@
rung: 2
name: "Backend-Aware Stability Rung"
description: "Same [[4,2,2]] task, but with repeated cheap-tier runs, backend variation, and stronger stability pressure."
objective: "Favor experiment settings that hold score under calibration-like backend changes and repeated noisy evaluation."
bootstrap_incumbent:
seed_style: h_p
encoder_style: cx_chain
verification: both
postselection: all_measured
ancilla_strategy: dedicated_pair
optimization_level: 3
layout_method: sabre
routing_method: sabre
approximation_degree: 1.0
target_backend: fake_kyoto
noise_backend: fake_kyoto
shots: 768
repeats: 3
notes: "Stability-focused bootstrap incumbent."
search_space:
max_challengers_per_step: 8
dimensions:
target_backend: [fake_kyoto, fake_brisbane, fake_sherbrooke]
noise_backend: [fake_kyoto, fake_brisbane, fake_sherbrooke]
verification: [both, z_only]
postselection: [all_measured, z_only]
optimization_level: [1, 2, 3]
layout_method: [sabre, dense]
routing_method: [sabre, basic]
tier_policy:
cheap_margin: 0.001
confirmation_margin: 0.0
cheap_shots: 768
expensive_shots: 1536
cheap_repeats: 3
expensive_repeats: 1
promote_top_k: 2
enable_hardware: false
confirm_incumbent_on_hardware: true
hardware_budget: 1
score:
name: weighted_acceptance_cost
base_cost: 1.0
cheap_quality:
noisy_fidelity: 0.30
logical_witness: 0.25
codespace_rate: 0.20
stability_score: 0.20
spectator_alignment: 0.05
expensive_quality:
logical_witness: 0.45
codespace_rate: 0.15
stability_score: 0.30
spectator_alignment: 0.10
cost_weights:
two_qubit_count: 0.06
depth: 0.01
shot_count: 0.00025
runtime_estimate: 0.02
queue_cost_proxy: 0.35
step_budget: 3
patience: 2
hardware:
backend_name:
channel:
instance:
token_env_var: QISKIT_IBM_TOKEN

configs/rungs/rung3.yaml Normal file

@@ -0,0 +1,77 @@
rung: 3
name: "Transfer Test Rung"
description: "Keep only the strongest principles from the first two rungs and test them across multiple backend targets."
objective: "Measure which [[4,2,2]] heuristics transfer across backend families rather than overfitting a single noise profile."
bootstrap_incumbent:
seed_style: ry_rz
encoder_style: cx_chain
verification: z_only
postselection: z_only
ancilla_strategy: reused_single
optimization_level: 3
layout_method: sabre
routing_method: sabre
approximation_degree: 1.0
target_backend: fake_sherbrooke
noise_backend: fake_sherbrooke
shots: 768
repeats: 3
notes: "Transfer-focused incumbent."
search_space:
max_challengers_per_step: 6
dimensions:
target_backend: [fake_sherbrooke, fake_brisbane, fake_kyoto]
noise_backend: [fake_sherbrooke, fake_brisbane, fake_kyoto]
seed_style: [ry_rz, h_p]
verification: [z_only, both]
postselection: [z_only, all_measured]
ancilla_strategy: [reused_single, dedicated_pair]
tier_policy:
cheap_margin: 0.001
confirmation_margin: 0.0
cheap_shots: 768
expensive_shots: 1536
cheap_repeats: 3
expensive_repeats: 1
promote_top_k: 2
enable_hardware: false
confirm_incumbent_on_hardware: true
hardware_budget: 1
score:
name: weighted_acceptance_cost
base_cost: 1.0
cheap_quality:
noisy_fidelity: 0.20
logical_witness: 0.30
codespace_rate: 0.20
stability_score: 0.20
spectator_alignment: 0.10
expensive_quality:
logical_witness: 0.50
codespace_rate: 0.15
stability_score: 0.25
spectator_alignment: 0.10
cost_weights:
two_qubit_count: 0.05
depth: 0.01
shot_count: 0.00025
runtime_estimate: 0.02
queue_cost_proxy: 0.40
step_budget: 3
patience: 2
transfer_backends:
- fake_sherbrooke
- fake_brisbane
- fake_kyoto
hardware:
backend_name:
channel:
instance:
token_env_var: QISKIT_IBM_TOKEN

configs/rungs/rung4.yaml Normal file

@@ -0,0 +1,73 @@
rung: 4
name: "Factory-Style Cost Rung"
description: "Shift the scalar score away from best-state chasing toward accepted states per cost proxy."
objective: "Optimize accepted encoded magic states per unit cost, using circuit suite cost as a first factory-style proxy."
bootstrap_incumbent:
seed_style: ry_rz
encoder_style: cx_chain
verification: z_only
postselection: z_only
ancilla_strategy: reused_single
optimization_level: 3
layout_method: dense
routing_method: basic
approximation_degree: 1.0
target_backend: fake_brisbane
noise_backend: fake_brisbane
shots: 384
repeats: 2
notes: "Throughput-oriented incumbent."
search_space:
max_challengers_per_step: 6
dimensions:
verification: [z_only, both, none]
postselection: [z_only, all_measured, none]
ancilla_strategy: [reused_single, dedicated_pair]
optimization_level: [2, 3]
layout_method: [dense, sabre]
routing_method: [basic, sabre]
shots: [256, 384, 512]
tier_policy:
cheap_margin: 0.001
confirmation_margin: 0.0
cheap_shots: 384
expensive_shots: 1024
cheap_repeats: 2
expensive_repeats: 1
promote_top_k: 1
enable_hardware: false
confirm_incumbent_on_hardware: true
hardware_budget: 1
score:
name: factory_throughput
base_cost: 1.0
cheap_quality:
noisy_fidelity: 0.15
logical_witness: 0.25
codespace_rate: 0.20
stability_score: 0.10
spectator_alignment: 0.10
expensive_quality:
logical_witness: 0.40
codespace_rate: 0.20
stability_score: 0.10
spectator_alignment: 0.10
cost_weights:
two_qubit_count: 0.10
depth: 0.02
shot_count: 0.00040
runtime_estimate: 0.03
queue_cost_proxy: 0.50
step_budget: 3
patience: 2
hardware:
backend_name:
channel:
instance:
token_env_var: QISKIT_IBM_TOKEN

configs/rungs/rung5.yaml Normal file

@@ -0,0 +1,71 @@
rung: 5
name: "Rosenfeld Direction"
description: "Identify which heuristics matter for cultivation/distillation workflows. Narrowest search space; only proven dimensions survive."
objective: "Determine which preparation, verification, and transpilation choices are load-bearing under factory-realistic conditions, as a precursor to distillation pipeline integration."
bootstrap_incumbent:
seed_style: ry_rz
encoder_style: cx_chain
verification: z_only
postselection: z_only
ancilla_strategy: reused_single
optimization_level: 3
layout_method: dense
routing_method: basic
approximation_degree: 1.0
target_backend: fake_brisbane
noise_backend: fake_brisbane
shots: 512
repeats: 3
notes: "Rosenfeld-direction incumbent — propagated from rung 4 winner."
search_space:
max_challengers_per_step: 4
dimensions:
verification: [z_only, both]
postselection: [z_only, all_measured]
ancilla_strategy: [reused_single, dedicated_pair]
optimization_level: [2, 3]
shots: [384, 512, 768]
tier_policy:
cheap_margin: 0.0005
confirmation_margin: 0.0
cheap_shots: 512
expensive_shots: 2048
cheap_repeats: 3
expensive_repeats: 2
promote_top_k: 1
enable_hardware: false
confirm_incumbent_on_hardware: true
hardware_budget: 1
score:
name: factory_throughput
base_cost: 1.0
cheap_quality:
noisy_fidelity: 0.10
logical_witness: 0.35
codespace_rate: 0.20
stability_score: 0.25
spectator_alignment: 0.10
expensive_quality:
logical_witness: 0.40
codespace_rate: 0.20
stability_score: 0.30
spectator_alignment: 0.10
cost_weights:
two_qubit_count: 0.12
depth: 0.02
shot_count: 0.00050
runtime_estimate: 0.04
queue_cost_proxy: 0.60
step_budget: 4
patience: 3
hardware:
backend_name:
channel:
instance:
token_env_var: QISKIT_IBM_TOKEN

pyproject.toml Normal file

@@ -0,0 +1,39 @@
[build-system]
requires = ["setuptools>=68", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "autoresearch-quantum"
version = "0.1.0"
description = "Karpathy-style autoresearch harness for encoded magic-state preparation experiments."
readme = "README.md"
requires-python = ">=3.11"
authors = [
{ name = "Codex" }
]
dependencies = [
"qiskit>=2.3,<3",
"qiskit-aer>=0.17,<0.18",
"pyyaml>=6,<7",
]
[project.optional-dependencies]
hardware = [
"qiskit-ibm-runtime>=0.46,<0.47",
]
dev = [
"pytest>=9,<10",
]
[project.scripts]
autoresearch-quantum = "autoresearch_quantum.cli:main"
[tool.setuptools]
package-dir = {"" = "src"}
[tool.setuptools.packages.find]
where = ["src"]
[tool.pytest.ini_options]
pythonpath = ["src"]
testpaths = ["tests"]

src/autoresearch_quantum/__init__.py Normal file

@@ -0,0 +1,6 @@
"""Autoresearch harness for encoded magic-state preparation."""
from .config import load_rung_config
from .models import ExperimentSpec
__all__ = ["ExperimentSpec", "load_rung_config"]

src/autoresearch_quantum/__main__.py Normal file

@@ -0,0 +1,5 @@
from .cli import main
if __name__ == "__main__":
raise SystemExit(main())

src/autoresearch_quantum/cli.py Normal file

@@ -0,0 +1,184 @@
from __future__ import annotations
import argparse
import json
from dataclasses import asdict, is_dataclass
from pathlib import Path
from typing import Any
from .config import load_rung_config
from .models import ExperimentSpec
from .persistence.store import ResearchStore
from .ratchet.runner import AutoresearchHarness
def _parse_override(value: str) -> tuple[str, Any]:
key, raw = value.split("=", 1)
if raw.lower() in {"true", "false"}:
return key, raw.lower() == "true"
if raw.isdigit():
return key, int(raw)
try:
return key, float(raw)
except ValueError:
pass
if raw.startswith("[") and raw.endswith("]"):
return key, json.loads(raw)
return key, raw
def _build_spec_from_config(config_path: Path, overrides: list[str]) -> tuple[Any, ExperimentSpec]:
rung_config = load_rung_config(config_path)
spec = rung_config.bootstrap_incumbent
update_payload = dict(_parse_override(item) for item in overrides)
if update_payload:
spec = spec.with_updates(**update_payload)
return rung_config, spec
def _print_json(payload: Any) -> None:
def _default(value: Any) -> Any:
if is_dataclass(value):
return asdict(value)
return str(value)
print(json.dumps(payload, indent=2, default=_default))
def _add_store_arg(parser: argparse.ArgumentParser) -> None:
parser.add_argument("--store-dir", default="data/default", help="Persistent result store directory.")
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Quantum autoresearch ratchet CLI")
_add_store_arg(parser)
subparsers = parser.add_subparsers(dest="command", required=True)
experiment = subparsers.add_parser("run-experiment", help="Run one local experiment.")
_add_store_arg(experiment)
experiment.add_argument("--config", required=True)
experiment.add_argument("--set", action="append", default=[], help="Override spec fields: key=value")
experiment.add_argument("--hardware", action="store_true", help="Also run hardware confirmation if enabled.")
challenger_set = subparsers.add_parser("run-challenger-set", help="Evaluate one challenger neighborhood.")
_add_store_arg(challenger_set)
challenger_set.add_argument("--config", required=True)
step = subparsers.add_parser("run-step", help="Run one ratchet step.")
_add_store_arg(step)
step.add_argument("--config", required=True)
step.add_argument("--hardware", action="store_true")
rung = subparsers.add_parser("run-rung", help="Run a full rung.")
_add_store_arg(rung)
rung.add_argument("--config", required=True)
rung.add_argument("--hardware", action="store_true")
ratchet = subparsers.add_parser("run-ratchet", help="Run multiple rung configs in order.")
_add_store_arg(ratchet)
ratchet.add_argument("--config", action="append", required=True)
ratchet.add_argument("--hardware", action="store_true")
transfer = subparsers.add_parser("run-transfer", help="Evaluate a spec across multiple backends.")
_add_store_arg(transfer)
transfer.add_argument("--config", required=True)
transfer.add_argument("--set", action="append", default=[], help="Override spec fields: key=value")
transfer.add_argument("--backends", nargs="+", help="Backend names to evaluate on (overrides config).")
return parser
def main(argv: list[str] | None = None) -> int:
parser = build_parser()
args = parser.parse_args(argv)
store = ResearchStore(args.store_dir)
harness = AutoresearchHarness(store)
if args.command == "run-experiment":
rung_config, spec = _build_spec_from_config(Path(args.config), args.set)
record = harness.run_single_experiment(
spec,
rung_config,
promote_to_hardware=bool(args.hardware),
)
_print_json(
{
"experiment_id": record.experiment_id,
"score": record.final_score,
"cheap_score": record.cheap_result.score,
"expensive_score": record.expensive_result.score if record.expensive_result else None,
"failure_mode": record.best_result.metrics.dominant_failure_mode,
}
)
return 0
if args.command == "run-challenger-set":
rung_config = load_rung_config(args.config)
records = harness.run_challenger_set(rung_config)
_print_json(
[
{
"experiment_id": record.experiment_id,
"mutation": record.mutation_note,
"cheap_score": record.cheap_result.score,
}
for record in records
]
)
return 0
if args.command == "run-step":
rung_config = load_rung_config(args.config)
step = harness.run_ratchet_step(rung_config, allow_hardware=bool(args.hardware))
_print_json(step)
return 0
if args.command == "run-rung":
rung_config = load_rung_config(args.config)
steps, lesson, feedback = harness.run_rung(rung_config, allow_hardware=bool(args.hardware))
_print_json({
"steps": steps,
"lesson_path": str(store.rung_dir(rung_config.rung) / "lesson.md"),
"lesson": lesson,
"feedback_rules": len(feedback.rules),
"narrowed_dimensions": feedback.narrowed_dimensions,
})
return 0
if args.command == "run-ratchet":
configs = [load_rung_config(path) for path in args.config]
results = harness.run_ratchet(configs, allow_hardware=bool(args.hardware))
_print_json([
{
"rung": lesson.rung,
"lesson": lesson,
"feedback_rules": len(feedback.rules),
}
for lesson, feedback in results
])
return 0
if args.command == "run-transfer":
from .execution.transfer import TransferEvaluator
rung_config, spec = _build_spec_from_config(Path(args.config), getattr(args, "set", []))
backends = args.backends or rung_config.transfer_backends
if not backends:
print("Error: No backends specified. Use --backends or add transfer_backends to config.")
return 1
evaluator = TransferEvaluator(harness.local_executor)
report = evaluator.evaluate_across_backends(spec, backends, rung_config)
_print_json({
"spec_fingerprint": spec.fingerprint(),
"transfer_score": report.transfer_score,
"mean_score": report.mean_score,
"min_score": report.min_score,
"max_score": report.max_score,
"std_score": report.std_score,
"per_backend_scores": report.per_backend_scores,
})
return 0
parser.error(f"Unknown command: {args.command}")
return 2

src/autoresearch_quantum/codes/__init__.py Normal file

@@ -0,0 +1 @@
"""Code-specific utilities."""

src/autoresearch_quantum/codes/four_two_two.py Normal file

@@ -0,0 +1,78 @@
from __future__ import annotations
from math import pi
from qiskit import QuantumCircuit
from qiskit.quantum_info import SparsePauliOp, Statevector
DATA_QUBITS = 4
MAGIC_PREP_QUBIT = 0
SPECTATOR_LOGICAL_QUBIT = 1
STABILIZERS = {
"z_stabilizer": SparsePauliOp.from_list([("ZZZZ", 1.0)]),
"x_stabilizer": SparsePauliOp.from_list([("XXXX", 1.0)]),
}
MEASUREMENT_OPERATORS = {
"logical_x": {0: "X", 2: "X"},
"logical_y": {0: "Y", 1: "Z", 2: "X"},
"spectator_z": {1: "Z", 2: "Z"},
}
def apply_magic_seed(circuit: QuantumCircuit, qubit: int, style: str) -> None:
if style == "h_p":
circuit.h(qubit)
circuit.p(pi / 4, qubit)
return
if style == "ry_rz":
circuit.ry(pi / 2, qubit)
circuit.rz(pi / 4, qubit)
return
if style == "u_magic":
circuit.u(pi / 2, 0.0, pi / 4, qubit)
return
raise ValueError(f"Unsupported seed style: {style}")
def build_encoder(style: str = "cx_chain") -> QuantumCircuit:
circuit = QuantumCircuit(DATA_QUBITS, name=f"encoder_{style}")
if style == "cx_chain":
circuit.cx(0, 2)
circuit.cx(1, 0)
circuit.h(3)
circuit.cx(3, 0)
circuit.cx(3, 1)
circuit.cx(3, 2)
return circuit
if style == "cz_compiled":
circuit.h(2)
circuit.cz(0, 2)
circuit.h(2)
circuit.h(0)
circuit.cz(1, 0)
circuit.h(0)
circuit.h(3)
circuit.h(0)
circuit.cz(3, 0)
circuit.h(0)
circuit.h(1)
circuit.cz(3, 1)
circuit.h(1)
circuit.h(2)
circuit.cz(3, 2)
circuit.h(2)
return circuit
raise ValueError(f"Unsupported encoder style: {style}")
def build_preparation_circuit(seed_style: str = "h_p", encoder_style: str = "cx_chain") -> QuantumCircuit:
circuit = QuantumCircuit(DATA_QUBITS, name="prep_422_magic")
apply_magic_seed(circuit, MAGIC_PREP_QUBIT, seed_style)
circuit.compose(build_encoder(encoder_style), qubits=range(DATA_QUBITS), inplace=True)
return circuit
def encoded_magic_statevector() -> Statevector:
return Statevector.from_instruction(build_preparation_circuit())

src/autoresearch_quantum/config.py Normal file

@@ -0,0 +1,82 @@
from __future__ import annotations
from pathlib import Path
from typing import Any, Mapping
import yaml
from .models import (
CostWeights,
ExperimentSpec,
HardwareConfig,
QualityWeights,
RungConfig,
ScoreConfig,
SearchSpaceConfig,
TierPolicyConfig,
)
def _quality_weights(data: Mapping[str, Any] | None) -> QualityWeights:
return QualityWeights(**dict(data or {}))
def _cost_weights(data: Mapping[str, Any] | None) -> CostWeights:
return CostWeights(**dict(data or {}))
def _score_config(data: Mapping[str, Any] | None) -> ScoreConfig:
payload = dict(data or {})
return ScoreConfig(
name=payload.get("name", "weighted_acceptance_cost"),
cheap_quality=_quality_weights(payload.get("cheap_quality")),
expensive_quality=_quality_weights(payload.get("expensive_quality")),
cost_weights=_cost_weights(payload.get("cost_weights")),
base_cost=float(payload.get("base_cost", 1.0)),
)
def _search_space_config(data: Mapping[str, Any] | None) -> SearchSpaceConfig:
payload = dict(data or {})
return SearchSpaceConfig(
dimensions=dict(payload.get("dimensions", {})),
max_challengers_per_step=int(payload.get("max_challengers_per_step", 8)),
)
def _tier_policy_config(data: Mapping[str, Any] | None) -> TierPolicyConfig:
return TierPolicyConfig(**dict(data or {}))
def _hardware_config(data: Mapping[str, Any] | None) -> HardwareConfig:
return HardwareConfig(**dict(data or {}))
def _experiment_spec(rung: int, data: Mapping[str, Any]) -> ExperimentSpec:
payload = dict(data)
payload["rung"] = rung
if "initial_layout" in payload and payload["initial_layout"] is not None:
payload["initial_layout"] = tuple(payload["initial_layout"])
return ExperimentSpec(**payload)
def load_rung_config(path: str | Path) -> RungConfig:
config_path = Path(path)
with config_path.open("r", encoding="utf-8") as handle:
payload = yaml.safe_load(handle)
rung = int(payload["rung"])
return RungConfig(
rung=rung,
name=str(payload["name"]),
description=str(payload["description"]),
objective=str(payload["objective"]),
bootstrap_incumbent=_experiment_spec(rung, payload["bootstrap_incumbent"]),
search_space=_search_space_config(payload.get("search_space")),
tier_policy=_tier_policy_config(payload.get("tier_policy")),
score=_score_config(payload.get("score")),
step_budget=int(payload.get("step_budget", 3)),
patience=int(payload.get("patience", 2)),
hardware=_hardware_config(payload.get("hardware")),
transfer_backends=list(payload.get("transfer_backends", [])),
)

src/autoresearch_quantum/execution/__init__.py Normal file

@@ -0,0 +1 @@
"""Execution backends and analyzers."""

src/autoresearch_quantum/execution/analysis.py Normal file

@@ -0,0 +1,119 @@
from __future__ import annotations
from collections import Counter
from math import sqrt
from statistics import fmean, pstdev
from typing import Any, Iterable
def local_memory_records(memory: list[str], creg_names: list[str]) -> list[dict[str, str]]:
records: list[dict[str, str]] = []
ordered_names = list(reversed(creg_names))
for shot in memory:
parts = shot.split(" ")
records.append(dict(zip(ordered_names, parts, strict=True)))
return records
def sampler_memory_records(bitstrings_by_register: dict[str, list[str]]) -> list[dict[str, str]]:
first_key = next(iter(bitstrings_by_register), None)
if first_key is None:
return []
shots = len(bitstrings_by_register[first_key])
records: list[dict[str, str]] = []
for shot_index in range(shots):
records.append(
{name: bitstrings[shot_index] for name, bitstrings in bitstrings_by_register.items()}
)
return records
def syndrome_outcomes(syndrome_bits: str, syndrome_labels: list[str]) -> dict[str, int]:
if not syndrome_labels:
return {}
least_significant_first = syndrome_bits[::-1]
return {
label: int(bit)
for label, bit in zip(syndrome_labels, least_significant_first, strict=True)
}
def postselection_passes(postselection: str, syndrome_labels: list[str], syndrome_bits: str) -> bool:
if postselection == "none" or not syndrome_labels:
return True
outcomes = syndrome_outcomes(syndrome_bits, syndrome_labels)
if postselection == "all_measured":
return all(bit == 0 for bit in outcomes.values())
if postselection == "z_only":
return outcomes.get("z_stabilizer", 0) == 0
if postselection == "x_only":
return outcomes.get("x_stabilizer", 0) == 0
raise ValueError(f"Unsupported postselection rule: {postselection}")
def operator_eigenvalue(data_bits: str, measured_qubits: Iterable[int]) -> int:
least_significant_first = data_bits[::-1]
parity = sum(least_significant_first[index] == "1" for index in measured_qubits)
return 1 if parity % 2 == 0 else -1
def summarize_context(
records: list[dict[str, str]],
syndrome_labels: list[str],
postselection: str,
operator: dict[int, str] | None = None,
) -> dict[str, Any]:
total_shots = len(records)
syndrome_counter: Counter[str] = Counter()
raw_data_counter: Counter[str] = Counter()
accepted_counter: Counter[str] = Counter()
accepted_values: list[int] = []
accepted = 0
for shot in records:
syndrome_bits = shot.get("syndrome", "")
data_bits = shot.get("readout", "")
syndrome_counter[syndrome_bits] += 1
raw_data_counter[data_bits] += 1
passes = postselection_passes(postselection, syndrome_labels, syndrome_bits)
if not passes:
continue
accepted += 1
accepted_counter[data_bits] += 1
if operator is not None:
accepted_values.append(operator_eigenvalue(data_bits, operator.keys()))
acceptance_rate = accepted / total_shots if total_shots else 0.0
expectation = (
sum(accepted_values) / len(accepted_values)
if accepted_values
else 0.0
)
return {
"total_shots": total_shots,
"accepted_shots": accepted,
"acceptance_rate": acceptance_rate,
"expectation": expectation,
"syndrome_counts": dict(syndrome_counter),
"raw_data_counts": dict(raw_data_counter),
"accepted_data_counts": dict(accepted_counter),
}
def logical_magic_witness(logical_x: float, logical_y: float, spectator_z: float) -> float:
witness = (1.0 + ((logical_x + logical_y) / sqrt(2.0))) / 2.0
spectator_alignment = (1.0 + spectator_z) / 2.0
value = witness * spectator_alignment
return max(0.0, min(1.0, value))
def stability_score(values: list[float]) -> float:
if not values:
return 0.0
if len(values) == 1:
return 1.0
mean_value = fmean(values)
if abs(mean_value) < 1e-9:
return 0.0
variation = pstdev(values)
return max(0.0, min(1.0, 1.0 - (variation / max(abs(mean_value), 1e-9))))
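
A minimal sketch of how these helpers compose, with hand-written shot strings standing in for a real Aer run; the bit values are made up, and the import path assumes the package is installed:

```python
from autoresearch_quantum.execution.analysis import local_memory_records, summarize_context

# Two shots in Aer memory format: the last-added classical register
# ("readout" here) is the leftmost space-separated group.
memory = ["1010 01", "0000 00"]
records = local_memory_records(memory, ["syndrome", "readout"])
assert records[0] == {"syndrome": "01", "readout": "1010"}

summary = summarize_context(
    records,
    syndrome_labels=["z_stabilizer", "x_stabilizer"],
    postselection="all_measured",
    operator={0: "X", 1: "X"},  # hypothetical two-qubit logical operator
)
assert summary["accepted_shots"] == 1  # only the all-zero syndrome survives
assert summary["expectation"] == 1.0   # "0000" has even parity on qubits 0 and 1
```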

View file

@@ -0,0 +1,66 @@
from __future__ import annotations
import os
from functools import lru_cache
from typing import Any
from qiskit.providers.backend import BackendV2
from ..models import HardwareConfig
try:
from qiskit_ibm_runtime import QiskitRuntimeService
from qiskit_ibm_runtime.fake_provider import FakeProviderForBackendV2
except ImportError: # pragma: no cover - exercised only when hardware extras missing
QiskitRuntimeService = None
FakeProviderForBackendV2 = None
@lru_cache(maxsize=1)
def _fake_provider() -> Any:
if FakeProviderForBackendV2 is None:
raise RuntimeError("qiskit-ibm-runtime is required for fake backends.")
return FakeProviderForBackendV2()
def resolve_backend(name: str, hardware: HardwareConfig | None = None) -> BackendV2:
if name.startswith("fake_"):
return _fake_provider().backend(name)
if QiskitRuntimeService is None:
raise RuntimeError(
"qiskit-ibm-runtime is not installed. Install the hardware extra to use IBM backends."
)
service_kwargs: dict[str, Any] = {}
if hardware and hardware.channel:
service_kwargs["channel"] = hardware.channel
if hardware and hardware.instance:
service_kwargs["instance"] = hardware.instance
if hardware:
token = os.getenv(hardware.token_env_var)
if token:
service_kwargs["token"] = token
service = QiskitRuntimeService(**service_kwargs) if service_kwargs else QiskitRuntimeService()
return service.backend(name)
def backend_metadata(backend: BackendV2) -> dict[str, Any]:
operation_names = []
if getattr(backend, "operation_names", None):
operation_names = sorted(list(backend.operation_names))
coupling_map = getattr(backend, "coupling_map", None)
if coupling_map is None:
coupling_edges = 0
elif hasattr(coupling_map, "get_edges"):
coupling_edges = len(coupling_map.get_edges())
else:
coupling_edges = len(coupling_map)
return {
"name": backend.name,
"num_qubits": getattr(backend, "num_qubits", None),
"operation_names": operation_names,
"coupling_edges": coupling_edges,
}
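
A quick usage sketch, assuming the qiskit-ibm-runtime extra is installed; fake backend names such as `fake_brisbane` come from its fake provider:

```python
from autoresearch_quantum.execution.backends import backend_metadata, resolve_backend

backend = resolve_backend("fake_brisbane")
meta = backend_metadata(backend)
print(meta["name"], meta["num_qubits"], meta["coupling_edges"])
```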

View file

@@ -0,0 +1,121 @@
from __future__ import annotations
from statistics import fmean
from ..codes.four_two_two import MEASUREMENT_OPERATORS
from ..experiments.encoded_magic_state import build_circuit_bundle
from ..models import EvaluationMetrics, ExperimentSpec, RungConfig, TierResult
from ..scoring.score import score_metrics
from .analysis import logical_magic_witness, sampler_memory_records, stability_score, summarize_context
from .backends import backend_metadata, resolve_backend
from .transpile import circuit_metadata, count_two_qubit_gates, runtime_estimate, transpile_circuits
try:
from qiskit_ibm_runtime import SamplerV2
except ImportError: # pragma: no cover - exercised only when hardware extras missing
SamplerV2 = None
class IBMHardwareExecutor:
def evaluate(self, spec: ExperimentSpec, rung_config: RungConfig) -> TierResult:
if SamplerV2 is None:
raise RuntimeError(
"qiskit-ibm-runtime is not installed. Install the hardware extra to enable IBM execution."
)
backend_name = rung_config.hardware.backend_name or spec.target_backend
backend = resolve_backend(backend_name, rung_config.hardware)
bundle = build_circuit_bundle(spec)
context_names = ["acceptance", *bundle.witness_circuits.keys()]
raw_circuits = [bundle.acceptance, *bundle.witness_circuits.values()]
transpiled_contexts = transpile_circuits(raw_circuits, spec, backend)
circuits_by_name = dict(zip(context_names, transpiled_contexts, strict=True))
shots = rung_config.tier_policy.expensive_shots
repeats = rung_config.tier_policy.expensive_repeats
sampler = SamplerV2(mode=backend)
aggregate: dict[str, list[dict[str, object]]] = {name: [] for name in context_names}
repeat_scores: list[float] = []
notes: list[str] = []
for _ in range(repeats):
result = sampler.run(list(circuits_by_name.values()), shots=shots).result()
for context_name, pub_result, circuit in zip(
context_names,
result,
circuits_by_name.values(),
strict=True,
):
records = sampler_memory_records(
{
name: bit_array.get_bitstrings()
for name, bit_array in pub_result.data.items()
}
)
summary = summarize_context(
records,
syndrome_labels=list(circuit.metadata.get("syndrome_labels", [])),
postselection=str(circuit.metadata.get("postselection", "none")),
operator=MEASUREMENT_OPERATORS.get(context_name),
)
aggregate[context_name].append(summary)
x_value = float(aggregate["logical_x"][-1]["expectation"])
y_value = float(aggregate["logical_y"][-1]["expectation"])
spectator = float(aggregate["spectator_z"][-1]["expectation"])
acceptance = float(aggregate["acceptance"][-1]["acceptance_rate"])
repeat_scores.append(logical_magic_witness(x_value, y_value, spectator) * acceptance)
acceptance_rate = fmean(float(item["acceptance_rate"]) for item in aggregate["acceptance"])
logical_x = fmean(float(item["expectation"]) for item in aggregate["logical_x"])
logical_y = fmean(float(item["expectation"]) for item in aggregate["logical_y"])
spectator_z = fmean(float(item["expectation"]) for item in aggregate["spectator_z"])
metrics = EvaluationMetrics(
logical_magic_witness=logical_magic_witness(logical_x, logical_y, spectator_z),
acceptance_rate=acceptance_rate,
codespace_rate=fmean(
float(item["acceptance_rate"])
for summaries in aggregate.values()
for item in summaries
),
spectator_logical_z=spectator_z,
logical_x=logical_x,
logical_y=logical_y,
stability_score=stability_score(repeat_scores),
two_qubit_count=sum(count_two_qubit_gates(circuit) for circuit in circuits_by_name.values()),
depth=max(circuit.depth() for circuit in circuits_by_name.values()),
shot_count=shots * repeats * len(circuits_by_name),
runtime_estimate=sum(runtime_estimate(circuit) for circuit in circuits_by_name.values()),
queue_cost_proxy=1.0,
transpile_metadata={
name: circuit_metadata(circuit, spec) for name, circuit in circuits_by_name.items()
},
backend_metadata={"target_backend": backend_metadata(backend)},
)
        # Treat a missing stability score as stable, but let 0.0 still count as drift
        # (a bare `or` would silently replace 0.0 with 1.0).
        metrics.dominant_failure_mode = (
            "hardware drift sensitivity"
            if (metrics.stability_score if metrics.stability_score is not None else 1.0) < 0.75
            else "hardware confirmation run"
        )
score, quality, _ = score_metrics(metrics, "expensive", rung_config.score)
notes.append(
f"Hardware-tier confirmation used backend {backend.name} with {shots} shots x {repeats} repeats."
)
return TierResult(
tier="expensive",
score=score,
quality_estimate=quality,
metrics=metrics,
counts_summary={
name: {
"mean_acceptance_rate": fmean(float(item["acceptance_rate"]) for item in summaries),
"mean_expectation": fmean(float(item["expectation"]) for item in summaries),
"latest": summaries[-1],
}
for name, summaries in aggregate.items()
},
notes=notes,
)
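
The expensive tier is only reached through the runner's gating; a toy slice of that logic, assuming the package is importable (challenger names are hypothetical):

```python
from autoresearch_quantum.models import TierPolicyConfig

policy = TierPolicyConfig(enable_hardware=True, hardware_budget=1)
promoted = ["challenger_a", "challenger_b"]  # hypothetical cheap-tier winners
to_confirm = promoted[: policy.hardware_budget or len(promoted)]
assert to_confirm == ["challenger_a"]  # a budget of 1 confirms only the best
```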

View file

@@ -0,0 +1,178 @@
from __future__ import annotations
import hashlib
from math import fsum
from statistics import fmean
from qiskit.quantum_info import Statevector, state_fidelity
from qiskit_aer import AerSimulator
from qiskit_aer.noise import NoiseModel
from ..codes.four_two_two import MEASUREMENT_OPERATORS
from ..experiments.encoded_magic_state import build_circuit_bundle
from ..models import EvaluationMetrics, ExperimentSpec, RungConfig, TierResult
from ..scoring.score import score_metrics
from .analysis import (
local_memory_records,
logical_magic_witness,
stability_score,
summarize_context,
)
from .backends import backend_metadata, resolve_backend
from .transpile import circuit_metadata, count_two_qubit_gates, runtime_estimate, transpile_circuits
def _dominant_failure_mode(metrics: EvaluationMetrics) -> str:
if metrics.acceptance_rate < 0.45:
return "postselection collapse"
if metrics.logical_magic_witness is not None and metrics.logical_magic_witness < 0.65:
return "logical witness erosion"
if metrics.stability_score is not None and metrics.stability_score < 0.75:
return "noise sensitivity"
if metrics.two_qubit_count > 60 or metrics.depth > 120:
return "transpile cost explosion"
return "residual coherent/noisy error"
class LocalCheapExecutor:
def evaluate(self, spec: ExperimentSpec, rung_config: RungConfig) -> TierResult:
bundle = build_circuit_bundle(spec)
target_backend = resolve_backend(spec.target_backend, rung_config.hardware)
noise_backend_name = spec.noise_backend or spec.target_backend
noise_backend = resolve_backend(noise_backend_name, rung_config.hardware)
transpiled_prep = transpile_circuits([bundle.prep], spec, target_backend)[0]
context_names = ["acceptance", *bundle.witness_circuits.keys()]
raw_circuits = [bundle.acceptance, *bundle.witness_circuits.values()]
transpiled_contexts = transpile_circuits(raw_circuits, spec, target_backend)
circuits_by_name = dict(zip(context_names, transpiled_contexts, strict=True))
ideal_fidelity = state_fidelity(Statevector.from_instruction(bundle.prep), bundle.target_state)
noisy_fidelity = None
shot_simulator: AerSimulator
density_simulator: AerSimulator | None = None
notes: list[str] = []
try:
noise_model = NoiseModel.from_backend(noise_backend)
shot_simulator = AerSimulator(
noise_model=noise_model,
basis_gates=noise_model.basis_gates,
coupling_map=getattr(noise_backend, "coupling_map", None),
)
density_simulator = AerSimulator(
method="density_matrix",
noise_model=noise_model,
basis_gates=noise_model.basis_gates,
coupling_map=getattr(noise_backend, "coupling_map", None),
)
except Exception as exc: # pragma: no cover - depends on backend capabilities
notes.append(f"Noise model unavailable, falling back to ideal simulation: {exc}")
shot_simulator = AerSimulator()
if density_simulator is not None:
density_circuit = transpiled_prep.copy()
density_circuit.save_density_matrix()
density_result = density_simulator.run(density_circuit).result()
noisy_density = density_result.data(0)["density_matrix"]
noisy_fidelity = state_fidelity(noisy_density, bundle.target_state)
repeats = spec.repeats or rung_config.tier_policy.cheap_repeats
shots = spec.shots or rung_config.tier_policy.cheap_shots
repeat_scores: list[float] = []
aggregate: dict[str, list[dict[str, object]]] = {name: [] for name in context_names}
for repeat_index in range(repeats):
for context_name, circuit in circuits_by_name.items():
result = shot_simulator.run(
circuit,
shots=shots,
memory=True,
seed_simulator=int(
hashlib.sha256(
f"{spec.fingerprint()}-{repeat_index}".encode()
).hexdigest()[:8],
16,
),
).result()
memory = result.get_memory(circuit)
records = local_memory_records(memory, [creg.name for creg in circuit.cregs])
summary = summarize_context(
records,
syndrome_labels=list(circuit.metadata.get("syndrome_labels", [])),
postselection=str(circuit.metadata.get("postselection", "none")),
operator=MEASUREMENT_OPERATORS.get(context_name),
)
aggregate[context_name].append(summary)
x_value = float(aggregate["logical_x"][-1]["expectation"])
y_value = float(aggregate["logical_y"][-1]["expectation"])
spectator = float(aggregate["spectator_z"][-1]["expectation"])
acceptance = float(aggregate["acceptance"][-1]["acceptance_rate"])
repeat_scores.append(logical_magic_witness(x_value, y_value, spectator) * acceptance)
acceptance_rate = fmean(float(item["acceptance_rate"]) for item in aggregate["acceptance"])
logical_x = fmean(float(item["expectation"]) for item in aggregate["logical_x"])
logical_y = fmean(float(item["expectation"]) for item in aggregate["logical_y"])
spectator_z = fmean(float(item["expectation"]) for item in aggregate["spectator_z"])
witness = logical_magic_witness(logical_x, logical_y, spectator_z)
codespace_rate = fmean(
[
float(item["acceptance_rate"])
for summaries in aggregate.values()
for item in summaries
]
)
total_two_qubit = sum(count_two_qubit_gates(circuit) for circuit in circuits_by_name.values())
max_depth = max(circuit.depth() for circuit in circuits_by_name.values())
total_runtime = fsum(runtime_estimate(circuit) for circuit in circuits_by_name.values())
metrics = EvaluationMetrics(
ideal_encoded_fidelity=ideal_fidelity,
noisy_encoded_fidelity=noisy_fidelity if noisy_fidelity is not None else ideal_fidelity,
logical_magic_witness=witness,
acceptance_rate=acceptance_rate,
codespace_rate=codespace_rate,
spectator_logical_z=spectator_z,
logical_x=logical_x,
logical_y=logical_y,
stability_score=stability_score(repeat_scores),
two_qubit_count=total_two_qubit,
depth=max_depth,
shot_count=shots * repeats * len(circuits_by_name),
runtime_estimate=total_runtime,
queue_cost_proxy=0.0,
transpile_metadata={
name: circuit_metadata(circuit, spec) for name, circuit in circuits_by_name.items()
},
backend_metadata={
"target_backend": backend_metadata(target_backend),
"noise_backend": backend_metadata(noise_backend),
},
)
metrics.dominant_failure_mode = _dominant_failure_mode(metrics)
score, quality, _ = score_metrics(metrics, "cheap", rung_config.score)
counts_summary = {
name: {
"mean_acceptance_rate": fmean(float(item["acceptance_rate"]) for item in summaries),
"mean_expectation": fmean(float(item["expectation"]) for item in summaries),
"latest": summaries[-1],
}
for name, summaries in aggregate.items()
}
notes.append(f"Cheap-tier proxy used {shots} shots x {repeats} repeats over {len(circuits_by_name)} circuits.")
return TierResult(
tier="cheap",
score=score,
quality_estimate=quality,
metrics=metrics,
counts_summary=counts_summary,
notes=notes,
)
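
A standalone sketch of the per-repeat seeding above: hashing the spec fingerprint together with the repeat index makes every cheap-tier rerun reproducible while still varying across repeats.

```python
import hashlib

def repeat_seed(fingerprint: str, repeat_index: int) -> int:
    # Same derivation as the seed_simulator argument above.
    digest = hashlib.sha256(f"{fingerprint}-{repeat_index}".encode()).hexdigest()
    return int(digest[:8], 16)

assert repeat_seed("abc123", 0) == repeat_seed("abc123", 0)  # reproducible
print(repeat_seed("abc123", 0), repeat_seed("abc123", 1))    # differs per repeat
```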

View file

@@ -0,0 +1,62 @@
from __future__ import annotations
import logging
from statistics import fmean, stdev
from ..models import ExperimentSpec, RungConfig, TransferReport
from .local import LocalCheapExecutor
logger = logging.getLogger(__name__)
class TransferEvaluator:
"""Evaluate a single spec across multiple backend noise models.
The transfer_score is the minimum across backends (pessimistic),
which prevents overfitting to a single noise profile.
"""
def __init__(self, executor: LocalCheapExecutor | None = None) -> None:
self.executor = executor or LocalCheapExecutor()
def evaluate_across_backends(
self,
spec: ExperimentSpec,
backends: list[str],
rung_config: RungConfig,
) -> TransferReport:
per_backend_scores: dict[str, float] = {}
per_backend_metrics = {}
for backend_name in backends:
backend_spec = spec.with_updates(
target_backend=backend_name,
noise_backend=backend_name,
)
result = self.executor.evaluate(backend_spec, rung_config)
per_backend_scores[backend_name] = result.score
per_backend_metrics[backend_name] = result.metrics
logger.info(
"Transfer eval: spec %s on %s -> score %.4f",
spec.fingerprint(),
backend_name,
result.score,
)
scores = list(per_backend_scores.values())
mean_s = fmean(scores)
min_s = min(scores)
max_s = max(scores)
std_s = stdev(scores) if len(scores) > 1 else 0.0
return TransferReport(
spec=spec,
per_backend_scores=per_backend_scores,
per_backend_metrics=per_backend_metrics,
mean_score=mean_s,
min_score=min_s,
max_score=max_s,
std_score=std_s,
transfer_score=min_s, # pessimistic
)
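
A toy illustration of the pessimistic aggregate; backend names and scores are invented:

```python
scores = {"fake_brisbane": 0.42, "fake_kyiv": 0.37, "fake_osaka": 0.40}
transfer_score = min(scores.values())
assert transfer_score == 0.37  # the mean (~0.397) would hide the weak backend
```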

View file

@@ -0,0 +1,48 @@
from __future__ import annotations
from typing import Any
from qiskit import QuantumCircuit, transpile
from qiskit.providers.backend import BackendV2
from ..models import ExperimentSpec
def transpile_circuits(
circuits: list[QuantumCircuit],
spec: ExperimentSpec,
backend: BackendV2,
) -> list[QuantumCircuit]:
transpiled = transpile(
circuits,
backend=backend,
optimization_level=spec.optimization_level,
layout_method=spec.layout_method,
routing_method=spec.routing_method,
approximation_degree=spec.approximation_degree,
initial_layout=list(spec.initial_layout) if spec.initial_layout else None,
)
if isinstance(transpiled, QuantumCircuit):
return [transpiled]
return list(transpiled)
def count_two_qubit_gates(circuit: QuantumCircuit) -> int:
return sum(1 for instruction in circuit.data if instruction.operation.num_qubits == 2)
def runtime_estimate(circuit: QuantumCircuit) -> float:
resets = sum(1 for instruction in circuit.data if instruction.operation.name == "reset")
return float(circuit.depth() + (3 * count_two_qubit_gates(circuit)) + (5 * resets))
def circuit_metadata(circuit: QuantumCircuit, spec: ExperimentSpec) -> dict[str, Any]:
return {
"optimization_level": spec.optimization_level,
"layout_method": spec.layout_method,
"routing_method": spec.routing_method,
"approximation_degree": spec.approximation_degree,
"depth": circuit.depth(),
"size": circuit.size(),
"two_qubit_count": count_two_qubit_gates(circuit),
}
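
A quick check of the cost proxies on a hand-built circuit; per the formula above, the runtime estimate is depth plus 3 per two-qubit gate plus 5 per reset (import path assumes the package is installed):

```python
from qiskit import QuantumCircuit
from autoresearch_quantum.execution.transpile import count_two_qubit_gates, runtime_estimate

qc = QuantumCircuit(2, 2)
qc.h(0)
qc.cx(0, 1)
qc.reset(1)
qc.measure([0, 1], [0, 1])

assert count_two_qubit_gates(qc) == 1
print(runtime_estimate(qc))  # depth (4) + 3*1 + 5*1 = 12.0
```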

View file

@@ -0,0 +1 @@
"""Experiment builders."""

View file

@@ -0,0 +1,163 @@
from __future__ import annotations
from dataclasses import dataclass
from qiskit import ClassicalRegister, QuantumCircuit, QuantumRegister
from qiskit.quantum_info import Statevector
from ..codes.four_two_two import DATA_QUBITS, MEASUREMENT_OPERATORS, build_preparation_circuit
from ..models import ExperimentSpec
@dataclass(frozen=True)
class MeasurementCircuitBundle:
prep: QuantumCircuit
acceptance: QuantumCircuit
witness_circuits: dict[str, QuantumCircuit]
target_state: Statevector
def _verification_checks(spec: ExperimentSpec) -> list[str]:
if spec.verification == "none":
return []
if spec.verification == "z_only":
return ["z_stabilizer"]
if spec.verification == "x_only":
return ["x_stabilizer"]
if spec.verification == "both":
return ["z_stabilizer", "x_stabilizer"]
raise ValueError(f"Unsupported verification mode: {spec.verification}")
def _ancilla_count(spec: ExperimentSpec, checks: list[str]) -> int:
if not checks:
return 0
if spec.ancilla_strategy == "dedicated_pair":
return len(checks)
if spec.ancilla_strategy == "reused_single":
return 1
raise ValueError(f"Unsupported ancilla strategy: {spec.ancilla_strategy}")
def _add_z_check(circuit: QuantumCircuit, ancilla: int, data_qubits: list[int]) -> None:
for qubit in data_qubits:
circuit.cx(qubit, ancilla)
def _add_x_check(circuit: QuantumCircuit, ancilla: int, data_qubits: list[int]) -> None:
circuit.h(ancilla)
for qubit in data_qubits:
circuit.cx(ancilla, qubit)
circuit.h(ancilla)
def _measure_operator(circuit: QuantumCircuit, data_qubits: list[int], operator: dict[int, str]) -> None:
for qubit in data_qubits:
basis = operator.get(qubit, "Z")
if basis == "X":
circuit.h(qubit)
elif basis == "Y":
circuit.sdg(qubit)
circuit.h(qubit)
elif basis == "Z":
continue
else:
raise ValueError(f"Unsupported basis: {basis}")
def _attach_verification(
circuit: QuantumCircuit,
spec: ExperimentSpec,
data_qubits: list[int],
ancilla_qubits: list[int],
syndrome_bits: list[int],
) -> list[str]:
checks = _verification_checks(spec)
labels: list[str] = []
if not checks:
return labels
if spec.ancilla_strategy == "dedicated_pair":
for ancilla_qubit, syndrome_bit, label in zip(ancilla_qubits, syndrome_bits, checks, strict=True):
if label == "z_stabilizer":
_add_z_check(circuit, ancilla_qubit, data_qubits)
else:
_add_x_check(circuit, ancilla_qubit, data_qubits)
circuit.measure(ancilla_qubit, syndrome_bit)
labels.append(label)
return labels
ancilla_qubit = ancilla_qubits[0]
for syndrome_bit, label in zip(syndrome_bits, checks, strict=True):
if label == "z_stabilizer":
_add_z_check(circuit, ancilla_qubit, data_qubits)
else:
_add_x_check(circuit, ancilla_qubit, data_qubits)
circuit.measure(ancilla_qubit, syndrome_bit)
labels.append(label)
if label != checks[-1]:
circuit.reset(ancilla_qubit)
return labels
def _base_circuit(spec: ExperimentSpec, context_name: str, operator: dict[int, str] | None) -> QuantumCircuit:
checks = _verification_checks(spec)
ancilla_count = _ancilla_count(spec, checks)
syndrome_bits = len(checks)
data = QuantumRegister(DATA_QUBITS, "data")
ancilla = QuantumRegister(ancilla_count, "anc") if ancilla_count else None
syndrome = ClassicalRegister(syndrome_bits, "syndrome") if syndrome_bits else None
readout = ClassicalRegister(DATA_QUBITS, "readout")
registers = [data]
if ancilla is not None:
registers.append(ancilla)
if syndrome is not None:
registers.append(syndrome)
registers.append(readout)
circuit = QuantumCircuit(*registers, name=context_name)
circuit.compose(
build_preparation_circuit(spec.seed_style, spec.encoder_style),
qubits=list(range(DATA_QUBITS)),
inplace=True,
)
syndrome_labels: list[str] = []
if ancilla is not None and syndrome is not None:
syndrome_labels = _attach_verification(
circuit,
spec,
data_qubits=list(range(DATA_QUBITS)),
ancilla_qubits=list(range(DATA_QUBITS, DATA_QUBITS + ancilla_count)),
syndrome_bits=list(range(syndrome_bits)),
)
if operator is not None:
_measure_operator(circuit, list(range(DATA_QUBITS)), operator)
circuit.measure(data, readout)
circuit.metadata = {
"context": context_name,
"syndrome_labels": syndrome_labels,
"postselection": spec.postselection,
"logical_operator": operator,
}
return circuit
def build_circuit_bundle(spec: ExperimentSpec) -> MeasurementCircuitBundle:
prep = build_preparation_circuit(spec.seed_style, spec.encoder_style)
witness_circuits = {
name: _base_circuit(spec, name, operator)
for name, operator in MEASUREMENT_OPERATORS.items()
}
acceptance = _base_circuit(spec, "acceptance", operator=None)
    # Canonical ideal target, independent of the spec's seed/encoder realization.
    target_state = Statevector.from_instruction(build_preparation_circuit())
return MeasurementCircuitBundle(
prep=prep,
acceptance=acceptance,
witness_circuits=witness_circuits,
target_state=target_state,
)
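
A minimal sketch, assuming the package is importable as autoresearch_quantum; it builds the default bundle and inspects the metadata the executors read back:

```python
from autoresearch_quantum.experiments.encoded_magic_state import build_circuit_bundle
from autoresearch_quantum.models import ExperimentSpec

bundle = build_circuit_bundle(ExperimentSpec(rung=1))
print(sorted(bundle.witness_circuits))  # logical_x, logical_y, spectator_z, ...
print(bundle.acceptance.metadata["syndrome_labels"])  # both checks by default
```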

View file

@@ -0,0 +1 @@
"""Lesson extraction."""

View file

@@ -0,0 +1,152 @@
from __future__ import annotations
from collections import defaultdict
from statistics import fmean
from typing import Any
from ..models import LessonFeedback, RungConfig, RungLesson
from .feedback import build_lesson_feedback
def _record_score(record: dict[str, Any]) -> float:
return float(record.get("final_score", 0.0))
def extract_rung_lesson(
rung_config: RungConfig,
experiment_records: list[dict[str, Any]],
ratchet_steps: list[dict[str, Any]],
) -> tuple[RungLesson, LessonFeedback]:
if not experiment_records:
empty = "No experiments were recorded for this rung."
empty_lesson = RungLesson(
rung=rung_config.rung,
name=rung_config.name,
objective=rung_config.objective,
what_helped=[empty],
what_hurt=[empty],
what_seems_invariant=[empty],
what_seems_hardware_specific=[empty],
what_should_be_tested_next=[empty],
what_should_be_promoted_to_next_rung=[empty],
what_should_be_discarded=[empty],
narrative=empty,
)
empty_feedback = LessonFeedback(
rung=rung_config.rung,
rules=[],
narrowed_dimensions={},
best_spec_fields={},
)
return empty_lesson, empty_feedback
overall_mean = fmean(_record_score(record) for record in experiment_records)
top_records = sorted(experiment_records, key=_record_score, reverse=True)[: min(3, len(experiment_records))]
value_effects: list[tuple[float, str, Any, int]] = []
hardware_deltas: list[tuple[float, str, Any]] = []
for dimension in rung_config.search_space.dimensions:
grouped: dict[Any, list[dict[str, Any]]] = defaultdict(list)
for record in experiment_records:
grouped[record["spec"][dimension]].append(record)
for value, records in grouped.items():
mean_score = fmean(_record_score(record) for record in records)
value_effects.append((mean_score - overall_mean, dimension, value, len(records)))
hardware_scores = [
float(record["expensive_result"]["score"]) - float(record["cheap_result"]["score"])
for record in records
if record.get("expensive_result")
]
if hardware_scores:
hardware_deltas.append((fmean(hardware_scores), dimension, value))
helped = [
f"{dimension}={value} improved mean score by {delta:+.4f} over {samples} runs."
for delta, dimension, value, samples in sorted(value_effects, reverse=True)[:3]
]
hurt = [
f"{dimension}={value} hurt mean score by {delta:+.4f} over {samples} runs."
for delta, dimension, value, samples in sorted(value_effects)[:3]
]
invariants: list[str] = []
for dimension in rung_config.search_space.dimensions:
values = {record["spec"][dimension] for record in top_records}
if len(values) == 1:
value = next(iter(values))
invariants.append(f"Top-ranked experiments consistently kept {dimension}={value}.")
hardware_specific = [
f"{dimension}={value} shifted hardware score by {delta:+.4f} relative to cheap-tier screening."
for delta, dimension, value in sorted(hardware_deltas, key=lambda item: abs(item[0]), reverse=True)[:3]
] or ["No hardware-specific divergence was observed in this rung."]
explored_values = {
dimension: {record["spec"][dimension] for record in experiment_records}
for dimension in rung_config.search_space.dimensions
}
should_test_next = []
for dimension, values in rung_config.search_space.dimensions.items():
remaining = [value for value in values if value not in explored_values[dimension]]
if remaining:
should_test_next.append(f"Probe remaining {dimension} values: {remaining}.")
if not should_test_next:
should_test_next.append(
"Lift the best settings into a new experiment family or backend target for transfer testing."
)
step_lessons = [step["distilled_lesson"] for step in ratchet_steps[-3:] if step.get("distilled_lesson")]
promoted = step_lessons or ["Carry forward the best incumbent settings as priors for the next rung."]
    discarded = [
        entry
        for entry in hurt
        if "over 1 runs" not in entry  # single-sample effects are not enough evidence
    ] or ["No setting is discarded yet; collect more evidence before pruning."]
narrative_lines = [
f"# Rung {rung_config.rung}: {rung_config.name}",
"",
f"Objective: {rung_config.objective}",
"",
"## What Helped",
*[f"- {item}" for item in helped],
"",
"## What Hurt",
*[f"- {item}" for item in hurt],
"",
"## Invariants",
*[f"- {item}" for item in invariants or ['No invariant emerged strongly enough yet.']],
"",
"## Hardware-Specific Effects",
*[f"- {item}" for item in hardware_specific],
"",
"## Next Tests",
*[f"- {item}" for item in should_test_next],
"",
"## Promote Forward",
*[f"- {item}" for item in promoted],
"",
"## Discard",
*[f"- {item}" for item in discarded],
]
lesson = RungLesson(
rung=rung_config.rung,
name=rung_config.name,
objective=rung_config.objective,
what_helped=helped,
what_hurt=hurt,
what_seems_invariant=invariants or ["No invariant emerged strongly enough yet."],
what_seems_hardware_specific=hardware_specific,
what_should_be_tested_next=should_test_next,
what_should_be_promoted_to_next_rung=promoted,
what_should_be_discarded=discarded,
narrative="\n".join(narrative_lines),
)
feedback = build_lesson_feedback(
rung_config.rung,
experiment_records,
rung_config.search_space,
)
return lesson, feedback
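
The helped/hurt lines boil down to a per-value mean minus the overall mean; a toy trace with invented scores:

```python
records = [
    {"spec": {"verification": "both"}, "final_score": 0.50},
    {"spec": {"verification": "z_only"}, "final_score": 0.30},
    {"spec": {"verification": "both"}, "final_score": 0.46},
]
overall = sum(r["final_score"] for r in records) / len(records)  # 0.42
both_mean = (0.50 + 0.46) / 2                                    # 0.48
print(f"verification=both improved mean score by {both_mean - overall:+.4f}")
# -> verification=both improved mean score by +0.0600
```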

View file

@@ -0,0 +1,199 @@
from __future__ import annotations
from collections import defaultdict
from itertools import combinations
from statistics import fmean, stdev
from typing import Any
from ..models import LessonFeedback, SearchRule, SearchSpaceConfig
def _record_score(record: dict[str, Any]) -> float:
return float(record.get("final_score", 0.0))
def extract_search_rules(
experiment_records: list[dict[str, Any]],
search_space: SearchSpaceConfig,
min_samples: int = 2,
effect_threshold: float = 0.005,
) -> list[SearchRule]:
"""Extract machine-readable search rules from experiment data.
Analyses per-dimension mean effects, interaction detection, and
consistency patterns to produce prefer/avoid/fix directives.
"""
if not experiment_records:
return []
overall_mean = fmean(_record_score(r) for r in experiment_records)
rules: list[SearchRule] = []
dim_names = list(search_space.dimensions.keys())
# Per-dimension analysis
for dim in dim_names:
grouped: dict[Any, list[float]] = defaultdict(list)
for record in experiment_records:
val = record["spec"].get(dim)
if val is not None:
grouped[val].append(_record_score(record))
for value, scores in grouped.items():
if len(scores) < min_samples:
continue
mean_score = fmean(scores)
delta = mean_score - overall_mean
confidence = min(1.0, len(scores) / max(len(experiment_records), 1))
if delta > effect_threshold:
rules.append(SearchRule(
dimension=dim,
action="prefer",
value=value,
confidence=confidence,
reason=f"mean score {mean_score:.4f} is {delta:+.4f} above overall mean ({len(scores)} samples)",
))
elif delta < -effect_threshold:
rules.append(SearchRule(
dimension=dim,
action="avoid",
value=value,
confidence=confidence,
reason=f"mean score {mean_score:.4f} is {delta:+.4f} below overall mean ({len(scores)} samples)",
))
# Check for "fix" rules: if top-K experiments all share a value
top_k = min(5, max(3, len(experiment_records) // 3))
top_records = sorted(experiment_records, key=_record_score, reverse=True)[:top_k]
if len(top_records) >= 3:
for dim in dim_names:
values_in_top = {r["spec"].get(dim) for r in top_records}
if len(values_in_top) == 1:
fixed_value = next(iter(values_in_top))
# Also check it's better than alternatives
all_with_value = [
_record_score(r) for r in experiment_records
if r["spec"].get(dim) == fixed_value
]
all_without = [
_record_score(r) for r in experiment_records
if r["spec"].get(dim) != fixed_value
]
if all_without and fmean(all_with_value) > fmean(all_without):
rules.append(SearchRule(
dimension=dim,
action="fix",
value=fixed_value,
confidence=min(1.0, len(top_records) / len(experiment_records)),
reason=f"all top-{len(top_records)} experiments use {dim}={fixed_value}",
))
# Interaction detection: for dimension pairs, check if joint effect > sum of marginals
for dim_a, dim_b in combinations(dim_names, 2):
marginal_a: dict[Any, float] = {}
marginal_b: dict[Any, float] = {}
joint: dict[tuple[Any, Any], list[float]] = defaultdict(list)
for record in experiment_records:
va = record["spec"].get(dim_a)
vb = record["spec"].get(dim_b)
score = _record_score(record)
joint[(va, vb)].append(score)
# Need enough joint observations
if all(len(v) < min_samples for v in joint.values()):
continue
# Compute marginals
for dim, marginals in [(dim_a, marginal_a), (dim_b, marginal_b)]:
grouped_m: dict[Any, list[float]] = defaultdict(list)
for record in experiment_records:
grouped_m[record["spec"].get(dim)].append(_record_score(record))
for val, scores in grouped_m.items():
marginals[val] = fmean(scores) - overall_mean
# Check for interactions
for (va, vb), scores in joint.items():
if len(scores) < min_samples:
continue
joint_effect = fmean(scores) - overall_mean
expected_additive = marginal_a.get(va, 0.0) + marginal_b.get(vb, 0.0)
interaction = joint_effect - expected_additive
if abs(interaction) > effect_threshold * 2:
action = "prefer" if interaction > 0 else "avoid"
rules.append(SearchRule(
dimension=f"{dim_a}+{dim_b}",
action=action,
value=(va, vb),
confidence=min(1.0, len(scores) / len(experiment_records)),
reason=(
f"interaction effect {interaction:+.4f} "
f"(joint={joint_effect:+.4f}, expected_additive={expected_additive:+.4f})"
),
))
return rules
def narrow_search_space(
search_space: SearchSpaceConfig,
rules: list[SearchRule],
min_values_per_dim: int = 2,
) -> SearchSpaceConfig:
"""Prune search space based on lesson rules.
- Remove "avoid" values (keeping at least min_values_per_dim per dimension)
- Constrain "fix" dimensions to the fixed value only
"""
new_dims: dict[str, list[Any]] = {}
# Collect avoids and fixes per simple dimension
avoid_map: dict[str, set[Any]] = defaultdict(set)
fix_map: dict[str, Any] = {}
for rule in rules:
if "+" in str(rule.dimension):
continue # Skip interaction rules for narrowing
if rule.action == "avoid" and rule.confidence >= 0.3:
avoid_map[rule.dimension].add(rule.value)
elif rule.action == "fix" and rule.confidence >= 0.4:
fix_map[rule.dimension] = rule.value
for dim, values in search_space.dimensions.items():
if dim in fix_map and fix_map[dim] in values:
new_dims[dim] = [fix_map[dim]]
elif dim in avoid_map:
filtered = [v for v in values if v not in avoid_map[dim]]
if len(filtered) >= min_values_per_dim:
new_dims[dim] = filtered
else:
new_dims[dim] = list(values) # Keep all if pruning too aggressive
else:
new_dims[dim] = list(values)
return SearchSpaceConfig(
dimensions=new_dims,
max_challengers_per_step=search_space.max_challengers_per_step,
)
def build_lesson_feedback(
rung: int,
experiment_records: list[dict[str, Any]],
search_space: SearchSpaceConfig,
) -> LessonFeedback:
"""Build a complete LessonFeedback from experiment data."""
rules = extract_search_rules(experiment_records, search_space)
narrowed = narrow_search_space(search_space, rules)
# Extract best spec fields from top experiment
best_spec_fields: dict[str, Any] = {}
if experiment_records:
best = max(experiment_records, key=_record_score)
best_spec_fields = dict(best.get("spec", {}))
return LessonFeedback(
rung=rung,
rules=rules,
narrowed_dimensions=narrowed.dimensions,
best_spec_fields=best_spec_fields,
)
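
A hedged toy run of the full pipeline, assuming the package is importable; the dimension values are illustrative, not taken from the shipped configs:

```python
from autoresearch_quantum.lessons.feedback import extract_search_rules, narrow_search_space
from autoresearch_quantum.models import SearchSpaceConfig

space = SearchSpaceConfig(dimensions={"encoder_style": ["cx_chain", "cx_star"]})
records = [
    {"spec": {"encoder_style": "cx_chain"}, "final_score": 0.50},
    {"spec": {"encoder_style": "cx_chain"}, "final_score": 0.48},
    {"spec": {"encoder_style": "cx_star"}, "final_score": 0.30},
    {"spec": {"encoder_style": "cx_star"}, "final_score": 0.28},
]
rules = extract_search_rules(records, space)
for rule in rules:
    print(rule.action, rule.dimension, rule.value, f"{rule.confidence:.2f}")
# narrow_search_space keeps at least min_values_per_dim values per dimension,
# so this two-value dimension survives despite the "avoid" rule on cx_star.
print(narrow_search_space(space, rules).dimensions)
```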

View file

@@ -0,0 +1,262 @@
from __future__ import annotations
from dataclasses import asdict, dataclass, field, replace
from datetime import datetime, timezone
from hashlib import sha1
from typing import Any
def utc_timestamp() -> str:
return datetime.now(timezone.utc).isoformat()
def short_hash(payload: str, length: int = 10) -> str:
return sha1(payload.encode("utf-8")).hexdigest()[:length]
@dataclass(frozen=True)
class ExperimentSpec:
rung: int
seed_style: str = "h_p"
encoder_style: str = "cx_chain"
verification: str = "both"
postselection: str = "all_measured"
ancilla_strategy: str = "dedicated_pair"
optimization_level: int = 2
layout_method: str = "sabre"
routing_method: str = "sabre"
approximation_degree: float = 1.0
target_backend: str = "fake_brisbane"
noise_backend: str | None = None
initial_layout: tuple[int, ...] | None = None
shots: int = 2048
repeats: int = 3
notes: str = ""
def with_updates(self, **changes: Any) -> "ExperimentSpec":
if "initial_layout" in changes and isinstance(changes["initial_layout"], list):
changes["initial_layout"] = tuple(changes["initial_layout"])
return replace(self, **changes)
def identity_payload(self) -> str:
payload = asdict(self)
return repr(payload)
def fingerprint(self) -> str:
return short_hash(self.identity_payload())
@dataclass(frozen=True)
class QualityWeights:
ideal_fidelity: float = 0.0
noisy_fidelity: float = 0.0
logical_witness: float = 0.0
codespace_rate: float = 0.0
stability_score: float = 0.0
spectator_alignment: float = 0.0
@dataclass(frozen=True)
class CostWeights:
two_qubit_count: float = 0.08
depth: float = 0.01
shot_count: float = 0.00015
runtime_estimate: float = 0.02
queue_cost_proxy: float = 0.3
@dataclass(frozen=True)
class ScoreConfig:
name: str = "weighted_acceptance_cost"
cheap_quality: QualityWeights = field(default_factory=QualityWeights)
expensive_quality: QualityWeights = field(default_factory=QualityWeights)
cost_weights: CostWeights = field(default_factory=CostWeights)
base_cost: float = 1.0
@dataclass(frozen=True)
class SearchSpaceConfig:
dimensions: dict[str, list[Any]] = field(default_factory=dict)
max_challengers_per_step: int = 8
@dataclass(frozen=True)
class TierPolicyConfig:
cheap_margin: float = 0.01
confirmation_margin: float = 0.0
cheap_shots: int = 2048
expensive_shots: int = 4096
cheap_repeats: int = 3
expensive_repeats: int = 2
noisy_simulator: str = "aer"
promote_top_k: int = 2
enable_hardware: bool = False
confirm_incumbent_on_hardware: bool = True
hardware_budget: int = 0
@dataclass(frozen=True)
class HardwareConfig:
backend_name: str | None = None
channel: str | None = None
instance: str | None = None
token_env_var: str = "QISKIT_IBM_TOKEN"
@dataclass(frozen=True)
class RungConfig:
rung: int
name: str
description: str
objective: str
bootstrap_incumbent: ExperimentSpec
search_space: SearchSpaceConfig
tier_policy: TierPolicyConfig
score: ScoreConfig
step_budget: int = 3
patience: int = 2
hardware: HardwareConfig = field(default_factory=HardwareConfig)
transfer_backends: list[str] = field(default_factory=list)
@dataclass
class EvaluationMetrics:
ideal_encoded_fidelity: float | None = None
noisy_encoded_fidelity: float | None = None
logical_magic_witness: float | None = None
acceptance_rate: float = 1.0
codespace_rate: float | None = None
spectator_logical_z: float | None = None
logical_x: float | None = None
logical_y: float | None = None
stability_score: float | None = None
two_qubit_count: int = 0
depth: int = 0
shot_count: int = 0
runtime_estimate: float = 0.0
queue_cost_proxy: float = 0.0
total_cost: float = 0.0
dominant_failure_mode: str = "unclassified"
transpile_metadata: dict[str, Any] = field(default_factory=dict)
backend_metadata: dict[str, Any] = field(default_factory=dict)
extra: dict[str, Any] = field(default_factory=dict)
@dataclass
class TierResult:
tier: str
score: float
quality_estimate: float
metrics: EvaluationMetrics
counts_summary: dict[str, Any] = field(default_factory=dict)
notes: list[str] = field(default_factory=list)
created_at: str = field(default_factory=utc_timestamp)
@dataclass
class ExperimentRecord:
experiment_id: str
rung: int
role: str
parent_incumbent_id: str | None
mutation_note: str
spec: ExperimentSpec
cheap_result: TierResult
expensive_result: TierResult | None = None
final_score: float = 0.0
promoted_to_expensive: bool = False
became_incumbent: bool = False
created_at: str = field(default_factory=utc_timestamp)
@property
def best_result(self) -> TierResult:
return self.expensive_result or self.cheap_result
@dataclass
class RatchetStepRecord:
step_index: int
rung: int
incumbent_before_id: str
challengers_tested: list[str]
promoted_challengers: list[str]
winner_id: str
winning_margin: float
cheap_tier_justification: str
expensive_tier_result: str
distilled_lesson: str
created_at: str = field(default_factory=utc_timestamp)
@dataclass
class RungLesson:
rung: int
name: str
objective: str
what_helped: list[str]
what_hurt: list[str]
what_seems_invariant: list[str]
what_seems_hardware_specific: list[str]
what_should_be_tested_next: list[str]
what_should_be_promoted_to_next_rung: list[str]
what_should_be_discarded: list[str]
narrative: str
created_at: str = field(default_factory=utc_timestamp)
@dataclass(frozen=True)
class SearchRule:
"""Machine-readable directive extracted from lesson analysis."""
dimension: str
action: str # "prefer", "avoid", "fix"
value: Any
    confidence: float  # 0.0-1.0, based on sample proportion
reason: str
@dataclass(frozen=True)
class LessonFeedback:
"""Machine-readable counterpart to RungLesson for search guidance."""
rung: int
rules: list[SearchRule]
narrowed_dimensions: dict[str, list[Any]]
best_spec_fields: dict[str, Any]
transfer_scores: dict[str, float] = field(default_factory=dict)
@dataclass
class TransferReport:
"""Cross-backend evaluation results for a single spec."""
spec: ExperimentSpec
per_backend_scores: dict[str, float]
per_backend_metrics: dict[str, EvaluationMetrics]
mean_score: float
min_score: float
max_score: float
std_score: float
transfer_score: float # pessimistic = min(scores)
@dataclass
class FactoryMetrics:
"""Factory-style throughput metrics attached to EvaluationMetrics.extra."""
accepted_states_per_shot: float
logical_error_per_accepted: float
accepted_per_unit_cost: float
quality_yield: float
cost_per_accepted: float
throughput_proxy: float
@dataclass
class RungProgress:
"""Resumability state for a rung execution."""
rung: int
steps_completed: int
patience_remaining: int
current_incumbent_id: str
completed: bool = False
def generate_experiment_id(spec: ExperimentSpec, role: str) -> str:
return f"r{spec.rung}-{role}-{spec.fingerprint()}"

View file

@@ -0,0 +1 @@
"""Persistence for experiment records and lessons."""

View file

@@ -0,0 +1,135 @@
from __future__ import annotations
import json
from dataclasses import asdict
from pathlib import Path
from typing import Any
from ..models import (
ExperimentRecord,
ExperimentSpec,
LessonFeedback,
RatchetStepRecord,
RungLesson,
RungProgress,
SearchRule,
)
class ResearchStore:
def __init__(self, root: str | Path) -> None:
self.root = Path(root)
self.root.mkdir(parents=True, exist_ok=True)
def rung_dir(self, rung: int) -> Path:
path = self.root / f"rung_{rung}"
path.mkdir(parents=True, exist_ok=True)
return path
def experiment_dir(self, rung: int) -> Path:
path = self.rung_dir(rung) / "experiments"
path.mkdir(parents=True, exist_ok=True)
return path
def ratchet_dir(self, rung: int) -> Path:
path = self.rung_dir(rung) / "ratchet_steps"
path.mkdir(parents=True, exist_ok=True)
return path
def _write_json(self, path: Path, payload: dict[str, Any]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(payload, indent=2, sort_keys=True), encoding="utf-8")
def save_experiment(self, record: ExperimentRecord) -> Path:
path = self.experiment_dir(record.rung) / f"{record.experiment_id}.json"
self._write_json(path, asdict(record))
return path
def load_experiment(self, rung: int, experiment_id: str) -> dict[str, Any]:
path = self.experiment_dir(rung) / f"{experiment_id}.json"
return json.loads(path.read_text(encoding="utf-8"))
def list_experiments(self, rung: int) -> list[dict[str, Any]]:
return [
json.loads(path.read_text(encoding="utf-8"))
for path in sorted(self.experiment_dir(rung).glob("*.json"))
]
def save_ratchet_step(self, step: RatchetStepRecord) -> Path:
path = self.ratchet_dir(step.rung) / f"step_{step.step_index:04d}.json"
self._write_json(path, asdict(step))
return path
def list_ratchet_steps(self, rung: int) -> list[dict[str, Any]]:
return [
json.loads(path.read_text(encoding="utf-8"))
for path in sorted(self.ratchet_dir(rung).glob("*.json"))
]
def set_incumbent(self, rung: int, experiment_id: str) -> Path:
path = self.rung_dir(rung) / "incumbent.json"
self._write_json(path, {"experiment_id": experiment_id})
return path
def load_incumbent_id(self, rung: int) -> str | None:
path = self.rung_dir(rung) / "incumbent.json"
if not path.exists():
return None
payload = json.loads(path.read_text(encoding="utf-8"))
return str(payload["experiment_id"])
def save_lesson(self, lesson: RungLesson) -> Path:
json_path = self.rung_dir(lesson.rung) / "lesson.json"
md_path = self.rung_dir(lesson.rung) / "lesson.md"
self._write_json(json_path, asdict(lesson))
md_path.write_text(lesson.narrative, encoding="utf-8")
return json_path
def save_lesson_feedback(self, feedback: LessonFeedback) -> Path:
path = self.rung_dir(feedback.rung) / "lesson_feedback.json"
payload = {
"rung": feedback.rung,
"rules": [asdict(r) for r in feedback.rules],
"narrowed_dimensions": feedback.narrowed_dimensions,
"best_spec_fields": feedback.best_spec_fields,
"transfer_scores": feedback.transfer_scores,
}
self._write_json(path, payload)
return path
def load_lesson_feedback(self, rung: int) -> LessonFeedback | None:
path = self.rung_dir(rung) / "lesson_feedback.json"
if not path.exists():
return None
data = json.loads(path.read_text(encoding="utf-8"))
rules = [SearchRule(**r) for r in data.get("rules", [])]
return LessonFeedback(
rung=data["rung"],
rules=rules,
narrowed_dimensions=data.get("narrowed_dimensions", {}),
best_spec_fields=data.get("best_spec_fields", {}),
transfer_scores=data.get("transfer_scores", {}),
)
def save_progress(self, progress: RungProgress) -> Path:
path = self.rung_dir(progress.rung) / "progress.json"
self._write_json(path, asdict(progress))
return path
def load_progress(self, rung: int) -> RungProgress | None:
path = self.rung_dir(rung) / "progress.json"
if not path.exists():
return None
data = json.loads(path.read_text(encoding="utf-8"))
return RungProgress(**data)
def save_propagated_spec(self, rung: int, spec: ExperimentSpec) -> Path:
path = self.rung_dir(rung) / "propagated_spec.json"
self._write_json(path, asdict(spec))
return path
def load_propagated_spec(self, rung: int) -> dict[str, Any] | None:
path = self.rung_dir(rung) / "propagated_spec.json"
if not path.exists():
return None
return json.loads(path.read_text(encoding="utf-8"))
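
A minimal round-trip in a throwaway directory, showing the on-disk rung layout the runner relies on:

```python
import tempfile
from pathlib import Path
from autoresearch_quantum.persistence.store import ResearchStore

with tempfile.TemporaryDirectory() as tmp:
    store = ResearchStore(tmp)
    store.set_incumbent(1, "r1-incumbent-abcdef1234")
    assert store.load_incumbent_id(1) == "r1-incumbent-abcdef1234"
    print(sorted(p.name for p in Path(tmp).glob("rung_1/*")))  # ['incumbent.json']
```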

View file

@@ -0,0 +1 @@
"""Ratchet orchestration."""

View file

@@ -0,0 +1,441 @@
from __future__ import annotations
import logging
from dataclasses import replace
from typing import Any
from ..execution.local import LocalCheapExecutor
from ..lessons.extractor import extract_rung_lesson
from ..models import (
EvaluationMetrics,
ExperimentRecord,
ExperimentSpec,
LessonFeedback,
RatchetStepRecord,
RungConfig,
RungProgress,
TierResult,
generate_experiment_id,
)
from ..persistence.store import ResearchStore
from ..search.challengers import mutation_summary
from ..search.strategies import CompositeGenerator, default_composite
logger = logging.getLogger(__name__)
def _from_dict_spec(payload: dict[str, Any]) -> ExperimentSpec:
if payload.get("initial_layout") is not None:
payload = dict(payload)
payload["initial_layout"] = tuple(payload["initial_layout"])
return ExperimentSpec(**payload)
def _record_from_json(payload: dict[str, Any]) -> ExperimentRecord:
cheap = _tier_result_from_dict(payload["cheap_result"])
expensive = _tier_result_from_dict(payload["expensive_result"]) if payload.get("expensive_result") else None
return ExperimentRecord(
experiment_id=payload["experiment_id"],
rung=int(payload["rung"]),
role=payload["role"],
parent_incumbent_id=payload.get("parent_incumbent_id"),
mutation_note=payload.get("mutation_note", ""),
spec=_from_dict_spec(payload["spec"]),
cheap_result=cheap, # type: ignore[arg-type]
expensive_result=expensive, # type: ignore[arg-type]
final_score=float(payload.get("final_score", 0.0)),
promoted_to_expensive=bool(payload.get("promoted_to_expensive", False)),
became_incumbent=bool(payload.get("became_incumbent", False)),
created_at=payload.get("created_at", ""),
)
def _metrics_from_dict(payload: dict[str, Any]) -> EvaluationMetrics:
return EvaluationMetrics(**payload)
def _tier_result_from_dict(payload: dict[str, Any]) -> TierResult:
return TierResult(
tier=payload["tier"],
score=float(payload["score"]),
quality_estimate=float(payload["quality_estimate"]),
metrics=_metrics_from_dict(payload["metrics"]),
counts_summary=dict(payload.get("counts_summary", {})),
notes=list(payload.get("notes", [])),
created_at=payload.get("created_at", ""),
)
class AutoresearchHarness:
def __init__(self, store: ResearchStore) -> None:
self.store = store
self.local_executor = LocalCheapExecutor()
self._hardware_executor: Any = None # Lazy-loaded
self._experiment_history: set[str] = set()
self._accumulated_lessons: list[LessonFeedback] = []
@property
def hardware_executor(self) -> Any:
if self._hardware_executor is None:
from ..execution.hardware import IBMHardwareExecutor
self._hardware_executor = IBMHardwareExecutor()
return self._hardware_executor
def _build_history(self, rung: int) -> set[str]:
"""Collect fingerprints of all experiments already tried in this rung."""
experiments = self.store.list_experiments(rung)
return {
ExperimentSpec(**{
k: tuple(v) if k == "initial_layout" and isinstance(v, list) else v
for k, v in exp["spec"].items()
}).fingerprint()
for exp in experiments
}
def _get_challenger_generator(self) -> CompositeGenerator:
return default_composite(has_lessons=bool(self._accumulated_lessons))
def _evaluate_record(
self,
spec: ExperimentSpec,
rung_config: RungConfig,
role: str,
parent_incumbent_id: str | None,
mutation_note: str,
promote_to_hardware: bool = False,
) -> ExperimentRecord:
cheap_result = self.local_executor.evaluate(spec, rung_config)
record = ExperimentRecord(
experiment_id=generate_experiment_id(spec, role),
rung=spec.rung,
role=role,
parent_incumbent_id=parent_incumbent_id,
mutation_note=mutation_note,
spec=spec,
cheap_result=cheap_result,
final_score=cheap_result.score,
)
if promote_to_hardware and rung_config.tier_policy.enable_hardware:
expensive_result = self.hardware_executor.evaluate(spec, rung_config)
record.expensive_result = expensive_result
record.promoted_to_expensive = True
record.final_score = expensive_result.score
self.store.save_experiment(record)
self._experiment_history.add(spec.fingerprint())
return record
def _load_incumbent(self, rung: int) -> ExperimentRecord | None:
experiment_id = self.store.load_incumbent_id(rung)
if experiment_id is None:
return None
payload = self.store.load_experiment(rung, experiment_id)
return _record_from_json(payload)
def ensure_incumbent(self, rung_config: RungConfig) -> ExperimentRecord:
incumbent = self._load_incumbent(rung_config.rung)
if incumbent is not None:
return incumbent
incumbent = self._evaluate_record(
rung_config.bootstrap_incumbent,
rung_config,
role="incumbent",
parent_incumbent_id=None,
mutation_note="bootstrap incumbent",
promote_to_hardware=False,
)
incumbent.became_incumbent = True
self.store.save_experiment(incumbent)
self.store.set_incumbent(rung_config.rung, incumbent.experiment_id)
return incumbent
def run_single_experiment(
self,
spec: ExperimentSpec,
rung_config: RungConfig,
role: str = "challenger",
parent_incumbent_id: str | None = None,
mutation_note: str = "direct run",
promote_to_hardware: bool = False,
) -> ExperimentRecord:
return self._evaluate_record(
spec,
rung_config,
role=role,
parent_incumbent_id=parent_incumbent_id,
mutation_note=mutation_note,
promote_to_hardware=promote_to_hardware,
)
def run_challenger_set(self, rung_config: RungConfig) -> list[ExperimentRecord]:
incumbent = self.ensure_incumbent(rung_config)
history = self._build_history(rung_config.rung) | self._experiment_history
generator = self._get_challenger_generator()
challengers = generator.generate(
incumbent.spec,
rung_config.search_space,
history,
self._accumulated_lessons,
)
records: list[ExperimentRecord] = []
for challenger in challengers:
records.append(
self._evaluate_record(
challenger.spec,
rung_config,
role="challenger",
parent_incumbent_id=incumbent.experiment_id,
mutation_note=challenger.mutation_note,
promote_to_hardware=False,
)
)
return records
def run_ratchet_step(self, rung_config: RungConfig, allow_hardware: bool = False) -> RatchetStepRecord:
incumbent = self.ensure_incumbent(rung_config)
history = self._build_history(rung_config.rung) | self._experiment_history
generator = self._get_challenger_generator()
challengers = generator.generate(
incumbent.spec,
rung_config.search_space,
history,
self._accumulated_lessons,
)
challenger_records: list[ExperimentRecord] = []
for challenger in challengers:
challenger_records.append(
self._evaluate_record(
challenger.spec,
rung_config,
role="challenger",
parent_incumbent_id=incumbent.experiment_id,
mutation_note=challenger.mutation_note,
promote_to_hardware=False,
)
)
if not challenger_records:
logger.info("No new challengers generated (search space exhausted for rung %d)", rung_config.rung)
incumbent_cheap = incumbent.cheap_result.score
promoted = [
record
for record in sorted(
challenger_records,
key=lambda item: item.cheap_result.score,
reverse=True,
)
if record.cheap_result.score > (incumbent_cheap + rung_config.tier_policy.cheap_margin)
][: rung_config.tier_policy.promote_top_k]
expensive_tier_result = "Hardware tier disabled."
if (
allow_hardware
and rung_config.tier_policy.enable_hardware
and promoted
):
if rung_config.tier_policy.confirm_incumbent_on_hardware and not incumbent.promoted_to_expensive:
incumbent = self._evaluate_record(
incumbent.spec,
rung_config,
role=incumbent.role,
parent_incumbent_id=incumbent.parent_incumbent_id,
mutation_note=incumbent.mutation_note,
promote_to_hardware=True,
)
incumbent.became_incumbent = True
self.store.save_experiment(incumbent)
self.store.set_incumbent(rung_config.rung, incumbent.experiment_id)
promoted = [
self._evaluate_record(
record.spec,
rung_config,
role=record.role,
parent_incumbent_id=record.parent_incumbent_id,
mutation_note=record.mutation_note,
promote_to_hardware=True,
)
for record in promoted[: rung_config.tier_policy.hardware_budget or len(promoted)]
]
expensive_tier_result = (
f"Promoted {len(promoted)} challengers to hardware confirmation on "
f"{rung_config.hardware.backend_name or rung_config.bootstrap_incumbent.target_backend}."
)
candidates = [incumbent, *promoted] if promoted else [incumbent, *challenger_records]
winner = max(candidates, key=lambda record: record.final_score)
winning_margin = winner.final_score - incumbent.final_score
if winner.experiment_id != incumbent.experiment_id and winning_margin > rung_config.tier_policy.confirmation_margin:
winner = replace(winner, became_incumbent=True)
self.store.save_experiment(winner)
self.store.set_incumbent(rung_config.rung, winner.experiment_id)
cheap_tier_justification = (
"Promoted challengers beat the incumbent on cheap-tier score by at least "
f"{rung_config.tier_policy.cheap_margin:.4f}."
if promoted
else "No challenger cleared the cheap-tier promotion margin."
)
distilled_lesson = self._distill_lesson(incumbent, winner, promoted)
step = RatchetStepRecord(
step_index=len(self.store.list_ratchet_steps(rung_config.rung)) + 1,
rung=rung_config.rung,
incumbent_before_id=incumbent.experiment_id,
challengers_tested=[record.experiment_id for record in challenger_records],
promoted_challengers=[record.experiment_id for record in promoted],
winner_id=winner.experiment_id,
winning_margin=winning_margin,
cheap_tier_justification=cheap_tier_justification,
expensive_tier_result=expensive_tier_result,
distilled_lesson=distilled_lesson,
)
self.store.save_ratchet_step(step)
return step
def run_rung(
self,
rung_config: RungConfig,
allow_hardware: bool = False,
) -> tuple[list[RatchetStepRecord], Any, LessonFeedback]:
# Check for resumable progress
progress = self.store.load_progress(rung_config.rung)
if progress and not progress.completed:
steps_done = progress.steps_completed
patience_left = progress.patience_remaining
baseline_incumbent = progress.current_incumbent_id
logger.info(
"Resuming rung %d from step %d (patience=%d)",
rung_config.rung, steps_done, patience_left,
)
else:
steps_done = 0
patience_left = rung_config.patience
baseline_incumbent = self.ensure_incumbent(rung_config).experiment_id
steps: list[RatchetStepRecord] = []
for step_idx in range(steps_done, rung_config.step_budget):
step = self.run_ratchet_step(rung_config, allow_hardware=allow_hardware)
steps.append(step)
if step.winner_id == baseline_incumbent:
patience_left -= 1
else:
baseline_incumbent = step.winner_id
patience_left = rung_config.patience
# Save progress after each step
self.store.save_progress(RungProgress(
rung=rung_config.rung,
steps_completed=step_idx + 1,
patience_remaining=patience_left,
current_incumbent_id=baseline_incumbent,
completed=False,
))
if patience_left <= 0:
break
# Mark rung as completed
self.store.save_progress(RungProgress(
rung=rung_config.rung,
steps_completed=steps_done + len(steps),
patience_remaining=patience_left,
current_incumbent_id=baseline_incumbent,
completed=True,
))
lesson, feedback = extract_rung_lesson(
rung_config,
self.store.list_experiments(rung_config.rung),
self.store.list_ratchet_steps(rung_config.rung),
)
self.store.save_lesson(lesson)
self.store.save_lesson_feedback(feedback)
self._accumulated_lessons.append(feedback)
return steps, lesson, feedback
def run_ratchet(
self,
rung_configs: list[RungConfig],
allow_hardware: bool = False,
) -> list[tuple[Any, LessonFeedback]]:
"""Run multiple rungs in sequence, propagating winners and lessons."""
results: list[tuple[Any, LessonFeedback]] = []
self._accumulated_lessons = []
for i, rung_config in enumerate(rung_configs):
# Propagate winner from previous rung as bootstrap
if i > 0 and results:
prev_feedback = results[-1][1]
if prev_feedback.best_spec_fields:
propagated_spec = self._propagate_spec(
prev_feedback.best_spec_fields,
rung_config,
)
rung_config = replace(
rung_config,
bootstrap_incumbent=propagated_spec,
)
logger.info(
"Propagated winner from rung %d -> rung %d bootstrap",
rung_configs[i - 1].rung,
rung_config.rung,
)
# Save propagated spec for traceability
self.store.save_propagated_spec(rung_config.rung, propagated_spec)
# Narrow search space based on accumulated lessons
if prev_feedback.narrowed_dimensions:
from ..lessons.feedback import narrow_search_space
narrowed = narrow_search_space(
rung_config.search_space,
[r for fb in self._accumulated_lessons for r in fb.rules],
)
rung_config = replace(rung_config, search_space=narrowed)
steps, lesson, feedback = self.run_rung(rung_config, allow_hardware=allow_hardware)
results.append((lesson, feedback))
return results
def _propagate_spec(
self,
best_fields: dict[str, Any],
target_config: RungConfig,
) -> ExperimentSpec:
"""Build a new ExperimentSpec for the next rung from previous winner fields."""
target_spec = target_config.bootstrap_incumbent
# Only override fields that exist in ExperimentSpec and are in the best_fields
valid_fields = set(ExperimentSpec.__dataclass_fields__.keys())
updates: dict[str, Any] = {}
for key, value in best_fields.items():
if key in valid_fields and key != "rung":
updates[key] = value
# Override rung to match the target
updates["rung"] = target_config.rung
return target_spec.with_updates(**updates)
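# Example (hypothetical fields): with best_fields
# {"verification": "z_only", "rung": 1, "not_a_field": "x"}, only
# "verification" survives the filter, "not_a_field" is dropped, the stale
# "rung" is ignored, and the returned spec carries rung=target_config.rung.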
def _distill_lesson(
self,
incumbent: ExperimentRecord,
winner: ExperimentRecord,
promoted: list[ExperimentRecord],
) -> str:
if winner.experiment_id == incumbent.experiment_id:
return (
"No ratchet this step: the incumbent remained best because challengers failed to "
f"overcome {incumbent.best_result.metrics.dominant_failure_mode}."
)
change_note = mutation_summary(incumbent.spec, winner.spec)
confirmation = "hardware-confirmed" if winner.promoted_to_expensive else "cheap-tier"
return (
f"{change_note} became the new incumbent on {confirmation} score. "
f"It improved final score by {winner.final_score - incumbent.final_score:+.4f}; "
f"{len(promoted)} challengers were strong enough to justify promotion."
)

1
src/autoresearch_quantum/scoring/__init__.py Normal file
View file

@ -0,0 +1 @@
"""Scalar scoring utilities."""

150
src/autoresearch_quantum/scoring/score.py Normal file
View file

@ -0,0 +1,150 @@
from __future__ import annotations
from typing import Callable
from ..models import EvaluationMetrics, FactoryMetrics, QualityWeights, ScoreConfig
def _clamp(value: float | None) -> float | None:
if value is None:
return None
return max(0.0, min(1.0, value))
def _quality_components(metrics: EvaluationMetrics, weights: QualityWeights) -> dict[str, float | None]:
spectator_alignment = None
if metrics.spectator_logical_z is not None:
spectator_alignment = (1.0 + metrics.spectator_logical_z) / 2.0
return {
"ideal_fidelity": _clamp(metrics.ideal_encoded_fidelity),
"noisy_fidelity": _clamp(metrics.noisy_encoded_fidelity),
"logical_witness": _clamp(metrics.logical_magic_witness),
"codespace_rate": _clamp(metrics.codespace_rate),
"stability_score": _clamp(metrics.stability_score),
"spectator_alignment": _clamp(spectator_alignment),
}
def weighted_acceptance_cost(
metrics: EvaluationMetrics,
tier: str,
config: ScoreConfig,
) -> tuple[float, float, float]:
weights = config.cheap_quality if tier == "cheap" else config.expensive_quality
values = _quality_components(metrics, weights)
weight_map = {
"ideal_fidelity": weights.ideal_fidelity,
"noisy_fidelity": weights.noisy_fidelity,
"logical_witness": weights.logical_witness,
"codespace_rate": weights.codespace_rate,
"stability_score": weights.stability_score,
"spectator_alignment": weights.spectator_alignment,
}
weighted_sum = 0.0
total_weight = 0.0
for key, weight in weight_map.items():
value = values[key]
if weight <= 0 or value is None:
continue
weighted_sum += weight * value
total_weight += weight
quality = weighted_sum / total_weight if total_weight else 0.0
cost = (
config.base_cost
+ (config.cost_weights.two_qubit_count * metrics.two_qubit_count)
+ (config.cost_weights.depth * metrics.depth)
+ (config.cost_weights.shot_count * metrics.shot_count)
+ (config.cost_weights.runtime_estimate * metrics.runtime_estimate)
+ (config.cost_weights.queue_cost_proxy * metrics.queue_cost_proxy)
)
metrics.total_cost = cost
score = (quality * metrics.acceptance_rate) / max(cost, 1e-9)
return score, quality, cost
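# Worked example (hypothetical numbers): quality=0.8 from the weighted
# components, acceptance_rate=0.7, cost=2.0 gives
#   score = (0.8 * 0.7) / 2.0 = 0.28,
# i.e. usable quality times acceptance rate, divided by total cost.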
def factory_throughput_score(
metrics: EvaluationMetrics,
tier: str,
config: ScoreConfig,
) -> tuple[float, float, float]:
"""Score optimised for accepted magic states per unit cost.
Computes FactoryMetrics as a side-effect attached to metrics.extra.
"""
weights = config.cheap_quality if tier == "cheap" else config.expensive_quality
values = _quality_components(metrics, weights)
weight_map = {
"ideal_fidelity": weights.ideal_fidelity,
"noisy_fidelity": weights.noisy_fidelity,
"logical_witness": weights.logical_witness,
"codespace_rate": weights.codespace_rate,
"stability_score": weights.stability_score,
"spectator_alignment": weights.spectator_alignment,
}
weighted_sum = 0.0
total_weight = 0.0
for key, weight in weight_map.items():
value = values[key]
if weight <= 0 or value is None:
continue
weighted_sum += weight * value
total_weight += weight
quality = weighted_sum / total_weight if total_weight else 0.0
# Cost with a heavier (1.5x) penalty on two-qubit count and depth
cost = (
config.base_cost
+ (config.cost_weights.two_qubit_count * metrics.two_qubit_count * 1.5)
+ (config.cost_weights.depth * metrics.depth * 1.5)
+ (config.cost_weights.shot_count * metrics.shot_count)
+ (config.cost_weights.runtime_estimate * metrics.runtime_estimate)
+ (config.cost_weights.queue_cost_proxy * metrics.queue_cost_proxy)
)
metrics.total_cost = cost
# Factory-specific metrics
acceptance = metrics.acceptance_rate
witness = metrics.logical_magic_witness or 0.0
logical_error = max(0.0, 1.0 - witness)
accepted_per_shot = acceptance
accepted_per_cost = acceptance / max(cost, 1e-9)
cost_per_accepted = cost / max(acceptance, 1e-9)
quality_yield = quality * acceptance
throughput_proxy = acceptance * witness / max(cost, 1e-9)
factory = FactoryMetrics(
accepted_states_per_shot=accepted_per_shot,
logical_error_per_accepted=logical_error,
accepted_per_unit_cost=accepted_per_cost,
quality_yield=quality_yield,
cost_per_accepted=cost_per_accepted,
throughput_proxy=throughput_proxy,
)
metrics.extra["factory_metrics"] = {
"accepted_states_per_shot": factory.accepted_states_per_shot,
"logical_error_per_accepted": factory.logical_error_per_accepted,
"accepted_per_unit_cost": factory.accepted_per_unit_cost,
"quality_yield": factory.quality_yield,
"cost_per_accepted": factory.cost_per_accepted,
"throughput_proxy": factory.throughput_proxy,
}
# Score = throughput proxy (acceptance * witness / cost)
score = throughput_proxy
return score, quality, cost
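# Worked example (hypothetical numbers): acceptance_rate=0.7,
# logical_magic_witness=0.8, cost=4.0 gives
#   throughput_proxy = 0.7 * 0.8 / 4.0 = 0.14 (the returned score)
#   cost_per_accepted = 4.0 / 0.7 ~= 5.71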
SCORE_REGISTRY: dict[str, Callable[[EvaluationMetrics, str, ScoreConfig], tuple[float, float, float]]] = {
"weighted_acceptance_cost": weighted_acceptance_cost,
"factory_throughput": factory_throughput_score,
}
def score_metrics(metrics: EvaluationMetrics, tier: str, config: ScoreConfig) -> tuple[float, float, float]:
try:
score_fn = SCORE_REGISTRY[config.name]
except KeyError as exc:
raise ValueError(f"Unknown score function: {config.name}") from exc
return score_fn(metrics, tier, config)
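# Extension sketch (an assumption, not part of this commit): a new scorer only
# needs the (metrics, tier, config) signature and a registry entry, and is then
# selected via ScoreConfig.name.
#
#   def depth_penalized(metrics, tier, config):
#       score, quality, cost = weighted_acceptance_cost(metrics, tier, config)
#       return score / (1.0 + 0.01 * metrics.depth), quality, cost
#
#   SCORE_REGISTRY["depth_penalized"] = depth_penalized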

1
src/autoresearch_quantum/search/__init__.py Normal file
View file

@ -0,0 +1 @@
"""Challenger generation."""

52
src/autoresearch_quantum/search/challengers.py Normal file
View file

@ -0,0 +1,52 @@
from __future__ import annotations
from dataclasses import dataclass
from ..models import ExperimentSpec, SearchSpaceConfig
@dataclass(frozen=True)
class GeneratedChallenger:
spec: ExperimentSpec
mutation_note: str
def generate_neighbor_challengers(
incumbent: ExperimentSpec,
search_space: SearchSpaceConfig,
history: set[str] | None = None,
) -> list[GeneratedChallenger]:
challengers: list[GeneratedChallenger] = []
seen: set[str] = set(history or set())
for field_name, values in search_space.dimensions.items():
current = getattr(incumbent, field_name)
for value in values:
normalized = tuple(value) if field_name == "initial_layout" and isinstance(value, list) else value
if normalized == current:
continue
candidate = incumbent.with_updates(**{field_name: normalized})
fingerprint = candidate.fingerprint()
if fingerprint in seen:
continue
seen.add(fingerprint)
challengers.append(
GeneratedChallenger(
spec=candidate,
mutation_note=f"{field_name}: {current} -> {normalized}",
)
)
if len(challengers) >= search_space.max_challengers_per_step:
return challengers
return challengers
def mutation_summary(parent: ExperimentSpec, child: ExperimentSpec) -> str:
changes: list[str] = []
for field_name in parent.__dataclass_fields__:
if getattr(parent, field_name) != getattr(child, field_name):
changes.append(
f"{field_name}: {getattr(parent, field_name)} -> {getattr(child, field_name)}"
)
return ", ".join(changes) if changes else "no mutation"

277
src/autoresearch_quantum/search/strategies.py Normal file
View file

@ -0,0 +1,277 @@
from __future__ import annotations
import random
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any
from ..models import ExperimentSpec, LessonFeedback, SearchRule, SearchSpaceConfig
from .challengers import GeneratedChallenger
class ChallengerStrategy(ABC):
@abstractmethod
def generate(
self,
incumbent: ExperimentSpec,
search_space: SearchSpaceConfig,
history: set[str],
lessons: list[LessonFeedback] | None = None,
) -> list[GeneratedChallenger]:
...
class NeighborWalk(ChallengerStrategy):
"""Single-axis perturbation — the original Codex strategy, kept as baseline."""
def generate(
self,
incumbent: ExperimentSpec,
search_space: SearchSpaceConfig,
history: set[str],
lessons: list[LessonFeedback] | None = None,
) -> list[GeneratedChallenger]:
challengers: list[GeneratedChallenger] = []
seen = set(history)
for field_name, values in search_space.dimensions.items():
current = getattr(incumbent, field_name)
for value in values:
normalized = tuple(value) if field_name == "initial_layout" and isinstance(value, list) else value
if normalized == current:
continue
candidate = incumbent.with_updates(**{field_name: normalized})
fp = candidate.fingerprint()
if fp in seen:
continue
seen.add(fp)
challengers.append(
GeneratedChallenger(
spec=candidate,
mutation_note=f"neighbor: {field_name}: {current} -> {normalized}",
)
)
if len(challengers) >= search_space.max_challengers_per_step:
return challengers
return challengers
class RandomCombo(ChallengerStrategy):
"""Pick 13 random dimensions and mutate them simultaneously."""
def __init__(self, num_candidates: int = 6, max_mutations: int = 3) -> None:
self.num_candidates = num_candidates
self.max_mutations = max_mutations
def generate(
self,
incumbent: ExperimentSpec,
search_space: SearchSpaceConfig,
history: set[str],
lessons: list[LessonFeedback] | None = None,
) -> list[GeneratedChallenger]:
challengers: list[GeneratedChallenger] = []
seen = set(history)
dim_names = list(search_space.dimensions.keys())
if not dim_names:
return challengers
attempts = 0
max_attempts = self.num_candidates * 5
while len(challengers) < self.num_candidates and attempts < max_attempts:
attempts += 1
n_dims = random.randint(1, min(self.max_mutations, len(dim_names)))
chosen_dims = random.sample(dim_names, n_dims)
updates: dict[str, Any] = {}
mutation_parts: list[str] = []
for dim in chosen_dims:
values = search_space.dimensions[dim]
current = getattr(incumbent, dim)
# Normalize list-valued layouts before comparing, so a list equal to
# the incumbent's tuple is not offered as a "different" value.
alternatives = [
tuple(v) if dim == "initial_layout" and isinstance(v, list) else v
for v in values
]
alternatives = [v for v in alternatives if v != current]
if not alternatives:
continue
new_val = random.choice(alternatives)
updates[dim] = new_val
mutation_parts.append(f"{dim}: {current} -> {new_val}")
if not updates:
continue
candidate = incumbent.with_updates(**updates)
fp = candidate.fingerprint()
if fp in seen:
continue
seen.add(fp)
challengers.append(
GeneratedChallenger(
spec=candidate,
mutation_note=f"combo: {', '.join(mutation_parts)}",
)
)
return challengers
class LessonGuided(ChallengerStrategy):
"""Use SearchRules from lessons to bias generation toward promising regions."""
def __init__(self, num_candidates: int = 4) -> None:
self.num_candidates = num_candidates
def generate(
self,
incumbent: ExperimentSpec,
search_space: SearchSpaceConfig,
history: set[str],
lessons: list[LessonFeedback] | None = None,
) -> list[GeneratedChallenger]:
if not lessons:
return []
# Collect all rules across rungs
all_rules: list[SearchRule] = []
for feedback in lessons:
all_rules.extend(feedback.rules)
if not all_rules:
return []
# Build preference/avoidance maps
prefer: dict[str, list[tuple[Any, float]]] = {}
avoid: dict[str, set[Any]] = {}
fix: dict[str, Any] = {}
for rule in all_rules:
if rule.action == "prefer":
prefer.setdefault(rule.dimension, []).append((rule.value, rule.confidence))
elif rule.action == "avoid":
avoid.setdefault(rule.dimension, set()).add(rule.value)
elif rule.action == "fix":
fix[rule.dimension] = rule.value
challengers: list[GeneratedChallenger] = []
seen = set(history)
for _ in range(self.num_candidates * 3):
if len(challengers) >= self.num_candidates:
break
updates: dict[str, Any] = {}
mutation_parts: list[str] = []
# Apply "fix" rules first
for dim, value in fix.items():
if dim in search_space.dimensions:
current = getattr(incumbent, dim)
if value != current:
normalized = tuple(value) if dim == "initial_layout" and isinstance(value, list) else value
updates[dim] = normalized
mutation_parts.append(f"fix({dim}): {current} -> {normalized}")
# Then apply "prefer" rules probabilistically
for dim, preferences in prefer.items():
if dim in fix or dim not in search_space.dimensions:
continue
current = getattr(incumbent, dim)
avoided = avoid.get(dim, set())
# Weighted sampling from preferred values
candidates = [(v, c) for v, c in preferences if v != current and v not in avoided]
if not candidates and random.random() < 0.5:
# Sometimes also try non-preferred, non-avoided values
all_vals = [v for v in search_space.dimensions[dim] if v != current and v not in avoided]
if all_vals:
val = random.choice(all_vals)
normalized = tuple(val) if dim == "initial_layout" and isinstance(val, list) else val
updates[dim] = normalized
mutation_parts.append(f"explore({dim}): {current} -> {normalized}")
elif candidates:
# Weight by confidence
total = sum(c for _, c in candidates)
r = random.random() * total
cumulative = 0.0
chosen = candidates[0][0]
for val, conf in candidates:
cumulative += conf
if r <= cumulative:
chosen = val
break
normalized = tuple(chosen) if dim == "initial_layout" and isinstance(chosen, list) else chosen
updates[dim] = normalized
mutation_parts.append(f"guided({dim}): {current} -> {normalized}")
if not updates:
continue
candidate = incumbent.with_updates(**updates)
fp = candidate.fingerprint()
if fp in seen:
continue
seen.add(fp)
challengers.append(
GeneratedChallenger(
spec=candidate,
mutation_note=f"lesson: {', '.join(mutation_parts)}",
)
)
return challengers
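# Sampling illustration (hypothetical rules): preferences
# [("z_only", 0.8), ("x_only", 0.2)] sum to confidence 1.0, so z_only is
# drawn with probability 0.8. "fix" rules are applied unconditionally first,
# and "avoid" values are excluded from both guided and exploratory draws.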
@dataclass
class StrategyWeight:
strategy: ChallengerStrategy
weight: float
class CompositeGenerator(ChallengerStrategy):
"""Weighted combination of multiple strategies. Allocates budget proportionally."""
def __init__(self, strategies: list[StrategyWeight]) -> None:
self.strategies = strategies
def generate(
self,
incumbent: ExperimentSpec,
search_space: SearchSpaceConfig,
history: set[str],
lessons: list[LessonFeedback] | None = None,
) -> list[GeneratedChallenger]:
total_weight = sum(sw.weight for sw in self.strategies)
budget = search_space.max_challengers_per_step
all_challengers: list[GeneratedChallenger] = []
seen = set(history)
for sw in self.strategies:
allocation = max(1, int(budget * sw.weight / total_weight))
sub_space = SearchSpaceConfig(
dimensions=search_space.dimensions,
max_challengers_per_step=allocation,
)
new_challengers = sw.strategy.generate(incumbent, sub_space, seen, lessons)
for c in new_challengers:
fp = c.spec.fingerprint()
if fp not in seen:
seen.add(fp)
all_challengers.append(c)
if len(all_challengers) >= budget:
return all_challengers
return all_challengers
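# Budget arithmetic (hypothetical weights): budget=8 with weights 0.4/0.3/0.3
# allocates max(1, int(8 * 0.4)) = 3, then 2, then 2 challengers. Rounding can
# undershoot the budget, and the shared `seen` set keeps the merged list free
# of duplicate fingerprints.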
def default_composite(has_lessons: bool = False) -> CompositeGenerator:
"""Build the default composite generator with sensible weights."""
strategies: list[StrategyWeight] = [
StrategyWeight(NeighborWalk(), weight=0.4),
StrategyWeight(RandomCombo(), weight=0.3),
]
if has_lessons:
strategies.append(StrategyWeight(LessonGuided(), weight=0.3))
else:
# Without lessons, give more budget to exploration
strategies[1] = StrategyWeight(RandomCombo(num_candidates=8), weight=0.6)
return CompositeGenerator(strategies)
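# Usage sketch (illustrative; presumably invoked by the harness's ratchet
# step, and the names below are hypothetical stand-ins for its state):
#
#   gen = default_composite(has_lessons=bool(accumulated_feedback))
#   challengers = gen.generate(
#       incumbent_spec, search_space, history=set(), lessons=accumulated_feedback
#   )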

459
tests/test_harness.py Normal file
View file

@ -0,0 +1,459 @@
from __future__ import annotations
from pathlib import Path
from qiskit.quantum_info import Statevector
from autoresearch_quantum.codes.four_two_two import STABILIZERS, encoded_magic_statevector
from autoresearch_quantum.experiments.encoded_magic_state import build_circuit_bundle
from autoresearch_quantum.execution.local import LocalCheapExecutor
from autoresearch_quantum.execution.transfer import TransferEvaluator
from autoresearch_quantum.lessons.feedback import (
build_lesson_feedback,
extract_search_rules,
narrow_search_space,
)
from autoresearch_quantum.models import (
CostWeights,
ExperimentSpec,
FactoryMetrics,
HardwareConfig,
LessonFeedback,
QualityWeights,
RungConfig,
RungProgress,
ScoreConfig,
SearchRule,
SearchSpaceConfig,
TierPolicyConfig,
TransferReport,
)
from autoresearch_quantum.persistence.store import ResearchStore
from autoresearch_quantum.ratchet.runner import AutoresearchHarness
from autoresearch_quantum.scoring.score import factory_throughput_score, score_metrics
from autoresearch_quantum.search.challengers import generate_neighbor_challengers
from autoresearch_quantum.search.strategies import (
CompositeGenerator,
LessonGuided,
NeighborWalk,
RandomCombo,
StrategyWeight,
default_composite,
)
def _test_rung(search_dimensions: dict[str, list[object]] | None = None) -> RungConfig:
spec = ExperimentSpec(
rung=1,
target_backend="fake_brisbane",
noise_backend="fake_brisbane",
shots=64,
repeats=1,
)
return RungConfig(
rung=1,
name="test",
description="test rung",
objective="test objective",
bootstrap_incumbent=spec,
search_space=SearchSpaceConfig(
dimensions=search_dimensions or {"verification": ["both", "z_only"]},
max_challengers_per_step=4,
),
tier_policy=TierPolicyConfig(
cheap_margin=0.0,
confirmation_margin=0.0,
cheap_shots=64,
expensive_shots=128,
cheap_repeats=1,
expensive_repeats=1,
promote_top_k=1,
enable_hardware=False,
confirm_incumbent_on_hardware=False,
hardware_budget=0,
),
score=ScoreConfig(
cheap_quality=QualityWeights(
ideal_fidelity=0.2,
noisy_fidelity=0.3,
logical_witness=0.3,
codespace_rate=0.1,
stability_score=0.05,
spectator_alignment=0.05,
),
expensive_quality=QualityWeights(
logical_witness=0.6,
codespace_rate=0.2,
stability_score=0.1,
spectator_alignment=0.1,
),
cost_weights=CostWeights(
two_qubit_count=0.05,
depth=0.01,
shot_count=0.0001,
runtime_estimate=0.01,
queue_cost_proxy=0.0,
),
),
step_budget=1,
patience=1,
hardware=HardwareConfig(),
)
# ── Original tests ──────────────────────────────────────────────────────────
def test_encoded_target_state_satisfies_stabilizers() -> None:
state = encoded_magic_statevector()
assert isinstance(state, Statevector)
for stabilizer in STABILIZERS.values():
expectation = state.expectation_value(stabilizer)
assert abs(expectation - 1.0) < 1e-8
def test_circuit_bundle_contains_expected_contexts() -> None:
bundle = build_circuit_bundle(ExperimentSpec(rung=1))
assert set(bundle.witness_circuits) == {"logical_x", "logical_y", "spectator_z"}
for name, circuit in bundle.witness_circuits.items():
assert circuit.metadata["context"] == name
assert "logical_operator" in circuit.metadata
assert bundle.acceptance.metadata["context"] == "acceptance"
def test_local_executor_produces_score() -> None:
rung = _test_rung()
result = LocalCheapExecutor().evaluate(rung.bootstrap_incumbent, rung)
assert result.score > 0.0
assert 0.0 <= result.metrics.acceptance_rate <= 1.0
assert 0.0 <= (result.metrics.logical_magic_witness or 0.0) <= 1.0
def test_neighbor_challengers_mutate_single_dimension() -> None:
incumbent = ExperimentSpec(rung=1)
search_space = SearchSpaceConfig(
dimensions={
"verification": ["both", "z_only"],
"seed_style": ["h_p", "ry_rz"],
},
max_challengers_per_step=8,
)
challengers = generate_neighbor_challengers(incumbent, search_space)
assert len(challengers) == 2
for challenger in challengers:
changed_fields = [
field_name
for field_name in incumbent.__dataclass_fields__
if getattr(incumbent, field_name) != getattr(challenger.spec, field_name)
]
assert len(changed_fields) == 1
def test_ratchet_step_persists_incumbent_and_step(tmp_path: Path) -> None:
rung = _test_rung({"verification": ["both", "z_only"], "postselection": ["all_measured", "z_only"]})
harness = AutoresearchHarness(ResearchStore(tmp_path))
step = harness.run_ratchet_step(rung, allow_hardware=False)
assert step.step_index == 1
assert (tmp_path / "rung_1" / "incumbent.json").exists()
assert list((tmp_path / "rung_1" / "ratchet_steps").glob("*.json"))
# ── New tests: challenger strategies ────────────────────────────────────────
def test_neighbor_walk_respects_history() -> None:
incumbent = ExperimentSpec(rung=1)
search_space = SearchSpaceConfig(
dimensions={"verification": ["both", "z_only"], "seed_style": ["h_p", "ry_rz"]},
max_challengers_per_step=8,
)
# First pass: get all challengers
all_challengers = generate_neighbor_challengers(incumbent, search_space)
fps = {c.spec.fingerprint() for c in all_challengers}
# Second pass with history: should get nothing new
new_challengers = generate_neighbor_challengers(incumbent, search_space, history=fps)
assert len(new_challengers) == 0
def test_random_combo_generates_multi_axis_mutations() -> None:
incumbent = ExperimentSpec(rung=1)
search_space = SearchSpaceConfig(
dimensions={
"verification": ["both", "z_only", "x_only"],
"seed_style": ["h_p", "ry_rz", "u_magic"],
"optimization_level": [1, 2, 3],
},
max_challengers_per_step=10,
)
strategy = RandomCombo(num_candidates=10, max_mutations=3)
challengers = strategy.generate(incumbent, search_space, set())
assert len(challengers) > 0
# At least one challenger should mutate multiple dimensions
multi_axis = [
c for c in challengers
if sum(
1 for f in incumbent.__dataclass_fields__
if getattr(incumbent, f) != getattr(c.spec, f)
) > 1
]
# Probabilistic, but with 10 candidates and 3 dims it's extremely likely
assert len(multi_axis) > 0
def test_lesson_guided_uses_rules() -> None:
incumbent = ExperimentSpec(rung=1)
search_space = SearchSpaceConfig(
dimensions={
"verification": ["both", "z_only", "x_only"],
"seed_style": ["h_p", "ry_rz", "u_magic"],
},
max_challengers_per_step=8,
)
feedback = LessonFeedback(
rung=1,
rules=[
SearchRule("verification", "prefer", "z_only", 0.8, "top performer"),
SearchRule("seed_style", "avoid", "h_p", 0.6, "consistently poor"),
SearchRule("seed_style", "fix", "ry_rz", 0.9, "all top-K use this"),
],
narrowed_dimensions={},
best_spec_fields={},
)
strategy = LessonGuided(num_candidates=6)
challengers = strategy.generate(incumbent, search_space, set(), [feedback])
assert len(challengers) > 0
# All challengers should have seed_style fixed to ry_rz (from fix rule)
for c in challengers:
assert c.spec.seed_style == "ry_rz"
def test_composite_generator_combines_strategies() -> None:
incumbent = ExperimentSpec(rung=1)
search_space = SearchSpaceConfig(
dimensions={
"verification": ["both", "z_only", "x_only"],
"seed_style": ["h_p", "ry_rz", "u_magic"],
"optimization_level": [1, 2, 3],
},
max_challengers_per_step=8,
)
composite = default_composite(has_lessons=False)
challengers = composite.generate(incumbent, search_space, set())
assert len(challengers) > 0
assert len(challengers) <= 8
# ── New tests: lesson feedback ──────────────────────────────────────────────
def test_extract_search_rules_prefer_and_avoid() -> None:
search_space = SearchSpaceConfig(
dimensions={"verification": ["both", "z_only"]},
max_challengers_per_step=4,
)
records = [
{"spec": {"verification": "z_only"}, "final_score": 0.8},
{"spec": {"verification": "z_only"}, "final_score": 0.85},
{"spec": {"verification": "z_only"}, "final_score": 0.82},
{"spec": {"verification": "both"}, "final_score": 0.5},
{"spec": {"verification": "both"}, "final_score": 0.55},
{"spec": {"verification": "both"}, "final_score": 0.52},
]
rules = extract_search_rules(records, search_space)
actions = {(r.dimension, r.action, r.value) for r in rules}
assert ("verification", "prefer", "z_only") in actions
assert ("verification", "avoid", "both") in actions
def test_narrow_search_space_removes_avoided() -> None:
search_space = SearchSpaceConfig(
dimensions={
"verification": ["both", "z_only", "x_only"],
"seed_style": ["h_p", "ry_rz", "u_magic"],
},
max_challengers_per_step=8,
)
rules = [
SearchRule("verification", "avoid", "x_only", 0.5, "poor"),
SearchRule("seed_style", "fix", "ry_rz", 0.6, "best"),
]
narrowed = narrow_search_space(search_space, rules)
assert "x_only" not in narrowed.dimensions["verification"]
assert narrowed.dimensions["seed_style"] == ["ry_rz"]
def test_build_lesson_feedback_end_to_end() -> None:
search_space = SearchSpaceConfig(
dimensions={"verification": ["both", "z_only"]},
max_challengers_per_step=4,
)
records = [
{"spec": {"verification": "z_only"}, "final_score": 0.8},
{"spec": {"verification": "z_only"}, "final_score": 0.85},
{"spec": {"verification": "both"}, "final_score": 0.5},
{"spec": {"verification": "both"}, "final_score": 0.55},
]
feedback = build_lesson_feedback(1, records, search_space)
assert feedback.rung == 1
assert len(feedback.rules) > 0
assert feedback.best_spec_fields["verification"] == "z_only"
# ── New tests: factory score ────────────────────────────────────────────────
def test_factory_throughput_score_produces_metrics() -> None:
from autoresearch_quantum.models import EvaluationMetrics
metrics = EvaluationMetrics(
ideal_encoded_fidelity=0.95,
noisy_encoded_fidelity=0.85,
logical_magic_witness=0.80,
acceptance_rate=0.70,
codespace_rate=0.65,
stability_score=0.90,
two_qubit_count=30,
depth=50,
shot_count=1024,
)
config = ScoreConfig(
name="factory_throughput",
cheap_quality=QualityWeights(
noisy_fidelity=0.3,
logical_witness=0.4,
codespace_rate=0.2,
stability_score=0.1,
),
)
score, quality, cost = factory_throughput_score(metrics, "cheap", config)
assert score > 0.0
assert quality > 0.0
assert cost > 0.0
assert "factory_metrics" in metrics.extra
fm = metrics.extra["factory_metrics"]
assert fm["accepted_states_per_shot"] == 0.70
assert fm["throughput_proxy"] > 0.0
def test_score_registry_has_factory() -> None:
from autoresearch_quantum.scoring.score import SCORE_REGISTRY
assert "factory_throughput" in SCORE_REGISTRY
# ── New tests: transfer evaluation ──────────────────────────────────────────
def test_transfer_evaluator_runs_across_backends() -> None:
rung = _test_rung()
evaluator = TransferEvaluator()
report = evaluator.evaluate_across_backends(
rung.bootstrap_incumbent,
["fake_brisbane"], # Use single backend for speed
rung,
)
assert isinstance(report, TransferReport)
assert report.transfer_score > 0.0
assert "fake_brisbane" in report.per_backend_scores
# ── New tests: persistence (progress, feedback) ─────────────────────────────
def test_save_and_load_progress(tmp_path: Path) -> None:
store = ResearchStore(tmp_path)
progress = RungProgress(
rung=1,
steps_completed=2,
patience_remaining=1,
current_incumbent_id="r1-incumbent-abc123",
completed=False,
)
store.save_progress(progress)
loaded = store.load_progress(1)
assert loaded is not None
assert loaded.steps_completed == 2
assert loaded.current_incumbent_id == "r1-incumbent-abc123"
assert not loaded.completed
def test_save_and_load_lesson_feedback(tmp_path: Path) -> None:
store = ResearchStore(tmp_path)
feedback = LessonFeedback(
rung=1,
rules=[SearchRule("verification", "prefer", "z_only", 0.8, "good")],
narrowed_dimensions={"verification": ["z_only"]},
best_spec_fields={"verification": "z_only"},
)
store.save_lesson_feedback(feedback)
loaded = store.load_lesson_feedback(1)
assert loaded is not None
assert len(loaded.rules) == 1
assert loaded.rules[0].dimension == "verification"
assert loaded.rules[0].action == "prefer"
# ── New tests: resumability in harness ──────────────────────────────────────
def test_run_rung_saves_progress(tmp_path: Path) -> None:
rung = _test_rung({"verification": ["both", "z_only"]})
store = ResearchStore(tmp_path)
harness = AutoresearchHarness(store)
steps, lesson, feedback = harness.run_rung(rung, allow_hardware=False)
assert len(steps) >= 1
progress = store.load_progress(1)
assert progress is not None
assert progress.completed
def test_run_rung_returns_lesson_and_feedback(tmp_path: Path) -> None:
rung = _test_rung({"verification": ["both", "z_only"]})
harness = AutoresearchHarness(ResearchStore(tmp_path))
steps, lesson, feedback = harness.run_rung(rung, allow_hardware=False)
assert lesson.rung == 1
assert isinstance(feedback, LessonFeedback)
assert feedback.rung == 1
# ── New tests: cross-rung propagation ───────────────────────────────────────
def test_run_ratchet_propagates_winner(tmp_path: Path) -> None:
rung1 = _test_rung({"verification": ["both", "z_only"]})
rung2_spec = ExperimentSpec(
rung=2,
target_backend="fake_brisbane",
noise_backend="fake_brisbane",
shots=64,
repeats=1,
)
rung2 = RungConfig(
rung=2,
name="test rung 2",
description="test rung 2",
objective="test objective 2",
bootstrap_incumbent=rung2_spec,
search_space=SearchSpaceConfig(
dimensions={"verification": ["both", "z_only"]},
max_challengers_per_step=2,
),
tier_policy=rung1.tier_policy,
score=rung1.score,
step_budget=1,
patience=1,
hardware=HardwareConfig(),
)
store = ResearchStore(tmp_path)
harness = AutoresearchHarness(store)
results = harness.run_ratchet([rung1, rung2], allow_hardware=False)
assert len(results) == 2
# Both should have lesson + feedback
for lesson, feedback in results:
assert lesson is not None
assert isinstance(feedback, LessonFeedback)
# Accumulated lessons should have entries from both rungs
assert len(harness._accumulated_lessons) == 2
# ── New tests: seed determinism fix ─────────────────────────────────────────
def test_different_specs_get_different_seeds() -> None:
"""Two specs with different fingerprints should produce different seeds."""
import hashlib
spec_a = ExperimentSpec(rung=1, verification="both")
spec_b = ExperimentSpec(rung=1, verification="z_only")
seed_a = int(hashlib.sha256(f"{spec_a.fingerprint()}-0".encode()).hexdigest()[:8], 16)
seed_b = int(hashlib.sha256(f"{spec_b.fingerprint()}-0".encode()).hexdigest()[:8], 16)
assert seed_a != seed_b