Coverage for src / local_deep_research / web / routes / history_routes.py: 80%
176 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
1import json
3from flask import Blueprint, jsonify, request, session
4from loguru import logger
5from sqlalchemy import func
7from ...constants import ResearchStatus
8from ...database.models import ResearchHistory
9from ...database.models.library import Document as Document
10from ...database.session_context import get_user_db_session
11from ..auth.decorators import login_required
12from ..models.database import (
13 get_logs_for_research,
14 get_total_logs_for_research,
15)
16from ..routes.globals import get_globals
17from ..services.research_service import get_research_strategy
18from ..utils.rate_limiter import limiter
19from ...security import filter_research_metadata
20from ..utils.templates import render_template_with_defaults
# Blueprint grouping every history endpoint under the /history URL prefix
history_bp = Blueprint(
    "history",
    __name__,
    url_prefix="/history",
)

# resolve_report_path removed - reports are now stored in database
@history_bp.route("/")
@login_required
def history_page():
    """Serve the HTML page for the research history view."""
    template_name = "pages/history.html"
    return render_template_with_defaults(template_name)
def _recalculate_duration(created_at, completed_at):
    """Return the whole seconds between two timestamps, or None on failure.

    Both values are handed to ``dateutil.parser.parse``. Any error (including
    a missing ``dateutil``) is logged and swallowed so that one bad row cannot
    break the whole history listing.
    """
    try:
        from dateutil import parser

        start_time = parser.parse(created_at)
        end_time = parser.parse(completed_at)
        return int((end_time - start_time).total_seconds())
    except Exception:
        logger.warning("Error recalculating duration")
        # loguru does not honour the stdlib ``exc_info=True`` keyword;
        # ``opt(exception=True)`` is what attaches the active traceback.
        logger.opt(exception=True).debug("Duration error details")
        return None


@history_bp.route("/api", methods=["GET"])
@login_required
def get_history():
    """Get the research history JSON data.

    Query parameters:
        limit: page size, default 200, clamped to the range 1..500.
        offset: number of rows to skip, default 0, negative values become 0.

    Returns:
        A JSON object ``{"status": "success", "items": [...]}`` where each
        item describes one research entry plus its document count, or a 401
        error payload when the session carries no username.
    """
    username = session.get("username")
    if not username:
        return jsonify({"status": "error", "message": "Not authenticated"}), 401

    try:
        limit = request.args.get("limit", 200, type=int)
        limit = max(1, min(limit, 500))
        offset = request.args.get("offset", 0, type=int)
        offset = max(0, offset)

        with get_user_db_session(username) as db_session:
            # Single query with JOIN to get history + document counts
            results = (
                db_session.query(
                    ResearchHistory,
                    func.count(Document.id).label("document_count"),
                )
                .outerjoin(Document, Document.research_id == ResearchHistory.id)
                .group_by(ResearchHistory.id)
                .order_by(ResearchHistory.created_at.desc())
                .limit(limit)
                .offset(offset)
                .all()
            )

            logger.debug(f"All research count: {len(results)}")

            # Convert to list of dicts while the session is still open
            history = []
            for research, doc_count in results:
                item = {
                    "id": research.id,
                    "title": research.title,
                    "query": research.query,
                    "mode": research.mode,
                    "status": research.status,
                    "created_at": research.created_at,
                    "completed_at": research.completed_at,
                    "duration_seconds": research.duration_seconds,
                    "document_count": doc_count,
                }

                item["metadata"] = filter_research_metadata(
                    research.research_meta
                )

                # Recalculate duration if null but both timestamps exist
                if (
                    item["duration_seconds"] is None
                    and item["created_at"]
                    and item["completed_at"]
                ):
                    item["duration_seconds"] = _recalculate_duration(
                        item["created_at"], item["completed_at"]
                    )

                history.append(item)

        # Format response to match what client expects
        response_data = {
            "status": "success",
            "items": history,  # Use 'items' key as expected by client
        }

        # CORS headers are handled by SecurityHeaders middleware
        return jsonify(response_data)
    except Exception:
        logger.exception("Error getting history")
        # NOTE(review): this error payload is sent with the default HTTP 200
        # status, unlike the other handlers in this module which return 500.
        # Left unchanged in case the client relies on it - confirm.
        return jsonify(
            {
                "status": "error",
                "items": [],
                "message": "Failed to retrieve history",
            }
        )
