Coverage for src/local_deep_research/chat/routes.py: 87%

351 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1""" 

2Flask routes for chat API. 

3 

4Provides endpoints for: 

5- Chat page rendering 

6- Session management (create, list, get, archive, delete) 

7- Message management (send, list) 

8- Research triggering from chat 

9""" 

10 

11import unicodedata 

12import uuid 

13from datetime import datetime, timedelta, UTC 

14from flask import Blueprint, request, jsonify, session 

15from loguru import logger 

16from sqlalchemy import update as sa_update 

17from sqlalchemy.exc import IntegrityError, SQLAlchemyError 

18 

19from .service import ( 

20 ArchiveBlockedError, 

21 ChatService, 

22 ChatSessionNotFound, 

23 DB_EXCEPTIONS, 

24) 

25from .context import ChatContextManager 

26from ..constants import ResearchStatus 

27from ..database.models import ( 

28 ChatMessage, 

29 ChatSession, 

30 ChatSessionStatus, 

31 ResearchHistory, 

32 UserActiveResearch, 

33) 

34from ..database.session_context import get_user_db_session 

35from ..exceptions import DuplicateResearchError, SystemAtCapacityError 

36from ..security.decorators import require_json_body 

37from ..security.rate_limiter import _get_api_user_key, limiter 

38from ..settings.manager import SettingsManager 

39from ..web.auth.decorators import login_required 

40from ..web.utils.templates import render_template_with_defaults 

41from ..web.auth.password_utils import get_user_password 

42from ..web.routes.globals import ( 

43 cleanup_research, 

44 is_research_thread_alive, 

45) 

46from ..web.services.research_service import ( 

47 run_research_process, 

48 start_research_process, 

49) 

50 

# Create blueprint
chat_bp = Blueprint("chat", __name__)

# Valid status values for sessions (built from the enum so a typo never
# silently passes validation; the literal "all" sentinel widens the list
# filter to every status without bypassing the whitelist).
VALID_UPDATE_STATUSES = {
    ChatSessionStatus.ACTIVE.value,
    ChatSessionStatus.ARCHIVED.value,
}
VALID_LIST_STATUSES = {*(s.value for s in ChatSessionStatus), "all"}

# Input length limits
MAX_QUERY_LENGTH = 10_000
MAX_TITLE_LENGTH = 500
MAX_MESSAGE_LENGTH = 10_000
# Hard cap on `offset` to prevent server-side DoS: get_session_messages
# fetches `limit + offset` rows from BOTH chat_messages and chat_progress_steps
# tables, so unbounded offset means unbounded SQL LIMIT. With cursor-based
# pagination (`before_created_at`) as the recommended path, offset above a few
# pages is not a normal access pattern.
MAX_OFFSET = 1_000

# Wider exception tuple used by HTTP route handlers. It is derived FROM
# service.DB_EXCEPTIONS (single-sourced in chat.service) rather than
# re-listing its members by hand, so anything the service layer treats as a
# DB failure is guaranteed to be caught here too. On top of that come the
# value/runtime/attribute/type errors that can escape request-shape coercion
# code. dict.fromkeys() drops duplicates while keeping a stable order.
ROUTE_EXCEPTIONS = tuple(
    dict.fromkeys(
        (
            *DB_EXCEPTIONS,
            ValueError,
            RuntimeError,
            SQLAlchemyError,
            AttributeError,
            TypeError,
        )
    )
)

85 

86 

def _load_settings(username):
    """Return a fresh snapshot of every setting stored for *username*.

    The settings cache is skipped on purpose (``bypass_cache=True``, the
    same call pattern as ``research_routes.start_research``). A setting the
    user changed in the UI just before sending a chat message must apply to
    the very next research instead of being served stale.
    """
    with get_user_db_session(username) as db_session:
        manager = SettingsManager(db_session=db_session)
        snapshot = manager.get_all_settings(bypass_cache=True)
        return snapshot

99 

100 

101def _parse_int_param( 

102 value: str | None, 

103 default: int, 

104 min_val: int = 0, 

105 max_val: int | None = None, 

106) -> int: 

107 """Safely parse an integer parameter with bounds checking.""" 

108 try: 

109 result = int(value) if value is not None else default 

110 if result < min_val: 

111 return min_val 

112 if max_val is not None and result > max_val: 

113 return max_val 

114 return result 

115 except (ValueError, TypeError): 

116 return default 

117 

118 

119_INVISIBLE_UNICODE_CATEGORIES = {"Cf", "Zl", "Zp"} 

120 

121 

122def _validate_title(title) -> tuple[str, int] | None: 

123 """Return (error_message, http_status) when *title* is invalid, else None. 

124 

125 A title is invalid when it is not a non-empty string or exceeds 

126 ``MAX_TITLE_LENGTH``. Callers that allow ``None`` (e.g. create_session 

127 where omitting the title is fine) should short-circuit on ``None`` 

128 before calling this helper. 

129 

130 Strips Unicode format / line-separator characters (``Cf``/``Zl``/``Zp``, 

131 including zero-width spaces U+200B-U+200D and BOM U+FEFF) before the 

132 emptiness check so an "invisible" title like 500 zero-width chars is 

133 rejected instead of saving a session that looks blank in the UI. 

134 """ 

135 if not isinstance(title, str): 135 ↛ 136line 135 didn't jump to line 136 because the condition on line 135 was never true

136 return ("Title cannot be empty", 400) 

