Coverage for src / local_deep_research / database / alembic_runner.py: 92%

88 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2Programmatic Alembic migration runner for per-user encrypted databases. 

3 

4This module provides functions to run Alembic migrations against SQLCipher 

5encrypted databases without using the Alembic CLI. Each user database 

6tracks its own migration version via the alembic_version table. 

7""" 

8 

9import os 

10from pathlib import Path 

11from typing import Optional 

12 

13from alembic import command 

14from alembic.config import Config 

15from alembic.runtime.migration import MigrationContext 

16from alembic.script import ScriptDirectory 

17from loguru import logger 

18from sqlalchemy import Engine, inspect 

19 

20 

def get_migrations_dir() -> Path:
    """
    Locate the ``migrations`` directory that sits next to this module.

    Both the migrations directory and the directory containing this module
    are resolved (symlinks followed), and the resolved migrations path must
    lie inside the resolved module directory. This guards against a symlink
    redirecting migration loading to a location outside the package.

    Returns:
        The unresolved ``<module dir>/migrations`` path.

    Raises:
        ValueError: If the resolved migrations path falls outside the
            resolved module directory.
    """
    package_dir = Path(__file__).parent
    migrations_dir = package_dir / "migrations"

    # Security: compare fully resolved paths so a symlinked "migrations"
    # entry cannot point at arbitrary Python code outside the package.
    if migrations_dir.resolve().is_relative_to(package_dir.resolve()):
        return migrations_dir

    raise ValueError(
        "Invalid migrations path (possible symlink attack): "
        "migrations dir resolves outside expected package boundary"
    )

48 

49 

50def _validate_migrations_permissions(migrations_dir: Path) -> None: 

51 """ 

52 Validate migration files are not world-writable. 

53 

54 World-writable migration files could be replaced with malicious code 

55 that would execute during database migrations with the application's 

56 privileges. 

57 

58 Args: 

59 migrations_dir: Path to the migrations directory 

60 

61 Raises: 

62 ValueError: If any migration file is world-writable 

63 

64 Note: 

65 This check is skipped on Windows where file permissions work differently. 