@history_bp.route("/status/<string:research_id>")
@limiter.exempt
@login_required
def get_research_status(research_id):
    """Return the stored fields and current progress of one research entry.

    Responds with 401 when the session has no username and 404 when no
    ``ResearchHistory`` row matches ``research_id``. Otherwise the JSON body
    contains the stored fields plus ``progress`` and ``log``:

    * taken from the in-memory ``active_research`` entry when one exists;
    * otherwise ``progress`` is 100 for a completed research and 0 for any
      other status, and ``log`` is decoded from the stored ``progress_log``.
    """
    username = session.get("username")
    if not username:
        return jsonify({"status": "error", "message": "Not authenticated"}), 401

    with get_user_db_session(username) as db_session:
        research = (
            db_session.query(ResearchHistory).filter_by(id=research_id).first()
        )

        if not research:
            return jsonify(
                {"status": "error", "message": "Research not found"}
            ), 404

        # Extract attributes while session is active
        # to avoid DetachedInstanceError after the with block exits
        result = {
            "id": research.id,
            "query": research.query,
            "mode": research.mode,
            "status": research.status,
            "created_at": research.created_at,
            "completed_at": research.completed_at,
            "progress_log": research.progress_log,
            "report_path": research.report_path,
        }

    active_research = get_globals()["active_research"]

    # Look the entry up exactly once: a membership test followed by separate
    # index operations can raise KeyError if the entry is removed in between.
    active_entry = active_research.get(research_id)

    if active_entry is not None:
        result["progress"] = active_entry["progress"]
        result["log"] = active_entry["log"]
    else:
        is_completed = result["status"] == ResearchStatus.COMPLETED
        result["progress"] = 100 if is_completed else 0
        try:
            # A None or otherwise undecodable value falls back to an empty log.
            result["log"] = json.loads(result["progress_log"])
        except Exception:
            result["log"] = []

    return jsonify(result)
@history_bp.route("/details/<string:research_id>")
@login_required
def get_research_details(research_id):
    """Get detailed progress log for a specific research.

    Responds with 401 when the session has no username, 404 when the research
    does not exist and 500 when the database lookup raises. On success the
    JSON body carries the stored query/mode/status/timestamps, the strategy
    name, the current progress and the log entries. While a research is
    still present in ``active_research`` its in-memory log entries are merged
    with the persisted ones and the combined list is ordered by ``time``.
    """
    logger.debug(f"Details route accessed for research_id: {research_id}")

    username = session.get("username")
    if not username:
        logger.error("No username in session")
        return jsonify({"status": "error", "message": "Not authenticated"}), 401

    try:
        with get_user_db_session(username) as db_session:
            research = (
                db_session.query(ResearchHistory)
                .filter_by(id=research_id)
                .first()
            )
            logger.debug(f"Research found: {research.id if research else None}")

            if not research:
                logger.error(f"Research not found for id: {research_id}")
                return jsonify(
                    {"status": "error", "message": "Research not found"}
                ), 404

            # Extract all needed attributes while session is active
            # to avoid DetachedInstanceError after the with block exits
            research_data = {
                "query": research.query,
                "mode": research.mode,
                "status": research.status,
                "created_at": research.created_at,
                "completed_at": research.completed_at,
            }
    except Exception:
        logger.exception("Database error")
        return jsonify(
            {
                "status": "error",
                "message": "An internal database error occurred.",
            }
        ), 500

    # Get logs from the dedicated log database
    logs = get_logs_for_research(research_id)

    # Get strategy information
    strategy_name = get_research_strategy(research_id)

    active_research = get_globals()["active_research"]

    # Take one snapshot of the in-memory entry and use it for both the log
    # merge and the progress value, so the two cannot disagree and a removal
    # between a membership test and an index cannot raise KeyError.
    active_entry = active_research.get(research_id)

    # If this is an active research, merge with any in-memory logs
    if active_entry is not None:
        # Use the logs from memory temporarily until they're saved to the database
        memory_logs = active_entry["log"]

        # Filter out logs that are already in the database by timestamp
        db_timestamps = {log["time"] for log in logs}
        unique_memory_logs = [
            log for log in memory_logs if log["time"] not in db_timestamps
        ]

        # Add unique memory logs to our return list
        logs.extend(unique_memory_logs)

        # Sort logs by timestamp
        logs.sort(key=lambda x: x["time"])

    # Without an in-memory progress value, fall back on the stored status
    default_progress = (
        100 if research_data["status"] == ResearchStatus.COMPLETED else 0
    )
    progress = (active_entry or {}).get("progress", default_progress)

    return jsonify(
        {
            "research_id": research_id,
            "query": research_data["query"],
            "mode": research_data["mode"],
            "status": research_data["status"],
            "strategy": strategy_name,
            "progress": progress,
            "created_at": research_data["created_at"],
            "completed_at": research_data["completed_at"],
            "log": logs,
        }
    )