137 visible = "".join( 

138 c 

139 for c in title 

140 if unicodedata.category(c) not in _INVISIBLE_UNICODE_CATEGORIES 

141 ) 

142 if not visible.strip(): 

143 return ("Title cannot be empty", 400) 

144 if len(title) > MAX_TITLE_LENGTH: 

145 return ( 

146 f"Title too long (max {MAX_TITLE_LENGTH} characters)", 

147 400, 

148 ) 

149 return None 

150 

151 

def _cleanup_chat_send_rows(
    username, research_id, message_id, session_id, reason: str
) -> None:
    """Undo the user-message + research_history rows committed by send_message
    when ``start_research_process`` rejects the spawn.

    Used by both the ``DuplicateResearchError`` (409) and
    ``SystemAtCapacityError`` (429) paths. All deletes and the counter
    adjustment are committed together. A failure here is logged at ERROR
    level (not raised), so orphan rows and an inflated message_count stay
    visible to ops.

    Args:
        username: Owner whose per-user database is cleaned.
        research_id: Id of the just-inserted ResearchHistory row.
        message_id: Id of the just-inserted user ChatMessage row.
        session_id: Chat session whose ``message_count`` was incremented.
        reason: Short label for the rejection, used only in the log text.
    """
    try:
        with get_user_db_session(username) as cleanup_db:
            cleanup_db.query(ResearchHistory).filter_by(id=research_id).delete()
            # Query.delete() returns the number of rows matched; keep it so
            # the counter below is only adjusted for a row we really removed.
            deleted_messages = (
                cleanup_db.query(ChatMessage).filter_by(id=message_id).delete()
            )
            # Drop the per-user-cap tracking row too (the spawn never
            # started, so no live thread owns it). research_id is a fresh
            # UUID, so this only ever matches our own just-inserted row.
            cleanup_db.query(UserActiveResearch).filter_by(
                username=username, research_id=research_id
            ).delete()
            # Give back the message_count slot only when our message row was
            # actually deleted here, and never push the counter below zero.
            if deleted_messages:
                cleanup_db.execute(
                    sa_update(ChatSession)
                    .where(
                        ChatSession.id == session_id,
                        ChatSession.message_count > 0,
                    )
                    .values(message_count=ChatSession.message_count - 1)
                )
            cleanup_db.commit()
    except DB_EXCEPTIONS:
        logger.exception(
            f"Cleanup after {reason} chat-send rejection FAILED "
            f"for research {research_id[:8]}... in chat "
            f"{session_id[:8]}...; orphan rows + inflated "
            f"message_count may persist until next sweep."
        )

185 

186 

187# ============================================================================ 

188# Page Routes 

189# ============================================================================ 

190 

191 

@chat_bp.route("/chat/")
@chat_bp.route("/chat/<session_id>")
@login_required
def chat_page(session_id=None):
    """
    Serve the chat page template.

    Args:
        session_id: Id taken from ``/chat/<session_id>`` and forwarded to
            the template so it can load that existing session; ``None``
            when the bare ``/chat/`` URL is requested.
    """
    template_name = "pages/chat.html"
    return render_template_with_defaults(template_name, session_id=session_id)

205 

206 

207# ============================================================================ 

208# Session API Routes 

209# ============================================================================ 

210 

211 

@chat_bp.route("/api/chat/sessions", methods=["POST"])
@login_required
# Per-user keying (default is per-IP). Without this, users behind a shared
# NAT/proxy share one bucket and can DoS each other for legitimate chat use.
@limiter.limit("20 per minute", key_func=_get_api_user_key)
@require_json_body(
    error_format="success",
    error_message="Request body must be a JSON object",
)
def create_session():
    """
    Create a new chat session for the logged-in user.

    Request body (both fields optional):
        {
            "initial_query": "optional initial question",
            "title": "optional custom title"
        }

    Returns:
        {
            "success": true,
            "session_id": "uuid",
            "session": { session data }
        }
        or {"success": false, "error": "..."} with HTTP 400 for bad input
        and 500 when creation or the read-back fails.
    """
    try:
        username = session.get("username")
        # The body is already known to be a dict (@require_json_body), and
        # Flask memoises the parse, so this does not re-parse the request.
        payload = request.get_json(silent=True)
        initial_query = payload.get("initial_query")
        title = payload.get("title")

        if initial_query is not None:
            # A non-string value would blow up in len()/string ops further
            # down (TypeError -> 500); answer with a clean 400 instead.
            if not isinstance(initial_query, str):
                return jsonify(
                    {
                        "success": False,
                        "error": "initial_query must be a string",
                    }
                ), 400
            if len(initial_query) > MAX_QUERY_LENGTH:
                return jsonify(
                    {
                        "success": False,
                        "error": f"Initial query too long (max {MAX_QUERY_LENGTH} characters)",
                    }
                ), 400

        # Omitting the title is allowed; a supplied one must be valid.
        title_problem = None if title is None else _validate_title(title)
        if title_problem is not None:
            message, http_status = title_problem
            return jsonify({"success": False, "error": message}), http_status

        settings_snapshot = _load_settings(username)
        chat_service = ChatService(username)
        new_session_id = chat_service.create_session(
            initial_query=initial_query,
            title=title,
            settings_snapshot=settings_snapshot,
        )

        # Read the new row back so the response carries the full session.
        try:
            created = chat_service.get_session(new_session_id)
        except ChatSessionNotFound:
            # Missing right after creation means a delete-race or a storage
            # failure. The id stays out of the log text (flagged by
            # check-sensitive-logging); the traceback is enough to diagnose.
            logger.exception("Just-created chat session missing on read-back")
            return jsonify(
                {"success": False, "error": "Failed to load created session"}
            ), 500

        return jsonify(
            {
                "success": True,
                "session_id": new_session_id,
                "session": created,
            }
        )

    except ROUTE_EXCEPTIONS:
        logger.exception("Error creating chat session")
        return jsonify(
            {
                "success": False,
                "error": "Failed to create chat session",
            }
        ), 500

