Coverage for src/local_deep_research/research_library/deletion/utils/cascade_helper.py: 85%

126 statements  

coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Cascade helper for deletion operations. 

3 

4Handles cleanup of related records that don't have proper FK constraints: 

5- DocumentChunk (source_id has no FK constraint) 

6- FAISS index files 

7- Filesystem files 

8""" 

9 

10from pathlib import Path 

11from typing import Dict, List, Optional, Any 

12 

13from loguru import logger 

14from sqlalchemy.orm import Session 

15 

16from ....database.models.library import ( 

17 Document, 

18 DocumentBlob, 

19 DocumentChunk, 

20 DocumentCollection, 

21 RAGIndex, 

22) 

23from ....database.models.download_tracker import DownloadTracker 

24 

25 

26class CascadeHelper: 

27 """Helper class for cleaning up related records during deletion.""" 

28 

29 @staticmethod 

30 def delete_document_chunks( 

31 session: Session, 

32 document_id: str, 

33 collection_name: Optional[str] = None, 

34 ) -> int: 

35 """ 

36 Delete DocumentChunks for a document. 

37 

38 Since DocumentChunk.source_id has no FK constraint, we must manually 

39 clean up chunks when deleting a document. 

40 

41 Args: 

42 session: Database session 

43 document_id: The document ID to delete chunks for 

44 collection_name: Optional collection name to limit deletion scope 

45 

46 Returns: 

47 Number of chunks deleted 
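
        Example (illustrative; assumes an open session and a document ID):
            n = CascadeHelper.delete_document_chunks(session, doc_id)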

48 """ 

49 query = session.query(DocumentChunk).filter( 

50 DocumentChunk.source_id == document_id, 

51 DocumentChunk.source_type == "document", 

52 ) 

53 

54 if collection_name: 

55 query = query.filter( 

56 DocumentChunk.collection_name == collection_name 

57 ) 

58 

59 count = query.delete(synchronize_session=False) 

60 logger.debug( 

61 f"Deleted {count} chunks for document {document_id[:8]}..." 

62 + (f" in collection {collection_name}" if collection_name else "") 

63 ) 

64 return count 

65 

66 @staticmethod 

67 def delete_collection_chunks( 

68 session: Session, 

69 collection_name: str, 

70 ) -> int: 

71 """ 

72 Delete all DocumentChunks for a collection. 

73 

74 Args: 

75 session: Database session 

76 collection_name: The collection name (e.g., "collection_<uuid>") 

77 

78 Returns: 

79 Number of chunks deleted 
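
        Example (illustrative; the collection name is hypothetical):
            CascadeHelper.delete_collection_chunks(session, "collection_abc123")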

80 """ 

81 count = ( 

82 session.query(DocumentChunk) 

83 .filter_by(collection_name=collection_name) 

84 .delete(synchronize_session=False) 

85 ) 

86 logger.debug(f"Deleted {count} chunks for collection {collection_name}") 

87 return count 

88 

89 @staticmethod 

90 def get_document_blob_size(session: Session, document_id: str) -> int: 

91 """ 

92 Get the size of a document's blob in bytes. 

93 

94 Args: 

95 session: Database session 

96 document_id: The document ID 

97 

98 Returns: 

99 Size in bytes, or 0 if no blob exists 
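
        Example (illustrative; useful for reporting reclaimable space):
            bytes_reclaimed = CascadeHelper.get_document_blob_size(
                session, doc_id
            )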

100 """ 

101 blob = ( 

102 session.query(DocumentBlob) 

103 .filter_by(document_id=document_id) 

104 .first() 

105 ) 

106 if blob and blob.pdf_binary: 

107 return len(blob.pdf_binary) 

108 return 0 

109 

110 @staticmethod 

111 def delete_document_blob(session: Session, document_id: str) -> int: 

112 """ 

113 Delete a document's blob record. 

114 

115 Note: This is typically handled by CASCADE, but can be called explicitly 

116 for blob-only deletion. 

117 

118 Args: 

119 session: Database session 

120 document_id: The document ID 

121 

122 Returns: 

123 Size of deleted blob in bytes 
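
        Example (illustrative; frees blob storage but keeps the document):
            freed = CascadeHelper.delete_document_blob(session, doc_id)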

124 """ 

125 blob = ( 

126 session.query(DocumentBlob) 

127 .filter_by(document_id=document_id) 

128 .first() 

129 ) 

130 if blob: 

131 size = len(blob.pdf_binary) if blob.pdf_binary else 0 

132 session.delete(blob) 

133 logger.debug( 

134 f"Deleted blob for document {document_id[:8]}... ({size} bytes)" 

135 ) 

136 return size 

137 return 0 

138 

139 @staticmethod 

140 def delete_filesystem_file(file_path: Optional[str]) -> bool: 

141 """ 

142 Delete a file from the filesystem. 

143 

144 Args: 

145 file_path: Path to the file (can be relative or absolute) 

146 

147 Returns: 

148 True if file was deleted, False otherwise 
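
        Example (illustrative; special markers are never treated as paths):
            CascadeHelper.delete_filesystem_file("metadata_only")  # -> False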

149 """ 

150 if not file_path: 

151 return False 

152 

153 # Skip special path markers 

154 if file_path in ( 

155 "metadata_only", 

156 "text_only_not_stored", 

157 "blob_deleted", 

158 ): 

159 return False 

160 

161 try: 

162 path = Path(file_path) 

163 if path.exists(): 

164 path.unlink() 

165 logger.debug(f"Deleted filesystem file: {file_path}") 

166 return True 

167 except Exception: 

168 logger.exception(f"Failed to delete filesystem file: {file_path}") 

169 return False 

170 

171 @staticmethod 

172 def delete_faiss_index_files(index_path: Optional[str]) -> bool: 

173 """ 

174 Delete FAISS index files. 

175 

176 FAISS stores indices as .faiss and .pkl files. 

177 

178 Args: 

179 index_path: Path to the FAISS index file (without extension) 

180 

181 Returns: 

182 True if files were deleted, False otherwise 
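
        Example (illustrative; the path is hypothetical):
            # Removes indexes/my_index.faiss and indexes/my_index.pkl if present
            CascadeHelper.delete_faiss_index_files("indexes/my_index")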

183 """ 

184 if not index_path: 

185 return False 

186 

187 try: 

188 path = Path(index_path) 

189 deleted_any = False 

190 

191 # FAISS index file 

192 faiss_file = path.with_suffix(".faiss") 

193 if faiss_file.exists(): 

194 faiss_file.unlink() 

195 logger.debug(f"Deleted FAISS index file: {faiss_file}") 

196 deleted_any = True 

197 

198 # Pickle file for metadata 

199 pkl_file = path.with_suffix(".pkl") 

200 if pkl_file.exists(): 

201 pkl_file.unlink() 

202 logger.debug(f"Deleted FAISS pkl file: {pkl_file}") 

203 deleted_any = True 

204 

205 return deleted_any 

206 except Exception: 

207 logger.exception(f"Failed to delete FAISS files for: {index_path}") 

208 return False 

209 

210 @staticmethod 

211 def delete_rag_indices_for_collection( 

212 session: Session, 

213 collection_name: str, 

214 ) -> Dict[str, Any]: 

215 """ 

216 Delete RAGIndex records and their FAISS files for a collection. 

217 

218 Args: 

219 session: Database session 

220 collection_name: The collection name (e.g., "collection_<uuid>") 

221 

222 Returns: 

223 Dict with deletion results 
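
        Example (illustrative):
            result = CascadeHelper.delete_rag_indices_for_collection(
                session, collection_name
            )
            # e.g., {"deleted_indices": 1, "deleted_files": 1}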

224 """ 

225 indices = ( 

226 session.query(RAGIndex) 

227 .filter_by(collection_name=collection_name) 

228 .all() 

229 ) 

230 

231 deleted_indices = 0 

232 deleted_files = 0 

233 

234 for index in indices: 

235 # Delete FAISS files 

236 if CascadeHelper.delete_faiss_index_files(index.index_path): 236 ↛ 239line 236 didn't jump to line 239 because the condition on line 236 was always true

237 deleted_files += 1 

238 

239 session.delete(index) 

240 deleted_indices += 1 

241 

242 logger.debug( 

243 f"Deleted {deleted_indices} RAGIndex records and {deleted_files} " 

244 f"FAISS files for collection {collection_name}" 

245 ) 

246 

247 return { 

248 "deleted_indices": deleted_indices, 

249 "deleted_files": deleted_files, 

250 } 

251 

252 @staticmethod 

253 def update_download_tracker( 

254 session: Session, 

255 document: Document, 

256 ) -> bool: 

257 """ 

258 Update DownloadTracker when a document is deleted. 

259 

260 The FK has SET NULL, but we also need to update is_downloaded flag. 

261 

262 Args: 

263 session: Database session 

264 document: The document being deleted 

265 

266 Returns: 

267 True if tracker was updated 
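
        Example (illustrative; document is about to be deleted):
            CascadeHelper.update_download_tracker(session, document)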

268 """ 

269 if not document.original_url: 

270 return False 

271 

272 # Get URL hash using the same method as library_service 

273 from ...utils import get_url_hash 

274 

275 try: 

276 url_hash = get_url_hash(document.original_url) 

277 tracker = ( 

278 session.query(DownloadTracker) 

279 .filter_by(url_hash=url_hash) 

280 .first() 

281 ) 

282 

283 if tracker: 283 ↛ 292line 283 didn't jump to line 292 because the condition on line 283 was always true

284 tracker.is_downloaded = False 

285 tracker.file_path = None 

286 logger.debug( 

287 f"Updated DownloadTracker for document {document.id[:8]}..." 

288 ) 

289 return True 

290 except Exception: 

291 logger.exception("Failed to update DownloadTracker") 

292 return False 

293 

294 @staticmethod 

295 def count_document_in_collections( 

296 session: Session, 

297 document_id: str, 

298 ) -> int: 

299 """ 

300 Count how many collections a document is in. 

301 

302 Args: 

303 session: Database session 

304 document_id: The document ID 

305 

306 Returns: 

307 Number of collections the document is in 
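
        Example (illustrative; a count of 0 means the document is orphaned):
            if CascadeHelper.count_document_in_collections(session, doc_id) == 0:
                ...  # safe to delete the document itself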

308 """ 

309 return ( 

310 session.query(DocumentCollection) 

311 .filter_by(document_id=document_id) 

312 .count() 

313 ) 

314 

315 @staticmethod 

316 def get_document_collections( 

317 session: Session, 

318 document_id: str, 

319 ) -> List[str]: 

320 """ 

321 Get all collection IDs a document belongs to. 

322 

323 Args: 

324 session: Database session 

325 document_id: The document ID 

326 

327 Returns: 

328 List of collection IDs 
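
        Example (illustrative):
            collection_ids = CascadeHelper.get_document_collections(
                session, doc_id
            )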

329 """ 

330 doc_collections = ( 

331 session.query(DocumentCollection.collection_id) 

332 .filter_by(document_id=document_id) 

333 .all() 

334 ) 

335 return [dc.collection_id for dc in doc_collections] 

336 

337 @staticmethod 

338 def remove_from_faiss_index( 

339 username: str, 

340 collection_name: str, 

341 chunk_ids: List[str], 

342 ) -> bool: 

343 """ 

344 Remove specific chunks from a FAISS index. 

345 

346 Args: 

347 username: Username for RAG service 

348 collection_name: Collection name 

349 chunk_ids: List of chunk IDs to remove 

350 

351 Returns: 

352 True if successful 
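
        Example (illustrative; username and chunk IDs are hypothetical):
            CascadeHelper.remove_from_faiss_index(
                "alice", "collection_abc123", ["chunk-1", "chunk-2"]
            )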

353 """ 

354 try: 

355 from ..services.library_rag_service import LibraryRAGService 

356 

357 rag_service = LibraryRAGService( 

358 username=username, 

359 collection_name=collection_name, 

360 ) 

361 

362 # This uses the existing remove functionality 

363 if hasattr(rag_service, "faiss_index") and rag_service.faiss_index: 

364 if hasattr(rag_service.faiss_index, "delete"): 

365 rag_service.faiss_index.delete(chunk_ids) 

366 return True 

367 except Exception: 

368 logger.exception("Failed to remove chunks from FAISS index") 

369 return False 

370 

371 @staticmethod 

372 def delete_document_completely( 

373 session: Session, 

374 document_id: str, 

375 ) -> bool: 

376 """ 

377 Delete a document and all related records using query-based deletes. 

378 

379 This avoids ORM cascade issues where SQLAlchemy tries to set 

380 DocumentBlob.document_id to NULL (which fails because it's a PK). 

381 

382 Deletes in order: 

383 1. DocumentBlob 

384 2. DocumentCollection links 

385 3. Document itself 

386 

387 Note: DocumentChunks should be deleted separately before calling this, 

388 as they may need collection-specific handling. 

389 

390 Args: 

391 session: Database session 

392 document_id: The document ID to delete 

393 

394 Returns: 

395 True if document was deleted 
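
        Example (illustrative; chunks are removed first, as noted above):
            CascadeHelper.delete_document_chunks(session, doc_id)
            if CascadeHelper.delete_document_completely(session, doc_id):
                session.commit()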

396 """ 

397 # Delete blob (has document_id as PK, can't be nulled by cascade) 

398 session.query(DocumentBlob).filter_by(document_id=document_id).delete( 

399 synchronize_session=False 

400 ) 

401 

402 # Delete collection links 

403 session.query(DocumentCollection).filter_by( 

404 document_id=document_id 

405 ).delete(synchronize_session=False) 

406 

407 # Delete document itself 

408 deleted = ( 

409 session.query(Document) 

410 .filter_by(id=document_id) 

411 .delete(synchronize_session=False) 

412 ) 

413 

414 if deleted: 

415 logger.debug(f"Deleted document {document_id[:8]}... completely") 

416 

417 return deleted > 0