Coverage for src / local_deep_research / security / file_integrity / integrity_manager.py: 98%

200 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2File Integrity Manager - Main service for file integrity verification. 

3 

4Provides smart verification with embedded statistics and sparse failure logging. 

5""" 

6 

7from datetime import datetime, UTC 

8from pathlib import Path 

9from typing import Optional, Tuple, List 

10from loguru import logger 

11 

12from .base_verifier import BaseFileVerifier 

13from ...database.models.file_integrity import ( 

14 FileIntegrityRecord, 

15 FileVerificationFailure, 

16) 

17 

18# Import session context conditionally (requires Flask) 

19try: 

20 from ...database.session_context import get_user_db_session 

21 

22 _has_session_context = True 

23except ImportError: 

24 _has_session_context = False 

25 # Provide stub for type checking 

26 get_user_db_session = None # type: ignore 

27 

28 

class FileIntegrityManager:
    """
    Central service for file integrity verification.

    Features:
    - Smart verification (only verify if file modified)
    - Embedded statistics (low overhead)
    - Sparse failure logging (audit trail)
    - Multi-verifier support (different file types)
    - Automatic cleanup of old failure records
    """

    # Configuration for automatic cleanup of FileVerificationFailure rows.
    # Per-file cap enforced by _cleanup_old_failures after each logged failure.
    MAX_FAILURES_PER_FILE = 100  # Keep at most this many failures per file
    # Global cap enforced by cleanup_all_old_failures / _check_global_cleanup_needed.
    MAX_TOTAL_FAILURES = 10000  # Global limit across all files

44 

def __init__(self, username: str, password: Optional[str] = None):
    """
    Initialize file integrity manager.

    Args:
        username: Username for database access
        password: Optional password for encrypted database

    Raises:
        ImportError: If Flask/session_context not available
    """
    if not _has_session_context:
        raise ImportError(
            "FileIntegrityManager requires Flask and database session context. "
            "Install Flask to use this feature."
        )

    self.username = username
    self.password = password
    self.verifiers: List[BaseFileVerifier] = []

    # Run startup cleanup to remove old failures. Best-effort: a cleanup
    # failure must not prevent the manager from being constructed.
    try:
        deleted = self.cleanup_all_old_failures()
        if deleted > 0:
            logger.info(
                f"[FILE_INTEGRITY] Startup cleanup: removed {deleted} old failure records"
            )
    except Exception:
        # Fix: attach the traceback so the cause is visible in the logs
        # instead of being silently swallowed by the bare warning.
        logger.opt(exception=True).warning(
            "[FILE_INTEGRITY] Startup cleanup failed"
        )

75 

def _normalize_path(self, file_path: Path) -> str:
    """
    Return the canonical string form of *file_path*.

    Resolving makes the path absolute and follows symlinks, so the same
    underlying file is always stored and looked up under one key.

    Args:
        file_path: Path to normalize

    Returns:
        Normalized path string
    """
    resolved = file_path.resolve()
    return str(resolved)

90 

def register_verifier(self, verifier: BaseFileVerifier) -> None:
    """
    Add a file type verifier to this manager.

    Args:
        verifier: Verifier instance to register
    """
    file_type = verifier.get_file_type()
    self.verifiers.append(verifier)
    logger.debug(
        f"[FILE_INTEGRITY] Registered verifier for type: {file_type}"
    )

102 

def record_file(
    self,
    file_path: Path,
    related_entity_type: Optional[str] = None,
    related_entity_id: Optional[int] = None,
) -> FileIntegrityRecord:
    """
    Create or update the integrity record for a file.

    Args:
        file_path: Path to file to record
        related_entity_type: Optional related entity type (e.g., 'rag_index')
        related_entity_id: Optional related entity ID

    Returns:
        FileIntegrityRecord instance

    Raises:
        FileNotFoundError: If file doesn't exist
        ValueError: If no verifier handles this file type
    """
    if not file_path.exists():
        raise FileNotFoundError(f"File not found: {file_path}")

    verifier = self._get_verifier_for_file(file_path)
    if verifier is None:
        raise ValueError(f"No verifier registered for file: {file_path}")

    # Snapshot the file's current state before touching the database.
    digest = verifier.calculate_checksum(file_path)
    stat_info = file_path.stat()
    stored_path = self._normalize_path(file_path)

    with get_user_db_session(self.username, self.password) as session:
        # Records are keyed by the normalized path.
        existing = (
            session.query(FileIntegrityRecord)
            .filter_by(file_path=stored_path)
            .first()
        )

        if existing is None:
            # First time we see this file: start with zeroed statistics.
            existing = FileIntegrityRecord(
                file_path=stored_path,
                file_type=verifier.get_file_type(),
                checksum=digest,
                algorithm=verifier.get_algorithm(),
                file_size=stat_info.st_size,
                file_mtime=stat_info.st_mtime,
                verify_on_load=True,
                allow_modifications=verifier.allows_modifications(),
                related_entity_type=related_entity_type,
                related_entity_id=related_entity_id,
                total_verifications=0,
                consecutive_successes=0,
                consecutive_failures=0,
            )
            session.add(existing)
            logger.info(
                f"[FILE_INTEGRITY] Created record for: {file_path} (type: {verifier.get_file_type()})"
            )
        else:
            # Refresh the baseline; verification statistics are kept.
            existing.checksum = digest
            existing.file_size = stat_info.st_size
            existing.file_mtime = stat_info.st_mtime
            existing.algorithm = verifier.get_algorithm()
            existing.updated_at = datetime.now(UTC)
            logger.info(f"[FILE_INTEGRITY] Updated record for: {file_path}")

        session.commit()
        session.refresh(existing)
        return existing

177 

def verify_file(
    self, file_path: Path, force: bool = False
) -> Tuple[bool, Optional[str]]:
    """
    Verify file integrity with smart checking.

    Verification only runs when the file's modification time changed
    since the last check, or when force=True.

    Args:
        file_path: Path to file to verify
        force: Force verification even if file hasn't changed

    Returns:
        Tuple of (success, reason_if_failed)
    """
    normalized_path = self._normalize_path(file_path)

    with get_user_db_session(self.username, self.password) as session:
        record = (
            session.query(FileIntegrityRecord)
            .filter_by(file_path=normalized_path)
            .first()
        )

        if record is None:
            # Unknown file: adopt it by creating a baseline record.
            logger.warning(
                f"[FILE_INTEGRITY] No record found for {file_path}, creating one"
            )
            try:
                self.record_file(file_path)
            except Exception as e:
                logger.exception("[FILE_INTEGRITY] Failed to create record")
                return False, f"Failed to create integrity record: {str(e)}"
            return True, None

        # Smart skip: nothing to do when the file is unchanged.
        if not (force or self._needs_verification(record, file_path)):
            logger.debug(
                f"[FILE_INTEGRITY] Skipping verification for {file_path} (unchanged)"
            )
            return True, None

        # Run the check, fold the outcome into the embedded statistics,
        # and append an audit-trail row on failure — all in one commit.
        passed, reason = self._do_verification(record, file_path, session)
        self._update_stats(record, passed, session)
        if not passed:
            self._log_failure(
                record, file_path, reason or "Unknown failure", session
            )
        session.commit()

        if passed:
            logger.info(
                f"[FILE_INTEGRITY] Verification passed: {file_path}"
            )
        else:
            logger.error(
                f"[FILE_INTEGRITY] Verification FAILED: {file_path} - {reason}"
            )

        return passed, reason

246 

def update_checksum(self, file_path: Path) -> None:
    """
    Update checksum after legitimate file modification.

    Use this when you know a file was legitimately modified
    and want to update the baseline checksum.

    Args:
        file_path: Path to file

    Raises:
        FileNotFoundError: If file doesn't exist
        ValueError: If no record exists for file
    """
    if not file_path.exists():
        raise FileNotFoundError(f"File not found: {file_path}")

    verifier = self._get_verifier_for_file(file_path)
    if not verifier:
        raise ValueError(f"No verifier registered for file: {file_path}")

    checksum = verifier.calculate_checksum(file_path)
    file_stat = file_path.stat()
    # Bug fix: records are stored under the normalized (resolved, absolute)
    # path by record_file(), so the lookup must normalize too. The previous
    # str(file_path) lookup missed records for relative or symlinked paths.
    normalized_path = self._normalize_path(file_path)

    with get_user_db_session(self.username, self.password) as session:
        record = (
            session.query(FileIntegrityRecord)
            .filter_by(file_path=normalized_path)
            .first()
        )

        if not record:
            raise ValueError(f"No integrity record exists for: {file_path}")

        record.checksum = checksum
        record.file_size = file_stat.st_size
        record.file_mtime = file_stat.st_mtime
        record.updated_at = datetime.now(UTC)

        session.commit()
        logger.info(f"[FILE_INTEGRITY] Updated checksum for: {file_path}")

288 

def get_file_stats(self, file_path: Path) -> Optional[dict]:
    """
    Get verification statistics for a file.

    Args:
        file_path: Path to file

    Returns:
        Dictionary of stats or None if no record exists
    """
    # Bug fix: records are stored under the normalized path (see
    # record_file), so the lookup must normalize too; a plain
    # str(file_path) missed records for relative or symlinked paths.
    normalized_path = self._normalize_path(file_path)

    with get_user_db_session(self.username, self.password) as session:
        record = (
            session.query(FileIntegrityRecord)
            .filter_by(file_path=normalized_path)
            .first()
        )

        if not record:
            return None

        # Copy plain values out so nothing depends on the closed session.
        return {
            "total_verifications": record.total_verifications,
            "last_verified_at": record.last_verified_at,
            "last_verification_passed": record.last_verification_passed,
            "consecutive_successes": record.consecutive_successes,
            "consecutive_failures": record.consecutive_failures,
            "file_type": record.file_type,
            "created_at": record.created_at,
        }

318 

def get_failure_history(
    self, file_path: Path, limit: int = 100
) -> List[FileVerificationFailure]:
    """
    Get failure history for a file.

    Args:
        file_path: Path to file
        limit: Maximum number of failures to return

    Returns:
        List of failure records, newest first
    """
    # Bug fix: records are stored under the normalized path (see
    # record_file), so the lookup must normalize too; a plain
    # str(file_path) missed records for relative or symlinked paths.
    normalized_path = self._normalize_path(file_path)

    with get_user_db_session(self.username, self.password) as session:
        record = (
            session.query(FileIntegrityRecord)
            .filter_by(file_path=normalized_path)
            .first()
        )

        if not record:
            return []

        failures = (
            session.query(FileVerificationFailure)
            .filter_by(file_record_id=record.id)
            .order_by(FileVerificationFailure.verified_at.desc())
            .limit(limit)
            .all()
        )

        # Detach from session so callers can use the objects after
        # the session context closes.
        for f in failures:
            session.expunge(f)

        return failures

355 

356 # Internal methods 

357 

def _get_verifier_for_file(
    self, file_path: Path
) -> Optional[BaseFileVerifier]:
    """Return the first registered verifier claiming this file, or None."""
    return next(
        (v for v in self.verifiers if v.should_verify(file_path)),
        None,
    )

366 

def _needs_verification(
    self, record: FileIntegrityRecord, file_path: Path
) -> bool:
    """
    Decide whether the file must be (re-)verified.

    Re-check when the file is missing, was never verified, has no stored
    mtime, or its on-disk mtime differs from the stored one.
    """
    if (
        not file_path.exists()  # missing file must be flagged
        or not record.last_verified_at  # never verified before
        or record.file_mtime is None  # no baseline mtime stored
    ):
        return True

    # Treat tiny floating-point mtime differences as "unchanged".
    delta = file_path.stat().st_mtime - record.file_mtime
    return abs(delta) > 0.001

389 

def _do_verification(
    self, record: FileIntegrityRecord, file_path: Path, session
) -> Tuple[bool, Optional[str]]:
    """
    Run the actual integrity check against the stored baseline.

    Returns:
        Tuple of (success, reason_if_failed)
    """
    # A missing file always fails.
    if not file_path.exists():
        return False, "file_missing"

    verifier = self._get_verifier_for_file(file_path)
    if verifier is None:
        return False, "no_verifier"

    try:
        observed = verifier.calculate_checksum(file_path)
    except Exception as e:
        logger.exception("[FILE_INTEGRITY] Failed to calculate checksum")
        return False, f"checksum_calculation_failed: {str(e)}"

    if observed != record.checksum:
        return False, "checksum_mismatch"

    # Success: remember the current mtime so an unchanged file can skip
    # the next verification.
    record.file_mtime = file_path.stat().st_mtime
    return True, None

423 

def _update_stats(
    self, record: FileIntegrityRecord, passed: bool, session
) -> None:
    """Fold one verification outcome into the record's embedded statistics."""
    record.total_verifications += 1
    record.last_verified_at = datetime.now(UTC)
    record.last_verification_passed = passed

    # A success resets the failure streak, and vice versa.
    if passed:
        record.consecutive_failures = 0
        record.consecutive_successes += 1
    else:
        record.consecutive_successes = 0
        record.consecutive_failures += 1

