Coverage for src / local_deep_research / web_search_engines / engines / search_engine_github.py: 99%

326 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1import base64 

2import json 

3import time 

4from typing import Any, Dict, List, Optional 

5 

6from langchain_core.language_models import BaseLLM 

7from loguru import logger 

8 

9from ...config import llm_config, search_config 

10from ...security.safe_requests import safe_get 

11from ...utilities.json_utils import extract_json, get_llm_response_text 

12from ..search_engine_base import BaseSearchEngine 

13 

14 

class GitHubSearchEngine(BaseSearchEngine):
    """
    Search engine backed by the GitHub REST API.

    Supports searching repositories, code, issues, and users, optionally
    enriching repository results with README content and recent issues.
    """

    # GitHub search is keyword (lexical) matching, so results benefit from
    # an LLM-based relevance filtering pass.
    is_lexical = True
    needs_llm_relevance_filter = True

24 def __init__( 

25 self, 

26 max_results: int = 15, 

27 api_key: Optional[str] = None, 

28 search_type: str = "repositories", 

29 include_readme: bool = True, 

30 include_issues: bool = False, 

31 llm: Optional[BaseLLM] = None, 

32 max_filtered_results: Optional[int] = None, 

33 settings_snapshot: Optional[Dict[str, Any]] = None, 

34 ): 

35 """ 

36 Initialize the GitHub search engine. 

37 

38 Args: 

39 max_results: Maximum number of search results 

40 api_key: GitHub API token (can also be set via LDR_SEARCH_ENGINE_WEB_GITHUB_API_KEY env var or in UI settings) 

41 search_type: Type of GitHub search ("repositories", "code", "issues", "users") 

42 include_readme: Whether to include README content for repositories 

43 include_issues: Whether to include recent issues for repositories 

44 llm: Language model for relevance filtering 

45 max_filtered_results: Maximum number of results to keep after filtering 

46 """ 

47 # Initialize the BaseSearchEngine with LLM, max_filtered_results, and max_results 

48 super().__init__( 

49 llm=llm, 

50 max_filtered_results=max_filtered_results, 

51 max_results=max_results, 

52 settings_snapshot=settings_snapshot, 

53 ) 

54 self.api_key = api_key 

55 self.search_type = search_type 

56 self.include_readme = include_readme 

57 self.include_issues = include_issues 

58 

59 self._owns_llm = False 

60 

61 # API endpoints 

62 self.api_base = "https://api.github.com" 

63 self.search_endpoint = f"{self.api_base}/search/{search_type}" 

64 

65 # Set up API headers 

66 self.headers = { 

67 "Accept": "application/vnd.github.v3+json", 

68 "User-Agent": "Local-Deep-Research-Agent", 

69 } 

70 

71 # Add authentication if API key provided 

72 if self.api_key: 

73 self.headers["Authorization"] = f"token {self.api_key}" 

74 logger.info("Using authenticated GitHub API requests") 

75 else: 

76 logger.warning( 

77 "No GitHub API key provided. Rate limits will be restricted." 

78 ) 

79 

80 def close(self) -> None: 

81 """Close the lazily-loaded LLM client if this engine created it.""" 

82 from ...utilities.resource_utils import safe_close 

83 

84 if self._owns_llm: 

85 safe_close(self.llm, "GitHub LLM") 

86 super().close() 

87 

88 def _handle_rate_limits(self, response): 

89 """Handle GitHub API rate limits by logging warnings and sleeping if necessary""" 

90 remaining = int(response.headers.get("X-RateLimit-Remaining", 60)) 

91 reset_time = int(response.headers.get("X-RateLimit-Reset", 0)) 

92 

93 if remaining < 5: 

94 current_time = time.time() 

95 wait_time = max(reset_time - current_time, 0) 

96 logger.warning( 

97 f"GitHub API rate limit almost reached. {remaining} requests remaining." 

98 ) 

99 

100 if wait_time > 0 and remaining == 0: 

101 logger.warning( 

102 f"GitHub API rate limit exceeded. Waiting {wait_time:.0f} seconds." 

103 ) 

104 time.sleep(min(wait_time, 60)) # Wait at most 60 seconds 

105 

106 def _optimize_github_query(self, query: str) -> str: 

107 """ 

108 Optimize the GitHub search query using LLM to improve search results. 

109 

110 Args: 

111 query: Original search query 

112 

113 Returns: 

114 Optimized GitHub search query 

115 """ 

