Coverage for src/local_deep_research/utilities/search_utilities.py: 96%

179 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1import re 

2from typing import Dict, List 

3 

4from loguru import logger 

5 

6from .url_utils import canonical_url_key 

7 

8 

# Lower-case English language names mapped to their two-letter language codes.
LANGUAGE_CODE_MAP = dict(
    english="en",
    french="fr",
    german="de",
    spanish="es",
    italian="it",
    japanese="ja",
    chinese="zh",
    hindi="hi",
    arabic="ar",
    bengali="bn",
    portuguese="pt",
    russian="ru",
    korean="ko",
)

24 

25 

def remove_think_tags(text: str) -> str:
    """Strip ``<think>...</think>`` blocks and stray think tags from *text*.

    Paired blocks are removed together with their content (non-greedy,
    spanning newlines). Any unpaired ``</think>`` and then ``<think>``
    tags that remain are deleted, and the result is whitespace-stripped.
    """
    cleaned = re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL)
    # Orphaned tags: closers are removed before openers, in that order.
    for orphan_tag in ("</think>", "<think>"):
        cleaned = cleaned.replace(orphan_tag, "")
    return cleaned.strip()

33 

34 

35# Sentinel values used by the journal reputation filter alongside the 

36# numeric 1-10 quality scores. Distinguish structurally different 

37# "not scored" cases so the renderer can show the user *why* the tag 

38# isn't a numeric quality tier: 

39# 

40# - QUALITY_PENDING: reference DB hadn't finished building when the 

41# search ran (first-search-during-install case). 

42# - QUALITY_PREPRINT: result has no journal_ref at all (pure arxiv 

43# preprint or similar); there's no venue to score. Distinct from 

44# "venue unknown to our catalog" (that becomes score 3, rendered 

45# as Unranked). 

46QUALITY_PENDING = "pending" 

47QUALITY_PREPRINT = "preprint" 

48 

49 

50def _format_quality_tag(quality) -> str: 

51 """Format a journal quality score as a compact tag for source lists. 

52 

53 The output is plaintext / Markdown. **Do NOT** render the containing 

54 string through a template filter like ``{{ foo|safe }}`` or 

55 ``DOMPurify.sanitize(..., {ALLOWED_TAGS:['a']})`` without first HTML- 

56 escaping the surrounding title — the tag itself is safe, but a 

57 downstream caller that concatenates ``title + quality_tag`` and 

58 emits the result as HTML will leak any tags in ``title`` (XSS). 

59 

60 See :func:`_format_quality_tag_html` for the HTML-safe variant. 

61 

62 Accepts int | None for scored journals, plus the string sentinels 

63 ``QUALITY_PENDING`` and ``QUALITY_PREPRINT``. Every numeric value 

64 in VALID_QUALITY_SCORES has its own explicit branch so a bad 

65 scoring-logic change can't silently rebucket a score — unexpected 

66 values fall through to a debug tag that shows the raw value. 

67 """ 

68 if quality is None: 

69 return "" 

70 if quality == QUALITY_PENDING: 

71 return ( 

72 " [journal quality data is downloading in the background; " 

73 "by the time you open /metrics/journals it may already " 

74 "be complete — re-run this search in a minute to get " 

75 "real quality scores]" 

76 ) 

77 if quality == QUALITY_PREPRINT: 

78 # No venue at all (arxiv preprint / working paper / dataset). 

79 # Distinct from score 3 ("we looked and didn't find the 

80 # venue") — here there's nothing *to* look up. 

81 return " [preprint — not in journal catalog]" 

82 # Numeric tiers. Explicit per-score branches instead of ``>=`` 

83 # ranges so boundary changes can't silently shift a bucket. 

84 if quality == 10: 

85 return " [Q1 ★★★★★]" 

86 # KNOWN-DEFERRED: quality == 9 is a dead branch — 

87 # constants.VALID_QUALITY_SCORES excludes 9 and the filter rejects 

88 # any LLM output of that value. Kept defensively so a future change 

89 # to VALID_QUALITY_SCORES does not require editing the formatter. 

90 # Post-merge candidate for removal together with any score-9 

91 # reintroduction work. 

92 if quality == 9: 

93 return " [Q1 ★★★★★]" 

94 if quality == 8: 

95 return " [Q1 ★★★★]" 

96 if quality == 7: 

97 return " [Q1 ★★★★]" 

98 if quality == 6: 

99 return " [Q2 ★★★]" 

100 if quality == 5: 

101 return " [Q2 ★★★]" 

