Coverage for src/local_deep_research/chat/routes.py: 87%
351 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1"""
2Flask routes for chat API.
4Provides endpoints for:
5- Chat page rendering
6- Session management (create, list, get, archive, delete)
7- Message management (send, list)
8- Research triggering from chat
9"""
11import unicodedata
12import uuid
13from datetime import datetime, timedelta, UTC
14from flask import Blueprint, request, jsonify, session
15from loguru import logger
16from sqlalchemy import update as sa_update
17from sqlalchemy.exc import IntegrityError, SQLAlchemyError
19from .service import (
20 ArchiveBlockedError,
21 ChatService,
22 ChatSessionNotFound,
23 DB_EXCEPTIONS,
24)
25from .context import ChatContextManager
26from ..constants import ResearchStatus
27from ..database.models import (
28 ChatMessage,
29 ChatSession,
30 ChatSessionStatus,
31 ResearchHistory,
32 UserActiveResearch,
33)
34from ..database.session_context import get_user_db_session
35from ..exceptions import DuplicateResearchError, SystemAtCapacityError
36from ..security.decorators import require_json_body
37from ..security.rate_limiter import _get_api_user_key, limiter
38from ..settings.manager import SettingsManager
39from ..web.auth.decorators import login_required
40from ..web.utils.templates import render_template_with_defaults
41from ..web.auth.password_utils import get_user_password
42from ..web.routes.globals import (
43 cleanup_research,
44 is_research_thread_alive,
45)
46from ..web.services.research_service import (
47 run_research_process,
48 start_research_process,
49)
# Blueprint on which every chat page and chat API route below is registered.
chat_bp = Blueprint("chat", __name__)

# Status values a client may set via the session update endpoint. Derived
# from the enum members so a misspelt literal can never pass validation.
VALID_UPDATE_STATUSES = {
    member.value
    for member in (ChatSessionStatus.ACTIVE, ChatSessionStatus.ARCHIVED)
}
# Status values accepted by the session list filter: every enum value plus
# the literal "all" sentinel, which widens the filter to every status
# without bypassing the whitelist.
VALID_LIST_STATUSES = {member.value for member in ChatSessionStatus} | {"all"}

# Input length limits (characters).
MAX_QUERY_LENGTH = 10_000
MAX_TITLE_LENGTH = 500
MAX_MESSAGE_LENGTH = 10_000
# Hard cap on `offset` to prevent server-side DoS: get_session_messages
# fetches `limit + offset` rows from BOTH chat_messages and
# chat_progress_steps tables, so unbounded offset means unbounded SQL LIMIT.
# With cursor-based pagination (`before_created_at`) as the recommended path,
# offset above a few pages is not a normal access pattern.
MAX_OFFSET = 1_000

# Exception types the HTTP route handlers convert into a JSON 500 response.
# Wider than service.DB_EXCEPTIONS: it also covers the attribute/type errors
# that can escape request-shape coercion code. DB_EXCEPTIONS itself is
# single-sourced from chat.service so the two never drift.
ROUTE_EXCEPTIONS = (
    ValueError,
    RuntimeError,
    SQLAlchemyError,
    AttributeError,
    TypeError,
)
def _load_settings(username):
    """Return all settings stored for *username*, bypassing the cache.

    Opens the user's database session, asks ``SettingsManager`` for every
    setting with ``bypass_cache=True`` and returns the result.

    The cache bypass matches the call pattern in
    ``research_routes.start_research``: a setting changed through the UI
    moments before a chat message is sent must apply to the very next
    research instead of being served from a stale cache.
    """
    with get_user_db_session(username) as user_db:
        manager = SettingsManager(db_session=user_db)
        all_settings = manager.get_all_settings(bypass_cache=True)
    return all_settings