116 # Get LLM from config if not already set 

117 if not self.llm: 

118 try: 

119 self.llm = llm_config.get_llm() 

120 self._owns_llm = True 

121 if not self.llm: 

122 logger.warning("No LLM available for query optimization") 

123 return query 

124 except Exception: 

125 logger.exception("Error getting LLM from config") 

126 return query 

127 

128 prompt = f"""Transform this GitHub search query into an optimized version for the GitHub search API. Follow these steps: 

129 1. Strip question words (e.g., 'what', 'are', 'is'), stop words (e.g., 'and', 'as', 'of', 'on'), and redundant terms (e.g., 'repositories', 'repos', 'github') since they're implied by the search context. 

130 2. Keep only domain-specific keywords and avoid using "-related" terms. 

131 3. Add GitHub-specific filters with dynamic thresholds based on query context: 

132 - For stars: Use higher threshold (e.g., 'stars:>1000') for mainstream topics, lower (e.g., 'stars:>50') for specialized topics 

133 - For language: Detect programming language from query or omit if unclear 

134 - For search scope: Use 'in:name,description,readme' for general queries, 'in:file' for code-specific queries 

135 4. For date ranges, adapt based on query context: 

136 - For emerging: Use 'created:>2024-01-01' 

137 - For mature: Use 'pushed:>2023-01-01' 

138 - For historical research: Use 'created:2020-01-01..2024-01-01' 

139 5. For excluding results, adapt based on query: 

140 - Exclude irrelevant languages based on context 

141 - Use 'NOT' to exclude competing terms 

142 6. Ensure the output is a concise, space-separated string with no punctuation or extra text beyond keywords and filters. 

143 

144 

145 Original query: "{query}" 

146 

147 Return ONLY the optimized query, ready for GitHub's search API. Do not include explanations or additional text.""" 

148 

149 try: 

150 response = self.llm.invoke(prompt) 

151 

152 # Handle different response formats (string or object with content attribute) 

153 if hasattr(response, "content"): 

154 optimized_query = str(response.content).strip() 

155 else: 

156 # Handle string responses 

157 optimized_query = str(response).strip() 

158 

159 # Validate the optimized query 

160 if optimized_query and len(optimized_query) > 0: 

161 logger.info( 

162 f"LLM optimized query from '{query}' to '{optimized_query}'" 

163 ) 

164 return optimized_query 

165 logger.warning("LLM returned empty query, using original") 

166 return query 

167 

168 except Exception: 

169 logger.exception("Error optimizing query with LLM") 

170 return query 

171 

172 def _search_github(self, query: str) -> List[Dict[str, Any]]: 

173 """ 

174 Perform a GitHub search based on the configured search type. 

175 

176 Args: 

177 query: The search query 

178 

179 Returns: 

180 List of GitHub search result items 

181 """ 

182 results = [] 

183 

184 try: 

185 # Optimize GitHub query using LLM 

186 github_query = self._optimize_github_query(query) 

187 

188 logger.info(f"Final GitHub query: {github_query}") 

189 

190 # Construct search parameters 

191 params = { 

192 "q": github_query, 

193 "per_page": min( 

194 self.max_results, 100 

195 ), # GitHub API max is 100 per page 

196 "page": 1, 

197 } 

198 

199 # Add sort parameters based on search type 

200 if self.search_type == "repositories": 

201 params["sort"] = "stars" 

202 params["order"] = "desc" 

203 elif self.search_type == "code": 

204 params["sort"] = "indexed" 

205 params["order"] = "desc" 

206 elif self.search_type == "issues": 

207 params["sort"] = "updated" 

208 params["order"] = "desc" 

209 elif self.search_type == "users": 209 ↛ 214line 209 didn't jump to line 214 because the condition on line 209 was always true

210 params["sort"] = "followers" 

211 params["order"] = "desc" 

212 

213 # Apply rate limiting before request 

214 self._last_wait_time = self.rate_tracker.apply_rate_limit( 

215 self.engine_type 

216 ) 

217 

218 # Execute the API request 

219 response = safe_get( 

220 self.search_endpoint, headers=self.headers, params=params 

221 ) 

222 

223 # Check for rate limiting 

224 self._handle_rate_limits(response) 

225 

226 # Handle response with detailed logging 

227 if response.status_code == 200: 

228 data = response.json() 

229 total_count = data.get("total_count", 0) 

230 results = data.get("items", []) 

