Coverage for src / local_deep_research / web / routes / research_routes.py: 94%
664 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1import io
2import json
3from datetime import datetime, UTC
4from pathlib import Path
6from flask import (
7 Blueprint,
8 jsonify,
9 redirect,
10 request,
11 send_file,
12 session,
13 url_for,
14)
15from loguru import logger
16from ...settings.logger import log_settings
17from sqlalchemy import func
19# Security imports
20from ...config.constants import DEFAULT_OLLAMA_URL
21from ...llm.providers.base import normalize_provider
22from ...constants import ResearchStatus
23from ...security import (
24 FileUploadValidator,
25 UnsafeFilenameError,
26 filter_research_metadata,
27 sanitize_filename,
28 strip_settings_snapshot,
29)
30from ...security.rate_limiter import (
31 upload_rate_limit_ip,
32 upload_rate_limit_user,
33)
34from ...security.decorators import require_json_body
35from ...config.paths import get_config_directory
37# Services imports
38from ..services.pdf_extraction_service import get_pdf_extraction_service
40from ...database.models import (
41 QueuedResearch,
42 ResearchHistory,
43 ResearchLog,
44 UserActiveResearch,
45)
46from ...database.models.library import Document as Document
47from ...database.encrypted_db import db_manager
48from ...database.session_context import get_g_db_session, get_user_db_session
49from ..auth.decorators import login_required
50from ..auth.password_utils import get_user_password
51from ..models.database import calculate_duration
52from ..services.research_service import (
53 export_report_to_memory,
54 run_research_process,
55 start_research_process,
56)
57from ...security.rate_limiter import limiter
58from ..utils.templates import render_template_with_defaults
59from .globals import (
60 append_research_log,
61 get_active_research_ids,
62 get_research_field,
63 is_research_active,
64 set_termination_flag,
65)
# Create a Blueprint for the research application
# (registered with the Flask app elsewhere in the package).
research_bp = Blueprint("research", __name__)

# NOTE: Routes use session["username"] (not .get()) intentionally.
# @login_required guarantees the key exists; direct access fails fast
# if the decorator is ever removed.
# Add static route at the root level
@research_bp.route("/redirect-static/<path:path>")
def redirect_static(path):
    """Forward legacy /redirect-static URLs to the current static endpoint."""
    target = url_for("static", filename=path)
    return redirect(target)
@research_bp.route("/progress/<string:research_id>")
@login_required
def progress_page(research_id):
    """Render the research progress page.

    The research_id path segment is consumed client-side by the page script.
    """
    template_name = "pages/progress.html"
    return render_template_with_defaults(template_name)
@research_bp.route("/details/<string:research_id>")
@login_required
def research_details_page(research_id):
    """Render the research details page.

    The research_id path segment is consumed client-side by the page script.
    """
    template_name = "pages/details.html"
    return render_template_with_defaults(template_name)
@research_bp.route("/results/<string:research_id>")
@login_required
def results_page(research_id):
    """Render the research results page.

    The research_id path segment is consumed client-side by the page script.
    """
    template_name = "pages/results.html"
    return render_template_with_defaults(template_name)
@research_bp.route("/history")
@login_required
def history_page():
    """Render the research history page."""
    template_name = "pages/history.html"
    return render_template_with_defaults(template_name)
# Add missing settings routes
@research_bp.route("/settings", methods=["GET"])
@login_required
def settings_page():
    """Render the settings dashboard page."""
    template_name = "settings_dashboard.html"
    return render_template_with_defaults(template_name)
def _extract_research_params(data, settings_manager):
    """Extract and resolve research parameters from request data and settings.

    Values supplied in the request win; anything missing falls back to the
    user's stored settings (with hard-coded defaults as a last resort).

    Returns a dict with keys: model_provider, model, custom_endpoint,
    ollama_url, search_engine, max_results, time_period, iterations,
    questions_per_iteration, strategy.
    """
    model_provider = data.get("model_provider")
    if model_provider:
        logger.debug(f"Using model_provider from request: {model_provider}")
    else:
        model_provider = settings_manager.get_setting("llm.provider", "ollama")
        logger.debug(
            f"No model_provider in request, using database setting: {model_provider}"
        )
    # Normalize provider to lowercase canonical form
    model_provider = normalize_provider(model_provider)

    model = data.get("model")
    if model:
        logger.debug(f"Using model from request: {model}")
    else:
        model = settings_manager.get_setting("llm.model", None)
        logger.debug(f"No model in request, using database setting: {model}")

    # Endpoint URL only matters for the custom OpenAI-compatible provider.
    custom_endpoint = data.get("custom_endpoint")
    if not custom_endpoint and model_provider == "openai_endpoint":
        custom_endpoint = settings_manager.get_setting(
            "llm.openai_endpoint.url", None
        )
        logger.debug(
            f"No custom_endpoint in request, using database setting: {custom_endpoint}"
        )

    # Ollama URL only matters for the ollama provider.
    ollama_url = data.get("ollama_url")
    if not ollama_url and model_provider == "ollama":
        ollama_url = settings_manager.get_setting(
            "llm.ollama.url", DEFAULT_OLLAMA_URL
        )
        logger.debug(
            f"No ollama_url in request, using database setting: {ollama_url}"
        )

    # Accept either "search_engine" or the legacy "search_tool" key.
    search_engine = data.get("search_engine") or data.get("search_tool")
    if not search_engine:
        search_engine = settings_manager.get_setting("search.tool", "searxng")

    # None (not falsy) checks here: 0 is a legitimate explicit value.
    iterations = data.get("iterations")
    if iterations is None:
        iterations = settings_manager.get_setting("search.iterations", 5)

    questions_per_iteration = data.get("questions_per_iteration")
    if questions_per_iteration is None:
        questions_per_iteration = settings_manager.get_setting(
            "search.questions_per_iteration", 5
        )

    strategy = data.get("strategy")
    if not strategy:
        strategy = settings_manager.get_setting(
            "search.search_strategy", "source-based"
        )

    return {
        "model_provider": model_provider,
        "model": model,
        "custom_endpoint": custom_endpoint,
        "ollama_url": ollama_url,
        "search_engine": search_engine,
        # max_results / time_period have no settings fallback: pass through.
        "max_results": data.get("max_results"),
        "time_period": data.get("time_period"),
        "iterations": iterations,
        "questions_per_iteration": questions_per_iteration,
        "strategy": strategy,
    }
