Coverage for src/local_deep_research/web/routes/research_routes.py: 91%

698 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1import io 

2import json 

3from datetime import datetime, UTC 

4from pathlib import Path 

5 

6from flask import ( 

7 Blueprint, 

8 jsonify, 

9 redirect, 

10 request, 

11 send_file, 

12 session, 

13 url_for, 

14) 

15from loguru import logger 

16from ...settings.logger import log_settings 

17from sqlalchemy import func 

18 

19# Security imports 

20from ...config.constants import DEFAULT_OLLAMA_URL 

21from ...exceptions import DuplicateResearchError, SystemAtCapacityError 

22from ...llm.providers.base import normalize_provider 

23from ...constants import ResearchStatus 

24from ...security import ( 

25 FileUploadValidator, 

26 UnsafeFilenameError, 

27 filter_research_metadata, 

28 sanitize_filename, 

29 strip_settings_snapshot, 

30) 

31from ...security.rate_limiter import ( 

32 upload_rate_limit_ip, 

33 upload_rate_limit_user, 

34) 

35from ...security.decorators import require_json_body 

36from ...config.paths import get_config_directory 

37 

38# Services imports 

39from ..services.pdf_extraction_service import get_pdf_extraction_service 

40from ..services.pdf_service import ( 

41 MissingPDFDependencyError, 

42 get_weasyprint_install_instructions, 

43) 

44 

45from ...database.models import ( 

46 QueuedResearch, 

47 ResearchHistory, 

48 ResearchLog, 

49 UserActiveResearch, 

50) 

51from ...database.models.library import Document as Document 

52from ...database.encrypted_db import db_manager 

53from ...database.session_context import get_g_db_session, get_user_db_session 

54from ..auth.decorators import login_required 

55from ..auth.password_utils import get_user_password 

56from ..models.database import calculate_duration 

57from ..services.research_service import ( 

58 export_report_to_memory, 

59 run_research_process, 

60 start_research_process, 

61) 

62from ...security.rate_limiter import limiter 

63from ..utils.templates import render_template_with_defaults 

64from .globals import ( 

65 append_research_log, 

66 get_active_research_ids, 

67 get_research_field, 

68 is_research_active, 

69 set_termination_flag, 

70) 

71 

# Create a Blueprint for the research application.
# Every route in this module is registered on it via @research_bp.route.
research_bp = Blueprint("research", __name__)

74 

75 

76# NOTE: Routes use session["username"] (not .get()) intentionally. 

77# @login_required guarantees the key exists; direct access fails fast 

78# if the decorator is ever removed. 

79 

80 

81# Add static route at the root level 

@research_bp.route("/redirect-static/<path:path>")
def redirect_static(path):
    """Forward a legacy static URL to the current ``static`` endpoint.

    Args:
        path: Path captured from the URL; passed on as the static filename.

    Returns:
        A redirect response pointing at the resolved static URL.
    """
    target_url = url_for("static", filename=path)
    return redirect(target_url)

86 

87 

@research_bp.route("/progress/<string:research_id>")
@login_required
def progress_page(research_id):
    """Serve the research progress page.

    ``research_id`` is captured from the URL but is not used when rendering
    the template.
    """
    template_name = "pages/progress.html"
    return render_template_with_defaults(template_name)

93 

94 

@research_bp.route("/details/<string:research_id>")
@login_required
def research_details_page(research_id):
    """Serve the research details page.

    ``research_id`` is captured from the URL but is not used when rendering
    the template.
    """
    template_name = "pages/details.html"
    return render_template_with_defaults(template_name)

100 

101 

@research_bp.route("/results/<string:research_id>")
@login_required
def results_page(research_id):
    """Serve the research results page.

    ``research_id`` is captured from the URL but is not used when rendering
    the template.
    """
    template_name = "pages/results.html"
    return render_template_with_defaults(template_name)

107 

108 

@research_bp.route("/history")
@login_required
def history_page():
    """Serve the research history page."""
    template_name = "pages/history.html"
    return render_template_with_defaults(template_name)

114 

115 

116# Add missing settings routes 

@research_bp.route("/settings", methods=["GET"])
@login_required
def settings_page():
    """Serve the settings dashboard page."""
    template_name = "settings_dashboard.html"
    return render_template_with_defaults(template_name)

122 

123 

def _extract_research_params(data, settings_manager):
    """Resolve the parameters of a research submission.

    Values present in ``data`` win; for most parameters a missing value is
    looked up through ``settings_manager.get_setting``. ``max_results`` and
    ``time_period`` are taken from ``data`` only.

    Args:
        data: Mapping holding the submitted request fields.
        settings_manager: Object exposing ``get_setting(key, default)``.

    Returns:
        A dict with the keys ``model_provider``, ``model``,
        ``custom_endpoint``, ``ollama_url``, ``search_engine``,
        ``max_results``, ``time_period``, ``iterations``,
        ``questions_per_iteration`` and ``strategy``.
    """
    # --- LLM provider -----------------------------------------------------
    model_provider = data.get("model_provider")
    if model_provider:
        logger.debug(f"Using model_provider from request: {model_provider}")
    else:
        model_provider = settings_manager.get_setting("llm.provider", "ollama")
        logger.debug(
            f"No model_provider in request, using database setting: {model_provider}"
        )
    # Bring the provider name into its canonical form before comparing it.
    model_provider = normalize_provider(model_provider)

    # --- Model ------------------------------------------------------------
    model = data.get("model")
    if model:
        logger.debug(f"Using model from request: {model}")
    else:
        model = settings_manager.get_setting("llm.model", None)
        logger.debug(f"No model in request, using database setting: {model}")

    # --- Provider-specific endpoints --------------------------------------
    # The stored endpoint is only consulted for the matching provider.
    custom_endpoint = data.get("custom_endpoint")
    if not custom_endpoint:
        if model_provider == "openai_endpoint":
            custom_endpoint = settings_manager.get_setting(
                "llm.openai_endpoint.url", None
            )
            logger.debug(
                f"No custom_endpoint in request, using database setting: {custom_endpoint}"
            )

    ollama_url = data.get("ollama_url")
    if not ollama_url:
        if model_provider == "ollama":
            ollama_url = settings_manager.get_setting(
                "llm.ollama.url", DEFAULT_OLLAMA_URL
            )
            logger.debug(
                f"No ollama_url in request, using database setting: {ollama_url}"
            )

    # --- Search configuration ---------------------------------------------
    # "search_tool" is accepted as an alternative key for the engine.
    search_engine = data.get("search_engine") or data.get("search_tool")
    if not search_engine:
        search_engine = settings_manager.get_setting("search.tool", "searxng")

    # Only an absent (None) value falls back to the stored setting here.
    iterations = data.get("iterations")
    if iterations is None:
        iterations = settings_manager.get_setting("search.iterations", 5)

    questions_per_iteration = data.get("questions_per_iteration")
    if questions_per_iteration is None:
        questions_per_iteration = settings_manager.get_setting(
            "search.questions_per_iteration", 5
        )

    strategy = data.get("strategy")
    if not strategy:
        strategy = settings_manager.get_setting(
            "search.search_strategy", "source-based"
        )

    return dict(
        model_provider=model_provider,
        model=model,
        custom_endpoint=custom_endpoint,
        ollama_url=ollama_url,
        search_engine=search_engine,
        max_results=data.get("max_results"),
        time_period=data.get("time_period"),
        iterations=iterations,
        questions_per_iteration=questions_per_iteration,
        strategy=strategy,
    )

