Coverage for src/local_deep_research/web/services/research_service.py: 81%

916 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1import hashlib 

2import json 

3import re 

4import threading 

5import time 

6from datetime import datetime, UTC 

7from pathlib import Path 

8 

9from loguru import logger 

10 

11from ...exceptions import DuplicateResearchError, ResearchTerminatedException 

12from ...config.llm_config import get_llm 

13from ...settings.manager import SnapshotSettingsContext 

14 

15# Output directory for research results 

16from ...config.paths import get_research_outputs_directory 

17from ...config.search_config import get_search 

18from ...constants import ResearchStatus 

19from ...database.models import ResearchHistory, ResearchStrategy 

20from ...database.session_context import get_user_db_session 

21from ...database.thread_local_session import thread_cleanup 

22from ...error_handling.openai_compat_errors import ( 

23 friendly_openai_compatible_error, 

24 is_openai_compat_runtime_error, 

25) 

26from ...error_handling.report_generator import ErrorReportGenerator 

27from ...utilities.thread_context import set_search_context 

28from ...report_generator import IntegratedReportGenerator 

29from ...search_system import AdvancedSearchSystem 

30from ...text_optimization import CitationFormatter, CitationMode 

31from ...utilities.log_utils import log_for_research 

32from ...utilities.search_utilities import extract_links_from_search_results 

33from ...utilities.threading_utils import thread_context, thread_with_app_context 

34from ..models.database import calculate_duration 

35from ...settings.env_registry import get_env_setting 

36from .socket_service import SocketIOService 

37 

# Directory where generated report files are written (see _generate_report_path).
OUTPUT_DIR = get_research_outputs_directory()


# Server-wide cap on concurrently running research (across all users).
# start_research_process takes a slot with a non-blocking acquire and raises
# SystemAtCapacityError when none is free; the worker thread releases it.
_MAX_GLOBAL_CONCURRENT = get_env_setting(
    "server.max_concurrent_research", default=10
)
_global_research_semaphore = threading.Semaphore(_MAX_GLOBAL_CONCURRENT)

# Socket.IO emission throttling: minimum interval between progress emissions
# per research, together with the shared bookkeeping used to enforce it.
_EMIT_THROTTLE_SECONDS = 0.2  # 200ms
_EMIT_TTL_SECONDS = 3600  # 1 hour — evict stale entries from orphaned research
_emit_cleanup_counter = 0
# NOTE(review): presumably research id -> last emit timestamp; confirm at the
# throttling call site (outside this view).
_last_emit_times: dict[str, float] = {}
_last_emit_lock = threading.Lock()

# Cap on the partial-content buffer kept server-side so chat-mode termination
# can persist whatever was already streamed. Bounded to keep memory
# predictable under pathologically long answers (typical answers are a few KB).
_MAX_PARTIAL_BUFFER_BYTES = 256 * 1024  # 256 KB

58 

59# Phases that produce user-visible step messages in chat mode. 

60# "complete" is excluded — it fires AFTER the response message is written, 

61# which would create a step with a higher sequence_number than the response. 

62_STEP_PHASES = frozenset( 

63 { 

64 "init", 

65 "setup", 

66 "search_planning", 

67 "search", 

68 "observation", 

69 "output_generation", 

70 "synthesis_error", 

71 "synthesis_fallback", 

72 "report_generation", 

73 "report_complete", 

74 "error", 

75 } 

76) 

77 

78 

79def _chat_step_decision( 

80 phase: str | None, 

81 last_step_phase: str | None, 

82 is_final: bool, 

83) -> tuple[bool, bool]: 

84 """Decide whether to persist + emit a chat-mode step event. 

85 

86 Encodes the symmetry invariant the progress_callback in 

87 run_research_process enforces: for chat sessions, what the live UI 

88 surfaces over the socket must equal what `loadSession` reconstructs 

89 from chat_progress_steps on reload. The repeat-phase dedup must 

90 therefore block BOTH writes, not just the DB write. 

91 

92 Returns: 

93 (persist, suppress_emit) 

94 - persist: True iff add_progress_step should be called for this event. 

95 - suppress_emit: True iff the caller should null the socket payload 

96 so the emit is dropped. Only suppressed when this is a non-final 

97 repeat — final phases (complete/error/report_complete) always 

98 emit so the client completion handler fires. 

99 

100 Args: 

101 phase: the phase tag from this event (e.g. "search", "observation") 

102 last_step_phase: phase tag of the previously persisted chat step 

103 (None until the first persist of this research). 

104 is_final: True if this is a "final" phase event the client must 

105 see to fire its completion handler (complete | error | 

106 report_complete | progress==100). 

107 """ 

108 if phase not in _STEP_PHASES: 

109 # Not a chat-step phase at all (e.g. "complete"). Persist no, 

110 # and let the emit through unchanged — completion / control 

111 # events still need to reach the client. 

112 return False, False 

113 dedup_ok = phase != last_step_phase or phase == "observation" 

114 if dedup_ok: 

115 return True, False 

116 return False, not is_final 

117 

118 

119def _parse_research_metadata(research_meta) -> dict: 

120 """Parse research_meta into a dict, handling both dict and JSON string types.""" 

121 if isinstance(research_meta, dict): 

122 return dict(research_meta) 

123 if isinstance(research_meta, str): 

124 try: 

125 parsed = json.loads(research_meta) 

126 return dict(parsed) if isinstance(parsed, dict) else {} 

127 except json.JSONDecodeError: 

128 logger.exception("Failed to parse research_meta as JSON") 

129 return {} 

130 return {} 

131 

132 

133def _extract_synthesized_answer(results: dict) -> str: 

134 """Pull the LLM-synthesized answer out of a strategy result dict. 

135 

136 ``report_content`` must store ONLY the synthesized answer (LLM 

137 prose with [N] inline citations). The strategy's 

138 ``formatted_findings`` is the full ``format_findings`` blob — 

139 answer + ``format_links_to_markdown`` source list + 

140 ``## SEARCH QUESTIONS BY ITERATION`` + ``## DETAILED FINDINGS`` 

141 + ``## ALL SOURCES`` — and ``format_document_split`` only knows 

142 how to strip ``## Sources`` headers, not those other sections. 

143 Saving the blob would leak sources/findings into ``report_content``. 

144 

145 Resolution order: 

146 1. ``Final synthesis`` finding (set by source_based, parallel, 

147 rapid, focused_iteration, iterdrag). 

148 2. ``current_knowledge`` (other strategies expose the answer 

149 there). 

150 3. Empty string — caller decides whether to fall back further. 

151 """ 

152 for finding in results.get("findings") or []: 

153 if finding.get("phase") == "Final synthesis": 

154 content = finding.get("content") or "" 

155 if content: 

156 return content 

157 return results.get("current_knowledge") or "" 

158 

159 

def get_citation_formatter():
    """Return a ``CitationFormatter`` configured from the settings snapshot.

    Reads ``report.citation_format`` through ``get_setting_from_snapshot``
    (default ``"number_hyperlinks"``) and maps it to a ``CitationMode``.
    Unrecognized values fall back to ``CitationMode.NUMBER_HYPERLINKS``.
    """
    # Deferred import to avoid a circular import at module load time.
    from ...config.search_config import get_setting_from_snapshot

    configured_format = get_setting_from_snapshot(
        "report.citation_format", "number_hyperlinks"
    )
    modes_by_setting = {
        "number_hyperlinks": CitationMode.NUMBER_HYPERLINKS,
        "domain_hyperlinks": CitationMode.DOMAIN_HYPERLINKS,
        "domain_id_hyperlinks": CitationMode.DOMAIN_ID_HYPERLINKS,
        "domain_id_always_hyperlinks": CitationMode.DOMAIN_ID_ALWAYS_HYPERLINKS,
        "source_tagged_hyperlinks": CitationMode.SOURCE_TAGGED_HYPERLINKS,
        "no_hyperlinks": CitationMode.NO_HYPERLINKS,
    }
    selected_mode = modes_by_setting.get(
        configured_format, CitationMode.NUMBER_HYPERLINKS
    )
    return CitationFormatter(mode=selected_mode)

178 

179 

def export_report_to_memory(
    markdown_content: str, format: str, title: str | None = None
):
    """
    Render a markdown report into another format entirely in memory.

    The exporter is looked up in ``ExporterRegistry`` by the lower-cased
    format name. Title handling is left to each exporter (via
    ``_prepend_title_if_needed()``): PDF and ODT prepend it, while RIS and
    the other formats ignore it.

    Args:
        markdown_content: The markdown content to export.
        format: Export format name, case-insensitive (e.g. 'pdf', 'odt',
            'latex', 'quarto', 'ris'). The name shadows the builtin but is
            part of the public signature.
        title: Optional document title passed through ``ExportOptions``.

    Returns:
        Tuple of (content_bytes, filename, mimetype).

    Raises:
        ValueError: If no exporter is registered for ``format``; the message
            lists the available formats.
    """
    from ...exporters import ExporterRegistry, ExportOptions

    normalized_format = format.lower()
    exporter = ExporterRegistry.get_exporter(normalized_format)

    if exporter is None:
        supported = ", ".join(ExporterRegistry.get_available_formats())
        raise ValueError(
            f"Unsupported export format: {format}. "
            f"Available formats: {supported}"
        )

    exported = exporter.export(markdown_content, ExportOptions(title=title))
    logger.info(
        f"Generated {normalized_format} in memory, size: {len(exported.content)} bytes"
    )
    return exported.content, exported.filename, exported.mimetype

226 

227 

def save_research_strategy(research_id, strategy_name, username=None):
    """
    Upsert the strategy name recorded for a research run.

    Updates the existing ``ResearchStrategy`` row for ``research_id`` if
    present, otherwise inserts one, then commits. Errors are logged and
    swallowed; nothing is returned.

    Args:
        research_id: The ID of the research.
        strategy_name: The name of the strategy used.
        username: Username for database access (required in thread context).
    """
    try:
        logger.debug(
            f"save_research_strategy called with research_id={research_id}, strategy_name={strategy_name}"
        )
        with get_user_db_session(username) as session:
            record = (
                session.query(ResearchStrategy)
                .filter_by(research_id=research_id)
                .first()
            )

            if not record:
                session.add(
                    ResearchStrategy(
                        research_id=research_id, strategy_name=strategy_name
                    )
                )
                logger.debug(
                    f"Creating new strategy record for research {research_id}"
                )
            else:
                record.strategy_name = strategy_name
                logger.debug(
                    f"Updating existing strategy for research {research_id}"
                )

            session.commit()
            logger.info(
                f"Saved strategy '{strategy_name}' for research {research_id}"
            )
    except Exception:
        logger.exception("Error saving research strategy")

271 

272 

def get_research_strategy(research_id, username=None):
    """
    Look up the strategy recorded for a research run.

    Args:
        research_id: The ID of the research.
        username: Username for database access (required in thread context).

    Returns:
        str | None: The stored strategy name, or None when no row exists or
        the lookup fails (failures are logged).
    """
    try:
        with get_user_db_session(username) as session:
            record = (
                session.query(ResearchStrategy)
                .filter_by(research_id=research_id)
                .first()
            )
            if not record:
                return None
            return record.strategy_name
    except Exception:
        logger.exception("Error getting research strategy")
        return None

296 

297 

def start_research_process(
    research_id,
    query,
    mode,
    run_research_callback,
    **kwargs,
):
    """
    Start a research process in a background thread.

    Takes a slot from the global concurrency semaphore synchronously in the
    caller's thread, builds a daemon worker thread that releases the slot
    when it exits, and hands the thread to ``check_and_start_research``.
    That function atomically registers and starts the thread for this
    ``research_id``.

    Args:
        research_id: The ID of the research.
        query: The research query.
        mode: The research mode (quick/detailed).
        run_research_callback: The callback function to run the research.
        **kwargs: Additional parameters to pass to the research process
            (model, search_engine, etc.).

    Returns:
        threading.Thread: The thread running the research.

    Raises:
        SystemAtCapacityError: No global concurrency slot is free.
        DuplicateResearchError: ``research_id`` already has a live thread.
        Exception: Anything raised while preparing or registering the
            thread is re-raised after the semaphore slot is released.
    """
    from ..routes.globals import check_and_start_research
    from ...exceptions import SystemAtCapacityError

    # Acquire the global concurrency semaphore SYNCHRONOUSLY in the caller's
    # thread. Previously this happened inside the worker after the HTTP route
    # had already returned 200 — at capacity, the worker parked and the user
    # saw an infinite thinking spinner with the partial unique in-progress
    # index blocking retries. Surfacing capacity as an exception lets the
    # route return HTTP 429 before committing any DB state.
    if not _global_research_semaphore.acquire(blocking=False):
        raise SystemAtCapacityError(
            f"At research capacity (max {_MAX_GLOBAL_CONCURRENT} concurrent)"
        )

    # From here until the worker is running, THIS thread owns the slot. Every
    # step below (wrapping the callback, evaluating thread_context(),
    # constructing the Thread, registering it) can raise. Any such failure
    # must give the slot back, or server capacity shrinks permanently by one.
    try:
        # Pass the app context to the thread.
        app_bound_callback = thread_with_app_context(run_research_callback)

        def _release_semaphore_on_exit(*args, **kw):
            # Worker-side release of the slot acquired above.
            try:
                return app_bound_callback(*args, **kw)
            finally:
                _global_research_semaphore.release()

        # Prepare (but do not start) the background thread.
        thread = threading.Thread(
            target=_release_semaphore_on_exit,
            args=(
                thread_context(),
                research_id,
                query,
                mode,
            ),
            kwargs=kwargs,
        )
        thread.daemon = True

        # Atomic check-and-start: refuses to spawn a second live thread
        # for the same research_id. Guards against the double-spawn window
        # where a post-spawn commit failure in the queue processor could
        # otherwise cause the retry loop to dispatch the same research twice.
        started = check_and_start_research(
            research_id,
            {
                "thread": thread,
                "progress": 0,
                "status": ResearchStatus.IN_PROGRESS,
                "log": [],
                "settings": kwargs,
            },
        )
    except Exception:
        # Failed before a worker ran (same assumption the original made for
        # check_and_start_research), so no worker will release the slot.
        _global_research_semaphore.release()
        raise

    if not started:
        # No thread will run → no _release_semaphore_on_exit → release here.
        _global_research_semaphore.release()
        raise DuplicateResearchError(
            f"Research {research_id} already has a live thread"
        )

    return thread

