Coverage for src/local_deep_research/advanced_search_system/findings/repository.py: 94%
186 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1"""
2Findings repository for managing research findings.
3"""
5from loguru import logger
6from typing import Dict, List, Union
8from langchain_core.documents import Document
9from langchain_core.language_models import BaseLanguageModel
11from ...utilities.json_utils import get_llm_response_text
12from ...utilities.search_utilities import format_findings
13from .base_findings import BaseFindingsRepository
def format_links(links: List[Dict]) -> str:
    """Render links as a numbered, newline-separated listing.

    Args:
        links: Dictionaries that each provide a 'title' and a 'url' entry.

    Returns:
        str: One numbered entry per link (numbering starts at 1), with the
            URL on the line below its title. An empty list gives "".
    """
    # NOTE(review): the spacing before "URL:" is copied from the extracted
    # report text, which collapses runs of whitespace - confirm against the
    # real source file.
    entries = []
    for position, link in enumerate(links, start=1):
        entries.append(f"{position}. {link['title']}\n URL: {link['url']}")
    return "\n".join(entries)
class FindingsRepository(BaseFindingsRepository):
    """Repository for managing research findings.

    Keeps findings keyed by query, the documents collected so far and the
    questions asked per iteration, and produces a final answer either via
    the ``format_findings`` utility or by prompting the configured model.
    """

    # Budget for the knowledge placed in the synthesis prompt, estimated at
    # roughly four characters per token.
    _MAX_SAFE_TOKENS = 12000
    # Characters kept from each end of the knowledge when it is over budget.
    _TRUNCATION_KEEP_CHARS = 12000
    # Seconds to wait for the model before reporting a timeout.
    _LLM_TIMEOUT_SECONDS = 120

    # Ordered (keywords, message) pairs: the first entry with a keyword found
    # in the lower-cased exception text decides the returned message.
    _INVOKE_ERROR_MESSAGES = (
        (
            ("timeout", "timed out"),
            "Error: Failed to synthesize final answer due to LLM timeout. Please check your connection or try again later.",
        ),
        (
            ("too many tokens", "context length", "token limit"),
            "Error: Failed to synthesize final answer due to token limit exceeded. Try reducing the scope of your query.",
        ),
        (
            ("rate limit", "rate_limit"),
            "Error: Failed to synthesize final answer due to LLM rate limit. Please try again in a few minutes.",
        ),
        (
            ("connection", "network"),
            "Error: Failed to synthesize final answer due to connection issues. Please check your internet connection and LLM service status.",
        ),
        (
            ("api key", "authentication"),
            "Error: Failed to synthesize final answer due to authentication issues. Please check your API keys.",
        ),
    )

    def __init__(self, model: BaseLanguageModel):
        """Initialize the repository.

        Args:
            model: The LLM model to use for synthesis
        """
        super().__init__(model)
        self.findings: Dict[str, List[Dict]] = {}
        self.documents: List[Document] = []
        self.questions_by_iteration: Dict[int, List[str]] = {}

    def add_finding(self, query: str, finding: Dict | str) -> None:
        """Add a finding for a query.

        Args:
            query: The query the finding belongs to
            finding: Either the finding text, which is wrapped in a
                "Synthesis" dictionary, or a finding dictionary, which is
                stored as given. A dictionary whose phase is
                "Final synthesis" additionally replaces the entry stored
                under ``query + "_synthesis"``.
        """
        entries = self.findings.setdefault(query, [])

        if isinstance(finding, str):
            entries.append(
                {
                    "phase": "Synthesis",
                    "content": finding,
                    "question": query,
                    "search_results": [],
                    "documents": [],
                }
            )
        else:
            entries.append(finding)
            # Keep the raw text of the final synthesis under its own key.
            if (
                isinstance(finding, dict)
                and finding.get("phase") == "Final synthesis"
            ):
                self.findings[query + "_synthesis"] = [
                    {
                        "phase": "Synthesis",
                        "content": finding.get("content", ""),
                        "question": query,
                        "search_results": [],
                        "documents": [],
                    }
                ]

        logger.info(
            f"Added finding for query: {query}. Total findings: {len(self.findings[query])}"
        )

    def get_findings(self, query: str) -> List[Dict]:
        """Get findings for a query.

        Args:
            query: The query to get findings for

        Returns:
            List of findings for the query (empty when none are stored)
        """
        return self.findings.get(query, [])

    def clear_findings(self, query: str) -> None:
        """Clear findings for a query.

        Nothing happens (and nothing is logged) when the query is unknown.

        Args:
            query: The query to clear findings for
        """
        if query in self.findings:
            del self.findings[query]
            logger.info(f"Cleared findings for query: {query}")

    def add_documents(self, documents: List[Document]) -> None:
        """Add documents to the repository.

        Args:
            documents: List of documents to add
        """
        self.documents.extend(documents)
        logger.info(f"Added {len(documents)} documents to repository")

    def set_questions_by_iteration(
        self, questions_by_iteration: Dict[int, List[str]]
    ) -> None:
        """Set the questions by iteration.

        A shallow copy of the mapping is stored.

        Args:
            questions_by_iteration: Dictionary mapping iteration numbers to lists of questions
        """
        self.questions_by_iteration = questions_by_iteration.copy()
        logger.info(
            f"Set questions for {len(questions_by_iteration)} iterations"
        )

    def format_findings_to_text(
        self, findings_list: List[Dict], synthesized_content: str
    ) -> str:
        """Format findings into a detailed text output using the utility function.

        Args:
            findings_list: List of finding dictionaries from the strategy execution.
            synthesized_content: The final synthesized content generated by the LLM.

        Returns:
            str: Formatted text output. If ``format_findings`` raises, an
                error notice followed by the raw synthesized content.
        """
        logger.info(
            f"Formatting final report. Number of detailed findings: {len(findings_list)}. Synthesized content length: {len(synthesized_content)}. Number of question iterations: {len(self.questions_by_iteration)}"
        )
        logger.debug(
            f"Detailed findings list structure (first item type if exists): {type(findings_list[0]) if findings_list else 'Empty'}"
        )
        logger.debug(
            f"Questions by iteration keys: {list(self.questions_by_iteration.keys())}"
        )
        # Only inspect keys when the first item really has them, so that a
        # diagnostic log line cannot abort formatting before the fallback.
        if findings_list and hasattr(findings_list[0], "keys"):
            logger.debug(
                f"First finding item keys: {list(findings_list[0].keys())}"
            )

        try:
            formatted_report = format_findings(
                findings_list,
                synthesized_content,
                self.questions_by_iteration,
            )
            logger.info("Successfully formatted final report.")
            return formatted_report
        except Exception:
            logger.exception("Error occurred during final report formatting")
            # Fallback: return just the synthesized content if formatting fails
            return f"Error during final formatting. Raw Synthesized Content:\n\n{synthesized_content}"

    @staticmethod
    def _extract_finding_texts(findings: List[Union[Dict, str]]) -> List[str]:
        """Collect the text of each finding, skipping items without text."""
        texts = []
        for item in findings:
            if isinstance(item, dict) and "content" in item:
                texts.append(item["content"])
            elif isinstance(item, str):
                texts.append(item)
        return texts

    @staticmethod
    def _log_first_finding(findings: List[Union[Dict, str]]) -> None:
        """Log the type and shape of the first finding for diagnostics."""
        if not findings:
            return
        first = findings[0]
        logger.info(f"first finding type: {type(first)}")
        if isinstance(first, dict):
            logger.info(f"first finding keys: {list(first.keys())}")
            if "content" in first:
                logger.info(
                    f"first finding content type: {type(first['content'])}"
                )
        elif isinstance(first, str):
            logger.info(f"first finding string length: {len(first)}")
            logger.info(f"first finding string preview: {first[:100]}...")

    def _prepare_knowledge(self, findings: List[Union[Dict, str]]) -> str:
        """Join the finding texts and trim the result when it is over budget."""
        knowledge = "\n\n".join(self._extract_finding_texts(findings))

        # Rough estimate: 1 token is about 4 characters in English.
        estimated_tokens = len(knowledge) / 4
        if estimated_tokens > self._MAX_SAFE_TOKENS:
            logger.warning(
                f"Knowledge size may exceed model's capacity: ~{int(estimated_tokens)} tokens"
            )
            # Keep the beginning and the end; being over budget already
            # implies the text is longer than both kept parts together.
            keep = self._TRUNCATION_KEEP_CHARS
            first_part = knowledge[:keep]
            last_part = knowledge[-keep:]
            knowledge = f"{first_part}\n\n[...content truncated due to length...]\n\n{last_part}"
            logger.info("Knowledge truncated to fit within token limits")
        return knowledge

    @staticmethod
    def _build_synthesis_prompt(
        query: str, current_knowledge: str, sub_queries: List[str]
    ) -> str:
        """Build the synthesis prompt sent to the model."""
        sub_question_lines = "\n".join(f"- {sq}" for sq in sub_queries)
        return f"""Use IEEE style citations [1], [2], etc. Never make up your own citations. Synthesize the following accumulated knowledge into a comprehensive answer for the original query.
Format the response with clear sections, citations, and a concise summary.

Original Query: {query}

Accumulated Knowledge:
{current_knowledge}

Sub-questions asked (for context):
{sub_question_lines}

Generate a well-structured, concise answer that:
1. Starts with a clear explanation of the most important points
2. Organizes information into logical sections with headers if needed
3. Maintains logical flow and prioritizes important information over minor details
4. Avoids repetition and unnecessary detail

Use IEEE style citations [1], [2], etc. Never make up your own citations.
"""

    def _invoke_model_with_timeout(self, prompt: str, timeout_seconds: int):
        """Call ``self.model.invoke(prompt)`` and give up after a timeout.

        SIGALRM is used only where it can work: on a non-Windows platform
        and on the main thread, because ``signal.signal`` raises ValueError
        anywhere else. In every other case the call runs in a daemon thread
        that is joined with a timeout; on timeout that thread is abandoned,
        not stopped.

        Raises:
            TimeoutError: If the model does not answer in time.
        """
        import platform
        import signal
        import threading

        can_use_alarm = (
            platform.system() != "Windows"
            and hasattr(signal, "SIGALRM")
            and threading.current_thread() is threading.main_thread()
        )

        if can_use_alarm:

            def _raise_timeout(signum, frame):
                raise TimeoutError(
                    f"LLM invocation timed out after {timeout_seconds} seconds"
                )

            previous_handler = signal.signal(signal.SIGALRM, _raise_timeout)
            signal.alarm(timeout_seconds)
            try:
                return self.model.invoke(prompt)
            finally:
                signal.alarm(0)
                # Put back whatever handler was installed before; None means
                # it was not installed from Python and cannot be re-set.
                if previous_handler is not None:
                    signal.signal(signal.SIGALRM, previous_handler)

        logger.info("Using thread-based timeout for LLM invocation")
        outcome = {}

        def _worker():
            try:
                outcome["result"] = self.model.invoke(prompt)
            except Exception as exc:
                outcome["error"] = exc

        worker = threading.Thread(target=_worker, daemon=True)
        worker.start()
        worker.join(timeout_seconds)
        if worker.is_alive():
            raise TimeoutError(
                f"Operation timed out after {timeout_seconds} seconds"
            )
        if "error" in outcome:
            raise outcome["error"]
        return outcome.get("result")

    @classmethod
    def _describe_invoke_error(cls, invoke_error: Exception) -> str:
        """Map an invocation failure to the error text returned to callers."""
        error_message = str(invoke_error).lower()
        for keywords, user_message in cls._INVOKE_ERROR_MESSAGES:
            if any(keyword in error_message for keyword in keywords):
                return user_message
        # Generic error with details
        return f"Error: Failed to synthesize final answer. LLM error: {invoke_error!s}"

    def synthesize_findings(
        self,
        query: str,
        sub_queries: List[str],
        findings: List[Union[Dict, str]],
        accumulated_knowledge: str | None = None,
        old_formatting: bool = False,
    ) -> str:
        """
        Synthesize accumulated knowledge into a final answer.

        Args:
            query: The original query
            sub_queries: List of sub-queries (for context)
            findings: List of findings strings or dictionaries from previous steps
            accumulated_knowledge: Optional pre-existing knowledge. Only used
                when old_formatting is true, where it defaults to the joined
                finding texts; the model prompt is built from findings.
            old_formatting: Whether to use the old formatting approach
                (``format_findings`` instead of a model call)

        Returns:
            str: Synthesized final answer content, or a message starting
                with "Error:" when the model call fails or times out.
        """
        logger.info(f"synthesize_findings called with query: '{query}'")
        logger.info(
            f"sub_queries type: {type(sub_queries)}, length: {len(sub_queries)}"
        )
        logger.info(f"findings type: {type(findings)}, length: {len(findings)}")
        self._log_first_finding(findings)

        if old_formatting:
            if accumulated_knowledge is None:
                accumulated_knowledge = "\n\n".join(
                    self._extract_finding_texts(findings)
                )

            # Wrap plain strings so every entry is a dictionary; items that
            # are neither strings nor dictionaries are left out.
            findings_list = []
            for position, item in enumerate(findings, start=1):
                if isinstance(item, str):
                    findings_list.append(
                        {"phase": f"Finding {position}", "content": item}
                    )
                elif isinstance(item, dict):
                    findings_list.append(item)

            return format_findings(
                findings_list=findings_list,
                synthesized_content=accumulated_knowledge,
                questions_by_iteration=self.questions_by_iteration,
            )

        try:
            current_knowledge = self._prepare_knowledge(findings)
            prompt = self._build_synthesis_prompt(
                query, current_knowledge, sub_queries
            )

            logger.info(
                f"Synthesizing final answer. Query: '{query}'. Knowledge length: {len(current_knowledge)}. Prompt length: {len(prompt)}"
            )
            # Log first 500 chars of prompt for debugging context length issues
            logger.debug(
                f"Synthesis prompt (first 500 chars): {prompt[:500]}..."
            )

            try:
                response = self._invoke_model_with_timeout(
                    prompt, self._LLM_TIMEOUT_SECONDS
                )

                # Normalize response (handles str/.content, strips <think> tags)
                synthesized_content = get_llm_response_text(response)

                logger.info(
                    f"Successfully synthesized final answer for query: '{query}'"
                )
                # Return only the synthesized content from the LLM
                return synthesized_content
            except TimeoutError as timeout_error:
                logger.exception(
                    f"LLM invocation timed out during synthesis for query '{query}': {timeout_error}"
                )
                return "Error: Final answer synthesis failed due to LLM timeout. Please check your LLM service or try with a smaller query scope."
            except Exception as invoke_error:
                logger.exception(
                    f"LLM invocation failed during synthesis for query '{query}': {invoke_error}"
                )
                return self._describe_invoke_error(invoke_error)

        except Exception as e:
            # Catch potential errors during prompt construction or logging itself
            logger.exception(
                f"Error preparing or executing synthesis for query '{query}'"
            )
            return f"Error: Failed to synthesize final answer from knowledge. Details: {e!s}"