Coverage for src/local_deep_research/database/alembic_runner.py: 91%

136 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1""" 

2Programmatic Alembic migration runner for per-user encrypted databases. 

3 

4This module provides functions to run Alembic migrations against SQLCipher 

5encrypted databases without using the Alembic CLI. Each user database 

6tracks its own migration version via the alembic_version table. 

7""" 

8 

9import os 

10import time 

11from pathlib import Path 

12from typing import Optional 

13 

14from alembic import command 

15from alembic.config import Config 

16from alembic.runtime.migration import MigrationContext 

17from alembic.script import ScriptDirectory 

18from loguru import logger 

19from sqlalchemy import Connection, Engine, inspect 

20from sqlalchemy.exc import IntegrityError, OperationalError 

21 

22 

def get_migrations_dir() -> Path:
    """
    Return the package's ``migrations`` directory after a symlink check.

    The directory is resolved (following symlinks) and must still sit under
    the resolved directory of this module. That way a symlink cannot point
    Alembic at Python code living somewhere else.

    Returns:
        Path to the migrations directory (unresolved form, next to this file)

    Raises:
        ValueError: If the resolved migrations path escapes the package
    """
    package_dir = Path(__file__).parent
    candidate = package_dir / "migrations"

    # Security: a migrations dir that resolves outside the package would let
    # a symlink smuggle arbitrary Python into the migration loader.
    if candidate.resolve().is_relative_to(package_dir.resolve()):
        return candidate

    raise ValueError(
        "Invalid migrations path (possible symlink attack): "
        "migrations dir resolves outside expected package boundary"
    )

50 

51 

def _validate_migrations_permissions(migrations_dir: Path) -> None:
    """
    Validate migration code is not world-writable.

    World-writable migration code could be replaced with malicious code that
    would execute during database migrations with the application's
    privileges. Alembic executes ``env.py`` in addition to the revision files
    under ``versions/``, so all of the following are checked when present:

    - the migrations directory itself and its ``versions`` subdirectory
      (a world-writable directory lets anyone drop in or swap files);
    - ``env.py``;
    - every ``versions/*.py`` revision file.

    Args:
        migrations_dir: Path to the migrations directory

    Raises:
        ValueError: If any checked directory or file is world-writable

    Note:
        This check is skipped on Windows where file permissions work differently.
    """
    if os.name == "nt":  # Skip permission checks on Windows
        return

    def _is_world_writable(path: Path) -> bool:
        # 0o002 is the "others may write" permission bit.
        return bool(path.stat().st_mode & 0o002)

    versions_dir = migrations_dir / "versions"

    # Directories first: a writable directory allows file replacement even
    # when the individual files themselves are locked down.
    for directory in (migrations_dir, versions_dir):
        if directory.is_dir() and _is_world_writable(directory):
            raise ValueError(
                f"Migrations directory has insecure permissions (world-writable): "
                f"{directory}. Fix with: chmod o-w {directory}"
            )

    # env.py is executed on every Alembic command, so it is as sensitive as
    # the revision files themselves.
    migration_files = [migrations_dir / "env.py"]
    if versions_dir.is_dir():
        # Sorted so the reported offender is deterministic.
        migration_files.extend(sorted(versions_dir.glob("*.py")))

    for migration_file in migration_files:
        if migration_file.is_file() and _is_world_writable(migration_file):
            raise ValueError(
                f"Migration file has insecure permissions (world-writable): "
                f"{migration_file.name}. "
                f"Fix with: chmod o-w {migration_file}"
            )

92 

93 

def get_alembic_config(engine: Engine) -> Config:
    """
    Build an in-memory Alembic ``Config`` (no ``alembic.ini`` file).

    Args:
        engine: SQLAlchemy engine the caller intends to migrate. It is not
            read here; the callers in this module hand the live connection
            to Alembic separately via ``config.attributes["connection"]``.

    Returns:
        Config whose ``script_location`` points at the validated migrations
        directory and whose ``sqlalchemy.url`` is a placeholder.
    """
    script_location = str(get_migrations_dir())

    alembic_config = Config()
    alembic_config.set_main_option("script_location", script_location)
    # Placeholder: Alembic insists the option exists, but the connection is
    # supplied by the caller rather than built from this URL.
    alembic_config.set_main_option("sqlalchemy.url", "sqlite:///:memory:")
    return alembic_config