231 logger.info( 

232 f"GitHub search returned {len(results)} results (total available: {total_count})" 

233 ) 

234 

235 # Log the rate limit information 

236 rate_limit_remaining = response.headers.get( 

237 "X-RateLimit-Remaining", "unknown" 

238 ) 

239 logger.info( 

240 f"GitHub API rate limit: {rate_limit_remaining} requests remaining" 

241 ) 

242 

243 # If no results, try to provide more guidance 

244 if not results: 

245 logger.warning( 

246 "No results found. Consider these search tips:" 

247 ) 

248 logger.warning("1. Use shorter, more specific queries") 

249 logger.warning( 

250 "2. For repositories, try adding 'stars:>100' or 'language:python'" 

251 ) 

252 logger.warning( 

253 "3. For contribution opportunities, search for 'good-first-issue' or 'help-wanted'" 

254 ) 

255 else: 

256 logger.error( 

257 f"GitHub API error: {response.status_code} - {response.text}" 

258 ) 

259 

260 except Exception: 

261 logger.exception("Error searching GitHub") 

262 

263 return results 

264 

265 def _get_readme_content(self, repo_full_name: str) -> str: 

266 """ 

267 Get README content for a repository. 

268 

269 Args: 

270 repo_full_name: Full name of the repository (owner/repo) 

271 

272 Returns: 

273 Decoded README content or empty string if not found 

274 """ 

275 try: 

276 # Get README 

277 # Apply rate limiting before request 

278 self._last_wait_time = self.rate_tracker.apply_rate_limit( 

279 self.engine_type 

280 ) 

281 

282 response = safe_get( 

283 f"{self.api_base}/repos/{repo_full_name}/readme", 

284 headers=self.headers, 

285 ) 

286 

287 # Check for rate limiting 

288 self._handle_rate_limits(response) 

289 

290 if response.status_code == 200: 

291 data = response.json() 

292 content: str = data.get("content", "") 

293 encoding = data.get("encoding", "") 

294 

295 if encoding == "base64" and content: 

296 return base64.b64decode(content).decode( 

297 "utf-8", errors="replace" 

298 ) 

299 return content 

300 logger.warning( 

301 f"Could not get README for {repo_full_name}: {response.status_code}" 

302 ) 

303 return "" 

304 

305 except Exception: 

306 logger.exception(f"Error getting README for {repo_full_name}") 

307 return "" 

308 

309 def _get_recent_issues( 

310 self, repo_full_name: str, limit: int = 5 

311 ) -> List[Dict[str, Any]]: 

312 """ 

313 Get recent issues for a repository. 

314 

315 Args: 

316 repo_full_name: Full name of the repository (owner/repo) 

317 limit: Maximum number of issues to return 

318 

319 Returns: 

320 List of recent issues 

321 """ 

322 issues = [] 

323 

324 try: 

325 # Get recent issues 

326 # Apply rate limiting before request 

327 self._last_wait_time = self.rate_tracker.apply_rate_limit( 

328 self.engine_type 

329 ) 

330 

331 response = safe_get( 

332 f"{self.api_base}/repos/{repo_full_name}/issues", 

333 headers=self.headers, 

334 params={ 

335 "state": "all", 

336 "per_page": limit, 

337 "sort": "updated", 

338 "direction": "desc", 

339 }, 

340 ) 

341 

342 # Check for rate limiting 

343 self._handle_rate_limits(response) 

344 

345 if response.status_code == 200: 

346 issues = response.json() 

347 logger.info( 

348 f"Got {len(issues)} recent issues for {repo_full_name}" 

349 ) 

350 else: 

351 logger.warning( 

352 f"Could not get issues for {repo_full_name}: {response.status_code}" 

353 ) 

354 

355 except Exception: 

356 logger.exception(f"Error getting issues for {repo_full_name}") 

357 

358 return issues 

359 

360 def _get_file_content(self, file_url: str) -> str: 

361 """ 

362 Get content of a file from GitHub. 

363 

364 Args: 

365 file_url: API URL for the file 

366 

367 Returns: 

368 Decoded file content or empty string if not found 

369 """ 

370 try: 

371 # Apply rate limiting before request 

372 self._last_wait_time = self.rate_tracker.apply_rate_limit( 

373 self.engine_type 

374 ) 

375 

376 # Get file content 

377 response = safe_get(file_url, headers=self.headers) 

378 

379 # Check for rate limiting 