312 

313 

@chat_bp.route(
    "/api/chat/sessions/<session_id>/generate-title", methods=["POST"]
)
@login_required
# Per-user keying + lower limit than create_session because each call is a
# real LLM round-trip on a server-paid endpoint (vs create_session which is
# zero-LLM DB work). Without per-user keying, shared-IP users share the bucket.
@limiter.limit("10 per minute", key_func=_get_api_user_key)
@require_json_body(
    error_format="success",
    error_message="Request body must be a JSON object",
)
def generate_session_title(session_id):
    """
    Ask the configured LLM for a better title for an existing session.

    The frontend fires this asynchronously right after POST
    /api/chat/sessions, so session creation itself never waits on an LLM
    round-trip.

    Request body: {"query": "the initial research query"}

    Returns:
        {"success": true, "title": "..."} when a new title was produced;
        {"success": false, "title": null} (HTTP 200) when no title came
        back and the existing fallback title is kept;
        {"success": false, "error": "..."} with 400/404/500 otherwise.
    """
    try:
        username = session.get("username")
        # @require_json_body has already guaranteed a dict body.
        body = request.get_json(silent=True)
        query = body.get("query")

        if not query:
            return jsonify(
                {"success": False, "error": "query is required"}
            ), 400
        query_ok = isinstance(query, str) and len(query) <= MAX_QUERY_LENGTH
        if not query_ok:
            return jsonify(
                {
                    "success": False,
                    "error": f"query must be a string up to {MAX_QUERY_LENGTH} chars",
                }
            ), 400

        chat_service = ChatService(username)
        try:
            chat_service.get_session(session_id)
        except ChatSessionNotFound:
            return jsonify(
                {"success": False, "error": "Session not found"}
            ), 404

        snapshot = _load_settings(username)
        new_title = chat_service.regenerate_title_with_llm(
            session_id, query, snapshot
        )
        if new_title:
            return jsonify({"success": True, "title": new_title})
        # LLM disabled, or the LLM call failed: keep the existing title.
        return jsonify({"success": False, "title": None}), 200

    except ROUTE_EXCEPTIONS:
        logger.exception("Error regenerating chat title")
        return jsonify(
            {"success": False, "error": "Failed to regenerate title"}
        ), 500

381 

382 

@chat_bp.route("/api/chat/sessions", methods=["GET"])
@login_required
def list_sessions():
    """
    List the current user's chat sessions, one page at a time.

    Query params:
        - status: a ChatSessionStatus value (e.g. active, archived) or
          "all" (default: active); anything outside VALID_LIST_STATUSES
          silently falls back to active
        - limit: page size, clamped to 1..100 (default: 20)
        - offset: rows to skip, clamped to 0..MAX_OFFSET (default: 0)

    Returns:
        {
            "success": true,
            "sessions": [ session data list ]
        }
    """
    try:
        username = session.get("username")
        requested = request.args.get("status", ChatSessionStatus.ACTIVE.value)
        status = (
            requested
            if requested in VALID_LIST_STATUSES
            else ChatSessionStatus.ACTIVE.value
        )
        limit = _parse_int_param(
            request.args.get("limit"), 20, min_val=1, max_val=100
        )
        offset = _parse_int_param(
            request.args.get("offset"), 0, min_val=0, max_val=MAX_OFFSET
        )

        page = ChatService(username).list_sessions(
            status=status, limit=limit, offset=offset
        )
        return jsonify({"success": True, "sessions": page})

    except ROUTE_EXCEPTIONS:
        logger.exception("Error listing chat sessions")
        return jsonify(
            {"success": False, "error": "Failed to list chat sessions"}
        ), 500

433 

434 

@chat_bp.route("/api/chat/sessions/<session_id>", methods=["GET"])
@login_required
def get_session(session_id):
    """
    Fetch one chat session belonging to the current user.

    Returns:
        {"success": true, "session": { session data }} on success,
        404 when the service raises ChatSessionNotFound,
        500 on any other handled failure.
    """
    try:
        chat_service = ChatService(session.get("username"))
        try:
            payload = chat_service.get_session(session_id)
        except ChatSessionNotFound:
            return jsonify(
                {"success": False, "error": "Session not found"}
            ), 404
        return jsonify({"success": True, "session": payload})

    except ROUTE_EXCEPTIONS:
        logger.exception("Error getting chat session")
        return jsonify(
            {"success": False, "error": "Failed to get chat session"}
        ), 500

475 

476 

