Coverage for src/local_deep_research/chat/service.py: 85%

285 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1""" 

2Service layer for chat functionality. 

3 

4This service handles the business logic for chat sessions and messages, 

5including session management, message handling, and context building. 

6""" 

7 

8from typing import Dict, Any, List, Optional 

9from concurrent.futures import ( 

10 ThreadPoolExecutor, 

11 TimeoutError as FuturesTimeoutError, 

12) 

13from datetime import datetime, UTC 

14import uuid 

15from loguru import logger 

16from sqlalchemy import and_, or_, update 

17from sqlalchemy.exc import SQLAlchemyError 

18 

19from ..database.models import ( 

20 ChatMessage, 

21 ChatMessageType, 

22 ChatProgressStep, 

23 ChatRole, 

24 ChatSession, 

25 ChatSessionStatus, 

26 ResearchHistory, 

27) 

28from ..database.session_context import get_user_db_session 

29from ..web.routes.globals import set_termination_flag 

30from ..constants import ResearchStatus 

31 

# Standard exception tuple for service-layer DB operations.
# The ChatService methods below catch exactly this tuple around their
# database work, log the failure, and then either re-raise it or return
# a failure value, depending on the method.
DB_EXCEPTIONS = (ValueError, RuntimeError, SQLAlchemyError)

34 

35 

class ArchiveBlockedError(RuntimeError):
    """Signal that archive_session refused because research is in_progress.

    Archiving makes a session read-only. Permitting it while a research
    run is still active would leave that run orphaned, attached to a
    session the user believes is frozen. The route layer is expected to
    turn this exception into an HTTP 409 response.
    """

43 

44 

class ChatSessionNotFound(LookupError):
    """Signal that get_session found no row for the given session_id.

    The route layer is expected to turn this into an HTTP 404. It is kept
    separate from ChatRepositoryError so that a transient database
    failure is never reported as "session does not exist".
    """

52 

53 

class ChatRepositoryError(RuntimeError):
    """Signal that the database query behind get_session itself failed.

    The route layer is expected to turn this into an HTTP 500. Having a
    type distinct from ChatSessionNotFound avoids spurious 404s when the
    real problem is infrastructure (locked DB file, encryption key
    failure, etc.).
    """

61 

62 

# Title generation should not block the request thread on a slow LLM.
# The synchronous llm.invoke() call is wrapped in a worker future with a
# hard wall-clock timeout. This constant is the default, in seconds, used
# when the chat.title_llm_timeout_seconds setting is not provided; per the
# original note it matches the codebase's 30s LLM-timeout convention.
_DEFAULT_TITLE_LLM_TIMEOUT_SECONDS = 30.0

68 

69 

def _serialize_dt(value):
    """Convert a datetime to its ISO-8601 string; ``None`` passes through as ``None``."""
    if value is None:
        return None
    return value.isoformat()

73 

74 

75class ChatService: 

76 """Service for managing chat conversations and messages.""" 

77 

78 def __init__(self, username: str): 

79 """ 

80 Initialize the chat service. 

81 

82 Args: 

83 username: Username for database access 

84 """ 

85 self.username = username 

86 

def create_session(
    self,
    initial_query: Optional[str] = None,
    title: Optional[str] = None,
    settings_snapshot: Optional[Dict[str, Any]] = None,
) -> str:
    """
    Persist a new, active chat session and return its id.

    The title is resolved synchronously and without any LLM call: an
    explicit ``title`` wins, otherwise ``_fallback_title(initial_query)``
    is used. Callers who want an LLM-generated title request it later via
    POST /api/chat/sessions/<id>/generate-title, so creation is never
    blocked on an LLM round-trip.

    Args:
        initial_query: Optional first query; only feeds the fallback title.
        title: Optional caller-supplied title.
        settings_snapshot: Part of the signature but not read by this
            method.

    Returns:
        Session ID (UUID string).

    Raises:
        ValueError, RuntimeError, SQLAlchemyError: logged, then re-raised
            unchanged.
    """
    try:
        new_id = str(uuid.uuid4())
        session_title = title if title else self._fallback_title(initial_query)

        # Every session starts with an empty accumulated context.
        empty_context = {
            "key_entities": [],
            "topics": [],
            "summary": "",
            "source_count": 0,
        }

        with get_user_db_session(self.username) as db:
            # created_at is deliberately not passed; per the original
            # note it is filled in by the column's default.
            db.add(
                ChatSession(
                    id=new_id,
                    title=session_title,
                    status=ChatSessionStatus.ACTIVE.value,
                    accumulated_context=empty_context,
                    message_count=0,
                )
            )
            db.commit()

        logger.info(f"Created chat {new_id[:8]}... for user {self.username}")
        return new_id

    except DB_EXCEPTIONS:
        logger.exception("Error creating chat session")
        raise

