Coverage for src / local_deep_research / web_search_engines / rate_limiting / tracker.py: 62%

290 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1""" 

2Adaptive rate limit tracker that learns optimal retry wait times for each search engine. 

3""" 

4 

5import random 

6import time 

7from collections import deque 

8from typing import Dict, List, Optional, Tuple 

9 

10from loguru import logger 

11 

12from ...settings.env_registry import use_fallback_llm, is_ci_environment 

13from ...utilities.thread_context import get_search_context 

14from ...config.thread_settings import ( 

15 get_settings_context, 

16 get_setting_from_snapshot, 

17 NoSettingsContextError, 

18) 

19 

20# Lazy imports to avoid database initialization in programmatic mode 

21_db_imports = None 

22 

23 

24def _get_db_imports(): 

25 """Lazy load database imports only when needed.""" 

26 global _db_imports 

27 if _db_imports is None: 

28 try: 

29 from ...database.models import RateLimitAttempt, RateLimitEstimate 

30 from ...database.session_context import get_user_db_session 

31 

32 _db_imports = { 

33 "RateLimitAttempt": RateLimitAttempt, 

34 "RateLimitEstimate": RateLimitEstimate, 

35 "get_user_db_session": get_user_db_session, 

36 } 

37 except (ImportError, RuntimeError): 

38 # Database not available - programmatic mode 

39 _db_imports = {} 

40 return _db_imports 

41 

42 

