Coverage for src / local_deep_research / web_search_engines / rate_limiting / tracker.py: 97%

329 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2Adaptive rate limit tracker that learns optimal retry wait times for each search engine. 

3""" 

4 

5import random 

6import threading 

7import time 

8from collections import deque 

9from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple 

10 

11from cachetools import LRUCache 

12 

13if TYPE_CHECKING: 

14 from sqlalchemy.orm import Session 

15from loguru import logger 

16 

17from ...config.thread_settings import ( 

18 NoSettingsContextError, 

19 get_setting_from_snapshot, 

20 get_settings_context, 

21) 

22from ...settings.env_registry import is_ci_environment 

23from ...utilities.thread_context import get_search_context 

24 

25# Lazy imports to avoid database initialization in programmatic mode 

26_db_imports = None 

27 

28 

29def _get_db_imports(): 

30 """Lazy load database imports only when needed.""" 

31 global _db_imports 

32 if _db_imports is None: 

33 try: 

34 from ...database.models import RateLimitAttempt, RateLimitEstimate 

35 from ...database.session_context import get_user_db_session 

36 

37 _db_imports = { 

38 "RateLimitAttempt": RateLimitAttempt, 

39 "RateLimitEstimate": RateLimitEstimate, 

40 "get_user_db_session": get_user_db_session, 

41 } 

42 except (ImportError, RuntimeError): 

43 # Database not available - programmatic mode 

44 _db_imports = {} 

45 return _db_imports 

46 

47 

48class AdaptiveRateLimitTracker: 

49 """ 

50 Tracks and learns optimal retry wait times for each search engine. 

51 Persists learned patterns to the main application database using SQLAlchemy. 

