Coverage for src / local_deep_research / advanced_search_system / candidate_exploration / diversity_explorer.py: 8%
186 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""
2Diversity-focused candidate explorer implementation.
4This explorer prioritizes finding diverse candidates across different
5categories, types, and characteristics.
6"""
8import time
9from collections import defaultdict
10from typing import List, Optional
12from loguru import logger
14from ..candidates.base_candidate import Candidate
15from ..constraints.base_constraint import Constraint
16from .base_explorer import (
17 BaseCandidateExplorer,
18 ExplorationResult,
19 ExplorationStrategy,
20)
class DiversityExplorer(BaseCandidateExplorer):
    """
    Diversity-focused candidate explorer.

    This explorer:
    1. Seeks candidates from different categories/types
    2. Avoids clustering around similar candidates
    3. Uses diversity metrics to guide exploration
    4. Balances breadth over depth

    Note:
        ``category_counts`` and ``diversity_categories`` are instance state.
        They are updated by every categorization pass and are not reset
        between ``explore`` calls.
    """

    # Ordered (category, keywords) rules. The first rule with a keyword that
    # occurs as a *substring* of the lower-cased candidate name wins.
    _CATEGORY_KEYWORDS = (
        ("mountain", ("mountain", "peak", "summit", "hill")),
        ("water", ("lake", "river", "creek", "stream", "pond")),
        ("park", ("park", "forest", "reserve", "wilderness")),
        ("trail", ("trail", "path", "route", "way")),
        ("canyon", ("canyon", "gorge", "valley", "gap")),
        ("viewpoint", ("cliff", "bluff", "overlook", "viewpoint")),
        ("coastal", ("island", "beach", "coast", "shore")),
        ("place", ("city", "town", "county", "state")),
    )

    # Search terms appended to the base query when probing a category that
    # has no candidates yet. Key order is the order categories are tried in.
    _CATEGORY_QUERY_TERMS = {
        "mountain": "mountain peak summit",
        "water": "lake river creek",
        "park": "park forest reserve",
        "trail": "trail path route",
        "canyon": "canyon gorge valley",
        "viewpoint": "overlook viewpoint cliff",
        "coastal": "beach coast island",
        "place": "location place area",
    }

    # Category used when no keyword rule matches or no category was recorded.
    _DEFAULT_CATEGORY = "other"

    def __init__(
        self,
        *args,
        diversity_threshold: float = 0.7,  # Minimum diversity score
        category_limit: int = 10,  # Max candidates per category
        similarity_threshold: float = 0.8,  # Similarity threshold for deduplication
        **kwargs,
    ) -> None:
        """
        Initialize diversity explorer.

        Positional and remaining keyword arguments are forwarded unchanged to
        the base explorer's ``__init__``.

        Args:
            diversity_threshold: Minimum diversity score to maintain
            category_limit: Maximum candidates per category
            similarity_threshold: Threshold for considering candidates similar
        """
        super().__init__(*args, **kwargs)
        self.diversity_threshold = diversity_threshold
        self.category_limit = category_limit
        self.similarity_threshold = similarity_threshold

        # Track diversity
        self.category_counts = defaultdict(int)
        self.diversity_categories = set()

    def explore(
        self,
        initial_query: str,
        constraints: Optional[List[Constraint]] = None,
        entity_type: Optional[str] = None,
    ) -> ExplorationResult:
        """
        Explore candidates using diversity-focused strategy.

        Runs one broad search for ``initial_query`` and then repeatedly issues
        follow-up searches aimed at underrepresented or missing categories.
        The loop ends when the inherited stop condition fires, the diversity
        threshold is met with at least 10 candidates, no follow-up queries can
        be generated, or a round performs no new search.

        Searching, candidate extraction, the stop condition and relevance
        ranking come from the base explorer (not shown in this file).

        Args:
            initial_query: Query used for the initial broad search.
            constraints: Accepted for interface compatibility; not used by
                this implementation.
            entity_type: Optional entity type passed to candidate extraction
                and preferred over ``initial_query`` when building follow-ups.

        Returns:
            ExplorationResult with the ranked candidates (capped at
            ``self.max_candidates``), search bookkeeping and diversity
            metadata.
        """
        start_time = time.time()
        logger.info(
            f"Starting diversity-focused exploration for: {initial_query}"
        )

        all_candidates = []
        exploration_paths = []
        total_searched = 0

        # Initial broad search
        initial_results = self._execute_search(initial_query)
        initial_candidates = self._extract_candidates_from_results(
            initial_results, entity_type
        )
        all_candidates.extend(initial_candidates)
        total_searched += 1
        exploration_paths.append(
            f"Initial search: {initial_query} -> {len(initial_candidates)} candidates"
        )

        # Categorize initial candidates
        self._categorize_candidates(initial_candidates)

        # Generate diverse exploration paths
        while self._should_continue_exploration(
            start_time, len(all_candidates)
        ):
            # Stop early once the pool is diverse enough and reasonably large.
            diversity_score = self._calculate_diversity_score(all_candidates)
            if (
                diversity_score >= self.diversity_threshold
                and len(all_candidates) >= 10
            ):
                logger.info(f"Diversity threshold met ({diversity_score:.2f})")
                break

            underrepresented_categories = (
                self._find_underrepresented_categories()
            )
            if underrepresented_categories:
                # Focus on underrepresented categories
                new_queries = self._generate_category_queries(
                    underrepresented_categories, initial_query, entity_type
                )
            else:
                # Probe categories that have no candidates yet
                new_queries = self._generate_diversity_queries(
                    initial_query, all_candidates, entity_type
                )

            if not new_queries:
                break

            searches_this_round = 0
            for query in new_queries[:3]:  # Limit searches per round
                if query.lower() in self.explored_queries:
                    continue

                results = self._execute_search(query)
                candidates = self._extract_candidates_from_results(
                    results, entity_type
                )

                # Filter for diversity
                diverse_candidates = self._filter_for_diversity(
                    candidates, all_candidates
                )

                all_candidates.extend(diverse_candidates)
                total_searched += 1
                searches_this_round += 1

                # Update categories
                self._categorize_candidates(diverse_candidates)

                exploration_paths.append(
                    f"Diversity search: {query} -> {len(diverse_candidates)} diverse candidates"
                )

                if not self._should_continue_exploration(
                    start_time, len(all_candidates)
                ):
                    break

            if searches_this_round == 0:
                # Every generated query was already explored, so neither the
                # candidate pool nor the category counts changed. The next
                # round would produce the same queries again; stop instead of
                # spinning until the stop condition triggers.
                break

        # Final diversity filtering and ranking
        selected_candidates = self._final_diversity_selection(all_candidates)
        ranked_candidates = self._rank_by_diversity(
            selected_candidates, initial_query
        )
        final_candidates = ranked_candidates[: self.max_candidates]

        elapsed_time = time.time() - start_time
        final_diversity = self._calculate_diversity_score(final_candidates)

        logger.info(
            f"Diversity exploration completed: {len(final_candidates)} candidates, diversity: {final_diversity:.2f}"
        )

        return ExplorationResult(
            candidates=final_candidates,
            total_searched=total_searched,
            unique_candidates=len(selected_candidates),
            exploration_paths=exploration_paths,
            metadata={
                "strategy": "diversity_focused",
                "final_diversity_score": final_diversity,
                "categories_found": len(self.diversity_categories),
                "category_distribution": dict(self.category_counts),
                "entity_type": entity_type,
            },
            elapsed_time=elapsed_time,
            strategy_used=ExplorationStrategy.DIVERSITY_FOCUSED,
        )

    def generate_exploration_queries(
        self,
        base_query: str,
        found_candidates: List[Candidate],
        constraints: Optional[List[Constraint]] = None,
    ) -> List[str]:
        """
        Generate diversity-focused exploration queries.

        Delegates to ``_generate_diversity_queries`` without an entity type;
        ``constraints`` is accepted but not used.
        """
        return self._generate_diversity_queries(base_query, found_candidates)

    def _categorize_candidates(self, candidates: List[Candidate]) -> None:
        """
        Categorize candidates for diversity tracking.

        Increments ``category_counts``, records the category in
        ``diversity_categories`` and stores it on each candidate under
        ``metadata["diversity_category"]``.
        """
        for candidate in candidates:
            category = self._determine_category(candidate)
            self.category_counts[category] += 1
            self.diversity_categories.add(category)

            # Store category in candidate metadata
            if not candidate.metadata:
                candidate.metadata = {}
            candidate.metadata["diversity_category"] = category

    def _determine_category(self, candidate: Candidate) -> str:
        """
        Determine the category of a candidate from its name.

        Matching is a case-insensitive substring test against
        ``_CATEGORY_KEYWORDS``; the first matching rule wins and ``"other"``
        is returned when nothing matches.
        """
        name = candidate.name.lower()
        for category, keywords in self._CATEGORY_KEYWORDS:
            if any(keyword in name for keyword in keywords):
                return category
        return self._DEFAULT_CATEGORY

    def _calculate_diversity_score(self, candidates: List[Candidate]) -> float:
        """
        Calculate a 0-1 diversity score for a set of candidates.

        The score is the Shannon entropy (base 2) of the candidates'
        ``diversity_category`` distribution divided by the maximum entropy for
        the number of categories present. Candidates without a recorded
        category count as ``"other"``.

        Returns:
            0.0 for an empty list or when only one category is present,
            otherwise a value in (0, 1], where 1.0 is a uniform distribution.
        """
        # Local import keeps this method self-contained; the module does not
        # import ``math`` at the top level.
        import math

        if not candidates:
            return 0.0

        # Count categories
        category_counts = defaultdict(int)
        for candidate in candidates:
            metadata = getattr(candidate, "metadata", None) or {}
            category = metadata.get(
                "diversity_category", self._DEFAULT_CATEGORY
            )
            category_counts[category] += 1

        category_total = len(category_counts)
        if category_total < 2:
            # A single category carries no diversity (entropy is zero).
            return 0.0

        total = len(candidates)
        entropy = 0.0
        for count in category_counts.values():
            p = count / total
            entropy -= p * math.log2(p)

        # Normalize by the entropy of a uniform distribution; clamp to absorb
        # floating-point rounding just above 1.0.
        return min(1.0, entropy / math.log2(category_total))

    def _find_underrepresented_categories(self) -> List[str]:
        """
        Find categories that are underrepresented.

        A category qualifies when its count is below half of the average
        count across known categories and below ``category_limit``.
        """
        if not self.category_counts:
            return []

        avg_count = sum(self.category_counts.values()) / len(
            self.category_counts
        )
        threshold = avg_count * 0.5  # Categories with less than 50% of average

        return [
            category
            for category, count in self.category_counts.items()
            if count < threshold and count < self.category_limit
        ]

    def _generate_diversity_queries(
        self,
        base_query: str,
        found_candidates: List[Candidate],
        entity_type: Optional[str] = None,
    ) -> List[str]:
        """
        Generate queries to increase diversity.

        Builds one query for each of the first three known categories that no
        candidate in ``found_candidates`` is tagged with. ``entity_type`` is
        used as the query prefix when given, otherwise ``base_query``.
        """
        # Analyze existing categories
        existing_categories = {
            candidate.metadata["diversity_category"]
            for candidate in found_candidates
            if candidate.metadata
            and "diversity_category" in candidate.metadata
        }

        missing_categories = [
            category
            for category in self._CATEGORY_QUERY_TERMS
            if category not in existing_categories
        ]

        base = entity_type or base_query

        # Limit to 3 new categories
        return [
            f"{base} {self._CATEGORY_QUERY_TERMS[category]}"
            for category in missing_categories[:3]
        ]

    def _generate_category_queries(
        self, categories: List[str], base_query: str, entity_type: Optional[str]
    ) -> List[str]:
        """
        Generate queries for specific underrepresented categories.

        Produces two query phrasings for each of the first three categories.
        """
        queries = []
        base = entity_type or base_query

        for category in categories[:3]:
            queries.append(f"{base} {category}")
            queries.append(f"{category} examples {base}")

        return queries

    def _filter_for_diversity(
        self,
        new_candidates: List[Candidate],
        existing_candidates: List[Candidate],
    ) -> List[Candidate]:
        """
        Filter new candidates to maintain diversity.

        A candidate is dropped when its category has already reached
        ``category_limit`` (counting candidates accepted earlier in this same
        batch) or when it is too similar to a recent existing candidate.
        ``category_counts`` is only read here, never modified.
        """
        filtered = []
        accepted_in_batch = defaultdict(int)

        for candidate in new_candidates:
            category = self._determine_category(candidate)

            # ``.get`` avoids inserting zero-count keys into the shared
            # defaultdict, which would distort later averages.
            occupied = (
                self.category_counts.get(category, 0)
                + accepted_in_batch[category]
            )
            if occupied >= self.category_limit:
                continue

            # Check for similarity with existing candidates
            if not self._is_sufficiently_different(
                candidate, existing_candidates
            ):
                continue

            filtered.append(candidate)
            accepted_in_batch[category] += 1

        return filtered

    def _is_sufficiently_different(
        self, candidate: Candidate, existing_candidates: List[Candidate]
    ) -> bool:
        """
        Check if candidate is sufficiently different from existing ones.

        Compares the candidate's lower-cased name words with each of the last
        ten existing candidates using Jaccard similarity; returns False as
        soon as one similarity exceeds ``similarity_threshold``.
        """
        candidate_words = set(candidate.name.lower().split())

        # Check against recent candidates only
        for existing in existing_candidates[-10:]:
            existing_words = set(existing.name.lower().split())

            # Calculate Jaccard similarity
            union = len(candidate_words | existing_words)
            if union == 0:
                # Both names are empty; nothing to compare.
                continue

            intersection = len(candidate_words & existing_words)
            if intersection / union > self.similarity_threshold:
                return False

        return True

    def _final_diversity_selection(
        self, candidates: List[Candidate]
    ) -> List[Candidate]:
        """
        Final selection to maximize diversity.

        Groups candidates by recorded category and keeps, per category, the
        top ``max(1, max_candidates // number_of_categories)`` entries ordered
        by ``relevance_score`` (treated as 0.0 when absent).
        """
        if not candidates:
            return candidates

        # Group by category
        category_groups = defaultdict(list)
        for candidate in candidates:
            metadata = getattr(candidate, "metadata", None) or {}
            category = metadata.get(
                "diversity_category", self._DEFAULT_CATEGORY
            )
            category_groups[category].append(candidate)

        # Select balanced representation from each category
        selected = []
        max_per_category = max(1, self.max_candidates // len(category_groups))

        for group in category_groups.values():
            # Sort by relevance score if available
            sorted_group = sorted(
                group,
                key=lambda c: getattr(c, "relevance_score", 0.0),
                reverse=True,
            )
            selected.extend(sorted_group[:max_per_category])

        return selected

    def _rank_by_diversity(
        self, candidates: List[Candidate], base_query: str
    ) -> List[Candidate]:
        """
        Rank candidates considering both relevance and diversity contribution.

        Each candidate gets ``final_score = relevance_score + 0.2 * boost``,
        where ``boost`` is how far its category's count falls below the
        average category count (0 when at or above average). The relevance
        ranking itself comes from the base explorer.
        """
        # First rank by relevance
        relevance_ranked = self._rank_candidates_by_relevance(
            candidates, base_query
        )

        # The average is the same for every candidate, so compute it once.
        if self.category_counts:
            avg_count = sum(self.category_counts.values()) / len(
                self.category_counts
            )
        else:
            avg_count = 1

        # Then adjust based on diversity contribution
        for candidate in relevance_ranked:
            metadata = getattr(candidate, "metadata", None) or {}
            category = metadata.get(
                "diversity_category", self._DEFAULT_CATEGORY
            )

            # Read without inserting a key into the shared defaultdict.
            category_count = self.category_counts.get(category, 0)

            # Boost score for underrepresented categories; a zero average
            # means there is nothing to compare against.
            if avg_count > 0:
                diversity_boost = max(
                    0, (avg_count - category_count) / avg_count
                )
            else:
                diversity_boost = 0.0

            relevance_score = getattr(candidate, "relevance_score", 0.0)
            candidate.final_score = relevance_score + (diversity_boost * 0.2)

        return sorted(
            relevance_ranked,
            key=lambda c: getattr(c, "final_score", 0.0),
            reverse=True,
        )