Coverage for src / local_deep_research / utilities / search_utilities.py: 94%
138 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1import re
2from typing import Dict, List
4from loguru import logger
# Maps human-readable language names (lowercase) to their ISO 639-1 codes,
# e.g. for passing a language hint to search backends.
LANGUAGE_CODE_MAP = {
    "english": "en",
    "french": "fr",
    "german": "de",
    "spanish": "es",
    "italian": "it",
    "japanese": "ja",
    "chinese": "zh",
}
def remove_think_tags(text: str) -> str:
    """Strip ``<think>...</think>`` reasoning blocks from LLM output.

    Complete blocks are removed first (across newlines), then any stray
    opening or closing tags left behind; the result is whitespace-trimmed.
    """
    without_blocks = re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL)
    without_closers = re.sub(r"</think>", "", without_blocks)
    without_openers = re.sub(r"<think>", "", without_closers)
    return without_openers.strip()
def extract_links_from_search_results(search_results: List[Dict]) -> List[Dict]:
    """
    Extracts links and titles from a list of search result dictionaries.

    Each dictionary is expected to have at least the keys "title" and "link";
    an optional "index" is carried through, coerced to a stripped string.
    Results missing a title or link are skipped.

    Returns a list of dictionaries with 'title', 'url' and 'index' keys.
    """
    links = []
    if not search_results:
        return links

    for result in search_results:
        try:
            # Ensure we handle None values safely before calling strip()
            title = result.get("title", "")
            url = result.get("link", "")
            index = result.get("index", "")

            # Apply strip() only if the values are not None. Coerce the index
            # to str first: backends may supply it as an int, and calling
            # .strip() on an int used to raise and silently drop the link.
            title = title.strip() if title is not None else ""
            url = url.strip() if url is not None else ""
            index = str(index).strip() if index is not None else ""

            if title and url:
                links.append({"title": title, "url": url, "index": index})
        except Exception:
            # Log the specific error for debugging, then keep processing
            # the remaining results.
            logger.exception("Error extracting link from result")
            continue
    return links
def format_links_to_markdown(all_links: List[Dict]) -> str:
    """Render link dicts as a citation-style source list, one entry per URL.

    Duplicate URLs are merged: each URL is emitted once, in first-seen order,
    prefixed with the sorted set of all citation indices that referenced it.
    Returns "" when ``all_links`` is empty.
    """
    formatted_text = ""
    logger.info(f"Formatting {len(all_links)} links to markdown...")

    if all_links:
        # First pass: collect every citation index seen for each URL.
        # Entries may store the address under "url" or legacy "link".
        url_to_indices = {}
        for link in all_links:
            url = link.get("url")
            if url is None:
                url = link.get("link")
            if url:
                url_to_indices.setdefault(url, []).append(link.get("index", ""))

        # Second pass: emit each unique URL with all of its indices merged
        # into a single marker like [1, 3, 5].
        seen_urls = set()
        for link in all_links:
            url = link.get("url")
            if url is None:
                url = link.get("link")
            if not url or url in seen_urls:
                continue
            title = link.get("title", "Untitled")
            indices = sorted(set(url_to_indices[url]))  # deduplicate, stable order
            joined = ", ".join(map(str, indices))
            # "(source nr: ...)" doubles as a visible fallback for humans.
            formatted_text += f"[{joined}] {title} (source nr: {joined})\n URL: {url}\n\n"
            seen_urls.add(url)

        formatted_text += "\n"

    return formatted_text