def _queue_research(
    db_session,
    username,
    research_id,
    query,
    mode,
    research_settings,
    params,
    session_id,
    reason="",
    research=None,
):
    """Add research to queue and notify processor. Returns a JSON response.

    Args:
        reason: Optional prefix explaining why the research was queued
            (e.g. "due to concurrent limit").
        research: Optional ResearchHistory object whose status should be set
            to QUEUED atomically with the queue record insertion.
    """
    # Append to the tail of this user's queue.
    current_max = (
        db_session.query(func.max(QueuedResearch.position))
        .filter_by(username=username)
        .scalar()
        or 0
    )
    position = current_max + 1

    db_session.add(
        QueuedResearch(
            username=username,
            research_id=research_id,
            query=query,
            mode=mode,
            settings_snapshot=research_settings,
            position=position,
        )
    )
    if research is not None:
        # Single commit keeps queue insert + status change atomic.
        research.status = ResearchStatus.QUEUED  # type: ignore[assignment]
    db_session.commit()

    logger.info(
        f"Queued research {research_id} at position {position} for user {username}"
    )

    # Imported lazily to avoid a circular import at module load time.
    from ..queue.processor_v2 import queue_processor

    queue_processor.notify_research_queued(
        username,
        research_id,
        session_id=session_id,
        query=query,
        mode=mode,
        settings_snapshot=research_settings,
        model_provider=params["model_provider"],
        model=params["model"],
        custom_endpoint=params["custom_endpoint"],
        search_engine=params["search_engine"],
        max_results=params["max_results"],
        time_period=params["time_period"],
        iterations=params["iterations"],
        questions_per_iteration=params["questions_per_iteration"],
        strategy=params["strategy"],
    )

    reason_text = f" {reason}" if reason else ""
    message = f"Your research has been queued{reason_text}. Position in queue: {position}"
    return jsonify(
        {
            "status": ResearchStatus.QUEUED,
            "research_id": research_id,
            "queue_position": position,
            "message": message,
        }
    )