102 if quality == 4: 

103 # JOURNAL_QUALITY_DEFAULT — venue found in the catalog but 

104 # with no h-index / quartile / DOAJ signal. 

105 return " [Unranked ★]" 

106 if quality == 3: 

107 # Low-confidence fallback — venue didn't match any tier. We 

108 # don't know the journal, not "we know it's low-quality". 

109 return " [Unranked ★]" 

110 if quality == 2: 110 ↛ 111line 110 didn't jump to line 111 because the condition on line 110 was never true

111 return " [Q4 ★]" 

112 if quality == 1: 

113 # Predatory. Usually auto-removed before this renderer sees 

114 # it, but surfaces if whitelisted or the threshold is 1. 

115 return " [Q4 ★]" 

116 # Out-of-set value — VALID_QUALITY_SCORES gates the inputs so this 

117 # is unreachable in normal operation. Show the raw value so bad 

118 # data surfaces visibly instead of silently bucketing into Q4. 

119 return f" [quality={quality!r}]" 

120 

121 

def _format_quality_tag_html(quality, *, title: str = "") -> str:
    """Return an HTML-escaped title followed by its quality tag.

    Use this instead of :func:`_format_quality_tag` when a search-result
    title and its quality tag are rendered into an HTML page: pass the raw
    ``title`` here so it is escaped before the tag is appended. The tag
    itself (brackets and stars) is emitted verbatim; the untrusted part is
    the ``title``.

    Args:
        quality: Value forwarded unchanged to :func:`_format_quality_tag`.
        title: Raw, unescaped title text (keyword-only).

    Returns:
        ``"{escaped_title}{quality_tag}"`` where ``escaped_title`` is the
        result of ``html.escape(title, quote=True)``, so quotes, angle
        brackets, and ampersands are rendered as text.
    """
    from html import escape

    escaped_title = escape(title, quote=True)
    quality_tag = _format_quality_tag(quality)
    return escaped_title + quality_tag

140 

141 

# Optional citation-relevant fields copied verbatim from a search result
# onto the extracted link whenever the result carries a non-None value.
_PRESERVED_RESULT_FIELDS = (
    "doi",
    "authors",
    "published",
    "publication_date",
    "year",
    "date",
    "volume",
    "issue",
    "pages",
    "journal_ref",
    "journal",
    "venue",
    "publisher",
    "source_type",
    "openalex_source_id",
    "source",
    "source_engine",
    "pmid",
    "pmcid",
    "arxiv_id",
    "isbn",
    "citations",
    "is_open_access",
    "abstract",
    "metadata",
)


def extract_links_from_search_results(search_results: List[Dict]) -> List[Dict]:
    """Extract link records from a list of search result dictionaries.

    Each result is expected to provide at least ``"title"`` and ``"link"``.
    Results whose stripped title or link is empty are skipped. A result
    that raises while being processed (for example a non-string title) is
    logged and skipped, so one malformed entry cannot abort the batch.

    Args:
        search_results: Search result dictionaries; ``None`` or an empty
            list yields an empty list.

    Returns:
        One dictionary per usable result with the keys ``"title"``,
        ``"url"``, ``"index"`` and ``"journal_quality"``, plus any of the
        optional fields in ``_PRESERVED_RESULT_FIELDS`` that the result
        supplied with a non-None value. ``"index"`` is always a stripped
        string (``""`` when missing or ``None``).
    """
    links: List[Dict] = []
    if not search_results:
        return links

    for result in search_results:
        try:
            title = result.get("title", "")
            url = result.get("link", "")
            index = result.get("index", "")

            # A key that is present but None must not reach strip().
            title = title.strip() if title is not None else ""
            url = url.strip() if url is not None else ""
            # Indices may be ints as well as strings; coerce before
            # stripping so an int index no longer raises AttributeError
            # and causes the whole result to be dropped.
            index = "" if index is None else str(index).strip()

            if not (title and url):
                continue

            link = {
                "title": title,
                "url": url,
                "index": index,
                "journal_quality": result.get("journal_quality"),
            }
            # Preserve citation-relevant fields from search engines so
            # they reach the database.
            for key in _PRESERVED_RESULT_FIELDS:
                value = result.get(key)
                if value is not None:
                    link[key] = value
            links.append(link)
        except Exception:
            # Best effort: record the failure and move on to the next result.
            logger.exception("Error extracting link from result")
            continue
    return links

211 

212 

