Coverage for src/local_deep_research/utilities/search_utilities.py: 98%

136 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1import re 

2from typing import Dict, List 

3 

4from loguru import logger 

5 

6 

# Lowercase English language names mapped to their ISO 639-1 two-letter codes.
LANGUAGE_CODE_MAP: Dict[str, str] = {
    "english": "en",
    "french": "fr",
    "german": "de",
    "spanish": "es",
    "italian": "it",
    "japanese": "ja",
    "chinese": "zh",
    "hindi": "hi",
    "arabic": "ar",
    "bengali": "bn",
    "portuguese": "pt",
    "russian": "ru",
    "korean": "ko",
}

22 

23 

def remove_think_tags(text: str) -> str:
    """Strip ``<think>...</think>`` blocks and any stray tags from *text*.

    Paired tags are removed together with their content (across newlines);
    orphaned opening or closing tags are removed on their own. The result
    is whitespace-stripped.
    """
    # Drop complete <think>...</think> sections, DOTALL so the content may
    # span multiple lines.
    cleaned = re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL)
    # Remove any unmatched tags left behind; plain literal replacement
    # suffices since the tags contain no regex metacharacters.
    cleaned = cleaned.replace("</think>", "").replace("<think>", "")
    return cleaned.strip()

31 

32 

def extract_links_from_search_results(search_results: List[Dict]) -> List[Dict]:
    """
    Extracts links and titles from a list of search result dictionaries.

    Each dictionary is expected to have at least the keys "title" and "link";
    an optional "index" key is carried through (stripped when it is a string,
    passed through unchanged otherwise, e.g. when it is an int).

    Returns a list of dictionaries with 'title', 'url' and 'index' keys.
    Entries without a usable title and URL are skipped.
    """
    links = []
    if not search_results:
        return links

    for result in search_results:
        try:
            title = result.get("title", "")
            url = result.get("link", "")
            index = result.get("index", "")

            # Normalize: only strings are stripped; None or any other
            # non-string title/url counts as missing.
            title = title.strip() if isinstance(title, str) else ""
            url = url.strip() if isinstance(url, str) else ""
            # BUG FIX: a non-string index (commonly an int) used to raise
            # AttributeError on .strip(), which the broad except swallowed,
            # silently dropping the whole link. Keep non-string indices as-is.
            if isinstance(index, str):
                index = index.strip()
            elif index is None:
                index = ""

            if title and url:
                links.append({"title": title, "url": url, "index": index})
        except Exception:
            # Log the specific error for debugging, then keep going so one
            # malformed result does not lose the rest.
            logger.exception("Error extracting link from result")
            continue
    return links

64 

65 

def format_links_to_markdown(all_links: List[Dict]) -> str:
    """Render link dicts as a markdown source list, de-duplicated by URL.

    Links sharing a URL are merged into a single entry that lists every
    citation index seen for that URL. First-seen order of URLs is preserved.
    Returns an empty string when *all_links* is empty.
    """
    parts: list[str] = []
    logger.info(f"Formatting {len(all_links)} links to markdown...")

    if all_links:
        # Pass 1: collect every citation index observed for each URL.
        # Links may carry the URL under "url" or the legacy "link" key.
        url_to_indices: dict[str, list] = {}
        for entry in all_links:
            link_url = entry.get("url")
            if link_url is None:
                link_url = entry.get("link")
            if link_url:
                url_to_indices.setdefault(link_url, []).append(
                    entry.get("index", "")
                )

        # Pass 2: emit one entry per unique URL, in first-seen order.
        emitted: set[str] = set()
        for entry in all_links:
            link_url = entry.get("url")
            if link_url is None:
                link_url = entry.get("link")
            title = entry.get("title", "Untitled")
            if not link_url or link_url in emitted:
                continue
            # De-duplicate and sort the indices for consistent ordering;
            # render as [1] or [1, 3, 5].
            ordered = sorted(set(url_to_indices[link_url]))
            joined = ", ".join(map(str, ordered))
            # The "(source nr: ...)" suffix is a visible fallback for humans.
            parts.append(
                f"[{joined}] {title} (source nr: {joined})\n URL: {link_url}\n\n"
            )
            emitted.add(link_url)

        parts.append("\n")

    return "".join(parts)

106 

107 

def _question_for_followup_phase(
    phase: str, questions_by_iteration: Dict[int, List[str]]
):
    """Return the question for a 'Follow-up Iteration X.Y' phase, or None."""
    try:
        phase_parts = phase.replace("Follow-up Iteration ", "").split(".")
        if len(phase_parts) == 2:
            iteration = int(phase_parts[0])
            question_index = int(phase_parts[1]) - 1
            if (
                iteration in questions_by_iteration
                and 0 <= question_index < len(questions_by_iteration[iteration])
            ):
                return questions_by_iteration[iteration][question_index]
            logger.warning(
                f"Could not find matching question for phase: {phase}"
            )
        else:
            logger.warning(
                f"Could not parse iteration/index from phase: {phase}"
            )
    except ValueError:
        logger.warning(f"Could not parse iteration/index from phase: {phase}")
    return None


