Coverage for src/local_deep_research/advanced_search_system/candidate_exploration/parallel_explorer.py: 12%

89 statements  

coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Parallel candidate explorer implementation. 

3 

4This explorer runs multiple search queries in parallel to quickly discover 

5a wide range of candidates. 

6""" 

7 

8import concurrent.futures 

9import time 

10from typing import List, Optional 

11 

12from loguru import logger 

13 

14from ..candidates.base_candidate import Candidate 

15from ..constraints.base_constraint import Constraint 

16from .base_explorer import ( 

17 BaseCandidateExplorer, 

18 ExplorationResult, 

19 ExplorationStrategy, 

20) 

21 

22 

class ParallelExplorer(BaseCandidateExplorer):
    """
    Parallel candidate explorer that runs multiple searches concurrently.

    This explorer:
    1. Generates multiple search queries from the initial query
    2. Runs searches in parallel for speed
    3. Collects and deduplicates candidates
    4. Focuses on breadth-first exploration
    """

    def __init__(
        self,
        *args,
        max_workers: int = 5,
        queries_per_round: int = 8,
        max_rounds: int = 3,
        **kwargs,
    ):
        """
        Initialize parallel explorer.

        Args:
            max_workers: Maximum number of parallel search threads
            queries_per_round: Number of queries to generate per round
            max_rounds: Maximum exploration rounds
        """
        super().__init__(*args, **kwargs)
        self.max_workers = max_workers
        self.queries_per_round = queries_per_round
        self.max_rounds = max_rounds

    def explore(
        self,
        initial_query: str,
        constraints: Optional[List[Constraint]] = None,
        entity_type: Optional[str] = None,
    ) -> ExplorationResult:
        """Explore candidates using parallel search strategy."""
        start_time = time.time()
        logger.info(f"Starting parallel exploration for: {initial_query}")

        all_candidates = []
        exploration_paths = []
        total_searched = 0

        # Initial search
        current_queries = [initial_query]

        with concurrent.futures.ThreadPoolExecutor(
            max_workers=self.max_workers
        ) as executor:
            for round_num in range(self.max_rounds):
                if not self._should_continue_exploration(
                    start_time, len(all_candidates)
                ):
                    break

                logger.info(
                    f"Exploration round {round_num + 1}: {len(current_queries)} queries"
                )

                # Submit all queries for parallel execution
                future_to_query = {
                    executor.submit(self._execute_search, query): query
                    for query in current_queries
                }

                round_candidates = []

                # Collect results as they complete
                for future in concurrent.futures.as_completed(future_to_query):
                    query = future_to_query[future]
                    total_searched += 1

                    try:
                        results = future.result()
                        candidates = self._extract_candidates_from_results(
                            results, entity_type
                        )
                        round_candidates.extend(candidates)
                        exploration_paths.append(
                            f"Round {round_num + 1}: {query} -> {len(candidates)} candidates"
                        )

                    except Exception as e:
                        logger.exception(
                            f"Error processing query '{query}': {e}"
                        )

                # Add new candidates
                all_candidates.extend(round_candidates)

                # Generate queries for next round
                if round_num < self.max_rounds - 1:
                    current_queries = self.generate_exploration_queries(
                        initial_query, all_candidates, constraints
                    )[: self.queries_per_round]

                    if not current_queries:
                        logger.info("No more queries to explore")
                        break

        # Deduplicate and rank
        unique_candidates = self._deduplicate_candidates(all_candidates)
        ranked_candidates = self._rank_candidates_by_relevance(
            unique_candidates, initial_query
        )

        # Limit to max candidates
        final_candidates = ranked_candidates[: self.max_candidates]

        elapsed_time = time.time() - start_time
        logger.info(
            f"Parallel exploration completed: {len(final_candidates)} unique candidates in {elapsed_time:.1f}s"
        )

        return ExplorationResult(
            candidates=final_candidates,
            total_searched=total_searched,
            unique_candidates=len(unique_candidates),
            exploration_paths=exploration_paths,
            metadata={
                "strategy": "parallel",
                "rounds": min(round_num + 1, self.max_rounds),
                "max_workers": self.max_workers,
                "entity_type": entity_type,
            },
            elapsed_time=elapsed_time,
            strategy_used=ExplorationStrategy.BREADTH_FIRST,
        )
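    # Illustrative note (not part of the original module): with the defaults above
    # (queries_per_round=8, max_rounds=3) and no early exit, a call such as
    #   explorer.explore("open source search engines")   # hypothetical query string
    # issues 1 query in round 1 and up to 8 in each later round. The returned
    # ExplorationResult counts every completed future in total_searched, reports the
    # post-deduplication count in unique_candidates, records "parallel" in
    # metadata["strategy"], and sets strategy_used to ExplorationStrategy.BREADTH_FIRST.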

    def generate_exploration_queries(
        self,
        base_query: str,
        found_candidates: List[Candidate],
        constraints: Optional[List[Constraint]] = None,
    ) -> List[str]:
        """Generate queries for parallel exploration."""
        queries = []

        # Query variations based on base query
        base_variations = self._generate_query_variations(base_query)
        queries.extend(base_variations)

        # Queries based on found candidates
        if found_candidates:
            candidate_queries = self._generate_candidate_based_queries(
                found_candidates, base_query
            )
            queries.extend(candidate_queries)

        # Constraint-based queries
        if constraints:
            constraint_queries = self._generate_constraint_queries(
                constraints, base_query
            )
            queries.extend(constraint_queries)

        # Remove already explored queries
        new_queries = [
            q for q in queries if q.lower() not in self.explored_queries
        ]

        return new_queries[: self.queries_per_round]

    def _generate_query_variations(self, base_query: str) -> List[str]:
        """Generate variations of the base query."""
        try:
            prompt = f"""
Generate 4 search query variations for: "{base_query}"

Each variation should:
1. Use different keywords but same intent
2. Be specific and searchable
3. Focus on finding concrete examples or instances

Format as numbered list:
1. [query]
2. [query]
3. [query]
4. [query]
"""

            response = self.model.invoke(prompt).content.strip()

            # Parse numbered list
            queries = []
            for line in response.split("\n"):
                line = line.strip()
                if line and any(line.startswith(f"{i}.") for i in range(1, 10)):
                    # Remove number prefix
                    query = line.split(".", 1)[1].strip()
                    if query:
                        queries.append(query)

            return queries[:4]

        except Exception:
            logger.exception("Error generating query variations")
            return []
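    # Illustrative sketch (not part of the original module): the parsing loop above
    # expects a numbered list in the model response, e.g. a hypothetical reply such as
    #   1. notable examples of open source search engines
    #   2. list of self-hosted full-text search engines
    #   3. widely used open source search tools
    #   4. search engines released under open source licenses
    # Each "N." prefix is stripped via line.split(".", 1), and at most four
    # variations are returned.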

    def _generate_candidate_based_queries(
        self, candidates: List[Candidate], base_query: str
    ) -> List[str]:
        """Generate queries based on found candidates."""
        queries = []

        # Sample a few candidates to avoid too many queries
        sample_candidates = candidates[:3]

        for candidate in sample_candidates:
            # Query for similar entities
            queries.append(f'similar to "{candidate.name}"')
            queries.append(f'like "{candidate.name}" examples')

        return queries

    def _generate_constraint_queries(
        self, constraints: List[Constraint], base_query: str
    ) -> List[str]:
        """Generate queries focusing on specific constraints."""
        queries = []

        # Sample constraints to avoid too many queries
        for constraint in constraints[:2]:
            queries.append(f"{constraint.value} examples")
            queries.append(f'"{constraint.value}" instances')

        return queries
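
A minimal, self-contained sketch of the fan-out/collect pattern that explore() builds on, using only the standard library. The fake_search function and the query strings below are assumptions for illustration; fake_search merely stands in for BaseCandidateExplorer._execute_search, which is not defined in this file.

import concurrent.futures


def fake_search(query: str) -> list:
    # Stand-in for a real search call; returns dummy "results" for the query.
    return [f"{query} result {i}" for i in range(3)]


queries = ["parallel exploration", "breadth-first candidate search"]
collected = []
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # Submit every query, then gather results in completion order,
    # mirroring the structure of ParallelExplorer.explore() above.
    future_to_query = {executor.submit(fake_search, q): q for q in queries}
    for future in concurrent.futures.as_completed(future_to_query):
        query = future_to_query[future]
        try:
            collected.extend(future.result())
        except Exception:
            print(f"Search failed for query: {query}")

print(len(collected))  # 6 dummy results from 2 queries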