Coverage for src/local_deep_research/database/alembic_runner.py: 91%
136 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1"""
2Programmatic Alembic migration runner for per-user encrypted databases.
4This module provides functions to run Alembic migrations against SQLCipher
5encrypted databases without using the Alembic CLI. Each user database
6tracks its own migration version via the alembic_version table.
7"""
9import os
10import time
11from pathlib import Path
12from typing import Optional
14from alembic import command
15from alembic.config import Config
16from alembic.runtime.migration import MigrationContext
17from alembic.script import ScriptDirectory
18from loguru import logger
19from sqlalchemy import Connection, Engine, inspect
20from sqlalchemy.exc import IntegrityError, OperationalError
def get_migrations_dir() -> Path:
    """
    Locate the bundled Alembic ``migrations`` directory.

    The directory must resolve to a location inside this package's own
    directory. A ``migrations`` symlink pointing elsewhere could make Alembic
    import arbitrary Python code, so such a layout is rejected.

    Returns:
        Path to the migrations directory (as located, not resolved)

    Raises:
        ValueError: If the migrations path resolves outside the package
    """
    package_dir = Path(__file__).parent
    candidate = package_dir / "migrations"

    # Security: compare fully-resolved paths so a symlinked ``migrations``
    # entry cannot escape the package boundary.
    inside_package = candidate.resolve().is_relative_to(package_dir.resolve())
    if not inside_package:
        raise ValueError(
            "Invalid migrations path (possible symlink attack): "
            "migrations dir resolves outside expected package boundary"
        )
    return candidate
52def _validate_migrations_permissions(migrations_dir: Path) -> None:
53 """
54 Validate migration files are not world-writable.
56 World-writable migration files could be replaced with malicious code
57 that would execute during database migrations with the application's
58 privileges.
60 Args:
61 migrations_dir: Path to the migrations directory
63 Raises:
64 ValueError: If any migration file is world-writable
66 Note:
67 This check is skipped on Windows where file permissions work differently.
68 """
69 if os.name == "nt": # Skip permission checks on Windows
70 return
72 versions_dir = migrations_dir / "versions"
73 if not versions_dir.exists(): 73 ↛ 74line 73 didn't jump to line 74 because the condition on line 73 was never true
74 return
76 # Check the versions directory itself
77 st = versions_dir.stat()
78 if st.st_mode & 0o002:
79 raise ValueError(
80 f"Migrations directory has insecure permissions (world-writable): "
81 f"{versions_dir}. Fix with: chmod o-w {versions_dir}"
82 )
84 for migration_file in versions_dir.glob("*.py"):
85 st = migration_file.stat()
86 if st.st_mode & 0o002: # World-writable bit
87 raise ValueError(
88 f"Migration file has insecure permissions (world-writable): "
89 f"{migration_file.name}. "
90 f"Fix with: chmod o-w {migration_file}"
91 )
def get_alembic_config(engine: Engine) -> Config:
    """
    Build an in-memory Alembic ``Config`` (no ``alembic.ini``) for this package.

    Args:
        engine: SQLAlchemy engine the migrations will target. It is not read
            here; callers hand the live connection to Alembic through
            ``config.attributes["connection"]``.

    Returns:
        Alembic Config pointing at the package's migrations directory
    """
    script_location = str(get_migrations_dir())

    alembic_cfg = Config()
    alembic_cfg.set_main_option("script_location", script_location)
    # Alembic refuses to run without a URL even though the real connection
    # is supplied separately; this placeholder is never connected to here.
    alembic_cfg.set_main_option("sqlalchemy.url", "sqlite:///:memory:")
    return alembic_cfg
def get_current_revision(engine: Engine) -> Optional[str]:
    """
    Read the revision recorded in the database's ``alembic_version`` table.

    Args:
        engine: SQLAlchemy engine for the database to inspect

    Returns:
        The stored revision string, or None when no migration has been
        recorded yet
    """
    with engine.connect() as connection:
        return MigrationContext.configure(connection).get_current_revision()