385 

386 

def _generate_report_path(query: str) -> Path:
    """
    Build the output path for a new report file derived from ``query``.

    The filename uses only alphanumerics and underscores: the first 10 hex
    digits of the query's MD5 digest (not used for security) plus the
    current UTC Unix timestamp in whole seconds.

    Args:
        query: The query used for the report.

    Returns:
        ``OUTPUT_DIR / "research_report_<hash10>_<timestamp>.md"``.
    """
    digest = hashlib.md5(  # DevSkim: ignore DS126858
        query.encode("utf-8"), usedforsecurity=False
    ).hexdigest()
    short_hash = digest[:10]
    stamp = int(datetime.now(UTC).timestamp())
    return OUTPUT_DIR / f"research_report_{short_hash}_{stamp}.md"

406 

407 

def _save_chat_message_and_context(
    chat_session_id,
    research_id,
    username,
    report_content,
    streaming_enabled,
    streaming_state,
    socket_service,
    settings_snapshot=None,
):
    """Save the assistant's final answer to the chat and refresh its context.

    Steps, in order:
      1. Persist ``report_content`` as the assistant ``response`` message.
         ``_NO_OUTPUT_MARKER`` is used when the content is empty. Then set
         ``streaming_state['_persisted']`` so the terminate path will not
         write a duplicate row.
      2. Best-effort update of the session's accumulated context; failures
         are logged at warning level.
      3. If the answer was streamed, flush any citation-carry fragment still
         held by the stream callback, then emit the final empty
         ``response_chunk`` sentinel. Both emits are best-effort.

    Args:
        chat_session_id: Chat session receiving the message.
        research_id: Research whose result this is; also the Socket.IO room.
        username: Owner, used to construct ``ChatService``.
        report_content: Answer-only text (no sources/findings sections).
        streaming_enabled: Whether chunks were streamed to the client.
        streaming_state: Shared dict from the stream callback, or None.
        socket_service: Service used for ``emit_to_subscribers``.
        settings_snapshot: Passed through to ``ChatContextManager``.

    Raises:
        Whatever ``ChatService.add_message`` raises. The message write is
        the one step that is not best-effort.
    """
    from ...chat.service import ChatService
    from ...chat.context import ChatContextManager

    chat_service = ChatService(username)
    # chat_messages.content is NOT NULL. Write report_content inline
    # (snapshot pattern). Fall back to a placeholder marker only when
    # report_content itself is empty — the same pattern the terminate path
    # uses with _STOPPED_BEFORE_OUTPUT_MARKER.
    snapshot_content = report_content or _NO_OUTPUT_MARKER
    # allow_archived=True: a multi-tab race can flip the session to
    # archived between the research.status=COMPLETED commit and this write.
    # Losing the final assistant answer is worse than violating the
    # "no writes to archived sessions" rule for a system-generated row;
    # user-message writes from chat/routes.py still keep the default.
    chat_service.add_message(
        session_id=chat_session_id,
        role="assistant",
        content=snapshot_content,
        message_type="response",
        research_id=research_id,
        allow_archived=True,
    )
    # Mark the response row as persisted. The trailing
    # progress_callback("Research completed successfully", 100) runs the
    # termination check and could fire if the user clicks Stop in the small
    # window between this write and the final emit. With the flag set, it
    # does NOT write a duplicate row via
    # _save_partial_chat_message_on_terminate.
    if streaming_state is not None:
        streaming_state["_persisted"] = True
    # str(): tolerate non-str session ids (e.g. UUID objects) in the log slice.
    short_id = str(chat_session_id)[:8]
    logger.info(f"Added research result to chat {short_id}...")

    try:
        # report_content is the answer-only string; no extraction needed.
        chat_content = report_content or ""
        ctx_manager = ChatContextManager(
            session_id=chat_session_id,
            messages=[],
            settings_snapshot=settings_snapshot,
        )
        updates = ctx_manager.extract_context_updates(new_content=chat_content)
        chat_service.update_accumulated_context(
            session_id=chat_session_id, **updates
        )
        logger.info(f"Updated accumulated context for chat {short_id}...")
    except Exception:
        # Warning, not debug: a failed accumulated-context update silently
        # degrades multi-turn context for the next follow-up in this chat
        # (entities/topics/source counts go missing). Ops need visibility
        # to catch widespread breakage before users notice "the AI forgot
        # what we were talking about".
        logger.opt(exception=True).warning(
            "Could not update accumulated context"
        )

    # Guard against streaming_state being None, consistent with the
    # _persisted write above; previously this raised AttributeError.
    if not (
        streaming_enabled
        and streaming_state is not None
        and streaming_state.get("chunks_sent", 0) > 0
    ):
        return

    # Flush any partial-bracket fragment held in the citation carry buffer
    # before sending the final empty sentinel. Otherwise a stream that ends
    # mid-token (LLM emits "[12" as its last bytes) silently drops it from
    # what the client renders — the carry would be discarded when the
    # callback's closure goes out of scope.
    flush = streaming_state.get("_flush_carry")
    if flush:
        leftover = flush()
        if leftover:
            try:
                socket_service.emit_to_subscribers(
                    "response_chunk",
                    research_id,
                    {
                        "chunk": leftover,
                        "is_streaming": True,
                        "is_final": False,
                    },
                )
            except Exception:
                logger.debug("Carry-buffer flush emit failed (non-critical)")

    # Best-effort, like the terminate path's final-chunk emit: the answer is
    # already persisted, so a socket failure here must not propagate to the
    # caller as if the research itself had failed.
    try:
        socket_service.emit_to_subscribers(
            "response_chunk",
            research_id,
            {"chunk": "", "is_streaming": True, "is_final": True},
        )
    except Exception:
        logger.debug("Final-chunk emit on completion failed (non-critical)")

506 

507 

508_STOPPED_BEFORE_OUTPUT_MARKER = "[Stopped before any output was generated.]" 

509_STOPPED_FOOTER = "\n\n_— Stopped by user._" 

510_NO_OUTPUT_MARKER = "_(Research completed without producing output.)_" 

511 

512 

513# Match a trailing incomplete citation opener at the chunk boundary. 

514# Covers both ASCII "[" and the lenticular "【" (U+3010) — some LLMs 

515# emit Chinese-style brackets and the citation formatter accepts them, 

516# so we have to hold those back the same way to avoid breaking a token 

517# across the next chunk. 

518_PARTIAL_BRACKET_RE = re.compile(r"[\[【]\d*$") 

519 

520# Upper bound on the inline-citation carry buffer. A real citation token 

521# is a handful of bytes ("[123"); anything longer means the "[" wasn't a 

522# citation opener or the stream is pathological. Flush raw past this so a 

523# never-closing "[" + endless digits can't grow the buffer without limit. 

524_MAX_CARRY_BYTES = 64 

525 

526 

527def _make_chat_stream_callback( 

528 research_id, 

529 streaming_state, 

530 socket_service, 

531 source_resolver=None, 

532 formatter=None, 

533): 

534 """Build the chat-mode streaming callback. 

535 

536 The callback: 

537 * Counts chunks (``chunks_sent``). 

538 * Buffers RAW chunks in ``streaming_state['chunks']`` so partial 

539 content survives termination — the citation handler's local list 

540 is discarded on raise. Capped at ``_MAX_PARTIAL_BUFFER_BYTES``; 

541 once capped, ``streaming_state['_truncated']`` flips to True and 

542 further chunks aren't accumulated server-side. 

543 * Raises ``ResearchTerminatedException`` if the user clicked Stop 

544 mid-stream — fails fast instead of letting the LLM finish. 

545 ``ResearchTerminatedException`` inherits from ``BaseException``, 

546 so the citation handler's ``except Exception`` blocks naturally 

547 propagate it. 

548 * Emits ``response_chunk`` over Socket.IO for live display, with 

549 inline citation hyperlinks applied per-chunk when both 

550 ``source_resolver`` and ``formatter`` are provided — so the 

551 client sees ``[[arxiv.org-1]](url)`` appearing live rather than 

552 plain ``[1]`` brackets that only get hyperlinked after the full 

553 response saves. Bracket tokens split across chunk boundaries are 

554 held in a small carry buffer until the closing ``]`` arrives. 

555 

556 ``source_resolver`` — optional ``() -> list[dict]`` returning the 

557 current ``all_links_of_system`` (so we read it lazily; the list 

558 grows as the agent collects sources). 

559 ``formatter`` — optional :class:`CitationFormatter` instance. Its 

560 ``mode`` controls the inline format (``[[arxiv.org-1]](url)`` 

561 etc.), matching what the final-save formatter produces so the 

562 live display doesn't mode-shift when ``handleResearchComplete`` 

563 swaps in the DB-saved version. 

564 

565 Extracted to module level so it can be unit-tested without spinning 

566 up the full ``run_research_process``. 

567 """ 

568 

569 # Per-call closure state for the streaming-substitution carry 

570 # buffer (holds a trailing incomplete bracket like "[12" so the 

571 # closing "]3]" on the next chunk completes the citation token). 

572 carry = [""] 

573 

574 def _flush_carry() -> str: 

575 """Release and clear any held partial-bracket fragment. 

576 

577 The completion finalizer (``_save_chat_message_and_context``) 

578 lives in a different scope and can't reach ``carry`` directly, 

579 so it calls this through ``streaming_state['_flush_carry']`` to 

580 avoid silently dropping the tail of a stream that ends mid-token 

581 like ``"[12"``. 

582 """ 

583 released, carry[0] = carry[0], "" 

584 return released 

585 

586 # Expose to the completion path via the shared state dict. 

587 streaming_state["_flush_carry"] = _flush_carry 

588 

589 def _hyperlink_chunk(chunk: str) -> str: 

590 """Apply inline citation hyperlinks to a single chunk. 

591 

592 Maintains the closure-level ``carry`` buffer for incomplete 

593 ``[N`` tokens straddling chunk boundaries. The carry contains 

594 the trailing partial-bracket fragment from the previous chunk 

595 that we haven't been able to substitute yet — it's prepended 

596 to the next chunk so the regex can see the full token. The 

597 DELTA returned is what the client should append next: it 

598 includes the just-completed carry (now hyperlinked) plus the 

599 new chunk's safe portion, MINUS the new chunk's own trailing 

600 partial bracket (which becomes the new carry). 

601 """ 

602 if source_resolver is None or formatter is None: 

603 return chunk 

604 try: 

605 sources = source_resolver() or [] 

606 if not sources: 606 ↛ 609line 606 didn't jump to line 609 because the condition on line 606 was never true

607 # Reset carry so the leading "[" we held onto doesn't 

608 # disappear from the client's accumulated text. 

609 released, carry[0] = carry[0], "" 

610 return released + chunk 

611 text = carry[0] + chunk 

612 pending = _PARTIAL_BRACKET_RE.search(text) 

613 if pending: 

614 safe = text[: pending.start()] 

615 new_carry = text[pending.start() :] 

616 # Bound the carry. A well-formed citation token is a few 

617 # bytes (`[12`); if the held fragment grows past this, the 

618 # "[" was not actually opening a citation (or a hostile / 

619 # misbehaving LLM is streaming `[` + endless digits with no 

620 # closing `]`). Flush it raw rather than buffering without 

621 # limit — preserves the text, just doesn't hyperlink it. 

622 if len(new_carry) > _MAX_CARRY_BYTES: 

623 safe = text 

624 carry[0] = "" 

625 else: 

626 carry[0] = new_carry 

627 else: 

628 safe = text 

629 carry[0] = "" 

630 if not safe: 630 ↛ 631line 630 didn't jump to line 631 because the condition on line 630 was never true

631 return "" 

632 return formatter.apply_inline_hyperlinks(safe, sources) 

633 except Exception: 

634 # Hyperlinking is quality-of-life. On any failure fall back 

635 # to emitting the raw chunk so the user still sees the text. 

636 logger.debug( 

637 "Inline citation hyperlinking failed; emitting raw chunk", 

638 exc_info=True, 

639 ) 

640 released, carry[0] = carry[0], "" 

641 return released + chunk 

642 

643 def stream_callback(chunk: str): 

644 # Resolve through the module namespace each call so tests can 

645 # ``patch("local_deep_research.web.routes.globals.is_termination_requested")``. 

646 # Cached in sys.modules — negligible cost. 

647 from ..routes.globals import is_termination_requested 

648 

649 if not chunk: 

650 return 

651 streaming_state["chunks_sent"] += 1 

652 if not streaming_state["_truncated"]: 652 ↛ 673line 652 didn't jump to line 673 because the condition on line 652 was always true

653 chunk_bytes = len(chunk.encode("utf-8")) 

654 if ( 

655 streaming_state["_bytes"] + chunk_bytes 

656 <= _MAX_PARTIAL_BUFFER_BYTES 

657 ): 

658 # IMPORTANT: store the RAW chunk in the partial-content 

659 # buffer (terminate handler joins these and saves them as 

