Coverage for src / local_deep_research / research_library / deletion / services / collection_deletion.py: 94%

77 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Collection deletion service. 

3 

4Handles: 

5- Full collection deletion with proper cleanup 

6- Documents are preserved but unlinked 

7- RAG index and chunks are deleted 

8""" 

9 

10from typing import Dict, Any 

11 

12from loguru import logger 

13 

14from ....database.models.library import ( 

15 Collection, 

16 DocumentCollection, 

17 DocumentChunk, 

18 CollectionFolder, 

19 RAGIndex, 

20 RagDocumentStatus, 

21) 

22from ....database.session_context import get_user_db_session 

23from ..utils.cascade_helper import CascadeHelper 

24 

25 

class CollectionDeletionService:
    """Deletes collections and cleans up their associated RAG data."""

    def __init__(self, username: str):
        """
        Create a deletion service bound to a single user's database.

        Args:
            username: Username whose database session will be opened for
                every operation on this service.
        """
        self.username = username

37 

38 def delete_collection( 

39 self, collection_id: str, delete_orphaned_documents: bool = True 

40 ) -> Dict[str, Any]: 

41 """ 

42 Delete a collection and clean up all related data. 

43 

44 By default, orphaned documents (not in any other collection) are deleted. 

45 Set delete_orphaned_documents=False to preserve all documents. 

46 

47 The following are deleted: 

48 - DocumentChunks for this collection 

49 - FAISS index files 

50 - RAGIndex records 

51 - CollectionFolder records (CASCADE) 

52 - DocumentCollection links (CASCADE) 

53 - RagDocumentStatus records (CASCADE) 

54 - Orphaned documents (if delete_orphaned_documents=True) 

55 

56 Args: 

57 collection_id: ID of the collection to delete 

58 delete_orphaned_documents: If True, delete documents not in any 

59 other collection after unlinking 

60 

61 Returns: 

62 Dict with deletion details: 

63 { 

64 "deleted": True/False, 

65 "collection_id": str, 

66 "collection_name": str, 

67 "chunks_deleted": int, 

68 "documents_unlinked": int, 

69 "indices_deleted": int, 

70 "folders_deleted": int, 

71 "orphaned_documents_deleted": int, 

72 "error": str (if failed) 

73 } 

74 """ 

75 with get_user_db_session(self.username) as session: 

76 try: 

77 # Get collection 

78 collection = session.query(Collection).get(collection_id) 

79 if not collection: 

80 return { 

81 "deleted": False, 

82 "collection_id": collection_id, 

83 "error": "Collection not found", 

84 } 

85 

86 collection_name = f"collection_{collection_id}" 

87 result = { 

88 "deleted": False, 

89 "collection_id": collection_id, 

90 "collection_name": collection.name, 

91 "chunks_deleted": 0, 

92 "documents_unlinked": 0, 

93 "indices_deleted": 0, 

94 "folders_deleted": 0, 

95 "orphaned_documents_deleted": 0, 

96 } 

97 

98 # 1. Get document IDs BEFORE deleting links (for orphan check) 

99 doc_ids_in_collection = [ 

100 dc.document_id 

101 for dc in session.query(DocumentCollection) 

102 .filter_by(collection_id=collection_id) 

103 .all() 

104 ] 

105 result["documents_unlinked"] = len(doc_ids_in_collection) 

106 

107 # 2. Delete DocumentChunks for this collection 

108 result["chunks_deleted"] = ( 

109 CascadeHelper.delete_collection_chunks( 

110 session, collection_name 

111 ) 

112 ) 

113 

114 # 3. Delete RAGIndex records and FAISS files 

115 rag_result = CascadeHelper.delete_rag_indices_for_collection( 

116 session, collection_name 

117 ) 

118 result["indices_deleted"] = rag_result["deleted_indices"] 

119 

120 # 4. Count folders before deletion 

121 result["folders_deleted"] = ( 

122 session.query(CollectionFolder) 

123 .filter_by(collection_id=collection_id) 

124 .count() 

125 ) 

126 

127 # 5. Delete DocumentCollection links explicitly before collection 

128 session.query(DocumentCollection).filter_by( 

129 collection_id=collection_id 

130 ).delete(synchronize_session=False) 

131 

132 # 6. Delete linked folders explicitly 

133 session.query(CollectionFolder).filter_by( 

134 collection_id=collection_id 

135 ).delete(synchronize_session=False) 

136 

137 # 7. Delete the collection itself 

138 session.delete(collection) 

139 

140 # 8. Delete orphaned documents if requested 

141 if delete_orphaned_documents: 141 ↛ 159line 141 didn't jump to line 159 because the condition on line 141 was always true

142 for doc_id in doc_ids_in_collection: 

143 # Check if document is in any other collection 

144 remaining = ( 

145 session.query(DocumentCollection) 

146 .filter_by(document_id=doc_id) 

147 .count() 

148 ) 

149 if remaining == 0: 

150 # Document is orphaned - delete it 

151 CascadeHelper.delete_document_completely( 

152 session, doc_id 

153 ) 

154 result["orphaned_documents_deleted"] += 1 

155 logger.info( 

156 f"Deleted orphaned document {doc_id[:8]}..." 

157 ) 

158 

159 session.commit() 

160 

161 result["deleted"] = True 

162 logger.info( 

163 f"Deleted collection {collection_id[:8]}... " 

164 f"({result['collection_name']}): {result['chunks_deleted']} chunks, " 

165 f"{result['documents_unlinked']} documents unlinked, " 

166 f"{result['orphaned_documents_deleted']} orphaned deleted" 

167 ) 

168 

169 return result 

170 

171 except Exception: 

172 logger.exception(f"Failed to delete collection {collection_id}") 

173 session.rollback() 

174 return { 

175 "deleted": False, 

176 "collection_id": collection_id, 

177 "error": "Failed to delete collection", 

178 } 

179 

180 def delete_collection_index_only( 

181 self, collection_id: str 

182 ) -> Dict[str, Any]: 

183 """ 

