Coverage for src / local_deep_research / advanced_search_system / candidate_exploration / progressive_explorer.py: 12%
119 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""
2Progressive explorer for BrowseComp-style systematic search exploration.
3"""
5import concurrent.futures
6from dataclasses import dataclass, field
7from typing import Dict, List, Set, Tuple
9from loguru import logger
11from ...utilities.thread_context import preserve_research_context
@dataclass
class SearchProgress:
    """Track search progress and findings."""

    # Lower-cased query strings already executed (used to skip redundant searches).
    searched_terms: Set[str] = field(default_factory=set)
    found_candidates: Dict[str, float] = field(
        default_factory=dict
    )  # name -> confidence
    verified_facts: Dict[str, str] = field(
        default_factory=dict
    )  # fact -> source
    entity_coverage: Dict[str, Set[str]] = field(
        default_factory=dict
    )  # entity_type -> searched_entities
    search_depth: int = 0

    def update_coverage(self, entity_type: str, entity: str):
        """Record that *entity* of *entity_type* has been searched.

        Entities are stored lower-cased so coverage checks are
        case-insensitive.
        """
        # setdefault replaces the manual "if key not in dict" dance.
        self.entity_coverage.setdefault(entity_type, set()).add(entity.lower())

    def get_uncovered_entities(
        self, entities: Dict[str, List[str]]
    ) -> Dict[str, List[str]]:
        """Return, per entity type, the entities that haven't been searched yet.

        Entity types with no uncovered entities are omitted from the result.
        """
        uncovered = {}
        for entity_type, entity_list in entities.items():
            covered = self.entity_coverage.get(entity_type, set())
            uncovered_list = [
                e for e in entity_list if e.lower() not in covered
            ]
            if uncovered_list:
                uncovered[entity_type] = uncovered_list
        return uncovered
class ProgressiveExplorer:
    """
    Explorer that implements progressive search strategies for BrowseComp.

    Key features:
    1. Tracks search progress to avoid redundancy
    2. Progressively combines entities
    3. Identifies and pursues promising candidates
    4. Maintains simple approach without over-filtering
    """

    def __init__(self, search_engine, model):
        self.search_engine = search_engine
        self.model = model
        self.progress = SearchProgress()
        self.max_results_per_search = 20  # Keep more results

    def explore(
        self,
        queries: List[str],
        constraints: List = None,  # NOTE: currently unused; kept for API compatibility
        max_workers: int = 5,
        extracted_entities: Dict[str, List[str]] = None,
    ) -> Tuple[List, "SearchProgress"]:
        """
        Execute progressive exploration with entity tracking.

        Runs all *queries* in parallel, records which queries/entities were
        covered, and harvests candidate answers from the raw results.

        Returns both candidates (raw search results) and search progress
        for strategy use.
        """
        all_results = []
        extracted_entities = extracted_entities or {}

        # Execute searches in parallel (like source-based strategy)
        search_results = self._parallel_search(queries, max_workers)

        # Process results without filtering (trust the LLM later)
        for query, results in search_results:
            self.progress.searched_terms.add(query.lower())

            # Track which entities were covered in this search
            self._update_entity_coverage(query, extracted_entities)

            # Extract any specific names/candidates from results
            candidates = self._extract_candidates_from_results(results, query)
            for candidate_name, confidence in candidates.items():
                existing = self.progress.found_candidates.get(candidate_name)
                # Keep the highest confidence seen so far for each candidate.
                self.progress.found_candidates[candidate_name] = (
                    confidence if existing is None else max(existing, confidence)
                )

            # Keep all results for final synthesis
            all_results.extend(results)

        self.progress.search_depth += 1

        # Return both results and progress
        return all_results, self.progress

    def generate_verification_searches(
        self,
        candidates: Dict[str, float],
        constraints: List,
        max_searches: int = 5,
    ) -> List[str]:
        """Generate targeted searches to verify top candidates.

        Pairs each of the top-3 candidates (by confidence) with the first two
        constraints, skipping queries that were already executed.
        """
        if not candidates:
            return []

        # Get top candidates by confidence
        top_candidates = sorted(
            candidates.items(), key=lambda x: x[1], reverse=True
        )[:3]

        verification_searches = []
        for candidate_name, confidence in top_candidates:
            # Generate verification searches for this candidate
            for constraint in constraints[:2]:  # Verify top constraints
                search = f'"{candidate_name}" {constraint.description}'
                if search.lower() not in self.progress.searched_terms:
                    verification_searches.append(search)

        return verification_searches[:max_searches]

    def _extract_candidates_from_results(
        self, results: List[Dict], query: str
    ) -> Dict[str, float]:
        """Extract potential answer candidates from search results.

        Heuristics: quoted phrases in title/snippet get a 0.3 base
        confidence; 1-3-word capitalized phrases from titles accumulate
        +0.2 each, since titles often contain the actual answer.
        """
        # Hoisted out of the loop: the original re-imported `re` per result.
        import re

        candidates = {}

        # Simple extraction based on titles and snippets
        for result in results[:10]:  # Focus on top results
            title = result.get("title", "")
            snippet = result.get("snippet", "")

            # Look for proper nouns and specific names
            # This is simplified - in practice, might use NER or more sophisticated extraction
            combined_text = f"{title} {snippet}"

            # Extract quoted terms as potential candidates
            quoted_terms = re.findall(r'"([^"]+)"', combined_text)
            for term in quoted_terms:
                if (
                    len(term) > 2 and len(term) < 50
                ):  # Reasonable length for an answer
                    candidates[term] = 0.3  # Base confidence from appearance

            # Boost confidence if appears in title
            if title:
                # Titles often contain the actual answer
                title_words = title.split()
                for i in range(len(title_words)):
                    for j in range(i + 1, min(i + 4, len(title_words) + 1)):
                        phrase = " ".join(title_words[i:j])
                        if (
                            len(phrase) > 3 and phrase[0].isupper()
                        ):  # Likely proper noun
                            candidates[phrase] = candidates.get(phrase, 0) + 0.2

        return candidates

    def _update_entity_coverage(
        self, query: str, entities: Dict[str, List[str]]
    ):
        """Mark every entity that appears (case-insensitively) in *query* as covered."""
        query_lower = query.lower()

        for entity_type, entity_list in entities.items():
            for entity in entity_list:
                if entity.lower() in query_lower:
                    self.progress.update_coverage(entity_type, entity)

    def suggest_next_searches(
        self, entities: Dict[str, List[str]], max_suggestions: int = 5
    ) -> List[str]:
        """Suggest next searches based on coverage and findings."""
        suggestions = []

        # 1. Check uncovered entities
        uncovered = self.progress.get_uncovered_entities(entities)

        # 2. If we have candidates, verify them with uncovered constraints
        if self.progress.found_candidates:
            top_candidate = max(
                self.progress.found_candidates.items(), key=lambda x: x[1]
            )[0]

            # Combine candidate with uncovered entities
            for entity_type, entity_list in uncovered.items():
                for entity in entity_list[:2]:
                    search = f'"{top_candidate}" {entity}'
                    if search.lower() not in self.progress.searched_terms:
                        suggestions.append(search)

        # 3. Otherwise, create new combinations of uncovered entities
        else:
            # Focus on systematic coverage
            if uncovered.get("temporal"):
                # Year-by-year with key term.
                # Bug fix: `entities.get("names", [""])[0]` raised IndexError
                # when the key existed but mapped to an empty list; the
                # `or [""]` guard covers both missing and empty.
                key_term = (
                    (entities.get("names") or [""])[0]
                    or (entities.get("descriptors") or [""])[0]
                )
                for year in uncovered["temporal"][:3]:
                    search = f"{key_term} {year}".strip()
                    if search.lower() not in self.progress.searched_terms:
                        suggestions.append(search)

            if uncovered.get("names") and uncovered.get("descriptors"):
                # Combine names with descriptors
                for name in uncovered["names"][:2]:
                    for desc in uncovered["descriptors"][:2]:
                        search = f"{name} {desc}"
                        if search.lower() not in self.progress.searched_terms:
                            suggestions.append(search)

        return suggestions[:max_suggestions]

    def _parallel_search(
        self, queries: List[str], max_workers: int
    ) -> List[Tuple[str, List[Dict]]]:
        """Execute searches in parallel and return (query, results) pairs.

        Failed searches are logged and yield an empty result list rather
        than aborting the batch. Result order follows completion order,
        not input order.
        """
        results = []

        def search_query(query):
            try:
                search_results = self.search_engine.run(query)
                return (query, search_results or [])
            except Exception as e:
                logger.exception(f"Error searching '{query}': {e!s}")
                return (query, [])

        # Create context-preserving wrapper for the search function
        context_aware_search = preserve_research_context(search_query)

        # Run searches in parallel
        with concurrent.futures.ThreadPoolExecutor(
            max_workers=max_workers
        ) as executor:
            futures = [
                executor.submit(context_aware_search, q) for q in queries
            ]
            for future in concurrent.futures.as_completed(futures):
                results.append(future.result())

        return results