Coverage for src / local_deep_research / security / file_integrity / integrity_manager.py: 59%

201 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1""" 

2File Integrity Manager - Main service for file integrity verification. 

3 

4Provides smart verification with embedded statistics and sparse failure logging. 

5""" 

6 

7from datetime import datetime, UTC 

8from pathlib import Path 

9from typing import Optional, Tuple, List 

10from loguru import logger 

11 

12from .base_verifier import BaseFileVerifier 

13from ...database.models.file_integrity import ( 

14 FileIntegrityRecord, 

15 FileVerificationFailure, 

16) 

17 

18# Import session context conditionally (requires Flask) 

19try: 

20 from ...database.session_context import get_user_db_session 

21 

22 _has_session_context = True 

23except ImportError: 

24 _has_session_context = False 

25 # Provide stub for type checking 

26 get_user_db_session = None # type: ignore 

27 

28 

class FileIntegrityManager:
    """
    Central service for file integrity verification.

    Features:
    - Smart verification (only verify if file modified)
    - Embedded statistics (low overhead)
    - Sparse failure logging (audit trail)
    - Multi-verifier support (different file types)
    - Automatic cleanup of old failure records
    """

    # Retention limits enforced by the cleanup routines below.
    MAX_FAILURES_PER_FILE = 100  # per-file cap on stored failure rows
    MAX_TOTAL_FAILURES = 10000  # global cap across all files

44 

def __init__(self, username: str, password: Optional[str] = None):
    """
    Initialize file integrity manager.

    Args:
        username: Username for database access
        password: Optional password for encrypted database

    Raises:
        ImportError: If Flask/session_context not available
    """
    # Fail fast when the optional Flask-backed session layer is absent.
    if not _has_session_context:
        raise ImportError(
            "FileIntegrityManager requires Flask and database session context. "
            "Install Flask to use this feature."
        )

    self.username = username
    self.password = password
    self.verifiers: List[BaseFileVerifier] = []

    # Best-effort startup cleanup of stale failure rows; a failure here
    # must never prevent the manager from being constructed.
    try:
        removed = self.cleanup_all_old_failures()
        if removed:
            logger.info(
                f"[FILE_INTEGRITY] Startup cleanup: removed {removed} old failure records"
            )
    except Exception as e:
        logger.warning(f"[FILE_INTEGRITY] Startup cleanup failed: {e}")

75 

def _normalize_path(self, file_path: Path) -> str:
    """
    Return the canonical string form of *file_path* for storage/lookup.

    Resolving symlinks and relative segments guarantees that one physical
    file is always represented by exactly one database key.

    Args:
        file_path: Path to normalize

    Returns:
        Normalized path string
    """
    resolved = file_path.resolve()
    return str(resolved)

90 

def register_verifier(self, verifier: BaseFileVerifier) -> None:
    """
    Add *verifier* to the set consulted when files are recorded/verified.

    Args:
        verifier: Verifier instance to register
    """
    self.verifiers.append(verifier)
    logger.debug(
        f"[FILE_INTEGRITY] Registered verifier for type: {verifier.get_file_type()}"
    )

102 

def record_file(
    self,
    file_path: Path,
    related_entity_type: Optional[str] = None,
    related_entity_id: Optional[int] = None,
) -> FileIntegrityRecord:
    """
    Create or update the integrity record for a file.

    Args:
        file_path: Path to file to record
        related_entity_type: Optional related entity type (e.g., 'rag_index')
        related_entity_id: Optional related entity ID

    Returns:
        FileIntegrityRecord instance

    Raises:
        FileNotFoundError: If file doesn't exist
        ValueError: If no verifier handles this file type
    """
    if not file_path.exists():
        raise FileNotFoundError(f"File not found: {file_path}")

    verifier = self._get_verifier_for_file(file_path)
    if verifier is None:
        raise ValueError(f"No verifier registered for file: {file_path}")

    # Snapshot checksum and stat data before touching the database.
    digest = verifier.calculate_checksum(file_path)
    st = file_path.stat()
    key = self._normalize_path(file_path)

    with get_user_db_session(self.username, self.password) as session:
        # Records are keyed on the normalized path.
        record = (
            session.query(FileIntegrityRecord)
            .filter_by(file_path=key)
            .first()
        )

        if record is None:
            # First time we see this file: create the baseline record.
            record = FileIntegrityRecord(
                file_path=key,
                file_type=verifier.get_file_type(),
                checksum=digest,
                algorithm=verifier.get_algorithm(),
                file_size=st.st_size,
                file_mtime=st.st_mtime,
                verify_on_load=True,
                allow_modifications=verifier.allows_modifications(),
                related_entity_type=related_entity_type,
                related_entity_id=related_entity_id,
                total_verifications=0,
                consecutive_successes=0,
                consecutive_failures=0,
            )
            session.add(record)
            logger.info(
                f"[FILE_INTEGRITY] Created record for: {file_path} (type: {verifier.get_file_type()})"
            )
        else:
            # Refresh the stored baseline for an already-known file.
            record.checksum = digest
            record.file_size = st.st_size
            record.file_mtime = st.st_mtime
            record.algorithm = verifier.get_algorithm()
            record.updated_at = datetime.now(UTC)
            logger.info(f"[FILE_INTEGRITY] Updated record for: {file_path}")

        session.commit()
        session.refresh(record)
        return record