438 

def _log_failure(
    self,
    record: FileIntegrityRecord,
    file_path: Path,
    reason: str,
    session,
) -> None:
    """Append a verification failure to the sparse audit trail."""
    actual_checksum = None
    file_size = None

    # Best effort: capture the file's current checksum and size, if the
    # file still exists and a verifier can hash it.
    if file_path.exists():
        try:
            verifier = self._get_verifier_for_file(file_path)
            if verifier:
                actual_checksum = verifier.calculate_checksum(file_path)
                file_size = file_path.stat().st_size
        except Exception:
            logger.debug(
                "Checksum calculation failed for {}",
                file_path,
                exc_info=True,
            )

    session.add(
        FileVerificationFailure(
            file_record_id=record.id,
            expected_checksum=record.checksum,
            actual_checksum=actual_checksum,
            file_size=file_size,
            failure_reason=reason,
        )
    )

    logger.warning(
        f"[FILE_INTEGRITY] Logged failure for {file_path}: {reason}"
    )

    # Keep this file's failure history bounded.
    self._cleanup_old_failures(record, session)

    # Only every 100th record id triggers the global check, so the global
    # count query isn't paid on every failure.
    if record.id % 100 == 0:
        self._check_global_cleanup_needed(session)

483 

def _cleanup_old_failures(
    self, record: FileIntegrityRecord, session
) -> None:
    """
    Trim one file's failure history so it cannot grow without bound.

    Only the newest MAX_FAILURES_PER_FILE rows are retained.
    """
    per_file = session.query(FileVerificationFailure).filter_by(
        file_record_id=record.id
    )
    excess = per_file.count() - self.MAX_FAILURES_PER_FILE
    if excess <= 0:
        return

    # Drop the oldest rows first, keeping the most recent ones.
    stale = (
        per_file.order_by(FileVerificationFailure.verified_at.asc())
        .limit(excess)
        .all()
    )
    for row in stale:
        session.delete(row)

    logger.info(
        f"[FILE_INTEGRITY] Cleaned up {len(stale)} old failures for file_record {record.id}"
    )

