Coverage for src/local_deep_research/chat/service.py: 85%
285 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1"""
2Service layer for chat functionality.
4This service handles the business logic for chat sessions and messages,
5including session management, message handling, and context building.
6"""
8from typing import Dict, Any, List, Optional
9from concurrent.futures import (
10 ThreadPoolExecutor,
11 TimeoutError as FuturesTimeoutError,
12)
13from datetime import datetime, UTC
14import uuid
15from loguru import logger
16from sqlalchemy import and_, or_, update
17from sqlalchemy.exc import SQLAlchemyError
19from ..database.models import (
20 ChatMessage,
21 ChatMessageType,
22 ChatProgressStep,
23 ChatRole,
24 ChatSession,
25 ChatSessionStatus,
26 ResearchHistory,
27)
28from ..database.session_context import get_user_db_session
29from ..web.routes.globals import set_termination_flag
30from ..constants import ResearchStatus
# Standard exception tuple for service-layer DB operations. Every
# ``except DB_EXCEPTIONS`` handler in this module logs the failure and then
# either re-raises it or reports it through a failure return value.
DB_EXCEPTIONS = (ValueError, RuntimeError, SQLAlchemyError)
class ArchiveBlockedError(RuntimeError):
    """Raised when archive_session is called while a research is in_progress.

    Archive flips a session to read-only; allowing it while research is
    still running would leave an orphaned research tied to a session the
    user thinks is frozen. The route layer translates this to HTTP 409.

    Note: this subclasses RuntimeError, which is a member of
    ``DB_EXCEPTIONS``, so any handler that catches ``DB_EXCEPTIONS`` must
    re-raise this error in an earlier ``except`` clause to avoid
    swallowing it.
    """
class ChatSessionNotFound(LookupError):
    """Raised by get_session when no row matches the supplied session_id.

    The route layer translates this to HTTP 404. Distinct from
    ChatRepositoryError so a transient DB failure cannot masquerade as
    "session does not exist". Because it derives from LookupError it is
    not matched by ``DB_EXCEPTIONS``.
    """
class ChatRepositoryError(RuntimeError):
    """Raised by get_session when the underlying DB query fails.

    The route layer translates this to HTTP 500. Keeping this separate
    from ChatSessionNotFound prevents false 404s on infrastructure
    errors (locked DB file, encryption key failure, etc.). The original
    exception is attached as ``__cause__`` by the raising code.
    """
