Coverage for src / local_deep_research / utilities / search_utilities.py: 98%
138 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
1import re
2from typing import Dict, List
4from loguru import logger
# Map of human-readable language names (lowercased) to their ISO 639-1
# two-letter codes, used to normalize user-supplied language settings.
LANGUAGE_CODE_MAP: Dict[str, str] = {
    "english": "en",
    "french": "fr",
    "german": "de",
    "spanish": "es",
    "italian": "it",
    "japanese": "ja",
    "chinese": "zh",
    "hindi": "hi",
    "arabic": "ar",
    "bengali": "bn",
    "portuguese": "pt",
    "russian": "ru",
    "korean": "ko",
}
def remove_think_tags(text: str) -> str:
    """Strip LLM reasoning markup from *text*.

    Removes complete ``<think>...</think>`` sections (including any content
    between them, across newlines) as well as any unpaired stray tags, then
    trims surrounding whitespace.
    """
    # Drop fully paired reasoning blocks first; DOTALL lets '.' span newlines.
    cleaned = re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL)
    # Whatever tags remain are orphans — remove them literally.
    for stray_tag in ("</think>", "<think>"):
        cleaned = cleaned.replace(stray_tag, "")
    return cleaned.strip()
def extract_links_from_search_results(search_results: List[Dict]) -> List[Dict]:
    """
    Extracts links and titles from a list of search result dictionaries.

    Each dictionary is expected to have at least the keys "title" and "link".
    An optional "index" key (string or number) is carried through as a string.

    Returns a list of dictionaries with 'title', 'url' and 'index' keys.
    Results missing either a title or a URL are skipped.
    """
    links = []
    if not search_results:
        return links

    for result in search_results:
        try:
            # Ensure we handle None values safely before calling strip()
            title = result.get("title", "")
            url = result.get("link", "")
            index = result.get("index", "")

            # Apply strip() only if the values are not None
            title = title.strip() if title is not None else ""
            url = url.strip() if url is not None else ""
            # Bug fix: "index" is frequently a number (downstream code does
            # sorted(...) and map(str, ...) on indices). Calling .strip() on
            # a non-string raised AttributeError here, which the broad except
            # swallowed — silently dropping the whole, otherwise valid,
            # result. Coerce non-strings to str instead of discarding.
            if index is None:
                index = ""
            elif isinstance(index, str):
                index = index.strip()
            else:
                index = str(index)

            if title and url:
                links.append({"title": title, "url": url, "index": index})
        except Exception:
            # Log the specific error for debugging
            logger.exception("Error extracting link from result")
            continue
    return links
def format_links_to_markdown(all_links: List[Dict]) -> str:
    """Render link dicts as a markdown-style source list, de-duplicated by URL.

    Each unique URL is emitted once (in first-seen order) together with every
    citation index collected for it, e.g. ``[1, 3] Title (source nr: 1, 3)``.
    Returns an empty string when there are no links.
    """
    logger.info(f"Formatting {len(all_links)} links to markdown...")

    if not all_links:
        return ""

    # First pass: gather every citation index observed for each URL.
    url_to_indices = {}
    for entry in all_links:
        target = entry.get("url")
        if target is None:
            target = entry.get("link")
        # logger.info(f"URL \n {str(target)} ")
        if target:
            url_to_indices.setdefault(target, []).append(entry.get("index", ""))

    # Second pass: emit each URL once, in first-seen order, with all indices.
    pieces = []
    emitted = set()
    for entry in all_links:
        target = entry.get("url")
        if target is None:
            target = entry.get("link")
        if not target or target in emitted:
            continue
        emitted.add(target)
        title = entry.get("title", "Untitled")
        # De-duplicate and sort indices for consistent ordering.
        joined = ", ".join(map(str, sorted(set(url_to_indices[target]))))
        # The "(source nr: ...)" suffix is a visible fallback for humans.
        pieces.append(
            f"[{joined}] {title} (source nr: {joined})\n URL: {target}\n\n"
        )

    return "".join(pieces) + "\n"
