Coverage for src/local_deep_research/security/file_integrity/integrity_manager.py: 11% of 201 statements (coverage.py v7.12.0, created 2026-01-11 00:51 +0000)
1"""
2File Integrity Manager - Main service for file integrity verification.
4Provides smart verification with embedded statistics and sparse failure logging.
5"""
7from datetime import datetime, UTC
8from pathlib import Path
9from typing import Optional, Tuple, List
10from loguru import logger
12from .base_verifier import BaseFileVerifier
13from ...database.models.file_integrity import (
14 FileIntegrityRecord,
15 FileVerificationFailure,
16)
18# Import session context conditionally (requires Flask)
19try:
20 from ...database.session_context import get_user_db_session
22 _has_session_context = True
23except ImportError:
24 _has_session_context = False
25 # Provide stub for type checking
26 get_user_db_session = None # type: ignore
29class FileIntegrityManager:
30 """
31 Central service for file integrity verification.
33 Features:
34 - Smart verification (only verify if file modified)
35 - Embedded statistics (low overhead)
36 - Sparse failure logging (audit trail)
37 - Multi-verifier support (different file types)
38 - Automatic cleanup of old failure records
39 """

    # Configuration for automatic cleanup
    MAX_FAILURES_PER_FILE = 100  # Keep at most this many failures per file
    MAX_TOTAL_FAILURES = 10000  # Global limit across all files

    def __init__(self, username: str, password: Optional[str] = None):
        """
        Initialize file integrity manager.

        Args:
            username: Username for database access
            password: Optional password for encrypted database

        Raises:
            ImportError: If Flask/session_context not available
        """
        if not _has_session_context:
            raise ImportError(
                "FileIntegrityManager requires Flask and database session context. "
                "Install Flask to use this feature."
            )

        self.username = username
        self.password = password
        self.verifiers: List[BaseFileVerifier] = []

        # Run startup cleanup to remove old failures
        try:
            deleted = self.cleanup_all_old_failures()
            if deleted > 0:
                logger.info(
                    f"[FILE_INTEGRITY] Startup cleanup: removed {deleted} old failure records"
                )
        except Exception as e:
            logger.warning(f"[FILE_INTEGRITY] Startup cleanup failed: {e}")

    def _normalize_path(self, file_path: Path) -> str:
        """
        Normalize path for consistent storage and lookup.

        Resolves symlinks, makes absolute, and normalizes separators
        to ensure the same file is always represented the same way.

        Args:
            file_path: Path to normalize

        Returns:
            Normalized path string
        """
        return str(file_path.resolve())

    def register_verifier(self, verifier: BaseFileVerifier) -> None:
        """
        Register a file type verifier.

        Args:
            verifier: Verifier instance to register
        """
        self.verifiers.append(verifier)
        logger.debug(
            f"[FILE_INTEGRITY] Registered verifier for type: {verifier.get_file_type()}"
        )
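
    # Sketch of a concrete verifier. This assumes BaseFileVerifier's abstract
    # interface (defined in base_verifier.py) matches the methods this manager
    # actually calls; the class below is illustrative, not part of the package:
    #
    #     import hashlib
    #
    #     class Sha256BinVerifier(BaseFileVerifier):
    #         def get_file_type(self) -> str:
    #             return "binary"
    #
    #         def get_algorithm(self) -> str:
    #             return "sha256"
    #
    #         def should_verify(self, file_path: Path) -> bool:
    #             return file_path.suffix == ".bin"
    #
    #         def allows_modifications(self) -> bool:
    #             return False
    #
    #         def calculate_checksum(self, file_path: Path) -> str:
    #             return hashlib.sha256(file_path.read_bytes()).hexdigest()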

    def record_file(
        self,
        file_path: Path,
        related_entity_type: Optional[str] = None,
        related_entity_id: Optional[int] = None,
    ) -> FileIntegrityRecord:
        """
        Create or update integrity record for a file.

        Args:
            file_path: Path to file to record
            related_entity_type: Optional related entity type (e.g., 'rag_index')
            related_entity_id: Optional related entity ID

        Returns:
            FileIntegrityRecord instance

        Raises:
            FileNotFoundError: If file doesn't exist
            ValueError: If no verifier handles this file type
        """
        if not file_path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        verifier = self._get_verifier_for_file(file_path)
        if not verifier:
            raise ValueError(f"No verifier registered for file: {file_path}")

        # Calculate checksum and get file stats
        checksum = verifier.calculate_checksum(file_path)
        file_stat = file_path.stat()
        normalized_path = self._normalize_path(file_path)

        with get_user_db_session(self.username, self.password) as session:
            # Check if record exists (using normalized path)
            record = (
                session.query(FileIntegrityRecord)
                .filter_by(file_path=normalized_path)
                .first()
            )

            if record:
                # Update existing record
                record.checksum = checksum
                record.file_size = file_stat.st_size
                record.file_mtime = file_stat.st_mtime
                record.algorithm = verifier.get_algorithm()
                record.updated_at = datetime.now(UTC)
                logger.info(f"[FILE_INTEGRITY] Updated record for: {file_path}")
            else:
                # Create new record
                record = FileIntegrityRecord(
                    file_path=normalized_path,
                    file_type=verifier.get_file_type(),
                    checksum=checksum,
                    algorithm=verifier.get_algorithm(),
                    file_size=file_stat.st_size,
                    file_mtime=file_stat.st_mtime,
                    verify_on_load=True,
                    allow_modifications=verifier.allows_modifications(),
                    related_entity_type=related_entity_type,
                    related_entity_id=related_entity_id,
                    total_verifications=0,
                    consecutive_successes=0,
                    consecutive_failures=0,
                )
                session.add(record)
                logger.info(
                    f"[FILE_INTEGRITY] Created record for: {file_path} (type: {verifier.get_file_type()})"
                )

            session.commit()
            session.refresh(record)
            return record

    def verify_file(
        self, file_path: Path, force: bool = False
    ) -> Tuple[bool, Optional[str]]:
        """
        Verify file integrity with smart checking.

        Only verifies if:
        - File modification time changed since last verification, OR
        - force=True

        Args:
            file_path: Path to file to verify
            force: Force verification even if file hasn't changed

        Returns:
            Tuple of (success, reason_if_failed)
        """
        normalized_path = self._normalize_path(file_path)

        with get_user_db_session(self.username, self.password) as session:
            record = (
                session.query(FileIntegrityRecord)
                .filter_by(file_path=normalized_path)
                .first()
            )

            if not record:
                logger.warning(
                    f"[FILE_INTEGRITY] No record found for {file_path}, creating one"
                )
                # Create the record if it doesn't exist. Close this session
                # first, since record_file() opens its own session.
                try:
                    session.close()
                    self.record_file(file_path)
                    return True, None
                except Exception as e:
                    logger.exception(
                        f"[FILE_INTEGRITY] Failed to create record: {e}"
                    )
                    return False, f"Failed to create integrity record: {str(e)}"

            # Check if verification needed
            if not force and not self._needs_verification(record, file_path):
                logger.debug(
                    f"[FILE_INTEGRITY] Skipping verification for {file_path} (unchanged)"
                )
                return True, None

            # Perform verification
            passed, reason = self._do_verification(record, file_path, session)

            # Update statistics
            self._update_stats(record, passed, session)

            # Log failure if needed
            if not passed:
                self._log_failure(
                    record, file_path, reason or "Unknown failure", session
                )

            session.commit()

            if passed:
                logger.info(
                    f"[FILE_INTEGRITY] Verification passed: {file_path}"
                )
            else:
                logger.error(
                    f"[FILE_INTEGRITY] Verification FAILED: {file_path} - {reason}"
                )

            return passed, reason
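
    # The smart check in practice (sketch):
    #
    #     ok, _ = manager.verify_file(path)              # full checksum check
    #     ok, _ = manager.verify_file(path)              # skipped: mtime unchanged
    #     ok, _ = manager.verify_file(path, force=True)  # checksum re-verified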

    def update_checksum(self, file_path: Path) -> None:
        """
        Update checksum after legitimate file modification.

        Use this when you know a file was legitimately modified
        and want to update the baseline checksum.

        Args:
            file_path: Path to file

        Raises:
            FileNotFoundError: If file doesn't exist
            ValueError: If no record exists for file
        """
        if not file_path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        verifier = self._get_verifier_for_file(file_path)
        if not verifier:
            raise ValueError(f"No verifier registered for file: {file_path}")

        checksum = verifier.calculate_checksum(file_path)
        file_stat = file_path.stat()

        with get_user_db_session(self.username, self.password) as session:
            # Look up by normalized path, matching how records are stored
            record = (
                session.query(FileIntegrityRecord)
                .filter_by(file_path=self._normalize_path(file_path))
                .first()
            )

            if not record:
                raise ValueError(f"No integrity record exists for: {file_path}")

            record.checksum = checksum
            record.file_size = file_stat.st_size
            record.file_mtime = file_stat.st_mtime
            record.updated_at = datetime.now(UTC)

            session.commit()
            logger.info(f"[FILE_INTEGRITY] Updated checksum for: {file_path}")
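
    # Example (sketch): re-baseline right after an intentional rewrite so the
    # next verify_file() does not flag the change:
    #
    #     path.write_bytes(new_contents)  # legitimate modification
    #     manager.update_checksum(path)   # record the new checksum and mtime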

    def get_file_stats(self, file_path: Path) -> Optional[dict]:
        """
        Get verification statistics for a file.

        Args:
            file_path: Path to file

        Returns:
            Dictionary of stats or None if no record exists
        """
        with get_user_db_session(self.username, self.password) as session:
            # Look up by normalized path, matching how records are stored
            record = (
                session.query(FileIntegrityRecord)
                .filter_by(file_path=self._normalize_path(file_path))
                .first()
            )

            if not record:
                return None

            return {
                "total_verifications": record.total_verifications,
                "last_verified_at": record.last_verified_at,
                "last_verification_passed": record.last_verification_passed,
                "consecutive_successes": record.consecutive_successes,
                "consecutive_failures": record.consecutive_failures,
                "file_type": record.file_type,
                "created_at": record.created_at,
            }
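
    # Example of consuming the stats dict (sketch; the alerting helper is
    # hypothetical):
    #
    #     stats = manager.get_file_stats(path)
    #     if stats and stats["consecutive_failures"] >= 3:
    #         notify_admin(path, stats)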

    def get_failure_history(
        self, file_path: Path, limit: int = 100
    ) -> List[FileVerificationFailure]:
        """
        Get failure history for a file.

        Args:
            file_path: Path to file
            limit: Maximum number of failures to return

        Returns:
            List of failure records
        """
        with get_user_db_session(self.username, self.password) as session:
            # Look up by normalized path, matching how records are stored
            record = (
                session.query(FileIntegrityRecord)
                .filter_by(file_path=self._normalize_path(file_path))
                .first()
            )

            if not record:
                return []

            failures = (
                session.query(FileVerificationFailure)
                .filter_by(file_record_id=record.id)
                .order_by(FileVerificationFailure.verified_at.desc())
                .limit(limit)
                .all()
            )

            # Detach from session so the records stay usable after it closes
            for f in failures:
                session.expunge(f)

            return failures
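
    # Inspecting the audit trail (sketch):
    #
    #     for failure in manager.get_failure_history(path, limit=10):
    #         print(failure.verified_at, failure.failure_reason,
    #               failure.expected_checksum, failure.actual_checksum)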

    # Internal methods

    def _get_verifier_for_file(
        self, file_path: Path
    ) -> Optional[BaseFileVerifier]:
        """Find the verifier that handles this file type."""
        for verifier in self.verifiers:
            if verifier.should_verify(file_path):
                return verifier
        return None

    def _needs_verification(
        self, record: FileIntegrityRecord, file_path: Path
    ) -> bool:
        """
        Check if a file needs verification.

        Only verify if the file modification time changed since the last
        verification.
        """
        if not file_path.exists():
            return True  # A missing file needs verification

        if not record.last_verified_at:
            return True  # Never verified

        current_mtime = file_path.stat().st_mtime

        # Compare with stored mtime
        if record.file_mtime is None:
            return True  # No mtime stored

        # Verify if file was modified (allow small floating point differences)
        return abs(current_mtime - record.file_mtime) > 0.001
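
    # Worked example: an mtime stored as 1700000000.1230 and re-read as
    # 1700000000.1234 differs by 0.0004 s, within the 0.001 s tolerance,
    # so the file counts as unchanged and the checksum pass is skipped.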

    def _do_verification(
        self, record: FileIntegrityRecord, file_path: Path, session
    ) -> Tuple[bool, Optional[str]]:
        """
        Perform actual verification.

        Returns:
            Tuple of (success, reason_if_failed)
        """
        # Check file exists
        if not file_path.exists():
            return False, "file_missing"

        # Get verifier
        verifier = self._get_verifier_for_file(file_path)
        if not verifier:
            return False, "no_verifier"

        # Calculate current checksum
        try:
            current_checksum = verifier.calculate_checksum(file_path)
        except Exception as e:
            logger.exception(
                f"[FILE_INTEGRITY] Failed to calculate checksum: {e}"
            )
            return False, f"checksum_calculation_failed: {str(e)}"

        # Compare checksums
        if current_checksum != record.checksum:
            return False, "checksum_mismatch"

        # Update file mtime in record
        record.file_mtime = file_path.stat().st_mtime

        return True, None

    def _update_stats(
        self, record: FileIntegrityRecord, passed: bool, session
    ) -> None:
        """Update verification statistics."""
        record.total_verifications += 1
        record.last_verified_at = datetime.now(UTC)
        record.last_verification_passed = passed

        if passed:
            record.consecutive_successes += 1
            record.consecutive_failures = 0
        else:
            record.consecutive_failures += 1
            record.consecutive_successes = 0

    def _log_failure(
        self,
        record: FileIntegrityRecord,
        file_path: Path,
        reason: str,
        session,
    ) -> None:
        """Log verification failure to audit trail."""
        # Get current checksum if possible
        actual_checksum = None
        file_size = None

        if file_path.exists():
            try:
                verifier = self._get_verifier_for_file(file_path)
                if verifier:
                    actual_checksum = verifier.calculate_checksum(file_path)
                file_size = file_path.stat().st_size
            except Exception:
                pass  # Checksum calculation failed, leave as None

        failure = FileVerificationFailure(
            file_record_id=record.id,
            expected_checksum=record.checksum,
            actual_checksum=actual_checksum,
            file_size=file_size,
            failure_reason=reason,
        )
        session.add(failure)

        logger.warning(
            f"[FILE_INTEGRITY] Logged failure for {file_path}: {reason}"
        )

        # Cleanup old failures for this file
        self._cleanup_old_failures(record, session)

        # Periodically check if global cleanup is needed (only for record ids
        # divisible by 100, roughly 1 in 100 records, to avoid overhead)
        if record.id % 100 == 0:
            self._check_global_cleanup_needed(session)

    def _cleanup_old_failures(
        self, record: FileIntegrityRecord, session
    ) -> None:
        """
        Clean up old failure records to prevent unbounded growth.

        Keeps only the most recent MAX_FAILURES_PER_FILE failures per file.
        """
        # Count failures for this file
        failure_count = (
            session.query(FileVerificationFailure)
            .filter_by(file_record_id=record.id)
            .count()
        )

        if failure_count > self.MAX_FAILURES_PER_FILE:
            # Delete oldest failures, keeping only the most recent MAX_FAILURES_PER_FILE
            failures_to_delete = (
                session.query(FileVerificationFailure)
                .filter_by(file_record_id=record.id)
                .order_by(FileVerificationFailure.verified_at.asc())
                .limit(failure_count - self.MAX_FAILURES_PER_FILE)
                .all()
            )

            for failure in failures_to_delete:
                session.delete(failure)

            logger.info(
                f"[FILE_INTEGRITY] Cleaned up {len(failures_to_delete)} old failures for file_record {record.id}"
            )

    def _check_global_cleanup_needed(self, session) -> None:
        """
        Check whether global cleanup is needed and run it if the threshold
        is exceeded.

        Only runs cleanup if the failure count exceeds MAX_TOTAL_FAILURES
        by 20%. This prevents constant cleanup while allowing some buffer.
        """
        threshold = int(self.MAX_TOTAL_FAILURES * 1.2)  # 20% over limit
        total_failures = session.query(FileVerificationFailure).count()

        if total_failures > threshold:
            logger.info(
                f"[FILE_INTEGRITY] Global failure count ({total_failures}) exceeds threshold ({threshold}), "
                f"running cleanup..."
            )

            # Delete oldest failures to get under limit
            failures_to_delete_count = total_failures - self.MAX_TOTAL_FAILURES

            failures_to_delete = (
                session.query(FileVerificationFailure)
                .order_by(FileVerificationFailure.verified_at.asc())
                .limit(failures_to_delete_count)
                .all()
            )

            for failure in failures_to_delete:
                session.delete(failure)

            logger.info(
                f"[FILE_INTEGRITY] Threshold cleanup: deleted {len(failures_to_delete)} old failures"
            )
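
    # Worked example: with MAX_TOTAL_FAILURES = 10000 the trigger threshold is
    # int(10000 * 1.2) = 12000. At 12001 rows, cleanup deletes
    # 12001 - 10000 = 2001 of the oldest failures, leaving exactly 10000.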

    def cleanup_all_old_failures(self) -> int:
        """
        Global cleanup of failure records across all files.

        Enforces MAX_TOTAL_FAILURES limit by removing oldest failures.

        Returns:
            Number of records deleted
        """
        with get_user_db_session(self.username, self.password) as session:
            total_failures = session.query(FileVerificationFailure).count()

            if total_failures <= self.MAX_TOTAL_FAILURES:
                return 0

            # Delete oldest failures to get under limit
            failures_to_delete_count = total_failures - self.MAX_TOTAL_FAILURES

            failures_to_delete = (
                session.query(FileVerificationFailure)
                .order_by(FileVerificationFailure.verified_at.asc())
                .limit(failures_to_delete_count)
                .all()
            )

            for failure in failures_to_delete:
                session.delete(failure)

            session.commit()

            logger.info(
                f"[FILE_INTEGRITY] Global cleanup: deleted {len(failures_to_delete)} old failures "
                f"(total was {total_failures}, now {total_failures - len(failures_to_delete)})"
            )

            return len(failures_to_delete)

    def get_total_failure_count(self) -> int:
        """
        Get total number of failure records across all files.

        Returns:
            Total count of failure records
        """
        with get_user_db_session(self.username, self.password) as session:
            return session.query(FileVerificationFailure).count()