def format_links_to_markdown(all_links: List[Dict]) -> str:
    """Render link dictionaries as a de-duplicated Markdown source list.

    Links are grouped by ``canonical_url_key(url)`` (per the original
    notes this collapses trailing slashes, utm params, fragments, default
    ports, scheme/host case and userinfo). The canonical form is also what
    gets displayed, so the Sources section stays free of tracking-parameter
    clutter and embedded credentials. Each unique source is emitted once,
    in first-seen order, with all of its indices merged.

    Args:
        all_links: Link dictionaries. The URL is read from ``"url"`` or,
            failing that, ``"link"``. ``"index"``, ``"title"``,
            ``"journal_quality"`` and ``"metadata"["collection_name"]`` are
            used when present. ``None`` or an empty list yields ``""``.

    Returns:
        The formatted Markdown block, or ``""`` when there are no links.
    """
    parts: list[str] = []
    # ``or ()`` keeps the log line from raising when ``all_links`` is None;
    # the emptiness guard below only runs after it.
    logger.info(f"Formatting {len(all_links or ())} links to markdown...")
    if not all_links:
        return ""

    indices_by_canon: dict[str, list] = {}
    title_by_canon: dict[str, str] = {}
    # Values are numeric scores or the string sentinels understood by
    # ``_format_quality_tag``.
    quality_by_canon: dict = {}
    # Track the RAG/library collection name per canonical URL so the
    # citation formatter's source-tagged mode can surface it as the
    # citation tag (e.g. `[mypapers-7]`) instead of falling back to the
    # generic `local` label.
    collection_by_canon: dict[str, str] = {}

    for link in all_links:
        raw_url = link.get("url") or link.get("link") or ""
        canon = canonical_url_key(raw_url)
        if not canon:
            continue
        indices_by_canon.setdefault(canon, []).append(link.get("index", ""))
        title_by_canon.setdefault(canon, link.get("title", "Untitled"))
        # First truthy journal quality per canonical URL wins.
        if canon not in quality_by_canon and link.get("journal_quality"):
            quality_by_canon[canon] = link["journal_quality"]
        # First non-empty collection name wins (mirrors title/quality).
        if canon not in collection_by_canon:
            metadata = link.get("metadata") or {}
            collection = metadata.get("collection_name")
            if collection:
                collection_by_canon[canon] = str(collection)

    # Dicts preserve insertion order, so iterating the grouping dict emits
    # each unique source once in first-seen order without canonicalising
    # every URL a second time.
    for canon, raw_indices in indices_by_canon.items():
        # Indices arrive as int (from strategy enumeration) or str (from
        # _build_sources_markdown's fallback). Coerce so dedup collapses
        # 1 and "1", and sorted() doesn't TypeError on mixed types.
        indices = sorted(
            {str(i) for i in raw_indices},
            key=lambda s: (0, int(s)) if s.isdigit() else (1, s),
        )
        joined_indices = ", ".join(indices)
        quality_tag = _format_quality_tag(quality_by_canon.get(canon))
        collection_line = (
            f" Collection: {collection_by_canon[canon]}\n"
            if canon in collection_by_canon
            else ""
        )
        parts.append(
            f"[{joined_indices}] {title_by_canon[canon]}{quality_tag} "
            f"(source nr: {joined_indices})\n"
            f" URL: {canon}\n"
            f"{collection_line}"
            f"\n"
        )

    parts.append("\n")
    return "".join(parts)

283 

284 