177 

def verify_file(
    self, file_path: Path, force: bool = False
) -> Tuple[bool, Optional[str]]:
    """
    Verify file integrity with smart checking.

    The expensive checksum pass only runs when the file's modification
    time changed since the last verification, or when *force* is set.

    Args:
        file_path: Path to file to verify
        force: Force verification even if file hasn't changed

    Returns:
        Tuple of (success, reason_if_failed)
    """
    key = self._normalize_path(file_path)

    with get_user_db_session(self.username, self.password) as session:
        record = (
            session.query(FileIntegrityRecord)
            .filter_by(file_path=key)
            .first()
        )

        if record is None:
            logger.warning(
                f"[FILE_INTEGRITY] No record found for {file_path}, creating one"
            )
            # First sighting: establish a baseline instead of failing.
            try:
                # record_file opens its own session, so release this one first.
                session.close()
                self.record_file(file_path)
                return True, None
            except Exception as e:
                logger.exception("[FILE_INTEGRITY] Failed to create record")
                return False, f"Failed to create integrity record: {str(e)}"

        # Skip the checksum entirely when nothing appears to have changed.
        if not (force or self._needs_verification(record, file_path)):
            logger.debug(
                f"[FILE_INTEGRITY] Skipping verification for {file_path} (unchanged)"
            )
            return True, None

        # Run the real check, then fold the outcome into the statistics.
        passed, reason = self._do_verification(record, file_path, session)
        self._update_stats(record, passed, session)

        if not passed:
            self._log_failure(
                record, file_path, reason or "Unknown failure", session
            )

        session.commit()

        if passed:
            logger.info(
                f"[FILE_INTEGRITY] Verification passed: {file_path}"
            )
        else:
            logger.error(
                f"[FILE_INTEGRITY] Verification FAILED: {file_path} - {reason}"
            )

        return passed, reason

249 

def update_checksum(self, file_path: Path) -> None:
    """
    Update checksum after legitimate file modification.

    Use this when you know a file was legitimately modified
    and want to update the baseline checksum.

    Args:
        file_path: Path to file

    Raises:
        FileNotFoundError: If file doesn't exist
        ValueError: If no record exists for file
    """
    if not file_path.exists():
        raise FileNotFoundError(f"File not found: {file_path}")

    verifier = self._get_verifier_for_file(file_path)
    if not verifier:
        raise ValueError(f"No verifier registered for file: {file_path}")

    checksum = verifier.calculate_checksum(file_path)
    file_stat = file_path.stat()

    # Fix: record_file() stores records under the normalized (resolved,
    # absolute) path. Looking up by raw str(file_path) missed records
    # whenever the caller passed a relative or symlinked path, raising a
    # spurious ValueError. Use the same normalization for the lookup.
    normalized_path = self._normalize_path(file_path)

    with get_user_db_session(self.username, self.password) as session:
        record = (
            session.query(FileIntegrityRecord)
            .filter_by(file_path=normalized_path)
            .first()
        )

        if not record:
            raise ValueError(f"No integrity record exists for: {file_path}")

        # Refresh the stored baseline to match the file's current state.
        record.checksum = checksum
        record.file_size = file_stat.st_size
        record.file_mtime = file_stat.st_mtime
        record.updated_at = datetime.now(UTC)

        session.commit()
        logger.info(f"[FILE_INTEGRITY] Updated checksum for: {file_path}")

291 

