Coverage for src / local_deep_research / advanced_search_system / candidate_exploration / adaptive_explorer.py: 13%

127 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Adaptive candidate explorer implementation. 

3 

4This explorer adapts its search strategy based on the success of different 

5approaches and the quality of candidates found. 

6""" 

7 

8import time 

9from collections import defaultdict 

10from typing import List, Optional 

11 

12from loguru import logger 

13 

14from ..candidates.base_candidate import Candidate 

15from ..constraints.base_constraint import Constraint 

16from .base_explorer import ( 

17 BaseCandidateExplorer, 

18 ExplorationResult, 

19 ExplorationStrategy, 

20) 

21 

22 

class AdaptiveExplorer(BaseCandidateExplorer):
    """
    Adaptive candidate explorer that learns from search results.

    This explorer:
    1. Tries different search strategies
    2. Tracks which strategies work best
    3. Adapts future searches based on success rates
    4. Focuses effort on the most productive approaches

    Besides the attributes set in ``__init__``, the methods below rely on
    ``self.model``, ``self.explored_queries``, ``self.max_candidates`` and
    several ``_``-prefixed helpers that are not defined in this class
    (presumably provided by ``BaseCandidateExplorer`` -- confirm there).
    """

    def __init__(
        self,
        *args,
        initial_strategies: Optional[List[str]] = None,
        adaptation_threshold: int = 5,  # Adapt after this many searches
        **kwargs,
    ):
        """
        Initialize adaptive explorer.

        Args:
            *args: Positional arguments forwarded to the base explorer.
            initial_strategies: Starting search strategies to try, in the
                order they should be attempted. ``None`` or an empty
                sequence selects the built-in default list.
            adaptation_threshold: Number of searches before adapting
            **kwargs: Keyword arguments forwarded to the base explorer.
        """
        super().__init__(*args, **kwargs)

        self.initial_strategies = initial_strategies or [
            "direct_search",
            "synonym_expansion",
            "category_exploration",
            "related_terms",
        ]

        self.adaptation_threshold = adaptation_threshold

        # Track strategy performance; entries are created lazily the first
        # time a strategy is recorded in _update_strategy_stats().
        self.strategy_stats = defaultdict(
            lambda: {"attempts": 0, "candidates_found": 0, "quality_sum": 0.0}
        )
        self.current_strategy = self.initial_strategies[0]

    def explore(
        self,
        initial_query: str,
        constraints: Optional[List[Constraint]] = None,
        entity_type: Optional[str] = None,
    ) -> ExplorationResult:
        """
        Explore candidates using adaptive strategy.

        Repeatedly picks a strategy, builds a query with it, runs the search
        and records how many candidates the strategy produced. After every
        ``adaptation_threshold`` searches the current strategy is switched to
        the best performer so far.

        Args:
            initial_query: The query the exploration starts from.
            constraints: Optional constraints handed to query generation.
            entity_type: Optional entity type handed to candidate extraction
                and echoed in the result metadata.

        Returns:
            ExplorationResult with the ranked, de-duplicated candidates
            (truncated to ``self.max_candidates``) plus bookkeeping data.
        """
        start_time = time.time()
        logger.info(f"Starting adaptive exploration for: {initial_query}")

        all_candidates = []
        exploration_paths = []
        total_searched = 0

        # Searches executed since the last adaptation step
        search_count = 0

        while self._should_continue_exploration(
            start_time, len(all_candidates)
        ):
            # Choose strategy based on current performance
            strategy = self._choose_strategy(search_count)

            # Generate query using chosen strategy
            query = self._generate_query_with_strategy(
                initial_query, strategy, all_candidates, constraints
            )

            if not query or query.lower() in self.explored_queries:
                # Nothing new from this strategy: rotate to the next
                # configured one, or stop once the rotation wraps around.
                if not self._try_next_strategy():
                    break
                continue

            # Execute search
            logger.info(
                f"Using strategy '{strategy}' for query: {query[:50]}..."
            )
            results = self._execute_search(query)
            candidates = self._extract_candidates_from_results(
                results, entity_type
            )

            # Track strategy performance
            self._update_strategy_stats(strategy, candidates)

            # Add results
            all_candidates.extend(candidates)
            total_searched += 1
            search_count += 1

            exploration_paths.append(
                f"{strategy}: {query} -> {len(candidates)} candidates"
            )

            # Adapt strategy if threshold reached
            if search_count >= self.adaptation_threshold:
                self._adapt_strategy()
                search_count = 0

        # Process final results
        unique_candidates = self._deduplicate_candidates(all_candidates)
        ranked_candidates = self._rank_candidates_by_relevance(
            unique_candidates, initial_query
        )
        final_candidates = ranked_candidates[: self.max_candidates]

        elapsed_time = time.time() - start_time
        logger.info(
            f"Adaptive exploration completed: {len(final_candidates)} candidates in {elapsed_time:.1f}s"
        )

        # Snapshot the counters: copying each inner dict keeps the returned
        # metadata from changing if this explorer is used again later.
        stats_snapshot = {
            name: dict(stats) for name, stats in self.strategy_stats.items()
        }

        return ExplorationResult(
            candidates=final_candidates,
            total_searched=total_searched,
            unique_candidates=len(unique_candidates),
            exploration_paths=exploration_paths,
            metadata={
                "strategy": "adaptive",
                "strategy_stats": stats_snapshot,
                "final_strategy": self.current_strategy,
                "entity_type": entity_type,
            },
            elapsed_time=elapsed_time,
            strategy_used=ExplorationStrategy.ADAPTIVE,
        )

    def generate_exploration_queries(
        self,
        base_query: str,
        found_candidates: List[Candidate],
        constraints: Optional[List[Constraint]] = None,
    ) -> List[str]:
        """
        Generate queries using adaptive approach.

        Builds one query for each of the (up to) three top-ranked strategies.

        Args:
            base_query: Query the generated variations are derived from.
            found_candidates: Candidates found so far; used by the
                candidate-aware strategies.
            constraints: Optional constraints for constraint-based queries.

        Returns:
            The generated queries, without duplicates, in strategy rank
            order. Strategies that produce no query are skipped.
        """
        queries: List[str] = []

        # Generate queries using best performing strategies
        for strategy in self._get_top_strategies(3):
            query = self._generate_query_with_strategy(
                base_query, strategy, found_candidates, constraints
            )
            # Several strategies can fall back to the same direct-search
            # template, so drop repeats while preserving order.
            if query and query not in queries:
                queries.append(query)

        return queries

    def _choose_strategy(self, search_count: int) -> str:
        """
        Choose the best strategy based on current performance.

        Args:
            search_count: Searches executed since the last adaptation.

        Returns:
            ``self.current_strategy`` while ``search_count`` is below the
            adaptation threshold, otherwise the top-ranked strategy (falling
            back to the current one if no ranking is available).
        """
        if search_count < self.adaptation_threshold:
            # Use current strategy during initial phase
            return self.current_strategy

        # Choose best performing strategy
        best_strategies = self._get_top_strategies(1)
        return best_strategies[0] if best_strategies else self.current_strategy

    def _get_top_strategies(self, n: int) -> List[str]:
        """
        Get top N performing strategies.

        Strategies with recorded statistics come first, ordered by
        candidates found per attempt (highest first). Configured strategies
        that have not been tried yet follow in their configured order, so
        the result is not cut short merely because only a few strategies
        have been measured.

        Args:
            n: Maximum number of strategy names to return.

        Returns:
            Up to ``n`` strategy names.
        """
        if not self.strategy_stats:
            return self.initial_strategies[:n]

        # Sort by candidates found per attempt
        ranked = sorted(
            self.strategy_stats.items(),
            key=lambda item: item[1]["candidates_found"]
            / max(item[1]["attempts"], 1),
            reverse=True,
        )
        ordered = [name for name, _ in ranked]

        # Membership test only -- indexing the defaultdict would create
        # empty entries for strategies that were never attempted.
        ordered.extend(
            name
            for name in self.initial_strategies
            if name not in self.strategy_stats
        )

        return ordered[:n]

    def _generate_query_with_strategy(
        self,
        base_query: str,
        strategy: str,
        found_candidates: List[Candidate],
        constraints: Optional[List[Constraint]] = None,
    ) -> Optional[str]:
        """
        Generate a query using specific strategy.

        Unknown strategy names, and ``"constraint_focused"`` without any
        constraints, fall back to the direct-search query.

        Args:
            base_query: Query to derive the new query from.
            strategy: Name of the strategy to apply.
            found_candidates: Candidates found so far.
            constraints: Optional constraints for ``"constraint_focused"``.

        Returns:
            The generated query, or ``None`` if the strategy produced
            nothing or raised an exception (which is logged).
        """
        try:
            if strategy == "direct_search":
                return self._direct_search_query(base_query)
            elif strategy == "synonym_expansion":
                return self._synonym_expansion_query(base_query)
            elif strategy == "category_exploration":
                return self._category_exploration_query(
                    base_query, found_candidates
                )
            elif strategy == "related_terms":
                return self._related_terms_query(base_query, found_candidates)
            elif strategy == "constraint_focused" and constraints:
                return self._constraint_focused_query(base_query, constraints)
            else:
                return self._direct_search_query(base_query)

        except Exception as e:
            logger.exception(
                f"Error generating query with strategy {strategy}: {e}"
            )
            return None

    def _direct_search_query(self, base_query: str) -> str:
        """
        Generate direct search variation.

        Returns:
            The first templated variation whose lower-cased form is not in
            ``self.explored_queries``; ``base_query`` itself when every
            variation has already been explored.
        """
        variations = [
            f'"{base_query}" examples',
            f"{base_query} list",
            f"{base_query} instances",
            f"types of {base_query}",
        ]

        # Choose variation not yet explored
        for variation in variations:
            if variation.lower() not in self.explored_queries:
                return variation

        return base_query

    def _ask_model_for_query(
        self, prompt: str, base_query: str, purpose: str
    ) -> Optional[str]:
        """
        Ask the language model for a query and normalise its answer.

        Args:
            prompt: Full prompt sent to ``self.model.invoke``.
            base_query: Original query; an answer identical to it is
                rejected because it adds nothing new.
            purpose: Short label used in the log message on failure.

        Returns:
            The stripped model answer, or ``None`` when the call fails, the
            answer is empty, or it equals ``base_query``.
        """
        try:
            response = self.model.invoke(prompt).content.strip()
        except Exception:
            # Best effort: a failed model call must not abort exploration,
            # but it should not vanish silently either. ``Exception`` (not a
            # bare ``except``) lets KeyboardInterrupt/SystemExit propagate.
            logger.exception(f"Model call failed during {purpose}")
            return None

        if not response or response == base_query:
            return None
        return response

    def _synonym_expansion_query(self, base_query: str) -> Optional[str]:
        """
        Generate query with synonym expansion.

        Returns:
            A model-generated rewording of ``base_query``, or ``None`` if
            the model call fails or yields nothing new.
        """
        prompt = f"""