202 

203 

def _queue_research(
    db_session,
    username,
    research_id,
    query,
    mode,
    research_settings,
    params,
    session_id,
    reason="",
    research=None,
):
    """Store a queue entry for the research and notify the queue processor.

    Args:
        db_session: Database session used for the position lookup, the
            insert and the commit.
        username: Owner of the research; queue positions are computed per
            user.
        research_id: Identifier of the research being queued.
        query: Research query text.
        mode: Research mode.
        research_settings: Metadata dict stored as the settings snapshot of
            the queue entry and forwarded to the processor.
        params: Resolved research parameters (see
            ``_extract_research_params``).
        session_id: Session identifier forwarded to the processor.
        reason: Optional text inserted into the user-facing message
            (e.g. "due to concurrent limit").
        research: Optional ResearchHistory object; when given, its status is
            switched to QUEUED in the same commit as the queue entry.

    Returns:
        A JSON response carrying the queued status, the research id, the
        queue position and a message.
    """
    highest_position = (
        db_session.query(func.max(QueuedResearch.position))
        .filter_by(username=username)
        .scalar()
    )
    # No queued rows for this user yields None, which counts as 0.
    max_position = highest_position or 0
    position = max_position + 1

    db_session.add(
        QueuedResearch(
            username=username,
            research_id=research_id,
            query=query,
            mode=mode,
            settings_snapshot=research_settings,
            position=position,
        )
    )
    if research is not None:
        research.status = ResearchStatus.QUEUED  # type: ignore[assignment]
    db_session.commit()
    logger.info(
        f"Queued research {research_id} at position {max_position + 1} for user {username}"
    )

    from ..queue.processor_v2 import queue_processor

    # Parameters handed to the processor unchanged, in this order.
    forwarded_keys = (
        "model_provider",
        "model",
        "custom_endpoint",
        "search_engine",
        "max_results",
        "time_period",
        "iterations",
        "questions_per_iteration",
        "strategy",
    )
    forwarded_params = {key: params[key] for key in forwarded_keys}
    queue_processor.notify_research_queued(
        username,
        research_id,
        session_id=session_id,
        query=query,
        mode=mode,
        settings_snapshot=research_settings,
        **forwarded_params,
    )

    if reason:
        reason_text = f" {reason}"
    else:
        reason_text = ""
    message = f"Your research has been queued{reason_text}. Position in queue: {position}"
    response_payload = {
        "status": ResearchStatus.QUEUED,
        "research_id": research_id,
        "queue_position": position,
        "message": message,
    }
    return jsonify(response_payload)

278 

279 