380 self._handle_rate_limits(response) 

381 

382 if response.status_code == 200: 

383 data = response.json() 

384 content2: str = data.get("content", "") 

385 encoding = data.get("encoding", "") 

386 

387 if encoding == "base64" and content2: 

388 return base64.b64decode(content2).decode( 

389 "utf-8", errors="replace" 

390 ) 

391 return content2 

392 logger.warning( 

393 f"Could not get file content: {response.status_code}" 

394 ) 

395 return "" 

396 

397 except Exception: 

398 logger.exception("Error getting file content") 

399 return "" 

400 

401 def _format_repository_preview( 

402 self, repo: Dict[str, Any] 

403 ) -> Dict[str, Any]: 

404 """Format repository search result as preview""" 

405 return { 

406 "id": str(repo.get("id", "")), 

407 "title": repo.get("full_name", ""), 

408 "link": repo.get("html_url", ""), 

409 "snippet": repo.get("description", "No description provided"), 

410 "stars": repo.get("stargazers_count", 0), 

411 "forks": repo.get("forks_count", 0), 

412 "language": repo.get("language", ""), 

413 "updated_at": repo.get("updated_at", ""), 

414 "created_at": repo.get("created_at", ""), 

415 "topics": repo.get("topics", []), 

416 "owner": repo.get("owner", {}).get("login", ""), 

417 "is_fork": repo.get("fork", False), 

418 "search_type": "repository", 

419 "repo_full_name": repo.get("full_name", ""), 

420 } 

421 

422 def _format_code_preview(self, code: Dict[str, Any]) -> Dict[str, Any]: 

423 """Format code search result as preview""" 

424 repo = code.get("repository", {}) 

425 return { 

426 "id": f"code_{code.get('sha', '')}", 

427 "title": f"{code.get('name', '')} in {repo.get('full_name', '')}", 

428 "link": code.get("html_url", ""), 

429 "snippet": f"Match in {code.get('path', '')}", 

430 "path": code.get("path", ""), 

431 "repo_name": repo.get("full_name", ""), 

432 "repo_url": repo.get("html_url", ""), 

433 "search_type": "code", 

434 "file_url": code.get("url", ""), 

435 } 

436 

437 def _format_issue_preview(self, issue: Dict[str, Any]) -> Dict[str, Any]: 

438 """Format issue search result as preview""" 

439 repo = ( 

440 issue.get("repository", {}) 

441 if "repository" in issue 

442 else {"full_name": ""} 

443 ) 

444 return { 

445 "id": f"issue_{issue.get('number', '')}", 

446 "title": issue.get("title", ""), 

447 "link": issue.get("html_url", ""), 

448 "snippet": ( 

449 issue.get("body", "")[:200] + "..." 

450 if len(issue.get("body", "")) > 200 

451 else issue.get("body", "") 

452 ), 

453 "state": issue.get("state", ""), 

454 "created_at": issue.get("created_at", ""), 

455 "updated_at": issue.get("updated_at", ""), 

456 "user": issue.get("user", {}).get("login", ""), 

457 "comments": issue.get("comments", 0), 

458 "search_type": "issue", 

459 "repo_name": repo.get("full_name", ""), 

460 } 

461 

462 def _format_user_preview(self, user: Dict[str, Any]) -> Dict[str, Any]: 

463 """Format user search result as preview""" 

464 return { 

465 "id": f"user_{user.get('id', '')}", 

466 "title": user.get("login", ""), 

467 "link": user.get("html_url", ""), 

468 "snippet": user.get("bio", "No bio provided"), 

469 "name": user.get("name", ""), 

470 "followers": user.get("followers", 0), 

471 "public_repos": user.get("public_repos", 0), 

472 "location": user.get("location", ""), 

473 "search_type": "user", 

474 "user_login": user.get("login", ""), 

475 } 

476 

477 def _get_previews(self, query: str) -> List[Dict[str, Any]]: 

478 """ 

479 Get preview information for GitHub search results. 

480 

481 Args: 

482 query: The search query 

483 

484 Returns: 

485 List of preview dictionaries 

486 """ 

487 logger.info(f"Getting GitHub previews for query: {query}") 

488 

489 # For contribution-focused queries, automatically adjust search type and add filters 

490 if any( 

491 term in query.lower() 

492 for term in [ 

493 "contribute", 

494 "contributing", 

495 "contribution", 

496 "beginner", 

497 "newcomer", 

498 ] 

499 ): 