139 

140 def regenerate_title_with_llm( 

141 self, 

142 session_id: str, 

143 query: Optional[str], 

144 settings_snapshot: Optional[Dict[str, Any]], 

145 ) -> Optional[str]: 

146 """ 

147 Regenerate a session's title using the LLM. 

148 

149 Intended to be called from a dedicated endpoint the frontend fires 

150 after session creation, so the create request doesn't block on the 

151 LLM round-trip. 

152 

153 Idempotency: if the current title no longer matches the non-LLM 

154 fallback (i.e. the user manually edited it, or a sibling tab's 

155 LLM-gen already ran), skip the LLM call so we don't spend credits 

156 only to overwrite the user's deliberate edit on the way back. 

157 

158 Returns the new title on success, or None on failure / no-op. 

159 """ 

160 if not query: 

161 return None 

162 # Check whether the session still has the fallback title. If the 

163 # user (or a concurrent generate-title request) has already moved 

164 # past the fallback, don't burn an LLM call to overwrite their work. 

165 try: 

166 current = self.get_session(session_id) 

167 except ChatSessionNotFound: 

168 return None 

169 current_title = (current or {}).get("title") or "" 

170 fallback = self._fallback_title(query) 

171 if current_title and current_title != fallback: 

172 logger.info( 

173 f"Skipping LLM title gen for {session_id[:8]}...: title " 

174 f"already set ('{current_title[:30]}...')" 

175 ) 

176 return None 

177 new_title = self._generate_title(query, settings_snapshot) 

178 if not new_title: 178 ↛ 179line 178 didn't jump to line 179 because the condition on line 178 was never true

179 return None 

180 updated = self.update_session_title(session_id, new_title) 

181 return new_title if updated else None 

182 

183 def add_message( 

184 self, 

185 session_id: str, 

186 role: str, 

187 content: str, 

188 message_type: str, 

189 research_id: Optional[str] = None, 

190 allow_archived: bool = False, 

191 ) -> str: 

192 """ 

193 Add a durable message (query/followup/response) to a chat session. 

194 

195 Content is required and stored inline. Step rows live in 

196 chat_progress_steps and are written via add_progress_step(). 

197 

198 Args: 

199 session_id: ID of the session to add message to 

200 role: Message role (user or assistant) 

201 content: Message content (required, non-empty) 

202 message_type: Type of message (query, followup, response) 

203 research_id: Optional ID of associated research 

204 allow_archived: When True, the atomic-update WHERE clause omits 

205 the ``status='active'`` filter so a system-written assistant 

206 response can land even if the session was archived between 

207 research start and completion. Use ONLY for system writes 

208 (final assistant response, terminate-partial) — the user 

209 send-message path MUST keep the default False so that 

210 archiving a session in one browser tab still blocks a 

211 concurrent user reply from another tab mid-flight. 

212 

213 Returns: 

214 Message ID (UUID string) 

215 

216 Raises: 

217 ValueError: if content is None. Validated before opening the DB 

218 session so callers (e.g. the route layer) can return HTTP 400 

219 without paying SQLCipher cold-open cost on a doomed request. 

220 """ 

221 if content is None: 

222 raise ValueError("content is required for chat messages") 

223 try: 

224 with get_user_db_session(self.username) as db: 

225 message_id = self.insert_message_in_db( 

226 db, 

227 session_id=session_id, 

228 role=role, 

229 content=content, 

230 message_type=message_type, 

231 research_id=research_id, 

232 allow_archived=allow_archived, 

233 ) 

234 db.commit() 

235 return message_id 

236 

237 except DB_EXCEPTIONS: 

238 logger.exception("Error adding message to chat session") 

239 raise 

240 

241 def insert_message_in_db( 

242 self, 

243 db, 

244 session_id: str, 

245 role: str, 

246 content: str, 

247 message_type: str, 

248 research_id: Optional[str] = None, 

249 allow_archived: bool = False, 

250 ) -> str: 