@research_bp.route("/api/start_research", methods=["POST"])
@login_required
@require_json_body(error_format="status")
def start_research():
    """Start (or queue) a new research run for the logged-in user.

    Resolves model/search parameters from the request with settings
    fallbacks, enforces the concurrent-research limit (queueing when it is
    reached), snapshots the user's settings, creates the DB records, and
    launches the research thread.

    Returns JSON with "status" plus "research_id" on success; error
    responses use the {"status": "error", "message": ...} convention.
    """
    data = request.json
    # Debug logging to trace model parameter
    logger.debug(f"Request data keys: {list(data.keys())}")

    # Check if this is a news search
    metadata = data.get("metadata", {})
    if metadata.get("is_news_search"):
        logger.info(
            f"News search request received: triggered_by={metadata.get('triggered_by', 'unknown')}"
        )

    query = data.get("query")
    mode = data.get("mode", "quick")

    # Replace date placeholders if they exist
    if query and "YYYY-MM-DD" in query:
        current_date = datetime.now(UTC).strftime("%Y-%m-%d")
        original_query = query
        query = query.replace("YYYY-MM-DD", current_date)
        logger.info(
            f"Replaced date placeholder in query: {original_query[:100]}... -> {query[:100]}..."
        )
        logger.info(f"Using date: {current_date}")

        # Update metadata to track the replacement
        if not metadata:
            metadata = {}
        metadata["original_query"] = original_query
        metadata["processed_query"] = query
        metadata["date_replaced"] = current_date
        data["metadata"] = metadata

    # Get parameters from request or use database settings
    from ...settings.manager import SettingsManager

    username = session["username"]

    with get_user_db_session(username) as db_session:
        settings_manager = SettingsManager(db_session=db_session)
        params = _extract_research_params(data, settings_manager)

    model_provider = params["model_provider"]
    model = params["model"]
    custom_endpoint = params["custom_endpoint"]
    search_engine = params["search_engine"]
    max_results = params["max_results"]
    time_period = params["time_period"]
    iterations = params["iterations"]
    questions_per_iteration = params["questions_per_iteration"]
    strategy = params["strategy"]

    # Debug logging for model parameter specifically
    logger.debug(
        f"Extracted model value: '{model}' (type: {type(model).__name__})"
    )
    # Log the selections for troubleshooting
    logger.info(
        f"Starting research with provider: {model_provider}, model: {model}, search engine: {search_engine}"
    )
    logger.info(
        f"Additional parameters: max_results={max_results}, time_period={time_period}, iterations={iterations}, questions={questions_per_iteration}, strategy={strategy}"
    )

    if not query:
        return jsonify({"status": "error", "message": "Query is required"}), 400

    # Validate required parameters based on provider
    if model_provider == "openai_endpoint" and not custom_endpoint:
        return (
            jsonify(
                {
                    "status": "error",
                    "message": "Custom endpoint URL is required for OpenAI endpoint provider",
                }
            ),
            400,
        )

    if not model:
        logger.error(
            f"No model specified or configured. Provider: {model_provider}"
        )
        return jsonify(
            {
                "status": "error",
                "message": "Model is required. Please configure a model in the settings.",
            }
        ), 400

    # Get max concurrent researches from settings
    from ...settings import SettingsManager

    with get_user_db_session() as db_session:
        settings_manager = SettingsManager(db_session)
        max_concurrent_researches = settings_manager.get_setting(
            "app.max_concurrent_researches", 3
        )

    # Use existing session from g to check active researches
    try:
        db_session = get_g_db_session()
        if db_session:
            # First, clean up stale entries where the research thread has died
            # (e.g. crashed with an unhandled exception before cleanup ran).
            # Without this, dead researches permanently block the queue.
            from ..routes.globals import (
                is_research_thread_alive,
                cleanup_research,
            )

            stale_rows = (
                db_session.query(UserActiveResearch)
                .filter_by(username=username, status=ResearchStatus.IN_PROGRESS)
                .all()
            )
            for row in stale_rows:
                if not is_research_thread_alive(row.research_id):
                    logger.warning(
                        f"Cleaning up stale research {row.research_id} "
                        f"(thread dead, started {row.started_at})"
                    )
                    row.status = ResearchStatus.FAILED
                    cleanup_research(row.research_id)
            # Commit only if at least one stale row was actually marked FAILED.
            if any(
                not is_research_thread_alive(r.research_id) for r in stale_rows
            ):
                db_session.commit()

            # Now count truly active researches
            active_count = (
                db_session.query(UserActiveResearch)
                .filter_by(username=username, status=ResearchStatus.IN_PROGRESS)
                .count()
            )
            logger.info(
                f"Active research count for {username}: {active_count}/{max_concurrent_researches}"
            )
            should_queue = active_count >= max_concurrent_researches
            logger.info(f"Should queue new research: {should_queue}")
        else:
            logger.warning(
                "No database session available to check active researches"
            )
            should_queue = False
    except Exception:
        logger.exception("Failed to check active researches")
        # Default to not queueing if we can't check
        should_queue = False

    # For non-queued research, verify password is available BEFORE creating DB
    # records (queued research gets password later via queue processor).
    user_password = None
    if not should_queue:
        user_password = get_user_password(username)
        if not user_password:
            if db_manager.has_encryption:
                logger.error(
                    f"No password available for user {username} with encrypted database - "
                    "cannot start research (session password expired or lost after server restart)"
                )
                # Use status/message keys to match the research API convention
                # (the research frontend checks data.status and data.message)
                return jsonify(
                    {
                        "status": "error",
                        "message": "Your session has expired. Please log out and log back in to start research.",
                    }
                ), 401
            logger.warning(
                f"No password available for metrics access for user {username}"
            )

    # Create a record in the database with explicit UTC timestamp
    import uuid
    import threading

    created_at = datetime.now(UTC).isoformat()
    research_id = str(uuid.uuid4())

    # Create organized research metadata with settings snapshot
    research_settings = {
        # Direct submission parameters
        "submission": {
            "model_provider": model_provider,
            "model": model,
            "custom_endpoint": custom_endpoint,
            "search_engine": search_engine,
            "max_results": max_results,
            "time_period": time_period,
            "iterations": iterations,
            "questions_per_iteration": questions_per_iteration,
            "strategy": strategy,
        },
        # System information
        "system": {
            "timestamp": created_at,
            "user": username,
            "version": "1.0",  # Track metadata version for future migrations
            "server_url": request.host_url,  # Add server URL for link generation
        },
    }

    # Add any additional metadata from request
    additional_metadata = data.get("metadata", {})
    if additional_metadata:
        research_settings.update(additional_metadata)

    # Get complete settings snapshot for this research; the research thread
    # cannot read the (possibly encrypted) DB itself, so this is mandatory.
    try:
        from local_deep_research.settings import SettingsManager

        db_session_for_settings = get_g_db_session()
        if db_session_for_settings:
            # Ensure any pending changes are committed before snapshotting
            try:
                db_session_for_settings.commit()
            except Exception:
                db_session_for_settings.rollback()
            settings_manager = SettingsManager(
                db_session_for_settings, owns_session=False
            )
            # Bypass cache to ensure fresh data
            all_settings = settings_manager.get_all_settings(bypass_cache=True)
            research_settings["settings_snapshot"] = all_settings
            logger.info(
                f"Captured {len(all_settings)} settings for research {research_id}"
            )
        else:
            # If no session in g, create a new one temporarily to get settings
            logger.warning(
                "No database session in g, creating temporary session for settings snapshot"
            )
            from ...database.thread_local_session import get_metrics_session

            password = get_user_password(username)
            if password:
                temp_session = get_metrics_session(username, password)
                if temp_session:
                    settings_manager = SettingsManager(
                        temp_session, owns_session=False
                    )
                    all_settings = settings_manager.get_all_settings(
                        bypass_cache=True
                    )
                    research_settings["settings_snapshot"] = all_settings
                    logger.info(
                        f"Captured {len(all_settings)} settings using temporary session for research {research_id}"
                    )
                else:
                    logger.error(
                        "Failed to create temporary session for settings snapshot"
                    )
                    return jsonify(
                        {
                            "status": "error",
                            "message": "Cannot create research without settings snapshot.",
                        }
                    ), 500
            else:
                logger.error(
                    "No password available to create session for settings snapshot"
                )
                return jsonify(
                    {
                        "status": "error",
                        "message": "Cannot create research without settings snapshot.",
                    }
                ), 500
    except Exception:
        logger.exception("Failed to capture settings snapshot")
        # Cannot continue without settings snapshot for thread-based research
        return jsonify(
            {
                "status": "error",
                "message": "Failed to capture settings for research. Please try again.",
            }
        ), 500

    # Create the history record (and, when starting immediately, the active
    # research tracking record) using the existing session from g.
    try:
        db_session = get_g_db_session()
        if db_session:
            # Determine initial status based on whether we need to queue
            initial_status = (
                ResearchStatus.QUEUED
                if should_queue
                else ResearchStatus.IN_PROGRESS
            )

            research = ResearchHistory(
                id=research_id,  # Set UUID as primary key
                query=query,
                mode=mode,
                status=initial_status,
                created_at=created_at,
                progress_log=[{"time": created_at, "progress": 0}],
                research_meta=research_settings,
            )
            db_session.add(research)
            db_session.commit()
            logger.info(
                f"Created research entry with UUID: {research_id}, status: {initial_status}"
            )

            if should_queue:
                session_id = session.get("session_id")
                return _queue_research(
                    db_session,
                    username,
                    research_id,
                    query,
                    mode,
                    research_settings,
                    params,
                    session_id,
                )

            # Start immediately: create active research tracking record
            active_record = UserActiveResearch(
                username=username,
                research_id=research_id,
                status=ResearchStatus.IN_PROGRESS,
                thread_id=str(threading.current_thread().ident),
                settings_snapshot=research_settings,
            )
            db_session.add(active_record)
            db_session.commit()
            logger.info(f"Created active research record for user {username}")

            # Double-check the count after committing to handle race conditions
            try:
                final_count = (
                    db_session.query(UserActiveResearch)
                    .filter_by(
                        username=username, status=ResearchStatus.IN_PROGRESS
                    )
                    .count()
                )
                logger.info(
                    f"Final active count after commit: {final_count}/{max_concurrent_researches}"
                )

                if final_count > max_concurrent_researches:
                    # We exceeded the limit due to a race condition:
                    # remove this record and queue instead.
                    logger.warning(
                        f"Race condition detected: {final_count} > {max_concurrent_researches}, moving to queue"
                    )
                    db_session.delete(active_record)
                    db_session.commit()

                    session_id = session.get("session_id")
                    return _queue_research(
                        db_session,
                        username,
                        research_id,
                        query,
                        mode,
                        research_settings,
                        params,
                        session_id,
                        reason="due to concurrent limit",
                        research=research,
                    )
            except Exception:
                logger.warning("Could not recheck active count")
    except Exception:
        logger.exception("Failed to create research entry")
        return jsonify(
            {"status": "error", "message": "Failed to create research entry"}
        ), 500

    # Only start the research if not queued
    if not should_queue:
        # Save the research strategy to the database before starting the thread
        try:
            from ..services.research_service import save_research_strategy

            save_research_strategy(research_id, strategy, username=username)
        except Exception:
            logger.warning("Could not save research strategy")

        # Debug logging for settings snapshot
        snapshot_data = research_settings.get("settings_snapshot", {})
        log_settings(snapshot_data, "Settings snapshot being passed to thread")
        if "search.tool" in snapshot_data:
            logger.debug(
                f"search.tool in snapshot: {snapshot_data['search.tool']}"
            )
        else:
            logger.debug("search.tool NOT in snapshot")

        # Start the research process with the selected parameters
        research_thread = start_research_process(
            research_id,
            query,
            mode,
            run_research_process,
            username=username,  # Pass username to the thread
            user_password=user_password,  # Pass password for database access
            model_provider=model_provider,
            model=model,
            custom_endpoint=custom_endpoint,
            search_engine=search_engine,
            max_results=max_results,
            time_period=time_period,
            iterations=iterations,
            questions_per_iteration=questions_per_iteration,
            strategy=strategy,
            settings_snapshot=snapshot_data,  # Pass complete settings
        )

        # Update the active research record with the actual thread ID
        try:
            with get_user_db_session(username) as thread_session:
                active_record = (
                    thread_session.query(UserActiveResearch)
                    .filter_by(username=username, research_id=research_id)
                    .first()
                )
                if active_record:
                    active_record.thread_id = str(research_thread.ident)
                    thread_session.commit()
        except Exception:
            logger.warning("Could not update thread ID")

    return jsonify({"status": "success", "research_id": research_id})
