Coverage for src/local_deep_research/utilities/search_utilities.py: 96%
179 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1import re
2from typing import Dict, List
4from loguru import logger
6from .url_utils import canonical_url_key
# Lower-case English language names mapped to the two-letter language
# codes used for them in this module.
LANGUAGE_CODE_MAP = dict(
    english="en",
    french="fr",
    german="de",
    spanish="es",
    italian="it",
    japanese="ja",
    chinese="zh",
    hindi="hi",
    arabic="ar",
    bengali="bn",
    portuguese="pt",
    russian="ru",
    korean="ko",
)
def remove_think_tags(text: str) -> str:
    """Strip ``<think>`` markup from *text* and trim the result.

    Complete ``<think>...</think>`` spans are deleted together with their
    content (shortest match, spanning newlines). Any unmatched closing or
    opening tag that remains is then deleted on its own, leaving the
    surrounding text in place. Leading and trailing whitespace is removed
    from the final string.
    """
    cleaned = re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL)
    # Orphans are scrubbed closing-tag first, then opening-tag; the order
    # matters when removing one tag splices a new one together.
    for stray_tag in ("</think>", "<think>"):
        cleaned = cleaned.replace(stray_tag, "")
    return cleaned.strip()
35# Sentinel values used by the journal reputation filter alongside the
36# numeric 1-10 quality scores. Distinguish structurally different
37# "not scored" cases so the renderer can show the user *why* the tag
38# isn't a numeric quality tier:
39#
40# - QUALITY_PENDING: reference DB hadn't finished building when the
41# search ran (first-search-during-install case).
42# - QUALITY_PREPRINT: result has no journal_ref at all (pure arxiv
43# preprint or similar); there's no venue to score. Distinct from
44# "venue unknown to our catalog" (that becomes score 3, rendered
45# as Unranked).
46QUALITY_PENDING = "pending"
47QUALITY_PREPRINT = "preprint"
50def _format_quality_tag(quality) -> str:
51 """Format a journal quality score as a compact tag for source lists.
53 The output is plaintext / Markdown. **Do NOT** render the containing
54 string through a template filter like ``{{ foo|safe }}`` or
55 ``DOMPurify.sanitize(..., {ALLOWED_TAGS:['a']})`` without first HTML-
56 escaping the surrounding title — the tag itself is safe, but a
57 downstream caller that concatenates ``title + quality_tag`` and
58 emits the result as HTML will leak any tags in ``title`` (XSS).
60 See :func:`_format_quality_tag_html` for the HTML-safe variant.
62 Accepts int | None for scored journals, plus the string sentinels
63 ``QUALITY_PENDING`` and ``QUALITY_PREPRINT``. Every numeric value
64 in VALID_QUALITY_SCORES has its own explicit branch so a bad
65 scoring-logic change can't silently rebucket a score — unexpected
66 values fall through to a debug tag that shows the raw value.
67 """
68 if quality is None:
69 return ""
70 if quality == QUALITY_PENDING:
71 return (
72 " [journal quality data is downloading in the background; "
73 "by the time you open /metrics/journals it may already "
74 "be complete — re-run this search in a minute to get "
75 "real quality scores]"
76 )
77 if quality == QUALITY_PREPRINT:
78 # No venue at all (arxiv preprint / working paper / dataset).
79 # Distinct from score 3 ("we looked and didn't find the
80 # venue") — here there's nothing *to* look up.
81 return " [preprint — not in journal catalog]"
82 # Numeric tiers. Explicit per-score branches instead of ``>=``
83 # ranges so boundary changes can't silently shift a bucket.
84 if quality == 10:
85 return " [Q1 ★★★★★]"
86 # KNOWN-DEFERRED: quality == 9 is a dead branch —
87 # constants.VALID_QUALITY_SCORES excludes 9 and the filter rejects
88 # any LLM output of that value. Kept defensively so a future change
89 # to VALID_QUALITY_SCORES does not require editing the formatter.
90 # Post-merge candidate for removal together with any score-9
91 # reintroduction work.
92 if quality == 9:
93 return " [Q1 ★★★★★]"
94 if quality == 8:
95 return " [Q1 ★★★★]"
96 if quality == 7:
97 return " [Q1 ★★★★]"
98 if quality == 6:
99 return " [Q2 ★★★]"
100 if quality == 5:
101 return " [Q2 ★★★]"
102 if quality == 4:
103 # JOURNAL_QUALITY_DEFAULT — venue found in the catalog but
104 # with no h-index / quartile / DOAJ signal.
105 return " [Unranked ★]"
106 if quality == 3:
107 # Low-confidence fallback — venue didn't match any tier. We
108 # don't know the journal, not "we know it's low-quality".
109 return " [Unranked ★]"
110 if quality == 2: 110 ↛ 111line 110 didn't jump to line 111 because the condition on line 110 was never true
111 return " [Q4 ★]"
112 if quality == 1:
113 # Predatory. Usually auto-removed before this renderer sees
114 # it, but surfaces if whitelisted or the threshold is 1.
115 return " [Q4 ★]"
116 # Out-of-set value — VALID_QUALITY_SCORES gates the inputs so this
117 # is unreachable in normal operation. Show the raw value so bad
118 # data surfaces visibly instead of silently bucketing into Q4.
119 return f" [quality={quality!r}]"
def _format_quality_tag_html(quality, *, title: str = "") -> str:
    """Build an HTML-safe ``title`` + quality-tag string.

    Use this instead of :func:`_format_quality_tag` whenever a search
    result title and its quality tag end up in an HTML page. The tag is
    plaintext whose brackets and stars can be emitted verbatim; the risk
    is the untrusted ``title`` that an HTML template would otherwise
    concatenate next to it unescaped.

    Args:
        quality: Value forwarded unchanged to :func:`_format_quality_tag`.
        title: Raw (unescaped) title text; keyword-only.

    Returns:
        ``"{escaped_title}{quality_tag}"`` where ``escaped_title`` is the
        result of ``html.escape(title, quote=True)``, so quotes, angle
        brackets and ampersands are rendered as text.
    """
    from html import escape

    safe_title = escape(title, quote=True)
    tag = _format_quality_tag(quality)
    return safe_title + tag