660 # the partial assistant message). If we stored the 

661 # hyperlinked version, the saved-on-terminate text would 

662 # be double-formatted when the user resumes. 

663 streaming_state["chunks"].append(chunk) 

664 streaming_state["_bytes"] += chunk_bytes 

665 else: 

666 streaming_state["_truncated"] = True 

667 logger.warning( 

668 f"Partial-content buffer hit {_MAX_PARTIAL_BUFFER_BYTES} bytes " 

669 f"for research {research_id}; further chunks won't be persisted on terminate" 

670 ) 

671 # Mid-stream interrupt — fail fast if the user clicked Stop while 

672 # the LLM is still streaming. 

673 if is_termination_requested(research_id): 

674 raise ResearchTerminatedException( # noqa: TRY301 — propagated through citation handler 

675 "Research was terminated by user during streaming" 

676 ) 

677 # Apply citation hyperlinks for the client emit only — the 

678 # client accumulates substituted text into the streaming bubble 

679 # so the user sees [[arxiv.org-1]](url) appearing live as the 

680 # model writes "According to [1]…". 

681 display_chunk = _hyperlink_chunk(chunk) 

682 if not display_chunk: 682 ↛ 683line 682 didn't jump to line 683 because the condition on line 682 was never true

683 return # nothing safe to emit yet (all held in carry buffer) 

684 try: 

685 socket_service.emit_to_subscribers( 

686 "response_chunk", 

687 research_id, 

688 { 

689 "chunk": display_chunk, 

690 "is_streaming": True, 

691 "is_final": False, 

692 }, 

693 ) 

694 except Exception: 

695 logger.debug("Stream chunk emit failed (non-critical)") 

696 

697 return stream_callback 

698 

699 

700def _save_partial_chat_message_on_terminate( 

701 chat_session_id, 

702 research_id, 

703 username, 

704 partial_content, 

705 truncated=False, 

706 streaming_state=None, 

707): 

708 """Persist a chat 'response' row capturing whatever was streamed before 

709 termination, and emit a final ``response_chunk`` so the client strips 

710 the streaming class from the bubble. 

711 

712 Must run BEFORE ``handle_termination()`` because that path runs 

713 ``cleanup_research_resources()`` which removes the Socket.IO room 

714 subscriptions — anything emitted afterwards goes nowhere. 

715 

716 Idempotent: when ``streaming_state`` is supplied, sets a ``_persisted`` 

717 flag so duplicate calls (one from the progress callback, one from the 

718 outer except handler when a stream_callback raises mid-stream) only 

719 write a single row. 

720 

721 Skips silently when ``chat_session_id`` is falsy (single-turn case). 

722 All failures are swallowed — termination cleanup must never crash the 

723 worker. 

724 """ 

725 if not chat_session_id: 

726 return 

727 if streaming_state is not None and streaming_state.get("_persisted"): 

728 return 

729 try: 

730 from ...chat.service import ChatService 

731 

732 if partial_content: 

733 content = partial_content + _STOPPED_FOOTER 

734 if truncated: 

735 content += " _(output was very long; truncated.)_" 

736 else: 

737 content = _STOPPED_BEFORE_OUTPUT_MARKER 

738 

739 # allow_archived=True: same rationale as the completion path — 

740 # the partial response on Stop is system-generated and must 

741 # survive a concurrent archive (see _save_chat_message_and_context). 

742 ChatService(username).add_message( 

743 session_id=chat_session_id, 

744 role="assistant", 

745 content=content, 

746 message_type="response", 

747 research_id=research_id, 

748 allow_archived=True, 

749 ) 

750 # Set the idempotency flag ONLY after the write succeeds. If 

751 # `add_message` raises (DB lock, encryption error, archived 

752 # session), the outer ResearchTerminatedException handler retries 

753 # this helper — flipping the flag pre-write would short-circuit 

754 # the retry and silently lose the partial response. 

755 if streaming_state is not None: 

756 streaming_state["_persisted"] = True 

757 logger.info( 

758 f"Persisted partial chat response for terminated research " 

759 f"{research_id} ({len(content)} chars)" 

760 ) 

761 except Exception: 

762 logger.opt(exception=True).warning( 

763 "Failed to persist partial chat message on terminate" 

764 ) 

765 

766 try: 

767 SocketIOService().emit_to_subscribers( 

768 "response_chunk", 

769 research_id, 

770 {"chunk": "", "is_streaming": True, "is_final": True}, 

771 ) 

772 except Exception: 

773 logger.debug("Final-chunk emit on terminate failed (non-critical)") 

774 

775 

776@log_for_research 

777@thread_cleanup 

778def run_research_process(research_id, query, mode, **kwargs): 

779 """ 

780 Run the research process in the background for a given research ID. 

781 

782 Args: 

783 research_id: The ID of the research 

784 query: The research query 

785 mode: The research mode (quick/detailed) 

786 **kwargs: Additional parameters for the research (model_provider, model, search_engine, etc.) 

787 MUST include 'username' for database access 

788 """ 

789 from ..routes.globals import ( 

790 is_research_active, 

791 is_termination_requested, 

792 update_progress_and_check_active, 

793 ) 

794 

795 # Extract username - required for database access 

796 username = kwargs.get("username") 

797 if not username: 

798 logger.error("No username provided to research thread") 

799 raise ValueError("Username is required for research process") 

800 # Extract user_password early so it's available for all cleanup paths 

801 user_password = kwargs.get("user_password") 

802 

803 # Establish thread context FIRST so every subsequent log line in this 

804 # thread can be attributed to the correct user/research and persisted 

805 # to the user's encrypted ResearchLog. Otherwise the early INFO logs 

806 # below ("Research thread started", "Research strategy", "Research 

807 # parameters") fire before start_research_process gets to its own 

808 # set_search_context call (~line 417) and the daemon can't open the 

809 # encrypted DB to write them — silently dropped via the bare-except. 

810 set_search_context( 

811 { 

812 "research_id": research_id, 

813 "username": username, 

814 "user_password": user_password, 

815 } 

816 ) 

817 

818 logger.info(f"Research thread started with username: {username}") 

819 

820 try: 

821 # Check if this research has been terminated before we even start 

822 if is_termination_requested(research_id): 

823 logger.info( 

824 f"Research {research_id} was terminated before starting" 

825 ) 

826 cleanup_research_resources( 

827 research_id, 

828 username, 

829 user_password=user_password, 

830 final_status=ResearchStatus.SUSPENDED, 

831 ) 

832 return 

833 

834 logger.info( 

835 f"Starting research process for ID {research_id}, query: {query}" 

836 ) 

837 

838 # Extract key parameters 

839 model_provider = kwargs.get("model_provider") 

840 model = kwargs.get("model") 

841 custom_endpoint = kwargs.get("custom_endpoint") 

842 search_engine = kwargs.get("search_engine") 

843 max_results = kwargs.get("max_results") 

844 time_period = kwargs.get("time_period") 

845 iterations = kwargs.get("iterations") 

846 questions_per_iteration = kwargs.get("questions_per_iteration") 

847 strategy = kwargs.get( 

848 "strategy", "source-based" 

849 ) # Default to source-based 

850 settings_snapshot = kwargs.get( 

851 "settings_snapshot", {} 

852 ) # Complete settings snapshot 

853 

854 # Log settings snapshot to debug 

855 from ...settings.logger import log_settings 

856 

857 log_settings(settings_snapshot, "Settings snapshot received in thread") 

858 

859 # Strategy should already be saved in the database before thread starts 

860 logger.info(f"Research strategy: {strategy}") 

861 

862 # Log all parameters for debugging 

863 logger.info( 

864 f"Research parameters: provider={model_provider}, model={model}, " 

865 f"search_engine={search_engine}, max_results={max_results}, " 

866 f"time_period={time_period}, iterations={iterations}, " 

867 f"questions_per_iteration={questions_per_iteration}, " 

868 f"custom_endpoint={custom_endpoint}, strategy={strategy}" 

869 ) 

870 

871 # Set up the AI Context Manager 

872 output_dir = OUTPUT_DIR / f"research_{research_id}" 

873 output_dir.mkdir(parents=True, exist_ok=True) 

874 

875 # Create a settings context that uses snapshot - no database access in threads 

876 settings_context = SnapshotSettingsContext( 

877 settings_snapshot, username=username 

878 ) 

879 

880 # Only log settings if explicitly enabled via LDR_LOG_SETTINGS env var 

881 from ...settings.logger import log_settings 

882 

883 log_settings( 

884 settings_context.values, "SettingsContext values extracted" 

885 ) 

886 

887 # Set the settings context for this thread 

888 from ...config.thread_settings import ( 

889 set_settings_context, 

890 ) 

891 

892 set_settings_context(settings_context) 

893 

894 # user_password already extracted above (before termination check) 

895 

896 # Create shared research context that can be updated during research 

897 shared_research_context = { 

898 "research_id": research_id, 

899 "research_query": query, 

900 "research_mode": mode, 

901 "research_phase": "init", 

902 "search_iteration": 0, 

903 "search_engines_planned": None, 

904 "search_engine_selected": search_engine, 

905 "username": username, # Add username for queue operations 

906 "user_password": user_password, # Add password for metrics access 

907 "chat_session_id": kwargs.get("chat_session_id"), 

908 } 

909 

910 # If this is a follow-up research, include the parent context 

911 if "research_context" in kwargs and kwargs["research_context"]: 

912 logger.info( 

913 f"Adding parent research context with {len(kwargs['research_context'].get('past_findings', ''))} chars of findings" 

914 ) 

915 shared_research_context.update(kwargs["research_context"]) 

916 

917 # Do not log context keys as they may contain sensitive information 

918 logger.info(f"Created shared_research_context for user: {username}") 

919 

920 # Set search context for search tracking 

921 set_search_context(shared_research_context) 

922 

923 # Per-research dedup state for step message persistence 

924 last_step_phase = None 

925 

926 # Pre-bind streaming_state so progress_callback's closure cell has a 

927 # value even if an exception fires between this point and the full 

928 # initialization later in this function. Without this, the except handler's 

929 # call to progress_callback during a concurrent termination raises 

930 # UnboundLocalError, silently skipping the DB FAILED update and the 

931 # error socket emit. The full streaming_state is reassigned below 

932 # before any real streaming starts; keys must match the canonical shape. 

933 streaming_state: dict = { 

934 "chunks_sent": 0, 

935 "chunks": [], 

936 "_bytes": 0, 

937 "_truncated": False, 

938 } 

939 streaming_enabled = False 

940 

941 # Set up progress callback 

942 def progress_callback(message, progress_percent, metadata): 

943 nonlocal last_step_phase 

944 # Frequent termination check 

945 if is_termination_requested(research_id): 

946 # Persist the partial chat row + emit final chunk BEFORE 

947 # handle_termination — afterwards the Socket.IO room is gone. 

948 _save_partial_chat_message_on_terminate( 

949 shared_research_context.get("chat_session_id"), 

950 research_id, 

951 username, 

952 "".join(streaming_state.get("chunks", [])), 

953 truncated=streaming_state.get("_truncated", False), 

954 streaming_state=streaming_state, 

955 ) 

956 handle_termination(research_id, username) 

957 streaming_state["_termination_handled"] = True 

958 raise ResearchTerminatedException( # noqa: TRY301 — inside nested callback, not caught by enclosing try 

959 "Research was terminated by user" 

960 ) 

961 

962 # Silent phase — no UI logging or socket emission needed 

963 if metadata.get("phase") == "termination_check": 

964 return 

965 

966 # Bind research_id AND username so the database_sink + queue 

967 # daemon can resolve the per-user encrypted DB. Without username 

968 # the daemon's _write_log_to_database hits "No authenticated 

969 # user", silently swallows the error, and ResearchLog ends up 

970 # with zero milestone rows — leaving /api/research/<id>/status 

971 # without a log_entry to render and the frontend stuck on the 

972 # "Performing research..." fallback. 

973 bound_logger = logger.bind( 

974 research_id=research_id, username=username 

975 ) 

976 bound_logger.log("MILESTONE", message) 

977 

978 if "SEARCH_PLAN:" in message: 978 ↛ 979line 978 didn't jump to line 979 because the condition on line 978 was never true

979 engines = message.split("SEARCH_PLAN:")[1].strip() 

980 metadata["planned_engines"] = engines 

981 metadata["phase"] = "search_planning" # Use existing phase 

982 # Update shared context for token tracking 

983 shared_research_context["search_engines_planned"] = engines 

984 shared_research_context["research_phase"] = "search_planning" 

985 

986 if "ENGINE_SELECTED:" in message: 986 ↛ 987line 986 didn't jump to line 987 because the condition on line 986 was never true

987 engine = message.split("ENGINE_SELECTED:")[1].strip() 

988 metadata["selected_engine"] = engine 

989 metadata["phase"] = "search" # Use existing 'search' phase 

990 # Update shared context for token tracking 

991 shared_research_context["search_engine_selected"] = engine 

992 shared_research_context["research_phase"] = "search" 

993 

994 # Capture other research phases for better context tracking 

995 if metadata.get("phase"): 995 ↛ 999line 995 didn't jump to line 999 because the condition on line 995 was always true

996 shared_research_context["research_phase"] = metadata["phase"] 

997 

998 # Update search iteration if available 

999 if "iteration" in metadata: 

1000 shared_research_context["search_iteration"] = metadata[ 

1001 "iteration" 

1002 ] 

1003 

1004 # Adjust progress based on research mode 

1005 adjusted_progress = progress_percent 

1006 if ( 

1007 mode == "detailed" 

1008 and metadata.get("phase") == "output_generation" 

1009 ): 

1010 # For detailed mode, adjust the progress range for output generation 

