Coverage for src / local_deep_research / web / routes / history_routes.py: 100%

170 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1import json 

2 

3from flask import Blueprint, jsonify, request, session 

4from loguru import logger 

5from sqlalchemy import func 

6 

7from ...constants import ResearchStatus 

8from ...database.models import ResearchHistory 

9from ...database.models.library import Document as Document 

10from ...database.session_context import get_user_db_session 

11from ..auth.decorators import login_required 

12from ..models.database import ( 

13 get_logs_for_research, 

14 get_total_logs_for_research, 

15) 

16from ..routes.globals import get_active_research_snapshot 

17from ..services.research_service import get_research_strategy 

18from ...security.rate_limiter import limiter 

19from ...security import filter_research_metadata 

20from ..utils.templates import render_template_with_defaults 

21 

# Blueprint grouping all research-history routes under the /history prefix.
history_bp = Blueprint("history", __name__, url_prefix="/history")

# NOTE: Routes use session["username"] (not .get()) intentionally.
# @login_required guarantees the key exists; direct access fails fast
# if the decorator is ever removed.


# resolve_report_path removed - reports are now stored in database

@history_bp.route("/")
@login_required
def history_page():
    """Serve the research-history HTML page."""
    template_name = "pages/history.html"
    return render_template_with_defaults(template_name)

38 

39 

def _recalculate_duration(created_at, completed_at):
    """Return the whole-second duration between two timestamp strings.

    Returns None when either timestamp is falsy or cannot be parsed.
    Parsing errors are logged but never propagated, matching the
    best-effort behavior of the history listing.
    """
    if not (created_at and completed_at):
        return None
    try:
        # Local import: dateutil is only needed for legacy rows whose
        # duration_seconds was never persisted.
        from dateutil import parser  # type: ignore[import-untyped]

        start_time = parser.parse(created_at)
        end_time = parser.parse(completed_at)
        return int((end_time - start_time).total_seconds())
    except Exception:
        logger.warning("Error recalculating duration")
        logger.debug("Duration error details", exc_info=True)
        return None


@history_bp.route("/api", methods=["GET"])
@login_required
def get_history():
    """Get the research history JSON data.

    Query parameters:
        limit: maximum rows to return, clamped to 1..500 (default 200).
        offset: rows to skip, clamped to >= 0 (default 0).

    Returns a JSON object with "status" and "items"; on failure the
    items list is empty and status is "error".
    """
    username = session["username"]

    try:
        limit = request.args.get("limit", 200, type=int)
        limit = max(1, min(limit, 500))
        offset = request.args.get("offset", 0, type=int)
        offset = max(0, offset)

        with get_user_db_session(username) as db_session:
            # Single query with JOIN to get history + document counts
            results = (
                db_session.query(
                    ResearchHistory,
                    func.count(Document.id).label("document_count"),
                )
                .outerjoin(Document, Document.research_id == ResearchHistory.id)
                .group_by(ResearchHistory.id)
                .order_by(ResearchHistory.created_at.desc())
                .limit(limit)
                .offset(offset)
                .all()
            )

            logger.debug(f"All research count: {len(results)}")

            # Convert to plain dicts while the session is active so no
            # lazy attribute access happens after it closes.
            history = []
            for research, doc_count in results:
                item = {
                    "id": research.id,
                    "title": research.title,
                    "query": research.query,
                    "mode": research.mode,
                    "status": research.status,
                    "created_at": research.created_at,
                    "completed_at": research.completed_at,
                    "duration_seconds": research.duration_seconds,
                    "document_count": doc_count,
                }

                # Strip sensitive/internal keys before exposing metadata.
                item["metadata"] = filter_research_metadata(
                    research.research_meta
                )

                # Recalculate duration for legacy rows where it was
                # never stored but both timestamps exist.
                if item["duration_seconds"] is None:
                    item["duration_seconds"] = _recalculate_duration(
                        item["created_at"], item["completed_at"]
                    )

                history.append(item)

            # Format response to match what client expects
            response_data = {
                "status": "success",
                "items": history,  # Use 'items' key as expected by client
            }

            # CORS headers are handled by SecurityHeaders middleware
            return jsonify(response_data)
    except Exception:
        logger.exception("Error getting history")
        return jsonify(
            {
                "status": "error",
                "items": [],
                "message": "Failed to retrieve history",
            }
        )