def get_head_revision() -> Optional[str]:
    """
    Return the newest revision available in the bundled migration scripts.

    Only the script directory is consulted; no database is touched.

    Returns:
        Head revision string, or None if no migrations exist
    """
    script_location = str(get_migrations_dir())

    cfg = Config()
    cfg.set_main_option("script_location", script_location)
    return ScriptDirectory.from_config(cfg).get_current_head()
def needs_migration(engine: Engine) -> bool:
    """
    Check if a database needs migrations.

    A database with no recorded revision always needs migrating. That covers
    three cases:

    - a brand-new empty database;
    - a pre-Alembic database created via ``create_all()`` (needs baselining,
      then upgrading);
    - a database whose ``alembic_version`` table exists but is empty.

    Otherwise migrations are pending exactly when the recorded revision
    differs from head.

    Args:
        engine: SQLAlchemy engine

    Returns:
        True if migrations are pending
    """
    head = get_head_revision()
    if head is None:
        # No migrations exist yet
        return False

    current = get_current_revision(engine)
    # When ``current`` is None every case listed above needs migrating, and
    # ``None != head`` already evaluates to True. Inspecting the table list
    # (as done previously) cannot change the answer and only costs an extra
    # database round-trip.
    return current != head
def stamp_database(engine: Engine, revision: str = "head") -> None:
    """
    Record ``revision`` in ``alembic_version`` without executing migrations.

    Used to baseline databases whose schema already exists.

    Concurrency: two callers stamping the same fresh database can collide.
    The loser sees either "table alembic_version already exists"
    (OperationalError) or a duplicate version_num primary key
    (IntegrityError). When the error message matches one of those shapes
    AND a revision is now recorded, the collision is treated as benign and
    swallowed. Any other error, or a matching error with no revision
    recorded, is re-raised.

    Args:
        engine: SQLAlchemy engine
        revision: Revision to stamp (default "head")
    """
    alembic_cfg = get_alembic_config(engine)

    try:
        with engine.begin() as connection:
            alembic_cfg.attributes["connection"] = connection
            command.stamp(alembic_cfg, revision)
    except (IntegrityError, OperationalError) as exc:
        lowered = str(exc).lower()
        # IntegrityError / table-exists race vs. CREATE TABLE race.
        benign_shape = "alembic_version" in lowered or "already exists" in lowered
        # Unrelated failures (disk full, SQLITE_BUSY elsewhere, corruption)
        # must surface to the caller.
        if not benign_shape:
            raise
        # Only re-query the database once the message matched; a race is
        # benign only if the other caller's stamp actually landed.
        if get_current_revision(engine) is None:
            raise
        # Lost the race: another caller stamped first, so do not claim the
        # stamp as ours.
        logger.debug(
            f"stamp_database({revision}) lost race to concurrent caller "
            f"({type(exc).__name__}); database is stamped, continuing"
        )
    else:
        logger.info(f"Stamped database at revision: {revision}")
