Coverage for src / local_deep_research / advanced_search_system / candidate_exploration / diversity_explorer.py: 8%

186 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Diversity-focused candidate explorer implementation. 

3 

4This explorer prioritizes finding diverse candidates across different 

5categories, types, and characteristics. 

6""" 

7 

8import time 

9from collections import defaultdict 

10from typing import List, Optional 

11 

12from loguru import logger 

13 

14from ..candidates.base_candidate import Candidate 

15from ..constraints.base_constraint import Constraint 

16from .base_explorer import ( 

17 BaseCandidateExplorer, 

18 ExplorationResult, 

19 ExplorationStrategy, 

20) 

21 

22 

23class DiversityExplorer(BaseCandidateExplorer): 

24 """ 

25 Diversity-focused candidate explorer. 

26 

27 This explorer: 

28 1. Seeks candidates from different categories/types 

29 2. Avoids clustering around similar candidates 

30 3. Uses diversity metrics to guide exploration 

31 4. Balances breadth over depth 

32 """ 

33 

def __init__(
    self,
    *args,
    diversity_threshold: float = 0.7,
    category_limit: int = 10,
    similarity_threshold: float = 0.8,
    **kwargs,
):
    """
    Set up the explorer and its diversity bookkeeping.

    Positional arguments and any remaining keyword arguments are passed
    through unchanged to the parent explorer's initializer.

    Args:
        diversity_threshold: Diversity score that lets ``explore`` stop
            early once it has collected at least 10 candidates.
        category_limit: Upper bound on candidates kept per category.
        similarity_threshold: Word-overlap similarity above which a new
            candidate name is treated as a near-duplicate.
    """
    super().__init__(*args, **kwargs)

    # Bookkeeping: categories seen so far and how many candidates each has.
    self.diversity_categories = set()
    self.category_counts = defaultdict(int)

    # Tunables read by the filtering and stopping heuristics.
    self.similarity_threshold = similarity_threshold
    self.category_limit = category_limit
    self.diversity_threshold = diversity_threshold

58 

def explore(
    self,
    initial_query: str,
    constraints: Optional[List[Constraint]] = None,
    entity_type: Optional[str] = None,
) -> ExplorationResult:
    """
    Explore candidates using a diversity-focused strategy.

    Runs one broad search for ``initial_query`` and then keeps issuing
    follow-up searches aimed at missing or underrepresented categories.
    The loop ends when the diversity threshold is met with at least 10
    candidates, when no unexplored follow-up query is left, or when
    ``_should_continue_exploration`` says to stop.

    Args:
        initial_query: Query used for the first, broad search.
        constraints: Accepted for interface compatibility; this strategy
            does not read it.
        entity_type: Optional entity type forwarded to candidate
            extraction and used, when set, as the base of follow-up
            queries instead of ``initial_query``.

    Returns:
        An ExplorationResult with the top ``max_candidates`` ranked
        candidates, the number of searches run, the exploration log and
        diversity metadata (final score, category count/distribution).
    """
    start_time = time.time()
    logger.info(
        f"Starting diversity-focused exploration for: {initial_query}"
    )

    all_candidates = []
    exploration_paths = []
    total_searched = 0

    # Initial broad search
    initial_results = self._execute_search(initial_query)
    initial_candidates = self._extract_candidates_from_results(
        initial_results, entity_type
    )
    all_candidates.extend(initial_candidates)
    total_searched += 1
    exploration_paths.append(
        f"Initial search: {initial_query} -> {len(initial_candidates)} candidates"
    )

    # Tag the first batch so diversity can be measured right away.
    self._categorize_candidates(initial_candidates)

    # Generate diverse exploration paths
    while self._should_continue_exploration(
        start_time, len(all_candidates)
    ):
        diversity_score = self._calculate_diversity_score(all_candidates)

        if (
            diversity_score >= self.diversity_threshold
            and len(all_candidates) >= 10
        ):
            logger.info(f"Diversity threshold met ({diversity_score:.2f})")
            break

        underrepresented_categories = (
            self._find_underrepresented_categories()
        )

        if not underrepresented_categories:
            # No lagging category: probe categories not seen at all.
            new_queries = self._generate_diversity_queries(
                initial_query, all_candidates, entity_type
            )
        else:
            # Focus on the categories that are lagging behind.
            new_queries = self._generate_category_queries(
                underrepresented_categories, initial_query, entity_type
            )

        # Drop already-issued queries BEFORE taking the batch of three.
        # Query generation is deterministic for unchanged state, so if
        # every query in the batch had been explored the old loop would
        # make no progress and spin until the time limit ran out.
        pending_queries = [
            query
            for query in new_queries
            if query.lower() not in self.explored_queries
        ]
        if not pending_queries:
            logger.info(
                "No unexplored diversity queries remain; stopping exploration"
            )
            break

        # Execute diverse searches, at most three per round.
        for query in pending_queries[:3]:
            # Re-check inside the batch: presumably _execute_search
            # records each query in explored_queries (verify in the base
            # class), which would catch case-insensitive repeats.
            if query.lower() in self.explored_queries:
                continue

            results = self._execute_search(query)
            candidates = self._extract_candidates_from_results(
                results, entity_type
            )

            # Keep only candidates that add diversity.
            diverse_candidates = self._filter_for_diversity(
                candidates, all_candidates
            )

            all_candidates.extend(diverse_candidates)
            total_searched += 1

            # Update category tallies with the accepted candidates.
            self._categorize_candidates(diverse_candidates)

            exploration_paths.append(
                f"Diversity search: {query} -> {len(diverse_candidates)} diverse candidates"
            )

            if not self._should_continue_exploration(
                start_time, len(all_candidates)
            ):
                break

    # Final diversity filtering and ranking
    diverse_candidates = self._final_diversity_selection(all_candidates)
    ranked_candidates = self._rank_by_diversity(
        diverse_candidates, initial_query
    )
    final_candidates = ranked_candidates[: self.max_candidates]

    elapsed_time = time.time() - start_time
    final_diversity = self._calculate_diversity_score(final_candidates)

    logger.info(
        f"Diversity exploration completed: {len(final_candidates)} candidates, diversity: {final_diversity:.2f}"
    )

    return ExplorationResult(
        candidates=final_candidates,
        total_searched=total_searched,
        unique_candidates=len(diverse_candidates),
        exploration_paths=exploration_paths,
        metadata={
            "strategy": "diversity_focused",
            "final_diversity_score": final_diversity,
            "categories_found": len(self.diversity_categories),
            "category_distribution": dict(self.category_counts),
            "entity_type": entity_type,
        },
        elapsed_time=elapsed_time,
        strategy_used=ExplorationStrategy.DIVERSITY_FOCUSED,
    )

181 

def generate_exploration_queries(
    self,
    base_query: str,
    found_candidates: List[Candidate],
    constraints: Optional[List[Constraint]] = None,
) -> List[str]:
    """
    Build follow-up queries aimed at categories not yet covered.

    Delegates to ``_generate_diversity_queries`` with no entity type;
    ``constraints`` is accepted but not consulted.
    """
    diversity_queries = self._generate_diversity_queries(
        base_query, found_candidates
    )
    return diversity_queries

190 

def _categorize_candidates(self, candidates: List[Candidate]):
    """
    Tag each candidate with a diversity category and update the tallies.

    The category returned by ``_determine_category`` is counted in
    ``category_counts``, recorded in ``diversity_categories`` and written
    to ``candidate.metadata["diversity_category"]``.
    """
    for item in candidates:
        label = self._determine_category(item)

        self.diversity_categories.add(label)
        self.category_counts[label] += 1

        # Missing or empty metadata gets a fresh dict before tagging.
        if not item.metadata:
            item.metadata = {}
        item.metadata["diversity_category"] = label

202 

203 def _determine_category(self, candidate: Candidate) -> str: 

204 """Determine the category of a candidate.""" 

205 name = candidate.name.lower() 

206 

207 # Simple categorization based on common patterns 

208 if any(word in name for word in ["mountain", "peak", "summit", "hill"]): 

209 return "mountain" 

210 elif any( 

211 word in name 

212 for word in ["lake", "river", "creek", "stream", "pond"] 

213 ): 

214 return "water" 

215 elif any( 

216 word in name for word in ["park", "forest", "reserve", "wilderness"] 

217 ): 

218 return "park" 

219 elif any(word in name for word in ["trail", "path", "route", "way"]): 

220 return "trail" 

221 elif any(word in name for word in ["canyon", "gorge", "valley", "gap"]): 

222 return "canyon" 

223 elif any( 

224 word in name for word in ["cliff", "bluff", "overlook", "viewpoint"] 

225 ): 

226 return "viewpoint" 

227 elif any( 

228 word in name for word in ["island", "beach", "coast", "shore"] 

229 ): 

230 return "coastal" 

231 elif any(word in name for word in ["city", "town", "county", "state"]): 

232 return "place" 

233 else: 

234 return "other" 

235 

236 def _calculate_diversity_score(self, candidates: List[Candidate]) -> float: 

237 """Calculate diversity score for a set of candidates.""" 

238 if not candidates: 

239 return 0.0 

240 

241 # Count categories 

242 category_counts = defaultdict(int) 

243 for candidate in candidates: 

244 category = candidate.metadata.get("diversity_category", "other") 

245 category_counts[category] += 1 

246 

247 # Calculate diversity using Shannon entropy 

248 total = len(candidates) 

249 entropy = 0.0 

250 

251 for count in category_counts.values(): 

252 if count > 0: 

253 p = count / total 

254 entropy -= p * (p.bit_length() - 1) if p > 0 else 0 

255 

256 # Normalize to 0-1 scale 

257 max_entropy = ( 

258 (len(category_counts).bit_length() - 1) 

259 if len(category_counts) > 1 

260 else 1 

261 ) 

262 return entropy / max_entropy if max_entropy > 0 else 0.0 

263 

264 def _find_underrepresented_categories(self) -> List[str]: 

265 """Find categories that are underrepresented.""" 

266 if not self.category_counts: 

267 return [] 

268 

269 avg_count = sum(self.category_counts.values()) / len( 

270 self.category_counts 

271 ) 

272 threshold = avg_count * 0.5 # Categories with less than 50% of average 

273 

274 underrepresented = [ 

275 category 

276 for category, count in self.category_counts.items() 

277 if count < threshold and count < self.category_limit 

278 ] 

279 

280 return underrepresented 

281 

282 def _generate_diversity_queries( 

283 self, 

284 base_query: str, 

285 found_candidates: List[Candidate], 

286 entity_type: Optional[str] = None, 

287 ) -> List[str]: 

288 """Generate queries to increase diversity.""" 

289 queries = [] 

290 

291 # Analyze existing categories 

292 existing_categories = set() 

293 for candidate in found_candidates: 

294 if ( 

295 candidate.metadata 

296 and "diversity_category" in candidate.metadata 

297 ): 

298 existing_categories.add( 

299 candidate.metadata["diversity_category"] 

300 ) 

301 

302 # Generate queries for missing categories 

303 all_categories = [ 

304 "mountain", 

305 "water", 

306 "park", 

307 "trail", 

308 "canyon", 

309 "viewpoint", 

310 "coastal", 

311 "place", 

312 ] 

313 missing_categories = [ 

314 cat for cat in all_categories if cat not in existing_categories 

315 ] 

316 

317 base = entity_type or base_query 

318 

319 for category in missing_categories[:3]: # Limit to 3 new categories 

320 if category == "mountain": 

321 queries.append(f"{base} mountain peak summit") 

322 elif category == "water": 

323 queries.append(f"{base} lake river creek") 

324 elif category == "park": 

325 queries.append(f"{base} park forest reserve") 

326 elif category == "trail": 

327 queries.append(f"{base} trail path route") 

328 elif category == "canyon": 

329 queries.append(f"{base} canyon gorge valley") 

330 elif category == "viewpoint": 

331 queries.append(f"{base} overlook viewpoint cliff") 

332 elif category == "coastal": 

333 queries.append(f"{base} beach coast island") 

334 elif category == "place": 

335 queries.append(f"{base} location place area") 

336 

337 return queries 

338 

339 def _generate_category_queries( 

340 self, categories: List[str], base_query: str, entity_type: Optional[str] 

341 ) -> List[str]: 

342 """Generate queries for specific underrepresented categories.""" 

343 queries = [] 

344 base = entity_type or base_query 

345 

346 for category in categories[:3]: 

347 queries.append(f"{base} {category}") 

348 queries.append(f"{category} examples {base}") 

349 

350 return queries 

351 

def _filter_for_diversity(
    self,
    new_candidates: List[Candidate],
    existing_candidates: List[Candidate],
) -> List[Candidate]:
    """
    Filter new candidates to maintain diversity.

    A candidate is dropped when its category has already reached
    ``category_limit`` (counting candidates accepted earlier in this same
    call), or when its name is too similar to a recent existing candidate
    or to a candidate already accepted from this batch.

    Args:
        new_candidates: Freshly extracted candidates to screen.
        existing_candidates: Candidates collected so far; not modified.

    Returns:
        The accepted candidates, in their original order.
    """
    filtered = []

    # This method does not update ``category_counts`` for the candidates
    # it accepts, so acceptances within the batch are tallied here.
    # Without this a single batch could exceed ``category_limit``.
    accepted_in_batch = defaultdict(int)

    for candidate in new_candidates:
        category = self._determine_category(candidate)

        # Indexing the defaultdict (as the original did) also registers
        # a not-yet-seen category with a zero count.
        already_counted = self.category_counts[category]
        if (
            already_counted + accepted_in_batch[category]
            >= self.category_limit
        ):
            continue

        # Check for similarity with existing candidates
        if not self._is_sufficiently_different(
            candidate, existing_candidates
        ):
            continue

        # Also compare against this batch's acceptances, so duplicates
        # within one result set do not all slip through.
        if not self._is_sufficiently_different(candidate, filtered):
            continue

        filtered.append(candidate)
        accepted_in_batch[category] += 1

    return filtered

376 

377 def _is_sufficiently_different( 

378 self, candidate: Candidate, existing_candidates: List[Candidate] 

379 ) -> bool: 

380 """Check if candidate is sufficiently different from existing ones.""" 

381 candidate_words = set(candidate.name.lower().split()) 

382 

383 for existing in existing_candidates[ 

384 -10: 

385 ]: # Check against recent candidates 

386 existing_words = set(existing.name.lower().split()) 

387 

388 # Calculate Jaccard similarity 

389 intersection = len(candidate_words.intersection(existing_words)) 

390 union = len(candidate_words.union(existing_words)) 

391 

392 if union > 0: 

393 similarity = intersection / union 

394 if similarity > self.similarity_threshold: 

395 return False 

396 

397 return True 

398 

399 def _final_diversity_selection( 

400 self, candidates: List[Candidate] 

401 ) -> List[Candidate]: 

402 """Final selection to maximize diversity.""" 

403 if not candidates: 

404 return candidates 

405 

406 # Group by category 

407 category_groups = defaultdict(list) 

408 for candidate in candidates: 

409 category = candidate.metadata.get("diversity_category", "other") 

410 category_groups[category].append(candidate) 

411 

412 # Select balanced representation from each category 

413 selected = [] 

414 max_per_category = max(1, self.max_candidates // len(category_groups)) 

415 

416 for category, group in category_groups.items(): 

417 # Sort by relevance score if available 

418 sorted_group = sorted( 

419 group, 

420 key=lambda c: getattr(c, "relevance_score", 0.0), 

421 reverse=True, 

422 ) 

423 selected.extend(sorted_group[:max_per_category]) 

424 

425 return selected 

426 

def _rank_by_diversity(
    self, candidates: List[Candidate], base_query: str
) -> List[Candidate]:
    """
    Rank candidates considering both relevance and diversity contribution.

    Candidates are first ordered by ``_rank_candidates_by_relevance``.
    Each one then gets ``final_score = relevance_score + 0.2 * boost``,
    where the boost grows from 0 to 1 the further its category's count
    lies below the average count per tracked category.

    Args:
        candidates: Candidates to rank; each gets a ``final_score``
            attribute assigned.
        base_query: Query passed on to the relevance ranking.

    Returns:
        The candidates sorted by ``final_score``, highest first.
    """
    # First rank by relevance
    relevance_ranked = self._rank_candidates_by_relevance(
        candidates, base_query
    )

    # The average is the same for every candidate, so compute it once.
    # Falls back to 1 when nothing has been categorized, as the
    # original's (previously unreachable) ``else 1`` branch intended.
    counts = self.category_counts
    avg_count = sum(counts.values()) / len(counts) if counts else 1

    # Then adjust based on diversity contribution
    for candidate in relevance_ranked:
        metadata = candidate.metadata or {}
        category = metadata.get("diversity_category", "other")

        # .get() reads the tally without inserting a zero entry into the
        # defaultdict; indexing would change the tracked distribution
        # while ranking.
        category_count = counts.get(category, 0)

        # Boost score for underrepresented categories; a zero average
        # (all tallies zero) gives no boost instead of dividing by zero.
        if avg_count > 0:
            diversity_boost = max(
                0, (avg_count - category_count) / avg_count
            )
        else:
            diversity_boost = 0.0

        relevance_score = getattr(candidate, "relevance_score", 0.0)
        candidate.final_score = relevance_score + (diversity_boost * 0.2)

    return sorted(
        relevance_ranked,
        key=lambda c: getattr(c, "final_score", 0.0),
        reverse=True,
    )