Coverage for src/local_deep_research/advanced_search_system/candidate_exploration/base_explorer.py: 98%
130 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""
2Base candidate explorer for inheritance-based exploration system.
4This module provides the base interface and common functionality for
5candidate exploration implementations.
6"""
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Any, Optional

from langchain_core.language_models import BaseChatModel
from loguru import logger

from ..candidates.base_candidate import Candidate
from ..constraints.base_constraint import Constraint
class ExplorationStrategy(Enum):
    """Enumerates the strategies an explorer may use to discover candidates."""

    # Explore widely first.
    BREADTH_FIRST = "breadth_first"
    # Deep dive into promising areas.
    DEPTH_FIRST = "depth_first"
    # Let constraints guide exploration.
    CONSTRAINT_GUIDED = "constraint_guided"
    # Maximize candidate diversity.
    DIVERSITY_FOCUSED = "diversity_focused"
    # Adapt based on findings.
    ADAPTIVE = "adaptive"
@dataclass
class ExplorationResult:
    """Result of candidate exploration.

    Produced by ``BaseCandidateExplorer.explore`` implementations to
    summarize one exploration run.
    """

    candidates: list[Candidate]  # candidates discovered during the run
    total_searched: int  # presumably the number of searches executed — TODO confirm in subclasses
    unique_candidates: int  # count of unique candidates found
    exploration_paths: list[str]  # queries/paths followed during exploration
    metadata: dict[str, Any]  # implementation-specific extra information
    elapsed_time: float  # wall-clock seconds spent exploring
    strategy_used: ExplorationStrategy  # strategy that produced this result
class BaseCandidateExplorer(ABC):
    """
    Base class for candidate exploration implementations.

    This provides the common interface and shared functionality that
    all candidate explorers should implement: search execution with
    result normalization, LLM-based candidate/entity extraction,
    deduplication, budget checks, and relevance ranking.
    """

    def __init__(
        self,
        model: BaseChatModel,
        search_engine,
        max_candidates: int = 50,
        max_search_time: float = 60.0,
        **kwargs,
    ):
        """
        Initialize the base candidate explorer.

        Args:
            model: Language model for analysis and answer generation.
            search_engine: Search engine for finding candidates; must
                expose a ``run(query)`` method.
            max_candidates: Maximum number of candidates to find.
            max_search_time: Maximum time (seconds) to spend searching.
            **kwargs: Additional parameters for specific implementations.
        """
        self.model = model
        self.search_engine = search_engine
        self.max_candidates = max_candidates
        self.max_search_time = max_search_time

        # Tracking: queries already issued (lower-cased for
        # case-insensitive dedup) and candidates found so far, keyed by name.
        self.explored_queries: set[str] = set()
        self.found_candidates: dict[str, Candidate] = {}

    @abstractmethod
    def explore(
        self,
        initial_query: str,
        constraints: Optional[list[Constraint]] = None,
        entity_type: Optional[str] = None,
    ) -> ExplorationResult:
        """
        Explore and discover candidates.

        Args:
            initial_query: Starting query for exploration.
            constraints: Optional constraints to guide exploration.
            entity_type: Optional entity type to focus on.

        Returns:
            ExplorationResult: Complete exploration results.
        """

    @abstractmethod
    def generate_exploration_queries(
        self,
        base_query: str,
        found_candidates: list[Candidate],
        constraints: Optional[list[Constraint]] = None,
    ) -> list[str]:
        """
        Generate new queries for continued exploration.

        Args:
            base_query: Original base query.
            found_candidates: Candidates found so far.
            constraints: Optional constraints to consider.

        Returns:
            List[str]: New queries to explore.
        """

    def _execute_search(self, query: str) -> dict[str, Any]:
        """Execute a search query and normalize the result shape.

        Always returns a dict containing a ``results`` list (and a
        ``query`` key on every path this method builds itself); never
        raises — failures are logged and yield an empty result set.
        """
        try:
            # Mark query as explored (case-insensitive).
            self.explored_queries.add(query.lower())

            results = self.search_engine.run(query)

            # Normalize: some engines return a bare list of results.
            if isinstance(results, list):
                logger.info(
                    f"Search '{query[:50]}...' returned {len(results)} results"
                )
                return {"results": results, "query": query}
            if isinstance(results, dict):
                # Already in the expected format; pass through as-is.
                result_count = len(results.get("results", []))
                logger.info(
                    f"Search '{query[:50]}...' returned {result_count} results"
                )
                return results
            # Unknown format, return empty
            logger.warning(f"Unknown search result format: {type(results)}")
            return {"results": [], "query": query}

        except Exception:
            logger.exception(f"Error executing search '{query}'")
            # Fix: include "query" so the error path has the same shape
            # as every other return path.
            return {"results": [], "query": query}

    def _extract_candidates_from_results(
        self,
        results: dict[str, Any],
        original_query: Optional[str] = None,
        entity_type: Optional[str] = None,
    ) -> list[Candidate]:
        """Generate answer candidates directly from search results using LLM.

        Args:
            results: Normalized search results (see ``_execute_search``).
            original_query: Question the candidates should answer; when
                absent, no candidates are generated.
            entity_type: Accepted for interface symmetry; not used here.

        Returns:
            Newly created candidates. Answers already present in
            ``self.found_candidates`` are skipped.
        """
        candidates: list[Candidate] = []

        # Collect all search result content.
        all_content = []
        for result in results.get("results", []):
            title = result.get("title", "")
            snippet = result.get("snippet", "")
            if title or snippet:
                all_content.append(f"Title: {title}\nContent: {snippet}")

        if not all_content or not original_query:
            return candidates

        # Generate answer candidates using LLM (first 10 results only,
        # to bound prompt size).
        answer_candidates = self._generate_answer_candidates(
            original_query,
            "\n\n".join(all_content[:10]),
        )

        for answer in answer_candidates:
            if answer and answer not in self.found_candidates:
                candidate = Candidate(
                    name=answer,
                    metadata={
                        "source": "llm_answer_generation",
                        "query": results.get("query", ""),
                        "original_query": original_query,
                        "result_count": len(results.get("results", [])),
                    },
                )
                candidates.append(candidate)
                self.found_candidates[answer] = candidate

        return candidates

    def _generate_answer_candidates(
        self, question: str, search_content: str
    ) -> list[str]:
        """Generate multiple answer candidates from search results.

        Returns at most five cleaned, one-per-line answers parsed from
        the model response; an empty list on model failure.
        """
        prompt = f"""
