Coverage for src / local_deep_research / web_search_engines / engines / full_search.py: 99%

94 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1from loguru import logger 

2from datetime import datetime, UTC 

3from typing import Any, Dict, List, Optional, Protocol, runtime_checkable 

4 

5from langchain_core.language_models import BaseLLM 

6 

7from ...config.search_config import QUALITY_CHECK_DDG_URLS 

8from ...research_library.downloaders.extraction import ( 

9 batch_fetch_and_extract, 

10) 

11from ...security.ssrf_validator import validate_url 

12from ...utilities.json_utils import extract_json, get_llm_response_text 

13 

14 

@runtime_checkable
class _Invokable(Protocol):
    """Structural type for anything that can run a query via ``invoke``.

    ``runtime_checkable`` allows ``isinstance`` checks, which only verify
    that an ``invoke`` attribute exists (not its signature).
    """

    def invoke(self, query: str) -> Any:
        """Execute *query* and return the raw result."""
        ...

class FullSearchResults:
    """Wrap a search engine and enrich its results with full page content.

    Results produced by ``web_search`` are optionally quality-filtered by an
    LLM (:meth:`check_urls`), SSRF-validated, then fetched so that every
    result dict gains a ``"full_content"`` key (``None`` when the page could
    not be retrieved).
    """

    def __init__(
        self,
        llm: Optional[BaseLLM],
        web_search: _Invokable,
        output_format: str = "list",
        language: str = "English",
        max_results: int = 10,
        region: str = "wt-wt",
        time: Optional[str] = "y",
        safesearch: str | int = "Moderate",
    ):
        """Store search configuration.

        Args:
            llm: Model used to quality-filter URLs; ``None`` disables
                filtering (all results pass through unchanged).
            web_search: Object exposing ``invoke(query)`` that returns a list
                of result dicts, each carrying a ``"link"`` key.
            output_format: Output format hint (stored, currently unused here).
            language: Language forwarded to the content extractor.
            max_results: Maximum number of results to request.
            region: Region code (DuckDuckGo style, e.g. ``"wt-wt"``).
            time: Time-range filter (e.g. ``"y"`` = past year), or ``None``.
            safesearch: Safe-search level, by name or numeric code.
        """
        self.llm = llm
        self.output_format = output_format
        self.language = language
        self.max_results = max_results
        self.region = region
        self.time = time
        self.safesearch = safesearch
        self.web_search = web_search

    def check_urls(self, results: List[Dict], query: str) -> List[Dict]:
        """Ask the LLM which results are timely, reliable, and relevant.

        Builds a prompt listing *results* and expects back a JSON array of
        0-based indices to keep.

        Returns:
            The filtered subset of *results*. Passes everything through when
            *results* is empty or no LLM is configured; fails closed (empty
            list) on any LLM or parsing error.
        """
        if not results:
            return results
        # No LLM configured: skip filtering before doing any prompt work.
        # (Hoisted out of the try block so the try covers only the LLM call.)
        if self.llm is None:
            return results

        now = datetime.now(UTC)
        current_time = now.strftime("%Y-%m-%d")
        prompt = f"""ONLY Return a JSON array. The response contains no letters. Evaluate these URLs for:
1. Timeliness (today: {current_time})
2. Factual accuracy (cross-reference major claims)
3. Source reliability (prefer official company websites, established news outlets)
4. Direct relevance to query: {query}

URLs to evaluate:
{results}

Return a JSON array of indices (0-based) for sources that meet ALL criteria.
ONLY Return a JSON array of indices (0-based) and nothing else. No letters.
Example response: \n[0, 2, 4]\n\n"""

        try:
            response = self.llm.invoke(prompt)
            response_text = get_llm_response_text(response)
            good_indices = extract_json(response_text, expected_type=list)

            # Build a set for O(1) membership; keep only ints so malformed
            # LLM output (strings, nested lists) cannot break filtering.
            keep = (
                {i for i in good_indices if isinstance(i, int)}
                if good_indices
                else set()
            )
            return [r for i, r in enumerate(results) if i in keep]
        except Exception:
            logger.exception("URL filtering error")
            return []

    def run(self, query: str):
        """Search for *query* and return results enriched with full content.

        Pipeline: search -> optional LLM quality filter -> SSRF validation ->
        batch fetch/extract -> attach ``"full_content"`` to each result.

        Raises:
            ValueError: If the underlying search does not return a list.
        """
        # Step 1: Get search results
        search_results = self.web_search.invoke(query)
        if not isinstance(search_results, list):
            raise ValueError("Expected the search results in list format.")

        # Step 2: Filter URLs using LLM
        if QUALITY_CHECK_DDG_URLS:
            filtered_results = self.check_urls(search_results, query)
        else:
            filtered_results = search_results

        # Extract URLs from filtered results; the walrus binding avoids
        # calling .get("link") twice and keeps only truthy links.
        urls = [
            link
            for result in filtered_results
            if (link := result.get("link"))
        ]

        if not urls:
            logger.error("\n === NO VALID LINKS ===\n")
            return []

        # SSRF-validate URLs (blocks internal/private network addresses).
        safe_urls: List[str] = []
        for url in urls:
            if validate_url(url):
                safe_urls.append(url)
            else:
                logger.warning(
                    f"SSRF validation blocked URL from full content fetch: {url}. "
                    "If this is a trusted internal/private resource, note that "
                    "full content fetching currently only supports public URLs."
                )

        if not safe_urls:
            logger.warning(
                "All URLs were blocked by SSRF validation — returning results "
                "without full content. This can happen when search results "
                "point to internal/private network addresses."
            )
            for result in filtered_results:
                result["full_content"] = None
            return filtered_results

        # Fetch and extract all pages — specialized downloaders (arXiv,
        # PubMed, etc.) are tried first, with HTML crawling as fallback.
        # Guarded like _get_full_content: a fetch failure degrades to
        # results without content instead of aborting the whole search.
        try:
            url_to_content = batch_fetch_and_extract(
                safe_urls, language=self.language
            )
        except Exception:
            logger.exception("Error fetching full content")
            for result in filtered_results:
                result["full_content"] = None
            return filtered_results

        nr_full_text = sum(1 for v in url_to_content.values() if v)
        for result in filtered_results:
            link = result.get("link")
            result["full_content"] = url_to_content.get(link) if link else None

        logger.info(f"Full search: retrieved content from {nr_full_text} pages")
        return filtered_results

    def _get_full_content(
        self, relevant_items: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """Fetch and attach full content to an existing list of items.

        SSRF-validates each item's ``"link"``, fetches the safe ones in a
        batch, and writes the extracted text (or ``None``) into each item's
        ``"full_content"`` key. Best-effort: fetch errors are logged and all
        items fall back to ``full_content=None``.
        """
        urls: List[str] = []
        for item in relevant_items:
            link = item.get("link")
            if link is not None and validate_url(link):
                urls.append(link)
            elif link is not None:
                logger.warning(
                    f"SSRF validation blocked URL from full content fetch: {link}."
                )

        if not urls:
            # Nothing fetchable — still normalize the shape of every item.
            for item in relevant_items:
                item["full_content"] = None
            return relevant_items

        try:
            url_to_content = batch_fetch_and_extract(
                urls, language=self.language
            )
        except Exception:
            logger.exception("Error fetching full content")
            for item in relevant_items:
                item["full_content"] = None
            return relevant_items

        for item in relevant_items:
            link = item.get("link")
            item["full_content"] = url_to_content.get(link) if link else None

        return relevant_items

    def invoke(self, query: str) -> Any:
        """LangChain-style entry point; delegates to :meth:`run`."""
        return self.run(query)

    def __call__(self, query: str) -> Any:
        """Allow the instance to be used as a plain callable."""
        return self.invoke(query)