Coverage for src / local_deep_research / security / file_integrity / integrity_manager.py: 59%
201 statements
« prev ^ index » next — coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
1"""
2File Integrity Manager - Main service for file integrity verification.
4Provides smart verification with embedded statistics and sparse failure logging.
5"""
7from datetime import datetime, UTC
8from pathlib import Path
9from typing import Optional, Tuple, List
10from loguru import logger
12from .base_verifier import BaseFileVerifier
13from ...database.models.file_integrity import (
14 FileIntegrityRecord,
15 FileVerificationFailure,
16)
# Import session context conditionally (requires Flask)
# The rest of the module checks _has_session_context before touching the DB,
# so this file can still be imported in environments without Flask installed.
try:
    from ...database.session_context import get_user_db_session

    _has_session_context = True
except ImportError:
    _has_session_context = False
    # Provide stub for type checking
    get_user_db_session = None  # type: ignore
class FileIntegrityManager:
    """
    Central service for file integrity verification.

    Features:
    - Smart verification (only verify if file modified)
    - Embedded statistics (low overhead)
    - Sparse failure logging (audit trail)
    - Multi-verifier support (different file types)
    - Automatic cleanup of old failure records
    """

    # Configuration for automatic cleanup of FileVerificationFailure rows.
    # Per-file cap enforced by _cleanup_old_failures after each logged failure.
    MAX_FAILURES_PER_FILE = 100  # Keep at most this many failures per file
    # Global cap enforced by cleanup_all_old_failures (startup) and
    # _check_global_cleanup_needed (amortized, with a 20% buffer).
    MAX_TOTAL_FAILURES = 10000  # Global limit across all files