def _drop_orphan_alembic_temp_tables(conn: Connection) -> None:
    """Drop leftover ``_alembic_tmp_<table>`` tables from prior failed
    batch_alter_table runs (issue #3817).

    ``op.batch_alter_table`` rebuilds a table in four steps: it creates
    ``_alembic_tmp_<table>``, copies the data, drops the original, and
    renames the temp table. On a clean run alembic drops the temp table
    automatically. The temp table persists if a previous attempt failed
    in a way that bypassed transaction rollback, for example:

    - an older migration runner that auto-committed each migration;
    - a process killed mid-DDL on a non-transactional sqlite build.

    The next attempt then fails at ``op.batch_alter_table`` with
    ``table _alembic_tmp_* already exists``.

    Each ``DROP TABLE`` briefly takes the file write lock and releases
    it. A concurrent migration may be mid-batch_alter_table on the same
    table. In that case our DROP blocks on the SQLite write lock
    (presumably bounded by the connection's busy_timeout). By the time we
    acquire it, the concurrent migration's rename has consumed the temp
    table and our DROP IF EXISTS is a no-op, so the race is benign.

    When anything was dropped, the SQLAlchemy transaction is committed
    before returning. This keeps the drops persistent regardless of
    whether the DB-API driver implicitly opened a SQLite transaction for
    the DDL. A caller that later calls ``conn.rollback()`` therefore
    cannot undo them.
    """
    inspector = inspect(conn)
    temp_tables = [
        name
        for name in inspector.get_table_names()
        if name.startswith("_alembic_tmp_")
    ]
    if not temp_tables:
        return
    logger.warning(
        f"Found {len(temp_tables)} orphan alembic temp table(s) from a "
        f"prior failed migration: {sorted(temp_tables)}. Dropping before retry."
    )
    for name in temp_tables:
        # Names come from the database's own catalog, but SQLite allows
        # double quotes inside identifiers. Escape them per SQLite's quoting
        # rules (double the quote) so no name can break out of the quoting.
        quoted = '"' + name.replace('"', '""') + '"'
        conn.exec_driver_sql(f"DROP TABLE IF EXISTS {quoted}")  # noqa: S608 # bearer:disable python_lang_sql_injection
    # Make the drops durable independently of driver auto-begin behaviour.
    conn.commit()
266def _disable_fk_for_migration(conn: Connection) -> None:
267 """Disable FK enforcement on the migration connection BEFORE any
268 transaction opens (issue #3990).
270 ``apply_performance_pragmas`` set ``PRAGMA foreign_keys = ON`` at
271 connect. SQLite then *silently ignores* further toggles of
272 ``foreign_keys`` once any transaction (explicit or driver-implicit)
273 is active. The sqlite3/sqlcipher3 driver auto-begins on the first
274 DML; PRAGMA itself isn't DML, so issuing the PRAGMA before any DML
275 is the only window where it actually takes effect.
277 With multi-migration upgrades (revision 0001 → 0009), the first
278 migration to issue DML auto-begins the driver transaction and
279 freezes FK in the connect-time ON state for the rest of the upgrade.
280 That defeats migration 0007's defensive PRAGMA OFF and makes its
281 orphan-scrub DELETE fail with ``foreign key mismatch`` on tables
282 whose FK target lacks a UNIQUE backing — exactly the broken-schema
283 state migration 0007 is meant to repair.
285 The caller is responsible for re-enabling FK after the migration
286 transaction commits, BEFORE returning the connection to the pool —
287 see ``run_migrations``.
288 """
289 conn.exec_driver_sql("PRAGMA foreign_keys = OFF")
290 # ``exec_driver_sql`` triggers SQLAlchemy autobegin even though no
291 # sqlite-level transaction was opened (PRAGMA isn't DML, so the
292 # driver doesn't auto-begin). Roll back the no-op SQLAlchemy
293 # transaction so the caller's ``conn.begin()`` is allowed to start
294 # a fresh one. ``PRAGMA foreign_keys`` is connection-level state and
295 # survives ROLLBACK at the SQLite level — see
296 # https://www.sqlite.org/pragma.html#pragma_foreign_keys.
297 conn.rollback()
def _restore_fk_after_migration(conn: Connection) -> None:
    """Re-enable FK enforcement on ``conn`` before it returns to the pool.

    Called once the migration transaction has finished, whether it
    committed or rolled back. The connection is then outside any
    transaction, so the PRAGMA takes effect again. If the PRAGMA cannot be
    issued (e.g. the connection broke during a failed migration), the
    connection is invalidated. The pool then discards it instead of handing
    out a connection with ``foreign_keys = OFF``.
    """
    try:
        conn.exec_driver_sql("PRAGMA foreign_keys = ON")
        conn.rollback()
    except Exception as exc:
        logger.warning(
            "Could not re-enable PRAGMA foreign_keys after migration "
            f"({type(exc).__name__}: {exc}); invalidating the connection "
            "so it is not reused"
        )
        conn.invalidate()


