Coverage for src/local_deep_research/web_search_engines/rate_limiting/tracker.py: 97%
329 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""
2Adaptive rate limit tracker that learns optimal retry wait times for each search engine.
3"""
5import random
6import threading
7import time
8from collections import deque
9from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
11from cachetools import LRUCache
13if TYPE_CHECKING:
14 from sqlalchemy.orm import Session
15from loguru import logger
17from ...config.thread_settings import (
18 NoSettingsContextError,
19 get_setting_from_snapshot,
20 get_settings_context,
21)
22from ...settings.env_registry import is_ci_environment
23from ...utilities.thread_context import get_search_context
25# Lazy imports to avoid database initialization in programmatic mode
26_db_imports = None
29def _get_db_imports():
30 """Lazy load database imports only when needed."""
31 global _db_imports
32 if _db_imports is None:
33 try:
34 from ...database.models import RateLimitAttempt, RateLimitEstimate
35 from ...database.session_context import get_user_db_session
37 _db_imports = {
38 "RateLimitAttempt": RateLimitAttempt,
39 "RateLimitEstimate": RateLimitEstimate,
40 "get_user_db_session": get_user_db_session,
41 }
42 except (ImportError, RuntimeError):
43 # Database not available - programmatic mode
44 _db_imports = {}
45 return _db_imports
48class AdaptiveRateLimitTracker:
49 """
50 Tracks and learns optimal retry wait times for each search engine.
51 Persists learned patterns to the main application database using SQLAlchemy.
52 """
54 def __init__(self, settings_snapshot=None, programmatic_mode=False):
55 self.settings_snapshot = settings_snapshot or {}
56 self.programmatic_mode = programmatic_mode
58 # Helper function to get settings with defaults
59 def get_setting_or_default(key, default, type_fn=None):
60 try:
61 value = get_setting_from_snapshot(
62 key,
63 settings_snapshot=self.settings_snapshot,
64 )
65 # Handle None values - return default instead of calling type_fn(None)
66 if value is None:
67 return default
68 return type_fn(value) if type_fn else value
69 except NoSettingsContextError:
70 return default
72 # Get settings with explicit defaults
73 self.memory_window = get_setting_or_default(
74 "rate_limiting.memory_window", 100, int
75 )
76 self.exploration_rate = get_setting_or_default(
77 "rate_limiting.exploration_rate", 0.1, float
78 )
79 self.learning_rate = get_setting_or_default(
80 "rate_limiting.learning_rate", 0.3, float
81 )
82 self.decay_per_day = get_setting_or_default(
83 "rate_limiting.decay_per_day", 0.95, float
84 )
86 # In programmatic mode, default to disabled
87 self.enabled = get_setting_or_default(
88 "rate_limiting.enabled",
89 not self.programmatic_mode, # Default based on mode
90 bool,
91 )
93 profile = get_setting_or_default("rate_limiting.profile", "balanced")
95 if self.programmatic_mode and self.enabled:
96 logger.info(
97 "Rate limiting enabled in programmatic mode - using memory-only tracking without persistence"
98 )
100 # Apply rate limiting profile
101 self._apply_profile(profile)
103 # Bounded in-memory caches for fast access (prevent memory leaks)
104 # Lock required: LRUCache is not thread-safe (mutates on read via LRU reordering)
105 self._cache_lock = threading.Lock()
106 self.recent_attempts: LRUCache = LRUCache(maxsize=100)
107 self.current_estimates: LRUCache = LRUCache(maxsize=100)
109 # Initialize the _estimates_loaded flag
110 self._estimates_loaded = False
112 # Load estimates from database
113 self._load_estimates()
115 logger.info(
116 f"AdaptiveRateLimitTracker initialized: enabled={self.enabled}, profile={profile}"
117 )
119 def _apply_profile(self, profile: str) -> None:
120 """Apply rate limiting profile settings."""
121 if profile == "conservative":
122 # More conservative: lower exploration, slower learning
123 self.exploration_rate = min(
124 self.exploration_rate * 0.5, 0.05
125 ) # 5% max exploration
126 self.learning_rate = min(
127 self.learning_rate * 0.7, 0.2
128 ) # Slower learning
129 logger.info("Applied conservative rate limiting profile")
130 elif profile == "aggressive":
131 # More aggressive: higher exploration, faster learning
132 self.exploration_rate = min(
133 self.exploration_rate * 1.5, 0.2
134 ) # Up to 20% exploration
135 self.learning_rate = min(
136 self.learning_rate * 1.3, 0.5
137 ) # Faster learning
138 logger.info("Applied aggressive rate limiting profile")
139 else: # balanced
140 # Use settings as-is
141 logger.info("Applied balanced rate limiting profile")
143 def _load_estimates(self) -> None:
144 """Load estimates from database into memory."""
145 # Skip database operations in programmatic mode
146 if self.programmatic_mode:
147 logger.debug(
148 "Skipping rate limit estimate loading in programmatic mode"
149 )
150 self._estimates_loaded = (
151 True # Mark as loaded to skip future attempts
152 )
153 return
155 # During initialization, we don't have user context yet
156 # Mark that we need to load estimates when user context becomes available
157 self._estimates_loaded = False
158 logger.debug(
159 "Rate limit estimates will be loaded on-demand when user context is available"
160 )
    def _ensure_estimates_loaded(self) -> None:
        """Load estimates from user's encrypted database if not already loaded.

        Idempotent: once the flag is set, all later calls return immediately.
        On failure the flag is still set so we never retry on every request.
        """
        # Early return if already loaded or should skip
        if self._estimates_loaded or self.programmatic_mode:
            if not self._estimates_loaded:
                self._estimates_loaded = True
            return

        # Get database imports
        db_imports = _get_db_imports()
        RateLimitEstimate = (
            db_imports.get("RateLimitEstimate") if db_imports else None
        )

        if not db_imports or not RateLimitEstimate:
            # Database not available
            self._estimates_loaded = True
            return

        # Try to get research context from search tracker
        context = get_search_context()
        if not context:
            # No context available (e.g., in tests or programmatic access)
            self._estimates_loaded = True
            return

        username = context.get("username")
        password = context.get("user_password")

        # NOTE: when credentials are missing the flag stays False, so the
        # load is retried on the next call once credentials appear.
        if username and password:
            try:
                # Use thread-safe metrics writer to read from user's encrypted database
                from ...database.thread_metrics import metrics_writer

                # Set password for this thread
                metrics_writer.set_user_password(username, password)

                session: "Session"
                with metrics_writer.get_session(username) as session:
                    estimates: list[Any] = session.query(
                        RateLimitEstimate
                    ).all()

                    for estimate in estimates:
                        # Apply decay for old estimates: confidence shrinks by
                        # decay_per_day for each day since last update.
                        age_hours = (time.time() - estimate.last_updated) / 3600
                        decay = self.decay_per_day ** (age_hours / 24)

                        with self._cache_lock:
                            self.current_estimates[estimate.engine_type] = {
                                "base": estimate.base_wait_seconds,
                                "min": estimate.min_wait_seconds,
                                "max": estimate.max_wait_seconds,
                                "confidence": decay,
                            }

                        logger.debug(
                            f"Loaded estimate for {estimate.engine_type}: base={estimate.base_wait_seconds:.2f}s, confidence={decay:.2f}"
                        )

                self._estimates_loaded = True
                logger.info(
                    f"Loaded {len(estimates)} rate limit estimates from encrypted database"
                )

            except Exception:
                logger.warning("Could not load rate limit estimates")
                # Mark as loaded anyway to avoid repeated attempts
                self._estimates_loaded = True
233 def get_wait_time(self, engine_type: str) -> float:
234 """
235 Get adaptive wait time for a search engine.
236 Includes exploration to discover better rates.
238 Args:
239 engine_type: Name of the search engine
241 Returns:
242 Wait time in seconds
243 """
244 # If rate limiting is disabled, return minimal wait time
245 if not self.enabled:
246 return 0.1
248 # Check if we have a user context - if not, handle appropriately
249 context = get_search_context()
250 if not context and not self.programmatic_mode:
251 # No context and not in programmatic mode - this is unexpected
252 logger.warning(
253 f"No user context available for rate limiting on {engine_type} "
254 "but programmatic_mode=False. Disabling rate limiting. "
255 "This may indicate a configuration issue."
256 )
257 return 0.0
259 # In programmatic mode, we continue with memory-only rate limiting even without context
261 # Ensure estimates are loaded from database
262 self._ensure_estimates_loaded()
264 with self._cache_lock:
265 if engine_type not in self.current_estimates:
266 # First time seeing this engine - start optimistic and learn from real responses
267 # Use engine-specific optimistic defaults only for what we know for sure
268 optimistic_defaults = {
269 "SearXNGSearchEngine": 0.1, # Self-hosted default engine
270 }
272 wait_time = optimistic_defaults.get(
273 engine_type, 0.1
274 ) # Default optimistic for others
275 logger.info(
276 f"No rate limit data for {engine_type}, starting optimistic with {wait_time}s"
277 )
278 return wait_time
280 estimate = self.current_estimates[engine_type]
281 base_wait = estimate["base"]
283 # Security: random used for non-security rate-limit jitter, not tokens or secrets
284 # Exploration vs exploitation
285 if random.random() < self.exploration_rate:
286 # Explore: try a faster rate to see if API limits have relaxed
287 wait_time = base_wait * random.uniform(0.5, 0.9)
288 logger.debug(
289 f"Exploring faster rate for {engine_type}: {wait_time:.2f}s"
290 )
291 else:
292 # Exploit: use learned estimate with jitter
293 wait_time = base_wait * random.uniform(0.9, 1.1)
295 # Enforce bounds
296 wait_time = max(
297 float(estimate["min"]), min(wait_time, float(estimate["max"]))
298 )
299 return float(wait_time)
301 def apply_rate_limit(self, engine_type: str) -> float:
302 """
303 Apply rate limiting for the given engine type.
304 This is a convenience method that combines checking if rate limiting
305 is enabled, getting the wait time, and sleeping if necessary.
307 Args:
308 engine_type: The type of search engine
310 Returns:
311 The wait time that was applied (0 if rate limiting is disabled)
312 """
313 if not self.enabled:
314 return 0.0
316 wait_time = self.get_wait_time(engine_type)
317 if wait_time > 0:
318 logger.debug(
319 f"{engine_type} waiting {wait_time:.2f}s before request"
320 )
321 time.sleep(wait_time)
322 return wait_time
324 def record_outcome(
325 self,
326 engine_type: str,
327 wait_time: float,
328 success: bool,
329 retry_count: int,
330 error_type: Optional[str] = None,
331 search_result_count: Optional[int] = None,
332 ) -> None:
333 """
334 Record the outcome of a retry attempt.
336 Args:
337 engine_type: Name of the search engine
338 wait_time: How long we waited before this attempt
339 success: Whether the attempt succeeded
340 retry_count: Which retry attempt this was (1, 2, 3, etc.)
341 error_type: Type of error if failed
342 search_result_count: Number of search results returned (for quality monitoring)
343 """
344 # If rate limiting is disabled, don't record outcomes
345 if not self.enabled:
346 logger.info(
347 f"Rate limiting disabled - not recording outcome for {engine_type}"
348 )
349 return
351 logger.debug(
352 f"Recording rate limit outcome for {engine_type}: success={success}, wait_time={wait_time}s"
353 )
354 timestamp = time.time()
356 # NOTE: Database writes for rate limiting are disabled to prevent
357 # database locking issues under heavy parallel search load.
358 # Rate limiting still works via in-memory tracking below.
359 # Historical rate limit data is not persisted to DB.
361 # Update in-memory tracking
362 with self._cache_lock:
363 if engine_type not in self.recent_attempts:
364 # Get current memory window setting from thread context
365 settings_context = get_settings_context()
366 if settings_context:
367 current_memory_window = int(
368 settings_context.get_setting(
369 "rate_limiting.memory_window", self.memory_window
370 )
371 )
372 else:
373 current_memory_window = self.memory_window
375 self.recent_attempts[engine_type] = deque(
376 maxlen=current_memory_window
377 )
379 self.recent_attempts[engine_type].append(
380 {
381 "wait_time": wait_time,
382 "success": success,
383 "timestamp": timestamp,
384 "retry_count": retry_count,
385 "search_result_count": search_result_count,
386 }
387 )
389 # Update estimates
390 self._update_estimate(engine_type)
    def _update_estimate(self, engine_type: str) -> None:
        """Update wait time estimate based on recent attempts.

        Recomputes the base wait from recent successes/failures, blends it
        with the prior estimate via an exponential moving average, caps it at
        10 seconds, and persists the result to the user's encrypted database
        when a user context is available.
        """
        with self._cache_lock:
            if (
                engine_type not in self.recent_attempts
                or len(self.recent_attempts[engine_type]) < 3
            ):
                # Too little data to learn from yet.
                attempt_count = len(self.recent_attempts.get(engine_type) or [])
                logger.info(
                    f"Not updating estimate for {engine_type} - only {attempt_count} attempts (need 3)"
                )
                return

            # Snapshot under the lock; all math below happens outside it.
            attempts = list(self.recent_attempts[engine_type])
            old_estimate = self.current_estimates.get(engine_type)

        # Calculate success rate and optimal wait time
        successful_waits = [a["wait_time"] for a in attempts if a["success"]]
        failed_waits = [a["wait_time"] for a in attempts if not a["success"]]

        if not successful_waits:
            # All attempts failed - increase wait time with a cap
            new_base = max(failed_waits) * 1.5 if failed_waits else 10.0
            # Cap the base wait time to prevent runaway growth
            new_base = min(new_base, 10.0)  # Max 10 seconds base when all fail
        else:
            # Use 50th percentile (median) of successful waits for more stability
            # This provides a balanced approach between speed and reliability
            successful_waits.sort()
            percentile_50 = successful_waits[
                max(0, int(len(successful_waits) * 0.50) - 1)
            ]
            new_base = percentile_50

        # Update estimate with learning rate (exponential moving average)
        if old_estimate is not None:
            old_base = old_estimate["base"]
            # Get current learning rate from settings context
            settings_context = get_settings_context()
            if settings_context:
                current_learning_rate = float(
                    settings_context.get_setting(
                        "rate_limiting.learning_rate", self.learning_rate
                    )
                )
            else:
                current_learning_rate = self.learning_rate

            new_base = (
                1 - current_learning_rate
            ) * old_base + current_learning_rate * new_base

        # Apply absolute cap to prevent extreme wait times
        new_base = min(new_base, 10.0)  # Cap base at 10 seconds

        # Calculate bounds with more reasonable limits
        min_wait = max(0.01, new_base * 0.5)
        max_wait = min(10.0, new_base * 3.0)  # Max 10 seconds absolute cap

        # Update in memory (lock re-acquired; state may have changed between
        # the snapshot above and this write - last writer wins)
        with self._cache_lock:
            self.current_estimates[engine_type] = {
                "base": new_base,
                "min": min_wait,
                "max": max_wait,
                "confidence": min(len(attempts) / 20.0, 1.0),
            }

        # Persist to database
        success_rate = len(successful_waits) / len(attempts) if attempts else 0

        # Skip database operations in programmatic mode
        if self.programmatic_mode:
            logger.debug(
                f"Skipping estimate persistence in programmatic mode for {engine_type}"
            )
        else:
            # Try to get research context from search tracker
            context = get_search_context()
            username = None
            password = None
            if context is not None:
                username = context.get("username")
                password = context.get("user_password")

            if username and password:
                try:
                    # Use thread-safe metrics writer to save to user's encrypted database
                    from ...database.thread_metrics import metrics_writer

                    # Set password for this thread if not already set
                    metrics_writer.set_user_password(username, password)

                    db_imports = _get_db_imports()
                    RateLimitEstimate = db_imports.get("RateLimitEstimate")

                    session_inner: "Session"
                    with metrics_writer.get_session(username) as session_inner:
                        # Check if estimate exists
                        estimate: Any = (
                            session_inner.query(RateLimitEstimate)
                            .filter_by(engine_type=engine_type)
                            .first()
                        )

                        if estimate:
                            # Update existing estimate
                            estimate.base_wait_seconds = new_base
                            estimate.min_wait_seconds = min_wait
                            estimate.max_wait_seconds = max_wait
                            estimate.last_updated = time.time()
                            estimate.total_attempts = len(attempts)
                            estimate.success_rate = success_rate
                        else:
                            # Create new estimate
                            estimate = RateLimitEstimate(
                                engine_type=engine_type,
                                base_wait_seconds=new_base,
                                min_wait_seconds=min_wait,
                                max_wait_seconds=max_wait,
                                last_updated=time.time(),
                                total_attempts=len(attempts),
                                success_rate=success_rate,
                            )
                            session_inner.add(estimate)

                except Exception:
                    logger.exception("Failed to persist rate limit estimate")
            else:
                logger.debug(
                    "Skipping rate limit estimate save - no user context"
                )

        logger.info(
            f"Updated rate limit for {engine_type}: {new_base:.2f}s "
            f"(success rate: {success_rate:.1%})"
        )
531 def _get_in_memory_stats(
532 self, engine_type: Optional[str] = None
533 ) -> List[Tuple[str, float, float, float, float, int, float]]:
534 """Return stats from in-memory caches (no DB access)."""
535 with self._cache_lock:
536 stats = []
537 engines_to_check = (
538 [engine_type]
539 if engine_type
540 else list(self.current_estimates.keys())
541 )
542 for engine in engines_to_check:
543 if engine in self.current_estimates:
544 est = self.current_estimates[engine]
545 attempts = self.recent_attempts.get(engine)
546 attempt_count = len(attempts) if attempts else 0
547 stats.append(
548 (
549 engine,
550 est["base"],
551 est["min"],
552 est["max"],
553 time.time(),
554 attempt_count,
555 est.get("confidence", 0.0),
556 )
557 )
558 return stats
    def get_stats(
        self, engine_type: Optional[str] = None
    ) -> List[Tuple[str, float, float, float, float, int, float]]:
        """
        Get statistics for monitoring.

        Reads persisted estimates from the user's encrypted database when
        possible; falls back to the in-memory caches in CI/programmatic mode,
        without a user context or credentials, or on any database error.

        Args:
            engine_type: Specific engine to get stats for, or None for all

        Returns:
            List of tuples with engine statistics:
            (engine_type, base_wait, min_wait, max_wait, last_updated,
            total_attempts, success_rate)
        """
        # Skip database operations in CI mode
        if is_ci_environment():
            logger.debug("Skipping database stats in CI mode")
            return self._get_in_memory_stats(engine_type)

        # Skip database operations in programmatic mode
        if self.programmatic_mode:
            return self._get_in_memory_stats(engine_type)

        try:
            context = get_search_context()
            if not context:
                return self._get_in_memory_stats(engine_type)

            username = context.get("username")
            password = context.get("user_password")
            if not username or not password:
                return self._get_in_memory_stats(engine_type)

            db_imports = _get_db_imports()
            RateLimitEstimate = db_imports.get("RateLimitEstimate")

            from ...database.thread_metrics import metrics_writer

            metrics_writer.set_user_password(username, password)

            session_est: "Session"
            with metrics_writer.get_session(username) as session_est:
                if engine_type:
                    estimates: list[Any] = (
                        session_est.query(RateLimitEstimate)
                        .filter_by(engine_type=engine_type)
                        .all()
                    )
                else:
                    estimates = (
                        session_est.query(RateLimitEstimate)
                        .order_by(RateLimitEstimate.engine_type)
                        .all()
                    )

                return [
                    (
                        est.engine_type,
                        est.base_wait_seconds,
                        est.min_wait_seconds,
                        est.max_wait_seconds,
                        est.last_updated,
                        est.total_attempts,
                        est.success_rate,
                    )
                    for est in estimates
                ]
        except Exception:
            logger.warning("Failed to get rate limit stats from DB")
            # Return in-memory stats as fallback
            return self._get_in_memory_stats(engine_type)
630 def reset_engine(self, engine_type: str) -> None:
631 """
632 Reset learned values for a specific engine.
634 Args:
635 engine_type: Engine to reset
636 """
637 # Always clear from memory first
638 with self._cache_lock:
639 if engine_type in self.recent_attempts:
640 del self.recent_attempts[engine_type]
641 if engine_type in self.current_estimates:
642 del self.current_estimates[engine_type]
644 # Skip database operations in programmatic mode
645 if self.programmatic_mode:
646 logger.debug(
647 f"Reset rate limit data for {engine_type} (memory only in programmatic mode)"
648 )
649 return
651 # Skip database operations in CI mode
652 if is_ci_environment():
653 logger.debug(
654 f"Reset rate limit data for {engine_type} (memory only in CI mode)"
655 )
656 return
658 try:
659 context = get_search_context()
660 if not context: 660 ↛ 661line 660 didn't jump to line 661 because the condition on line 660 was never true
661 logger.debug(
662 f"Reset rate limit data for {engine_type} (memory only - no user context)"
663 )
664 return
666 username = context.get("username")
667 password = context.get("user_password")
668 if not username or not password: 668 ↛ 669line 668 didn't jump to line 669 because the condition on line 668 was never true
669 logger.debug(
670 f"Reset rate limit data for {engine_type} (memory only - no credentials)"
671 )
672 return
674 db_imports = _get_db_imports()
675 RateLimitAttempt = db_imports.get("RateLimitAttempt")
676 RateLimitEstimate = db_imports.get("RateLimitEstimate")
678 from ...database.thread_metrics import metrics_writer
680 metrics_writer.set_user_password(username, password)
682 session_reset: "Session"
683 with metrics_writer.get_session(username) as session_reset:
684 # Delete historical attempts
685 session_reset.query(RateLimitAttempt).filter_by(
686 engine_type=engine_type
687 ).delete()
689 # Delete estimates
690 session_reset.query(RateLimitEstimate).filter_by(
691 engine_type=engine_type
692 ).delete()
694 session_reset.commit()
696 logger.info(f"Reset rate limit data for {engine_type}")
698 except Exception:
699 logger.warning(
700 f"Failed to reset rate limit data in database for {engine_type}"
701 "In-memory data was cleared successfully."
702 )
703 # Don't re-raise in test contexts - the memory cleanup is sufficient
705 def get_search_quality_stats(
706 self, engine_type: Optional[str] = None
707 ) -> List[Dict]:
708 """
709 Get basic search quality statistics for monitoring.
711 Args:
712 engine_type: Specific engine to get stats for, or None for all
714 Returns:
715 List of dictionaries with search quality metrics
716 """
717 stats = []
719 with self._cache_lock:
720 engines_to_check = (
721 [engine_type]
722 if engine_type
723 else list(self.recent_attempts.keys())
724 )
725 # Snapshot the data under lock, then process outside
726 engine_attempts = {}
727 for engine in engines_to_check:
728 if engine in self.recent_attempts:
729 engine_attempts[engine] = list(self.recent_attempts[engine])
731 for engine, recent in engine_attempts.items():
732 search_counts = [
733 attempt.get("search_result_count", 0)
734 for attempt in recent
735 if attempt.get("search_result_count") is not None
736 ]
738 if not search_counts:
739 continue
741 recent_avg = sum(search_counts) / len(search_counts)
743 stats.append(
744 {
745 "engine_type": engine,
746 "recent_avg_results": recent_avg,
747 "min_recent_results": min(search_counts),
748 "max_recent_results": max(search_counts),
749 "sample_size": len(search_counts),
750 "total_attempts": len(recent),
751 "status": self._get_quality_status(recent_avg),
752 }
753 )
755 return stats
757 def _get_quality_status(self, recent_avg: float) -> str:
758 """Get quality status string based on average results."""
759 if recent_avg < 1:
760 return "CRITICAL"
761 if recent_avg < 3:
762 return "WARNING"
763 if recent_avg < 5:
764 return "CAUTION"
765 if recent_avg >= 10:
766 return "EXCELLENT"
767 return "GOOD"
769 def cleanup_old_data(self, days: int = 30) -> None:
770 """
771 Remove old retry attempt data to prevent database bloat.
773 Args:
774 days: Remove data older than this many days
775 """
776 cutoff_time = time.time() - (days * 24 * 3600)
778 # Skip database operations in programmatic mode
779 if self.programmatic_mode:
780 logger.debug("Skipping database cleanup in programmatic mode")
781 return
783 # Skip database operations in CI mode
784 if is_ci_environment():
785 logger.debug("Skipping database cleanup in CI mode")
786 return
788 try:
789 context = get_search_context()
790 if not context: 790 ↛ 791line 790 didn't jump to line 791 because the condition on line 790 was never true
791 logger.debug("Skipping database cleanup - no user context")
792 return
794 username = context.get("username")
795 password = context.get("user_password")
796 if not username or not password: 796 ↛ 797line 796 didn't jump to line 797 because the condition on line 796 was never true
797 logger.debug("Skipping database cleanup - no credentials")
798 return
800 db_imports = _get_db_imports()
801 RateLimitAttempt = db_imports.get("RateLimitAttempt")
803 from ...database.thread_metrics import metrics_writer
805 metrics_writer.set_user_password(username, password)
807 session_clean: "Session"
808 with metrics_writer.get_session(username) as session_clean:
809 # Count and delete old attempts
810 old_attempts: Any = session_clean.query(
811 RateLimitAttempt
812 ).filter(RateLimitAttempt.timestamp < cutoff_time)
813 deleted_count = old_attempts.count()
814 old_attempts.delete()
816 session_clean.commit()
818 if deleted_count > 0:
819 logger.info(f"Cleaned up {deleted_count} old retry attempts")
821 except Exception:
822 logger.warning("Failed to cleanup old rate limit data")
def get_tracker() -> AdaptiveRateLimitTracker:
    """Create a fresh rate limit tracker instance.

    A new tracker is constructed on every call so that per-user DB
    credentials are never cached across requests in a multi-user Flask app.
    """
    return AdaptiveRateLimitTracker()