@chat_bp.route("/api/chat/sessions/<session_id>", methods=["PATCH"])
@login_required
# Per-user keying, like the other state-changing chat routes. Without a
# per-route limit, rename/archive was bounded only by the global limiter,
# leaving an uneven abuse surface across the session API.
@limiter.limit("30 per minute", key_func=_get_api_user_key)
@require_json_body(
    error_format="success",
    error_message="Request body must be a JSON object",
)
def update_session(session_id):
    """
    Update a chat session's title and/or status (archive / reactivate).

    Request body:
        {
            "title": "new title",   // optional
            "status": "archived"    // optional: active, archived
        }

    Deletion is not handled here; use the DELETE route. Every supplied field
    is validated before any write is made, so a request rejected with 400
    never leaves a partial update behind.

    Returns:
        200 {"success": true, "session": {...}} on success;
        400 for a body with no known field or an invalid value;
        404 if the session does not exist;
        409 if archiving is blocked by an in-progress research;
        500 if an update reports failure or an unexpected error occurs.
    """
    try:
        username = session.get("username")
        # @require_json_body has already guaranteed a dict body.
        data = request.get_json(silent=True)

        # Require at least one valid field
        valid_fields = {"title", "status"}
        if not any(field in data for field in valid_fields):
            return jsonify(
                {
                    "success": False,
                    "error": "Request must include at least one of: title, status",
                }
            ), 400

        service = ChatService(username)

        try:
            service.get_session(session_id)
        except ChatSessionNotFound:
            return jsonify(
                {"success": False, "error": "Session not found"}
            ), 404

        has_title = "title" in data
        has_status = "status" in data
        title = data.get("title")
        new_status = data.get("status")

        # ---- Validate every supplied field before writing anything ----
        if has_title:
            err = _validate_title(title)
            if err is not None:
                msg, http_status = err
                return jsonify({"success": False, "error": msg}), http_status
        # The isinstance guard comes first: an unhashable value (list/dict)
        # would make the set membership test raise TypeError, which would
        # surface as a generic 500 instead of this 400.
        if has_status and (
            not isinstance(new_status, str)
            or new_status not in VALID_UPDATE_STATUSES
        ):
            return jsonify(
                {"success": False, "error": "Invalid status value"}
            ), 400

        # ---- Apply the updates ----
        # The service methods return False on a swallowed DB error; remember
        # it so the response does not claim success (checked below).
        ops_ok = True

        if has_title:
            ops_ok = service.update_session_title(session_id, title) and ops_ok

        if has_status:
            if new_status == ChatSessionStatus.ACTIVE.value:
                ops_ok = service.reactivate_session(session_id) and ops_ok
            elif new_status == ChatSessionStatus.ARCHIVED.value:
                try:
                    ops_ok = service.archive_session(session_id) and ops_ok
                except ArchiveBlockedError:
                    # Symmetric with send-to-archived (also 409): the
                    # client should stop the research or wait for it to
                    # finish before archiving the session.
                    # Hard-coded message — never echo str(exc) here so a
                    # future ArchiveBlockedError raise with interpolated
                    # data can't leak to the response (information
                    # exposure through an exception, CWE-209).
                    # NOTE(review): a title change applied above in the
                    # same request is not rolled back on this path.
                    return jsonify(
                        {
                            "success": False,
                            "error": "Cannot archive: research in_progress. Stop it first.",
                        }
                    ), 409

        try:
            session_data = service.get_session(session_id)
        except ChatSessionNotFound:
            # Session was deleted by a concurrent request between the
            # update above and this read-back. Treat as 404 rather than
            # returning a partial success with null data.
            return jsonify(
                {"success": False, "error": "Session not found"}
            ), 404

        if not ops_ok:
            # The read-back above succeeded, so the session still exists, yet
            # an update reported failure — a DB write error was swallowed into
            # a False return. Surface it instead of reporting success.
            logger.error(
                f"Chat session update failed at DB layer for "
                f"{session_id[:8]}..."
            )
            return jsonify(
                {"success": False, "error": "Failed to update session"}
            ), 500

        return jsonify(
            {
                "success": True,
                "session": session_data,
            }
        )

    except ROUTE_EXCEPTIONS:
        logger.exception("Error updating chat session")
        return jsonify(
            {
                "success": False,
                "error": "Failed to update chat session",
            }
        ), 500

594 

595 

@chat_bp.route("/api/chat/sessions/<session_id>", methods=["DELETE"])
@login_required
# Per-user keying, like the other state-changing chat routes. Caps bulk
# delete attempts that the global limiter alone left under-constrained.
@limiter.limit("30 per minute", key_func=_get_api_user_key)
def delete_session(session_id):
    """Permanently remove a chat session owned by the current user.

    Responds ``{"success": true}`` when the service reports a deletion,
    404 when it reports nothing was deleted, and 500 on a handled error.
    """
    try:
        chat_service = ChatService(session.get("username"))
        if chat_service.delete_session(session_id):
            return jsonify({"success": True})
        return jsonify({"success": False, "error": "Session not found"}), 404

    except ROUTE_EXCEPTIONS:
        logger.exception("Error deleting chat session")
        return jsonify(
            {"success": False, "error": "Failed to delete chat session"}
        ), 500

630 

631 

632# ============================================================================ 

633# Message API Routes 

634# ============================================================================ 

635 

636 