@research_bp.route("/api/start_research", methods=["POST"])
@login_required
@require_json_body(error_format="status")
def start_research():
    """Start a new research run, or queue it when the user is at the limit.

    Reads the JSON request body, resolves the research parameters from the
    request and the stored settings, records a ResearchHistory row and then
    either queues the research or spawns the research thread.

    Returns:
        A JSON response (optionally paired with an HTTP status code):

        * success payload with ``research_id`` when the thread was started,
        * the queued payload built by ``_queue_research`` when queued,
        * 400 when the query, the model, or a required custom endpoint is
          missing,
        * 401 when no password is available for an encrypted database,
        * 409 when a live thread already owns the research id,
        * 429 when the system is at research capacity,
        * 500 when the settings snapshot, the database record or the thread
          spawn fails.
    """
    data = request.json
    # Debug logging to trace model parameter
    logger.debug(f"Request data keys: {list(data.keys())}")

    # Normalise the optional metadata to a dict. A JSON ``null`` (or any
    # other non-object value) would otherwise make ``metadata.get`` and the
    # later ``research_settings.update`` raise and surface as a 500.
    metadata = data.get("metadata")
    if not isinstance(metadata, dict):
        if metadata is not None:
            logger.warning(
                f"Ignoring non-object metadata of type {type(metadata).__name__}"
            )
        metadata = {}

    # Check if this is a news search
    if metadata.get("is_news_search"):
        logger.info(
            f"News search request received: triggered_by={metadata.get('triggered_by', 'unknown')}"
        )

    query = data.get("query")
    mode = data.get("mode", "quick")

    # Replace date placeholders if they exist
    if query and "YYYY-MM-DD" in query:
        # Use the current UTC date
        current_date = datetime.now(UTC).strftime("%Y-%m-%d")

        original_query = query
        query = query.replace("YYYY-MM-DD", current_date)
        logger.info(
            f"Replaced date placeholder in query: {original_query[:100]}... -> {query[:100]}..."
        )
        logger.info(f"Using date: {current_date}")

        # Update metadata to track the replacement
        metadata["original_query"] = original_query
        metadata["processed_query"] = query
        metadata["date_replaced"] = current_date
        data["metadata"] = metadata

    # Get parameters from request or use database settings
    from ...settings.manager import SettingsManager

    # Direct access on purpose: @login_required guarantees the key exists.
    username = session["username"]

    with get_user_db_session(username) as db_session:
        settings_manager = SettingsManager(db_session=db_session)
        params = _extract_research_params(data, settings_manager)

    model_provider = params["model_provider"]
    model = params["model"]
    custom_endpoint = params["custom_endpoint"]
    search_engine = params["search_engine"]
    max_results = params["max_results"]
    time_period = params["time_period"]
    iterations = params["iterations"]
    questions_per_iteration = params["questions_per_iteration"]
    strategy = params["strategy"]

    # Debug logging for model parameter specifically
    logger.debug(
        f"Extracted model value: '{model}' (type: {type(model).__name__})"
    )

    # Log the selections for troubleshooting
    logger.info(
        f"Starting research with provider: {model_provider}, model: {model}, search engine: {search_engine}"
    )
    logger.info(
        f"Additional parameters: max_results={max_results}, time_period={time_period}, iterations={iterations}, questions={questions_per_iteration}, strategy={strategy}"
    )

    if not query:
        return jsonify({"status": "error", "message": "Query is required"}), 400

    # Validate required parameters based on provider
    if model_provider == "openai_endpoint" and not custom_endpoint:
        return (
            jsonify(
                {
                    "status": "error",
                    "message": "Custom endpoint URL is required for OpenAI endpoint provider",
                }
            ),
            400,
        )

    if not model:
        logger.error(
            f"No model specified or configured. Provider: {model_provider}"
        )
        return jsonify(
            {
                "status": "error",
                "message": "Model is required. Please configure a model in the settings.",
            }
        ), 400

    # Get max concurrent researches from settings
    from ...settings import SettingsManager

    with get_user_db_session() as db_session:
        settings_manager = SettingsManager(db_session)
        max_concurrent_researches = settings_manager.get_setting(
            "app.max_concurrent_researches", 3
        )

    # Check if the user has too many active researches.
    # Use existing session from g to check active researches
    try:
        db_session = get_g_db_session()
        if db_session:
            # First, clean up stale entries where the research thread has died
            # (e.g. crashed with an unhandled exception before cleanup ran).
            # Without this, dead researches permanently block the queue.
            from ..routes.globals import reclaim_stale_user_active_research

            # No grace cutoff here — research_routes.start_research has
            # always reclaimed all dead-thread rows immediately. The chat
            # send-message path uses a 30s grace via grace_cutoff_dt.
            if reclaim_stale_user_active_research(
                db_session, username, logger=logger
            ):
                db_session.commit()

            # Now count truly active researches
            active_count = (
                db_session.query(UserActiveResearch)
                .filter_by(username=username, status=ResearchStatus.IN_PROGRESS)
                .count()
            )

            # Debug logging
            logger.info(
                f"Active research count for {username}: {active_count}/{max_concurrent_researches}"
            )

            should_queue = active_count >= max_concurrent_researches
            logger.info(f"Should queue new research: {should_queue}")
        else:
            logger.warning(
                "No database session available to check active researches"
            )
            should_queue = False
    except Exception:
        logger.exception("Failed to check active researches")
        # Default to not queueing if we can't check
        should_queue = False

    # For non-queued research, verify password is available BEFORE creating DB records
    # (queued research gets password later via queue processor)
    user_password = None
    if not should_queue:
        user_password = get_user_password(username)

        if not user_password:
            if db_manager.has_encryption:
                logger.error(
                    f"No password available for user {username} with encrypted database - "
                    "cannot start research (session password expired or lost after server restart)"
                )
                # Use status/message keys to match the research API convention
                # (the research frontend checks data.status and data.message)
                return jsonify(
                    {
                        "status": "error",
                        "message": "Your session has expired. Please log out and log back in to start research.",
                    }
                ), 401
            logger.warning(
                f"No password available for metrics access for user {username}"
            )

    # Create a record in the database with explicit UTC timestamp
    import uuid
    import threading

    created_at = datetime.now(UTC).isoformat()
    research_id = str(uuid.uuid4())

    # Create organized research metadata with settings snapshot
    research_settings = {
        # Direct submission parameters
        "submission": {
            "model_provider": model_provider,
            "model": model,
            "custom_endpoint": custom_endpoint,
            "search_engine": search_engine,
            "max_results": max_results,
            "time_period": time_period,
            "iterations": iterations,
            "questions_per_iteration": questions_per_iteration,
            "strategy": strategy,
        },
        # System information
        "system": {
            "timestamp": created_at,
            "user": username,
            "version": "1.0",  # Track metadata version for future migrations
            "server_url": request.host_url,  # Add server URL for link generation
        },
    }

    # Add any additional metadata from request.
    # NOTE(review): update() lets request metadata replace the "submission"
    # and "system" entries built above — confirm that this is intended.
    if metadata:
        research_settings.update(metadata)

    # Get complete settings snapshot for this research
    try:
        from local_deep_research.settings import SettingsManager

        # Get or lazily create a session for settings snapshot
        db_session_for_settings = get_g_db_session()
        if db_session_for_settings:
            # Ensure any pending changes are committed
            try:
                db_session_for_settings.commit()
            except Exception:
                db_session_for_settings.rollback()
            # Create SettingsManager with the existing session
            settings_manager = SettingsManager(
                db_session_for_settings, owns_session=False
            )
            # Get all current settings as a snapshot (bypass cache to ensure fresh data)
            all_settings = settings_manager.get_all_settings(bypass_cache=True)

            # Add settings snapshot to metadata
            research_settings["settings_snapshot"] = all_settings
            logger.info(
                f"Captured {len(all_settings)} settings for research {research_id}"
            )
        else:
            # If no session in g, create a new one temporarily to get settings
            logger.warning(
                "No database session in g, creating temporary session for settings snapshot"
            )
            from ...database.thread_local_session import get_metrics_session

            password = get_user_password(username)

            if password:
                temp_session = get_metrics_session(username, password)
                if temp_session:
                    settings_manager = SettingsManager(
                        temp_session, owns_session=False
                    )
                    all_settings = settings_manager.get_all_settings(
                        bypass_cache=True
                    )
                    research_settings["settings_snapshot"] = all_settings
                    logger.info(
                        f"Captured {len(all_settings)} settings using temporary session for research {research_id}"
                    )
                else:
                    logger.error(
                        "Failed to create temporary session for settings snapshot"
                    )
                    return jsonify(
                        {
                            "status": "error",
                            "message": "Cannot create research without settings snapshot.",
                        }
                    ), 500
            else:
                logger.error(
                    "No password available to create session for settings snapshot"
                )
                return jsonify(
                    {
                        "status": "error",
                        "message": "Cannot create research without settings snapshot.",
                    }
                ), 500
    except Exception:
        logger.exception("Failed to capture settings snapshot")
        # Cannot continue without settings snapshot for thread-based research
        return jsonify(
            {
                "status": "error",
                "message": "Failed to capture settings for research. Please try again.",
            }
        ), 500

    try:
        # Get or lazily create a session (use existing session from g)
        db_session = get_g_db_session()
        # NOTE(review): when no session is available, no ResearchHistory row
        # is written here, yet execution continues to the spawn step below —
        # confirm that this is intended.
        if db_session:
            # Determine initial status based on whether we need to queue
            initial_status = (
                ResearchStatus.QUEUED
                if should_queue
                else ResearchStatus.IN_PROGRESS
            )

            research = ResearchHistory(
                id=research_id,  # Set UUID as primary key
                query=query,
                mode=mode,
                status=initial_status,
                created_at=created_at,
                progress_log=[{"time": created_at, "progress": 0}],
                research_meta=research_settings,
            )
            db_session.add(research)
            db_session.commit()
            logger.info(
                f"Created research entry with UUID: {research_id}, status: {initial_status}"
            )

            if should_queue:
                session_id = session.get("session_id")
                return _queue_research(
                    db_session,
                    username,
                    research_id,
                    query,
                    mode,
                    research_settings,
                    params,
                    session_id,
                )

            # Start immediately: create the active research tracking record.
            # The thread id stored here is the request thread's; it is
            # replaced with the research thread's id after the spawn below.
            active_record = UserActiveResearch(
                username=username,
                research_id=research_id,
                status=ResearchStatus.IN_PROGRESS,
                thread_id=str(threading.current_thread().ident),
                settings_snapshot=research_settings,
            )
            db_session.add(active_record)
            db_session.commit()
            logger.info(f"Created active research record for user {username}")

            # Double-check the count after committing to handle race conditions
            # (uses the same session we already have)
            try:
                final_count = (
                    db_session.query(UserActiveResearch)
                    .filter_by(
                        username=username, status=ResearchStatus.IN_PROGRESS
                    )
                    .count()
                )
                logger.info(
                    f"Final active count after commit: {final_count}/{max_concurrent_researches}"
                )

                if final_count > max_concurrent_researches:
                    # We exceeded the limit due to a race condition
                    # Remove this record and queue instead
                    logger.warning(
                        f"Race condition detected: {final_count} > {max_concurrent_researches}, moving to queue"
                    )
                    db_session.delete(active_record)
                    db_session.commit()

                    session_id = session.get("session_id")
                    return _queue_research(
                        db_session,
                        username,
                        research_id,
                        query,
                        mode,
                        research_settings,
                        params,
                        session_id,
                        reason="due to concurrent limit",
                        research=research,
                    )
            except Exception:
                logger.warning("Could not recheck active count")

    except Exception:
        logger.exception("Failed to create research entry")
        return jsonify(
            {"status": "error", "message": "Failed to create research entry"}
        ), 500

    # Only start the research if not queued
    if not should_queue:
        # Save the research strategy to the database before starting the thread
        try:
            from ..services.research_service import save_research_strategy

            save_research_strategy(research_id, strategy, username=username)
        except Exception:
            logger.warning("Could not save research strategy")

        # Debug logging for settings snapshot
        snapshot_data = research_settings.get("settings_snapshot", {})
        log_settings(snapshot_data, "Settings snapshot being passed to thread")
        if "search.tool" in snapshot_data:
            logger.debug(
                f"search.tool in snapshot: {snapshot_data['search.tool']}"
            )
        else:
            logger.debug("search.tool NOT in snapshot")

        # Start the research process with the selected parameters.
        # If the spawn raises, the UserActiveResearch + IN_PROGRESS
        # ResearchHistory rows persisted above would otherwise be
        # permanently orphaned (no thread, no cleanup path). Catch any
        # exception, mark the research FAILED, delete the active row,
        # and return 500 — same contract as the queue processor's
        # terminal-failure branch introduced in #3481.
        try:
            research_thread = start_research_process(
                research_id,
                query,
                mode,
                run_research_process,
                username=username,  # Pass username to the thread
                user_password=user_password,  # Pass password for database access
                model_provider=model_provider,
                model=model,
                custom_endpoint=custom_endpoint,
                search_engine=search_engine,
                max_results=max_results,
                time_period=time_period,
                iterations=iterations,
                questions_per_iteration=questions_per_iteration,
                strategy=strategy,
                settings_snapshot=snapshot_data,  # Pass complete settings
            )
        except DuplicateResearchError:
            # A live thread already owns this research_id. Do NOT delete
            # the UserActiveResearch row or mark ResearchHistory FAILED —
            # that state belongs to the live thread, and mutating it
            # would terminate a running research from the user's
            # perspective while it keeps executing. Same contract as the
            # queue processor's dedicated dup branch (#3506).
            logger.warning(
                f"Duplicate live thread detected for {research_id} "
                "on direct submission; leaving state intact"
            )
            return jsonify(
                {
                    "status": "error",
                    "message": "Research is already running.",
                }
            ), 409
        except SystemAtCapacityError:
            # System at concurrent-research capacity. Roll back the rows
            # committed above (UserActiveResearch + IN_PROGRESS history)
            # and return 429 so the client can retry shortly.
            logger.warning(
                f"SystemAtCapacityError on direct submission for "
                f"{research_id}; rolling back orphan rows"
            )
            try:
                with get_user_db_session(username) as cleanup_session:
                    stale_active = (
                        cleanup_session.query(UserActiveResearch)
                        .filter_by(username=username, research_id=research_id)
                        .first()
                    )
                    if stale_active:
                        cleanup_session.delete(stale_active)
                    cleanup_session.query(ResearchHistory).filter_by(
                        id=research_id
                    ).delete()
                    cleanup_session.commit()
            except Exception:
                logger.exception(
                    "Cleanup after SystemAtCapacityError raised; "
                    "leaving orphan rows for the reconciler"
                )
            return jsonify(
                {
                    "status": "error",
                    "message": "Server is at research capacity. Please retry shortly.",
                }
            ), 429
        except Exception:
            logger.exception(
                f"Failed to spawn research thread for {research_id}"
            )
            try:
                with get_user_db_session(username) as cleanup_session:
                    stale_active = (
                        cleanup_session.query(UserActiveResearch)
                        .filter_by(username=username, research_id=research_id)
                        .first()
                    )
                    if stale_active:
                        cleanup_session.delete(stale_active)
                    research_row = (
                        cleanup_session.query(ResearchHistory)
                        .filter_by(id=research_id)
                        .first()
                    )
                    if research_row:
                        research_row.status = ResearchStatus.FAILED
                    cleanup_session.commit()
            except Exception:
                logger.exception(
                    "Cleanup after spawn failure raised; leaving "
                    "orphan rows for the reconciler to handle"
                )
            return jsonify(
                {
                    "status": "error",
                    "message": "Failed to start research. Please try again.",
                }
            ), 500

        # Update the active research record with the actual thread ID.
        try:
            with get_user_db_session(username) as thread_session:
                active_record = (
                    thread_session.query(UserActiveResearch)
                    .filter_by(username=username, research_id=research_id)
                    .first()
                )
                if active_record:
                    active_record.thread_id = str(research_thread.ident)
                    thread_session.commit()
        except Exception:
            logger.warning("Could not update thread ID")

    return jsonify({"status": "success", "research_id": research_id})

