Coverage for src / local_deep_research / advanced_search_system / candidate_exploration / diversity_explorer.py: 92%

185 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2Diversity-focused candidate explorer implementation. 

3 

4This explorer prioritizes finding diverse candidates across different 

5categories, types, and characteristics. 

6""" 

7 

import math
import time
from collections import defaultdict
from typing import List, Optional

from loguru import logger

from ..candidates.base_candidate import Candidate
from ..constraints.base_constraint import Constraint
from .base_explorer import (
    BaseCandidateExplorer,
    ExplorationResult,
    ExplorationStrategy,
)

21 

22 

23class DiversityExplorer(BaseCandidateExplorer): 

24 """ 

25 Diversity-focused candidate explorer. 

26 

27 This explorer: 

28 1. Seeks candidates from different categories/types 

29 2. Avoids clustering around similar candidates 

30 3. Uses diversity metrics to guide exploration 

31 4. Balances breadth over depth 

32 """ 

33 

34 def __init__( 

35 self, 

36 *args, 

37 diversity_threshold: float = 0.7, # Minimum diversity score 

38 category_limit: int = 10, # Max candidates per category 

39 similarity_threshold: float = 0.8, # Similarity threshold for deduplication 

40 **kwargs, 

41 ): 

42 """ 

43 Initialize diversity explorer. 

44 

45 Args: 

46 diversity_threshold: Minimum diversity score to maintain 

47 category_limit: Maximum candidates per category 

48 similarity_threshold: Threshold for considering candidates similar 

