Coverage for src/local_deep_research/chat/context.py: 88%
143 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1"""
2ChatContextManager - Custom context building for multi-turn conversations.
4This is DIFFERENT from FollowUpResearchService (single parent-child context).
5Multi-turn chat requires:
6- Rolling window of recent messages
7- Accumulated findings across conversation
8- Source deduplication across turns
9- Context summarization for long conversations
10"""
12from typing import Dict, Any, List, Optional
14from loguru import logger
16from ..config.thread_settings import get_setting_from_snapshot
class ChatContextManager:
    """
    Build context from multi-turn conversation history.

    Handles: rolling window, summarization, context accumulation.
    Different from follow-up: accumulates from MULTIPLE previous turns.
    """

    MAX_CONTEXT_MESSAGES = 10  # Recent messages to include fully
    MAX_FINDINGS_TO_INCLUDE = 5  # Recent findings to include

    # Limits for the query-focused conversation summary that becomes the
    # follow-up prompt's "previous findings" block.
    CONTEXT_SUMMARY_MAX_SENTENCES = 8
    CONTEXT_SUMMARY_MAX_CHARS = 2000
    # Transcript char budget kept below BaseSummarizer.INPUT_TRUNCATE_CHARS
    # (8000) so the summarizer's own truncation never has to drop the most
    # recent turns — we trim oldest-first ourselves below.
    CONTEXT_INPUT_CHAR_BUDGET = 7500

    # Default for the chat.followup_context_mode setting (summary | raw |
    # full | none) — what prior context a follow-up turn receives.
    DEFAULT_FOLLOWUP_CONTEXT_MODE = "summary"

    # Separator placed between transcript turns; its length is charged
    # against CONTEXT_INPUT_CHAR_BUDGET so the joined text fits the budget.
    _TRANSCRIPT_SEPARATOR = "\n\n"

    def __init__(
        self,
        session_id: str,
        messages: List[Dict[str, Any]],
        accumulated_context: Optional[Dict[str, Any]] = None,
        settings_snapshot: Optional[Dict[str, Any]] = None,
    ):
        """
        Initialize context manager.

        Args:
            session_id: Chat session ID
            messages: List of message dictionaries with role, content, etc.
                ``None`` is tolerated and treated as an empty list.
            accumulated_context: Previously accumulated context from session
            settings_snapshot: Optional settings to override class-level defaults
        """
        self.session_id = session_id
        # chat_messages no longer contains step rows (they live in
        # chat_progress_steps), but get_session_messages MERGES both for
        # client rendering. Filter out steps + non-dict entries here so
        # accumulated context only reflects durable conversation turns.
        self.messages = [
            msg
            for msg in (messages or [])
            if isinstance(msg, dict) and msg.get("message_type") != "step"
        ]
        self.accumulated_context = accumulated_context or {}
        # Used by build_research_context to construct the LLM that produces
        # the query-focused conversation summary.
        self.settings_snapshot = settings_snapshot

        # Override class defaults from settings if provided. The overrides
        # are stored on the instance, so the class-level defaults stay
        # untouched for other managers. Without a snapshot the class
        # defaults apply as-is.
        if settings_snapshot:
            self.MAX_CONTEXT_MESSAGES = get_setting_from_snapshot(
                "chat.max_context_messages",
                self.MAX_CONTEXT_MESSAGES,
                settings_snapshot=settings_snapshot,
            )
            self.MAX_FINDINGS_TO_INCLUDE = get_setting_from_snapshot(
                "chat.max_findings_to_include",
                self.MAX_FINDINGS_TO_INCLUDE,
                settings_snapshot=settings_snapshot,
            )

    @staticmethod
    def _tail(items: List[Any], limit: int) -> List[Any]:
        """Return the last ``limit`` items, or ``[]`` when ``limit`` <= 0.

        A bare ``items[-limit:]`` is wrong for non-positive limits: with
        ``limit == 0`` it evaluates to ``items[0:]`` (the WHOLE list), and a
        negative limit drops items from the head instead of keeping a tail.
        """
        if limit <= 0:
            return []
        return items[-limit:]

    def build_research_context(self, current_query: str = "") -> Dict[str, Any]:
        """
        Build context for the next research query.

        Args:
            current_query: The user's new message. On a follow-up turn it is
                used to focus a summary of the whole prior conversation, which
                becomes the follow-up prompt's "previous findings". On the
                first turn there is no prior work to summarize.

        Returns dict with:
            - session_id: Current session
            - original_query: Content of the first user message ("" if none)
            - conversation_history: Recent messages
            - accumulated_findings / past_findings: Prior work for the follow-up
              (empty on the first turn)
            - accumulated_sources: Source-count summary (see
              _extract_sources_from_history)
            - key_entities: Important entities mentioned
            - topics: Topics discussed
            - is_multi_turn: Whether this is a follow-up
            - turn_count: Number of retained messages (not user/assistant pairs)
        """
        # The follow-up strategy reads "original_query" to anchor the prompt on
        # the topic that started the conversation; without it the contextual
        # follow-up loses the original question. Use the session's first user
        # message as that anchor. ``or ""`` keeps the value a string even when
        # the stored content is None.
        original_query = next(
            (
                m.get("content") or ""
                for m in self.messages
                if isinstance(m, dict) and m.get("role") == "user"
            ),
            "",
        )

        # Any assistant message means at least one turn has been answered.
        is_multi_turn = any(
            isinstance(m, dict) and m.get("role") == "assistant"
            for m in self.messages
        )

        findings = self._select_prior_findings(current_query, is_multi_turn)

        return {
            "session_id": self.session_id,
            "original_query": original_query,
            "conversation_history": self._get_recent_messages(),
            "accumulated_findings": findings,
            "past_findings": findings,  # Research engine expects this key
            "accumulated_sources": self._extract_sources_from_history(),
            "key_entities": self._get_key_entities(),
            "topics": self._get_topics(),
            "is_multi_turn": is_multi_turn,
            "turn_count": len(self.messages),
        }

    def _select_prior_findings(
        self, current_query: str, is_multi_turn: bool
    ) -> str:
        """Pick the follow-up's "previous findings" per chat.followup_context_mode.

        Modes:
            - ``summary`` (default): query-focused LLM summary of the conversation
            - ``raw``: recent research findings, truncated
            - ``full``: the conversation transcript, trimmed oldest-first to
              ``CONTEXT_INPUT_CHAR_BUDGET``
            - ``none``: no prior findings

        Any other value is handled like ``summary``. Only follow-up turns carry
        prior work; the first turn returns "".
        """
        if not is_multi_turn:
            return ""

        mode = self.DEFAULT_FOLLOWUP_CONTEXT_MODE
        if self.settings_snapshot:
            mode = get_setting_from_snapshot(
                "chat.followup_context_mode",
                mode,
                settings_snapshot=self.settings_snapshot,
            )

        if mode == "none":
            findings = ""
        elif mode == "raw":
            findings = self._extract_findings_from_history()
        elif mode == "full":
            findings = self._build_conversation_text()
        elif current_query:
            # "summary" with a question to focus on.
            findings = self._summarize_prior_work(current_query)
        else:
            # "summary" with no question (e.g. a no-arg build_research_context
            # call): fall back to raw recent findings.
            findings = self._extract_findings_from_history()

        # Observability: the summary path is otherwise silent (no token-counter
        # entry, since get_llm runs without a research_id), so a follow-up's
        # prior-context build looked like an unexplained pause. One line per
        # follow-up turn records which mode ran and how much context it built.
        logger.info(
            "Chat follow-up prior context: mode={}, {} chars",
            mode,
            len(findings),
        )
        return findings

    def _build_conversation_text(self) -> str:
        """Render the prior conversation (both roles) as a plain transcript.

        Walks the messages newest-first and stops once the budget is spent, so
        the most recent turns survive. The separators between turns are
        charged against ``CONTEXT_INPUT_CHAR_BUDGET`` too, which guarantees
        ``len(result) <= CONTEXT_INPUT_CHAR_BUDGET``.
        """
        separator = self._TRANSCRIPT_SEPARATOR
        budget = self.CONTEXT_INPUT_CHAR_BUDGET
        lines: List[str] = []
        used = 0
        for msg in reversed(self.messages):
            if not isinstance(msg, dict):
                continue
            content = (msg.get("content") or "").strip()
            if not content:
                continue
            role = (msg.get("role") or "unknown").capitalize()
            line = f"{role}: {content}"
            # Every turn after the first one costs one separator when joined.
            separator_cost = len(separator) if lines else 0
            remaining = budget - used - separator_cost
            if remaining <= 0:
                break
            if len(line) > remaining:
                if lines:
                    # Budget already spent on more recent turns — stop rather
                    # than partially including an older one.
                    break
                # The most recent turn alone exceeds the budget: keep its head
                # so the transcript still fits the summarizer's input cap.
                line = line[:remaining]
            lines.append(line)
            used += len(line) + separator_cost
        # Collected newest-first; restore chronological order for the reader.
        lines.reverse()
        return separator.join(lines)

    def _summarize_prior_work(self, current_query: str) -> str:
        """Summarize the prior conversation, focused on ``current_query``.

        Returns an empty string when there is no prior conversation, the LLM
        cannot be constructed (e.g. a misconfigured provider), or the
        summarization call raises. The summary is additive context, so a
        failure here must not crash the follow-up request — the research
        dispatch that follows surfaces a genuinely-broken LLM through its own
        error handling.
        """
        transcript = self._build_conversation_text()
        if not transcript:
            return ""

        # Function-scope imports, kept exactly where the original had them.
        from ..config.llm_config import get_llm
        from ..advanced_search_system.summarization import FocusedSummarizer

        try:
            llm = get_llm(settings_snapshot=self.settings_snapshot)
        except Exception:
            logger.opt(exception=True).debug(
                "Could not build LLM for chat context summary; skipping"
            )
            return ""

        # NOTE(review): FocusedSummarizer may already swallow LLM errors
        # internally — not visible from here. This guard enforces the
        # documented "never crash the follow-up" contract regardless.
        try:
            summary = FocusedSummarizer(
                llm,
                focus_query=current_query,
                max_sentences=self.CONTEXT_SUMMARY_MAX_SENTENCES,
                max_chars=self.CONTEXT_SUMMARY_MAX_CHARS,
            ).summarize(transcript)
        except Exception:
            logger.opt(exception=True).debug(
                "Chat context summary failed; continuing without it"
            )
            return ""
        # Callers take len() of the result, so never hand back None.
        return summary or ""

    def build_prompt_context(self) -> str:
        """
        Build a text context string suitable for including in prompts.

        Returns a formatted string with conversation context: the accumulated
        summary (capped at 2000 chars), up to 10 key entities and 10 topics,
        then the recent messages with each content capped at 500 chars.
        Returns "" when there are no messages.
        """
        if not self.messages:
            return ""

        parts = []

        # Add accumulated summary if available
        summary = self.accumulated_context.get("summary", "")
        if summary:
            parts.append("Previous conversation summary:")
            parts.append(summary[:2000])  # Limit summary length
            parts.append("")

        # Add key entities and topics
        entities = self.accumulated_context.get("key_entities", [])
        if entities:
            parts.append(f"Key entities discussed: {', '.join(entities[:10])}")

        topics = self.accumulated_context.get("topics", [])
        if topics:
            parts.append(f"Topics covered: {', '.join(topics[:10])}")

        if entities or topics:
            parts.append("")

        # Add recent conversation
        recent = self._get_recent_messages()
        if recent:
            parts.append("Recent conversation:")
            for msg in recent:
                role = (msg.get("role") or "unknown").capitalize()
                content = msg.get("content") or ""
                # Truncate long messages
                if len(content) > 500:
                    content = content[:500] + "..."
                parts.append(f"{role}: {content}")

        return "\n".join(parts)

    def _get_recent_messages(self) -> List[Dict[str, Any]]:
        """
        Get recent messages within context window.

        Returns simplified copies (role, content, message_type, research_id)
        of the last ``MAX_CONTEXT_MESSAGES`` messages; an empty list when that
        limit is zero or negative.
        """
        recent = self._tail(self.messages, self.MAX_CONTEXT_MESSAGES)

        # Return simplified message dicts
        return [
            {
                "role": msg.get("role") or "unknown",
                "content": msg.get("content") or "",
                "message_type": msg.get("message_type"),
                "research_id": msg.get("research_id"),
            }
            for msg in recent
            if isinstance(msg, dict)
        ]

    def _extract_findings_from_history(self) -> str:
        """
        Extract key findings from assistant messages with research.

        Only assistant messages carrying a truthy ``research_id`` count. Long
        contents are cut near 500 chars, preferring a paragraph break between
        offsets 300 and 600. Returns the last ``MAX_FINDINGS_TO_INCLUDE``
        findings joined by a ``---`` rule ("" when there are none).
        """
        findings = []

        for msg in self.messages:
            # Same defensive dict check the sibling helpers apply.
            if not isinstance(msg, dict):
                continue
            if msg.get("role") == "assistant" and msg.get("research_id"):
                content = msg.get("content") or ""
                # Summarize long responses - take first part
                if len(content) > 500:
                    # Try to find a natural break point
                    break_point = content.find("\n\n", 300)
                    if break_point == -1 or break_point > 600:
                        break_point = 500
                    content = content[:break_point] + "..."
                findings.append(content)

        # Keep only recent findings
        recent_findings = self._tail(findings, self.MAX_FINDINGS_TO_INCLUDE)
        return "\n\n---\n\n".join(recent_findings)

    def _extract_sources_from_history(self) -> List[Dict[str, Any]]:
        """
        Collect a sources-summary from accumulated context if available.

        Per-source metadata (url, title, snippet) is NOT persisted on
        ChatMessage rows in the current schema, so the chat layer cannot
        reconstruct individual source entries from message history. What
        IS tracked across turns is the running source_count maintained
        by ChatService.update_accumulated_context().

        Returns either an empty list (no sources seen yet) or a
        single-element list containing one summary dict of shape
        ``[{"count": <int>}]``. Callers must NOT iterate this list as
        if it were a list of source records — it is a count summary
        wrapped in a list for consumer-shape stability.
        """
        # Sources are tracked in accumulated_context by update_accumulated_context()
        # rather than per-message metadata (which is not stored in ChatMessage model).
        # ``or 0`` covers a key that is present but holds None.
        source_count = self.accumulated_context.get("source_count") or 0
        if source_count > 0:
            return [{"count": source_count}]
        return []

    def _get_key_entities(self) -> List[str]:
        """Get up to 20 key entities from accumulated context ([] if unset)."""
        entities: List[str] = self.accumulated_context.get("key_entities") or []
        return entities[:20]

    def _get_topics(self) -> List[str]:
        """Get up to 10 topics from accumulated context ([] if unset)."""
        topics: List[str] = self.accumulated_context.get("topics") or []
        return topics[:10]

    def extract_context_updates(
        self,
        new_content: str,
        new_sources: Optional[List[Dict[str, Any]]] = None,
    ) -> Dict[str, Any]:
        """
        Extract context updates from new research response.

        Args:
            new_content: New assistant response content
            new_sources: New sources from research

        Returns:
            Dict with entities, topics, summary update, source count
        """
        return {
            "new_entities": [],  # Could be enhanced with NLP entity extraction
            "new_topics": [],  # Could be enhanced with NLP topic modeling
            "summary_addition": self._create_summary(new_content),
            "source_count_delta": len(new_sources) if new_sources else 0,
        }

    def _create_summary(self, content: str) -> str:
        """
        Create a brief summary of content for context accumulation.

        Returns the first paragraph that is longer than 50 chars and is not a
        markdown header, capped at 300 chars plus "...". When no paragraph
        qualifies, returns the content itself under the same 300-char cap.
        """
        if not content:
            return ""

        # Try to get first paragraph
        paragraphs = content.split("\n\n")
        for para in paragraphs:
            para = para.strip()
            # Skip headers and very short paragraphs
            if para and len(para) > 50 and not para.startswith("#"):
                if len(para) > 300:
                    return para[:300] + "..."
                return para

        # Fallback: just truncate
        if len(content) > 300:
            return content[:300] + "..."
        return content