Coverage for src / local_deep_research / advanced_search_system / candidate_exploration / base_explorer.py: 55%

137 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Base candidate explorer for inheritance-based exploration system. 

3 

4This module provides the base interface and common functionality for 

5candidate exploration implementations. 

6""" 

7 

import time

from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Optional, Set

from langchain_core.language_models import BaseChatModel
from loguru import logger

from ..candidates.base_candidate import Candidate
from ..constraints.base_constraint import Constraint

18 

19 

class ExplorationStrategy(Enum):
    """Enumeration of available candidate-exploration strategies.

    Each member's value is the stable string identifier used when the
    strategy is serialized or reported in results.
    """

    BREADTH_FIRST = "breadth_first"  # cast a wide net before going deep
    DEPTH_FIRST = "depth_first"  # dig into the most promising areas first
    CONSTRAINT_GUIDED = "constraint_guided"  # steer search by constraints
    DIVERSITY_FOCUSED = "diversity_focused"  # maximize candidate variety
    ADAPTIVE = "adaptive"  # adjust approach as findings accumulate

28 

29 

@dataclass
class ExplorationResult:
    """Result of candidate exploration.

    Aggregates everything an explorer produced in one run: the candidates
    themselves plus bookkeeping about how the search proceeded.
    """

    # Candidates discovered during exploration (post-deduplication).
    candidates: List[Candidate]
    # Total number of search queries executed.
    total_searched: int
    # Number of unique candidates after deduplication.
    unique_candidates: int
    # Human-readable descriptions of the query paths that were followed.
    exploration_paths: List[str]
    # Implementation-specific extra information about the run.
    metadata: Dict
    # Wall-clock time spent exploring, in seconds.
    elapsed_time: float
    # The strategy that produced this result.
    strategy_used: ExplorationStrategy

41 

42 

class BaseCandidateExplorer(ABC):
    """
    Base class for candidate exploration implementations.

    This provides the common interface and shared functionality that
    all candidate explorers should implement.
    """

    def __init__(
        self,
        model: BaseChatModel,
        search_engine,
        max_candidates: int = 50,
        max_search_time: float = 60.0,
        **kwargs,
    ):
        """
        Initialize the base candidate explorer.

        Args:
            model: Language model for analysis
            search_engine: Search engine for finding candidates
            max_candidates: Maximum number of candidates to find
            max_search_time: Maximum time (seconds) to spend searching
            **kwargs: Additional parameters for specific implementations
        """
        self.model = model
        self.search_engine = search_engine
        self.max_candidates = max_candidates
        self.max_search_time = max_search_time

        # Tracking: queries already issued (lowercased) and candidates
        # found so far, keyed by answer text for cross-call deduplication.
        self.explored_queries: Set[str] = set()
        self.found_candidates: Dict[str, Candidate] = {}

    @abstractmethod
    def explore(
        self,
        initial_query: str,
        constraints: Optional[List[Constraint]] = None,
        entity_type: Optional[str] = None,
    ) -> ExplorationResult:
        """
        Explore and discover candidates.

        Args:
            initial_query: Starting query for exploration
            constraints: Optional constraints to guide exploration
            entity_type: Optional entity type to focus on

        Returns:
            ExplorationResult: Complete exploration results
        """
        pass

    @abstractmethod
    def generate_exploration_queries(
        self,
        base_query: str,
        found_candidates: List[Candidate],
        constraints: Optional[List[Constraint]] = None,
    ) -> List[str]:
        """
        Generate new queries for continued exploration.

        Args:
            base_query: Original base query
            found_candidates: Candidates found so far
            constraints: Optional constraints to consider

        Returns:
            List[str]: New queries to explore
        """
        pass

    def _execute_search(self, query: str) -> Dict:
        """Execute a search query and normalize the result shape.

        Always returns a dict containing at least a "results" list and a
        "query" key, regardless of what the underlying engine returned.
        """
        try:
            # Mark query as explored (case-insensitive).
            self.explored_queries.add(query.lower())

            results = self.search_engine.run(query)

            # Handle the different result formats engines may return.
            if isinstance(results, list):
                # Bare list: wrap it in the expected dict format.
                logger.info(
                    f"Search '{query[:50]}...' returned {len(results)} results"
                )
                return {"results": results, "query": query}
            elif isinstance(results, dict):
                # Already a dict; ensure the "query" key is present so all
                # return paths have a consistent shape for callers.
                results.setdefault("query", query)
                result_count = len(results.get("results", []))
                logger.info(
                    f"Search '{query[:50]}...' returned {result_count} results"
                )
                return results
            else:
                # Unknown format, return empty.
                logger.warning(f"Unknown search result format: {type(results)}")
                return {"results": [], "query": query}

        except Exception:
            logger.exception(f"Error executing search '{query}'")
            # Fix: include "query" here too — previously the error path
            # omitted it, unlike every success path.
            return {"results": [], "query": query}

    def _extract_candidates_from_results(
        self,
        results: Dict,
        original_query: Optional[str] = None,
        entity_type: Optional[str] = None,
    ) -> List[Candidate]:
        """Generate answer candidates directly from search results using LLM.

        Args:
            results: Normalized search results (as from ``_execute_search``)
            original_query: The question the candidates should answer;
                no candidates are produced without it
            entity_type: Currently unused here; kept for interface symmetry

        Returns:
            List[Candidate]: Newly found candidates (not previously seen)
        """
        candidates: List[Candidate] = []

        # Collect the textual content of each search result.
        all_content = []
        for result in results.get("results", []):
            title = result.get("title", "")
            snippet = result.get("snippet", "")
            if title or snippet:
                all_content.append(f"Title: {title}\nContent: {snippet}")

        if not all_content or not original_query:
            return candidates

        # Generate answer candidates using the LLM.
        answer_candidates = self._generate_answer_candidates(
            original_query,
            "\n\n".join(all_content[:10]),  # Limit to first 10 results
        )

        for answer in answer_candidates:
            # Skip empties and answers already recorded in earlier calls.
            if answer and answer not in self.found_candidates:
                candidate = Candidate(
                    name=answer,
                    metadata={
                        "source": "llm_answer_generation",
                        "query": results.get("query", ""),
                        "original_query": original_query,
                        "result_count": len(results.get("results", [])),
                    },
                )
                candidates.append(candidate)
                self.found_candidates[answer] = candidate

        return candidates

    def _generate_answer_candidates(
        self, question: str, search_content: str
    ) -> List[str]:
        """Generate multiple answer candidates from search results.

        Returns at most five cleaned, non-trivial answer lines parsed
        from the model response; empty list on model failure.
        """
        prompt = f"""