500 # Use repositories search with help-wanted or good-first-issue labels 

501 original_search_type = self.search_type 

502 self.search_type = "repositories" 

503 self.search_endpoint = f"{self.api_base}/search/repositories" 

504 

505 # Create a specialized query for finding beginner-friendly projects 

506 specialized_query = "good-first-issues:>5 is:public archived:false" 

507 

508 # Extract language preferences if present 

509 languages = [] 

510 for lang in [ 

511 "python", 

512 "javascript", 

513 "java", 

514 "rust", 

515 "go", 

516 "typescript", 

517 "c#", 

518 "c++", 

519 "ruby", 

520 ]: 

521 if lang in query.lower(): 

522 languages.append(lang) 

523 

524 if languages: 

525 specialized_query += f" language:{' language:'.join(languages)}" 

526 

527 # Extract keywords 

528 keywords = [ 

529 word 

530 for word in query.split() 

531 if len(word) > 3 

532 and word.lower() 

533 not in [ 

534 "recommend", 

535 "recommended", 

536 "github", 

537 "repositories", 

538 "looking", 

539 "developers", 

540 "contribute", 

541 "contributing", 

542 "beginner", 

543 "newcomer", 

544 ] 

545 ] 

546 

547 if keywords: 547 ↛ 552line 547 didn't jump to line 552 because the condition on line 547 was always true

548 specialized_query += " " + " ".join( 

549 keywords[:5] 

550 ) # Add up to 5 keywords 

551 

552 logger.info( 

553 f"Using specialized contribution query: {specialized_query}" 

554 ) 

555 

556 # Perform GitHub search with specialized query 

557 results = self._search_github(specialized_query) 

558 

559 # Restore original search type 

560 self.search_type = original_search_type 

561 self.search_endpoint = f"{self.api_base}/search/{self.search_type}" 

562 else: 

563 # Perform standard GitHub search 

564 results = self._search_github(query) 

565 

566 if not results: 

567 logger.warning(f"No GitHub results found for query: {query}") 

568 return [] 

569 

570 # Format results as previews 

571 previews = [] 

572 for result in results: 

573 # Format based on search type 

574 if self.search_type == "repositories": 

575 preview = self._format_repository_preview(result) 

576 elif self.search_type == "code": 

577 preview = self._format_code_preview(result) 

578 elif self.search_type == "issues": 

579 preview = self._format_issue_preview(result) 

580 elif self.search_type == "users": 

581 preview = self._format_user_preview(result) 

582 else: 

583 logger.warning(f"Unknown search type: {self.search_type}") 

584 continue 

585 

586 previews.append(preview) 

587 

588 logger.info(f"Formatted {len(previews)} GitHub preview results") 

589 return previews 

590 

591 def _get_full_content( 

592 self, relevant_items: List[Dict[str, Any]] 

593 ) -> List[Dict[str, Any]]: 

594 """ 

595 Get full content for the relevant GitHub search results. 

596 

597 Args: 

598 relevant_items: List of relevant preview dictionaries 

599 

600 Returns: 

601 List of result dictionaries with full content 

602 """ 

603 # Check if we should add full content 

604 if ( 

605 hasattr(search_config, "SEARCH_SNIPPETS_ONLY") 

606 and search_config.SEARCH_SNIPPETS_ONLY 

607 ): 

608 logger.info("Snippet-only mode, skipping full content retrieval") 

609 return relevant_items 

610 

611 logger.info( 

612 f"Getting full content for {len(relevant_items)} GitHub results" 

613 ) 

614 

615 results = [] 

616 for item in relevant_items: 

617 result = item.copy() 

618 search_type = item.get("search_type", "") 

619 

620 # Add content based on search type 

621 if search_type == "repository" and self.include_readme: 

622 repo_full_name = item.get("repo_full_name", "") 

623 if repo_full_name: 

624 # Get README content 

625 readme_content = self._get_readme_content(repo_full_name) 

626 result["full_content"] = readme_content 

627 result["content_type"] = "readme" 

628 

629 # Get recent issues if requested 

630 if self.include_issues: 

631 issues = self._get_recent_issues(repo_full_name) 

632 result["recent_issues"] = issues 

633 

634 elif search_type == "code": 

635 file_url = item.get("file_url", "") 

636 if file_url: 

637 # Get file content 

638 file_content = self._get_file_content(file_url) 

639 result["full_content"] = file_content 

