Coverage for src / local_deep_research / web_search_engines / engines / search_engine_stackexchange.py: 95%

230 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1"""Stack Exchange search engine for Q&A content.""" 

2 

3import html 

4import re 

5import time 

6from typing import Any, Dict, List, Optional 

7 

8import requests 

9from langchain_core.language_models import BaseLLM 

10from loguru import logger 

11 

12from ...security.safe_requests import safe_get 

13from ..rate_limiting import RateLimitError 

14from ..search_engine_base import BaseSearchEngine 

15 

16 

class StackExchangeSearchEngine(BaseSearchEngine):
    """
    Stack Exchange search engine for Q&A content.

    Provides access to Stack Overflow and other Stack Exchange sites.
    No authentication required (300 requests/day without key).
    """

    # Capability flags (consumed by engine selection logic elsewhere).
    is_public = True  # no API key required
    is_generic = False  # domain-specific Q&A source
    is_scientific = False
    is_code = True  # suited to programming questions
    is_lexical = True
    needs_llm_relevance_filter = True  # results go through LLM relevance filtering

    # Common Stack Exchange sites: API "site" parameter -> display name.
    # Also acts as the whitelist for the constructor's `site` argument;
    # keys() order feeds the ValueError message in __init__.
    SITES = {
        "stackoverflow": "Stack Overflow",
        "serverfault": "Server Fault",
        "superuser": "Super User",
        "askubuntu": "Ask Ubuntu",
        "unix": "Unix & Linux",
        "math": "Mathematics",
        "physics": "Physics",
        "stats": "Cross Validated",
        "security": "Information Security",
        "dba": "Database Administrators",
    }

    # Sites with their own .com domains (not *.stackexchange.com).
    # Used to build a fallback question URL when the API omits "link".
    SITE_DOMAINS = {
        "stackoverflow": "stackoverflow.com",
        "serverfault": "serverfault.com",
        "superuser": "superuser.com",
        "askubuntu": "askubuntu.com",
    }

53 

54 def __init__( 

55 self, 

56 max_results: int = 10, 

57 site: str = "stackoverflow", 

58 sort: str = "relevance", 

59 accepted_only: bool = False, 

60 has_answers: bool = False, 

61 min_score: Optional[int] = None, 

62 tagged: Optional[str] = None, 

63 llm: Optional[BaseLLM] = None, 

64 max_filtered_results: Optional[int] = None, 

65 settings_snapshot: Optional[Dict[str, Any]] = None, 

66 **kwargs, 

67 ): 

68 """ 

69 Initialize the Stack Exchange search engine. 

70 

71 Args: 

72 max_results: Maximum number of search results 

73 site: Stack Exchange site to search (stackoverflow, serverfault, etc.) 

74 sort: Sort order (relevance, votes, creation, activity) 

75 accepted_only: Only return questions with accepted answers 

76 has_answers: Only return questions that have answers 

77 min_score: Minimum score for questions 

78 tagged: Filter by tags (semicolon separated) 

79 llm: Language model for relevance filtering 

80 max_filtered_results: Maximum results after filtering 

81 settings_snapshot: Settings snapshot for thread context 

82 """ 

83 super().__init__( 

84 llm=llm, 

85 max_filtered_results=max_filtered_results, 

86 max_results=max_results, 

87 settings_snapshot=settings_snapshot, 

88 **kwargs, 

89 ) 

90 

91 # Validate site parameter 

92 if site not in self.SITES: 

93 valid_sites = ", ".join(self.SITES.keys()) 

94 raise ValueError( 

95 f"Invalid site: '{site}'. Must be one of: {valid_sites}" 

96 ) 

97 

98 # Validate sort parameter 

99 valid_sorts = ("relevance", "votes", "creation", "activity") 

100 if sort not in valid_sorts: 

101 raise ValueError( 

102 f"Invalid sort: '{sort}'. Must be one of: {', '.join(valid_sorts)}" 

103 ) 

104 

105 # Validate sort/min_score combination: the StackExchange API's "min" 

106 # parameter works with any sort except "relevance". 

107 if min_score is not None and sort == "relevance": 

108 raise ValueError( 

109 "min_score requires a numeric sort order (votes, creation, or activity). " 

110 "sort='relevance' does not support the 'min' parameter." 

111 ) 

112 

113 self.site = site 

114 self.sort = sort 

115 self.accepted_only = accepted_only 

116 self.has_answers = has_answers 

117 self.min_score = min_score 

118 self.tagged = tagged 

119 

120 self.base_url = "https://api.stackexchange.com/2.3" 

121 self.search_url = f"{self.base_url}/search/advanced" 

