Coverage for src / local_deep_research / research_library / deletion / utils / cascade_helper.py: 96%
127 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""
2Cascade helper for deletion operations.
4Handles cleanup of related records that don't have proper FK constraints:
5- DocumentChunk (source_id has no FK constraint)
6- FAISS index files
7- Filesystem files
8"""
10from pathlib import Path
11from typing import Dict, List, Optional, Any
13from loguru import logger
14from sqlalchemy.orm import Session
16from ....constants import FILE_PATH_SENTINELS
17from ....database.models.library import (
18 Document,
19 DocumentBlob,
20 DocumentChunk,
21 DocumentCollection,
22 RAGIndex,
23)
24from ....database.models.download_tracker import DownloadTracker
class CascadeHelper:
    """Helper class for cleaning up related records during deletion."""

    @staticmethod
    def delete_document_chunks(
        session: Session,
        document_id: str,
        collection_name: Optional[str] = None,
    ) -> int:
        """
        Delete DocumentChunks belonging to a document.

        DocumentChunk.source_id carries no FK constraint, so chunks must be
        removed by hand whenever the owning document is deleted.

        Args:
            session: Database session
            document_id: The document ID to delete chunks for
            collection_name: Optional collection name to limit deletion scope

        Returns:
            Number of chunks deleted
        """
        chunk_query = session.query(DocumentChunk).filter(
            DocumentChunk.source_id == document_id,
            DocumentChunk.source_type == "document",
        )

        # Narrow to a single collection when one was requested.
        if collection_name:
            chunk_query = chunk_query.filter(
                DocumentChunk.collection_name == collection_name
            )

        deleted = chunk_query.delete(synchronize_session=False)

        scope = f" in collection {collection_name}" if collection_name else ""
        logger.debug(
            f"Deleted {deleted} chunks for document {document_id[:8]}..." + scope
        )
        return deleted
67 @staticmethod
68 def delete_collection_chunks(
69 session: Session,
70 collection_name: str,
71 ) -> int:
72 """
73 Delete all DocumentChunks for a collection.
75 Args:
76 session: Database session
77 collection_name: The collection name (e.g., "collection_<uuid>")
79 Returns:
80 Number of chunks deleted
81 """
82 count = (
83 session.query(DocumentChunk)
84 .filter_by(collection_name=collection_name)
85 .delete(synchronize_session=False)
86 )
87 logger.debug(f"Deleted {count} chunks for collection {collection_name}")
88 return count
90 @staticmethod
91 def get_document_blob_size(session: Session, document_id: str) -> int:
92 """
93 Get the size of a document's blob in bytes.
95 Args:
96 session: Database session
97 document_id: The document ID
99 Returns:
100 Size in bytes, or 0 if no blob exists
101 """
102 blob = (
103 session.query(DocumentBlob)
104 .filter_by(document_id=document_id)
105 .first()
106 )
107 if blob and blob.pdf_binary:
108 return len(blob.pdf_binary)
109 return 0
111 @staticmethod
112 def delete_document_blob(session: Session, document_id: str) -> int:
113 """
114 Delete a document's blob record.
116 Note: This is typically handled by CASCADE, but can be called explicitly
117 for blob-only deletion.
119 Args:
120 session: Database session
121 document_id: The document ID
123 Returns:
124 Size of deleted blob in bytes
125 """
126 blob = (
127 session.query(DocumentBlob)
128 .filter_by(document_id=document_id)
129 .first()
130 )
131 if blob:
132 size = len(blob.pdf_binary) if blob.pdf_binary else 0
133 session.delete(blob)
134 logger.debug(
135 f"Deleted blob for document {document_id[:8]}... ({size} bytes)"
136 )
137 return size
138 return 0
140 @staticmethod
141 def delete_filesystem_file(file_path: Optional[str]) -> bool:
142 """
143 Delete a file from the filesystem.
145 Args:
146 file_path: Path to the file (can be relative or absolute)
148 Returns:
149 True if file was deleted, False otherwise
150 """
151 if not file_path:
152 return False
154 # Skip special path markers
155 if file_path in FILE_PATH_SENTINELS:
156 return False
158 try:
159 path = Path(file_path)
160 if path.is_file():
161 path.unlink()
162 logger.debug(f"Deleted filesystem file: {file_path}")
163 return True
164 except Exception:
165 logger.exception(f"Failed to delete filesystem file: {file_path}")
166 return False
168 @staticmethod
169 def delete_faiss_index_files(index_path: Optional[str]) -> bool:
170 """
171 Delete FAISS index files.
173 FAISS stores indices as .faiss and .pkl files.
175 Args:
176 index_path: Path to the FAISS index file (without extension)
178 Returns:
179 True if files were deleted, False otherwise
180 """
181 if not index_path:
182 return False
184 try:
185 path = Path(index_path)
186 deleted_any = False
188 # FAISS index file
189 faiss_file = path.with_suffix(".faiss")
190 if faiss_file.is_file():
191 faiss_file.unlink()
192 logger.debug(f"Deleted FAISS index file: {faiss_file}")
193 deleted_any = True
195 # Pickle file for metadata
196 pkl_file = path.with_suffix(".pkl")
197 if pkl_file.is_file():
198 pkl_file.unlink()
199 logger.debug(f"Deleted FAISS pkl file: {pkl_file}")
200 deleted_any = True
202 return deleted_any
203 except Exception:
204 logger.exception(f"Failed to delete FAISS files for: {index_path}")
205 return False
207 @staticmethod
208 def delete_rag_indices_for_collection(
209 session: Session,
210 collection_name: str,
211 ) -> Dict[str, Any]:
212 """
213 Delete RAGIndex records and their FAISS files for a collection.
215 Args:
216 session: Database session
217 collection_name: The collection name (e.g., "collection_<uuid>")
219 Returns:
220 Dict with deletion results
221 """
222 indices = (
223 session.query(RAGIndex)
224 .filter_by(collection_name=collection_name)
225 .all()
226 )
228 deleted_indices = 0
229 deleted_files = 0
231 for index in indices:
232 # Delete FAISS files
233 if CascadeHelper.delete_faiss_index_files(str(index.index_path)):
234 deleted_files += 1
236 session.delete(index)
237 deleted_indices += 1
239 logger.debug(
240 f"Deleted {deleted_indices} RAGIndex records and {deleted_files} "
241 f"FAISS files for collection {collection_name}"
242 )
244 return {
245 "deleted_indices": deleted_indices,
246 "deleted_files": deleted_files,
247 }
249 @staticmethod
250 def update_download_tracker(
251 session: Session,
252 document: Document,
253 ) -> bool:
254 """
255 Update DownloadTracker when a document is deleted.
257 The FK has SET NULL, but we also need to update is_downloaded flag.
259 Args:
260 session: Database session
261 document: The document being deleted
263 Returns:
264 True if tracker was updated
265 """
266 if not document.original_url:
267 return False
269 # Get URL hash using the same method as library_service
270 from ...utils import get_url_hash
272 try:
273 url_hash = get_url_hash(str(document.original_url))
274 tracker = (
275 session.query(DownloadTracker)
276 .filter_by(url_hash=url_hash)
277 .first()
278 )
280 if tracker:
281 tracker.is_downloaded = False # type: ignore[assignment]
282 tracker.file_path = None # type: ignore[assignment]
283 logger.debug(
284 f"Updated DownloadTracker for document {document.id[:8]}..."
285 )
286 return True
287 except Exception:
288 logger.exception("Failed to update DownloadTracker")
289 return False
291 @staticmethod
292 def count_document_in_collections(
293 session: Session,
294 document_id: str,
295 ) -> int:
296 """
297 Count how many collections a document is in.
299 Args:
300 session: Database session
301 document_id: The document ID
303 Returns:
304 Number of collections the document is in
305 """
306 return (
307 session.query(DocumentCollection)
308 .filter_by(document_id=document_id)
309 .count()
310 )
312 @staticmethod
313 def get_document_collections(
314 session: Session,
315 document_id: str,
316 ) -> List[str]:
317 """
318 Get all collection IDs a document belongs to.
320 Args:
321 session: Database session
322 document_id: The document ID
324 Returns:
325 List of collection IDs
326 """
327 doc_collections = (
328 session.query(DocumentCollection.collection_id)
329 .filter_by(document_id=document_id)
330 .all()
331 )
332 return [dc.collection_id for dc in doc_collections]
334 @staticmethod
335 def remove_from_faiss_index(
336 username: str,
337 collection_name: str,
338 chunk_ids: List[str],
339 ) -> bool:
340 """
341 Remove specific chunks from a FAISS index.
343 Args:
344 username: Username for RAG service
345 collection_name: Collection name
346 chunk_ids: List of chunk IDs to remove
348 Returns:
349 True if successful
350 """
351 try:
352 from ...services.library_rag_service import LibraryRAGService
354 with LibraryRAGService(
355 username=username,
356 ) as rag_service:
357 # This uses the existing remove functionality
358 if ( 358 ↛ 362line 358 didn't jump to line 362 because the condition on line 358 was never true
359 hasattr(rag_service, "faiss_index")
360 and rag_service.faiss_index
361 ):
362 if hasattr(rag_service.faiss_index, "delete"):
363 rag_service.faiss_index.delete(chunk_ids)
364 return True
365 except Exception:
366 logger.exception("Failed to remove chunks from FAISS index")
367 return False
369 @staticmethod
370 def delete_document_completely(
371 session: Session,
372 document_id: str,
373 ) -> bool:
374 """
375 Delete a document and all related records using query-based deletes.
377 This avoids ORM cascade issues where SQLAlchemy tries to set
378 DocumentBlob.document_id to NULL (which fails because it's a PK).
380 Deletes in order:
381 1. DocumentBlob
382 2. DocumentCollection links
383 3. Document itself
385 Note: DocumentChunks should be deleted separately before calling this,
386 as they may need collection-specific handling.
388 Args:
389 session: Database session
390 document_id: The document ID to delete
392 Returns:
393 True if document was deleted
394 """
395 # Delete blob (has document_id as PK, can't be nulled by cascade)
396 session.query(DocumentBlob).filter_by(document_id=document_id).delete(
397 synchronize_session=False
398 )
400 # Delete collection links
401 session.query(DocumentCollection).filter_by(
402 document_id=document_id
403 ).delete(synchronize_session=False)
405 # Delete document itself
406 deleted = (
407 session.query(Document)
408 .filter_by(id=document_id)
409 .delete(synchronize_session=False)
410 )
412 if deleted:
413 logger.debug(f"Deleted document {document_id[:8]}... completely")
415 return deleted > 0