Coverage for src / local_deep_research / advanced_search_system / candidate_exploration / adaptive_explorer.py: 100%

129 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1""" 

2Adaptive candidate explorer implementation. 

3 

4This explorer adapts its search strategy based on the success of different 

5approaches and the quality of candidates found. 

6""" 

7 

8import time 

9from collections import defaultdict 

10from typing import List, Optional 

11 

12from loguru import logger 

13 

14from ..candidates.base_candidate import Candidate 

15from ..constraints.base_constraint import Constraint 

16from .base_explorer import ( 

17 BaseCandidateExplorer, 

18 ExplorationResult, 

19 ExplorationStrategy, 

20) 

21 

22 

23class AdaptiveExplorer(BaseCandidateExplorer): 

24 """ 

25 Adaptive candidate explorer that learns from search results. 

26 

27 This explorer: 

28 1. Tries different search strategies 

29 2. Tracks which strategies work best 

30 3. Adapts future searches based on success rates 

31 4. Focuses effort on the most productive approaches 

32 """ 

33 

34 def __init__( 

35 self, 

36 *args, 

37 initial_strategies: List[str] = None, 

38 adaptation_threshold: int = 5, # Adapt after this many searches 

39 **kwargs, 

40 ): 

41 """ 

42 Initialize adaptive explorer. 

43 

44 Args: 

45 initial_strategies: Starting search strategies to try 

46 adaptation_threshold: Number of searches before adapting 

47 """ 

48 super().__init__(*args, **kwargs) 

49 

50 self.initial_strategies = initial_strategies or [ 

51 "direct_search", 

52 "synonym_expansion", 

53 "category_exploration", 

54 "related_terms", 

55 ] 

56 

57 self.adaptation_threshold = adaptation_threshold 

58 

59 # Track strategy performance 

60 self.strategy_stats = defaultdict( 

61 lambda: {"attempts": 0, "candidates_found": 0, "quality_sum": 0.0} 

62 ) 

63 self.current_strategy = self.initial_strategies[0] 

64 

65 def explore( 

66 self, 

67 initial_query: str, 

68 constraints: Optional[List[Constraint]] = None, 

69 entity_type: Optional[str] = None, 

70 ) -> ExplorationResult: 

71 """Explore candidates using adaptive strategy.""" 

72 start_time = time.time() 

73 logger.info(f"Starting adaptive exploration for: {initial_query}") 

74 

75 all_candidates = [] 

76 exploration_paths = [] 

77 total_searched = 0 

78 

79 # Track current strategy performance 

80 search_count = 0 

81 

82 while self._should_continue_exploration( 

83 start_time, len(all_candidates) 

84 ): 

85 # Choose strategy based on current performance 

86 strategy = self._choose_strategy(search_count) 

87 

88 # Generate query using chosen strategy 

89 query = self._generate_query_with_strategy( 

90 initial_query, strategy, all_candidates, constraints 

91 ) 

92 

93 if not query or query.lower() in self.explored_queries: 

94 # Try next strategy or stop 

95 if not self._try_next_strategy(): 

96 break 

97 continue 

98 

99 # Execute search 

100 logger.info( 

101 f"Using strategy '{strategy}' for query: {query[:50]}..." 

102 ) 

103 results = self._execute_search(query) 

104 candidates = self._extract_candidates_from_results( 

105 results, entity_type 

106 ) 

107 

108 # Track strategy performance 

109 self._update_strategy_stats(strategy, candidates) 

110 

111 # Add results 

112 all_candidates.extend(candidates) 

113 total_searched += 1 

114 search_count += 1 

115 

116 exploration_paths.append( 

117 f"{strategy}: {query} -> {len(candidates)} candidates" 

118 ) 

119 

120 # Adapt strategy if threshold reached 

121 if search_count >= self.adaptation_threshold: 

122 self._adapt_strategy() 

123 search_count = 0 

124 

125 # Process final results 

126 unique_candidates = self._deduplicate_candidates(all_candidates) 

127 ranked_candidates = self._rank_candidates_by_relevance( 

128 unique_candidates, initial_query 

129 ) 

130 final_candidates = ranked_candidates[: self.max_candidates] 

131 

132 elapsed_time = time.time() - start_time 

133 logger.info( 

134 f"Adaptive exploration completed: {len(final_candidates)} candidates in {elapsed_time:.1f}s" 

135 ) 

136 

137 return ExplorationResult( 

138 candidates=final_candidates, 

139 total_searched=total_searched, 

140 unique_candidates=len(unique_candidates), 

141 exploration_paths=exploration_paths, 

142 metadata={ 

143 "strategy": "adaptive", 

144 "strategy_stats": dict(self.strategy_stats), 

145 "final_strategy": self.current_strategy, 

146 "entity_type": entity_type, 

147 }, 

148 elapsed_time=elapsed_time, 

149 strategy_used=ExplorationStrategy.ADAPTIVE, 

150 ) 

151 

152 def generate_exploration_queries( 

153 self, 

154 base_query: str, 

155 found_candidates: List[Candidate], 

156 constraints: Optional[List[Constraint]] = None, 

157 ) -> List[str]: 

158 """Generate queries using adaptive approach.""" 

159 queries = [] 

160 

161 # Generate queries using best performing strategies 

162 top_strategies = self._get_top_strategies(3) 

163 

164 for strategy in top_strategies: 

165 query = self._generate_query_with_strategy( 

166 base_query, strategy, found_candidates, constraints 

167 ) 

168 if query: 

169 queries.append(query) 

170 

171 return queries 

172 

173 def _choose_strategy(self, search_count: int) -> str: 

174 """Choose the best strategy based on current performance.""" 

175 if search_count < self.adaptation_threshold: 