@research_bp.route("/api/terminate/<string:research_id>", methods=["POST"])
@login_required
def terminate_research(research_id):
    """Terminate an in-progress research process.

    Already-finished researches return success immediately; researches with
    no live thread are marked SUSPENDED directly; live researches get a
    termination flag set and a log entry/socket event recorded.
    """
    username = session["username"]

    try:
        with get_user_db_session(username) as db_session:
            research = (
                db_session.query(ResearchHistory)
                .filter_by(id=research_id)
                .first()
            )

            if not research:
                return jsonify(
                    {"status": "error", "message": "Research not found"}
                ), 404

            status = research.status

            # If it's already in a terminal state, return success
            if status in (
                ResearchStatus.COMPLETED,
                ResearchStatus.SUSPENDED,
                ResearchStatus.FAILED,
                ResearchStatus.ERROR,
            ):
                return jsonify(
                    {
                        "status": "success",
                        "message": f"Research already {status}",
                    }
                )

            # No live thread tracked for this research: just flip the status.
            if not is_research_active(research_id):
                research.status = ResearchStatus.SUSPENDED
                db_session.commit()
                return jsonify(
                    {"status": "success", "message": "Research terminated"}
                )

            # Signal the running thread to stop at its next checkpoint.
            set_termination_flag(research_id)

            # Log the termination request - using UTC timestamp
            timestamp = datetime.now(UTC).isoformat()
            termination_message = "Research termination requested by user"
            current_progress = get_research_field(research_id, "progress", 0)

            log_entry = {
                "time": timestamp,
                "message": termination_message,
                "progress": current_progress,
                "metadata": {"phase": "termination"},
            }

            # Add to in-memory log
            append_research_log(research_id, log_entry)

            logger.log("MILESTONE", f"Research ended: {termination_message}")

            # Update the progress log in the database; it may be stored as a
            # JSON string or as an already-deserialized list.
            if research.progress_log:
                try:
                    if isinstance(research.progress_log, str):
                        current_log = json.loads(research.progress_log)
                    else:
                        current_log = research.progress_log
                except Exception:
                    current_log = []
            else:
                current_log = []

            current_log.append(log_entry)
            research.progress_log = current_log
            research.status = ResearchStatus.SUSPENDED
            db_session.commit()

            # Emit a socket event for the termination request (best-effort).
            try:
                event_data = {
                    "status": ResearchStatus.SUSPENDED,
                    "message": "Research was suspended by user request",
                }

                from ..services.socket_service import SocketIOService

                SocketIOService().emit_to_subscribers(
                    "progress", research_id, event_data
                )
            except Exception:
                logger.exception("Socket emit error (non-critical)")

            return jsonify(
                {
                    "status": "success",
                    "message": "Research termination requested",
                }
            )
    except Exception:
        logger.exception("Error terminating research")
        return jsonify(
            {"status": "error", "message": "Failed to terminate research"}
        ), 500