def format_findings(
    findings_list: List[Dict],
    synthesized_content: str,
    questions_by_iteration: Dict[int, List[str]],
) -> str:
    """Format findings into a detailed text output.

    Assembles, in order: the synthesized content, a deduplicated source
    list, the search questions grouped by iteration, one section per
    finding (with its matching question and per-section sources where
    available), and finally a combined list of all sources.

    Args:
        findings_list: List of finding dictionaries
        synthesized_content: The synthesized content from the LLM.
        questions_by_iteration: Dictionary mapping iteration numbers to lists of questions

    Returns:
        str: Formatted text output
    """
    logger.info(
        f"Inside format_findings utility. Findings count: {len(findings_list)}, Questions iterations: {len(questions_by_iteration)}"
    )
    formatted_text = ""

    # Extract all sources from findings
    all_links = []
    for finding in findings_list:
        search_results = finding.get("search_results", [])
        if search_results:
            try:
                links = extract_links_from_search_results(search_results)
                all_links.extend(links)
            except Exception:
                logger.exception("Error processing search results/links")

    # Start with the synthesized content (passed as synthesized_content)
    formatted_text += f"{synthesized_content}\n\n"

    # Add sources section after synthesized content if sources exist
    formatted_text += format_links_to_markdown(all_links)

    formatted_text += "\n\n"  # Separator after synthesized content

    # Add Search Questions by Iteration section
    if questions_by_iteration:
        formatted_text += "## SEARCH QUESTIONS BY ITERATION\n"
        formatted_text += "\n"
        for iter_num, questions in questions_by_iteration.items():
            formatted_text += f"\n #### Iteration {iter_num}:\n"
            for i, q in enumerate(questions, 1):
                formatted_text += f"{i}. {q}\n"
            formatted_text += "\n" + "\n\n"
    else:
        logger.warning("No questions by iteration found to format.")

    # Add Detailed Findings section
    if findings_list:
        formatted_text += "## DETAILED FINDINGS\n\n"
        logger.info(f"Formatting {len(findings_list)} detailed finding items.")

        for idx, finding in enumerate(findings_list):
            logger.debug(
                f"Formatting finding item {idx}. Keys: {list(finding.keys())}"
            )
            # Use .get() for safety
            phase = finding.get("phase", "Unknown Phase")
            content = finding.get("content", "No content available.")
            search_results = finding.get("search_results", [])

            # Phase header
            formatted_text += "\n"
            formatted_text += f"### {phase}\n"
            formatted_text += "\n\n"

            question_displayed = False
            # If this is a follow-up phase, try to show the corresponding question.
            # Phase names here look like "Follow-up Iteration <iter>.<question nr>",
            # where the question number is 1-based.
            if isinstance(phase, str) and phase.startswith("Follow-up"):
                try:
                    parts = phase.replace("Follow-up Iteration ", "").split(".")
                    if len(parts) == 2:
                        iteration = int(parts[0])
                        question_index = int(parts[1]) - 1
                        if (
                            iteration in questions_by_iteration
                            and 0
                            <= question_index
                            < len(questions_by_iteration[iteration])
                        ):
                            formatted_text += f"#### {questions_by_iteration[iteration][question_index]}\n\n"
                            question_displayed = True
                        else:
                            logger.warning(
                                f"Could not find matching question for phase: {phase}"
                            )
                    else:
                        logger.warning(
                            f"Could not parse iteration/index from phase: {phase}"
                        )
                except ValueError:
                    # int() failed on a non-numeric iteration/question part.
                    logger.warning(
                        f"Could not parse iteration/index from phase: {phase}"
                    )
            # Handle Sub-query phases from IterDRAG strategy
            elif isinstance(phase, str) and phase.startswith("Sub-query"):
                try:
                    # Extract the index number from "Sub-query X" (1-based)
                    query_index = int(phase.replace("Sub-query ", "")) - 1
                    # In IterDRAG, sub-queries are stored in iteration 0
                    if 0 in questions_by_iteration and query_index < len(
                        questions_by_iteration[0]
                    ):
                        formatted_text += (
                            f"#### {questions_by_iteration[0][query_index]}\n\n"
                        )
                        question_displayed = True
                    else:
                        logger.warning(
                            f"Could not find matching question for phase: {phase}"
                        )
                except ValueError:
                    logger.warning(
                        f"Could not parse question index from phase: {phase}"
                    )

            # If the question is in the finding itself, display it
            # (fallback when no question was matched from the phase name)
            if (
                not question_displayed
                and "question" in finding
                and finding["question"]
            ):
                formatted_text += (
                    f"### SEARCH QUESTION:\n{finding['question']}\n\n"
                )

            # Content
            formatted_text += f"\n\n{content}\n\n"

            # Search results if they exist
            if search_results:
                try:
                    links = extract_links_from_search_results(search_results)
                    if links:
                        formatted_text += "### SOURCES USED IN THIS SECTION:\n"
                        formatted_text += (
                            format_links_to_markdown(links) + "\n\n"
                        )
                except Exception:
                    logger.exception(
                        f"Error processing search results/links for finding {idx}"
                    )
            else:
                logger.debug(f"No search_results found for finding item {idx}.")

            # Horizontal rule between finding sections
            formatted_text += f"{'_' * 80}\n\n"
    else:
        logger.warning("No detailed findings found to format.")

    # Add summary of all sources at the end
    if all_links:
        formatted_text += "## ALL SOURCES:\n"
        formatted_text += format_links_to_markdown(all_links)
    else:
        logger.info("No unique sources found across all findings to list.")

    logger.info("Finished format_findings utility.")
    return formatted_text
def print_search_results(search_results):
    """Log search results as a markdown-formatted source list.

    Logs an empty string when no usable links are present.
    """
    links = extract_links_from_search_results(search_results)
    formatted_text = format_links_to_markdown(links) if links else ""
    logger.info(formatted_text)