101def _parse_int_param(
102 value: str | None,
103 default: int,
104 min_val: int = 0,
105 max_val: int | None = None,
106) -> int:
107 """Safely parse an integer parameter with bounds checking."""
108 try:
109 result = int(value) if value is not None else default
110 if result < min_val:
111 return min_val
112 if max_val is not None and result > max_val:
113 return max_val
114 return result
115 except (ValueError, TypeError):
116 return default
119_INVISIBLE_UNICODE_CATEGORIES = {"Cf", "Zl", "Zp"}
122def _validate_title(title) -> tuple[str, int] | None:
123 """Return (error_message, http_status) when *title* is invalid, else None.
125 A title is invalid when it is not a non-empty string or exceeds
126 ``MAX_TITLE_LENGTH``. Callers that allow ``None`` (e.g. create_session
127 where omitting the title is fine) should short-circuit on ``None``
128 before calling this helper.
130 Strips Unicode format / line-separator characters (``Cf``/``Zl``/``Zp``,
131 including zero-width spaces U+200B-U+200D and BOM U+FEFF) before the
132 emptiness check so an "invisible" title like 500 zero-width chars is
133 rejected instead of saving a session that looks blank in the UI.
134 """
135 if not isinstance(title, str): 135 ↛ 136line 135 didn't jump to line 136 because the condition on line 135 was never true
136 return ("Title cannot be empty", 400)
137 visible = "".join(
138 c
139 for c in title
140 if unicodedata.category(c) not in _INVISIBLE_UNICODE_CATEGORIES
141 )
142 if not visible.strip():
143 return ("Title cannot be empty", 400)
144 if len(title) > MAX_TITLE_LENGTH:
145 return (
146 f"Title too long (max {MAX_TITLE_LENGTH} characters)",
147 400,
148 )
149 return None
def _cleanup_chat_send_rows(
    username, research_id, message_id, session_id, reason: str
) -> None:
    """Roll back the rows ``send_message`` committed for a rejected spawn.

    Called when ``start_research_process`` refuses to start the research,
    on both the ``DuplicateResearchError`` (409) and
    ``SystemAtCapacityError`` (429) paths. In a single transaction this
    deletes the research-history row, the user chat message and the
    per-user active-research tracking row, then decrements the session's
    ``message_count``.

    Args:
        username: Owner of the per-user database.
        research_id: Id of the research row to remove.
        message_id: Id of the user chat message to remove.
        session_id: Chat session whose ``message_count`` is decremented.
        reason: Short label for the rejection, used only in the log line.

    Database errors listed in ``DB_EXCEPTIONS`` are not re-raised; they are
    logged with a traceback so orphan rows and an inflated message_count
    stay visible to ops.
    """
    try:
        with get_user_db_session(username) as db:
            db.query(ResearchHistory).filter_by(id=research_id).delete()
            db.query(ChatMessage).filter_by(id=message_id).delete()
            # The spawn never started, so no live thread owns the per-user
            # cap tracking row. research_id is a fresh UUID, so this filter
            # only ever matches the row inserted for this request.
            tracking_rows = db.query(UserActiveResearch).filter_by(
                username=username, research_id=research_id
            )
            tracking_rows.delete()
            decrement_count = (
                sa_update(ChatSession)
                .where(ChatSession.id == session_id)
                .values(message_count=ChatSession.message_count - 1)
            )
            db.execute(decrement_count)
            db.commit()
    except DB_EXCEPTIONS:
        logger.exception(
            f"Cleanup after {reason} chat-send rejection FAILED "
            f"for research {research_id[:8]}... in chat "
            f"{session_id[:8]}...; orphan rows + inflated "
            f"message_count may persist until next sweep."
        )
187# ============================================================================
188# Page Routes
189# ============================================================================
@chat_bp.route("/chat/")
@chat_bp.route("/chat/<session_id>")
@login_required
def chat_page(session_id=None):
    """Render the chat page template.

    Args:
        session_id: Optional id of an existing session taken from the URL;
            passed through to the template unchanged (``None`` for the bare
            ``/chat/`` route).
    """
    template_context = {"session_id": session_id}
    return render_template_with_defaults("pages/chat.html", **template_context)
207# ============================================================================
208# Session API Routes
209# ============================================================================
@chat_bp.route("/api/chat/sessions", methods=["POST"])
@login_required
# Rate limit is keyed per user (the default is per IP). Otherwise users
# behind a shared NAT/proxy share one bucket and can DoS each other for
# legitimate chat use.
@limiter.limit("20 per minute", key_func=_get_api_user_key)
@require_json_body(
    error_format="success",
    error_message="Request body must be a JSON object",
)
def create_session():
    """Create a new chat session for the logged-in user.

    Request body (both keys optional):
        {
            "initial_query": "optional initial question",
            "title": "optional custom title"
        }

    Returns:
        200 ``{"success": true, "session_id": "uuid", "session": {...}}``
        400 when ``initial_query`` is not a string or is longer than
            ``MAX_QUERY_LENGTH``, or when a supplied ``title`` fails
            ``_validate_title``.
        500 when the new session cannot be read back, or when any of
            ``ROUTE_EXCEPTIONS`` is raised.
    """

    def _failure(message, status_code):
        # Every error response from this route shares this JSON envelope.
        return jsonify({"success": False, "error": message}), status_code

    try:
        username = session.get("username")

        # @require_json_body has already guaranteed a dict body, and Flask
        # caches the parse, so this is not a second parsing pass.
        payload = request.get_json(silent=True)
        initial_query = payload.get("initial_query")
        title = payload.get("title")

        # A non-string initial_query is refused up front so len() and later
        # string operations cannot raise TypeError and surface as a 500.
        if not (initial_query is None or isinstance(initial_query, str)):
            return _failure("initial_query must be a string", 400)

        if initial_query and len(initial_query) > MAX_QUERY_LENGTH:
            return _failure(
                f"Initial query too long (max {MAX_QUERY_LENGTH} characters)",
                400,
            )

        # An omitted title is fine; only a supplied one is validated.
        if title is not None:
            title_error = _validate_title(title)
            if title_error is not None:
                message, status_code = title_error
                return _failure(message, status_code)

        settings_snapshot = _load_settings(username)
        service = ChatService(username)
        session_id = service.create_session(
            initial_query=initial_query,
            title=title,
            settings_snapshot=settings_snapshot,
        )

        # Read the session back so the response carries its stored form.
        try:
            session_data = service.get_session(session_id)
        except ChatSessionNotFound:
            # The session was created within this request, so "not found"
            # here means a delete race or a storage failure. The session id
            # is deliberately left out of the message (it is flagged as
            # sensitive by check-sensitive-logging); the traceback carries
            # enough context to diagnose.
            logger.exception("Just-created chat session missing on read-back")
            return _failure("Failed to load created session", 500)

        response_body = {
            "success": True,
            "session_id": session_id,
            "session": session_data,
        }
        return jsonify(response_body)

    except ROUTE_EXCEPTIONS:
        logger.exception("Error creating chat session")
        return _failure("Failed to create chat session", 500)
