Coverage for src / local_deep_research / advanced_search_system / findings / repository.py: 73%
189 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
1"""
2Findings repository for managing research findings.
3"""
5from loguru import logger
6from typing import Dict, List, Union
8from langchain_core.documents import Document
9from langchain_core.language_models import BaseLLM
11from ...utilities.search_utilities import format_findings
12from .base_findings import BaseFindingsRepository
def format_links(links: List[Dict]) -> str:
    """Render a list of link records as a numbered, human-readable string.

    Args:
        links: List of dictionaries containing 'title' and 'url' keys

    Returns:
        str: Formatted string of links, one numbered entry per link
    """
    formatted_entries = []
    for position, entry in enumerate(links, start=1):
        formatted_entries.append(
            f"{position}. {entry['title']}\n URL: {entry['url']}"
        )
    return "\n".join(formatted_entries)
class FindingsRepository(BaseFindingsRepository):
    """Repository for managing research findings.

    Stores findings grouped by query, accumulates source documents, and
    can synthesize the collected knowledge into a final answer via the
    configured LLM.
    """

    def __init__(self, model: BaseLLM):
        """Initialize the repository.

        Args:
            model: The LLM model to use for synthesis
        """
        super().__init__(model)
        # Findings grouped by the query string that produced them.
        self.findings: Dict[str, List[Dict]] = {}
        # Source documents accumulated across the research run.
        self.documents: List[Document] = []
        # Generated sub-questions keyed by iteration number.
        self.questions_by_iteration: Dict[int, List[str]] = {}

    def add_finding(self, query: str, finding: Dict | str) -> None:
        """Add a finding for a query.

        String findings are wrapped into a "Synthesis"-phase dictionary;
        dictionary findings are appended as-is. A dictionary finding whose
        phase is "Final synthesis" is additionally mirrored under the
        "<query>_synthesis" key so the raw synthesized content can be
        looked up directly.

        Args:
            query: The query this finding belongs to.
            finding: Finding dictionary or raw content string.
        """
        self.findings.setdefault(query, [])

        # Convert to dictionary if it's a string
        if isinstance(finding, str):
            finding_dict = {
                "phase": "Synthesis",
                "content": finding,
                "question": query,
                "search_results": [],
                "documents": [],
            }
            self.findings[query].append(finding_dict)
        else:
            # It's already a dictionary
            self.findings[query].append(finding)

        # Store raw synthesized content if it's the final synthesis
        # Only check for phase if it's a dictionary
        if (
            isinstance(finding, dict)
            and finding.get("phase") == "Final synthesis"
        ):
            self.findings[query + "_synthesis"] = [
                {
                    "phase": "Synthesis",
                    "content": finding.get("content", ""),
                    "question": query,
                    "search_results": [],
                    "documents": [],
                }
            ]

        logger.info(
            f"Added finding for query: {query}. Total findings: {len(self.findings[query])}"
        )

    def get_findings(self, query: str) -> List[Dict]:
        """Get findings for a query.

        Args:
            query: The query to get findings for

        Returns:
            List of findings for the query (empty list if none recorded)
        """
        return self.findings.get(query, [])

    def clear_findings(self, query: str) -> None:
        """Clear findings for a query.

        Args:
            query: The query to clear findings for
        """
        if query in self.findings:
            del self.findings[query]
            logger.info(f"Cleared findings for query: {query}")

    def add_documents(self, documents: List[Document]) -> None:
        """Add documents to the repository.

        Args:
            documents: List of documents to add
        """
        self.documents.extend(documents)
        logger.info(f"Added {len(documents)} documents to repository")

    def set_questions_by_iteration(
        self, questions_by_iteration: Dict[int, List[str]]
    ) -> None:
        """Set the questions by iteration.

        A shallow copy is stored so later mutation of the caller's dict
        does not silently change the repository's view.

        Args:
            questions_by_iteration: Dictionary mapping iteration numbers to lists of questions
        """
        self.questions_by_iteration = questions_by_iteration.copy()
        logger.info(
            f"Set questions for {len(questions_by_iteration)} iterations"
        )

    def format_findings_to_text(
        self, findings_list: List[Dict], synthesized_content: str
    ) -> str:
        """Format findings into a detailed text output using the utility function.

        Args:
            findings_list: List of finding dictionaries from the strategy execution.
            synthesized_content: The final synthesized content generated by the LLM.

        Returns:
            str: Formatted text output. On formatting failure, falls back to
                an error string that embeds the raw synthesized content.
        """
        logger.info(
            f"Formatting final report. Number of detailed findings: {len(findings_list)}. Synthesized content length: {len(synthesized_content)}. Number of question iterations: {len(self.questions_by_iteration)}"
        )
        # Log details about the inputs
        logger.debug(
            f"Detailed findings list structure (first item type if exists): {type(findings_list[0]) if findings_list else 'Empty'}"
        )
        logger.debug(
            f"Questions by iteration keys: {list(self.questions_by_iteration.keys())}"
        )
        if findings_list:
            logger.debug(
                f"First finding item keys: {list(findings_list[0].keys())}"
            )

        try:
            # Pass the detailed findings list, the synthesized content (as current_knowledge), and the stored questions
            formatted_report = format_findings(
                findings_list,
                synthesized_content,  # This goes to the 'current_knowledge' param in format_findings
                self.questions_by_iteration,
            )
            logger.info("Successfully formatted final report.")
            return formatted_report
        except Exception:
            logger.exception("Error occurred during final report formatting")
            # Fallback: return just the synthesized content if formatting fails
            return f"Error during final formatting. Raw Synthesized Content:\n\n{synthesized_content}"

    def synthesize_findings(
        self,
        query: str,
        sub_queries: List[str],
        findings: List[Union[Dict, str]],
        accumulated_knowledge: str | None = None,
        old_formatting: bool = False,
    ) -> str:
        """
        Synthesize accumulated knowledge into a final answer.

        Invokes the configured LLM with a 120-second timeout (SIGALRM on
        Unix, a daemon worker thread on Windows). On any failure a
        human-readable "Error: ..." string is returned instead of raising.

        Args:
            query: The original query
            sub_queries: List of sub-queries (for context)
            findings: List of findings strings or dictionaries from previous steps
            accumulated_knowledge: Optional pre-existing knowledge to incorporate
            old_formatting: Whether to use the old formatting approach

        Returns:
            str: Synthesized final answer content.
        """
        logger.info(f"synthesize_findings called with query: '{query}'")
        logger.info(
            f"sub_queries type: {type(sub_queries)}, length: {len(sub_queries)}"
        )
        logger.info(f"findings type: {type(findings)}, length: {len(findings)}")

        # Use provided accumulated_knowledge or join findings if it's None
        if accumulated_knowledge is None:
            # Convert findings to text if they are dictionaries
            finding_texts = []
            for item in findings:
                if isinstance(item, dict) and "content" in item:
                    finding_texts.append(item["content"])
                elif isinstance(item, str):
                    finding_texts.append(item)
            accumulated_knowledge = "\n\n".join(finding_texts)

        if findings:
            logger.info(f"first finding type: {type(findings[0])}")
            if isinstance(findings[0], dict):
                logger.info(
                    f"first finding keys: {list(findings[0].keys()) if hasattr(findings[0], 'keys') else 'No keys'}"
                )
                if "content" in findings[0]:
                    logger.info(
                        f"first finding content type: {type(findings[0]['content'])}"
                    )
            elif isinstance(findings[0], str):
                logger.info(f"first finding string length: {len(findings[0])}")
                logger.info(
                    f"first finding string preview: {findings[0][:100]}..."
                )

        if old_formatting:
            # Convert findings list if it contains strings instead of dictionaries
            findings_list = []
            for i, item in enumerate(findings):
                if isinstance(item, str):
                    findings_list.append(
                        {"phase": f"Finding {i + 1}", "content": item}
                    )
                elif isinstance(item, dict):
                    findings_list.append(item)

            # NOTE(review): these keyword names must match format_findings'
            # signature; the positional call in format_findings_to_text
            # labels the second argument 'current_knowledge' — verify the
            # names used here are correct.
            return format_findings(
                findings_list=findings_list,
                synthesized_content=accumulated_knowledge,
                questions_by_iteration=self.questions_by_iteration,
            )
        try:
            # Extract finding content texts for the prompt
            finding_texts = []
            for item in findings:
                if isinstance(item, dict) and "content" in item:
                    finding_texts.append(item["content"])
                elif isinstance(item, str):
                    finding_texts.append(item)

            # Use finding_texts for the prompt
            current_knowledge = (
                "\n\n".join(finding_texts) if finding_texts else ""
            )

            # Check if knowledge exceeds a reasonable token limit (rough estimate based on characters)
            # 1 token ≈ 4 characters in English
            estimated_tokens = len(current_knowledge) / 4
            max_safe_tokens = (
                12000  # Adjust based on your model's context window
            )

            if estimated_tokens > max_safe_tokens:
                logger.warning(
                    f"Knowledge size may exceed model's capacity: ~{int(estimated_tokens)} tokens"
                )
                # Truncate if needed (keeping the beginning and end which are often most important)
                # This is a simple approach - a more sophisticated chunking might be better
                if len(current_knowledge) > 24000:  # ~6000 tokens
                    first_part = current_knowledge[
                        :12000
                    ]  # ~3000 tokens from start
                    last_part = current_knowledge[
                        -12000:
                    ]  # ~3000 tokens from end
                    current_knowledge = f"{first_part}\n\n[...content truncated due to length...]\n\n{last_part}"
                    logger.info(
                        "Knowledge truncated to fit within token limits"
                    )

            prompt = f"""Use IEEE style citations [1], [2], etc. Never make up your own citations. Synthesize the following accumulated knowledge into a comprehensive answer for the original query.
Format the response with clear sections, citations, and a concise summary.

Original Query: {query}

Accumulated Knowledge:
{current_knowledge}

Sub-questions asked (for context):
{chr(10).join(f"- {sq}" for sq in sub_queries)}

Generate a well-structured, concise answer that:
1. Starts with a clear explanation of the most important points
2. Organizes information into logical sections with headers if needed
3. Maintains logical flow and prioritizes important information over minor details
4. Avoids repetition and unnecessary detail

Use IEEE style citations [1], [2], etc. Never make up your own citations.
"""

            logger.info(
                f"Synthesizing final answer. Query: '{query}'. Knowledge length: {len(current_knowledge)}. Prompt length: {len(prompt)}"
            )
            # Log first 500 chars of prompt for debugging context length issues
            logger.debug(
                f"Synthesis prompt (first 500 chars): {prompt[:500]}..."
            )

            try:
                # Add timeout handling
                import platform
                import signal
                import threading
                from contextlib import contextmanager

                # Check if we're on Windows
                if platform.system() == "Windows":

                    # NOTE(review): timeout_handler is defined but never
                    # called; invoke_with_timeout below is what actually
                    # implements the timeout. Candidate for removal.
                    def timeout_handler(timeout_seconds, callback, args):
                        def handler():
                            callback(*args)

                        timer = threading.Timer(timeout_seconds, handler)
                        timer.daemon = True
                        return timer

                    def invoke_with_timeout(
                        timeout_seconds, func, *args, **kwargs
                    ):
                        """
                        Function for implementing timeouts on Windows.

                        Runs func in a daemon thread and raises TimeoutError
                        if it does not finish within timeout_seconds;
                        re-raises any exception the thread captured.
                        """
                        result = None
                        exception = None
                        completed = False

                        def target():
                            nonlocal result, exception, completed
                            try:
                                result = func(*args, **kwargs)
                                completed = True
                            except Exception as e:
                                exception = e

                        thread = threading.Thread(target=target)
                        thread.daemon = True

                        try:
                            thread.start()
                            thread.join(timeout_seconds)
                            if not completed and thread.is_alive():
                                raise TimeoutError(
                                    f"Operation timed out after {timeout_seconds} seconds"
                                )
                            if exception:
                                raise exception
                            return result
                        finally:
                            # Nothing to clean up
                            pass

                    # Use Windows-compatible timeout
                    try:
                        logger.info(
                            "Using Windows-compatible timeout for LLM invocation"
                        )
                        response = invoke_with_timeout(
                            120, self.model.invoke, prompt
                        )

                        # Handle different response types (string or object with content attribute)
                        if hasattr(response, "content"):
                            synthesized_content = response.content
                        else:
                            # Handle string responses
                            synthesized_content = str(response)

                        logger.info(
                            f"Successfully synthesized final answer for query: '{query}'"
                        )
                        # Return only the synthesized content from the LLM
                        return synthesized_content
                    except TimeoutError as timeout_error:
                        logger.exception(
                            f"LLM invocation timed out during synthesis for query '{query}': {timeout_error}"
                        )
                        # Return more specific error about timeout
                        return "Error: Final answer synthesis failed due to LLM timeout. Please check your LLM service or try with a smaller query scope."

                else:
                    # Unix-compatible timeout using SIGALRM
                    @contextmanager
                    def timeout(seconds, message="Operation timed out"):
                        # Raise inside the main thread when the alarm fires.
                        def signal_handler(signum, frame):
                            raise TimeoutError(message)

                        signal.signal(signal.SIGALRM, signal_handler)
                        signal.alarm(seconds)
                        try:
                            yield
                        finally:
                            # Always cancel the pending alarm on exit.
                            signal.alarm(0)

                    # Try with a timeout (adjust seconds as needed)
                    try:
                        with timeout(
                            120, "LLM invocation timed out after 120 seconds"
                        ):
                            response = self.model.invoke(prompt)

                        # Handle different response types (string or object with content attribute)
                        if hasattr(response, "content"):
                            synthesized_content = response.content
                        else:
                            # Handle string responses
                            synthesized_content = str(response)

                        logger.info(
                            f"Successfully synthesized final answer for query: '{query}'"
                        )
                        # Return only the synthesized content from the LLM
                        return synthesized_content
                    except TimeoutError as timeout_error:
                        logger.exception(
                            f"LLM invocation timed out during synthesis for query '{query}': {timeout_error}"
                        )
                        # Return more specific error about timeout
                        return "Error: Final answer synthesis failed due to LLM timeout. Please check your LLM service or try with a smaller query scope."

            except Exception as invoke_error:
                logger.exception(
                    f"LLM invocation failed during synthesis for query '{query}': {invoke_error}"
                )

                # Attempt to determine the type of error
                error_message = str(invoke_error).lower()
                error_type = "unknown"

                if "timeout" in error_message or "timed out" in error_message:
                    error_type = "timeout"
                elif (
                    "too many tokens" in error_message
                    or "context length" in error_message
                    or "token limit" in error_message
                ):
                    error_type = "token_limit"
                elif (
                    "rate limit" in error_message
                    or "rate_limit" in error_message
                ):
                    error_type = "rate_limit"
                elif (
                    "connection" in error_message or "network" in error_message
                ):
                    error_type = "connection"
                elif (
                    "api key" in error_message
                    or "authentication" in error_message
                ):
                    error_type = "authentication"

                # Return more detailed error message based on type
                if error_type == "timeout":
                    return "Error: Failed to synthesize final answer due to LLM timeout. Please check your connection or try again later."
                elif error_type == "token_limit":
                    return "Error: Failed to synthesize final answer due to token limit exceeded. Try reducing the scope of your query."
                elif error_type == "rate_limit":
                    return "Error: Failed to synthesize final answer due to LLM rate limit. Please try again in a few minutes."
                elif error_type == "connection":
                    return "Error: Failed to synthesize final answer due to connection issues. Please check your internet connection and LLM service status."
                elif error_type == "authentication":
                    return "Error: Failed to synthesize final answer due to authentication issues. Please check your API keys."
                else:
                    # Generic error with details
                    return f"Error: Failed to synthesize final answer. LLM error: {invoke_error!s}"

        except Exception as e:
            # Catch potential errors during prompt construction or logging itself
            logger.exception(
                f"Error preparing or executing synthesis for query '{query}'"
            )
            # Return a specific error message for synthesis failure
            return f"Error: Failed to synthesize final answer from knowledge. Details: {e!s}"