# Optional citation-relevant keys copied verbatim from a search result
# onto the extracted link (when present and not None) so they reach the
# database instead of being lost at this step.
_CITATION_FIELDS = (
    "doi",
    "authors",
    "published",
    "publication_date",
    "year",
    "date",
    "volume",
    "issue",
    "pages",
    "journal_ref",
    "journal",
    "venue",
    "publisher",
    "source_type",
    "openalex_source_id",
    "source",
    "source_engine",
    "pmid",
    "pmcid",
    "arxiv_id",
    "isbn",
    "citations",
    "is_open_access",
    "abstract",
    "metadata",
)


def _clean_text(value) -> str:
    """Return *value* as a whitespace-stripped string (``None`` -> ``""``).

    Non-string values (for example an integer index) are converted with
    ``str()`` first so that stripping cannot raise ``AttributeError``.
    """
    if value is None:
        return ""
    if not isinstance(value, str):
        value = str(value)
    return value.strip()


def extract_links_from_search_results(search_results: List[Dict]) -> List[Dict]:
    """Extract link records from a list of search result dictionaries.

    Each result is expected to provide at least ``"title"`` and ``"link"``;
    results where either is missing, ``None`` or blank are skipped.

    Args:
        search_results: Search result dictionaries. ``None`` or an empty
            list yields an empty result.

    Returns:
        One dictionary per usable result with the keys ``"title"``,
        ``"url"`` (taken from ``"link"``), ``"index"`` and
        ``"journal_quality"``, plus every key of ``_CITATION_FIELDS``
        whose value in the result is not ``None``. ``title``, ``url`` and
        ``index`` are always stripped strings.

    A result that raises while being processed (for example an entry that
    is not a dictionary) is logged and skipped; it never aborts the batch.
    """
    links: List[Dict] = []
    if not search_results:
        return links

    for result in search_results:
        try:
            # _clean_text tolerates None and non-string values. Calling
            # .strip() directly used to raise on an integer "index", and
            # the broad handler below then silently dropped the result.
            title = _clean_text(result.get("title", ""))
            url = _clean_text(result.get("link", ""))
            index = _clean_text(result.get("index", ""))

            if not (title and url):
                continue

            link = {
                "title": title,
                "url": url,
                "index": index,
                "journal_quality": result.get("journal_quality"),
            }
            for key in _CITATION_FIELDS:
                val = result.get(key)
                if val is not None:
                    link[key] = val
            links.append(link)
        except Exception:
            # Deliberate best-effort: log the failure with its traceback
            # and carry on with the remaining results.
            logger.exception("Error extracting link from result")
            continue
    return links