@chat_bp.route(
    "/api/chat/sessions/<session_id>/generate-title", methods=["POST"]
)
@login_required
# Keyed per user, with a lower limit than create_session: each call is a real
# LLM round-trip on a server-paid endpoint, whereas create_session is
# zero-LLM DB work. Without per-user keying, shared-IP users share a bucket.
@limiter.limit("10 per minute", key_func=_get_api_user_key)
@require_json_body(
    error_format="success",
    error_message="Request body must be a JSON object",
)
def generate_session_title(session_id):
    """Regenerate a session's title with the configured LLM.

    The frontend calls this asynchronously (fire-and-forget) right after
    creating a session, so the synchronous POST /api/chat/sessions response
    is not held up by an LLM round-trip.

    Request body: ``{"query": "the initial research query"}``

    Returns:
        200 ``{"success": true, "title": "..."}`` when a title was produced.
        200 ``{"success": false, "title": null}`` when no title came back
            (LLM disabled or the call failed); the existing title is kept.
        400 when ``query`` is missing/empty, not a string, or longer than
            ``MAX_QUERY_LENGTH``.
        404 when the session does not exist.
        500 when any of ``ROUTE_EXCEPTIONS`` is raised.
    """
    try:
        username = session.get("username")
        # @require_json_body has already guaranteed a dict body.
        payload = request.get_json(silent=True)
        query = payload.get("query")

        if not query:
            missing = {"success": False, "error": "query is required"}
            return jsonify(missing), 400

        query_is_valid = (
            isinstance(query, str) and len(query) <= MAX_QUERY_LENGTH
        )
        if not query_is_valid:
            return jsonify(
                {
                    "success": False,
                    "error": f"query must be a string up to {MAX_QUERY_LENGTH} chars",
                }
            ), 400

        service = ChatService(username)
        # Existence check only; the returned session data is not used.
        try:
            service.get_session(session_id)
        except ChatSessionNotFound:
            not_found = {"success": False, "error": "Session not found"}
            return jsonify(not_found), 404

        settings_snapshot = _load_settings(username)
        new_title = service.regenerate_title_with_llm(
            session_id, query, settings_snapshot
        )
        if new_title:
            return jsonify({"success": True, "title": new_title})

        # LLM disabled, or LLM call failed — keep the existing fallback title.
        return jsonify({"success": False, "title": None}), 200

    except ROUTE_EXCEPTIONS:
        logger.exception("Error regenerating chat title")
        failure = {"success": False, "error": "Failed to regenerate title"}
        return jsonify(failure), 500
@chat_bp.route("/api/chat/sessions", methods=["GET"])
@login_required
def list_sessions():
    """List the current user's chat sessions.

    Query params:
        - status: one of ``VALID_LIST_STATUSES`` (the session status values
          or ``all``); default ``active``. An unrecognised value silently
          falls back to ``active``.
        - limit: max sessions to return (default 20, clamped to 1..100)
        - offset: pagination offset (default 0, clamped to 0..MAX_OFFSET)

    Returns:
        200 ``{"success": true, "sessions": [ session data list ]}``
        500 when any of ``ROUTE_EXCEPTIONS`` is raised.
    """
    try:
        username = session.get("username")
        query_args = request.args

        active = ChatSessionStatus.ACTIVE.value
        requested_status = query_args.get("status", active)
        # Anything outside the whitelist is treated as the default filter.
        status = (
            requested_status
            if requested_status in VALID_LIST_STATUSES
            else active
        )

        limit = _parse_int_param(
            query_args.get("limit"), 20, min_val=1, max_val=100
        )
        offset = _parse_int_param(
            query_args.get("offset"), 0, min_val=0, max_val=MAX_OFFSET
        )

        sessions = ChatService(username).list_sessions(
            status=status, limit=limit, offset=offset
        )
        return jsonify({"success": True, "sessions": sessions})

    except ROUTE_EXCEPTIONS:
        logger.exception("Error listing chat sessions")
        failure = {
            "success": False,
            "error": "Failed to list chat sessions",
        }
        return jsonify(failure), 500
@chat_bp.route("/api/chat/sessions/<session_id>", methods=["GET"])
@login_required
def get_session(session_id):
    """Return one chat session belonging to the current user.

    Returns:
        200 ``{"success": true, "session": { session data }}``
        404 when the service raises ``ChatSessionNotFound``.
        500 when any of ``ROUTE_EXCEPTIONS`` is raised.
    """
    try:
        service = ChatService(session.get("username"))
        try:
            session_data = service.get_session(session_id)
        except ChatSessionNotFound:
            not_found = {
                "success": False,
                "error": "Session not found",
            }
            return jsonify(not_found), 404
        else:
            return jsonify({"success": True, "session": session_data})

    except ROUTE_EXCEPTIONS:
        logger.exception("Error getting chat session")
        failure = {
            "success": False,
            "error": "Failed to get chat session",
        }
        return jsonify(failure), 500
