Coverage for src / local_deep_research / web / routes / history_routes.py: 100%
170 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1import json
3from flask import Blueprint, jsonify, request, session
4from loguru import logger
5from sqlalchemy import func
7from ...constants import ResearchStatus
8from ...database.models import ResearchHistory
9from ...database.models.library import Document as Document
10from ...database.session_context import get_user_db_session
11from ..auth.decorators import login_required
12from ..models.database import (
13 get_logs_for_research,
14 get_total_logs_for_research,
15)
16from ..routes.globals import get_active_research_snapshot
17from ..services.research_service import get_research_strategy
18from ...security.rate_limiter import limiter
19from ...security import filter_research_metadata
20from ..utils.templates import render_template_with_defaults
# Create a Blueprint for the history routes.
# All endpoints below are mounted under the /history URL prefix.
history_bp = Blueprint("history", __name__, url_prefix="/history")

# NOTE: Routes use session["username"] (not .get()) intentionally.
# @login_required guarantees the key exists; direct access fails fast
# if the decorator is ever removed.

# resolve_report_path removed - reports are now stored in database
@history_bp.route("/")
@login_required
def history_page():
    """Serve the research-history UI page (HTML shell; data loads via /api)."""
    page = render_template_with_defaults("pages/history.html")
    return page
@history_bp.route("/api", methods=["GET"])
@login_required
def get_history():
    """Get the research history JSON data.

    Query parameters:
        limit (int): max rows returned; clamped to [1, 500], default 200.
        offset (int): rows skipped; clamped to >= 0, default 0.

    Returns:
        JSON ``{"status": "success", "items": [...]}`` on success, or
        ``{"status": "error", "items": [], "message": ...}`` on failure.
        Note: the error response is still sent with HTTP 200.
    """
    username = session["username"]

    try:
        # Clamp client-supplied pagination values to sane bounds.
        limit = request.args.get("limit", 200, type=int)
        limit = max(1, min(limit, 500))
        offset = request.args.get("offset", 0, type=int)
        offset = max(0, offset)

        with get_user_db_session(username) as db_session:
            # Single query with JOIN to get history + document counts
            # (outer join so researches with zero documents still appear).
            results = (
                db_session.query(
                    ResearchHistory,
                    func.count(Document.id).label("document_count"),
                )
                .outerjoin(Document, Document.research_id == ResearchHistory.id)
                .group_by(ResearchHistory.id)
                .order_by(ResearchHistory.created_at.desc())
                .limit(limit)
                .offset(offset)
                .all()
            )

            logger.debug(f"All research count: {len(results)}")

            # Convert to list of dicts
            history = []
            for research, doc_count in results:
                item = {
                    "id": research.id,
                    "title": research.title,
                    "query": research.query,
                    "mode": research.mode,
                    "status": research.status,
                    "created_at": research.created_at,
                    "completed_at": research.completed_at,
                    "duration_seconds": research.duration_seconds,
                    "document_count": doc_count,
                }

                # Filter metadata before exposing it to the client.
                item["metadata"] = filter_research_metadata(
                    research.research_meta
                )

                # Recalculate duration if null but both timestamps exist
                if (
                    item["duration_seconds"] is None
                    and item["created_at"]
                    and item["completed_at"]
                ):
                    try:
                        # Local import: dateutil only needed on this fallback
                        # path. NOTE(review): parser.parse expects strings —
                        # assumes timestamps are stored as ISO strings, not
                        # datetime objects; confirm against the model.
                        from dateutil import parser  # type: ignore[import-untyped]

                        start_time = parser.parse(item["created_at"])
                        end_time = parser.parse(item["completed_at"])
                        item["duration_seconds"] = int(
                            (end_time - start_time).total_seconds()
                        )
                    except Exception:
                        # Best-effort only; leave duration_seconds as None.
                        logger.warning("Error recalculating duration")
                        logger.debug("Duration error details", exc_info=True)

                history.append(item)

        # Format response to match what client expects
        response_data = {
            "status": "success",
            "items": history,  # Use 'items' key as expected by client
        }

        # CORS headers are handled by SecurityHeaders middleware
        return jsonify(response_data)
    except Exception:
        logger.exception("Error getting history")
        return jsonify(
            {
                "status": "error",
                "items": [],
                "message": "Failed to retrieve history",
            }
        )
