Coverage for src / local_deep_research / advanced_search_system / candidate_exploration / progressive_explorer.py: 12%

119 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Progressive explorer for BrowseComp-style systematic search exploration. 

3""" 

4 

5import concurrent.futures 

6from dataclasses import dataclass, field 

7from typing import Dict, List, Set, Tuple 

8 

9from loguru import logger 

10 

11from ...utilities.thread_context import preserve_research_context 

12 

13 

@dataclass
class SearchProgress:
    """Track search progress and findings across exploration rounds.

    Attributes:
        searched_terms: Lower-cased queries already executed (dedup guard).
        found_candidates: Candidate answer name -> best confidence seen.
        verified_facts: Fact text -> source it was verified against.
        entity_coverage: Entity type -> lower-cased entities already covered
            by at least one search.
        search_depth: Number of exploration rounds completed.
    """

    searched_terms: Set[str] = field(default_factory=set)
    found_candidates: Dict[str, float] = field(
        default_factory=dict
    )  # name -> confidence
    verified_facts: Dict[str, str] = field(
        default_factory=dict
    )  # fact -> source
    entity_coverage: Dict[str, Set[str]] = field(
        default_factory=dict
    )  # entity_type -> searched_entities
    search_depth: int = 0

    def update_coverage(self, entity_type: str, entity: str) -> None:
        """Record that *entity* of *entity_type* has been searched.

        Entities are stored lower-cased so coverage checks are
        case-insensitive.
        """
        # setdefault replaces the explicit "if key not in dict" dance.
        self.entity_coverage.setdefault(entity_type, set()).add(entity.lower())

    def get_uncovered_entities(
        self, entities: Dict[str, List[str]]
    ) -> Dict[str, List[str]]:
        """Return the subset of *entities* not yet covered by any search.

        Args:
            entities: Mapping of entity type -> candidate entity strings.

        Returns:
            Mapping restricted to entities whose lower-cased form is absent
            from ``entity_coverage``; entity types with nothing uncovered
            are omitted entirely.
        """
        uncovered = {}
        for entity_type, entity_list in entities.items():
            covered = self.entity_coverage.get(entity_type, set())
            remaining = [e for e in entity_list if e.lower() not in covered]
            if remaining:
                uncovered[entity_type] = remaining
        return uncovered

49 

50 

class ProgressiveExplorer:
    """
    Explorer that implements progressive search strategies for BrowseComp.

    Key features:
    1. Tracks search progress to avoid redundancy
    2. Progressively combines entities
    3. Identifies and pursues promising candidates
    4. Maintains simple approach without over-filtering
    """

    def __init__(self, search_engine, model):
        """
        Args:
            search_engine: Object exposing ``run(query)`` returning a list
                of result dicts (used by ``_parallel_search``).
            model: LLM handle kept for callers/strategies; not invoked here.
        """
        self.search_engine = search_engine
        self.model = model
        self.progress = SearchProgress()
        self.max_results_per_search = 20  # Keep more results

    def explore(
        self,
        queries: List[str],
        constraints: List = None,
        max_workers: int = 5,
        extracted_entities: Dict[str, List[str]] = None,
    ) -> Tuple[List, "SearchProgress"]:
        """
        Execute progressive exploration with entity tracking.

        Args:
            queries: Search queries to run in parallel.
            constraints: Unused here; accepted for interface compatibility.
            max_workers: Thread-pool size for the parallel searches.
            extracted_entities: entity_type -> entities, used only to update
                coverage tracking against each executed query.

        Returns:
            Tuple of (all raw search results, the shared SearchProgress).
        """
        all_results = []
        extracted_entities = extracted_entities or {}

        # Execute searches in parallel (like source-based strategy)
        search_results = self._parallel_search(queries, max_workers)

        # Process results without filtering (trust the LLM later)
        for query, results in search_results:
            self.progress.searched_terms.add(query.lower())

            # Track which entities were covered in this search
            self._update_entity_coverage(query, extracted_entities)

            # Extract any specific names/candidates from results and keep
            # the highest confidence seen for each candidate.
            candidates = self._extract_candidates_from_results(results, query)
            for candidate_name, confidence in candidates.items():
                self.progress.found_candidates[candidate_name] = max(
                    self.progress.found_candidates.get(candidate_name, 0.0),
                    confidence,
                )

            # Keep all results for final synthesis
            all_results.extend(results)

        self.progress.search_depth += 1

        # Return both results and progress
        return all_results, self.progress

    def generate_verification_searches(
        self,
        candidates: Dict[str, float],
        constraints: List,
        max_searches: int = 5,
    ) -> List[str]:
        """Generate targeted searches to verify top candidates.

        Args:
            candidates: Candidate name -> confidence score.
            constraints: Constraint objects exposing a ``description``
                attribute (only the first two are used per candidate).
            max_searches: Cap on the number of searches returned.

        Returns:
            Quoted-candidate + constraint-description query strings that
            have not already been searched.
        """
        if not candidates:
            return []

        # Get top candidates by confidence
        top_candidates = sorted(
            candidates.items(), key=lambda x: x[1], reverse=True
        )[:3]

        verification_searches = []
        for candidate_name, confidence in top_candidates:
            # Generate verification searches for this candidate
            for constraint in constraints[:2]:  # Verify top constraints
                search = f'"{candidate_name}" {constraint.description}'
                if search.lower() not in self.progress.searched_terms:
                    verification_searches.append(search)

        return verification_searches[:max_searches]

    def _extract_candidates_from_results(
        self, results: List[Dict], query: str
    ) -> Dict[str, float]:
        """Extract potential answer candidates from search results.

        Heuristic only: quoted phrases get a base confidence of 0.3, and
        capitalized 1-3-word title phrases accumulate +0.2 each, since
        titles often contain the actual answer.
        """
        # Hoisted out of the per-result loop; repeated imports inside a hot
        # loop are wasteful even though Python caches modules.
        import re

        candidates = {}

        # Simple extraction based on titles and snippets
        for result in results[:10]:  # Focus on top results
            title = result.get("title", "")
            snippet = result.get("snippet", "")

            # Look for proper nouns and specific names
            # This is simplified - in practice, might use NER or more sophisticated extraction
            combined_text = f"{title} {snippet}"

            # Extract quoted terms as potential candidates
            quoted_terms = re.findall(r'"([^"]+)"', combined_text)
            for term in quoted_terms:
                if (
                    len(term) > 2 and len(term) < 50
                ):  # Reasonable length for an answer
                    candidates[term] = 0.3  # Base confidence from appearance

            # Boost confidence if appears in title
            if title:
                # Titles often contain the actual answer
                title_words = title.split()
                for i in range(len(title_words)):
                    for j in range(i + 1, min(i + 4, len(title_words) + 1)):
                        phrase = " ".join(title_words[i:j])
                        if (
                            len(phrase) > 3 and phrase[0].isupper()
                        ):  # Likely proper noun
                            candidates[phrase] = candidates.get(phrase, 0) + 0.2

        return candidates

    def _update_entity_coverage(
        self, query: str, entities: Dict[str, List[str]]
    ):
        """Mark every entity that appears (case-insensitively) in *query*
        as covered in the shared progress tracker."""
        query_lower = query.lower()

        for entity_type, entity_list in entities.items():
            for entity in entity_list:
                if entity.lower() in query_lower:
                    self.progress.update_coverage(entity_type, entity)

    def suggest_next_searches(
        self, entities: Dict[str, List[str]], max_suggestions: int = 5
    ) -> List[str]:
        """Suggest next searches based on coverage and findings.

        Strategy: if candidates exist, pair the top candidate with uncovered
        entities for verification; otherwise combine uncovered temporal /
        name / descriptor entities for systematic coverage.
        """
        suggestions = []

        # 1. Check uncovered entities
        uncovered = self.progress.get_uncovered_entities(entities)

        # 2. If we have candidates, verify them with uncovered constraints
        if self.progress.found_candidates:
            top_candidate = max(
                self.progress.found_candidates.items(), key=lambda x: x[1]
            )[0]

            # Combine candidate with uncovered entities
            for entity_type, entity_list in uncovered.items():
                for entity in entity_list[:2]:
                    search = f'"{top_candidate}" {entity}'
                    if search.lower() not in self.progress.searched_terms:
                        suggestions.append(search)

        # 3. Otherwise, create new combinations of uncovered entities
        else:
            # Focus on systematic coverage
            if uncovered.get("temporal"):
                # Year-by-year with key term. Guard with `or [""]` so a
                # present-but-empty "names"/"descriptors" list does not
                # raise IndexError on the [0] subscript.
                key_term = (entities.get("names") or [""])[0] or (
                    entities.get("descriptors") or [""]
                )[0]
                for year in uncovered["temporal"][:3]:
                    search = f"{key_term} {year}".strip()
                    if search.lower() not in self.progress.searched_terms:
                        suggestions.append(search)

            if uncovered.get("names") and uncovered.get("descriptors"):
                # Combine names with descriptors
                for name in uncovered["names"][:2]:
                    for desc in uncovered["descriptors"][:2]:
                        search = f"{name} {desc}"
                        if search.lower() not in self.progress.searched_terms:
                            suggestions.append(search)

        return suggestions[:max_suggestions]

    def _parallel_search(
        self, queries: List[str], max_workers: int
    ) -> List[Tuple[str, List[Dict]]]:
        """Execute searches in parallel and return (query, results) pairs.

        Failed searches are logged and yielded with an empty result list so
        one bad query cannot abort the whole round. Result order follows
        completion order, not input order.
        """
        results = []

        def search_query(query):
            try:
                search_results = self.search_engine.run(query)
                return (query, search_results or [])
            except Exception as e:
                logger.exception(f"Error searching '{query}': {e!s}")
                return (query, [])

        # Create context-preserving wrapper for the search function
        context_aware_search = preserve_research_context(search_query)

        # Run searches in parallel
        with concurrent.futures.ThreadPoolExecutor(
            max_workers=max_workers
        ) as executor:
            futures = [
                executor.submit(context_aware_search, q) for q in queries
            ]
            for future in concurrent.futures.as_completed(futures):
                results.append(future.result())

        return results