mirror of
https://github.com/saymrwulf/KnowledgeRefinery.git
synced 2026-05-16 21:00:07 +00:00
macOS app for corpus ingestion, semantic search, and concept universe visualization powered by local LLMs via LM Studio. Architecture: - Go daemon (17MB single binary, zero dependencies) - chi router, pure-Go SQLite, tiktoken tokenizer - 6-stage pipeline: scan → extract → chunk → embed → annotate → conceptualize - Brute-force cosine vector search in memory - 89 tests across 8 packages - SwiftUI app (macOS 15+) - Multi-workspace management with auto-start daemons - Live pipeline progress, search, concept browser - WebGPU 3D universe renderer with Canvas2D fallback - Custom crystal app icon
750 lines
20 KiB
Go
750 lines
20 KiB
Go
package storage
|
|
|
|
import (
|
|
"database/sql"
|
|
"fmt"
|
|
"time"
|
|
|
|
_ "modernc.org/sqlite"
|
|
)
|
|
|
|
const schemaDDL = `
|
|
CREATE TABLE IF NOT EXISTS file_assets (
|
|
id TEXT PRIMARY KEY,
|
|
path TEXT NOT NULL,
|
|
filename TEXT NOT NULL,
|
|
uti TEXT,
|
|
mime_type TEXT,
|
|
size_bytes INTEGER,
|
|
mtime_ns INTEGER,
|
|
content_hash TEXT,
|
|
scan_version INTEGER DEFAULT 1,
|
|
status TEXT DEFAULT 'pending',
|
|
error_message TEXT,
|
|
created_at TEXT,
|
|
updated_at TEXT
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_file_assets_path ON file_assets(path);
|
|
CREATE INDEX IF NOT EXISTS idx_file_assets_status ON file_assets(status);
|
|
CREATE INDEX IF NOT EXISTS idx_file_assets_content_hash ON file_assets(content_hash);
|
|
|
|
CREATE TABLE IF NOT EXISTS content_atoms (
|
|
id TEXT PRIMARY KEY,
|
|
asset_id TEXT REFERENCES file_assets(id),
|
|
atom_type TEXT NOT NULL,
|
|
sequence_index INTEGER,
|
|
payload_text TEXT,
|
|
payload_ref TEXT,
|
|
metadata_json TEXT,
|
|
evidence_anchor TEXT NOT NULL,
|
|
created_at TEXT
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_content_atoms_asset ON content_atoms(asset_id);
|
|
|
|
CREATE TABLE IF NOT EXISTS chunks (
|
|
id TEXT PRIMARY KEY,
|
|
atom_id TEXT REFERENCES content_atoms(id),
|
|
asset_id TEXT REFERENCES file_assets(id),
|
|
chunk_text TEXT NOT NULL,
|
|
token_count INTEGER,
|
|
chunk_index INTEGER,
|
|
evidence_anchor TEXT NOT NULL,
|
|
embedding_id TEXT,
|
|
pipeline_version TEXT,
|
|
created_at TEXT
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_chunks_asset ON chunks(asset_id);
|
|
CREATE INDEX IF NOT EXISTS idx_chunks_atom ON chunks(atom_id);
|
|
|
|
CREATE TABLE IF NOT EXISTS annotations (
|
|
id TEXT PRIMARY KEY,
|
|
chunk_id TEXT REFERENCES chunks(id),
|
|
model_id TEXT NOT NULL,
|
|
prompt_id TEXT NOT NULL,
|
|
prompt_version TEXT NOT NULL,
|
|
pipeline_version TEXT NOT NULL,
|
|
topics_json TEXT,
|
|
sentiment_label TEXT,
|
|
sentiment_confidence REAL,
|
|
entities_json TEXT,
|
|
claims_json TEXT,
|
|
summary TEXT,
|
|
quality_flags_json TEXT,
|
|
is_current INTEGER DEFAULT 1,
|
|
created_at TEXT
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_annotations_chunk ON annotations(chunk_id);
|
|
CREATE INDEX IF NOT EXISTS idx_annotations_current ON annotations(is_current);
|
|
|
|
CREATE TABLE IF NOT EXISTS concept_nodes (
|
|
id TEXT PRIMARY KEY,
|
|
level INTEGER NOT NULL,
|
|
label TEXT,
|
|
description TEXT,
|
|
parent_id TEXT REFERENCES concept_nodes(id),
|
|
exemplar_chunk_ids TEXT,
|
|
pipeline_version TEXT,
|
|
model_id TEXT,
|
|
created_at TEXT
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_concept_nodes_level ON concept_nodes(level);
|
|
|
|
CREATE TABLE IF NOT EXISTS graph_edges (
|
|
id TEXT PRIMARY KEY,
|
|
source_id TEXT NOT NULL,
|
|
target_id TEXT NOT NULL,
|
|
edge_type TEXT NOT NULL,
|
|
weight REAL,
|
|
evidence_json TEXT,
|
|
pipeline_version TEXT,
|
|
created_at TEXT
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_graph_edges_source ON graph_edges(source_id);
|
|
CREATE INDEX IF NOT EXISTS idx_graph_edges_target ON graph_edges(target_id);
|
|
|
|
CREATE TABLE IF NOT EXISTS pipeline_jobs (
|
|
id TEXT PRIMARY KEY,
|
|
job_type TEXT NOT NULL,
|
|
status TEXT DEFAULT 'pending',
|
|
progress_json TEXT,
|
|
created_at TEXT,
|
|
updated_at TEXT
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS watched_volumes (
|
|
id TEXT PRIMARY KEY,
|
|
path TEXT NOT NULL UNIQUE,
|
|
label TEXT,
|
|
added_at TEXT,
|
|
last_scan_at TEXT
|
|
);
|
|
`
|
|
|
|
// Database provides thread-safe SQLite operations.
|
|
type Database struct {
|
|
db *sql.DB
|
|
}
|
|
|
|
func NewDatabase(dbPath string) (*Database, error) {
|
|
db, err := sql.Open("sqlite", dbPath)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("open database: %w", err)
|
|
}
|
|
// SQLite pragmas
|
|
for _, pragma := range []string{
|
|
"PRAGMA journal_mode=WAL",
|
|
"PRAGMA foreign_keys=ON",
|
|
"PRAGMA busy_timeout=10000",
|
|
} {
|
|
if _, err := db.Exec(pragma); err != nil {
|
|
db.Close()
|
|
return nil, fmt.Errorf("exec %s: %w", pragma, err)
|
|
}
|
|
}
|
|
return &Database{db: db}, nil
|
|
}
|
|
|
|
func (d *Database) Initialize() error {
|
|
_, err := d.db.Exec(schemaDDL)
|
|
return err
|
|
}
|
|
|
|
func (d *Database) Close() error {
|
|
return d.db.Close()
|
|
}
|
|
|
|
func (d *Database) DB() *sql.DB {
|
|
return d.db
|
|
}
|
|
|
|
// -- FileAsset operations --
|
|
|
|
func (d *Database) UpsertFileAsset(a FileAsset) error {
|
|
now := nowISO()
|
|
_, err := d.db.Exec(`
|
|
INSERT INTO file_assets (id, path, filename, uti, mime_type, size_bytes, mtime_ns,
|
|
content_hash, scan_version, status, error_message, created_at, updated_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
ON CONFLICT(id) DO UPDATE SET
|
|
path=excluded.path, filename=excluded.filename, uti=excluded.uti,
|
|
mime_type=excluded.mime_type, size_bytes=excluded.size_bytes,
|
|
mtime_ns=excluded.mtime_ns, content_hash=excluded.content_hash,
|
|
scan_version=excluded.scan_version, status=excluded.status,
|
|
error_message=excluded.error_message, updated_at=?`,
|
|
a.ID, a.Path, a.Filename, a.UTI, a.MimeType, a.SizeBytes, a.MtimeNs,
|
|
a.ContentHash, a.ScanVersion, string(a.Status), a.ErrorMessage, a.CreatedAt, now, now,
|
|
)
|
|
return err
|
|
}
|
|
|
|
func (d *Database) GetFileAsset(assetID string) (*FileAsset, error) {
|
|
return d.scanFileAsset(d.db.QueryRow("SELECT * FROM file_assets WHERE id=?", assetID))
|
|
}
|
|
|
|
func (d *Database) GetFileAssetByPath(path string) (*FileAsset, error) {
|
|
return d.scanFileAsset(d.db.QueryRow("SELECT * FROM file_assets WHERE path=?", path))
|
|
}
|
|
|
|
func (d *Database) scanFileAsset(row *sql.Row) (*FileAsset, error) {
|
|
var a FileAsset
|
|
var status string
|
|
err := row.Scan(
|
|
&a.ID, &a.Path, &a.Filename, &a.UTI, &a.MimeType, &a.SizeBytes, &a.MtimeNs,
|
|
&a.ContentHash, &a.ScanVersion, &status, &a.ErrorMessage, &a.CreatedAt, &a.UpdatedAt,
|
|
)
|
|
if err == sql.ErrNoRows {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
a.Status = AssetStatus(status)
|
|
return &a, nil
|
|
}
|
|
|
|
func (d *Database) GetAssetsByStatus(status AssetStatus, limit int) ([]FileAsset, error) {
|
|
rows, err := d.db.Query("SELECT * FROM file_assets WHERE status=? LIMIT ?", string(status), limit)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
return d.scanFileAssets(rows)
|
|
}
|
|
|
|
func (d *Database) GetAllAssets() ([]FileAsset, error) {
|
|
rows, err := d.db.Query("SELECT * FROM file_assets")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
return d.scanFileAssets(rows)
|
|
}
|
|
|
|
func (d *Database) scanFileAssets(rows *sql.Rows) ([]FileAsset, error) {
|
|
var assets []FileAsset
|
|
for rows.Next() {
|
|
var a FileAsset
|
|
var status string
|
|
err := rows.Scan(
|
|
&a.ID, &a.Path, &a.Filename, &a.UTI, &a.MimeType, &a.SizeBytes, &a.MtimeNs,
|
|
&a.ContentHash, &a.ScanVersion, &status, &a.ErrorMessage, &a.CreatedAt, &a.UpdatedAt,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
a.Status = AssetStatus(status)
|
|
assets = append(assets, a)
|
|
}
|
|
return assets, rows.Err()
|
|
}
|
|
|
|
func (d *Database) UpdateAssetStatus(assetID string, status AssetStatus, errMsg *string) error {
|
|
now := nowISO()
|
|
_, err := d.db.Exec(
|
|
"UPDATE file_assets SET status=?, error_message=?, updated_at=? WHERE id=?",
|
|
string(status), errMsg, now, assetID,
|
|
)
|
|
return err
|
|
}
|
|
|
|
func (d *Database) CountAssetsByStatus() (map[string]int, error) {
|
|
rows, err := d.db.Query("SELECT status, COUNT(*) as cnt FROM file_assets GROUP BY status")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
result := make(map[string]int)
|
|
for rows.Next() {
|
|
var status string
|
|
var cnt int
|
|
if err := rows.Scan(&status, &cnt); err != nil {
|
|
return nil, err
|
|
}
|
|
result[status] = cnt
|
|
}
|
|
return result, rows.Err()
|
|
}
|
|
|
|
// -- ContentAtom operations --
|
|
|
|
func (d *Database) InsertContentAtom(a ContentAtom) error {
|
|
_, err := d.db.Exec(`
|
|
INSERT OR REPLACE INTO content_atoms
|
|
(id, asset_id, atom_type, sequence_index, payload_text, payload_ref,
|
|
metadata_json, evidence_anchor, created_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
|
|
a.ID, a.AssetID, string(a.AtomType), a.SequenceIndex,
|
|
a.PayloadText, a.PayloadRef, a.MetadataJSON,
|
|
a.EvidenceAnchor, a.CreatedAt,
|
|
)
|
|
return err
|
|
}
|
|
|
|
func (d *Database) InsertContentAtoms(atoms []ContentAtom) error {
|
|
tx, err := d.db.Begin()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
stmt, err := tx.Prepare(`
|
|
INSERT OR REPLACE INTO content_atoms
|
|
(id, asset_id, atom_type, sequence_index, payload_text, payload_ref,
|
|
metadata_json, evidence_anchor, created_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`)
|
|
if err != nil {
|
|
tx.Rollback()
|
|
return err
|
|
}
|
|
defer stmt.Close()
|
|
|
|
for _, a := range atoms {
|
|
_, err := stmt.Exec(
|
|
a.ID, a.AssetID, string(a.AtomType), a.SequenceIndex,
|
|
a.PayloadText, a.PayloadRef, a.MetadataJSON,
|
|
a.EvidenceAnchor, a.CreatedAt,
|
|
)
|
|
if err != nil {
|
|
tx.Rollback()
|
|
return err
|
|
}
|
|
}
|
|
return tx.Commit()
|
|
}
|
|
|
|
func (d *Database) GetAtomsForAsset(assetID string) ([]ContentAtom, error) {
|
|
rows, err := d.db.Query(
|
|
"SELECT * FROM content_atoms WHERE asset_id=? ORDER BY sequence_index",
|
|
assetID,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var atoms []ContentAtom
|
|
for rows.Next() {
|
|
var a ContentAtom
|
|
var atomType string
|
|
err := rows.Scan(
|
|
&a.ID, &a.AssetID, &atomType, &a.SequenceIndex,
|
|
&a.PayloadText, &a.PayloadRef, &a.MetadataJSON,
|
|
&a.EvidenceAnchor, &a.CreatedAt,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
a.AtomType = AtomType(atomType)
|
|
atoms = append(atoms, a)
|
|
}
|
|
return atoms, rows.Err()
|
|
}
|
|
|
|
func (d *Database) DeleteAtomsForAsset(assetID string) error {
|
|
_, err := d.db.Exec("DELETE FROM content_atoms WHERE asset_id=?", assetID)
|
|
return err
|
|
}
|
|
|
|
// -- Chunk operations --
|
|
|
|
func (d *Database) InsertChunk(c Chunk) error {
|
|
_, err := d.db.Exec(`
|
|
INSERT OR REPLACE INTO chunks
|
|
(id, atom_id, asset_id, chunk_text, token_count, chunk_index,
|
|
evidence_anchor, embedding_id, pipeline_version, created_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
|
|
c.ID, c.AtomID, c.AssetID, c.ChunkText, c.TokenCount, c.ChunkIndex,
|
|
c.EvidenceAnchor, c.EmbeddingID, c.PipelineVersion, c.CreatedAt,
|
|
)
|
|
return err
|
|
}
|
|
|
|
func (d *Database) InsertChunks(chunks []Chunk) error {
|
|
tx, err := d.db.Begin()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
stmt, err := tx.Prepare(`
|
|
INSERT OR REPLACE INTO chunks
|
|
(id, atom_id, asset_id, chunk_text, token_count, chunk_index,
|
|
evidence_anchor, embedding_id, pipeline_version, created_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`)
|
|
if err != nil {
|
|
tx.Rollback()
|
|
return err
|
|
}
|
|
defer stmt.Close()
|
|
|
|
for _, c := range chunks {
|
|
_, err := stmt.Exec(
|
|
c.ID, c.AtomID, c.AssetID, c.ChunkText, c.TokenCount, c.ChunkIndex,
|
|
c.EvidenceAnchor, c.EmbeddingID, c.PipelineVersion, c.CreatedAt,
|
|
)
|
|
if err != nil {
|
|
tx.Rollback()
|
|
return err
|
|
}
|
|
}
|
|
return tx.Commit()
|
|
}
|
|
|
|
func (d *Database) GetChunksForAsset(assetID string) ([]Chunk, error) {
|
|
rows, err := d.db.Query("SELECT * FROM chunks WHERE asset_id=? ORDER BY chunk_index", assetID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
return d.scanChunks(rows)
|
|
}
|
|
|
|
func (d *Database) GetChunksWithoutEmbeddings(limit int) ([]Chunk, error) {
|
|
rows, err := d.db.Query("SELECT * FROM chunks WHERE embedding_id IS NULL LIMIT ?", limit)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
return d.scanChunks(rows)
|
|
}
|
|
|
|
func (d *Database) GetChunk(chunkID string) (*Chunk, error) {
|
|
row := d.db.QueryRow("SELECT * FROM chunks WHERE id=?", chunkID)
|
|
var c Chunk
|
|
err := row.Scan(
|
|
&c.ID, &c.AtomID, &c.AssetID, &c.ChunkText, &c.TokenCount, &c.ChunkIndex,
|
|
&c.EvidenceAnchor, &c.EmbeddingID, &c.PipelineVersion, &c.CreatedAt,
|
|
)
|
|
if err == sql.ErrNoRows {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &c, nil
|
|
}
|
|
|
|
func (d *Database) UpdateChunkEmbedding(chunkID, embeddingID string) error {
|
|
_, err := d.db.Exec("UPDATE chunks SET embedding_id=? WHERE id=?", embeddingID, chunkID)
|
|
return err
|
|
}
|
|
|
|
func (d *Database) DeleteChunksForAsset(assetID string) error {
|
|
_, err := d.db.Exec("DELETE FROM chunks WHERE asset_id=?", assetID)
|
|
return err
|
|
}
|
|
|
|
func (d *Database) CountChunks() (int, error) {
|
|
var cnt int
|
|
err := d.db.QueryRow("SELECT COUNT(*) FROM chunks").Scan(&cnt)
|
|
return cnt, err
|
|
}
|
|
|
|
func (d *Database) scanChunks(rows *sql.Rows) ([]Chunk, error) {
|
|
var chunks []Chunk
|
|
for rows.Next() {
|
|
var c Chunk
|
|
err := rows.Scan(
|
|
&c.ID, &c.AtomID, &c.AssetID, &c.ChunkText, &c.TokenCount, &c.ChunkIndex,
|
|
&c.EvidenceAnchor, &c.EmbeddingID, &c.PipelineVersion, &c.CreatedAt,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
chunks = append(chunks, c)
|
|
}
|
|
return chunks, rows.Err()
|
|
}
|
|
|
|
// -- Annotation operations --
|
|
|
|
func (d *Database) InsertAnnotation(ann Annotation) error {
|
|
tx, err := d.db.Begin()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// Mark previous annotations as non-current
|
|
_, err = tx.Exec("UPDATE annotations SET is_current=0 WHERE chunk_id=? AND is_current=1", ann.ChunkID)
|
|
if err != nil {
|
|
tx.Rollback()
|
|
return err
|
|
}
|
|
_, err = tx.Exec(`
|
|
INSERT INTO annotations
|
|
(id, chunk_id, model_id, prompt_id, prompt_version, pipeline_version,
|
|
topics_json, sentiment_label, sentiment_confidence, entities_json,
|
|
claims_json, summary, quality_flags_json, is_current, created_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
|
|
ann.ID, ann.ChunkID, ann.ModelID, ann.PromptID, ann.PromptVersion,
|
|
ann.PipelineVersion, ann.TopicsJSON, ann.SentimentLabel,
|
|
ann.SentimentConfidence, ann.EntitiesJSON, ann.ClaimsJSON,
|
|
ann.Summary, ann.QualityFlagsJSON, ann.IsCurrent, ann.CreatedAt,
|
|
)
|
|
if err != nil {
|
|
tx.Rollback()
|
|
return err
|
|
}
|
|
return tx.Commit()
|
|
}
|
|
|
|
func (d *Database) GetCurrentAnnotation(chunkID string) (*Annotation, error) {
|
|
row := d.db.QueryRow(
|
|
"SELECT * FROM annotations WHERE chunk_id=? AND is_current=1",
|
|
chunkID,
|
|
)
|
|
var a Annotation
|
|
err := row.Scan(
|
|
&a.ID, &a.ChunkID, &a.ModelID, &a.PromptID, &a.PromptVersion,
|
|
&a.PipelineVersion, &a.TopicsJSON, &a.SentimentLabel,
|
|
&a.SentimentConfidence, &a.EntitiesJSON, &a.ClaimsJSON,
|
|
&a.Summary, &a.QualityFlagsJSON, &a.IsCurrent, &a.CreatedAt,
|
|
)
|
|
if err == sql.ErrNoRows {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &a, nil
|
|
}
|
|
|
|
func (d *Database) CountAnnotations() (int, error) {
|
|
var cnt int
|
|
err := d.db.QueryRow("SELECT COUNT(*) FROM annotations WHERE is_current=1").Scan(&cnt)
|
|
return cnt, err
|
|
}
|
|
|
|
// -- ConceptNode operations --
|
|
|
|
func (d *Database) InsertConceptNode(node ConceptNode) error {
|
|
_, err := d.db.Exec(`
|
|
INSERT OR REPLACE INTO concept_nodes
|
|
(id, level, label, description, parent_id, exemplar_chunk_ids,
|
|
pipeline_version, model_id, created_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
|
|
node.ID, node.Level, node.Label, node.Description, node.ParentID,
|
|
node.ExemplarChunkIDs, node.PipelineVersion, node.ModelID, node.CreatedAt,
|
|
)
|
|
return err
|
|
}
|
|
|
|
func (d *Database) GetConceptNodes(level *int) ([]ConceptNode, error) {
|
|
var rows *sql.Rows
|
|
var err error
|
|
if level != nil {
|
|
rows, err = d.db.Query("SELECT * FROM concept_nodes WHERE level=?", *level)
|
|
} else {
|
|
rows, err = d.db.Query("SELECT * FROM concept_nodes")
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var nodes []ConceptNode
|
|
for rows.Next() {
|
|
var n ConceptNode
|
|
err := rows.Scan(
|
|
&n.ID, &n.Level, &n.Label, &n.Description, &n.ParentID,
|
|
&n.ExemplarChunkIDs, &n.PipelineVersion, &n.ModelID, &n.CreatedAt,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
nodes = append(nodes, n)
|
|
}
|
|
return nodes, rows.Err()
|
|
}
|
|
|
|
func (d *Database) CountConcepts() (int, error) {
|
|
var cnt int
|
|
err := d.db.QueryRow("SELECT COUNT(*) FROM concept_nodes").Scan(&cnt)
|
|
return cnt, err
|
|
}
|
|
|
|
// -- GraphEdge operations --
|
|
|
|
func (d *Database) InsertGraphEdge(edge GraphEdge) error {
|
|
_, err := d.db.Exec(`
|
|
INSERT OR REPLACE INTO graph_edges
|
|
(id, source_id, target_id, edge_type, weight, evidence_json,
|
|
pipeline_version, created_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
|
|
edge.ID, edge.SourceID, edge.TargetID, edge.EdgeType,
|
|
edge.Weight, edge.EvidenceJSON, edge.PipelineVersion, edge.CreatedAt,
|
|
)
|
|
return err
|
|
}
|
|
|
|
func (d *Database) CountEdges() (int, error) {
|
|
var cnt int
|
|
err := d.db.QueryRow("SELECT COUNT(*) FROM graph_edges").Scan(&cnt)
|
|
return cnt, err
|
|
}
|
|
|
|
// GetGraphEdges retrieves edges with optional ordering and limit.
|
|
func (d *Database) GetGraphEdges(orderBy string, limit int) ([]GraphEdge, error) {
|
|
q := "SELECT * FROM graph_edges"
|
|
if orderBy != "" {
|
|
q += " ORDER BY " + orderBy
|
|
}
|
|
if limit > 0 {
|
|
q += fmt.Sprintf(" LIMIT %d", limit)
|
|
}
|
|
rows, err := d.db.Query(q)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
return d.scanEdges(rows)
|
|
}
|
|
|
|
// GetEdgesForNode retrieves edges involving a specific node.
|
|
func (d *Database) GetEdgesForNode(nodeID string, limit int) ([]GraphEdge, error) {
|
|
rows, err := d.db.Query(
|
|
"SELECT * FROM graph_edges WHERE source_id=? OR target_id=? ORDER BY weight DESC LIMIT ?",
|
|
nodeID, nodeID, limit,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
return d.scanEdges(rows)
|
|
}
|
|
|
|
// GetMemberChunkIDs returns target_ids for concept_member edges from a concept.
|
|
func (d *Database) GetMemberChunkIDs(conceptID string) ([]string, error) {
|
|
rows, err := d.db.Query(
|
|
"SELECT target_id FROM graph_edges WHERE source_id=? AND edge_type='concept_member'",
|
|
conceptID,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var ids []string
|
|
for rows.Next() {
|
|
var id string
|
|
if err := rows.Scan(&id); err != nil {
|
|
return nil, err
|
|
}
|
|
ids = append(ids, id)
|
|
}
|
|
return ids, rows.Err()
|
|
}
|
|
|
|
func (d *Database) scanEdges(rows *sql.Rows) ([]GraphEdge, error) {
|
|
var edges []GraphEdge
|
|
for rows.Next() {
|
|
var e GraphEdge
|
|
err := rows.Scan(
|
|
&e.ID, &e.SourceID, &e.TargetID, &e.EdgeType,
|
|
&e.Weight, &e.EvidenceJSON, &e.PipelineVersion, &e.CreatedAt,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
edges = append(edges, e)
|
|
}
|
|
return edges, rows.Err()
|
|
}
|
|
|
|
// -- PipelineJob operations --
|
|
|
|
func (d *Database) UpsertPipelineJob(job PipelineJob) error {
|
|
now := nowISO()
|
|
_, err := d.db.Exec(`
|
|
INSERT INTO pipeline_jobs (id, job_type, status, progress_json, created_at, updated_at)
|
|
VALUES (?, ?, ?, ?, ?, ?)
|
|
ON CONFLICT(id) DO UPDATE SET
|
|
status=excluded.status, progress_json=excluded.progress_json, updated_at=?`,
|
|
job.ID, job.JobType, string(job.Status), job.ProgressJSON, job.CreatedAt, now, now,
|
|
)
|
|
return err
|
|
}
|
|
|
|
func (d *Database) GetLatestJob(jobType *string) (*PipelineJob, error) {
|
|
var row *sql.Row
|
|
if jobType != nil {
|
|
row = d.db.QueryRow(
|
|
"SELECT * FROM pipeline_jobs WHERE job_type=? ORDER BY created_at DESC LIMIT 1",
|
|
*jobType,
|
|
)
|
|
} else {
|
|
row = d.db.QueryRow("SELECT * FROM pipeline_jobs ORDER BY created_at DESC LIMIT 1")
|
|
}
|
|
var j PipelineJob
|
|
var status string
|
|
err := row.Scan(&j.ID, &j.JobType, &status, &j.ProgressJSON, &j.CreatedAt, &j.UpdatedAt)
|
|
if err == sql.ErrNoRows {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
j.Status = JobStatus(status)
|
|
return &j, nil
|
|
}
|
|
|
|
func (d *Database) UpdateJobStatus(jobID string, status JobStatus, progress *string) error {
|
|
now := nowISO()
|
|
_, err := d.db.Exec(
|
|
"UPDATE pipeline_jobs SET status=?, progress_json=?, updated_at=? WHERE id=?",
|
|
string(status), progress, now, jobID,
|
|
)
|
|
return err
|
|
}
|
|
|
|
// -- WatchedVolume operations --
|
|
|
|
func (d *Database) AddWatchedVolume(vol WatchedVolume) error {
|
|
_, err := d.db.Exec(`
|
|
INSERT INTO watched_volumes (id, path, label, added_at, last_scan_at)
|
|
VALUES (?, ?, ?, ?, ?)
|
|
ON CONFLICT(path) DO UPDATE SET label=excluded.label`,
|
|
vol.ID, vol.Path, vol.Label, vol.AddedAt, vol.LastScanAt,
|
|
)
|
|
return err
|
|
}
|
|
|
|
func (d *Database) GetWatchedVolumes() ([]WatchedVolume, error) {
|
|
rows, err := d.db.Query("SELECT * FROM watched_volumes")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var vols []WatchedVolume
|
|
for rows.Next() {
|
|
var v WatchedVolume
|
|
if err := rows.Scan(&v.ID, &v.Path, &v.Label, &v.AddedAt, &v.LastScanAt); err != nil {
|
|
return nil, err
|
|
}
|
|
vols = append(vols, v)
|
|
}
|
|
return vols, rows.Err()
|
|
}
|
|
|
|
func (d *Database) RemoveWatchedVolume(path string) error {
|
|
_, err := d.db.Exec("DELETE FROM watched_volumes WHERE path=?", path)
|
|
return err
|
|
}
|
|
|
|
func (d *Database) UpdateVolumeScanTime(volID string) error {
|
|
now := time.Now().UTC().Format(time.RFC3339)
|
|
_, err := d.db.Exec("UPDATE watched_volumes SET last_scan_at=? WHERE id=?", now, volID)
|
|
return err
|
|
}
|
|
|
|
// GetConceptNodeByID retrieves a single concept node.
|
|
func (d *Database) GetConceptNodeByID(id string) (*ConceptNode, error) {
|
|
row := d.db.QueryRow("SELECT * FROM concept_nodes WHERE id=?", id)
|
|
var n ConceptNode
|
|
err := row.Scan(
|
|
&n.ID, &n.Level, &n.Label, &n.Description, &n.ParentID,
|
|
&n.ExemplarChunkIDs, &n.PipelineVersion, &n.ModelID, &n.CreatedAt,
|
|
)
|
|
if err == sql.ErrNoRows {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &n, nil
|
|
}
|