122 

123 # User-Agent and required headers for API requests 

124 self.headers = { 

125 "User-Agent": "LocalDeepResearch/1.0 (https://github.com/LearningCircuit/local-deep-research)", 

126 "Accept-Encoding": "gzip, deflate", 

127 } 

128 

129 # Track backoff requirement from API responses 

130 self._backoff_until: float = 0 

131 

132 def _apply_backoff(self) -> None: 

133 """Apply backoff if required by previous API response.""" 

134 if self._backoff_until > 0: 

135 wait_time = self._backoff_until - time.time() 

136 if wait_time > 0: 136 ↛ 137line 136 didn't jump to line 137 because the condition on line 136 was never true

137 logger.info( 

138 f"Stack Exchange backoff: waiting {wait_time:.1f} seconds" 

139 ) 

140 time.sleep(wait_time) 

141 self._backoff_until = 0 

142 

143 def _handle_backoff(self, data: Dict[str, Any]) -> None: 

144 """Handle backoff field in API response.""" 

145 backoff = data.get("backoff") 

146 if backoff: 

147 self._backoff_until = time.time() + min(int(backoff), 300) 

148 logger.warning( 

149 f"Stack Exchange API requested backoff of {backoff} seconds" 

150 ) 

151 

152 def _build_query_params(self, query: str) -> Dict[str, Any]: 

153 """Build query parameters for the API request.""" 

154 params = { 

155 "q": query, 

156 "site": self.site, 

157 "order": "desc", 

158 "sort": self.sort, 

159 "pagesize": min(self.max_results, 100), 

160 "filter": "withbody", # Include question body 

161 } 

162 

163 if self.accepted_only: 

164 params["accepted"] = "True" 

165 

166 if self.has_answers: 166 ↛ 167line 166 didn't jump to line 167 because the condition on line 166 was never true

167 params["answers"] = "1" 

168 

169 if self.min_score is not None: 

170 params["min"] = self.min_score 

171 

172 if self.tagged: 

173 params["tagged"] = self.tagged 

174 

175 return params 

176 

177 def _decode_html(self, text: str) -> str: 

178 """Decode HTML entities in text.""" 

179 return html.unescape(text) 

180 

181 def _get_site_name(self) -> str: 

182 """Get human-readable site name.""" 

183 return self.SITES.get(self.site, self.site.title()) 

184 

185 def _get_previews(self, query: str) -> List[Dict[str, Any]]: 

186 """ 

187 Get preview information for Stack Exchange questions. 

188 

189 Args: 

190 query: The search query 

191 

192 Returns: 

193 List of preview dictionaries 

194 """ 

195 logger.info( 

196 f"Getting Stack Exchange previews for query: {query} on {self.site}" 

197 ) 

198 

199 # Apply rate limiting 

200 self._last_wait_time = self.rate_tracker.apply_rate_limit( 

201 self.engine_type 

202 ) 

203 

204 # Apply backoff if required by previous API response 

205 self._apply_backoff() 

206 

207 try: 

208 params = self._build_query_params(query) 

209 

210 response = safe_get( 

211 self.search_url, 

212 params=params, 

213 headers=self.headers, 

214 timeout=30, 

215 ) 

216 

217 self._raise_if_rate_limit(response.status_code) 

218 

219 response.raise_for_status() 

220 data = response.json() 

221 

222 # Handle backoff if present in response 

223 self._handle_backoff(data) 

224 

225 # Check for API errors 

226 if "error_id" in data: 

227 error_msg = data.get("error_message", "Unknown error") 

228 logger.error(f"Stack Exchange API error: {error_msg}") 

229 return [] 

230 

231 results = data.get("items", []) 

232 quota_remaining = data.get("quota_remaining", 0) 

233 logger.info( 

234 f"Found {len(results)} Stack Exchange results, quota remaining: {quota_remaining}" 

235 ) 

236 

237 if quota_remaining < 10: 

238 logger.warning(f"Stack Exchange quota low: {quota_remaining}") 

239 

240 previews = [] 

241 for question in results[: self.max_results]: 

242 try: 

243 question_id = question.get("question_id") 

244 title = self._decode_html(question.get("title", "Untitled")) 

245 

246 # Get owner info 

247 owner = question.get("owner", {}) 

248 author = self._decode_html( 

249 owner.get("display_name", "Unknown") 

250 ) 

251 author_link = owner.get("link", "") 

252 author_reputation = owner.get("reputation", 0) 

253 

254 # Get question stats 

255 score = question.get("score", 0) 

256 view_count = question.get("view_count", 0) 

257 answer_count = question.get("answer_count", 0) 