811 

812 

@research_bp.route("/api/terminate/<string:research_id>", methods=["POST"])
@login_required
def terminate_research(research_id):
    """Terminate an in-progress research process.

    Sets the termination flag for the research, marks its ResearchHistory
    row as SUSPENDED and, when the research is registered as active, records
    a termination entry in the in-memory and stored progress logs and emits
    a ``progress`` socket event.

    Args:
        research_id: Identifier of the research to terminate.

    Returns:
        A JSON success payload when the research was terminated or was
        already in a terminal state, a 404 payload when the research does
        not exist, or a 500 payload when an unexpected error occurs.
    """
    username = session["username"]

    # Check if the research exists and is in progress
    try:
        with get_user_db_session(username) as db_session:
            research = (
                db_session.query(ResearchHistory)
                .filter_by(id=research_id)
                .first()
            )

            if not research:
                return jsonify(
                    {"status": "error", "message": "Research not found"}
                ), 404

            status = research.status

            # If it's already in a terminal state, return success
            if status in (
                ResearchStatus.COMPLETED,
                ResearchStatus.SUSPENDED,
                ResearchStatus.FAILED,
                ResearchStatus.ERROR,
            ):
                return jsonify(
                    {
                        "status": "success",
                        "message": f"Research already {status}",
                    }
                )

            # Check if it's in the active_research dict
            if not is_research_active(research_id):
                # The worker may not be registered in _active_research yet: a
                # just-submitted research commits its IN_PROGRESS row before the
                # worker thread registers itself (spawn-grace window). Set the
                # termination flag anyway so a worker that starts right after
                # this still sees it and aborts at its first checkpoint —
                # otherwise the user's Stop is silently ignored and the research
                # runs to completion (overwriting this SUSPENDED status). The
                # flag is harmless if no worker ever starts.
                set_termination_flag(research_id)
                research.status = ResearchStatus.SUSPENDED
                db_session.commit()
                return jsonify(
                    {"status": "success", "message": "Research terminated"}
                )

            # Set the termination flag
            set_termination_flag(research_id)

            # Log the termination request - using UTC timestamp
            timestamp = datetime.now(UTC).isoformat()
            termination_message = "Research termination requested by user"
            current_progress = get_research_field(research_id, "progress", 0)

            # Create log entry
            log_entry = {
                "time": timestamp,
                "message": termination_message,
                "progress": current_progress,
                "metadata": {"phase": "termination"},
            }

            # Add to in-memory log
            append_research_log(research_id, log_entry)

            # Add to database log
            logger.log("MILESTONE", f"Research ended: {termination_message}")

            # Load the stored progress log; it may be a JSON string or an
            # already-decoded value. Decoding is best-effort: anything that
            # cannot be read is treated as an empty log.
            stored_log = research.progress_log or []
            if isinstance(stored_log, str):
                try:
                    stored_log = json.loads(stored_log)
                except Exception:
                    stored_log = []
            # A stored value that is not a list cannot be appended to, so it
            # is treated as an empty log instead of failing the request.
            if not isinstance(stored_log, list):
                stored_log = []

            # Assign a NEW list rather than mutating the stored one in place:
            # re-assigning the same, mutated object may not be detected as a
            # change on a JSON column without mutation tracking, in which
            # case the termination entry would not be persisted.
            research.progress_log = [*stored_log, log_entry]
            research.status = ResearchStatus.SUSPENDED
            db_session.commit()

            # Emit a socket event for the termination request
            try:
                event_data = {
                    "status": ResearchStatus.SUSPENDED,
                    "message": "Research was suspended by user request",
                }

                from ..services.socket_service import SocketIOService

                SocketIOService().emit_to_subscribers(
                    "progress", research_id, event_data
                )

            except Exception:
                logger.exception("Socket emit error (non-critical)")

            return jsonify(
                {
                    "status": "success",
                    "message": "Research termination requested",
                }
            )
    except Exception:
        logger.exception("Error terminating research")
        return jsonify(
            {"status": "error", "message": "Failed to terminate research"}
        ), 500

