Coverage for src/local_deep_research/chat/context.py: 88%

143 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1""" 

2ChatContextManager - Custom context building for multi-turn conversations. 

3 

4This is DIFFERENT from FollowUpResearchService (single parent-child context). 

5Multi-turn chat requires: 

6- Rolling window of recent messages 

7- Accumulated findings across conversation 

8- Source deduplication across turns 

9- Context summarization for long conversations 

10""" 

11 

12from typing import Dict, Any, List, Optional 

13 

14from loguru import logger 

15 

16from ..config.thread_settings import get_setting_from_snapshot 

17 

18 

19class ChatContextManager: 

20 """ 

21 Build context from multi-turn conversation history. 

22 

23 Handles: rolling window, summarization, context accumulation. 

24 Different from follow-up: accumulates from MULTIPLE previous turns. 

25 """ 

26 

27 MAX_CONTEXT_MESSAGES = 10 # Recent messages to include fully 

28 MAX_FINDINGS_TO_INCLUDE = 5 # Recent findings to include 

29 

30 # Limits for the query-focused conversation summary that becomes the 

31 # follow-up prompt's "previous findings" block. 

32 CONTEXT_SUMMARY_MAX_SENTENCES = 8 

33 CONTEXT_SUMMARY_MAX_CHARS = 2000 

34 # Transcript char budget kept below BaseSummarizer.INPUT_TRUNCATE_CHARS 

35 # (8000) so the summarizer's own truncation never has to drop the most 

36 # recent turns — we trim oldest-first ourselves below. 

37 CONTEXT_INPUT_CHAR_BUDGET = 7500 

38 

39 # Default for the chat.followup_context_mode setting (summary | raw | 

40 # full | none) — what prior context a follow-up turn receives. 

41 DEFAULT_FOLLOWUP_CONTEXT_MODE = "summary" 

42 

43 def __init__( 

44 self, 

45 session_id: str, 

46 messages: List[Dict[str, Any]], 

47 accumulated_context: Optional[Dict[str, Any]] = None, 

48 settings_snapshot: Optional[Dict[str, Any]] = None, 

49 ): 

50 """ 

51 Initialize context manager. 

52 

53 Args: 

54 session_id: Chat session ID 

55 messages: List of message dictionaries with role, content, etc. 

56 accumulated_context: Previously accumulated context from session 

57 settings_snapshot: Optional settings to override class-level defaults 

58 """ 

59 self.session_id = session_id 

60 # chat_messages no longer contains step rows (they live in 

61 # chat_progress_steps), but get_session_messages MERGES both for 

62 # client rendering. Filter out steps + non-dict entries here so 

63 # accumulated context only reflects durable conversation turns. 

64 self.messages = [ 

65 msg 

66 for msg in (messages or []) 

67 if isinstance(msg, dict) and msg.get("message_type") != "step" 

68 ] 

69 self.accumulated_context = accumulated_context or {} 

70 # Used by build_research_context to construct the LLM that produces 

71 # the query-focused conversation summary. 

72 self.settings_snapshot = settings_snapshot 

73 

74 # Override class defaults from settings if provided. 

75 # Note: settings_snapshot is the 4th keyword arg; passing it positionally 

76 # would bind it to the unused `username` param, silently using defaults. 

77 if settings_snapshot: 

78 self.MAX_CONTEXT_MESSAGES = get_setting_from_snapshot( 

79 "chat.max_context_messages", 

80 self.MAX_CONTEXT_MESSAGES, 

81 settings_snapshot=settings_snapshot, 

82 ) 

83 self.MAX_FINDINGS_TO_INCLUDE = get_setting_from_snapshot( 

84 "chat.max_findings_to_include", 

85 self.MAX_FINDINGS_TO_INCLUDE, 

86 settings_snapshot=settings_snapshot, 

87 ) 

88 

89 def build_research_context(self, current_query: str = "") -> Dict[str, Any]: 

