Coverage for src / local_deep_research / web / routes / history_routes.py: 80%

176 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1import json 

2 

3from flask import Blueprint, jsonify, request, session 

4from loguru import logger 

5from sqlalchemy import func 

6 

7from ...constants import ResearchStatus 

8from ...database.models import ResearchHistory 

9from ...database.models.library import Document as Document 

10from ...database.session_context import get_user_db_session 

11from ..auth.decorators import login_required 

12from ..models.database import ( 

13 get_logs_for_research, 

14 get_total_logs_for_research, 

15) 

16from ..routes.globals import get_globals 

17from ..services.research_service import get_research_strategy 

18from ..utils.rate_limiter import limiter 

19from ...security import filter_research_metadata 

20from ..utils.templates import render_template_with_defaults 

21 

# Blueprint that groups every history endpoint under the /history URL prefix
history_bp = Blueprint(
    name="history",
    import_name=__name__,
    url_prefix="/history",
)

24 

25 

26# resolve_report_path removed - reports are now stored in database 

27 

28 

@history_bp.route("/")
@login_required
def history_page():
    """Serve the research history page (``pages/history.html``)."""
    template_name = "pages/history.html"
    return render_template_with_defaults(template_name)

34 

35 

def _duration_from_timestamps(created_at, completed_at):
    """Return the whole seconds between two timestamps, or None on failure.

    Both values are handed to ``dateutil.parser.parse``. Any failure
    (including a missing ``dateutil``) is logged and reported as ``None``
    so one bad row never breaks the whole history listing.
    """
    try:
        # Imported inside the try so an ImportError is treated like any
        # other parse failure instead of failing the request.
        from dateutil import parser

        start_time = parser.parse(created_at)
        end_time = parser.parse(completed_at)
        return int((end_time - start_time).total_seconds())
    except Exception:
        logger.warning("Error recalculating duration")
        # loguru has no stdlib-style ``exc_info`` flag; opt(exception=True)
        # is what attaches the active traceback to the log record.
        logger.opt(exception=True).debug("Duration error details")
        return None


@history_bp.route("/api", methods=["GET"])
@login_required
def get_history():
    """Return one page of research history entries as JSON.

    Query parameters:
        limit: Page size. Defaults to 200 and is clamped to 1..500.
        offset: Number of rows to skip. Defaults to 0; negatives become 0.

    Returns:
        ``{"status": "success", "items": [...]}`` on success, where each
        item carries the research fields, its document count and filtered
        metadata. A 401 JSON error when the session has no username. On any
        other failure an error payload with an empty ``items`` list.
    """
    username = session.get("username")
    if not username:
        return jsonify({"status": "error", "message": "Not authenticated"}), 401

    try:
        limit = request.args.get("limit", 200, type=int)
        limit = max(1, min(limit, 500))
        offset = request.args.get("offset", 0, type=int)
        offset = max(0, offset)

        with get_user_db_session(username) as db_session:
            # Single query with JOIN to get history + document counts
            results = (
                db_session.query(
                    ResearchHistory,
                    func.count(Document.id).label("document_count"),
                )
                .outerjoin(Document, Document.research_id == ResearchHistory.id)
                .group_by(ResearchHistory.id)
                # The id tiebreaker keeps LIMIT/OFFSET pages stable when
                # several rows share the same created_at value.
                .order_by(
                    ResearchHistory.created_at.desc(),
                    ResearchHistory.id.desc(),
                )
                .limit(limit)
                .offset(offset)
                .all()
            )

            logger.debug(f"All research count: {len(results)}")

            # Convert to list of dicts while the session is still open
            history = []
            for research, doc_count in results:
                item = {
                    "id": research.id,
                    "title": research.title,
                    "query": research.query,
                    "mode": research.mode,
                    "status": research.status,
                    "created_at": research.created_at,
                    "completed_at": research.completed_at,
                    "duration_seconds": research.duration_seconds,
                    "document_count": doc_count,
                }

                item["metadata"] = filter_research_metadata(
                    research.research_meta
                )

                # Recalculate duration if null but both timestamps exist
                if (
                    item["duration_seconds"] is None
                    and item["created_at"]
                    and item["completed_at"]
                ):
                    item["duration_seconds"] = _duration_from_timestamps(
                        item["created_at"], item["completed_at"]
                    )

                history.append(item)

            # Format response to match what client expects
            response_data = {
                "status": "success",
                "items": history,  # Use 'items' key as expected by client
            }

            # CORS headers are handled by SecurityHeaders middleware
            return jsonify(response_data)
    except Exception:
        logger.exception("Error getting history")
        # NOTE(review): this error payload is sent with the default 200
        # status; kept as-is because clients may rely on it - confirm.
        return jsonify(
            {
                "status": "error",
                "items": [],
                "message": "Failed to retrieve history",
            }
        )

123 

124 

