Coverage for src/local_deep_research/research_library/deletion/services/document_deletion.py: 70%

119 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Document deletion service. 

3 

4Handles: 

5- Full document deletion with proper cascade cleanup 

6- Blob-only deletion (remove PDF, keep text) 

7- Remove from collection (unlink or delete if orphaned) 

8""" 

9 

10from typing import Dict, Any 

11 

12from loguru import logger 

13 

14from ....database.models.library import ( 

15 Document, 

16 DocumentChunk, 

17 DocumentCollection, 

18) 

19from ....database.session_context import get_user_db_session 

20from ..utils.cascade_helper import CascadeHelper 

21 

22 

class DocumentDeletionService:
    """Service for document deletion operations.

    Every public method opens its own per-user database session via
    ``get_user_db_session(self.username)``. The three mutating methods
    report their outcome as a plain dict instead of raising: on an
    unexpected exception they log, roll back the session and return a
    dict carrying an ``"error"`` message.
    """

    def __init__(self, username: str):
        """
        Initialize document deletion service.

        Args:
            username: Username for database session
        """
        self.username = username

    @staticmethod
    def _delete_stored_file(document) -> bool:
        """
        Best-effort removal of the on-disk file behind a document.

        Nothing is attempted unless the document uses ``"filesystem"``
        storage and has a ``file_path``. Errors raised while resolving or
        deleting the path are logged and swallowed so that the surrounding
        deletion can continue.

        Args:
            document: Document row whose ``storage_mode`` and ``file_path``
                are inspected

        Returns:
            The result of ``CascadeHelper.delete_filesystem_file`` when a
            deletion was attempted, otherwise False.
        """
        if document.storage_mode != "filesystem" or not document.file_path:
            return False

        from ...utils import get_absolute_path_from_settings

        try:
            file_path = get_absolute_path_from_settings(document.file_path)
            return CascadeHelper.delete_filesystem_file(str(file_path))
        except Exception:
            logger.exception("Failed to delete filesystem file")
            return False

    def delete_document(self, document_id: str) -> Dict[str, Any]:
        """
        Delete a document and ALL related data.

        This method ensures complete cleanup:
        - DocumentChunks (no FK constraint, manual cleanup required)
        - DocumentBlob (CASCADE handles, but we track for stats)
        - Filesystem files
        - FAISS index entries
        - DownloadTracker update
        - DocumentCollection links (CASCADE)
        - RagDocumentStatus (CASCADE)

        Args:
            document_id: ID of the document to delete

        Returns:
            Dict with deletion details:
            {
                "deleted": True/False,
                "document_id": str,
                "title": str,
                "blob_deleted": bool,
                "blob_size": int,
                "chunks_deleted": int,
                "collections_unlinked": int,
                "file_deleted": bool,
                "error": str (if failed)
            }
            The not-found and failure results only carry "deleted",
            "document_id" and "error".
        """
        with get_user_db_session(self.username) as session:
            try:
                # Get document
                document = session.query(Document).get(document_id)
                if not document:
                    return {
                        "deleted": False,
                        "document_id": document_id,
                        "error": "Document not found",
                    }

                title = document.title or document.filename or "Untitled"
                result = {
                    "deleted": False,
                    "document_id": document_id,
                    "title": title,
                    "blob_deleted": False,
                    "blob_size": 0,
                    "chunks_deleted": 0,
                    "collections_unlinked": 0,
                    "file_deleted": False,
                }

                # 1. Get collections before deletion for chunk cleanup
                collections = CascadeHelper.get_document_collections(
                    session, document_id
                )
                result["collections_unlinked"] = len(collections)

                # 2. Delete DocumentChunks for ALL collections this document is in
                total_chunks_deleted = 0
                for collection_id in collections:
                    collection_name = f"collection_{collection_id}"
                    total_chunks_deleted += CascadeHelper.delete_document_chunks(
                        session, document_id, collection_name
                    )
                result["chunks_deleted"] = total_chunks_deleted

                # 3. Get blob size before deletion (for stats)
                result["blob_size"] = CascadeHelper.get_document_blob_size(
                    session, document_id
                )
                result["blob_deleted"] = result["blob_size"] > 0

                # 4. Delete filesystem file if exists
                # NOTE(review): the file is removed before the commit below;
                # a rollback cannot restore it, so a failed commit leaves the
                # row pointing at a missing file. Confirm this is acceptable.
                result["file_deleted"] = self._delete_stored_file(document)

                # 5. Update DownloadTracker
                CascadeHelper.update_download_tracker(session, document)

                # 6. Delete the document and all related records
                CascadeHelper.delete_document_completely(session, document_id)
                session.commit()

                result["deleted"] = True
                logger.info(
                    f"Deleted document {document_id[:8]}... ({title}): "
                    f"{total_chunks_deleted} chunks, "
                    f"{result['blob_size']} bytes blob"
                )

                return result

            except Exception:
                logger.exception(f"Failed to delete document {document_id}")
                session.rollback()
                return {
                    "deleted": False,
                    "document_id": document_id,
                    "error": "Failed to delete document",
                }

    def delete_blob_only(self, document_id: str) -> Dict[str, Any]:
        """
        Delete PDF binary but keep document metadata and text content.

        This saves database space while preserving searchability.

        Args:
            document_id: ID of the document

        Returns:
            Dict with deletion details:
            {
                "deleted": True/False,
                "document_id": str,
                "bytes_freed": int,
                "storage_mode_updated": bool,
                "error": str (if failed)
            }
            "bytes_freed" counts a filesystem file only when its removal
            was reported as successful.
        """
        with get_user_db_session(self.username) as session:
            try:
                # Get document
                document = session.query(Document).get(document_id)
                if not document:
                    return {
                        "deleted": False,
                        "document_id": document_id,
                        "bytes_freed": 0,
                        "error": "Document not found",
                    }

                result = {
                    "deleted": False,
                    "document_id": document_id,
                    "bytes_freed": 0,
                    "storage_mode_updated": False,
                }

                # Handle based on storage mode
                if document.storage_mode == "database":
                    # Delete blob from database
                    result["bytes_freed"] = CascadeHelper.delete_document_blob(
                        session, document_id
                    )

                elif document.storage_mode == "filesystem":
                    # Delete filesystem file
                    from ...utils import get_absolute_path_from_settings

                    if document.file_path:
                        try:
                            file_path = get_absolute_path_from_settings(
                                document.file_path
                            )
                            if file_path.exists():
                                # Measure first: the size is unavailable once
                                # the file is gone.
                                file_size = file_path.stat().st_size
                                # Only claim the space as freed when the
                                # helper reports that the file was removed.
                                if CascadeHelper.delete_filesystem_file(
                                    str(file_path)
                                ):
                                    result["bytes_freed"] = file_size
                        except Exception:
                            # NOTE(review): the record is still marked as
                            # blob-deleted below even though removal failed,
                            # which drops the only reference to the file.
                            logger.exception("Failed to delete filesystem file")

                else:
                    # No blob to delete
                    return {
                        "deleted": False,
                        "document_id": document_id,
                        "bytes_freed": 0,
                        "error": "Document has no stored PDF (storage_mode is 'none')",
                    }

                # Update document to indicate blob is deleted
                document.storage_mode = "none"
                document.file_path = "blob_deleted"
                result["storage_mode_updated"] = True

                session.commit()
                result["deleted"] = True

                logger.info(
                    f"Deleted blob for document {document_id[:8]}...: "
                    f"{result['bytes_freed']} bytes freed"
                )

                return result

            except Exception:
                logger.exception(
                    f"Failed to delete blob for document {document_id}"
                )
                session.rollback()
                return {
                    "deleted": False,
                    "document_id": document_id,
                    "bytes_freed": 0,
                    "error": "Failed to delete document blob",
                }

    def remove_from_collection(
        self,
        document_id: str,
        collection_id: str,
    ) -> Dict[str, Any]:
        """
        Remove document from a collection.

        If the document is not in any other collection after removal,
        it will be completely deleted.

        Args:
            document_id: ID of the document
            collection_id: ID of the collection

        Returns:
            Dict with operation details:
            {
                "unlinked": True/False,
                "document_deleted": bool,
                "document_id": str,
                "collection_id": str,
                "chunks_deleted": int,
                "error": str (if failed)
            }
            "chunks_deleted" includes any leftover chunks removed while
            deleting an orphaned document. The not-found, not-in-collection
            and failure results do not carry "chunks_deleted".
        """
        with get_user_db_session(self.username) as session:
            try:
                # Verify document exists
                document = session.query(Document).get(document_id)
                if not document:
                    return {
                        "unlinked": False,
                        "document_deleted": False,
                        "document_id": document_id,
                        "collection_id": collection_id,
                        "error": "Document not found",
                    }

                # Verify collection exists and document is in it
                doc_collection = (
                    session.query(DocumentCollection)
                    .filter_by(
                        document_id=document_id, collection_id=collection_id
                    )
                    .first()
                )

                if not doc_collection:
                    return {
                        "unlinked": False,
                        "document_deleted": False,
                        "document_id": document_id,
                        "collection_id": collection_id,
                        "error": "Document not in this collection",
                    }

                result = {
                    "unlinked": False,
                    "document_deleted": False,
                    "document_id": document_id,
                    "collection_id": collection_id,
                    "chunks_deleted": 0,
                }

                # Delete chunks for this document in this collection
                collection_name = f"collection_{collection_id}"
                result["chunks_deleted"] = CascadeHelper.delete_document_chunks(
                    session, document_id, collection_name
                )

                # Remove the link; flush so the count below sees the removal
                session.delete(doc_collection)
                session.flush()

                # Check if document is in any other collection
                remaining_count = CascadeHelper.count_document_in_collections(
                    session, document_id
                )

                if remaining_count == 0:
                    # Document is orphaned - delete it completely
                    # Note: We're already in a session, so we need to do this
                    # directly rather than calling delete_document()
                    logger.info(
                        f"Document {document_id[:8]}... is orphaned, deleting"
                    )

                    # Delete remaining chunks (shouldn't be any, but be safe)
                    # and include them in the reported total.
                    leftover_chunks = (
                        session.query(DocumentChunk)
                        .filter(
                            DocumentChunk.source_id == document_id,
                            DocumentChunk.source_type == "document",
                        )
                        .delete(synchronize_session=False)
                    )
                    result["chunks_deleted"] += leftover_chunks or 0

                    # Update DownloadTracker
                    CascadeHelper.update_download_tracker(session, document)

                    # Delete filesystem file if applicable
                    self._delete_stored_file(document)

                    # Delete document and all related records
                    CascadeHelper.delete_document_completely(
                        session, document_id
                    )
                    result["document_deleted"] = True

                session.commit()
                result["unlinked"] = True

                logger.info(
                    f"Removed document {document_id[:8]}... from collection "
                    f"{collection_id[:8]}... "
                    f"(deleted={result['document_deleted']})"
                )

                return result

            except Exception:
                logger.exception(
                    f"Failed to remove document {document_id} "
                    f"from collection {collection_id}"
                )
                session.rollback()
                return {
                    "unlinked": False,
                    "document_deleted": False,
                    "document_id": document_id,
                    "collection_id": collection_id,
                    "error": "Failed to remove document from collection",
                }

    def get_deletion_preview(self, document_id: str) -> Dict[str, Any]:
        """
        Get a preview of what will be deleted.

        Useful for showing the user what will happen before confirming.
        Read-only: nothing is modified or committed. Unlike the deleting
        methods, exceptions are not caught here and propagate to the caller.

        Args:
            document_id: ID of the document

        Returns:
            Dict with preview information. When the document does not
            exist: {"found": False, "document_id": str}. Otherwise "found"
            is True and the dict also carries "title", "file_type",
            "storage_mode", "has_blob", "blob_size", "has_text",
            "collections_count" and "chunks_count".
        """
        with get_user_db_session(self.username) as session:
            document = session.query(Document).get(document_id)
            if not document:
                return {"found": False, "document_id": document_id}

            collections = CascadeHelper.get_document_collections(
                session, document_id
            )

            # Count chunks
            total_chunks = (
                session.query(DocumentChunk)
                .filter(
                    DocumentChunk.source_id == document_id,
                    DocumentChunk.source_type == "document",
                )
                .count()
            )

            blob_size = CascadeHelper.get_document_blob_size(
                session, document_id
            )

            return {
                "found": True,
                "document_id": document_id,
                "title": document.title or document.filename or "Untitled",
                "file_type": document.file_type,
                "storage_mode": document.storage_mode,
                "has_blob": blob_size > 0,
                "blob_size": blob_size,
                "has_text": bool(document.text_content),
                "collections_count": len(collections),
                "chunks_count": total_chunks,
            }