Coverage for src / local_deep_research / web_search_engines / rate_limiting / tracker.py: 62%
290 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
1"""
2Adaptive rate limit tracker that learns optimal retry wait times for each search engine.
3"""
5import random
6import time
7from collections import deque
8from typing import Dict, List, Optional, Tuple
10from loguru import logger
12from ...settings.env_registry import use_fallback_llm, is_ci_environment
13from ...utilities.thread_context import get_search_context
14from ...config.thread_settings import (
15 get_settings_context,
16 get_setting_from_snapshot,
17 NoSettingsContextError,
18)
20# Lazy imports to avoid database initialization in programmatic mode
21_db_imports = None
24def _get_db_imports():
25 """Lazy load database imports only when needed."""
26 global _db_imports
27 if _db_imports is None:
28 try:
29 from ...database.models import RateLimitAttempt, RateLimitEstimate
30 from ...database.session_context import get_user_db_session
32 _db_imports = {
33 "RateLimitAttempt": RateLimitAttempt,
34 "RateLimitEstimate": RateLimitEstimate,
35 "get_user_db_session": get_user_db_session,
36 }
37 except (ImportError, RuntimeError):
38 # Database not available - programmatic mode
39 _db_imports = {}
40 return _db_imports
class AdaptiveRateLimitTracker:
    """
    Tracks and learns optimal retry wait times for each search engine.
    Persists learned patterns to the main application database using SQLAlchemy.
    """

    def __init__(self, settings_snapshot=None, programmatic_mode=False):
        """Initialize the tracker.

        Args:
            settings_snapshot: Optional settings dict used to resolve the
                ``rate_limiting.*`` keys; falls back to built-in defaults.
            programmatic_mode: When True, run memory-only (no database
                persistence) and default ``enabled`` to False.
        """
        self.settings_snapshot = settings_snapshot or {}
        self.programmatic_mode = programmatic_mode

        def _setting(key, default, cast=None):
            # Resolve one setting from the snapshot. Fall back to `default`
            # when no settings context exists or the stored value is None
            # (so we never call cast(None)).
            try:
                raw = get_setting_from_snapshot(
                    key,
                    settings_snapshot=self.settings_snapshot,
                )
            except NoSettingsContextError:
                return default
            if raw is None:
                return default
            return cast(raw) if cast else raw

        # Tuning knobs, each with an explicit default.
        self.memory_window = _setting("rate_limiting.memory_window", 100, int)
        self.exploration_rate = _setting(
            "rate_limiting.exploration_rate", 0.1, float
        )
        self.learning_rate = _setting("rate_limiting.learning_rate", 0.3, float)
        self.decay_per_day = _setting("rate_limiting.decay_per_day", 0.95, float)

        # In programmatic mode, default to disabled.
        self.enabled = _setting(
            "rate_limiting.enabled", not self.programmatic_mode, bool
        )

        profile = _setting("rate_limiting.profile", "balanced")

        if self.programmatic_mode and self.enabled:
            logger.info(
                "Rate limiting enabled in programmatic mode - using memory-only tracking without persistence"
            )

        # Adjust exploration/learning rates according to the profile.
        self._apply_profile(profile)

        # In-memory cache for fast access.
        self.recent_attempts: Dict[str, deque] = {}
        self.current_estimates: Dict[str, Dict[str, float]] = {}

        # Estimates are loaded lazily once a user context exists.
        self._estimates_loaded = False
        self._load_estimates()

        logger.info(
            f"AdaptiveRateLimitTracker initialized: enabled={self.enabled}, profile={profile}"
        )