251 """ 

252 Insert a durable chat message in an active SQLAlchemy session WITHOUT 

253 committing. The caller owns the transaction lifecycle and is 

254 responsible for commit/rollback. 

255 

256 This exists so the route layer can atomically commit the user message 

257 together with the research-history row in a single transaction — 

258 avoiding the orphan-message bug that occurs if the user-message 

259 commit succeeds and the research insert later raises. 

260 

261 Validation and the atomic message_count increment are identical to 

262 ``add_message``; only the commit responsibility differs. 

263 

264 Raises: 

265 ValueError: if role/message_type are invalid, content is None, 

266 or the session row does not exist. 

267 """ 

268 # Content is required (NOT NULL on the column). 

269 # Empty string is permitted by the column (NOT NULL only rejects 

270 # SQL NULL); reject Python None. 

271 if content is None: 271 ↛ 272line 271 didn't jump to line 272 because the condition on line 271 was never true

272 raise ValueError("content is required for chat messages") 

273 

274 # Authoritative validation via the enum constructors — raises 

275 # ValueError for unknown values, which the route layer maps to HTTP 

276 # 400 via ROUTE_EXCEPTIONS. Keeps failure fast (before DB hit) and 

277 # avoids the HTTP-500 regression we'd get from letting SQLAlchemy's 

278 # StatementError surface at commit time. 

279 try: 

280 ChatRole(role) 

281 except ValueError as exc: 

282 raise ValueError(f"Invalid role: {role!r}") from exc 

283 try: 

284 ChatMessageType(message_type) 

285 except ValueError as exc: 

286 raise ValueError(f"Invalid message_type: {message_type!r}") from exc 

287 

288 message_id = str(uuid.uuid4()) 

289 # Atomic increment-and-return on ChatSession.message_count. 

290 # By default the WHERE clause re-checks `status='active'` so an 

291 # archive PATCH racing with a user-message send cannot land on 

292 # a now-archived session. When ``allow_archived=True`` (system- 

293 # written assistant responses), the filter is relaxed so a 

294 # final-report save can complete even if the session flipped to 

295 # archived mid-research — losing the answer is worse than the 

296 # "archive means stop" semantic for system writes. 

297 if allow_archived: 

298 where_clause = ChatSession.id == session_id 

299 not_found_msg = f"Chat session {session_id} not found" 

300 else: 

301 where_clause = (ChatSession.id == session_id) & ( 

302 ChatSession.status == ChatSessionStatus.ACTIVE.value 

303 ) 

304 not_found_msg = f"Chat session {session_id} not found or not active" 

305 stmt = ( 

306 update(ChatSession) 

307 .where(where_clause) 

308 .values(message_count=ChatSession.message_count + 1) 

309 .returning(ChatSession.message_count) 

310 ) 

311 sequence = db.execute(stmt).scalar_one_or_none() 

312 if sequence is None: 

313 raise ValueError(not_found_msg) 

314 

315 # created_at populated by column default (utcnow()). 

316 message = ChatMessage( 

317 id=message_id, 

318 session_id=session_id, 

319 research_id=research_id, 

320 role=role, 

321 message_type=message_type, 

322 content=content, 

323 sequence_number=sequence, 

324 ) 

325 db.add(message) 

326 logger.debug( 

327 f"Staged message {sequence} for chat {session_id[:8]}... (uncommitted)" 

328 ) 

329 return message_id 

330 

331 def add_progress_step( 

332 self, 

333 session_id: str, 

334 research_id: str, 

335 content: str, 

336 phase: Optional[str] = None, 

337 ) -> str: 

338 """ 

339 Add a transient research-progress step for a chat session. 

340 

341 Step rows live in chat_progress_steps and have their 

342 own per-research sequence (allocated atomically against 

343 ResearchHistory.step_count). They do NOT increment the chat 

344 session's message_count. 

345 

346 Args: 

347 session_id: ID of the parent chat session 

348 research_id: ID of the research producing the step 

349 content: Rendered step text (e.g. "Searching for ...") 

350 phase: Optional phase tag from research_service._STEP_PHASES 

351 

352 Returns: 

353 Step ID (UUID string) 

354 """ 

355 if content is None: 

356 raise ValueError("content is required for progress steps") 

357 

358 try: 

359 step_id = str(uuid.uuid4()) 

360 

361 with get_user_db_session(self.username) as db: 

362 # Atomic increment-and-return on research_history.step_count. 