45 def __init__(self, username: str, password: Optional[str] = None):
46 """
47 Initialize file integrity manager.
49 Args:
50 username: Username for database access
51 password: Optional password for encrypted database
53 Raises:
54 ImportError: If Flask/session_context not available
55 """
56 if not _has_session_context:
57 raise ImportError(
58 "FileIntegrityManager requires Flask and database session context. "
59 "Install Flask to use this feature."
60 )
62 self.username = username
63 self.password = password
64 self.verifiers: List[BaseFileVerifier] = []
66 # Run startup cleanup to remove old failures
67 try:
68 deleted = self.cleanup_all_old_failures()
69 if deleted > 0: 69 ↛ 70line 69 didn't jump to line 70 because the condition on line 69 was never true
70 logger.info(
71 f"[FILE_INTEGRITY] Startup cleanup: removed {deleted} old failure records"
72 )
73 except Exception as e:
74 logger.warning(f"[FILE_INTEGRITY] Startup cleanup failed: {e}")
76 def _normalize_path(self, file_path: Path) -> str:
77 """
78 Normalize path for consistent storage and lookup.
80 Resolves symlinks, makes absolute, and normalizes separators
81 to ensure the same file is always represented the same way.
83 Args:
84 file_path: Path to normalize
86 Returns:
87 Normalized path string
88 """
89 return str(file_path.resolve())
91 def register_verifier(self, verifier: BaseFileVerifier) -> None:
92 """
93 Register a file type verifier.
95 Args:
96 verifier: Verifier instance to register
97 """
98 self.verifiers.append(verifier)
99 logger.debug(
100 f"[FILE_INTEGRITY] Registered verifier for type: {verifier.get_file_type()}"
101 )
    def record_file(
        self,
        file_path: Path,
        related_entity_type: Optional[str] = None,
        related_entity_id: Optional[int] = None,
    ) -> FileIntegrityRecord:
        """
        Create or update integrity record for a file.

        Args:
            file_path: Path to file to record
            related_entity_type: Optional related entity type (e.g., 'rag_index')
            related_entity_id: Optional related entity ID

        Returns:
            FileIntegrityRecord instance

        Raises:
            FileNotFoundError: If file doesn't exist
            ValueError: If no verifier handles this file type
        """
        if not file_path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        verifier = self._get_verifier_for_file(file_path)
        if not verifier:
            raise ValueError(f"No verifier registered for file: {file_path}")

        # Calculate checksum and get file stats BEFORE opening the DB
        # session, so slow hashing doesn't hold a session open.
        checksum = verifier.calculate_checksum(file_path)
        file_stat = file_path.stat()
        normalized_path = self._normalize_path(file_path)

        with get_user_db_session(self.username, self.password) as session:
            # Check if record exists (using normalized path — the same key
            # _normalize_path produces everywhere else in this class)
            record = (
                session.query(FileIntegrityRecord)
                .filter_by(file_path=normalized_path)
                .first()
            )

            if record:
                # Update existing record: refresh the checksum baseline.
                # NOTE(review): file_type, related_entity_* and the
                # verification counters are intentionally left unchanged here.
                record.checksum = checksum
                record.file_size = file_stat.st_size
                record.file_mtime = file_stat.st_mtime
                record.algorithm = verifier.get_algorithm()
                record.updated_at = datetime.now(UTC)
                logger.info(f"[FILE_INTEGRITY] Updated record for: {file_path}")
            else:
                # Create new record with zeroed verification statistics
                record = FileIntegrityRecord(
                    file_path=normalized_path,
                    file_type=verifier.get_file_type(),
                    checksum=checksum,
                    algorithm=verifier.get_algorithm(),
                    file_size=file_stat.st_size,
                    file_mtime=file_stat.st_mtime,
                    verify_on_load=True,
                    allow_modifications=verifier.allows_modifications(),
                    related_entity_type=related_entity_type,
                    related_entity_id=related_entity_id,
                    total_verifications=0,
                    consecutive_successes=0,
                    consecutive_failures=0,
                )
                session.add(record)
                logger.info(
                    f"[FILE_INTEGRITY] Created record for: {file_path} (type: {verifier.get_file_type()})"
                )

            session.commit()
            # Refresh so the returned object carries DB-generated fields
            # (e.g. primary key) even after the session closes.
            session.refresh(record)
            return record
    def verify_file(
        self, file_path: Path, force: bool = False
    ) -> Tuple[bool, Optional[str]]:
        """
        Verify file integrity with smart checking.

        Only verifies if:
        - File modification time changed since last verification, OR
        - force=True

        Args:
            file_path: Path to file to verify
            force: Force verification even if file hasn't changed

        Returns:
            Tuple of (success, reason_if_failed)
        """
        normalized_path = self._normalize_path(file_path)

        with get_user_db_session(self.username, self.password) as session:
            record = (
                session.query(FileIntegrityRecord)
                .filter_by(file_path=normalized_path)
                .first()
            )

            if not record:
                logger.warning(
                    f"[FILE_INTEGRITY] No record found for {file_path}, creating one"
                )
                # Create record if it doesn't exist
                try:
                    # Need to commit and return since we're in a different session:
                    # record_file opens its own session, so close ours first to
                    # avoid two concurrent sessions on the same user DB.
                    session.close()
                    self.record_file(file_path)
                    # A freshly recorded file is its own baseline — trivially valid.
                    return True, None
                except Exception as e:
                    logger.exception("[FILE_INTEGRITY] Failed to create record")
                    return False, f"Failed to create integrity record: {str(e)}"

            # Check if verification needed (skip when mtime is unchanged,
            # unless the caller forces a re-check)
            if not force and not self._needs_verification(record, file_path):
                logger.debug(
                    f"[FILE_INTEGRITY] Skipping verification for {file_path} (unchanged)"
                )
                return True, None

            # Perform verification (checksum comparison)
            passed, reason = self._do_verification(record, file_path, session)

            # Update embedded statistics on the record itself
            self._update_stats(record, passed, session)

            # Log failure if needed (sparse audit trail + cleanup)
            if not passed:
                self._log_failure(
                    record, file_path, reason or "Unknown failure", session
                )

            # Single commit covers stats update and any failure row
            session.commit()

            if passed:
                logger.info(
                    f"[FILE_INTEGRITY] Verification passed: {file_path}"
                )
            else:
                logger.error(
                    f"[FILE_INTEGRITY] Verification FAILED: {file_path} - {reason}"
                )

            return passed, reason