117 

118 

def get_current_revision(engine: Engine) -> Optional[str]:
    """
    Read the revision recorded in the database's ``alembic_version`` table.

    Args:
        engine: SQLAlchemy engine

    Returns:
        The stored revision string, or None if no revision has been recorded
    """
    with engine.connect() as connection:
        migration_ctx = MigrationContext.configure(connection)
        revision = migration_ctx.get_current_revision()
    return revision

132 

133 

def get_head_revision() -> Optional[str]:
    """
    Return the newest revision available in the migrations directory.

    Returns:
        Head revision string, or None when no migration scripts exist
    """
    script_location = str(get_migrations_dir())

    alembic_config = Config()
    alembic_config.set_main_option("script_location", script_location)

    return ScriptDirectory.from_config(alembic_config).get_current_head()

147 

148 

def needs_migration(engine: Engine) -> bool:
    """
    Check if a database needs migrations.

    A database with no recorded revision always needs work. This covers:
    - a fresh database, which needs the initial migration;
    - a pre-Alembic database, which must be baselined and upgraded;
    - a database whose ``alembic_version`` table exists but is empty.

    Otherwise it needs work whenever its revision differs from head.

    Args:
        engine: SQLAlchemy engine

    Returns:
        True if migrations are pending
    """
    head = get_head_revision()

    if head is None:
        # No migrations exist yet, so there is nothing to apply.
        return False

    # ``head`` is non-None here, so a missing revision (None) compares
    # unequal and yields True, matching every "no history" case above
    # without an extra table-inspection round trip to the database.
    return get_current_revision(engine) != head

180 

181 

def stamp_database(engine: Engine, revision: str = "head") -> None:
    """
    Record ``revision`` in the database without running any migrations.
    Used for baselining existing databases.

    Concurrency:
    - When two callers race to stamp a fresh database, the loser sees either
      "table alembic_version already exists" (OperationalError) or a
      duplicate primary key on version_num (IntegrityError).
    - Either way the database ends up stamped. Such errors are therefore
      swallowed, but only when the message looks like that race AND a
      revision is now readable.
    - Anything else is re-raised.

    Args:
        engine: SQLAlchemy engine
        revision: Revision to stamp (default "head")
    """
    alembic_config = get_alembic_config(engine)

    try:
        with engine.begin() as connection:
            alembic_config.attributes["connection"] = connection
            command.stamp(alembic_config, revision)
    except (IntegrityError, OperationalError) as exc:
        error_text = str(exc).lower()
        # "alembic_version": duplicate-PK / table-exists race on that table.
        # "already exists": the CREATE TABLE race.
        race_markers = ("alembic_version", "already exists")
        if not any(marker in error_text for marker in race_markers):
            # Disk full, SQLITE_BUSY elsewhere, corruption, ... — surface it.
            raise
        if get_current_revision(engine) is None:
            # Looked like a race, but nobody actually stamped: real failure.
            raise
        # Another caller won the race; don't log it as our own stamp.
        logger.debug(
            f"stamp_database({revision}) lost race to concurrent caller "
            f"({type(exc).__name__}); database is stamped, continuing"
        )
    else:
        logger.info(f"Stamped database at revision: {revision}")

224 

225 

