Coverage for src / local_deep_research / advanced_search_system / candidate_exploration / parallel_explorer.py: 99%
90 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""
2Parallel candidate explorer implementation.
4This explorer runs multiple search queries in parallel to quickly discover
5a wide range of candidates.
6"""
8import concurrent.futures
9import time
10from typing import List, Optional
12from loguru import logger
14from ...database.thread_local_session import thread_cleanup
15from ..candidates.base_candidate import Candidate
16from ..constraints.base_constraint import Constraint
17from .base_explorer import (
18 BaseCandidateExplorer,
19 ExplorationResult,
20 ExplorationStrategy,
21)
class ParallelExplorer(BaseCandidateExplorer):
    """
    Parallel candidate explorer that runs multiple searches concurrently.

    This explorer:
    1. Generates multiple search queries from the initial query
    2. Runs searches in parallel for speed
    3. Collects and deduplicates candidates
    4. Focuses on breadth-first exploration
    """

    def __init__(
        self,
        *args,
        max_workers: int = 5,
        queries_per_round: int = 8,
        max_rounds: int = 3,
        **kwargs,
    ):
        """
        Initialize parallel explorer.

        Args:
            max_workers: Maximum number of parallel search threads
            queries_per_round: Number of queries to generate per round
            max_rounds: Maximum exploration rounds
        """
        super().__init__(*args, **kwargs)
        self.max_workers = max_workers
        self.queries_per_round = queries_per_round
        self.max_rounds = max_rounds

    def explore(
        self,
        initial_query: str,
        constraints: Optional[List[Constraint]] = None,
        entity_type: Optional[str] = None,
    ) -> ExplorationResult:
        """Explore candidates using parallel search strategy.

        Runs up to ``max_rounds`` rounds; each round fans its queries out
        across a thread pool, extracts candidates from the results, then
        generates follow-up queries for the next round.

        Args:
            initial_query: Seed query that starts the exploration.
            constraints: Optional constraints used when generating
                follow-up queries.
            entity_type: Optional entity-type hint forwarded to candidate
                extraction.

        Returns:
            ExplorationResult containing deduplicated, relevance-ranked
            candidates (capped at ``self.max_candidates``) plus metadata.
        """
        start_time = time.time()
        logger.info(f"Starting parallel exploration for: {initial_query}")

        all_candidates = []
        exploration_paths = []
        total_searched = 0
        # Initialize round_num so the metadata expression below cannot hit
        # a NameError when max_rounds <= 0 and the loop body never runs.
        round_num = -1

        # Initial search
        current_queries = [initial_query]

        with concurrent.futures.ThreadPoolExecutor(
            max_workers=self.max_workers
        ) as executor:
            for round_num in range(self.max_rounds):
                # Stop early once the time/candidate budget is exhausted.
                if not self._should_continue_exploration(
                    start_time, len(all_candidates)
                ):
                    break

                logger.info(
                    f"Exploration round {round_num + 1}: {len(current_queries)} queries"
                )

                # Submit all queries for parallel execution. thread_cleanup
                # wraps the search call so each worker releases its
                # thread-local DB session when it finishes.
                future_to_query = {
                    executor.submit(
                        thread_cleanup(self._execute_search), query
                    ): query
                    for query in current_queries
                }

                round_candidates = []

                # Collect results as they complete
                for future in concurrent.futures.as_completed(future_to_query):
                    query = future_to_query[future]
                    total_searched += 1

                    try:
                        results = future.result()
                        candidates = self._extract_candidates_from_results(
                            results, entity_type
                        )
                        round_candidates.extend(candidates)
                        exploration_paths.append(
                            f"Round {round_num + 1}: {query} -> {len(candidates)} candidates"
                        )

                    except Exception:
                        # A single failed search must not abort the round;
                        # log it and keep consuming the other futures.
                        logger.exception(f"Error processing query '{query}'")

                # Add new candidates
                all_candidates.extend(round_candidates)

                # Generate queries for next round
                if round_num < self.max_rounds - 1:
                    current_queries = self.generate_exploration_queries(
                        initial_query, all_candidates, constraints
                    )[: self.queries_per_round]

                    if not current_queries:
                        logger.info("No more queries to explore")
                        break

        # Deduplicate and rank
        unique_candidates = self._deduplicate_candidates(all_candidates)
        ranked_candidates = self._rank_candidates_by_relevance(
            unique_candidates, initial_query
        )

        # Limit to max candidates
        final_candidates = ranked_candidates[: self.max_candidates]

        elapsed_time = time.time() - start_time
        logger.info(
            f"Parallel exploration completed: {len(final_candidates)} unique candidates in {elapsed_time:.1f}s"
        )

        return ExplorationResult(
            candidates=final_candidates,
            total_searched=total_searched,
            unique_candidates=len(unique_candidates),
            exploration_paths=exploration_paths,
            metadata={
                "strategy": "parallel",
                # rounds actually executed (0 when the loop never ran)
                "rounds": min(round_num + 1, self.max_rounds),
                "max_workers": self.max_workers,
                "entity_type": entity_type,
            },
            elapsed_time=elapsed_time,
            strategy_used=ExplorationStrategy.BREADTH_FIRST,
        )

    def generate_exploration_queries(
        self,
        base_query: str,
        found_candidates: List[Candidate],
        constraints: Optional[List[Constraint]] = None,
    ) -> List[str]:
        """Generate queries for parallel exploration.

        Combines LLM-generated variations of the base query,
        candidate-similarity queries, and constraint-focused queries,
        then drops anything already in ``self.explored_queries``
        (tracked by the base class — compared lowercase here).

        Returns:
            At most ``self.queries_per_round`` unexplored queries.
        """
        queries = []

        # Query variations based on base query
        base_variations = self._generate_query_variations(base_query)
        queries.extend(base_variations)

        # Queries based on found candidates
        if found_candidates:
            candidate_queries = self._generate_candidate_based_queries(
                found_candidates, base_query
            )
            queries.extend(candidate_queries)

        # Constraint-based queries
        if constraints:
            constraint_queries = self._generate_constraint_queries(
                constraints, base_query
            )
            queries.extend(constraint_queries)

        # Remove already explored queries
        new_queries = [
            q for q in queries if q.lower() not in self.explored_queries
        ]

        return new_queries[: self.queries_per_round]

    def _generate_query_variations(self, base_query: str) -> List[str]:
        """Ask the LLM for up to 4 rephrased variations of *base_query*.

        Returns an empty list on any LLM/parsing failure (best-effort).
        """
        try:
            prompt = f"""