363 stmt = ( 

364 update(ResearchHistory) 

365 .where(ResearchHistory.id == research_id) 

366 .values(step_count=ResearchHistory.step_count + 1) 

367 .returning(ResearchHistory.step_count) 

368 ) 

369 sequence = db.execute(stmt).scalar_one_or_none() 

370 if sequence is None: 

371 raise ValueError( # noqa: TRY301 

372 f"Research {research_id} not found" 

373 ) 

374 

375 step = ChatProgressStep( 

376 id=step_id, 

377 research_id=research_id, 

378 session_id=session_id, 

379 phase=phase, 

380 content=content, 

381 sequence_number=sequence, 

382 ) 

383 db.add(step) 

384 db.commit() 

385 

386 logger.debug( 

387 f"Added progress step {sequence} for research " 

388 f"{research_id[:8]}... in chat {session_id[:8]}..." 

389 ) 

390 return step_id 

391 

392 except DB_EXCEPTIONS: 

393 logger.exception("Error adding progress step") 

394 raise 

395 

def get_session(self, session_id: str) -> Dict[str, Any]:
    """
    Load one chat session and return it as a plain dictionary.

    Args:
        session_id: ID of the session.

    Returns:
        Dictionary with ``id``, ``title``, ``status``, ``message_count``,
        ``created_at`` (ISO-8601 string or None) and
        ``accumulated_context``.

    Raises:
        ChatSessionNotFound: if no row matches ``session_id``. Route
            layer maps this to HTTP 404.
        ChatRepositoryError: if the query itself fails with one of
            DB_EXCEPTIONS. Route layer maps this to HTTP 500. The two
            are kept apart so transient DB errors are not reported as
            "not found".
    """
    try:
        with get_user_db_session(self.username) as db:
            record = db.query(ChatSession).filter_by(id=session_id).first()

            if record:
                return {
                    "id": record.id,
                    "title": record.title,
                    "status": record.status,
                    "message_count": record.message_count,
                    "created_at": _serialize_dt(record.created_at),
                    "accumulated_context": record.accumulated_context,
                }

            logger.warning(f"Chat not found: {session_id[:8]}...")
            # Raised inside the try on purpose; the first handler below
            # lets it propagate untouched.
            raise ChatSessionNotFound(session_id)  # noqa: TRY301

    except ChatSessionNotFound:
        # The genuine "does not exist" signal; pass it on as-is.
        raise
    except DB_EXCEPTIONS as exc:
        logger.exception("Error getting chat session")
        raise ChatRepositoryError(
            f"DB error reading session {session_id[:8]}..."
        ) from exc

440 