112 def _apply_profile(self, profile: str) -> None:
113 """Apply rate limiting profile settings."""
114 if profile == "conservative":
115 # More conservative: lower exploration, slower learning
116 self.exploration_rate = min(
117 self.exploration_rate * 0.5, 0.05
118 ) # 5% max exploration
119 self.learning_rate = min(
120 self.learning_rate * 0.7, 0.2
121 ) # Slower learning
122 logger.info("Applied conservative rate limiting profile")
123 elif profile == "aggressive":
124 # More aggressive: higher exploration, faster learning
125 self.exploration_rate = min(
126 self.exploration_rate * 1.5, 0.2
127 ) # Up to 20% exploration
128 self.learning_rate = min(
129 self.learning_rate * 1.3, 0.5
130 ) # Faster learning
131 logger.info("Applied aggressive rate limiting profile")
132 else: # balanced
133 # Use settings as-is
134 logger.info("Applied balanced rate limiting profile")
136 def _load_estimates(self) -> None:
137 """Load estimates from database into memory."""
138 # Skip database operations in programmatic mode
139 if self.programmatic_mode:
140 logger.debug(
141 "Skipping rate limit estimate loading in programmatic mode"
142 )
143 self._estimates_loaded = (
144 True # Mark as loaded to skip future attempts
145 )
146 return
148 # Skip database operations in fallback mode
149 if use_fallback_llm():
150 logger.debug(
151 "Skipping rate limit estimate loading in fallback mode"
152 )
153 return
155 # During initialization, we don't have user context yet
156 # Mark that we need to load estimates when user context becomes available
157 self._estimates_loaded = False
158 logger.debug(
159 "Rate limit estimates will be loaded on-demand when user context is available"
160 )
    def _ensure_estimates_loaded(self) -> None:
        """Load estimates from user's encrypted database if not already loaded.

        Called lazily from get_wait_time() because the user context (and
        therefore the encrypted database) is unavailable at construction
        time. Populates self.current_estimates, applying a time-based decay
        to each stored estimate's confidence.
        """
        # Early return if already loaded or should skip.
        # In programmatic mode the flag is forced True so we never try again.
        if self._estimates_loaded or self.programmatic_mode:
            if not self._estimates_loaded:
                self._estimates_loaded = True
            return

        # Get database imports (empty dict when the DB layer is unavailable)
        db_imports = _get_db_imports()
        RateLimitEstimate = (
            db_imports.get("RateLimitEstimate") if db_imports else None
        )

        if not db_imports or not RateLimitEstimate:
            # Database not available - mark loaded so we stop retrying
            self._estimates_loaded = True
            return

        # Try to get research context from search tracker
        context = get_search_context()
        if not context:
            # No context available (e.g., in tests or programmatic access)
            self._estimates_loaded = True
            return

        username = context.get("username")
        password = context.get("user_password")

        # NOTE(review): if username/password are missing the flag stays
        # False, so the load will be re-attempted on the next call.
        if username and password:
            try:
                # Use thread-safe metrics writer to read from user's encrypted database
                from ...database.thread_metrics import metrics_writer

                # Set password for this thread
                metrics_writer.set_user_password(username, password)

                with metrics_writer.get_session(username) as session:
                    estimates = session.query(RateLimitEstimate).all()

                    for estimate in estimates:
                        # Apply decay for old estimates: confidence decays by
                        # self.decay_per_day for each 24h since last update.
                        age_hours = (time.time() - estimate.last_updated) / 3600
                        decay = self.decay_per_day ** (age_hours / 24)

                        self.current_estimates[estimate.engine_type] = {
                            "base": estimate.base_wait_seconds,
                            "min": estimate.min_wait_seconds,
                            "max": estimate.max_wait_seconds,
                            "confidence": decay,
                        }

                        logger.debug(
                            f"Loaded estimate for {estimate.engine_type}: base={estimate.base_wait_seconds:.2f}s, confidence={decay:.2f}"
                        )

                self._estimates_loaded = True
                logger.info(
                    f"Loaded {len(estimates)} rate limit estimates from encrypted database"
                )

            except Exception as e:
                logger.warning(f"Could not load rate limit estimates: {e}")
                # Mark as loaded anyway to avoid repeated attempts
                self._estimates_loaded = True