52 """ 

53 

    def __init__(self, settings_snapshot=None, programmatic_mode=False):
        """Initialize the tracker and prepare in-memory caches.

        Args:
            settings_snapshot: Optional mapping of pre-resolved settings used
                for ``rate_limiting.*`` lookups (defaults to an empty dict).
            programmatic_mode: When True, run memory-only — rate limiting
                defaults to disabled and no database access is attempted.
        """
        self.settings_snapshot = settings_snapshot or {}
        self.programmatic_mode = programmatic_mode

        # Helper function to get settings with defaults
        def get_setting_or_default(key, default, type_fn=None):
            try:
                value = get_setting_from_snapshot(
                    key,
                    settings_snapshot=self.settings_snapshot,
                )
                # Handle None values - return default instead of calling type_fn(None)
                if value is None:
                    return default
                return type_fn(value) if type_fn else value
            except NoSettingsContextError:
                return default

        # Get settings with explicit defaults
        # memory_window: max number of recent attempts kept per engine
        self.memory_window = get_setting_or_default(
            "rate_limiting.memory_window", 100, int
        )
        # exploration_rate: probability of trying a faster-than-learned wait
        self.exploration_rate = get_setting_or_default(
            "rate_limiting.exploration_rate", 0.1, float
        )
        # learning_rate: EMA weight used when blending new estimates with old
        self.learning_rate = get_setting_or_default(
            "rate_limiting.learning_rate", 0.3, float
        )
        # decay_per_day: confidence decay factor applied to stale estimates
        self.decay_per_day = get_setting_or_default(
            "rate_limiting.decay_per_day", 0.95, float
        )

        # In programmatic mode, default to disabled
        self.enabled = get_setting_or_default(
            "rate_limiting.enabled",
            not self.programmatic_mode,  # Default based on mode
            bool,
        )

        profile = get_setting_or_default("rate_limiting.profile", "balanced")

        if self.programmatic_mode and self.enabled:
            logger.info(
                "Rate limiting enabled in programmatic mode - using memory-only tracking without persistence"
            )

        # Apply rate limiting profile
        self._apply_profile(profile)

        # Bounded in-memory caches for fast access (prevent memory leaks)
        # Lock required: LRUCache is not thread-safe (mutates on read via LRU reordering)
        self._cache_lock = threading.Lock()
        self.recent_attempts: LRUCache = LRUCache(maxsize=100)
        self.current_estimates: LRUCache = LRUCache(maxsize=100)

        # Initialize the _estimates_loaded flag
        self._estimates_loaded = False

        # Load estimates from database (deferred unless programmatic mode)
        self._load_estimates()

        logger.info(
            f"AdaptiveRateLimitTracker initialized: enabled={self.enabled}, profile={profile}"
        )

118 

119 def _apply_profile(self, profile: str) -> None: 

120 """Apply rate limiting profile settings.""" 

121 if profile == "conservative": 

122 # More conservative: lower exploration, slower learning 

123 self.exploration_rate = min( 

124 self.exploration_rate * 0.5, 0.05 

125 ) # 5% max exploration 

126 self.learning_rate = min( 

127 self.learning_rate * 0.7, 0.2 

128 ) # Slower learning 

129 logger.info("Applied conservative rate limiting profile") 

130 elif profile == "aggressive": 

131 # More aggressive: higher exploration, faster learning 

132 self.exploration_rate = min( 

133 self.exploration_rate * 1.5, 0.2 

134 ) # Up to 20% exploration 

135 self.learning_rate = min( 

136 self.learning_rate * 1.3, 0.5 

137 ) # Faster learning 

138 logger.info("Applied aggressive rate limiting profile") 

139 else: # balanced 

140 # Use settings as-is 

141 logger.info("Applied balanced rate limiting profile") 

142 

143 def _load_estimates(self) -> None: 

144 """Load estimates from database into memory.""" 

145 # Skip database operations in programmatic mode 

146 if self.programmatic_mode: 

147 logger.debug( 

148 "Skipping rate limit estimate loading in programmatic mode" 

149 ) 

150 self._estimates_loaded = ( 

151 True # Mark as loaded to skip future attempts 

152 ) 

153 return 

154 

155 # During initialization, we don't have user context yet 

156 # Mark that we need to load estimates when user context becomes available 

157 self._estimates_loaded = False 

158 logger.debug( 

159 "Rate limit estimates will be loaded on-demand when user context is available" 

160 ) 

161 

    def _ensure_estimates_loaded(self) -> None:
        """Load estimates from user's encrypted database if not already loaded.

        Idempotent: once ``_estimates_loaded`` is True this is a no-op. Any
        loaded estimate's confidence is decayed by ``decay_per_day`` based on
        its age before being cached in ``current_estimates``.
        """
        # Early return if already loaded or should skip
        if self._estimates_loaded or self.programmatic_mode:
            if not self._estimates_loaded:
                # Programmatic mode: mark loaded so we never re-enter
                self._estimates_loaded = True
            return

        # Get database imports
        db_imports = _get_db_imports()
        RateLimitEstimate = (
            db_imports.get("RateLimitEstimate") if db_imports else None
        )

        if not db_imports or not RateLimitEstimate:
            # Database not available
            self._estimates_loaded = True
            return

        # Try to get research context from search tracker

        context = get_search_context()
        if not context:
            # No context available (e.g., in tests or programmatic access)
            self._estimates_loaded = True
            return

        username = context.get("username")
        password = context.get("user_password")

        # NOTE(review): when the context exists but lacks credentials,
        # _estimates_loaded stays False, so the load is retried on the next
        # call — presumably intentional; confirm.
        if username and password:
            try:
                # Use thread-safe metrics writer to read from user's encrypted database
                from ...database.thread_metrics import metrics_writer

                # Set password for this thread
                metrics_writer.set_user_password(username, password)

                session: "Session"
                with metrics_writer.get_session(username) as session:
                    estimates: list[Any] = session.query(
                        RateLimitEstimate
                    ).all()

                    for estimate in estimates:
                        # Apply decay for old estimates
                        age_hours = (time.time() - estimate.last_updated) / 3600
                        decay = self.decay_per_day ** (age_hours / 24)

                        # Lock per-entry: LRUCache mutates on access
                        with self._cache_lock:
                            self.current_estimates[estimate.engine_type] = {
                                "base": estimate.base_wait_seconds,
                                "min": estimate.min_wait_seconds,
                                "max": estimate.max_wait_seconds,
                                "confidence": decay,
                            }

                        logger.debug(
                            f"Loaded estimate for {estimate.engine_type}: base={estimate.base_wait_seconds:.2f}s, confidence={decay:.2f}"
                        )

                self._estimates_loaded = True
                logger.info(
                    f"Loaded {len(estimates)} rate limit estimates from encrypted database"
                )

            except Exception:
                logger.warning("Could not load rate limit estimates")
                # Mark as loaded anyway to avoid repeated attempts
                self._estimates_loaded = True

232 

    def get_wait_time(self, engine_type: str) -> float:
        """
        Get adaptive wait time for a search engine.
        Includes exploration to discover better rates.

        Args:
            engine_type: Name of the search engine

        Returns:
            Wait time in seconds (0.1 when disabled or no data yet; 0.0 when
            no user context exists outside programmatic mode)
        """
        # If rate limiting is disabled, return minimal wait time
        if not self.enabled:
            return 0.1

        # Check if we have a user context - if not, handle appropriately
        context = get_search_context()
        if not context and not self.programmatic_mode:
            # No context and not in programmatic mode - this is unexpected
            logger.warning(
                f"No user context available for rate limiting on {engine_type} "
                "but programmatic_mode=False. Disabling rate limiting. "
                "This may indicate a configuration issue."
            )
            return 0.0

        # In programmatic mode, we continue with memory-only rate limiting even without context

        # Ensure estimates are loaded from database
        self._ensure_estimates_loaded()

        with self._cache_lock:
            if engine_type not in self.current_estimates:
                # First time seeing this engine - start optimistic and learn from real responses
                # Use engine-specific optimistic defaults only for what we know for sure
                optimistic_defaults = {
                    "SearXNGSearchEngine": 0.1,  # Self-hosted default engine
                }

                wait_time = optimistic_defaults.get(
                    engine_type, 0.1
                )  # Default optimistic for others
                logger.info(
                    f"No rate limit data for {engine_type}, starting optimistic with {wait_time}s"
                )
                return wait_time

            estimate = self.current_estimates[engine_type]
            base_wait = estimate["base"]

            # Security: random used for non-security rate-limit jitter, not tokens or secrets
            # Exploration vs exploitation
            if random.random() < self.exploration_rate:
                # Explore: try a faster rate to see if API limits have relaxed
                wait_time = base_wait * random.uniform(0.5, 0.9)
                logger.debug(
                    f"Exploring faster rate for {engine_type}: {wait_time:.2f}s"
                )
            else:
                # Exploit: use learned estimate with jitter
                wait_time = base_wait * random.uniform(0.9, 1.1)

            # Enforce bounds
            wait_time = max(
                float(estimate["min"]), min(wait_time, float(estimate["max"]))
            )
            return float(wait_time)

300 

301 def apply_rate_limit(self, engine_type: str) -> float: 

302 """ 