932 

933 

@research_bp.route("/api/delete/<string:research_id>", methods=["DELETE"])
@login_required
def delete_research(research_id):
    """Remove a research record and, if present, its report file.

    Args:
        research_id: Identifier of the research to delete.

    Returns:
        A JSON success payload, a 404 payload when the research does not
        exist, a 400 payload when the research is still running, or a 500
        payload when an unexpected error occurs.
    """
    username = session["username"]

    try:
        with get_user_db_session(username) as db_session:
            lookup = db_session.query(ResearchHistory).filter_by(
                id=research_id
            )
            research = lookup.first()
            if not research:
                not_found = {"status": "error", "message": "Research not found"}
                return jsonify(not_found), 404

            status = research.status
            report_path = research.report_path

            # A research that is marked in progress AND still registered as
            # active must not be deleted.
            if status == ResearchStatus.IN_PROGRESS:
                if is_research_active(research_id):
                    in_progress = {
                        "status": "error",
                        "message": "Cannot delete research that is in progress",
                    }
                    return jsonify(in_progress), 400

            # Best effort: a failure to remove the report file is logged and
            # does not prevent the record from being deleted.
            if report_path:
                if Path(report_path).exists():
                    try:
                        Path(report_path).unlink()
                    except Exception:
                        logger.exception("Error removing report file")

            # Delete the database record
            db_session.delete(research)
            db_session.commit()

            return jsonify({"status": "success"})
    except Exception:
        logger.exception("Error deleting research")
        failure = {"status": "error", "message": "Failed to delete research"}
        return jsonify(failure), 500

987 

988 

@research_bp.route("/api/clear_history", methods=["POST"])
@login_required
def clear_history():
    """Delete every research record of the user except currently active ones.

    Report files of the non-active records are unlinked first (failures are
    logged and ignored), then the rows are removed with one bulk delete.
    Returns ``{"status": "success"}`` on success, or a generic error payload
    with HTTP 500 when anything unexpected is raised.
    """
    username = session["username"]

    try:
        with get_user_db_session(username) as db_session:
            all_records = db_session.query(ResearchHistory).all()

            # Snapshot of the ids that are running right now; those records
            # keep both their report file and their database row.
            active_ids = get_active_research_ids()

            for record in all_records:
                if record.id not in active_ids:
                    stored_path = record.report_path
                    if stored_path and Path(stored_path).exists():
                        try:
                            Path(stored_path).unlink()
                        except Exception:
                            logger.exception("Error removing report file")

            # Query.delete() bypasses ORM cascade; child rows clean up
            # via DDL-level ondelete="CASCADE" only because PRAGMA
            # foreign_keys = ON is set on every connection.
            doomed = db_session.query(ResearchHistory)
            if active_ids:
                doomed = doomed.filter(~ResearchHistory.id.in_(active_ids))
            doomed.delete(synchronize_session=False)

            db_session.commit()

            return jsonify({"status": "success"})
    except Exception:
        logger.exception("Error clearing history")
        failure = {"status": "error", "message": "Failed to process request"}
        return jsonify(failure), 500

1036 

1037 

@research_bp.route("/open_file_location", methods=["POST"])
@login_required
def open_file_location():
    """Reject requests to open a file location in the system file explorer.

    Security: the feature is disabled for server deployments, because it is
    only meaningful when server and client run on the same machine. The
    handler therefore always answers with an error payload and HTTP 403.
    """
    disabled_notice = {
        "status": "error",
        "message": "This feature is disabled. It is only available in desktop mode.",
    }
    return jsonify(disabled_notice), 403

1052 

1053 

@research_bp.route("/api/save_raw_config", methods=["POST"])
@login_required
@require_json_body(error_format="success")
def save_raw_config():
    """Validate a raw TOML configuration and write it to ``config.toml``.

    The JSON body must carry the TOML text under ``raw_config``.

    Responses:
        200 ``{"success": True}`` after the file has been written.
        400 when ``raw_config`` is missing/empty or cannot be parsed.
        403 when any key (at any nesting depth) contains one of the
            blocked fragments; the offending key paths are returned.
        500 when writing the file fails (logged, details not returned).
    """
    payload = request.json
    raw_config = payload.get("raw_config")

    if not raw_config:
        missing = {"success": False, "error": "Raw configuration is required"}
        return (jsonify(missing), 400)

    # Security: Parse and validate the TOML to block dangerous keys
    try:
        import tomllib
    except ImportError:
        import tomli as tomllib  # type: ignore[no-redef]

    try:
        parsed = tomllib.loads(raw_config)
    except Exception:
        logger.warning("Invalid TOML configuration")
        # Don't expose internal exception details to users (CWE-209)
        invalid = {
            "success": False,
            "error": "Invalid TOML syntax. Please check your configuration format.",
        }
        return jsonify(invalid), 400

    # Security: key-name fragments associated with dynamic module imports,
    # which could enable code execution. Matching is by case-insensitive
    # substring, so e.g. "MyModule" is rejected as well.
    blocked_fragments = ("module_path", "class_name", "module", "class")

    def collect_blocked(node, prefix=""):
        """Return the paths of all blocked keys found under ``node``."""
        hits = []
        if isinstance(node, dict):
            for name, child in node.items():
                dotted = f"{prefix}.{name}" if prefix else name
                lowered = name.lower()
                if any(fragment in lowered for fragment in blocked_fragments):
                    hits.append(dotted)
                # Keep descending even below a blocked key so that every
                # offending path is reported.
                hits.extend(collect_blocked(child, dotted))
        elif isinstance(node, list):
            for index, element in enumerate(node):
                hits.extend(collect_blocked(element, f"{prefix}[{index}]"))
        return hits

    blocked_keys = collect_blocked(parsed)
    if blocked_keys:
        logger.warning(
            f"Security: Blocked attempt to write config with dangerous keys: {blocked_keys}"
        )
        rejected = {
            "success": False,
            "error": "Configuration contains protected keys that cannot be modified",
            "blocked_keys": blocked_keys,
        }
        return jsonify(rejected), 403

    try:
        from ...security.file_write_verifier import write_file_verified

        # Centralized path config (respects LDR_DATA_DIR)
        target_path = get_config_directory() / "config.toml"

        # The original text is written, not a re-serialization of the
        # parsed structure.
        write_file_verified(
            target_path,
            raw_config,
            "system.allow_config_write",
            context="system configuration file",
        )

        return jsonify({"success": True})
    except Exception:
        logger.exception("Error saving configuration file")
        failure = {"success": False, "error": "Failed to process request"}
        return jsonify(failure), 500