1011 adjusted_progress = min(80, progress_percent) 

1012 elif ( 

1013 mode == "detailed" 

1014 and metadata.get("phase") == "report_generation" 

1015 ): 

1016 # Scale the progress from 80% to 95% for the report generation phase 

1017 if progress_percent is not None: 1017 ↛ 1031line 1017 didn't jump to line 1031 because the condition on line 1017 was always true

1018 normalized = progress_percent / 100 

1019 adjusted_progress = 80 + (normalized * 15) 

1020 elif ( 

1021 mode == "quick" and metadata.get("phase") == "output_generation" 

1022 ): 

1023 # For quick mode, ensure we're at least at 85% during output generation 

1024 adjusted_progress = max(85, progress_percent) 

1025 # Map any further progress within output_generation to 85-95% range 

1026 if progress_percent is not None and progress_percent > 0: 1026 ↛ 1031line 1026 didn't jump to line 1031 because the condition on line 1026 was always true

1027 normalized = progress_percent / 100 

1028 adjusted_progress = 85 + (normalized * 10) 

1029 

1030 # Atomically update progress and check if research is still active 

1031 if adjusted_progress is not None: 

1032 adjusted_progress, still_active = ( 

1033 update_progress_and_check_active( 

1034 research_id, adjusted_progress 

1035 ) 

1036 ) 

1037 else: 

1038 still_active = is_research_active(research_id) 

1039 

1040 if still_active: 1040 ↛ exitline 1040 didn't return from function 'progress_callback' because the condition on line 1040 was always true

1041 # Queue the progress update to be processed in main thread 

1042 if adjusted_progress is not None: 

1043 from ..queue.processor_v2 import queue_processor 

1044 

1045 if username: 1045 ↛ 1050line 1045 didn't jump to line 1050 because the condition on line 1045 was always true

1046 queue_processor.queue_progress_update( 

1047 username, research_id, adjusted_progress 

1048 ) 

1049 else: 

1050 logger.warning( 

1051 f"Cannot queue progress update for research {research_id} - no username available" 

1052 ) 

1053 

1054 # Determine socket emit throttling 

1055 phase = metadata.get("phase", "") 

1056 is_final = ( 

1057 phase 

1058 in ( 

1059 "complete", 

1060 "error", 

1061 "report_complete", 

1062 ) 

1063 or adjusted_progress == 100 

1064 ) 

1065 

1066 should_emit = is_final 

1067 if not is_final: 

1068 now = time.monotonic() 

1069 with _last_emit_lock: 

1070 last = _last_emit_times.get(research_id, 0) 

1071 if now - last >= _EMIT_THROTTLE_SECONDS: 

1072 _last_emit_times[research_id] = now 

1073 should_emit = True 

1074 # Periodic TTL cleanup for orphaned entries 

1075 global _emit_cleanup_counter # noqa: PLW0603 

1076 _emit_cleanup_counter += 1 

1077 if _emit_cleanup_counter % 100 == 0: 1077 ↛ 1078line 1077 didn't jump to line 1078 because the condition on line 1077 was never true

1078 stale = [ 

1079 rid 

1080 for rid, t in _last_emit_times.items() 

1081 if now - t > _EMIT_TTL_SECONDS 

1082 ] 

1083 for rid in stale: 

1084 del _last_emit_times[rid] 

1085 

1086 # Build event data (before emit, shared scope) 

1087 event_data = None 

1088 if should_emit: 

1089 event_data = { 

1090 "progress": adjusted_progress, 

1091 "message": message, 

1092 "phase": phase, 

1093 } 

1094 # Include additional metadata for MCP/ReAct strategy display 

1095 if metadata.get("thought"): 1095 ↛ 1096line 1095 didn't jump to line 1096 because the condition on line 1095 was never true

1096 event_data["thought"] = metadata["thought"] 

1097 if metadata.get("tool"): 1097 ↛ 1098line 1097 didn't jump to line 1098 because the condition on line 1097 was never true

1098 event_data["tool"] = metadata["tool"] 

1099 if metadata.get("arguments"): 1099 ↛ 1100line 1099 didn't jump to line 1100 because the condition on line 1099 was never true

1100 event_data["arguments"] = metadata["arguments"] 

1101 if metadata.get("iteration"): 1101 ↛ 1102line 1101 didn't jump to line 1102 because the condition on line 1101 was never true

1102 event_data["iteration"] = metadata["iteration"] 

1103 if metadata.get("error"): 

1104 event_data["error"] = metadata["error"] 

1105 if metadata.get("content"): 1105 ↛ 1106line 1105 didn't jump to line 1106 because the condition on line 1105 was never true

1106 event_data["content"] = metadata["content"] 

1107 

1108 # Persist step message to chat session BEFORE emitting socket 

1109 # (ensures DB has the step when clients receive the event). 

1110 # 

1111 # Symmetry invariant: for chat sessions, what the user sees 

1112 # live must equal what `loadSession` reconstructs on reload. 

1113 # If dedup blocks persistence of a repeat step phase, drop 

1114 # the socket emit too (unless this is a final-phase event 

1115 # the completion handler depends on) so live UI doesn't 

1116 # surface events that vanish on reload. 

1117 _chat_session_id = shared_research_context.get( 

1118 "chat_session_id" 

1119 ) 

1120 if _chat_session_id: 

1121 _persist, _suppress_emit = _chat_step_decision( 

1122 phase, last_step_phase, is_final 

1123 ) 

1124 if _persist: 1124 ↛ 1144line 1124 didn't jump to line 1144 because the condition on line 1124 was always true

1125 try: 

1126 from ...chat.service import ChatService 

1127 

1128 ChatService(username).add_progress_step( 

1129 session_id=_chat_session_id, 

1130 research_id=research_id, 

1131 content=message, 

1132 phase=phase, 

1133 ) 

1134 last_step_phase = phase 

1135 except Exception: 

1136 logger.opt(exception=True).warning( 

1137 "Failed to persist progress step" 

1138 ) 

1139 # Symmetry invariant: if persistence failed 

1140 # (e.g. OperationalError under DB contention), 

1141 # suppress the live emit too — otherwise the 

1142 # client sees a step that vanishes on reload. 

1143 event_data = None 

1144 if _suppress_emit: 1144 ↛ 1145line 1144 didn't jump to line 1145 because the condition on line 1144 was never true

1145 event_data = None 

1146 

1147 # Emit socket event AFTER DB persistence 

1148 if event_data is not None: 

1149 try: 

1150 SocketIOService().emit_to_subscribers( 

1151 "progress", research_id, event_data 

1152 ) 

1153 except Exception: 

1154 logger.exception("Socket emit error (non-critical)") 

1155 

1156 # Function to check termination during long-running operations 

1157 def check_termination(): 

1158 if is_termination_requested(research_id): 

1159 _save_partial_chat_message_on_terminate( 

1160 shared_research_context.get("chat_session_id"), 

1161 research_id, 

1162 username, 

1163 "".join(streaming_state.get("chunks", [])), 

1164 truncated=streaming_state.get("_truncated", False), 

1165 streaming_state=streaming_state, 

1166 ) 

1167 handle_termination(research_id, username) 

1168 streaming_state["_termination_handled"] = True 

1169 raise ResearchTerminatedException( # noqa: TRY301 — inside nested callback, not caught by enclosing try 

1170 "Research was terminated by user during long-running operation" 

1171 ) 

1172 return False # Not terminated 

1173 

1174 # Configure the system with the specified parameters 

1175 use_llm = None 

1176 if model or search_engine or model_provider: 

1177 # Log that we're overriding system settings 

1178 logger.info( 

1179 f"Overriding system settings with: provider={model_provider}, model={model}, search_engine={search_engine}" 

1180 ) 

1181 

1182 # Override LLM if model or model_provider specified 

1183 if model or model_provider: 

1184 try: 

1185 # Get LLM with the overridden settings 

1186 # Use the shared_research_context which includes username 

1187 use_llm = get_llm( 

1188 model_name=model, 

1189 provider=model_provider, 

1190 openai_endpoint_url=custom_endpoint, 

1191 research_id=research_id, 

1192 research_context=shared_research_context, 

1193 ) 

1194 

1195 logger.info( 

1196 f"Successfully set LLM to: provider={model_provider}, model={model}" 

1197 ) 

1198 except Exception as e: 

1199 logger.exception( 

1200 f"Error setting LLM provider={model_provider}, model={model}" 

1201 ) 

1202 error_msg = str(e) 

1203 # Surface configuration errors to user instead of silently continuing 

1204 config_error_keywords = [ 

1205 "model path", 

1206 "llamacpp", 

1207 "cannot connect", 

1208 "server", 

1209 "not configured", 

1210 "not responding", 

1211 "directory", 

1212 ".gguf", 

1213 ] 

1214 if any( 

1215 keyword in error_msg.lower() 

1216 for keyword in config_error_keywords 

1217 ): 

1218 # This is a configuration error the user can fix 

1219 raise ValueError( 

1220 f"LLM Configuration Error: {error_msg}" 

1221 ) from e 

1222 # For other errors, re-raise to avoid silent failures 

1223 raise 

1224 

1225 # Create search engine first if specified, to avoid default creation without username 

1226 use_search = None 

1227 if search_engine: 

1228 try: 

1229 # Create a new search object with these settings 

1230 use_search = get_search( 

1231 search_tool=search_engine, 

1232 llm_instance=use_llm, 

1233 username=username, 

1234 settings_snapshot=settings_snapshot, 

1235 ) 

1236 logger.info( 

1237 f"Successfully created search engine: {search_engine}" 

1238 ) 

1239 except Exception as e: 

1240 logger.exception( 

1241 f"Error creating search engine {search_engine}" 

1242 ) 

1243 error_msg = str(e) 

1244 # Surface configuration errors to user instead of silently continuing 

1245 config_error_keywords = [ 

1246 "searxng", 

1247 "instance_url", 

1248 "api_key", 

1249 "cannot connect", 

1250 "connection", 

1251 "timeout", 

1252 "not configured", 

1253 ] 

1254 if any( 

1255 keyword in error_msg.lower() 

1256 for keyword in config_error_keywords 

1257 ): 

1258 # This is a configuration error the user can fix 

1259 raise ValueError( 

1260 f"Search Engine Configuration Error ({search_engine}): {error_msg}" 

1261 ) from e 

1262 # For other errors, re-raise to avoid silent failures 

1263 raise 

1264 

1265 # Set the progress callback in the system 

1266 system = AdvancedSearchSystem( 

1267 llm=use_llm, # type: ignore[arg-type] 

1268 search=use_search, # type: ignore[arg-type] 

1269 strategy_name=strategy, 

1270 max_iterations=iterations, 

1271 questions_per_iteration=questions_per_iteration, 

1272 username=username, 

1273 settings_snapshot=settings_snapshot, 

1274 research_id=research_id, 

1275 research_context=shared_research_context, 

1276 ) 

1277 system.set_progress_callback(progress_callback) 

1278 

1279 # Chat mode: set up LLM streaming callback for real-time response chunks 

1280 streaming_enabled = False 

1281 # chunks: server-side buffer so partial content survives termination 

1282 # (the citation handler's local list is discarded on raise). 

1283 # _bytes / _truncated cap the buffer to bound memory on pathologically 

1284 # long answers; once capped we still forward to the frontend but stop 

1285 # accumulating server-side. 

1286 streaming_state = { 

1287 "chunks_sent": 0, 

1288 "chunks": [], 

1289 "_bytes": 0, 

1290 "_truncated": False, 

1291 } 

1292 chat_session_id = shared_research_context.get("chat_session_id") 

1293 

1294 if chat_session_id: 1294 ↛ 1295line 1294 didn't jump to line 1295 because the condition on line 1294 was never true

1295 try: 

1296 socket_service = SocketIOService() 

1297 

1298 # Source resolver returns the strategy's currently-collected 

1299 # source list. Late-bound so the streaming callback can 

1300 # apply inline hyperlinks as the agent finishes adding to 

1301 # all_links_of_system (sources may still be growing when 

1302 # synthesis starts; reading via the closure picks up the 

1303 # final list at chunk-emit time, not callback-build time). 

1304 def _resolve_sources(): 

1305 if not hasattr(system, "all_links_of_system"): 

1306 return [] 

1307 return list(system.all_links_of_system or []) 

1308 

1309 # Build a formatter matching the user's report.citation_format 

1310 # so live-display brackets ([[arxiv.org-1]] / [[arxiv-1]] / 

1311 # [[1]] etc.) match what the final-save formatter will emit 

1312 # — avoids a visible format-flip when handleResearchComplete 

1313 # swaps in the DB-saved version. 

1314 live_formatter = get_citation_formatter() 

1315 

1316 stream_callback = _make_chat_stream_callback( 

1317 research_id, 

1318 streaming_state, 

1319 socket_service, 

1320 source_resolver=_resolve_sources, 

1321 formatter=live_formatter, 

1322 ) 

1323 

1324 # Hook into the citation handler's streaming 

1325 if hasattr(system, "strategy") and hasattr( 

1326 system.strategy, "citation_handler" 

1327 ): 

1328 handler = system.strategy.citation_handler 

1329 if hasattr(handler, "set_stream_callback"): 

1330 handler.set_stream_callback(stream_callback) 

1331 streaming_enabled = True 

1332 logger.info( 

1333 f"Streaming enabled for chat {chat_session_id[:8]}..." 

1334 ) 

1335 except Exception: 

1336 # exception=True so the traceback is visible: streaming is 

1337 # non-critical (research still completes without it), but a 

1338 # silent warning would hide real bugs in the setup above. 

1339 logger.opt(exception=True).warning( 

1340 "Could not set up streaming (non-critical)" 

1341 ) 

