Coverage for src / local_deep_research / advanced_search_system / candidate_exploration / adaptive_explorer.py: 100%

129 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2Adaptive candidate explorer implementation. 

3 

4This explorer adapts its search strategy based on the success of different 

5approaches and the quality of candidates found. 

6""" 

7 

8import time 

9from collections import defaultdict 

10from typing import List, Optional 

11 

12from loguru import logger 

13 

14from ..candidates.base_candidate import Candidate 

15from ..constraints.base_constraint import Constraint 

16from .base_explorer import ( 

17 BaseCandidateExplorer, 

18 ExplorationResult, 

19 ExplorationStrategy, 

20) 

21 

22 

class AdaptiveExplorer(BaseCandidateExplorer):
    """
    Adaptive candidate explorer that learns from search results.

    This explorer:
    1. Tries different search strategies
    2. Tracks which strategies work best
    3. Adapts future searches based on success rates
    4. Focuses effort on the most productive approaches

    Strategy names understood by the query generator are ``direct_search``,
    ``synonym_expansion``, ``category_exploration``, ``related_terms`` and
    ``constraint_focused``; any other name falls back to a direct-search
    query.

    Attributes such as ``model``, ``explored_queries`` and ``max_candidates``
    and the search/extraction/ranking helpers used below are not defined in
    this class; they are presumably provided by ``BaseCandidateExplorer``.
    """

    def __init__(
        self,
        *args,
        initial_strategies: Optional[List[str]] = None,
        adaptation_threshold: int = 5,  # Adapt after this many searches
        **kwargs,
    ):
        """
        Initialize adaptive explorer.

        Args:
            *args: Positional arguments forwarded to the base explorer.
            initial_strategies: Starting search strategies to try, in the
                order they are rotated through. ``None`` or an empty list
                selects the four built-in defaults.
            adaptation_threshold: Number of searches before adapting
            **kwargs: Keyword arguments forwarded to the base explorer.
        """
        super().__init__(*args, **kwargs)

        self.initial_strategies = initial_strategies or [
            "direct_search",
            "synonym_expansion",
            "category_exploration",
            "related_terms",
        ]

        self.adaptation_threshold = adaptation_threshold

        # Per-strategy performance counters, created lazily on first use.
        self.strategy_stats = defaultdict(
            lambda: {"attempts": 0, "candidates_found": 0, "quality_sum": 0.0}
        )
        self.current_strategy = self.initial_strategies[0]

    def explore(
        self,
        initial_query: str,
        constraints: Optional[List[Constraint]] = None,
        entity_type: Optional[str] = None,
    ) -> ExplorationResult:
        """
        Explore candidates using adaptive strategy.

        Repeatedly generates a query with the current strategy, runs the
        search, records how many candidates the strategy produced, and
        re-evaluates the current strategy every ``adaptation_threshold``
        searches. The loop ends when ``_should_continue_exploration``
        returns false or when the strategy rotation is exhausted.

        Args:
            initial_query: The query that all generated queries derive from.
            constraints: Optional constraints passed to query generation.
            entity_type: Optional entity type passed to candidate extraction
                and echoed in the result metadata.

        Returns:
            An ``ExplorationResult`` holding the ranked, de-duplicated
            candidates (truncated to ``self.max_candidates``) plus a
            snapshot of the strategy statistics.
        """
        start_time = time.time()
        logger.info(f"Starting adaptive exploration for: {initial_query}")

        all_candidates = []
        exploration_paths = []
        total_searched = 0

        # Searches since the last adaptation; reset whenever we adapt.
        search_count = 0

        while self._should_continue_exploration(
            start_time, len(all_candidates)
        ):
            # Choose strategy based on current performance
            strategy = self._choose_strategy(search_count)

            # Generate query using chosen strategy
            query = self._generate_query_with_strategy(
                initial_query, strategy, all_candidates, constraints
            )

            if not query or query.lower() in self.explored_queries:
                # Nothing new from this strategy: rotate to the next one,
                # or stop once the rotation has wrapped around.
                if not self._try_next_strategy():
                    break
                continue

            # Execute search
            logger.info(
                f"Using strategy '{strategy}' for query: {query[:50]}..."
            )
            results = self._execute_search(query)
            candidates = self._extract_candidates_from_results(
                results, entity_type
            )

            # Track strategy performance
            self._update_strategy_stats(strategy, candidates)

            # Add results
            all_candidates.extend(candidates)
            total_searched += 1
            search_count += 1

            exploration_paths.append(
                f"{strategy}: {query} -> {len(candidates)} candidates"
            )

            # Adapt strategy if threshold reached
            if search_count >= self.adaptation_threshold:
                self._adapt_strategy()
                search_count = 0

        # Process final results
        unique_candidates = self._deduplicate_candidates(all_candidates)
        ranked_candidates = self._rank_candidates_by_relevance(
            unique_candidates, initial_query
        )
        final_candidates = ranked_candidates[: self.max_candidates]

        elapsed_time = time.time() - start_time
        logger.info(
            f"Adaptive exploration completed: {len(final_candidates)} candidates in {elapsed_time:.1f}s"
        )

        # Copy each per-strategy dict as well as the outer mapping so that
        # later searches on this explorer cannot mutate a result that has
        # already been handed back to the caller.
        stats_snapshot = {
            name: dict(stats) for name, stats in self.strategy_stats.items()
        }

        return ExplorationResult(
            candidates=final_candidates,
            total_searched=total_searched,
            unique_candidates=len(unique_candidates),
            exploration_paths=exploration_paths,
            metadata={
                "strategy": "adaptive",
                "strategy_stats": stats_snapshot,
                "final_strategy": self.current_strategy,
                "entity_type": entity_type,
            },
            elapsed_time=elapsed_time,
            strategy_used=ExplorationStrategy.ADAPTIVE,
        )

    def generate_exploration_queries(
        self,
        base_query: str,
        found_candidates: List[Candidate],
        constraints: Optional[List[Constraint]] = None,
    ) -> List[str]:
        """
        Generate queries using adaptive approach.

        Builds one query for each of the (up to) three best-performing
        strategies and drops strategies that produced no query.

        Args:
            base_query: The query to derive variations from.
            found_candidates: Candidates found so far, used by the
                category-based strategy.
            constraints: Optional constraints for constraint-focused queries.

        Returns:
            The non-empty generated queries, in strategy ranking order.
        """
        generated = (
            self._generate_query_with_strategy(
                base_query, strategy, found_candidates, constraints
            )
            for strategy in self._get_top_strategies(3)
        )
        return [query for query in generated if query]

    def _choose_strategy(self, search_count: int) -> str:
        """
        Choose the best strategy based on current performance.

        Args:
            search_count: Searches performed since the last adaptation.

        Returns:
            ``self.current_strategy`` while ``search_count`` is below the
            adaptation threshold; otherwise the top-ranked strategy, falling
            back to the current one when no ranking is available.
        """
        if search_count < self.adaptation_threshold:
            # Use current strategy during initial phase
            return self.current_strategy

        # Choose best performing strategy
        best_strategies = self._get_top_strategies(1)
        return best_strategies[0] if best_strategies else self.current_strategy

    def _get_top_strategies(self, n: int) -> List[str]:
        """
        Get top N performing strategies.

        With no recorded statistics the first ``n`` initial strategies are
        returned. Otherwise only strategies that already have statistics are
        ranked, by candidates found per attempt (highest first), so fewer
        than ``n`` names may come back.

        Args:
            n: Maximum number of strategy names to return.

        Returns:
            Up to ``n`` strategy names, best first.
        """
        if not self.strategy_stats:
            return self.initial_strategies[:n]

        # Sort by candidates found per attempt; max(..., 1) avoids dividing
        # by zero for an entry that has no attempts yet.
        sorted_strategies = sorted(
            self.strategy_stats.items(),
            key=lambda x: x[1]["candidates_found"] / max(x[1]["attempts"], 1),
            reverse=True,
        )

        return [strategy for strategy, _ in sorted_strategies[:n]]

    def _generate_query_with_strategy(
        self,
        base_query: str,
        strategy: str,
        found_candidates: List[Candidate],
        constraints: Optional[List[Constraint]] = None,
    ) -> Optional[str]:
        """
        Generate a query using specific strategy.

        Args:
            base_query: The query to derive a variation from.
            strategy: Name of the strategy to apply. Unknown names, and
                ``constraint_focused`` without constraints, fall back to a
                direct-search query.
            found_candidates: Candidates found so far.
            constraints: Optional constraints for ``constraint_focused``.

        Returns:
            The generated query, or ``None`` when the strategy produced
            nothing or raised (the exception is logged, not propagated).
        """
        try:
            if strategy == "direct_search":
                return self._direct_search_query(base_query)
            if strategy == "synonym_expansion":
                return self._synonym_expansion_query(base_query)
            if strategy == "category_exploration":
                return self._category_exploration_query(
                    base_query, found_candidates
                )
            if strategy == "related_terms":
                return self._related_terms_query(base_query, found_candidates)
            if strategy == "constraint_focused" and constraints:
                return self._constraint_focused_query(base_query, constraints)
            return self._direct_search_query(base_query)

        except Exception:
            # Best-effort boundary: a failing strategy must not abort the
            # whole exploration, so log with traceback and report "no query".
            logger.exception(f"Error generating query with strategy {strategy}")
            return None

    def _direct_search_query(self, base_query: str) -> str:
        """
        Generate direct search variation.

        Returns the first fixed variation of ``base_query`` whose lowercase
        form is not in ``self.explored_queries``; if every variation has
        been explored, returns ``base_query`` unchanged.
        """
        variations = [
            f'"{base_query}" examples',
            f"{base_query} list",
            f"{base_query} instances",
            f"types of {base_query}",
        ]

        # Choose variation not yet explored
        for variation in variations:
            if variation.lower() not in self.explored_queries:
                return variation

        return base_query

    def _invoke_model_for_query(
        self, prompt: str, base_query: str, label: str
    ) -> Optional[str]:
        """
        Ask the model for a query and normalise the reply.

        Shared by the model-backed strategies so they handle replies and
        failures the same way.

        Args:
            prompt: The full prompt sent to ``self.model.invoke``.
            base_query: The original query; a reply equal to it is rejected.
            label: Short strategy description used in the debug log message.

        Returns:
            The stripped ``content`` of the model reply, or ``None`` when it
            equals ``base_query`` or when invoking the model raised.
        """
        try:
            response = self.model.invoke(prompt).content.strip()
            return response if response != base_query else None
        except Exception as e:
            # Deliberately quiet: a model failure only means this strategy
            # yields no query for this round.
            logger.debug(
                f"Error generating {label} query for '{base_query}': {e}"
            )
            return None

    def _synonym_expansion_query(self, base_query: str) -> Optional[str]:
        """
        Generate query with synonym expansion.

        Returns the model's rephrasing of ``base_query``, or ``None`` when
        the model echoes the query back or the call fails.
        """
        prompt = f"""