303 Apply rate limiting for the given engine type. 

304 This is a convenience method that combines checking if rate limiting 

305 is enabled, getting the wait time, and sleeping if necessary. 

306 

307 Args: 

308 engine_type: The type of search engine 

309 

310 Returns: 

311 The wait time that was applied (0 if rate limiting is disabled) 

312 """ 

313 if not self.enabled: 

314 return 0.0 

315 

316 wait_time = self.get_wait_time(engine_type) 

317 if wait_time > 0: 

318 logger.debug( 

319 f"{engine_type} waiting {wait_time:.2f}s before request" 

320 ) 

321 time.sleep(wait_time) 

322 return wait_time 

323 

    def record_outcome(
        self,
        engine_type: str,
        wait_time: float,
        success: bool,
        retry_count: int,
        error_type: Optional[str] = None,
        search_result_count: Optional[int] = None,
    ) -> None:
        """
        Record the outcome of a retry attempt.

        Args:
            engine_type: Name of the search engine
            wait_time: How long we waited before this attempt
            success: Whether the attempt succeeded
            retry_count: Which retry attempt this was (1, 2, 3, etc.)
            error_type: Type of error if failed (not referenced in this
                method body; kept for caller compatibility)
            search_result_count: Number of search results returned (for quality monitoring)
        """
        # If rate limiting is disabled, don't record outcomes
        if not self.enabled:
            logger.info(
                f"Rate limiting disabled - not recording outcome for {engine_type}"
            )
            return

        logger.debug(
            f"Recording rate limit outcome for {engine_type}: success={success}, wait_time={wait_time}s"
        )
        timestamp = time.time()

        # NOTE: Database writes for rate limiting are disabled to prevent
        # database locking issues under heavy parallel search load.
        # Rate limiting still works via in-memory tracking below.
        # Historical rate limit data is not persisted to DB.

        # Update in-memory tracking
        with self._cache_lock:
            if engine_type not in self.recent_attempts:
                # Get current memory window setting from thread context
                settings_context = get_settings_context()
                if settings_context:
                    current_memory_window = int(
                        settings_context.get_setting(
                            "rate_limiting.memory_window", self.memory_window
                        )
                    )
                else:
                    current_memory_window = self.memory_window

                # deque(maxlen=...) silently drops the oldest attempt once full
                self.recent_attempts[engine_type] = deque(
                    maxlen=current_memory_window
                )

            self.recent_attempts[engine_type].append(
                {
                    "wait_time": wait_time,
                    "success": success,
                    "timestamp": timestamp,
                    "retry_count": retry_count,
                    "search_result_count": search_result_count,
                }
            )

        # Update estimates (re-acquires the cache lock internally, so this
        # must stay outside the `with` block above)
        self._update_estimate(engine_type)

391 

    def _update_estimate(self, engine_type: str) -> None:
        """Update wait time estimate based on recent attempts.

        Recomputes the base wait (median of successful waits, or an escalated
        value when every attempt failed), blends it with the previous
        estimate via an exponential moving average, caps it at 10s, caches
        the result, and persists it when user credentials are available.
        Requires at least 3 recorded attempts before updating.
        """
        # Snapshot shared state under the lock; compute outside it. The lock
        # must be released here because it is re-acquired below and
        # threading.Lock is not reentrant.
        with self._cache_lock:
            if (
                engine_type not in self.recent_attempts
                or len(self.recent_attempts[engine_type]) < 3
            ):
                attempt_count = len(self.recent_attempts.get(engine_type) or [])
                logger.info(
                    f"Not updating estimate for {engine_type} - only {attempt_count} attempts (need 3)"
                )
                return

            attempts = list(self.recent_attempts[engine_type])
            old_estimate = self.current_estimates.get(engine_type)

        # Calculate success rate and optimal wait time
        successful_waits = [a["wait_time"] for a in attempts if a["success"]]
        failed_waits = [a["wait_time"] for a in attempts if not a["success"]]

        if not successful_waits:
            # All attempts failed - increase wait time with a cap
            new_base = max(failed_waits) * 1.5 if failed_waits else 10.0
            # Cap the base wait time to prevent runaway growth
            new_base = min(new_base, 10.0)  # Max 10 seconds base when all fail
        else:
            # Use 50th percentile (median) of successful waits for more stability
            # This provides a balanced approach between speed and reliability
            successful_waits.sort()
            percentile_50 = successful_waits[
                max(0, int(len(successful_waits) * 0.50) - 1)
            ]
            new_base = percentile_50

        # Update estimate with learning rate (exponential moving average)
        if old_estimate is not None:
            old_base = old_estimate["base"]
            # Get current learning rate from settings context
            settings_context = get_settings_context()
            if settings_context:
                current_learning_rate = float(
                    settings_context.get_setting(
                        "rate_limiting.learning_rate", self.learning_rate
                    )
                )
            else:
                current_learning_rate = self.learning_rate

            new_base = (
                1 - current_learning_rate
            ) * old_base + current_learning_rate * new_base

        # Apply absolute cap to prevent extreme wait times
        new_base = min(new_base, 10.0)  # Cap base at 10 seconds

        # Calculate bounds with more reasonable limits
        min_wait = max(0.01, new_base * 0.5)
        max_wait = min(10.0, new_base * 3.0)  # Max 10 seconds absolute cap

        # Update in memory
        with self._cache_lock:
            self.current_estimates[engine_type] = {
                "base": new_base,
                "min": min_wait,
                "max": max_wait,
                # Confidence grows with sample size, saturating at 20 attempts
                "confidence": min(len(attempts) / 20.0, 1.0),
            }

        # Persist to database
        success_rate = len(successful_waits) / len(attempts) if attempts else 0

        # Skip database operations in programmatic mode
        if self.programmatic_mode:
            logger.debug(
                f"Skipping estimate persistence in programmatic mode for {engine_type}"
            )
        else:
            # Try to get research context from search tracker

            context = get_search_context()
            username = None
            password = None
            if context is not None:
                username = context.get("username")
                password = context.get("user_password")

            if username and password:
                try:
                    # Use thread-safe metrics writer to save to user's encrypted database
                    from ...database.thread_metrics import metrics_writer

                    # Set password for this thread if not already set
                    metrics_writer.set_user_password(username, password)

                    db_imports = _get_db_imports()
                    RateLimitEstimate = db_imports.get("RateLimitEstimate")

                    session_inner: "Session"
                    with metrics_writer.get_session(username) as session_inner:
                        # Check if estimate exists
                        estimate: Any = (
                            session_inner.query(RateLimitEstimate)
                            .filter_by(engine_type=engine_type)
                            .first()
                        )

                        if estimate:
                            # Update existing estimate
                            estimate.base_wait_seconds = new_base
                            estimate.min_wait_seconds = min_wait
                            estimate.max_wait_seconds = max_wait
                            estimate.last_updated = time.time()
                            estimate.total_attempts = len(attempts)
                            estimate.success_rate = success_rate
                        else:
                            # Create new estimate
                            estimate = RateLimitEstimate(
                                engine_type=engine_type,
                                base_wait_seconds=new_base,
                                min_wait_seconds=min_wait,
                                max_wait_seconds=max_wait,
                                last_updated=time.time(),
                                total_attempts=len(attempts),
                                success_rate=success_rate,
                            )
                            session_inner.add(estimate)

                except Exception:
                    logger.exception("Failed to persist rate limit estimate")
            else:
                logger.debug(
                    "Skipping rate limit estimate save - no user context"
                )

        logger.info(
            f"Updated rate limit for {engine_type}: {new_base:.2f}s "
            f"(success rate: {success_rate:.1%})"
        )

530 

531 def _get_in_memory_stats( 

532 self, engine_type: Optional[str] = None 

533 ) -> List[Tuple[str, float, float, float, float, int, float]]: 

534 """Return stats from in-memory caches (no DB access).""" 

535 with self._cache_lock: 

536 stats = [] 

537 engines_to_check = ( 

538 [engine_type] 

539 if engine_type 

540 else list(self.current_estimates.keys()) 

541 ) 

542 for engine in engines_to_check: 

543 if engine in self.current_estimates: 

544 est = self.current_estimates[engine] 

545 attempts = self.recent_attempts.get(engine) 

546 attempt_count = len(attempts) if attempts else 0 

547 stats.append( 

548 ( 

549 engine, 

550 est["base"], 

551 est["min"], 

552 est["max"], 

553 time.time(), 

554 attempt_count, 

555 est.get("confidence", 0.0), 

556 ) 

557 ) 

558 return stats 

559 

    def get_stats(
        self, engine_type: Optional[str] = None
    ) -> List[Tuple[str, float, float, float, float, int, float]]:
        """
        Get statistics for monitoring.

        Reads persisted estimates from the user's encrypted database when a
        user context with credentials exists; otherwise (CI, programmatic
        mode, missing context, or DB failure) falls back to in-memory stats.

        Args:
            engine_type: Specific engine to get stats for, or None for all

        Returns:
            List of tuples with engine statistics:
            (engine_type, base_wait, min_wait, max_wait, last_updated,
            total_attempts, success_rate)
        """
        # Skip database operations in CI mode
        if is_ci_environment():
            logger.debug("Skipping database stats in CI mode")
            return self._get_in_memory_stats(engine_type)

        # Skip database operations in programmatic mode
        if self.programmatic_mode:
            return self._get_in_memory_stats(engine_type)

        try:
            context = get_search_context()
            if not context:
                return self._get_in_memory_stats(engine_type)

            username = context.get("username")
            password = context.get("user_password")
            if not username or not password:
                return self._get_in_memory_stats(engine_type)

            db_imports = _get_db_imports()
            RateLimitEstimate = db_imports.get("RateLimitEstimate")

            from ...database.thread_metrics import metrics_writer

            metrics_writer.set_user_password(username, password)

            session_est: "Session"
            with metrics_writer.get_session(username) as session_est:
                if engine_type:
                    estimates: list[Any] = (
                        session_est.query(RateLimitEstimate)
                        .filter_by(engine_type=engine_type)
                        .all()
                    )
                else:
                    estimates = (
                        session_est.query(RateLimitEstimate)
                        .order_by(RateLimitEstimate.engine_type)
                        .all()
                    )

                return [
                    (
                        est.engine_type,
                        est.base_wait_seconds,
                        est.min_wait_seconds,
                        est.max_wait_seconds,
                        est.last_updated,
                        est.total_attempts,
                        est.success_rate,
                    )
                    for est in estimates
                ]
        except Exception:
            logger.warning("Failed to get rate limit stats from DB")
            # Return in-memory stats as fallback
            return self._get_in_memory_stats(engine_type)

629 

630 def reset_engine(self, engine_type: str) -> None: 

631 """ 

