Coverage for src / local_deep_research / utilities / search_utilities.py: 98%
136 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1import re
2from typing import Dict, List
4from loguru import logger
# Lowercase language name -> ISO 639-1 two-letter code.
# NOTE(review): presumably used to normalize a human-readable language
# setting into the code a search backend expects — confirm against callers
# (no usage is visible in this module).
LANGUAGE_CODE_MAP = {
    "english": "en",
    "french": "fr",
    "german": "de",
    "spanish": "es",
    "italian": "it",
    "japanese": "ja",
    "chinese": "zh",
    "hindi": "hi",
    "arabic": "ar",
    "bengali": "bn",
    "portuguese": "pt",
    "russian": "ru",
    "korean": "ko",
}
24def remove_think_tags(text: str) -> str:
25 # Remove paired <think>...</think> tags
26 text = re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL)
27 # Remove any orphaned opening or closing think tags
28 text = re.sub(r"</think>", "", text)
29 text = re.sub(r"<think>", "", text)
30 return text.strip()
def extract_links_from_search_results(search_results: List[Dict]) -> List[Dict]:
    """Extract titles and URLs from a list of search-result dictionaries.

    Each result is expected to carry at least "title" and "link" keys;
    an optional "index" key (the citation number) is carried through.

    Args:
        search_results: Raw search-result dictionaries (may be empty or
            None).

    Returns:
        A list of ``{"title": ..., "url": ..., "index": ...}`` dicts,
        keeping only results that have both a non-empty title and link.
    """
    links: List[Dict] = []
    if not search_results:
        return links

    for result in search_results:
        try:
            title = result.get("title")
            url = result.get("link")
            index = result.get("index", "")

            # Treat None (and any non-string value) as missing before strip().
            title = title.strip() if isinstance(title, str) else ""
            url = url.strip() if isinstance(url, str) else ""
            # Bug fix: some engines report "index" as an int; the old
            # unconditional .strip() raised AttributeError and silently
            # dropped the whole result.  Strip only real strings, map
            # None to "", and pass other values (ints) through untouched.
            if isinstance(index, str):
                index = index.strip()
            elif index is None:
                index = ""

            if title and url:
                links.append({"title": title, "url": url, "index": index})
        except Exception:
            # One malformed result must not abort extraction of the rest.
            logger.exception("Error extracting link from result")
            continue
    return links
def format_links_to_markdown(all_links: List[Dict]) -> str:
    """Render link dictionaries as a markdown-ish source list.

    Links sharing a URL are collapsed into one entry carrying every
    citation index seen for that URL; entry order follows the first
    appearance of each URL.

    Args:
        all_links: Dicts holding "url" (or the legacy "link" key),
            "title", and optionally "index".

    Returns:
        The formatted text, or "" when *all_links* is empty.
    """
    logger.info(f"Formatting {len(all_links)} links to markdown...")

    if not all_links:
        return ""

    # Single pass: remember the first title per URL and accumulate every
    # index.  Dict insertion order preserves first-seen URL order.
    grouped: dict[str, dict] = {}
    for entry in all_links:
        target = entry.get("url")
        if target is None:
            target = entry.get("link")  # legacy key fallback
        if not target:
            continue
        record = grouped.get(target)
        if record is None:
            record = {"title": entry.get("title", "Untitled"), "indices": []}
            grouped[target] = record
        record["indices"].append(entry.get("index", ""))

    rendered: list[str] = []
    for target, record in grouped.items():
        # De-duplicate and sort indices for a stable "[1, 3]" prefix, and
        # repeat them as "(source nr: ...)" as a visible human fallback.
        joined = ", ".join(map(str, sorted(set(record["indices"]))))
        rendered.append(
            f"[{joined}] {record['title']} (source nr: {joined})\n URL: {target}\n\n"
        )
    rendered.append("\n")
    return "".join(rendered)
def _get_question_for_phase(
    phase: str, questions_by_iteration: Dict[int, List[str]]
):
    """Resolve the search question that a finding's phase label refers to.

    Handles "Follow-up Iteration <i>.<n>" labels and IterDRAG
    "Sub-query <n>" labels (IterDRAG stores its sub-queries under
    iteration 0).

    Returns:
        The matching question string, or None when the label cannot be
        parsed or no matching question exists (a warning is logged).
    """
    if phase.startswith("Follow-up"):
        try:
            pieces = phase.replace("Follow-up Iteration ", "").split(".")
            if len(pieces) == 2:
                iteration = int(pieces[0])
                question_index = int(pieces[1]) - 1
                questions = questions_by_iteration.get(iteration, [])
                if 0 <= question_index < len(questions):
                    return questions[question_index]
                logger.warning(
                    f"Could not find matching question for phase: {phase}"
                )
            else:
                logger.warning(
                    f"Could not parse iteration/index from phase: {phase}"
                )
        except ValueError:
            logger.warning(
                f"Could not parse iteration/index from phase: {phase}"
            )
    elif phase.startswith("Sub-query"):
        try:
            query_index = int(phase.replace("Sub-query ", "")) - 1
            questions = questions_by_iteration.get(0, [])
            # The lower-bound guard fixes a negative-index bug: the old
            # code let "Sub-query 0" wrap to questions[-1] and silently
            # display the *last* question.
            if 0 <= query_index < len(questions):
                return questions[query_index]
            logger.warning(
                f"Could not find matching question for phase: {phase}"
            )
        except ValueError:
            logger.warning(
                f"Could not parse question index from phase: {phase}"
            )
    return None


