Coverage for src / local_deep_research / web / routes / research_routes_orm.py: 66%
180 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
1"""
2Research routes using SQLAlchemy ORM instead of raw SQL.
3This is a converted version showing how to replace raw SQL with ORM queries.
4"""
6import json
7from datetime import datetime, UTC
8from pathlib import Path
10from flask import (
11 Blueprint,
12 jsonify,
13 request,
14)
15from loguru import logger
17from ...config.paths import get_research_outputs_directory
18from ...constants import ResearchStatus
19from ...database.models import ResearchHistory
20from ...database.session_context import get_user_db_session
21from ...security import filter_research_metadata, strip_settings_snapshot
22from ..auth.decorators import login_required
23from ..models.database import calculate_duration
24from .globals import active_research, termination_flags
# Create a Blueprint for the research application; registered by the app factory.
research_bp = Blueprint("research", __name__)

# Output directory for research results (resolved once at import time).
OUTPUT_DIR = get_research_outputs_directory()


# Example conversions from the original file:
def check_research_status_orm(research_id):
    """
    Look up the status of a research record via the ORM.

    Original SQL:
        SELECT status FROM research_history WHERE id = ?

    Returns the status value, or ``None`` when no record matches.
    """
    with get_user_db_session() as db_session:
        record = (
            db_session.query(ResearchHistory).filter_by(id=research_id).first()
        )
        if record is None:
            return None
        return record.status
def update_research_status_orm(research_id, new_status):
    """
    Set the status of a research record via the ORM.

    Original SQL:
        UPDATE research_history SET status = ? WHERE id = ?

    Returns ``True`` when a record was updated, ``False`` when no
    record with ``research_id`` exists.
    """
    with get_user_db_session() as db_session:
        record = (
            db_session.query(ResearchHistory).filter_by(id=research_id).first()
        )
        if record is None:
            return False
        record.status = new_status
        db_session.commit()
        return True
def update_progress_log_orm(research_id, progress_log):
    """
    Replace the progress log of a research record via the ORM.

    Original SQL:
        UPDATE research_history SET progress_log = ? WHERE id = ?

    Returns ``True`` when a record was updated, ``False`` when no
    record with ``research_id`` exists.
    """
    with get_user_db_session() as db_session:
        record = (
            db_session.query(ResearchHistory).filter_by(id=research_id).first()
        )
        if record is None:
            return False
        record.progress_log = progress_log
        db_session.commit()
        return True
@research_bp.route("/api/start_research", methods=["POST"])
@login_required
def start_research():
    """Start a research run, persisting the new record via the ORM.

    Before creating the record, any tracked research whose database row
    is gone, no longer in progress, or whose worker thread has died is
    removed from the in-memory tracking dicts.
    """
    payload = request.json
    query = payload.get("query")
    mode = payload.get("mode", "quick")

    # ... validation code ...

    # Check if there's any active research that's actually still running
    if active_research:
        # Verify each active research is still valid
        stale_ids = []
        with get_user_db_session() as db_session:
            for rid, info in list(active_research.items()):
                # Check database status using ORM
                record = (
                    db_session.query(ResearchHistory)
                    .filter_by(id=rid)
                    .first()
                )

                worker = info.get("thread")
                # A run counts as live only when the DB row exists, is
                # still in progress, and its worker thread is alive.
                still_running = (
                    record is not None
                    and record.status == ResearchStatus.IN_PROGRESS
                    and worker is not None
                    and worker.is_alive()
                )
                if not still_running:
                    stale_ids.append(rid)

            # Clean up any stale research processes
            for stale_id in stale_ids:
                logger.info(f"Cleaning up stale research process: {stale_id}")
                active_research.pop(stale_id, None)
                termination_flags.pop(stale_id, None)

    # Create a record in the database with ORM
    created_at = datetime.now(UTC).isoformat()

    # Save research settings in the metadata field
    research_settings = {
        "model_provider": payload.get("model_provider", "OLLAMA"),
        "model": payload.get("model"),
        # ... other settings ...
    }

    with get_user_db_session() as db_session:
        record = ResearchHistory(
            query=query,
            mode=mode,
            status=ResearchStatus.IN_PROGRESS,
            created_at=created_at,
            progress_log=[{"time": created_at, "progress": 0}],
            research_meta=research_settings,
        )
        db_session.add(record)
        db_session.commit()
        new_research_id = record.id

    # Start the research process
    # ... rest of the function ...

    return jsonify({"status": "success", "research_id": new_research_id})