632 Reset learned values for a specific engine. 

633 

634 Args: 

635 engine_type: Engine to reset 

636 """ 

637 # Always clear from memory first 

638 with self._cache_lock: 

639 if engine_type in self.recent_attempts: 

640 del self.recent_attempts[engine_type] 

641 if engine_type in self.current_estimates: 

642 del self.current_estimates[engine_type] 

643 

644 # Skip database operations in programmatic mode 

645 if self.programmatic_mode: 

646 logger.debug( 

647 f"Reset rate limit data for {engine_type} (memory only in programmatic mode)" 

648 ) 

649 return 

650 

651 # Skip database operations in CI mode 

652 if is_ci_environment(): 

653 logger.debug( 

654 f"Reset rate limit data for {engine_type} (memory only in CI mode)" 

655 ) 

656 return 

657 

658 try: 

659 context = get_search_context() 

660 if not context: 660 ↛ 661line 660 didn't jump to line 661 because the condition on line 660 was never true

661 logger.debug( 

662 f"Reset rate limit data for {engine_type} (memory only - no user context)" 

663 ) 

664 return 

665 

666 username = context.get("username") 

667 password = context.get("user_password") 

668 if not username or not password: 668 ↛ 669line 668 didn't jump to line 669 because the condition on line 668 was never true

669 logger.debug( 

670 f"Reset rate limit data for {engine_type} (memory only - no credentials)" 

671 ) 

672 return 

673 

674 db_imports = _get_db_imports() 

675 RateLimitAttempt = db_imports.get("RateLimitAttempt") 

676 RateLimitEstimate = db_imports.get("RateLimitEstimate") 

677 

678 from ...database.thread_metrics import metrics_writer 

679 

680 metrics_writer.set_user_password(username, password) 

681 

682 session_reset: "Session" 

683 with metrics_writer.get_session(username) as session_reset: 

684 # Delete historical attempts 

685 session_reset.query(RateLimitAttempt).filter_by( 

686 engine_type=engine_type 

687 ).delete() 

688 

689 # Delete estimates 

690 session_reset.query(RateLimitEstimate).filter_by( 

691 engine_type=engine_type 

692 ).delete() 

693 

694 session_reset.commit() 

695 

696 logger.info(f"Reset rate limit data for {engine_type}") 

697 

698 except Exception: 

699 logger.warning( 

700 f"Failed to reset rate limit data in database for {engine_type}" 

701 "In-memory data was cleared successfully." 

702 ) 

703 # Don't re-raise in test contexts - the memory cleanup is sufficient 

704 

705 def get_search_quality_stats( 

706 self, engine_type: Optional[str] = None 

707 ) -> List[Dict]: 

708 """ 