@history_bp.route("/status/<string:research_id>")
@limiter.exempt
@login_required
def get_research_status(research_id):
    """Return status/progress JSON for a single research run.

    Args:
        research_id: primary key of the ResearchHistory row.

    Returns:
        JSON with the row's fields plus ``progress`` (0-100) and ``log``
        (list of log entries), or a 404 error payload if the id is unknown.
    """
    username = session["username"]

    with get_user_db_session(username) as db_session:
        research = (
            db_session.query(ResearchHistory).filter_by(id=research_id).first()
        )

        if not research:
            return jsonify(
                {"status": "error", "message": "Research not found"}
            ), 404

        # Extract attributes while session is active
        # to avoid DetachedInstanceError after the with block exits
        result = {
            "id": research.id,
            "query": research.query,
            "mode": research.mode,
            "status": research.status,
            "created_at": research.created_at,
            "completed_at": research.completed_at,
            "progress_log": research.progress_log,
            "report_path": research.report_path,
        }

    # Add progress information from active research (atomic snapshot)
    snapshot = get_active_research_snapshot(research_id)
    if snapshot is not None:
        result["progress"] = snapshot["progress"]
        result["log"] = snapshot["log"]
    else:
        # No live snapshot: completed runs report 100%, everything else 0.
        result["progress"] = (
            100 if result.get("status") == ResearchStatus.COMPLETED else 0
        )
        # Single parsing path for the persisted log (this code was
        # previously duplicated verbatim in both the COMPLETED and
        # fallback branches).
        try:
            result["log"] = json.loads(result.get("progress_log", "[]"))
        except Exception:
            logger.warning(
                "Error parsing progress_log for research {}", research_id
            )
            result["log"] = []

    return jsonify(result)
@history_bp.route("/details/<string:research_id>")
@login_required
def get_research_details(research_id):
    """Get detailed progress log for a specific research.

    Combines persisted logs from the log database with any still-in-memory
    logs from an active run, deduplicated by timestamp.

    Returns:
        JSON with query/mode/status/strategy, a progress percentage, and
        the merged ``log`` list; 404 if the research id is unknown; 500 on
        database errors.
    """

    logger.debug(f"Details route accessed for research_id: {research_id}")

    username = session["username"]

    try:
        with get_user_db_session(username) as db_session:
            research = (
                db_session.query(ResearchHistory)
                .filter_by(id=research_id)
                .first()
            )
            logger.debug(f"Research found: {research.id if research else None}")

            if not research:
                logger.error(f"Research not found for id: {research_id}")
                return jsonify(
                    {"status": "error", "message": "Research not found"}
                ), 404

            # Extract all needed attributes while session is active
            # to avoid DetachedInstanceError after the with block exits
            research_data = {
                "query": research.query,
                "mode": research.mode,
                "status": research.status,
                "created_at": research.created_at,
                "completed_at": research.completed_at,
            }
    except Exception:
        logger.exception("Database error")
        return jsonify(
            {
                "status": "error",
                "message": "An internal database error occurred.",
            }
        ), 500

    # Get logs from the dedicated log database
    logs = get_logs_for_research(research_id)

    # Get strategy information
    strategy_name = get_research_strategy(research_id)

    # Get an atomic snapshot of active research state
    snapshot = get_active_research_snapshot(research_id)

    # If this is an active research, merge with any in-memory logs
    if snapshot is not None:
        # Use the logs from memory temporarily until they're saved to the database
        memory_logs = snapshot["log"]

        # Filter out logs that are already in the database by timestamp
        # (assumes every log entry dict carries a "time" key — KeyError
        # here would propagate; TODO confirm log schema guarantees it).
        db_timestamps = {log["time"] for log in logs}
        unique_memory_logs = [
            log for log in memory_logs if log["time"] not in db_timestamps
        ]

        # Add unique memory logs to our return list
        logs.extend(unique_memory_logs)

        # Sort logs by timestamp
        logs.sort(key=lambda x: x["time"])

    # Live snapshot wins; otherwise 100% for completed runs, 0 for the rest.
    progress = (
        snapshot["progress"]
        if snapshot is not None
        else (100 if research_data["status"] == ResearchStatus.COMPLETED else 0)
    )

    return jsonify(
        {
            "research_id": research_id,
            "query": research_data["query"],
            "mode": research_data["mode"],
            "status": research_data["status"],
            "strategy": strategy_name,
            "progress": progress,
            "created_at": research_data["created_at"],
            "completed_at": research_data["completed_at"],
            "log": logs,
        }
    )