1144 

1145 

@research_bp.route("/api/history", methods=["GET"])
@login_required
def get_history():
    """Return the user's research history, newest first.

    Each item carries id, query, mode, status, timestamps, the computed
    duration (``None`` unless both timestamps are set and the calculation
    succeeds), filtered metadata and the number of linked documents.
    ``title`` is included only when it is not ``None``. Any unexpected
    exception is logged and answered with a generic HTTP 500 payload.
    """
    username = session["username"]

    try:
        with get_user_db_session(username) as db_session:
            records = (
                db_session.query(ResearchHistory)
                .order_by(ResearchHistory.created_at.desc())
                .all()
            )

            # One GROUP BY query supplies all document counts, instead of
            # an N+1 SELECT COUNT per history row inside the loop below.
            doc_counts = {}
            record_ids = [record.id for record in records]
            if record_ids:
                count_query = (
                    db_session.query(
                        Document.research_id, func.count(Document.id)
                    )
                    .filter(Document.research_id.in_(record_ids))
                    .group_by(Document.research_id)
                )
                doc_counts = dict(count_query.all())

            # Items are assembled while the session is still open to avoid
            # DetachedInstanceError on ORM attribute access.
            history_items = []
            for record in records:
                duration_seconds = None
                if record.completed_at and record.created_at:
                    try:
                        duration_seconds = calculate_duration(
                            record.created_at, record.completed_at
                        )
                    except Exception:
                        logger.exception("Error calculating duration")

                entry = {
                    "id": record.id,
                    "query": record.query,
                    "mode": record.mode,
                    "status": record.status,
                    "created_at": record.created_at,
                    "completed_at": record.completed_at,
                    "duration_seconds": duration_seconds,
                    "metadata": filter_research_metadata(record.research_meta),
                    # Records without documents are absent from the
                    # grouped result, hence the 0 default.
                    "document_count": doc_counts.get(record.id, 0),
                }

                # NOTE(review): assumes filter_research_metadata returns a
                # mutable mapping - confirm against its implementation.
                if record.chat_session_id is not None:
                    entry["metadata"]["chat_session_id"] = record.chat_session_id

                title = getattr(record, "title", None)
                if title is not None:
                    entry["title"] = title

                history_items.append(entry)

            return jsonify({"status": "success", "items": history_items})
    except Exception:
        logger.exception("Error getting history")
        failure = {"status": "error", "message": "Failed to process request"}
        return jsonify(failure), 500

1228 

1229 

@research_bp.route("/api/research/<string:research_id>")
@login_required
def get_research_details(research_id):
    """Return the stored fields of one research record as JSON.

    Answers 404 when no ``ResearchHistory`` row matches ``research_id`` and
    a generic HTTP 500 payload when an unexpected exception is raised.
    """
    username = session["username"]

    try:
        with get_user_db_session(username) as db_session:
            record = (
                db_session.query(ResearchHistory)
                .filter(ResearchHistory.id == research_id)
                .first()
            )
            if not record:
                return jsonify({"error": "Research not found"}), 404

            details = {
                "id": record.id,
                "query": record.query,
                "status": record.status,
                # "progress" is the raw stored value; the percentage
                # variant falls back to 0 when that value is falsy.
                "progress": record.progress,
                "progress_percentage": record.progress or 0,
                "mode": record.mode,
                "created_at": record.created_at,
                "completed_at": record.completed_at,
                "report_path": record.report_path,
                "metadata": strip_settings_snapshot(record.research_meta),
            }
            return jsonify(details)
    except Exception:
        logger.exception("Error getting research details")
        return jsonify({"error": "An internal error has occurred"}), 500

1264 

1265 

@research_bp.route("/api/research/<string:research_id>/logs")
@login_required
def get_research_logs(research_id):
    """Return the ``ResearchLog`` entries of one research, ordered by timestamp.

    The response is a JSON list of ``{id, message, timestamp, log_type}``
    objects, where ``log_type`` is the row's ``level``. Answers 404 when the
    research does not exist and a generic 500 on unexpected exceptions.
    """
    username = session["username"]

    try:
        with get_user_db_session(username) as db_session:
            # Existence check first, so an unknown id yields 404 rather
            # than an empty list.
            exists = (
                db_session.query(ResearchHistory)
                .filter_by(id=research_id)
                .first()
            )
            if not exists:
                return jsonify({"error": "Research not found"}), 404

            rows = (
                db_session.query(ResearchLog)
                .filter_by(research_id=research_id)
                .order_by(ResearchLog.timestamp)
                .all()
            )

            # Plain dicts are built while the session is open to avoid
            # DetachedInstanceError on ORM attribute access.
            logs = [
                {
                    "id": row.id,
                    "message": row.message,
                    "timestamp": row.timestamp,
                    "log_type": row.level,
                }
                for row in rows
            ]

            return jsonify(logs)

    except Exception:
        logger.exception("Error getting research logs")
        return jsonify({"error": "An internal error has occurred"}), 500

1309 

1310 

@research_bp.route("/api/report/<string:research_id>")
@login_required
def get_research_report(research_id):
    """Return the assembled report of one research together with its sources.

    The payload exposes ``content`` plus the backwards-compatible top-level
    fields ``summary`` (same value as ``content``), ``sources`` and
    ``findings``, and a ``metadata`` object. Answers 404 when the research
    row is missing or the assembled report is ``None``, and a generic 500
    on unexpected exceptions.
    """
    username = session["username"]

    try:
        with get_user_db_session(username) as db_session:
            record = (
                db_session.query(ResearchHistory)
                .filter_by(id=research_id)
                .first()
            )
            if record is None:
                return jsonify({"error": "Research not found"}), 404

            raw_metadata = record.research_meta

            # report_content stores only the answer text; the full display
            # shape (answer + Sources + Metrics) is rebuilt on demand.
            from ..services.report_assembly_service import (
                assemble_full_report,
                get_research_source_links_batch,
            )

            content = assemble_full_report(record, db_session)
            # Only None is treated as missing; an empty string is a valid
            # report for a research that was found.
            if content is None:
                return jsonify({"error": "Report not found"}), 404

            # Sources are read from the structured resources table rather
            # than the legacy `all_links_of_system` metadata key, which the
            # current save path no longer writes (#3665). limit=None asks
            # for every source, not a top-N subset.
            links_by_research = get_research_source_links_batch(
                [record.id], db_session, limit=None
            )
            sources = links_by_research.get(record.id, [])

            safe_metadata = strip_settings_snapshot(raw_metadata)

            # NOTE(review): safe_metadata is spread last, so a stored key
            # such as "title" or "query" overrides the column value above
            # it - confirm this precedence is intended.
            report_metadata = {
                "title": record.title or None,
                "query": record.query,
                "mode": record.mode or None,
                "created_at": record.created_at or None,
                "completed_at": record.completed_at or None,
                "report_path": record.report_path,
                **safe_metadata,
            }

            # Examples expect 'summary', 'sources', 'findings' at top level
            response_body = {
                "content": content,
                "summary": content,  # The markdown report is the summary
                "sources": sources,
                "findings": safe_metadata.get("findings", []),
                "metadata": report_metadata,
            }
            return jsonify(response_body)

    except Exception:
        logger.exception("Error getting research report")
        return jsonify({"error": "An internal error has occurred"}), 500