def format_links_to_markdown(all_links: List[Dict]) -> str:
    """Render links as a de-duplicated, Markdown-friendly source list.

    Links are grouped by ``canonical_url_key`` of their ``"url"`` (or,
    failing that, ``"link"``) value. The canonical form is also what gets
    displayed, so the Sources section stays free of tracking parameters,
    embedded credentials and scheme/host casing noise. Links whose
    canonical key is empty are ignored.

    For every group the first link seen supplies the title; the first
    truthy ``"journal_quality"`` supplies the quality tag; and the first
    truthy ``metadata["collection_name"]`` supplies an optional
    ``Collection:`` line.

    Args:
        all_links: Link dictionaries, e.g. as produced by
            ``extract_links_from_search_results``.

    Returns:
        ``""`` for an empty list; otherwise one entry per unique source in
        first-seen order, followed by a trailing newline. Each entry has
        the form::

            [1, 3] Title [Q1 ★★★★] (source nr: 1, 3)
               URL: https://example.org/paper
               Collection: mypapers
    """
    logger.info(f"Formatting {len(all_links)} links to markdown...")
    if not all_links:
        return ""

    # Insertion order of url_to_indices doubles as the first-seen order of
    # the unique sources, so one pass over the links is enough and every
    # URL is canonicalised exactly once.
    url_to_indices: dict[str, list] = {}
    canon_to_title: dict[str, str] = {}
    # Values are numeric scores or the string sentinels understood by
    # _format_quality_tag.
    canon_to_quality: dict[str, object] = {}
    # RAG/library collection name per canonical URL, so the citation
    # formatter's source-tagged mode can surface it as the citation tag
    # (e.g. `[mypapers-7]`) instead of the generic `local` label.
    canon_to_collection: dict[str, str] = {}

    for link in all_links:
        raw = link.get("url") or link.get("link") or ""
        canon = canonical_url_key(raw)
        if not canon:
            continue
        url_to_indices.setdefault(canon, []).append(link.get("index", ""))
        canon_to_title.setdefault(canon, link.get("title", "Untitled"))
        # First truthy journal quality wins.
        if canon not in canon_to_quality and link.get("journal_quality"):
            canon_to_quality[canon] = link["journal_quality"]
        # First non-empty collection name wins (mirrors title/quality).
        if canon not in canon_to_collection:
            metadata = link.get("metadata") or {}
            collection = metadata.get("collection_name")
            if collection:
                canon_to_collection[canon] = str(collection)

    parts: list[str] = []
    for canon, raw_indices in url_to_indices.items():
        # Indices arrive as int (from strategy enumeration) or str (from
        # _build_sources_markdown's fallback). Coerce so dedup collapses
        # 1 and "1", and sorting never compares int with str. isdecimal()
        # is used rather than isdigit() because the latter also accepts
        # characters such as "²" that int() rejects.
        indices = sorted(
            {str(i) for i in raw_indices},
            key=lambda s: (0, int(s)) if s.isdecimal() else (1, s),
        )
        joined_indices = ", ".join(indices)
        quality_tag = _format_quality_tag(canon_to_quality.get(canon))
        collection_line = (
            f"   Collection: {canon_to_collection[canon]}\n"
            if canon in canon_to_collection
            else ""
        )
        parts.append(
            f"[{joined_indices}] {canon_to_title[canon]}{quality_tag} "
            f"(source nr: {joined_indices})\n"
            f"   URL: {canon}\n"
            f"{collection_line}"
            f"\n"
        )

    parts.append("\n")
    return "".join(parts)
def _phase_question_header(phase, questions_by_iteration: Dict[int, List[str]]):
    """Return the ``#### <question>`` header a phase label refers to.

    Two label shapes are understood:

    - ``"Follow-up Iteration <iteration>.<n>"``: question ``n`` (1-based)
      of ``questions_by_iteration[iteration]``.
    - ``"Sub-query <n>"``: question ``n`` (1-based) of
      ``questions_by_iteration[0]``, where the IterDRAG strategy stores
      its sub-queries.

    Returns ``None`` (after logging a warning for a recognised label that
    cannot be parsed or resolved) when no question can be determined.
    """
    if not isinstance(phase, str):
        return None

    if phase.startswith("Follow-up"):
        try:
            phase_parts = phase.replace("Follow-up Iteration ", "").split(".")
            if len(phase_parts) != 2:
                logger.warning(
                    f"Could not parse iteration/index from phase: {phase}"
                )
                return None
            iteration = int(phase_parts[0])
            question_index = int(phase_parts[1]) - 1
        except ValueError:
            logger.warning(
                f"Could not parse iteration/index from phase: {phase}"
            )
            return None
        if iteration in questions_by_iteration and 0 <= question_index < len(
            questions_by_iteration[iteration]
        ):
            return f"#### {questions_by_iteration[iteration][question_index]}\n\n"
        logger.warning(f"Could not find matching question for phase: {phase}")
        return None

    if phase.startswith("Sub-query"):
        try:
            query_index = int(phase.replace("Sub-query ", "")) - 1
        except ValueError:
            logger.warning(
                f"Could not parse question index from phase: {phase}"
            )
            return None
        # The lower bound matters: without it "Sub-query 0" would produce
        # index -1 and silently pick the *last* question.
        if 0 in questions_by_iteration and 0 <= query_index < len(
            questions_by_iteration[0]
        ):
            return f"#### {questions_by_iteration[0][query_index]}\n\n"
        logger.warning(f"Could not find matching question for phase: {phase}")
        return None

    return None