@research_bp.route("/api/terminate/<string:research_id>", methods=["POST"])
@login_required
def terminate_research(research_id):
    """Terminate an in-progress research run.

    Marks the run as SUSPENDED in the database. When a worker is
    tracked in ``active_research``, also sets its termination flag and
    appends a termination entry to both the in-memory log and the
    persisted progress log.

    Returns:
        JSON response: 404 if the research does not exist, 400 if it is
        not in progress, otherwise a success message.
    """
    # FIX: the previous version wrapped the ``with`` in a try/finally
    # that called db_session.close() again. If session creation raised,
    # ``db_session`` was unbound and the finally raised a NameError that
    # masked the original error; on success it double-closed a session
    # the context manager had already cleaned up. The context manager
    # alone is sufficient.
    with get_user_db_session() as db_session:
        # Check if the research exists and is in progress
        research = (
            db_session.query(ResearchHistory)
            .filter_by(id=research_id)
            .first()
        )

        if not research:
            return jsonify(
                {"status": "error", "message": "Research not found"}
            ), 404

        # If it's not in progress, return an error
        if research.status != ResearchStatus.IN_PROGRESS:
            return (
                jsonify(
                    {
                        "status": "error",
                        "message": "Research is not in progress",
                    }
                ),
                400,
            )

        # No tracked worker: just mark the row suspended and return.
        if research_id not in active_research:
            research.status = ResearchStatus.SUSPENDED
            db_session.commit()
            return jsonify(
                {"status": "success", "message": "Research terminated"}
            )

        # Signal the worker thread to stop.
        termination_flags[research_id] = True

        # Log the termination request
        timestamp = datetime.now(UTC).isoformat()
        termination_message = "Research termination requested by user"
        current_progress = active_research[research_id]["progress"]

        log_entry = {
            "time": timestamp,
            "message": termination_message,
            "progress": current_progress,
            "metadata": {"phase": "termination"},
        }

        # Add to in-memory log
        active_research[research_id]["log"].append(log_entry)

        # Update the persisted log; tolerate a missing or corrupt log
        # (it may be stored either as a list or a JSON string).
        if research.progress_log:
            try:
                current_log = research.progress_log
                if isinstance(current_log, str):
                    current_log = json.loads(current_log)
            except Exception:
                current_log = []
        else:
            current_log = []

        current_log.append(log_entry)
        research.progress_log = current_log
        research.status = ResearchStatus.SUSPENDED
        db_session.commit()

        logger.log("MILESTONE", f"Research ended: {termination_message}")

        return jsonify({"status": "success", "message": "Research terminated"})
@research_bp.route("/api/delete/<string:research_id>", methods=["DELETE"])
@login_required
def delete_research(research_id):
    """Delete a research record and its report file, if one exists.

    Returns:
        JSON response: 404 if the record does not exist, 500 on an
        internal error, otherwise a success message.
    """
    # CONSISTENCY FIX: every other route in this module uses
    # get_user_db_session() as a context manager; the previous version
    # called it directly and closed the session by hand.
    # NOTE(review): assumes the session context manager rolls back on
    # exception, as such helpers conventionally do — confirm in
    # session_context.
    try:
        with get_user_db_session() as db_session:
            # Get the research record
            research = (
                db_session.query(ResearchHistory)
                .filter_by(id=research_id)
                .first()
            )

            if not research:
                return jsonify(
                    {"status": "error", "message": "Research not found"}
                ), 404

            # Capture the report path before the row is deleted.
            report_path = research.report_path

            # Delete from database
            db_session.delete(research)
            db_session.commit()

        # Delete the report file outside the session; a failure here is
        # logged but does not fail the request (the DB row is gone).
        if report_path and Path(report_path).exists():
            try:
                Path(report_path).unlink()
                logger.info(f"Deleted report file: {report_path}")
            except Exception:
                logger.exception("Failed to delete report file")

        return jsonify(
            {"status": "success", "message": "Research deleted successfully"}
        )

    except Exception:
        logger.exception("Error deleting research")
        return jsonify(
            {
                "status": "error",
                "message": "An internal error occurred while deleting the research.",
            }
        ), 500