Generate a search query that means the same as "{base_query}" but uses different words.
Focus on synonyms and alternative terminology.

Query:
"""

        return self._invoke_model_for_query(prompt, base_query, "synonym")

    def _category_exploration_query(
        self, base_query: str, found_candidates: List[Candidate]
    ) -> Optional[str]:
        """
        Generate query exploring categories of found candidates.

        Without candidates the query asks for categories of ``base_query``;
        otherwise it asks for items similar to the names of the first three
        candidates (``base_query`` is not part of that query).
        """
        if not found_candidates:
            return f"categories of {base_query}"

        sample_names = [c.name for c in found_candidates[:3]]
        return f"similar to {', '.join(sample_names)}"

    def _related_terms_query(
        self, base_query: str, found_candidates: List[Candidate]
    ) -> Optional[str]:
        """
        Generate query using related terms.

        ``found_candidates`` is accepted for signature parity with the other
        strategies but is not used. Returns the model's suggestion, or
        ``None`` when it equals ``base_query`` or the call fails.
        """
        prompt = f"""
Given the search topic "{base_query}", suggest a related search term that would find similar but different examples.

Related search term:
"""

        return self._invoke_model_for_query(prompt, base_query, "related terms")

    def _constraint_focused_query(
        self, base_query: str, constraints: List[Constraint]
    ) -> Optional[str]:
        """
        Generate query focused on a specific constraint.

        Returns ``base_query`` followed by the ``value`` of the first
        constraint, or ``None`` when there are no constraints.
        """
        if not constraints:
            return None

        # Simple selection: always the first constraint. No per-constraint
        # exploration tracking is done here.
        constraint = constraints[0]
        return f"{base_query} {constraint.value}"

    def _update_strategy_stats(
        self, strategy: str, candidates: List[Candidate]
    ) -> None:
        """
        Update performance statistics for a strategy.

        Args:
            strategy: Name of the strategy that produced ``candidates``.
            candidates: Candidates extracted from that strategy's search.
        """
        stats = self.strategy_stats[strategy]
        stats["attempts"] += 1
        stats["candidates_found"] += len(candidates)

        # Simple quality assessment (could be more sophisticated):
        # basic quality based on quantity.
        stats["quality_sum"] += len(candidates) * 0.1

    def _adapt_strategy(self) -> None:
        """
        Adapt current strategy based on performance.

        Switches ``self.current_strategy`` to the top-ranked strategy when
        that differs from the current one, and logs the change.
        """
        best_strategies = self._get_top_strategies(1)
        if best_strategies and best_strategies[0] != self.current_strategy:
            old_strategy = self.current_strategy
            self.current_strategy = best_strategies[0]
            logger.info(
                f"Adapted strategy from '{old_strategy}' to '{self.current_strategy}'"
            )

    def _try_next_strategy(self) -> bool:
        """
        Try the next available strategy.

        Advances ``self.current_strategy`` to the entry after it in
        ``self.initial_strategies``.

        Returns:
            ``True`` when the strategy was advanced; ``False`` when advancing
            would wrap back to the first entry, in which case the current
            strategy is left unchanged.
        """
        try:
            current_index = self.initial_strategies.index(self.current_strategy)
        except ValueError:
            # Current strategy is not part of the rotation; start from the
            # first entry.
            current_index = 0

        next_index = (current_index + 1) % len(self.initial_strategies)

        if next_index == 0:  # We've tried all strategies
            return False

        self.current_strategy = self.initial_strategies[next_index]
        return True