Coverage for src/local_deep_research/advanced_search_system/findings/repository.py: 73%

189 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1""" 

2Findings repository for managing research findings. 

3""" 

4 

5from loguru import logger 

6from typing import Dict, List, Union 

7 

8from langchain_core.documents import Document 

9from langchain_core.language_models import BaseLLM 

10 

11from ...utilities.search_utilities import format_findings 

12from .base_findings import BaseFindingsRepository 

13 

14 

def format_links(links: List[Dict]) -> str:
    """Render link dictionaries as a numbered, human-readable list.

    Args:
        links: List of dictionaries containing 'title' and 'url' keys

    Returns:
        str: Formatted string of links, one numbered entry per link
    """
    lines = []
    for index, link in enumerate(links, start=1):
        lines.append(f"{index}. {link['title']}\n URL: {link['url']}")
    return "\n".join(lines)

28 

29 

class FindingsRepository(BaseFindingsRepository):
    """Repository for managing research findings."""

    def __init__(self, model: BaseLLM):
        """Initialize the repository.

        Args:
            model: The LLM model to use for synthesis
        """
        super().__init__(model)
        # Findings keyed by the originating query string.
        self.findings: Dict[str, List[Dict]] = {}
        # Documents collected across searches.
        self.documents: List[Document] = []
        # Generated questions, keyed by research iteration number.
        self.questions_by_iteration: Dict[int, List[str]] = {}

43 

44 def add_finding(self, query: str, finding: Dict | str) -> None: 

45 """Add a finding for a query.""" 

46 self.findings.setdefault(query, []) 

47 

48 # Convert to dictionary if it's a string 

49 if isinstance(finding, str): 

50 finding_dict = { 

51 "phase": "Synthesis", 

52 "content": finding, 

53 "question": query, 

54 "search_results": [], 

55 "documents": [], 

56 } 

57 self.findings[query].append(finding_dict) 

58 else: 

59 # It's already a dictionary 

60 self.findings[query].append(finding) 

61 

62 # Store raw synthesized content if it's the final synthesis 

63 # Only check for phase if it's a dictionary 

64 if ( 

65 isinstance(finding, dict) 

66 and finding.get("phase") == "Final synthesis" 

67 ): 

68 self.findings[query + "_synthesis"] = [ 

69 { 

70 "phase": "Synthesis", 

71 "content": finding.get("content", ""), 

72 "question": query, 

73 "search_results": [], 

74 "documents": [], 

75 } 

76 ] 

77 

78 logger.info( 

79 f"Added finding for query: {query}. Total findings: {len(self.findings[query])}" 

80 ) 

81 

82 def get_findings(self, query: str) -> List[Dict]: 

83 """Get findings for a query. 

84 

85 Args: 

86 query: The query to get findings for 

87 

88 Returns: 

89 List of findings for the query 

90 """ 

91 return self.findings.get(query, []) 

92 

93 def clear_findings(self, query: str) -> None: 

94 """Clear findings for a query. 

95 

96 Args: 

97 query: The query to clear findings for 

98 """ 

99 if query in self.findings: 

100 del self.findings[query] 

101 logger.info(f"Cleared findings for query: {query}") 

102 

103 def add_documents(self, documents: List[Document]) -> None: 

104 """Add documents to the repository. 

105 

106 Args: 

107 documents: List of documents to add 

108 """ 

109 self.documents.extend(documents) 

110 logger.info(f"Added {len(documents)} documents to repository") 

111 

112 def set_questions_by_iteration( 

113 self, questions_by_iteration: Dict[int, List[str]] 

114 ) -> None: 

115 """Set the questions by iteration. 

116 

117 Args: 

118 questions_by_iteration: Dictionary mapping iteration numbers to lists of questions 

119 """ 

120 self.questions_by_iteration = questions_by_iteration.copy() 

121 logger.info( 

122 f"Set questions for {len(questions_by_iteration)} iterations" 

123 ) 

124 

125 def format_findings_to_text( 

126 self, findings_list: List[Dict], synthesized_content: str 

127 ) -> str: 

128 """Format findings into a detailed text output using the utility function. 

129 

130 Args: 

131 findings_list: List of finding dictionaries from the strategy execution. 

132 synthesized_content: The final synthesized content generated by the LLM. 

133 

134 Returns: 

135 str: Formatted text output. 

136 """ 