@research_bp.route("/api/clear_history", methods=["POST"])
@login_required
def clear_history():
    """Delete every research record and its on-disk report file.

    File-deletion failures are logged individually and do not abort the
    operation; the database rows are always removed.

    Returns:
        JSON response with counts of deleted records and files, or a
        500 error response on an internal failure.
    """
    # CONSISTENCY FIX: use the session as a context manager like the
    # other routes in this module (the previous version called
    # get_user_db_session() directly and closed it manually).
    # NOTE(review): assumes the session context manager rolls back on
    # exception — confirm in session_context.
    try:
        with get_user_db_session() as db_session:
            # Get all research records
            all_research = db_session.query(ResearchHistory).all()

            # Delete report files
            deleted_files = 0
            for research in all_research:
                if research.report_path and Path(research.report_path).exists():
                    try:
                        Path(research.report_path).unlink()
                        deleted_files += 1
                    except Exception:
                        logger.exception(
                            f"Failed to delete file {research.report_path}"
                        )

            # Delete all records in one bulk statement.
            deleted_count = db_session.query(ResearchHistory).delete()
            db_session.commit()

        logger.info(
            f"Cleared history: {deleted_count} records, {deleted_files} files"
        )

        return jsonify(
            {
                "status": "success",
                "message": f"Deleted {deleted_count} research records and {deleted_files} report files",
            }
        )

    except Exception:
        logger.exception("Error clearing history")
        return jsonify(
            {
                "status": "error",
                "message": "An internal error occurred while clearing the history.",
            }
        ), 500
@research_bp.route("/api/history", methods=["GET"])
@login_required
def api_get_history():
    """Return the research history as JSON, newest first, paginated.

    Query args:
        page: 1-based page number (clamped to >= 1).
        per_page: page size (clamped to 1..500, default 50).

    Returns:
        JSON with ``history``, ``total``, ``page``, ``per_page`` and
        ``total_pages``, or a 500 error response on failure.
    """
    page = max(1, request.args.get("page", 1, type=int))
    per_page = request.args.get("per_page", 50, type=int)
    per_page = max(1, min(per_page, 500))

    # CONSISTENCY FIX: use the session as a context manager like the
    # other routes in this module (the previous version called
    # get_user_db_session() directly and closed it manually).
    try:
        with get_user_db_session() as db_session:
            # Query with pagination, newest first.
            query = db_session.query(ResearchHistory).order_by(
                ResearchHistory.created_at.desc()
            )

            # Total row count, needed for the pagination metadata.
            total = query.count()

            # Get paginated results
            research_items = (
                query.offset((page - 1) * per_page).limit(per_page).all()
            )

            # Convert to dictionaries
            history_data = []
            for item in research_items:
                data = {
                    "id": item.id,
                    "query": item.query,
                    "mode": item.mode,
                    "status": item.status,
                    "created_at": item.created_at,
                    "completed_at": item.completed_at,
                    "duration_seconds": item.duration_seconds,
                    # Filter metadata before exposing it to the client.
                    "metadata": filter_research_metadata(item.research_meta),
                    "progress": item.progress,
                    "title": item.title,
                }

                # Derive the duration when it was never persisted.
                if not data["duration_seconds"] and data["created_at"]:
                    data["duration_seconds"] = calculate_duration(
                        data["created_at"], data["completed_at"]
                    )

                history_data.append(data)

        return jsonify(
            {
                "history": history_data,
                "total": total,
                "page": page,
                "per_page": per_page,
                # Ceiling division for the page count.
                "total_pages": (total + per_page - 1) // per_page,
            }
        )

    except Exception:
        logger.exception("Error fetching history")
        return jsonify(
            {
                "status": "error",
                "message": "An internal error occurred while fetching the history.",
            }
        ), 500
@research_bp.route("/api/research/<string:research_id>")
@login_required
def api_get_research(research_id):
    """Return the details of a single research run as JSON.

    Responds 404 when the record does not exist and 500 on an internal
    error. When the run is currently active, its in-memory logs are
    included under ``logs``.
    """
    try:
        with get_user_db_session() as db_session:
            record = (
                db_session.query(ResearchHistory)
                .filter_by(id=research_id)
                .first()
            )

            if record is None:
                return jsonify(
                    {"status": "error", "message": "Research not found"}
                ), 404

            # Serialize the record for the client.
            payload = {
                "id": record.id,
                "query": record.query,
                "mode": record.mode,
                "status": record.status,
                "created_at": record.created_at,
                "completed_at": record.completed_at,
                "duration_seconds": record.duration_seconds,
                "metadata": strip_settings_snapshot(record.research_meta),
                "progress": record.progress,
                "title": record.title,
            }

            # Attach live logs when the run is currently active.
            if research_id in active_research:
                payload["logs"] = active_research[research_id].get("log", [])

            return jsonify(payload)

    except Exception:
        logger.exception("Error fetching research")
        return jsonify(
            {
                "status": "error",
                "message": "An internal error occurred while fetching the research.",
            }
        ), 500
455# Add more converted routes as needed...