@research_bp.route("/api/delete/<string:research_id>", methods=["DELETE"])
@login_required
def delete_research(research_id):
    """Delete a research record and its report file (if any).

    Refuses to delete a research that is IN_PROGRESS with a live thread.
    """
    username = session["username"]

    try:
        with get_user_db_session(username) as db_session:
            research = (
                db_session.query(ResearchHistory)
                .filter_by(id=research_id)
                .first()
            )

            if not research:
                return jsonify(
                    {"status": "error", "message": "Research not found"}
                ), 404

            status = research.status
            report_path = research.report_path

            # Don't allow deleting research in progress
            if status == ResearchStatus.IN_PROGRESS and is_research_active(
                research_id
            ):
                return (
                    jsonify(
                        {
                            "status": "error",
                            "message": "Cannot delete research that is in progress",
                        }
                    ),
                    400,
                )

            # Delete report file if it exists (best-effort: a failure to
            # remove the file does not block deleting the DB record).
            if report_path and Path(report_path).exists():
                try:
                    Path(report_path).unlink()
                except Exception:
                    logger.exception("Error removing report file")

            # Delete the database record
            db_session.delete(research)
            db_session.commit()

            return jsonify({"status": "success"})
    except Exception:
        logger.exception("Error deleting research")
        return jsonify(
            {"status": "error", "message": "Failed to delete research"}
        ), 500
@research_bp.route("/api/clear_history", methods=["POST"])
@login_required
def clear_history():
    """Clear all research history for the user, except active researches.

    Removes report files (best-effort) and then deletes the DB records in
    bulk, skipping anything currently running.
    """
    username = session["username"]

    try:
        with get_user_db_session(username) as db_session:
            # Get all research records first to clean up files
            research_records = db_session.query(ResearchHistory).all()

            # Get IDs of currently active research (snapshot)
            active_ids = get_active_research_ids()

            # Clean up report files
            for research in research_records:
                # Skip active research
                if research.id in active_ids:
                    continue

                # Delete report file if it exists (best-effort)
                if research.report_path and Path(research.report_path).exists():
                    try:
                        Path(research.report_path).unlink()
                    except Exception:
                        logger.exception("Error removing report file")

            # Delete records from the database, except active research.
            # synchronize_session=False: bulk delete, no in-session sync needed.
            if active_ids:
                db_session.query(ResearchHistory).filter(
                    ~ResearchHistory.id.in_(active_ids)
                ).delete(synchronize_session=False)
            else:
                db_session.query(ResearchHistory).delete(
                    synchronize_session=False
                )

            db_session.commit()

        return jsonify({"status": "success"})
    except Exception:
        logger.exception("Error clearing history")
        return jsonify(
            {"status": "error", "message": "Failed to process request"}
        ), 500
@research_bp.route("/open_file_location", methods=["POST"])
@login_required
def open_file_location():
    """Open a file location in the system file explorer.

    Security: This endpoint is disabled for server deployments.
    It only makes sense for desktop usage where the server and client are on
    the same machine.
    """
    payload = {
        "status": "error",
        "message": "This feature is disabled. It is only available in desktop mode.",
    }
    return jsonify(payload), 403
@research_bp.route("/api/save_raw_config", methods=["POST"])
@login_required
@require_json_body(error_format="success")
def save_raw_config():
    """Save a raw TOML configuration string to the config file.

    Validates the TOML, rejects keys that could enable dynamic code loading,
    and writes via the verified file writer. Responses use the
    {"success": bool, "error": ...} convention required by the frontend.
    """
    data = request.json
    raw_config = data.get("raw_config")

    if not raw_config:
        return (
            jsonify(
                {"success": False, "error": "Raw configuration is required"}
            ),
            400,
        )

    # Security: Parse and validate the TOML to block dangerous keys.
    # tomllib is stdlib on 3.11+; fall back to the tomli backport otherwise.
    try:
        import tomllib
    except ImportError:
        import tomli as tomllib  # type: ignore[no-redef]

    try:
        parsed_config = tomllib.loads(raw_config)
    except Exception:
        logger.warning("Invalid TOML configuration")
        # Don't expose internal exception details to users (CWE-209)
        return jsonify(
            {
                "success": False,
                "error": "Invalid TOML syntax. Please check your configuration format.",
            }
        ), 400

    # Security: Check for dangerous keys that could enable code execution.
    # These patterns match keys used for dynamic module imports.
    BLOCKED_KEY_PATTERNS = ["module_path", "class_name", "module", "class"]

    def find_blocked_keys(obj, path=""):
        """Recursively find any blocked keys in the config."""
        blocked = []
        if isinstance(obj, dict):
            for key, value in obj.items():
                current_path = f"{path}.{key}" if path else key
                key_lower = key.lower()
                for pattern in BLOCKED_KEY_PATTERNS:
                    if pattern in key_lower:
                        blocked.append(current_path)
                        break
                # Recurse into nested dicts
                blocked.extend(find_blocked_keys(value, current_path))
        elif isinstance(obj, list):
            for i, item in enumerate(obj):
                blocked.extend(find_blocked_keys(item, f"{path}[{i}]"))
        return blocked

    blocked_keys = find_blocked_keys(parsed_config)
    if blocked_keys:
        logger.warning(
            f"Security: Blocked attempt to write config with dangerous keys: {blocked_keys}"
        )
        return jsonify(
            {
                "success": False,
                "error": "Configuration contains protected keys that cannot be modified",
                "blocked_keys": blocked_keys,
            }
        ), 403

    try:
        from ...security.file_write_verifier import write_file_verified

        # Get the config file path (uses centralized path config, respects
        # LDR_DATA_DIR)
        config_dir = get_config_directory()
        config_path = config_dir / "config.toml"

        # Write the configuration to file
        write_file_verified(
            config_path,
            raw_config,
            "system.allow_config_write",
            context="system configuration file",
        )

        return jsonify({"success": True})
    except Exception:
        logger.exception("Error saving configuration file")
        return jsonify(
            {"success": False, "error": "Failed to process request"}
        ), 500