@chat_bp.route("/api/chat/sessions/<session_id>/messages", methods=["GET"])
@login_required
def get_messages(session_id):
    """
    Return one page of a chat session's messages.

    Query params:
        - limit: page size, clamped to 1..100 (default: 50)
        - offset: rows to skip within the DESC slice, clamped to
          0..MAX_OFFSET (default: 0)
        - before_created_at: ISO timestamp cursor; only entries strictly
          older than it are returned. Pass the oldest ``created_at`` on
          screen to implement "load older messages".
        - before_id: optional id of that oldest on-screen row. Combined with
          ``before_created_at`` it forms a composite cursor, so rows sharing
          the boundary millisecond are not silently skipped.

    Returns:
        {
            "success": true,
            "messages": [ message data list, ASC by created_at ],
            "has_more": bool,
            "in_progress_research_id": str | null
        }
    """
    try:
        username = session.get("username")
        limit = _parse_int_param(
            request.args.get("limit"), 50, min_val=1, max_val=100
        )
        offset = _parse_int_param(
            request.args.get("offset"), 0, min_val=0, max_val=MAX_OFFSET
        )
        # Empty query-string values collapse to None ("no cursor").
        cursor_created_at = request.args.get("before_created_at") or None
        cursor_id = request.args.get("before_id") or None

        chat_service = ChatService(username)
        try:
            chat_service.get_session(session_id)
        except ChatSessionNotFound:
            return jsonify(
                {"success": False, "error": "Session not found"}
            ), 404

        # Request one row beyond the page: if it comes back, older entries
        # exist, and the client learns that without a second round-trip.
        rows = chat_service.get_session_messages(
            session_id,
            limit=limit + 1,
            offset=offset,
            before_created_at=cursor_created_at,
            before_id=cursor_id,
        )
        has_more = len(rows) > limit
        if has_more:
            rows = rows[-limit:]

        # chat.js loadSession restores its live "thinking" indicator from
        # this field rather than guessing in-flight state from message
        # metadata; the lookup is served by the partial-unique index
        # ux_research_history_chat_session_in_progress.
        active_research_id = chat_service.get_in_progress_research_id(
            session_id
        )

        return jsonify(
            {
                "success": True,
                "messages": rows,
                "has_more": has_more,
                "in_progress_research_id": active_research_id,
            }
        )

    except ROUTE_EXCEPTIONS:
        logger.exception("Error getting chat messages")
        return jsonify(
            {"success": False, "error": "Failed to get chat messages"}
        ), 500

720 

721 

722@chat_bp.route("/api/chat/sessions/<session_id>/messages", methods=["POST"]) 

723@login_required 

724# Per-user keying (default is per-IP). send_message launches a full research 

725# run, so this is the heaviest chat endpoint; shared-IP users sharing the 

726# bucket would lock each other out. 

727@limiter.limit("10 per minute", key_func=_get_api_user_key) 

728@require_json_body( 

729 error_format="success", 

730 error_message="Request body must be a JSON object", 

731) 

732def send_message(session_id): 

733 """ 

734 Send a message in a chat session. 

735 

736 This endpoint: 

737 1. Adds the user message to the session 

738 2. Decides if research is needed 

739 3. If research needed, starts research process 

740 4. Returns message ID and research ID (if applicable) 

741 

742 Request body: 

743 { 

744 "content": "user message", 

745 "trigger_research": true // optional, default true 

746 } 

747 

748 Note: Research mode is always "quick" in chat. This is intentional for v1. 

749 

750 Returns: 

751 { 

752 "success": true, 

753 "message_id": "uuid", 

754 "research_id": "uuid or null", 

755 "research_mode": "quick/none" 

756 } 

757 """ 

758 try: 

759 username = session.get("username") 

760 # @require_json_body has already guaranteed a dict body and rejected 

761 # non-JSON content types (which also hardens CSRF, matching the other 

762 # state-changing chat POSTs). Flask caches the parse, so this is free. 

763 data = request.get_json(silent=True) 

764 

765 if not data or not data.get("content"): 

766 return jsonify( 

767 { 

768 "success": False, 

769 "error": "Message content is required", 

770 } 

771 ), 400 

772 

773 # Reject non-string content before .strip() raises AttributeError 

774 # → 500. Mirrors the isinstance guard in _validate_title. 

775 if not isinstance(data["content"], str): 775 ↛ 776line 775 didn't jump to line 776 because the condition on line 775 was never true

776 return jsonify( 

777 { 

778 "success": False, 

779 "error": "content must be a string", 

780 } 

781 ), 400 

782 

783 content = data["content"].strip() 

784 

785 # Reject whitespace-only content 

786 if not content: 

787 return jsonify( 

788 { 

789 "success": False, 

790 "error": "Message content is required", 

791 } 

792 ), 400 

793 

794 if len(content) > MAX_MESSAGE_LENGTH: 

795 return jsonify( 

796 { 

797 "success": False, 

798 "error": f"Message too long (max {MAX_MESSAGE_LENGTH} characters)", 

799 } 

800 ), 400 

801 

802 raw = data.get("trigger_research", True) 

803 trigger_research = raw if isinstance(raw, bool) else True 

804 

805 service = ChatService(username) 

806 

807 # Verify session exists (informational fast-fail; the 

808 # UPDATE...RETURNING inside insert_message_in_db is the 

809 # authoritative check that survives a delete-race). 

810 try: 

811 session_data = service.get_session(session_id) 

812 except ChatSessionNotFound: 

813 return jsonify( 

814 { 

815 "success": False, 

816 "error": "Session not found", 

817 } 

818 ), 404 

819 

820 # Reject sends to non-active sessions. Archived/deleted sessions 

821 # are intentionally read-only — users must reactivate before 

822 # continuing the conversation. 

823 if session_data.get("status") != ChatSessionStatus.ACTIVE.value: 823 ↛ 824line 823 didn't jump to line 824 because the condition on line 823 was never true

824 return jsonify( 

825 { 

826 "success": False, 

827 "error": "This chat is archived. Reactivate it to continue.", 

828 } 

829 ), 409 