# Title generation should not block the request thread on a slow LLM.
# The synchronous llm.invoke() call is wrapped in a worker future with a
# hard wall-clock timeout. This value (in seconds) is the default used when
# the ``chat.title_llm_timeout_seconds`` setting is absent from the settings
# snapshot; per the original author it matches the rest of the codebase's
# 30s LLM-timeout convention.
_DEFAULT_TITLE_LLM_TIMEOUT_SECONDS = 30.0
70def _serialize_dt(value):
71 """Return an ISO-8601 string for a datetime, or None."""
72 return value.isoformat() if value is not None else None
75class ChatService:
76 """Service for managing chat conversations and messages."""
78 def __init__(self, username: str):
79 """
80 Initialize the chat service.
82 Args:
83 username: Username for database access
84 """
85 self.username = username
87 def create_session(
88 self,
89 initial_query: Optional[str] = None,
90 title: Optional[str] = None,
91 settings_snapshot: Optional[Dict[str, Any]] = None,
92 ) -> str:
93 """
94 Create a new chat session.
96 Args:
97 initial_query: Optional initial query (used for title generation)
98 title: Optional custom title for the session
99 settings_snapshot: Optional settings for LLM title generation
101 Returns:
102 Session ID (UUID string)
103 """
104 try:
105 session_id = str(uuid.uuid4())
107 # Use fast, non-LLM fallback title synchronously. If the caller
108 # wants an LLM-generated title they trigger it asynchronously via
109 # POST /api/chat/sessions/<id>/generate-title so the creation
110 # request isn't blocked on an LLM round-trip.
111 resolved_title = title or self._fallback_title(initial_query)
113 with get_user_db_session(self.username) as db:
114 # created_at is populated by the utcnow() default on the
115 # column — no need to pass it explicitly.
116 session = ChatSession(
117 id=session_id,
118 title=resolved_title,
119 status=ChatSessionStatus.ACTIVE.value,
120 accumulated_context={
121 "key_entities": [],
122 "topics": [],
123 "summary": "",
124 "source_count": 0,
125 },
126 message_count=0,
127 )
128 db.add(session)
129 db.commit()
131 logger.info(
132 f"Created chat {session_id[:8]}... for user {self.username}"
133 )
134 return session_id
136 except DB_EXCEPTIONS:
137 logger.exception("Error creating chat session")
138 raise
140 def regenerate_title_with_llm(
141 self,
142 session_id: str,
143 query: Optional[str],
144 settings_snapshot: Optional[Dict[str, Any]],
145 ) -> Optional[str]:
146 """
147 Regenerate a session's title using the LLM.
149 Intended to be called from a dedicated endpoint the frontend fires
150 after session creation, so the create request doesn't block on the
151 LLM round-trip.
153 Idempotency: if the current title no longer matches the non-LLM
154 fallback (i.e. the user manually edited it, or a sibling tab's
155 LLM-gen already ran), skip the LLM call so we don't spend credits
156 only to overwrite the user's deliberate edit on the way back.
158 Returns the new title on success, or None on failure / no-op.
159 """
160 if not query:
161 return None
162 # Check whether the session still has the fallback title. If the
163 # user (or a concurrent generate-title request) has already moved
164 # past the fallback, don't burn an LLM call to overwrite their work.
165 try:
166 current = self.get_session(session_id)
167 except ChatSessionNotFound:
168 return None
169 current_title = (current or {}).get("title") or ""
170 fallback = self._fallback_title(query)
171 if current_title and current_title != fallback:
172 logger.info(
173 f"Skipping LLM title gen for {session_id[:8]}...: title "
174 f"already set ('{current_title[:30]}...')"
175 )
176 return None
177 new_title = self._generate_title(query, settings_snapshot)
178 if not new_title: 178 ↛ 179line 178 didn't jump to line 179 because the condition on line 178 was never true
179 return None
180 updated = self.update_session_title(session_id, new_title)
181 return new_title if updated else None
183 def add_message(
184 self,
185 session_id: str,
186 role: str,
187 content: str,
188 message_type: str,
189 research_id: Optional[str] = None,
190 allow_archived: bool = False,
191 ) -> str:
192 """
193 Add a durable message (query/followup/response) to a chat session.
195 Content is required and stored inline. Step rows live in
196 chat_progress_steps and are written via add_progress_step().
198 Args:
199 session_id: ID of the session to add message to
200 role: Message role (user or assistant)
201 content: Message content (required, non-empty)
202 message_type: Type of message (query, followup, response)
203 research_id: Optional ID of associated research
204 allow_archived: When True, the atomic-update WHERE clause omits
205 the ``status='active'`` filter so a system-written assistant
206 response can land even if the session was archived between
207 research start and completion. Use ONLY for system writes
208 (final assistant response, terminate-partial) — the user
209 send-message path MUST keep the default False so that
210 archiving a session in one browser tab still blocks a
211 concurrent user reply from another tab mid-flight.
213 Returns:
214 Message ID (UUID string)
216 Raises:
217 ValueError: if content is None. Validated before opening the DB
218 session so callers (e.g. the route layer) can return HTTP 400
219 without paying SQLCipher cold-open cost on a doomed request.
220 """
221 if content is None:
222 raise ValueError("content is required for chat messages")
223 try:
224 with get_user_db_session(self.username) as db:
225 message_id = self.insert_message_in_db(
226 db,
227 session_id=session_id,
228 role=role,
229 content=content,
230 message_type=message_type,
231 research_id=research_id,
232 allow_archived=allow_archived,
233 )
234 db.commit()
235 return message_id
237 except DB_EXCEPTIONS:
238 logger.exception("Error adding message to chat session")
239 raise
241 def insert_message_in_db(
242 self,
243 db,
244 session_id: str,
245 role: str,
246 content: str,
247 message_type: str,
248 research_id: Optional[str] = None,
249 allow_archived: bool = False,
250 ) -> str:
251 """
252 Insert a durable chat message in an active SQLAlchemy session WITHOUT
253 committing. The caller owns the transaction lifecycle and is
254 responsible for commit/rollback.
256 This exists so the route layer can atomically commit the user message
257 together with the research-history row in a single transaction —
258 avoiding the orphan-message bug that occurs if the user-message
259 commit succeeds and the research insert later raises.
261 Validation and the atomic message_count increment are identical to
262 ``add_message``; only the commit responsibility differs.
264 Raises:
265 ValueError: if role/message_type are invalid, content is None,
266 or the session row does not exist.
267 """
268 # Content is required (NOT NULL on the column).
269 # Empty string is permitted by the column (NOT NULL only rejects
270 # SQL NULL); reject Python None.
271 if content is None: 271 ↛ 272line 271 didn't jump to line 272 because the condition on line 271 was never true
272 raise ValueError("content is required for chat messages")
274 # Authoritative validation via the enum constructors — raises
275 # ValueError for unknown values, which the route layer maps to HTTP
276 # 400 via ROUTE_EXCEPTIONS. Keeps failure fast (before DB hit) and
277 # avoids the HTTP-500 regression we'd get from letting SQLAlchemy's
278 # StatementError surface at commit time.
279 try:
280 ChatRole(role)
281 except ValueError as exc:
282 raise ValueError(f"Invalid role: {role!r}") from exc
283 try:
284 ChatMessageType(message_type)
285 except ValueError as exc:
286 raise ValueError(f"Invalid message_type: {message_type!r}") from exc
288 message_id = str(uuid.uuid4())
289 # Atomic increment-and-return on ChatSession.message_count.
290 # By default the WHERE clause re-checks `status='active'` so an
291 # archive PATCH racing with a user-message send cannot land on
292 # a now-archived session. When ``allow_archived=True`` (system-
293 # written assistant responses), the filter is relaxed so a
294 # final-report save can complete even if the session flipped to
295 # archived mid-research — losing the answer is worse than the
296 # "archive means stop" semantic for system writes.
297 if allow_archived:
298 where_clause = ChatSession.id == session_id
299 not_found_msg = f"Chat session {session_id} not found"
300 else:
301 where_clause = (ChatSession.id == session_id) & (
302 ChatSession.status == ChatSessionStatus.ACTIVE.value
303 )
304 not_found_msg = f"Chat session {session_id} not found or not active"
305 stmt = (
306 update(ChatSession)
307 .where(where_clause)
308 .values(message_count=ChatSession.message_count + 1)
309 .returning(ChatSession.message_count)
310 )
311 sequence = db.execute(stmt).scalar_one_or_none()
312 if sequence is None:
313 raise ValueError(not_found_msg)
315 # created_at populated by column default (utcnow()).
316 message = ChatMessage(
317 id=message_id,
318 session_id=session_id,
319 research_id=research_id,
320 role=role,
321 message_type=message_type,
322 content=content,
323 sequence_number=sequence,
324 )
325 db.add(message)
326 logger.debug(
327 f"Staged message {sequence} for chat {session_id[:8]}... (uncommitted)"
328 )
329 return message_id
331 def add_progress_step(
332 self,
333 session_id: str,
334 research_id: str,
335 content: str,
336 phase: Optional[str] = None,
337 ) -> str:
338 """
339 Add a transient research-progress step for a chat session.
341 Step rows live in chat_progress_steps and have their
342 own per-research sequence (allocated atomically against
343 ResearchHistory.step_count). They do NOT increment the chat
344 session's message_count.
346 Args:
347 session_id: ID of the parent chat session
348 research_id: ID of the research producing the step
349 content: Rendered step text (e.g. "Searching for ...")
350 phase: Optional phase tag from research_service._STEP_PHASES
352 Returns:
353 Step ID (UUID string)
354 """
355 if content is None:
356 raise ValueError("content is required for progress steps")
358 try:
359 step_id = str(uuid.uuid4())
361 with get_user_db_session(self.username) as db:
362 # Atomic increment-and-return on research_history.step_count.
363 stmt = (
364 update(ResearchHistory)
365 .where(ResearchHistory.id == research_id)
366 .values(step_count=ResearchHistory.step_count + 1)
367 .returning(ResearchHistory.step_count)
368 )
369 sequence = db.execute(stmt).scalar_one_or_none()
370 if sequence is None:
371 raise ValueError( # noqa: TRY301
372 f"Research {research_id} not found"
373 )
375 step = ChatProgressStep(
376 id=step_id,
377 research_id=research_id,
378 session_id=session_id,
379 phase=phase,
380 content=content,
381 sequence_number=sequence,
382 )
383 db.add(step)
384 db.commit()
386 logger.debug(
387 f"Added progress step {sequence} for research "
388 f"{research_id[:8]}... in chat {session_id[:8]}..."
389 )
390 return step_id
392 except DB_EXCEPTIONS:
393 logger.exception("Error adding progress step")
394 raise
396 def get_session(self, session_id: str) -> Dict[str, Any]:
397 """
398 Get a chat session by ID.
400 Args:
401 session_id: ID of the session
403 Returns:
404 Session data dictionary.
406 Raises:
407 ChatSessionNotFound: if no row matches ``session_id``.
408 Route layer maps to HTTP 404.
409 ChatRepositoryError: if the DB query itself fails. Route
410 layer maps to HTTP 500. Keeping these separate avoids
411 masking transient DB errors as "not found".
412 """
413 try:
414 with get_user_db_session(self.username) as db:
415 session = db.query(ChatSession).filter_by(id=session_id).first()
417 if not session:
418 logger.warning(f"Chat not found: {session_id[:8]}...")
419 # noqa: TRY301 — re-raised by the outer except
420 # ChatSessionNotFound below to propagate as 404.
421 raise ChatSessionNotFound(session_id) # noqa: TRY301
423 return {
424 "id": session.id,
425 "title": session.title,
426 "status": session.status,
427 "message_count": session.message_count,
428 "created_at": _serialize_dt(session.created_at),
429 "accumulated_context": session.accumulated_context,
430 }
432 except ChatSessionNotFound:
433 # Propagate as-is; this is the genuine 404 signal.
434 raise
435 except DB_EXCEPTIONS as exc:
436 logger.exception("Error getting chat session")
437 raise ChatRepositoryError(
438 f"DB error reading session {session_id[:8]}..."
439 ) from exc
441 def get_session_messages(
442 self,
443 session_id: str,
444 limit: int = 50,
445 offset: int = 0,
446 before_created_at: Optional[str] = None,
447 before_id: Optional[str] = None,
448 ) -> List[Dict[str, Any]]:
449 """
450 Get messages for a session, server-side merged with progress steps.
452 chat_messages.content is always inline; step rows live in
453 chat_progress_steps. This method merges both into a single ordered
454 stream by created_at so the client renderer (chat.js) sees a
455 unified message list with `message_type='step'` rows interleaved.
457 Pagination is SQL-level via per-table LIMIT + Python merge: each
458 table fetches at most ``limit`` rows ordered by created_at DESC,
459 the two streams are merged on the (timestamp, kind) sort key, and
460 the latest ``limit`` are returned in ASC order so the client
461 renders oldest→newest as before.
463 Pass ``before_created_at`` to fetch the page IMMEDIATELY older
464 than the given ISO timestamp (use the oldest currently-displayed
465 ``created_at`` to implement a "load older messages" trigger).
466 Without the cursor, ``offset`` selects which DESC slice to return
467 (offset=0 → newest, offset=limit → next older window, …).
469 Args:
470 session_id: ID of the session
471 limit: Maximum number of (merged) entries to return
472 offset: Number of entries to skip (DESC-ordered slice index)
473 before_created_at: Optional ISO timestamp cursor — return only
474 entries strictly older than this. Useful for cursor-based
475 "load older" pagination instead of offset arithmetic.
477 Returns:
478 List of message + step data dictionaries, ordered by
479 created_at ascending.
480 """
481 try:
482 with get_user_db_session(self.username) as db:
483 msg_q = db.query(ChatMessage).filter_by(session_id=session_id)
484 step_q = db.query(ChatProgressStep).filter_by(
485 session_id=session_id
486 )
488 if before_created_at: 488 ↛ 489line 488 didn't jump to line 489 because the condition on line 488 was never true
489 try:
490 cutoff = datetime.fromisoformat(
491 before_created_at.replace("Z", "+00:00")
492 )
493 except ValueError:
494 logger.warning(
495 "Invalid before_created_at cursor: "
496 f"{before_created_at!r} — ignoring."
497 )
498 else:
499 # Composite cursor: when `before_id` is also
500 # supplied, the filter becomes
501 # created_at < cutoff
502 # OR (created_at = cutoff AND id < before_id)
503 # which prevents same-millisecond rows at the
504 # page boundary from being silently dropped on
505 # "Load older" pagination. With a bare timestamp
506 # cursor we fall back to strict `<` for
507 # backwards-compat with older clients.
508 if before_id:
509 msg_q = msg_q.filter(
510 or_(
511 ChatMessage.created_at < cutoff,
512 and_(
513 ChatMessage.created_at == cutoff,
514 ChatMessage.id < before_id,
515 ),
516 )
517 )
518 # ChatProgressStep ids are integers but
519 # message ids are UUID strings; using the
520 # bare `<` operator on string ids gives a
521 # stable lexicographic tie-break, and
522 # progress-step rows tie-break by their
523 # own integer id (id < int(before_id) is
524 # not safe because before_id is the UUID of
525 # a chat message, not a step). For steps,
526 # drop the equality branch so duplicates
527 # rather than drops occur on tie — the
528 # client-side dedup catches them.
529 step_q = step_q.filter(
530 ChatProgressStep.created_at <= cutoff
531 )
532 else:
533 msg_q = msg_q.filter(
534 ChatMessage.created_at < cutoff
535 )
536 step_q = step_q.filter(
537 ChatProgressStep.created_at < cutoff
538 )
540 # Pull at most ``limit`` rows from EACH table in DESC
541 # order. The merged window is at most 2 * limit rows
542 # (one extreme: all from one table), which we trim to
543 # ``limit`` after the Python merge. This bounds the SQL
544 # work and avoids the old .all() cliff at large N.
545 fetch_n = limit + offset
546 # Secondary ORDER BY on sequence_number stabilises rows
547 # whose created_at collide at SQLite's millisecond
548 # precision (sqlalchemy_utc stores `%Y-%m-%d %H:%M:%S.fff`).
549 # Without it, rapid-fire inserts (paste-and-submit,
550 # auto-retries) can be returned in arbitrary order even
551 # though sequence_number is monotonic.
552 messages = (
553 msg_q.order_by(
554 ChatMessage.created_at.desc(),
555 ChatMessage.sequence_number.desc(),
556 )
557 .limit(fetch_n)
558 .all()
559 )
560 steps = (
561 step_q.order_by(
562 ChatProgressStep.created_at.desc(),
563 ChatProgressStep.sequence_number.desc(),
564 )
565 .limit(fetch_n)
566 .all()
567 )
569 merged: List[Dict[str, Any]] = []
570 for msg in messages:
571 merged.append(
572 {
573 "id": msg.id,
574 "session_id": msg.session_id,
575 "role": msg.role,
576 "message_type": msg.message_type,
577 "content": msg.content,
578 "sequence_number": msg.sequence_number,
579 "research_id": msg.research_id,
580 "created_at": _serialize_dt(msg.created_at),
581 }
582 )
583 for step in steps:
584 merged.append(
585 {
586 "id": f"step-{step.id}",
587 "session_id": step.session_id,
588 "role": "assistant",
589 "message_type": "step",
590 "content": step.content,
591 "phase": step.phase,
592 "sequence_number": step.sequence_number,
593 "research_id": step.research_id,
594 "created_at": _serialize_dt(step.created_at),
595 }
596 )
598 # Sort DESC by (created_at, sequence_number,
599 # step-before-message on tie), take the newest
600 # [offset:offset+limit] slice, then flip to ASC so the
601 # client still renders oldest→newest. Including
602 # sequence_number in the Python tie-break mirrors the
603 # SQL ORDER BY above and prevents same-timestamp messages
604 # from rendering out of insertion order.
605 merged.sort(
606 key=lambda m: (
607 m["created_at"] or "",
608 m.get("sequence_number") or 0,
609 0 if m["message_type"] == "step" else 1,
610 ),
611 reverse=True,
612 )
613 window = merged[offset : offset + limit]
614 window.reverse()
615 return window
617 except DB_EXCEPTIONS:
618 # Re-raise so the route returns HTTP 500 instead of a
619 # misleading 200 + []. An empty list here would be
620 # indistinguishable from a session that genuinely has no
621 # messages, hiding infrastructure failures from the client.
622 logger.exception("Error getting chat messages")
623 raise
625 def get_in_progress_research_id(self, session_id: str) -> Optional[str]:
626 """Return the id of the in-progress research for this chat session,
627 or ``None`` if no research is currently running.
629 Used by the GET messages endpoint so the client can restore the
630 live "thinking" indicator on reload without inferring it from
631 message metadata (which fails during the wrapper-strategy
632 preprocessing window before any progress step has persisted).
634 The partial-unique index
635 ``ux_research_history_chat_session_in_progress`` (migration 0010)
636 guarantees at most one matching row exists and turns this into
637 an O(1) index lookup.
638 """
639 try:
640 with get_user_db_session(self.username) as db:
641 row = (
642 db.query(ResearchHistory.id)
643 .filter(
644 ResearchHistory.chat_session_id == session_id,
645 ResearchHistory.status == ResearchStatus.IN_PROGRESS,
646 )
647 .first()
648 )
649 return row[0] if row else None
650 except DB_EXCEPTIONS:
651 # Re-raise rather than swallow → the route handler can
652 # surface a 500 so the client shows an error banner. Returning
653 # None here is indistinguishable from "no research running",
654 # which leaves the send button enabled and lets the user
655 # double-submit into the unique-index guard.
656 logger.exception(
657 "Error fetching in-progress research id for chat session"
658 )
659 raise
661 def list_sessions(
662 self,
663 status: str = ChatSessionStatus.ACTIVE.value,
664 limit: int = 20,
665 offset: int = 0,
666 ) -> List[Dict[str, Any]]:
667 """
668 List chat sessions for the user.
670 Args:
671 status: Filter by status (active, archived, deleted, or all)
672 limit: Maximum number of sessions to return
673 offset: Number of sessions to skip
675 Returns:
676 List of session data dictionaries
677 """
678 try:
679 with get_user_db_session(self.username) as db:
680 query = db.query(ChatSession)
682 if status != "all":
683 query = query.filter_by(status=status)
685 sessions = (
686 query.order_by(ChatSession.created_at.desc())
687 .offset(offset)
688 .limit(limit)
689 .all()
690 )
692 return [
693 {
694 "id": s.id,
695 "title": s.title,
696 "status": s.status,
697 "message_count": s.message_count,
698 "created_at": _serialize_dt(s.created_at),
699 }
700 for s in sessions
701 ]
703 except DB_EXCEPTIONS:
704 # Re-raise so the route returns HTTP 500. Silently returning
705 # [] would make a real DB failure look like a brand-new user
706 # with no sessions, hiding the problem from operators and
707 # confusing the UI.
708 logger.exception("Error listing chat sessions")
709 raise
711 def update_session_title(self, session_id: str, title: str) -> bool:
712 """
713 Update the title of a chat session.
715 Args:
716 session_id: ID of the session
717 title: New title
719 Returns:
720 True if updated successfully
721 """
722 try:
723 with get_user_db_session(self.username) as db:
724 session = db.query(ChatSession).filter_by(id=session_id).first()
725 if session: 725 ↛ 730line 725 didn't jump to line 730 because the condition on line 725 was always true
726 session.title = title
728 db.commit()
729 return True
730 return False
732 except DB_EXCEPTIONS:
733 logger.exception("Error updating chat session title")
734 return False
736 def reactivate_session(self, session_id: str) -> bool:
737 """
738 Reactivate an archived or deleted chat session.
740 Args:
741 session_id: ID of the session to reactivate
743 Returns:
744 True if reactivated successfully
745 """
746 try:
747 with get_user_db_session(self.username) as db:
748 session = db.query(ChatSession).filter_by(id=session_id).first()
749 if session: 749 ↛ 755line 749 didn't jump to line 755 because the condition on line 749 was always true
750 session.status = ChatSessionStatus.ACTIVE.value
752 db.commit()
753 logger.info(f"Reactivated chat: {session_id[:8]}...")
754 return True
755 return False
757 except DB_EXCEPTIONS:
758 logger.exception("Error reactivating chat session")
759 return False
761 def archive_session(self, session_id: str) -> bool:
762 """
763 Archive a chat session.
765 Refuses to archive while a research is still in_progress for the
766 session: archive flips the session read-only, and an in-flight
767 research would otherwise survive as an orphaned process writing
768 back into a session the user believes is frozen. The caller (route layer)
769 must stop the research first (or use delete, which terminates
770 in-flight research as a side effect).
772 Args:
773 session_id: ID of the session to archive
775 Returns:
776 True if archived successfully, False if the session does not
777 exist or a DB error occurred.
779 Raises:
780 ArchiveBlockedError: if the session has an in_progress
781 research tied to it. The route layer maps this to HTTP
782 409, mirroring the existing send-to-archived 409 rule.
783 """
784 try:
785 with get_user_db_session(self.username) as db:
786 session = db.query(ChatSession).filter_by(id=session_id).first()
787 if not session:
788 return False
790 in_flight = (
791 db.query(ResearchHistory.id)
792 .filter(
793 ResearchHistory.chat_session_id == session_id,
794 ResearchHistory.status == ResearchStatus.IN_PROGRESS,
795 )
796 .first()
797 )
798 if in_flight is not None:
799 # Bubble up to route layer for 409 mapping. Caught
800 # and re-raised by the inner ``except
801 # ArchiveBlockedError`` below — the broad
802 # ``except DB_EXCEPTIONS`` must not swallow it.
803 raise ArchiveBlockedError( # noqa: TRY301 — re-raised by inner except ArchiveBlockedError
804 "Cannot archive: research in_progress. Stop it first."
805 )
807 session.status = ChatSessionStatus.ARCHIVED.value
808 db.commit()
809 logger.info(f"Archived chat: {session_id[:8]}...")
810 return True
812 except ArchiveBlockedError:
813 # Bubble up so the route layer can produce a 409 response.
814 raise
815 except DB_EXCEPTIONS:
816 logger.exception("Error archiving chat session")
817 return False
819 def delete_session(self, session_id: str) -> bool:
820 """
821 Permanently delete a chat session.
823 Cascades: ChatMessages deleted (CASCADE), ResearchHistory.chat_session_id set NULL.
825 Args:
826 session_id: ID of the session to delete
828 Returns:
829 True if deleted successfully
830 """
831 try:
832 # Terminate any in-progress research tied to this session, so the
833 # FK's ON DELETE SET NULL doesn't leave it alive with a null
834 # chat_session_id — an orphan that keeps burning LLM cycles for a
835 # conversation the user already discarded.
836 #
837 # Order matters: collect the in-flight ids inside the transaction,
838 # but set the (in-memory, non-transactional) termination flags only
839 # AFTER the delete commits. Flagging before the commit would, on a
840 # commit failure, kill the research of a session that still exists.
841 with get_user_db_session(self.username) as db:
842 session = db.query(ChatSession).filter_by(id=session_id).first()
843 if not session:
844 return False
845 in_flight = (
846 db.query(ResearchHistory.id)
847 .filter(
848 ResearchHistory.chat_session_id == session_id,
849 ResearchHistory.status == ResearchStatus.IN_PROGRESS,
850 )
851 .all()
852 )
853 db.delete(session)
854 db.commit()
855 for (rid,) in in_flight:
856 set_termination_flag(rid)
857 logger.info(f"Deleted chat: {session_id[:8]}...")
858 return True
860 except DB_EXCEPTIONS:
861 logger.exception("Error deleting chat session")
862 return False
864 def update_accumulated_context(
865 self,
866 session_id: str,
867 new_entities: Optional[List[str]] = None,
868 new_topics: Optional[List[str]] = None,
869 summary_addition: Optional[str] = None,
870 source_count_delta: int = 0,
871 ) -> bool:
872 """
873 Update the accumulated context for a session.
875 Args:
876 session_id: ID of the session
877 new_entities: New entities to add
878 new_topics: New topics to add
879 summary_addition: Text to append to summary
880 source_count_delta: Number of sources to add to count
882 Returns:
883 True if updated successfully
884 """
885 try:
886 with get_user_db_session(self.username) as db:
887 # with_for_update() is a no-op on SQLite but provides
888 # row locking on PostgreSQL/MySQL if ever used
889 session = (
890 db.query(ChatSession)
891 .filter_by(id=session_id)
892 .with_for_update()
893 .first()
894 )
895 if not session:
896 return False
898 # Build a NEW dict and reassign so SQLAlchemy's plain JSON
899 # column marks the row dirty. In-place mutation of the existing
900 # dict (or reassigning the same object identity) is not
901 # detected without MutableDict.as_mutable() — at flush time
902 # the loaded snapshot equals the current value and no UPDATE
903 # is emitted. Same convention as research_sources_service.py.
904 existing_ctx = session.accumulated_context or {}
905 ctx = dict(existing_ctx)
907 # Merge entities (deduplicate)
908 if new_entities:
909 existing = set(ctx.get("key_entities", []))
910 existing.update(new_entities)
911 ctx["key_entities"] = list(existing)[:50]
913 # Merge topics
914 if new_topics:
915 existing = set(ctx.get("topics", []))
916 existing.update(new_topics)
917 ctx["topics"] = list(existing)[:20]
919 # Append to summary (with size limit)
920 if summary_addition:
921 current = ctx.get("summary", "")
922 new_summary = (
923 f"{current}\n\n{summary_addition}"
924 if current
925 else summary_addition
926 )
927 ctx["summary"] = new_summary[-8000:] # Keep last 8000 chars
929 # Update source count
930 if source_count_delta:
931 ctx["source_count"] = (
932 ctx.get("source_count", 0) + source_count_delta
933 )
935 session.accumulated_context = ctx
936 db.commit()
937 return True
939 except DB_EXCEPTIONS:
940 logger.exception("Error updating accumulated context")
941 return False
943 def _fallback_title(self, query: Optional[str]) -> str:
944 """Non-LLM title used at creation time (never blocks on I/O)."""
945 if not query:
946 return f"Chat {datetime.now(UTC).strftime('%Y-%m-%d %H:%M')}"
947 if len(query) > 100:
948 return query[:97].strip() + "..."
949 return query.strip()
951 def _generate_title(
952 self,
953 query: Optional[str],
954 settings_snapshot: Optional[Dict[str, Any]] = None,
955 ) -> str:
956 """
957 Generate a title from the initial query.
959 When chat.llm_title_generation is enabled and settings_snapshot is
960 provided, uses an LLM for concise titles. Otherwise returns the
961 non-LLM fallback title.
962 """
963 if not query:
964 return self._fallback_title(query)
966 if settings_snapshot:
967 from ..config.llm_config import get_llm
968 from ..config.thread_settings import get_setting_from_snapshot
970 if get_setting_from_snapshot( 970 ↛ 1037line 970 didn't jump to line 1037 because the condition on line 970 was always true
971 "chat.llm_title_generation",
972 False,
973 settings_snapshot=settings_snapshot,
974 ):
975 timeout = float(
976 get_setting_from_snapshot(
977 "chat.title_llm_timeout_seconds",
978 _DEFAULT_TITLE_LLM_TIMEOUT_SECONDS,
979 settings_snapshot=settings_snapshot,
980 )
981 )
982 # Run the blocking invoke in a worker thread so the request
983 # thread isn't parked past `timeout` by an unresponsive LLM.
984 # `with ThreadPoolExecutor(...) as pool:` would call
985 # shutdown(wait=True) on __exit__, defeating the timeout —
986 # use wait=False + cancel_futures so the timeout actually fires.
987 pool = ThreadPoolExecutor(
988 max_workers=1,
989 thread_name_prefix="chat-title",
990 )
991 try:
992 llm = get_llm(settings_snapshot=settings_snapshot)
993 prompt = (
994 "Generate a concise 3-7 word title for this research "
995 "query. Return ONLY the title, no quotes or "
996 f"explanation.\n\nQuery: {query[:200]}"
997 )
998 future = pool.submit(llm.invoke, prompt)
999 try:
1000 response = future.result(timeout=timeout)
1001 except FuturesTimeoutError:
1002 logger.warning(
1003 "LLM title generation exceeded {}s timeout; "
1004 "falling back to truncation",
1005 timeout,
1006 )
1007 return self._fallback_title(query)
1008 # Strip CR/LF before storing: the title is later
1009 # interpolated into loguru f-strings (e.g. the
1010 # "title already set" log line above) — an embedded
1011 # newline forges what looks like a second log entry
1012 # in aggregators. Also keeps document.title /
1013 # chatTitle.textContent visually clean.
1014 title = (
1015 str(response.content)
1016 .replace("\n", " ")
1017 .replace("\r", " ")
1018 .strip()
1019 .strip("\"'")[:100]
1020 )
1021 if title: 1021 ↛ 1035line 1021 didn't jump to line 1035 because the condition on line 1021 was always true
1022 return title
1023 except Exception:
1024 # User opted into LLM title generation via the
1025 # `chat.llm_title_generation` setting; a silent
1026 # debug-level swallow would hide provider misconfig,
1027 # auth failures, or response-shape regressions in
1028 # production where stderr level is INFO. Log with
1029 # traceback so operators can diagnose, then fall back
1030 # to truncation for UX continuity.
1031 logger.exception(
1032 "LLM title generation failed, falling back to truncation"
1033 )
1034 finally:
1035 pool.shutdown(wait=False, cancel_futures=True)
1037 return self._fallback_title(query)