@chat_bp.route("/api/chat/sessions/<session_id>", methods=["PATCH"])
@login_required
# Per-user keying, like the other state-changing chat routes. Without a
# per-route limit, rename/archive was bounded only by the global limiter,
# leaving an uneven abuse surface across the session API.
@limiter.limit("30 per minute", key_func=_get_api_user_key)
@require_json_body(
    error_format="success",
    error_message="Request body must be a JSON object",
)
def update_session(session_id):
    """Rename a chat session and/or change its status.

    Deletion is not handled here; only the statuses in
    ``VALID_UPDATE_STATUSES`` (active, archived) are accepted.

    Request body (at least one key required):
        {
            "title": "new title",   // optional
            "status": "archived"    // optional: active, archived
        }

    Every supplied field is validated before anything is written, so a
    request with a valid title and an invalid status changes nothing.

    Returns:
        200 ``{"success": true, "session": { session data }}``
        400 when neither field is present, the title fails
            ``_validate_title``, or the status is not an allowed string.
        404 when the session does not exist (before or after the update).
        409 when archiving is refused with ``ArchiveBlockedError``.
        500 when a service update reports failure, or any of
            ``ROUTE_EXCEPTIONS`` is raised.
    """
    try:
        username = session.get("username")
        # @require_json_body has already guaranteed a dict body.
        data = request.get_json(silent=True)

        has_title = "title" in data
        has_status = "status" in data

        # Require at least one valid field
        if not (has_title or has_status):
            return jsonify(
                {
                    "success": False,
                    "error": "Request must include at least one of: title, status",
                }
            ), 400

        service = ChatService(username)

        try:
            service.get_session(session_id)
        except ChatSessionNotFound:
            return jsonify(
                {"success": False, "error": "Session not found"}
            ), 404

        title = data.get("title")
        new_status = data.get("status")

        # ---- Validate everything first: no write happens before this ----
        if has_title:
            err = _validate_title(title)
            if err is not None:
                msg, status = err
                return jsonify({"success": False, "error": msg}), status

        # The isinstance guard matters: a JSON list/object is unhashable, so
        # a bare set-membership test would raise TypeError and be reported
        # as a 500 by the handler below instead of a 400.
        if has_status and not (
            isinstance(new_status, str)
            and new_status in VALID_UPDATE_STATUSES
        ):
            return jsonify(
                {"success": False, "error": "Invalid status value"}
            ), 400

        # ---- Apply the validated changes ----
        ops_ok = True

        if has_title:
            ops_ok = service.update_session_title(session_id, title) and ops_ok

        if has_status:
            if new_status == ChatSessionStatus.ACTIVE.value:
                ops_ok = service.reactivate_session(session_id) and ops_ok
            elif new_status == ChatSessionStatus.ARCHIVED.value:
                try:
                    ops_ok = service.archive_session(session_id) and ops_ok
                except ArchiveBlockedError:
                    # Symmetric with send-to-archived (also 409): the
                    # client should stop the research or wait for it to
                    # finish before archiving the session.
                    # Hard-coded message — never echo str(exc) here so a
                    # future ArchiveBlockedError raise with interpolated
                    # data can't leak to the response (information
                    # exposure through an exception, CWE-209).
                    return jsonify(
                        {
                            "success": False,
                            "error": "Cannot archive: research in_progress. Stop it first.",
                        }
                    ), 409

        try:
            session_data = service.get_session(session_id)
        except ChatSessionNotFound:
            # Session was deleted by a concurrent request between the
            # update above and this read-back. Treat as 404 rather than
            # returning a partial success with null data.
            return jsonify(
                {"success": False, "error": "Session not found"}
            ), 404

        if not ops_ok:
            # The read-back above succeeded, so the session still exists, yet
            # an update reported failure — a DB write error was swallowed into
            # a False return. Surface it instead of reporting success.
            logger.error(
                f"Chat session update failed at DB layer for "
                f"{session_id[:8]}..."
            )
            return jsonify(
                {"success": False, "error": "Failed to update session"}
            ), 500

        return jsonify(
            {
                "success": True,
                "session": session_data,
            }
        )

    except ROUTE_EXCEPTIONS:
        logger.exception("Error updating chat session")
        return jsonify(
            {
                "success": False,
                "error": "Failed to update chat session",
            }
        ), 500
@chat_bp.route("/api/chat/sessions/<session_id>", methods=["DELETE"])
@login_required
# Per-user keying, like the other state-changing chat routes. Caps bulk
# delete attempts that the global limiter alone left under-constrained.
@limiter.limit("30 per minute", key_func=_get_api_user_key)
def delete_session(session_id):
    """Permanently delete one of the current user's chat sessions.

    Returns:
        200 ``{"success": true}`` when the service reports the delete.
        404 when the service returns a falsy result.
        500 when any of ``ROUTE_EXCEPTIONS`` is raised.
    """
    try:
        username = session.get("username")
        deleted = ChatService(username).delete_session(session_id)
        if deleted:
            return jsonify({"success": True})

        not_found = {
            "success": False,
            "error": "Session not found",
        }
        return jsonify(not_found), 404

    except ROUTE_EXCEPTIONS:
        logger.exception("Error deleting chat session")
        failure = {
            "success": False,
            "error": "Failed to delete chat session",
        }
        return jsonify(failure), 500
