Coverage for src / local_deep_research / advanced_search_system / candidate_exploration / parallel_explorer.py: 99%
89 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
1"""
2Parallel candidate explorer implementation.
4This explorer runs multiple search queries in parallel to quickly discover
5a wide range of candidates.
6"""
8import concurrent.futures
9import time
10from typing import List, Optional
12from loguru import logger
14from ..candidates.base_candidate import Candidate
15from ..constraints.base_constraint import Constraint
16from .base_explorer import (
17 BaseCandidateExplorer,
18 ExplorationResult,
19 ExplorationStrategy,
20)
class ParallelExplorer(BaseCandidateExplorer):
    """
    Parallel candidate explorer that runs multiple searches concurrently.

    This explorer:
    1. Generates multiple search queries from the initial query
    2. Runs searches in parallel for speed
    3. Collects and deduplicates candidates
    4. Focuses on breadth-first exploration
    """

    def __init__(
        self,
        *args,
        max_workers: int = 5,
        queries_per_round: int = 8,
        max_rounds: int = 3,
        **kwargs,
    ):
        """
        Initialize parallel explorer.

        Args:
            max_workers: Maximum number of parallel search threads.
            queries_per_round: Maximum number of queries generated for each
                follow-up round (the first round searches only the initial
                query).
            max_rounds: Maximum exploration rounds. A value <= 0 runs no
                searches and yields an empty result.
            *args, **kwargs: Forwarded unchanged to ``BaseCandidateExplorer``.
        """
        super().__init__(*args, **kwargs)
        self.max_workers = max_workers
        self.queries_per_round = queries_per_round
        self.max_rounds = max_rounds

    def explore(
        self,
        initial_query: str,
        constraints: Optional[List[Constraint]] = None,
        entity_type: Optional[str] = None,
    ) -> ExplorationResult:
        """
        Explore candidates using a parallel, breadth-first search strategy.

        Round 1 searches ``initial_query`` alone. Each later round searches
        the queries produced by ``generate_exploration_queries``. All searches
        in a round are submitted to one shared thread pool.

        Exploration stops when any of these happens:
        - ``max_rounds`` rounds have run;
        - ``_should_continue_exploration`` returns False;
        - no new queries can be generated.

        Args:
            initial_query: The query that seeds exploration.
            constraints: Optional constraints used to derive follow-up queries.
            entity_type: Optional entity type passed to candidate extraction.

        Returns:
            An ``ExplorationResult`` whose candidates are deduplicated,
            relevance-ranked and truncated to ``self.max_candidates``.
            ``metadata["rounds"]`` is the number of rounds actually executed.
        """
        start_time = time.time()
        logger.info(f"Starting parallel exploration for: {initial_query}")

        all_candidates: List[Candidate] = []
        exploration_paths: List[str] = []
        total_searched = 0
        # Counted explicitly rather than derived from the loop variable:
        # that variable is unbound when max_rounds <= 0, and it would also
        # count a round that the continuation check skipped.
        rounds_completed = 0

        # Initial search
        current_queries = [initial_query]

        with concurrent.futures.ThreadPoolExecutor(
            max_workers=self.max_workers
        ) as executor:
            for round_num in range(self.max_rounds):
                if not self._should_continue_exploration(
                    start_time, len(all_candidates)
                ):
                    break

                logger.info(
                    f"Exploration round {round_num + 1}: {len(current_queries)} queries"
                )

                # Submit all queries for parallel execution
                future_to_query = {
                    executor.submit(self._execute_search, query): query
                    for query in current_queries
                }

                # Collect results as they complete. all_candidates is only
                # read between rounds, so extending it directly here is safe.
                for future in concurrent.futures.as_completed(future_to_query):
                    query = future_to_query[future]
                    # Failed searches are still counted as searched.
                    total_searched += 1

                    try:
                        results = future.result()
                        candidates = self._extract_candidates_from_results(
                            results, entity_type
                        )
                    except Exception:
                        logger.exception(f"Error processing query '{query}'")
                        continue

                    all_candidates.extend(candidates)
                    exploration_paths.append(
                        f"Round {round_num + 1}: {query} -> {len(candidates)} candidates"
                    )

                rounds_completed += 1

                # Generate queries for next round
                if round_num < self.max_rounds - 1:
                    current_queries = self.generate_exploration_queries(
                        initial_query, all_candidates, constraints
                    )[: self.queries_per_round]

                    if not current_queries:
                        logger.info("No more queries to explore")
                        break

        # Deduplicate and rank
        unique_candidates = self._deduplicate_candidates(all_candidates)
        ranked_candidates = self._rank_candidates_by_relevance(
            unique_candidates, initial_query
        )

        # Limit to max candidates
        final_candidates = ranked_candidates[: self.max_candidates]

        elapsed_time = time.time() - start_time
        logger.info(
            f"Parallel exploration completed: {len(final_candidates)} unique candidates in {elapsed_time:.1f}s"
        )

        return ExplorationResult(
            candidates=final_candidates,
            total_searched=total_searched,
            unique_candidates=len(unique_candidates),
            exploration_paths=exploration_paths,
            metadata={
                "strategy": "parallel",
                "rounds": rounds_completed,
                "max_workers": self.max_workers,
                "entity_type": entity_type,
            },
            elapsed_time=elapsed_time,
            strategy_used=ExplorationStrategy.BREADTH_FIRST,
        )

    def generate_exploration_queries(
        self,
        base_query: str,
        found_candidates: List[Candidate],
        constraints: Optional[List[Constraint]] = None,
    ) -> List[str]:
        """
        Generate the next batch of queries for parallel exploration.

        Queries are drawn, in this order, from:
        - LLM-generated variations of ``base_query``;
        - queries built from already-found candidates;
        - queries built from ``constraints``.

        These are dropped:
        - blank queries;
        - case-insensitive duplicates within the batch;
        - queries whose lowercased form is in ``self.explored_queries``.

        Dropping duplicates keeps the same search from being submitted twice
        in one parallel round.

        Args:
            base_query: The original exploration query.
            found_candidates: Candidates found so far (may contain repeats).
            constraints: Optional constraints to derive queries from.

        Returns:
            At most ``self.queries_per_round`` queries, in generation order.
        """
        queries: List[str] = list(self._generate_query_variations(base_query))

        # Queries based on found candidates
        if found_candidates:
            queries.extend(
                self._generate_candidate_based_queries(
                    found_candidates, base_query
                )
            )

        # Constraint-based queries
        if constraints:
            queries.extend(
                self._generate_constraint_queries(constraints, base_query)
            )

        # Remove blank, duplicate and already explored queries
        new_queries: List[str] = []
        seen = set()
        for query in queries:
            key = query.strip().lower()
            if (
                not key
                or key in seen
                or query.lower() in self.explored_queries
            ):
                continue
            seen.add(key)
            new_queries.append(query)

        return new_queries[: self.queries_per_round]

    def _generate_query_variations(self, base_query: str) -> List[str]:
        """
        Ask the model for up to four rephrasings of ``base_query``.

        The model's reply is parsed as a numbered list. Any line that starts
        with ``<digits>.`` contributes its non-empty remainder.

        Returns:
            At most four variations, or an empty list if the model call or
            parsing raises (the error is logged).
        """
        try:
            prompt = f"""
Generate 4 search query variations for: "{base_query}"

Each variation should:
1. Use different keywords but same intent
2. Be specific and searchable
3. Focus on finding concrete examples or instances

Format as numbered list:
1. [query]
2. [query]
3. [query]
4. [query]
"""

            response = self.model.invoke(prompt).content.strip()

            # Parse numbered list: keep text after an "N." prefix
            queries: List[str] = []
            for raw_line in response.split("\n"):
                number, sep, rest = raw_line.strip().partition(".")
                if sep and number.isdigit():
                    query = rest.strip()
                    if query:
                        queries.append(query)

            return queries[:4]

        except Exception:
            logger.exception("Error generating query variations")
            return []

    def _generate_candidate_based_queries(
        self, candidates: List[Candidate], base_query: str
    ) -> List[str]:
        """
        Generate "find similar entities" queries from found candidates.

        Uses up to three distinct candidate names, in list order, and emits
        two queries per name. Names are compared case-insensitively and
        blank names are skipped. The caller passes the accumulated,
        un-deduplicated candidate list, so without this the first few
        entries can all be the same entity.

        ``base_query`` is accepted for signature symmetry and is unused.
        """
        sample_size = 3  # limit query fan-out
        queries: List[str] = []
        seen_names = set()

        for candidate in candidates:
            name = candidate.name
            key = str(name).strip().lower()
            if not key or key in seen_names:
                continue
            seen_names.add(key)

            # Query for similar entities
            queries.append(f'similar to "{name}"')
            queries.append(f'like "{name}" examples')

            if len(seen_names) >= sample_size:
                break

        return queries

    def _generate_constraint_queries(
        self, constraints: List[Constraint], base_query: str
    ) -> List[str]:
        """
        Generate queries focused on the first two constraints' values.

        Emits two queries per constraint. ``base_query`` is accepted for
        signature symmetry and is unused.
        """
        queries: List[str] = []

        # Sample constraints to avoid too many queries
        for constraint in constraints[:2]:
            queries.append(f"{constraint.value} examples")
            queries.append(f'"{constraint.value}" instances')

        return queries