Coverage for src/local_deep_research/web_search_engines/rate_limiting/tracker.py: 69%

288 statements  

coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Adaptive rate limit tracker that learns optimal retry wait times for each search engine. 

3""" 

4 

5import random 

6import time 

7from collections import deque 

8from typing import Dict, List, Optional, Tuple 

9 

10from loguru import logger 

11 

12from ...settings.env_registry import use_fallback_llm, is_ci_environment 

13from ...utilities.thread_context import get_search_context 

14from ...config.thread_settings import ( 

15 get_settings_context, 

16 get_setting_from_snapshot, 

17 NoSettingsContextError, 

18) 

19 

20# Lazy imports to avoid database initialization in programmatic mode 

21_db_imports = None 

22 

23 

24def _get_db_imports(): 

25 """Lazy load database imports only when needed.""" 

26 global _db_imports 

27 if _db_imports is None: 

28 try: 

29 from ...database.models import RateLimitAttempt, RateLimitEstimate 

30 from ...database.session_context import get_user_db_session 

31 

32 _db_imports = { 

33 "RateLimitAttempt": RateLimitAttempt, 

34 "RateLimitEstimate": RateLimitEstimate, 

35 "get_user_db_session": get_user_db_session, 

36 } 

37 except (ImportError, RuntimeError): 

38 # Database not available - programmatic mode 

39 _db_imports = {} 

40 return _db_imports 
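
Callers consume the returned dict defensively, mirroring the pattern used throughout this module; a minimal sketch:

db = _get_db_imports()
RateLimitEstimate = db.get("RateLimitEstimate")
if RateLimitEstimate is None:
    pass  # database unavailable (programmatic mode) - skip persistence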

41 

42 

43class AdaptiveRateLimitTracker: 

44 """ 

45 Tracks and learns optimal retry wait times for each search engine. 

46 Persists learned patterns to the main application database using SQLAlchemy. 

47 """ 

48 

49 def __init__(self, settings_snapshot=None, programmatic_mode=False): 

50 self.settings_snapshot = settings_snapshot or {} 

51 self.programmatic_mode = programmatic_mode 

52 

53 # Helper function to get settings with defaults 

54 def get_setting_or_default(key, default, type_fn=None): 

55 try: 

56 value = get_setting_from_snapshot( 

57 key, 

58 settings_snapshot=self.settings_snapshot, 

59 ) 

60 return type_fn(value) if type_fn else value 

61 except NoSettingsContextError: 

62 return default 

63 

64 # Get settings with explicit defaults 

65 self.memory_window = get_setting_or_default( 

66 "rate_limiting.memory_window", 100, int 

67 ) 

68 self.exploration_rate = get_setting_or_default( 

69 "rate_limiting.exploration_rate", 0.1, float 

70 ) 

71 self.learning_rate = get_setting_or_default( 

72 "rate_limiting.learning_rate", 0.3, float 

73 ) 

74 self.decay_per_day = get_setting_or_default( 

75 "rate_limiting.decay_per_day", 0.95, float 

76 ) 

77 

78 # In programmatic mode, default to disabled 

79 self.enabled = get_setting_or_default( 

80 "rate_limiting.enabled", 

81 not self.programmatic_mode, # Default based on mode 

82 bool, 

83 ) 

84 

85 profile = get_setting_or_default("rate_limiting.profile", "balanced") 

86 

87 if self.programmatic_mode and self.enabled: 

88 logger.info( 

89 "Rate limiting enabled in programmatic mode - using memory-only tracking without persistence" 

90 ) 

91 

92 # Apply rate limiting profile 

93 self._apply_profile(profile) 

94 

95 # In-memory cache for fast access 

96 self.recent_attempts: Dict[str, deque] = {} 

97 self.current_estimates: Dict[str, Dict[str, float]] = {} 

98 

99 # Initialize the _estimates_loaded flag 

100 self._estimates_loaded = False 

101 

102 # Load estimates from database 

103 self._load_estimates() 

104 

105 logger.info( 

106 f"AdaptiveRateLimitTracker initialized: enabled={self.enabled}, profile={profile}" 

107 ) 
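
A minimal construction sketch, assuming the settings snapshot is a flat mapping of setting keys to values that get_setting_from_snapshot can resolve (the keys below are the real ones used in __init__; the values are illustrative):

snapshot = {
    "rate_limiting.enabled": True,
    "rate_limiting.profile": "conservative",
}
tracker = AdaptiveRateLimitTracker(settings_snapshot=snapshot, programmatic_mode=True)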

108 

109 def _apply_profile(self, profile: str) -> None: 

110 """Apply rate limiting profile settings.""" 

111 if profile == "conservative": 

112 # More conservative: lower exploration, slower learning 

113 self.exploration_rate = min( 

114 self.exploration_rate * 0.5, 0.05 

115 ) # 5% max exploration 

116 self.learning_rate = min( 

117 self.learning_rate * 0.7, 0.2 

118 ) # Slower learning 

119 logger.info("Applied conservative rate limiting profile") 

120 elif profile == "aggressive": 

121 # More aggressive: higher exploration, faster learning 

122 self.exploration_rate = min( 

123 self.exploration_rate * 1.5, 0.2 

124 ) # Up to 20% exploration 

125 self.learning_rate = min( 

126 self.learning_rate * 1.3, 0.5 

127 ) # Faster learning 

128 logger.info("Applied aggressive rate limiting profile") 

129 else: # balanced 

130 # Use settings as-is 

131 logger.info("Applied balanced rate limiting profile") 

132 

133 def _load_estimates(self) -> None: 

134 """Load estimates from database into memory.""" 

135 # Skip database operations in programmatic mode 

136 if self.programmatic_mode: 

137 logger.debug( 

138 "Skipping rate limit estimate loading in programmatic mode" 

139 ) 

140 self._estimates_loaded = ( 

141 True # Mark as loaded to skip future attempts 

142 ) 

143 return 

144 

145 # Skip database operations in fallback mode 

146 if use_fallback_llm(): 

147 logger.debug( 

148 "Skipping rate limit estimate loading in fallback mode" 

149 ) 

150 return 

151 

152 # During initialization, we don't have user context yet 

153 # Mark that we need to load estimates when user context becomes available 

154 self._estimates_loaded = False 

155 logger.debug( 

156 "Rate limit estimates will be loaded on-demand when user context is available" 

157 ) 

158 

159 def _ensure_estimates_loaded(self) -> None: 

160 """Load estimates from user's encrypted database if not already loaded.""" 

161 # Early return if already loaded or should skip 

162 if self._estimates_loaded or self.programmatic_mode: 

163 if not self._estimates_loaded:  # coverage: 163 ↛ 164 (condition never true)

164 self._estimates_loaded = True 

165 return 

166 

167 # Get database imports 

168 db_imports = _get_db_imports() 

169 RateLimitEstimate = ( 

170 db_imports.get("RateLimitEstimate") if db_imports else None 

171 ) 

172 

173 if not db_imports or not RateLimitEstimate:  # coverage: 173 ↛ 175 (condition never true)

174 # Database not available 

175 self._estimates_loaded = True 

176 return 

177 

178 # Try to get research context from search tracker 

179 

180 context = get_search_context() 

181 if not context:  # coverage: 181 ↛ 183 (condition never true)

182 # No context available (e.g., in tests or programmatic access) 

183 self._estimates_loaded = True 

184 return 

185 

186 username = context.get("username") 

187 password = context.get("user_password") 

188 

189 if username and password: 

190 try: 

191 # Use thread-safe metrics writer to read from user's encrypted database 

192 from ...database.thread_metrics import metrics_writer 

193 

194 # Set password for this thread 

195 metrics_writer.set_user_password(username, password) 

196 

197 with metrics_writer.get_session(username) as session: 

198 estimates = session.query(RateLimitEstimate).all() 

199 

200 for estimate in estimates:  # coverage: 200 ↛ 202 (loop never started)

201 # Apply decay for old estimates 

202 age_hours = (time.time() - estimate.last_updated) / 3600 

203 decay = self.decay_per_day ** (age_hours / 24) 

204 

205 self.current_estimates[estimate.engine_type] = { 

206 "base": estimate.base_wait_seconds, 

207 "min": estimate.min_wait_seconds, 

208 "max": estimate.max_wait_seconds, 

209 "confidence": decay, 

210 } 

211 

212 logger.debug( 

213 f"Loaded estimate for {estimate.engine_type}: base={estimate.base_wait_seconds:.2f}s, confidence={decay:.2f}" 

214 ) 

215 

216 self._estimates_loaded = True 

217 logger.info( 

218 f"Loaded {len(estimates)} rate limit estimates from encrypted database" 

219 ) 

220 

221 except Exception as e: 

222 logger.warning(f"Could not load rate limit estimates: {e}") 

223 # Mark as loaded anyway to avoid repeated attempts 

224 self._estimates_loaded = True 
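
Worked example of the confidence decay applied while loading estimates, using the default decay_per_day of 0.95: an estimate last updated 48 hours ago gets decay = 0.95 ** (48 / 24) = 0.9025, so older estimates are trusted progressively less.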

225 

226 def get_wait_time(self, engine_type: str) -> float: 

227 """ 

228 Get adaptive wait time for a search engine. 

229 Includes exploration to discover better rates. 

230 

231 Args: 

232 engine_type: Name of the search engine 

233 

234 Returns: 

235 Wait time in seconds 

236 """ 

237 # If rate limiting is disabled, return minimal wait time 

238 if not self.enabled:  # coverage: 238 ↛ 239 (condition never true)

239 return 0.1 

240 

241 # Check if we have a user context - if not, handle appropriately 

242 context = get_search_context() 

243 if not context and not self.programmatic_mode:  # coverage: 243 ↛ 245 (condition never true)

244 # No context and not in programmatic mode - this is unexpected 

245 logger.warning( 

246 f"No user context available for rate limiting on {engine_type} " 

247 "but programmatic_mode=False. Disabling rate limiting. " 

248 "This may indicate a configuration issue." 

249 ) 

250 return 0.0 

251 

252 # In programmatic mode, we continue with memory-only rate limiting even without context 

253 

254 # Ensure estimates are loaded from database 

255 self._ensure_estimates_loaded() 

256 

257 if engine_type not in self.current_estimates: 

258 # First time seeing this engine - start optimistically and learn from real responses

259 # Use engine-specific optimistic defaults only where the engine's behavior is known

260 optimistic_defaults = { 

261 "LocalSearchEngine": 0.0, # No network calls 

262 "SearXNGSearchEngine": 0.1, # Self-hosted default engine 

263 } 

264 

265 wait_time = optimistic_defaults.get( 

266 engine_type, 0.1 

267 ) # Default optimistic for others 

268 logger.info( 

269 f"No rate limit data for {engine_type}, starting optimistic with {wait_time}s" 

270 ) 

271 return wait_time 

272 

273 estimate = self.current_estimates[engine_type] 

274 base_wait = estimate["base"] 

275 

276 # Exploration vs exploitation 

277 if random.random() < self.exploration_rate: 

278 # Explore: try a faster rate to see if API limits have relaxed 

279 wait_time = base_wait * random.uniform(0.5, 0.9) 

280 logger.debug( 

281 f"Exploring faster rate for {engine_type}: {wait_time:.2f}s" 

282 ) 

283 else: 

284 # Exploit: use learned estimate with jitter 

285 wait_time = base_wait * random.uniform(0.9, 1.1) 

286 

287 # Enforce bounds 

288 wait_time = max(estimate["min"], min(wait_time, estimate["max"])) 

289 return wait_time 
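
A standalone sketch of the explore/exploit selection above, with a hypothetical learned estimate (the dict keys match those stored in current_estimates):

import random

estimate = {"base": 2.0, "min": 1.0, "max": 6.0}  # hypothetical learned values
if random.random() < 0.1:  # exploration_rate
    wait = estimate["base"] * random.uniform(0.5, 0.9)  # probe a faster rate
else:
    wait = estimate["base"] * random.uniform(0.9, 1.1)  # jittered exploitation
wait = max(estimate["min"], min(wait, estimate["max"]))  # clamp to learned bounds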

290 

291 def apply_rate_limit(self, engine_type: str) -> float: 

292 """ 

293 Apply rate limiting for the given engine type. 

294 This is a convenience method that combines checking if rate limiting 

295 is enabled, getting the wait time, and sleeping if necessary. 

296 

297 Args: 

298 engine_type: The type of search engine 

299 

300 Returns: 

301 The wait time that was applied (0 if rate limiting is disabled) 

302 """ 

303 if not self.enabled:  # coverage: 303 ↛ 304 (condition never true)

304 return 0.0 

305 

306 wait_time = self.get_wait_time(engine_type) 

307 if wait_time > 0:  # coverage: 307 ↛ 312 (condition always true)

308 logger.debug( 

309 f"{engine_type} waiting {wait_time:.2f}s before request" 

310 ) 

311 time.sleep(wait_time) 

312 return wait_time 
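
A hedged call-site sketch: when rate limiting is enabled this sleeps for the learned wait and returns it, otherwise it returns 0.0 immediately:

tracker = AdaptiveRateLimitTracker(programmatic_mode=True)
waited = tracker.apply_rate_limit("SearXNGSearchEngine")
# ... perform the outbound request here ...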

313 

314 def record_outcome( 

315 self, 

316 engine_type: str, 

317 wait_time: float, 

318 success: bool, 

319 retry_count: int, 

320 error_type: Optional[str] = None, 

321 search_result_count: Optional[int] = None, 

322 ) -> None: 

323 """ 

324 Record the outcome of a retry attempt. 

325 

326 Args: 

327 engine_type: Name of the search engine 

328 wait_time: How long we waited before this attempt 

329 success: Whether the attempt succeeded 

330 retry_count: Which retry attempt this was (1, 2, 3, etc.) 

331 error_type: Type of error if failed 

332 search_result_count: Number of search results returned (for quality monitoring) 

333 """ 

334 # If rate limiting is disabled, don't record outcomes 

335 if not self.enabled:  # coverage: 335 ↛ 336 (condition never true)

336 logger.info( 

337 f"Rate limiting disabled - not recording outcome for {engine_type}" 

338 ) 

339 return 

340 

341 logger.debug( 

342 f"Recording rate limit outcome for {engine_type}: success={success}, wait_time={wait_time}s" 

343 ) 

344 timestamp = time.time() 

345 

346 # NOTE: Per-attempt database writes are disabled here to prevent

347 # database locking issues under heavy parallel search load.

348 # Rate limiting still works via the in-memory tracking below;

349 # only the aggregated estimates are persisted (see _update_estimate).

350 

351 # Update in-memory tracking 

352 if engine_type not in self.recent_attempts: 

353 # Get current memory window setting from thread context 

354 settings_context = get_settings_context() 

355 if settings_context:  # coverage: 355 ↛ 356 (condition never true)

356 current_memory_window = int( 

357 settings_context.get_setting( 

358 "rate_limiting.memory_window", self.memory_window 

359 ) 

360 ) 

361 else: 

362 current_memory_window = self.memory_window 

363 

364 self.recent_attempts[engine_type] = deque( 

365 maxlen=current_memory_window 

366 ) 

367 

368 self.recent_attempts[engine_type].append( 

369 { 

370 "wait_time": wait_time, 

371 "success": success, 

372 "timestamp": timestamp, 

373 "retry_count": retry_count, 

374 "search_result_count": search_result_count, 

375 } 

376 ) 

377 

378 # Update estimates 

379 self._update_estimate(engine_type) 
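
A sketch of the intended feedback loop (wait, attempt, report the outcome) so that _update_estimate() can adapt; perform_request() and RateLimitError are placeholders, not part of this module:

import time

class RateLimitError(Exception):  # placeholder for an engine-specific error
    pass

def perform_request():  # placeholder for the real engine call
    return ["result"]

tracker = AdaptiveRateLimitTracker(programmatic_mode=True)
for attempt in range(1, 4):
    waited = tracker.get_wait_time("ExampleEngine")
    time.sleep(waited)
    try:
        results = perform_request()
        tracker.record_outcome("ExampleEngine", waited, True, attempt,
                               search_result_count=len(results))
        break
    except RateLimitError as exc:
        tracker.record_outcome("ExampleEngine", waited, False, attempt,
                               error_type=type(exc).__name__)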

380 

381 def _update_estimate(self, engine_type: str) -> None: 

382 """Update wait time estimate based on recent attempts.""" 

383 if ( 

384 engine_type not in self.recent_attempts 

385 or len(self.recent_attempts[engine_type]) < 3 

386 ): 

387 logger.info( 

388 f"Not updating estimate for {engine_type} - only {len(self.recent_attempts.get(engine_type, []))} attempts (need 3)" 

389 ) 

390 return 

391 

392 attempts = list(self.recent_attempts[engine_type]) 

393 

394 # Calculate success rate and optimal wait time 

395 successful_waits = [a["wait_time"] for a in attempts if a["success"]] 

396 failed_waits = [a["wait_time"] for a in attempts if not a["success"]] 

397 

398 if not successful_waits: 

399 # All attempts failed - increase wait time with a cap 

400 new_base = max(failed_waits) * 1.5 if failed_waits else 10.0 

401 # Cap the base wait time to prevent runaway growth 

402 new_base = min(new_base, 10.0) # Max 10 seconds base when all fail 

403 else: 

404 # Use roughly the median of successful waits for stability; the index below is biased slightly low for odd-length samples, which errs toward shorter waits

405 # This balances speed against reliability

406 successful_waits.sort() 

407 percentile_50 = successful_waits[ 

408 max(0, int(len(successful_waits) * 0.50) - 1) 

409 ] 

410 new_base = percentile_50 

411 

412 # Update estimate with learning rate (exponential moving average) 

413 if engine_type in self.current_estimates: 

414 old_base = self.current_estimates[engine_type]["base"] 

415 # Get current learning rate from settings context 

416 settings_context = get_settings_context() 

417 if settings_context:  # coverage: 417 ↛ 418 (condition never true)

418 current_learning_rate = float( 

419 settings_context.get_setting( 

420 "rate_limiting.learning_rate", self.learning_rate 

421 ) 

422 ) 

423 else: 

424 current_learning_rate = self.learning_rate 

425 

426 new_base = ( 

427 1 - current_learning_rate 

428 ) * old_base + current_learning_rate * new_base 

429 

430 # Apply absolute cap to prevent extreme wait times 

431 new_base = min(new_base, 10.0) # Cap base at 10 seconds 

432 

433 # Calculate bounds with more reasonable limits 

434 min_wait = max(0.01, new_base * 0.5) 

435 max_wait = min(10.0, new_base * 3.0) # Max 10 seconds absolute cap 

436 

437 # Update in memory 

438 self.current_estimates[engine_type] = { 

439 "base": new_base, 

440 "min": min_wait, 

441 "max": max_wait, 

442 "confidence": min(len(attempts) / 20.0, 1.0), 

443 } 

444 

445 # Persist to database (skip in fallback mode) 

446 success_rate = len(successful_waits) / len(attempts) if attempts else 0 

447 

448 # Skip database operations in programmatic mode 

449 if self.programmatic_mode: 

450 logger.debug( 

451 f"Skipping estimate persistence in programmatic mode for {engine_type}" 

452 ) 

453 elif not use_fallback_llm():  # coverage: 453 ↛ 512 (condition always true)

454 # Try to get research context from search tracker 

455 

456 context = get_search_context() 

457 username = None 

458 password = None 

459 if context is not None:  # coverage: 459 ↛ 463 (condition always true)

460 username = context.get("username") 

461 password = context.get("user_password") 

462 

463 if username and password:  # coverage: 463 ↛ 464 (condition never true)

464 try: 

465 # Use thread-safe metrics writer to save to user's encrypted database 

466 from ...database.thread_metrics import metrics_writer 

467 

468 # Set password for this thread if not already set 

469 metrics_writer.set_user_password(username, password) 

470 

471 db_imports = _get_db_imports() 

472 RateLimitEstimate = db_imports.get("RateLimitEstimate") 

473 

474 with metrics_writer.get_session(username) as session: 

475 # Check if estimate exists 

476 estimate = ( 

477 session.query(RateLimitEstimate) 

478 .filter_by(engine_type=engine_type) 

479 .first() 

480 ) 

481 

482 if estimate: 

483 # Update existing estimate 

484 estimate.base_wait_seconds = new_base 

485 estimate.min_wait_seconds = min_wait 

486 estimate.max_wait_seconds = max_wait 

487 estimate.last_updated = time.time() 

488 estimate.total_attempts = len(attempts) 

489 estimate.success_rate = success_rate 

490 else: 

491 # Create new estimate 

492 estimate = RateLimitEstimate( 

493 engine_type=engine_type, 

494 base_wait_seconds=new_base, 

495 min_wait_seconds=min_wait, 

496 max_wait_seconds=max_wait, 

497 last_updated=time.time(), 

498 total_attempts=len(attempts), 

499 success_rate=success_rate, 

500 ) 

501 session.add(estimate) 

502 

503 except Exception as e: 

504 logger.exception( 

505 f"Failed to persist rate limit estimate: {e}" 

506 ) 

507 else: 

508 logger.debug( 

509 "Skipping rate limit estimate save - no user context" 

510 ) 

511 

512 logger.info( 

513 f"Updated rate limit for {engine_type}: {new_base:.2f}s " 

514 f"(success rate: {success_rate:.1%})" 

515 ) 
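
Worked example of the exponential moving average above with the default learning_rate of 0.3: an old base of 2.0s and a new median of 1.0s give new_base = (1 - 0.3) * 2.0 + 0.3 * 1.0 = 1.7s; the bounds then become min_wait = max(0.01, 1.7 * 0.5) = 0.85s and max_wait = min(10.0, 1.7 * 3.0) = 5.1s.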

516 

517 def get_stats( 

518 self, engine_type: Optional[str] = None 

519 ) -> List[Tuple[str, float, float, float, float, int, float]]: 

520 """ 

521 Get statistics for monitoring. 

522 

523 Args: 

524 engine_type: Specific engine to get stats for, or None for all 

525 

526 Returns: 

527 List of tuples with engine statistics 

528 """ 

529 # Skip database operations in test/fallback mode 

530 if use_fallback_llm() or is_ci_environment():  # coverage: 530 ↛ 531 (condition never true)

531 logger.debug("Skipping database stats in test/CI mode") 

532 # Return stats from in-memory estimates 

533 stats = [] 

534 engines_to_check = ( 

535 [engine_type] 

536 if engine_type 

537 else list(self.current_estimates.keys()) 

538 ) 

539 for engine in engines_to_check: 

540 if engine in self.current_estimates: 

541 est = self.current_estimates[engine] 

542 stats.append( 

543 ( 

544 engine, 

545 est["base"], 

546 est["min"], 

547 est["max"], 

548 time.time(), 

549 len(self.recent_attempts.get(engine, [])), 

550 est.get("confidence", 0.0), 

551 ) 

552 ) 

553 return stats 

554 

555 # Skip database operations in programmatic mode 

556 if self.programmatic_mode:  # coverage: 556 ↛ 557 (condition never true)

557 return []  # "stats" is only defined in the CI/fallback branch above; returning it here would raise NameError

558 

559 try: 

560 db_imports = _get_db_imports() 

561 get_user_db_session = db_imports.get("get_user_db_session") 

562 RateLimitEstimate = db_imports.get("RateLimitEstimate") 

563 

564 with get_user_db_session() as session: 

565 if engine_type:  # coverage: 565 ↛ 566 (condition never true)

566 estimates = ( 

567 session.query(RateLimitEstimate) 

568 .filter_by(engine_type=engine_type) 

569 .all() 

570 ) 

571 else: 

572 estimates = ( 

573 session.query(RateLimitEstimate) 

574 .order_by(RateLimitEstimate.engine_type) 

575 .all() 

576 ) 

577 

578 return [ 

579 ( 

580 est.engine_type, 

581 est.base_wait_seconds, 

582 est.min_wait_seconds, 

583 est.max_wait_seconds, 

584 est.last_updated, 

585 est.total_attempts, 

586 est.success_rate, 

587 ) 

588 for est in estimates 

589 ] 

590 except Exception as e: 

591 logger.warning(f"Failed to get rate limit stats from DB: {e}") 

592 # Build the in-memory fallback directly; calling get_stats() again would re-enter the same failing database path and recurse

593 return [(eng, est["base"], est["min"], est["max"], time.time(), len(self.recent_attempts.get(eng, [])), est.get("confidence", 0.0)) for eng, est in self.current_estimates.items() if engine_type in (None, eng)]

594 

595 def reset_engine(self, engine_type: str) -> None: 

596 """ 

597 Reset learned values for a specific engine. 

598 

599 Args: 

600 engine_type: Engine to reset 

601 """ 

602 # Always clear from memory first 

603 if engine_type in self.recent_attempts: 

604 del self.recent_attempts[engine_type] 

605 if engine_type in self.current_estimates: 

606 del self.current_estimates[engine_type] 

607 

608 # Skip database operations in programmatic mode 

609 if self.programmatic_mode: 

610 logger.debug( 

611 f"Reset rate limit data for {engine_type} (memory only in programmatic mode)" 

612 ) 

613 return 

614 

615 # Skip database operations in test/fallback mode 

616 if use_fallback_llm() or is_ci_environment():  # coverage: 616 ↛ 617 (condition never true)

617 logger.debug( 

618 f"Reset rate limit data for {engine_type} (memory only in test/CI mode)" 

619 ) 

620 return 

621 

622 try: 

623 db_imports = _get_db_imports() 

624 get_user_db_session = db_imports.get("get_user_db_session") 

625 RateLimitAttempt = db_imports.get("RateLimitAttempt") 

626 RateLimitEstimate = db_imports.get("RateLimitEstimate") 

627 

628 with get_user_db_session() as session: 

629 # Delete historical attempts 

630 session.query(RateLimitAttempt).filter_by( 

631 engine_type=engine_type 

632 ).delete() 

633 

634 # Delete estimates 

635 session.query(RateLimitEstimate).filter_by( 

636 engine_type=engine_type 

637 ).delete() 

638 

639 session.commit() 

640 

641 logger.info(f"Reset rate limit data for {engine_type}") 

642 

643 except Exception as e: 

644 logger.warning( 

645 f"Failed to reset rate limit data in database for {engine_type}: {e}. " 

646 "In-memory data was cleared successfully." 

647 ) 

648 # Don't re-raise in test contexts - the memory cleanup is sufficient 

649 

650 def get_search_quality_stats( 

651 self, engine_type: Optional[str] = None 

652 ) -> List[Dict]: 

653 """ 

654 Get basic search quality statistics for monitoring. 

655 

656 Args: 

657 engine_type: Specific engine to get stats for, or None for all 

658 

659 Returns: 

660 List of dictionaries with search quality metrics 

661 """ 

662 stats = [] 

663 

664 engines_to_check = ( 

665 [engine_type] if engine_type else list(self.recent_attempts.keys()) 

666 ) 

667 

668 for engine in engines_to_check:  # coverage: 668 ↛ 669 (loop never started)

669 if engine not in self.recent_attempts: 

670 continue 

671 

672 recent = list(self.recent_attempts[engine]) 

673 search_counts = [ 

674 attempt.get("search_result_count", 0) 

675 for attempt in recent 

676 if attempt.get("search_result_count") is not None 

677 ] 

678 

679 if not search_counts: 

680 continue 

681 

682 recent_avg = sum(search_counts) / len(search_counts) 

683 

684 stats.append( 

685 { 

686 "engine_type": engine, 

687 "recent_avg_results": recent_avg, 

688 "min_recent_results": min(search_counts), 

689 "max_recent_results": max(search_counts), 

690 "sample_size": len(search_counts), 

691 "total_attempts": len(recent), 

692 "status": self._get_quality_status(recent_avg), 

693 } 

694 ) 

695 

696 return stats 

697 

698 def _get_quality_status(self, recent_avg: float) -> str: 

699 """Get quality status string based on average results.""" 

700 if recent_avg < 1: 

701 return "CRITICAL" 

702 elif recent_avg < 3: 

703 return "WARNING" 

704 elif recent_avg < 5: 

705 return "CAUTION" 

706 elif recent_avg >= 10: 

707 return "EXCELLENT" 

708 else: 

709 return "GOOD" 

710 

711 def cleanup_old_data(self, days: int = 30) -> None: 

712 """ 

713 Remove old retry attempt data to prevent database bloat. 

714 

715 Args: 

716 days: Remove data older than this many days 

717 """ 

718 cutoff_time = time.time() - (days * 24 * 3600) 

719 

720 # Skip database operations in programmatic mode 

721 if self.programmatic_mode:  # coverage: 721 ↛ 722 (condition never true)

722 logger.debug("Skipping database cleanup in programmatic mode") 

723 return 

724 

725 # Skip database operations in test/fallback mode 

726 if use_fallback_llm() or is_ci_environment():  # coverage: 726 ↛ 727 (condition never true)

727 logger.debug("Skipping database cleanup in test/CI mode") 

728 return 

729 

730 try: 

731 db_imports = _get_db_imports() 

732 get_user_db_session = db_imports.get("get_user_db_session") 

733 RateLimitAttempt = db_imports.get("RateLimitAttempt") 

734 

735 with get_user_db_session() as session: 

736 # Count and delete old attempts 

737 old_attempts = session.query(RateLimitAttempt).filter( 

738 RateLimitAttempt.timestamp < cutoff_time 

739 ) 

740 deleted_count = old_attempts.count() 

741 old_attempts.delete() 

742 

743 session.commit() 

744 

745 if deleted_count > 0:  # coverage: 745 ↛ 746 (condition never true)

746 logger.info(f"Cleaned up {deleted_count} old retry attempts") 

747 

748 except Exception as e: 

749 logger.warning(f"Failed to cleanup old rate limit data: {e}") 

750 

751 

752# Create a singleton instance 

753_tracker_instance: Optional[AdaptiveRateLimitTracker] = None 

754 

755 

756def get_tracker() -> AdaptiveRateLimitTracker: 

757 """Get the global rate limit tracker instance.""" 

758 global _tracker_instance 

759 if _tracker_instance is None: 

760 _tracker_instance = AdaptiveRateLimitTracker() 

761 return _tracker_instance
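
A minimal usage sketch of the module-level singleton; get_tracker() constructs the tracker with no settings snapshot, so the defaults above apply (the engine name is illustrative):

from local_deep_research.web_search_engines.rate_limiting.tracker import get_tracker

tracker = get_tracker()
wait = tracker.apply_rate_limit("ExampleEngine")
tracker.record_outcome("ExampleEngine", wait, success=True, retry_count=1)
print(tracker.get_stats())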