125 

126 

@history_bp.route("/status/<string:research_id>")
@limiter.exempt
@login_required
def get_research_status(research_id):
    """Return status and progress information for one research run.

    Responds 404 if the research id is unknown. Progress comes from the
    in-memory snapshot when the research is active; otherwise it is
    derived from the persisted status and progress_log.
    """
    username = session["username"]

    with get_user_db_session(username) as db_session:
        research = (
            db_session.query(ResearchHistory).filter_by(id=research_id).first()
        )

        if not research:
            return jsonify(
                {"status": "error", "message": "Research not found"}
            ), 404

        # Extract attributes while session is active
        # to avoid DetachedInstanceError after the with block exits
        result = {
            "id": research.id,
            "query": research.query,
            "mode": research.mode,
            "status": research.status,
            "created_at": research.created_at,
            "completed_at": research.completed_at,
            "progress_log": research.progress_log,
            "report_path": research.report_path,
        }

        # Add progress information from active research (atomic snapshot)
        snapshot = get_active_research_snapshot(research_id)
        if snapshot is not None:
            result["progress"] = snapshot["progress"]
            result["log"] = snapshot["log"]
        else:
            # Not active: completed runs report 100%, everything else 0%.
            # The log is recovered from the persisted progress_log JSON.
            # (Previously this was two duplicated branches differing only
            # in the progress value.)
            result["progress"] = (
                100 if result.get("status") == ResearchStatus.COMPLETED else 0
            )
            try:
                result["log"] = json.loads(result.get("progress_log", "[]"))
            except Exception:
                logger.warning(
                    "Error parsing progress_log for research {}", research_id
                )
                result["log"] = []

        return jsonify(result)

181 

182 

@history_bp.route("/details/<string:research_id>")
@login_required
def get_research_details(research_id):
    """Get detailed progress log for a specific research"""

    logger.debug(f"Details route accessed for research_id: {research_id}")

    username = session["username"]

    try:
        with get_user_db_session(username) as db_session:
            research = (
                db_session.query(ResearchHistory)
                .filter_by(id=research_id)
                .first()
            )
            logger.debug(f"Research found: {research.id if research else None}")

            if not research:
                logger.error(f"Research not found for id: {research_id}")
                return jsonify(
                    {"status": "error", "message": "Research not found"}
                ), 404

            # Extract all needed attributes while session is active
            # to avoid DetachedInstanceError after the with block exits
            research_data = {
                "query": research.query,
                "mode": research.mode,
                "status": research.status,
                "created_at": research.created_at,
                "completed_at": research.completed_at,
            }
    except Exception:
        # DB failure only; the log/strategy lookups below have their own
        # data sources and are not covered by this handler.
        logger.exception("Database error")
        return jsonify(
            {
                "status": "error",
                "message": "An internal database error occurred.",
            }
        ), 500

    # Get logs from the dedicated log database
    logs = get_logs_for_research(research_id)

    # Get strategy information
    strategy_name = get_research_strategy(research_id)

    # Get an atomic snapshot of active research state
    snapshot = get_active_research_snapshot(research_id)

    # If this is an active research, merge with any in-memory logs
    if snapshot is not None:
        # Use the logs from memory temporarily until they're saved to the database
        memory_logs = snapshot["log"]

        # Filter out logs that are already in the database by timestamp
        # (timestamp equality is the de-dup key; entries with identical
        # times to a DB row are assumed to be the same entry)
        db_timestamps = {log["time"] for log in logs}
        unique_memory_logs = [
            log for log in memory_logs if log["time"] not in db_timestamps
        ]

        # Add unique memory logs to our return list
        logs.extend(unique_memory_logs)

        # Sort logs by timestamp
        logs.sort(key=lambda x: x["time"])

    # Active runs report their live progress; finished runs 100%, else 0.
    progress = (
        snapshot["progress"]
        if snapshot is not None
        else (100 if research_data["status"] == ResearchStatus.COMPLETED else 0)
    )

    return jsonify(
        {
            "research_id": research_id,
            "query": research_data["query"],
            "mode": research_data["mode"],
            "status": research_data["status"],
            "strategy": strategy_name,
            "progress": progress,
            "created_at": research_data["created_at"],
            "completed_at": research_data["completed_at"],
            "log": logs,
        }
    )