709 Get basic search quality statistics for monitoring. 

710 

711 Args: 

712 engine_type: Specific engine to get stats for, or None for all 

713 

714 Returns: 

715 List of dictionaries with search quality metrics 

716 """ 

717 stats = [] 

718 

719 with self._cache_lock: 

720 engines_to_check = ( 

721 [engine_type] 

722 if engine_type 

723 else list(self.recent_attempts.keys()) 

724 ) 

725 # Snapshot the data under lock, then process outside 

726 engine_attempts = {} 

727 for engine in engines_to_check: 

728 if engine in self.recent_attempts: 

729 engine_attempts[engine] = list(self.recent_attempts[engine]) 

730 

731 for engine, recent in engine_attempts.items(): 

732 search_counts = [ 

733 attempt.get("search_result_count", 0) 

734 for attempt in recent 

735 if attempt.get("search_result_count") is not None 

736 ] 

737 

738 if not search_counts: 

739 continue 

740 

741 recent_avg = sum(search_counts) / len(search_counts) 

742 

743 stats.append( 

744 { 

745 "engine_type": engine, 

746 "recent_avg_results": recent_avg, 

747 "min_recent_results": min(search_counts), 

748 "max_recent_results": max(search_counts), 

749 "sample_size": len(search_counts), 

750 "total_attempts": len(recent), 

751 "status": self._get_quality_status(recent_avg), 

752 } 

753 ) 

754 

755 return stats 

756 

757 def _get_quality_status(self, recent_avg: float) -> str: 

758 """Get quality status string based on average results.""" 

759 if recent_avg < 1: 

760 return "CRITICAL" 

761 if recent_avg < 3: 

762 return "WARNING" 

763 if recent_avg < 5: 

764 return "CAUTION" 

765 if recent_avg >= 10: 

766 return "EXCELLENT" 

767 return "GOOD" 

768 

    def cleanup_old_data(self, days: int = 30) -> None:
        """
        Remove old retry attempt data to prevent database bloat.

        No-op in programmatic mode, in CI, or when no user context with
        credentials is available. Failures are logged and swallowed
        (best-effort cleanup).

        Args:
            days: Remove data older than this many days
        """
        cutoff_time = time.time() - (days * 24 * 3600)

        # Skip database operations in programmatic mode
        if self.programmatic_mode:
            logger.debug("Skipping database cleanup in programmatic mode")
            return

        # Skip database operations in CI mode
        if is_ci_environment():
            logger.debug("Skipping database cleanup in CI mode")
            return

        try:
            context = get_search_context()
            if not context:
                logger.debug("Skipping database cleanup - no user context")
                return

            username = context.get("username")
            password = context.get("user_password")
            if not username or not password:
                logger.debug("Skipping database cleanup - no credentials")
                return

            db_imports = _get_db_imports()
            RateLimitAttempt = db_imports.get("RateLimitAttempt")

            from ...database.thread_metrics import metrics_writer

            metrics_writer.set_user_password(username, password)

            session_clean: "Session"
            with metrics_writer.get_session(username) as session_clean:
                # Count and delete old attempts
                old_attempts: Any = session_clean.query(
                    RateLimitAttempt
                ).filter(RateLimitAttempt.timestamp < cutoff_time)
                deleted_count = old_attempts.count()
                old_attempts.delete()

                session_clean.commit()

            if deleted_count > 0:
                logger.info(f"Cleaned up {deleted_count} old retry attempts")

        except Exception:
            logger.warning("Failed to cleanup old rate limit data")

824 

def get_tracker() -> AdaptiveRateLimitTracker:
    """Create a fresh rate limit tracker instance.

    Returns a new instance each call so that per-user DB credentials
    are never cached across requests in a multi-user Flask app.

    Returns:
        AdaptiveRateLimitTracker built with defaults (no settings snapshot,
        programmatic_mode=False).
    """
    return AdaptiveRateLimitTracker()