def format_findings(
    findings_list: List[Dict],
    synthesized_content: str,
    questions_by_iteration: Dict[int, List[str]],
) -> str:
    """Format findings into a detailed text output.

    The output consists of, in order: the synthesized content, the
    de-duplicated source list, the search questions grouped by iteration,
    one detailed section per finding (phase header, matching question,
    content, per-section sources) and a final "ALL SOURCES" summary.

    Args:
        findings_list: List of finding dictionaries. The keys ``"phase"``,
            ``"content"``, ``"search_results"`` and ``"question"`` are read
            when present.
        synthesized_content: The synthesized content from the LLM.
        questions_by_iteration: Dictionary mapping iteration numbers to
            lists of questions.

    Returns:
        str: Formatted text output
    """
    logger.info(
        f"Inside format_findings utility. Findings count: {len(findings_list)}, Questions iterations: {len(questions_by_iteration)}"
    )
    parts: list[str] = []

    # Extract all sources from findings
    all_links = []
    for finding in findings_list:
        search_results = finding.get("search_results", [])
        if search_results:
            try:
                links = extract_links_from_search_results(search_results)
                all_links.extend(links)
            except Exception:
                logger.exception("Error processing search results/links")

    # Start with the synthesized content (passed as synthesized_content)
    parts.append(f"{synthesized_content}\n\n")

    # Add sources section after synthesized content if sources exist
    parts.append(format_links_to_markdown(all_links))

    parts.append("\n\n")  # Separator after synthesized content

    # Add Search Questions by Iteration section
    if questions_by_iteration:
        parts.append("## SEARCH QUESTIONS BY ITERATION\n")
        parts.append("\n")
        for iter_num, questions in questions_by_iteration.items():
            parts.append(f"\n #### Iteration {iter_num}:\n")
            for i, q in enumerate(questions, 1):
                parts.append(f"{i}. {q}\n")
        parts.append("\n\n\n")
    else:
        logger.warning("No questions by iteration found to format.")

    # Add Detailed Findings section
    if findings_list:
        parts.append("## DETAILED FINDINGS\n\n")
        logger.info(f"Formatting {len(findings_list)} detailed finding items.")

        for idx, finding in enumerate(findings_list):
            logger.debug(
                f"Formatting finding item {idx}. Keys: {list(finding.keys())}"
            )
            # Use .get() for safety
            phase = finding.get("phase", "Unknown Phase")
            content = finding.get("content", "No content available.")
            search_results = finding.get("search_results", [])

            # Phase header
            parts.append(f"\n### {phase}\n\n\n")

            question_displayed = False
            # If this is a follow-up phase, try to show the corresponding question
            if isinstance(phase, str) and phase.startswith("Follow-up"):
                try:
                    # Expected shape: "Follow-up Iteration <iter>.<question>"
                    phase_parts = phase.replace(
                        "Follow-up Iteration ", ""
                    ).split(".")
                    if len(phase_parts) == 2:
                        iteration = int(phase_parts[0])
                        question_index = int(phase_parts[1]) - 1
                        if (
                            iteration in questions_by_iteration
                            and 0
                            <= question_index
                            < len(questions_by_iteration[iteration])
                        ):
                            parts.append(
                                f"#### {questions_by_iteration[iteration][question_index]}\n\n"
                            )
                            question_displayed = True
                        else:
                            logger.warning(
                                f"Could not find matching question for phase: {phase}"
                            )
                    else:
                        logger.warning(
                            f"Could not parse iteration/index from phase: {phase}"
                        )
                except ValueError:
                    logger.warning(
                        f"Could not parse iteration/index from phase: {phase}"
                    )
            # Handle Sub-query phases from IterDRAG strategy
            elif isinstance(phase, str) and phase.startswith("Sub-query"):
                try:
                    # Extract the index number from "Sub-query X"
                    query_index = int(phase.replace("Sub-query ", "")) - 1
                    # In IterDRAG, sub-queries are stored in iteration 0.
                    # The lower bound matters: without it "Sub-query 0"
                    # (index -1) would wrap around and show the last
                    # question instead of reporting a miss.
                    if 0 in questions_by_iteration and 0 <= query_index < len(
                        questions_by_iteration[0]
                    ):
                        parts.append(
                            f"#### {questions_by_iteration[0][query_index]}\n\n"
                        )
                        question_displayed = True
                    else:
                        logger.warning(
                            f"Could not find matching question for phase: {phase}"
                        )
                except ValueError:
                    logger.warning(
                        f"Could not parse question index from phase: {phase}"
                    )

            # If the question is in the finding itself, display it
            if (
                not question_displayed
                and "question" in finding
                and finding["question"]
            ):
                parts.append(f"### SEARCH QUESTION:\n{finding['question']}\n\n")

            # Content
            parts.append(f"\n\n{content}\n\n")

            # Search results if they exist
            if search_results:
                try:
                    links = extract_links_from_search_results(search_results)
                    if links:
                        parts.append("### SOURCES USED IN THIS SECTION:\n")
                        parts.append(format_links_to_markdown(links) + "\n\n")
                except Exception:
                    logger.exception(
                        f"Error processing search results/links for finding {idx}"
                    )
            else:
                logger.debug(f"No search_results found for finding item {idx}.")

            parts.append(f"{'_' * 80}\n\n")
    else:
        logger.warning("No detailed findings found to format.")

    # Add summary of all sources at the end
    if all_links:
        parts.append("## ALL SOURCES:\n")
        parts.append(format_links_to_markdown(all_links))
    else:
        logger.info("No unique sources found across all findings to list.")

    logger.info("Finished format_findings utility.")
    return "".join(parts)