def _drop_orphan_alembic_temp_tables(conn: Connection) -> None:
    """Drop leftover ``_alembic_tmp_<table>`` tables from prior failed
    batch_alter_table runs (issue #3817).

    How orphans arise:
    - ``op.batch_alter_table`` rebuilds a table by creating
      ``_alembic_tmp_<table>``, copying data, dropping the original, and
      renaming. On a clean run alembic drops the temp table automatically.
    - If a previous attempt failed in a way that bypassed transaction
      rollback, the temp table persists. Examples: an older migration runner
      that auto-committed each migration, or a process killed mid-DDL on a
      non-transactional sqlite build.
    - The next attempt then fails at ``op.batch_alter_table`` with
      ``table _alembic_tmp_* already exists``.

    This runs in autocommit mode at the SQLite level: each ``DROP TABLE``
    briefly takes the file write lock and releases it. The race with a
    concurrent migration is benign:
    - If that migration is mid-batch_alter_table on the same table, our DROP
      blocks on the SQLite write lock (busy_timeout=10000).
    - By the time we acquire the lock, the concurrent migration's rename has
      consumed the temp table.
    - Our DROP IF EXISTS is then a no-op.

    Args:
        conn: Open SQLAlchemy connection to the database being migrated.
    """
    inspector = inspect(conn)
    temp_tables = [
        name
        for name in inspector.get_table_names()
        if name.startswith("_alembic_tmp_")
    ]
    if not temp_tables:
        return
    logger.warning(
        f"Found {len(temp_tables)} orphan alembic temp table(s) from a "
        f"prior failed migration: {sorted(temp_tables)}. Dropping before retry."
    )
    for name in temp_tables:
        # The name comes from the database's own catalog and is filtered to
        # the ``_alembic_tmp_`` prefix. SQLite still allows any character in
        # a quoted identifier, including ``"``, so escape it by doubling
        # rather than assuming it is absent; an unescaped quote would break
        # (or alter) the statement.
        quoted_name = '"' + name.replace('"', '""') + '"'
        conn.exec_driver_sql(f"DROP TABLE IF EXISTS {quoted_name}")  # noqa: S608 # bearer:disable python_lang_sql_injection

264 

265 

def _disable_fk_for_migration(conn: Connection) -> None:
    """Switch ``PRAGMA foreign_keys`` OFF on ``conn`` while no transaction is
    open (issue #3990).

    Why this must happen before any DML:
    - ``apply_performance_pragmas`` turns foreign keys ON at connect time.
    - SQLite silently ignores later ``foreign_keys`` toggles while any
      transaction (explicit or driver-implicit) is active.
    - The sqlite3/sqlcipher3 driver only auto-begins on DML, and a PRAGMA is
      not DML. So the window before the first DML is the only point where
      the toggle sticks.

    Why migration 0007 needs it:
    - In a multi-revision upgrade (0001 → 0009), the first migration that
      issues DML freezes FK in its connect-time ON state for the rest of the
      run.
    - That would nullify 0007's own defensive PRAGMA OFF.
    - 0007's orphan-scrub DELETE would then fail with
      ``foreign key mismatch`` on tables whose FK target lacks a UNIQUE
      backing — the very broken schema 0007 exists to repair.

    The caller must switch FK back ON after the migration transaction ends
    and before the connection goes back to the pool (see ``run_migrations``).
    """
    fk_off_statement = "PRAGMA foreign_keys = OFF"
    conn.exec_driver_sql(fk_off_statement)

    # exec_driver_sql made SQLAlchemy autobegin a (SQLite-level no-op)
    # transaction. End it so the caller's ``conn.begin()`` can open a fresh
    # one. The FK setting is per-connection state and survives the ROLLBACK:
    # https://www.sqlite.org/pragma.html#pragma_foreign_keys
    conn.rollback()

298 

299 

