Coverage for src/local_deep_research/web_search_engines/engines/search_engine_github.py: 74%

318 statements  

coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1 import base64

2 import json

3 import time

4 from typing import Any, Dict, List, Optional

5 

6 from langchain_core.language_models import BaseLLM

7 from loguru import logger

8 

9 from ...config import llm_config, search_config

10 from ...security.safe_requests import safe_get

11 from ...utilities.json_utils import extract_json, get_llm_response_text

12 from ..search_engine_base import BaseSearchEngine

13 

14 

15 class GitHubSearchEngine(BaseSearchEngine):

16 """ 

17 GitHub search engine implementation. 

18 Provides search across GitHub repositories, code, issues, and users. 

19 """ 

20 

21 def __init__( 

22 self, 

23 max_results: int = 15, 

24 api_key: Optional[str] = None, 

25 search_type: str = "repositories", 

26 include_readme: bool = True, 

27 include_issues: bool = False, 

28 llm: Optional[BaseLLM] = None, 

29 max_filtered_results: Optional[int] = None, 

30 ): 

31 """ 

32 Initialize the GitHub search engine. 

33 

34 Args: 

35 max_results: Maximum number of search results 

36 api_key: GitHub API token (can also be set in GITHUB_API_KEY env) 

37 search_type: Type of GitHub search ("repositories", "code", "issues", "users") 

38 include_readme: Whether to include README content for repositories 

39 include_issues: Whether to include recent issues for repositories 

40 llm: Language model for relevance filtering 

41 max_filtered_results: Maximum number of results to keep after filtering 

42 """ 

43 # Initialize the BaseSearchEngine with LLM, max_filtered_results, and max_results 

44 super().__init__( 

45 llm=llm, 

46 max_filtered_results=max_filtered_results, 

47 max_results=max_results, 

48 ) 

49 self.api_key = api_key 

50 self.search_type = search_type 

51 self.include_readme = include_readme 

52 self.include_issues = include_issues 

53 

54 # API endpoints 

55 self.api_base = "https://api.github.com" 

56 self.search_endpoint = f"{self.api_base}/search/{search_type}" 

57 

58 # Set up API headers 

59 self.headers = { 

60 "Accept": "application/vnd.github.v3+json", 

61 "User-Agent": "Local-Deep-Research-Agent", 

62 } 

63 

64 # Add authentication if API key provided 

65 if self.api_key: 

66 self.headers["Authorization"] = f"token {self.api_key}" 

67 logger.info("Using authenticated GitHub API requests") 

68 else: 

69 logger.warning( 

70 "No GitHub API key provided. Rate limits will be restricted." 

71 ) 

72 
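As a usage sketch only (not part of the covered module), the constructor above could be exercised roughly as follows, assuming BaseSearchEngine needs no arguments beyond those forwarded here; the token and repository names are placeholders.

    from local_deep_research.web_search_engines.engines.search_engine_github import (
        GitHubSearchEngine,
    )

    # Hypothetical construction; the token is a placeholder, not a real credential.
    engine = GitHubSearchEngine(
        max_results=10,
        api_key="<github-token>",
        search_type="repositories",
        include_readme=True,
    )

    # search_repository() is defined later in this module.
    details = engine.search_repository("octocat", "Hello-World")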

73 def _handle_rate_limits(self, response): 

74 """Handle GitHub API rate limits by logging warnings and sleeping if necessary""" 

75 remaining = int(response.headers.get("X-RateLimit-Remaining", 60)) 

76 reset_time = int(response.headers.get("X-RateLimit-Reset", 0)) 

77 

78 if remaining < 5: 

79 current_time = time.time() 

80 wait_time = max(reset_time - current_time, 0) 

81 logger.warning( 

82 f"GitHub API rate limit almost reached. {remaining} requests remaining." 

83 ) 

84 

85 if wait_time > 0 and remaining == 0:  [85 ↛ 86: line 85 didn't jump to line 86 because the condition on line 85 was never true]

86 logger.warning( 

87 f"GitHub API rate limit exceeded. Waiting {wait_time:.0f} seconds." 

88 ) 

89 time.sleep(min(wait_time, 60)) # Wait at most 60 seconds 

90 
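The sleep logic above can be pictured in isolation. X-RateLimit-Remaining and X-RateLimit-Reset are GitHub's documented rate-limit headers; the values below are invented for the example.

    import time

    headers = {
        "X-RateLimit-Remaining": "0",
        "X-RateLimit-Reset": str(int(time.time()) + 120),  # reset two minutes from now
    }

    remaining = int(headers.get("X-RateLimit-Remaining", 60))
    reset_time = int(headers.get("X-RateLimit-Reset", 0))
    wait_time = max(reset_time - time.time(), 0)

    if remaining == 0 and wait_time > 0:
        time.sleep(min(wait_time, 60))  # same 60-second cap as the method above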

91 def _optimize_github_query(self, query: str) -> str: 

92 """ 

93 Optimize the GitHub search query using LLM to improve search results. 

94 

95 Args: 

96 query: Original search query 

97 

98 Returns: 

99 Optimized GitHub search query 

100 """ 

101 # Get LLM from config if not already set 

102 if not self.llm: 

103 try: 

104 self.llm = llm_config.get_llm() 

105 if not self.llm: 

106 logger.warning("No LLM available for query optimization") 

107 return query 

108 except Exception: 

109 logger.exception("Error getting LLM from config") 

110 return query 

111 

112 prompt = f"""Transform this GitHub search query into an optimized version for the GitHub search API. Follow these steps: 

113 1. Strip question words (e.g., 'what', 'are', 'is'), stop words (e.g., 'and', 'as', 'of', 'on'), and redundant terms (e.g., 'repositories', 'repos', 'github') since they're implied by the search context. 

114 2. Keep only domain-specific keywords and avoid using "-related" terms. 

115 3. Add GitHub-specific filters with dynamic thresholds based on query context: 

116 - For stars: Use higher threshold (e.g., 'stars:>1000') for mainstream topics, lower (e.g., 'stars:>50') for specialized topics 

117 - For language: Detect programming language from query or omit if unclear 

118 - For search scope: Use 'in:name,description,readme' for general queries, 'in:file' for code-specific queries 

119 4. For date ranges, adapt based on query context: 

120 - For emerging: Use 'created:>2024-01-01' 

121 - For mature: Use 'pushed:>2023-01-01' 

122 - For historical research: Use 'created:2020-01-01..2024-01-01' 

123 5. For excluding results, adapt based on query: 

124 - Exclude irrelevant languages based on context 

125 - Use 'NOT' to exclude competing terms 

126 6. Ensure the output is a concise, space-separated string with no punctuation or extra text beyond keywords and filters. 

127 

128 

129 Original query: "{query}" 

130 

131 Return ONLY the optimized query, ready for GitHub's search API. Do not include explanations or additional text.""" 

132 

133 try: 

134 response = self.llm.invoke(prompt) 

135 

136 # Handle different response formats (string or object with content attribute) 

137 if hasattr(response, "content"):  [137 ↛ 141: line 137 didn't jump to line 141 because the condition on line 137 was always true]

138 optimized_query = response.content.strip() 

139 else: 

140 # Handle string responses 

141 optimized_query = str(response).strip() 

142 

143 # Validate the optimized query 

144 if optimized_query and len(optimized_query) > 0: 

145 logger.info( 

146 f"LLM optimized query from '{query}' to '{optimized_query}'" 

147 ) 

148 return optimized_query 

149 else: 

150 logger.warning("LLM returned empty query, using original") 

151 return query 

152 

153 except Exception: 

154 logger.exception("Error optimizing query with LLM") 

155 return query 

156 
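To illustrate the transformation the prompt asks for, one plausible input/output pair is sketched below; the actual result depends entirely on the configured LLM.

    # Illustrative only; not a recorded model output.
    original = "What are popular Python repositories for web scraping?"
    optimized = "web scraping language:python stars:>100 in:name,description,readme"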

157 def _search_github(self, query: str) -> List[Dict[str, Any]]: 

158 """ 

159 Perform a GitHub search based on the configured search type. 

160 

161 Args: 

162 query: The search query 

163 

164 Returns: 

165 List of GitHub search result items 

166 """ 

167 results = [] 

168 

169 try: 

170 # Optimize GitHub query using LLM 

171 github_query = self._optimize_github_query(query) 

172 

173 logger.info(f"Final GitHub query: {github_query}") 

174 

175 # Construct search parameters 

176 params = { 

177 "q": github_query, 

178 "per_page": min( 

179 self.max_results, 100 

180 ), # GitHub API max is 100 per page 

181 "page": 1, 

182 } 

183 

184 # Add sort parameters based on search type 

185 if self.search_type == "repositories":  [185 ↛ 188: line 185 didn't jump to line 188 because the condition on line 185 was always true]

186 params["sort"] = "stars" 

187 params["order"] = "desc" 

188 elif self.search_type == "code": 

189 params["sort"] = "indexed" 

190 params["order"] = "desc" 

191 elif self.search_type == "issues": 

192 params["sort"] = "updated" 

193 params["order"] = "desc" 

194 elif self.search_type == "users": 

195 params["sort"] = "followers" 

196 params["order"] = "desc" 

197 

198 # Apply rate limiting before request 

199 self._last_wait_time = self.rate_tracker.apply_rate_limit( 

200 self.engine_type 

201 ) 

202 

203 # Execute the API request 

204 response = safe_get( 

205 self.search_endpoint, headers=self.headers, params=params 

206 ) 

207 

208 # Check for rate limiting 

209 self._handle_rate_limits(response) 

210 

211 # Handle response with detailed logging 

212 if response.status_code == 200: 

213 data = response.json() 

214 total_count = data.get("total_count", 0) 

215 results = data.get("items", []) 

216 logger.info( 

217 f"GitHub search returned {len(results)} results (total available: {total_count})" 

218 ) 

219 

220 # Log the rate limit information 

221 rate_limit_remaining = response.headers.get( 

222 "X-RateLimit-Remaining", "unknown" 

223 ) 

224 logger.info( 

225 f"GitHub API rate limit: {rate_limit_remaining} requests remaining" 

226 ) 

227 

228 # If no results, try to provide more guidance 

229 if not results:  [229 ↛ 230: line 229 didn't jump to line 230 because the condition on line 229 was never true]

230 logger.warning( 

231 "No results found. Consider these search tips:" 

232 ) 

233 logger.warning("1. Use shorter, more specific queries") 

234 logger.warning( 

235 "2. For repositories, try adding 'stars:>100' or 'language:python'" 

236 ) 

237 logger.warning( 

238 "3. For contribution opportunities, search for 'good-first-issue' or 'help-wanted'" 

239 ) 

240 else: 

241 logger.error( 

242 f"GitHub API error: {response.status_code} - {response.text}" 

243 ) 

244 

245 except Exception: 

246 logger.exception("Error searching GitHub") 

247 

248 return results 

249 
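The request built above is equivalent to the plain HTTP call sketched below (the module itself routes the call through safe_get); the query string is an invented example.

    import requests

    params = {
        "q": "web scraping language:python stars:>100",  # example query
        "per_page": 15,  # min(max_results, 100)
        "page": 1,
        "sort": "stars",  # added because search_type == "repositories"
        "order": "desc",
    }
    headers = {
        "Accept": "application/vnd.github.v3+json",
        "User-Agent": "Local-Deep-Research-Agent",
    }
    response = requests.get(
        "https://api.github.com/search/repositories", headers=headers, params=params
    )
    items = response.json().get("items", []) if response.status_code == 200 else []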

250 def _get_readme_content(self, repo_full_name: str) -> str: 

251 """ 

252 Get README content for a repository. 

253 

254 Args: 

255 repo_full_name: Full name of the repository (owner/repo) 

256 

257 Returns: 

258 Decoded README content or empty string if not found 

259 """ 

260 try: 

261 # Get README 

262 # Apply rate limiting before request 

263 self._last_wait_time = self.rate_tracker.apply_rate_limit( 

264 self.engine_type 

265 ) 

266 

267 response = safe_get( 

268 f"{self.api_base}/repos/{repo_full_name}/readme", 

269 headers=self.headers, 

270 ) 

271 

272 # Check for rate limiting 

273 self._handle_rate_limits(response) 

274 

275 if response.status_code == 200: 

276 data = response.json() 

277 content = data.get("content", "") 

278 encoding = data.get("encoding", "") 

279 

280 if encoding == "base64" and content:  [280 ↛ 284: line 280 didn't jump to line 284 because the condition on line 280 was always true]

281 return base64.b64decode(content).decode( 

282 "utf-8", errors="replace" 

283 ) 

284 return content 

285 else: 

286 logger.warning( 

287 f"Could not get README for {repo_full_name}: {response.status_code}" 

288 ) 

289 return "" 

290 

291 except Exception: 

292 logger.exception(f"Error getting README for {repo_full_name}") 

293 return "" 

294 
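The decoding step above operates on the shape of a successful contents-API response; the payload below is fabricated just to show that step.

    import base64

    data = {
        "content": base64.b64encode(b"# Example README\n").decode(),
        "encoding": "base64",
    }

    if data.get("encoding") == "base64" and data.get("content"):
        readme_text = base64.b64decode(data["content"]).decode("utf-8", errors="replace")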

295 def _get_recent_issues( 

296 self, repo_full_name: str, limit: int = 5 

297 ) -> List[Dict[str, Any]]: 

298 """ 

299 Get recent issues for a repository. 

300 

301 Args: 

302 repo_full_name: Full name of the repository (owner/repo) 

303 limit: Maximum number of issues to return 

304 

305 Returns: 

306 List of recent issues 

307 """ 

308 issues = [] 

309 

310 try: 

311 # Get recent issues 

312 # Apply rate limiting before request 

313 self._last_wait_time = self.rate_tracker.apply_rate_limit( 

314 self.engine_type 

315 ) 

316 

317 response = safe_get( 

318 f"{self.api_base}/repos/{repo_full_name}/issues", 

319 headers=self.headers, 

320 params={ 

321 "state": "all", 

322 "per_page": limit, 

323 "sort": "updated", 

324 "direction": "desc", 

325 }, 

326 ) 

327 

328 # Check for rate limiting 

329 self._handle_rate_limits(response) 

330 

331 if response.status_code == 200: 

332 issues = response.json() 

333 logger.info( 

334 f"Got {len(issues)} recent issues for {repo_full_name}" 

335 ) 

336 else: 

337 logger.warning( 

338 f"Could not get issues for {repo_full_name}: {response.status_code}" 

339 ) 

340 

341 except Exception: 

342 logger.exception(f"Error getting issues for {repo_full_name}") 

343 

344 return issues 

345 

346 def _get_file_content(self, file_url: str) -> str: 

347 """ 

348 Get content of a file from GitHub. 

349 

350 Args: 

351 file_url: API URL for the file 

352 

353 Returns: 

354 Decoded file content or empty string if not found 

355 """ 

356 try: 

357 # Apply rate limiting before request 

358 self._last_wait_time = self.rate_tracker.apply_rate_limit( 

359 self.engine_type 

360 ) 

361 

362 # Get file content 

363 response = safe_get(file_url, headers=self.headers) 

364 

365 # Check for rate limiting 

366 self._handle_rate_limits(response) 

367 

368 if response.status_code == 200:  [368 ↛ 379: line 368 didn't jump to line 379 because the condition on line 368 was always true]

369 data = response.json() 

370 content = data.get("content", "") 

371 encoding = data.get("encoding", "") 

372 

373 if encoding == "base64" and content:  [373 ↛ 377: line 373 didn't jump to line 377 because the condition on line 373 was always true]

374 return base64.b64decode(content).decode( 

375 "utf-8", errors="replace" 

376 ) 

377 return content 

378 else: 

379 logger.warning( 

380 f"Could not get file content: {response.status_code}" 

381 ) 

382 return "" 

383 

384 except Exception: 

385 logger.exception("Error getting file content") 

386 return "" 

387 

388 def _format_repository_preview( 

389 self, repo: Dict[str, Any] 

390 ) -> Dict[str, Any]: 

391 """Format repository search result as preview""" 

392 return { 

393 "id": str(repo.get("id", "")), 

394 "title": repo.get("full_name", ""), 

395 "link": repo.get("html_url", ""), 

396 "snippet": repo.get("description", "No description provided"), 

397 "stars": repo.get("stargazers_count", 0), 

398 "forks": repo.get("forks_count", 0), 

399 "language": repo.get("language", ""), 

400 "updated_at": repo.get("updated_at", ""), 

401 "created_at": repo.get("created_at", ""), 

402 "topics": repo.get("topics", []), 

403 "owner": repo.get("owner", {}).get("login", ""), 

404 "is_fork": repo.get("fork", False), 

405 "search_type": "repository", 

406 "repo_full_name": repo.get("full_name", ""), 

407 } 

408 
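For reference, the preview dictionary returned by _format_repository_preview has the shape sketched below; every value is invented.

    preview = {
        "id": "1296269",
        "title": "octocat/Hello-World",
        "link": "https://github.com/octocat/Hello-World",
        "snippet": "My first repository on GitHub!",
        "stars": 3000,
        "forks": 2000,
        "language": "Python",
        "updated_at": "2025-01-01T00:00:00Z",
        "created_at": "2011-01-26T19:01:12Z",
        "topics": ["example"],
        "owner": "octocat",
        "is_fork": False,
        "search_type": "repository",
        "repo_full_name": "octocat/Hello-World",
    }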

409 def _format_code_preview(self, code: Dict[str, Any]) -> Dict[str, Any]: 

410 """Format code search result as preview""" 

411 repo = code.get("repository", {}) 

412 return { 

413 "id": f"code_{code.get('sha', '')}", 

414 "title": f"{code.get('name', '')} in {repo.get('full_name', '')}", 

415 "link": code.get("html_url", ""), 

416 "snippet": f"Match in {code.get('path', '')}", 

417 "path": code.get("path", ""), 

418 "repo_name": repo.get("full_name", ""), 

419 "repo_url": repo.get("html_url", ""), 

420 "search_type": "code", 

421 "file_url": code.get("url", ""), 

422 } 

423 

424 def _format_issue_preview(self, issue: Dict[str, Any]) -> Dict[str, Any]: 

425 """Format issue search result as preview""" 

426 repo = ( 

427 issue.get("repository", {}) 

428 if "repository" in issue 

429 else {"full_name": ""} 

430 ) 

431 return { 

432 "id": f"issue_{issue.get('number', '')}", 

433 "title": issue.get("title", ""), 

434 "link": issue.get("html_url", ""), 

435 "snippet": ( 

436 issue.get("body", "")[:200] + "..." 

437 if len(issue.get("body", "")) > 200 

438 else issue.get("body", "") 

439 ), 

440 "state": issue.get("state", ""), 

441 "created_at": issue.get("created_at", ""), 

442 "updated_at": issue.get("updated_at", ""), 

443 "user": issue.get("user", {}).get("login", ""), 

444 "comments": issue.get("comments", 0), 

445 "search_type": "issue", 

446 "repo_name": repo.get("full_name", ""), 

447 } 

448 

449 def _format_user_preview(self, user: Dict[str, Any]) -> Dict[str, Any]: 

450 """Format user search result as preview""" 

451 return { 

452 "id": f"user_{user.get('id', '')}", 

453 "title": user.get("login", ""), 

454 "link": user.get("html_url", ""), 

455 "snippet": user.get("bio", "No bio provided"), 

456 "name": user.get("name", ""), 

457 "followers": user.get("followers", 0), 

458 "public_repos": user.get("public_repos", 0), 

459 "location": user.get("location", ""), 

460 "search_type": "user", 

461 "user_login": user.get("login", ""), 

462 } 

463 

464 def _get_previews(self, query: str) -> List[Dict[str, Any]]: 

465 """ 

466 Get preview information for GitHub search results. 

467 

468 Args: 

469 query: The search query 

470 

471 Returns: 

472 List of preview dictionaries 

473 """ 

474 logger.info(f"Getting GitHub previews for query: {query}") 

475 

476 # For contribution-focused queries, automatically adjust search type and add filters 

477 if any( 

478 term in query.lower() 

479 for term in [ 

480 "contribute", 

481 "contributing", 

482 "contribution", 

483 "beginner", 

484 "newcomer", 

485 ] 

486 ): 

487 # Use repositories search with help-wanted or good-first-issue labels 

488 original_search_type = self.search_type 

489 self.search_type = "repositories" 

490 self.search_endpoint = f"{self.api_base}/search/repositories" 

491 

492 # Create a specialized query for finding beginner-friendly projects 

493 specialized_query = "good-first-issues:>5 is:public archived:false" 

494 

495 # Extract language preferences if present 

496 languages = [] 

497 for lang in [ 

498 "python", 

499 "javascript", 

500 "java", 

501 "rust", 

502 "go", 

503 "typescript", 

504 "c#", 

505 "c++", 

506 "ruby", 

507 ]: 

508 if lang in query.lower(): 

509 languages.append(lang) 

510 

511 if languages:  [511 ↛ 515: line 511 didn't jump to line 515 because the condition on line 511 was always true]

512 specialized_query += f" language:{' language:'.join(languages)}" 

513 

514 # Extract keywords 

515 keywords = [ 

516 word 

517 for word in query.split() 

518 if len(word) > 3 

519 and word.lower() 

520 not in [ 

521 "recommend", 

522 "recommended", 

523 "github", 

524 "repositories", 

525 "looking", 

526 "developers", 

527 "contribute", 

528 "contributing", 

529 "beginner", 

530 "newcomer", 

531 ] 

532 ] 

533 

534 if keywords:  [534 ↛ 539: line 534 didn't jump to line 539 because the condition on line 534 was always true]

535 specialized_query += " " + " ".join( 

536 keywords[:5] 

537 ) # Add up to 5 keywords 

538 

539 logger.info( 

540 f"Using specialized contribution query: {specialized_query}" 

541 ) 

542 

543 # Perform GitHub search with specialized query 

544 results = self._search_github(specialized_query) 

545 

546 # Restore original search type 

547 self.search_type = original_search_type 

548 self.search_endpoint = f"{self.api_base}/search/{self.search_type}" 

549 else: 

550 # Perform standard GitHub search 

551 results = self._search_github(query) 

552 

553 if not results: 

554 logger.warning(f"No GitHub results found for query: {query}") 

555 return [] 

556 

557 # Format results as previews 

558 previews = [] 

559 for result in results: 

560 # Format based on search type 

561 if self.search_type == "repositories": 

562 preview = self._format_repository_preview(result) 

563 elif self.search_type == "code": 

564 preview = self._format_code_preview(result) 

565 elif self.search_type == "issues":  [565 ↛ 567: line 565 didn't jump to line 567 because the condition on line 565 was always true]

566 preview = self._format_issue_preview(result) 

567 elif self.search_type == "users": 

568 preview = self._format_user_preview(result) 

569 else: 

570 logger.warning(f"Unknown search type: {self.search_type}") 

571 continue 

572 

573 previews.append(preview) 

574 

575 logger.info(f"Formatted {len(previews)} GitHub preview results") 

576 return previews 

577 
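Tracing the contribution-query branch above with one invented input shows roughly what the specialized query ends up containing.

    query = "recommend beginner friendly python projects to contribute to"
    # Language detection finds "python"; keyword extraction keeps words longer than
    # three characters that are not in the stop list: "friendly", "python", "projects".
    specialized_query = (
        "good-first-issues:>5 is:public archived:false language:python "
        "friendly python projects"
    )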

578 def _get_full_content( 

579 self, relevant_items: List[Dict[str, Any]] 

580 ) -> List[Dict[str, Any]]: 

581 """ 

582 Get full content for the relevant GitHub search results. 

583 

584 Args: 

585 relevant_items: List of relevant preview dictionaries 

586 

587 Returns: 

588 List of result dictionaries with full content 

589 """ 

590 # Check if we should add full content 

591 if (  [591 ↛ 595: line 591 didn't jump to line 595 because the condition on line 591 was never true]

592 hasattr(search_config, "SEARCH_SNIPPETS_ONLY") 

593 and search_config.SEARCH_SNIPPETS_ONLY 

594 ): 

595 logger.info("Snippet-only mode, skipping full content retrieval") 

596 return relevant_items 

597 

598 logger.info( 

599 f"Getting full content for {len(relevant_items)} GitHub results" 

600 ) 

601 

602 results = [] 

603 for item in relevant_items: 

604 result = item.copy() 

605 search_type = item.get("search_type", "") 

606 

607 # Add content based on search type 

608 if search_type == "repository" and self.include_readme: 

609 repo_full_name = item.get("repo_full_name", "") 

610 if repo_full_name:  [610 ↛ 659: line 610 didn't jump to line 659 because the condition on line 610 was always true]

611 # Get README content 

612 readme_content = self._get_readme_content(repo_full_name) 

613 result["full_content"] = readme_content 

614 result["content_type"] = "readme" 

615 

616 # Get recent issues if requested 

617 if self.include_issues: 

618 issues = self._get_recent_issues(repo_full_name) 

619 result["recent_issues"] = issues 

620 

621 elif search_type == "code": 

622 file_url = item.get("file_url", "") 

623 if file_url:  [623 ↛ 659: line 623 didn't jump to line 659 because the condition on line 623 was always true]

624 # Get file content 

625 file_content = self._get_file_content(file_url) 

626 result["full_content"] = file_content 

627 result["content_type"] = "file" 

628 

629 elif search_type == "issue":  [629 ↛ 632: line 629 didn't jump to line 632 because the condition on line 629 was never true]

630 # For issues, the snippet usually contains a summary already 

631 # We'll just keep it as is 

632 result["full_content"] = item.get("snippet", "") 

633 result["content_type"] = "issue" 

634 

635 elif search_type == "user":  [635 ↛ 659: line 635 didn't jump to line 659 because the condition on line 635 was always true]

636 # For users, construct a profile summary 

637 profile_summary = f"GitHub user: {item.get('title', '')}\n" 

638 

639 if item.get("name"):  [639 ↛ 642: line 639 didn't jump to line 642 because the condition on line 639 was always true]

640 profile_summary += f"Name: {item.get('name')}\n" 

641 

642 if item.get("location"):  [642 ↛ 645: line 642 didn't jump to line 645 because the condition on line 642 was always true]

643 profile_summary += f"Location: {item.get('location')}\n" 

644 

645 profile_summary += f"Followers: {item.get('followers', 0)}\n" 

646 profile_summary += ( 

647 f"Public repositories: {item.get('public_repos', 0)}\n" 

648 ) 

649 

650 if (  [650 ↛ 656: line 650 didn't jump to line 656 because the condition on line 650 was always true]

651 item.get("snippet") 

652 and item.get("snippet") != "No bio provided" 

653 ): 

654 profile_summary += f"\nBio: {item.get('snippet')}\n" 

655 

656 result["full_content"] = profile_summary 

657 result["content_type"] = "user_profile" 

658 

659 results.append(result) 

660 

661 return results 

662 

663 def search_repository( 

664 self, repo_owner: str, repo_name: str 

665 ) -> Dict[str, Any]: 

666 """ 

667 Get detailed information about a specific repository. 

668 

669 Args: 

670 repo_owner: Owner of the repository 

671 repo_name: Name of the repository 

672 

673 Returns: 

674 Dictionary with repository information 

675 """ 

676 repo_full_name = f"{repo_owner}/{repo_name}" 

677 logger.info(f"Getting details for repository: {repo_full_name}") 

678 

679 try: 

680 # Get repository details 

681 # Apply rate limiting before request 

682 self._last_wait_time = self.rate_tracker.apply_rate_limit( 

683 self.engine_type 

684 ) 

685 

686 response = safe_get( 

687 f"{self.api_base}/repos/{repo_full_name}", headers=self.headers 

688 ) 

689 

690 # Check for rate limiting 

691 self._handle_rate_limits(response) 

692 

693 if response.status_code == 200: 

694 repo = response.json() 

695 

696 # Format as repository preview 

697 result = self._format_repository_preview(repo) 

698 

699 # Add README content if requested 

700 if self.include_readme:  [700 ↛ 701: line 700 didn't jump to line 701 because the condition on line 700 was never true]

701 readme_content = self._get_readme_content(repo_full_name) 

702 result["full_content"] = readme_content 

703 result["content_type"] = "readme" 

704 

705 # Add recent issues if requested 

706 if self.include_issues:  [706 ↛ 707: line 706 didn't jump to line 707 because the condition on line 706 was never true]

707 issues = self._get_recent_issues(repo_full_name) 

708 result["recent_issues"] = issues 

709 

710 return result 

711 else: 

712 logger.error( 

713 f"Error getting repository details: {response.status_code} - {response.text}" 

714 ) 

715 return {} 

716 

717 except Exception: 

718 logger.exception("Error getting repository details") 

719 return {} 

720 

721 def search_code( 

722 self, 

723 query: str, 

724 language: Optional[str] = None, 

725 user: Optional[str] = None, 

726 ) -> List[Dict[str, Any]]: 

727 """ 

728 Search for code with more specific parameters. 

729 

730 Args: 

731 query: Code search query 

732 language: Filter by programming language 

733 user: Filter by GitHub username/organization 

734 

735 Returns: 

736 List of code search results 

737 """ 

738 # Build advanced query 

739 advanced_query = query 

740 

741 if language:  [741 ↛ 744: line 741 didn't jump to line 744 because the condition on line 741 was always true]

742 advanced_query += f" language:{language}" 

743 

744 if user:  [744 ↛ 745: line 744 didn't jump to line 745 because the condition on line 744 was never true]

745 advanced_query += f" user:{user}" 

746 

747 # Save current search type 

748 original_search_type = self.search_type 

749 

750 try: 

751 # Set search type to code 

752 self.search_type = "code" 

753 self.search_endpoint = f"{self.api_base}/search/code" 

754 

755 # Perform search 

756 results = self._search_github(advanced_query) 

757 

758 # Format results 

759 previews = [self._format_code_preview(result) for result in results] 

760 

761 # Get full content if requested 

762 if (  [762 ↛ 766: line 762 didn't jump to line 766 because the condition on line 762 was never true]

763 hasattr(search_config, "SEARCH_SNIPPETS_ONLY") 

764 and not search_config.SEARCH_SNIPPETS_ONLY 

765 ): 

766 return self._get_full_content(previews) 

767 

768 return previews 

769 

770 finally: 

771 # Restore original search type 

772 self.search_type = original_search_type 

773 self.search_endpoint = f"{self.api_base}/search/{self.search_type}" 

774 
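A hypothetical call to the helper above, reusing the engine instance from the earlier construction sketch; the query, language, and organization are invented.

    # Builds the query "http client retry language:python user:example-org"
    # and searches GitHub's code index.
    code_hits = engine.search_code("http client retry", language="python", user="example-org")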

775 def search_issues( 

776 self, query: str, state: str = "open", sort: str = "updated" 

777 ) -> List[Dict[str, Any]]: 

778 """ 

779 Search for issues with more specific parameters. 

780 

781 Args: 

782 query: Issue search query 

783 state: Filter by issue state ("open", "closed", "all") 

784 sort: Sort order ("updated", "created", "comments") 

785 

786 Returns: 

787 List of issue search results 

788 """ 

789 # Build advanced query 

790 advanced_query = query + f" state:{state}" 

791 

792 # Save current search type 

793 original_search_type = self.search_type 

794 

795 try: 

796 # Set search type to issues 

797 self.search_type = "issues" 

798 self.search_endpoint = f"{self.api_base}/search/issues" 

799 

800 # Set sort parameter 

801 params = { 

802 "q": advanced_query, 

803 "per_page": min(self.max_results, 100), 

804 "page": 1, 

805 "sort": sort, 

806 "order": "desc", 

807 } 

808 

809 # Perform search 

810 response = safe_get( 

811 self.search_endpoint, headers=self.headers, params=params 

812 ) 

813 

814 # Check for rate limiting 

815 self._handle_rate_limits(response) 

816 

817 if response.status_code == 200:  [817 ↛ 829: line 817 didn't jump to line 829 because the condition on line 817 was always true]

818 data = response.json() 

819 results = data.get("items", []) 

820 

821 # Format results 

822 previews = [ 

823 self._format_issue_preview(result) for result in results 

824 ] 

825 

826 # For issues, we don't need to get full content 

827 return previews 

828 else: 

829 logger.error( 

830 f"GitHub API error: {response.status_code} - {response.text}" 

831 ) 

832 return [] 

833 

834 finally: 

835 # Restore original search type 

836 self.search_type = original_search_type 

837 self.search_endpoint = f"{self.api_base}/search/{self.search_type}" 

838 

839 def set_search_type(self, search_type: str): 

840 """ 

841 Set the search type for subsequent searches. 

842 

843 Args: 

844 search_type: Type of GitHub search ("repositories", "code", "issues", "users") 

845 """ 

846 if search_type in ["repositories", "code", "issues", "users"]: 

847 self.search_type = search_type 

848 self.search_endpoint = f"{self.api_base}/search/{search_type}" 

849 logger.info(f"Set GitHub search type to: {search_type}") 

850 else: 

851 logger.error(f"Invalid GitHub search type: {search_type}") 

852 

853 def _filter_for_relevance( 

854 self, previews: List[Dict[str, Any]], query: str 

855 ) -> List[Dict[str, Any]]: 

856 """ 

857 Filter GitHub search results for relevance using LLM. 

858 

859 Args: 

860 previews: List of preview dictionaries 

861 query: Original search query 

862 

863 Returns: 

864 List of relevant preview dictionaries 

865 """ 

866 if not self.llm or not previews: 

867 return previews 

868 

869 # Create a specialized prompt for GitHub results 

870 prompt = f"""Analyze these GitHub search results and rank them by relevance to the query. 

871 Consider:

872 1. Repository stars and activity (higher is better)

873 2. Match between query intent and repository description

874 3. Repository language and topics

875 4. Last update time (more recent is better)

876 5. Whether it's a fork (original repositories are preferred)

877 

878 Query: "{query}"

879 

880 Results:

881 {json.dumps(previews, indent=2)}

882 

883 Return ONLY a JSON array of indices in order of relevance (most relevant first).

884 Example: [0, 2, 1, 3]

885 Do not include any other text or explanation."""

886 

887 try: 

888 response = self.llm.invoke(prompt) 

889 response_text = get_llm_response_text(response) 

890 

891 ranked_indices = extract_json(response_text, expected_type=list) 

892 

893 if ranked_indices is not None: 

894 # Return the results in ranked order 

895 ranked_results = [] 

896 for idx in ranked_indices: 

897 if idx < len(previews): 

898 ranked_results.append(previews[idx]) 

899 

900 # Limit to max_filtered_results if specified 

901 if ( 

902 self.max_filtered_results 

903 and len(ranked_results) > self.max_filtered_results 

904 ): 

905 logger.info( 

906 f"Limiting filtered results to top {self.max_filtered_results}" 

907 ) 

908 return ranked_results[: self.max_filtered_results] 

909 

910 return ranked_results 

911 else: 

912 logger.info( 

913 "Could not find JSON array in response, returning no previews" 

914 ) 

915 return [] 

916 

917 except Exception: 

918 logger.exception("Error filtering GitHub results") 

919 return []
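The re-ranking step in the method above can be pictured with a tiny standalone example; the previews and index list are invented, and the index list mirrors the format the prompt requests.

    previews = [{"title": "repo-a"}, {"title": "repo-b"}, {"title": "repo-c"}]
    ranked_indices = [2, 0, 1]  # e.g. an LLM reply of "[2, 0, 1]" parsed by extract_json

    ranked_results = [previews[i] for i in ranked_indices if i < len(previews)]
    max_filtered_results = 2
    ranked_results = ranked_results[:max_filtered_results]  # keeps repo-c, then repo-a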