def get_session_messages(
    self,
    session_id: str,
    limit: int = 50,
    offset: int = 0,
    before_created_at: Optional[str] = None,
    before_id: Optional[str] = None,
) -> List[Dict[str, Any]]:
    """
    Return a session's messages merged with its progress steps.

    Message rows and progress-step rows are read from their own tables
    and combined into one list, so the client sees a single stream in
    which steps appear as ``message_type='step'`` entries with
    ``role='assistant'`` and an id of the form ``step-<id>``.

    Pagination: each table is asked for at most ``limit + offset`` rows,
    newest first. The two result sets are merged and sorted newest-first
    in Python, the ``[offset : offset + limit]`` slice is taken, and that
    slice is reversed so the caller receives oldest→newest.

    Cursor: when ``before_created_at`` parses as an ISO timestamp, only
    rows older than it are considered. An unparseable value is logged
    and ignored. ``before_id`` is used only together with a valid
    ``before_created_at``.

    Args:
        session_id: ID of the session.
        limit: Maximum number of merged entries to return.
        offset: Number of newest merged entries to skip.
        before_created_at: Optional ISO timestamp cursor (a trailing
            ``Z`` is accepted).
        before_id: Optional id paired with ``before_created_at``. With
            it, messages sharing the cursor timestamp are still returned
            when their id sorts below ``before_id``, and steps at
            exactly the cursor timestamp are included too.

    Returns:
        List of message and step dictionaries, ordered oldest first.

    Raises:
        ValueError, RuntimeError, SQLAlchemyError: logged, then
            re-raised, so a failure is not mistaken for a session with
            no messages.
    """
    try:
        with get_user_db_session(self.username) as db:
            message_query = db.query(ChatMessage).filter_by(
                session_id=session_id
            )
            step_query = db.query(ChatProgressStep).filter_by(
                session_id=session_id
            )

            cutoff = None
            if before_created_at:
                try:
                    cutoff = datetime.fromisoformat(
                        before_created_at.replace("Z", "+00:00")
                    )
                except ValueError:
                    logger.warning(
                        "Invalid before_created_at cursor: "
                        f"{before_created_at!r} — ignoring."
                    )

            if cutoff is not None:
                if before_id:
                    # Composite cursor for messages:
                    #   created_at < cutoff
                    #   OR (created_at = cutoff AND id < before_id)
                    # so rows sharing the boundary timestamp are not
                    # silently dropped when loading older pages.
                    message_condition = or_(
                        ChatMessage.created_at < cutoff,
                        and_(
                            ChatMessage.created_at == cutoff,
                            ChatMessage.id < before_id,
                        ),
                    )
                    # before_id is not compared against step ids. Steps
                    # use `<=` instead, so a step on the boundary may be
                    # returned again rather than lost; per the original
                    # note the client de-duplicates these.
                    step_condition = ChatProgressStep.created_at <= cutoff
                else:
                    # Bare timestamp cursor: strict `<` on both tables,
                    # kept for backwards compatibility with older
                    # clients.
                    message_condition = ChatMessage.created_at < cutoff
                    step_condition = ChatProgressStep.created_at < cutoff
                message_query = message_query.filter(message_condition)
                step_query = step_query.filter(step_condition)

            # Either table alone could supply the whole requested
            # window, so each is asked for limit + offset rows.
            per_table_rows = limit + offset

            # sequence_number is the secondary sort key so that rows
            # with an identical created_at still come back in a stable
            # order.
            message_rows = (
                message_query.order_by(
                    ChatMessage.created_at.desc(),
                    ChatMessage.sequence_number.desc(),
                )
                .limit(per_table_rows)
                .all()
            )
            step_rows = (
                step_query.order_by(
                    ChatProgressStep.created_at.desc(),
                    ChatProgressStep.sequence_number.desc(),
                )
                .limit(per_table_rows)
                .all()
            )

            message_entries: List[Dict[str, Any]] = [
                {
                    "id": row.id,
                    "session_id": row.session_id,
                    "role": row.role,
                    "message_type": row.message_type,
                    "content": row.content,
                    "sequence_number": row.sequence_number,
                    "research_id": row.research_id,
                    "created_at": _serialize_dt(row.created_at),
                }
                for row in message_rows
            ]
            step_entries: List[Dict[str, Any]] = [
                {
                    "id": f"step-{row.id}",
                    "session_id": row.session_id,
                    "role": "assistant",
                    "message_type": "step",
                    "content": row.content,
                    "phase": row.phase,
                    "sequence_number": row.sequence_number,
                    "research_id": row.research_id,
                    "created_at": _serialize_dt(row.created_at),
                }
                for row in step_rows
            ]

            def newest_first_key(entry):
                # Sort by timestamp string, then sequence_number, then
                # kind (steps rank below messages on a full tie).
                kind_rank = 0 if entry["message_type"] == "step" else 1
                return (
                    entry["created_at"] or "",
                    entry.get("sequence_number") or 0,
                    kind_rank,
                )

            newest_first = sorted(
                message_entries + step_entries,
                key=newest_first_key,
                reverse=True,
            )
            page = newest_first[offset : offset + limit]
            # Flip the page so the caller gets oldest→newest.
            return page[::-1]

    except DB_EXCEPTIONS:
        # Re-raise instead of returning []: an empty list would be
        # indistinguishable from a session with no messages.
        logger.exception("Error getting chat messages")
        raise

624 

def get_in_progress_research_id(self, session_id: str) -> Optional[str]:
    """Look up the research currently running for a chat session.

    Returns the id of the first ``ResearchHistory`` row tied to
    ``session_id`` whose status is ``ResearchStatus.IN_PROGRESS``, or
    ``None`` when no such row exists.

    The GET messages endpoint uses this so the client can restore its
    live "thinking" indicator after a reload without guessing from
    message metadata (which is not yet available before the first
    progress step has been persisted).

    Per the original note, the partial-unique index
    ``ux_research_history_chat_session_in_progress`` (migration 0010)
    guarantees at most one matching row and makes this an index lookup.

    Raises:
        ValueError, RuntimeError, SQLAlchemyError: logged, then
            re-raised unchanged.
    """
    try:
        with get_user_db_session(self.username) as db:
            running = (
                db.query(ResearchHistory.id)
                .filter(
                    ResearchHistory.chat_session_id == session_id,
                    ResearchHistory.status == ResearchStatus.IN_PROGRESS,
                )
                .first()
            )
            if running:
                return running[0]
            return None
    except DB_EXCEPTIONS:
        # Propagate rather than return None: None would read as "no
        # research running", leaving the send button enabled and
        # letting the user double-submit into the unique-index guard.
        logger.exception(
            "Error fetching in-progress research id for chat session"
        )
        raise