1342 

1343 # Helper to save chat message (closes over outer scope vars) 

1344 def _maybe_save_chat_message(content): 

1345 _chat_sid = shared_research_context.get("chat_session_id") 

1346 if not _chat_sid: 1346 ↛ 1348line 1346 didn't jump to line 1348 because the condition on line 1346 was always true

1347 return 

1348 try: 

1349 _save_chat_message_and_context( 

1350 _chat_sid, 

1351 research_id, 

1352 username, 

1353 content, 

1354 streaming_enabled, 

1355 streaming_state, 

1356 SocketIOService(), 

1357 settings_snapshot=settings_snapshot, 

1358 ) 

1359 except Exception: 

1360 # Promoted from debug→warning: at debug level this is invisible 

1361 # in production and the user sees "Research completed but no 

1362 # report available" with no operator signal. Same rationale as 

1363 # the accumulated_context handler in _save_chat_message_and_context. 

1364 logger.opt(exception=True).warning( 

1365 "Could not add message to chat session — assistant " 

1366 "response NOT persisted; user will see 'no report available'" 

1367 ) 

1368 # The DB write failed, so _save_chat_message_and_context never 

1369 # emitted its is_final response_chunk. Emit one here so the 

1370 # streaming UI clears its 'thinking' state instead of stalling. 

1371 try: 

1372 SocketIOService().emit_to_subscribers( 

1373 "response_chunk", 

1374 research_id, 

1375 {"chunk": "", "is_streaming": True, "is_final": True}, 

1376 ) 

1377 except Exception: 

1378 logger.opt(exception=True).debug( 

1379 "Failed to emit final chunk after chat-persist error" 

1380 ) 

1381 

1382 # Run the search 

1383 progress_callback("Starting research process", 5, {"phase": "init"}) 

1384 

1385 try: 

1386 results = system.analyze_topic(query) 

1387 if mode == "quick": 

1388 progress_callback( 

1389 "Search complete, preparing to generate summary...", 

1390 85, 

1391 {"phase": "output_generation"}, 

1392 ) 

1393 else: 

1394 progress_callback( 

1395 "Search complete, generating output", 

1396 80, 

1397 {"phase": "output_generation"}, 

1398 ) 

1399 except Exception as search_error: 

1400 # Better handling of specific search errors 

1401 error_message = str(search_error) 

1402 error_type = "unknown" 

1403 

1404 # OpenAI-compatible runtime failures (LM Studio / vLLM / llama.cpp 

1405 # server / OpenRouter / custom endpoint) -- rewrite to a message 

1406 # that names the provider, base URL, and model (#3878). 

1407 if model_provider in { 1407 ↛ 1417line 1407 didn't jump to line 1417 because the condition on line 1407 was never true

1408 "openai_endpoint", 

1409 "lmstudio", 

1410 "llamacpp", 

1411 "openai", 

1412 "openrouter", 

1413 "google", 

1414 "ionos", 

1415 "xai", 

1416 } and is_openai_compat_runtime_error(search_error): 

1417 rewritten = friendly_openai_compatible_error( 

1418 search_error, 

1419 provider=model_provider, 

1420 base_url=custom_endpoint, 

1421 model=model, 

1422 ) 

1423 raise RuntimeError(rewritten) from search_error 

1424 

1425 # Extract error details for common issues 

1426 if "status code: 503" in error_message: 

1427 error_message = "Ollama AI service is unavailable (HTTP 503). Please check that Ollama is running properly on your system." 

1428 error_type = "ollama_unavailable" 

1429 elif "status code: 404" in error_message: 

1430 error_message = "Ollama model not found (HTTP 404). Please check that you have pulled the required model." 

1431 error_type = "model_not_found" 

1432 elif "status code:" in error_message: 

1433 # Extract the status code for other HTTP errors 

1434 status_code = error_message.split("status code:")[1].strip() 

1435 error_message = f"API request failed with status code {status_code}. Please check your configuration." 

1436 error_type = "api_error" 

1437 elif "connection" in error_message.lower(): 

1438 error_message = "Connection error. Please check that your LLM service (Ollama/API) is running and accessible." 

1439 error_type = "connection_error" 

1440 

1441 # Raise with improved error message 

1442 raise RuntimeError( 

1443 f"{error_message} (Error type: {error_type})" 

1444 ) from search_error 

1445 

1446 # Generate output based on mode 

1447 if mode == "quick": 

1448 # Quick Summary 

1449 if results.get("findings") or results.get("formatted_findings"): 

1450 raw_formatted_findings = results["formatted_findings"] 

1451 

1452 # Check if formatted_findings contains an error message 

1453 if isinstance( 

1454 raw_formatted_findings, str 

1455 ) and raw_formatted_findings.startswith("Error:"): 

1456 logger.error( 

1457 f"Detected error in formatted findings: {raw_formatted_findings[:100]}..." 

1458 ) 

1459 

1460 # Determine error type for better user feedback 

1461 error_type = "unknown" 

1462 error_message = raw_formatted_findings.lower() 

1463 

1464 if ( 

1465 "token limit" in error_message 

1466 or "context length" in error_message 

1467 ): 

1468 error_type = "token_limit" 

1469 # Log specific error type 

1470 logger.warning( 

1471 "Detected token limit error in synthesis" 

1472 ) 

1473 

1474 # Update progress with specific error type 

1475 progress_callback( 

1476 "Synthesis hit token limits. Attempting fallback...", 

1477 87, 

1478 { 

1479 "phase": "synthesis_error", 

1480 "error_type": error_type, 

1481 }, 

1482 ) 

1483 elif ( 

1484 "timeout" in error_message 

1485 or "timed out" in error_message 

1486 ): 

1487 error_type = "timeout" 

1488 logger.warning("Detected timeout error in synthesis") 

1489 progress_callback( 

1490 "Synthesis timed out. Attempting fallback...", 

1491 87, 

1492 { 

1493 "phase": "synthesis_error", 

1494 "error_type": error_type, 

1495 }, 

1496 ) 

1497 elif "rate limit" in error_message: 

1498 error_type = "rate_limit" 

1499 logger.warning("Detected rate limit error in synthesis") 

1500 progress_callback( 

1501 "LLM rate limit reached. Attempting fallback...", 

1502 87, 

1503 { 

1504 "phase": "synthesis_error", 

1505 "error_type": error_type, 

1506 }, 

1507 ) 

1508 elif ( 

1509 "connection" in error_message 

1510 or "network" in error_message 

1511 ): 

1512 error_type = "connection" 

1513 logger.warning("Detected connection error in synthesis") 

1514 progress_callback( 

1515 "Connection issue with LLM. Attempting fallback...", 

1516 87, 

1517 { 

1518 "phase": "synthesis_error", 

1519 "error_type": error_type, 

1520 }, 

1521 ) 

1522 elif ( 

1523 "llm error" in error_message 

1524 or "final answer synthesis fail" in error_message 

1525 ): 

1526 error_type = "llm_error" 

1527 logger.warning( 

1528 "Detected general LLM error in synthesis" 

1529 ) 

1530 progress_callback( 

1531 "LLM error during synthesis. Attempting fallback...", 

1532 87, 

1533 { 

1534 "phase": "synthesis_error", 

1535 "error_type": error_type, 

1536 }, 

1537 ) 

1538 else: 

1539 # Generic error 

1540 logger.warning("Detected unknown error in synthesis") 

1541 progress_callback( 

1542 "Error during synthesis. Attempting fallback...", 

1543 87, 

1544 { 

1545 "phase": "synthesis_error", 

1546 "error_type": "unknown", 

1547 }, 

1548 ) 

1549 

1550 # Extract synthesized content from findings if available 

1551 synthesized_content = "" 

1552 for finding in results.get("findings", []): 

1553 if finding.get("phase") == "Final synthesis": 

1554 synthesized_content = finding.get("content", "") 

1555 break 

1556 

1557 # Use synthesized content as fallback 

1558 if ( 

1559 synthesized_content 

1560 and not synthesized_content.startswith("Error:") 

1561 ): 

1562 logger.info( 

1563 "Using existing synthesized content as fallback" 

1564 ) 

1565 raw_formatted_findings = synthesized_content 

1566 

1567 # Or use current_knowledge as another fallback 

1568 elif results.get("current_knowledge"): 

1569 logger.info("Using current_knowledge as fallback") 

1570 raw_formatted_findings = results["current_knowledge"] 

1571 

1572 # Or combine all finding contents as last resort 

1573 elif results.get("findings"): 

1574 logger.info("Combining all findings as fallback") 

1575 # First try to use any findings that are not errors 

1576 valid_findings = [ 

1577 f"## {finding.get('phase', 'Finding')}\n\n{finding.get('content', '')}" 

1578 for finding in results.get("findings", []) 

1579 if finding.get("content") 

1580 and not finding.get("content", "").startswith( 

1581 "Error:" 

1582 ) 

1583 ] 

1584 

1585 synthesis_error = raw_formatted_findings 

1586 if valid_findings: 

1587 raw_formatted_findings = ( 

1588 "# Research Results (Fallback Mode)\n\n" 

1589 ) 

1590 raw_formatted_findings += "\n\n".join( 

1591 valid_findings 

1592 ) 

1593 raw_formatted_findings += ( 

1594 f"\n\n## Error Information\n{synthesis_error}" 

1595 ) 

1596 else: 

1597 # Last resort: use everything including errors 

1598 raw_formatted_findings = ( 

1599 "# Research Results (Emergency Fallback)\n\n" 

1600 ) 

1601 raw_formatted_findings += "The system encountered errors during final synthesis.\n\n" 

1602 raw_formatted_findings += "\n\n".join( 

1603 f"## {finding.get('phase', 'Finding')}\n\n{finding.get('content', '')}" 

1604 for finding in results.get("findings", []) 

1605 if finding.get("content") 

1606 ) 

1607 

1608 progress_callback( 

1609 f"Using fallback synthesis due to {error_type} error", 

1610 88, 

1611 { 

1612 "phase": "synthesis_fallback", 

1613 "error_type": error_type, 

1614 }, 

1615 ) 

1616 

1617 logger.info( 

1618 "Found formatted_findings of length: {}", 

1619 len(str(raw_formatted_findings)), 

1620 ) 

1621 

1622 try: 

1623 # Check if we have an error in the findings and use enhanced error handling 

1624 if isinstance( 

1625 raw_formatted_findings, str 

1626 ) and raw_formatted_findings.startswith("Error:"): 

1627 logger.info( 

1628 "Generating enhanced error report using ErrorReportGenerator" 

1629 ) 

1630 

1631 # Generate comprehensive error report 

1632 # ErrorReportGenerator does not use LLM (kept for compat) 

1633 error_generator = ErrorReportGenerator() 

1634 clean_markdown = error_generator.generate_error_report( 

1635 error_message=raw_formatted_findings, 

1636 query=query, 

1637 partial_results=results, 

1638 search_iterations=results.get("iterations", 0), 

1639 research_id=research_id, 

1640 ) 

1641 

1642 logger.info( 

1643 "Generated enhanced error report with {} characters", 

1644 len(clean_markdown), 

1645 ) 

1646 else: 

1647 # report_content stores the synthesized answer 

1648 # only (see _extract_synthesized_answer for the 

1649 # full rationale). Fall back to the formatted 

1650 # blob only when neither Final synthesis nor 

1651 # current_knowledge is populated — leaks 

1652 # sources, but at least we save *something*. 

1653 clean_markdown = ( 

1654 _extract_synthesized_answer(results) 

1655 or raw_formatted_findings 

1656 ) 

1657 

1658 # Pull sources from the search-system's accumulated link 

1659 # buffer first — same source the detailed-report path 

1660 # uses at the equivalent point below. Wrapper 

1661 # strategies (e.g. EnhancedContextualFollowUpStrategy, 

1662 # IterativeRefinementStrategy) delegate the actual 

1663 # search to an inner strategy that populates 

1664 # `self.all_links_of_system`, but they don't bubble 

1665 # that list back into the result dict's `findings`. So 

1666 # the legacy `findings[*].search_results` extraction 

1667 # below stays empty for chat follow-ups, leaving the 

1668 # citation formatter with no urls to hyperlink. Prefer 

1669 # the system-level accumulator; fall back to the 

1670 # legacy extraction so direct strategies that bypass 

1671 # the system buffer still work. 

1672 all_links = list( 

1673 getattr(system, "all_links_of_system", None) or [] 

1674 ) 

1675 if not all_links: 1675 ↛ 1689line 1675 didn't jump to line 1689 because the condition on line 1675 was always true

1676 for finding in results.get("findings", []): 

1677 search_results = finding.get("search_results", []) 

1678 if search_results: 

1679 try: 

1680 links = extract_links_from_search_results( 

1681 search_results 

1682 ) 

1683 all_links.extend(links) 

1684 except Exception: 

1685 logger.exception( 

1686 "Error processing search results/links" 

1687 ) 

1688 

1689 logger.info( 

1690 "Successfully converted to clean markdown of length: {}", 

1691 len(clean_markdown), 

1692 ) 

1693 

1694 # First send a progress update for generating the summary 

1695 progress_callback( 

1696 "Generating clean summary from research data...", 

1697 90, 

1698 {"phase": "output_generation"}, 

1699 ) 

1700 

1701 # Send progress update for saving report 

1702 progress_callback( 

1703 "Saving research report to database...", 

1704 95, 

1705 {"phase": "report_complete"}, 

1706 ) 

1707 

1708 # Format citations in the markdown content. The 

1709 # split returns the answer-with-hyperlinks half 

1710 # separately from the trailing sources section the 

1711 # LLM may have emitted (which is discarded — sources 

1712 # live in research_resources, the canonical store). 