229 def get_wait_time(self, engine_type: str) -> float:
230 """
231 Get adaptive wait time for a search engine.
232 Includes exploration to discover better rates.
234 Args:
235 engine_type: Name of the search engine
237 Returns:
238 Wait time in seconds
239 """
240 # If rate limiting is disabled, return minimal wait time
241 if not self.enabled:
242 return 0.1
244 # Check if we have a user context - if not, handle appropriately
245 context = get_search_context()
246 if not context and not self.programmatic_mode:
247 # No context and not in programmatic mode - this is unexpected
248 logger.warning(
249 f"No user context available for rate limiting on {engine_type} "
250 "but programmatic_mode=False. Disabling rate limiting. "
251 "This may indicate a configuration issue."
252 )
253 return 0.0
255 # In programmatic mode, we continue with memory-only rate limiting even without context
257 # Ensure estimates are loaded from database
258 self._ensure_estimates_loaded()
260 if engine_type not in self.current_estimates:
261 # First time seeing this engine - start optimistic and learn from real responses
262 # Use engine-specific optimistic defaults only for what we know for sure
263 optimistic_defaults = {
264 "LocalSearchEngine": 0.0, # No network calls
265 "SearXNGSearchEngine": 0.1, # Self-hosted default engine
266 }
268 wait_time = optimistic_defaults.get(
269 engine_type, 0.1
270 ) # Default optimistic for others
271 logger.info(
272 f"No rate limit data for {engine_type}, starting optimistic with {wait_time}s"
273 )
274 return wait_time
276 estimate = self.current_estimates[engine_type]
277 base_wait = estimate["base"]
279 # Security: random used for non-security rate-limit jitter, not tokens or secrets
280 # Exploration vs exploitation
281 if random.random() < self.exploration_rate:
282 # Explore: try a faster rate to see if API limits have relaxed
283 wait_time = base_wait * random.uniform(0.5, 0.9)
284 logger.debug(
285 f"Exploring faster rate for {engine_type}: {wait_time:.2f}s"
286 )
287 else:
288 # Exploit: use learned estimate with jitter
289 wait_time = base_wait * random.uniform(0.9, 1.1)
291 # Enforce bounds
292 wait_time = max(estimate["min"], min(wait_time, estimate["max"]))
293 return wait_time
295 def apply_rate_limit(self, engine_type: str) -> float:
296 """
297 Apply rate limiting for the given engine type.
298 This is a convenience method that combines checking if rate limiting
299 is enabled, getting the wait time, and sleeping if necessary.
301 Args:
302 engine_type: The type of search engine
304 Returns:
305 The wait time that was applied (0 if rate limiting is disabled)
306 """
307 if not self.enabled:
308 return 0.0
310 wait_time = self.get_wait_time(engine_type)
311 if wait_time > 0:
312 logger.debug(
313 f"{engine_type} waiting {wait_time:.2f}s before request"
314 )
315 time.sleep(wait_time)
316 return wait_time
318 def record_outcome(
319 self,
320 engine_type: str,
321 wait_time: float,
322 success: bool,
323 retry_count: int,
324 error_type: Optional[str] = None,
325 search_result_count: Optional[int] = None,
326 ) -> None:
327 """
328 Record the outcome of a retry attempt.
330 Args:
331 engine_type: Name of the search engine
332 wait_time: How long we waited before this attempt
333 success: Whether the attempt succeeded
334 retry_count: Which retry attempt this was (1, 2, 3, etc.)
335 error_type: Type of error if failed
336 search_result_count: Number of search results returned (for quality monitoring)
337 """
338 # If rate limiting is disabled, don't record outcomes
339 if not self.enabled:
340 logger.info(
341 f"Rate limiting disabled - not recording outcome for {engine_type}"
342 )
343 return
345 logger.debug(
346 f"Recording rate limit outcome for {engine_type}: success={success}, wait_time={wait_time}s"
347 )
348 timestamp = time.time()
350 # NOTE: Database writes for rate limiting are disabled to prevent
351 # database locking issues under heavy parallel search load.
352 # Rate limiting still works via in-memory tracking below.
353 # Historical rate limit data is not persisted to DB.
355 # Update in-memory tracking
356 if engine_type not in self.recent_attempts:
357 # Get current memory window setting from thread context
358 settings_context = get_settings_context()
359 if settings_context: 359 ↛ 360line 359 didn't jump to line 360 because the condition on line 359 was never true
360 current_memory_window = int(
361 settings_context.get_setting(
362 "rate_limiting.memory_window", self.memory_window
363 )
364 )
365 else:
366 current_memory_window = self.memory_window
368 self.recent_attempts[engine_type] = deque(
369 maxlen=current_memory_window
370 )
372 self.recent_attempts[engine_type].append(
373 {
374 "wait_time": wait_time,
375 "success": success,
376 "timestamp": timestamp,
377 "retry_count": retry_count,
378 "search_result_count": search_result_count,
379 }
380 )
382 # Update estimates
383 self._update_estimate(engine_type)
    def _update_estimate(self, engine_type: str) -> None:
        """Update wait time estimate based on recent attempts.

        Recomputes the per-engine base/min/max wait from the in-memory
        attempt history (EMA-smoothed against the previous base), stores it
        in self.current_estimates, and best-effort persists it to the
        user's encrypted database when a user context is available.
        """
        # Require at least 3 recorded attempts before estimating anything.
        if (
            engine_type not in self.recent_attempts
            or len(self.recent_attempts[engine_type]) < 3
        ):
            logger.info(
                f"Not updating estimate for {engine_type} - only {len(self.recent_attempts.get(engine_type, []))} attempts (need 3)"
            )
            return

        attempts = list(self.recent_attempts[engine_type])

        # Calculate success rate and optimal wait time
        successful_waits = [a["wait_time"] for a in attempts if a["success"]]
        failed_waits = [a["wait_time"] for a in attempts if not a["success"]]

        if not successful_waits:
            # All attempts failed - increase wait time with a cap
            new_base = max(failed_waits) * 1.5 if failed_waits else 10.0
            # Cap the base wait time to prevent runaway growth
            new_base = min(new_base, 10.0)  # Max 10 seconds base when all fail
        else:
            # Use 50th percentile (median) of successful waits for more stability
            # This provides a balanced approach between speed and reliability
            successful_waits.sort()
            # NOTE(review): index formula picks a lower-median element
            # (e.g. element 0 of a 2-item list); confirm intended rounding.
            percentile_50 = successful_waits[
                max(0, int(len(successful_waits) * 0.50) - 1)
            ]
            new_base = percentile_50

        # Update estimate with learning rate (exponential moving average)
        if engine_type in self.current_estimates:
            old_base = self.current_estimates[engine_type]["base"]
            # Get current learning rate from settings context (falls back
            # to the value captured at construction time)
            settings_context = get_settings_context()
            if settings_context:
                current_learning_rate = float(
                    settings_context.get_setting(
                        "rate_limiting.learning_rate", self.learning_rate
                    )
                )
            else:
                current_learning_rate = self.learning_rate

            new_base = (
                1 - current_learning_rate
            ) * old_base + current_learning_rate * new_base

        # Apply absolute cap to prevent extreme wait times
        new_base = min(new_base, 10.0)  # Cap base at 10 seconds

        # Calculate bounds with more reasonable limits
        min_wait = max(0.01, new_base * 0.5)
        max_wait = min(10.0, new_base * 3.0)  # Max 10 seconds absolute cap

        # Update in memory; confidence grows with sample size, saturating at 20.
        self.current_estimates[engine_type] = {
            "base": new_base,
            "min": min_wait,
            "max": max_wait,
            "confidence": min(len(attempts) / 20.0, 1.0),
        }

        # Persist to database (skip in fallback mode)
        success_rate = len(successful_waits) / len(attempts) if attempts else 0

        # Skip database operations in programmatic mode
        if self.programmatic_mode:
            logger.debug(
                f"Skipping estimate persistence in programmatic mode for {engine_type}"
            )
        elif not use_fallback_llm():
            # Try to get research context from search tracker
            context = get_search_context()
            username = None
            password = None
            if context is not None:
                username = context.get("username")
                password = context.get("user_password")

            if username and password:
                try:
                    # Use thread-safe metrics writer to save to user's encrypted database
                    from ...database.thread_metrics import metrics_writer

                    # Set password for this thread if not already set
                    metrics_writer.set_user_password(username, password)

                    db_imports = _get_db_imports()
                    RateLimitEstimate = db_imports.get("RateLimitEstimate")

                    with metrics_writer.get_session(username) as session:
                        # Check if estimate exists (upsert by engine_type)
                        estimate = (
                            session.query(RateLimitEstimate)
                            .filter_by(engine_type=engine_type)
                            .first()
                        )

                        if estimate:
                            # Update existing estimate
                            estimate.base_wait_seconds = new_base
                            estimate.min_wait_seconds = min_wait
                            estimate.max_wait_seconds = max_wait
                            estimate.last_updated = time.time()
                            estimate.total_attempts = len(attempts)
                            estimate.success_rate = success_rate
                        else:
                            # Create new estimate
                            estimate = RateLimitEstimate(
                                engine_type=engine_type,
                                base_wait_seconds=new_base,
                                min_wait_seconds=min_wait,
                                max_wait_seconds=max_wait,
                                last_updated=time.time(),
                                total_attempts=len(attempts),
                                success_rate=success_rate,
                            )
                            session.add(estimate)

                except Exception:
                    logger.exception("Failed to persist rate limit estimate")
            else:
                logger.debug(
                    "Skipping rate limit estimate save - no user context"
                )

        logger.info(
            f"Updated rate limit for {engine_type}: {new_base:.2f}s "
            f"(success rate: {success_rate:.1%})"
        )