660 

def list_sessions(
    self,
    status: str = ChatSessionStatus.ACTIVE.value,
    limit: int = 20,
    offset: int = 0,
) -> List[Dict[str, Any]]:
    """
    Return one page of the user's chat sessions, newest first.

    Args:
        status: Status to filter on (active, archived, deleted), or the
            literal ``"all"`` to skip the status filter.
        limit: Maximum number of sessions to return.
        offset: Number of sessions to skip.

    Returns:
        List of dictionaries with ``id``, ``title``, ``status``,
        ``message_count`` and ``created_at`` (ISO-8601 string or None).

    Raises:
        ValueError, RuntimeError, SQLAlchemyError: logged, then
            re-raised unchanged.
    """
    try:
        with get_user_db_session(self.username) as db:
            base_query = db.query(ChatSession)
            if status == "all":
                filtered = base_query
            else:
                filtered = base_query.filter_by(status=status)

            newest_first = filtered.order_by(ChatSession.created_at.desc())
            page = newest_first.offset(offset).limit(limit).all()

            return [
                {
                    "id": row.id,
                    "title": row.title,
                    "status": row.status,
                    "message_count": row.message_count,
                    "created_at": _serialize_dt(row.created_at),
                }
                for row in page
            ]

    except DB_EXCEPTIONS:
        # Propagate rather than return []: an empty list would make a
        # real DB failure look like a user who simply has no sessions.
        logger.exception("Error listing chat sessions")
        raise

710 

def update_session_title(self, session_id: str, title: str) -> bool:
    """
    Set a new title on a chat session and commit.

    Args:
        session_id: ID of the session.
        title: New title.

    Returns:
        True when the session was found and the change committed; False
        when no session matches or one of DB_EXCEPTIONS was raised (the
        error is logged, not propagated).
    """
    try:
        with get_user_db_session(self.username) as db:
            record = db.query(ChatSession).filter_by(id=session_id).first()
            if not record:
                return False

            record.title = title
            db.commit()
            return True

    except DB_EXCEPTIONS:
        logger.exception("Error updating chat session title")
        return False

735 

def reactivate_session(self, session_id: str) -> bool:
    """
    Put a chat session back into the active status and commit.

    The session's current status is not inspected; whatever it was, it
    becomes ``ChatSessionStatus.ACTIVE``.

    Args:
        session_id: ID of the session to reactivate.

    Returns:
        True when the session was found and the change committed; False
        when no session matches or one of DB_EXCEPTIONS was raised (the
        error is logged, not propagated).
    """
    try:
        with get_user_db_session(self.username) as db:
            record = db.query(ChatSession).filter_by(id=session_id).first()
            if not record:
                return False

            record.status = ChatSessionStatus.ACTIVE.value
            db.commit()
            logger.info(f"Reactivated chat: {session_id[:8]}...")
            return True

    except DB_EXCEPTIONS:
        logger.exception("Error reactivating chat session")
        return False

760 

def archive_session(self, session_id: str) -> bool:
    """
    Mark a chat session as archived, unless research is still running.

    Archiving makes the session read-only. If a research run tied to the
    session is still in_progress, the archive is refused: otherwise that
    run would keep writing into a session the user believes is frozen.
    The caller (route layer) has to stop the research first, or use
    delete, which terminates in-flight research itself.

    Args:
        session_id: ID of the session to archive.

    Returns:
        True if archived successfully; False if the session does not
        exist or one of DB_EXCEPTIONS was raised (logged, not
        propagated).

    Raises:
        ArchiveBlockedError: if an in_progress research is tied to the
            session. The route layer maps this to HTTP 409.
    """
    try:
        with get_user_db_session(self.username) as db:
            record = db.query(ChatSession).filter_by(id=session_id).first()
            if not record:
                return False

            running_research = (
                db.query(ResearchHistory.id)
                .filter(
                    ResearchHistory.chat_session_id == session_id,
                    ResearchHistory.status == ResearchStatus.IN_PROGRESS,
                )
                .first()
            )
            if running_research is not None:
                # ArchiveBlockedError is a RuntimeError, which is part
                # of DB_EXCEPTIONS; the dedicated handler below must
                # come first so it is not swallowed.
                raise ArchiveBlockedError(  # noqa: TRY301
                    "Cannot archive: research in_progress. Stop it first."
                )

            record.status = ChatSessionStatus.ARCHIVED.value
            db.commit()
            logger.info(f"Archived chat: {session_id[:8]}...")
            return True

    except ArchiveBlockedError:
        # Let the route layer see it and answer with 409.
        raise
    except DB_EXCEPTIONS:
        logger.exception("Error archiving chat session")
        return False

