Coverage for src/local_deep_research/web_search_engines/engines/search_engine_stackexchange.py: 95%

231 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1"""Stack Exchange search engine for Q&A content.""" 

2 

3import html 

4import re 

5import time 

6from typing import Any, Dict, List, Optional 

7 

8import requests 

9from langchain_core.language_models import BaseLLM 

10from loguru import logger 

11 

12from ...constants import USER_AGENT 

13from ...security.safe_requests import safe_get 

14from ..rate_limiting import RateLimitError 

15from ..search_engine_base import BaseSearchEngine 

16 

17 

class StackExchangeSearchEngine(BaseSearchEngine):
    """
    Stack Exchange search engine for Q&A content.

    Provides access to Stack Overflow and other Stack Exchange sites.
    No authentication required (300 requests/day without key).

    Every API call goes through ``_request_json``, which waits out any
    ``backoff`` period the API requested in an earlier response before
    sending the next request.
    """

    is_public = True
    is_generic = False
    is_scientific = False
    is_code = True
    is_lexical = True
    needs_llm_relevance_filter = True

    # Common Stack Exchange sites (API site key -> display name)
    SITES = {
        "stackoverflow": "Stack Overflow",
        "serverfault": "Server Fault",
        "superuser": "Super User",
        "askubuntu": "Ask Ubuntu",
        "unix": "Unix & Linux",
        "math": "Mathematics",
        "physics": "Physics",
        "stats": "Cross Validated",
        "security": "Information Security",
        "dba": "Database Administrators",
    }

    # Sites with their own .com domains (not *.stackexchange.com)
    SITE_DOMAINS = {
        "stackoverflow": "stackoverflow.com",
        "serverfault": "serverfault.com",
        "superuser": "superuser.com",
        "askubuntu": "askubuntu.com",
    }

    # Matches a single HTML tag; compiled once because it is applied to
    # every question body and every answer body.
    _HTML_TAG_RE = re.compile(r"<[^>]+>")

    def __init__(
        self,
        max_results: int = 10,
        site: str = "stackoverflow",
        sort: str = "relevance",
        accepted_only: bool = False,
        has_answers: bool = False,
        min_score: Optional[int] = None,
        tagged: Optional[str] = None,
        llm: Optional[BaseLLM] = None,
        max_filtered_results: Optional[int] = None,
        settings_snapshot: Optional[Dict[str, Any]] = None,
        **kwargs,
    ):
        """
        Initialize the Stack Exchange search engine.

        Args:
            max_results: Maximum number of search results
            site: Stack Exchange site to search (stackoverflow, serverfault, etc.)
            sort: Sort order (relevance, votes, creation, activity)
            accepted_only: Only return questions with accepted answers
            has_answers: Only return questions that have answers
            min_score: Minimum score for questions. Passed to the API as
                ``min`` when ``sort='votes'``; for ``creation``/``activity``
                it is enforced on the returned items instead.
            tagged: Filter by tags (semicolon separated)
            llm: Language model for relevance filtering
            max_filtered_results: Maximum results after filtering
            settings_snapshot: Settings snapshot for thread context

        Raises:
            ValueError: If ``site`` or ``sort`` is not a supported value, or
                if ``min_score`` is combined with ``sort='relevance'``.
        """
        super().__init__(
            llm=llm,
            max_filtered_results=max_filtered_results,
            max_results=max_results,
            settings_snapshot=settings_snapshot,
            **kwargs,
        )

        # Validate site parameter
        if site not in self.SITES:
            valid_sites = ", ".join(self.SITES.keys())
            raise ValueError(
                f"Invalid site: '{site}'. Must be one of: {valid_sites}"
            )

        # Validate sort parameter
        valid_sorts = ("relevance", "votes", "creation", "activity")
        if sort not in valid_sorts:
            raise ValueError(
                f"Invalid sort: '{sort}'. Must be one of: {', '.join(valid_sorts)}"
            )

        # Validate sort/min_score combination: the API rejects "min" when
        # sorting by relevance, so that pairing is refused up front.
        if min_score is not None and sort == "relevance":
            raise ValueError(
                "min_score requires a numeric sort order (votes, creation, or activity). "
                "sort='relevance' does not support the 'min' parameter."
            )

        self.site = site
        self.sort = sort
        self.accepted_only = accepted_only
        self.has_answers = has_answers
        self.min_score = min_score
        self.tagged = tagged

        self.base_url = "https://api.stackexchange.com/2.3"
        self.search_url = f"{self.base_url}/search/advanced"

        # User-Agent and required headers for API requests
        self.headers = {
            "User-Agent": USER_AGENT,
            "Accept-Encoding": "gzip, deflate",
        }

        # Epoch timestamp before which no request may be sent; 0 means
        # no backoff is pending.
        self._backoff_until: float = 0

    def _apply_backoff(self) -> None:
        """Sleep until a previously requested backoff has elapsed, then clear it."""
        if self._backoff_until > 0:
            wait_time = self._backoff_until - time.time()
            if wait_time > 0:
                logger.info(
                    f"Stack Exchange backoff: waiting {wait_time:.1f} seconds"
                )
                time.sleep(wait_time)
            self._backoff_until = 0

    def _handle_backoff(self, data: Dict[str, Any]) -> None:
        """
        Record the ``backoff`` field of an API response, if present.

        The wait is capped at 300 seconds. A value that cannot be converted
        to an integer is logged and ignored, so a malformed hint never turns
        an otherwise successful response into a failure.

        Args:
            data: Decoded JSON body of an API response
        """
        backoff = data.get("backoff")
        if not backoff:
            return

        try:
            seconds = int(backoff)
        except (TypeError, ValueError):
            logger.warning(
                f"Stack Exchange API returned an invalid backoff value: {backoff!r}"
            )
            return

        self._backoff_until = time.time() + min(seconds, 300)
        logger.warning(
            f"Stack Exchange API requested backoff of {backoff} seconds"
        )

    def _request_json(self, url: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """
        Send one GET request to the API and return the decoded JSON body.

        Waits out any pending backoff first and records a new one if the
        response carries a ``backoff`` field. HTTP and JSON decoding errors
        propagate to the caller, which decides how to report them.

        Args:
            url: Full API endpoint URL
            params: Query-string parameters for the request

        Returns:
            The decoded JSON response
        """
        self._apply_backoff()
        response = safe_get(
            url, params=params, headers=self.headers, timeout=30
        )
        # Inherited helper; presumably raises RateLimitError for throttling
        # status codes -- callers below catch that type.
        self._raise_if_rate_limit(response.status_code)
        response.raise_for_status()
        data = response.json()
        self._handle_backoff(data)
        return data  # type: ignore[no-any-return]

    def _warn_if_quota_low(self, quota_remaining: Optional[int]) -> None:
        """Log a warning when the reported daily quota drops below 10.

        A missing quota (``None``) is not treated as "zero remaining".
        """
        if quota_remaining is not None and quota_remaining < 10:
            logger.warning(f"Stack Exchange quota low: {quota_remaining}")

    def _build_query_params(self, query: str) -> Dict[str, Any]:
        """
        Build query parameters for the search request.

        Args:
            query: The search query

        Returns:
            Dictionary of query-string parameters
        """
        params: Dict[str, Any] = {
            "q": query,
            "site": self.site,
            "order": "desc",
            "sort": self.sort,
            "pagesize": min(self.max_results, 100),
            "filter": "withbody",  # Include question body
        }

        if self.accepted_only:
            params["accepted"] = "True"

        if self.has_answers:
            params["answers"] = "1"

        # The API applies "min" to the field selected by "sort". Only for
        # sort="votes" is that field the score; for creation/activity it
        # is a date, so sending a score there would not filter by score.
        # Those sorts are filtered client-side in _get_previews.
        if self.min_score is not None and self.sort == "votes":
            params["min"] = self.min_score

        if self.tagged:
            params["tagged"] = self.tagged

        return params

    def _meets_min_score(self, question: Dict[str, Any]) -> bool:
        """Return True when the question satisfies the configured ``min_score``."""
        if self.min_score is None:
            return True
        return (question.get("score") or 0) >= self.min_score

    def _decode_html(self, text: str) -> str:
        """Decode HTML entities in text."""
        return html.unescape(text)

    def _strip_html(self, markup: str) -> str:
        """Replace tags with spaces, decode entities and collapse whitespace."""
        text = html.unescape(self._HTML_TAG_RE.sub(" ", markup))
        return " ".join(text.split())

    def _get_site_name(self) -> str:
        """Get human-readable site name."""
        return self.SITES.get(self.site, self.site.title())

    def _build_preview(self, question: Dict[str, Any]) -> Dict[str, Any]:
        """
        Convert one question item from the API into a preview dictionary.

        Args:
            question: A question object from the API ``items`` list

        Returns:
            Preview dictionary; the untouched API item is kept under
            ``_raw`` for later use by ``_get_full_content``
        """
        question_id = question.get("question_id")
        title = self._decode_html(question.get("title", "Untitled"))

        # Owner info; an absent or null owner is treated as empty
        owner = question.get("owner") or {}
        author = self._decode_html(owner.get("display_name", "Unknown"))
        author_link = owner.get("link", "")
        author_reputation = owner.get("reputation", 0)

        # Question stats
        score = question.get("score", 0)
        view_count = question.get("view_count", 0)
        answer_count = question.get("answer_count", 0)
        is_answered = question.get("is_answered", False)
        accepted_answer_id = question.get("accepted_answer_id")
        tags = question.get("tags", [])

        # Answer status / tag prefix shown in front of the snippet
        plural = "s" if answer_count != 1 else ""
        status_parts = []
        if is_answered:
            status = f"Answered ({answer_count} answer{plural}"
            if accepted_answer_id:
                status += ", accepted"
            status += ")"
            status_parts.append(status)
        elif answer_count > 0:
            status_parts.append(f"{answer_count} answer{plural}")
        if tags:
            status_parts.append(f"Tags: {', '.join(tags[:4])}")
        prefix = " | ".join(status_parts)

        # Snippet: plain-text body, truncated to 1000 characters
        body_text = self._strip_html(question.get("body", ""))[:1000]
        snippet = f"{prefix} | {body_text}" if prefix else body_text

        # Link: use the API value, else build one from the site's domain
        fallback_domain = self.SITE_DOMAINS.get(
            self.site, f"{self.site}.stackexchange.com"
        )
        link = question.get(
            "link",
            f"https://{fallback_domain}/questions/{question_id}",
        )

        return {
            "id": str(question_id),
            "title": title,
            "link": link,
            "snippet": snippet,
            "author": author,
            "author_link": author_link,
            "author_reputation": author_reputation,
            "score": score,
            "view_count": view_count,
            "answer_count": answer_count,
            "is_answered": is_answered,
            "has_accepted_answer": accepted_answer_id is not None,
            "tags": tags,
            "creation_date": question.get("creation_date", 0),
            "last_activity_date": question.get("last_activity_date", 0),
            "site": self.site,
            "source": self._get_site_name(),
            "_raw": question,
        }

    def _get_previews(self, query: str) -> List[Dict[str, Any]]:
        """
        Get preview information for Stack Exchange questions.

        Args:
            query: The search query

        Returns:
            List of preview dictionaries; empty when the request fails or
            the API reports an error
        """
        logger.info(
            f"Getting Stack Exchange previews for query: {query} on {self.site}"
        )

        # Apply rate limiting
        self._last_wait_time = self.rate_tracker.apply_rate_limit(
            self.engine_type
        )

        try:
            data = self._request_json(
                self.search_url, self._build_query_params(query)
            )

            # Check for API errors
            if "error_id" in data:
                error_msg = data.get("error_message", "Unknown error")
                logger.error(f"Stack Exchange API error: {error_msg}")
                return []

            results = data.get("items", [])
            quota_remaining = data.get("quota_remaining")
            logger.info(
                f"Found {len(results)} Stack Exchange results, quota remaining: {quota_remaining}"
            )
            self._warn_if_quota_low(quota_remaining)

            previews = []
            for question in results[: self.max_results]:
                # One malformed item must not discard the whole page.
                try:
                    if not self._meets_min_score(question):
                        continue
                    previews.append(self._build_preview(question))
                except Exception:
                    logger.exception("Error parsing Stack Exchange question")

            return previews

        except (requests.RequestException, ValueError) as e:
            logger.exception("Stack Exchange API request failed")
            self._raise_if_rate_limit(e)
            return []

    def _format_answer(self, answer: Dict[str, Any]) -> str:
        """Render one answer as a score/accepted label followed by its text.

        The answer body is reduced to plain text and cut at 3000 characters.
        """
        answer_text = self._strip_html(answer.get("body", ""))[:3000]
        label = f"[Score: {answer.get('score', 0)}"
        if answer.get("is_accepted", False):
            label += ", Accepted"
        label += "]"
        return f"\n{label}\n{answer_text}"

    def _get_full_content(
        self, relevant_items: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Get full content for the relevant Stack Exchange questions.

        Combines the question body kept in each preview's ``_raw`` entry
        with up to three top-voted answers fetched from the API.

        Args:
            relevant_items: List of relevant preview dictionaries

        Returns:
            List of result dictionaries with a ``content`` field added
            (when raw data was available) and ``_raw`` removed
        """
        logger.info(
            f"Getting full content for {len(relevant_items)} Stack Exchange questions"
        )

        results = []
        for item in relevant_items:
            result = item.copy()

            raw = item.get("_raw", {})
            if raw:
                content_parts = [
                    f"Question: {result.get('title', 'Untitled')}"
                ]
                if result.get("tags"):
                    content_parts.append(f"Tags: {', '.join(result['tags'])}")
                content_parts.append(
                    f"\n{self._strip_html(raw.get('body', ''))}"
                )

                # The id is interpolated into the request URL, so only a
                # value that converts to an integer is accepted.
                question_id = raw.get("question_id")
                if question_id:
                    try:
                        question_id = int(question_id)
                    except (TypeError, ValueError):
                        question_id = None

                if question_id:
                    answers = self._fetch_top_answers(
                        question_id, max_answers=3
                    )
                    if answers:
                        content_parts.append(
                            f"\n--- Top Answers ({len(answers)}) ---"
                        )
                        content_parts.extend(
                            self._format_answer(ans) for ans in answers
                        )

                result["content"] = "\n".join(content_parts)

            # Clean up internal fields
            result.pop("_raw", None)

            results.append(result)

        return results

    def _fetch_top_answers(
        self, question_id: int, max_answers: int = 3
    ) -> List[Dict[str, Any]]:
        """
        Fetch top answers for a question, sorted by votes.

        Args:
            question_id: The Stack Exchange question ID
            max_answers: Maximum number of answers to request

        Returns:
            List of answer dictionaries; empty on API or transport errors

        Raises:
            RateLimitError: Re-raised so the caller can react to throttling
            ValueError: Re-raised unchanged
        """
        try:
            params = {
                "site": self.site,
                "order": "desc",
                "sort": "votes",
                "pagesize": max_answers,
                "filter": "withbody",
            }
            data = self._request_json(
                f"{self.base_url}/questions/{question_id}/answers", params
            )

            if "error_id" in data:
                logger.warning(
                    f"Stack Exchange API error fetching answers for "
                    f"{question_id}: {data.get('error_message', 'Unknown')}"
                )
                return []

            self._warn_if_quota_low(data.get("quota_remaining"))

            return data.get("items", [])  # type: ignore[no-any-return]
        except (RateLimitError, ValueError):
            raise
        except Exception:
            logger.warning(
                f"Failed to fetch answers for question {question_id}"
            )
            return []

    def get_question(self, question_id: int) -> Optional[Dict[str, Any]]:
        """
        Get a specific question by ID.

        Args:
            question_id: The Stack Exchange question ID

        Returns:
            Question dictionary or None

        Raises:
            RateLimitError: Re-raised so the caller can react to throttling
        """
        try:
            data = self._request_json(
                f"{self.base_url}/questions/{question_id}",
                {"site": self.site, "filter": "withbody"},
            )
            items = data.get("items", [])
            return items[0] if items else None
        except RateLimitError:
            raise
        except Exception:
            logger.exception(
                f"Error fetching Stack Exchange question {question_id}"
            )
            return None

    def get_answers(self, question_id: int) -> List[Dict[str, Any]]:
        """
        Get answers for a specific question.

        Args:
            question_id: The Stack Exchange question ID

        Returns:
            List of answer dictionaries

        Raises:
            RateLimitError: Re-raised so the caller can react to throttling
        """
        try:
            params = {
                "site": self.site,
                "order": "desc",
                "sort": "votes",
                "filter": "withbody",
            }
            data = self._request_json(
                f"{self.base_url}/questions/{question_id}/answers", params
            )
            return data.get("items", [])  # type: ignore[no-any-return]
        except RateLimitError:
            raise
        except Exception:
            logger.exception(
                f"Error fetching answers for question {question_id}"
            )
            return []

    def search_by_tag(self, tag: str, query: str = "") -> List[Dict[str, Any]]:
        """
        Search questions by tag.

        Temporarily overrides the instance's ``tagged`` filter for the
        duration of the search and restores it afterwards, even on error.

        Args:
            tag: The tag to filter by
            query: Optional search query

        Returns:
            List of matching questions
        """
        original_tagged = self.tagged
        try:
            self.tagged = tag
            return self.run(query)
        finally:
            self.tagged = original_tagged