Generate 4 search query variations for: "{base_query}"

Each variation should:
1. Use different keywords but same intent
2. Be specific and searchable
3. Focus on finding concrete examples or instances

Format as numbered list:
1. [query]
2. [query]
3. [query]
4. [query]
"""

            response = self.model.invoke(prompt).content.strip()

            # Parse numbered list
            queries = []
            for line in response.split("\n"):
                line = line.strip()
                if line and any(line.startswith(f"{i}.") for i in range(1, 10)):
                    # Remove number prefix ("1. query" -> "query")
                    query = line.split(".", 1)[1].strip()
                    if query:
                        queries.append(query)

            return queries[:4]

        except Exception:
            logger.exception("Error generating query variations")
            return []

    def _generate_candidate_based_queries(
        self, candidates: List[Candidate], base_query: str
    ) -> List[str]:
        """Generate "similar to X" queries based on found candidates.

        Args:
            candidates: Candidates discovered so far; only the first 3
                are sampled to keep the query count bounded.
            base_query: Unused here; kept for a signature parallel to
                the other query generators.
        """
        queries = []

        # Sample a few candidates to avoid too many queries
        sample_candidates = candidates[:3]

        for candidate in sample_candidates:
            # Query for similar entities
            queries.append(f'similar to "{candidate.name}"')
            queries.append(f'like "{candidate.name}" examples')

        return queries

    def _generate_constraint_queries(
        self, constraints: List[Constraint], base_query: str
    ) -> List[str]:
        """Generate queries focusing on specific constraints.

        Args:
            constraints: Constraints to target; only the first 2 are
                sampled to keep the query count bounded.
            base_query: Unused here; kept for a signature parallel to
                the other query generators.
        """
        queries = []

        # Sample constraints to avoid too many queries
        for constraint in constraints[:2]:
            queries.append(f"{constraint.value} examples")
            queries.append(f'"{constraint.value}" instances')

        return queries