Generate a search query that means the same as "{base_query}" but uses different words.
Focus on synonyms and alternative terminology.

Query:
"""

        return self._ask_model_for_query(
            prompt, base_query, "synonym expansion"
        )

    def _category_exploration_query(
        self, base_query: str, found_candidates: List[Candidate]
    ) -> Optional[str]:
        """
        Generate query exploring categories of found candidates.

        Returns:
            ``"categories of <base_query>"`` when no candidates exist yet,
            otherwise a "similar to ..." query naming the first three
            candidates.
        """
        if not found_candidates:
            return f"categories of {base_query}"

        sample_names = [c.name for c in found_candidates[:3]]
        return f"similar to {', '.join(sample_names)}"

    def _related_terms_query(
        self, base_query: str, found_candidates: List[Candidate]
    ) -> Optional[str]:
        """
        Generate query using related terms.

        ``found_candidates`` is accepted for signature parity with the other
        candidate-aware strategies but is not used here.

        Returns:
            A model-suggested related search term, or ``None`` if the model
            call fails or yields nothing new.
        """
        prompt = f"""
Given the search topic "{base_query}", suggest a related search term that would find similar but different examples.

Related search term:
"""

        return self._ask_model_for_query(
            prompt, base_query, "related terms generation"
        )

    def _constraint_focused_query(
        self, base_query: str, constraints: List[Constraint]
    ) -> Optional[str]:
        """
        Generate query focused on a specific constraint.

        Returns:
            ``base_query`` followed by the first constraint's ``value``, or
            ``None`` when ``constraints`` is empty.
        """
        if not constraints:
            return None

        # Simple selection: always the first constraint. Usage of individual
        # constraints is not tracked here.
        constraint = constraints[0]
        return f"{base_query} {constraint.value}"

    def _update_strategy_stats(
        self, strategy: str, candidates: List[Candidate]
    ) -> None:
        """
        Update performance statistics for a strategy.

        Args:
            strategy: Name of the strategy that was just used.
            candidates: Candidates the search produced.
        """
        stats = self.strategy_stats[strategy]
        stats["attempts"] += 1
        stats["candidates_found"] += len(candidates)

        # Simple quality assessment (could be more sophisticated):
        # basic quality based on quantity.
        stats["quality_sum"] += len(candidates) * 0.1

    def _adapt_strategy(self) -> None:
        """Adapt current strategy based on performance.

        Switches ``self.current_strategy`` to the top-ranked strategy when
        that differs from the current one, and logs the change.
        """
        best_strategies = self._get_top_strategies(1)
        if best_strategies and best_strategies[0] != self.current_strategy:
            old_strategy = self.current_strategy
            self.current_strategy = best_strategies[0]
            logger.info(
                f"Adapted strategy from '{old_strategy}' to '{self.current_strategy}'"
            )

    def _try_next_strategy(self) -> bool:
        """
        Try the next available strategy.

        Advances ``self.current_strategy`` to the entry following it in
        ``self.initial_strategies``. A current strategy that is not in the
        list is treated as if it were the first entry.

        Returns:
            ``True`` if the strategy was advanced, ``False`` when advancing
            would wrap around to the first entry (all strategies tried).
        """
        current_index = (
            self.initial_strategies.index(self.current_strategy)
            if self.current_strategy in self.initial_strategies
            else 0
        )
        next_index = (current_index + 1) % len(self.initial_strategies)

        if next_index == 0:  # We've tried all strategies
            return False

        self.current_strategy = self.initial_strategies[next_index]
        return True