1387 

1388 

@research_bp.route(
    "/api/v1/research/<research_id>/export/<format>", methods=["POST"]
)
@login_required
def export_research_report(research_id, format):
    """Export a research report in the requested format as a file download.

    The format is validated against ``ExporterRegistry``; the export itself
    is produced in memory and streamed back as an attachment.

    Responses:
        200 the exported file.
        400 when the format is not supported (available formats listed).
        404 when the research row or its assembled report is missing.
        500 on export failure; a missing PDF dependency returns the
            WeasyPrint install instructions instead of a generic message.
    """
    try:
        from ...exporters import ExporterRegistry

        if not ExporterRegistry.is_format_supported(format):
            supported = ", ".join(ExporterRegistry.get_available_formats())
            return jsonify(
                {"error": f"Invalid format. Available formats: {supported}"}
            ), 400

        username = session["username"]

        try:
            with get_user_db_session(username) as db_session:
                record = (
                    db_session.query(ResearchHistory)
                    .filter_by(id=research_id)
                    .first()
                )
                if not record:
                    return jsonify({"error": "Research not found"}), 404

                # Exporters receive the full assembled report (answer +
                # Sources + Metrics), i.e. the same shape they got before
                # the report_content refactor.
                from ..services.report_assembly_service import (
                    assemble_full_report,
                )

                assembled = assemble_full_report(record, db_session)
                if assembled is None:
                    return jsonify({"error": "Report content not found"}), 404

                try:
                    # The document title falls back to the query text.
                    document_title = record.title or record.query

                    exported = export_report_to_memory(
                        assembled, format, title=document_title
                    )
                    export_content, filename, mimetype = exported

                    # Served straight from memory; nothing touches disk.
                    return send_file(
                        io.BytesIO(export_content),
                        as_attachment=True,
                        download_name=filename,
                        mimetype=mimetype,
                    )
                except MissingPDFDependencyError:
                    logger.exception(
                        "PDF export failed: WeasyPrint unavailable"
                    )
                    instructions = get_weasyprint_install_instructions()
                    return jsonify({"error": instructions}), 500
                except Exception:
                    logger.exception("Error exporting report")
                    return jsonify(
                        {
                            "error": f"Failed to export to {format}. Please try again later."
                        }
                    ), 500

        except Exception:
            logger.exception("Error in export endpoint")
            return jsonify({"error": "An internal error has occurred"}), 500

    except Exception:
        logger.exception("Unexpected error in export endpoint")
        return jsonify({"error": "An internal error has occurred"}), 500

1472 

1473 

# Ordered classification rules for stored research errors. Each entry is
# (substrings, type, message, suggestion); the first rule with a substring
# present in the lower-cased error text wins, so the order is significant.
_RESEARCH_ERROR_RULES = (
    (
        ("timeout",),
        "timeout",
        "LLM service timed out during synthesis. This may be due to high server load or connectivity issues.",
        "Try again later or use a smaller query scope.",
    ),
    (
        ("token limit", "context length"),
        "token_limit",
        "The research query exceeded the AI model's token limit during synthesis.",
        "Try using a more specific query or reduce the research scope.",
    ),
    (
        ("final answer synthesis fail", "llm error"),
        "llm_error",
        "The AI model encountered an error during final answer synthesis.",
        "Check that your LLM service is running correctly or try a different model.",
    ),
    (
        ("ollama",),
        "ollama_error",
        "The Ollama service is not responding properly.",
        "Make sure Ollama is running with 'ollama serve' and the model is downloaded.",
    ),
    (
        ("connection",),
        "connection",
        "Connection error with the AI service.",
        "Check your internet connection and AI service status.",
    ),
)


def _describe_research_error(metadata):
    """Turn the ``error`` entry of research metadata into a UI-friendly dict.

    Returns ``{}`` when ``metadata`` is empty, has no ``error`` key, or the
    stored error is ``None``. Otherwise returns a dict with ``type``,
    ``message`` and ``suggestion``. Unrecognized errors get type
    ``"unknown"``, the original text as message, and either the stored
    ``solution`` or a generic suggestion.
    """
    if not metadata or "error" not in metadata:
        return {}

    raw_error = metadata["error"]
    if raw_error is None:
        return {}
    # A non-string error value must not break the status endpoint, so it
    # is coerced to text before matching.
    error_msg = raw_error if isinstance(raw_error, str) else str(raw_error)
    lowered = error_msg.lower()

    for needles, error_type, message, suggestion in _RESEARCH_ERROR_RULES:
        if any(needle in lowered for needle in needles):
            return {
                "type": error_type,
                "message": message,
                "suggestion": suggestion,
            }

    solution = metadata.get("solution")
    if solution:
        # Prefer the solution recorded alongside the error.
        suggestion = str(solution)
    else:
        suggestion = (
            "Try again with a different query or check the application logs."
        )
    return {"type": "unknown", "message": error_msg, "suggestion": suggestion}


@research_bp.route("/api/research/<string:research_id>/status")
@limiter.exempt
@login_required
def get_research_status(research_id):
    """Return status, progress and metadata of one research as JSON.

    The response contains ``status``, ``progress``, ``completed_at``,
    ``report_path`` and ``metadata`` (settings snapshot stripped). When the
    metadata records an error, ``metadata["error_info"]`` carries a
    classified description of it. When a ``MILESTONE`` log row exists, the
    most recent one is included as ``log_entry``.

    Answers 404 when the research does not exist and HTTP 500 with a
    generic message on unexpected exceptions.
    """
    username = session["username"]

    try:
        with get_user_db_session(username) as db_session:
            research = (
                db_session.query(ResearchHistory)
                .filter_by(id=research_id)
                .first()
            )

            if research is None:
                return jsonify({"error": "Research not found"}), 404

            status = research.status
            progress = research.progress
            completed_at = research.completed_at
            report_path = research.report_path
            metadata = research.research_meta or {}

            # Extract and format error information for better UI display
            error_info = _describe_research_error(metadata)

            # The newest milestone is optional extra information: a failure
            # to fetch it is logged and the response is sent without it.
            latest_milestone = None
            try:
                milestone_log = (
                    db_session.query(ResearchLog)
                    .filter_by(research_id=research_id, level="MILESTONE")
                    .order_by(ResearchLog.timestamp.desc())
                    .first()
                )
                if milestone_log:
                    latest_milestone = {
                        "message": milestone_log.message,
                        "time": milestone_log.timestamp.isoformat()
                        if milestone_log.timestamp
                        else None,
                        "type": "MILESTONE",
                    }
                    logger.debug(
                        f"Found latest milestone for research {research_id}: {milestone_log.message}"
                    )
                else:
                    logger.debug(
                        f"No milestone logs found for research {research_id}"
                    )
            except Exception:
                logger.warning("Error fetching latest milestone")

            filtered_metadata = strip_settings_snapshot(metadata)
            if error_info:
                filtered_metadata["error_info"] = error_info

            response_data = {
                "status": status,
                "progress": progress,
                "completed_at": completed_at,
                "report_path": report_path,
                "metadata": filtered_metadata,
            }

            # Include latest milestone as a log_entry for frontend compatibility
            if latest_milestone:
                response_data["log_entry"] = latest_milestone

            return jsonify(response_data)
    except Exception:
        logger.exception("Error getting research status")
        return jsonify({"error": "Error checking research status"}), 500