class AdaptiveRateLimitTracker:
    """
    Tracks and learns optimal retry wait times for each search engine.
    Persists learned patterns to the main application database using SQLAlchemy.

    When no database is available (programmatic mode, fallback-LLM mode, CI),
    the tracker degrades gracefully to memory-only learning.
    """

    def __init__(self, settings_snapshot=None, programmatic_mode=False):
        """Initialize the tracker.

        Args:
            settings_snapshot: Optional settings dict used to resolve
                rate-limiting configuration without a live settings context.
            programmatic_mode: When True, database persistence is skipped and
                rate limiting defaults to disabled.
        """
        self.settings_snapshot = settings_snapshot or {}
        self.programmatic_mode = programmatic_mode

        # Helper function to get settings with defaults
        def get_setting_or_default(key, default, type_fn=None):
            try:
                value = get_setting_from_snapshot(
                    key,
                    settings_snapshot=self.settings_snapshot,
                )
                # Handle None values - return default instead of calling type_fn(None)
                if value is None:
                    return default
                return type_fn(value) if type_fn else value
            except NoSettingsContextError:
                return default

        # Get settings with explicit defaults
        self.memory_window = get_setting_or_default(
            "rate_limiting.memory_window", 100, int
        )
        self.exploration_rate = get_setting_or_default(
            "rate_limiting.exploration_rate", 0.1, float
        )
        self.learning_rate = get_setting_or_default(
            "rate_limiting.learning_rate", 0.3, float
        )
        self.decay_per_day = get_setting_or_default(
            "rate_limiting.decay_per_day", 0.95, float
        )

        # In programmatic mode, default to disabled
        # NOTE(review): bool() on a non-empty string like "false" is True -
        # assumes the snapshot stores real booleans; confirm upstream.
        self.enabled = get_setting_or_default(
            "rate_limiting.enabled",
            not self.programmatic_mode,  # Default based on mode
            bool,
        )

        profile = get_setting_or_default("rate_limiting.profile", "balanced")

        if self.programmatic_mode and self.enabled:
            logger.info(
                "Rate limiting enabled in programmatic mode - using memory-only tracking without persistence"
            )

        # Apply rate limiting profile
        self._apply_profile(profile)

        # In-memory cache for fast access
        self.recent_attempts: Dict[str, deque] = {}
        self.current_estimates: Dict[str, Dict[str, float]] = {}

        # Initialize the _estimates_loaded flag
        self._estimates_loaded = False

        # Load estimates from database
        self._load_estimates()

        logger.info(
            f"AdaptiveRateLimitTracker initialized: enabled={self.enabled}, profile={profile}"
        )

    def _apply_profile(self, profile: str) -> None:
        """Apply rate limiting profile settings.

        Scales exploration/learning rates for "conservative" or "aggressive";
        any other value ("balanced") leaves the configured rates untouched.
        """
        if profile == "conservative":
            # More conservative: lower exploration, slower learning
            self.exploration_rate = min(
                self.exploration_rate * 0.5, 0.05
            )  # 5% max exploration
            self.learning_rate = min(
                self.learning_rate * 0.7, 0.2
            )  # Slower learning
            logger.info("Applied conservative rate limiting profile")
        elif profile == "aggressive":
            # More aggressive: higher exploration, faster learning
            self.exploration_rate = min(
                self.exploration_rate * 1.5, 0.2
            )  # Up to 20% exploration
            self.learning_rate = min(
                self.learning_rate * 1.3, 0.5
            )  # Faster learning
            logger.info("Applied aggressive rate limiting profile")
        else:  # balanced
            # Use settings as-is
            logger.info("Applied balanced rate limiting profile")

    def _load_estimates(self) -> None:
        """Load estimates from database into memory.

        At construction time no user context exists yet, so this only decides
        whether on-demand loading (see _ensure_estimates_loaded) is needed.
        """
        # Skip database operations in programmatic mode
        if self.programmatic_mode:
            logger.debug(
                "Skipping rate limit estimate loading in programmatic mode"
            )
            self._estimates_loaded = (
                True  # Mark as loaded to skip future attempts
            )
            return

        # Skip database operations in fallback mode
        if use_fallback_llm():
            logger.debug(
                "Skipping rate limit estimate loading in fallback mode"
            )
            return

        # During initialization, we don't have user context yet
        # Mark that we need to load estimates when user context becomes available
        self._estimates_loaded = False
        logger.debug(
            "Rate limit estimates will be loaded on-demand when user context is available"
        )

    def _ensure_estimates_loaded(self) -> None:
        """Load estimates from user's encrypted database if not already loaded."""
        # Early return if already loaded or should skip
        if self._estimates_loaded or self.programmatic_mode:
            if not self._estimates_loaded:
                self._estimates_loaded = True
            return

        # Get database imports
        db_imports = _get_db_imports()
        RateLimitEstimate = (
            db_imports.get("RateLimitEstimate") if db_imports else None
        )

        if not db_imports or not RateLimitEstimate:
            # Database not available
            self._estimates_loaded = True
            return

        # Try to get research context from search tracker
        context = get_search_context()
        if not context:
            # No context available (e.g., in tests or programmatic access)
            self._estimates_loaded = True
            return

        username = context.get("username")
        password = context.get("user_password")

        if username and password:
            try:
                # Use thread-safe metrics writer to read from user's encrypted database
                from ...database.thread_metrics import metrics_writer

                # Set password for this thread
                metrics_writer.set_user_password(username, password)

                with metrics_writer.get_session(username) as session:
                    estimates = session.query(RateLimitEstimate).all()

                    for estimate in estimates:
                        # Apply decay for old estimates
                        age_hours = (time.time() - estimate.last_updated) / 3600
                        decay = self.decay_per_day ** (age_hours / 24)

                        self.current_estimates[estimate.engine_type] = {
                            "base": estimate.base_wait_seconds,
                            "min": estimate.min_wait_seconds,
                            "max": estimate.max_wait_seconds,
                            "confidence": decay,
                        }

                        logger.debug(
                            f"Loaded estimate for {estimate.engine_type}: base={estimate.base_wait_seconds:.2f}s, confidence={decay:.2f}"
                        )

                self._estimates_loaded = True
                logger.info(
                    f"Loaded {len(estimates)} rate limit estimates from encrypted database"
                )

            except Exception as e:
                logger.warning(f"Could not load rate limit estimates: {e}")
                # Mark as loaded anyway to avoid repeated attempts
                self._estimates_loaded = True

    def get_wait_time(self, engine_type: str) -> float:
        """
        Get adaptive wait time for a search engine.
        Includes exploration to discover better rates.

        Args:
            engine_type: Name of the search engine

        Returns:
            Wait time in seconds
        """
        # If rate limiting is disabled, return minimal wait time
        if not self.enabled:
            return 0.1

        # Check if we have a user context - if not, handle appropriately
        context = get_search_context()
        if not context and not self.programmatic_mode:
            # No context and not in programmatic mode - this is unexpected
            logger.warning(
                f"No user context available for rate limiting on {engine_type} "
                "but programmatic_mode=False. Disabling rate limiting. "
                "This may indicate a configuration issue."
            )
            return 0.0

        # In programmatic mode, we continue with memory-only rate limiting even without context

        # Ensure estimates are loaded from database
        self._ensure_estimates_loaded()

        if engine_type not in self.current_estimates:
            # First time seeing this engine - start optimistic and learn from real responses
            # Use engine-specific optimistic defaults only for what we know for sure
            optimistic_defaults = {
                "LocalSearchEngine": 0.0,  # No network calls
                "SearXNGSearchEngine": 0.1,  # Self-hosted default engine
            }

            wait_time = optimistic_defaults.get(
                engine_type, 0.1
            )  # Default optimistic for others
            logger.info(
                f"No rate limit data for {engine_type}, starting optimistic with {wait_time}s"
            )
            return wait_time

        estimate = self.current_estimates[engine_type]
        base_wait = estimate["base"]

        # Security: random used for non-security rate-limit jitter, not tokens or secrets
        # Exploration vs exploitation
        if random.random() < self.exploration_rate:
            # Explore: try a faster rate to see if API limits have relaxed
            wait_time = base_wait * random.uniform(0.5, 0.9)
            logger.debug(
                f"Exploring faster rate for {engine_type}: {wait_time:.2f}s"
            )
        else:
            # Exploit: use learned estimate with jitter
            wait_time = base_wait * random.uniform(0.9, 1.1)

        # Enforce bounds
        wait_time = max(estimate["min"], min(wait_time, estimate["max"]))
        return wait_time

    def apply_rate_limit(self, engine_type: str) -> float:
        """
        Apply rate limiting for the given engine type.
        This is a convenience method that combines checking if rate limiting
        is enabled, getting the wait time, and sleeping if necessary.

        Args:
            engine_type: The type of search engine

        Returns:
            The wait time that was applied (0 if rate limiting is disabled)
        """
        if not self.enabled:
            return 0.0

        wait_time = self.get_wait_time(engine_type)
        if wait_time > 0:
            logger.debug(
                f"{engine_type} waiting {wait_time:.2f}s before request"
            )
            time.sleep(wait_time)
        return wait_time

    def record_outcome(
        self,
        engine_type: str,
        wait_time: float,
        success: bool,
        retry_count: int,
        error_type: Optional[str] = None,
        search_result_count: Optional[int] = None,
    ) -> None:
        """
        Record the outcome of a retry attempt.

        Args:
            engine_type: Name of the search engine
            wait_time: How long we waited before this attempt
            success: Whether the attempt succeeded
            retry_count: Which retry attempt this was (1, 2, 3, etc.)
            error_type: Type of error if failed
            search_result_count: Number of search results returned (for quality monitoring)
        """
        # If rate limiting is disabled, don't record outcomes
        if not self.enabled:
            logger.info(
                f"Rate limiting disabled - not recording outcome for {engine_type}"
            )
            return

        logger.debug(
            f"Recording rate limit outcome for {engine_type}: success={success}, wait_time={wait_time}s"
        )
        timestamp = time.time()

        # NOTE: Database writes for rate limiting are disabled to prevent
        # database locking issues under heavy parallel search load.
        # Rate limiting still works via in-memory tracking below.
        # Historical rate limit data is not persisted to DB.

        # Update in-memory tracking
        if engine_type not in self.recent_attempts:
            # Get current memory window setting from thread context
            settings_context = get_settings_context()
            if settings_context:
                current_memory_window = int(
                    settings_context.get_setting(
                        "rate_limiting.memory_window", self.memory_window
                    )
                )
            else:
                current_memory_window = self.memory_window

            # deque(maxlen=...) keeps only the newest attempts automatically
            self.recent_attempts[engine_type] = deque(
                maxlen=current_memory_window
            )

        self.recent_attempts[engine_type].append(
            {
                "wait_time": wait_time,
                "success": success,
                "timestamp": timestamp,
                "retry_count": retry_count,
                "search_result_count": search_result_count,
            }
        )

        # Update estimates
        self._update_estimate(engine_type)

    def _update_estimate(self, engine_type: str) -> None:
        """Update wait time estimate based on recent attempts.

        Requires at least 3 recorded attempts; combines the median of
        successful waits with the previous estimate via an exponential
        moving average, then persists when a user context is available.
        """
        if (
            engine_type not in self.recent_attempts
            or len(self.recent_attempts[engine_type]) < 3
        ):
            logger.info(
                f"Not updating estimate for {engine_type} - only {len(self.recent_attempts.get(engine_type, []))} attempts (need 3)"
            )
            return

        attempts = list(self.recent_attempts[engine_type])

        # Calculate success rate and optimal wait time
        successful_waits = [a["wait_time"] for a in attempts if a["success"]]
        failed_waits = [a["wait_time"] for a in attempts if not a["success"]]

        if not successful_waits:
            # All attempts failed - increase wait time with a cap
            new_base = max(failed_waits) * 1.5 if failed_waits else 10.0
            # Cap the base wait time to prevent runaway growth
            new_base = min(new_base, 10.0)  # Max 10 seconds base when all fail
        else:
            # Use 50th percentile (median) of successful waits for more stability
            # This provides a balanced approach between speed and reliability
            successful_waits.sort()
            percentile_50 = successful_waits[
                max(0, int(len(successful_waits) * 0.50) - 1)
            ]
            new_base = percentile_50

        # Update estimate with learning rate (exponential moving average)
        if engine_type in self.current_estimates:
            old_base = self.current_estimates[engine_type]["base"]
            # Get current learning rate from settings context
            settings_context = get_settings_context()
            if settings_context:
                current_learning_rate = float(
                    settings_context.get_setting(
                        "rate_limiting.learning_rate", self.learning_rate
                    )
                )
            else:
                current_learning_rate = self.learning_rate

            new_base = (
                1 - current_learning_rate
            ) * old_base + current_learning_rate * new_base

        # Apply absolute cap to prevent extreme wait times
        new_base = min(new_base, 10.0)  # Cap base at 10 seconds

        # Calculate bounds with more reasonable limits
        min_wait = max(0.01, new_base * 0.5)
        max_wait = min(10.0, new_base * 3.0)  # Max 10 seconds absolute cap

        # Update in memory
        self.current_estimates[engine_type] = {
            "base": new_base,
            "min": min_wait,
            "max": max_wait,
            "confidence": min(len(attempts) / 20.0, 1.0),
        }

        # Persist to database (skip in fallback mode)
        success_rate = len(successful_waits) / len(attempts) if attempts else 0

        # Skip database operations in programmatic mode
        if self.programmatic_mode:
            logger.debug(
                f"Skipping estimate persistence in programmatic mode for {engine_type}"
            )
        elif not use_fallback_llm():
            # Try to get research context from search tracker
            context = get_search_context()
            username = None
            password = None
            if context is not None:
                username = context.get("username")
                password = context.get("user_password")

            if username and password:
                try:
                    # Use thread-safe metrics writer to save to user's encrypted database
                    from ...database.thread_metrics import metrics_writer

                    # Set password for this thread if not already set
                    metrics_writer.set_user_password(username, password)

                    db_imports = _get_db_imports()
                    RateLimitEstimate = db_imports.get("RateLimitEstimate")

                    with metrics_writer.get_session(username) as session:
                        # Check if estimate exists
                        estimate = (
                            session.query(RateLimitEstimate)
                            .filter_by(engine_type=engine_type)
                            .first()
                        )

                        if estimate:
                            # Update existing estimate
                            estimate.base_wait_seconds = new_base
                            estimate.min_wait_seconds = min_wait
                            estimate.max_wait_seconds = max_wait
                            estimate.last_updated = time.time()
                            estimate.total_attempts = len(attempts)
                            estimate.success_rate = success_rate
                        else:
                            # Create new estimate
                            estimate = RateLimitEstimate(
                                engine_type=engine_type,
                                base_wait_seconds=new_base,
                                min_wait_seconds=min_wait,
                                max_wait_seconds=max_wait,
                                last_updated=time.time(),
                                total_attempts=len(attempts),
                                success_rate=success_rate,
                            )
                            session.add(estimate)

                except Exception:
                    logger.exception("Failed to persist rate limit estimate")
            else:
                logger.debug(
                    "Skipping rate limit estimate save - no user context"
                )

        logger.info(
            f"Updated rate limit for {engine_type}: {new_base:.2f}s "
            f"(success rate: {success_rate:.1%})"
        )

    def _in_memory_stats(
        self, engine_type: Optional[str] = None
    ) -> List[Tuple[str, float, float, float, float, int, float]]:
        """Build stats tuples from in-memory estimates (no database access)."""
        stats = []
        engines_to_check = (
            [engine_type]
            if engine_type
            else list(self.current_estimates.keys())
        )
        for engine in engines_to_check:
            if engine in self.current_estimates:
                est = self.current_estimates[engine]
                stats.append(
                    (
                        engine,
                        est["base"],
                        est["min"],
                        est["max"],
                        time.time(),
                        len(self.recent_attempts.get(engine, [])),
                        est.get("confidence", 0.0),
                    )
                )
        return stats

    def get_stats(
        self, engine_type: Optional[str] = None
    ) -> List[Tuple[str, float, float, float, float, int, float]]:
        """
        Get statistics for monitoring.

        Args:
            engine_type: Specific engine to get stats for, or None for all

        Returns:
            List of tuples with engine statistics
        """
        # Skip database operations in test/fallback mode
        if use_fallback_llm() or is_ci_environment():
            logger.debug("Skipping database stats in test/CI mode")
            # Return stats from in-memory estimates
            return self._in_memory_stats(engine_type)

        # Skip database operations in programmatic mode
        # BUG FIX: previously returned an undefined local `stats` (NameError);
        # now returns the in-memory stats.
        if self.programmatic_mode:
            return self._in_memory_stats(engine_type)

        try:
            db_imports = _get_db_imports()
            get_user_db_session = db_imports.get("get_user_db_session")
            RateLimitEstimate = db_imports.get("RateLimitEstimate")

            with get_user_db_session() as session:
                if engine_type:
                    estimates = (
                        session.query(RateLimitEstimate)
                        .filter_by(engine_type=engine_type)
                        .all()
                    )
                else:
                    estimates = (
                        session.query(RateLimitEstimate)
                        .order_by(RateLimitEstimate.engine_type)
                        .all()
                    )

                return [
                    (
                        est.engine_type,
                        est.base_wait_seconds,
                        est.min_wait_seconds,
                        est.max_wait_seconds,
                        est.last_updated,
                        est.total_attempts,
                        est.success_rate,
                    )
                    for est in estimates
                ]
        except Exception as e:
            logger.warning(f"Failed to get rate limit stats from DB: {e}")
            # BUG FIX: previously recursed into self.get_stats(), which loops
            # forever while the DB failure persists; fall back directly to
            # the in-memory stats instead.
            return self._in_memory_stats(engine_type)

    def reset_engine(self, engine_type: str) -> None:
        """
        Reset learned values for a specific engine.

        Args:
            engine_type: Engine to reset
        """
        # Always clear from memory first
        if engine_type in self.recent_attempts:
            del self.recent_attempts[engine_type]
        if engine_type in self.current_estimates:
            del self.current_estimates[engine_type]

        # Skip database operations in programmatic mode
        if self.programmatic_mode:
            logger.debug(
                f"Reset rate limit data for {engine_type} (memory only in programmatic mode)"
            )
            return

        # Skip database operations in test/fallback mode
        if use_fallback_llm() or is_ci_environment():
            logger.debug(
                f"Reset rate limit data for {engine_type} (memory only in test/CI mode)"
            )
            return

        try:
            db_imports = _get_db_imports()
            get_user_db_session = db_imports.get("get_user_db_session")
            RateLimitAttempt = db_imports.get("RateLimitAttempt")
            RateLimitEstimate = db_imports.get("RateLimitEstimate")

            with get_user_db_session() as session:
                # Delete historical attempts
                session.query(RateLimitAttempt).filter_by(
                    engine_type=engine_type
                ).delete()

                # Delete estimates
                session.query(RateLimitEstimate).filter_by(
                    engine_type=engine_type
                ).delete()

                session.commit()

            logger.info(f"Reset rate limit data for {engine_type}")

        except Exception as e:
            logger.warning(
                f"Failed to reset rate limit data in database for {engine_type}: {e}. "
                "In-memory data was cleared successfully."
            )
            # Don't re-raise in test contexts - the memory cleanup is sufficient

    def get_search_quality_stats(
        self, engine_type: Optional[str] = None
    ) -> List[Dict]:
        """
        Get basic search quality statistics for monitoring.

        Args:
            engine_type: Specific engine to get stats for, or None for all

        Returns:
            List of dictionaries with search quality metrics
        """
        stats = []

        engines_to_check = (
            [engine_type] if engine_type else list(self.recent_attempts.keys())
        )

        for engine in engines_to_check:
            if engine not in self.recent_attempts:
                continue

            recent = list(self.recent_attempts[engine])
            search_counts = [
                attempt.get("search_result_count", 0)
                for attempt in recent
                if attempt.get("search_result_count") is not None
            ]

            if not search_counts:
                continue

            recent_avg = sum(search_counts) / len(search_counts)

            stats.append(
                {
                    "engine_type": engine,
                    "recent_avg_results": recent_avg,
                    "min_recent_results": min(search_counts),
                    "max_recent_results": max(search_counts),
                    "sample_size": len(search_counts),
                    "total_attempts": len(recent),
                    "status": self._get_quality_status(recent_avg),
                }
            )

        return stats

    def _get_quality_status(self, recent_avg: float) -> str:
        """Get quality status string based on average results."""
        if recent_avg < 1:
            return "CRITICAL"
        elif recent_avg < 3:
            return "WARNING"
        elif recent_avg < 5:
            return "CAUTION"
        elif recent_avg >= 10:
            return "EXCELLENT"
        else:
            return "GOOD"

    def cleanup_old_data(self, days: int = 30) -> None:
        """
        Remove old retry attempt data to prevent database bloat.

        Args:
            days: Remove data older than this many days
        """
        cutoff_time = time.time() - (days * 24 * 3600)

        # Skip database operations in programmatic mode
        if self.programmatic_mode:
            logger.debug("Skipping database cleanup in programmatic mode")
            return

        # Skip database operations in test/fallback mode
        if use_fallback_llm() or is_ci_environment():
            logger.debug("Skipping database cleanup in test/CI mode")
            return

        try:
            db_imports = _get_db_imports()
            get_user_db_session = db_imports.get("get_user_db_session")
            RateLimitAttempt = db_imports.get("RateLimitAttempt")

            with get_user_db_session() as session:
                # Count and delete old attempts
                old_attempts = session.query(RateLimitAttempt).filter(
                    RateLimitAttempt.timestamp < cutoff_time
                )
                deleted_count = old_attempts.count()
                old_attempts.delete()

                session.commit()

                if deleted_count > 0:
                    logger.info(f"Cleaned up {deleted_count} old retry attempts")

        except Exception as e:
            logger.warning(f"Failed to cleanup old rate limit data: {e}")

752 

753 

# Module-level singleton; created lazily on first access
_tracker_instance: Optional[AdaptiveRateLimitTracker] = None


def get_tracker() -> AdaptiveRateLimitTracker:
    """Get the global rate limit tracker instance."""
    global _tracker_instance
    if _tracker_instance is not None:
        return _tracker_instance
    _tracker_instance = AdaptiveRateLimitTracker()
    return _tracker_instance