Coverage for src/local_deep_research/research_library/deletion/services/bulk_deletion.py: 92%

67 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Bulk deletion service. 

3 

4Handles bulk operations: 

5- Delete multiple documents 

6- Delete blobs for multiple documents 

7- Remove multiple documents from a collection 

8""" 

9 

10from typing import Dict, Any, List 

11 

12from loguru import logger 

13 

14from .document_deletion import DocumentDeletionService 

15 

16 

class BulkDeletionService:
    """Service for bulk deletion operations.

    Applies single-document operations from :class:`DocumentDeletionService`
    across many document IDs and aggregates per-document outcomes into one
    summary dict (counts, per-document results, and per-document errors).
    """

    def __init__(self, username: str):
        """
        Initialize bulk deletion service.

        Args:
            username: Username for database session
        """
        self.username = username
        self._document_service = DocumentDeletionService(username)

    @staticmethod
    def _record_error(
        result: Dict[str, Any],
        document_id: str,
        op_result: Dict[str, Any],
    ) -> None:
        """Count a failure and append its error entry to *result*.

        Shared by all bulk loops so the failure-accounting shape stays
        consistent across operations.
        """
        result["failed"] += 1
        result["errors"].append(
            {
                "document_id": document_id,
                "error": op_result.get("error", "Unknown error"),
            }
        )

    def delete_documents(self, document_ids: List[str]) -> Dict[str, Any]:
        """
        Delete multiple documents.

        Args:
            document_ids: List of document IDs to delete

        Returns:
            Dict with bulk deletion results:
            {
                "total": int,
                "deleted": int,
                "failed": int,
                "total_chunks_deleted": int,
                "total_bytes_freed": int,
                "results": List[Dict],
                "errors": List[Dict]
            }
        """
        result = {
            "total": len(document_ids),
            "deleted": 0,
            "failed": 0,
            "total_chunks_deleted": 0,
            "total_bytes_freed": 0,
            "results": [],
            "errors": [],
        }

        for document_id in document_ids:
            delete_result = self._document_service.delete_document(document_id)

            if delete_result.get("deleted"):
                result["deleted"] += 1
                result["total_chunks_deleted"] += delete_result.get(
                    "chunks_deleted", 0
                )
                result["total_bytes_freed"] += delete_result.get("blob_size", 0)
                result["results"].append(
                    {
                        "document_id": document_id,
                        "title": delete_result.get("title", "Unknown"),
                        "chunks_deleted": delete_result.get(
                            "chunks_deleted", 0
                        ),
                        "blob_size": delete_result.get("blob_size", 0),
                    }
                )
            else:
                self._record_error(result, document_id, delete_result)

        logger.info(
            f"Bulk delete: {result['deleted']}/{result['total']} documents, "
            f"{result['total_chunks_deleted']} chunks, "
            f"{result['total_bytes_freed']} bytes"
        )

        return result

    def delete_blobs(self, document_ids: List[str]) -> Dict[str, Any]:
        """
        Delete PDF binaries for multiple documents, keeping text content.

        Args:
            document_ids: List of document IDs to delete blobs for

        Returns:
            Dict with bulk blob deletion results:
            {
                "total": int,
                "deleted": int,
                "skipped": int,
                "failed": int,
                "total_bytes_freed": int,
                "results": List[Dict],
                "errors": List[Dict]
            }
        """
        result = {
            "total": len(document_ids),
            "deleted": 0,
            "skipped": 0,
            "failed": 0,
            "total_bytes_freed": 0,
            "results": [],
            "errors": [],
        }

        for document_id in document_ids:
            delete_result = self._document_service.delete_blob_only(document_id)

            if delete_result.get("deleted"):
                result["deleted"] += 1
                result["total_bytes_freed"] += delete_result.get(
                    "bytes_freed", 0
                )
                result["results"].append(
                    {
                        "document_id": document_id,
                        "bytes_freed": delete_result.get("bytes_freed", 0),
                    }
                )
            # BUGFIX: the needle must be lowercase -- the haystack is
            # lowercased, so the old needle "no stored PDF" could never
            # match and documents without a PDF were counted as failures
            # instead of skipped.
            elif "no stored pdf" in delete_result.get("error", "").lower():
                result["skipped"] += 1
            else:
                self._record_error(result, document_id, delete_result)

        logger.info(
            f"Bulk blob delete: {result['deleted']}/{result['total']} blobs, "
            f"{result['total_bytes_freed']} bytes freed"
        )

        return result

    def remove_documents_from_collection(
        self,
        document_ids: List[str],
        collection_id: str,
    ) -> Dict[str, Any]:
        """
        Remove multiple documents from a collection.

        Documents that are not in any other collection will be deleted.

        Args:
            document_ids: List of document IDs to remove
            collection_id: ID of the collection

        Returns:
            Dict with bulk removal results:
            {
                "total": int,
                "unlinked": int,
                "deleted": int,
                "failed": int,
                "total_chunks_deleted": int,
                "results": List[Dict],
                "errors": List[Dict]
            }
        """
        result = {
            "total": len(document_ids),
            "unlinked": 0,
            "deleted": 0,
            "failed": 0,
            "total_chunks_deleted": 0,
            "results": [],
            "errors": [],
        }

        for document_id in document_ids:
            remove_result = self._document_service.remove_from_collection(
                document_id, collection_id
            )

            if remove_result.get("unlinked"):
                result["unlinked"] += 1
                result["total_chunks_deleted"] += remove_result.get(
                    "chunks_deleted", 0
                )
                # "deleted" counts documents that were fully removed because
                # this collection was their last remaining link.
                if remove_result.get("document_deleted"):
                    result["deleted"] += 1
                result["results"].append(
                    {
                        "document_id": document_id,
                        "document_deleted": remove_result.get(
                            "document_deleted", False
                        ),
                        "chunks_deleted": remove_result.get(
                            "chunks_deleted", 0
                        ),
                    }
                )
            else:
                self._record_error(result, document_id, remove_result)

        logger.info(
            f"Bulk remove from collection: {result['unlinked']}/{result['total']} "
            f"unlinked, {result['deleted']} deleted, "
            f"{result['total_chunks_deleted']} chunks"
        )

        return result

    def get_bulk_preview(
        self,
        document_ids: List[str],
        operation: str = "delete",
    ) -> Dict[str, Any]:
        """
        Get a preview of what will be affected by a bulk operation.

        Args:
            document_ids: List of document IDs
            operation: Type of operation ("delete", "delete_blobs").
                Currently informational only -- the preview computed here is
                identical for all operations.

        Returns:
            Dict with preview information. Unknown document IDs are skipped
            silently; compare "found_documents" against "total_documents"
            to detect them.
        """
        # Local imports avoid a module-level import cycle with the database
        # layer (same pattern as elsewhere in this package).
        from ....database.models.library import Document, DocumentChunk
        from ....database.session_context import get_user_db_session
        from ..utils.cascade_helper import CascadeHelper

        result = {
            "total_documents": len(document_ids),
            "found_documents": 0,
            "total_blob_size": 0,
            "documents_with_blobs": 0,
            "total_chunks": 0,
            "documents": [],
        }

        with get_user_db_session(self.username) as session:
            for document_id in document_ids:
                # NOTE(review): Query.get() is the legacy SQLAlchemy API
                # (Session.get() replaces it in 1.4+) -- kept as-is pending
                # confirmation of the project's SQLAlchemy version.
                document = session.query(Document).get(document_id)
                if not document:
                    continue

                result["found_documents"] += 1
                blob_size = CascadeHelper.get_document_blob_size(
                    session, document_id
                )

                if blob_size > 0:
                    result["documents_with_blobs"] += 1
                    result["total_blob_size"] += blob_size

                chunks = (
                    session.query(DocumentChunk)
                    .filter(
                        DocumentChunk.source_id == document_id,
                        DocumentChunk.source_type == "document",
                    )
                    .count()
                )
                result["total_chunks"] += chunks

                result["documents"].append(
                    {
                        "id": document_id,
                        "title": document.title
                        or document.filename
                        or "Untitled",
                        "has_blob": blob_size > 0,
                        "blob_size": blob_size,
                        "chunks_count": chunks,
                    }
                )

        return result

297 return result