Question: {question}

Based on these search results, provide 3-5 possible answers:

{search_content}

Give me multiple possible answers, one per line:
"""

        try:
            response = self.model.invoke(prompt)
            content = response.content.strip()

            # Parse one answer per non-empty line.
            answers = []
            for line in content.split("\n"):
                line = line.strip()
                if line:
                    # Strip common list markers (bullets, numbering).
                    line = line.lstrip("•-*1234567890.").strip()
                    if line and len(line) > 2:  # Skip very short answers
                        answers.append(line)

            return answers[:5]  # Limit to 5 candidates max

        except Exception:
            logger.exception("Error generating answer candidates")
            return []

    def _extract_entity_names(
        self, text: str, entity_type: Optional[str] = None
    ) -> List[str]:
        """Extract entity names from text using LLM.

        Only the first 500 characters of ``text`` are sent to the model;
        returns at most five names, empty list on failure or empty input.
        """
        if not text.strip():
            return []

        prompt = f"""
Extract specific entity names from this text.
{"Focus on: " + entity_type if entity_type else "Extract any named entities."}

Text: {text[:500]}

Return only the names, one per line. Be selective - only include clear, specific names.
Do not include:
- Generic terms or categories
- Adjectives or descriptions
- Common words

Names:
"""

        try:
            response = self.model.invoke(prompt).content.strip()

            # Keep lines that look like real names (non-trivial length,
            # not starting with an article).
            names = []
            for line in response.split("\n"):
                name = line.strip()
                if (
                    name
                    and len(name) > 2
                    and not name.lower().startswith(("the ", "a ", "an "))
                ):
                    names.append(name)

            return names[:5]  # Limit to top 5 per text

        except Exception:
            logger.exception("Error extracting entity names")
            return []

    def _should_continue_exploration(
        self, start_time: float, candidates_found: int
    ) -> bool:
        """Determine if exploration should continue.

        Args:
            start_time: ``time.time()`` captured when exploration began
            candidates_found: Number of candidates discovered so far

        Returns:
            bool: True while neither the time nor candidate limit is hit
        """
        # Uses the module-level ``time`` import (previously imported
        # inside this method on every call).
        elapsed = time.time() - start_time

        # Stop if time limit reached.
        if elapsed > self.max_search_time:
            logger.info(f"Time limit reached ({elapsed:.1f}s)")
            return False

        # Stop if candidate limit reached.
        if candidates_found >= self.max_candidates:
            logger.info(f"Candidate limit reached ({candidates_found})")
            return False

        return True

    def _deduplicate_candidates(
        self, candidates: List[Candidate]
    ) -> List[Candidate]:
        """Remove duplicate candidates based on name similarity.

        Deduplication is by exact case-insensitive name match; the first
        occurrence of each name wins, preserving input order.
        """
        unique_candidates = []
        seen_names = set()

        for candidate in candidates:
            name_key = candidate.name.lower().strip()
            if name_key not in seen_names:
                seen_names.add(name_key)
                unique_candidates.append(candidate)

        return unique_candidates

    def _rank_candidates_by_relevance(
        self, candidates: List[Candidate], query: str
    ) -> List[Candidate]:
        """Rank candidates by relevance to the original query.

        Scores each candidate by word overlap between ``query`` and its
        source-query / result-title metadata, stores the score on the
        candidate as ``relevance_score``, and returns candidates sorted
        best-first.
        """
        if not candidates:
            return candidates

        # Fix: compute the query word set once, before the loop.
        # Previously it was assigned inside the "query"-metadata branch,
        # so a candidate carrying only "result_title" metadata raised a
        # NameError (or silently reused a stale value).
        query_words = set(query.lower().split())

        for candidate in candidates:
            score = 0.0

            # Score based on overlap with the query that found the candidate.
            if "query" in candidate.metadata:
                candidate_query_words = set(
                    candidate.metadata["query"].lower().split()
                )
                overlap = len(query_words.intersection(candidate_query_words))
                score += overlap * 0.1

            # Score based on result title relevance.
            if "result_title" in candidate.metadata:
                title_words = set(
                    candidate.metadata["result_title"].lower().split()
                )
                overlap = len(query_words.intersection(title_words))
                score += overlap * 0.2

            candidate.relevance_score = score

        # Sort by relevance, highest first.
        return sorted(
            candidates,
            key=lambda c: getattr(c, "relevance_score", 0.0),
            reverse=True,
        )