def format_findings(
    findings_list: List[Dict],
    synthesized_content: str,
    questions_by_iteration: Dict[int, List[str]],
) -> str:
    """Format findings into a detailed text output.

    Builds, in order: the synthesized content, an aggregated sources list,
    a "SEARCH QUESTIONS BY ITERATION" section, a "DETAILED FINDINGS" section
    (one entry per finding, with its phase, matching question, content and
    per-section sources), and a final "ALL SOURCES" summary.

    Args:
        findings_list: List of finding dictionaries
        synthesized_content: The synthesized content from the LLM.
        questions_by_iteration: Dictionary mapping iteration numbers to lists of questions

    Returns:
        str: Formatted text output
    """
    logger.info(
        f"Inside format_findings utility. Findings count: {len(findings_list)}, Questions iterations: {len(questions_by_iteration)}"
    )
    formatted_text = ""

    # Extract all sources from findings; errors on one finding must not
    # prevent the rest of the report from being built.
    all_links = []
    for finding in findings_list:
        search_results = finding.get("search_results", [])
        if search_results:
            try:
                links = extract_links_from_search_results(search_results)
                all_links.extend(links)
            except Exception:
                logger.exception("Error processing search results/links")

    # Start with the synthesized content (passed as synthesized_content)
    formatted_text += f"{synthesized_content}\n\n"

    # Add sources section after synthesized content if sources exist
    # (format_links_to_markdown returns "" for an empty list).
    formatted_text += format_links_to_markdown(all_links)

    formatted_text += "\n\n"  # Separator after synthesized content

    # Add Search Questions by Iteration section
    if questions_by_iteration:
        formatted_text += "## SEARCH QUESTIONS BY ITERATION\n"
        formatted_text += "\n"
        for iter_num, questions in questions_by_iteration.items():
            formatted_text += f"\n #### Iteration {iter_num}:\n"
            for i, q in enumerate(questions, 1):
                formatted_text += f"{i}. {q}\n"
        formatted_text += "\n" + "\n\n"
    else:
        logger.warning("No questions by iteration found to format.")

    # Add Detailed Findings section
    if findings_list:
        formatted_text += "## DETAILED FINDINGS\n\n"
        logger.info(f"Formatting {len(findings_list)} detailed finding items.")

        for idx, finding in enumerate(findings_list):
            logger.debug(
                f"Formatting finding item {idx}. Keys: {list(finding.keys())}"
            )
            # Use .get() for safety
            phase = finding.get("phase", "Unknown Phase")
            content = finding.get("content", "No content available.")
            search_results = finding.get("search_results", [])

            # Phase header
            formatted_text += "\n"
            formatted_text += f"### {phase}\n"
            formatted_text += "\n\n"

            question_displayed = False
            # If this is a follow-up phase, try to show the corresponding question.
            # Phase format expected: "Follow-up Iteration <iter>.<question_nr>".
            if isinstance(phase, str) and phase.startswith("Follow-up"):
                try:
                    parts = phase.replace("Follow-up Iteration ", "").split(".")
                    if len(parts) == 2:
                        iteration = int(parts[0])
                        # question numbers in the phase are 1-based
                        question_index = int(parts[1]) - 1
                        if (
                            iteration in questions_by_iteration
                            and 0
                            <= question_index
                            < len(questions_by_iteration[iteration])
                        ):
                            formatted_text += f"#### {questions_by_iteration[iteration][question_index]}\n\n"
                            question_displayed = True
                        else:
                            logger.warning(
                                f"Could not find matching question for phase: {phase}"
                            )
                    else:
                        logger.warning(
                            f"Could not parse iteration/index from phase: {phase}"
                        )
                except ValueError:
                    logger.warning(
                        f"Could not parse iteration/index from phase: {phase}"
                    )
            # Handle Sub-query phases from IterDRAG strategy
            elif isinstance(phase, str) and phase.startswith("Sub-query"):
                try:
                    # Extract the index number from "Sub-query X" (1-based)
                    query_index = int(phase.replace("Sub-query ", "")) - 1
                    # In IterDRAG, sub-queries are stored in iteration 0
                    if 0 in questions_by_iteration and query_index < len(
                        questions_by_iteration[0]
                    ):
                        formatted_text += (
                            f"#### {questions_by_iteration[0][query_index]}\n\n"
                        )
                        question_displayed = True
                    else:
                        logger.warning(
                            f"Could not find matching question for phase: {phase}"
                        )
                except ValueError:
                    logger.warning(
                        f"Could not parse question index from phase: {phase}"
                    )

            # If the question is in the finding itself, display it
            # (fallback when the phase name didn't resolve to a question).
            if (
                not question_displayed
                and "question" in finding
                and finding["question"]
            ):
                formatted_text += (
                    f"### SEARCH QUESTION:\n{finding['question']}\n\n"
                )

            # Content
            formatted_text += f"\n\n{content}\n\n"

            # Search results if they exist
            if search_results:
                try:
                    links = extract_links_from_search_results(search_results)
                    if links:
                        formatted_text += "### SOURCES USED IN THIS SECTION:\n"
                        formatted_text += (
                            format_links_to_markdown(links) + "\n\n"
                        )
                except Exception:
                    logger.exception(
                        f"Error processing search results/links for finding {idx}"
                    )
            else:
                logger.debug(f"No search_results found for finding item {idx}.")

            # Horizontal rule between finding entries
            formatted_text += f"{'_' * 80}\n\n"
    else:
        logger.warning("No detailed findings found to format.")

    # Add summary of all sources at the end
    if all_links:
        formatted_text += "## ALL SOURCES:\n"
        formatted_text += format_links_to_markdown(all_links)
    else:
        logger.info("No unique sources found across all findings to list.")

    logger.info("Finished format_findings utility.")
    return formatted_text
def print_search_results(search_results):
    """Log search results as a formatted markdown source list.

    Extracts links from *search_results* and logs the rendered text;
    logs an empty message when no links were found.
    """
    links = extract_links_from_search_results(search_results)
    rendered = format_links_to_markdown(links) if links else ""
    logger.info(rendered)