Coverage for src/local_deep_research/advanced_search_system/candidate_exploration/base_explorer.py: 98%

130 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2Base candidate explorer for inheritance-based exploration system. 

3 

4This module provides the base interface and common functionality for 

5candidate exploration implementations. 

6""" 

7 

import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Any, Optional

from langchain_core.language_models import BaseChatModel
from loguru import logger

from ..candidates.base_candidate import Candidate
from ..constraints.base_constraint import Constraint

18 

19 

class ExplorationStrategy(Enum):
    """Enumerates the strategies an explorer may use to discover candidates."""

    # Cast a wide net before going deep.
    BREADTH_FIRST = "breadth_first"
    # Drill down into the most promising areas first.
    DEPTH_FIRST = "depth_first"
    # Let the supplied constraints steer which queries are issued.
    CONSTRAINT_GUIDED = "constraint_guided"
    # Optimize for variety among the discovered candidates.
    DIVERSITY_FOCUSED = "diversity_focused"
    # Adjust the approach dynamically based on what has been found.
    ADAPTIVE = "adaptive"

28 

29 

@dataclass
class ExplorationResult:
    """Aggregated outcome of a single candidate-exploration run."""

    candidates: list[Candidate]  # candidates discovered during the run
    total_searched: int  # number of searches that were executed
    unique_candidates: int  # candidate count after deduplication
    exploration_paths: list[str]  # the queries/paths that were followed
    metadata: dict[str, Any]  # implementation-specific extra details
    elapsed_time: float  # wall-clock seconds spent exploring
    strategy_used: ExplorationStrategy  # strategy that produced this result

41 

42 

class BaseCandidateExplorer(ABC):
    """
    Base class for candidate exploration implementations.

    This provides the common interface and shared functionality that
    all candidate explorers should implement.
    """

    def __init__(
        self,
        model: BaseChatModel,
        search_engine,
        max_candidates: int = 50,
        max_search_time: float = 60.0,
        **kwargs,
    ):
        """
        Initialize the base candidate explorer.

        Args:
            model: Language model for analysis
            search_engine: Search engine for finding candidates
            max_candidates: Maximum number of candidates to find
            max_search_time: Maximum time (seconds) to spend searching
            **kwargs: Additional parameters for specific implementations
        """
        self.model = model
        self.search_engine = search_engine
        self.max_candidates = max_candidates
        self.max_search_time = max_search_time

        # Tracking state shared by all explorer implementations
        self.explored_queries: set[str] = set()  # lower-cased queries already run
        self.found_candidates: dict[str, Candidate] = {}  # answer text -> Candidate

    @abstractmethod
    def explore(
        self,
        initial_query: str,
        constraints: Optional[list[Constraint]] = None,
        entity_type: Optional[str] = None,
    ) -> ExplorationResult:
        """
        Explore and discover candidates.

        Args:
            initial_query: Starting query for exploration
            constraints: Optional constraints to guide exploration
            entity_type: Optional entity type to focus on

        Returns:
            ExplorationResult: Complete exploration results
        """
        pass

    @abstractmethod
    def generate_exploration_queries(
        self,
        base_query: str,
        found_candidates: list[Candidate],
        constraints: Optional[list[Constraint]] = None,
    ) -> list[str]:
        """
        Generate new queries for continued exploration.

        Args:
            base_query: Original base query
            found_candidates: Candidates found so far
            constraints: Optional constraints to consider

        Returns:
            List[str]: New queries to explore
        """
        pass

    def _execute_search(self, query: str) -> dict[str, Any]:
        """Execute a search query and normalize the result to a dict.

        The returned dict always contains a ``"results"`` list and, on
        every path, a ``"query"`` key so downstream metadata stays
        consistent.
        """
        try:
            # Mark query as explored (case-insensitively)
            self.explored_queries.add(query.lower())

            results = self.search_engine.run(query)

            # Normalize the engine's result format.
            if isinstance(results, list):
                # A bare list is wrapped into the expected dict shape.
                logger.info(
                    f"Search '{query[:50]}...' returned {len(results)} results"
                )
                return {"results": results, "query": query}
            if isinstance(results, dict):
                # Already in the expected shape; pass through as-is.
                result_count = len(results.get("results", []))
                logger.info(
                    f"Search '{query[:50]}...' returned {result_count} results"
                )
                return results
            # Unknown format: log and return an empty result set.
            logger.warning(f"Unknown search result format: {type(results)}")
            return {"results": [], "query": query}

        except Exception:
            logger.exception(f"Error executing search '{query}'")
            # Include "query" for consistency with the success paths.
            return {"results": [], "query": query}

    def _extract_candidates_from_results(
        self,
        results: dict[str, Any],
        original_query: Optional[str] = None,
        entity_type: Optional[str] = None,
    ) -> list[Candidate]:
        """Generate answer candidates directly from search results using LLM.

        Args:
            results: Normalized search results (as produced by ``_execute_search``)
            original_query: The question the candidates should answer
            entity_type: Optional entity type hint (currently unused here)

        Returns:
            Newly discovered candidates (duplicates of previously found
            answers are skipped and ``self.found_candidates`` is updated).
        """
        candidates: list[Candidate] = []

        # Collect title/snippet text from each search result.
        all_content = []
        for result in results.get("results", []):
            title = result.get("title", "")
            snippet = result.get("snippet", "")
            if title or snippet:
                all_content.append(f"Title: {title}\nContent: {snippet}")

        # Nothing to analyze, or no question to answer.
        if not all_content or not original_query:
            return candidates

        # Ask the LLM for candidate answers; cap input at 10 results.
        answer_candidates = self._generate_answer_candidates(
            original_query,
            "\n\n".join(all_content[:10]),
        )

        for answer in answer_candidates:
            # Skip empty answers and ones we've already recorded.
            if answer and answer not in self.found_candidates:
                candidate = Candidate(
                    name=answer,
                    metadata={
                        "source": "llm_answer_generation",
                        "query": results.get("query", ""),
                        "original_query": original_query,
                        "result_count": len(results.get("results", [])),
                    },
                )
                candidates.append(candidate)
                self.found_candidates[answer] = candidate

        return candidates

    def _generate_answer_candidates(
        self, question: str, search_content: str
    ) -> list[str]:
        """Generate multiple answer candidates from search results.

        Returns at most five cleaned, non-trivial answer strings; an
        empty list on any LLM failure.
        """
        prompt = f"""