258 is_answered = question.get("is_answered", False) 

259 accepted_answer_id = question.get("accepted_answer_id") 

260 

261 # Get tags 

262 tags = question.get("tags", []) 

263 

264 # Build answer status prefix 

265 status_parts = [] 

266 if is_answered: 

267 status = f"Answered ({answer_count} answer{'s' if answer_count != 1 else ''}" 

268 if accepted_answer_id: 268 ↛ 270line 268 didn't jump to line 270 because the condition on line 268 was always true

269 status += ", accepted" 

270 status += ")" 

271 status_parts.append(status) 

272 elif answer_count > 0: 272 ↛ 273line 272 didn't jump to line 273 because the condition on line 272 was never true

273 status_parts.append( 

274 f"{answer_count} answer{'s' if answer_count != 1 else ''}" 

275 ) 

276 if tags: 

277 status_parts.append(f"Tags: {', '.join(tags[:4])}") 

278 prefix = " | ".join(status_parts) 

279 

280 # Get body (snippet) 

281 body = question.get("body", "") 

282 # Strip HTML for snippet 

283 body_text = html.unescape(re.sub(r"<[^>]+>", " ", body)) 

284 body_text = " ".join(body_text.split())[:1000] 

285 snippet = f"{prefix} | {body_text}" if prefix else body_text 

286 

287 # Get dates 

288 creation_date = question.get("creation_date", 0) 

289 last_activity = question.get("last_activity_date", 0) 

290 

291 # Build link 

292 fallback_domain = self.SITE_DOMAINS.get( 

293 self.site, f"{self.site}.stackexchange.com" 

294 ) 

295 link = question.get( 

296 "link", 

297 f"https://{fallback_domain}/questions/{question_id}", 

298 ) 

299 

300 preview = { 

301 "id": str(question_id), 

302 "title": title, 

303 "link": link, 

304 "snippet": snippet, 

305 "author": author, 

306 "author_link": author_link, 

307 "author_reputation": author_reputation, 

308 "score": score, 

309 "view_count": view_count, 

310 "answer_count": answer_count, 

311 "is_answered": is_answered, 

312 "has_accepted_answer": accepted_answer_id is not None, 

313 "tags": tags, 

314 "creation_date": creation_date, 

315 "last_activity_date": last_activity, 

316 "site": self.site, 

317 "source": self._get_site_name(), 

318 "_raw": question, 

319 } 

320 

321 previews.append(preview) 

322 

323 except Exception: 

324 logger.exception("Error parsing Stack Exchange question") 

325 continue 

326 

327 return previews 

328 

329 except (requests.RequestException, ValueError) as e: 

330 logger.exception("Stack Exchange API request failed") 

331 self._raise_if_rate_limit(e) 

332 return [] 

333 

334 def _get_full_content( 

335 self, relevant_items: List[Dict[str, Any]] 

336 ) -> List[Dict[str, Any]]: 

337 """ 

338 Get full content for the relevant Stack Exchange questions. 

339 

340 Fetches the question body and top answers from the API. 

341 

342 Args: 

343 relevant_items: List of relevant preview dictionaries 

344 

345 Returns: 

346 List of result dictionaries with full content 

347 """ 

348 logger.info( 

349 f"Getting full content for {len(relevant_items)} Stack Exchange questions" 

350 ) 

351 

352 results = [] 

353 for item in relevant_items: 

354 result = item.copy() 

355 

356 raw = item.get("_raw", {}) 

357 if raw: 

358 # Get full body 

359 body = raw.get("body", "") 

360 clean_body = html.unescape(re.sub(r"<[^>]+>", " ", body)) 

361 clean_body = " ".join(clean_body.split()) 

362 

363 # Build content with question + answers 

364 content_parts = [] 

365 content_parts.append( 

366 f"Question: {result.get('title', 'Untitled')}" 

367 ) 

368 if result.get("tags"): 

369 content_parts.append(f"Tags: {', '.join(result['tags'])}") 

370 content_parts.append(f"\n{clean_body}") 

371 

372 # Fetch top answers 

373 question_id = raw.get("question_id") 

374 if question_id: 

375 try: 

376 question_id = int(question_id) 

377 except (TypeError, ValueError): 

378 question_id = None 

379 if question_id: 

380 answers = self._fetch_top_answers( 

381 question_id, max_answers=3 

382 ) 

383 if answers: 383 ↛ 400line 383 didn't jump to line 400 because the condition on line 383 was always true

384 content_parts.append( 

385 f"\n--- Top Answers ({len(answers)}) ---" 

386 ) 

387 for ans in answers: 