270 

271 

@history_bp.route("/report/<string:research_id>")
@login_required
def get_report(research_id):
    """Return the stored report content plus metadata for a research run.

    Responds 404 when the research row or its report content is missing,
    500 when the storage layer fails.
    """
    # Local imports keep these heavier modules out of blueprint import time.
    from ...storage import get_report_storage
    from ..auth.decorators import current_user

    username = current_user()

    with get_user_db_session(username) as db_session:
        research = (
            db_session.query(ResearchHistory).filter_by(id=research_id).first()
        )

        if not research:
            return jsonify(
                {"status": "error", "message": "Report not found"}
            ), 404

        try:
            # Get report using storage abstraction
            storage = get_report_storage(session=db_session)
            report_data = storage.get_report_with_metadata(
                research_id, username
            )

            if not report_data:
                return jsonify(
                    {"status": "error", "message": "Report content not found"}
                ), 404

            # Extract content and metadata
            content = report_data.get("content", "")
            stored_metadata = report_data.get("metadata", {})

            # Create an enhanced metadata dictionary with database fields
            enhanced_metadata = {
                "query": research.query,
                "mode": research.mode,
                "created_at": research.created_at,
                "completed_at": research.completed_at,
                "duration": research.duration_seconds,
            }

            # Merge with stored metadata (stored values win on key clashes)
            enhanced_metadata.update(stored_metadata)

            return jsonify(
                {
                    "status": "success",
                    "content": content,
                    "query": research.query,
                    "mode": research.mode,
                    "created_at": research.created_at,
                    "completed_at": research.completed_at,
                    "metadata": enhanced_metadata,
                }
            )
        except Exception:
            logger.exception(
                "Failed to retrieve report for research {}", research_id
            )
            return jsonify(
                {"status": "error", "message": "Failed to retrieve report"}
            ), 500

336 

337 

@history_bp.route("/markdown/<string:research_id>")
@login_required
def get_markdown(research_id):
    """Return the raw markdown report for a research run as JSON."""
    from ...storage import get_report_storage
    from ..auth.decorators import current_user

    username = current_user()

    with get_user_db_session(username) as db_session:
        record = (
            db_session.query(ResearchHistory).filter_by(id=research_id).first()
        )

        if record is None:
            return jsonify(
                {"status": "error", "message": "Report not found"}
            ), 404

        try:
            # Fetch the report body via the storage abstraction.
            report_text = get_report_storage(session=db_session).get_report(
                research_id, username
            )

            if not report_text:
                return jsonify(
                    {"status": "error", "message": "Report content not found"}
                ), 404

            return jsonify({"status": "success", "content": report_text})
        except Exception:
            logger.exception(
                "Failed to retrieve markdown report for research {}",
                research_id,
            )
            return jsonify(
                {"status": "error", "message": "Failed to retrieve report"}
            ), 500

376 

377 

@history_bp.route("/logs/<string:research_id>")
@login_required
def get_research_logs(research_id):
    """Get logs for a specific research ID"""
    username = session["username"]

    # Verify the research exists before fetching its logs.
    with get_user_db_session(username) as db_session:
        record = (
            db_session.query(ResearchHistory).filter_by(id=research_id).first()
        )

        if record is None:
            return jsonify(
                {"status": "error", "message": "Research not found"}
            ), 404

        # Retrieve logs from the database
        raw_logs = get_logs_for_research(research_id)

        # Guarantee each entry carries time/message/type, without
        # mutating the dicts returned by the log store.
        formatted_logs = []
        for entry in raw_logs:
            padded = entry.copy()
            padded.setdefault("time", "")
            padded.setdefault("message", "No message")
            padded.setdefault("type", "info")
            formatted_logs.append(padded)

        return jsonify({"status": "success", "logs": formatted_logs})

409 

410 

@history_bp.route("/log_count/<string:research_id>")
@login_required
def get_log_count(research_id):
    """Get the total number of logs for a specific research ID"""
    count = get_total_logs_for_research(research_id)
    return jsonify({"status": "success", "total_logs": count})