Coverage for src / local_deep_research / utilities / search_utilities.py: 98%

138 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1import re 

2from typing import Dict, List 

3 

4from loguru import logger 

5 

6 

# Mapping from lowercase English language names to their ISO 639-1 codes.
# NOTE(review): not referenced anywhere in this module; presumably consumed
# by other modules that normalize a user-facing language setting into a code
# for search/LLM backends — confirm callers before removing or renaming.
LANGUAGE_CODE_MAP = {
    "english": "en",
    "french": "fr",
    "german": "de",
    "spanish": "es",
    "italian": "it",
    "japanese": "ja",
    "chinese": "zh",
    "hindi": "hi",
    "arabic": "ar",
    "bengali": "bn",
    "portuguese": "pt",
    "russian": "ru",
    "korean": "ko",
}

22 

23 

def remove_think_tags(text: str) -> str:
    """Strip chain-of-thought markup from *text*.

    Removes every paired ``<think>...</think>`` section (including its
    contents, across newlines), then any orphaned opening or closing tag,
    and returns the result with surrounding whitespace trimmed.
    """
    # Paired sections first: non-greedy so adjacent blocks are not merged,
    # DOTALL so the reasoning may span multiple lines.
    cleaned = re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL)
    # The orphan tags are fixed literals, so plain str.replace suffices.
    for orphan in ("</think>", "<think>"):
        cleaned = cleaned.replace(orphan, "")
    return cleaned.strip()

31 

32 

def extract_links_from_search_results(search_results: List[Dict]) -> List[Dict]:
    """
    Extracts links and titles from a list of search result dictionaries.

    Each dictionary is expected to have at least the keys "title" and "link";
    an optional "index" key (often an int rank) is carried through as a string.

    Args:
        search_results: Raw search-result dicts; may be None or empty.

    Returns:
        A list of dictionaries with 'title', 'url' and 'index' keys.
        Entries missing a non-empty title or URL are skipped.
    """
    links = []
    if not search_results:
        return links

    for result in search_results:
        try:
            # Ensure we handle None values safely before calling strip()
            title = result.get("title", "")
            url = result.get("link", "")
            index = result.get("index", "")

            # Apply strip() only if the values are not None
            title = title.strip() if title is not None else ""
            url = url.strip() if url is not None else ""
            # Bug fix: "index" is frequently an int; calling .strip() on it
            # raised AttributeError, which the except below swallowed and the
            # whole (valid) result was silently dropped. Coerce to str first.
            index = str(index).strip() if index is not None else ""

            if title and url:
                links.append({"title": title, "url": url, "index": index})
        except Exception:
            # Best effort: log and skip malformed entries instead of
            # failing the entire batch.
            logger.exception("Error extracting link from result")
            continue
    return links

64 

65 

def format_links_to_markdown(all_links: List[Dict]) -> str:
    """Render a de-duplicated, index-annotated text listing of links.

    Links that share a URL are merged into a single entry showing every
    index under which the URL appeared, e.g. ``[1, 3] Title (source nr: 1, 3)``.
    Returns an empty string when *all_links* is empty.
    """
    logger.info(f"Formatting {len(all_links)} links to markdown...")

    if not all_links:
        return ""

    def _url_of(entry):
        # Prefer the 'url' key; fall back to 'link' only when 'url' is
        # absent or explicitly None (an empty string does NOT fall back).
        found = entry.get("url")
        return entry.get("link") if found is None else found

    # First pass: collect every index seen for each URL.
    indices_by_url = {}
    for entry in all_links:
        target = _url_of(entry)
        if target:
            indices_by_url.setdefault(target, []).append(entry.get("index", ""))

    # Second pass: emit each unique URL once, in first-seen order.
    pieces = []
    emitted = set()
    for entry in all_links:
        target = _url_of(entry)
        label = entry.get("title", "Untitled")
        if target and target not in emitted:
            # De-duplicate and sort indices for consistent ordering.
            joined = ", ".join(map(str, sorted(set(indices_by_url[target]))))
            # The "(source nr: ...)" repetition is a visible fallback for humans.
            pieces.append(f"[{joined}] {label} (source nr: {joined})\n URL: {target}\n\n")
            emitted.add(target)

    pieces.append("\n")
    return "".join(pieces)

105 

106 