90 """ 

91 Build context for the next research query. 

92 

93 Args: 

94 current_query: The user's new message. On a follow-up turn it is 

95 used to focus a summary of the whole prior conversation, which 

96 becomes the follow-up prompt's "previous findings". On the 

97 first turn there is no prior work to summarize. 

98 

99 Returns dict with: 

100 - session_id: Current session 

101 - conversation_history: Recent messages 

102 - accumulated_findings / past_findings: Prior work for the follow-up 

103 (query-focused summary on follow-ups; empty on the first turn) 

104 - accumulated_sources: Deduplicated sources 

105 - key_entities: Important entities mentioned 

106 - topics: Topics discussed 

107 - is_multi_turn: Whether this is a follow-up 

108 """ 

109 # The follow-up strategy reads "original_query" to anchor the prompt on 

110 # the topic that started the conversation; without it the contextual 

111 # follow-up loses the original question. Use the session's first user 

112 # message as that anchor. 

113 original_query = next( 

114 ( 

115 m.get("content", "") 

116 for m in self.messages 

117 if isinstance(m, dict) and m.get("role") == "user" 

118 ), 

119 "", 

120 ) 

121 

122 is_multi_turn = any( 

123 isinstance(m, dict) and m.get("role") == "assistant" 

124 for m in self.messages 

125 ) 

126 

127 findings = self._select_prior_findings(current_query, is_multi_turn) 

128 

129 return { 

130 "session_id": self.session_id, 

131 "original_query": original_query, 

132 "conversation_history": self._get_recent_messages(), 

133 "accumulated_findings": findings, 

134 "past_findings": findings, # Research engine expects this key 

135 "accumulated_sources": self._extract_sources_from_history(), 

136 "key_entities": self._get_key_entities(), 

137 "topics": self._get_topics(), 

138 "is_multi_turn": is_multi_turn, 

139 "turn_count": len(self.messages), 

140 } 

141 

142 def _select_prior_findings( 

143 self, current_query: str, is_multi_turn: bool 

144 ) -> str: 

145 """Pick the follow-up's "previous findings" per chat.followup_context_mode. 

146 

147 Modes: 

148 - ``summary`` (default): query-focused LLM summary of the conversation 

149 - ``raw``: recent research findings, truncated 

150 - ``full``: the entire conversation transcript 

151 - ``none``: no prior findings 

152 

153 Only follow-up turns carry prior work; the first turn returns "". 

154 """ 

155 if not is_multi_turn: 

156 return "" 

157 

158 mode = self.DEFAULT_FOLLOWUP_CONTEXT_MODE 

159 if self.settings_snapshot: 

160 mode = get_setting_from_snapshot( 

161 "chat.followup_context_mode", 

162 mode, 

163 settings_snapshot=self.settings_snapshot, 

164 ) 

165 

166 if mode == "none": 

167 findings = "" 

168 elif mode == "raw": 

169 findings = self._extract_findings_from_history() 

170 elif mode == "full": 

171 findings = self._build_conversation_text() 

172 elif current_query: 

173 # "summary" with a question to focus on. 

174 findings = self._summarize_prior_work(current_query) 

175 else: 

176 # "summary" with no question (e.g. a no-arg build_research_context 

177 # call): fall back to raw recent findings. 

178 findings = self._extract_findings_from_history() 

179 

180 # Observability: the summary path is otherwise silent (no token-counter 

181 # entry, since get_llm runs without a research_id), so a follow-up's 

182 # prior-context build looked like an unexplained pause. One line per 

183 # follow-up turn records which mode ran and how much context it built. 

184 logger.info( 

185 "Chat follow-up prior context: mode={}, {} chars", 

186 mode, 

187 len(findings), 

188 ) 

189 return findings 

190 

191 def _build_conversation_text(self) -> str: 

192 """Render the prior conversation (both roles) as a plain transcript. 

193 

194 Trims oldest-first to ``CONTEXT_INPUT_CHAR_BUDGET`` so the most recent 

195 turns survive and the summarizer's input cap never has to truncate. 

196 """ 

197 lines: List[str] = [] 

198 used = 0 

199 for msg in reversed(self.messages): 

200 if not isinstance(msg, dict): 200 ↛ 201line 200 didn't jump to line 201 because the condition on line 200 was never true

201 continue 

202 content = (msg.get("content") or "").strip() 

203 if not content: 203 ↛ 204line 203 didn't jump to line 204 because the condition on line 203 was never true