632# ============================================================================
633# Message API Routes
634# ============================================================================
@chat_bp.route("/api/chat/sessions/<session_id>/messages", methods=["GET"])
@login_required
def get_messages(session_id):
    """Return one page of messages for a chat session.

    Query params:
        - limit: max messages to return (default: 50, clamped to 1..100)
        - offset: pagination offset into the DESC slice (default: 0,
          clamped to 0..MAX_OFFSET)
        - before_created_at: ISO timestamp cursor — return only entries
          strictly older than this. Use the oldest currently-displayed
          ``created_at`` to implement "load older messages".
        - before_id: optional id of the oldest currently-displayed row;
          when paired with `before_created_at` the cursor becomes
          composite, preventing same-millisecond rows at the page boundary
          from being silently dropped.

    Returns:
        200 {
            "success": true,
            "messages": [ message data list, ASC by created_at ],
            "has_more": bool,
            "in_progress_research_id": str | null
        }
        404 when the session does not exist.
        500 when any of ``ROUTE_EXCEPTIONS`` is raised.
    """
    try:
        username = session.get("username")
        query_args = request.args

        limit = _parse_int_param(
            query_args.get("limit"), 50, min_val=1, max_val=100
        )
        offset = _parse_int_param(
            query_args.get("offset"), 0, min_val=0, max_val=MAX_OFFSET
        )
        # Empty-string cursors are treated the same as absent ones.
        before_created_at = query_args.get("before_created_at") or None
        before_id = query_args.get("before_id") or None

        service = ChatService(username)

        # Existence check only; the returned session data is not used.
        try:
            service.get_session(session_id)
        except ChatSessionNotFound:
            not_found = {"success": False, "error": "Session not found"}
            return jsonify(not_found), 404

        # Ask for one row more than requested so the client learns whether
        # older entries exist without a second round-trip.
        page = service.get_session_messages(
            session_id,
            limit=limit + 1,
            offset=offset,
            before_created_at=before_created_at,
            before_id=before_id,
        )
        has_more = len(page) > limit
        if has_more:
            # Keep only the last `limit` entries of the over-fetched page.
            messages = page[-limit:]
        else:
            messages = page

        # The client (chat.js loadSession) restores the live "thinking"
        # indicator from this field instead of inferring in-flight state
        # from message metadata. O(1) via the partial-unique index
        # ux_research_history_chat_session_in_progress.
        in_progress_research_id = service.get_in_progress_research_id(
            session_id
        )

        response_body = {
            "success": True,
            "messages": messages,
            "has_more": has_more,
            "in_progress_research_id": in_progress_research_id,
        }
        return jsonify(response_body)

    except ROUTE_EXCEPTIONS:
        logger.exception("Error getting chat messages")
        failure = {
            "success": False,
            "error": "Failed to get chat messages",
        }
        return jsonify(failure), 500