137 logger.info( 

138 f"Formatting final report. Number of detailed findings: {len(findings_list)}. Synthesized content length: {len(synthesized_content)}. Number of question iterations: {len(self.questions_by_iteration)}" 

139 ) 

140 # Log details about the inputs 

141 logger.debug( 

142 f"Detailed findings list structure (first item type if exists): {type(findings_list[0]) if findings_list else 'Empty'}" 

143 ) 

144 logger.debug( 

145 f"Questions by iteration keys: {list(self.questions_by_iteration.keys())}" 

146 ) 

147 if findings_list: 

148 logger.debug( 

149 f"First finding item keys: {list(findings_list[0].keys())}" 

150 ) 

151 

152 try: 

153 # Pass the detailed findings list, the synthesized content (as current_knowledge), and the stored questions 

154 formatted_report = format_findings( 

155 findings_list, 

156 synthesized_content, # This goes to the 'current_knowledge' param in format_findings 

157 self.questions_by_iteration, 

158 ) 

159 logger.info("Successfully formatted final report.") 

160 return formatted_report 

161 except Exception: 

162 logger.exception("Error occurred during final report formatting") 

163 # Fallback: return just the synthesized content if formatting fails 

164 return f"Error during final formatting. Raw Synthesized Content:\n\n{synthesized_content}" 

165 

166 def synthesize_findings( 

167 self, 

168 query: str, 

169 sub_queries: List[str], 

170 findings: List[Union[Dict, str]], 

171 accumulated_knowledge: str = None, 

172 old_formatting: bool = False, 

173 ) -> str: 

174 """ 

175 Synthesize accumulated knowledge into a final answer. 

176 

177 Args: 

178 query: The original query 

179 sub_queries: List of sub-queries (for context) 

180 findings: List of findings strings or dictionaries from previous steps 

181 accumulated_knowledge: Optional pre-existing knowledge to incorporate 

182 old_formatting: Whether to use the old formatting approach 

183 

184 Returns: 

185 str: Synthesized final answer content. 

186 """ 

187 logger.info(f"synthesize_findings called with query: '{query}'") 

188 logger.info( 

189 f"sub_queries type: {type(sub_queries)}, length: {len(sub_queries)}" 

190 ) 

191 logger.info(f"findings type: {type(findings)}, length: {len(findings)}") 

192 

193 # Use provided accumulated_knowledge or join findings if it's None 

194 if accumulated_knowledge is None: 

195 # Convert findings to text if they are dictionaries 

196 finding_texts = [] 

197 for item in findings: 

198 if isinstance(item, dict) and "content" in item: 

199 finding_texts.append(item["content"]) 

200 elif isinstance(item, str): 

201 finding_texts.append(item) 

202 accumulated_knowledge = "\n\n".join(finding_texts) 

203 

204 if findings: 

205 logger.info(f"first finding type: {type(findings[0])}") 

206 if isinstance(findings[0], dict): 

207 logger.info( 

208 f"first finding keys: {list(findings[0].keys()) if hasattr(findings[0], 'keys') else 'No keys'}" 

209 ) 

210 if "content" in findings[0]: 

211 logger.info( 

212 f"first finding content type: {type(findings[0]['content'])}" 

213 ) 

214 elif isinstance(findings[0], str): 214 ↛ 220line 214 didn't jump to line 220 because the condition on line 214 was always true

215 logger.info(f"first finding string length: {len(findings[0])}") 

216 logger.info( 

217 f"first finding string preview: {findings[0][:100]}..." 

218 ) 

219 

220 if old_formatting: 

221 # Convert findings list if it contains strings instead of dictionaries 

222 findings_list = [] 

223 for i, item in enumerate(findings): 

224 if isinstance(item, str): 224 ↛ 228line 224 didn't jump to line 228 because the condition on line 224 was always true

225 findings_list.append( 

226 {"phase": f"Finding {i + 1}", "content": item} 

227 ) 

228 elif isinstance(item, dict): 

229 findings_list.append(item) 

230 

231 return format_findings( 

232 findings_list=findings_list, 

233 synthesized_content=accumulated_knowledge, 

234 questions_by_iteration=self.questions_by_iteration, 

235 ) 

236 try: 

237 # Extract finding content texts for the prompt 

238 finding_texts = [] 

239 for item in findings: 

240 if isinstance(item, dict) and "content" in item: 

241 finding_texts.append(item["content"]) 

242 elif isinstance(item, str): 

