Coverage for src/local_deep_research/web_search_engines/engines/search_engine_stackexchange.py: 95%
231 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1"""Stack Exchange search engine for Q&A content."""
3import html
4import re
5import time
6from typing import Any, Dict, List, Optional
8import requests
9from langchain_core.language_models import BaseLLM
10from loguru import logger
12from ...constants import USER_AGENT
13from ...security.safe_requests import safe_get
14from ..rate_limiting import RateLimitError
15from ..search_engine_base import BaseSearchEngine
class StackExchangeSearchEngine(BaseSearchEngine):
    """
    Stack Exchange search engine for Q&A content.

    Provides access to Stack Overflow and other Stack Exchange sites.
    No authentication required (300 requests/day without key).

    Every HTTP call goes through ``_request_json`` so that a ``backoff``
    requested by a previous API response is honoured on all code paths.
    """

    is_public = True
    is_generic = False
    is_scientific = False
    is_code = True
    is_lexical = True
    needs_llm_relevance_filter = True

    # Common Stack Exchange sites
    SITES = {
        "stackoverflow": "Stack Overflow",
        "serverfault": "Server Fault",
        "superuser": "Super User",
        "askubuntu": "Ask Ubuntu",
        "unix": "Unix & Linux",
        "math": "Mathematics",
        "physics": "Physics",
        "stats": "Cross Validated",
        "security": "Information Security",
        "dba": "Database Administrators",
    }

    # Sites with their own .com domains (not *.stackexchange.com)
    SITE_DOMAINS = {
        "stackoverflow": "stackoverflow.com",
        "serverfault": "serverfault.com",
        "superuser": "superuser.com",
        "askubuntu": "askubuntu.com",
    }

    # Longest API-requested backoff (seconds) that will actually be waited.
    MAX_BACKOFF_SECONDS = 300

    # A "quota low" warning is logged below this many remaining requests.
    LOW_QUOTA_THRESHOLD = 10

    # Matches a single HTML tag; compiled once because it is applied to
    # every question and answer body.
    _HTML_TAG_RE = re.compile(r"<[^>]+>")

    def __init__(
        self,
        max_results: int = 10,
        site: str = "stackoverflow",
        sort: str = "relevance",
        accepted_only: bool = False,
        has_answers: bool = False,
        min_score: Optional[int] = None,
        tagged: Optional[str] = None,
        llm: Optional[BaseLLM] = None,
        max_filtered_results: Optional[int] = None,
        settings_snapshot: Optional[Dict[str, Any]] = None,
        **kwargs,
    ):
        """
        Initialize the Stack Exchange search engine.

        Args:
            max_results: Maximum number of search results
            site: Stack Exchange site to search (stackoverflow, serverfault, etc.)
            sort: Sort order (relevance, votes, creation, activity)
            accepted_only: Only return questions with accepted answers
            has_answers: Only return questions that have answers
            min_score: Minimum score for questions
            tagged: Filter by tags (semicolon separated)
            llm: Language model for relevance filtering
            max_filtered_results: Maximum results after filtering
            settings_snapshot: Settings snapshot for thread context

        Raises:
            ValueError: If ``site`` is not a key of ``SITES``, if ``sort`` is
                not a supported sort order, or if ``min_score`` is combined
                with ``sort="relevance"``.
        """
        super().__init__(
            llm=llm,
            max_filtered_results=max_filtered_results,
            max_results=max_results,
            settings_snapshot=settings_snapshot,
            **kwargs,
        )

        # Validate site parameter
        if site not in self.SITES:
            valid_sites = ", ".join(self.SITES.keys())
            raise ValueError(
                f"Invalid site: '{site}'. Must be one of: {valid_sites}"
            )

        # Validate sort parameter
        valid_sorts = ("relevance", "votes", "creation", "activity")
        if sort not in valid_sorts:
            raise ValueError(
                f"Invalid sort: '{sort}'. Must be one of: {', '.join(valid_sorts)}"
            )

        # Validate sort/min_score combination: the StackExchange API's "min"
        # parameter works with any sort except "relevance".
        if min_score is not None and sort == "relevance":
            raise ValueError(
                "min_score requires a numeric sort order (votes, creation, or activity). "
                "sort='relevance' does not support the 'min' parameter."
            )

        self.site = site
        self.sort = sort
        self.accepted_only = accepted_only
        self.has_answers = has_answers
        self.min_score = min_score
        self.tagged = tagged

        self.base_url = "https://api.stackexchange.com/2.3"
        self.search_url = f"{self.base_url}/search/advanced"

        # User-Agent and required headers for API requests
        self.headers = {
            "User-Agent": USER_AGENT,
            "Accept-Encoding": "gzip, deflate",
        }

        # Epoch timestamp before which no request should be sent; 0 means
        # no backoff is pending.
        self._backoff_until: float = 0

    # ------------------------------------------------------------------
    # Backoff / quota bookkeeping
    # ------------------------------------------------------------------

    def _apply_backoff(self) -> None:
        """Sleep until a previously requested backoff has elapsed, then clear it."""
        if self._backoff_until > 0:
            wait_time = self._backoff_until - time.time()
            if wait_time > 0:
                logger.info(
                    f"Stack Exchange backoff: waiting {wait_time:.1f} seconds"
                )
                time.sleep(wait_time)
            self._backoff_until = 0

    def _handle_backoff(self, data: Dict[str, Any]) -> None:
        """
        Record the ``backoff`` field of an API response, if present.

        The wait is capped at ``MAX_BACKOFF_SECONDS``. A value that cannot be
        converted to an integer is logged and ignored instead of raising, so
        that an otherwise usable response is not discarded.

        Args:
            data: Decoded JSON body of an API response
        """
        backoff = data.get("backoff")
        if not backoff:
            return

        try:
            seconds = int(backoff)
        except (TypeError, ValueError):
            logger.warning(
                f"Ignoring malformed Stack Exchange backoff value: {backoff!r}"
            )
            return

        self._backoff_until = time.time() + min(
            seconds, self.MAX_BACKOFF_SECONDS
        )
        logger.warning(
            f"Stack Exchange API requested backoff of {backoff} seconds"
        )

    def _warn_if_quota_low(self, quota_remaining: Optional[int]) -> None:
        """
        Log a warning when the reported remaining quota is low.

        Args:
            quota_remaining: Value of the response's ``quota_remaining``
                field, or None when the response did not include it (in
                which case nothing is logged).
        """
        if (
            quota_remaining is not None
            and quota_remaining < self.LOW_QUOTA_THRESHOLD
        ):
            logger.warning(f"Stack Exchange quota low: {quota_remaining}")

    # ------------------------------------------------------------------
    # Request helpers
    # ------------------------------------------------------------------

    def _request_json(
        self, url: str, params: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Perform one GET request against the API and return the decoded body.

        Steps, in order: wait out any pending backoff, send the request,
        let ``_raise_if_rate_limit`` inspect the status code, raise on other
        HTTP errors, decode the JSON body, and record any new backoff.

        Args:
            url: Full endpoint URL
            params: Query parameters for the request

        Returns:
            The decoded JSON response body

        Raises:
            ValueError: If the response body is not valid JSON.
            Exception: Whatever ``safe_get``, ``_raise_if_rate_limit`` or
                ``raise_for_status`` raise; callers decide what to swallow.
        """
        self._apply_backoff()
        response = safe_get(
            url, params=params, headers=self.headers, timeout=30
        )
        self._raise_if_rate_limit(response.status_code)
        response.raise_for_status()
        data: Dict[str, Any] = response.json()
        self._handle_backoff(data)
        return data

    def _build_query_params(self, query: str) -> Dict[str, Any]:
        """
        Build query parameters for the search request.

        Args:
            query: The search query

        Returns:
            Parameter dictionary; optional filters are only included when
            the corresponding instance option is set.
        """
        params: Dict[str, Any] = {
            "q": query,
            "site": self.site,
            "order": "desc",
            "sort": self.sort,
            "pagesize": min(self.max_results, 100),
            "filter": "withbody",  # Include question body
        }

        if self.accepted_only:
            params["accepted"] = "True"

        if self.has_answers:
            params["answers"] = "1"

        if self.min_score is not None:
            params["min"] = self.min_score

        if self.tagged:
            params["tagged"] = self.tagged

        return params

    # ------------------------------------------------------------------
    # Text helpers
    # ------------------------------------------------------------------

    def _decode_html(self, text: str) -> str:
        """Decode HTML entities in text."""
        return html.unescape(text)

    @classmethod
    def _html_to_text(cls, markup: Optional[str]) -> str:
        """
        Convert an HTML fragment to whitespace-normalised plain text.

        Tags are replaced by a space, entities are decoded, and runs of
        whitespace are collapsed to single spaces.

        Args:
            markup: HTML fragment; None or an empty string yields ""

        Returns:
            The plain-text rendering of ``markup``
        """
        if not markup:
            return ""
        without_tags = cls._HTML_TAG_RE.sub(" ", markup)
        return " ".join(html.unescape(without_tags).split())

    def _get_site_name(self) -> str:
        """Get human-readable site name."""
        return self.SITES.get(self.site, self.site.title())

    # ------------------------------------------------------------------
    # Search
    # ------------------------------------------------------------------

    def _build_preview(self, question: Dict[str, Any]) -> Dict[str, Any]:
        """
        Convert one question object from the API into a preview dictionary.

        Args:
            question: A single entry of the response's ``items`` list

        Returns:
            Preview dictionary; the untouched API object is kept under
            ``"_raw"`` for later use by ``_get_full_content``.
        """
        question_id = question.get("question_id")
        title = self._decode_html(question.get("title", "Untitled"))

        # Get owner info
        owner = question.get("owner", {})
        author = self._decode_html(owner.get("display_name", "Unknown"))
        author_link = owner.get("link", "")
        author_reputation = owner.get("reputation", 0)

        # Get question stats
        score = question.get("score", 0)
        view_count = question.get("view_count", 0)
        answer_count = question.get("answer_count", 0)
        is_answered = question.get("is_answered", False)
        accepted_answer_id = question.get("accepted_answer_id")

        # Get tags
        tags = question.get("tags", [])

        # Build answer status prefix
        plural = "s" if answer_count != 1 else ""
        status_parts = []
        if is_answered:
            status = f"Answered ({answer_count} answer{plural}"
            if accepted_answer_id:
                status += ", accepted"
            status += ")"
            status_parts.append(status)
        elif answer_count > 0:
            status_parts.append(f"{answer_count} answer{plural}")
        if tags:
            # Only the first four tags are shown in the snippet prefix.
            status_parts.append(f"Tags: {', '.join(tags[:4])}")
        prefix = " | ".join(status_parts)

        # Snippet: plain-text body, truncated to 1000 characters
        body_text = self._html_to_text(question.get("body", ""))[:1000]
        snippet = f"{prefix} | {body_text}" if prefix else body_text

        # Get dates
        creation_date = question.get("creation_date", 0)
        last_activity = question.get("last_activity_date", 0)

        # Build link; the constructed URL is only used when the API object
        # carries no "link" of its own.
        fallback_domain = self.SITE_DOMAINS.get(
            self.site, f"{self.site}.stackexchange.com"
        )
        link = question.get(
            "link",
            f"https://{fallback_domain}/questions/{question_id}",
        )

        return {
            "id": str(question_id),
            "title": title,
            "link": link,
            "snippet": snippet,
            "author": author,
            "author_link": author_link,
            "author_reputation": author_reputation,
            "score": score,
            "view_count": view_count,
            "answer_count": answer_count,
            "is_answered": is_answered,
            "has_accepted_answer": accepted_answer_id is not None,
            "tags": tags,
            "creation_date": creation_date,
            "last_activity_date": last_activity,
            "site": self.site,
            "source": self._get_site_name(),
            "_raw": question,
        }

    def _get_previews(self, query: str) -> List[Dict[str, Any]]:
        """
        Get preview information for Stack Exchange questions.

        Args:
            query: The search query

        Returns:
            List of preview dictionaries. An empty list is returned when the
            API reports an error or when the request fails with a
            ``requests.RequestException`` or ``ValueError`` that
            ``_raise_if_rate_limit`` does not turn into a raised exception.
        """
        logger.info(
            f"Getting Stack Exchange previews for query: {query} on {self.site}"
        )

        # Apply rate limiting
        self._last_wait_time = self.rate_tracker.apply_rate_limit(
            self.engine_type
        )

        try:
            data = self._request_json(
                self.search_url, self._build_query_params(query)
            )

            # Check for API errors
            if "error_id" in data:
                error_msg = data.get("error_message", "Unknown error")
                logger.error(f"Stack Exchange API error: {error_msg}")
                return []

            results = data.get("items", [])
            # None (field absent) means "unknown", not "exhausted".
            quota_remaining = data.get("quota_remaining")
            logger.info(
                f"Found {len(results)} Stack Exchange results, quota remaining: {quota_remaining}"
            )
            self._warn_if_quota_low(quota_remaining)

            previews = []
            for question in results[: self.max_results]:
                # One malformed item must not discard the rest of the page.
                try:
                    previews.append(self._build_preview(question))
                except Exception:
                    logger.exception("Error parsing Stack Exchange question")

            return previews

        except (requests.RequestException, ValueError) as e:
            logger.exception("Stack Exchange API request failed")
            self._raise_if_rate_limit(e)
            return []

    # ------------------------------------------------------------------
    # Full content
    # ------------------------------------------------------------------

    def _compose_content(
        self, result: Dict[str, Any], raw: Dict[str, Any]
    ) -> str:
        """
        Build the full-text content for one question, including top answers.

        Args:
            result: The preview dictionary (supplies title and tags)
            raw: The API question object stored under ``"_raw"``

        Returns:
            Question title, tags, body and up to three top answers joined
            by newlines.
        """
        content_parts = [f"Question: {result.get('title', 'Untitled')}"]
        if result.get("tags"):
            content_parts.append(f"Tags: {', '.join(result['tags'])}")
        content_parts.append(f"\n{self._html_to_text(raw.get('body', ''))}")

        # Fetch top answers; an id that is missing or not convertible to
        # int simply means no answers are fetched.
        question_id = raw.get("question_id")
        if question_id:
            try:
                question_id = int(question_id)
            except (TypeError, ValueError):
                question_id = None

        if question_id:
            answers = self._fetch_top_answers(question_id, max_answers=3)
            if answers:
                content_parts.append(
                    f"\n--- Top Answers ({len(answers)}) ---"
                )
                for ans in answers:
                    # Each answer body is truncated to 3000 characters.
                    ans_body = self._html_to_text(ans.get("body", ""))[:3000]
                    label = f"[Score: {ans.get('score', 0)}"
                    if ans.get("is_accepted", False):
                        label += ", Accepted"
                    label += "]"
                    content_parts.append(f"\n{label}\n{ans_body}")

        return "\n".join(content_parts)

    def _get_full_content(
        self, relevant_items: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Get full content for the relevant Stack Exchange questions.

        Fetches the top answers from the API and combines them with the
        question body already present in each preview's ``"_raw"`` entry.

        Args:
            relevant_items: List of relevant preview dictionaries

        Returns:
            List of result dictionaries with full content. Items without a
            ``"_raw"`` entry are passed through without a ``"content"`` key;
            ``"_raw"`` itself is removed from every result.
        """
        logger.info(
            f"Getting full content for {len(relevant_items)} Stack Exchange questions"
        )

        results = []
        for item in relevant_items:
            result = item.copy()

            raw = item.get("_raw", {})
            if raw:
                result["content"] = self._compose_content(result, raw)

            # Clean up internal fields
            result.pop("_raw", None)

            results.append(result)

        return results

    def _fetch_top_answers(
        self, question_id: int, max_answers: int = 3
    ) -> List[Dict[str, Any]]:
        """
        Fetch top answers for a question, sorted by votes.

        Args:
            question_id: The Stack Exchange question ID
            max_answers: Page size requested from the API

        Returns:
            List of answer dictionaries; empty on API error or on any
            failure other than the two re-raised below.

        Raises:
            RateLimitError: Propagated so the caller can react to throttling.
            ValueError: Propagated unchanged (e.g. an undecodable body).
        """
        try:
            data = self._request_json(
                f"{self.base_url}/questions/{question_id}/answers",
                {
                    "site": self.site,
                    "order": "desc",
                    "sort": "votes",
                    "pagesize": max_answers,
                    "filter": "withbody",
                },
            )

            if "error_id" in data:
                logger.warning(
                    f"Stack Exchange API error fetching answers for "
                    f"{question_id}: {data.get('error_message', 'Unknown')}"
                )
                return []

            self._warn_if_quota_low(data.get("quota_remaining"))

            return data.get("items", [])  # type: ignore[no-any-return]
        except (RateLimitError, ValueError):
            raise
        except Exception:
            logger.warning(
                f"Failed to fetch answers for question {question_id}"
            )
            return []

    # ------------------------------------------------------------------
    # Public convenience API
    # ------------------------------------------------------------------

    def get_question(self, question_id: int) -> Optional[Dict[str, Any]]:
        """
        Get a specific question by ID.

        Args:
            question_id: The Stack Exchange question ID

        Returns:
            Question dictionary or None

        Raises:
            RateLimitError: Propagated; every other failure is logged and
                results in None.
        """
        try:
            data = self._request_json(
                f"{self.base_url}/questions/{question_id}",
                {"site": self.site, "filter": "withbody"},
            )
            items = data.get("items", [])
            return items[0] if items else None
        except RateLimitError:
            raise
        except Exception:
            logger.exception(
                f"Error fetching Stack Exchange question {question_id}"
            )
            return None

    def get_answers(self, question_id: int) -> List[Dict[str, Any]]:
        """
        Get answers for a specific question.

        Args:
            question_id: The Stack Exchange question ID

        Returns:
            List of answer dictionaries

        Raises:
            RateLimitError: Propagated; every other failure is logged and
                results in an empty list.
        """
        try:
            data = self._request_json(
                f"{self.base_url}/questions/{question_id}/answers",
                {
                    "site": self.site,
                    "order": "desc",
                    "sort": "votes",
                    "filter": "withbody",
                },
            )
            return data.get("items", [])  # type: ignore[no-any-return]
        except RateLimitError:
            raise
        except Exception:
            logger.exception(
                f"Error fetching answers for question {question_id}"
            )
            return []

    def search_by_tag(self, tag: str, query: str = "") -> List[Dict[str, Any]]:
        """
        Search questions by tag.

        Temporarily replaces the instance's ``tagged`` filter for the
        duration of the search and restores it afterwards, even on error.

        Args:
            tag: The tag to filter by
            query: Optional search query

        Returns:
            List of matching questions
        """
        original_tagged = self.tagged
        try:
            self.tagged = tag
            return self.run(query)
        finally:
            self.tagged = original_tagged
533 self.tagged = original_tagged