Coverage for src/local_deep_research/advanced_search_system/findings/repository.py: 13%

189 statements  

coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Findings repository for managing research findings. 

3""" 

4 

5from loguru import logger 

6from typing import Dict, List, Union 

7 

8from langchain_core.documents import Document 

9from langchain_core.language_models import BaseLLM 

10 

11from ...utilities.search_utilities import format_findings 

12from .base_findings import BaseFindingsRepository 

13 

14 

15def format_links(links: List[Dict]) -> str: 

16 """Format a list of links into a readable string. 

17 

18 Args: 

19 links: List of dictionaries containing 'title' and 'url' keys 

20 

21 Returns: 

22 str: Formatted string of links 

23 """ 

24 return "\n".join( 

25 f"{i + 1}. {link['title']}\n URL: {link['url']}" 

26 for i, link in enumerate(links) 

27 ) 
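
# Illustrative usage of format_links (the values below are hypothetical, not
# part of the module): each dict needs 'title' and 'url' keys.
#
#     links = [
#         {"title": "Example Result", "url": "https://example.com"},
#         {"title": "Another Result", "url": "https://example.org"},
#     ]
#     print(format_links(links))
#     # 1. Example Result
#     #  URL: https://example.com
#     # 2. Another Result
#     #  URL: https://example.org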


class FindingsRepository(BaseFindingsRepository):
    """Repository for managing research findings."""

    def __init__(self, model: BaseLLM):
        """Initialize the repository.

        Args:
            model: The LLM model to use for synthesis
        """
        super().__init__(model)
        self.findings: Dict[str, List[Dict]] = {}
        self.documents: List[Document] = []
        self.questions_by_iteration: Dict[int, List[str]] = {}
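
    # A minimal construction sketch: any LangChain BaseLLM works here. For a
    # dependency-free smoke test, something like langchain_core's FakeListLLM
    # (if available in your version) can stand in for a real model:
    #
    #     from langchain_core.language_models import FakeListLLM
    #
    #     repo = FindingsRepository(FakeListLLM(responses=["synthesized answer"]))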

    def add_finding(self, query: str, finding: Dict | str) -> None:
        """Add a finding for a query."""
        self.findings.setdefault(query, [])

        # Convert to dictionary if it's a string
        if isinstance(finding, str):
            finding_dict = {
                "phase": "Synthesis",
                "content": finding,
                "question": query,
                "search_results": [],
                "documents": [],
            }
            self.findings[query].append(finding_dict)
        else:
            # It's already a dictionary
            self.findings[query].append(finding)

        # Store raw synthesized content if it's the final synthesis
        # Only check for phase if it's a dictionary
        if (
            isinstance(finding, dict)
            and finding.get("phase") == "Final synthesis"
        ):
            self.findings[query + "_synthesis"] = [
                {
                    "phase": "Synthesis",
                    "content": finding.get("content", ""),
                    "question": query,
                    "search_results": [],
                    "documents": [],
                }
            ]

        logger.info(
            f"Added finding for query: {query}. Total findings: {len(self.findings[query])}"
        )
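
    # Behavior sketch (hypothetical values): strings are wrapped into a
    # "Synthesis" dict, dicts are stored as-is, and a "Final synthesis" dict
    # additionally populates the parallel "<query>_synthesis" key.
    #
    #     repo.add_finding("q", "plain text")  # wrapped into a dict
    #     repo.add_finding("q", {"phase": "Final synthesis", "content": "..."})
    #     repo.get_findings("q")            # -> two finding dicts
    #     repo.get_findings("q_synthesis")  # -> one "Synthesis" dict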

    def get_findings(self, query: str) -> List[Dict]:
        """Get findings for a query.

        Args:
            query: The query to get findings for

        Returns:
            List of findings for the query
        """
        return self.findings.get(query, [])

    def clear_findings(self, query: str) -> None:
        """Clear findings for a query.

        Args:
            query: The query to clear findings for
        """
        if query in self.findings:
            del self.findings[query]
            logger.info(f"Cleared findings for query: {query}")
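
    # Round-trip sketch (hypothetical values):
    #
    #     repo.add_finding("quantum computing", "Qubits are two-level systems.")
    #     repo.get_findings("quantum computing")   # -> [{"phase": "Synthesis", ...}]
    #     repo.clear_findings("quantum computing")
    #     repo.get_findings("quantum computing")   # -> []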

    def add_documents(self, documents: List[Document]) -> None:
        """Add documents to the repository.

        Args:
            documents: List of documents to add
        """
        self.documents.extend(documents)
        logger.info(f"Added {len(documents)} documents to repository")

    def set_questions_by_iteration(
        self, questions_by_iteration: Dict[int, List[str]]
    ) -> None:
        """Set the questions by iteration.

        Args:
            questions_by_iteration: Dictionary mapping iteration numbers to lists of questions
        """
        self.questions_by_iteration = questions_by_iteration.copy()
        logger.info(
            f"Set questions for {len(questions_by_iteration)} iterations"
        )
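
    # Shape sketch for questions_by_iteration (hypothetical values): iteration
    # numbers map to the questions generated in that iteration.
    #
    #     repo.set_questions_by_iteration({
    #         1: ["What is a qubit?", "How does decoherence arise?"],
    #         2: ["Which error-correction codes are practical today?"],
    #     })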

    def format_findings_to_text(
        self, findings_list: List[Dict], synthesized_content: str
    ) -> str:
        """Format findings into a detailed text output using the utility function.

        Args:
            findings_list: List of finding dictionaries from the strategy execution.
            synthesized_content: The final synthesized content generated by the LLM.

        Returns:
            str: Formatted text output.
        """
        logger.info(
            f"Formatting final report. Number of detailed findings: {len(findings_list)}. Synthesized content length: {len(synthesized_content)}. Number of question iterations: {len(self.questions_by_iteration)}"
        )
        # Log details about the inputs
        logger.debug(
            f"Detailed findings list structure (first item type if exists): {type(findings_list[0]) if findings_list else 'Empty'}"
        )
        logger.debug(
            f"Questions by iteration keys: {list(self.questions_by_iteration.keys())}"
        )
        if findings_list:
            logger.debug(
                f"First finding item keys: {list(findings_list[0].keys())}"
            )

        try:
            # Pass the detailed findings list, the synthesized content (as
            # current_knowledge), and the stored questions
            formatted_report = format_findings(
                findings_list,
                synthesized_content,  # This goes to the 'current_knowledge' param in format_findings
                self.questions_by_iteration,
            )
            logger.info("Successfully formatted final report.")
            return formatted_report
        except Exception as e:
            logger.exception(
                f"Error occurred during final report formatting: {e!s}"
            )
            # Fallback: return just the synthesized content if formatting fails
            return f"Error during final formatting. Raw Synthesized Content:\n\n{synthesized_content}"
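
    # Call sketch (hypothetical values): the detailed findings typically come
    # from a strategy run, and the synthesized text from synthesize_findings.
    #
    #     report = repo.format_findings_to_text(
    #         findings_list=repo.get_findings("quantum computing"),
    #         synthesized_content="Quantum computers use qubits ...",
    #     )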

    def synthesize_findings(
        self,
        query: str,
        sub_queries: List[str],
        findings: List[Union[Dict, str]],
        accumulated_knowledge: str | None = None,
        old_formatting: bool = False,
    ) -> str:
        """
        Synthesize accumulated knowledge into a final answer.

        Args:
            query: The original query
            sub_queries: List of sub-queries (for context)
            findings: List of findings strings or dictionaries from previous steps
            accumulated_knowledge: Optional pre-existing knowledge to incorporate
            old_formatting: Whether to use the old formatting approach

        Returns:
            str: Synthesized final answer content.
        """
        logger.info(f"synthesize_findings called with query: '{query}'")
        logger.info(
            f"sub_queries type: {type(sub_queries)}, length: {len(sub_queries)}"
        )
        logger.info(f"findings type: {type(findings)}, length: {len(findings)}")

        # Use provided accumulated_knowledge or join findings if it's None
        if accumulated_knowledge is None:
            # Convert findings to text if they are dictionaries
            finding_texts = []
            for item in findings:
                if isinstance(item, dict) and "content" in item:
                    finding_texts.append(item["content"])
                elif isinstance(item, str):
                    finding_texts.append(item)
            accumulated_knowledge = "\n\n".join(finding_texts)

        if findings:
            logger.info(f"first finding type: {type(findings[0])}")
            if isinstance(findings[0], dict):
                logger.info(
                    f"first finding keys: {list(findings[0].keys()) if hasattr(findings[0], 'keys') else 'No keys'}"
                )
                if "content" in findings[0]:
                    logger.info(
                        f"first finding content type: {type(findings[0]['content'])}"
                    )
            elif isinstance(findings[0], str):
                logger.info(f"first finding string length: {len(findings[0])}")
                logger.info(
                    f"first finding string preview: {findings[0][:100]}..."
                )

        if old_formatting:
            # Convert findings list if it contains strings instead of dictionaries
            findings_list = []
            for i, item in enumerate(findings):
                if isinstance(item, str):
                    findings_list.append(
                        {"phase": f"Finding {i + 1}", "content": item}
                    )
                elif isinstance(item, dict):
                    findings_list.append(item)

            return format_findings(
                findings_list,
                accumulated_knowledge,  # passed positionally, matching the call in format_findings_to_text
                self.questions_by_iteration,
            )

        try:
            # Extract finding content texts for the prompt
            finding_texts = []
            for item in findings:
                if isinstance(item, dict) and "content" in item:
                    finding_texts.append(item["content"])
                elif isinstance(item, str):
                    finding_texts.append(item)

            # Use finding_texts for the prompt
            current_knowledge = (
                "\n\n".join(finding_texts) if finding_texts else ""
            )

            # Check if knowledge exceeds a reasonable token limit
            # (rough estimate: 1 token ≈ 4 characters in English)
            estimated_tokens = len(current_knowledge) / 4
            max_safe_tokens = (
                12000  # Adjust based on your model's context window
            )

            if estimated_tokens > max_safe_tokens:
                logger.warning(
                    f"Knowledge size may exceed model's capacity: ~{int(estimated_tokens)} tokens"
                )
                # Truncate if needed, keeping the beginning and end, which are
                # often the most important parts. This is a simple approach;
                # more sophisticated chunking might be better.
                if len(current_knowledge) > 24000:  # ~6000 tokens
                    first_part = current_knowledge[:12000]  # ~3000 tokens from start
                    last_part = current_knowledge[-12000:]  # ~3000 tokens from end
                    current_knowledge = f"{first_part}\n\n[...content truncated due to length...]\n\n{last_part}"
                    logger.info(
                        "Knowledge truncated to fit within token limits"
                    )
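
            # Worked example of the truncation arithmetic above (using the
            # ~4 chars/token heuristic): 60,000 chars ≈ 15,000 tokens, which
            # exceeds max_safe_tokens (12,000). Since 60,000 > 24,000 chars,
            # the text is cut to the first 12,000 chars + marker + last 12,000
            # chars, i.e. roughly 6,000 tokens total.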

            prompt = f"""Use IEEE style citations [1], [2], etc. Never make up your own citations. Synthesize the following accumulated knowledge into a comprehensive answer for the original query.
Format the response with clear sections, citations, and a concise summary.

Original Query: {query}

Accumulated Knowledge:
{current_knowledge}

Sub-questions asked (for context):
{chr(10).join(f"- {sq}" for sq in sub_queries)}

Generate a well-structured, concise answer that:
1. Starts with a clear explanation of the most important points
2. Organizes information into logical sections with headers if needed
3. Maintains logical flow and prioritizes important information over minor details
4. Avoids repetition and unnecessary detail

Use IEEE style citations [1], [2], etc. Never make up your own citations.
"""

            logger.info(
                f"Synthesizing final answer. Query: '{query}'. Knowledge length: {len(current_knowledge)}. Prompt length: {len(prompt)}"
            )
            # Log first 500 chars of prompt for debugging context length issues
            logger.debug(
                f"Synthesis prompt (first 500 chars): {prompt[:500]}..."
            )

            try:
                # Add timeout handling
                import platform
                import signal
                import threading
                from contextlib import contextmanager

                # Check if we're on Windows
                if platform.system() == "Windows":

                    def invoke_with_timeout(
                        timeout_seconds, func, *args, **kwargs
                    ):
                        """
                        Function for implementing timeouts on Windows
                        """
                        result = None
                        exception = None
                        completed = False

                        def target():
                            nonlocal result, exception, completed
                            try:
                                result = func(*args, **kwargs)
                                completed = True
                            except Exception as e:
                                exception = e

                        thread = threading.Thread(target=target)
                        thread.daemon = True
                        thread.start()
                        thread.join(timeout_seconds)
                        if not completed and thread.is_alive():
                            raise TimeoutError(
                                f"Operation timed out after {timeout_seconds} seconds"
                            )
                        if exception:
                            raise exception
                        return result

                    # Use Windows-compatible timeout
                    try:
                        logger.info(
                            "Using Windows-compatible timeout for LLM invocation"
                        )
                        response = invoke_with_timeout(
                            120, self.model.invoke, prompt
                        )

                        # Handle different response types (string or object with content attribute)
                        if hasattr(response, "content"):
                            synthesized_content = response.content
                        else:
                            # Handle string responses
                            synthesized_content = str(response)

                        logger.info(
                            f"Successfully synthesized final answer for query: '{query}'"
                        )
                        # Return only the synthesized content from the LLM
                        return synthesized_content
                    except TimeoutError as timeout_error:
                        logger.exception(
                            f"LLM invocation timed out during synthesis for query '{query}': {timeout_error}"
                        )
                        # Return more specific error about timeout
                        return "Error: Final answer synthesis failed due to LLM timeout. Please check your LLM service or try with a smaller query scope."

                else:
                    # Unix-compatible timeout using SIGALRM
                    @contextmanager
                    def timeout(seconds, message="Operation timed out"):
                        def signal_handler(signum, frame):
                            raise TimeoutError(message)

                        signal.signal(signal.SIGALRM, signal_handler)
                        signal.alarm(seconds)
                        try:
                            yield
                        finally:
                            signal.alarm(0)

                    # Try with a timeout (adjust seconds as needed)
                    try:
                        with timeout(
                            120, "LLM invocation timed out after 120 seconds"
                        ):
                            response = self.model.invoke(prompt)

                        # Handle different response types (string or object with content attribute)
                        if hasattr(response, "content"):
                            synthesized_content = response.content
                        else:
                            # Handle string responses
                            synthesized_content = str(response)

                        logger.info(
                            f"Successfully synthesized final answer for query: '{query}'"
                        )
                        # Return only the synthesized content from the LLM
                        return synthesized_content
                    except TimeoutError as timeout_error:
                        logger.exception(
                            f"LLM invocation timed out during synthesis for query '{query}': {timeout_error}"
                        )
                        # Return more specific error about timeout
                        return "Error: Final answer synthesis failed due to LLM timeout. Please check your LLM service or try with a smaller query scope."

            except Exception as invoke_error:
                logger.exception(
                    f"LLM invocation failed during synthesis for query '{query}': {invoke_error}"
                )

                # Attempt to determine the type of error
                error_message = str(invoke_error).lower()
                error_type = "unknown"

                if "timeout" in error_message or "timed out" in error_message:
                    error_type = "timeout"
                elif (
                    "too many tokens" in error_message
                    or "context length" in error_message
                    or "token limit" in error_message
                ):
                    error_type = "token_limit"
                elif (
                    "rate limit" in error_message
                    or "rate_limit" in error_message
                ):
                    error_type = "rate_limit"
                elif (
                    "connection" in error_message or "network" in error_message
                ):
                    error_type = "connection"
                elif (
                    "api key" in error_message
                    or "authentication" in error_message
                ):
                    error_type = "authentication"

                # Return a more detailed error message based on type
                if error_type == "timeout":
                    return "Error: Failed to synthesize final answer due to LLM timeout. Please check your connection or try again later."
                elif error_type == "token_limit":
                    return "Error: Failed to synthesize final answer because the token limit was exceeded. Try reducing the scope of your query."
                elif error_type == "rate_limit":
                    return "Error: Failed to synthesize final answer due to LLM rate limiting. Please try again in a few minutes."
                elif error_type == "connection":
                    return "Error: Failed to synthesize final answer due to connection issues. Please check your internet connection and LLM service status."
                elif error_type == "authentication":
                    return "Error: Failed to synthesize final answer due to authentication issues. Please check your API keys."
                else:
                    # Generic error with details
                    return f"Error: Failed to synthesize final answer. LLM error: {invoke_error!s}"

        except Exception as e:
            # Catch potential errors during prompt construction or logging itself
            logger.exception(
                f"Error preparing or executing synthesis for query '{query}': {e!s}"
            )
            # Return a specific error message for synthesis failure
            return f"Error: Failed to synthesize final answer from knowledge. Details: {e!s}"