243 finding_texts.append(item) 

244 

245 # Use finding_texts for the prompt 

246 current_knowledge = ( 

247 "\n\n".join(finding_texts) if finding_texts else "" 

248 ) 

249 

250 # Check if knowledge exceeds a reasonable token limit (rough estimate based on characters) 

251 # 1 token ≈ 4 characters in English 

252 estimated_tokens = len(current_knowledge) / 4 

253 max_safe_tokens = ( 

254 12000 # Adjust based on your model's context window 

255 ) 

256 

257 if estimated_tokens > max_safe_tokens: 257 ↛ 258line 257 didn't jump to line 258 because the condition on line 257 was never true

258 logger.warning( 

259 f"Knowledge size may exceed model's capacity: ~{int(estimated_tokens)} tokens" 

260 ) 

261 # Truncate if needed (keeping the beginning and end which are often most important) 

262 # This is a simple approach - a more sophisticated chunking might be better 

263 if len(current_knowledge) > 24000: # ~6000 tokens 

264 first_part = current_knowledge[ 

265 :12000 

266 ] # ~3000 tokens from start 

267 last_part = current_knowledge[ 

268 -12000: 

269 ] # ~3000 tokens from end 

270 current_knowledge = f"{first_part}\n\n[...content truncated due to length...]\n\n{last_part}" 

271 logger.info( 

272 "Knowledge truncated to fit within token limits" 

273 ) 

274 

275 prompt = f"""Use IEEE style citations [1], [2], etc. Never make up your own citations. Synthesize the following accumulated knowledge into a comprehensive answer for the original query. 

276Format the response with clear sections, citations, and a concise summary. 

277 

278Original Query: {query} 

279 

280Accumulated Knowledge: 

281{current_knowledge} 

282 

283Sub-questions asked (for context): 

284{chr(10).join(f"- {sq}" for sq in sub_queries)} 

285 

286Generate a well-structured, concise answer that: 

2871. Starts with a clear explanation of the most important points 

2882. Organizes information into logical sections with headers if needed 

2893. Maintains logical flow and prioritizes important information over minor details 

2904. Avoids repetition and unnecessary detail 

291 

292Use IEEE style citations [1], [2], etc. Never make up your own citations. 

293""" 

294 

295 logger.info( 

296 f"Synthesizing final answer. Query: '{query}'. Knowledge length: {len(current_knowledge)}. Prompt length: {len(prompt)}" 

297 ) 

298 # Log first 500 chars of prompt for debugging context length issues 

299 logger.debug( 

300 f"Synthesis prompt (first 500 chars): {prompt[:500]}..." 

301 ) 

302 

303 try: 

304 # Add timeout handling 

305 import platform 

306 import signal 

307 import threading 

308 from contextlib import contextmanager 

309 

310 # Check if we're on Windows 

311 if platform.system() == "Windows": 311 ↛ 313line 311 didn't jump to line 313 because the condition on line 311 was never true

312 

313 def timeout_handler(timeout_seconds, callback, args): 

314 def handler(): 

315 callback(*args) 

316 

317 timer = threading.Timer(timeout_seconds, handler) 

318 timer.daemon = True 

319 return timer 

320 

321 def invoke_with_timeout( 

322 timeout_seconds, func, *args, **kwargs 

323 ): 

324 """ 

325 Function for implementing timeouts on Windows 

326 """ 

327 result = None 

328 exception = None 

329 completed = False 

330 

331 def target(): 

332 nonlocal result, exception, completed 

333 try: 

334 result = func(*args, **kwargs) 

335 completed = True 

336 except Exception as e: 

337 exception = e 

338 

339 thread = threading.Thread(target=target) 

340 thread.daemon = True 

341 

342 try: 

343 thread.start() 

344 thread.join(timeout_seconds) 

345 if not completed and thread.is_alive(): 

346 raise TimeoutError( 

347 f"Operation timed out after {timeout_seconds} seconds" 

348 ) 

349 if exception: 

350 raise exception 

351 return result 

352 finally: 

353 # Nothing to clean up 

354 pass 

355 

356 # Use Windows-compatible timeout 

357 try: 

358 logger.info( 

359 "Using Windows-compatible timeout for LLM invocation" 

360 ) 

361 response = invoke_with_timeout( 

362 120, self.model.invoke, prompt 

363 ) 

364 

365 # Handle different response types (string or object with content attribute) 

366 if hasattr(response, "content"): 