49 """ 

50 super().__init__(*args, **kwargs) 

51 self.diversity_threshold = diversity_threshold 

52 self.category_limit = category_limit 

53 self.similarity_threshold = similarity_threshold 

54 

55 # Track diversity 

56 self.category_counts = defaultdict(int) 

57 self.diversity_categories = set() 

58 

59 def explore( 

60 self, 

61 initial_query: str, 

62 constraints: Optional[List[Constraint]] = None, 

63 entity_type: Optional[str] = None, 

64 ) -> ExplorationResult: 

65 """Explore candidates using diversity-focused strategy.""" 

66 start_time = time.time() 

67 logger.info( 

68 f"Starting diversity-focused exploration for: {initial_query}" 

69 ) 

70 

71 all_candidates = [] 

72 exploration_paths = [] 

73 total_searched = 0 

74 

75 # Initial broad search 

76 initial_results = self._execute_search(initial_query) 

77 initial_candidates = self._extract_candidates_from_results( 

78 initial_results, entity_type 

79 ) 

80 all_candidates.extend(initial_candidates) 

81 total_searched += 1 

82 exploration_paths.append( 

83 f"Initial search: {initial_query} -> {len(initial_candidates)} candidates" 

84 ) 

85 

86 # Categorize initial candidates 

87 self._categorize_candidates(initial_candidates) 

88 

89 # Generate diverse exploration paths 

90 while self._should_continue_exploration( 

91 start_time, len(all_candidates) 

92 ): 

93 # Calculate current diversity 

94 diversity_score = self._calculate_diversity_score(all_candidates) 

95 

96 if ( 

97 diversity_score >= self.diversity_threshold 

98 and len(all_candidates) >= 10 

99 ): 

100 logger.info(f"Diversity threshold met ({diversity_score:.2f})") 

101 break 

102 

103 # Find underrepresented categories 

104 underrepresented_categories = ( 

105 self._find_underrepresented_categories() 

106 ) 

107 

108 if not underrepresented_categories: 

109 # Generate new category exploration 

110 new_queries = self._generate_diversity_queries( 

111 initial_query, all_candidates, entity_type 

112 ) 

113 else: 

114 # Focus on underrepresented categories 

115 new_queries = self._generate_category_queries( 

116 underrepresented_categories, initial_query, entity_type 

117 ) 

118 

119 if not new_queries: 

120 break 

121 

122 # Execute diverse searches 

123 for query in new_queries[:3]: # Limit concurrent searches 

124 if query.lower() in self.explored_queries: 

125 continue 

126 

127 results = self._execute_search(query) 

128 candidates = self._extract_candidates_from_results( 

129 results, entity_type 

130 ) 

131 

132 # Filter for diversity 

133 diverse_candidates = self._filter_for_diversity( 

134 candidates, all_candidates 

135 ) 

136 

137 all_candidates.extend(diverse_candidates) 

138 total_searched += 1 

139 

140 # Update categories 

141 self._categorize_candidates(diverse_candidates) 

142 

143 exploration_paths.append( 

144 f"Diversity search: {query} -> {len(diverse_candidates)} diverse candidates" 

145 ) 

146 

147 if not self._should_continue_exploration( 

148 start_time, len(all_candidates) 

149 ): 

150 break 

151 

152 # Final diversity filtering and ranking 

153 diverse_candidates = self._final_diversity_selection(all_candidates) 

154 ranked_candidates = self._rank_by_diversity( 

155 diverse_candidates, initial_query 

156 ) 

157 final_candidates = ranked_candidates[: self.max_candidates] 

158 

159 elapsed_time = time.time() - start_time 

160 final_diversity = self._calculate_diversity_score(final_candidates) 

161 

162 logger.info( 

163 f"Diversity exploration completed: {len(final_candidates)} candidates, diversity: {final_diversity:.2f}" 

164 ) 

165 

166 return ExplorationResult( 

167 candidates=final_candidates, 

168 total_searched=total_searched, 

169 unique_candidates=len(diverse_candidates), 

170 exploration_paths=exploration_paths, 

171 metadata={ 

172 "strategy": "diversity_focused", 

173 "final_diversity_score": final_diversity, 

174 "categories_found": len(self.diversity_categories), 

175 "category_distribution": dict(self.category_counts), 

176 "entity_type": entity_type, 

177 }, 

178 elapsed_time=elapsed_time, 

179 strategy_used=ExplorationStrategy.DIVERSITY_FOCUSED, 

180 ) 

181 

182 def generate_exploration_queries( 

183 self, 

184 base_query: str, 

185 found_candidates: List[Candidate], 

186 constraints: Optional[List[Constraint]] = None, 

187 ) -> List[str]: 

188 """Generate diversity-focused exploration queries.""" 

189 return self._generate_diversity_queries(base_query, found_candidates) 

190 

191 def _categorize_candidates(self, candidates: List[Candidate]): 

192 """Categorize candidates for diversity tracking.""" 

193 for candidate in candidates: 

194 category = self._determine_category(candidate) 

195 self.category_counts[category] += 1 

196 self.diversity_categories.add(category) 

197 

198 # Store category in candidate metadata 

199 if not candidate.metadata: 199 ↛ 201line 199 didn't jump to line 201 because the condition on line 199 was always true

200 candidate.metadata = {} 

201 candidate.metadata["diversity_category"] = category 

202 

203 def _determine_category(self, candidate: Candidate) -> str: 

204 """Determine the category of a candidate.""" 

205 name = candidate.name.lower() 

206 

207 # Simple categorization based on common patterns 

208 if any(word in name for word in ["mountain", "peak", "summit", "hill"]): 

209 return "mountain" 

210 if any( 

211 word in name 

212 for word in ["lake", "river", "creek", "stream", "pond"] 

213 ): 

214 return "water" 

215 if any( 

216 word in name for word in ["park", "forest", "reserve", "wilderness"] 

217 ): 

218 return "park" 

219 if any(word in name for word in ["trail", "path", "route", "way"]): 

220 return "trail" 

221 if any(word in name for word in ["canyon", "gorge", "valley", "gap"]): 

222 return "canyon" 

223 if any( 

224 word in name for word in ["cliff", "bluff", "overlook", "viewpoint"] 

225 ): 

226 return "viewpoint" 

227 if any(word in name for word in ["island", "beach", "coast", "shore"]): 

228 return "coastal" 

229 if any(word in name for word in ["city", "town", "county", "state"]): 

230 return "place" 

231 return "other" 

232 

233 def _calculate_diversity_score(self, candidates: List[Candidate]) -> float: 

234 """Calculate diversity score for a set of candidates.""" 

235 if not candidates: 

236 return 0.0 

237 

238 # Count categories 

239 category_counts = defaultdict(int) 

240 for candidate in candidates: 

241 category = candidate.metadata.get("diversity_category", "other") 

242 category_counts[category] += 1 

243 

244 # Calculate diversity using Shannon entropy 

245 total = len(candidates) 

246 entropy = 0.0 

247 

248 for count in category_counts.values(): 248 ↛ 254line 248 didn't jump to line 254 because the loop on line 248 didn't complete

249 if count > 0: 249 ↛ 248line 249 didn't jump to line 248 because the condition on line 249 was always true

250 p = count / total 

251 entropy -= p * (p.bit_length() - 1) if p > 0 else 0 

252 

253 # Normalize to 0-1 scale 

254 max_entropy = ( 

255 (len(category_counts).bit_length() - 1) 

256 if len(category_counts) > 1 

257 else 1 

258 ) 

259 return entropy / max_entropy if max_entropy > 0 else 0.0 

260 

261 def _find_underrepresented_categories(self) -> List[str]: 

262 """Find categories that are underrepresented.""" 

263 if not self.category_counts: 

264 return [] 

265 

266 avg_count = sum(self.category_counts.values()) / len( 

267 self.category_counts 

268 ) 

269 threshold = avg_count * 0.5 # Categories with less than 50% of average 

270 

271 return [ 

272 category 

273 for category, count in self.category_counts.items() 

274 if count < threshold and count < self.category_limit 

275 ] 

276 

277 def _generate_diversity_queries( 

278 self, 

279 base_query: str, 

280 found_candidates: List[Candidate], 

281 entity_type: Optional[str] = None, 

282 ) -> List[str]: 

283 """Generate queries to increase diversity.""" 

284 queries = [] 

285 

286 # Analyze existing categories 

287 existing_categories = set() 

288 for candidate in found_candidates: 

289 if ( 

290 candidate.metadata 

291 and "diversity_category" in candidate.metadata 

292 ): 

293 existing_categories.add( 

294 candidate.metadata["diversity_category"] 

295 ) 

296 

297 # Generate queries for missing categories 

298 all_categories = [ 

299 "mountain", 

300 "water", 

301 "park", 

302 "trail", 

303 "canyon", 

304 "viewpoint", 

305 "coastal", 

306 "place", 

307 ] 

308 missing_categories = [ 

309 cat for cat in all_categories if cat not in existing_categories 

310 ] 

311 

312 base = entity_type or base_query 

313 

314 for category in missing_categories[:3]: # Limit to 3 new categories 

315 if category == "mountain": 

316 queries.append(f"{base} mountain peak summit") 

317 elif category == "water": 

318 queries.append(f"{base} lake river creek") 

319 elif category == "park": 

320 queries.append(f"{base} park forest reserve") 

321 elif category == "trail": 321 ↛ 323line 321 didn't jump to line 323 because the condition on line 321 was always true

322 queries.append(f"{base} trail path route") 

323 elif category == "canyon": 

324 queries.append(f"{base} canyon gorge valley") 

325 elif category == "viewpoint": 

326 queries.append(f"{base} overlook viewpoint cliff") 

327 elif category == "coastal": 

328 queries.append(f"{base} beach coast island") 

329 elif category == "place": 

330 queries.append(f"{base} location place area") 

331 

332 return queries 

333 

334 def _generate_category_queries( 

335 self, categories: List[str], base_query: str, entity_type: Optional[str] 

336 ) -> List[str]: 

337 """Generate queries for specific underrepresented categories.""" 

338 queries = [] 

339 base = entity_type or base_query 

340 

341 for category in categories[:3]: 

342 queries.append(f"{base} {category}") 

343 queries.append(f"{category} examples {base}") 

344 

345 return queries 

346 

347 def _filter_for_diversity( 

348 self, 

349 new_candidates: List[Candidate], 

350 existing_candidates: List[Candidate], 

351 ) -> List[Candidate]: 

352 """Filter new candidates to maintain diversity.""" 

353 filtered = [] 

354 

355 for candidate in new_candidates: 

356 category = self._determine_category(candidate) 

357 

358 # Check if this category is already well-represented 

359 if self.category_counts[category] >= self.category_limit: 

360 continue 

361 

362 # Check for similarity with existing candidates 

363 if not self._is_sufficiently_different( 

364 candidate, existing_candidates 

365 ): 

366 continue 

367 

368 filtered.append(candidate) 

369 

370 return filtered 

371 

372 def _is_sufficiently_different( 

373 self, candidate: Candidate, existing_candidates: List[Candidate] 

374 ) -> bool: 

375 """Check if candidate is sufficiently different from existing ones.""" 

376 candidate_words = set(candidate.name.lower().split()) 

377 

378 for existing in existing_candidates[ 

379 -10: 

380 ]: # Check against recent candidates 

381 existing_words = set(existing.name.lower().split()) 

382 

383 # Calculate Jaccard similarity 

384 intersection = len(candidate_words.intersection(existing_words)) 

385 union = len(candidate_words.union(existing_words)) 

386 

387 if union > 0: 387 ↛ 378line 387 didn't jump to line 378 because the condition on line 387 was always true

388 similarity = intersection / union 

389 if similarity > self.similarity_threshold: 

390 return False 

391 

392 return True 

393 

394 def _final_diversity_selection( 

395 self, candidates: List[Candidate] 

396 ) -> List[Candidate]: 

397 """Final selection to maximize diversity.""" 

398 if not candidates: 

399 return candidates 

400 

401 # Group by category 

402 category_groups = defaultdict(list) 

403 for candidate in candidates: 

404 category = candidate.metadata.get("diversity_category", "other") 

405 category_groups[category].append(candidate) 

406 

407 # Select balanced representation from each category 

408 selected = [] 

409 max_per_category = max(1, self.max_candidates // len(category_groups)) 

410 

411 for category, group in category_groups.items(): 

412 # Sort by relevance score if available 

413 sorted_group = sorted( 

414 group, 

415 key=lambda c: getattr(c, "relevance_score", 0.0), 

416 reverse=True, 

417 ) 

418 selected.extend(sorted_group[:max_per_category]) 

419 

420 return selected 

421 

422 def _rank_by_diversity( 

423 self, candidates: List[Candidate], base_query: str 

424 ) -> List[Candidate]: 

425 """Rank candidates considering both relevance and diversity contribution.""" 

426 # First rank by relevance 

427 relevance_ranked = self._rank_candidates_by_relevance( 

428 candidates, base_query 

429 ) 

430 

431 # Then adjust based on diversity contribution 

432 for i, candidate in enumerate(relevance_ranked): 

433 category = candidate.metadata.get("diversity_category", "other") 

434 

435 # Boost score for underrepresented categories 

436 category_count = self.category_counts[category] 

437 avg_count = ( 

438 sum(self.category_counts.values()) / len(self.category_counts) 

439 if self.category_counts 

440 else 1 

441 ) 

442 

443 diversity_boost = max(0, (avg_count - category_count) / avg_count) 

444 

445 relevance_score = getattr(candidate, "relevance_score", 0.0) 

446 candidate.final_score = relevance_score + (diversity_boost * 0.2) 

447 

448 return sorted( 

449 relevance_ranked, 

450 key=lambda c: getattr(c, "final_score", 0.0), 

451 reverse=True, 

452 )