def format_findings(
    findings_list: List[Dict],
    synthesized_content: str,
    questions_by_iteration: Dict[int, List[str]],
) -> str:
    """Format findings into a detailed text output.

    The output is assembled in this order: synthesized content, a sources
    listing, a "SEARCH QUESTIONS BY ITERATION" section, a "DETAILED FINDINGS"
    section (one entry per finding, each ended by an 80-underscore rule),
    and finally an "ALL SOURCES" summary.

    Args:
        findings_list: List of finding dictionaries
        synthesized_content: The synthesized content from the LLM.
        questions_by_iteration: Dictionary mapping iteration numbers to lists of questions

    Returns:
        str: Formatted text output
    """
    logger.info(
        f"Inside format_findings utility. Findings count: {len(findings_list)}, Questions iterations: {len(questions_by_iteration)}"
    )
    formatted_text = ""

    # Extract all sources from findings
    all_links = []
    for finding in findings_list:
        search_results = finding.get("search_results", [])
        if search_results:
            try:
                links = extract_links_from_search_results(search_results)
                all_links.extend(links)
            except Exception:
                # Best effort: a bad result set should not abort formatting.
                logger.exception("Error processing search results/links")

    # Start with the synthesized content (passed as synthesized_content)
    formatted_text += f"{synthesized_content}\n\n"

    # Add sources section after synthesized content if sources exist
    # (format_links_to_markdown returns "" for an empty list).
    formatted_text += format_links_to_markdown(all_links)

    formatted_text += "\n\n"  # Separator after synthesized content

    # Add Search Questions by Iteration section
    if questions_by_iteration:
        formatted_text += "## SEARCH QUESTIONS BY ITERATION\n"
        formatted_text += "\n"
        for iter_num, questions in questions_by_iteration.items():
            formatted_text += f"\n #### Iteration {iter_num}:\n"
            for i, q in enumerate(questions, 1):
                formatted_text += f"{i}. {q}\n"
            formatted_text += "\n" + "\n\n"
    else:
        logger.warning("No questions by iteration found to format.")

    # Add Detailed Findings section
    if findings_list:
        formatted_text += "## DETAILED FINDINGS\n\n"
        logger.info(f"Formatting {len(findings_list)} detailed finding items.")

        for idx, finding in enumerate(findings_list):
            logger.debug(
                f"Formatting finding item {idx}. Keys: {list(finding.keys())}"
            )
            # Use .get() for safety
            phase = finding.get("phase", "Unknown Phase")
            content = finding.get("content", "No content available.")
            search_results = finding.get("search_results", [])

            # Phase header
            formatted_text += "\n"
            formatted_text += f"### {phase}\n"
            formatted_text += "\n\n"

            question_displayed = False
            # If this is a follow-up phase, try to show the corresponding question.
            # Phase names look like "Follow-up Iteration <iter>.<question>"
            # with a 1-based question number.
            if isinstance(phase, str) and phase.startswith("Follow-up"):
                try:
                    parts = phase.replace("Follow-up Iteration ", "").split(".")
                    if len(parts) == 2:
                        iteration = int(parts[0])
                        # Convert from 1-based (display) to 0-based (list index).
                        question_index = int(parts[1]) - 1
                        if (
                            iteration in questions_by_iteration
                            and 0
                            <= question_index
                            < len(questions_by_iteration[iteration])
                        ):
                            formatted_text += f"#### {questions_by_iteration[iteration][question_index]}\n\n"
                            question_displayed = True
                        else:
                            logger.warning(
                                f"Could not find matching question for phase: {phase}"
                            )
                    else:
                        logger.warning(
                            f"Could not parse iteration/index from phase: {phase}"
                        )
                except ValueError:
                    # int() failed on either component of the phase name.
                    logger.warning(
                        f"Could not parse iteration/index from phase: {phase}"
                    )
            # Handle Sub-query phases from IterDRAG strategy
            elif isinstance(phase, str) and phase.startswith("Sub-query"):
                try:
                    # Extract the index number from "Sub-query X"
                    query_index = int(phase.replace("Sub-query ", "")) - 1
                    # In IterDRAG, sub-queries are stored in iteration 0
                    if 0 in questions_by_iteration and query_index < len(
                        questions_by_iteration[0]
                    ):
                        formatted_text += (
                            f"#### {questions_by_iteration[0][query_index]}\n\n"
                        )
                        question_displayed = True
                    else:
                        logger.warning(
                            f"Could not find matching question for phase: {phase}"
                        )
                except ValueError:
                    logger.warning(
                        f"Could not parse question index from phase: {phase}"
                    )

            # If the question is in the finding itself, display it
            # (fallback when the phase name did not resolve to a question).
            if (
                not question_displayed
                and "question" in finding
                and finding["question"]
            ):
                formatted_text += (
                    f"### SEARCH QUESTION:\n{finding['question']}\n\n"
                )

            # Content
            formatted_text += f"\n\n{content}\n\n"

            # Search results if they exist
            if search_results:
                try:
                    links = extract_links_from_search_results(search_results)
                    if links:
                        formatted_text += "### SOURCES USED IN THIS SECTION:\n"
                        formatted_text += (
                            format_links_to_markdown(links) + "\n\n"
                        )
                except Exception:
                    logger.exception(
                        f"Error processing search results/links for finding {idx}"
                    )
            else:
                logger.debug(f"No search_results found for finding item {idx}.")

            # Horizontal rule between findings.
            formatted_text += f"{'_' * 80}\n\n"
    else:
        logger.warning("No detailed findings found to format.")

    # Add summary of all sources at the end
    if all_links:
        formatted_text += "## ALL SOURCES:\n"
        formatted_text += format_links_to_markdown(all_links)
    else:
        logger.info("No unique sources found across all findings to list.")

    logger.info("Finished format_findings utility.")
    return formatted_text

269 

270 

def print_search_results(search_results):
    """Log a markdown-formatted listing of the given search results.

    Despite the name, output goes to the logger (info level), not stdout;
    an empty string is logged when no usable links are extracted.
    """
    extracted = extract_links_from_search_results(search_results)
    rendered = format_links_to_markdown(extracted) if extracted else ""
    logger.info(rendered)
276 logger.info(formatted_text)