@history_bp.route("/report/<string:research_id>")
@login_required
def get_report(research_id):
    """Return a stored research report with database-enriched metadata."""
    from ...storage import get_report_storage
    from ..auth.decorators import current_user

    username = current_user()

    with get_user_db_session(username) as db_session:
        record = (
            db_session.query(ResearchHistory).filter_by(id=research_id).first()
        )

        if record is None:
            return jsonify(
                {"status": "error", "message": "Report not found"}
            ), 404

        try:
            # Fetch content + stored metadata through the storage abstraction.
            storage = get_report_storage(session=db_session)
            payload = storage.get_report_with_metadata(research_id, username)

            if not payload:
                return jsonify(
                    {"status": "error", "message": "Report content not found"}
                ), 404

            # Database fields first; stored metadata overrides on conflict.
            merged_metadata = {
                "query": record.query,
                "mode": record.mode,
                "created_at": record.created_at,
                "completed_at": record.completed_at,
                "duration": record.duration_seconds,
                **payload.get("metadata", {}),
            }

            return jsonify(
                {
                    "status": "success",
                    "content": payload.get("content", ""),
                    "query": record.query,
                    "mode": record.mode,
                    "created_at": record.created_at,
                    "completed_at": record.completed_at,
                    "metadata": merged_metadata,
                }
            )
        except Exception:
            logger.exception(
                "Failed to retrieve report for research {}", research_id
            )
            return jsonify(
                {"status": "error", "message": "Failed to retrieve report"}
            ), 500
@history_bp.route("/markdown/<string:research_id>")
@login_required
def get_markdown(research_id):
    """Return the raw markdown content of a stored research report."""
    from ...storage import get_report_storage
    from ..auth.decorators import current_user

    username = current_user()

    with get_user_db_session(username) as db_session:
        record = (
            db_session.query(ResearchHistory).filter_by(id=research_id).first()
        )

        if record is None:
            return jsonify(
                {"status": "error", "message": "Report not found"}
            ), 404

        try:
            # Markdown comes straight from the storage layer; no extra
            # metadata is attached on this endpoint.
            storage = get_report_storage(session=db_session)
            markdown_text = storage.get_report(research_id, username)

            if not markdown_text:
                return jsonify(
                    {"status": "error", "message": "Report content not found"}
                ), 404

            return jsonify({"status": "success", "content": markdown_text})
        except Exception:
            logger.exception(
                "Failed to retrieve markdown report for research {}",
                research_id,
            )
            return jsonify(
                {"status": "error", "message": "Failed to retrieve report"}
            ), 500
@history_bp.route("/logs/<string:research_id>")
@login_required
def get_research_logs(research_id):
    """Return all log entries for a research run as JSON."""
    username = session["username"]

    # First check if the research exists
    with get_user_db_session(username) as db_session:
        record = (
            db_session.query(ResearchHistory).filter_by(id=research_id).first()
        )

        if not record:
            return jsonify(
                {"status": "error", "message": "Research not found"}
            ), 404

    # Retrieve logs from the database
    raw_logs = get_logs_for_research(research_id)

    # Guarantee time/message/type fields on every entry, copying each
    # dict so the originals are never mutated.
    formatted_logs = [
        {
            **entry,
            "time": entry.get("time", ""),
            "message": entry.get("message", "No message"),
            "type": entry.get("type", "info"),
        }
        for entry in raw_logs
    ]

    return jsonify({"status": "success", "logs": formatted_logs})
@history_bp.route("/log_count/<string:research_id>")
@login_required
def get_log_count(research_id):
    """Return the total number of log entries for a research ID."""
    # No existence check here — counting is delegated entirely to the
    # log database helper.
    count = get_total_logs_for_research(research_id)
    return jsonify({"status": "success", "total_logs": count})