@research_bp.route("/api/history", methods=["GET"])
@login_required
def get_history():
    """Return the user's research history as JSON.

    Each item contains the core research fields, the duration in
    seconds when both timestamps are present, metadata filtered via
    ``filter_research_metadata`` (so internal settings are not
    exposed), and the number of library documents for the research.

    Returns:
        200 with ``{"status": "success", "items": [...]}`` or
        500 with a generic error message on failure.
    """
    username = session["username"]

    try:
        with get_user_db_session(username) as db_session:
            # Query all research history ordered by created_at
            research_records = (
                db_session.query(ResearchHistory)
                .order_by(ResearchHistory.created_at.desc())
                .all()
            )

            # Count library documents for all researches in one grouped
            # query instead of issuing a COUNT per history row (N+1 fix).
            doc_counts = dict(
                db_session.query(
                    Document.research_id, func.count(Document.research_id)
                )
                .group_by(Document.research_id)
                .all()
            )

            # Build history items while session is active to avoid
            # DetachedInstanceError on ORM attribute access
            history_items = []
            for research in research_records:
                # Calculate duration if completed
                duration_seconds = None
                if research.completed_at and research.created_at:
                    try:
                        duration_seconds = calculate_duration(
                            research.created_at, research.completed_at
                        )
                    except Exception:
                        logger.exception("Error calculating duration")

                # Create a history item
                item = {
                    "id": research.id,
                    "query": research.query,
                    "mode": research.mode,
                    "status": research.status,
                    "created_at": research.created_at,
                    "completed_at": research.completed_at,
                    "duration_seconds": duration_seconds,
                    "metadata": filter_research_metadata(
                        research.research_meta
                    ),
                    # Researches with no documents are absent from the
                    # grouped result, so default to 0.
                    "document_count": doc_counts.get(research.id, 0),
                }

                # Add title if it exists
                if hasattr(research, "title") and research.title is not None:
                    item["title"] = research.title

                history_items.append(item)

            return jsonify({"status": "success", "items": history_items})
    except Exception:
        logger.exception("Error getting history")
        # Generic message: don't leak internals to the client (CWE-209)
        return jsonify(
            {"status": "error", "message": "Failed to process request"}
        ), 500
@research_bp.route("/api/research/<string:research_id>")
@login_required
def get_research_details(research_id):
    """Return the full details of a single research record as JSON."""
    username = session["username"]

    try:
        with get_user_db_session(username) as db_session:
            record = (
                db_session.query(ResearchHistory)
                .filter(ResearchHistory.id == research_id)
                .first()
            )

            if record is None:
                return jsonify({"error": "Research not found"}), 404

            # Build the payload while the session is open; the settings
            # snapshot is stripped before metadata leaves the server.
            payload = {
                "id": record.id,
                "query": record.query,
                "status": record.status,
                "progress": record.progress,
                "progress_percentage": record.progress or 0,
                "mode": record.mode,
                "created_at": record.created_at,
                "completed_at": record.completed_at,
                "report_path": record.report_path,
                "metadata": strip_settings_snapshot(record.research_meta),
            }
            return jsonify(payload)
    except Exception:
        logger.exception("Error getting research details")
        return jsonify({"error": "An internal error has occurred"}), 500
@research_bp.route("/api/research/<string:research_id>/logs")
@login_required
def get_research_logs(research_id):
    """Return the log entries for a research, ordered by timestamp."""
    username = session["username"]

    try:
        with get_user_db_session(username) as db_session:
            # Verify the research exists before fetching its logs
            exists = (
                db_session.query(ResearchHistory)
                .filter_by(id=research_id)
                .first()
            )
            if not exists:
                return jsonify({"error": "Research not found"}), 404

            # Fetch logs from the research_logs table
            entries = (
                db_session.query(ResearchLog)
                .filter_by(research_id=research_id)
                .order_by(ResearchLog.timestamp)
                .all()
            )

            # Materialize plain dicts while the session is still open to
            # avoid DetachedInstanceError on ORM attribute access.
            logs = [
                {
                    "id": entry.id,
                    "message": entry.message,
                    "timestamp": entry.timestamp,
                    "log_type": entry.level,
                }
                for entry in entries
            ]

            return jsonify(logs)
    except Exception:
        logger.exception("Error getting research logs")
        return jsonify({"error": "An internal error has occurred"}), 500
@research_bp.route("/api/report/<string:research_id>")
@login_required
def get_research_report(research_id):
    """Return the report content and metadata for a research as JSON."""
    username = session["username"]

    try:
        with get_user_db_session(username) as db_session:
            record = (
                db_session.query(ResearchHistory)
                .filter_by(id=research_id)
                .first()
            )

            if record is None:
                return jsonify({"error": "Research not found"}), 404

            meta = record.research_meta

            # Resolve the report through the storage abstraction
            from ...storage import get_report_storage

            # Passing the settings snapshot avoids thread-context issues
            # inside the storage layer.
            snapshot = meta.get("settings_snapshot") if meta else None
            storage = get_report_storage(
                session=db_session, settings_snapshot=snapshot
            )

            content = storage.get_report(research_id, username)
            if content is None:
                return jsonify({"error": "Report not found"}), 404

            # Strip the settings snapshot before anything is exposed;
            # top-level 'summary', 'sources' and 'findings' keep older
            # example clients working.
            safe_meta = strip_settings_snapshot(meta)
            response = {
                "content": content,
                "summary": content,  # The markdown report is the summary
                "sources": safe_meta.get("all_links_of_system", []),
                "findings": safe_meta.get("findings", []),
                "metadata": {
                    "title": record.title if record.title else None,
                    "query": record.query,
                    "mode": record.mode if record.mode else None,
                    "created_at": record.created_at
                    if record.created_at
                    else None,
                    "completed_at": record.completed_at
                    if record.completed_at
                    else None,
                    "report_path": record.report_path,
                    **safe_meta,
                },
            }
            return jsonify(response)
    except Exception:
        logger.exception("Error getting research report")
        return jsonify({"error": "An internal error has occurred"}), 500