def get_file_stats(self, file_path: Path) -> Optional[dict]:
    """
    Get verification statistics for a file.

    Args:
        file_path: Path to file

    Returns:
        Dictionary of stats or None if no record exists
    """
    # Fix: record_file() stores records under the normalized (resolved,
    # absolute) path, so the lookup must normalize too. Using the raw
    # str(file_path) returned None for relative/symlinked paths even
    # when a record existed.
    normalized_path = self._normalize_path(file_path)

    with get_user_db_session(self.username, self.password) as session:
        record = (
            session.query(FileIntegrityRecord)
            .filter_by(file_path=normalized_path)
            .first()
        )

        if not record:
            return None

        # Return plain values so callers don't touch a detached ORM object.
        return {
            "total_verifications": record.total_verifications,
            "last_verified_at": record.last_verified_at,
            "last_verification_passed": record.last_verification_passed,
            "consecutive_successes": record.consecutive_successes,
            "consecutive_failures": record.consecutive_failures,
            "file_type": record.file_type,
            "created_at": record.created_at,
        }

321 

def get_failure_history(
    self, file_path: Path, limit: int = 100
) -> List[FileVerificationFailure]:
    """
    Get failure history for a file.

    Args:
        file_path: Path to file
        limit: Maximum number of failures to return

    Returns:
        List of failure records (detached from the session),
        newest first; empty list if no record exists
    """
    # Fix: record_file() stores records under the normalized (resolved,
    # absolute) path, so the lookup must normalize too. Using the raw
    # str(file_path) returned [] for relative/symlinked paths even when
    # failures existed.
    normalized_path = self._normalize_path(file_path)

    with get_user_db_session(self.username, self.password) as session:
        record = (
            session.query(FileIntegrityRecord)
            .filter_by(file_path=normalized_path)
            .first()
        )

        if not record:
            return []

        failures = (
            session.query(FileVerificationFailure)
            .filter_by(file_record_id=record.id)
            .order_by(FileVerificationFailure.verified_at.desc())
            .limit(limit)
            .all()
        )

        # Detach from session so results stay usable after it closes.
        for f in failures:
            session.expunge(f)

        return failures

358 

359 # Internal methods 

360 

def _get_verifier_for_file(
    self, file_path: Path
) -> Optional[BaseFileVerifier]:
    """Return the first registered verifier claiming *file_path*, or None."""
    return next(
        (v for v in self.verifiers if v.should_verify(file_path)),
        None,
    )

369 

def _needs_verification(
    self, record: FileIntegrityRecord, file_path: Path
) -> bool:
    """
    Decide whether *file_path* must be re-verified.

    Verification is required when the file is missing, has never been
    verified, has no stored baseline mtime, or its current mtime differs
    from the stored one.
    """
    if not file_path.exists():
        return True  # missing file must be surfaced by verification
    if not record.last_verified_at:
        return True  # never verified before
    if record.file_mtime is None:
        return True  # no baseline mtime stored to compare against
    # Tolerate tiny floating-point drift in stored timestamps.
    drift = abs(file_path.stat().st_mtime - record.file_mtime)
    return drift > 0.001

392 

def _do_verification(
    self, record: FileIntegrityRecord, file_path: Path, session
) -> Tuple[bool, Optional[str]]:
    """
    Perform the actual checksum verification of *file_path*.

    On success the record's stored mtime is refreshed so the next call
    can skip re-verifying an unchanged file.

    Returns:
        Tuple of (success, reason_if_failed)
    """
    if not file_path.exists():
        return False, "file_missing"

    verifier = self._get_verifier_for_file(file_path)
    if verifier is None:
        return False, "no_verifier"

    try:
        observed = verifier.calculate_checksum(file_path)
    except Exception as e:
        logger.exception("[FILE_INTEGRITY] Failed to calculate checksum")
        return False, f"checksum_calculation_failed: {str(e)}"

    if observed != record.checksum:
        return False, "checksum_mismatch"

    # Passed: remember the current mtime as the new "unchanged" baseline.
    record.file_mtime = file_path.stat().st_mtime
    return True, None

426 

427 def _update_stats( 

428 self, record: FileIntegrityRecord, passed: bool, session 

429 ) -> None: 

430 """Update verification statistics.""" 

431 record.total_verifications += 1 

432 record.last_verified_at = datetime.now(UTC) 

433 record.last_verification_passed = passed 

434 

435 if passed: 

436 record.consecutive_successes += 1 

437 record.consecutive_failures = 0 

438 else: 