367 synthesized_content = response.content 

368 else: 

369 # Handle string responses 

370 synthesized_content = str(response) 

371 

372 logger.info( 

373 f"Successfully synthesized final answer for query: '{query}'" 

374 ) 

375 # Return only the synthesized content from the LLM 

376 return synthesized_content 

377 except TimeoutError as timeout_error: 

378 logger.exception( 

379 f"LLM invocation timed out during synthesis for query '{query}': {timeout_error}" 

380 ) 

381 # Return more specific error about timeout 

382 return "Error: Final answer synthesis failed due to LLM timeout. Please check your LLM service or try with a smaller query scope." 

383 

384 else: 

385 # Unix-compatible timeout using SIGALRM 

386 @contextmanager 

387 def timeout(seconds, message="Operation timed out"): 

388 def signal_handler(signum, frame): 

389 raise TimeoutError(message) 

390 

391 signal.signal(signal.SIGALRM, signal_handler) 

392 signal.alarm(seconds) 

393 try: 

394 yield 

395 finally: 

396 signal.alarm(0) 

397 

398 # Try with a timeout (adjust seconds as needed) 

399 try: 

400 with timeout( 

401 120, "LLM invocation timed out after 120 seconds" 

402 ): 

403 response = self.model.invoke(prompt) 

404 

405 # Handle different response types (string or object with content attribute) 

406 if hasattr(response, "content"): 

407 synthesized_content = response.content 

408 else: 

409 # Handle string responses 

410 synthesized_content = str(response) 

411 

412 logger.info( 

413 f"Successfully synthesized final answer for query: '{query}'" 

414 ) 

415 # Return only the synthesized content from the LLM 

416 return synthesized_content 

417 except TimeoutError as timeout_error: 

418 logger.exception( 

419 f"LLM invocation timed out during synthesis for query '{query}': {timeout_error}" 

420 ) 

421 # Return more specific error about timeout 

422 return "Error: Final answer synthesis failed due to LLM timeout. Please check your LLM service or try with a smaller query scope." 

423 

424 except Exception as invoke_error: 

425 logger.exception( 

426 f"LLM invocation failed during synthesis for query '{query}': {invoke_error}" 

427 ) 

428 

429 # Attempt to determine the type of error 

430 error_message = str(invoke_error).lower() 

431 error_type = "unknown" 

432 

433 if "timeout" in error_message or "timed out" in error_message: 

434 error_type = "timeout" 

435 elif ( 

436 "too many tokens" in error_message 

437 or "context length" in error_message 

438 or "token limit" in error_message 

439 ): 

440 error_type = "token_limit" 

441 elif ( 

442 "rate limit" in error_message 

443 or "rate_limit" in error_message 

444 ): 

445 error_type = "rate_limit" 

446 elif ( 

447 "connection" in error_message or "network" in error_message 

448 ): 

449 error_type = "connection" 

450 elif ( 450 ↛ 454line 450 didn't jump to line 454 because the condition on line 450 was never true

451 "api key" in error_message 

452 or "authentication" in error_message 

453 ): 

454 error_type = "authentication" 

455 

456 # Return more detailed error message based on type 

457 if error_type == "timeout": 

458 return "Error: Failed to synthesize final answer due to LLM timeout. Please check your connection or try again later." 

459 elif error_type == "token_limit": 

460 return "Error: Failed to synthesize final answer due to token limit exceeded. Try reducing the scope of your query." 

461 elif error_type == "rate_limit": 

462 return "Error: Failed to synthesize final answer due to LLM rate limit. Please try again in a few minutes." 

463 elif error_type == "connection": 

464 return "Error: Failed to synthesize final answer due to connection issues. Please check your internet connection and LLM service status." 

465 elif error_type == "authentication": 465 ↛ 466line 465 didn't jump to line 466 because the condition on line 465 was never true

466 return "Error: Failed to synthesize final answer due to authentication issues. Please check your API keys." 

467 else: 

468 # Generic error with details 

469 return f"Error: Failed to synthesize final answer. LLM error: {invoke_error!s}" 

470 

471 except Exception as e: 

472 # Catch potential errors during prompt construction or logging itself 

473 logger.exception( 

474 f"Error preparing or executing synthesis for query '{query}'" 

475 ) 

476 # Return a specific error message for synthesis failure 

477 return f"Error: Failed to synthesize final answer from knowledge. Details: {e!s}"