Question: {question}

Based on these search results, provide 3-5 possible answers:

{search_content}

Give me multiple possible answers, one per line:
"""

        try:
            response = self.model.invoke(prompt)
            content = response.content.strip()

            # Parse one answer per line, stripping bullet/number prefixes.
            answers = []
            for line in content.split("\n"):
                line = line.strip()
                if line:
                    line = line.lstrip("•-*1234567890.").strip()
                    if line and len(line) > 2:  # Skip very short answers
                        answers.append(line)

            return answers[:5]  # Limit to 5 candidates max

        except Exception:
            logger.exception("Error generating answer candidates")
            return []

    def _extract_entity_names(
        self, text: str, entity_type: Optional[str] = None
    ) -> list[str]:
        """Extract entity names from text using LLM.

        Args:
            text: Source text (only the first 500 chars are sent to the LLM)
            entity_type: Optional focus hint included in the prompt

        Returns:
            Up to five extracted names; empty on blank input or LLM failure.
        """
        if not text.strip():
            return []

        prompt = f"""
Extract specific entity names from this text.
{"Focus on: " + entity_type if entity_type else "Extract any named entities."}

Text: {text[:500]}

Return only the names, one per line. Be selective - only include clear, specific names.
Do not include:
- Generic terms or categories
- Adjectives or descriptions
- Common words

Names:
"""

        try:
            response = self.model.invoke(prompt).content.strip()

            # Parse response into names, dropping short or article-prefixed lines.
            names = []
            for line in response.split("\n"):
                name = line.strip()
                if (
                    name
                    and len(name) > 2
                    and not name.lower().startswith(("the ", "a ", "an "))
                ):
                    names.append(name)

            return names[:5]  # Limit to top 5 per text

        except Exception:
            logger.exception("Error extracting entity names")
            return []

    def _should_continue_exploration(
        self, start_time: float, candidates_found: int
    ) -> bool:
        """Determine if exploration should continue.

        Args:
            start_time: ``time.time()`` value captured when exploration began
            candidates_found: Number of candidates discovered so far

        Returns:
            False once the time or candidate budget is exhausted.
        """
        elapsed = time.time() - start_time

        # Stop if time limit reached
        if elapsed > self.max_search_time:
            logger.info(f"Time limit reached ({elapsed:.1f}s)")
            return False

        # Stop if candidate limit reached
        if candidates_found >= self.max_candidates:
            logger.info(f"Candidate limit reached ({candidates_found})")
            return False

        return True

    def _deduplicate_candidates(
        self, candidates: list[Candidate]
    ) -> list[Candidate]:
        """Remove duplicate candidates by case-insensitive exact name match.

        Order is preserved; the first occurrence of each name wins.
        """
        unique_candidates = []
        seen_names = set()

        for candidate in candidates:
            name_key = candidate.name.lower().strip()
            if name_key not in seen_names:
                seen_names.add(name_key)
                unique_candidates.append(candidate)

        return unique_candidates

    def _rank_candidates_by_relevance(
        self, candidates: list[Candidate], query: str
    ) -> list[Candidate]:
        """Rank candidates by relevance to the original query.

        Scoring is simple word-overlap between the query and each
        candidate's source-query / result-title metadata. Sets
        ``relevance_score`` on each candidate as a side effect.
        """
        if not candidates:
            return candidates

        # Hoisted out of the loop: previously this was only bound inside
        # the "query" branch, causing a NameError for candidates that had
        # "result_title" metadata but no "query" metadata.
        query_words = set(query.lower().split())

        for candidate in candidates:
            score = 0.0

            # Score based on source query similarity (word overlap).
            if "query" in candidate.metadata:
                candidate_query_words = set(
                    candidate.metadata["query"].lower().split()
                )
                overlap = len(query_words.intersection(candidate_query_words))
                score += overlap * 0.1

            # Score based on result title relevance (weighted higher).
            if "result_title" in candidate.metadata:
                title_words = set(
                    candidate.metadata["result_title"].lower().split()
                )
                overlap = len(query_words.intersection(title_words))
                score += overlap * 0.2

            candidate.relevance_score = score

        # Sort by relevance, best first.
        return sorted(
            candidates,
            key=lambda c: getattr(c, "relevance_score", 0.0),
            reverse=True,
        )