439 record.consecutive_failures += 1 

440 record.consecutive_successes = 0 

441 

def _log_failure(
    self,
    record: FileIntegrityRecord,
    file_path: Path,
    reason: str,
    session,
) -> None:
    """Append a sparse audit-trail row describing a verification failure.

    Best-effort: the on-disk checksum and size are captured when possible
    so the audit row shows what the failing file actually contained.
    """
    observed_checksum = None
    observed_size = None

    if file_path.exists():
        try:
            verifier = self._get_verifier_for_file(file_path)
            if verifier:
                observed_checksum = verifier.calculate_checksum(file_path)
                observed_size = file_path.stat().st_size
        except Exception:
            pass  # best-effort only; leave observed values as None

    session.add(
        FileVerificationFailure(
            file_record_id=record.id,
            expected_checksum=record.checksum,
            actual_checksum=observed_checksum,
            file_size=observed_size,
            failure_reason=reason,
        )
    )

    logger.warning(
        f"[FILE_INTEGRITY] Logged failure for {file_path}: {reason}"
    )

    # Trim this file's failure history so it cannot grow without bound.
    self._cleanup_old_failures(record, session)

    # Amortized global check: only on every 100th record id, to keep
    # the per-failure overhead low.
    if record.id % 100 == 0:
        self._check_global_cleanup_needed(session)

482 

def _cleanup_old_failures(
    self, record: FileIntegrityRecord, session
) -> None:
    """
    Trim failure rows for one file to prevent unbounded growth.

    Keeps only the newest MAX_FAILURES_PER_FILE failures; the oldest
    rows (by verified_at) are deleted first.
    """
    per_file = session.query(FileVerificationFailure).filter_by(
        file_record_id=record.id
    )
    excess = per_file.count() - self.MAX_FAILURES_PER_FILE
    if excess <= 0:
        return

    stale = (
        per_file.order_by(FileVerificationFailure.verified_at.asc())
        .limit(excess)
        .all()
    )
    for row in stale:
        session.delete(row)

    logger.info(
        f"[FILE_INTEGRITY] Cleaned up {len(stale)} old failures for file_record {record.id}"
    )

514 

def _check_global_cleanup_needed(self, session) -> None:
    """
    Run a global trim when failures exceed MAX_TOTAL_FAILURES by >20%.

    The 20% buffer prevents cleanup from firing on every single failure
    while still bounding total growth.
    """
    threshold = int(self.MAX_TOTAL_FAILURES * 1.2)  # 20% over limit
    total_failures = session.query(FileVerificationFailure).count()
    if total_failures <= threshold:
        return

    logger.info(
        f"[FILE_INTEGRITY] Global failure count ({total_failures}) exceeds threshold ({threshold}), "
        f"running cleanup..."
    )

    # Delete just enough of the oldest rows to get back under the limit.
    overflow = total_failures - self.MAX_TOTAL_FAILURES
    stale = (
        session.query(FileVerificationFailure)
        .order_by(FileVerificationFailure.verified_at.asc())
        .limit(overflow)
        .all()
    )
    for row in stale:
        session.delete(row)

    logger.info(
        f"[FILE_INTEGRITY] Threshold cleanup: deleted {len(stale)} old failures"
    )

547 

def cleanup_all_old_failures(self) -> int:
    """
    Global cleanup of failure records across all files.

    Enforces the MAX_TOTAL_FAILURES limit by deleting the oldest
    failures (by verified_at) first.

    Returns:
        Number of records deleted
    """
    with get_user_db_session(self.username, self.password) as session:
        total = session.query(FileVerificationFailure).count()
        overflow = total - self.MAX_TOTAL_FAILURES
        if overflow <= 0:
            return 0

        # Oldest rows go first so the most recent evidence is retained.
        doomed = (
            session.query(FileVerificationFailure)
            .order_by(FileVerificationFailure.verified_at.asc())
            .limit(overflow)
            .all()
        )
        for row in doomed:
            session.delete(row)

        session.commit()

        logger.info(
            f"[FILE_INTEGRITY] Global cleanup: deleted {len(doomed)} old failures "
            f"(total was {total}, now {total - len(doomed)})"
        )

        return len(doomed)

584 

def get_total_failure_count(self) -> int:
    """
    Get total number of failure records across all files.

    Returns:
        Total count of failure records
    """
    with get_user_db_session(self.username, self.password) as session:
        count = session.query(FileVerificationFailure).count()
        return count