Coverage for src/local_deep_research/security/file_integrity/integrity_manager.py: 11%

201 statements  

coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2File Integrity Manager - Main service for file integrity verification. 

3 

4Provides smart verification with embedded statistics and sparse failure logging. 

5""" 

6 

7from datetime import datetime, UTC 

8from pathlib import Path 

9from typing import Optional, Tuple, List 

10from loguru import logger 

11 

12from .base_verifier import BaseFileVerifier 

13from ...database.models.file_integrity import ( 

14 FileIntegrityRecord, 

15 FileVerificationFailure, 

16) 

17 

18# Import session context conditionally (requires Flask) 

19try: 

20 from ...database.session_context import get_user_db_session 

21 

22 _has_session_context = True 

23except ImportError: 

24 _has_session_context = False 

25 # Provide stub for type checking 

26 get_user_db_session = None # type: ignore 

27 

28 

29class FileIntegrityManager: 

30 """ 

31 Central service for file integrity verification. 

32 

33 Features: 

34 - Smart verification (only verify if file modified) 

35 - Embedded statistics (low overhead) 

36 - Sparse failure logging (audit trail) 

37 - Multi-verifier support (different file types) 

38 - Automatic cleanup of old failure records 

39 """ 

40 

41 # Configuration for automatic cleanup 

42 MAX_FAILURES_PER_FILE = 100 # Keep at most this many failures per file 

43 MAX_TOTAL_FAILURES = 10000 # Global limit across all files 

44 
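    # Illustrative usage sketch (assumed caller code; "RagIndexVerifier" is a
    # hypothetical BaseFileVerifier subclass, not defined in this module):
    #
    #     manager = FileIntegrityManager(username="alice", password="secret")
    #     manager.register_verifier(RagIndexVerifier())
    #     manager.record_file(Path("/data/index.faiss"), related_entity_type="rag_index")
    #     passed, reason = manager.verify_file(Path("/data/index.faiss"))
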

    def __init__(self, username: str, password: Optional[str] = None):
        """
        Initialize file integrity manager.

        Args:
            username: Username for database access
            password: Optional password for encrypted database

        Raises:
            ImportError: If Flask/session_context not available
        """
        if not _has_session_context:
            raise ImportError(
                "FileIntegrityManager requires Flask and database session context. "
                "Install Flask to use this feature."
            )

        self.username = username
        self.password = password
        self.verifiers: List[BaseFileVerifier] = []

        # Run startup cleanup to remove old failures
        try:
            deleted = self.cleanup_all_old_failures()
            if deleted > 0:
                logger.info(
                    f"[FILE_INTEGRITY] Startup cleanup: removed {deleted} old failure records"
                )
        except Exception as e:
            logger.warning(f"[FILE_INTEGRITY] Startup cleanup failed: {e}")

    def _normalize_path(self, file_path: Path) -> str:
        """
        Normalize path for consistent storage and lookup.

        Resolves symlinks, makes absolute, and normalizes separators
        to ensure the same file is always represented the same way.

        Args:
            file_path: Path to normalize

        Returns:
            Normalized path string
        """
        return str(file_path.resolve())

    def register_verifier(self, verifier: BaseFileVerifier) -> None:
        """
        Register a file type verifier.

        Args:
            verifier: Verifier instance to register
        """
        self.verifiers.append(verifier)
        logger.debug(
            f"[FILE_INTEGRITY] Registered verifier for type: {verifier.get_file_type()}"
        )

    def record_file(
        self,
        file_path: Path,
        related_entity_type: Optional[str] = None,
        related_entity_id: Optional[int] = None,
    ) -> FileIntegrityRecord:
        """
        Create or update integrity record for a file.

        Args:
            file_path: Path to file to record
            related_entity_type: Optional related entity type (e.g., 'rag_index')
            related_entity_id: Optional related entity ID

        Returns:
            FileIntegrityRecord instance

        Raises:
            FileNotFoundError: If file doesn't exist
            ValueError: If no verifier handles this file type
        """
        if not file_path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        verifier = self._get_verifier_for_file(file_path)
        if not verifier:
            raise ValueError(f"No verifier registered for file: {file_path}")

        # Calculate checksum and get file stats
        checksum = verifier.calculate_checksum(file_path)
        file_stat = file_path.stat()
        normalized_path = self._normalize_path(file_path)

        with get_user_db_session(self.username, self.password) as session:
            # Check if record exists (using normalized path)
            record = (
                session.query(FileIntegrityRecord)
                .filter_by(file_path=normalized_path)
                .first()
            )

            if record:
                # Update existing record
                record.checksum = checksum
                record.file_size = file_stat.st_size
                record.file_mtime = file_stat.st_mtime
                record.algorithm = verifier.get_algorithm()
                record.updated_at = datetime.now(UTC)
                logger.info(f"[FILE_INTEGRITY] Updated record for: {file_path}")
            else:
                # Create new record
                record = FileIntegrityRecord(
                    file_path=normalized_path,
                    file_type=verifier.get_file_type(),
                    checksum=checksum,
                    algorithm=verifier.get_algorithm(),
                    file_size=file_stat.st_size,
                    file_mtime=file_stat.st_mtime,
                    verify_on_load=True,
                    allow_modifications=verifier.allows_modifications(),
                    related_entity_type=related_entity_type,
                    related_entity_id=related_entity_id,
                    total_verifications=0,
                    consecutive_successes=0,
                    consecutive_failures=0,
                )
                session.add(record)
                logger.info(
                    f"[FILE_INTEGRITY] Created record for: {file_path} (type: {verifier.get_file_type()})"
                )

            session.commit()
            session.refresh(record)
            return record


    def verify_file(
        self, file_path: Path, force: bool = False
    ) -> Tuple[bool, Optional[str]]:
        """
        Verify file integrity with smart checking.

        Only verifies if:
        - File modification time changed since last verification, OR
        - force=True

        Args:
            file_path: Path to file to verify
            force: Force verification even if file hasn't changed

        Returns:
            Tuple of (success, reason_if_failed)
        """
        normalized_path = self._normalize_path(file_path)

        with get_user_db_session(self.username, self.password) as session:
            record = (
                session.query(FileIntegrityRecord)
                .filter_by(file_path=normalized_path)
                .first()
            )

            if not record:
                logger.warning(
                    f"[FILE_INTEGRITY] No record found for {file_path}, creating one"
                )
                # Create record if it doesn't exist
                try:
                    # Close this session first; record_file() opens its own
                    # session and commits the new record there.
                    session.close()
                    self.record_file(file_path)
                    return True, None
                except Exception as e:
                    logger.exception(
                        f"[FILE_INTEGRITY] Failed to create record: {e}"
                    )
                    return False, f"Failed to create integrity record: {str(e)}"

            # Check if verification needed
            if not force and not self._needs_verification(record, file_path):
                logger.debug(
                    f"[FILE_INTEGRITY] Skipping verification for {file_path} (unchanged)"
                )
                return True, None

            # Perform verification
            passed, reason = self._do_verification(record, file_path, session)

            # Update statistics
            self._update_stats(record, passed, session)

            # Log failure if needed
            if not passed:
                self._log_failure(
                    record, file_path, reason or "Unknown failure", session
                )

            session.commit()

            if passed:
                logger.info(
                    f"[FILE_INTEGRITY] Verification passed: {file_path}"
                )
            else:
                logger.error(
                    f"[FILE_INTEGRITY] Verification FAILED: {file_path} - {reason}"
                )

            return passed, reason

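    # A caller might consume the (passed, reason) tuple like this (illustrative
    # sketch only; the mismatch handling is hypothetical, not part of this module):
    #
    #     passed, reason = manager.verify_file(index_path)
    #     if not passed and reason == "checksum_mismatch":
    #         ...  # e.g. quarantine the file or rebuild it from source data
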

    def update_checksum(self, file_path: Path) -> None:
        """
        Update checksum after legitimate file modification.

        Use this when you know a file was legitimately modified
        and want to update the baseline checksum.

        Args:
            file_path: Path to file

        Raises:
            FileNotFoundError: If file doesn't exist
            ValueError: If no record exists for file
        """
        if not file_path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        verifier = self._get_verifier_for_file(file_path)
        if not verifier:
            raise ValueError(f"No verifier registered for file: {file_path}")

        checksum = verifier.calculate_checksum(file_path)
        file_stat = file_path.stat()

        with get_user_db_session(self.username, self.password) as session:
            # Look up by normalized path so it matches records stored by record_file()
            record = (
                session.query(FileIntegrityRecord)
                .filter_by(file_path=self._normalize_path(file_path))
                .first()
            )

            if not record:
                raise ValueError(f"No integrity record exists for: {file_path}")

            record.checksum = checksum
            record.file_size = file_stat.st_size
            record.file_mtime = file_stat.st_mtime
            record.updated_at = datetime.now(UTC)

            session.commit()
            logger.info(f"[FILE_INTEGRITY] Updated checksum for: {file_path}")

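    # Illustrative sketch (assumed caller code; rebuild_rag_index is a hypothetical
    # helper): after a legitimate rebuild of a tracked file, refresh the baseline
    # so the next verification does not report a checksum mismatch:
    #
    #     rebuild_rag_index(index_path)
    #     manager.update_checksum(index_path)
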

    def get_file_stats(self, file_path: Path) -> Optional[dict]:
        """
        Get verification statistics for a file.

        Args:
            file_path: Path to file

        Returns:
            Dictionary of stats or None if no record exists
        """
        with get_user_db_session(self.username, self.password) as session:
            # Look up by normalized path so it matches records stored by record_file()
            record = (
                session.query(FileIntegrityRecord)
                .filter_by(file_path=self._normalize_path(file_path))
                .first()
            )

            if not record:
                return None

            return {
                "total_verifications": record.total_verifications,
                "last_verified_at": record.last_verified_at,
                "last_verification_passed": record.last_verification_passed,
                "consecutive_successes": record.consecutive_successes,
                "consecutive_failures": record.consecutive_failures,
                "file_type": record.file_type,
                "created_at": record.created_at,
            }

    def get_failure_history(
        self, file_path: Path, limit: int = 100
    ) -> List[FileVerificationFailure]:
        """
        Get failure history for a file.

        Args:
            file_path: Path to file
            limit: Maximum number of failures to return

        Returns:
            List of failure records
        """
        with get_user_db_session(self.username, self.password) as session:
            # Look up by normalized path so it matches records stored by record_file()
            record = (
                session.query(FileIntegrityRecord)
                .filter_by(file_path=self._normalize_path(file_path))
                .first()
            )

            if not record:
                return []

            failures = (
                session.query(FileVerificationFailure)
                .filter_by(file_record_id=record.id)
                .order_by(FileVerificationFailure.verified_at.desc())
                .limit(limit)
                .all()
            )

            # Detach from session
            for f in failures:
                session.expunge(f)

            return failures

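    # Illustrative sketch (assumed caller code, not part of this module): surface
    # the embedded statistics, e.g. for a health check or admin view:
    #
    #     stats = manager.get_file_stats(index_path)
    #     if stats and stats["consecutive_failures"] > 0:
    #         failures = manager.get_failure_history(index_path, limit=10)
    #         ...  # e.g. alert an operator with the most recent failure reasons
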

    # Internal methods

    def _get_verifier_for_file(
        self, file_path: Path
    ) -> Optional[BaseFileVerifier]:
        """Find verifier that handles this file type."""
        for verifier in self.verifiers:
            if verifier.should_verify(file_path):
                return verifier
        return None

    def _needs_verification(
        self, record: FileIntegrityRecord, file_path: Path
    ) -> bool:
        """
        Check if file needs verification.

        Only verify if file modification time changed since last verification.
        """
        if not file_path.exists():
            return True  # Missing file still needs verification

        if not record.last_verified_at:
            return True  # Never verified

        current_mtime = file_path.stat().st_mtime

        # Compare with stored mtime
        if record.file_mtime is None:
            return True  # No mtime stored

        # Verify if file was modified (allow small floating point differences)
        return abs(current_mtime - record.file_mtime) > 0.001

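    # Worked example of the tolerance above (illustrative values): if the stored
    # mtime is 1700000000.123456 and a fresh stat() returns 1700000000.123999 due
    # to float or filesystem timestamp rounding, the difference is 0.000543 s,
    # which stays under the 0.001 s tolerance, so no re-verification is triggered.
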

    def _do_verification(
        self, record: FileIntegrityRecord, file_path: Path, session
    ) -> Tuple[bool, Optional[str]]:
        """
        Perform actual verification.

        Returns:
            Tuple of (success, reason_if_failed)
        """
        # Check file exists
        if not file_path.exists():
            return False, "file_missing"

        # Get verifier
        verifier = self._get_verifier_for_file(file_path)
        if not verifier:
            return False, "no_verifier"

        # Calculate current checksum
        try:
            current_checksum = verifier.calculate_checksum(file_path)
        except Exception as e:
            logger.exception(
                f"[FILE_INTEGRITY] Failed to calculate checksum: {e}"
            )
            return False, f"checksum_calculation_failed: {str(e)}"

        # Compare checksums
        if current_checksum != record.checksum:
            return False, "checksum_mismatch"

        # Update file mtime in record
        record.file_mtime = file_path.stat().st_mtime

        return True, None

    def _update_stats(
        self, record: FileIntegrityRecord, passed: bool, session
    ) -> None:
        """Update verification statistics."""
        record.total_verifications += 1
        record.last_verified_at = datetime.now(UTC)
        record.last_verification_passed = passed

        if passed:
            record.consecutive_successes += 1
            record.consecutive_failures = 0
        else:
            record.consecutive_failures += 1
            record.consecutive_successes = 0


    def _log_failure(
        self,
        record: FileIntegrityRecord,
        file_path: Path,
        reason: str,
        session,
    ) -> None:
        """Log verification failure to audit trail."""
        # Get current checksum if possible
        actual_checksum = None
        file_size = None

        if file_path.exists():
            try:
                verifier = self._get_verifier_for_file(file_path)
                if verifier:
                    actual_checksum = verifier.calculate_checksum(file_path)
                file_size = file_path.stat().st_size
            except Exception:
                pass  # Checksum calculation failed, leave as None

        failure = FileVerificationFailure(
            file_record_id=record.id,
            expected_checksum=record.checksum,
            actual_checksum=actual_checksum,
            file_size=file_size,
            failure_reason=reason,
        )
        session.add(failure)

        logger.warning(
            f"[FILE_INTEGRITY] Logged failure for {file_path}: {reason}"
        )

        # Cleanup old failures for this file
        self._cleanup_old_failures(record, session)

        # Periodically check if global cleanup needed (every 100th file to avoid overhead)
        if record.id % 100 == 0:
            self._check_global_cleanup_needed(session)


    def _cleanup_old_failures(
        self, record: FileIntegrityRecord, session
    ) -> None:
        """
        Clean up old failure records to prevent unbounded growth.

        Keeps only the most recent MAX_FAILURES_PER_FILE failures per file.
        """
        # Count failures for this file
        failure_count = (
            session.query(FileVerificationFailure)
            .filter_by(file_record_id=record.id)
            .count()
        )

        if failure_count > self.MAX_FAILURES_PER_FILE:
            # Delete oldest failures, keeping only the most recent MAX_FAILURES_PER_FILE
            failures_to_delete = (
                session.query(FileVerificationFailure)
                .filter_by(file_record_id=record.id)
                .order_by(FileVerificationFailure.verified_at.asc())
                .limit(failure_count - self.MAX_FAILURES_PER_FILE)
                .all()
            )

            for failure in failures_to_delete:
                session.delete(failure)

            logger.info(
                f"[FILE_INTEGRITY] Cleaned up {len(failures_to_delete)} old failures for file_record {record.id}"
            )

    def _check_global_cleanup_needed(self, session) -> None:
        """
        Check if global cleanup is needed and run it if threshold exceeded.

        Only runs cleanup if failure count exceeds MAX_TOTAL_FAILURES by 20%.
        This prevents constant cleanup while allowing some buffer.
        """
        threshold = int(self.MAX_TOTAL_FAILURES * 1.2)  # 20% over limit
        total_failures = session.query(FileVerificationFailure).count()

        if total_failures > threshold:
            logger.info(
                f"[FILE_INTEGRITY] Global failure count ({total_failures}) exceeds threshold ({threshold}), "
                f"running cleanup..."
            )

            # Delete oldest failures to get under limit
            failures_to_delete_count = total_failures - self.MAX_TOTAL_FAILURES

            failures_to_delete = (
                session.query(FileVerificationFailure)
                .order_by(FileVerificationFailure.verified_at.asc())
                .limit(failures_to_delete_count)
                .all()
            )

            for failure in failures_to_delete:
                session.delete(failure)

            logger.info(
                f"[FILE_INTEGRITY] Threshold cleanup: deleted {len(failures_to_delete)} old failures"
            )

    def cleanup_all_old_failures(self) -> int:
        """
        Global cleanup of failure records across all files.

        Enforces MAX_TOTAL_FAILURES limit by removing oldest failures.

        Returns:
            Number of records deleted
        """
        with get_user_db_session(self.username, self.password) as session:
            total_failures = session.query(FileVerificationFailure).count()

            if total_failures <= self.MAX_TOTAL_FAILURES:
                return 0

            # Delete oldest failures to get under limit
            failures_to_delete_count = total_failures - self.MAX_TOTAL_FAILURES

            failures_to_delete = (
                session.query(FileVerificationFailure)
                .order_by(FileVerificationFailure.verified_at.asc())
                .limit(failures_to_delete_count)
                .all()
            )

            for failure in failures_to_delete:
                session.delete(failure)

            session.commit()

            logger.info(
                f"[FILE_INTEGRITY] Global cleanup: deleted {len(failures_to_delete)} old failures "
                f"(total was {total_failures}, now {total_failures - len(failures_to_delete)})"
            )

            return len(failures_to_delete)

    def get_total_failure_count(self) -> int:
        """
        Get total number of failure records across all files.

        Returns:
            Total count of failure records
        """
        with get_user_db_session(self.username, self.password) as session:
            return session.query(FileVerificationFailure).count()