def _question_for_subquery_phase(
    phase: str, questions_by_iteration: Dict[int, List[str]]
):
    """Return the question for a 'Sub-query X' phase (IterDRAG), or None."""
    try:
        # Extract the index number from "Sub-query X".
        query_index = int(phase.replace("Sub-query ", "")) - 1
        # In IterDRAG, sub-queries are stored in iteration 0.
        if 0 in questions_by_iteration and query_index < len(
            questions_by_iteration[0]
        ):
            return questions_by_iteration[0][query_index]
        logger.warning(f"Could not find matching question for phase: {phase}")
    except ValueError:
        logger.warning(f"Could not parse question index from phase: {phase}")
    return None


def format_findings(
    findings_list: List[Dict],
    synthesized_content: str,
    questions_by_iteration: Dict[int, List[str]],
) -> str:
    """Format findings into a detailed text output.

    Args:
        findings_list: List of finding dictionaries
        synthesized_content: The synthesized content from the LLM.
        questions_by_iteration: Dictionary mapping iteration numbers to lists of questions

    Returns:
        str: Formatted text output
    """
    logger.info(
        f"Inside format_findings utility. Findings count: {len(findings_list)}, Questions iterations: {len(questions_by_iteration)}"
    )
    parts: list[str] = []

    # Extract all sources from findings
    all_links = []
    for finding in findings_list:
        search_results = finding.get("search_results", [])
        if search_results:
            try:
                all_links.extend(
                    extract_links_from_search_results(search_results)
                )
            except Exception:
                logger.exception("Error processing search results/links")

    # Start with the synthesized content, then the global sources section.
    parts.append(f"{synthesized_content}\n\n")
    parts.append(format_links_to_markdown(all_links))
    parts.append("\n\n")  # Separator after synthesized content

    # Add Search Questions by Iteration section
    if questions_by_iteration:
        parts.append("## SEARCH QUESTIONS BY ITERATION\n")
        parts.append("\n")
        for iter_num, questions in questions_by_iteration.items():
            parts.append(f"\n #### Iteration {iter_num}:\n")
            for i, q in enumerate(questions, 1):
                parts.append(f"{i}. {q}\n")
        parts.append("\n\n\n")
    else:
        logger.warning("No questions by iteration found to format.")

    # Add Detailed Findings section
    if findings_list:
        parts.append("## DETAILED FINDINGS\n\n")
        logger.info(f"Formatting {len(findings_list)} detailed finding items.")

        for idx, finding in enumerate(findings_list):
            logger.debug(
                f"Formatting finding item {idx}. Keys: {list(finding.keys())}"
            )
            # Use .get() for safety
            phase = finding.get("phase", "Unknown Phase")
            content = finding.get("content", "No content available.")
            search_results = finding.get("search_results", [])

            # Phase header
            parts.append(f"\n### {phase}\n\n\n")

            # For follow-up / sub-query phases, try to resolve the
            # originating question from questions_by_iteration.
            question = None
            if isinstance(phase, str) and phase.startswith("Follow-up"):
                question = _question_for_followup_phase(
                    phase, questions_by_iteration
                )
            elif isinstance(phase, str) and phase.startswith("Sub-query"):
                question = _question_for_subquery_phase(
                    phase, questions_by_iteration
                )

            if question is not None:
                parts.append(f"#### {question}\n\n")
            elif "question" in finding and finding["question"]:
                # Fall back to the question stored on the finding itself.
                parts.append(f"### SEARCH QUESTION:\n{finding['question']}\n\n")

            # Content
            parts.append(f"\n\n{content}\n\n")

            # Search results if they exist
            if search_results:
                try:
                    links = extract_links_from_search_results(search_results)
                    if links:
                        parts.append("### SOURCES USED IN THIS SECTION:\n")
                        parts.append(format_links_to_markdown(links) + "\n\n")
                except Exception:
                    logger.exception(
                        f"Error processing search results/links for finding {idx}"
                    )
            else:
                logger.debug(f"No search_results found for finding item {idx}.")

            parts.append(f"{'_' * 80}\n\n")
    else:
        logger.warning("No detailed findings found to format.")

    # Add summary of all sources at the end
    if all_links:
        parts.append("## ALL SOURCES:\n")
        parts.append(format_links_to_markdown(all_links))
    else:
        logger.info("No unique sources found across all findings to list.")

    logger.info("Finished format_findings utility.")
    return "".join(parts)

268 

269 

def print_search_results(search_results):
    """Log the given search results as a formatted markdown link list."""
    links = extract_links_from_search_results(search_results)
    # An empty string is logged when no usable links were extracted.
    formatted_text = format_links_to_markdown(links) if links else ""
    logger.info(formatted_text)