519 def get_stats(
520 self, engine_type: Optional[str] = None
521 ) -> List[Tuple[str, float, float, float, float, int, float]]:
522 """
523 Get statistics for monitoring.
525 Args:
526 engine_type: Specific engine to get stats for, or None for all
528 Returns:
529 List of tuples with engine statistics
530 """
531 # Skip database operations in test/fallback mode
532 if use_fallback_llm() or is_ci_environment(): 532 ↛ 558line 532 didn't jump to line 558 because the condition on line 532 was always true
533 logger.debug("Skipping database stats in test/CI mode")
534 # Return stats from in-memory estimates
535 stats = []
536 engines_to_check = (
537 [engine_type]
538 if engine_type
539 else list(self.current_estimates.keys())
540 )
541 for engine in engines_to_check:
542 if engine in self.current_estimates: 542 ↛ 541line 542 didn't jump to line 541 because the condition on line 542 was always true
543 est = self.current_estimates[engine]
544 stats.append(
545 (
546 engine,
547 est["base"],
548 est["min"],
549 est["max"],
550 time.time(),
551 len(self.recent_attempts.get(engine, [])),
552 est.get("confidence", 0.0),
553 )
554 )
555 return stats
557 # Skip database operations in programmatic mode
558 if self.programmatic_mode:
559 return stats
561 try:
562 db_imports = _get_db_imports()
563 get_user_db_session = db_imports.get("get_user_db_session")
564 RateLimitEstimate = db_imports.get("RateLimitEstimate")
566 with get_user_db_session() as session:
567 if engine_type:
568 estimates = (
569 session.query(RateLimitEstimate)
570 .filter_by(engine_type=engine_type)
571 .all()
572 )
573 else:
574 estimates = (
575 session.query(RateLimitEstimate)
576 .order_by(RateLimitEstimate.engine_type)
577 .all()
578 )
580 return [
581 (
582 est.engine_type,
583 est.base_wait_seconds,
584 est.min_wait_seconds,
585 est.max_wait_seconds,
586 est.last_updated,
587 est.total_attempts,
588 est.success_rate,
589 )
590 for est in estimates
591 ]
592 except Exception as e:
593 logger.warning(f"Failed to get rate limit stats from DB: {e}")
594 # Return in-memory stats as fallback
595 return self.get_stats(engine_type)
597 def reset_engine(self, engine_type: str) -> None:
598 """
599 Reset learned values for a specific engine.
601 Args:
602 engine_type: Engine to reset
603 """
604 # Always clear from memory first
605 if engine_type in self.recent_attempts:
606 del self.recent_attempts[engine_type]
607 if engine_type in self.current_estimates:
608 del self.current_estimates[engine_type]
610 # Skip database operations in programmatic mode
611 if self.programmatic_mode:
612 logger.debug(
613 f"Reset rate limit data for {engine_type} (memory only in programmatic mode)"
614 )
615 return
617 # Skip database operations in test/fallback mode
618 if use_fallback_llm() or is_ci_environment(): 618 ↛ 624line 618 didn't jump to line 624 because the condition on line 618 was always true
619 logger.debug(
620 f"Reset rate limit data for {engine_type} (memory only in test/CI mode)"
621 )
622 return
624 try:
625 db_imports = _get_db_imports()
626 get_user_db_session = db_imports.get("get_user_db_session")
627 RateLimitAttempt = db_imports.get("RateLimitAttempt")
628 RateLimitEstimate = db_imports.get("RateLimitEstimate")
630 with get_user_db_session() as session:
631 # Delete historical attempts
632 session.query(RateLimitAttempt).filter_by(
633 engine_type=engine_type
634 ).delete()
636 # Delete estimates
637 session.query(RateLimitEstimate).filter_by(
638 engine_type=engine_type
639 ).delete()
641 session.commit()
643 logger.info(f"Reset rate limit data for {engine_type}")
645 except Exception as e:
646 logger.warning(
647 f"Failed to reset rate limit data in database for {engine_type}: {e}. "
648 "In-memory data was cleared successfully."
649 )
650 # Don't re-raise in test contexts - the memory cleanup is sufficient
652 def get_search_quality_stats(
653 self, engine_type: Optional[str] = None
654 ) -> List[Dict]:
655 """
656 Get basic search quality statistics for monitoring.
658 Args:
659 engine_type: Specific engine to get stats for, or None for all
661 Returns:
662 List of dictionaries with search quality metrics
663 """
664 stats = []
666 engines_to_check = (
667 [engine_type] if engine_type else list(self.recent_attempts.keys())
668 )
670 for engine in engines_to_check: 670 ↛ 671line 670 didn't jump to line 671 because the loop on line 670 never started
671 if engine not in self.recent_attempts:
672 continue
674 recent = list(self.recent_attempts[engine])
675 search_counts = [
676 attempt.get("search_result_count", 0)
677 for attempt in recent
678 if attempt.get("search_result_count") is not None
679 ]
681 if not search_counts:
682 continue
684 recent_avg = sum(search_counts) / len(search_counts)
686 stats.append(
687 {
688 "engine_type": engine,
689 "recent_avg_results": recent_avg,
690 "min_recent_results": min(search_counts),
691 "max_recent_results": max(search_counts),
692 "sample_size": len(search_counts),
693 "total_attempts": len(recent),
694 "status": self._get_quality_status(recent_avg),
695 }
696 )
698 return stats
700 def _get_quality_status(self, recent_avg: float) -> str:
701 """Get quality status string based on average results."""
702 if recent_avg < 1:
703 return "CRITICAL"
704 elif recent_avg < 3:
705 return "WARNING"
706 elif recent_avg < 5:
707 return "CAUTION"
708 elif recent_avg >= 10:
709 return "EXCELLENT"
710 else:
711 return "GOOD"
713 def cleanup_old_data(self, days: int = 30) -> None:
714 """
715 Remove old retry attempt data to prevent database bloat.
717 Args:
718 days: Remove data older than this many days
719 """
720 cutoff_time = time.time() - (days * 24 * 3600)
722 # Skip database operations in programmatic mode
723 if self.programmatic_mode: 723 ↛ 724line 723 didn't jump to line 724 because the condition on line 723 was never true
724 logger.debug("Skipping database cleanup in programmatic mode")
725 return
727 # Skip database operations in test/fallback mode
728 if use_fallback_llm() or is_ci_environment(): 728 ↛ 732line 728 didn't jump to line 732 because the condition on line 728 was always true
729 logger.debug("Skipping database cleanup in test/CI mode")
730 return
732 try:
733 db_imports = _get_db_imports()
734 get_user_db_session = db_imports.get("get_user_db_session")
735 RateLimitAttempt = db_imports.get("RateLimitAttempt")
737 with get_user_db_session() as session:
738 # Count and delete old attempts
739 old_attempts = session.query(RateLimitAttempt).filter(
740 RateLimitAttempt.timestamp < cutoff_time
741 )
742 deleted_count = old_attempts.count()
743 old_attempts.delete()
745 session.commit()
747 if deleted_count > 0:
748 logger.info(f"Cleaned up {deleted_count} old retry attempts")
750 except Exception as e:
751 logger.warning(f"Failed to cleanup old rate limit data: {e}")
# Create a singleton instance
_tracker_instance: Optional[AdaptiveRateLimitTracker] = None


def get_tracker() -> AdaptiveRateLimitTracker:
    """Return the process-wide AdaptiveRateLimitTracker, creating it lazily."""
    global _tracker_instance
    if _tracker_instance is not None:
        return _tracker_instance
    _tracker_instance = AdaptiveRateLimitTracker()
    return _tracker_instance