Coverage for src/local_deep_research/advanced_search_system/candidate_exploration/parallel_explorer.py: 99%

90 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2Parallel candidate explorer implementation. 

3 

4This explorer runs multiple search queries in parallel to quickly discover 

5a wide range of candidates. 

6""" 

7 

8import concurrent.futures 

9import time 

10from typing import List, Optional 

11 

12from loguru import logger 

13 

14from ...database.thread_local_session import thread_cleanup 

15from ..candidates.base_candidate import Candidate 

16from ..constraints.base_constraint import Constraint 

17from .base_explorer import ( 

18 BaseCandidateExplorer, 

19 ExplorationResult, 

20 ExplorationStrategy, 

21) 

22 

23 

class ParallelExplorer(BaseCandidateExplorer):
    """
    Breadth-first candidate explorer that fans searches out across threads.

    The exploration loop:
    1. Derives several search queries from the initial query
    2. Executes them concurrently in a thread pool
    3. Merges and deduplicates the discovered candidates
    4. Repeats for a bounded number of rounds
    """

    def __init__(
        self,
        *args,
        max_workers: int = 5,
        queries_per_round: int = 8,
        max_rounds: int = 3,
        **kwargs,
    ):
        """
        Initialize the parallel explorer.

        Args:
            max_workers: Upper bound on concurrent search threads.
            queries_per_round: How many queries each round may submit.
            max_rounds: Hard cap on the number of exploration rounds.
        """
        super().__init__(*args, **kwargs)
        self.max_rounds = max_rounds
        self.queries_per_round = queries_per_round
        self.max_workers = max_workers

55 

56 def explore( 

57 self, 

58 initial_query: str, 

59 constraints: Optional[List[Constraint]] = None, 

60 entity_type: Optional[str] = None, 

61 ) -> ExplorationResult: 

62 """Explore candidates using parallel search strategy.""" 

63 start_time = time.time() 

64 logger.info(f"Starting parallel exploration for: {initial_query}") 

65 

66 all_candidates = [] 

67 exploration_paths = [] 

68 total_searched = 0 

69 

70 # Initial search 

71 current_queries = [initial_query] 

72 

73 with concurrent.futures.ThreadPoolExecutor( 

74 max_workers=self.max_workers 

75 ) as executor: 

76 for round_num in range(self.max_rounds): 

77 if not self._should_continue_exploration( 

78 start_time, len(all_candidates) 

79 ): 

80 break 

81 

82 logger.info( 

83 f"Exploration round {round_num + 1}: {len(current_queries)} queries" 

84 ) 

85 

86 # Submit all queries for parallel execution 

87 future_to_query = { 

88 executor.submit( 

89 thread_cleanup(self._execute_search), query 

90 ): query 

91 for query in current_queries 

92 } 

93 

94 round_candidates = [] 

95 

96 # Collect results as they complete 

97 for future in concurrent.futures.as_completed(future_to_query): 

98 query = future_to_query[future] 

99 total_searched += 1 

100 

101 try: 

102 results = future.result() 

103 candidates = self._extract_candidates_from_results( 

104 results, entity_type 

105 ) 

106 round_candidates.extend(candidates) 

107 exploration_paths.append( 

108 f"Round {round_num + 1}: {query} -> {len(candidates)} candidates" 

109 ) 

110 

111 except Exception: 

112 logger.exception(f"Error processing query '{query}'") 

113 

114 # Add new candidates 

115 all_candidates.extend(round_candidates) 

116 

117 # Generate queries for next round 

118 if round_num < self.max_rounds - 1: 

119 current_queries = self.generate_exploration_queries( 

120 initial_query, all_candidates, constraints 

121 )[: self.queries_per_round] 

122 

123 if not current_queries: 

124 logger.info("No more queries to explore") 

125 break 

126 

127 # Deduplicate and rank 

128 unique_candidates = self._deduplicate_candidates(all_candidates) 

129 ranked_candidates = self._rank_candidates_by_relevance( 

130 unique_candidates, initial_query 

131 ) 

132 

133 # Limit to max candidates 

134 final_candidates = ranked_candidates[: self.max_candidates] 

135 

136 elapsed_time = time.time() - start_time 

137 logger.info( 

138 f"Parallel exploration completed: {len(final_candidates)} unique candidates in {elapsed_time:.1f}s" 

139 ) 

140 

141 return ExplorationResult( 

142 candidates=final_candidates, 

143 total_searched=total_searched, 

144 unique_candidates=len(unique_candidates), 

145 exploration_paths=exploration_paths, 

146 metadata={ 

147 "strategy": "parallel", 

148 "rounds": min(round_num + 1, self.max_rounds), 

149 "max_workers": self.max_workers, 

150 "entity_type": entity_type, 

151 }, 

152 elapsed_time=elapsed_time, 

153 strategy_used=ExplorationStrategy.BREADTH_FIRST, 

154 ) 

155 

156 def generate_exploration_queries( 

157 self, 

158 base_query: str, 

159 found_candidates: List[Candidate], 

160 constraints: Optional[List[Constraint]] = None, 

161 ) -> List[str]: 

162 """Generate queries for parallel exploration.""" 

163 queries = [] 

164 

165 # Query variations based on base query 

166 base_variations = self._generate_query_variations(base_query) 

167 queries.extend(base_variations) 

168 

169 # Queries based on found candidates 

170 if found_candidates: 

171 candidate_queries = self._generate_candidate_based_queries( 

172 found_candidates, base_query 

173 ) 

174 queries.extend(candidate_queries) 

175 

176 # Constraint-based queries 

177 if constraints: 

178 constraint_queries = self._generate_constraint_queries( 

179 constraints, base_query 

180 ) 

181 queries.extend(constraint_queries) 

182 

183 # Remove already explored queries 

184 new_queries = [ 

185 q for q in queries if q.lower() not in self.explored_queries 

186 ] 

187 

188 return new_queries[: self.queries_per_round] 

189 

190 def _generate_query_variations(self, base_query: str) -> List[str]: 

191 """Generate variations of the base query.""" 

192 try: 

193 prompt = f""" 

