Coverage for src / local_deep_research / database / alembic_runner.py: 92%
88 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""
2Programmatic Alembic migration runner for per-user encrypted databases.
4This module provides functions to run Alembic migrations against SQLCipher
5encrypted databases without using the Alembic CLI. Each user database
6tracks its own migration version via the alembic_version table.
7"""
9import os
10from pathlib import Path
11from typing import Optional
13from alembic import command
14from alembic.config import Config
15from alembic.runtime.migration import MigrationContext
16from alembic.script import ScriptDirectory
17from loguru import logger
18from sqlalchemy import Engine, inspect
def get_migrations_dir() -> Path:
    """
    Locate the bundled ``migrations`` directory and verify where it resolves.

    The directory is expected to sit next to this module. Its fully resolved
    location (symlinks followed) must still be inside this module's resolved
    parent directory; otherwise migration code could be loaded from an
    arbitrary place on disk.

    Returns:
        The (unresolved) path ``<this module's directory>/migrations``

    Raises:
        ValueError: If the resolved migrations path lies outside the
            resolved package directory
    """
    package_root = Path(__file__).parent
    candidate = package_root / "migrations"

    # Compare resolved paths so that a symlinked "migrations" entry pointing
    # outside the package is detected.
    if candidate.resolve().is_relative_to(package_root.resolve()):
        return candidate

    raise ValueError(
        "Invalid migrations path (possible symlink attack): "
        "migrations dir resolves outside expected package boundary"
    )
def _is_world_writable(path: Path) -> bool:
    """Return True if the "others" write bit (0o002) is set on ``path``."""
    return bool(path.stat().st_mode & 0o002)


def _validate_migrations_permissions(migrations_dir: Path) -> None:
    """
    Validate that migration code is not world-writable.

    World-writable migration files could be replaced with malicious code
    that would execute during database migrations with the application's
    privileges. The check covers:

    - ``migrations_dir`` and its ``versions`` subdirectory (write access to
      a directory allows adding or swapping the files inside it),
    - ``env.py`` in ``migrations_dir``, which Alembic runs on every
      invocation,
    - every ``*.py`` file directly inside ``versions``.

    Directories and ``env.py`` are skipped when they do not exist.

    Args:
        migrations_dir: Path to the migrations directory

    Raises:
        ValueError: If any checked directory or file is world-writable

    Note:
        This check is skipped on Windows where file permissions work differently.
    """
    if os.name == "nt":  # Skip permission checks on Windows
        return

    versions_dir = migrations_dir / "versions"

    # Directories first: a writable directory defeats per-file checks.
    for directory in (migrations_dir, versions_dir):
        if directory.exists() and _is_world_writable(directory):
            raise ValueError(
                f"Migrations directory has insecure permissions (world-writable): "
                f"{directory}. Fix with: chmod o-w {directory}"
            )

    # env.py is executed by Alembic itself, so it is as sensitive as the
    # individual revision scripts.
    scripts = []
    env_script = migrations_dir / "env.py"
    if env_script.exists():
        scripts.append(env_script)
    if versions_dir.exists():
        scripts.extend(versions_dir.glob("*.py"))

    for script in scripts:
        if _is_world_writable(script):
            raise ValueError(
                f"Migration file has insecure permissions (world-writable): "
                f"{script.name}. "
                f"Fix with: chmod o-w {script}"
            )
def get_alembic_config(engine: Engine) -> Config:
    """
    Create an Alembic Config object for programmatic usage.

    The config is built in memory (no ``alembic.ini``) and points
    ``script_location`` at the validated migrations directory.

    Args:
        engine: SQLAlchemy engine to run migrations against. The body does
            not read it; the functions in this module that use the config
            attach a live connection via ``config.attributes["connection"]``.

    Returns:
        Configured Alembic Config object

    Raises:
        ValueError: If the migrations directory fails path validation
    """
    migrations_dir = get_migrations_dir()

    # Create config object without ini file
    config = Config()

    # Main options go through ConfigParser interpolation, so a literal "%"
    # in the installation path must be written as "%%" or setting the
    # option fails.
    script_location = str(migrations_dir).replace("%", "%%")
    config.set_main_option("script_location", script_location)

    # Placeholder URL: a value has to be present, but this module passes an
    # existing connection through config.attributes instead.
    config.set_main_option("sqlalchemy.url", "sqlite:///:memory:")

    return config
def get_current_revision(engine: Engine) -> Optional[str]:
    """
    Read the revision a database is currently stamped with.

    Opens a connection from ``engine``, builds an Alembic
    ``MigrationContext`` on it and asks that context for the current
    revision. The connection is closed when the function returns.

    Args:
        engine: SQLAlchemy engine

    Returns:
        Current revision string or None if no migrations have run
    """
    with engine.connect() as connection:
        migration_ctx = MigrationContext.configure(connection)
        revision = migration_ctx.get_current_revision()
    return revision
def get_head_revision() -> Optional[str]:
    """
    Get the latest migration revision.

    Builds a minimal in-memory Alembic config pointing at the validated
    migrations directory and asks the script directory for its current head.

    Returns:
        Head revision string, or None if no migrations exist

    Raises:
        ValueError: If the migrations directory fails path validation
    """
    migrations_dir = get_migrations_dir()
    config = Config()

    # Main options go through ConfigParser interpolation, so a literal "%"
    # in the installation path must be written as "%%" or setting the
    # option fails.
    script_location = str(migrations_dir).replace("%", "%%")
    config.set_main_option("script_location", script_location)

    script = ScriptDirectory.from_config(config)
    return script.get_current_head()
def needs_migration(engine: Engine) -> bool:
    """
    Check if a database needs migrations.

    A database with no recorded revision (fresh, or pre-dating Alembic)
    always counts as needing migration once at least one migration exists,
    because ``None`` never equals the head revision. No table inspection is
    required to reach that answer.

    Args:
        engine: SQLAlchemy engine

    Returns:
        True if migrations are pending; False if no migrations exist or the
        database is already at the head revision
    """
    head = get_head_revision()

    if head is None:
        # No migrations exist yet
        return False

    # Covers both the unversioned case (current is None) and an outdated
    # revision with a single comparison.
    return get_current_revision(engine) != head
def stamp_database(engine: Engine, revision: str = "head") -> None:
    """
    Record a revision in the database without executing any migrations.

    Intended for baselining databases that already exist. The stamp runs
    inside an ``engine.begin()`` block, with that connection handed to
    Alembic through ``config.attributes["connection"]``.

    Args:
        engine: SQLAlchemy engine
        revision: Revision to stamp (default "head")
    """
    alembic_cfg = get_alembic_config(engine)

    with engine.begin() as connection:
        # Hand the open connection to Alembic via the config attributes.
        alembic_cfg.attributes["connection"] = connection
        command.stamp(alembic_cfg, revision)

    logger.info(f"Stamped database at revision: {revision}")
def run_migrations(engine: Engine, target: str = "head") -> None:
    """
    Upgrade a database to ``target``, logging what happened.

    Steps, in order:

    1. Validate the migrations directory location and file permissions.
    2. Return early if no migrations exist (no head revision).
    3. Log a warning if the database is unversioned, or outdated while
       targeting "head".
    4. Run ``alembic upgrade`` inside an ``engine.begin()`` block.
    5. Re-read the revision and log whether it changed.

    The upgrade is always executed rather than stamping existing databases;
    per the module's design the initial migration only creates tables that
    are missing.

    If the upgrade raises, the failure is logged with its traceback and the
    original exception is re-raised after the ``engine.begin()`` context
    manager has exited.

    Args:
        engine: SQLAlchemy engine to migrate
        target: Target revision (default "head" for latest)

    Raises:
        ValueError: If the migrations directory or its files fail the
            security validation
        Exception: Whatever the upgrade raised, re-raised unchanged
    """
    # Security checks come first so no migration code is loaded unvalidated.
    _validate_migrations_permissions(get_migrations_dir())

    latest = get_head_revision()
    if latest is None:
        # No migrations exist yet - nothing to do
        logger.debug("No migrations found, skipping")
        return

    starting_revision = get_current_revision(engine)

    if starting_revision is None:
        logger.warning(
            "Database has no migration history — applying migrations "
            f"(target={target})"
        )
    elif target == "head" and starting_revision != latest:
        logger.warning(
            f"Database schema outdated (revision {starting_revision}, "
            f"head is {latest}) — applying migrations"
        )

    alembic_cfg = get_alembic_config(engine)

    try:
        with engine.begin() as connection:
            alembic_cfg.attributes["connection"] = connection
            command.upgrade(alembic_cfg, target)
    except Exception:
        logger.exception(
            "Database migration failed — database remains at previous "
            "revision (auto-rollback by transaction manager)"
        )
        raise

    reached_revision = get_current_revision(engine)
    if reached_revision == starting_revision:
        logger.info(f"Database already at revision {reached_revision}")
    else:
        logger.warning(
            f"Database migrated: {starting_revision} -> {reached_revision}"
        )