250 def update_checksum(self, file_path: Path) -> None:
251 """
252 Update checksum after legitimate file modification.
254 Use this when you know a file was legitimately modified
255 and want to update the baseline checksum.
257 Args:
258 file_path: Path to file
260 Raises:
261 FileNotFoundError: If file doesn't exist
262 ValueError: If no record exists for file
263 """
264 if not file_path.exists():
265 raise FileNotFoundError(f"File not found: {file_path}")
267 verifier = self._get_verifier_for_file(file_path)
268 if not verifier:
269 raise ValueError(f"No verifier registered for file: {file_path}")
271 checksum = verifier.calculate_checksum(file_path)
272 file_stat = file_path.stat()
274 with get_user_db_session(self.username, self.password) as session:
275 record = (
276 session.query(FileIntegrityRecord)
277 .filter_by(file_path=str(file_path))
278 .first()
279 )
281 if not record:
282 raise ValueError(f"No integrity record exists for: {file_path}")
284 record.checksum = checksum
285 record.file_size = file_stat.st_size
286 record.file_mtime = file_stat.st_mtime
287 record.updated_at = datetime.now(UTC)
289 session.commit()
290 logger.info(f"[FILE_INTEGRITY] Updated checksum for: {file_path}")
292 def get_file_stats(self, file_path: Path) -> Optional[dict]:
293 """
294 Get verification statistics for a file.
296 Args:
297 file_path: Path to file
299 Returns:
300 Dictionary of stats or None if no record exists
301 """
302 with get_user_db_session(self.username, self.password) as session:
303 record = (
304 session.query(FileIntegrityRecord)
305 .filter_by(file_path=str(file_path))
306 .first()
307 )
309 if not record:
310 return None
312 return {
313 "total_verifications": record.total_verifications,
314 "last_verified_at": record.last_verified_at,
315 "last_verification_passed": record.last_verification_passed,
316 "consecutive_successes": record.consecutive_successes,
317 "consecutive_failures": record.consecutive_failures,
318 "file_type": record.file_type,
319 "created_at": record.created_at,
320 }
322 def get_failure_history(
323 self, file_path: Path, limit: int = 100
324 ) -> List[FileVerificationFailure]:
325 """
326 Get failure history for a file.
328 Args:
329 file_path: Path to file
330 limit: Maximum number of failures to return
332 Returns:
333 List of failure records
334 """
335 with get_user_db_session(self.username, self.password) as session:
336 record = (
337 session.query(FileIntegrityRecord)
338 .filter_by(file_path=str(file_path))
339 .first()
340 )
342 if not record:
343 return []
345 failures = (
346 session.query(FileVerificationFailure)
347 .filter_by(file_record_id=record.id)
348 .order_by(FileVerificationFailure.verified_at.desc())
349 .limit(limit)
350 .all()
351 )
353 # Detach from session
354 for f in failures:
355 session.expunge(f)
357 return failures
359 # Internal methods
361 def _get_verifier_for_file(
362 self, file_path: Path
363 ) -> Optional[BaseFileVerifier]:
364 """Find verifier that handles this file type."""
365 for verifier in self.verifiers:
366 if verifier.should_verify(file_path): 366 ↛ 365line 366 didn't jump to line 365 because the condition on line 366 was always true
367 return verifier
368 return None
370 def _needs_verification(
371 self, record: FileIntegrityRecord, file_path: Path
372 ) -> bool:
373 """
374 Check if file needs verification.
376 Only verify if file modification time changed since last verification.
377 """
378 if not file_path.exists():
379 return True # File missing needs verification
381 if not record.last_verified_at:
382 return True # Never verified
384 current_mtime = file_path.stat().st_mtime
386 # Compare with stored mtime
387 if record.file_mtime is None: 387 ↛ 388line 387 didn't jump to line 388 because the condition on line 387 was never true
388 return True # No mtime stored
390 # Verify if file was modified (allow small floating point differences)
391 return abs(current_mtime - record.file_mtime) > 0.001
393 def _do_verification(
394 self, record: FileIntegrityRecord, file_path: Path, session
395 ) -> Tuple[bool, Optional[str]]:
396 """
397 Perform actual verification.
399 Returns:
400 Tuple of (success, reason_if_failed)
401 """
402 # Check file exists
403 if not file_path.exists():
404 return False, "file_missing"
406 # Get verifier
407 verifier = self._get_verifier_for_file(file_path)
408 if not verifier:
409 return False, "no_verifier"
411 # Calculate current checksum
412 try:
413 current_checksum = verifier.calculate_checksum(file_path)
414 except Exception as e:
415 logger.exception("[FILE_INTEGRITY] Failed to calculate checksum")
416 return False, f"checksum_calculation_failed: {str(e)}"
418 # Compare checksums
419 if current_checksum != record.checksum:
420 return False, "checksum_mismatch"
422 # Update file mtime in record
423 record.file_mtime = file_path.stat().st_mtime
425 return True, None
427 def _update_stats(
428 self, record: FileIntegrityRecord, passed: bool, session
429 ) -> None:
430 """Update verification statistics."""
431 record.total_verifications += 1
432 record.last_verified_at = datetime.now(UTC)
433 record.last_verification_passed = passed
435 if passed:
436 record.consecutive_successes += 1
437 record.consecutive_failures = 0
438 else:
439 record.consecutive_failures += 1
440 record.consecutive_successes = 0
442 def _log_failure(
443 self,
444 record: FileIntegrityRecord,
445 file_path: Path,
446 reason: str,
447 session,
448 ) -> None:
449 """Log verification failure to audit trail."""
450 # Get current checksum if possible
451 actual_checksum = None
452 file_size = None
454 if file_path.exists(): 454 ↛ 463line 454 didn't jump to line 463 because the condition on line 454 was always true
455 try:
456 verifier = self._get_verifier_for_file(file_path)
457 if verifier: 457 ↛ 463line 457 didn't jump to line 463 because the condition on line 457 was always true
458 actual_checksum = verifier.calculate_checksum(file_path)
459 file_size = file_path.stat().st_size
460 except Exception:
461 pass # Checksum calculation failed, leave as None
463 failure = FileVerificationFailure(
464 file_record_id=record.id,
465 expected_checksum=record.checksum,
466 actual_checksum=actual_checksum,
467 file_size=file_size,
468 failure_reason=reason,
469 )
470 session.add(failure)
472 logger.warning(
473 f"[FILE_INTEGRITY] Logged failure for {file_path}: {reason}"
474 )
476 # Cleanup old failures for this file
477 self._cleanup_old_failures(record, session)
479 # Periodically check if global cleanup needed (every 100th file to avoid overhead)
480 if record.id % 100 == 0: 480 ↛ 481line 480 didn't jump to line 481 because the condition on line 480 was never true
481 self._check_global_cleanup_needed(session)
483 def _cleanup_old_failures(
484 self, record: FileIntegrityRecord, session
485 ) -> None:
486 """
487 Clean up old failure records to prevent unbounded growth.
489 Keeps only the most recent MAX_FAILURES_PER_FILE failures per file.
490 """
491 # Count failures for this file
492 failure_count = (
493 session.query(FileVerificationFailure)
494 .filter_by(file_record_id=record.id)
495 .count()
496 )
498 if failure_count > self.MAX_FAILURES_PER_FILE:
499 # Delete oldest failures, keeping only the most recent MAX_FAILURES_PER_FILE
500 failures_to_delete = (
501 session.query(FileVerificationFailure)
502 .filter_by(file_record_id=record.id)
503 .order_by(FileVerificationFailure.verified_at.asc())
504 .limit(failure_count - self.MAX_FAILURES_PER_FILE)
505 .all()
506 )
508 for failure in failures_to_delete:
509 session.delete(failure)
511 logger.info(
512 f"[FILE_INTEGRITY] Cleaned up {len(failures_to_delete)} old failures for file_record {record.id}"
513 )
515 def _check_global_cleanup_needed(self, session) -> None:
516 """
517 Check if global cleanup is needed and run it if threshold exceeded.
519 Only runs cleanup if failure count exceeds MAX_TOTAL_FAILURES by 20%.
520 This prevents constant cleanup while allowing some buffer.
521 """
522 threshold = int(self.MAX_TOTAL_FAILURES * 1.2) # 20% over limit
523 total_failures = session.query(FileVerificationFailure).count()
525 if total_failures > threshold:
526 logger.info(
527 f"[FILE_INTEGRITY] Global failure count ({total_failures}) exceeds threshold ({threshold}), "
528 f"running cleanup..."
529 )
531 # Delete oldest failures to get under limit
532 failures_to_delete_count = total_failures - self.MAX_TOTAL_FAILURES
534 failures_to_delete = (
535 session.query(FileVerificationFailure)
536 .order_by(FileVerificationFailure.verified_at.asc())
537 .limit(failures_to_delete_count)
538 .all()
539 )
541 for failure in failures_to_delete:
542 session.delete(failure)
544 logger.info(
545 f"[FILE_INTEGRITY] Threshold cleanup: deleted {len(failures_to_delete)} old failures"
546 )
548 def cleanup_all_old_failures(self) -> int:
549 """
550 Global cleanup of failure records across all files.
552 Enforces MAX_TOTAL_FAILURES limit by removing oldest failures.
554 Returns:
555 Number of records deleted
556 """
557 with get_user_db_session(self.username, self.password) as session:
558 total_failures = session.query(FileVerificationFailure).count()
560 if total_failures <= self.MAX_TOTAL_FAILURES: 560 ↛ 564line 560 didn't jump to line 564 because the condition on line 560 was always true
561 return 0
563 # Delete oldest failures to get under limit
564 failures_to_delete_count = total_failures - self.MAX_TOTAL_FAILURES
566 failures_to_delete = (
567 session.query(FileVerificationFailure)
568 .order_by(FileVerificationFailure.verified_at.asc())
569 .limit(failures_to_delete_count)
570 .all()
571 )
573 for failure in failures_to_delete:
574 session.delete(failure)
576 session.commit()
578 logger.info(
579 f"[FILE_INTEGRITY] Global cleanup: deleted {len(failures_to_delete)} old failures "
580 f"(total was {total_failures}, now {total_failures - len(failures_to_delete)})"
581 )
583 return len(failures_to_delete)
585 def get_total_failure_count(self) -> int:
586 """
587 Get total number of failure records across all files.
589 Returns:
590 Total count of failure records
591 """
592 with get_user_db_session(self.username, self.password) as session:
593 return session.query(FileVerificationFailure).count()