640 result["content_type"] = "file" 

641 

642 elif search_type == "issue": 

643 # For issues, the snippet usually contains a summary already 

644 # We'll just keep it as is 

645 result["full_content"] = item.get("snippet", "") 

646 result["content_type"] = "issue" 

647 

648 elif search_type == "user": 

649 # For users, construct a profile summary 

650 profile_summary = f"GitHub user: {item.get('title', '')}\n" 

651 

652 if item.get("name"): 

653 profile_summary += f"Name: {item.get('name')}\n" 

654 

655 if item.get("location"): 

656 profile_summary += f"Location: {item.get('location')}\n" 

657 

658 profile_summary += f"Followers: {item.get('followers', 0)}\n" 

659 profile_summary += ( 

660 f"Public repositories: {item.get('public_repos', 0)}\n" 

661 ) 

662 

663 if ( 

664 item.get("snippet") 

665 and item.get("snippet") != "No bio provided" 

666 ): 

667 profile_summary += f"\nBio: {item.get('snippet')}\n" 

668 

669 result["full_content"] = profile_summary 

670 result["content_type"] = "user_profile" 

671 

672 results.append(result) 

673 

674 return results 

675 

676 def search_repository( 

677 self, repo_owner: str, repo_name: str 

678 ) -> Dict[str, Any]: 

679 """ 

680 Get detailed information about a specific repository. 

681 

682 Args: 

683 repo_owner: Owner of the repository 

684 repo_name: Name of the repository 

685 

686 Returns: 

687 Dictionary with repository information 

688 """ 

689 repo_full_name = f"{repo_owner}/{repo_name}" 

690 logger.info(f"Getting details for repository: {repo_full_name}") 

691 

692 try: 

693 # Get repository details 

694 # Apply rate limiting before request 

695 self._last_wait_time = self.rate_tracker.apply_rate_limit( 

696 self.engine_type 

697 ) 

698 

699 response = safe_get( 

700 f"{self.api_base}/repos/{repo_full_name}", headers=self.headers 

701 ) 

702 

703 # Check for rate limiting 

704 self._handle_rate_limits(response) 

705 

706 if response.status_code == 200: 

707 repo = response.json() 

708 

709 # Format as repository preview 

710 result = self._format_repository_preview(repo) 

711 

712 # Add README content if requested 

713 if self.include_readme: 

714 readme_content = self._get_readme_content(repo_full_name) 

715 result["full_content"] = readme_content 

716 result["content_type"] = "readme" 

717 

718 # Add recent issues if requested 

719 if self.include_issues: 

720 issues = self._get_recent_issues(repo_full_name) 

721 result["recent_issues"] = issues 

722 

723 return result 

724 logger.error( 

725 f"Error getting repository details: {response.status_code} - {response.text}" 

726 ) 

727 return {} 

728 

729 except Exception: 

730 logger.exception("Error getting repository details") 

731 return {} 

732 

733 def search_code( 

734 self, 

735 query: str, 

736 language: Optional[str] = None, 

737 user: Optional[str] = None, 

738 ) -> List[Dict[str, Any]]: 

739 """ 

740 Search for code with more specific parameters. 

741 

742 Args: 

743 query: Code search query 

744 language: Filter by programming language 

745 user: Filter by GitHub username/organization 

746 

747 Returns: 

748 List of code search results 

749 """ 

750 # Build advanced query 

751 advanced_query = query 

752 

753 if language: 

754 advanced_query += f" language:{language}" 

755 

756 if user: 

757 advanced_query += f" user:{user}" 

758 

759 # Save current search type 

760 original_search_type = self.search_type 

761 

762 try: 

763 # Set search type to code 

764 self.search_type = "code" 

765 self.search_endpoint = f"{self.api_base}/search/code" 

766 

767 # Perform search 

768 results = self._search_github(advanced_query) 

769 

770 # Format results 

771 previews = [self._format_code_preview(result) for result in results] 

772 

773 # Get full content if requested 

774 if ( 

775 hasattr(search_config, "SEARCH_SNIPPETS_ONLY") 

776 and not search_config.SEARCH_SNIPPETS_ONLY 

777 ): 

778 return self._get_full_content(previews) 

779 

780 return previews 

781 

782 finally: 

783 # Restore original search type 

784 self.search_type = original_search_type 

785 self.search_endpoint = f"{self.api_base}/search/{self.search_type}" 

786 