Question: {question}

Based on these search results, provide 3-5 possible answers:

{search_content}

Give me multiple possible answers, one per line:
"""

        try:
            response = self.model.invoke(prompt)
            content = response.content.strip()

            # Parse one answer per line, stripping bullet/number prefixes.
            answers = []
            for line in content.split("\n"):
                line = line.strip()
                if line:
                    # Clean up common prefixes and formatting
                    line = line.lstrip("•-*1234567890.").strip()
                    if line and len(line) > 2:  # Skip very short answers
                        answers.append(line)

            return answers[:5]  # Limit to 5 candidates max

        except Exception:
            logger.exception("Error generating answer candidates")
            return []

    def _extract_entity_names(
        self, text: str, entity_type: Optional[str] = None
    ) -> list[str]:
        """Extract entity names from text using LLM.

        Args:
            text: Source text (only the first 500 characters are sent).
            entity_type: Optional entity type to focus the extraction on.

        Returns:
            Up to five extracted names; an empty list for blank input or
            on model failure.
        """
        if not text.strip():
            return []

        prompt = f"""
Extract specific entity names from this text.
{"Focus on: " + entity_type if entity_type else "Extract any named entities."}

Text: {text[:500]}

Return only the names, one per line. Be selective - only include clear, specific names.
Do not include:
- Generic terms or categories
- Adjectives or descriptions
- Common words

Names:
"""

        try:
            response = self.model.invoke(prompt).content.strip()

            # Parse response into names; drop very short lines and lines
            # starting with an article (likely descriptions, not names).
            names = []
            for line in response.split("\n"):
                name = line.strip()
                if (
                    name
                    and len(name) > 2
                    and not name.lower().startswith(("the ", "a ", "an "))
                ):
                    names.append(name)

            return names[:5]  # Limit to top 5 per text

        except Exception:
            logger.exception("Error extracting entity names")
            return []

    def _should_continue_exploration(
        self, start_time: float, candidates_found: int
    ) -> bool:
        """Determine if exploration should continue.

        Args:
            start_time: ``time.time()`` timestamp when exploration began.
            candidates_found: Number of candidates found so far.

        Returns:
            False once the time budget or candidate budget is exhausted.
        """
        elapsed = time.time() - start_time

        # Stop if time limit reached
        if elapsed > self.max_search_time:
            logger.info(f"Time limit reached ({elapsed:.1f}s)")
            return False

        # Stop if candidate limit reached
        if candidates_found >= self.max_candidates:
            logger.info(f"Candidate limit reached ({candidates_found})")
            return False

        return True

    def _deduplicate_candidates(
        self, candidates: list[Candidate]
    ) -> list[Candidate]:
        """Remove duplicate candidates by exact (case-insensitive) name match.

        Order is preserved; the first occurrence of each name wins.
        """
        unique_candidates = []
        seen_names = set()

        for candidate in candidates:
            name_key = candidate.name.lower().strip()
            if name_key not in seen_names:
                seen_names.add(name_key)
                unique_candidates.append(candidate)

        return unique_candidates

    def _rank_candidates_by_relevance(
        self, candidates: list[Candidate], query: str
    ) -> list[Candidate]:
        """Rank candidates by relevance to the original query.

        Scores word overlap between the query and each candidate's
        source-query / result-title metadata (weights 0.1 and 0.2),
        stores it on ``candidate.relevance_score``, and returns the
        candidates sorted by descending score.
        """
        if not candidates:
            return candidates

        # Bug fix: compute the query word set once, before the loop.
        # Previously it was assigned only inside the "query" branch, so a
        # candidate carrying "result_title" but no "query" metadata hit a
        # NameError (or reused a stale set from an earlier iteration).
        query_words = set(query.lower().split())

        for candidate in candidates:
            score = 0.0

            # Score based on source query similarity (simple word overlap).
            if "query" in candidate.metadata:
                candidate_query_words = set(
                    candidate.metadata["query"].lower().split()
                )
                overlap = len(query_words.intersection(candidate_query_words))
                score += overlap * 0.1

            # Score based on result title relevance.
            if "result_title" in candidate.metadata:
                title_words = set(
                    candidate.metadata["result_title"].lower().split()
                )
                overlap = len(query_words.intersection(title_words))
                score += overlap * 0.2

            candidate.relevance_score = score

        # Sort by relevance, highest first.
        return sorted(
            candidates,
            key=lambda c: getattr(c, "relevance_score", 0.0),
            reverse=True,
        )