830 

831 # Pre-fetch existing messages for context decisions. 

832 messages = service.get_session_messages(session_id, limit=20) 

833 

834 research_id = None 

835 research_mode = "none" 

836 message_id = None 

837 settings_snapshot = None 

838 research_context = None 

839 

840 if trigger_research: 

841 # Always quick mode in chat (intentional v1 scope). 

842 research_mode = "quick" 

843 

844 # ---- Concurrency guards (per-session + global per-user) ---- 

845 # Both guards run in one transaction so a stale-row reclaim 

846 # is visible to the count check below it. 

847 # 

848 # Without the stale-thread sweep, a process crash leaves the 

849 # ResearchHistory row at IN_PROGRESS forever — every later 

850 # send_message returns 409 with no in-chat way to recover. 

851 # 

852 # Sweep AGE NOTE: a brand-new IN_PROGRESS row briefly exists 

853 # before its worker registers in `_active_research` (between 

854 # the DB commit below and the `start_research_process` call). 

855 # During that window `is_research_thread_alive` would return 

856 # False even though the thread spawn is in flight. Only reclaim 

857 # rows older than `_STALE_RESEARCH_GRACE_SECONDS` (default 30s) 

858 # so we don't kill our own freshly-inserted research from a 

859 # racing concurrent send. 

860 _STALE_RESEARCH_GRACE_SECONDS = 30 

861 grace_cutoff_dt = datetime.now(UTC) - timedelta( 

862 seconds=_STALE_RESEARCH_GRACE_SECONDS 

863 ) 

864 # ResearchHistory.created_at is a String column (ISO-8601); 

865 # UserActiveResearch.started_at is a UtcDateTime column. 

866 grace_cutoff_iso = grace_cutoff_dt.isoformat() 

867 with get_user_db_session(username) as cap_db: 

868 # 1. Reclaim stale chat-session research rows whose 

869 # worker thread is dead AND that are older than the 

870 # spawn-grace cutoff. 

871 stale_chat = ( 

872 cap_db.query(ResearchHistory) 

873 .filter( 

874 ResearchHistory.chat_session_id == session_id, 

875 ResearchHistory.status == ResearchStatus.IN_PROGRESS, 

876 ResearchHistory.created_at < grace_cutoff_iso, 

877 ) 

878 .all() 

879 ) 

880 reclaimed_chat = False 

881 for row in stale_chat: 881 ↛ 882line 881 didn't jump to line 882 because the loop on line 881 never started

882 if not is_research_thread_alive(row.id): 

883 logger.warning( 

884 f"Reclaiming stale chat research {row.id[:8]}... " 

885 f"(thread dead) on chat {session_id[:8]}..." 

886 ) 

887 row.status = ResearchStatus.FAILED 

888 cleanup_research(row.id) 

889 reclaimed_chat = True 

890 if reclaimed_chat: 890 ↛ 891line 890 didn't jump to line 891 because the condition on line 890 was never true

891 cap_db.commit() 

892 

893 # 2. Per-session guard: at most one live research per chat. 

894 existing_session_research = ( 

895 cap_db.query(ResearchHistory) 

896 .filter_by( 

897 chat_session_id=session_id, 

898 status=ResearchStatus.IN_PROGRESS, 

899 ) 

900 .first() 

901 ) 

902 if existing_session_research: 

903 return jsonify( 

904 { 

905 "success": False, 

906 "error": "Research already in progress on this chat session. Stop it before sending a new message.", 

907 "active_research_id": existing_session_research.id, 

908 } 

909 ), 409 

910 

911 # 3. Reclaim stale UserActiveResearch rows so the count 

912 # below isn't inflated by dead threads. Same grace 

913 # window applied via started_at to avoid killing a 

914 # sibling request's just-spawned thread. Shared with 

915 # research_routes.start_research; chat passes a 

916 # grace_cutoff_dt because chat send can race with 

917 # its own concurrent sibling, research_routes can't. 

918 from ..web.routes.globals import ( 

919 reclaim_stale_user_active_research, 

920 ) 

921 

922 if reclaim_stale_user_active_research( 922 ↛ 928line 922 didn't jump to line 928 because the condition on line 922 was never true

923 cap_db, 

924 username, 

925 grace_cutoff_dt=grace_cutoff_dt, 

926 logger=logger, 

927 ): 

928 cap_db.commit() 

929 

930 # 4. Global per-user cap (mirrors 

931 # research_routes.start_research). Without this, 

932 # multiple chat tabs let a user bypass the cap. 

933 active_count = ( 

934 cap_db.query(UserActiveResearch) 

935 .filter_by( 

936 username=username, 

937 status=ResearchStatus.IN_PROGRESS, 

938 ) 

939 .count() 

940 ) 

941 max_concurrent = SettingsManager(db_session=cap_db).get_setting( 

942 "app.max_concurrent_researches", 3 

943 ) 

944 if active_count >= max_concurrent: 

945 return jsonify( 

946 { 

947 "success": False, 

948 "error": ( 

949 f"Concurrent research limit reached " 

950 f"({active_count}/{max_concurrent}). " 

951 "Wait for an existing research to finish." 

952 ), 

953 } 

954 ), 429 

955 # ---- end concurrency guards ---- 

956 

957 # Settings + context (read-only — fine to do after the cap 

958 # check, before the atomic write). 

959 if trigger_research: 

960 settings_snapshot = _load_settings(username) 