1713 # When no Sources section is found, fall back to 

1714 # structured-source hyperlinking — never re-parse 

1715 # concatenated formatter output downstream. 

1716 formatter = get_citation_formatter() 

1717 try: 

1718 answer_with_links, llm_sources = ( 

1719 formatter.format_document_split(clean_markdown) 

1720 ) 

1721 if not llm_sources: 1721 ↛ 1733line 1721 didn't jump to line 1733 because the condition on line 1721 was always true

1722 answer_with_links = ( 

1723 formatter.apply_inline_hyperlinks( 

1724 clean_markdown, all_links 

1725 ) 

1726 ) 

1727 # Safety check: a >50% strip on a long input 

1728 # likely means the regex over-stripped on a 

1729 # "Sources:" header inside the answer body. 

1730 # Fall back to structured-source hyperlinking 

1731 # on the full text. Min-length floor prevents 

1732 # false-fires on legitimately short answers. 

1733 SAFETY_MIN_LEN = 800 

1734 if ( 1734 ↛ 1740line 1734 didn't jump to line 1740 because the condition on line 1734 was never true

1735 llm_sources 

1736 and len(clean_markdown) > SAFETY_MIN_LEN 

1737 and len(answer_with_links) 

1738 < len(clean_markdown) * 0.5 

1739 ): 

1740 logger.warning( 

1741 "format_document_split appears to have " 

1742 "over-stripped (answer={} chars, " 

1743 "original={} chars) for research {}. " 

1744 "Falling back to structured-source " 

1745 "hyperlinking on full input.", 

1746 len(answer_with_links), 

1747 len(clean_markdown), 

1748 research_id, 

1749 ) 

1750 answer_with_links = ( 

1751 formatter.apply_inline_hyperlinks( 

1752 clean_markdown, all_links 

1753 ) 

1754 ) 

1755 except Exception: 

1756 # Hyperlinking is quality-of-life, not a hard 

1757 # requirement. If anything blows up, save the 

1758 # raw LLM text rather than fail the research. 

1759 logger.exception( 

1760 "Citation formatter failed; saving raw answer" 

1761 ) 

1762 answer_with_links = clean_markdown 

1763 

1764 # report_content stores ONLY the synthesized answer. 

1765 # The legacy "answer + ## Sources + ## Research 

1766 # Metrics" view is reconstructed at render time by 

1767 # report_assembly_service.assemble_full_report. 

1768 full_report_content = answer_with_links 

1769 

1770 # Save report FIRST, then sources: 

1771 # a chat read between commits sees a report with no 

1772 # sources (assembler renders just the answer) — better 

1773 # failure mode than partial assembly with sources but 

1774 # no answer body. 

1775 from ...storage import get_report_storage 

1776 

1777 with get_user_db_session(username) as db_session: 

1778 storage = get_report_storage(session=db_session) 

1779 

1780 # Prepare metadata 

1781 metadata = { 

1782 "iterations": results["iterations"], 

1783 "generated_at": datetime.now(UTC).isoformat(), 

1784 } 

1785 

1786 # Save report using storage abstraction 

1787 success = storage.save_report( 

1788 research_id=research_id, 

1789 content=full_report_content, 

1790 metadata=metadata, 

1791 username=username, 

1792 ) 

1793 

1794 if not success: 

1795 raise RuntimeError("Failed to save research report") # noqa: TRY301 — triggers research failure handling in outer except 

1796 

1797 # Save sources to database (non-fatal - report 

1798 # already saved; sources missing is recoverable 

1799 # because the assembler omits empty Sources blocks) 

1800 try: 

1801 from .research_sources_service import ( 

1802 ResearchSourcesService, 

1803 ) 

1804 

1805 sources_service = ResearchSourcesService() 

1806 if all_links: 

1807 logger.info( 

1808 f"Quick summary: Saving {len(all_links)} sources to database" 

1809 ) 

1810 sources_saved = ( 

1811 sources_service.save_research_sources( 

1812 research_id=research_id, 

1813 sources=all_links, 

1814 username=username, 

1815 ) 

1816 ) 

1817 logger.info( 

1818 f"Quick summary: Saved {sources_saved} sources for research {research_id}" 

1819 ) 

1820 except Exception: 

1821 logger.exception( 

1822 f"Failed to save sources for research {research_id} (continuing with report save)" 

1823 ) 

1824 

1825 logger.info(f"Report saved for research_id: {research_id}") 

1826 

1827 # Skip export to additional formats - we're storing in database only 

1828 

1829 # Update research status in database 

1830 completed_at = datetime.now(UTC).isoformat() 

1831 

1832 with get_user_db_session(username) as db_session: 

1833 research = ( 

1834 db_session.query(ResearchHistory) 

1835 .filter_by(id=research_id) 

1836 .first() 

1837 ) 

1838 

1839 # Preserve existing metadata and update with new values 

1840 metadata = _parse_research_metadata( 

1841 research.research_meta 

1842 ) 

1843 

1844 metadata.update( 

1845 { 

1846 "iterations": results["iterations"], 

1847 "generated_at": datetime.now(UTC).isoformat(), 

1848 } 

1849 ) 

1850 

1851 # Use the helper function for consistent duration calculation 

1852 duration_seconds = calculate_duration( 

1853 research.created_at, completed_at 

1854 ) 

1855 

1856 research.status = ResearchStatus.COMPLETED 

1857 research.completed_at = completed_at 

1858 research.duration_seconds = duration_seconds 

1859 # Note: report_content is saved by CachedResearchService 

1860 # report_path is not used in encrypted database version 

1861 

1862 # Generate headline and topics only for news searches 

1863 if ( 

1864 metadata.get("is_news_search") 

1865 or metadata.get("search_type") == "news_analysis" 

1866 ): 

1867 try: 

1868 from ...news.utils.headline_generator import ( 

1869 generate_headline, 

1870 ) 

1871 from ...news.utils.topic_generator import ( 

1872 generate_topics, 

1873 ) 

1874 

1875 # Get the report content from database for better headline/topic generation 

1876 report_content = "" 

1877 try: 

1878 research = ( 

1879 db_session.query(ResearchHistory) 

1880 .filter_by(id=research_id) 

1881 .first() 

1882 ) 

1883 if research and research.report_content: 1883 ↛ 1889line 1883 didn't jump to line 1889 because the condition on line 1883 was always true

1884 report_content = research.report_content 

1885 logger.info( 

1886 f"Retrieved {len(report_content)} chars from database for headline generation" 

1887 ) 

1888 else: 

1889 logger.warning( 

1890 f"No report content found in database for research_id: {research_id}" 

1891 ) 

1892 except Exception: 

1893 logger.warning( 

1894 "Could not retrieve report content from database" 

1895 ) 

1896 

1897 # Generate headline 

1898 logger.info( 

1899 f"Generating headline for query: {query[:100]}" 

1900 ) 

1901 headline = generate_headline( 

1902 query, report_content 

1903 ) 

1904 metadata["generated_headline"] = headline 

1905 

1906 # Generate topics 

1907 logger.info( 

1908 f"Generating topics with category: {metadata.get('category', 'News')}" 

1909 ) 

1910 topics = generate_topics( 

1911 query=query, 

1912 findings=report_content, 

1913 category=metadata.get("category", "News"), 

1914 max_topics=6, 

1915 ) 

1916 metadata["generated_topics"] = topics 

1917 

1918 logger.info(f"Generated headline: {headline}") 

1919 logger.info(f"Generated topics: {topics}") 

1920 

1921 except Exception: 

1922 logger.warning( 

1923 "Could not generate headline/topics" 

1924 ) 

1925 

1926 research.research_meta = metadata 

1927 

1928 db_session.commit() 

1929 logger.info( 

1930 f"Database commit completed for research_id: {research_id}" 

1931 ) 

1932 

1933 # Update subscription if this was triggered by a subscription 

1934 if metadata.get("subscription_id"): 

1935 try: 

1936 from ...news.subscription_manager.storage import ( 

1937 SQLSubscriptionStorage, 

1938 ) 

1939 from datetime import ( 

1940 datetime as dt, 

1941 timezone, 

1942 timedelta, 

1943 ) 

1944 

1945 sub_storage = SQLSubscriptionStorage(db_session) 

1946 subscription_id = metadata["subscription_id"] 

1947 

1948 # Get subscription to find refresh interval 

1949 subscription = sub_storage.get(subscription_id) 

1950 if subscription: 1950 ↛ 1979line 1950 didn't jump to line 1979

1951 refresh_minutes = subscription.get( 

1952 "refresh_minutes", 240 

1953 ) 

1954 now = dt.now(timezone.utc) 

1955 next_refresh = now + timedelta( 

1956 minutes=refresh_minutes 

1957 ) 

1958 

1959 # Update refresh times 

1960 sub_storage.update_refresh_time( 

1961 subscription_id=subscription_id, 

1962 last_refresh=now, 

1963 next_refresh=next_refresh, 

1964 ) 

1965 

1966 # Increment stats 

1967 sub_storage.increment_stats( 

1968 subscription_id, 1 

1969 ) 

1970 

1971 logger.info( 

1972 f"Updated subscription {subscription_id} refresh times" 

1973 ) 

1974 except Exception: 

1975 logger.warning( 

1976 "Could not update subscription refresh time" 

1977 ) 

1978 

1979 logger.info( 

1980 f"Database updated successfully for research_id: {research_id}" 

1981 ) 

1982 

1983 _maybe_save_chat_message(full_report_content) 

1984 

1985 # Send the final completion message 

1986 progress_callback( 

1987 "Research completed successfully", 

1988 100, 

1989 {"phase": "complete"}, 

1990 ) 

1991 

1992 # Clean up resources 

1993 logger.info( 

1994 "Cleaning up resources for research_id: {}", research_id 

1995 ) 

1996 cleanup_research_resources( 

1997 research_id, username, user_password=user_password 

1998 ) 

1999 logger.info( 

2000 "Resources cleaned up for research_id: {}", research_id 

2001 ) 

2002 

2003 except Exception as inner_e: 

2004 logger.exception("Error during quick summary generation") 

2005 raise RuntimeError( 

2006 f"Error generating quick summary: {inner_e!s}" 

2007 ) 

2008 else: 

2009 raise RuntimeError( # noqa: TRY301 — triggers research failure handling in outer except 

2010 "No research findings were generated. Please try again." 

2011 ) 

2012 else: 

2013 # Full Report 

2014 progress_callback( 

2015 "Generating detailed report...", 

2016 85, 

2017 {"phase": "report_generation"}, 

2018 ) 

2019 

2020 # Extract the search system from the results if available 

2021 search_system = results.get("search_system", None) 

2022 

2023 # Wrapper that maps report generator's 0-100% to 85-95% range 

2024 # and relays cancellation checks through the outer progress_callback 

2025 def report_progress_callback(message, progress_percent, metadata): 

2026 if progress_percent is not None: 

2027 adjusted = 85 + (progress_percent / 100) * 10 

2028 else: 

2029 adjusted = progress_percent 

2030 progress_callback(message, adjusted, metadata) 

2031 

2032 # Pass the existing search system to maintain citation indices 

2033 report_generator = IntegratedReportGenerator( 

2034 search_system=search_system, 

2035 settings_snapshot=settings_snapshot, 

2036 ) 

2037 final_report = report_generator.generate_report( 

2038 results, query, progress_callback=report_progress_callback 

2039 ) 

2040 

2041 progress_callback( 

2042 "Report generation complete", 95, {"phase": "report_complete"} 

2043 ) 

2044 

2045 # Format citations and split off the trailing Sources 

2046 # section. Save only the answer half — sources are 

2047 # persisted structurally to research_resources. 

2048 all_links = ( 

2049 getattr(search_system, "all_links_of_system", None) or [] 

2050 ) 

2051 formatter = get_citation_formatter() 

2052 try: 

2053 answer_with_links, llm_sources = ( 

2054 formatter.format_document_split(final_report["content"]) 

2055 ) 

2056 if not llm_sources: 

2057 answer_with_links = formatter.apply_inline_hyperlinks( 

2058 final_report["content"], all_links 

2059 ) 

2060 SAFETY_MIN_LEN = 800 

2061 if ( 

2062 llm_sources 

2063 and len(final_report["content"]) > SAFETY_MIN_LEN 

2064 and len(answer_with_links) 

2065 < len(final_report["content"]) * 0.5 

2066 ): 

2067 logger.warning( 

2068 "format_document_split appears to have over-stripped " 

2069 "(answer={} chars, original={} chars) for research {}.", 

2070 len(answer_with_links), 

2071 len(final_report["content"]), 

2072 research_id, 

2073 ) 

2074 answer_with_links = formatter.apply_inline_hyperlinks( 

2075 final_report["content"], all_links 

2076 ) 

2077 except Exception: 

2078 logger.exception("Citation formatter failed; saving raw answer") 

2079 answer_with_links = final_report["content"] 

2080 formatted_content = answer_with_links 

2081 

2082 # Save report FIRST, sources after. 

2083 # See quick-summary path for rationale. 

2084 with get_user_db_session(username) as db_session: 

2085 # Update metadata 

2086 metadata = final_report["metadata"] 

2087 metadata["iterations"] = results["iterations"] 

2088 

2089 # Save report to database 

2090 try: 

2091 research = ( 

2092 db_session.query(ResearchHistory) 

2093 .filter_by(id=research_id) 

2094 .first() 

2095 ) 

2096 

2097 if not research: 2097 ↛ 2098line 2097 didn't jump to line 2098 because the condition on line 2097 was never true

2098 logger.error(f"Research {research_id} not found") 

2099 success = False 

2100 else: 

2101 research.report_content = formatted_content 