184 Delete only the RAG index for a collection, keeping the collection itself. 

185 

186 This is useful for rebuilding an index from scratch. 

187 

188 Args: 

189 collection_id: ID of the collection 

190 

191 Returns: 

192 Dict with deletion details 

193 """ 

194 with get_user_db_session(self.username) as session: 

195 try: 

196 # Verify collection exists 

197 collection = session.query(Collection).get(collection_id) 

198 if not collection: 

199 return { 

200 "deleted": False, 

201 "collection_id": collection_id, 

202 "error": "Collection not found", 

203 } 

204 

205 collection_name = f"collection_{collection_id}" 

206 result = { 

207 "deleted": False, 

208 "collection_id": collection_id, 

209 "chunks_deleted": 0, 

210 "indices_deleted": 0, 

211 "documents_reset": 0, 

212 } 

213 

214 # 1. Delete DocumentChunks 

215 result["chunks_deleted"] = ( 

216 CascadeHelper.delete_collection_chunks( 

217 session, collection_name 

218 ) 

219 ) 

220 

221 # 2. Delete RAGIndex records and FAISS files 

222 rag_result = CascadeHelper.delete_rag_indices_for_collection( 

223 session, collection_name 

224 ) 

225 result["indices_deleted"] = rag_result["deleted_indices"] 

226 

227 # 3. Reset DocumentCollection indexed status 

228 result["documents_reset"] = ( 

229 session.query(DocumentCollection) 

230 .filter_by(collection_id=collection_id) 

231 .update({"indexed": False, "chunk_count": 0}) 

232 ) 

233 

234 # 4. Delete RagDocumentStatus for this collection 

235 session.query(RagDocumentStatus).filter_by( 

236 collection_id=collection_id 

237 ).delete(synchronize_session=False) 

238 

239 # 5. Reset collection embedding info 

240 collection.embedding_model = None 

241 collection.embedding_model_type = None 

242 collection.embedding_dimension = None 

243 collection.chunk_size = None 

244 collection.chunk_overlap = None 

245 

246 session.commit() 

247 result["deleted"] = True 

248 

249 logger.info( 

250 f"Deleted index for collection {collection_id[:8]}...: " 

251 f"{result['chunks_deleted']} chunks, " 

252 f"{result['documents_reset']} documents reset" 

253 ) 

254 

255 return result 

256 

257 except Exception: 

258 logger.exception( 

259 f"Failed to delete index for collection {collection_id}" 

260 ) 

261 session.rollback() 

262 return { 

263 "deleted": False, 

264 "collection_id": collection_id, 

265 "error": "Failed to delete collection index", 

266 } 

267 

268 def get_deletion_preview(self, collection_id: str) -> Dict[str, Any]: 

269 """ 

270 Get a preview of what will be deleted. 

271 

272 Useful for showing the user what will happen before confirming. 

273 

274 Args: 

275 collection_id: ID of the collection 

276 

277 Returns: 

278 Dict with preview information 

279 """ 

280 with get_user_db_session(self.username) as session: 

281 collection = session.query(Collection).get(collection_id) 

282 if not collection: 

283 return {"found": False, "collection_id": collection_id} 

284 

285 collection_name = f"collection_{collection_id}" 

286 

287 # Count documents 

288 documents_count = ( 

289 session.query(DocumentCollection) 

290 .filter_by(collection_id=collection_id) 

291 .count() 

292 ) 

293 

294 # Count chunks 

295 chunks_count = ( 

296 session.query(DocumentChunk) 

297 .filter_by(collection_name=collection_name) 

298 .count() 

299 ) 

300 

301 # Count folders 

302 folders_count = ( 

303 session.query(CollectionFolder) 

304 .filter_by(collection_id=collection_id) 

305 .count() 

306 ) 

307 

308 # Check for RAG index 

309 has_index = ( 

310 session.query(RAGIndex) 

311 .filter_by(collection_name=collection_name) 

312 .first() 

313 is not None 

314 ) 

315 

316 return { 

317 "found": True, 

318 "collection_id": collection_id, 

319 "name": collection.name, 

320 "description": collection.description, 

321 "is_default": collection.is_default, 

322 "documents_count": documents_count, 

323 "chunks_count": chunks_count, 

324 "folders_count": folders_count, 

325 "has_rag_index": has_index, 

326 "embedding_model": collection.embedding_model, 

327 }