def _parse_progress_log(raw_log):
    """Decode a stored progress log, returning [] when absent or invalid."""
    if not raw_log:
        # Nothing stored: skip json.loads instead of relying on it to raise.
        return []
    try:
        return json.loads(raw_log)
    except (TypeError, ValueError):
        # json.JSONDecodeError is a ValueError; TypeError covers values
        # that are not str/bytes.
        return []


@history_bp.route("/status/<string:research_id>")
@limiter.exempt
@login_required
def get_research_status(research_id):
    """Return the stored status of a research run plus progress and log.

    Args:
        research_id: Identifier taken from the URL.

    Returns:
        JSON with the stored research fields, ``progress`` and ``log``.
        For a run present in the in-memory ``active_research`` mapping the
        live progress and log are used; otherwise progress is 100 for a
        completed run and 0 for anything else, and the log is decoded from
        the stored ``progress_log``. A 401 JSON error when the session has
        no username, 404 when no matching row exists.
    """
    username = session.get("username")
    if not username:
        return jsonify({"status": "error", "message": "Not authenticated"}), 401

    with get_user_db_session(username) as db_session:
        research = (
            db_session.query(ResearchHistory).filter_by(id=research_id).first()
        )

        if not research:
            return jsonify(
                {"status": "error", "message": "Research not found"}
            ), 404

        # Extract attributes while session is active
        # to avoid DetachedInstanceError after the with block exits
        result = {
            "id": research.id,
            "query": research.query,
            "mode": research.mode,
            "status": research.status,
            "created_at": research.created_at,
            "completed_at": research.completed_at,
            "progress_log": research.progress_log,
            "report_path": research.report_path,
        }

    globals_dict = get_globals()
    active_research = globals_dict["active_research"]

    # Add progress information
    if research_id in active_research:
        result["progress"] = active_research[research_id]["progress"]
        result["log"] = active_research[research_id]["log"]
    else:
        is_completed = result["status"] == ResearchStatus.COMPLETED
        result["progress"] = 100 if is_completed else 0
        result["log"] = _parse_progress_log(result["progress_log"])

    return jsonify(result)

177 

178 

@history_bp.route("/details/<string:research_id>")
@login_required
def get_research_details(research_id):
    """Return the research summary together with its full progress log.

    Logs come from ``get_logs_for_research``. When the run is present in the
    in-memory ``active_research`` mapping, in-memory entries whose ``time``
    is not already among the fetched logs are appended and the combined list
    is sorted by ``time``.

    Responds with 401 when the session has no username, 404 when the row
    does not exist, and 500 when the database lookup raises.
    """
    logger.debug(f"Details route accessed for research_id: {research_id}")

    username = session.get("username")
    if not username:
        logger.error("No username in session")
        return jsonify({"status": "error", "message": "Not authenticated"}), 401

    try:
        with get_user_db_session(username) as db_session:
            record = (
                db_session.query(ResearchHistory)
                .filter_by(id=research_id)
                .first()
            )
            logger.debug(f"Research found: {record.id if record else None}")

            if not record:
                logger.error(f"Research not found for id: {research_id}")
                not_found = {"status": "error", "message": "Research not found"}
                return jsonify(not_found), 404

            # Copy the columns out now; the ORM object cannot be read
            # safely once the session context has been left.
            snapshot = {
                column: getattr(record, column)
                for column in (
                    "query",
                    "mode",
                    "status",
                    "created_at",
                    "completed_at",
                )
            }
    except Exception:
        logger.exception("Database error")
        db_failure = {
            "status": "error",
            "message": "An internal database error occurred.",
        }
        return jsonify(db_failure), 500

    # Logs and strategy are looked up outside the session block
    logs = get_logs_for_research(research_id)
    strategy_name = get_research_strategy(research_id)

    active_research = get_globals()["active_research"]

    if research_id in active_research:
        # Live runs may hold entries that were not persisted yet
        pending_entries = active_research[research_id]["log"]
        known_times = {entry["time"] for entry in logs}
        fresh_entries = [
            entry
            for entry in pending_entries
            if entry["time"] not in known_times
        ]
        logs.extend(fresh_entries)
        logs.sort(key=lambda entry: entry["time"])

    # Used only when the in-memory record has no "progress" value
    if snapshot["status"] == ResearchStatus.COMPLETED:
        fallback_progress = 100
    else:
        fallback_progress = 0
    live_state = active_research.get(research_id, {})

    payload = {
        "research_id": research_id,
        "query": snapshot["query"],
        "mode": snapshot["mode"],
        "status": snapshot["status"],
        "strategy": strategy_name,
        "progress": live_state.get("progress", fallback_progress),
        "created_at": snapshot["created_at"],
        "completed_at": snapshot["completed_at"],
        "log": logs,
    }
    return jsonify(payload)

268 

269 