def _baseline_pre_alembic_database(engine: Engine) -> Optional[str]:
    """
    Stamp a pre-Alembic user database at revision ``0001`` (BUG-3747).

    Intended for databases with no recorded revision.

    Why baselining is needed:
    - A database that has schema tables but no alembic_version row was
      created before commit 4fde036df (v1.4.0, 2026-03-21) via
      Base.metadata.create_all().
    - Without stamping, command.upgrade() runs 0001 (no-op for existing
      tables) followed by 0002+ against a legacy column shape.
    - Migration 0007's index backfill then silently fails on missing columns
      (e.g. settings.category), leaving the DB in a corrupted state.
    - Stamping at "0001" bypasses the broken path.

    Args:
        engine: SQLAlchemy engine for a database with no recorded revision

    Returns:
        The revision read back after stamping. Returns None when the database
        was not recognised as a pre-Alembic user DB (fresh or partial DBs fall
        through to the normal upgrade, which creates what is missing).

    Raises:
        RuntimeError: If the database looks like the auth database
    """
    existing_tables = set(inspect(engine).get_table_names())

    # Defensive guard: refuse what looks like an auth database.
    # - The auth DB has its own initialization path (`init_auth_database()`
    #   in `auth_db.py`) and contains ONLY the `users` table.
    # - Pre-Alembic user DBs ALSO contain `users`: they were created by the
    #   old `Base.metadata.create_all()` path before migration 0001 added the
    #   explicit skip. So "users present" alone is not a usable test.
    # - Instead, check the auth-DB *shape*: only `users`, optionally
    #   alongside `alembic_version`. A real user DB always has 50+ other
    #   tables.
    # If the auth engine is ever routed through here, fail loudly rather
    # than pollute the auth DB with user-DB tables.
    non_metadata_tables = existing_tables - {"alembic_version"}
    if non_metadata_tables == {"users"}:
        raise RuntimeError(
            "Refusing to run migrations on what looks like an auth "
            f"database (only 'users' table present; tables: "
            f"{sorted(existing_tables)}). Auth DB is initialized via "
            "init_auth_database()."
        )

    # User-DB sentinels: both tables date to project inception (2025-06-29)
    # and have never been renamed. BOTH are required:
    # - A single one could be present on a partial-init test DB (e.g. one
    #   that ran `Setting.__table__.create()` directly).
    # - There we want 0001's `create_all()` to add the missing tables, not
    #   skip it by stamping.
    # A real pre-Alembic production DB has 60+ tables and both sentinels.
    pre_alembic_sentinels = {"settings", "research_history"}
    if not pre_alembic_sentinels.issubset(existing_tables):
        return None

    logger.warning(
        "BUG-3747: pre-Alembic database detected "
        f"({len(existing_tables)} tables, no alembic_version). "
        "Stamping at revision 0001 before applying migrations."
    )
    stamp_database(engine, "0001")
    current = get_current_revision(engine)
    logger.info(
        f"BUG-3747: pre-Alembic DB stamped at {current}; "
        "proceeding with upgrade to head"
    )
    return current


def _reenable_fk_after_failed_upgrade(conn: Connection) -> None:
    """
    Best-effort restore of ``PRAGMA foreign_keys = ON`` after a failed upgrade.

    When this runs, ``conn.begin()``'s context manager has already rolled
    back the migration transaction, so no transaction is active and the
    PRAGMA takes effect. If the restore itself fails, the connection is
    invalidated so the pool discards it rather than handing an FK-disabled
    connection to the next checkout. Restore failures are logged, not
    raised, so the caller's original migration error is what propagates.

    Args:
        conn: The migration connection, with FK currently OFF.
    """
    try:
        conn.exec_driver_sql("PRAGMA foreign_keys = ON")
        conn.rollback()
    except Exception as restore_exc:
        logger.warning(
            "Could not re-enable foreign keys after failed migration "
            f"({type(restore_exc).__name__}: {restore_exc}); "
            "invalidating connection"
        )
        conn.invalidate()


def _upgrade_with_fk_disabled(engine: Engine, config: Config, target: str) -> None:
    """
    Run ``command.upgrade`` on one connection with FK enforcement disabled.

    Steps:
    1. Drop orphan ``_alembic_tmp_*`` tables (issue #3817).
    2. Disable ``PRAGMA foreign_keys`` before any transaction opens
       (issue #3990).
    3. Upgrade inside ``conn.begin()``. An exception there rolls the
       migration back, leaving the database at its previous revision.
    4. Re-enable FK on BOTH the success and the failure path, before the
       connection returns to the pool.

    Args:
        engine: SQLAlchemy engine to migrate
        config: Alembic config from ``get_alembic_config``
        target: Target revision

    Raises:
        Exception: Whatever the migration (or the success-path FK restore)
            raised
    """
    with engine.connect() as conn:
        _drop_orphan_alembic_temp_tables(conn)
        _disable_fk_for_migration(conn)
        try:
            with conn.begin():
                config.attributes["connection"] = conn
                command.upgrade(config, target)
        except BaseException:
            # The transaction has rolled back, but FK is still OFF on this
            # connection. Without this restore, it would go back to the pool
            # with FK disabled for every later checkout.
            _reenable_fk_after_failed_upgrade(conn)
            raise
        # Re-enable FK on this connection BEFORE it returns to the pool so
        # subsequent checkouts see the production-default ON state. The
        # migration transaction has just committed, so we are back outside
        # any active transaction, and PRAGMA toggles work again.
        # We can't rely on ``engine.dispose()`` to force a fresh connection:
        # engines built with ``creator=`` have ``url.database is None``,
        # which fails the dispose guard in ``run_migrations``.
        conn.exec_driver_sql("PRAGMA foreign_keys = ON")
        conn.rollback()


