Coverage for src/local_deep_research/utilities/search_utilities.py: 94%

138 statements  

coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

  1  import re
  2  from typing import Dict, List
  3
  4  from loguru import logger
  5
  6
  7  LANGUAGE_CODE_MAP = {
  8      "english": "en",
  9      "french": "fr",
 10      "german": "de",
 11      "spanish": "es",
 12      "italian": "it",
 13      "japanese": "ja",
 14      "chinese": "zh",
 15  }
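
# Editor's sketch (not part of the covered file): the map translates
# human-readable language names to ISO 639-1 codes. The "en" fallback below
# is an assumption for illustration, not taken from this module:
#
#     code = LANGUAGE_CODE_MAP.get("french", "en")  # -> "fr"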

 16
 17
 18  def remove_think_tags(text: str) -> str:
 19      # Remove paired <think>...</think> tags
 20      text = re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL)
 21      # Remove any orphaned opening or closing think tags
 22      text = re.sub(r"</think>", "", text)
 23      text = re.sub(r"<think>", "", text)
 24      return text.strip()
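
# Editor's sketch (not part of the covered file); the sample string is
# hypothetical:
#
#     remove_think_tags("<think>draft reasoning</think> final answer")
#     # -> "final answer"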

 25
 26
 27  def extract_links_from_search_results(search_results: List[Dict]) -> List[Dict]:
 28      """
 29      Extracts links and titles from a list of search result dictionaries.
 30
 31      Each dictionary is expected to have at least the keys "title" and "link".
 32
 33      Returns a list of dictionaries with 'title' and 'url' keys.
 34      """
 35      links = []
 36      if not search_results:
 37          return links
 38
 39      for result in search_results:
 40          try:
 41              # Ensure we handle None values safely before calling strip()
 42              title = result.get("title", "")
 43              url = result.get("link", "")
 44              index = result.get("index", "")
 45
 46              # Apply strip() only if the values are not None
 47              title = title.strip() if title is not None else ""
 48              url = url.strip() if url is not None else ""
 49              index = index.strip() if index is not None else ""
 50
 51              if title and url:
 52                  links.append({"title": title, "url": url, "index": index})
 53          except Exception:
 54              # Log the specific error for debugging
 55              logger.exception("Error extracting link from result")
 56              continue
 57      return links
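
# Editor's sketch (not part of the covered file); the sample result dicts are
# hypothetical. Entries with an empty title or URL are dropped:
#
#     results = [
#         {"title": "Example", "link": "https://example.com", "index": "1"},
#         {"title": "", "link": "https://example.org"},  # skipped: empty title
#     ]
#     extract_links_from_search_results(results)
#     # -> [{"title": "Example", "url": "https://example.com", "index": "1"}]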

 58
 59
 60  def format_links_to_markdown(all_links: List[Dict]) -> str:
 61      formatted_text = ""
 62      logger.info(f"Formatting {len(all_links)} links to markdown...")
 63
 64      if all_links:
 65          # Group links by URL and collect all their indices
 66          url_to_indices = {}
 67          for link in all_links:
 68              url = link.get("url")
 69              if url is None:
 70                  url = link.get("link")
 71              index = link.get("index", "")
 72              # logger.info(f"URL \n {str(url)} ")
 73              if url:  # 73 ↛ 67: didn't jump to line 67 because the condition on line 73 was always true
 74                  if url not in url_to_indices:
 75                      url_to_indices[url] = []
 76                  url_to_indices[url].append(index)
 77
 78          # Format each unique URL with all its indices
 79          seen_urls = set()  # Initialize the set here
 80          for link in all_links:
 81              url = link.get("url")
 82              if url is None:
 83                  url = link.get("link")
 84              title = link.get("title", "Untitled")
 85              if url and url not in seen_urls:
 86                  # Get all indices for this URL
 87                  indices = sorted(
 88                      set(url_to_indices[url])
 89                  )  # Sort for consistent ordering
 90                  # Format as [1, 3, 5] if multiple indices, or just [1] if single
 91                  indices_str = f"[{', '.join(map(str, indices))}]"
 92                  # Add (source nr) as a visible fallback for humans
 93                  formatted_text += f"{indices_str} {title} (source nr: {', '.join(map(str, indices))})\n URL: {url}\n\n"
 94                  seen_urls.add(url)
 95
 96          formatted_text += "\n"
 97
 98      return formatted_text
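
# Editor's sketch (not part of the covered file); the sample links are
# hypothetical. Duplicate URLs are merged and their indices combined:
#
#     links = [
#         {"title": "Example", "url": "https://example.com", "index": 1},
#         {"title": "Example", "url": "https://example.com", "index": 3},
#     ]
#     format_links_to_markdown(links)
#     # -> "[1, 3] Example (source nr: 1, 3)\n URL: https://example.com\n\n\n"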

 99
100
101  def format_findings(
102      findings_list: List[Dict],
103      synthesized_content: str,
104      questions_by_iteration: Dict[int, List[str]],
105  ) -> str:
106      """Format findings into a detailed text output.
107
108      Args:
109          findings_list: List of finding dictionaries
110          synthesized_content: The synthesized content from the LLM.
111          questions_by_iteration: Dictionary mapping iteration numbers to lists of questions
112
113      Returns:
114          str: Formatted text output
115      """
116      logger.info(
117          f"Inside format_findings utility. Findings count: {len(findings_list)}, Questions iterations: {len(questions_by_iteration)}"
118      )
119      formatted_text = ""
120
121      # Extract all sources from findings
122      all_links = []
123      for finding in findings_list:
124          search_results = finding.get("search_results", [])
125          if search_results:
126              try:
127                  links = extract_links_from_search_results(search_results)
128                  all_links.extend(links)
129              except Exception:
130                  logger.exception("Error processing search results/links")
131
132      # Start with the synthesized content (passed as synthesized_content)
133      formatted_text += f"{synthesized_content}\n\n"
134
135      # Add sources section after synthesized content if sources exist
136      formatted_text += format_links_to_markdown(all_links)
137
138      formatted_text += "\n\n"  # Separator after synthesized content
139
140      # Add Search Questions by Iteration section
141      if questions_by_iteration:
142          formatted_text += "## SEARCH QUESTIONS BY ITERATION\n"
143          formatted_text += "\n"
144          for iter_num, questions in questions_by_iteration.items():
145              formatted_text += f"\n #### Iteration {iter_num}:\n"
146              for i, q in enumerate(questions, 1):
147                  formatted_text += f"{i}. {q}\n"
148              formatted_text += "\n" + "\n\n"
149      else:
150          logger.warning("No questions by iteration found to format.")

151
152      # Add Detailed Findings section
153      if findings_list:
154          formatted_text += "## DETAILED FINDINGS\n\n"
155          logger.info(f"Formatting {len(findings_list)} detailed finding items.")
156
157          for idx, finding in enumerate(findings_list):
158              logger.debug(
159                  f"Formatting finding item {idx}. Keys: {list(finding.keys())}"
160              )
161              # Use .get() for safety
162              phase = finding.get("phase", "Unknown Phase")
163              content = finding.get("content", "No content available.")
164              search_results = finding.get("search_results", [])
165
166              # Phase header
167              formatted_text += "\n"
168              formatted_text += f"### {phase}\n"
169              formatted_text += "\n\n"
170
171              question_displayed = False
172              # If this is a follow-up phase, try to show the corresponding question
173              if isinstance(phase, str) and phase.startswith("Follow-up"):
174                  try:
175                      parts = phase.replace("Follow-up Iteration ", "").split(".")
176                      if len(parts) == 2:  # 176 ↛ 192: didn't jump to line 192 because the condition on line 176 was always true
177                          iteration = int(parts[0])
178                          question_index = int(parts[1]) - 1
179                          if (
180                              iteration in questions_by_iteration
181                              and 0
182                              <= question_index
183                              < len(questions_by_iteration[iteration])
184                          ):
185                              formatted_text += f"#### {questions_by_iteration[iteration][question_index]}\n\n"
186                              question_displayed = True
187                          else:
188                              logger.warning(
189                                  f"Could not find matching question for phase: {phase}"
190                              )
191                      else:
192                          logger.warning(
193                              f"Could not parse iteration/index from phase: {phase}"
194                          )
195                  except ValueError:
196                      logger.warning(
197                          f"Could not parse iteration/index from phase: {phase}"
198                      )
199              # Handle Sub-query phases from IterDRAG strategy
200              elif isinstance(phase, str) and phase.startswith("Sub-query"):
201                  try:
202                      # Extract the index number from "Sub-query X"
203                      query_index = int(phase.replace("Sub-query ", "")) - 1
204                      # In IterDRAG, sub-queries are stored in iteration 0
205                      if 0 in questions_by_iteration and query_index < len(  # 205 ↛ 213: didn't jump to line 213 because the condition on line 205 was always true
206                          questions_by_iteration[0]
207                      ):
208                          formatted_text += (
209                              f"#### {questions_by_iteration[0][query_index]}\n\n"
210                          )
211                          question_displayed = True
212                      else:
213                          logger.warning(
214                              f"Could not find matching question for phase: {phase}"
215                          )
216                  except ValueError:
217                      logger.warning(
218                          f"Could not parse question index from phase: {phase}"
219                      )

220
221              # If the question is in the finding itself, display it
222              if (
223                  not question_displayed
224                  and "question" in finding
225                  and finding["question"]
226              ):
227                  formatted_text += (
228                      f"### SEARCH QUESTION:\n{finding['question']}\n\n"
229                  )
230
231              # Content
232              formatted_text += f"\n\n{content}\n\n"
233
234              # Search results if they exist
235              if search_results:
236                  try:
237                      links = extract_links_from_search_results(search_results)
238                      if links:  # 238 ↛ 250: didn't jump to line 250 because the condition on line 238 was always true
239                          formatted_text += "### SOURCES USED IN THIS SECTION:\n"
240                          formatted_text += (
241                              format_links_to_markdown(links) + "\n\n"
242                          )
243                  except Exception:
244                      logger.exception(
245                          f"Error processing search results/links for finding {idx}"
246                      )
247              else:
248                  logger.debug(f"No search_results found for finding item {idx}.")
249
250              formatted_text += f"{'_' * 80}\n\n"
251      else:
252          logger.warning("No detailed findings found to format.")
253
254      # Add summary of all sources at the end
255      if all_links:
256          formatted_text += "## ALL SOURCES:\n"
257          formatted_text += format_links_to_markdown(all_links)
258      else:
259          logger.info("No unique sources found across all findings to list.")
260
261      logger.info("Finished format_findings utility.")
262      return formatted_text
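
# Editor's sketch (not part of the covered file); the finding, question, and
# synthesized text below are hypothetical:
#
#     findings = [{
#         "phase": "Follow-up Iteration 1.1",
#         "content": "Details found for the follow-up question.",
#         "search_results": [
#             {"title": "Example", "link": "https://example.com", "index": "1"}
#         ],
#     }]
#     report = format_findings(findings, "Synthesized answer.", {1: ["What is X?"]})
#     # report starts with the synthesized answer, then lists sources, then a
#     # "### Follow-up Iteration 1.1" heading followed by "#### What is X?",
#     # and ends with an "## ALL SOURCES:" section.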

263
264
265  def print_search_results(search_results):
266      formatted_text = ""
267      links = extract_links_from_search_results(search_results)
268      if links:
269          formatted_text = format_links_to_markdown(links)
270      logger.info(formatted_text)
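
# Editor's note: despite its name, print_search_results emits the formatted
# links through loguru's logger.info rather than printing to stdout.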