204 continue 

205 role = (msg.get("role") or "unknown").capitalize() 

206 line = f"{role}: {content}" 

207 remaining = self.CONTEXT_INPUT_CHAR_BUDGET - used 

208 if remaining <= 0: 208 ↛ 209line 208 didn't jump to line 209 because the condition on line 208 was never true

209 break 

210 if len(line) > remaining: 210 ↛ 211line 210 didn't jump to line 211 because the condition on line 210 was never true

211 if lines: 

212 # Budget already spent on more recent turns — stop rather 

213 # than partially including an older one. 

214 break 

215 # The most recent turn alone exceeds the budget: keep its head 

216 # so the transcript still fits the summarizer's input cap. 

217 line = line[:remaining] 

218 lines.append(line) 

219 used += len(line) 

220 lines.reverse() 

221 return "\n\n".join(lines) 

222 

223 def _summarize_prior_work(self, current_query: str) -> str: 

224 """Summarize the prior conversation, focused on ``current_query``. 

225 

226 Returns an empty string when there is no prior conversation, the LLM 

227 cannot be constructed (e.g. a misconfigured provider), or the LLM call 

228 itself fails. The summary is additive context, so a failure here must 

229 not crash the follow-up request — the research dispatch that follows 

230 surfaces a genuinely-broken LLM through its own error handling. 

231 """ 

232 transcript = self._build_conversation_text() 

233 if not transcript: 233 ↛ 234line 233 didn't jump to line 234 because the condition on line 233 was never true

234 return "" 

235 

236 from ..config.llm_config import get_llm 

237 from ..advanced_search_system.summarization import FocusedSummarizer 

238 

239 try: 

240 llm = get_llm(settings_snapshot=self.settings_snapshot) 

241 except Exception: 

242 logger.opt(exception=True).debug( 

243 "Could not build LLM for chat context summary; skipping" 

244 ) 

245 return "" 

246 

247 return FocusedSummarizer( 

248 llm, 

249 focus_query=current_query, 

250 max_sentences=self.CONTEXT_SUMMARY_MAX_SENTENCES, 

251 max_chars=self.CONTEXT_SUMMARY_MAX_CHARS, 

252 ).summarize(transcript) 

253 

254 def build_prompt_context(self) -> str: 

255 """ 

256 Build a text context string suitable for including in prompts. 

257 

258 Returns a formatted string with conversation context. 

259 """ 

260 if not self.messages: 

261 return "" 

262 

263 parts = [] 

264 

265 # Add accumulated summary if available 

266 summary = self.accumulated_context.get("summary", "") 

267 if summary: 

268 parts.append("Previous conversation summary:") 

269 parts.append(summary[:2000]) # Limit summary length 

270 parts.append("") 

271 

272 # Add key entities and topics 

273 entities = self.accumulated_context.get("key_entities", []) 

274 if entities: 

275 parts.append(f"Key entities discussed: {', '.join(entities[:10])}") 

276 

277 topics = self.accumulated_context.get("topics", []) 

278 if topics: 

279 parts.append(f"Topics covered: {', '.join(topics[:10])}") 

280 

281 if entities or topics: 

282 parts.append("") 

283 

284 # Add recent conversation 

285 recent = self._get_recent_messages() 

286 if recent: 286 ↛ 296line 286 didn't jump to line 296 because the condition on line 286 was always true

287 parts.append("Recent conversation:") 

288 for msg in recent: 

289 role = (msg.get("role") or "unknown").capitalize() 

290 content = msg.get("content") or "" 

291 # Truncate long messages 

292 if len(content) > 500: 

293 content = content[:500] + "..." 

294 parts.append(f"{role}: {content}") 

295 

296 return "\n".join(parts) 

297 

298 def _get_recent_messages(self) -> List[Dict[str, Any]]: 

299 """ 

300 Get recent messages within context window. 

301 

302 Returns messages with limited content length. 

303 """ 

304 recent = self.messages[-self.MAX_CONTEXT_MESSAGES :] 

305 

306 # Return simplified message dicts 