def _baseline_pre_alembic_database(engine: Engine) -> Optional[str]:
    """Stamp a pre-Alembic user DB at revision 0001 (BUG-3747).

    Only meaningful when the database has no recorded Alembic revision.

    A database with schema tables but no alembic_version row was created
    before commit 4fde036df (v1.4.0, 2026-03-21) via
    ``Base.metadata.create_all()``. Without stamping, ``command.upgrade()``
    runs 0001 (a no-op for existing tables) and then 0002+ against a legacy
    column shape. Migration 0007's index backfill silently fails on
    missing columns (e.g. settings.category), leaving the DB corrupted.
    Stamping at "0001" bypasses the broken path.

    Returns:
        The revision recorded after stamping, or None when the database is
        not recognised as a pre-Alembic user DB (nothing is stamped then).

    Raises:
        RuntimeError: If the database looks like the auth database.
    """
    inspector = inspect(engine)
    existing_tables = set(inspector.get_table_names())

    # Defensive guard: refuse what looks like an auth database. The auth DB
    # has its own initialization path (``init_auth_database()`` in
    # ``auth_db.py``) and contains ONLY the ``users`` table. Pre-Alembic user
    # DBs ALSO contain ``users``, because the old ``create_all()`` path ran
    # before migration 0001 added the explicit skip. So "users present" is
    # not enough; check the auth-DB *shape* instead: only ``users``,
    # optionally alongside ``alembic_version``. If the auth engine is ever
    # routed through here, this refuses loudly instead of silently
    # polluting the auth DB with user-DB tables.
    non_metadata_tables = existing_tables - {"alembic_version"}
    if non_metadata_tables == {"users"}:
        raise RuntimeError(
            "Refusing to run migrations on what looks like an auth "
            f"database (only 'users' table present; tables: "
            f"{sorted(existing_tables)}). Auth DB is initialized via "
            "init_auth_database()."
        )

    # User-DB sentinels: both tables date to project inception (2025-06-29)
    # and have never been renamed. BOTH are required. A partial-init test DB
    # (e.g. one that ran ``Setting.__table__.create()`` directly) could
    # contain just one. There, 0001's ``create_all()`` should add the
    # missing tables rather than be skipped by stamping.
    pre_alembic_sentinels = {"settings", "research_history"}
    if not pre_alembic_sentinels.issubset(existing_tables):
        return None

    logger.warning(
        "BUG-3747: pre-Alembic database detected "
        f"({len(existing_tables)} tables, no alembic_version). "
        "Stamping at revision 0001 before applying migrations."
    )
    stamp_database(engine, "0001")
    current = get_current_revision(engine)
    logger.info(
        f"BUG-3747: pre-Alembic DB stamped at {current}; "
        "proceeding with upgrade to head"
    )
    return current


