Coverage for src/local_deep_research/advanced_search_system/candidate_exploration/parallel_explorer.py: 99%

89 statements

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1""" 

2Parallel candidate explorer implementation. 

3 

4This explorer runs multiple search queries in parallel to quickly discover 

5a wide range of candidates. 

6""" 

7 

8import concurrent.futures 

9import time 

10from typing import List, Optional 

11 

12from loguru import logger 

13 

14from ..candidates.base_candidate import Candidate 

15from ..constraints.base_constraint import Constraint 

16from .base_explorer import ( 

17 BaseCandidateExplorer, 

18 ExplorationResult, 

19 ExplorationStrategy, 

20) 

21 

22 

class ParallelExplorer(BaseCandidateExplorer):
    """
    Parallel candidate explorer that runs multiple searches concurrently.

    This explorer:
    1. Generates multiple search queries from the initial query
    2. Runs searches in parallel for speed
    3. Collects and deduplicates candidates
    4. Focuses on breadth-first exploration
    """

    def __init__(
        self,
        *args,
        max_workers: int = 5,
        queries_per_round: int = 8,
        max_rounds: int = 3,
        **kwargs,
    ):
        """
        Initialize parallel explorer.

        Args:
            max_workers: Maximum number of parallel search threads
            queries_per_round: Number of queries to generate per round
            max_rounds: Maximum exploration rounds
        """
        super().__init__(*args, **kwargs)
        self.max_workers = max_workers
        self.queries_per_round = queries_per_round
        self.max_rounds = max_rounds

    def explore(
        self,
        initial_query: str,
        constraints: Optional[List[Constraint]] = None,
        entity_type: Optional[str] = None,
    ) -> ExplorationResult:
        """Explore candidates using parallel search strategy.

        Runs up to ``max_rounds`` rounds; each round fans its queries out to a
        thread pool, collects candidates from the results, then generates the
        next round's queries from what was found so far.

        Args:
            initial_query: Seed query to start exploration from.
            constraints: Optional constraints used to generate focused queries.
            entity_type: Optional entity type hint passed to candidate
                extraction.

        Returns:
            ExplorationResult with deduplicated, relevance-ranked candidates
            (capped at ``self.max_candidates``) plus exploration metadata.
        """
        start_time = time.time()
        logger.info(f"Starting parallel exploration for: {initial_query}")

        all_candidates = []
        exploration_paths = []
        total_searched = 0
        # Track rounds explicitly: referencing the loop variable after the
        # loop would raise NameError when max_rounds <= 0.
        rounds_completed = 0

        # Initial search
        current_queries = [initial_query]

        with concurrent.futures.ThreadPoolExecutor(
            max_workers=self.max_workers
        ) as executor:
            for round_num in range(self.max_rounds):
                # A round counts as started even if the budget check below
                # aborts it (matches original accounting of round_num + 1).
                rounds_completed = round_num + 1

                if not self._should_continue_exploration(
                    start_time, len(all_candidates)
                ):
                    break

                logger.info(
                    f"Exploration round {round_num + 1}: {len(current_queries)} queries"
                )

                # Submit all queries for parallel execution
                future_to_query = {
                    executor.submit(self._execute_search, query): query
                    for query in current_queries
                }

                round_candidates = []

                # Collect results as they complete
                for future in concurrent.futures.as_completed(future_to_query):
                    query = future_to_query[future]
                    total_searched += 1

                    try:
                        results = future.result()
                        candidates = self._extract_candidates_from_results(
                            results, entity_type
                        )
                        round_candidates.extend(candidates)
                        exploration_paths.append(
                            f"Round {round_num + 1}: {query} -> {len(candidates)} candidates"
                        )

                    except Exception:
                        # One failed search must not abort the whole round.
                        logger.exception(f"Error processing query '{query}'")

                # Add new candidates
                all_candidates.extend(round_candidates)

                # Generate queries for next round
                if round_num < self.max_rounds - 1:
                    current_queries = self.generate_exploration_queries(
                        initial_query, all_candidates, constraints
                    )[: self.queries_per_round]

                    if not current_queries:
                        logger.info("No more queries to explore")
                        break

        # Deduplicate and rank
        unique_candidates = self._deduplicate_candidates(all_candidates)
        ranked_candidates = self._rank_candidates_by_relevance(
            unique_candidates, initial_query
        )

        # Limit to max candidates
        final_candidates = ranked_candidates[: self.max_candidates]

        elapsed_time = time.time() - start_time
        logger.info(
            f"Parallel exploration completed: {len(final_candidates)} unique candidates in {elapsed_time:.1f}s"
        )

        return ExplorationResult(
            candidates=final_candidates,
            total_searched=total_searched,
            unique_candidates=len(unique_candidates),
            exploration_paths=exploration_paths,
            metadata={
                "strategy": "parallel",
                "rounds": rounds_completed,
                "max_workers": self.max_workers,
                "entity_type": entity_type,
            },
            elapsed_time=elapsed_time,
            strategy_used=ExplorationStrategy.BREADTH_FIRST,
        )

    def generate_exploration_queries(
        self,
        base_query: str,
        found_candidates: List[Candidate],
        constraints: Optional[List[Constraint]] = None,
    ) -> List[str]:
        """Generate queries for parallel exploration.

        Combines LLM-generated variations of the base query with queries
        derived from found candidates and from constraints, then filters out
        anything already in ``self.explored_queries``.

        Args:
            base_query: The original exploration query.
            found_candidates: Candidates discovered so far.
            constraints: Optional constraints to build focused queries from.

        Returns:
            At most ``self.queries_per_round`` not-yet-explored queries.
        """
        queries = []

        # Query variations based on base query
        base_variations = self._generate_query_variations(base_query)
        queries.extend(base_variations)

        # Queries based on found candidates
        if found_candidates:
            candidate_queries = self._generate_candidate_based_queries(
                found_candidates, base_query
            )
            queries.extend(candidate_queries)

        # Constraint-based queries
        if constraints:
            constraint_queries = self._generate_constraint_queries(
                constraints, base_query
            )
            queries.extend(constraint_queries)

        # Remove already explored queries (explored_queries holds lowercase
        # entries — presumably maintained by the base class; verify there)
        new_queries = [
            q for q in queries if q.lower() not in self.explored_queries
        ]

        return new_queries[: self.queries_per_round]

    def _generate_query_variations(self, base_query: str) -> List[str]:
        """Generate up to 4 LLM-produced variations of the base query.

        Returns an empty list on any model or parsing error (best-effort).
        """
        try:
            prompt = f"""
Generate 4 search query variations for: "{base_query}"

Each variation should:
1. Use different keywords but same intent
2. Be specific and searchable
3. Focus on finding concrete examples or instances

Format as numbered list:
1. [query]
2. [query]
3. [query]
4. [query]
"""

            response = self.model.invoke(prompt).content.strip()

            # Parse numbered list
            queries = []
            for line in response.split("\n"):
                line = line.strip()
                if line and any(line.startswith(f"{i}.") for i in range(1, 10)):
                    # Remove number prefix
                    query = line.split(".", 1)[1].strip()
                    if query:
                        queries.append(query)

            return queries[:4]

        except Exception:
            # Best-effort: exploration continues without variations.
            logger.exception("Error generating query variations")
            return []

    def _generate_candidate_based_queries(
        self, candidates: List[Candidate], base_query: str
    ) -> List[str]:
        """Generate similarity queries based on found candidates.

        Only the first 3 candidates are used to bound the query count.
        """
        queries = []

        # Sample a few candidates to avoid too many queries
        sample_candidates = candidates[:3]

        for candidate in sample_candidates:
            # Query for similar entities
            queries.append(f'similar to "{candidate.name}"')
            queries.append(f'like "{candidate.name}" examples')

        return queries

    def _generate_constraint_queries(
        self, constraints: List[Constraint], base_query: str
    ) -> List[str]:
        """Generate queries focusing on specific constraints.

        Only the first 2 constraints are used to bound the query count.
        """
        queries = []

        # Sample constraints to avoid too many queries
        for constraint in constraints[:2]:
            queries.append(f"{constraint.value} examples")
            queries.append(f'"{constraint.value}" instances')

        return queries