66 """ 

67 if os.name == "nt": # Skip permission checks on Windows 

68 return 

69 

70 versions_dir = migrations_dir / "versions" 

71 if not versions_dir.exists(): 71 ↛ 72line 71 didn't jump to line 72 because the condition on line 71 was never true

72 return 

73 

74 # Check the versions directory itself 

75 st = versions_dir.stat() 

76 if st.st_mode & 0o002: 76 ↛ 77line 76 didn't jump to line 77 because the condition on line 76 was never true

77 raise ValueError( 

78 f"Migrations directory has insecure permissions (world-writable): " 

79 f"{versions_dir}. Fix with: chmod o-w {versions_dir}" 

80 ) 

81 

82 for migration_file in versions_dir.glob("*.py"): 

83 st = migration_file.stat() 

84 if st.st_mode & 0o002: # World-writable bit 

85 raise ValueError( 

86 f"Migration file has insecure permissions (world-writable): " 

87 f"{migration_file.name}. " 

88 f"Fix with: chmod o-w {migration_file}" 

89 ) 

90 

91 

def get_alembic_config(engine: Engine) -> Config:
    """
    Build an in-memory Alembic ``Config`` (no ini file) for programmatic use.

    Args:
        engine: SQLAlchemy engine the migrations are intended for. This
            function does not read it; callers in this module attach a live
            connection afterwards via ``config.attributes["connection"]``.

    Returns:
        A ``Config`` whose ``script_location`` is the validated migrations
        directory and whose ``sqlalchemy.url`` is a placeholder.
    """
    script_location = str(get_migrations_dir())

    # No ini file: every option is set programmatically.
    alembic_cfg = Config()
    alembic_cfg.set_main_option("script_location", script_location)

    # Placeholder URL. Per the original author's note it is not actually
    # used (the connection is passed directly) but Alembic needs it set.
    alembic_cfg.set_main_option("sqlalchemy.url", "sqlite:///:memory:")

    return alembic_cfg

115 

116 

def get_current_revision(engine: Engine) -> Optional[str]:
    """
    Read the migration revision currently recorded in a database.

    Opens a connection from ``engine``, configures an Alembic
    ``MigrationContext`` on it and returns what the context reports.

    Args:
        engine: SQLAlchemy engine

    Returns:
        Current revision string or None if no migrations have run
    """
    with engine.connect() as connection:
        return MigrationContext.configure(connection).get_current_revision()

130 

131 

def get_head_revision() -> Optional[str]:
    """
    Look up the newest revision available in the migration scripts.

    Only the script directory is consulted; no database is touched.

    Returns:
        Head revision string, or None if no migrations exist
    """
    script_location = str(get_migrations_dir())

    alembic_cfg = Config()
    alembic_cfg.set_main_option("script_location", script_location)

    return ScriptDirectory.from_config(alembic_cfg).get_current_head()

145 

146 

def needs_migration(engine: Engine) -> bool:
    """
    Report whether a database has pending migrations.

    Args:
        engine: SQLAlchemy engine

    Returns:
        False when no migration scripts exist; otherwise True whenever the
        database's recorded revision differs from the head revision
        (including when no revision is recorded at all).
    """
    head_rev = get_head_revision()
    if head_rev is None:
        # No migrations exist yet
        return False

    current_rev = get_current_revision(engine)
    if current_rev is not None:
        return current_rev != head_rev

    # No recorded revision: either a fresh database (no tables) or an
    # existing one that was never put under Alembic (no alembic_version).
    existing_tables = inspect(engine).get_table_names()
    if not existing_tables or "alembic_version" not in existing_tables:
        return True

    # alembic_version exists but holds no revision; None never equals a
    # real head, so this is also True.
    return current_rev != head_rev

178 

179 

def stamp_database(engine: Engine, revision: str = "head") -> None:
    """
    Record a revision in a database without executing any migration.

    Intended for baselining databases that already exist. The stamp is
    issued on a connection opened with ``engine.begin()`` and handed to
    Alembic through ``config.attributes["connection"]``.

    Args:
        engine: SQLAlchemy engine
        revision: Revision to stamp (default "head")
    """
    alembic_cfg = get_alembic_config(engine)

    with engine.begin() as connection:
        # Share this connection with Alembic via the config attributes.
        alembic_cfg.attributes["connection"] = connection
        command.stamp(alembic_cfg, revision)

    logger.info(f"Stamped database at revision: {revision}")

196 

197 

def run_migrations(engine: Engine, target: str = "head") -> None:
    """
    Upgrade a database to ``target``, logging what changed.

    Steps, in order:

    1. Validate the migrations directory location and its file permissions.
    2. Return early (debug log) if no migration scripts exist.
    3. Log a warning if the database has no recorded revision, or if it is
       behind head and ``target`` is ``"head"``.
    4. Run ``alembic upgrade`` on a connection opened with
       ``engine.begin()``.
    5. Re-read the revision and log whether it changed.

    This function always runs the upgrade rather than stamping existing
    databases. The original author's note says the initial migration is
    idempotent (it only creates missing tables); that is not verifiable
    from this module.

    If the upgrade raises, the failure is logged with traceback and the
    original exception is re-raised. The upgrade runs inside the
    ``engine.begin()`` context manager, which the original documentation
    relies on to roll the transaction back.

    Args:
        engine: SQLAlchemy engine to migrate
        target: Target revision (default "head" for latest)

    Raises:
        ValueError: If the migrations path or permissions fail validation
        Exception: Whatever the upgrade raised, re-raised unchanged
    """
    # Security: Validate migrations directory and file permissions
    _validate_migrations_permissions(get_migrations_dir())

    head_rev = get_head_revision()
    if head_rev is None:
        # No migrations exist yet - nothing to do
        logger.debug("No migrations found, skipping")
        return

    starting_rev = get_current_revision(engine)
    if starting_rev is None:
        logger.warning(
            "Database has no migration history — applying migrations "
            f"(target={target})"
        )
    elif target == "head" and starting_rev != head_rev:
        logger.warning(
            f"Database schema outdated (revision {starting_rev}, "
            f"head is {head_rev}) — applying migrations"
        )

    alembic_cfg = get_alembic_config(engine)

    try:
        with engine.begin() as connection:
            # Alembic picks the connection up from the config attributes.
            alembic_cfg.attributes["connection"] = connection
            command.upgrade(alembic_cfg, target)
    except Exception:
        logger.exception(
            "Database migration failed — database remains at previous "
            "revision (auto-rollback by transaction manager)"
        )
        raise

    final_rev = get_current_revision(engine)
    if final_rev == starting_rev:
        logger.info(f"Database already at revision {final_rev}")
    else:
        logger.warning(f"Database migrated: {starting_rev} -> {final_rev}")