Coverage for src/local_deep_research/web_search_engines/engines/search_engine_stackexchange.py: 95%
230 statements
« prev ^ index » next — coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""Stack Exchange search engine for Q&A content."""
3import html
4import re
5import time
6from typing import Any, Dict, List, Optional
8import requests
9from langchain_core.language_models import BaseLLM
10from loguru import logger
12from ...security.safe_requests import safe_get
13from ..rate_limiting import RateLimitError
14from ..search_engine_base import BaseSearchEngine
class StackExchangeSearchEngine(BaseSearchEngine):
    """
    Stack Exchange search engine for Q&A content.

    Provides access to Stack Overflow and other Stack Exchange sites.
    No authentication required (300 requests/day without key).
    """

    is_public = True
    is_generic = False
    is_scientific = False
    is_code = True
    is_lexical = True
    needs_llm_relevance_filter = True

    # Common Stack Exchange sites (api parameter -> display name).
    SITES = {
        "stackoverflow": "Stack Overflow",
        "serverfault": "Server Fault",
        "superuser": "Super User",
        "askubuntu": "Ask Ubuntu",
        "unix": "Unix & Linux",
        "math": "Mathematics",
        "physics": "Physics",
        "stats": "Cross Validated",
        "security": "Information Security",
        "dba": "Database Administrators",
    }

    # Sites with their own .com domains (not *.stackexchange.com).
    SITE_DOMAINS = {
        "stackoverflow": "stackoverflow.com",
        "serverfault": "serverfault.com",
        "superuser": "superuser.com",
        "askubuntu": "askubuntu.com",
    }

    def __init__(
        self,
        max_results: int = 10,
        site: str = "stackoverflow",
        sort: str = "relevance",
        accepted_only: bool = False,
        has_answers: bool = False,
        min_score: Optional[int] = None,
        tagged: Optional[str] = None,
        llm: Optional[BaseLLM] = None,
        max_filtered_results: Optional[int] = None,
        settings_snapshot: Optional[Dict[str, Any]] = None,
        **kwargs,
    ):
        """
        Initialize the Stack Exchange search engine.

        Args:
            max_results: Maximum number of search results
            site: Stack Exchange site to search (stackoverflow, serverfault, etc.)
            sort: Sort order (relevance, votes, creation, activity)
            accepted_only: Only return questions with accepted answers
            has_answers: Only return questions that have answers
            min_score: Minimum score for questions
            tagged: Filter by tags (semicolon separated)
            llm: Language model for relevance filtering
            max_filtered_results: Maximum results after filtering
            settings_snapshot: Settings snapshot for thread context

        Raises:
            ValueError: If site/sort is unknown, or min_score is combined
                with sort='relevance' (unsupported by the API).
        """
        super().__init__(
            llm=llm,
            max_filtered_results=max_filtered_results,
            max_results=max_results,
            settings_snapshot=settings_snapshot,
            **kwargs,
        )

        # Reject unknown sites up front rather than letting the API 400.
        if site not in self.SITES:
            valid_sites = ", ".join(self.SITES.keys())
            raise ValueError(
                f"Invalid site: '{site}'. Must be one of: {valid_sites}"
            )

        # Reject unknown sort orders.
        valid_sorts = ("relevance", "votes", "creation", "activity")
        if sort not in valid_sorts:
            raise ValueError(
                f"Invalid sort: '{sort}'. Must be one of: {', '.join(valid_sorts)}"
            )

        # The StackExchange API's "min" parameter works with any sort
        # except "relevance", so the combination is rejected here.
        if min_score is not None and sort == "relevance":
            raise ValueError(
                "min_score requires a numeric sort order (votes, creation, or activity). "
                "sort='relevance' does not support the 'min' parameter."
            )

        self.site = site
        self.sort = sort
        self.accepted_only = accepted_only
        self.has_answers = has_answers
        self.min_score = min_score
        self.tagged = tagged

        self.base_url = "https://api.stackexchange.com/2.3"
        self.search_url = f"{self.base_url}/search/advanced"

        # User-Agent and required headers for API requests.
        self.headers = {
            "User-Agent": "LocalDeepResearch/1.0 (https://github.com/LearningCircuit/local-deep-research)",
            "Accept-Encoding": "gzip, deflate",
        }

        # Epoch timestamp until which we must not hit the API again
        # (set when a response carries a "backoff" field).
        self._backoff_until: float = 0

    def _apply_backoff(self) -> None:
        """Apply backoff if required by previous API response."""
        if self._backoff_until > 0:
            remaining = self._backoff_until - time.time()
            if remaining > 0:
                logger.info(
                    f"Stack Exchange backoff: waiting {remaining:.1f} seconds"
                )
                time.sleep(remaining)
            # Backoff satisfied (or already expired) — clear it.
            self._backoff_until = 0

    def _handle_backoff(self, data: Dict[str, Any]) -> None:
        """Handle backoff field in API response."""
        backoff = data.get("backoff")
        if backoff:
            # Cap the requested pause at 5 minutes.
            self._backoff_until = time.time() + min(int(backoff), 300)
            logger.warning(
                f"Stack Exchange API requested backoff of {backoff} seconds"
            )

    def _build_query_params(self, query: str) -> Dict[str, Any]:
        """Build query parameters for the API request."""
        params = {
            "q": query,
            "site": self.site,
            "order": "desc",
            "sort": self.sort,
            # API caps pagesize at 100.
            "pagesize": min(self.max_results, 100),
            "filter": "withbody",  # Include question body
        }

        # Optional filters are only sent when configured.
        if self.accepted_only:
            params["accepted"] = "True"
        if self.has_answers:
            params["answers"] = "1"
        if self.min_score is not None:
            params["min"] = self.min_score
        if self.tagged:
            params["tagged"] = self.tagged

        return params

    def _decode_html(self, text: str) -> str:
        """Decode HTML entities in text."""
        return html.unescape(text)

    def _get_site_name(self) -> str:
        """Get human-readable site name."""
        return self.SITES.get(self.site, self.site.title())

    def _get_previews(self, query: str) -> List[Dict[str, Any]]:
        """
        Get preview information for Stack Exchange questions.

        Args:
            query: The search query

        Returns:
            List of preview dictionaries
        """
        logger.info(
            f"Getting Stack Exchange previews for query: {query} on {self.site}"
        )

        # Apply rate limiting
        self._last_wait_time = self.rate_tracker.apply_rate_limit(
            self.engine_type
        )

        # Apply backoff if required by previous API response
        self._apply_backoff()

        try:
            response = safe_get(
                self.search_url,
                params=self._build_query_params(query),
                headers=self.headers,
                timeout=30,
            )

            self._raise_if_rate_limit(response.status_code)
            response.raise_for_status()
            data = response.json()

            # Honour any backoff the API attached to this response.
            self._handle_backoff(data)

            # API-level errors come back in the JSON body, not the status.
            if "error_id" in data:
                error_msg = data.get("error_message", "Unknown error")
                logger.error(f"Stack Exchange API error: {error_msg}")
                return []

            items = data.get("items", [])
            quota_remaining = data.get("quota_remaining", 0)
            logger.info(
                f"Found {len(items)} Stack Exchange results, quota remaining: {quota_remaining}"
            )
            if quota_remaining < 10:
                logger.warning(f"Stack Exchange quota low: {quota_remaining}")

            previews = []
            for question in items[: self.max_results]:
                try:
                    previews.append(self._question_to_preview(question))
                except Exception:
                    # One malformed item must not sink the whole batch.
                    logger.exception("Error parsing Stack Exchange question")
                    continue

            return previews

        except (requests.RequestException, ValueError) as e:
            logger.exception("Stack Exchange API request failed")
            self._raise_if_rate_limit(e)
            return []

    def _question_to_preview(self, question: Dict[str, Any]) -> Dict[str, Any]:
        """Convert one raw API question item into a preview dictionary."""
        question_id = question.get("question_id")
        title = self._decode_html(question.get("title", "Untitled"))

        # Owner info.
        owner = question.get("owner", {})
        author = self._decode_html(owner.get("display_name", "Unknown"))

        # Question stats.
        score = question.get("score", 0)
        view_count = question.get("view_count", 0)
        answer_count = question.get("answer_count", 0)
        is_answered = question.get("is_answered", False)
        accepted_answer_id = question.get("accepted_answer_id")
        tags = question.get("tags", [])

        # Build the answer-status / tags prefix for the snippet.
        status_parts = []
        if is_answered:
            status = f"Answered ({answer_count} answer{'s' if answer_count != 1 else ''}"
            if accepted_answer_id:
                status += ", accepted"
            status += ")"
            status_parts.append(status)
        elif answer_count > 0:
            status_parts.append(
                f"{answer_count} answer{'s' if answer_count != 1 else ''}"
            )
        if tags:
            # Show at most 4 tags in the snippet.
            status_parts.append(f"Tags: {', '.join(tags[:4])}")
        prefix = " | ".join(status_parts)

        # Strip HTML from the body and collapse whitespace for the snippet.
        body = question.get("body", "")
        body_text = html.unescape(re.sub(r"<[^>]+>", " ", body))
        body_text = " ".join(body_text.split())[:1000]
        snippet = f"{prefix} | {body_text}" if prefix else body_text

        # Link: prefer the API-provided URL, fall back to a constructed one
        # (some sites have their own .com domains).
        fallback_domain = self.SITE_DOMAINS.get(
            self.site, f"{self.site}.stackexchange.com"
        )
        link = question.get(
            "link",
            f"https://{fallback_domain}/questions/{question_id}",
        )

        return {
            "id": str(question_id),
            "title": title,
            "link": link,
            "snippet": snippet,
            "author": author,
            "author_link": owner.get("link", ""),
            "author_reputation": owner.get("reputation", 0),
            "score": score,
            "view_count": view_count,
            "answer_count": answer_count,
            "is_answered": is_answered,
            "has_accepted_answer": accepted_answer_id is not None,
            "tags": tags,
            "creation_date": question.get("creation_date", 0),
            "last_activity_date": question.get("last_activity_date", 0),
            "site": self.site,
            "source": self._get_site_name(),
            # Raw item kept for _get_full_content; stripped before return.
            "_raw": question,
        }

    def _get_full_content(
        self, relevant_items: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Get full content for the relevant Stack Exchange questions.

        Fetches the question body and top answers from the API.

        Args:
            relevant_items: List of relevant preview dictionaries

        Returns:
            List of result dictionaries with full content
        """
        logger.info(
            f"Getting full content for {len(relevant_items)} Stack Exchange questions"
        )

        results = []
        for item in relevant_items:
            result = item.copy()
            raw = item.get("_raw", {})
            if raw:
                # Full question body, HTML stripped and whitespace collapsed.
                clean_body = html.unescape(
                    re.sub(r"<[^>]+>", " ", raw.get("body", ""))
                )
                clean_body = " ".join(clean_body.split())

                # Assemble content: title, tags, body, then top answers.
                content_parts = [
                    f"Question: {result.get('title', 'Untitled')}"
                ]
                if result.get("tags"):
                    content_parts.append(f"Tags: {', '.join(result['tags'])}")
                content_parts.append(f"\n{clean_body}")

                # Fetch top answers (only with a valid numeric question id).
                question_id = raw.get("question_id")
                if question_id:
                    try:
                        question_id = int(question_id)
                    except (TypeError, ValueError):
                        question_id = None
                if question_id:
                    answers = self._fetch_top_answers(
                        question_id, max_answers=3
                    )
                    if answers:
                        content_parts.append(
                            f"\n--- Top Answers ({len(answers)}) ---"
                        )
                        for ans in answers:
                            ans_body = html.unescape(
                                re.sub(r"<[^>]+>", " ", ans.get("body", ""))
                            )
                            # Cap each answer at 3000 chars.
                            ans_body = " ".join(ans_body.split())[:3000]
                            label = f"[Score: {ans.get('score', 0)}"
                            if ans.get("is_accepted", False):
                                label += ", Accepted"
                            label += "]"
                            content_parts.append(f"\n{label}\n{ans_body}")

                result["content"] = "\n".join(content_parts)

            # Drop the internal raw payload before returning.
            result.pop("_raw", None)
            results.append(result)

        return results

    def _fetch_top_answers(
        self, question_id: int, max_answers: int = 3
    ) -> List[Dict[str, Any]]:
        """Fetch top answers for a question, sorted by votes."""
        try:
            self._apply_backoff()
            response = safe_get(
                f"{self.base_url}/questions/{question_id}/answers",
                params={
                    "site": self.site,
                    "order": "desc",
                    "sort": "votes",
                    "pagesize": max_answers,
                    "filter": "withbody",
                },
                headers=self.headers,
                timeout=30,
            )
            self._raise_if_rate_limit(response.status_code)
            response.raise_for_status()
            data = response.json()
            self._handle_backoff(data)

            if "error_id" in data:
                logger.warning(
                    f"Stack Exchange API error fetching answers for "
                    f"{question_id}: {data.get('error_message', 'Unknown')}"
                )
                return []

            quota_remaining = data.get("quota_remaining")
            if quota_remaining is not None and quota_remaining < 10:
                logger.warning(f"Stack Exchange quota low: {quota_remaining}")

            return data.get("items", [])  # type: ignore[no-any-return]
        except (RateLimitError, ValueError):
            # Rate-limit and JSON-decode errors propagate to the caller.
            raise
        except Exception:
            # Answers are best-effort enrichment; degrade to none.
            logger.warning(
                f"Failed to fetch answers for question {question_id}"
            )
            return []

    def get_question(self, question_id: int) -> Optional[Dict[str, Any]]:
        """
        Get a specific question by ID.

        Args:
            question_id: The Stack Exchange question ID

        Returns:
            Question dictionary or None
        """
        try:
            response = safe_get(
                f"{self.base_url}/questions/{question_id}",
                params={"site": self.site, "filter": "withbody"},
                headers=self.headers,
                timeout=30,
            )
            self._raise_if_rate_limit(response.status_code)
            response.raise_for_status()
            data = response.json()
            self._handle_backoff(data)
            items = data.get("items", [])
            return items[0] if items else None
        except RateLimitError:
            raise
        except Exception:
            logger.exception(
                f"Error fetching Stack Exchange question {question_id}"
            )
            return None

    def get_answers(self, question_id: int) -> List[Dict[str, Any]]:
        """
        Get answers for a specific question.

        Args:
            question_id: The Stack Exchange question ID

        Returns:
            List of answer dictionaries
        """
        try:
            response = safe_get(
                f"{self.base_url}/questions/{question_id}/answers",
                params={
                    "site": self.site,
                    "order": "desc",
                    "sort": "votes",
                    "filter": "withbody",
                },
                headers=self.headers,
                timeout=30,
            )
            self._raise_if_rate_limit(response.status_code)
            response.raise_for_status()
            data = response.json()
            self._handle_backoff(data)
            return data.get("items", [])  # type: ignore[no-any-return]
        except RateLimitError:
            raise
        except Exception:
            logger.exception(
                f"Error fetching answers for question {question_id}"
            )
            return []

    def search_by_tag(self, tag: str, query: str = "") -> List[Dict[str, Any]]:
        """
        Search questions by tag.

        Args:
            tag: The tag to filter by
            query: Optional search query

        Returns:
            List of matching questions
        """
        # Temporarily swap in the tag filter, restoring the original
        # even if the search raises.
        original_tagged = self.tagged
        try:
            self.tagged = tag
            return self.run(query)
        finally:
            self.tagged = original_tagged