307 return [ 

308 { 

309 "role": msg.get("role") or "unknown", 

310 "content": msg.get("content") or "", 

311 "message_type": msg.get("message_type"), 

312 "research_id": msg.get("research_id"), 

313 } 

314 for msg in recent 

315 if isinstance(msg, dict) 

316 ] 

317 

318 def _extract_findings_from_history(self) -> str: 

319 """ 

320 Extract key findings from assistant messages with research. 

321 

322 Returns combined findings text, limited in length. 

323 """ 

324 findings = [] 

325 

326 for msg in self.messages: 

327 if msg.get("role") == "assistant" and msg.get("research_id"): 

328 content = msg.get("content") or "" 

329 # Summarize long responses - take first part 

330 if len(content) > 500: 330 ↛ 332line 330 didn't jump to line 332 because the condition on line 330 was never true

331 # Try to find a natural break point 

332 break_point = content.find("\n\n", 300) 

333 if break_point == -1 or break_point > 600: 

334 break_point = 500 

335 content = content[:break_point] + "..." 

336 findings.append(content) 

337 

338 # Keep only recent findings 

339 recent_findings = findings[-self.MAX_FINDINGS_TO_INCLUDE :] 

340 return "\n\n---\n\n".join(recent_findings) 

341 

342 def _extract_sources_from_history(self) -> List[Dict[str, Any]]: 

343 """ 

344 Collect a sources-summary from accumulated context if available. 

345 

346 Per-source metadata (url, title, snippet) is NOT persisted on 

347 ChatMessage rows in the current schema, so the chat layer cannot 

348 reconstruct individual source entries from message history. What 

349 IS tracked across turns is the running source_count maintained 

350 by ChatService.update_accumulated_context(). 

351 

352 Returns either an empty list (no sources seen yet) or a 

353 single-element list containing one summary dict of shape 

354 ``[{"count": <int>}]``. Callers must NOT iterate this list as 

355 if it were a list of source records — it is a count summary 

356 wrapped in a list for consumer-shape stability. 

357 """ 

358 # Sources are tracked in accumulated_context by update_accumulated_context() 

359 # rather than per-message metadata (which is not stored in ChatMessage model) 

360 source_count = self.accumulated_context.get("source_count", 0) 

361 if source_count > 0: 

362 return [{"count": source_count}] 

363 return [] 

364 

365 def _get_key_entities(self) -> List[str]: 

366 """Get key entities from accumulated context.""" 

367 entities: List[str] = self.accumulated_context.get("key_entities", []) 

368 return entities[:20] 

369 

370 def _get_topics(self) -> List[str]: 

371 """Get topics from accumulated context.""" 

372 topics: List[str] = self.accumulated_context.get("topics", []) 

373 return topics[:10] 

374 

375 def extract_context_updates( 

376 self, 

377 new_content: str, 

378 new_sources: Optional[List[Dict[str, Any]]] = None, 

379 ) -> Dict[str, Any]: 

380 """ 

381 Extract context updates from new research response. 

382 

383 Args: 

384 new_content: New assistant response content 

385 new_sources: New sources from research 

386 

387 Returns: 

388 Dict with entities, topics, summary update, source count 

389 """ 

390 return { 

391 "new_entities": [], # Could be enhanced with NLP entity extraction 

392 "new_topics": [], # Could be enhanced with NLP topic modeling 

393 "summary_addition": self._create_summary(new_content), 

394 "source_count_delta": len(new_sources) if new_sources else 0, 

395 } 

396 

397 def _create_summary(self, content: str) -> str: 

398 """ 

399 Create a brief summary of content for context accumulation. 

400 

401 Returns first meaningful paragraph or truncated content. 

402 """ 

403 if not content: 

404 return "" 

405 

406 # Try to get first paragraph 

407 paragraphs = content.split("\n\n") 

408 for para in paragraphs: 

409 para = para.strip() 

410 # Skip headers and very short paragraphs 

411 if para and len(para) > 50 and not para.startswith("#"): 

412 if len(para) > 300: 

413 return para[:300] + "..." 

414 return para 

415 

416 # Fallback: just truncate 

417 if len(content) > 300: 417 ↛ 418line 417 didn't jump to line 418 because the condition on line 417 was never true

418 return content[:300] + "..." 

419 return content