def run_migrations(engine: Engine, target: str = "head") -> None:
    """
    Run pending migrations on a database.

    The initial migration is idempotent (only creates tables that don't exist),
    so this function runs migrations rather than just stamping existing
    databases. This ensures any missing tables are created.

    When ``target == "head"`` and the database is already at head, the call
    short-circuits without opening a write transaction. Calling
    ``command.upgrade()`` unconditionally would still open one via
    ``engine.begin()`` just to discover there's nothing to apply. Per the
    comment on that path, SQLite takes a RESERVED lock on the file as soon
    as DML lands in that transaction. That serialises concurrent readers
    behind a no-op on every cold engine reopen.

    Pre-Alembic databases (schema present, no recorded revision) are
    stamped at revision 0001 before upgrading (BUG-3747).

    Security validations performed before running migrations:
        - Migration directory path is within expected package boundary
        - Migration files are not world-writable

    Pre-upgrade hygiene (run outside the migration transaction):
        - Drop orphan ``_alembic_tmp_*`` tables from prior failed
          ``batch_alter_table`` runs (issue #3817).
        - Disable ``PRAGMA foreign_keys`` so 0007's orphan scrub can run
          (issue #3990). FK enforcement is re-enabled once the upgrade
          attempt ends, whether it succeeded or failed, before the
          connection returns to the pool. If that fails, the connection is
          invalidated rather than pooled with FK switched off.

    On failure inside the migration transaction, the inner
    ``conn.begin()`` rolls back automatically and the database stays at
    its previous revision. The original exception is re-raised so callers
    can decide how to handle it.

    Args:
        engine: SQLAlchemy engine to migrate
        target: Target revision (default "head" for latest)

    Raises:
        ValueError: If the migrations directory or files fail the security checks
        RuntimeError: If the engine appears to point at the auth database
        Exception: If migration fails (database is safely rolled back)
    """
    migration_start = time.perf_counter()

    # Security: Validate migrations directory and file permissions
    migrations_dir = get_migrations_dir()
    _validate_migrations_permissions(migrations_dir)

    head = get_head_revision()
    if head is None:
        # No migrations exist yet - nothing to do
        logger.debug("No migrations found, skipping")
        return

    current = get_current_revision(engine)
    if current is None:
        # BUG-3747: baseline legacy create_all() databases before upgrading.
        current = _baseline_pre_alembic_database(engine)

    # Short-circuit when the database is already at head (see docstring).
    # The fresh-DB path (current is None) still runs the upgrade so tables
    # and the alembic_version row get created.
    if current is not None and current == head and target == "head":
        logger.info(f"Database already at revision {head}; skipping upgrade")
        return

    if current is None:
        logger.warning(
            "Database has no migration history — applying migrations "
            f"(target={target})"
        )
    elif current != head and target == "head":
        logger.warning(
            f"Database schema outdated (revision {current}, "
            f"head is {head}) — applying migrations"
        )

    config = get_alembic_config(engine)

    try:
        with engine.connect() as conn:
            _drop_orphan_alembic_temp_tables(conn)
            try:
                _disable_fk_for_migration(conn)
                with conn.begin():
                    config.attributes["connection"] = conn
                    command.upgrade(config, target)
            finally:
                # Runs on success AND failure: a failed upgrade previously
                # returned this connection to the pool with FK=OFF. We can't
                # rely on ``engine.dispose()`` to clean it up, because
                # engines built with ``creator=`` have
                # ``url.database is None`` and skip the dispose below.
                _restore_fk_after_migration(conn)
    except Exception:
        logger.exception(
            "Database migration failed — database remains at previous "
            "revision (auto-rollback by transaction manager)"
        )
        raise

    # Belt-and-suspenders: dispose pooled connections after a successful
    # upgrade. FK is explicitly re-enabled above, so this is not load-bearing
    # for FK state. Per the existing design, the next checkout then goes
    # through ``apply_performance_pragmas``, which also resets other
    # connection-level PRAGMAs a future migration might touch. Skip for
    # ``:memory:`` engines: those use a single shared connection, and
    # disposing it would destroy the just-migrated database.
    db_name = engine.url.database
    if db_name and db_name != ":memory:":
        engine.dispose()

    new_revision = get_current_revision(engine)
    elapsed_ms = (time.perf_counter() - migration_start) * 1000
    if current != new_revision:
        logger.warning(
            f"Database migrated: {current} -> {new_revision} "
            f"({elapsed_ms:.0f}ms)"
        )
    elif elapsed_ms > 100:
        logger.info(
            f"Database already at revision {new_revision} "
            f"(no-op upgrade took {elapsed_ms:.0f}ms)"
        )
    else:
        logger.info(
            f"Database already at revision {new_revision} ({elapsed_ms:.0f}ms)"
        )