@research_bp.route(
    "/api/v1/research/<research_id>/export/<format>", methods=["POST"]
)
@login_required
def export_research_report(research_id, format):
    """Export research report to different formats (LaTeX, Quarto, RIS, PDF, ODT, etc.)"""
    try:
        # The exporter registry is the single source of truth for formats
        from ...exporters import ExporterRegistry

        if not ExporterRegistry.is_format_supported(format):
            supported = ExporterRegistry.get_available_formats()
            return jsonify(
                {
                    "error": f"Invalid format. Available formats: {', '.join(supported)}"
                }
            ), 400

        username = session["username"]

        try:
            with get_user_db_session(username) as db_session:
                record = (
                    db_session.query(ResearchHistory)
                    .filter_by(id=research_id)
                    .first()
                )
                if not record:
                    return jsonify({"error": "Research not found"}), 404

                # Resolve the report through the storage abstraction
                from ...storage import get_report_storage

                # Settings snapshot (if any) keeps storage usable here
                meta = record.research_meta if record.research_meta else {}
                snapshot = meta.get("settings_snapshot") if meta else None

                storage = get_report_storage(
                    session=db_session, settings_snapshot=snapshot
                )

                # Load the report fully in memory — nothing hits disk
                report_content = storage.get_report(research_id, username)
                if not report_content:
                    return jsonify({"error": "Report content not found"}), 404

                try:
                    # Prefer the explicit title; fall back to the query
                    doc_title = record.title or record.query

                    export_content, filename, mimetype = (
                        export_report_to_memory(
                            report_content, format, title=doc_title
                        )
                    )

                    # Stream the exported bytes straight from memory
                    return send_file(
                        io.BytesIO(export_content),
                        as_attachment=True,
                        download_name=filename,
                        mimetype=mimetype,
                    )
                except Exception:
                    logger.exception("Error exporting report")
                    return jsonify(
                        {
                            "error": f"Failed to export to {format}. Please try again later."
                        }
                    ), 500

        except Exception:
            logger.exception("Error in export endpoint")
            return jsonify({"error": "An internal error has occurred"}), 500

    except Exception:
        logger.exception("Unexpected error in export endpoint")
        return jsonify({"error": "An internal error has occurred"}), 500
def _classify_research_error(metadata):
    """Map a stored error message to a structured error_info dict.

    Inspects ``metadata["error"]`` for known failure signatures
    (timeout, token limit, LLM synthesis, Ollama, connection) and
    returns a dict with ``type``, ``message`` and ``suggestion`` keys
    for the UI. Returns an empty dict when no error is recorded.

    Args:
        metadata: The research_meta dict (may be empty).

    Returns:
        dict: error info for the UI, or {} when there is no error.
    """
    if not metadata or "error" not in metadata:
        return {}

    error_msg = metadata["error"]
    lowered = error_msg.lower()

    # Detect specific error types from the raw message text
    if "timeout" in lowered:
        return {
            "type": "timeout",
            "message": "LLM service timed out during synthesis. This may be due to high server load or connectivity issues.",
            "suggestion": "Try again later or use a smaller query scope.",
        }
    if "token limit" in lowered or "context length" in lowered:
        return {
            "type": "token_limit",
            "message": "The research query exceeded the AI model's token limit during synthesis.",
            "suggestion": "Try using a more specific query or reduce the research scope.",
        }
    if "final answer synthesis fail" in lowered or "llm error" in lowered:
        return {
            "type": "llm_error",
            "message": "The AI model encountered an error during final answer synthesis.",
            "suggestion": "Check that your LLM service is running correctly or try a different model.",
        }
    if "ollama" in lowered:
        return {
            "type": "ollama_error",
            "message": "The Ollama service is not responding properly.",
            "suggestion": "Make sure Ollama is running with 'ollama serve' and the model is downloaded.",
        }
    if "connection" in lowered:
        return {
            "type": "connection",
            "message": "Connection error with the AI service.",
            "suggestion": "Check your internet connection and AI service status.",
        }
    if metadata.get("solution"):
        # Use the solution provided in metadata if available
        return {
            "type": "unknown",
            "message": error_msg,
            "suggestion": str(metadata.get("solution")),
        }
    # Generic error with the original message
    return {
        "type": "unknown",
        "message": error_msg,
        "suggestion": "Try again with a different query or check the application logs.",
    }


def _latest_milestone_entry(db_session, research_id):
    """Return the most recent MILESTONE log entry for a research.

    Args:
        db_session: Active SQLAlchemy session for the user's database.
        research_id: The research identifier to look up.

    Returns:
        dict with 'message', 'time' (ISO string or None) and 'type',
        or None when the research has no MILESTONE logs.
    """
    milestone_log = (
        db_session.query(ResearchLog)
        .filter_by(research_id=research_id, level="MILESTONE")
        .order_by(ResearchLog.timestamp.desc())
        .first()
    )
    if milestone_log is None:
        logger.debug(f"No milestone logs found for research {research_id}")
        return None

    logger.debug(
        f"Found latest milestone for research {research_id}: {milestone_log.message}"
    )
    return {
        "message": milestone_log.message,
        "time": milestone_log.timestamp.isoformat()
        if milestone_log.timestamp
        else None,
        "type": "MILESTONE",
    }


@research_bp.route("/api/research/<string:research_id>/status")
@limiter.exempt
@login_required
def get_research_status(research_id):
    """Get the status of a research process.

    Returns status, progress, completion time, report path and filtered
    metadata. When the research failed, ``metadata.error_info`` carries
    a classified error with a user-facing suggestion; the latest
    MILESTONE log (if any) is included as ``log_entry`` for the UI.
    """
    username = session["username"]

    try:
        with get_user_db_session(username) as db_session:
            research = (
                db_session.query(ResearchHistory)
                .filter_by(id=research_id)
                .first()
            )

            if research is None:
                return jsonify({"error": "Research not found"}), 404

            status = research.status
            progress = research.progress
            completed_at = research.completed_at
            report_path = research.report_path
            metadata = research.research_meta or {}

            # Classify any recorded error for better UI display
            error_info = _classify_research_error(metadata)

            # Get the latest milestone log for this research (best-effort)
            latest_milestone = None
            try:
                latest_milestone = _latest_milestone_entry(
                    db_session, research_id
                )
            except Exception:
                logger.warning("Error fetching latest milestone")

            filtered_metadata = strip_settings_snapshot(metadata)
            if error_info:
                filtered_metadata["error_info"] = error_info

            response_data = {
                "status": status,
                "progress": progress,
                "completed_at": completed_at,
                "report_path": report_path,
                "metadata": filtered_metadata,
            }

            # Include latest milestone as a log_entry for frontend compatibility
            if latest_milestone:
                response_data["log_entry"] = latest_milestone

            return jsonify(response_data)
    except Exception:
        logger.exception("Error getting research status")
        return jsonify({"error": "Error checking research status"}), 500
