Coverage for src / local_deep_research / advanced_search_system / candidate_exploration / parallel_explorer.py: 12%
89 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""
2Parallel candidate explorer implementation.
4This explorer runs multiple search queries in parallel to quickly discover
5a wide range of candidates.
6"""
import concurrent.futures
import re
import time
from typing import List, Optional, Tuple

from loguru import logger

from ..candidates.base_candidate import Candidate
from ..constraints.base_constraint import Constraint
from .base_explorer import (
    BaseCandidateExplorer,
    ExplorationResult,
    ExplorationStrategy,
)
class ParallelExplorer(BaseCandidateExplorer):
    """
    Parallel candidate explorer that runs multiple searches concurrently.

    This explorer:
    1. Generates multiple search queries from the initial query
    2. Runs searches in parallel for speed
    3. Collects and deduplicates candidates
    4. Focuses on breadth-first exploration
    """

    # Matches one item of an LLM-produced numbered list such as "1. query"
    # or "2) query"; the group captures the item text (starting at its first
    # non-space character, trailing whitespace excluded).
    _NUMBERED_ITEM = re.compile(r"^\s*\d+\s*[.)]\s*(\S.*?)\s*$")

    # Wrapper pairs stripped from a parsed query. The prompt's "[query]"
    # placeholder is sometimes echoed back verbatim, and a query wrapped
    # entirely in double quotes would become one exact-phrase search, which
    # defeats the point of generating keyword variations.
    _WRAPPER_PAIRS = (("[", "]"), ('"', '"'))

    def __init__(
        self,
        *args,
        max_workers: int = 5,
        queries_per_round: int = 8,
        max_rounds: int = 3,
        **kwargs,
    ):
        """
        Initialize parallel explorer.

        Args:
            *args: Positional arguments forwarded to ``BaseCandidateExplorer``.
            max_workers: Maximum number of parallel search threads.
            queries_per_round: Maximum number of follow-up queries searched in
                each round after the first (the first round searches only the
                initial query).
            max_rounds: Maximum exploration rounds.
            **kwargs: Keyword arguments forwarded to ``BaseCandidateExplorer``.
        """
        super().__init__(*args, **kwargs)
        self.max_workers = max_workers
        self.queries_per_round = queries_per_round
        self.max_rounds = max_rounds

    def explore(
        self,
        initial_query: str,
        constraints: Optional[List[Constraint]] = None,
        entity_type: Optional[str] = None,
    ) -> ExplorationResult:
        """
        Explore candidates using a parallel, breadth-first search strategy.

        Round 1 searches only ``initial_query``. Each later round searches up
        to ``queries_per_round`` queries produced by
        :meth:`generate_exploration_queries`. Exploration stops when any of
        these happens:

        - ``max_rounds`` rounds have run;
        - ``_should_continue_exploration`` returns False before a round;
        - no new queries remain.

        Args:
            initial_query: Query searched in the first round and used as the
                base for follow-up queries.
            constraints: Optional constraints used to derive follow-up queries.
            entity_type: Optional entity type passed to candidate extraction.

        Returns:
            ExplorationResult with these fields:

            - ``candidates``: deduplicated, relevance-ranked candidates,
              truncated to ``self.max_candidates``;
            - ``total_searched``: number of queries submitted, including
              failed ones;
            - ``exploration_paths``: one line per successful query;
            - ``metadata``: ``metadata["rounds"]`` is the number of rounds
              actually executed (0 if none ran).
        """
        start_time = time.time()
        logger.info(f"Starting parallel exploration for: {initial_query}")

        all_candidates: List[Candidate] = []
        exploration_paths: List[str] = []
        total_searched = 0
        # Counted explicitly rather than derived from the loop variable, which
        # is unbound when max_rounds <= 0 and over-reports by one when the
        # loop stops before a round runs.
        rounds_completed = 0

        # Initial search
        current_queries = [initial_query]

        with concurrent.futures.ThreadPoolExecutor(
            max_workers=self.max_workers
        ) as executor:
            for round_num in range(self.max_rounds):
                if not self._should_continue_exploration(
                    start_time, len(all_candidates)
                ):
                    break

                logger.info(
                    f"Exploration round {round_num + 1}: {len(current_queries)} queries"
                )

                round_candidates, round_paths = self._run_search_round(
                    executor, current_queries, round_num, entity_type
                )
                # Every submitted query counts as searched, even if it failed.
                total_searched += len(current_queries)
                all_candidates.extend(round_candidates)
                exploration_paths.extend(round_paths)
                rounds_completed += 1

                # Generate queries for the next round (not needed after the last)
                if round_num < self.max_rounds - 1:
                    current_queries = self.generate_exploration_queries(
                        initial_query, all_candidates, constraints
                    )[: self.queries_per_round]

                    if not current_queries:
                        logger.info("No more queries to explore")
                        break

        # Deduplicate and rank
        unique_candidates = self._deduplicate_candidates(all_candidates)
        ranked_candidates = self._rank_candidates_by_relevance(
            unique_candidates, initial_query
        )

        # Limit to max candidates
        final_candidates = ranked_candidates[: self.max_candidates]

        elapsed_time = time.time() - start_time
        logger.info(
            f"Parallel exploration completed: {len(final_candidates)} unique candidates in {elapsed_time:.1f}s"
        )

        return ExplorationResult(
            candidates=final_candidates,
            total_searched=total_searched,
            unique_candidates=len(unique_candidates),
            exploration_paths=exploration_paths,
            metadata={
                "strategy": "parallel",
                "rounds": rounds_completed,
                "max_workers": self.max_workers,
                "entity_type": entity_type,
            },
            elapsed_time=elapsed_time,
            strategy_used=ExplorationStrategy.BREADTH_FIRST,
        )

    def _run_search_round(
        self,
        executor: concurrent.futures.Executor,
        queries: List[str],
        round_num: int,
        entity_type: Optional[str],
    ) -> Tuple[List[Candidate], List[str]]:
        """
        Search all ``queries`` concurrently and collect their candidates.

        ``_execute_search`` is called from executor worker threads.
        NOTE(review): confirm the base-class implementation is safe to call
        concurrently.

        Args:
            executor: Executor used to run the searches.
            queries: Queries to search in this round.
            round_num: Zero-based round index (used only for path labels).
            entity_type: Optional entity type passed to candidate extraction.

        Returns:
            ``(candidates, paths)``, both in completion order. A query whose
            search or extraction raises is logged and contributes neither
            candidates nor a path entry.
        """
        future_to_query = {
            executor.submit(self._execute_search, query): query
            for query in queries
        }

        round_candidates: List[Candidate] = []
        round_paths: List[str] = []

        # Collect results as they complete
        for future in concurrent.futures.as_completed(future_to_query):
            query = future_to_query[future]
            try:
                results = future.result()
                candidates = self._extract_candidates_from_results(
                    results, entity_type
                )
                round_candidates.extend(candidates)
                round_paths.append(
                    f"Round {round_num + 1}: {query} -> {len(candidates)} candidates"
                )
            except Exception as e:
                # Best-effort: one failing query must not abort the round.
                logger.exception(f"Error processing query '{query}': {e}")

        return round_candidates, round_paths

    def generate_exploration_queries(
        self,
        base_query: str,
        found_candidates: List[Candidate],
        constraints: Optional[List[Constraint]] = None,
    ) -> List[str]:
        """
        Generate follow-up queries for parallel exploration.

        The batch is built from three sources:

        - LLM-generated variations of ``base_query``;
        - queries built from ``found_candidates`` (if any);
        - queries built from ``constraints`` (if given).

        Filtering, all case-insensitive:

        - blank queries are dropped;
        - queries already in ``self.explored_queries`` are dropped. This
          assumes that set holds lowercased queries; TODO confirm against
          the base class;
        - duplicates within this batch are dropped.

        Args:
            base_query: The original query being explored.
            found_candidates: Candidates found so far.
            constraints: Optional constraints to target.

        Returns:
            At most ``queries_per_round`` new queries, in generation order.
        """
        queries: List[str] = list(self._generate_query_variations(base_query))

        # Queries based on found candidates
        if found_candidates:
            queries.extend(
                self._generate_candidate_based_queries(
                    found_candidates, base_query
                )
            )

        # Constraint-based queries
        if constraints:
            queries.extend(
                self._generate_constraint_queries(constraints, base_query)
            )

        # Drop blanks, already explored queries, and duplicates in this batch
        # so no identical search is submitted twice in one round.
        new_queries: List[str] = []
        seen = set()
        for query in queries:
            key = query.lower()
            if not key.strip() or key in seen or key in self.explored_queries:
                continue
            seen.add(key)
            new_queries.append(query)

        return new_queries[: self.queries_per_round]

    def _generate_query_variations(self, base_query: str) -> List[str]:
        """
        Ask the model for up to 4 keyword variations of ``base_query``.

        Returns:
            Up to 4 parsed queries, or an empty list if the model call or
            parsing raises (the error is logged).
        """
        try:
            prompt = f"""