176 # Use current strategy during initial phase 

177 return self.current_strategy 

178 

179 # Choose best performing strategy 

180 best_strategies = self._get_top_strategies(1) 

181 return best_strategies[0] if best_strategies else self.current_strategy 

182 

183 def _get_top_strategies(self, n: int) -> List[str]: 

184 """Get top N performing strategies.""" 

185 if not self.strategy_stats: 

186 return self.initial_strategies[:n] 

187 

188 # Sort by candidates found per attempt 

189 sorted_strategies = sorted( 

190 self.strategy_stats.items(), 

191 key=lambda x: x[1]["candidates_found"] / max(x[1]["attempts"], 1), 

192 reverse=True, 

193 ) 

194 

195 return [strategy for strategy, _ in sorted_strategies[:n]] 

196 

197 def _generate_query_with_strategy( 

198 self, 

199 base_query: str, 

200 strategy: str, 

201 found_candidates: List[Candidate], 

202 constraints: Optional[List[Constraint]] = None, 

203 ) -> Optional[str]: 

204 """Generate a query using specific strategy.""" 

205 try: 

206 if strategy == "direct_search": 

207 return self._direct_search_query(base_query) 

208 elif strategy == "synonym_expansion": 

209 return self._synonym_expansion_query(base_query) 

210 elif strategy == "category_exploration": 

211 return self._category_exploration_query( 

212 base_query, found_candidates 

213 ) 

214 elif strategy == "related_terms": 

215 return self._related_terms_query(base_query, found_candidates) 

216 elif strategy == "constraint_focused" and constraints: 

217 return self._constraint_focused_query(base_query, constraints) 

218 else: 

219 return self._direct_search_query(base_query) 

220 

221 except Exception: 

222 logger.exception(f"Error generating query with strategy {strategy}") 

223 return None 

224 

225 def _direct_search_query(self, base_query: str) -> str: 

226 """Generate direct search variation.""" 

227 variations = [ 

228 f'"{base_query}" examples', 

229 f"{base_query} list", 

230 f"{base_query} instances", 

231 f"types of {base_query}", 

232 ] 

233 

234 # Choose variation not yet explored 

235 for variation in variations: 

236 if variation.lower() not in self.explored_queries: 

237 return variation 

238 

239 return base_query 

240 

241 def _synonym_expansion_query(self, base_query: str) -> Optional[str]: 

242 """Generate query with synonym expansion.""" 

243 prompt = f""" 

244Generate a search query that means the same as "{base_query}" but uses different words. 

245Focus on synonyms and alternative terminology. 

246 

247Query: 

248""" 

249 

250 try: 

251 response = self.model.invoke(prompt).content.strip() 

252 return response if response != base_query else None 

253 except Exception as e: 

254 logger.debug( 

255 f"Error generating synonym query for '{base_query}': {e}" 

256 ) 

257 return None 

258 

259 def _category_exploration_query( 

260 self, base_query: str, found_candidates: List[Candidate] 

261 ) -> Optional[str]: 

262 """Generate query exploring categories of found candidates.""" 

263 if not found_candidates: 

264 return f"categories of {base_query}" 

265 

266 sample_names = [c.name for c in found_candidates[:3]] 

267 return f"similar to {', '.join(sample_names)}" 

268 

269 def _related_terms_query( 

270 self, base_query: str, found_candidates: List[Candidate] 

271 ) -> Optional[str]: 

272 """Generate query using related terms.""" 

273 prompt = f""" 

274Given the search topic "{base_query}", suggest a related search term that would find similar but different examples. 

275 

276Related search term: 

277""" 

278 

279 try: 

280 response = self.model.invoke(prompt).content.strip() 

281 return response if response != base_query else None 

282 except Exception as e: 

283 logger.debug( 

284 f"Error generating related terms query for '{base_query}': {e}" 

285 ) 

286 return None 

287 

288 def _constraint_focused_query( 

289 self, base_query: str, constraints: List[Constraint] 

290 ) -> Optional[str]: 

291 """Generate query focused on a specific constraint.""" 

292 if not constraints: 

293 return None 

294 

295 # Pick least explored constraint 

296 constraint = constraints[0] # Simple selection 

297 return f"{base_query} {constraint.value}" 

298 

299 def _update_strategy_stats( 

300 self, strategy: str, candidates: List[Candidate] 

301 ): 

302 """Update performance statistics for a strategy.""" 

303 self.strategy_stats[strategy]["attempts"] += 1 

304 self.strategy_stats[strategy]["candidates_found"] += len(candidates) 

305 

306 # Simple quality assessment (could be more sophisticated) 

307 quality = len(candidates) * 0.1 # Basic quality based on quantity 

308 self.strategy_stats[strategy]["quality_sum"] += quality 

309 

310 def _adapt_strategy(self): 

311 """Adapt current strategy based on performance.""" 

312 best_strategies = self._get_top_strategies(1) 

313 if best_strategies and best_strategies[0] != self.current_strategy: 

314 old_strategy = self.current_strategy 

315 self.current_strategy = best_strategies[0] 

316 logger.info( 

317 f"Adapted strategy from '{old_strategy}' to '{self.current_strategy}'" 

318 ) 

319 

320 def _try_next_strategy(self) -> bool: 

321 """Try the next available strategy.""" 

322 current_index = ( 

323 self.initial_strategies.index(self.current_strategy) 

324 if self.current_strategy in self.initial_strategies 

325 else 0 

326 ) 

327 next_index = (current_index + 1) % len(self.initial_strategies) 

328 

329 if next_index == 0: # We've tried all strategies 

330 return False 

331 

332 self.current_strategy = self.initial_strategies[next_index] 

333 return True