515 

def _check_global_cleanup_needed(self, session) -> None:
    """
    Run a global failure cleanup once the table is well over its limit.

    Cleanup triggers only when the count exceeds MAX_TOTAL_FAILURES by
    20%, giving a buffer so cleanup does not run constantly.
    """
    total_failures = session.query(FileVerificationFailure).count()
    threshold = int(self.MAX_TOTAL_FAILURES * 1.2)  # 20% over limit
    if total_failures <= threshold:
        return

    logger.info(
        f"[FILE_INTEGRITY] Global failure count ({total_failures}) exceeds threshold ({threshold}), "
        f"running cleanup..."
    )

    # Remove just enough of the oldest rows to get back to the limit.
    overshoot = total_failures - self.MAX_TOTAL_FAILURES
    doomed = (
        session.query(FileVerificationFailure)
        .order_by(FileVerificationFailure.verified_at.asc())
        .limit(overshoot)
        .all()
    )
    for row in doomed:
        session.delete(row)

    logger.info(
        f"[FILE_INTEGRITY] Threshold cleanup: deleted {len(doomed)} old failures"
    )

548 

def cleanup_all_old_failures(self) -> int:
    """
    Enforce MAX_TOTAL_FAILURES across every file's failure history.

    The oldest failures are deleted until the total is at the limit.

    Returns:
        Number of records deleted
    """
    with get_user_db_session(self.username, self.password) as session:
        total_failures = session.query(FileVerificationFailure).count()
        overshoot = total_failures - self.MAX_TOTAL_FAILURES
        if overshoot <= 0:
            return 0

        # Oldest rows go first.
        victims = (
            session.query(FileVerificationFailure)
            .order_by(FileVerificationFailure.verified_at.asc())
            .limit(overshoot)
            .all()
        )
        for row in victims:
            session.delete(row)
        session.commit()

        logger.info(
            f"[FILE_INTEGRITY] Global cleanup: deleted {len(victims)} old failures "
            f"(total was {total_failures}, now {total_failures - len(victims)})"
        )
        return len(victims)

585 

def get_total_failure_count(self) -> int:
    """
    Count failure records across all tracked files.

    Returns:
        Total count of failure records
    """
    with get_user_db_session(self.username, self.password) as session:
        count = session.query(FileVerificationFailure).count()
        return count