2102 if research.research_meta: 2102 ↛ 2103line 2102 didn't jump to line 2103 because the condition on line 2102 was never true

2103 research.research_meta.update(metadata) 

2104 else: 

2105 research.research_meta = metadata 

2106 db_session.commit() 

2107 success = True 

2108 logger.info( 

2109 f"Saved report for research {research_id} to database" 

2110 ) 

2111 except Exception: 

2112 logger.exception("Error saving report to database") 

2113 db_session.rollback() 

2114 success = False 

2115 

2116 if not success: 2116 ↛ 2117line 2116 didn't jump to line 2117 because the condition on line 2116 was never true

2117 raise RuntimeError("Failed to save research report") # noqa: TRY301 — triggers research failure handling in outer except 

2118 

2119 logger.info( 

2120 f"Report saved to database for research_id: {research_id}" 

2121 ) 

2122 

2123 # Save sources AFTER report (non-fatal; assembler omits 

2124 # empty Sources blocks if this fails). 

2125 try: 

2126 from .research_sources_service import ResearchSourcesService 

2127 

2128 sources_service = ResearchSourcesService() 

2129 if all_links: 

2130 logger.info(f"Saving {len(all_links)} sources to database") 

2131 sources_saved = sources_service.save_research_sources( 

2132 research_id=research_id, 

2133 sources=all_links, 

2134 username=username, 

2135 ) 

2136 logger.info( 

2137 f"Saved {sources_saved} sources for research {research_id}" 

2138 ) 

2139 except Exception: 

2140 logger.exception( 

2141 f"Failed to save sources for research {research_id} (continuing)" 

2142 ) 

2143 

2144 # Update research status in database 

2145 completed_at = datetime.now(UTC).isoformat() 

2146 

2147 with get_user_db_session(username) as db_session: 

2148 research = ( 

2149 db_session.query(ResearchHistory) 

2150 .filter_by(id=research_id) 

2151 .first() 

2152 ) 

2153 

2154 # Preserve existing metadata and merge with report metadata 

2155 metadata = _parse_research_metadata(research.research_meta) 

2156 

2157 metadata.update(final_report["metadata"]) 

2158 metadata["iterations"] = results["iterations"] 

2159 

2160 # Use the helper function for consistent duration calculation 

2161 duration_seconds = calculate_duration( 

2162 research.created_at, completed_at 

2163 ) 

2164 

2165 research.status = ResearchStatus.COMPLETED 

2166 research.completed_at = completed_at 

2167 research.duration_seconds = duration_seconds 

2168 # Note: report_content is saved by CachedResearchService 

2169 # report_path is not used in encrypted database version 

2170 

2171 # Generate headline and topics only for news searches 

2172 if ( 2172 ↛ 2176line 2172 didn't jump to line 2176 because the condition on line 2172 was never true

2173 metadata.get("is_news_search") 

2174 or metadata.get("search_type") == "news_analysis" 

2175 ): 

2176 try: 

2177 from ...news.utils.headline_generator import ( 

2178 generate_headline, # type: ignore[no-redef] 

2179 ) 

2180 from ...news.utils.topic_generator import ( 

2181 generate_topics, # type: ignore[no-redef] 

2182 ) 

2183 

2184 # Get the report content from database for better headline/topic generation 

2185 report_content = "" 

2186 try: 

2187 research = ( 

2188 db_session.query(ResearchHistory) 

2189 .filter_by(id=research_id) 

2190 .first() 

2191 ) 

2192 if research and research.report_content: 

2193 report_content = research.report_content 

2194 else: 

2195 logger.warning( 

2196 f"No report content found in database for research_id: {research_id}" 

2197 ) 

2198 except Exception: 

2199 logger.warning( 

2200 "Could not retrieve report content from database" 

2201 ) 

2202 

2203 # Generate headline 

2204 headline = generate_headline(query, report_content) 

2205 metadata["generated_headline"] = headline 

2206 

2207 # Generate topics 

2208 topics = generate_topics( 

2209 query=query, 

2210 findings=report_content, 

2211 category=metadata.get("category", "News"), 

2212 max_topics=6, 

2213 ) 

2214 metadata["generated_topics"] = topics 

2215 

2216 logger.info(f"Generated headline: {headline}") 

2217 logger.info(f"Generated topics: {topics}") 

2218 

2219 except Exception: 

2220 logger.warning("Could not generate headline/topics") 

2221 

2222 research.research_meta = metadata 

2223 

2224 db_session.commit() 

2225 

2226 # Update subscription if this was triggered by a subscription 

2227 if metadata.get("subscription_id"): 2227 ↛ 2228line 2227 didn't jump to line 2228 because the condition on line 2227 was never true

2228 try: 

2229 from ...news.subscription_manager.storage import ( 

2230 SQLSubscriptionStorage, 

2231 ) 

2232 from datetime import datetime as dt, timezone, timedelta 

2233 

2234 sub_storage = SQLSubscriptionStorage(db_session) 

2235 subscription_id = metadata["subscription_id"] 

2236 

2237 # Get subscription to find refresh interval 

2238 subscription = sub_storage.get(subscription_id) 

2239 if subscription: 

2240 refresh_minutes = subscription.get( 

2241 "refresh_minutes", 240 

2242 ) 

2243 now = dt.now(timezone.utc) 

2244 next_refresh = now + timedelta( 

2245 minutes=refresh_minutes 

2246 ) 

2247 

2248 # Update refresh times 

2249 sub_storage.update_refresh_time( 

2250 subscription_id=subscription_id, 

2251 last_refresh=now, 

2252 next_refresh=next_refresh, 

2253 ) 

2254 

2255 # Increment stats 

2256 sub_storage.increment_stats(subscription_id, 1) 

2257 

2258 logger.info( 

2259 f"Updated subscription {subscription_id} refresh times" 

2260 ) 

2261 except Exception: 

2262 logger.warning( 

2263 "Could not update subscription refresh time" 

2264 ) 

2265 

2266 _maybe_save_chat_message(formatted_content) 

2267 

2268 progress_callback( 

2269 "Research completed successfully", 

2270 100, 

2271 {"phase": "complete"}, 

2272 ) 

2273 

2274 # Clean up resources 

2275 cleanup_research_resources( 

2276 research_id, username, user_password=user_password 

2277 ) 

2278 

2279 except ResearchTerminatedException: 

2280 logger.info(f"Research {research_id} terminated by user") 

2281 # Fallback path: when termination was raised from the streaming 

2282 # callback (mid-stream interrupt), progress_callback hasn't run 

2283 # since the flag was set, so handle_termination() was NOT called 

2284 # and the partial row hasn't been persisted yet. The helper is 

2285 # idempotent via streaming_state["_persisted"]; if the in-callback 

2286 # path already ran, both calls are no-ops. 

2287 _save_partial_chat_message_on_terminate( 

2288 shared_research_context.get("chat_session_id"), 

2289 research_id, 

2290 username, 

2291 "".join(streaming_state.get("chunks", [])), 

2292 truncated=streaming_state.get("_truncated", False), 

2293 streaming_state=streaming_state, 

2294 ) 

2295 # Ensure the SUSPENDED status update + cleanup runs even when the 

2296 # exception was raised mid-stream. The in-callback termination paths 

2297 # set "_termination_handled"; only run here when they did NOT, so a 

2298 # single termination doesn't queue two SUSPENDED updates, emit two 

2299 # final socket messages, and (in test mode) sleep twice. 

2300 if not streaming_state.get("_termination_handled"): 

2301 try: 

2302 handle_termination(research_id, username) 

2303 except Exception: 

2304 logger.opt(exception=True).debug( 

2305 "handle_termination in except block failed" 

2306 ) 

2307 

2308 except Exception as e: 

2309 # Handle error 

2310 error_message = f"Research failed: {e!s}" 

2311 logger.exception(error_message) 

2312 

2313 try: 

2314 # Check for common Ollama error patterns in the exception and provide more user-friendly errors 

2315 user_friendly_error = str(e) 

2316 error_context = {} 

2317 

2318 if "Error type: ollama_unavailable" in user_friendly_error: 

2319 user_friendly_error = "Ollama AI service is unavailable. Please check that Ollama is running properly on your system." 

2320 error_context = { 

2321 "solution": "Start Ollama with 'ollama serve' or check if it's installed correctly." 

2322 } 

2323 elif "Error type: model_not_found" in user_friendly_error: 

2324 user_friendly_error = "Required Ollama model not found. Please pull the model first." 

2325 error_context = { 

2326 "solution": "Run 'ollama pull mistral' to download the required model." 

2327 } 

2328 elif "Error type: connection_error" in user_friendly_error: 

2329 user_friendly_error = "Connection error with LLM service. Please check that your AI service is running." 

2330 error_context = { 

2331 "solution": "Ensure Ollama or your API service is running and accessible." 

2332 } 

2333 elif "Error type: api_error" in user_friendly_error: 

2334 # Keep the original error message as it's already improved 

2335 error_context = { 

2336 "solution": "Check API configuration and credentials." 

2337 } 

2338 # OpenAI-compatible runtime tokens (#3878). The friendly message 

2339 # built by friendly_openai_compatible_error() already names the 

2340 # provider, base URL, and model -- keep it as-is. 

2341 elif "Error type: openai_connection_refused" in user_friendly_error: 2341 ↛ 2342line 2341 didn't jump to line 2342 because the condition on line 2341 was never true

2342 error_context = { 

2343 "solution": "Start your LLM server (LM Studio / vLLM / llama.cpp server) and verify the base URL in Settings -> LLM Providers." 

2344 } 

2345 elif "Error type: openai_timeout" in user_friendly_error: 2345 ↛ 2346line 2345 didn't jump to line 2346 because the condition on line 2345 was never true

2346 error_context = { 

2347 "solution": "The server is reachable but slow -- it may be loading a model. Retry, or increase the request timeout." 

2348 } 

2349 elif "Error type: openai_auth" in user_friendly_error: 2349 ↛ 2350line 2349 didn't jump to line 2350 because the condition on line 2349 was never true

2350 error_context = { 

2351 "solution": "Set or correct the API key for this provider in Settings -> LLM Providers. Local servers usually accept any non-empty key." 

2352 } 

2353 elif "Error type: openai_permission_denied" in user_friendly_error: 2353 ↛ 2354line 2353 didn't jump to line 2354 because the condition on line 2353 was never true

2354 error_context = { 

2355 "solution": "Your API key is valid but lacks access to this model. Pick a model your account/server is permitted to use." 

2356 } 

2357 elif "Error type: openai_model_not_found" in user_friendly_error: 2357 ↛ 2358line 2357 didn't jump to line 2358 because the condition on line 2357 was never true

2358 error_context = { 

2359 "solution": "The model id is not loaded on this server. Pick a currently-loaded model in the provider's UI/config." 

2360 } 

2361 elif "Error type: openai_bad_request" in user_friendly_error: 2361 ↛ 2362line 2361 didn't jump to line 2362 because the condition on line 2361 was never true

2362 error_context = { 

2363 "solution": "The server rejected the request. Check the model id and any provider-specific parameters." 

2364 } 

2365 elif "Error type: openai_unknown" in user_friendly_error: 2365 ↛ 2366line 2365 didn't jump to line 2366 because the condition on line 2365 was never true

2366 error_context = { 

2367 "solution": "Check the provider's logs for the full error and verify the base URL / model id." 

2368 } 

2369 elif "Error type: openai_rate_limit" in user_friendly_error: 2369 ↛ 2370line 2369 didn't jump to line 2370 because the condition on line 2369 was never true

2370 error_context = { 

2371 "solution": "The provider rate-limited the request. Wait a moment and retry, or enable LLM Rate Limiting in Settings." 

2372 } 

2373 

2374 # Generate enhanced error report for failed research 

2375 enhanced_report_content = None 

2376 try: 

2377 # Get partial results if they exist 

2378 partial_results = results if "results" in locals() else None 

2379 search_iterations = ( 

2380 results.get("iterations", 0) if partial_results else 0 

2381 ) 

2382 

2383 # Generate comprehensive error report 

2384 # ErrorReportGenerator does not use LLM (kept for compat) 

2385 error_generator = ErrorReportGenerator() 

2386 enhanced_report_content = error_generator.generate_error_report( 

2387 error_message=f"Research failed: {e!s}", 

2388 query=query, 

2389 partial_results=partial_results, 

2390 search_iterations=search_iterations, 

2391 research_id=research_id, 

2392 ) 

2393 

2394 logger.info( 

2395 "Generated enhanced error report for failed research (length: {})", 

2396 len(enhanced_report_content), 

2397 ) 

2398 

2399 # Save enhanced error report to encrypted database 

2400 try: 

2401 # username already available from function scope (line 281) 

2402 if username: 2402 ↛ 2424line 2402 didn't jump to line 2424 because the condition on line 2402 was always true

2403 from ...storage import get_report_storage 

2404 

2405 with get_user_db_session(username) as db_session: 

2406 storage = get_report_storage(session=db_session) 

2407 success = storage.save_report( 

2408 research_id=research_id, 

2409 content=enhanced_report_content, 

2410 metadata={"error_report": True}, 

2411 username=username, 

2412 ) 

2413 if success: 

2414 logger.info( 

2415 "Saved enhanced error report to encrypted database for research {}", 

2416 research_id, 

2417 ) 

2418 else: 

2419 logger.warning( 

2420 "Failed to save enhanced error report to database for research {}", 

2421 research_id, 

2422 ) 

2423 else: 

2424 logger.warning( 

2425 "Cannot save error report: username not available" 

2426 ) 

2427 

2428 except Exception as report_error: 

2429 logger.exception( 

2430 "Failed to save enhanced error report: {}", report_error 

2431 ) 