818 

def delete_session(self, session_id: str) -> bool:
    """
    Permanently delete a chat session and stop its running research.

    Per the original note, the delete cascades: ChatMessages are deleted
    (CASCADE) and ResearchHistory.chat_session_id is set to NULL.

    Args:
        session_id: ID of the session to delete.

    Returns:
        True when the session was deleted and committed; False when no
        session matches or one of DB_EXCEPTIONS was raised (logged, not
        propagated).
    """
    try:
        with get_user_db_session(self.username) as db:
            record = db.query(ChatSession).filter_by(id=session_id).first()
            if not record:
                return False

            # Collect in-progress research ids before deleting, so they
            # can be terminated instead of running on with a null
            # chat_session_id for a conversation the user discarded.
            running_rows = (
                db.query(ResearchHistory.id)
                .filter(
                    ResearchHistory.chat_session_id == session_id,
                    ResearchHistory.status == ResearchStatus.IN_PROGRESS,
                )
                .all()
            )

            db.delete(record)
            db.commit()

            # Order matters: the termination flags are set only AFTER
            # the commit. Setting them earlier would, if the commit
            # failed, kill the research of a session that still exists.
            for (running_id,) in running_rows:
                set_termination_flag(running_id)

            logger.info(f"Deleted chat: {session_id[:8]}...")
            return True

    except DB_EXCEPTIONS:
        logger.exception("Error deleting chat session")
        return False

863 

def update_accumulated_context(
    self,
    session_id: str,
    new_entities: Optional[List[str]] = None,
    new_topics: Optional[List[str]] = None,
    summary_addition: Optional[str] = None,
    source_count_delta: int = 0,
) -> bool:
    """
    Merge new findings into a session's accumulated context and commit.

    Entities and topics are de-duplicated in insertion order and capped
    at 50 and 20 entries. When the cap is exceeded the most recently
    added entries are kept, matching how the summary keeps its tail.
    (The previous set()-based merge kept an arbitrary subset in an
    arbitrary order that could change from one process to the next.)

    Args:
        session_id: ID of the session.
        new_entities: Entities to add to ``key_entities``.
        new_topics: Topics to add to ``topics``.
        summary_addition: Text appended to ``summary`` after a blank
            line; only the last 8000 characters are kept.
        source_count_delta: Amount added to ``source_count``.

    Returns:
        True when the session was found and the change committed; False
        when no session matches or one of DB_EXCEPTIONS was raised (the
        error is logged, not propagated).
    """

    def merge_capped(current_items, incoming_items, cap):
        # dict.fromkeys drops duplicates while keeping first-seen order.
        # A stored None is treated like an empty list.
        combined = list(
            dict.fromkeys([*(current_items or []), *incoming_items])
        )
        return combined[-cap:]

    try:
        with get_user_db_session(self.username) as db:
            # with_for_update() is a no-op on SQLite but provides row
            # locking on PostgreSQL/MySQL if ever used.
            session = (
                db.query(ChatSession)
                .filter_by(id=session_id)
                .with_for_update()
                .first()
            )
            if not session:
                return False

            # Work on a NEW dict and reassign it. Per the original
            # note, a plain JSON column does not notice in-place
            # mutation, so without reassignment no UPDATE is emitted.
            ctx = dict(session.accumulated_context or {})

            if new_entities:
                ctx["key_entities"] = merge_capped(
                    ctx.get("key_entities"), new_entities, 50
                )

            if new_topics:
                ctx["topics"] = merge_capped(
                    ctx.get("topics"), new_topics, 20
                )

            if summary_addition:
                current = ctx.get("summary", "")
                new_summary = (
                    f"{current}\n\n{summary_addition}"
                    if current
                    else summary_addition
                )
                ctx["summary"] = new_summary[-8000:]  # Keep last 8000 chars

            if source_count_delta:
                # `or 0` also covers a stored None, which would otherwise
                # raise a TypeError that DB_EXCEPTIONS does not catch.
                ctx["source_count"] = (
                    ctx.get("source_count") or 0
                ) + source_count_delta

            session.accumulated_context = ctx
            db.commit()
            return True

    except DB_EXCEPTIONS:
        logger.exception("Error updating accumulated context")
        return False