388 ans_body = html.unescape( 

389 re.sub(r"<[^>]+>", " ", ans.get("body", "")) 

390 ) 

391 ans_body = " ".join(ans_body.split())[:3000] 

392 score = ans.get("score", 0) 

393 accepted = ans.get("is_accepted", False) 

394 label = f"[Score: {score}" 

395 if accepted: 395 ↛ 397line 395 didn't jump to line 397 because the condition on line 395 was always true

396 label += ", Accepted" 

397 label += "]" 

398 content_parts.append(f"\n{label}\n{ans_body}") 

399 

400 result["content"] = "\n".join(content_parts) 

401 

402 # Clean up internal fields 

403 if "_raw" in result: 

404 del result["_raw"] 

405 

406 results.append(result) 

407 

408 return results 

409 

410 def _fetch_top_answers( 

411 self, question_id: int, max_answers: int = 3 

412 ) -> List[Dict[str, Any]]: 

413 """Fetch top answers for a question, sorted by votes.""" 

414 try: 

415 self._apply_backoff() 

416 url = f"{self.base_url}/questions/{question_id}/answers" 

417 params = { 

418 "site": self.site, 

419 "order": "desc", 

420 "sort": "votes", 

421 "pagesize": max_answers, 

422 "filter": "withbody", 

423 } 

424 response = safe_get( 

425 url, params=params, headers=self.headers, timeout=30 

426 ) 

427 self._raise_if_rate_limit(response.status_code) 

428 response.raise_for_status() 

429 data = response.json() 

430 self._handle_backoff(data) 

431 

432 if "error_id" in data: 

433 logger.warning( 

434 f"Stack Exchange API error fetching answers for " 

435 f"{question_id}: {data.get('error_message', 'Unknown')}" 

436 ) 

437 return [] 

438 

439 quota_remaining = data.get("quota_remaining") 

440 if quota_remaining is not None and quota_remaining < 10: 

441 logger.warning(f"Stack Exchange quota low: {quota_remaining}") 

442 

443 return data.get("items", []) # type: ignore[no-any-return] 

444 except (RateLimitError, ValueError): 

445 raise 

446 except Exception: 

447 logger.warning( 

448 f"Failed to fetch answers for question {question_id}" 

449 ) 

450 return [] 

451 

452 def get_question(self, question_id: int) -> Optional[Dict[str, Any]]: 

453 """ 

454 Get a specific question by ID. 

455 

456 Args: 

457 question_id: The Stack Exchange question ID 

458 

459 Returns: 

460 Question dictionary or None 

461 """ 

462 try: 

463 url = f"{self.base_url}/questions/{question_id}" 

464 params = {"site": self.site, "filter": "withbody"} 

465 response = safe_get( 

466 url, params=params, headers=self.headers, timeout=30 

467 ) 

468 self._raise_if_rate_limit(response.status_code) 

469 response.raise_for_status() 

470 data = response.json() 

471 self._handle_backoff(data) 

472 items = data.get("items", []) 

473 return items[0] if items else None 

474 except RateLimitError: 

475 raise 

476 except Exception: 

477 logger.exception( 

478 f"Error fetching Stack Exchange question {question_id}" 

479 ) 

480 return None 

481 

482 def get_answers(self, question_id: int) -> List[Dict[str, Any]]: 

483 """ 

484 Get answers for a specific question. 

485 

486 Args: 

487 question_id: The Stack Exchange question ID 

488 

489 Returns: 

490 List of answer dictionaries 

491 """ 

492 try: 

493 url = f"{self.base_url}/questions/{question_id}/answers" 

494 params = { 

495 "site": self.site, 

496 "order": "desc", 

497 "sort": "votes", 

498 "filter": "withbody", 

499 } 

500 response = safe_get( 

501 url, params=params, headers=self.headers, timeout=30 

502 ) 

503 self._raise_if_rate_limit(response.status_code) 

504 response.raise_for_status() 

505 data = response.json() 

506 self._handle_backoff(data) 

507 return data.get("items", []) # type: ignore[no-any-return] 

508 except RateLimitError: 

509 raise 

510 except Exception: 

511 logger.exception( 

512 f"Error fetching answers for question {question_id}" 

513 ) 

514 return [] 

515 

516 def search_by_tag(self, tag: str, query: str = "") -> List[Dict[str, Any]]: 

517 """ 

518 Search questions by tag. 

519 

520 Args: 

521 tag: The tag to filter by 

522 query: Optional search query 

523 

524 Returns: 

525 List of matching questions 

526 """ 

527 original_tagged = self.tagged 

528 try: 

529 self.tagged = tag 

530 return self.run(query) 

531 finally: 

532 self.tagged = original_tagged