def format_findings(
    findings_list: List[Dict],
    synthesized_content: str,
    questions_by_iteration: Dict[int, List[str]],
) -> str:
    """Format findings into a detailed text output.

    The output consists of, in order: the synthesized content, the
    de-duplicated source list for all findings, the search questions
    grouped by iteration, one detailed section per finding (phase header,
    matching question, content, sources used) and a final "ALL SOURCES"
    list.

    Args:
        findings_list: List of finding dictionaries. The keys read are
            ``"phase"``, ``"content"``, ``"search_results"`` and
            ``"question"``; all are optional.
        synthesized_content: The synthesized content from the LLM.
        questions_by_iteration: Dictionary mapping iteration numbers to
            lists of questions.

    Returns:
        str: Formatted text output.
    """
    logger.info(
        f"Inside format_findings utility. Findings count: {len(findings_list)}, Questions iterations: {len(questions_by_iteration)}"
    )
    parts: list[str] = []

    # Collect the links of every finding for the global source lists.
    all_links = []
    for finding in findings_list:
        search_results = finding.get("search_results", [])
        if search_results:
            try:
                links = extract_links_from_search_results(search_results)
                all_links.extend(links)
            except Exception:
                logger.exception("Error processing search results/links")

    # Synthesized content first, then the sources it draws on.
    parts.append(f"{synthesized_content}\n\n")
    parts.append(format_links_to_markdown(all_links))
    parts.append("\n\n")  # Separator after synthesized content

    # Search questions grouped by iteration.
    if questions_by_iteration:
        parts.append("## SEARCH QUESTIONS BY ITERATION\n")
        parts.append("\n")
        for iter_num, questions in questions_by_iteration.items():
            parts.append(f"\n #### Iteration {iter_num}:\n")
            for i, q in enumerate(questions, 1):
                parts.append(f"{i}. {q}\n")
        parts.append("\n\n\n")
    else:
        logger.warning("No questions by iteration found to format.")

    # Detailed findings, one section per finding.
    if findings_list:
        parts.append("## DETAILED FINDINGS\n\n")
        logger.info(f"Formatting {len(findings_list)} detailed finding items.")

        for idx, finding in enumerate(findings_list):
            logger.debug(
                f"Formatting finding item {idx}. Keys: {list(finding.keys())}"
            )
            phase = finding.get("phase", "Unknown Phase")
            content = finding.get("content", "No content available.")
            search_results = finding.get("search_results", [])

            parts.append(f"\n### {phase}\n\n\n")

            # Prefer the question the phase label points at; fall back to
            # a question stored on the finding itself.
            question_header = _phase_question_header(
                phase, questions_by_iteration
            )
            if question_header is not None:
                parts.append(question_header)
            elif "question" in finding and finding["question"]:
                parts.append(f"### SEARCH QUESTION:\n{finding['question']}\n\n")

            parts.append(f"\n\n{content}\n\n")

            if search_results:
                try:
                    links = extract_links_from_search_results(search_results)
                    if links:
                        parts.append("### SOURCES USED IN THIS SECTION:\n")
                        parts.append(format_links_to_markdown(links) + "\n\n")
                except Exception:
                    logger.exception(
                        f"Error processing search results/links for finding {idx}"
                    )
            else:
                logger.debug(f"No search_results found for finding item {idx}.")

            parts.append(f"{'_' * 80}\n\n")
    else:
        logger.warning("No detailed findings found to format.")

    # Summary of all sources at the end.
    if all_links:
        parts.append("## ALL SOURCES:\n")
        parts.append(format_links_to_markdown(all_links))
    else:
        logger.info("No unique sources found across all findings to list.")

    logger.info("Finished format_findings utility.")
    return "".join(parts)