@history_bp.route("/report/<string:research_id>")
@login_required
def get_report(research_id):
    """Return the stored report content and merged metadata for a research.

    Args:
        research_id: Identifier taken from the URL.

    Returns:
        JSON with ``content``, the basic research fields and a ``metadata``
        dict in which stored metadata overrides the database-derived
        fields. 404 when the research row or the report content is
        missing, 500 when loading the report raises.
    """
    from ...storage import get_report_storage
    from ..auth.decorators import current_user

    username = current_user()

    with get_user_db_session(username) as db_session:
        research = (
            db_session.query(ResearchHistory).filter_by(id=research_id).first()
        )

        if not research:
            return jsonify(
                {"status": "error", "message": "Report not found"}
            ), 404

        try:
            # Get report using storage abstraction
            storage = get_report_storage(session=db_session)
            report_data = storage.get_report_with_metadata(
                research_id, username
            )

            if not report_data:
                return jsonify(
                    {"status": "error", "message": "Report content not found"}
                ), 404

            # Extract content and metadata
            content = report_data.get("content", "")
            # "or {}" also covers a key that is present but null, which
            # would otherwise make dict.update() raise.
            stored_metadata = report_data.get("metadata") or {}

            # Create an enhanced metadata dictionary with database fields
            enhanced_metadata = {
                "query": research.query,
                "mode": research.mode,
                "created_at": research.created_at,
                "completed_at": research.completed_at,
                "duration": research.duration_seconds,
            }

            # Merge with stored metadata (stored values win on conflicts)
            enhanced_metadata.update(stored_metadata)

            return jsonify(
                {
                    "status": "success",
                    "content": content,
                    "query": research.query,
                    "mode": research.mode,
                    "created_at": research.created_at,
                    "completed_at": research.completed_at,
                    "metadata": enhanced_metadata,
                }
            )
        except Exception:
            # Record the failure; the client only gets a generic message.
            logger.exception("Error retrieving report")
            return jsonify(
                {"status": "error", "message": "Failed to retrieve report"}
            ), 500

331 

332 

@history_bp.route("/markdown/<string:research_id>")
@login_required
def get_markdown(research_id):
    """Get markdown export for a specific research.

    Args:
        research_id: Identifier taken from the URL.

    Returns:
        ``{"status": "success", "content": ...}`` with the stored report
        text. 404 when the research row or the report content is missing,
        500 when loading the report raises.
    """
    from ...storage import get_report_storage
    from ..auth.decorators import current_user

    username = current_user()

    with get_user_db_session(username) as db_session:
        research = (
            db_session.query(ResearchHistory).filter_by(id=research_id).first()
        )

        if not research:
            return jsonify(
                {"status": "error", "message": "Report not found"}
            ), 404

        try:
            # Get report using storage abstraction
            storage = get_report_storage(session=db_session)
            content = storage.get_report(research_id, username)

            if not content:
                return jsonify(
                    {"status": "error", "message": "Report content not found"}
                ), 404

            return jsonify({"status": "success", "content": content})
        except Exception:
            # Record the failure; the client only gets a generic message.
            logger.exception("Error retrieving markdown report")
            return jsonify(
                {"status": "error", "message": "Failed to retrieve report"}
            ), 500

367 

368 

@history_bp.route("/logs/<string:research_id>")
@login_required
def get_research_logs(research_id):
    """Return every log entry recorded for one research run.

    Each returned entry is a copy of the fetched entry in which ``time``,
    ``message`` and ``type`` are guaranteed to exist (missing ones become
    ``""``, ``"No message"`` and ``"info"``).

    Responds with 401 when the session has no username and 404 when the
    research row does not exist.
    """
    username = session.get("username")
    if not username:
        return jsonify({"status": "error", "message": "Not authenticated"}), 401

    # Confirm the research exists before fetching any logs
    with get_user_db_session(username) as db_session:
        record = (
            db_session.query(ResearchHistory).filter_by(id=research_id).first()
        )
        if not record:
            missing = {"status": "error", "message": "Research not found"}
            return jsonify(missing), 404

    def _with_required_fields(raw_entry):
        # Work on a copy so the fetched entry is never modified
        entry = raw_entry.copy()
        entry.setdefault("time", "")
        entry.setdefault("message", "No message")
        entry.setdefault("type", "info")
        return entry

    fetched = get_logs_for_research(research_id)
    normalized = [_with_required_fields(raw_entry) for raw_entry in fetched]

    return jsonify({"status": "success", "logs": normalized})

402 

403 

@history_bp.route("/log_count/<string:research_id>")
@login_required
def get_log_count(research_id):
    """Report how many log entries exist for the given research ID.

    The count comes straight from ``get_total_logs_for_research``; this
    handler performs no lookup of the research row itself.
    """
    payload = {
        "status": "success",
        "total_logs": get_total_logs_for_research(research_id),
    }
    return jsonify(payload)