@history_bp.route("/report/<string:research_id>")
@login_required
def get_report(research_id):
    """Return the stored report content and metadata for one research entry.

    Responds with 404 when the research row or its report content is missing
    and with 500 (after logging the exception) when retrieval fails. On
    success the ``metadata`` field holds the database fields overlaid with
    whatever metadata was stored alongside the report.
    """
    from ...storage import get_report_storage
    from ..auth.decorators import current_user

    username = current_user()

    with get_user_db_session(username) as db_session:
        research = (
            db_session.query(ResearchHistory).filter_by(id=research_id).first()
        )

        if not research:
            return jsonify(
                {"status": "error", "message": "Report not found"}
            ), 404

        try:
            # Get report using storage abstraction
            storage = get_report_storage(session=db_session)
            report_data = storage.get_report_with_metadata(
                research_id, username
            )

            if not report_data:
                return jsonify(
                    {"status": "error", "message": "Report content not found"}
                ), 404

            # Extract content and metadata
            content = report_data.get("content", "")
            # ``or {}`` also covers a "metadata" key that is present but None,
            # which would otherwise make dict.update() raise TypeError.
            stored_metadata = report_data.get("metadata") or {}

            # Create an enhanced metadata dictionary with database fields
            enhanced_metadata = {
                "query": research.query,
                "mode": research.mode,
                "created_at": research.created_at,
                "completed_at": research.completed_at,
                "duration": research.duration_seconds,
            }

            # Merge with stored metadata (stored values win on key clashes)
            enhanced_metadata.update(stored_metadata)

            return jsonify(
                {
                    "status": "success",
                    "content": content,
                    "query": research.query,
                    "mode": research.mode,
                    "created_at": research.created_at,
                    "completed_at": research.completed_at,
                    "metadata": enhanced_metadata,
                }
            )
        except Exception:
            # Record the failure server-side; the client only gets a generic
            # message so no internal details leak into the response.
            logger.exception("Error retrieving report")
            return jsonify(
                {"status": "error", "message": "Failed to retrieve report"}
            ), 500
@history_bp.route("/markdown/<string:research_id>")
@login_required
def get_markdown(research_id):
    """Get markdown export for a specific research.

    Responds with 404 when the research row or its report content is missing
    and with 500 (after logging the exception) when retrieval fails.
    """
    from ...storage import get_report_storage
    from ..auth.decorators import current_user

    username = current_user()

    with get_user_db_session(username) as db_session:
        research = (
            db_session.query(ResearchHistory).filter_by(id=research_id).first()
        )

        if not research:
            return jsonify(
                {"status": "error", "message": "Report not found"}
            ), 404

        try:
            # Get report using storage abstraction
            storage = get_report_storage(session=db_session)
            content = storage.get_report(research_id, username)

            if not content:
                return jsonify(
                    {"status": "error", "message": "Report content not found"}
                ), 404

            return jsonify({"status": "success", "content": content})
        except Exception:
            # Record the failure server-side; the client only gets a generic
            # message so no internal details leak into the response.
            logger.exception("Error retrieving markdown report")
            return jsonify(
                {"status": "error", "message": "Failed to retrieve report"}
            ), 500
@history_bp.route("/logs/<string:research_id>")
@login_required
def get_research_logs(research_id):
    """Return every stored log entry of one research as JSON.

    Responds with 401 when the session has no username and 404 when the
    research does not exist. Each returned entry is a copy of the stored one
    in which missing ``time``, ``message`` and ``type`` keys are filled with
    ``""``, ``"No message"`` and ``"info"`` respectively.
    """
    username = session.get("username")
    if not username:
        return jsonify({"status": "error", "message": "Not authenticated"}), 401

    # The research must exist before its logs are looked up
    with get_user_db_session(username) as db_session:
        record = (
            db_session.query(ResearchHistory).filter_by(id=research_id).first()
        )
        if not record:
            not_found = {"status": "error", "message": "Research not found"}
            return jsonify(not_found), 404

    def _with_defaults(raw_entry):
        # Work on a copy so the entry returned by the log store is untouched
        entry = raw_entry.copy()
        entry.setdefault("time", "")
        entry.setdefault("message", "No message")
        entry.setdefault("type", "info")
        return entry

    formatted_logs = [
        _with_defaults(raw_entry)
        for raw_entry in get_logs_for_research(research_id)
    ]

    return jsonify({"status": "success", "logs": formatted_logs})
@history_bp.route("/log_count/<string:research_id>")
@login_required
def get_log_count(research_id):
    """Report how many log entries exist for the given research ID."""
    payload = {
        "status": "success",
        "total_logs": get_total_logs_for_research(research_id),
    }
    return jsonify(payload)