Generate 4 search query variations for: "{base_query}"

Each variation should:
1. Use different keywords but same intent
2. Be specific and searchable
3. Focus on finding concrete examples or instances

Format as numbered list:
1. [query]
2. [query]
3. [query]
4. [query]
"""

            response = self.model.invoke(prompt).content.strip()
            return self._parse_numbered_list(response)[:4]

        except Exception:
            logger.exception("Error generating query variations")
            return []

    @classmethod
    def _parse_numbered_list(cls, text: str) -> List[str]:
        """
        Extract the item text from each numbered line of ``text``.

        Lines like ``"1. foo"`` or ``"2) bar"`` are accepted; other lines
        are ignored. One enclosing ``[...]`` or ``"..."`` pair is stripped
        from each item, and items that end up empty are dropped.
        """
        items: List[str] = []
        for line in text.splitlines():
            match = cls._NUMBERED_ITEM.match(line)
            if not match:
                continue
            item = match.group(1)
            for opener, closer in cls._WRAPPER_PAIRS:
                if (
                    len(item) >= 2
                    and item.startswith(opener)
                    and item.endswith(closer)
                ):
                    item = item[1:-1].strip()
            if item:
                items.append(item)
        return items

    def _generate_candidate_based_queries(
        self,
        candidates: List[Candidate],
        base_query: str,
        sample_size: int = 3,
    ) -> List[str]:
        """
        Generate "find similar" queries for a few found candidates.

        Candidates are scanned in order and some are skipped:

        - blank or duplicate (case-insensitive) names;
        - names whose two queries are both already in
          ``self.explored_queries``.

        Skipping already-explored names keeps later rounds expanding instead
        of repeating the first few candidates of the cumulative list.

        Args:
            candidates: Candidates found so far.
            base_query: Unused; kept for interface compatibility.
            sample_size: Maximum number of candidates to build queries for.

        Returns:
            Two queries per selected candidate.
        """
        queries: List[str] = []
        seen_names = set()

        for candidate in candidates:
            if len(seen_names) >= sample_size:
                break

            name = str(candidate.name or "").strip()
            name_key = name.lower()
            if not name or name_key in seen_names:
                continue

            similar_query = f'similar to "{name}"'
            examples_query = f'like "{name}" examples'
            if (
                similar_query.lower() in self.explored_queries
                and examples_query.lower() in self.explored_queries
            ):
                continue

            seen_names.add(name_key)
            queries.append(similar_query)
            queries.append(examples_query)

        return queries

    def _generate_constraint_queries(
        self,
        constraints: List[Constraint],
        base_query: str,
        sample_size: int = 2,
    ) -> List[str]:
        """
        Generate queries focusing on specific constraint values.

        Uses the first ``sample_size`` constraints whose ``value`` is not
        None or blank. Such constraints would otherwise produce queries like
        ``"None examples"``.

        Args:
            constraints: Constraints to target.
            base_query: Unused; kept for interface compatibility.
            sample_size: Maximum number of constraints to use.

        Returns:
            Two queries per selected constraint.
        """
        queries: List[str] = []
        used = 0

        for constraint in constraints:
            if used >= sample_size:
                break

            value = constraint.value
            text = "" if value is None else str(value).strip()
            if not text:
                continue

            queries.append(f"{text} examples")
            queries.append(f'"{text}" instances')
            used += 1

        return queries