722@chat_bp.route("/api/chat/sessions/<session_id>/messages", methods=["POST"])
723@login_required
724# Per-user keying (default is per-IP). send_message launches a full research
725# run, so this is the heaviest chat endpoint; shared-IP users sharing the
726# bucket would lock each other out.
727@limiter.limit("10 per minute", key_func=_get_api_user_key)
728@require_json_body(
729 error_format="success",
730 error_message="Request body must be a JSON object",
731)
732def send_message(session_id):
733 """
734 Send a message in a chat session.
736 This endpoint:
737 1. Adds the user message to the session
738 2. Decides if research is needed
739 3. If research needed, starts research process
740 4. Returns message ID and research ID (if applicable)
742 Request body:
743 {
744 "content": "user message",
745 "trigger_research": true // optional, default true
746 }
748 Note: Research mode is always "quick" in chat. This is intentional for v1.
750 Returns:
751 {
752 "success": true,
753 "message_id": "uuid",
754 "research_id": "uuid or null",
755 "research_mode": "quick/none"
756 }
757 """
758 try:
759 username = session.get("username")
760 # @require_json_body has already guaranteed a dict body and rejected
761 # non-JSON content types (which also hardens CSRF, matching the other
762 # state-changing chat POSTs). Flask caches the parse, so this is free.
763 data = request.get_json(silent=True)
765 if not data or not data.get("content"):
766 return jsonify(
767 {
768 "success": False,
769 "error": "Message content is required",
770 }
771 ), 400
773 # Reject non-string content before .strip() raises AttributeError
774 # → 500. Mirrors the isinstance guard in _validate_title.
775 if not isinstance(data["content"], str): 775 ↛ 776line 775 didn't jump to line 776 because the condition on line 775 was never true
776 return jsonify(
777 {
778 "success": False,
779 "error": "content must be a string",
780 }
781 ), 400
783 content = data["content"].strip()
785 # Reject whitespace-only content
786 if not content:
787 return jsonify(
788 {
789 "success": False,
790 "error": "Message content is required",
791 }
792 ), 400
794 if len(content) > MAX_MESSAGE_LENGTH:
795 return jsonify(
796 {
797 "success": False,
798 "error": f"Message too long (max {MAX_MESSAGE_LENGTH} characters)",
799 }
800 ), 400
802 raw = data.get("trigger_research", True)
803 trigger_research = raw if isinstance(raw, bool) else True
805 service = ChatService(username)
807 # Verify session exists (informational fast-fail; the
808 # UPDATE...RETURNING inside insert_message_in_db is the
809 # authoritative check that survives a delete-race).
810 try:
811 session_data = service.get_session(session_id)
812 except ChatSessionNotFound:
813 return jsonify(
814 {
815 "success": False,
816 "error": "Session not found",
817 }
818 ), 404
820 # Reject sends to non-active sessions. Archived/deleted sessions
821 # are intentionally read-only — users must reactivate before
822 # continuing the conversation.
823 if session_data.get("status") != ChatSessionStatus.ACTIVE.value: 823 ↛ 824line 823 didn't jump to line 824 because the condition on line 823 was never true
824 return jsonify(
825 {
826 "success": False,
827 "error": "This chat is archived. Reactivate it to continue.",
828 }
829 ), 409
831 # Pre-fetch existing messages for context decisions.
832 messages = service.get_session_messages(session_id, limit=20)
834 research_id = None
835 research_mode = "none"
836 message_id = None
837 settings_snapshot = None
838 research_context = None
840 if trigger_research:
841 # Always quick mode in chat (intentional v1 scope).
842 research_mode = "quick"
844 # ---- Concurrency guards (per-session + global per-user) ----
845 # Both guards run in one transaction so a stale-row reclaim
846 # is visible to the count check below it.
847 #
848 # Without the stale-thread sweep, a process crash leaves the
849 # ResearchHistory row at IN_PROGRESS forever — every later
850 # send_message returns 409 with no in-chat way to recover.
851 #
852 # Sweep AGE NOTE: a brand-new IN_PROGRESS row briefly exists
853 # before its worker registers in `_active_research` (between
854 # the DB commit below and the `start_research_process` call).
855 # During that window `is_research_thread_alive` would return
856 # False even though the thread spawn is in flight. Only reclaim
857 # rows older than `_STALE_RESEARCH_GRACE_SECONDS` (default 30s)
858 # so we don't kill our own freshly-inserted research from a
859 # racing concurrent send.
860 _STALE_RESEARCH_GRACE_SECONDS = 30
861 grace_cutoff_dt = datetime.now(UTC) - timedelta(
862 seconds=_STALE_RESEARCH_GRACE_SECONDS
863 )
864 # ResearchHistory.created_at is a String column (ISO-8601);
865 # UserActiveResearch.started_at is a UtcDateTime column.
866 grace_cutoff_iso = grace_cutoff_dt.isoformat()
867 with get_user_db_session(username) as cap_db:
868 # 1. Reclaim stale chat-session research rows whose
869 # worker thread is dead AND that are older than the
870 # spawn-grace cutoff.
871 stale_chat = (
872 cap_db.query(ResearchHistory)
873 .filter(
874 ResearchHistory.chat_session_id == session_id,
875 ResearchHistory.status == ResearchStatus.IN_PROGRESS,
876 ResearchHistory.created_at < grace_cutoff_iso,
877 )
878 .all()
879 )
880 reclaimed_chat = False
881 for row in stale_chat: 881 ↛ 882line 881 didn't jump to line 882 because the loop on line 881 never started
882 if not is_research_thread_alive(row.id):
883 logger.warning(
884 f"Reclaiming stale chat research {row.id[:8]}... "
885 f"(thread dead) on chat {session_id[:8]}..."
886 )
887 row.status = ResearchStatus.FAILED
888 cleanup_research(row.id)
889 reclaimed_chat = True
890 if reclaimed_chat: 890 ↛ 891line 890 didn't jump to line 891 because the condition on line 890 was never true
891 cap_db.commit()
893 # 2. Per-session guard: at most one live research per chat.
894 existing_session_research = (
895 cap_db.query(ResearchHistory)
896 .filter_by(
897 chat_session_id=session_id,
898 status=ResearchStatus.IN_PROGRESS,
899 )
900 .first()
901 )
902 if existing_session_research:
903 return jsonify(
904 {
905 "success": False,
906 "error": "Research already in progress on this chat session. Stop it before sending a new message.",
907 "active_research_id": existing_session_research.id,
908 }
909 ), 409
911 # 3. Reclaim stale UserActiveResearch rows so the count
912 # below isn't inflated by dead threads. Same grace
913 # window applied via started_at to avoid killing a
914 # sibling request's just-spawned thread. Shared with
915 # research_routes.start_research; chat passes a
916 # grace_cutoff_dt because chat send can race with
917 # its own concurrent sibling, research_routes can't.
918 from ..web.routes.globals import (
919 reclaim_stale_user_active_research,
920 )
922 if reclaim_stale_user_active_research( 922 ↛ 928line 922 didn't jump to line 928 because the condition on line 922 was never true
923 cap_db,
924 username,
925 grace_cutoff_dt=grace_cutoff_dt,
926 logger=logger,
927 ):
928 cap_db.commit()
930 # 4. Global per-user cap (mirrors
931 # research_routes.start_research). Without this,
932 # multiple chat tabs let a user bypass the cap.
933 active_count = (
934 cap_db.query(UserActiveResearch)
935 .filter_by(
936 username=username,
937 status=ResearchStatus.IN_PROGRESS,
938 )
939 .count()
940 )
941 max_concurrent = SettingsManager(db_session=cap_db).get_setting(
942 "app.max_concurrent_researches", 3
943 )
944 if active_count >= max_concurrent:
945 return jsonify(
946 {
947 "success": False,
948 "error": (
949 f"Concurrent research limit reached "
950 f"({active_count}/{max_concurrent}). "
951 "Wait for an existing research to finish."
952 ),
953 }
954 ), 429
955 # ---- end concurrency guards ----
957 # Settings + context (read-only — fine to do after the cap
958 # check, before the atomic write).
959 if trigger_research:
960 settings_snapshot = _load_settings(username)
961 context_manager = ChatContextManager(
962 session_id,
963 messages,
964 session_data.get("accumulated_context"),
965 settings_snapshot=settings_snapshot,
966 )
967 # Pass the new user message so prior conversation is condensed
968 # into a summary focused on this question (used as the follow-up
969 # prompt's "previous findings").
970 research_context = context_manager.build_research_context(
971 current_query=content
972 )
973 research_id = str(uuid.uuid4())
975 # Parse numeric search settings up-front. A malformed value
976 # (a non-numeric string in the user's settings DB) must
977 # return a clean 400 HERE — before the atomic write below
978 # commits the user message + IN_PROGRESS research row. If the
979 # int() cast ran after the commit (as it used to, down in the
980 # research-dispatch block), the ValueError would propagate as
981 # an unhandled 500 with those rows already committed, orphaning
982 # them and soft-bricking the session via the per-session 409
983 # guard.
984 try:
985 iterations = int(
986 settings_snapshot.get("search.iterations", {}).get(
987 "value", 3
988 )
989 )
990 questions = int(
991 settings_snapshot.get(
992 "search.questions_per_iteration", {}
993 ).get("value", 1)
994 )
995 except (ValueError, TypeError):
996 return jsonify(
997 {
998 "success": False,
999 "error": (
1000 "Invalid numeric value in search settings "
1001 "(iterations / questions_per_iteration)."
1002 ),
1003 }
1004 ), 400
1006 # ---- Atomic write: user message + research row in ONE transaction ----
1007 # Closes the orphan window: any IntegrityError or
1008 # concurrent-delete on the research insert rolls back the user
1009 # message too. The UPDATE...RETURNING inside
1010 # insert_message_in_db doubles as the authoritative
1011 # "session-still-exists" check; if the session was deleted
1012 # between the get_session call above and now, a ValueError
1013 # surfaces with "not found" and we map it to 404.
1014 try:
1015 with get_user_db_session(username) as db_session:
1016 message_id = service.insert_message_in_db(
1017 db_session,
1018 session_id=session_id,
1019 role="user",
1020 content=content,
1021 message_type="query" if len(messages) == 0 else "followup",
1022 )
1024 if trigger_research:
1025 created_at = datetime.now(UTC).isoformat()
1026 research_meta = {
1027 "submission": {
1028 "chat_session_id": session_id,
1029 "message_id": message_id,
1030 "research_mode": research_mode,
1031 },
1032 }
1033 research = ResearchHistory(
1034 id=research_id,
1035 query=content,
1036 mode=research_mode,
1037 status=ResearchStatus.IN_PROGRESS.value,
1038 created_at=created_at,
1039 progress_log=[{"time": created_at, "progress": 0}],
1040 research_meta=research_meta,
1041 chat_session_id=session_id,
1042 )
1043 db_session.add(research)
1045 # Count this research toward the per-user concurrent
1046 # cap. Mirrors research_routes.start_research — without a
1047 # UserActiveResearch row, chat research is invisible to
1048 # the cap (queried at the top of this handler AND by the
1049 # UI start path), letting multiple chat tabs bypass it.
1050 # Added in the SAME transaction as the research row so the
1051 # IntegrityError rollback below undoes both. Removed on
1052 # spawn failure by _cleanup_chat_send_rows, and on normal
1053 # completion by the cleanup_completed_research middleware
1054 # (keyed on is_research_active(research_id), which covers
1055 # chat and non-chat research alike).
1056 import threading
1058 db_session.add(
1059 UserActiveResearch(
1060 username=username,
1061 research_id=research_id,
1062 status=ResearchStatus.IN_PROGRESS,
1063 thread_id=str(threading.current_thread().ident),
1064 settings_snapshot=settings_snapshot,
1065 )
1066 )
1068 try:
1069 db_session.commit()
1070 except IntegrityError:
1071 # Two near-simultaneous POSTs both passed the
1072 # per-session guard; the partial unique index on
1073 # (chat_session_id) WHERE status='in_progress'
1074 # (migration 0010) catches the loser here.
1075 # Rolling back the transaction also undoes the
1076 # user-message INSERT and the message_count
1077 # increment — no orphan.
1078 db_session.rollback()
1079 logger.warning(
1080 f"Concurrent in-progress research race for chat {session_id[:8]}..."
1081 )
1082 return jsonify(
1083 {
1084 "success": False,
1085 "error": "Research already in progress on this chat session.",
1086 }
1087 ), 409
1088 except ValueError as exc:
1089 # `insert_message_in_db` raises ValueError("not found")
1090 # when the session row was deleted between the existence
1091 # check and the UPDATE...RETURNING. Map to 404 so the
1092 # client can distinguish a deleted session from a 500.
1093 if "not found" in str(exc).lower(): 1093 ↛ 1097line 1093 didn't jump to line 1097 because the condition on line 1093 was always true
1094 return jsonify(
1095 {"success": False, "error": "Session not found"}
1096 ), 404
1097 raise
1098 # ---- end atomic write ----
1100 if trigger_research:
1101 # Type narrowing: the variables below were initialized to None
1102 # at the top of the route and then assigned inside the matching
1103 # `if trigger_research:` block above. They are guaranteed
1104 # non-None here, but mypy doesn't connect the two branches —
1105 # so we narrow explicitly. Uses a real runtime check (not
1106 # ``assert``) so it survives ``python -O`` (bandit S101).
1107 if (
1108 settings_snapshot is None
1109 or research_context is None
1110 or research_id is None
1111 or message_id is None
1112 ): # pragma: no cover — unreachable invariant guard
1113 raise RuntimeError(
1114 "trigger_research path entered with unset state"
1115 )
1116 # Get user password for metrics
1117 pw = get_user_password(username)
1119 # Extract settings values with safe .get() defaults
1120 model_provider = settings_snapshot.get("llm.provider", {}).get(
1121 "value", ""
1122 )
1123 model = settings_snapshot.get("llm.model", {}).get("value", "")
1124 search_engine = settings_snapshot.get("search.tool", {}).get(
1125 "value", ""
1126 )
1127 custom_endpoint = settings_snapshot.get(
1128 "llm.openai_endpoint.url", {}
1129 ).get("value")
1130 # Defensive fallbacks kept in sync with default_settings.json:
1131 # `search.search_strategy` defaults to "langgraph-agent",
1132 # `search.iterations` to 3, `search.questions_per_iteration`
1133 # to 1. Out-of-sync fallbacks here silently produce a
1134 # *different* research product if the snapshot ever lacks
1135 # the key (e.g., fresh user DB before defaults are seeded).
1136 user_strategy = settings_snapshot.get(
1137 "search.search_strategy", {}
1138 ).get("value", "langgraph-agent")
1139 # `iterations` and `questions` were parsed + validated up-front
1140 # (before the atomic write) so a malformed setting returns a
1141 # clean 400 instead of orphaning a committed research row.
1143 # For follow-up messages, use the contextual follow-up strategy
1144 # which wraps the user's preferred strategy as a delegate
1145 if research_context.get("is_multi_turn"):
1146 strategy = "enhanced-contextual-followup"
1147 research_context["delegate_strategy"] = user_strategy
1148 else:
1149 strategy = user_strategy
1151 # Spawn the worker thread. ``DuplicateResearchError``
1152 # inherits from ``Exception`` (not RuntimeError) so it is
1153 # NOT in ROUTE_EXCEPTIONS and would otherwise escape to a
1154 # generic 500 — leaving the user message + research row
1155 # we just committed as orphans. Catch it here, undo our
1156 # side effects, and return 409.
1157 try:
1158 start_research_process(
1159 research_id,
1160 content,
1161 research_mode,
1162 run_research_process,
1163 username=username,
1164 user_password=pw,
1165 model_provider=model_provider,
1166 model=model,
1167 search_engine=search_engine,
1168 custom_endpoint=custom_endpoint,
1169 strategy=strategy,
1170 iterations=iterations,
1171 questions_per_iteration=questions,
1172 research_context=research_context,
1173 chat_session_id=session_id,
1174 chat_message_id=message_id,
1175 settings_snapshot=settings_snapshot,
1176 )
1177 except DuplicateResearchError:
1178 logger.warning(
1179 f"DuplicateResearchError on chat send_message "
1180 f"for {research_id[:8]}... (chat {session_id[:8]}...)"
1181 )
1182 # Per ``DuplicateResearchError`` docstring: do NOT
1183 # mutate UserActiveResearch or the existing
1184 # ResearchHistory row — those belong to the live
1185 # thread. Only undo the rows we created in our own
1186 # transaction above.
1187 _cleanup_chat_send_rows(
1188 username, research_id, message_id, session_id, "duplicate"
1189 )
1190 return jsonify(
1191 {
1192 "success": False,
1193 "error": "Research already in progress on this chat session.",
1194 }
1195 ), 409
1196 except SystemAtCapacityError:
1197 # System at concurrent-research capacity. Undo the rows we
1198 # committed above and return 429 so the client can retry.
1199 logger.warning(
1200 f"SystemAtCapacityError on chat send_message "
1201 f"for {research_id[:8]}... (chat {session_id[:8]}...)"
1202 )
1203 _cleanup_chat_send_rows(
1204 username, research_id, message_id, session_id, "capacity"
1205 )
1206 return jsonify(
1207 {
1208 "success": False,
1209 "error": "Server is at research capacity. Please retry shortly.",
1210 }
1211 ), 429
1213 logger.info(
1214 f"Started chat research {research_id[:8]}... for chat {session_id[:8]}..."
1215 )
1217 return jsonify(
1218 {
1219 "success": True,
1220 "message_id": message_id,
1221 "session_id": session_id,
1222 "research_id": research_id,
1223 "research_mode": research_mode,
1224 }
1225 )
1227 except ROUTE_EXCEPTIONS:
1228 logger.exception("Error sending chat message")
1229 return jsonify(
1230 {
1231 "success": False,
1232 "error": "Failed to send message",
1233 }
1234 ), 500