Coverage for src / local_deep_research / advanced_search_system / candidate_exploration / base_explorer.py: 55%
137 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""
2Base candidate explorer for inheritance-based exploration system.
4This module provides the base interface and common functionality for
5candidate exploration implementations.
6"""
8from abc import ABC, abstractmethod
9from dataclasses import dataclass
10from enum import Enum
11from typing import Dict, List, Optional, Set
13from langchain_core.language_models import BaseChatModel
14from loguru import logger
16from ..candidates.base_candidate import Candidate
17from ..constraints.base_constraint import Constraint
class ExplorationStrategy(Enum):
    """Different exploration strategies an explorer implementation may use."""

    BREADTH_FIRST = "breadth_first"  # Explore widely first
    DEPTH_FIRST = "depth_first"  # Deep dive into promising areas
    CONSTRAINT_GUIDED = "constraint_guided"  # Let constraints guide exploration
    DIVERSITY_FOCUSED = "diversity_focused"  # Maximize candidate diversity
    ADAPTIVE = "adaptive"  # Adapt based on findings
@dataclass
class ExplorationResult:
    """Result of candidate exploration.

    Aggregates the output of a single ``explore()`` run: the candidates
    that were found plus bookkeeping about how the search proceeded.
    """

    candidates: List[Candidate]  # Candidates discovered by the run
    total_searched: int  # Number of searches performed — presumably queries, not results; confirm in implementations
    unique_candidates: int  # Candidate count after deduplication — presumably len(candidates); verify
    exploration_paths: List[str]  # Descriptions of the query paths followed — assumption, confirm against implementations
    metadata: Dict  # Implementation-specific extras
    elapsed_time: float  # Wall-clock duration of the run — assumed seconds; confirm
    strategy_used: ExplorationStrategy  # Strategy that produced this result
class BaseCandidateExplorer(ABC):
    """
    Base class for candidate exploration implementations.

    This provides the common interface and shared functionality that
    all candidate explorers should implement.
    """

    def __init__(
        self,
        model: BaseChatModel,
        search_engine,
        max_candidates: int = 50,
        max_search_time: float = 60.0,
        **kwargs,
    ):
        """
        Initialize the base candidate explorer.

        Args:
            model: Language model for analysis
            search_engine: Search engine for finding candidates; must expose
                a ``run(query)`` method returning a list or a dict
                (see ``_execute_search`` for the accepted formats)
            max_candidates: Maximum number of candidates to find
            max_search_time: Maximum time to spend searching — assumed
                seconds, consumed by ``_should_continue_exploration``
            **kwargs: Additional parameters for specific implementations
        """
        self.model = model
        self.search_engine = search_engine
        self.max_candidates = max_candidates
        self.max_search_time = max_search_time

        # Tracking
        # Lower-cased queries already executed (populated by _execute_search).
        self.explored_queries: Set[str] = set()
        # Candidate name -> Candidate; used to dedupe answers across searches.
        self.found_candidates: Dict[str, Candidate] = {}
    @abstractmethod
    def explore(
        self,
        initial_query: str,
        constraints: Optional[List[Constraint]] = None,
        entity_type: Optional[str] = None,
    ) -> ExplorationResult:
        """
        Explore and discover candidates.

        Implementations are expected to respect the ``max_candidates`` /
        ``max_search_time`` budgets (``_should_continue_exploration`` is
        provided for that purpose).

        Args:
            initial_query: Starting query for exploration
            constraints: Optional constraints to guide exploration
            entity_type: Optional entity type to focus on

        Returns:
            ExplorationResult: Complete exploration results
        """
        pass
    @abstractmethod
    def generate_exploration_queries(
        self,
        base_query: str,
        found_candidates: List[Candidate],
        constraints: Optional[List[Constraint]] = None,
    ) -> List[str]:
        """
        Generate new queries for continued exploration.

        Args:
            base_query: Original base query
            found_candidates: Candidates found so far
            constraints: Optional constraints to consider

        Returns:
            List[str]: New queries to explore
        """
        pass
118 def _execute_search(self, query: str) -> Dict:
119 """Execute a search query."""
120 try:
121 # Mark query as explored
122 self.explored_queries.add(query.lower())
124 # Execute search
125 results = self.search_engine.run(query)
127 # Handle different result formats
128 if isinstance(results, list):
129 # If results is a list, wrap it in the expected format
130 formatted_results = {"results": results, "query": query}
131 logger.info(
132 f"Search '{query[:50]}...' returned {len(results)} results"
133 )
134 return formatted_results
135 elif isinstance(results, dict): 135 ↛ 144line 135 didn't jump to line 144 because the condition on line 135 was always true
136 # If results is already a dict, use it as is
137 result_count = len(results.get("results", []))
138 logger.info(
139 f"Search '{query[:50]}...' returned {result_count} results"
140 )
141 return results
142 else:
143 # Unknown format, return empty
144 logger.warning(f"Unknown search result format: {type(results)}")
145 return {"results": [], "query": query}
147 except Exception:
148 logger.exception(f"Error executing search '{query}'")
149 return {"results": []}
151 def _extract_candidates_from_results(
152 self,
153 results: Dict,
154 original_query: str = None,
155 entity_type: Optional[str] = None,
156 ) -> List[Candidate]:
157 """Generate answer candidates directly from search results using LLM."""
158 candidates = []
160 # Collect all search result content
161 all_content = []
162 for result in results.get("results", []):
163 title = result.get("title", "")
164 snippet = result.get("snippet", "")
165 if title or snippet:
166 all_content.append(f"Title: {title}\nContent: {snippet}")
168 if not all_content or not original_query:
169 return candidates
171 # Generate answer candidates using LLM
172 answer_candidates = self._generate_answer_candidates(
173 original_query,
174 "\n\n".join(all_content[:10]), # Limit to first 10 results
175 )
177 for answer in answer_candidates:
178 if answer and answer not in self.found_candidates:
179 candidate = Candidate(
180 name=answer,
181 metadata={
182 "source": "llm_answer_generation",
183 "query": results.get("query", ""),
184 "original_query": original_query,
185 "result_count": len(results.get("results", [])),
186 },
187 )
188 candidates.append(candidate)
189 self.found_candidates[answer] = candidate
191 return candidates
193 def _generate_answer_candidates(
194 self, question: str, search_content: str
195 ) -> List[str]:
196 """Generate multiple answer candidates from search results."""
197 prompt = f"""
198Question: {question}
200Based on these search results, provide 3-5 possible answers:
202{search_content}
204Give me multiple possible answers, one per line:
205"""
207 try:
208 response = self.model.invoke(prompt)
209 content = response.content.strip()
211 # Parse multiple answers
212 answers = []
213 for line in content.split("\n"):
214 line = line.strip()
215 if line:
216 # Clean up common prefixes and formatting
217 line = line.lstrip("•-*1234567890.").strip()
218 if line and len(line) > 2: # Skip very short answers
219 answers.append(line)
221 return answers[:5] # Limit to 5 candidates max
223 except Exception:
224 logger.exception("Error generating answer candidates")
225 return []
227 def _extract_entity_names(
228 self, text: str, entity_type: Optional[str] = None
229 ) -> List[str]:
230 """Extract entity names from text using LLM."""
231 if not text.strip(): 231 ↛ 234line 231 didn't jump to line 234 because the condition on line 231 was always true
232 return []
234 prompt = f"""
235Extract specific entity names from this text.
236{"Focus on: " + entity_type if entity_type else "Extract any named entities."}
238Text: {text[:500]}
240Return only the names, one per line. Be selective - only include clear, specific names.
241Do not include:
242- Generic terms or categories
243- Adjectives or descriptions
244- Common words
246Names:
247"""
249 try:
250 response = self.model.invoke(prompt).content.strip()
252 # Parse response into names
253 names = []
254 for line in response.split("\n"):
255 name = line.strip()
256 if (
257 name
258 and len(name) > 2
259 and not name.lower().startswith(("the ", "a ", "an "))
260 ):
261 names.append(name)
263 return names[:5] # Limit to top 5 per text
265 except Exception:
266 logger.exception("Error extracting entity names")
267 return []
269 def _should_continue_exploration(
270 self, start_time: float, candidates_found: int
271 ) -> bool:
272 """Determine if exploration should continue."""
273 import time
275 elapsed = time.time() - start_time
277 # Stop if time limit reached
278 if elapsed > self.max_search_time:
279 logger.info(f"Time limit reached ({elapsed:.1f}s)")
280 return False
282 # Stop if candidate limit reached
283 if candidates_found >= self.max_candidates: 283 ↛ 287line 283 didn't jump to line 287 because the condition on line 283 was always true
284 logger.info(f"Candidate limit reached ({candidates_found})")
285 return False
287 return True
289 def _deduplicate_candidates(
290 self, candidates: List[Candidate]
291 ) -> List[Candidate]:
292 """Remove duplicate candidates based on name similarity."""
293 unique_candidates = []
294 seen_names = set()
296 for candidate in candidates:
297 # Simple deduplication by exact name match
298 name_key = candidate.name.lower().strip()
299 if name_key not in seen_names:
300 seen_names.add(name_key)
301 unique_candidates.append(candidate)
303 return unique_candidates
305 def _rank_candidates_by_relevance(
306 self, candidates: List[Candidate], query: str
307 ) -> List[Candidate]:
308 """Rank candidates by relevance to original query."""
309 if not candidates: 309 ↛ 310line 309 didn't jump to line 310 because the condition on line 309 was never true
310 return candidates
312 # Simple relevance scoring based on metadata
313 for candidate in candidates:
314 score = 0.0
316 # Score based on source query similarity
317 if "query" in candidate.metadata: 317 ↛ 327line 317 didn't jump to line 327 because the condition on line 317 was always true
318 # Simple word overlap scoring
319 query_words = set(query.lower().split())
320 candidate_query_words = set(
321 candidate.metadata["query"].lower().split()
322 )
323 overlap = len(query_words.intersection(candidate_query_words))
324 score += overlap * 0.1
326 # Score based on result title relevance
327 if "result_title" in candidate.metadata: 327 ↛ 328line 327 didn't jump to line 328 because the condition on line 327 was never true
328 title_words = set(
329 candidate.metadata["result_title"].lower().split()
330 )
331 overlap = len(query_words.intersection(title_words))
332 score += overlap * 0.2
334 candidate.relevance_score = score
336 # Sort by relevance
337 return sorted(
338 candidates,
339 key=lambda c: getattr(c, "relevance_score", 0.0),
340 reverse=True,
341 )