Coverage for src / local_deep_research / research_library / deletion / utils / cascade_helper.py: 96%

127 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2Cascade helper for deletion operations. 

3 

4Handles cleanup of related records that don't have proper FK constraints: 

5- DocumentChunk (source_id has no FK constraint) 

6- FAISS index files 

7- Filesystem files 

8""" 

9 

10from pathlib import Path 

11from typing import Dict, List, Optional, Any 

12 

13from loguru import logger 

14from sqlalchemy.orm import Session 

15 

16from ....constants import FILE_PATH_SENTINELS 

17from ....database.models.library import ( 

18 Document, 

19 DocumentBlob, 

20 DocumentChunk, 

21 DocumentCollection, 

22 RAGIndex, 

23) 

24from ....database.models.download_tracker import DownloadTracker 

25 

26 

class CascadeHelper:
    """Helper class for cleaning up related records during deletion.

    Several related records are not covered by FK CASCADE rules and must be
    cleaned up manually:

    - ``DocumentChunk`` rows (``source_id`` has no FK constraint)
    - FAISS index files on disk (``.faiss`` / ``.pkl``)
    - Downloaded filesystem files
    """

    @staticmethod
    def delete_document_chunks(
        session: Session,
        document_id: str,
        collection_name: Optional[str] = None,
    ) -> int:
        """
        Delete DocumentChunks for a document.

        Since DocumentChunk.source_id has no FK constraint, we must manually
        clean up chunks when deleting a document.

        Args:
            session: Database session
            document_id: The document ID to delete chunks for
            collection_name: Optional collection name to limit deletion scope

        Returns:
            Number of chunks deleted
        """
        query = session.query(DocumentChunk).filter(
            DocumentChunk.source_id == document_id,
            DocumentChunk.source_type == "document",
        )

        # Optionally scope the delete to a single collection.
        if collection_name:
            query = query.filter(
                DocumentChunk.collection_name == collection_name
            )

        # synchronize_session=False: bulk delete without reconciling the
        # ORM identity map; callers are expected to commit/expire afterwards.
        count = query.delete(synchronize_session=False)
        logger.debug(
            f"Deleted {count} chunks for document {document_id[:8]}..."
            + (f" in collection {collection_name}" if collection_name else "")
        )
        return count

    @staticmethod
    def delete_collection_chunks(
        session: Session,
        collection_name: str,
    ) -> int:
        """
        Delete all DocumentChunks for a collection.

        Args:
            session: Database session
            collection_name: The collection name (e.g., "collection_<uuid>")

        Returns:
            Number of chunks deleted
        """
        count = (
            session.query(DocumentChunk)
            .filter_by(collection_name=collection_name)
            .delete(synchronize_session=False)
        )
        logger.debug(f"Deleted {count} chunks for collection {collection_name}")
        return count

    @staticmethod
    def get_document_blob_size(session: Session, document_id: str) -> int:
        """
        Get the size of a document's blob in bytes.

        Args:
            session: Database session
            document_id: The document ID

        Returns:
            Size in bytes, or 0 if no blob exists
        """
        blob = (
            session.query(DocumentBlob)
            .filter_by(document_id=document_id)
            .first()
        )
        if blob and blob.pdf_binary:
            return len(blob.pdf_binary)
        return 0

    @staticmethod
    def delete_document_blob(session: Session, document_id: str) -> int:
        """
        Delete a document's blob record.

        Note: This is typically handled by CASCADE, but can be called explicitly
        for blob-only deletion.

        Args:
            session: Database session
            document_id: The document ID

        Returns:
            Size of deleted blob in bytes (0 if no blob existed)
        """
        blob = (
            session.query(DocumentBlob)
            .filter_by(document_id=document_id)
            .first()
        )
        if blob:
            # Capture the size before deleting so we can report bytes freed.
            size = len(blob.pdf_binary) if blob.pdf_binary else 0
            session.delete(blob)
            logger.debug(
                f"Deleted blob for document {document_id[:8]}... ({size} bytes)"
            )
            return size
        return 0

    @staticmethod
    def delete_filesystem_file(file_path: Optional[str]) -> bool:
        """
        Delete a file from the filesystem.

        Args:
            file_path: Path to the file (can be relative or absolute)

        Returns:
            True if file was deleted, False otherwise
        """
        if not file_path:
            return False

        # Skip special path markers (sentinel values, not real paths).
        if file_path in FILE_PATH_SENTINELS:
            return False

        try:
            path = Path(file_path)
            if path.is_file():
                path.unlink()
                logger.debug(f"Deleted filesystem file: {file_path}")
                return True
        except Exception:
            logger.exception(f"Failed to delete filesystem file: {file_path}")
        # Fix: explicitly return False when the path is missing or not a
        # regular file (previously fell through and returned None).
        return False

    @staticmethod
    def delete_faiss_index_files(index_path: Optional[str]) -> bool:
        """
        Delete FAISS index files.

        FAISS stores indices as .faiss and .pkl files.

        Args:
            index_path: Path to the FAISS index file (without extension)

        Returns:
            True if files were deleted, False otherwise
        """
        if not index_path:
            return False

        try:
            path = Path(index_path)
            deleted_any = False

            # FAISS index file
            faiss_file = path.with_suffix(".faiss")
            if faiss_file.is_file():
                faiss_file.unlink()
                logger.debug(f"Deleted FAISS index file: {faiss_file}")
                deleted_any = True

            # Pickle file for metadata
            pkl_file = path.with_suffix(".pkl")
            if pkl_file.is_file():
                pkl_file.unlink()
                logger.debug(f"Deleted FAISS pkl file: {pkl_file}")
                deleted_any = True

            return deleted_any
        except Exception:
            logger.exception(f"Failed to delete FAISS files for: {index_path}")
            return False

    @staticmethod
    def delete_rag_indices_for_collection(
        session: Session,
        collection_name: str,
    ) -> Dict[str, Any]:
        """
        Delete RAGIndex records and their FAISS files for a collection.

        Args:
            session: Database session
            collection_name: The collection name (e.g., "collection_<uuid>")

        Returns:
            Dict with deletion results ("deleted_indices", "deleted_files")
        """
        indices = (
            session.query(RAGIndex)
            .filter_by(collection_name=collection_name)
            .all()
        )

        deleted_indices = 0
        deleted_files = 0

        for index in indices:
            # Delete the on-disk FAISS files before removing the DB record.
            if CascadeHelper.delete_faiss_index_files(str(index.index_path)):
                deleted_files += 1

            session.delete(index)
            deleted_indices += 1

        logger.debug(
            f"Deleted {deleted_indices} RAGIndex records and {deleted_files} "
            f"FAISS files for collection {collection_name}"
        )

        return {
            "deleted_indices": deleted_indices,
            "deleted_files": deleted_files,
        }

    @staticmethod
    def update_download_tracker(
        session: Session,
        document: Document,
    ) -> bool:
        """
        Update DownloadTracker when a document is deleted.

        The FK has SET NULL, but we also need to update is_downloaded flag.

        Args:
            session: Database session
            document: The document being deleted

        Returns:
            True if tracker was updated
        """
        if not document.original_url:
            return False

        # Get URL hash using the same method as library_service
        from ...utils import get_url_hash

        try:
            url_hash = get_url_hash(str(document.original_url))
            tracker = (
                session.query(DownloadTracker)
                .filter_by(url_hash=url_hash)
                .first()
            )

            if tracker:
                tracker.is_downloaded = False  # type: ignore[assignment]
                tracker.file_path = None  # type: ignore[assignment]
                logger.debug(
                    f"Updated DownloadTracker for document {document.id[:8]}..."
                )
                return True
        except Exception:
            logger.exception("Failed to update DownloadTracker")
        # Fix: explicitly return False when no tracker exists for the URL
        # (previously fell through and returned None).
        return False

    @staticmethod
    def count_document_in_collections(
        session: Session,
        document_id: str,
    ) -> int:
        """
        Count how many collections a document is in.

        Args:
            session: Database session
            document_id: The document ID

        Returns:
            Number of collections the document is in
        """
        return (
            session.query(DocumentCollection)
            .filter_by(document_id=document_id)
            .count()
        )

    @staticmethod
    def get_document_collections(
        session: Session,
        document_id: str,
    ) -> List[str]:
        """
        Get all collection IDs a document belongs to.

        Args:
            session: Database session
            document_id: The document ID

        Returns:
            List of collection IDs
        """
        doc_collections = (
            session.query(DocumentCollection.collection_id)
            .filter_by(document_id=document_id)
            .all()
        )
        return [dc.collection_id for dc in doc_collections]

    @staticmethod
    def remove_from_faiss_index(
        username: str,
        collection_name: str,
        chunk_ids: List[str],
    ) -> bool:
        """
        Remove specific chunks from a FAISS index.

        Args:
            username: Username for RAG service
            collection_name: Collection name
            chunk_ids: List of chunk IDs to remove

        Returns:
            True if successful
        """
        try:
            # Imported lazily to avoid a circular import at module load time.
            from ...services.library_rag_service import LibraryRAGService

            with LibraryRAGService(
                username=username,
            ) as rag_service:
                # This uses the existing remove functionality; guarded with
                # hasattr because not every backend exposes a FAISS index.
                if (
                    hasattr(rag_service, "faiss_index")
                    and rag_service.faiss_index
                ):
                    if hasattr(rag_service.faiss_index, "delete"):
                        rag_service.faiss_index.delete(chunk_ids)
                # NOTE(review): returns True even when no index/delete was
                # available — "successful" means "no error", not "removed".
                return True
        except Exception:
            logger.exception("Failed to remove chunks from FAISS index")
            return False

    @staticmethod
    def delete_document_completely(
        session: Session,
        document_id: str,
    ) -> bool:
        """
        Delete a document and all related records using query-based deletes.

        This avoids ORM cascade issues where SQLAlchemy tries to set
        DocumentBlob.document_id to NULL (which fails because it's a PK).

        Deletes in order:
        1. DocumentBlob
        2. DocumentCollection links
        3. Document itself

        Note: DocumentChunks should be deleted separately before calling this,
        as they may need collection-specific handling.

        Args:
            session: Database session
            document_id: The document ID to delete

        Returns:
            True if document was deleted
        """
        # Delete blob (has document_id as PK, can't be nulled by cascade)
        session.query(DocumentBlob).filter_by(document_id=document_id).delete(
            synchronize_session=False
        )

        # Delete collection links
        session.query(DocumentCollection).filter_by(
            document_id=document_id
        ).delete(synchronize_session=False)

        # Delete document itself
        deleted = (
            session.query(Document)
            .filter_by(id=document_id)
            .delete(synchronize_session=False)
        )

        if deleted:
            logger.debug(f"Deleted document {document_id[:8]}... completely")

        return deleted > 0