961 context_manager = ChatContextManager( 

962 session_id, 

963 messages, 

964 session_data.get("accumulated_context"), 

965 settings_snapshot=settings_snapshot, 

966 ) 

967 # Pass the new user message so prior conversation is condensed 

968 # into a summary focused on this question (used as the follow-up 

969 # prompt's "previous findings"). 

970 research_context = context_manager.build_research_context( 

971 current_query=content 

972 ) 

973 research_id = str(uuid.uuid4()) 

974 

975 # Parse numeric search settings up-front. A malformed value 

976 # (a non-numeric string in the user's settings DB) must 

977 # return a clean 400 HERE — before the atomic write below 

978 # commits the user message + IN_PROGRESS research row. If the 

979 # int() cast ran after the commit (as it used to, down in the 

980 # research-dispatch block), the ValueError would propagate as 

981 # an unhandled 500 with those rows already committed, orphaning 

982 # them and soft-bricking the session via the per-session 409 

983 # guard. 

984 try: 

985 iterations = int( 

986 settings_snapshot.get("search.iterations", {}).get( 

987 "value", 3 

988 ) 

989 ) 

990 questions = int( 

991 settings_snapshot.get( 

992 "search.questions_per_iteration", {} 

993 ).get("value", 1) 

994 ) 

995 except (ValueError, TypeError): 

996 return jsonify( 

997 { 

998 "success": False, 

999 "error": ( 

1000 "Invalid numeric value in search settings " 

1001 "(iterations / questions_per_iteration)." 

1002 ), 

1003 } 

1004 ), 400 

1005 

1006 # ---- Atomic write: user message + research row in ONE transaction ---- 

1007 # Closes the orphan window: any IntegrityError or 

1008 # concurrent-delete on the research insert rolls back the user 

1009 # message too. The UPDATE...RETURNING inside 

1010 # insert_message_in_db doubles as the authoritative 

1011 # "session-still-exists" check; if the session was deleted 

1012 # between the get_session call above and now, a ValueError 

1013 # surfaces with "not found" and we map it to 404. 

1014 try: 

1015 with get_user_db_session(username) as db_session: 

1016 message_id = service.insert_message_in_db( 

1017 db_session, 

1018 session_id=session_id, 

1019 role="user", 

1020 content=content, 

1021 message_type="query" if len(messages) == 0 else "followup", 

1022 ) 

1023 

1024 if trigger_research: 

1025 created_at = datetime.now(UTC).isoformat() 

1026 research_meta = { 

1027 "submission": { 

1028 "chat_session_id": session_id, 

1029 "message_id": message_id, 

1030 "research_mode": research_mode, 

1031 }, 

1032 } 

1033 research = ResearchHistory( 

1034 id=research_id, 

1035 query=content, 

1036 mode=research_mode, 

1037 status=ResearchStatus.IN_PROGRESS.value, 

1038 created_at=created_at, 

1039 progress_log=[{"time": created_at, "progress": 0}], 

1040 research_meta=research_meta, 

1041 chat_session_id=session_id, 

1042 ) 

1043 db_session.add(research) 

1044 

1045 # Count this research toward the per-user concurrent 

1046 # cap. Mirrors research_routes.start_research — without a 

1047 # UserActiveResearch row, chat research is invisible to 

1048 # the cap (queried at the top of this handler AND by the 

1049 # UI start path), letting multiple chat tabs bypass it. 

1050 # Added in the SAME transaction as the research row so the 

1051 # IntegrityError rollback below undoes both. Removed on 

1052 # spawn failure by _cleanup_chat_send_rows, and on normal 

1053 # completion by the cleanup_completed_research middleware 

1054 # (keyed on is_research_active(research_id), which covers 

1055 # chat and non-chat research alike). 

1056 import threading 

1057 

1058 db_session.add( 

1059 UserActiveResearch( 

1060 username=username, 

1061 research_id=research_id, 

1062 status=ResearchStatus.IN_PROGRESS, 

1063 thread_id=str(threading.current_thread().ident), 

1064 settings_snapshot=settings_snapshot, 

1065 ) 

1066 ) 

1067 

1068 try: 

1069 db_session.commit() 

1070 except IntegrityError: 

1071 # Two near-simultaneous POSTs both passed the 

1072 # per-session guard; the partial unique index on 

1073 # (chat_session_id) WHERE status='in_progress' 

1074 # (migration 0010) catches the loser here. 

1075 # Rolling back the transaction also undoes the 

1076 # user-message INSERT and the message_count 

1077 # increment — no orphan. 

1078 db_session.rollback() 

1079 logger.warning( 

1080 f"Concurrent in-progress research race for chat {session_id[:8]}..." 

1081 ) 

1082 return jsonify( 

1083 { 

1084 "success": False, 

1085 "error": "Research already in progress on this chat session.", 

1086 } 

1087 ), 409 

1088 except ValueError as exc: 

1089 # `insert_message_in_db` raises ValueError("not found") 

1090 # when the session row was deleted between the existence 

1091 # check and the UPDATE...RETURNING. Map to 404 so the 

1092 # client can distinguish a deleted session from a 500. 

1093 if "not found" in str(exc).lower(): 1093 ↛ 1097line 1093 didn't jump to line 1097 because the condition on line 1093 was always true

1094 return jsonify( 

1095 {"success": False, "error": "Session not found"} 

1096 ), 404 

1097 raise 

1098 # ---- end atomic write ---- 

1099 

1100 if trigger_research: 