def format_findings(
    findings_list: List[Dict],
    synthesized_content: str,
    questions_by_iteration: Dict[int, List[str]],
) -> str:
    """Format findings into a detailed text output.

    The output is assembled in this order: synthesized content, a
    collected sources list, search questions by iteration, the detailed
    findings (each with its phase, question, content, and per-section
    sources), and finally a summary of all sources.

    Args:
        findings_list: List of finding dictionaries.
        synthesized_content: The synthesized content from the LLM.
        questions_by_iteration: Dictionary mapping iteration numbers to
            lists of questions.

    Returns:
        str: Formatted text output.
    """
    logger.info(
        f"Inside format_findings utility. Findings count: {len(findings_list)}, Questions iterations: {len(questions_by_iteration)}"
    )
    parts: list[str] = []

    # Collect every source across all findings up front; they are shown
    # right after the synthesized content and again at the end.
    all_links = []
    for finding in findings_list:
        search_results = finding.get("search_results", [])
        if search_results:
            try:
                links = extract_links_from_search_results(search_results)
                all_links.extend(links)
            except Exception:
                logger.exception("Error processing search results/links")

    # Start with the synthesized content (passed as synthesized_content).
    parts.append(f"{synthesized_content}\n\n")

    # Sources section directly after the synthesized content.
    parts.append(format_links_to_markdown(all_links))

    parts.append("\n\n")  # Separator after synthesized content

    # Search Questions by Iteration section.
    if questions_by_iteration:
        parts.append("## SEARCH QUESTIONS BY ITERATION\n")
        parts.append("\n")
        for iter_num, questions in questions_by_iteration.items():
            parts.append(f"\n #### Iteration {iter_num}:\n")
            for i, q in enumerate(questions, 1):
                parts.append(f"{i}. {q}\n")
            parts.append("\n\n\n")
    else:
        logger.warning("No questions by iteration found to format.")

    # Detailed Findings section.
    if findings_list:
        parts.append("## DETAILED FINDINGS\n\n")
        logger.info(f"Formatting {len(findings_list)} detailed finding items.")

        for idx, finding in enumerate(findings_list):
            logger.debug(
                f"Formatting finding item {idx}. Keys: {list(finding.keys())}"
            )
            # Use .get() for safety — findings may miss any of these keys.
            phase = finding.get("phase", "Unknown Phase")
            content = finding.get("content", "No content available.")
            search_results = finding.get("search_results", [])

            # Phase header.
            parts.append(f"\n### {phase}\n\n\n")

            # Show the question this phase answered, if the phase label
            # lets us look it up.
            question_displayed = False
            if isinstance(phase, str):
                question = _get_question_for_phase(
                    phase, questions_by_iteration
                )
                if question is not None:
                    parts.append(f"#### {question}\n\n")
                    question_displayed = True

            # Fall back to a question stored on the finding itself.
            if (
                not question_displayed
                and "question" in finding
                and finding["question"]
            ):
                parts.append(f"### SEARCH QUESTION:\n{finding['question']}\n\n")

            # Content.
            parts.append(f"\n\n{content}\n\n")

            # Per-section sources, if any.
            if search_results:
                try:
                    links = extract_links_from_search_results(search_results)
                    if links:
                        parts.append("### SOURCES USED IN THIS SECTION:\n")
                        parts.append(format_links_to_markdown(links) + "\n\n")
                except Exception:
                    logger.exception(
                        f"Error processing search results/links for finding {idx}"
                    )
            else:
                logger.debug(f"No search_results found for finding item {idx}.")

            parts.append(f"{'_' * 80}\n\n")
    else:
        logger.warning("No detailed findings found to format.")

    # Summary of all sources at the end.
    if all_links:
        parts.append("## ALL SOURCES:\n")
        parts.append(format_links_to_markdown(all_links))
    else:
        logger.info("No unique sources found across all findings to list.")

    logger.info("Finished format_findings utility.")
    return "".join(parts)
def print_search_results(search_results):
    """Log the given search results as a markdown-formatted link list.

    Extracts title/URL pairs from *search_results* and writes the
    rendered text to the logger at INFO level; logs an empty string when
    no usable links are found.
    """
    links = extract_links_from_search_results(search_results)
    rendered = format_links_to_markdown(links) if links else ""
    logger.info(rendered)