@research_bp.route("/api/queue/status", methods=["GET"])
@login_required
def get_queue_status():
    """Return the current research queue for the logged-in user."""
    username = session["username"]

    from ..queue import QueueManager

    try:
        items = QueueManager.get_user_queue(username)
        payload = {
            "status": "success",
            "queue": items,
            "total": len(items),
        }
        return jsonify(payload)
    except Exception:
        logger.exception("Error getting queue status")
        return jsonify(
            {"status": "error", "message": "Failed to process request"}
        ), 500
@research_bp.route("/api/queue/<string:research_id>/position", methods=["GET"])
@login_required
def get_queue_position(research_id):
    """Return the queue position of a specific queued research."""
    username = session["username"]

    from ..queue import QueueManager

    try:
        position = QueueManager.get_queue_position(username, research_id)

        if position is None:
            return jsonify(
                {"status": "error", "message": "Research not found in queue"}
            ), 404

        return jsonify({"status": "success", "position": position})
    except Exception:
        logger.exception("Error getting queue position")
        return jsonify(
            {"status": "error", "message": "Failed to process request"}
        ), 500
@research_bp.route("/api/config/limits", methods=["GET"])
def get_upload_limits():
    """
    Get file upload configuration limits.

    Returns the backend's authoritative limits for file uploads,
    allowing the frontend to stay in sync without hardcoding values.
    """
    # NOTE(review): this endpoint has no @login_required — presumably
    # intentional so the upload form can query limits pre-auth; confirm.
    limits = {
        "max_file_size": FileUploadValidator.MAX_FILE_SIZE,
        "max_files": FileUploadValidator.MAX_FILES_PER_REQUEST,
        "allowed_mime_types": list(FileUploadValidator.ALLOWED_MIME_TYPES),
    }
    return jsonify(limits)
@research_bp.route("/api/upload/pdf", methods=["POST"])
@login_required
@upload_rate_limit_user
@upload_rate_limit_ip
def upload_pdf():
    """
    Upload and extract text from PDF files with comprehensive security validation.

    Security features:
    - Rate limiting (10 uploads/min, 100/hour per user)
    - File size validation (50MB max per file)
    - File count validation (100 files max)
    - PDF structure validation
    - MIME type validation

    Performance improvements:
    - Single-pass PDF processing (text + metadata)
    - Optimized extraction service

    Returns:
        200 with extraction results and a per-file ``errors`` list;
        400 when no files were provided/processed; 413 when the request
        exceeds the aggregate size limit; 500 on unexpected failure.
    """
    try:
        # Early request size validation (before reading any files)
        # This prevents memory exhaustion from chunked encoding attacks
        max_request_size = (
            FileUploadValidator.MAX_FILES_PER_REQUEST
            * FileUploadValidator.MAX_FILE_SIZE
        )
        if request.content_length and request.content_length > max_request_size:
            return jsonify(
                {
                    "error": f"Request too large. Maximum size is {max_request_size // (1024 * 1024)}MB"
                }
            ), 413

        # Check if files are present in the request
        if "files" not in request.files:
            return jsonify({"error": "No files provided"}), 400

        files = request.files.getlist("files")
        if not files or files[0].filename == "":
            return jsonify({"error": "No files selected"}), 400

        # Validate file count
        is_valid, error_msg = FileUploadValidator.validate_file_count(
            len(files)
        )
        if not is_valid:
            return jsonify({"error": error_msg}), 400

        # Get PDF extraction service
        pdf_service = get_pdf_extraction_service()

        extracted_texts = []
        total_files = len(files)
        processed_files = 0
        errors = []

        for file in files:
            if not file or not file.filename:
                errors.append("Unnamed file: Skipped")
                continue

            try:
                filename = sanitize_filename(
                    file.filename, allowed_extensions={".pdf"}
                )
            except UnsafeFilenameError:
                # Deliberately do not echo the raw, unsanitized name
                errors.append("Rejected file: invalid or disallowed filename")
                continue

            try:
                # Read file content (with disk spooling, large files are read from temp file)
                pdf_content = file.read()

                # Comprehensive validation
                is_valid, error_msg = FileUploadValidator.validate_upload(
                    filename=filename,
                    file_content=pdf_content,
                    content_length=file.content_length,
                )

                if not is_valid:
                    # Fix: report the sanitized filename instead of the
                    # useless "(unknown)" placeholder — `filename` already
                    # passed sanitize_filename, so it is safe to echo.
                    errors.append(f"{filename}: {error_msg}")
                    continue

                # Extract text and metadata in single pass (performance fix)
                result = pdf_service.extract_text_and_metadata(
                    pdf_content, filename
                )

                if result["success"]:
                    extracted_texts.append(
                        {
                            "filename": result["filename"],
                            "text": result["text"],
                            "size": result["size"],
                            "pages": result["pages"],
                        }
                    )
                    processed_files += 1
                else:
                    errors.append(f"{filename}: {result['error']}")

            except Exception:
                # Sanitized filename is safe for logs (no log injection)
                logger.exception(f"Error processing {filename}")
                errors.append(f"{filename}: Error processing file")
            finally:
                # Close the file stream to release resources
                try:
                    file.close()
                except Exception:
                    logger.debug("best-effort file stream close", exc_info=True)

        # Prepare response
        response_data = {
            "status": "success",
            "processed_files": processed_files,
            "total_files": total_files,
            "extracted_texts": extracted_texts,
            "combined_text": "\n\n".join(
                [
                    f"--- From {item['filename']} ---\n{item['text']}"
                    for item in extracted_texts
                ]
            ),
            "errors": errors,
        }

        if processed_files == 0:
            return jsonify(
                {
                    "status": "error",
                    "message": "No files were processed successfully",
                    "errors": errors,
                }
            ), 400

        return jsonify(response_data)

    except Exception:
        logger.exception("Error processing PDF upload")
        return jsonify({"error": "Failed to process PDF files"}), 500