Coverage for src / local_deep_research / security / file_integrity / integrity_manager.py: 98%
200 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""
2File Integrity Manager - Main service for file integrity verification.
4Provides smart verification with embedded statistics and sparse failure logging.
5"""
7from datetime import datetime, UTC
8from pathlib import Path
9from typing import Optional, Tuple, List
10from loguru import logger
12from .base_verifier import BaseFileVerifier
13from ...database.models.file_integrity import (
14 FileIntegrityRecord,
15 FileVerificationFailure,
16)
18# Import session context conditionally (requires Flask)
19try:
20 from ...database.session_context import get_user_db_session
22 _has_session_context = True
23except ImportError:
24 _has_session_context = False
25 # Provide stub for type checking
26 get_user_db_session = None # type: ignore
class FileIntegrityManager:
    """
    Central service for file integrity verification.

    Features:
    - Smart verification (only verify if file modified)
    - Embedded statistics (low overhead)
    - Sparse failure logging (audit trail)
    - Multi-verifier support (different file types)
    - Automatic cleanup of old failure records
    """

    # Configuration for automatic cleanup
    MAX_FAILURES_PER_FILE = 100  # Keep at most this many failures per file
    MAX_TOTAL_FAILURES = 10000  # Global limit across all files

    def __init__(self, username: str, password: Optional[str] = None):
        """
        Initialize file integrity manager.

        Args:
            username: Username for database access
            password: Optional password for encrypted database

        Raises:
            ImportError: If Flask/session_context not available
        """
        if not _has_session_context:
            raise ImportError(
                "FileIntegrityManager requires Flask and database session context. "
                "Install Flask to use this feature."
            )

        self.username = username
        self.password = password
        self.verifiers: List[BaseFileVerifier] = []

        # Run startup cleanup to remove old failures (best-effort: a
        # cleanup failure must never prevent the manager from starting)
        try:
            deleted = self.cleanup_all_old_failures()
            if deleted > 0:
                logger.info(
                    f"[FILE_INTEGRITY] Startup cleanup: removed {deleted} old failure records"
                )
        except Exception:
            logger.warning("[FILE_INTEGRITY] Startup cleanup failed")

    def _normalize_path(self, file_path: Path) -> str:
        """
        Normalize path for consistent storage and lookup.

        Resolves symlinks, makes absolute, and normalizes separators
        to ensure the same file is always represented the same way.
        All database reads and writes of ``file_path`` MUST go through
        this method, otherwise a record stored under the resolved path
        will not be found when queried with a relative/symlinked path.

        Args:
            file_path: Path to normalize

        Returns:
            Normalized path string
        """
        return str(file_path.resolve())

    def register_verifier(self, verifier: BaseFileVerifier) -> None:
        """
        Register a file type verifier.

        Args:
            verifier: Verifier instance to register
        """
        self.verifiers.append(verifier)
        logger.debug(
            f"[FILE_INTEGRITY] Registered verifier for type: {verifier.get_file_type()}"
        )

    def record_file(
        self,
        file_path: Path,
        related_entity_type: Optional[str] = None,
        related_entity_id: Optional[int] = None,
    ) -> FileIntegrityRecord:
        """
        Create or update integrity record for a file.

        Args:
            file_path: Path to file to record
            related_entity_type: Optional related entity type (e.g., 'rag_index')
            related_entity_id: Optional related entity ID

        Returns:
            FileIntegrityRecord instance

        Raises:
            FileNotFoundError: If file doesn't exist
            ValueError: If no verifier handles this file type
        """
        if not file_path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        verifier = self._get_verifier_for_file(file_path)
        if not verifier:
            raise ValueError(f"No verifier registered for file: {file_path}")

        # Calculate checksum and get file stats before opening the session
        checksum = verifier.calculate_checksum(file_path)
        file_stat = file_path.stat()
        normalized_path = self._normalize_path(file_path)

        with get_user_db_session(self.username, self.password) as session:
            # Check if record exists (using normalized path)
            record = (
                session.query(FileIntegrityRecord)
                .filter_by(file_path=normalized_path)
                .first()
            )

            if record:
                # Update existing record with the new baseline
                record.checksum = checksum
                record.file_size = file_stat.st_size
                record.file_mtime = file_stat.st_mtime
                record.algorithm = verifier.get_algorithm()
                record.updated_at = datetime.now(UTC)
                logger.info(f"[FILE_INTEGRITY] Updated record for: {file_path}")
            else:
                # Create new record
                record = FileIntegrityRecord(
                    file_path=normalized_path,
                    file_type=verifier.get_file_type(),
                    checksum=checksum,
                    algorithm=verifier.get_algorithm(),
                    file_size=file_stat.st_size,
                    file_mtime=file_stat.st_mtime,
                    verify_on_load=True,
                    allow_modifications=verifier.allows_modifications(),
                    related_entity_type=related_entity_type,
                    related_entity_id=related_entity_id,
                    total_verifications=0,
                    consecutive_successes=0,
                    consecutive_failures=0,
                )
                session.add(record)
                logger.info(
                    f"[FILE_INTEGRITY] Created record for: {file_path} (type: {verifier.get_file_type()})"
                )

            session.commit()
            session.refresh(record)
            return record

    def verify_file(
        self, file_path: Path, force: bool = False
    ) -> Tuple[bool, Optional[str]]:
        """
        Verify file integrity with smart checking.

        Only verifies if:
        - File modification time changed since last verification, OR
        - force=True

        Args:
            file_path: Path to file to verify
            force: Force verification even if file hasn't changed

        Returns:
            Tuple of (success, reason_if_failed)
        """
        normalized_path = self._normalize_path(file_path)

        with get_user_db_session(self.username, self.password) as session:
            record = (
                session.query(FileIntegrityRecord)
                .filter_by(file_path=normalized_path)
                .first()
            )

            if not record:
                # Unknown file: create a baseline record rather than fail.
                logger.warning(
                    f"[FILE_INTEGRITY] No record found for {file_path}, creating one"
                )
                try:
                    self.record_file(file_path)
                    return True, None
                except Exception as e:
                    logger.exception("[FILE_INTEGRITY] Failed to create record")
                    return False, f"Failed to create integrity record: {str(e)}"

            # Check if verification needed (skip when mtime is unchanged)
            if not force and not self._needs_verification(record, file_path):
                logger.debug(
                    f"[FILE_INTEGRITY] Skipping verification for {file_path} (unchanged)"
                )
                return True, None

            # Perform verification
            passed, reason = self._do_verification(record, file_path, session)

            # Update embedded statistics
            self._update_stats(record, passed, session)

            # Log failure to the sparse audit trail if needed
            if not passed:
                self._log_failure(
                    record, file_path, reason or "Unknown failure", session
                )

            session.commit()

            if passed:
                logger.info(
                    f"[FILE_INTEGRITY] Verification passed: {file_path}"
                )
            else:
                logger.error(
                    f"[FILE_INTEGRITY] Verification FAILED: {file_path} - {reason}"
                )

            return passed, reason

    def update_checksum(self, file_path: Path) -> None:
        """
        Update checksum after legitimate file modification.

        Use this when you know a file was legitimately modified
        and want to update the baseline checksum.

        Args:
            file_path: Path to file

        Raises:
            FileNotFoundError: If file doesn't exist
            ValueError: If no record exists for file
        """
        if not file_path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        verifier = self._get_verifier_for_file(file_path)
        if not verifier:
            raise ValueError(f"No verifier registered for file: {file_path}")

        checksum = verifier.calculate_checksum(file_path)
        file_stat = file_path.stat()

        with get_user_db_session(self.username, self.password) as session:
            # BUG FIX: look up by the normalized path (records are stored
            # normalized by record_file); a raw str(file_path) lookup missed
            # records for relative or symlinked paths.
            record = (
                session.query(FileIntegrityRecord)
                .filter_by(file_path=self._normalize_path(file_path))
                .first()
            )

            if not record:
                raise ValueError(f"No integrity record exists for: {file_path}")

            record.checksum = checksum
            record.file_size = file_stat.st_size
            record.file_mtime = file_stat.st_mtime
            # Keep algorithm in sync with the verifier, matching record_file()
            record.algorithm = verifier.get_algorithm()
            record.updated_at = datetime.now(UTC)

            session.commit()
            logger.info(f"[FILE_INTEGRITY] Updated checksum for: {file_path}")

    def get_file_stats(self, file_path: Path) -> Optional[dict]:
        """
        Get verification statistics for a file.

        Args:
            file_path: Path to file

        Returns:
            Dictionary of stats or None if no record exists
        """
        with get_user_db_session(self.username, self.password) as session:
            # BUG FIX: use the normalized path for lookup so relative or
            # symlinked paths find the stored record.
            record = (
                session.query(FileIntegrityRecord)
                .filter_by(file_path=self._normalize_path(file_path))
                .first()
            )

            if not record:
                return None

            # Return plain values (not the ORM object) so callers are safe
            # after the session closes.
            return {
                "total_verifications": record.total_verifications,
                "last_verified_at": record.last_verified_at,
                "last_verification_passed": record.last_verification_passed,
                "consecutive_successes": record.consecutive_successes,
                "consecutive_failures": record.consecutive_failures,
                "file_type": record.file_type,
                "created_at": record.created_at,
            }

    def get_failure_history(
        self, file_path: Path, limit: int = 100
    ) -> List[FileVerificationFailure]:
        """
        Get failure history for a file.

        Args:
            file_path: Path to file
            limit: Maximum number of failures to return

        Returns:
            List of failure records (most recent first), detached from
            the session
        """
        with get_user_db_session(self.username, self.password) as session:
            # BUG FIX: use the normalized path for lookup so relative or
            # symlinked paths find the stored record.
            record = (
                session.query(FileIntegrityRecord)
                .filter_by(file_path=self._normalize_path(file_path))
                .first()
            )

            if not record:
                return []

            failures = (
                session.query(FileVerificationFailure)
                .filter_by(file_record_id=record.id)
                .order_by(FileVerificationFailure.verified_at.desc())
                .limit(limit)
                .all()
            )

            # Detach from session so results stay usable after close
            for f in failures:
                session.expunge(f)

            return failures

    # Internal methods

    def _get_verifier_for_file(
        self, file_path: Path
    ) -> Optional[BaseFileVerifier]:
        """Find the first registered verifier that handles this file type."""
        for verifier in self.verifiers:
            if verifier.should_verify(file_path):
                return verifier
        return None

    def _needs_verification(
        self, record: FileIntegrityRecord, file_path: Path
    ) -> bool:
        """
        Check if file needs verification.

        Only verify if file modification time changed since last verification.
        """
        if not file_path.exists():
            return True  # File missing needs verification

        if not record.last_verified_at:
            return True  # Never verified

        current_mtime = file_path.stat().st_mtime

        # Compare with stored mtime
        if record.file_mtime is None:
            return True  # No mtime stored

        # Verify if file was modified (allow small floating point differences)
        return abs(current_mtime - record.file_mtime) > 0.001

    def _do_verification(
        self, record: FileIntegrityRecord, file_path: Path, session
    ) -> Tuple[bool, Optional[str]]:
        """
        Perform actual verification.

        Returns:
            Tuple of (success, reason_if_failed)
        """
        # Check file exists
        if not file_path.exists():
            return False, "file_missing"

        # Get verifier
        verifier = self._get_verifier_for_file(file_path)
        if not verifier:
            return False, "no_verifier"

        # Calculate current checksum
        try:
            current_checksum = verifier.calculate_checksum(file_path)
        except Exception as e:
            logger.exception("[FILE_INTEGRITY] Failed to calculate checksum")
            return False, f"checksum_calculation_failed: {str(e)}"

        # Compare checksums
        if current_checksum != record.checksum:
            return False, "checksum_mismatch"

        # Update stored mtime so the next call can skip an unchanged file
        record.file_mtime = file_path.stat().st_mtime

        return True, None

    def _update_stats(
        self, record: FileIntegrityRecord, passed: bool, session
    ) -> None:
        """Update embedded verification statistics on the record."""
        record.total_verifications += 1
        record.last_verified_at = datetime.now(UTC)
        record.last_verification_passed = passed

        if passed:
            record.consecutive_successes += 1
            record.consecutive_failures = 0
        else:
            record.consecutive_failures += 1
            record.consecutive_successes = 0

    def _log_failure(
        self,
        record: FileIntegrityRecord,
        file_path: Path,
        reason: str,
        session,
    ) -> None:
        """Log verification failure to audit trail."""
        # Capture the actual checksum/size at failure time if possible
        actual_checksum = None
        file_size = None

        if file_path.exists():
            try:
                verifier = self._get_verifier_for_file(file_path)
                if verifier:
                    actual_checksum = verifier.calculate_checksum(file_path)
                    file_size = file_path.stat().st_size
            except Exception:
                # Best-effort: the failure is still logged without the
                # actual checksum.
                logger.debug(
                    "Checksum calculation failed for {}",
                    file_path,
                    exc_info=True,
                )

        failure = FileVerificationFailure(
            file_record_id=record.id,
            expected_checksum=record.checksum,
            actual_checksum=actual_checksum,
            file_size=file_size,
            failure_reason=reason,
        )
        session.add(failure)

        logger.warning(
            f"[FILE_INTEGRITY] Logged failure for {file_path}: {reason}"
        )

        # Cleanup old failures for this file
        self._cleanup_old_failures(record, session)

        # Periodically check if global cleanup needed (every 100th file to avoid overhead)
        if record.id % 100 == 0:
            self._check_global_cleanup_needed(session)

    def _cleanup_old_failures(
        self, record: FileIntegrityRecord, session
    ) -> None:
        """
        Clean up old failure records to prevent unbounded growth.

        Keeps only the most recent MAX_FAILURES_PER_FILE failures per file.
        """
        # Count failures for this file
        failure_count = (
            session.query(FileVerificationFailure)
            .filter_by(file_record_id=record.id)
            .count()
        )

        if failure_count > self.MAX_FAILURES_PER_FILE:
            # Delete oldest failures, keeping only the most recent MAX_FAILURES_PER_FILE
            failures_to_delete = (
                session.query(FileVerificationFailure)
                .filter_by(file_record_id=record.id)
                .order_by(FileVerificationFailure.verified_at.asc())
                .limit(failure_count - self.MAX_FAILURES_PER_FILE)
                .all()
            )

            for failure in failures_to_delete:
                session.delete(failure)

            logger.info(
                f"[FILE_INTEGRITY] Cleaned up {len(failures_to_delete)} old failures for file_record {record.id}"
            )

    def _check_global_cleanup_needed(self, session) -> None:
        """
        Check if global cleanup is needed and run it if threshold exceeded.

        Only runs cleanup if failure count exceeds MAX_TOTAL_FAILURES by 20%.
        This prevents constant cleanup while allowing some buffer.
        """
        threshold = int(self.MAX_TOTAL_FAILURES * 1.2)  # 20% over limit
        total_failures = session.query(FileVerificationFailure).count()

        if total_failures > threshold:
            logger.info(
                f"[FILE_INTEGRITY] Global failure count ({total_failures}) exceeds threshold ({threshold}), "
                f"running cleanup..."
            )

            # Delete oldest failures to get under limit
            failures_to_delete_count = total_failures - self.MAX_TOTAL_FAILURES

            failures_to_delete = (
                session.query(FileVerificationFailure)
                .order_by(FileVerificationFailure.verified_at.asc())
                .limit(failures_to_delete_count)
                .all()
            )

            for failure in failures_to_delete:
                session.delete(failure)

            logger.info(
                f"[FILE_INTEGRITY] Threshold cleanup: deleted {len(failures_to_delete)} old failures"
            )

    def cleanup_all_old_failures(self) -> int:
        """
        Global cleanup of failure records across all files.

        Enforces MAX_TOTAL_FAILURES limit by removing oldest failures.

        Returns:
            Number of records deleted
        """
        with get_user_db_session(self.username, self.password) as session:
            total_failures = session.query(FileVerificationFailure).count()

            if total_failures <= self.MAX_TOTAL_FAILURES:
                return 0

            # Delete oldest failures to get under limit
            failures_to_delete_count = total_failures - self.MAX_TOTAL_FAILURES

            failures_to_delete = (
                session.query(FileVerificationFailure)
                .order_by(FileVerificationFailure.verified_at.asc())
                .limit(failures_to_delete_count)
                .all()
            )

            for failure in failures_to_delete:
                session.delete(failure)

            session.commit()

            logger.info(
                f"[FILE_INTEGRITY] Global cleanup: deleted {len(failures_to_delete)} old failures "
                f"(total was {total_failures}, now {total_failures - len(failures_to_delete)})"
            )

            return len(failures_to_delete)

    def get_total_failure_count(self) -> int:
        """
        Get total number of failure records across all files.

        Returns:
            Total count of failure records
        """
        with get_user_db_session(self.username, self.password) as session:
            return session.query(FileVerificationFailure).count()