Coverage for src / local_deep_research / research_library / deletion / services / document_deletion.py: 100%

122 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2Document deletion service. 

3 

4Handles: 

5- Full document deletion with proper cascade cleanup 

6- Blob-only deletion (remove PDF, keep text) 

7- Remove from collection (unlink or delete if orphaned) 

8""" 

9 

10from typing import Dict, Any 

11 

12from loguru import logger 

13 

14from ....constants import FILE_PATH_BLOB_DELETED 

15from ....database.models.library import ( 

16 Document, 

17 DocumentChunk, 

18 DocumentCollection, 

19) 

20from ....database.session_context import get_user_db_session 

21from ..utils.cascade_helper import CascadeHelper 

22 

23 

class DocumentDeletionService:
    """Service for document deletion operations.

    Each public method opens its own database session through
    ``get_user_db_session(self.username)``.  The three mutating methods
    (``delete_document``, ``delete_blob_only``, ``remove_from_collection``)
    never raise for ordinary failures: they log the exception, roll the
    session back and return a dict carrying an ``"error"`` message.
    ``get_deletion_preview`` is read-only and does not catch exceptions.

    Filesystem files are removed only *after* the database transaction has
    been committed.  A file removal cannot be rolled back, so doing it first
    could leave a surviving database row that points at a missing file.
    """

    def __init__(self, username: str):
        """
        Initialize document deletion service.

        Args:
            username: Username for database session
        """
        self.username = username

    @staticmethod
    def _delete_stored_file(storage_mode: Any, file_path: Any) -> bool:
        """
        Best-effort removal of a document's file from the filesystem.

        Intended to run after the database commit, so it must never raise:
        any exception is logged and reported as ``False``.

        Args:
            storage_mode: The document's ``storage_mode`` value, captured
                before the row was deleted.
            file_path: The document's ``file_path`` value, captured before
                the row was deleted.

        Returns:
            The result of ``CascadeHelper.delete_filesystem_file`` (assumed
            to be a bool - TODO confirm), or ``False`` when the document is
            not filesystem-backed, has no path, the path cannot be resolved,
            or an exception occurred.
        """
        if storage_mode != "filesystem" or not file_path:
            return False

        try:
            # Imported lazily, as in the original code (presumably to avoid
            # an import cycle - verify).  Kept inside the ``try`` because
            # this helper runs post-commit and must not propagate errors.
            from ...utils import get_absolute_path_from_settings

            resolved = get_absolute_path_from_settings(file_path)
            if resolved:
                return CascadeHelper.delete_filesystem_file(str(resolved))
        except Exception:
            logger.exception("Failed to delete filesystem file")
        return False

    def delete_document(self, document_id: str) -> Dict[str, Any]:
        """
        Delete a document and ALL related data.

        Cleanup performed here:
        - DocumentChunks for every collection the document belongs to
          (deleted explicitly through ``CascadeHelper``)
        - DownloadTracker update
        - The document row and its related records
          (``CascadeHelper.delete_document_completely``)
        - The filesystem file, removed after the commit succeeded

        NOTE(review): no FAISS index cleanup is visible in this method;
        presumably it happens inside ``CascadeHelper`` - confirm.

        Args:
            document_id: ID of the document to delete

        Returns:
            Dict with deletion details:
            {
                "deleted": True/False,
                "document_id": str,
                "title": str,
                "blob_deleted": bool,
                "blob_size": int,
                "chunks_deleted": int,
                "collections_unlinked": int,
                "file_deleted": bool,
                "error": str (if failed)
            }
            When the document is missing or an exception occurs, only
            "deleted", "document_id" and "error" are present.
        """
        with get_user_db_session(self.username) as session:
            try:
                document = session.query(Document).get(document_id)
                if not document:
                    return {
                        "deleted": False,
                        "document_id": document_id,
                        "error": "Document not found",
                    }

                title = document.title or document.filename or "Untitled"
                # Snapshot the storage details now: once the row is deleted
                # and committed the ORM object can no longer be read safely.
                storage_mode = document.storage_mode
                stored_path = document.file_path

                result: Dict[str, Any] = {
                    "deleted": False,
                    "document_id": document_id,
                    "title": title,
                    "blob_deleted": False,
                    "blob_size": 0,
                    "chunks_deleted": 0,
                    "collections_unlinked": 0,
                    "file_deleted": False,
                }

                # 1. Collections must be looked up before the document (and
                #    its links) disappear; they drive the chunk cleanup.
                collections = CascadeHelper.get_document_collections(
                    session, document_id
                )
                result["collections_unlinked"] = len(collections)

                # 2. Delete the chunks stored under each collection's name.
                total_chunks_deleted = sum(
                    CascadeHelper.delete_document_chunks(
                        session, document_id, f"collection_{collection_id}"
                    )
                    for collection_id in collections
                )
                result["chunks_deleted"] = total_chunks_deleted

                # 3. Record the blob size before deletion (for stats).
                result["blob_size"] = CascadeHelper.get_document_blob_size(
                    session, document_id
                )
                result["blob_deleted"] = result["blob_size"] > 0

                # 4. Update DownloadTracker.
                CascadeHelper.update_download_tracker(session, document)

                # 5. Delete the document and all related records.
                CascadeHelper.delete_document_completely(session, document_id)
                session.commit()

                # 6. Only now touch the filesystem: the removal cannot be
                #    undone by session.rollback().
                result["file_deleted"] = self._delete_stored_file(
                    storage_mode, stored_path
                )

                result["deleted"] = True
                logger.info(
                    f"Deleted document {document_id[:8]}... ({title}): "
                    f"{total_chunks_deleted} chunks, "
                    f"{result['blob_size']} bytes blob"
                )

                return result

            except Exception:
                logger.exception(f"Failed to delete document {document_id}")
                session.rollback()
                return {
                    "deleted": False,
                    "document_id": document_id,
                    "error": "Failed to delete document",
                }

    def delete_blob_only(self, document_id: str) -> Dict[str, Any]:
        """
        Delete PDF binary but keep document metadata and text content.

        For ``storage_mode == "database"`` the blob is removed through
        ``CascadeHelper.delete_document_blob``.  For ``"filesystem"`` the
        file is removed after the commit.  Any other storage mode is
        rejected with an error.  On success the document is marked with
        ``storage_mode = "none"`` and ``file_path = FILE_PATH_BLOB_DELETED``.

        Args:
            document_id: ID of the document

        Returns:
            Dict with deletion details:
            {
                "deleted": True/False,
                "document_id": str,
                "bytes_freed": int,
                "storage_mode_updated": bool,
                "error": str (if failed)
            }
            "bytes_freed" counts a filesystem file only when the file was
            actually removed.  Error results omit "storage_mode_updated".
        """
        with get_user_db_session(self.username) as session:
            try:
                document = session.query(Document).get(document_id)
                if not document:
                    return {
                        "deleted": False,
                        "document_id": document_id,
                        "bytes_freed": 0,
                        "error": "Document not found",
                    }

                storage_mode = document.storage_mode
                if storage_mode not in ("database", "filesystem"):
                    # No blob to delete
                    return {
                        "deleted": False,
                        "document_id": document_id,
                        "bytes_freed": 0,
                        "error": "Document has no stored PDF (storage_mode is 'none')",
                    }

                result: Dict[str, Any] = {
                    "deleted": False,
                    "document_id": document_id,
                    "bytes_freed": 0,
                    "storage_mode_updated": False,
                }

                # File to remove once the commit has succeeded, and its size.
                pending_file = None
                pending_size = 0

                if storage_mode == "database":
                    # The blob lives in the database, so its removal is part
                    # of the transaction and is rolled back on failure.
                    result["bytes_freed"] = CascadeHelper.delete_document_blob(
                        session, document_id
                    )
                else:
                    from ...utils import get_absolute_path_from_settings

                    if document.file_path:
                        try:
                            candidate = get_absolute_path_from_settings(
                                document.file_path
                            )
                            if candidate and candidate.is_file():
                                pending_size = candidate.stat().st_size
                                pending_file = candidate
                        except Exception:
                            logger.exception("Failed to delete filesystem file")

                # Update document to indicate blob is deleted
                document.storage_mode = "none"
                document.file_path = FILE_PATH_BLOB_DELETED
                result["storage_mode_updated"] = True

                session.commit()

                if pending_file is not None:
                    # Post-commit and best-effort: a failure here must not
                    # turn a committed change into an error result.
                    try:
                        # Report the size only if the helper says the file
                        # really went away (assumes a truthy return means
                        # success - TODO confirm).
                        if CascadeHelper.delete_filesystem_file(
                            str(pending_file)
                        ):
                            result["bytes_freed"] = pending_size
                    except Exception:
                        logger.exception("Failed to delete filesystem file")

                result["deleted"] = True

                logger.info(
                    f"Deleted blob for document {document_id[:8]}...: "
                    f"{result['bytes_freed']} bytes freed"
                )

                return result

            except Exception:
                logger.exception(
                    f"Failed to delete blob for document {document_id}"
                )
                session.rollback()
                return {
                    "deleted": False,
                    "document_id": document_id,
                    "bytes_freed": 0,
                    "error": "Failed to delete document blob",
                }

    def remove_from_collection(
        self,
        document_id: str,
        collection_id: str,
    ) -> Dict[str, Any]:
        """
        Remove document from a collection.

        If the document is not in any other collection after removal,
        it will be completely deleted (chunks, DownloadTracker update,
        document row and, after the commit, its filesystem file).

        Args:
            document_id: ID of the document
            collection_id: ID of the collection

        Returns:
            Dict with operation details:
            {
                "unlinked": True/False,
                "document_deleted": bool,
                "document_id": str,
                "collection_id": str,
                "chunks_deleted": int,
                "error": str (if failed)
            }
            Error results do not contain "chunks_deleted".
        """
        with get_user_db_session(self.username) as session:
            try:
                # Verify document exists
                document = session.query(Document).get(document_id)
                if not document:
                    return {
                        "unlinked": False,
                        "document_deleted": False,
                        "document_id": document_id,
                        "collection_id": collection_id,
                        "error": "Document not found",
                    }

                # Verify the document is linked to this collection
                doc_collection = (
                    session.query(DocumentCollection)
                    .filter_by(
                        document_id=document_id, collection_id=collection_id
                    )
                    .first()
                )
                if not doc_collection:
                    return {
                        "unlinked": False,
                        "document_deleted": False,
                        "document_id": document_id,
                        "collection_id": collection_id,
                        "error": "Document not in this collection",
                    }

                result: Dict[str, Any] = {
                    "unlinked": False,
                    "document_deleted": False,
                    "document_id": document_id,
                    "collection_id": collection_id,
                    "chunks_deleted": 0,
                }

                # Delete chunks for this document in this collection
                result["chunks_deleted"] = CascadeHelper.delete_document_chunks(
                    session, document_id, f"collection_{collection_id}"
                )

                # Remove the link; flush so the count below sees it gone.
                session.delete(doc_collection)
                session.flush()

                remaining_count = CascadeHelper.count_document_in_collections(
                    session, document_id
                )

                # Storage details of an orphaned document, captured before
                # its row is deleted so the file can be removed post-commit.
                orphan_storage_mode = None
                orphan_file_path = None

                if remaining_count == 0:
                    # Document is orphaned - delete it completely.
                    # This is done inline rather than via delete_document()
                    # because that method opens its own session.
                    logger.info(
                        f"Document {document_id[:8]}... is orphaned, deleting"
                    )
                    orphan_storage_mode = document.storage_mode
                    orphan_file_path = document.file_path

                    # Delete remaining chunks (shouldn't be any, but be safe)
                    session.query(DocumentChunk).filter(
                        DocumentChunk.source_id == document_id,
                        DocumentChunk.source_type == "document",
                    ).delete(synchronize_session=False)

                    # Update DownloadTracker
                    CascadeHelper.update_download_tracker(session, document)

                    # Delete document and all related records
                    CascadeHelper.delete_document_completely(
                        session, document_id
                    )
                    result["document_deleted"] = True

                session.commit()

                if result["document_deleted"]:
                    # Irreversible filesystem cleanup only after the commit.
                    self._delete_stored_file(
                        orphan_storage_mode, orphan_file_path
                    )

                result["unlinked"] = True

                logger.info(
                    f"Removed document {document_id[:8]}... from collection "
                    f"{collection_id[:8]}... "
                    f"(deleted={result['document_deleted']})"
                )

                return result

            except Exception:
                logger.exception(
                    f"Failed to remove document {document_id} "
                    f"from collection {collection_id}"
                )
                session.rollback()
                return {
                    "unlinked": False,
                    "document_deleted": False,
                    "document_id": document_id,
                    "collection_id": collection_id,
                    "error": "Failed to remove document from collection",
                }

    def get_deletion_preview(self, document_id: str) -> Dict[str, Any]:
        """
        Get a preview of what will be deleted.

        Useful for showing the user what will happen before confirming.
        Read-only; exceptions from the database layer are not caught here.

        Args:
            document_id: ID of the document

        Returns:
            ``{"found": False, "document_id": ...}`` when the document does
            not exist; otherwise a dict with "found", "document_id",
            "title", "file_type", "storage_mode", "has_blob", "blob_size",
            "has_text", "collections_count" and "chunks_count".
        """
        with get_user_db_session(self.username) as session:
            document = session.query(Document).get(document_id)
            if not document:
                return {"found": False, "document_id": document_id}

            collections = CascadeHelper.get_document_collections(
                session, document_id
            )

            # Chunks are counted across all collections of this document.
            chunk_query = session.query(DocumentChunk).filter(
                DocumentChunk.source_id == document_id,
                DocumentChunk.source_type == "document",
            )
            total_chunks = chunk_query.count()

            blob_size = CascadeHelper.get_document_blob_size(
                session, document_id
            )

            preview: Dict[str, Any] = {
                "found": True,
                "document_id": document_id,
                "title": document.title or document.filename or "Untitled",
                "file_type": document.file_type,
                "storage_mode": document.storage_mode,
                "has_blob": blob_size > 0,
                "blob_size": blob_size,
                "has_text": bool(document.text_content),
                "collections_count": len(collections),
                "chunks_count": total_chunks,
            }
            return preview

442 }