2432 

2433 except Exception as error_gen_error: 

2434 logger.exception( 

2435 "Failed to generate enhanced error report: {}", 

2436 error_gen_error, 

2437 ) 

2438 enhanced_report_content = None 

2439 

2440 # Get existing metadata from database first 

2441 existing_metadata = {} 

2442 try: 

2443 # username already available from function scope (line 281) 

2444 if username: 2444 ↛ 2457line 2444 didn't jump to line 2457 because the condition on line 2444 was always true

2445 with get_user_db_session(username) as db_session: 

2446 research = ( 

2447 db_session.query(ResearchHistory) 

2448 .filter_by(id=research_id) 

2449 .first() 

2450 ) 

2451 if research and research.research_meta: 

2452 existing_metadata = dict(research.research_meta) 

2453 except Exception: 

2454 logger.exception("Failed to get existing metadata") 

2455 

2456 # Update metadata with more context about the error while preserving existing values 

2457 metadata = existing_metadata 

2458 metadata.update({"phase": "error", "error": user_friendly_error}) 

2459 if error_context: 

2460 metadata.update(error_context) 

2461 if enhanced_report_content: 2461 ↛ 2465line 2461 didn't jump to line 2465 because the condition on line 2461 was always true

2462 metadata["has_enhanced_report"] = True 

2463 

2464 # If we still have an active research record, update its log 

2465 if is_research_active(research_id): 

2466 progress_callback(user_friendly_error, None, metadata) 

2467 

2468 # If termination was requested, mark as suspended instead of failed 

2469 status = ( 

2470 ResearchStatus.SUSPENDED 

2471 if is_termination_requested(research_id) 

2472 else ResearchStatus.FAILED 

2473 ) 

2474 message = ( 

2475 "Research was terminated by user" 

2476 if status == ResearchStatus.SUSPENDED 

2477 else user_friendly_error 

2478 ) 

2479 

2480 # Calculate duration up to termination point - using UTC consistently 

2481 now = datetime.now(UTC) 

2482 completed_at = now.isoformat() 

2483 

2484 # NOTE: Database updates from threads are handled by queue processor 

2485 # The queue_processor.queue_error_update() method is already being used below 

2486 # to safely update the database from the main thread 

2487 

2488 # Queue the error update to be processed in main thread 

2489 # Using the queue processor v2 system 

2490 from ..queue.processor_v2 import queue_processor 

2491 

2492 if username: 2492 ↛ 2506line 2492 didn't jump to line 2506 because the condition on line 2492 was always true

2493 queue_processor.queue_error_update( 

2494 username=username, 

2495 research_id=research_id, 

2496 status=status, 

2497 error_message=message, 

2498 metadata=metadata, 

2499 completed_at=completed_at, 

2500 report_path=None, 

2501 ) 

2502 logger.info( 

2503 f"Queued error update for research {research_id} with status '{status}'" 

2504 ) 

2505 else: 

2506 logger.error( 

2507 f"Cannot queue error update for research {research_id} - no username provided. " 

2508 f"Status: '{status}', Message: {message}" 

2509 ) 

2510 

2511 try: 

2512 SocketIOService().emit_to_subscribers( 

2513 "progress", 

2514 research_id, 

2515 {"status": status, "error": message}, 

2516 ) 

2517 except Exception: 

2518 logger.exception("Failed to emit error via socket") 

2519 

2520 # Add error message to chat session if applicable. 

2521 # Read chat_session_id from shared_research_context (the canonical 

2522 # source — kept consistent with the six other reads in this file). 

2523 chat_session_id = shared_research_context.get("chat_session_id") 

2524 if chat_session_id and username: 

2525 try: 

2526 from ...chat.service import ChatService 

2527 

2528 chat_service = ChatService(username) 

2529 # allow_archived=True: same multi-tab race rationale as 

2530 # the completion / stop-and-partial paths — if the user 

2531 # archived (or deleted) the session between research 

2532 # start and failure, the error message would otherwise 

2533 # be silently dropped by the active-only insert guard 

2534 # and the user sees nothing in the chat. 

2535 chat_service.add_message( 

2536 session_id=chat_session_id, 

2537 role="assistant", 

2538 content=f"Sorry, the research failed: {message}", 

2539 message_type="response", 

2540 allow_archived=True, 

2541 ) 

2542 except Exception: 

2543 # Promoted from debug → warning to match the success-path 

2544 # rationale: if this write fails the user never sees the 

2545 # error and operators get no signal at debug-off level. 

2546 logger.opt(exception=True).warning( 

2547 "Could not add error message to chat session" 

2548 ) 

2549 

2550 except Exception: 

2551 logger.exception("Error in error handler") 

2552 

2553 # Clean up resources. This is the error path, so report FAILED on 

2554 # the final socket message rather than a spurious "completed". 

2555 cleanup_research_resources( 

2556 research_id, 

2557 username, 

2558 user_password=user_password, 

2559 final_status=ResearchStatus.FAILED, 

2560 ) 

2561 

2562 finally: 

2563 # RESOURCE CLEANUP: Close search engine HTTP sessions. 

2564 # 

2565 # Search engines (created via get_search()) may hold HTTP connection 

2566 # pools. Currently only SemanticScholarSearchEngine creates a 

2567 # persistent SafeSession; other engines use stateless safe_get()/ 

2568 # safe_post() utility functions. However, BaseSearchEngine.close() 

2569 # is safe to call on any engine — it checks for a 'session' 

2570 # attribute and is fully idempotent (SemanticScholar sets 

2571 # self.session = None after close). 

2572 # 

2573 # Neither @thread_cleanup nor cleanup_research_resources() close 

2574 # the search engine — @thread_cleanup only handles database sessions 

2575 # and context cleanup, and cleanup_research_resources() only handles 

2576 # status updates, notifications, and tracking dict removal. 

2577 # 

2578 # Without this explicit close, search engine sessions rely on 

2579 # Python's non-deterministic garbage collection (__del__) for 

2580 # cleanup, which can cause file descriptor exhaustion under 

2581 # sustained load. 

2582 from ...utilities.resource_utils import safe_close 

2583 

2584 if "use_search" in locals(): 

2585 safe_close(use_search, "research search engine") 

2586 # Close search system (cascades to strategy thread pools). 

2587 # See AdvancedSearchSystem.close() for details. 

2588 if "system" in locals(): 

2589 safe_close(system, "research system") 

2590 # Close the LLM instance created for model/provider overrides. 

2591 # system.close() does NOT close the LLM passed to it via system.model, 

2592 # so we must close it explicitly here. 

2593 if "use_llm" in locals(): 

2594 safe_close(use_llm, "research LLM") 

2595 

2596 

def cleanup_research_resources(
    research_id,
    username=None,
    user_password=None,
    final_status=ResearchStatus.COMPLETED,
):
    """
    Clean up resources for a research that has ended, for whatever reason.

    Notifies the queue processor (when a username is available), removes the
    research from the active-research / termination-flag tracking, drops its
    progress-throttle entry, then sends a final ``progress`` socket message
    and removes the socket subscriptions for the research.

    Args:
        research_id: The ID of the research
        username: The username for database access (required for thread
            context). Without it the queue processor is not notified.
        user_password: Forwarded to
            ``queue_processor.notify_research_completed``.
        final_status: The terminal status to report on the final socket
            message. Callers that end a research for a reason other than
            normal completion MUST pass the real status (e.g. SUSPENDED on
            user termination, FAILED on error) so the final ``progress``
            event matches reality. Defaulting this to COMPLETED — and
            previously hard-coding it — caused the stop/error paths to emit
            a spurious "completed" signal to subscribers.
    """
    from ..routes.globals import cleanup_research

    logger.info("Cleaning up resources for research {}", research_id)

    # For testing: Add a small delay to simulate research taking time.
    # This helps test concurrent research limits. Uses the module-level
    # ``time`` import (a local re-import is unnecessary).
    from ...settings.env_registry import is_test_mode

    if is_test_mode():
        logger.info(
            f"Test mode: Adding 5 second delay before cleanup for {research_id}"
        )
        time.sleep(5)

    # Notify queue processor that research completed.
    # This uses processor_v2 which handles database updates in the main thread,
    # avoiding the Flask request context issues that occur in background threads.
    from ..queue.processor_v2 import queue_processor

    if username:
        try:
            queue_processor.notify_research_completed(
                username, research_id, user_password=user_password
            )
            logger.info(
                f"Notified queue processor of completion for research {research_id} (user: {username})"
            )
        except Exception:
            # Best-effort: if this raised, cleanup_research() below would be
            # skipped and the research would remain in the active-research
            # tracking (and keep its termination flag / socket subscriptions).
            logger.exception(
                f"Failed to notify queue processor of completion for research {research_id}"
            )
    else:
        logger.warning(
            f"Cannot notify completion for research {research_id} - no username provided"
        )

    # Remove from active research and termination flags atomically
    cleanup_research(research_id)

    # Clean up throttle state for this research
    with _last_emit_lock:
        _last_emit_times.pop(research_id, None)

    # Send a final message to subscribers
    try:
        # The message reflects the caller-supplied terminal status. Every
        # non-success terminal status (including ERROR) reports 0% progress
        # so subscribers never see a false "completed" for a research that
        # did not finish.
        if final_status in (
            ResearchStatus.SUSPENDED,
            ResearchStatus.FAILED,
            ResearchStatus.ERROR,
        ):
            final_message = {
                "status": final_status,
                "message": f"Research was {final_status}",
                "progress": 0,  # For suspended research, show 0% not 100%
            }
        else:
            final_message = {
                "status": ResearchStatus.COMPLETED,
                "message": "Research process has ended and resources have been cleaned up",
                "progress": 100,
            }

        logger.info(
            "Sending final {} socket message for research {}",
            final_status,
            research_id,
        )

        SocketIOService().emit_to_subscribers(
            "progress", research_id, final_message
        )

        # Clean up socket subscriptions for this research
        SocketIOService().remove_subscriptions_for_research(research_id)

    except Exception:
        logger.exception("Error sending final cleanup message")

2702 

2703 

def handle_termination(research_id, username=None):
    """
    Handle the termination of a research process.

    Queues a SUSPENDED status update (with a ``terminated_at`` timestamp in
    the metadata) through the queue processor, then releases the research's
    resources via ``cleanup_research_resources`` with SUSPENDED as the final
    socket status.

    Args:
        research_id: The ID of the research
        username: The username for database access (required for thread
            context). When it is missing the status update is not queued;
            an error is logged instead, as the research error path does.
    """
    logger.info(f"Handling termination for research {research_id}")

    # Queue the status update to be processed in the main thread.
    # This avoids Flask request context errors in background threads.
    if username:
        try:
            from ..queue.processor_v2 import queue_processor

            completed_at = datetime.now(UTC).isoformat()

            # Queue the suspension update
            queue_processor.queue_error_update(
                username=username,
                research_id=research_id,
                status=ResearchStatus.SUSPENDED,
                error_message="Research was terminated by user",
                metadata={"terminated_at": completed_at},
                completed_at=completed_at,
                report_path=None,
            )

            logger.info(f"Queued suspension update for research {research_id}")
        except Exception:
            logger.exception(
                f"Error queueing termination update for research {research_id}"
            )
    else:
        # Never queue a per-user DB update without a user to attribute it to.
        logger.error(
            f"Cannot queue termination update for research {research_id} - no username provided"
        )

    # Clean up resources regardless of whether the update was queued.
    # Pass SUSPENDED so the final socket message reports the real terminal
    # status — not a spurious "completed" — to chat/progress subscribers.
    cleanup_research_resources(
        research_id, username, final_status=ResearchStatus.SUSPENDED
    )

2745 

2746 

def _suspend_inactive_research(research_id, username):
    """
    Mark a research that is not tracked as active as SUSPENDED in the DB.

    Returns:
        bool: False if the row is missing or the database access fails,
        True if the row was suspended or was already in a terminal state.
    """
    try:
        with get_user_db_session(username) as session:
            record = (
                session.query(ResearchHistory)
                .filter_by(id=research_id)
                .first()
            )
            if not record:
                logger.info(f"Research {research_id} not found in database")
                return False

            finished_states = (
                ResearchStatus.COMPLETED,
                ResearchStatus.SUSPENDED,
                ResearchStatus.FAILED,
                ResearchStatus.ERROR,
            )
            if record.status in finished_states:
                logger.info(
                    f"Research {research_id} already in terminal state: {record.status}"
                )
                # Already stopped — nothing to cancel, so report success.
                return True

            # Row exists but the research is not running: update it directly.
            record.status = ResearchStatus.SUSPENDED
            session.commit()
            logger.info(f"Successfully suspended research {research_id}")
    except Exception:
        logger.exception(
            f"Error accessing database for research {research_id}"
        )
        return False
    return True


def cancel_research(research_id, username):
    """
    Cancel/terminate a research process using ORM.

    The termination flag is always set first. A research still tracked as
    active is handed to ``handle_termination``; otherwise the stored
    ``ResearchHistory`` row is suspended directly unless it has already
    reached a terminal state.

    Args:
        research_id: The ID of the research to cancel
        username: The username of the user cancelling the research

    Returns:
        bool: True if the research was cancelled (or was already stopped),
        False if it was not found or an error occurred
    """
    try:
        from ..routes.globals import is_research_active, set_termination_flag

        set_termination_flag(research_id)

        if not is_research_active(research_id):
            return _suspend_inactive_research(research_id, username)

        # Still running: handle_termination queues the DB update and
        # releases the tracking resources.
        handle_termination(research_id, username)
        return True
    except Exception:
        logger.exception(
            f"Unexpected error in cancel_research for {research_id}"
        )
        return False