1608 

1609 

@research_bp.route("/api/queue/status", methods=["GET"])
@login_required
def get_queue_status():
    """Return the logged-in user's queued research items and their count.

    On failure the exception is logged and a generic error payload is
    returned with HTTP 500.
    """
    username = session["username"]

    from ..queue import QueueManager

    try:
        pending = QueueManager.get_user_queue(username)
        summary = {
            "status": "success",
            "queue": pending,
            "total": len(pending),
        }
        return jsonify(summary)
    except Exception:
        logger.exception("Error getting queue status")
        failure = {"status": "error", "message": "Failed to process request"}
        return jsonify(failure), 500

1633 

1634 

@research_bp.route("/api/queue/<string:research_id>/position", methods=["GET"])
@login_required
def get_queue_position(research_id):
    """Return the queue position of one research for the logged-in user.

    Answers 404 when ``QueueManager.get_queue_position`` returns ``None``
    and a generic HTTP 500 payload when an unexpected exception is raised.
    """
    username = session["username"]

    from ..queue import QueueManager

    try:
        position = QueueManager.get_queue_position(username, research_id)

        # Only None means "not queued"; any other value, including a
        # falsy one such as 0, is reported as the position.
        if position is not None:
            return jsonify({"status": "success", "position": position})

        not_queued = {"status": "error", "message": "Research not found in queue"}
        return jsonify(not_queued), 404
    except Exception:
        logger.exception("Error getting queue position")
        failure = {"status": "error", "message": "Failed to process request"}
        return jsonify(failure), 500

1657 

1658 

@research_bp.route("/api/config/limits", methods=["GET"])
@login_required
def get_upload_limits():
    """
    Report the backend's file upload limits.

    The values are read from ``FileUploadValidator`` so the frontend can
    stay in sync with the backend instead of hardcoding its own copies.
    """
    limits = {
        "max_file_size": FileUploadValidator.MAX_FILE_SIZE,
        "max_files": FileUploadValidator.MAX_FILES_PER_REQUEST,
        # Converted to a list for the JSON response.
        "allowed_mime_types": list(FileUploadValidator.ALLOWED_MIME_TYPES),
    }
    return jsonify(limits)

1675 

1676 

1677@research_bp.route("/api/upload/pdf", methods=["POST"]) 

1678@login_required 

1679@upload_rate_limit_user 

1680@upload_rate_limit_ip 

1681def upload_pdf(): 

1682 """ 

1683 Upload and extract text from PDF files with comprehensive security validation. 

1684 

1685 Security features: 

1686 - Rate limiting (10 uploads/min, 100/hour per user) 

1687 - File size validation (50MB max per file) 

1688 - File count validation (100 files max) 

1689 - PDF structure validation 

1690 - MIME type validation 

1691 

1692 Performance improvements: 

1693 - Single-pass PDF processing (text + metadata) 

1694 - Optimized extraction service 

1695 """ 

1696 try: 

1697 # Early request size validation (before reading any files) 

1698 # This prevents memory exhaustion from chunked encoding attacks 

1699 max_request_size = ( 

1700 FileUploadValidator.MAX_FILES_PER_REQUEST 

1701 * FileUploadValidator.MAX_FILE_SIZE 

1702 ) 

1703 if request.content_length and request.content_length > max_request_size: 1703 ↛ 1704line 1703 didn't jump to line 1704 because the condition on line 1703 was never true

1704 return jsonify( 

1705 { 

1706 "error": f"Request too large. Maximum size is {max_request_size // (1024 * 1024)}MB" 

1707 } 

1708 ), 413 

1709 

1710 # Check if files are present in the request 

1711 if "files" not in request.files: 

1712 return jsonify({"error": "No files provided"}), 400 

1713 

1714 files = request.files.getlist("files") 

1715 if not files or files[0].filename == "": 

1716 return jsonify({"error": "No files selected"}), 400 

1717 

1718 # Validate file count 

1719 is_valid, error_msg = FileUploadValidator.validate_file_count( 

1720 len(files) 

1721 ) 

1722 if not is_valid: 

1723 return jsonify({"error": error_msg}), 400 

1724 

1725 # Get PDF extraction service 

1726 pdf_service = get_pdf_extraction_service() 

1727 

1728 extracted_texts = [] 

1729 total_files = len(files) 

1730 processed_files = 0 

1731 errors = [] 

1732 

1733 for file in files: 

1734 if not file or not file.filename: 1734 ↛ 1735line 1734 didn't jump to line 1735 because the condition on line 1734 was never true

1735 errors.append("Unnamed file: Skipped") 

1736 continue 

1737 

1738 try: 

1739 filename = sanitize_filename( 

1740 file.filename, allowed_extensions={".pdf"} 

1741 ) 

1742 except UnsafeFilenameError: 

1743 errors.append("Rejected file: invalid or disallowed filename") 

1744 continue 

1745 

1746 try: 

1747 # Read file content (with disk spooling, large files are read from temp file) 

1748 pdf_content = file.read() 

1749 

1750 # Comprehensive validation 

1751 is_valid, error_msg = FileUploadValidator.validate_upload( 

1752 filename=filename, 

1753 file_content=pdf_content, 

1754 content_length=file.content_length, 

1755 ) 

1756 

1757 if not is_valid: 

1758 errors.append(f"{filename}: {error_msg}") 

1759 continue 

1760 

1761 # Extract text and metadata in single pass (performance fix) 

1762 result = pdf_service.extract_text_and_metadata( 

1763 pdf_content, filename 

1764 ) 

1765 

1766 if result["success"]: 

1767 extracted_texts.append( 

1768 { 

1769 "filename": result["filename"], 

1770 "text": result["text"], 

1771 "size": result["size"], 

1772 "pages": result["pages"], 

1773 } 

1774 ) 

1775 processed_files += 1 

1776 else: 

1777 errors.append(f"{filename}: {result['error']}") 

1778 

1779 except Exception: 

1780 logger.exception(f"Error processing {filename}") 

1781 errors.append(f"{filename}: Error processing file") 

1782 finally: 

1783 # Close the file stream to release resources 

1784 try: 

1785 file.close() 

1786 except Exception: 

1787 logger.debug("best-effort file stream close", exc_info=True) 

1788 

1789 # Prepare response 

1790 response_data = { 

1791 "status": "success", 

1792 "processed_files": processed_files, 

1793 "total_files": total_files, 

1794 "extracted_texts": extracted_texts, 

1795 "combined_text": "\n\n".join( 

1796 [ 

1797 f"--- From {item['filename']} ---\n{item['text']}" 

1798 for item in extracted_texts 

1799 ] 

1800 ), 

1801 "errors": errors, 

1802 } 

1803 

1804 if processed_files == 0: 

1805 return jsonify( 

1806 { 

1807 "status": "error", 

1808 "message": "No files were processed successfully", 

1809 "errors": errors, 

1810 } 

1811 ), 400 

1812 

1813 return jsonify(response_data) 

1814 

1815 except Exception: 

1816 logger.exception("Error processing PDF upload") 

1817 return jsonify({"error": "Failed to process PDF files"}), 500