942 

943 def _fallback_title(self, query: Optional[str]) -> str: 

944 """Non-LLM title used at creation time (never blocks on I/O).""" 

945 if not query: 

946 return f"Chat {datetime.now(UTC).strftime('%Y-%m-%d %H:%M')}" 

947 if len(query) > 100: 

948 return query[:97].strip() + "..." 

949 return query.strip() 

950 

951 def _generate_title( 

952 self, 

953 query: Optional[str], 

954 settings_snapshot: Optional[Dict[str, Any]] = None, 

955 ) -> str: 

956 """ 

957 Generate a title from the initial query. 

958 

959 When chat.llm_title_generation is enabled and settings_snapshot is 

960 provided, uses an LLM for concise titles. Otherwise returns the 

961 non-LLM fallback title. 

962 """ 

963 if not query: 

964 return self._fallback_title(query) 

965 

966 if settings_snapshot: 

967 from ..config.llm_config import get_llm 

968 from ..config.thread_settings import get_setting_from_snapshot 

969 

970 if get_setting_from_snapshot( 970 ↛ 1037line 970 didn't jump to line 1037 because the condition on line 970 was always true

971 "chat.llm_title_generation", 

972 False, 

973 settings_snapshot=settings_snapshot, 

974 ): 

975 timeout = float( 

976 get_setting_from_snapshot( 

977 "chat.title_llm_timeout_seconds", 

978 _DEFAULT_TITLE_LLM_TIMEOUT_SECONDS, 

979 settings_snapshot=settings_snapshot, 

980 ) 

981 ) 

982 # Run the blocking invoke in a worker thread so the request 

983 # thread isn't parked past `timeout` by an unresponsive LLM. 

984 # `with ThreadPoolExecutor(...) as pool:` would call 

985 # shutdown(wait=True) on __exit__, defeating the timeout — 

986 # use wait=False + cancel_futures so the timeout actually fires. 

987 pool = ThreadPoolExecutor( 

988 max_workers=1, 

989 thread_name_prefix="chat-title", 

990 ) 

991 try: 

992 llm = get_llm(settings_snapshot=settings_snapshot) 

993 prompt = ( 

994 "Generate a concise 3-7 word title for this research " 

995 "query. Return ONLY the title, no quotes or " 

996 f"explanation.\n\nQuery: {query[:200]}" 

997 ) 

998 future = pool.submit(llm.invoke, prompt) 

999 try: 

1000 response = future.result(timeout=timeout) 

1001 except FuturesTimeoutError: 

1002 logger.warning( 

1003 "LLM title generation exceeded {}s timeout; " 

1004 "falling back to truncation", 

1005 timeout, 

1006 ) 

1007 return self._fallback_title(query) 

1008 # Strip CR/LF before storing: the title is later 

1009 # interpolated into loguru f-strings (e.g. the 

1010 # "title already set" log line above) — an embedded 

1011 # newline forges what looks like a second log entry 

1012 # in aggregators. Also keeps document.title / 

1013 # chatTitle.textContent visually clean. 

1014 title = ( 

1015 str(response.content) 

1016 .replace("\n", " ") 

1017 .replace("\r", " ") 

1018 .strip() 

1019 .strip("\"'")[:100] 

1020 ) 

1021 if title: 1021 ↛ 1035line 1021 didn't jump to line 1035 because the condition on line 1021 was always true

1022 return title 

1023 except Exception: 

1024 # User opted into LLM title generation via the 

1025 # `chat.llm_title_generation` setting; a silent 

1026 # debug-level swallow would hide provider misconfig, 

1027 # auth failures, or response-shape regressions in 

1028 # production where stderr level is INFO. Log with 

1029 # traceback so operators can diagnose, then fall back 

1030 # to truncation for UX continuity. 

1031 logger.exception( 

1032 "LLM title generation failed, falling back to truncation" 

1033 ) 

1034 finally: 

1035 pool.shutdown(wait=False, cancel_futures=True) 

1036 

1037 return self._fallback_title(query)