194Generate 4 search query variations for: "{base_query}" 

195 

196Each variation should: 

1971. Use different keywords but same intent 

1982. Be specific and searchable 

1993. Focus on finding concrete examples or instances 

200 

201Format as numbered list: 

2021. [query] 

2032. [query] 

2043. [query] 

2054. [query] 

206""" 

207 

208 response = self.model.invoke(prompt).content.strip() 

209 

210 # Parse numbered list 

211 queries = [] 

212 for line in response.split("\n"): 

213 line = line.strip() 

214 if line and any(line.startswith(f"{i}.") for i in range(1, 10)): 

215 # Remove number prefix 

216 query = line.split(".", 1)[1].strip() 

217 if query: 217 ↛ 212line 217 didn't jump to line 212 because the condition on line 217 was always true

218 queries.append(query) 

219 

220 return queries[:4] 

221 

222 except Exception: 

223 logger.exception("Error generating query variations") 

224 return [] 

225 

226 def _generate_candidate_based_queries( 

227 self, candidates: List[Candidate], base_query: str 

228 ) -> List[str]: 

229 """Generate queries based on found candidates.""" 

230 queries = [] 

231 

232 # Sample a few candidates to avoid too many queries 

233 sample_candidates = candidates[:3] 

234 

235 for candidate in sample_candidates: 

236 # Query for similar entities 

237 queries.append(f'similar to "{candidate.name}"') 

238 queries.append(f'like "{candidate.name}" examples') 

239 

240 return queries 

241 

242 def _generate_constraint_queries( 

243 self, constraints: List[Constraint], base_query: str 

244 ) -> List[str]: 

245 """Generate queries focusing on specific constraints.""" 

246 queries = [] 

247 

248 # Sample constraints to avoid too many queries 

249 for constraint in constraints[:2]: 

250 queries.append(f"{constraint.value} examples") 

251 queries.append(f'"{constraint.value}" instances') 

252 

253 return queries