1101 # Type narrowing: the variables below were initialized to None 

1102 # at the top of the route and then assigned inside the matching 

1103 # `if trigger_research:` block above. They are guaranteed 

1104 # non-None here, but mypy doesn't connect the two branches — 

1105 # so we narrow explicitly. Uses a real runtime check (not 

1106 # ``assert``) so it survives ``python -O`` (bandit S101). 

1107 if ( 

1108 settings_snapshot is None 

1109 or research_context is None 

1110 or research_id is None 

1111 or message_id is None 

1112 ): # pragma: no cover — unreachable invariant guard 

1113 raise RuntimeError( 

1114 "trigger_research path entered with unset state" 

1115 ) 

1116 # Get user password for metrics 

1117 pw = get_user_password(username) 

1118 

1119 # Extract settings values with safe .get() defaults 

1120 model_provider = settings_snapshot.get("llm.provider", {}).get( 

1121 "value", "" 

1122 ) 

1123 model = settings_snapshot.get("llm.model", {}).get("value", "") 

1124 search_engine = settings_snapshot.get("search.tool", {}).get( 

1125 "value", "" 

1126 ) 

1127 custom_endpoint = settings_snapshot.get( 

1128 "llm.openai_endpoint.url", {} 

1129 ).get("value") 

1130 # Defensive fallbacks kept in sync with default_settings.json: 

1131 # `search.search_strategy` defaults to "langgraph-agent", 

1132 # `search.iterations` to 3, `search.questions_per_iteration` 

1133 # to 1. Out-of-sync fallbacks here silently produce a 

1134 # *different* research product if the snapshot ever lacks 

1135 # the key (e.g., fresh user DB before defaults are seeded). 

1136 user_strategy = settings_snapshot.get( 

1137 "search.search_strategy", {} 

1138 ).get("value", "langgraph-agent") 

1139 # `iterations` and `questions` were parsed + validated up-front 

1140 # (before the atomic write) so a malformed setting returns a 

1141 # clean 400 instead of orphaning a committed research row. 

1142 

1143 # For follow-up messages, use the contextual follow-up strategy 

1144 # which wraps the user's preferred strategy as a delegate 

1145 if research_context.get("is_multi_turn"): 

1146 strategy = "enhanced-contextual-followup" 

1147 research_context["delegate_strategy"] = user_strategy 

1148 else: 

1149 strategy = user_strategy 

1150 

1151 # Spawn the worker thread. ``DuplicateResearchError`` 

1152 # inherits from ``Exception`` (not RuntimeError) so it is 

1153 # NOT in ROUTE_EXCEPTIONS and would otherwise escape to a 

1154 # generic 500 — leaving the user message + research row 

1155 # we just committed as orphans. Catch it here, undo our 

1156 # side effects, and return 409. 

1157 try: 

1158 start_research_process( 

1159 research_id, 

1160 content, 

1161 research_mode, 

1162 run_research_process, 

1163 username=username, 

1164 user_password=pw, 

1165 model_provider=model_provider, 

1166 model=model, 

1167 search_engine=search_engine, 

1168 custom_endpoint=custom_endpoint, 

1169 strategy=strategy, 

1170 iterations=iterations, 

1171 questions_per_iteration=questions, 

1172 research_context=research_context, 

1173 chat_session_id=session_id, 

1174 chat_message_id=message_id, 

1175 settings_snapshot=settings_snapshot, 

1176 ) 

1177 except DuplicateResearchError: 

1178 logger.warning( 

1179 f"DuplicateResearchError on chat send_message " 

1180 f"for {research_id[:8]}... (chat {session_id[:8]}...)" 

1181 ) 

1182 # Per ``DuplicateResearchError`` docstring: do NOT 

1183 # mutate UserActiveResearch or the existing 

1184 # ResearchHistory row — those belong to the live 

1185 # thread. Only undo the rows we created in our own 

1186 # transaction above. 

1187 _cleanup_chat_send_rows( 

1188 username, research_id, message_id, session_id, "duplicate" 

1189 ) 

1190 return jsonify( 

1191 { 

1192 "success": False, 

1193 "error": "Research already in progress on this chat session.", 

1194 } 

1195 ), 409 

1196 except SystemAtCapacityError: 

1197 # System at concurrent-research capacity. Undo the rows we 

1198 # committed above and return 429 so the client can retry. 

1199 logger.warning( 

1200 f"SystemAtCapacityError on chat send_message " 

1201 f"for {research_id[:8]}... (chat {session_id[:8]}...)" 

1202 ) 

1203 _cleanup_chat_send_rows( 

1204 username, research_id, message_id, session_id, "capacity" 

1205 ) 

1206 return jsonify( 

1207 { 

1208 "success": False, 

1209 "error": "Server is at research capacity. Please retry shortly.", 

1210 } 

1211 ), 429 

1212 

1213 logger.info( 

1214 f"Started chat research {research_id[:8]}... for chat {session_id[:8]}..." 

1215 ) 

1216 

1217 return jsonify( 

1218 { 

1219 "success": True, 

1220 "message_id": message_id, 

1221 "session_id": session_id, 

1222 "research_id": research_id, 

1223 "research_mode": research_mode, 

1224 } 

1225 ) 

1226 

1227 except ROUTE_EXCEPTIONS: 

1228 logger.exception("Error sending chat message") 

1229 return jsonify( 

1230 { 

1231 "success": False, 

1232 "error": "Failed to send message", 

1233 } 

1234 ), 500