def run_migrations(engine: Engine, target: str = "head") -> None:
    """
    Run pending migrations on a database.

    The initial migration is idempotent (only creates tables that don't exist),
    so this function runs migrations rather than just stamping existing
    databases. This ensures any missing tables are created.

    When ``target == "head"`` and the database is already at head, the call
    short-circuits without opening a write transaction. Calling
    ``command.upgrade()`` unconditionally would still open one via
    ``engine.begin()``, taking a RESERVED lock on the SQLite file just to
    discover there's nothing to apply. That serialises concurrent readers
    behind a no-op on every cold engine reopen.

    Security validations performed before running migrations:
    - Migration directory path is within expected package boundary
    - Migration files are not world-writable

    Pre-upgrade hygiene (run outside the migration transaction):
    - Drop orphan ``_alembic_tmp_*`` tables from prior failed
      ``batch_alter_table`` runs (issue #3817).
    - Disable ``PRAGMA foreign_keys`` so 0007's orphan scrub can run
      (issue #3990). Re-enabled after the upgrade, on success and on
      failure, before the connection returns to the pool.

    On failure inside the migration transaction, the inner ``conn.begin()``
    rolls back automatically, so the database stays at its previous
    revision. The original exception is re-raised so callers can decide how
    to handle it.

    Args:
        engine: SQLAlchemy engine to migrate
        target: Target revision (default "head" for latest)

    Raises:
        RuntimeError: If the database looks like the auth database
        Exception: If migration fails (database is safely rolled back)
    """
    migration_start = time.perf_counter()

    # Security: Validate migrations directory and file permissions
    migrations_dir = get_migrations_dir()
    _validate_migrations_permissions(migrations_dir)

    head = get_head_revision()

    if head is None:
        # No migrations exist yet - nothing to do
        logger.debug("No migrations found, skipping")
        return

    current = get_current_revision(engine)
    if current is None:
        # BUG-3747: pre-Alembic baseline detection (may stamp at 0001).
        current = _baseline_pre_alembic_database(engine)

    # Short-circuit when the database is already at head: an unconditional
    # upgrade would open a write transaction just to find nothing to apply.
    # ``head`` is non-None here, so a fresh DB (current is None) never
    # matches and still runs the upgrade that creates its tables and
    # alembic_version row.
    if target == "head" and current == head:
        logger.info(f"Database already at revision {head}; skipping upgrade")
        return

    if current is None:
        logger.warning(
            "Database has no migration history — applying migrations "
            f"(target={target})"
        )
    elif current != head and target == "head":
        logger.warning(
            f"Database schema outdated (revision {current}, "
            f"head is {head}) — applying migrations"
        )

    config = get_alembic_config(engine)

    try:
        _upgrade_with_fk_disabled(engine, config, target)
    except Exception:
        logger.exception(
            "Database migration failed — database remains at previous "
            "revision (auto-rollback by transaction manager)"
        )
        raise

    # Belt-and-suspenders: dispose pooled connections after a successful
    # upgrade. With FK explicitly re-enabled this is no longer load-bearing
    # for FK state. It does force the next checkout through
    # ``apply_performance_pragmas``, which also resets temp_store,
    # cache_size, journal_mode, etc. That protects against any future
    # migration touching connection-level PRAGMAs not handled by the FK
    # fix-up. Skip for ``:memory:`` engines — those use a single shared
    # connection, and disposing it would destroy the just-migrated database.
    db_name = engine.url.database
    if db_name and db_name != ":memory:":
        engine.dispose()

    new_revision = get_current_revision(engine)
    elapsed_ms = (time.perf_counter() - migration_start) * 1000
    if current != new_revision:
        logger.warning(
            f"Database migrated: {current} -> {new_revision} "
            f"({elapsed_ms:.0f}ms)"
        )
    elif elapsed_ms > 100:
        logger.info(
            f"Database already at revision {new_revision} "
            f"(no-op upgrade took {elapsed_ms:.0f}ms)"
        )
    else:
        logger.info(
            f"Database already at revision {new_revision} ({elapsed_ms:.0f}ms)"
        )