Coverage for src/local_deep_research/advanced_search_system/findings/repository.py: 94%

186 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1""" 

2Findings repository for managing research findings. 

3""" 

4 

import signal
import threading
from contextlib import contextmanager
from typing import Dict, List, Union

from langchain_core.documents import Document
from langchain_core.language_models import BaseLanguageModel
from loguru import logger

from ...utilities.json_utils import get_llm_response_text
from ...utilities.search_utilities import format_findings
from .base_findings import BaseFindingsRepository

14 

15 

def format_links(links: List[Dict]) -> str:
    """Render links as a numbered, newline-separated listing.

    Args:
        links: Dictionaries that each provide 'title' and 'url' keys.

    Returns:
        str: One numbered title line per link (numbering starts at 1),
            each followed by a line holding its URL. Empty string when
            ``links`` is empty.
    """
    rendered = []
    for position, entry in enumerate(links, start=1):
        rendered.append(f"{position}. {entry['title']}\n URL: {entry['url']}")
    return "\n".join(rendered)

29 

30 

class FindingsRepository(BaseFindingsRepository):
    """Repository for managing research findings.

    Keeps findings per query, the documents added during research and the
    questions asked in each iteration, and uses the configured LLM to
    synthesize a final answer from a list of findings.
    """

    # Seconds to wait for the LLM during final synthesis before giving up.
    SYNTHESIS_TIMEOUT_SECONDS = 120
    # Rough estimate used for sizing: 1 token is about 4 characters of English.
    CHARS_PER_TOKEN = 4
    # Estimated token count above which the knowledge text is truncated.
    # Adjust based on the model's context window.
    MAX_SAFE_TOKENS = 12000
    # Characters kept from the start and from the end when truncating.
    TRUNCATION_KEEP_CHARS = 12000

    # Ordered (keywords, message) pairs used to turn an LLM invocation error
    # into a user-facing message. The first rule with a keyword contained in
    # the lower-cased error text wins, so the order is significant.
    _INVOKE_ERROR_RULES = (
        (
            ("timeout", "timed out"),
            "Error: Failed to synthesize final answer due to LLM timeout. Please check your connection or try again later.",
        ),
        (
            ("too many tokens", "context length", "token limit"),
            "Error: Failed to synthesize final answer due to token limit exceeded. Try reducing the scope of your query.",
        ),
        (
            ("rate limit", "rate_limit"),
            "Error: Failed to synthesize final answer due to LLM rate limit. Please try again in a few minutes.",
        ),
        (
            ("connection", "network"),
            "Error: Failed to synthesize final answer due to connection issues. Please check your internet connection and LLM service status.",
        ),
        (
            ("api key", "authentication"),
            "Error: Failed to synthesize final answer due to authentication issues. Please check your API keys.",
        ),
    )

    def __init__(self, model: BaseLanguageModel):
        """Initialize the repository.

        Args:
            model: The LLM model to use for synthesis
        """
        super().__init__(model)
        self.findings: Dict[str, List[Dict]] = {}
        self.documents: List[Document] = []
        self.questions_by_iteration: Dict[int, List[str]] = {}

    @staticmethod
    def _synthesis_entry(query: str, content) -> Dict:
        """Build a 'Synthesis' finding dictionary for ``query``."""
        return {
            "phase": "Synthesis",
            "content": content,
            "question": query,
            "search_results": [],
            "documents": [],
        }

    def add_finding(self, query: str, finding: Dict | str) -> None:
        """Add a finding for a query.

        A string finding is wrapped into a 'Synthesis' dictionary before it
        is stored; any other value is stored as given. When a dictionary
        finding has phase 'Final synthesis', its content is additionally
        stored (replacing any previous value) under ``query + "_synthesis"``.

        Args:
            query: The query the finding belongs to
            finding: Finding dictionary, or raw finding text
        """
        entries = self.findings.setdefault(query, [])

        if isinstance(finding, str):
            entries.append(self._synthesis_entry(query, finding))
        else:
            entries.append(finding)
            # Only dictionaries carry a phase that can be inspected.
            if (
                isinstance(finding, dict)
                and finding.get("phase") == "Final synthesis"
            ):
                self.findings[query + "_synthesis"] = [
                    self._synthesis_entry(query, finding.get("content", ""))
                ]

        logger.info(
            f"Added finding for query: {query}. Total findings: {len(self.findings[query])}"
        )

    def get_findings(self, query: str) -> List[Dict]:
        """Get findings for a query.

        Args:
            query: The query to get findings for

        Returns:
            List of findings for the query (empty list if there are none)
        """
        return self.findings.get(query, [])

    def clear_findings(self, query: str) -> None:
        """Clear findings for a query.

        Only the entry stored under ``query`` itself is removed; nothing is
        logged when the query has no stored findings.

        Args:
            query: The query to clear findings for
        """
        if query in self.findings:
            del self.findings[query]
            logger.info(f"Cleared findings for query: {query}")

    def add_documents(self, documents: List[Document]) -> None:
        """Add documents to the repository.

        Args:
            documents: List of documents to add
        """
        self.documents.extend(documents)
        logger.info(f"Added {len(documents)} documents to repository")

    def set_questions_by_iteration(
        self, questions_by_iteration: Dict[int, List[str]]
    ) -> None:
        """Set the questions by iteration.

        A shallow copy of the mapping is stored, so the question lists
        themselves remain shared with the caller.

        Args:
            questions_by_iteration: Dictionary mapping iteration numbers to lists of questions
        """
        self.questions_by_iteration = questions_by_iteration.copy()
        logger.info(
            f"Set questions for {len(questions_by_iteration)} iterations"
        )

    def format_findings_to_text(
        self, findings_list: List[Dict], synthesized_content: str
    ) -> str:
        """Format findings into a detailed text output using the utility function.

        Args:
            findings_list: List of finding dictionaries from the strategy execution.
            synthesized_content: The final synthesized content generated by the LLM.

        Returns:
            str: Formatted text output. If formatting raises, a fallback
                message containing the raw synthesized content is returned.
        """
        logger.info(
            f"Formatting final report. Number of detailed findings: {len(findings_list)}. Synthesized content length: {len(synthesized_content)}. Number of question iterations: {len(self.questions_by_iteration)}"
        )
        first_item = findings_list[0] if findings_list else None
        logger.debug(
            f"Detailed findings list structure (first item type if exists): {type(first_item) if findings_list else 'Empty'}"
        )
        logger.debug(
            f"Questions by iteration keys: {list(self.questions_by_iteration.keys())}"
        )
        # Diagnostics must never abort formatting: only inspect keys when the
        # first item really is a dictionary.
        if isinstance(first_item, dict):
            logger.debug(f"First finding item keys: {list(first_item.keys())}")

        try:
            formatted_report = format_findings(
                findings_list,
                synthesized_content,
                self.questions_by_iteration,
            )
            logger.info("Successfully formatted final report.")
            return formatted_report
        except Exception:
            logger.exception("Error occurred during final report formatting")
            # Fallback: return just the synthesized content if formatting fails
            return f"Error during final formatting. Raw Synthesized Content:\n\n{synthesized_content}"

    @staticmethod
    def _extract_finding_texts(findings: List[Union[Dict, str]]) -> List:
        """Collect the content of each finding.

        Dictionaries contribute their 'content' value, strings contribute
        themselves, and every other item is skipped.
        """
        texts = []
        for item in findings:
            if isinstance(item, dict) and "content" in item:
                texts.append(item["content"])
            elif isinstance(item, str):
                texts.append(item)
        return texts

    @staticmethod
    def _log_first_finding(findings: List[Union[Dict, str]]) -> None:
        """Log diagnostic details about the first finding, if there is one."""
        if not findings:
            return
        first = findings[0]
        logger.info(f"first finding type: {type(first)}")
        if isinstance(first, dict):
            logger.info(f"first finding keys: {list(first.keys())}")
            if "content" in first:
                logger.info(
                    f"first finding content type: {type(first['content'])}"
                )
        elif isinstance(first, str):
            logger.info(f"first finding string length: {len(first)}")
            logger.info(f"first finding string preview: {first[:100]}...")

    def _truncate_knowledge(self, knowledge: str) -> str:
        """Shorten ``knowledge`` when its estimated token count is too high.

        The beginning and the end of the text are kept and the middle is
        replaced by a truncation marker. This is a simple approach; more
        sophisticated chunking might be better.
        """
        estimated_tokens = len(knowledge) / self.CHARS_PER_TOKEN
        if estimated_tokens <= self.MAX_SAFE_TOKENS:
            return knowledge

        logger.warning(
            f"Knowledge size may exceed model's capacity: ~{int(estimated_tokens)} tokens"
        )
        keep = self.TRUNCATION_KEEP_CHARS
        # With the default constants this is always false here; it only
        # protects against overridden constants whose head and tail overlap.
        if len(knowledge) <= 2 * keep:
            return knowledge

        truncated = f"{knowledge[:keep]}\n\n[...content truncated due to length...]\n\n{knowledge[-keep:]}"
        logger.info("Knowledge truncated to fit within token limits")
        return truncated

    @staticmethod
    def _build_synthesis_prompt(
        query: str, current_knowledge: str, sub_queries: List[str]
    ) -> str:
        """Build the LLM prompt for the final synthesis."""
        sub_question_lines = "\n".join(f"- {sq}" for sq in sub_queries)
        return f"""Use IEEE style citations [1], [2], etc. Never make up your own citations. Synthesize the following accumulated knowledge into a comprehensive answer for the original query.
Format the response with clear sections, citations, and a concise summary.

Original Query: {query}

Accumulated Knowledge:
{current_knowledge}

Sub-questions asked (for context):
{sub_question_lines}

Generate a well-structured, concise answer that:
1. Starts with a clear explanation of the most important points
2. Organizes information into logical sections with headers if needed
3. Maintains logical flow and prioritizes important information over minor details
4. Avoids repetition and unnecessary detail

Use IEEE style citations [1], [2], etc. Never make up your own citations.
"""

    @staticmethod
    @contextmanager
    def _alarm_timeout(seconds: int, message: str):
        """Raise ``TimeoutError(message)`` if the body runs longer than ``seconds``.

        Uses SIGALRM, so it must only be entered on the main thread of a
        platform that provides that signal. The previously installed handler
        is restored on exit.
        """

        def _on_alarm(signum, frame):
            raise TimeoutError(message)

        previous_handler = signal.signal(signal.SIGALRM, _on_alarm)
        signal.alarm(seconds)
        try:
            yield
        finally:
            signal.alarm(0)
            # signal.signal() returns None when the previous handler was not
            # installed from Python; that value cannot be passed back in.
            if previous_handler is not None:
                signal.signal(signal.SIGALRM, previous_handler)

    @staticmethod
    def _call_in_thread(timeout_seconds, func, *args, **kwargs):
        """Run ``func`` in a daemon thread and wait at most ``timeout_seconds``.

        Returns the function's result, re-raises the exception it raised, or
        raises TimeoutError when it has not finished in time. The worker
        thread is not interrupted on timeout; it is only abandoned.
        """
        outcome = {}

        def _target():
            try:
                outcome["result"] = func(*args, **kwargs)
            except Exception as exc:
                outcome["error"] = exc

        worker = threading.Thread(target=_target, daemon=True)
        worker.start()
        worker.join(timeout_seconds)

        if "result" not in outcome and worker.is_alive():
            raise TimeoutError(
                f"Operation timed out after {timeout_seconds} seconds"
            )
        if "error" in outcome:
            raise outcome["error"]
        return outcome.get("result")

    def _invoke_model_with_timeout(self, prompt: str):
        """Invoke the LLM with ``prompt`` under the synthesis timeout.

        SIGALRM handlers can only be installed from the main thread and the
        signal does not exist on Windows, so every other situation uses a
        worker thread to enforce the timeout.
        """
        timeout_seconds = self.SYNTHESIS_TIMEOUT_SECONDS
        can_use_alarm = (
            hasattr(signal, "SIGALRM")
            and threading.current_thread() is threading.main_thread()
        )
        if can_use_alarm:
            with self._alarm_timeout(
                timeout_seconds,
                f"LLM invocation timed out after {timeout_seconds} seconds",
            ):
                return self.model.invoke(prompt)

        logger.info("Using thread-based timeout for LLM invocation")
        return self._call_in_thread(
            timeout_seconds, self.model.invoke, prompt
        )

    @classmethod
    def _describe_invoke_error(cls, invoke_error: Exception) -> str:
        """Map an LLM invocation error to a user-facing error message."""
        error_message = str(invoke_error).lower()
        for keywords, message in cls._INVOKE_ERROR_RULES:
            if any(keyword in error_message for keyword in keywords):
                return message
        # Generic error with details
        return f"Error: Failed to synthesize final answer. LLM error: {invoke_error!s}"

    def synthesize_findings(
        self,
        query: str,
        sub_queries: List[str],
        findings: List[Union[Dict, str]],
        accumulated_knowledge: str | None = None,
        old_formatting: bool = False,
    ) -> str:
        """
        Synthesize accumulated knowledge into a final answer.

        Args:
            query: The original query
            sub_queries: List of sub-queries (for context)
            findings: List of findings strings or dictionaries from previous steps
            accumulated_knowledge: Optional pre-existing knowledge. Defaults
                to the joined finding texts. It is only used when
                ``old_formatting`` is true; the LLM prompt is always built
                from ``findings``.
            old_formatting: Whether to skip the LLM and return the findings
                formatted by ``format_findings`` instead

        Returns:
            str: Synthesized final answer content. Failures while preparing
                or running the LLM synthesis are reported as a string
                starting with "Error:" instead of being raised.
        """
        logger.info(f"synthesize_findings called with query: '{query}'")
        logger.info(
            f"sub_queries type: {type(sub_queries)}, length: {len(sub_queries)}"
        )
        logger.info(f"findings type: {type(findings)}, length: {len(findings)}")

        finding_texts = self._extract_finding_texts(findings)

        # Use provided accumulated_knowledge or join findings if it's None
        if accumulated_knowledge is None:
            accumulated_knowledge = "\n\n".join(finding_texts)

        self._log_first_finding(findings)

        if old_formatting:
            # Wrap string findings into dictionaries; keep dictionaries as
            # they are and drop anything else.
            findings_list = [
                {"phase": f"Finding {i + 1}", "content": item}
                if isinstance(item, str)
                else item
                for i, item in enumerate(findings)
                if isinstance(item, (str, dict))
            ]
            return format_findings(
                findings_list=findings_list,
                synthesized_content=accumulated_knowledge,
                questions_by_iteration=self.questions_by_iteration,
            )

        try:
            current_knowledge = self._truncate_knowledge(
                "\n\n".join(finding_texts)
            )
            prompt = self._build_synthesis_prompt(
                query, current_knowledge, sub_queries
            )

            logger.info(
                f"Synthesizing final answer. Query: '{query}'. Knowledge length: {len(current_knowledge)}. Prompt length: {len(prompt)}"
            )
            # Log first 500 chars of prompt for debugging context length issues
            logger.debug(
                f"Synthesis prompt (first 500 chars): {prompt[:500]}..."
            )

            try:
                response = self._invoke_model_with_timeout(prompt)

                # Normalize response (handles str/.content, strips <think> tags)
                synthesized_content = get_llm_response_text(response)

                logger.info(
                    f"Successfully synthesized final answer for query: '{query}'"
                )
                # Return only the synthesized content from the LLM
                return synthesized_content
            except TimeoutError as timeout_error:
                logger.exception(
                    f"LLM invocation timed out during synthesis for query '{query}': {timeout_error}"
                )
                # Return more specific error about timeout
                return "Error: Final answer synthesis failed due to LLM timeout. Please check your LLM service or try with a smaller query scope."
            except Exception as invoke_error:
                logger.exception(
                    f"LLM invocation failed during synthesis for query '{query}': {invoke_error}"
                )
                return self._describe_invoke_error(invoke_error)

        except Exception as e:
            # Catch potential errors during prompt construction or logging itself
            logger.exception(
                f"Error preparing or executing synthesis for query '{query}'"
            )
            # Return a specific error message for synthesis failure
            return f"Error: Failed to synthesize final answer from knowledge. Details: {e!s}"