787 def search_issues( 

788 self, query: str, state: str = "open", sort: str = "updated" 

789 ) -> List[Dict[str, Any]]: 

790 """ 

791 Search for issues with more specific parameters. 

792 

793 Args: 

794 query: Issue search query 

795 state: Filter by issue state ("open", "closed", "all") 

796 sort: Sort order ("updated", "created", "comments") 

797 

798 Returns: 

799 List of issue search results 

800 """ 

801 # Build advanced query 

802 advanced_query = query + f" state:{state}" 

803 

804 # Save current search type 

805 original_search_type = self.search_type 

806 

807 try: 

808 # Set search type to issues 

809 self.search_type = "issues" 

810 self.search_endpoint = f"{self.api_base}/search/issues" 

811 

812 # Set sort parameter 

813 params = { 

814 "q": advanced_query, 

815 "per_page": min(self.max_results, 100), 

816 "page": 1, 

817 "sort": sort, 

818 "order": "desc", 

819 } 

820 

821 # Perform search 

822 response = safe_get( 

823 self.search_endpoint, headers=self.headers, params=params 

824 ) 

825 

826 # Check for rate limiting 

827 self._handle_rate_limits(response) 

828 

829 if response.status_code == 200: 

830 data = response.json() 

831 results = data.get("items", []) 

832 

833 # Format results 

834 return [ 

835 self._format_issue_preview(result) for result in results 

836 ] 

837 

838 # For issues, we don't need to get full content 

839 logger.error( 

840 f"GitHub API error: {response.status_code} - {response.text}" 

841 ) 

842 return [] 

843 

844 finally: 

845 # Restore original search type 

846 self.search_type = original_search_type 

847 self.search_endpoint = f"{self.api_base}/search/{self.search_type}" 

848 

849 def set_search_type(self, search_type: str): 

850 """ 

851 Set the search type for subsequent searches. 

852 

853 Args: 

854 search_type: Type of GitHub search ("repositories", "code", "issues", "users") 

855 """ 

856 if search_type in ["repositories", "code", "issues", "users"]: 

857 self.search_type = search_type 

858 self.search_endpoint = f"{self.api_base}/search/{search_type}" 

859 logger.info(f"Set GitHub search type to: {search_type}") 

860 else: 

861 logger.error(f"Invalid GitHub search type: {search_type}") 

862 

863 def _filter_for_relevance( 

864 self, previews: List[Dict[str, Any]], query: str 

865 ) -> List[Dict[str, Any]]: 

866 """ 

867 Filter GitHub search results for relevance using LLM. 

868 

869 Args: 

870 previews: List of preview dictionaries 

871 query: Original search query 

872 

873 Returns: 

874 List of relevant preview dictionaries 

875 """ 

876 if not self.llm or not previews: 

877 return previews 

878 

879 # Create a specialized prompt for GitHub results 

880 prompt = f"""Analyze these GitHub search results and rank them by relevance to the query. 

881Consider: 

8821. Repository stars and activity (higher is better) 

8832. Match between query intent and repository description 

8843. Repository language and topics 

8854. Last update time (more recent is better) 

8865. Whether it's a fork (original repositories are preferred) 

887 

888Query: "{query}" 

889 

890Results: 

891{json.dumps(previews, indent=2)} 

892 

893Return ONLY a JSON array of indices in order of relevance (most relevant first). 

894Example: [0, 2, 1, 3] 

895Do not include any other text or explanation.""" 

896 

897 try: 

898 response = self.llm.invoke(prompt) 

899 response_text = get_llm_response_text(response) 

900 

901 ranked_indices = extract_json(response_text, expected_type=list) 

902 

903 if ranked_indices is not None: 

904 # Return the results in ranked order 

905 ranked_results = [] 

906 for idx in ranked_indices: 

907 if idx < len(previews): 

908 ranked_results.append(previews[idx]) 

909 

910 # Limit to max_filtered_results if specified 

911 if ( 

912 self.max_filtered_results 

913 and len(ranked_results) > self.max_filtered_results 

914 ): 

915 logger.info( 

916 f"Limiting filtered results to top {self.max_filtered_results}" 

917 ) 

918 return ranked_results[: self.max_filtered_results] 

919 

920 return ranked_results 

921 logger.info( 

922 "Could not find JSON array in response, returning no previews" 

923 ) 

924 return [] 

925 

926 except Exception: 

927 logger.exception("Error filtering GitHub results") 

928 return []