Coverage for src/local_deep_research/metrics/token_counter.py: 86%
465 statements
« prev ^ index » next — coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""Token counting functionality for LLM usage tracking."""
3import inspect
4import json
5import time
6from datetime import datetime, timedelta, UTC
7from pathlib import Path
8from typing import Any, Dict, List, Optional
10from langchain_core.callbacks import BaseCallbackHandler
11from langchain_core.outputs import LLMResult
12from loguru import logger
13from sqlalchemy import func, text
15from ..database.models import ModelUsage, TokenUsage
16from .query_utils import get_research_mode_condition, get_time_filter_condition
class TokenCountingCallback(BaseCallbackHandler):
    """Callback handler for counting tokens across different models.

    Tracks prompt/completion token usage per model in memory (``self.counts``)
    and, when a ``research_id`` is set, persists each LLM call to the
    per-user database. Also records response timing, the calling file and
    function (via stack inspection), raw Ollama metrics, and a heuristic
    context-overflow estimate.
    """

    def __init__(
        self,
        research_id: Optional[str] = None,
        research_context: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Initialize the token counting callback.

        Args:
            research_id: The ID of the research to track tokens for.
                When ``None``, counts are kept in memory only and nothing
                is written to the database.
            research_context: Additional research context for enhanced
                tracking (keys read elsewhere in this class include
                ``context_limit``, ``username``, ``user_password``,
                ``research_query``, ``research_mode``, ``research_phase``,
                ``search_iteration``, ``search_engines_planned``,
                ``search_engine_selected``).
        """
        super().__init__()
        self.research_id = research_id
        self.research_context = research_context or {}
        # Model/provider detected per-call in on_llm_start.
        self.current_model = None
        self.current_provider = None
        self.preset_model = None  # Model name set during callback creation
        self.preset_provider = None  # Provider set during callback creation

        # Phase 1 Enhancement: Track timing and context
        self.start_time = None  # wall-clock start set in on_llm_start
        self.response_time_ms = None  # computed in on_llm_end/on_llm_error
        self.success_status = "success"  # flipped to "error" by on_llm_error
        self.error_type = None  # exception class name on failure

        # Call stack tracking (first project frame outside site-packages/venv)
        self.calling_file = None
        self.calling_function = None
        self.call_stack = None

        # Context overflow tracking (heuristic, see on_llm_end)
        self.context_limit = None
        self.context_truncated = False
        self.tokens_truncated = 0
        self.truncation_ratio = 0.0
        self.original_prompt_estimate = 0  # rough chars//4 estimate

        # Raw Ollama response metrics (eval counts and durations)
        self.ollama_metrics = {}

        # Track token counts in memory
        self.counts = {
            "total_tokens": 0,
            "total_prompt_tokens": 0,
            "total_completion_tokens": 0,
            "by_model": {},
        }

    def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> None:
        """Called when LLM starts running.

        Starts the response timer, estimates the original prompt size,
        captures the calling frame from the stack, and resolves the model
        name and provider from (in priority order) preset values, the
        ``invocation_params``/kwargs, and the serialized LLM config.
        """
        # Phase 1 Enhancement: Start timing
        self.start_time = time.time()

        # Estimate original prompt size (rough estimate: ~4 chars per token)
        if prompts:
            total_chars = sum(len(prompt) for prompt in prompts)
            self.original_prompt_estimate = total_chars // 4
            logger.debug(
                f"Estimated prompt tokens: {self.original_prompt_estimate} (from {total_chars} chars)"
            )

        # Get context limit from research context (will be set from settings)
        self.context_limit = self.research_context.get("context_limit")

        # Phase 1 Enhancement: Capture call stack information
        try:
            stack = inspect.stack()

            # Skip the first few frames (this method, langchain internals)
            # Look for the first frame that's in our project directory
            for frame_info in stack[1:]:
                file_path = frame_info.filename
                # Look for any frame containing local_deep_research project
                if (
                    "local_deep_research" in file_path
                    and "site-packages" not in file_path
                    and "venv" not in file_path
                ):
                    # Extract relative path from local_deep_research
                    if "src/local_deep_research" in file_path:
                        relative_path = file_path.split(
                            "src/local_deep_research"
                        )[-1].lstrip("/")
                    elif "local_deep_research/src" in file_path:
                        relative_path = file_path.split(
                            "local_deep_research/src"
                        )[-1].lstrip("/")
                    elif "local_deep_research" in file_path:
                        # Get everything after local_deep_research
                        # NOTE(review): the outer `if` already guarantees
                        # "local_deep_research" is in file_path, so the
                        # `else` below appears unreachable — confirm.
                        relative_path = file_path.split("local_deep_research")[
                            -1
                        ].lstrip("/")
                    else:
                        relative_path = Path(file_path).name

                    self.calling_file = relative_path
                    self.calling_function = frame_info.function

                    # Capture a simplified call stack (just the relevant frames)
                    call_stack_frames = []
                    for frame in stack[1:6]:  # Limit to 5 frames
                        if (
                            "local_deep_research" in frame.filename
                            and "site-packages" not in frame.filename
                            and "venv" not in frame.filename
                        ):
                            frame_name = f"{Path(frame.filename).name}:{frame.function}:{frame.lineno}"
                            call_stack_frames.append(frame_name)

                    self.call_stack = (
                        " -> ".join(call_stack_frames)
                        if call_stack_frames
                        else None
                    )
                    break
        except Exception:
            logger.warning("Error capturing call stack")
            # Continue without call stack info if there's an error

        # First, use preset values if available
        if self.preset_model:
            self.current_model = self.preset_model
        else:
            # Try multiple locations for model name
            model_name = None

            # First check invocation_params
            invocation_params = kwargs.get("invocation_params", {})
            model_name = invocation_params.get(
                "model"
            ) or invocation_params.get("model_name")

            # Check kwargs directly
            if not model_name:
                model_name = kwargs.get("model") or kwargs.get("model_name")

            # Check serialized data
            if not model_name and "kwargs" in serialized:
                model_name = serialized["kwargs"].get("model") or serialized[
                    "kwargs"
                ].get("model_name")

            # Check for name in serialized data
            if not model_name and "name" in serialized:
                model_name = serialized["name"]

            # If still not found and we have Ollama, try to extract from the instance
            if (
                not model_name
                and "_type" in serialized
                and "ChatOllama" in serialized["_type"]
            ):
                # For Ollama, the model name might be in the serialized kwargs
                if "kwargs" in serialized and "model" in serialized["kwargs"]:
                    model_name = serialized["kwargs"]["model"]
                else:
                    # Default to the type if we can't find the actual model
                    model_name = "ollama"

            # Final fallback
            if not model_name:
                if "_type" in serialized:
                    model_name = serialized["_type"]
                else:
                    model_name = "unknown"

            self.current_model = model_name

        # Use preset provider if available
        if self.preset_provider:
            self.current_provider = self.preset_provider
        else:
            # Extract provider from serialized type or kwargs
            if "_type" in serialized:
                type_str = serialized["_type"]
                if "ChatOllama" in type_str:
                    self.current_provider = "ollama"
                elif "ChatOpenAI" in type_str:
                    self.current_provider = "openai"
                elif "ChatAnthropic" in type_str:
                    self.current_provider = "anthropic"
                else:
                    self.current_provider = kwargs.get("provider", "unknown")
            else:
                self.current_provider = kwargs.get("provider", "unknown")

        # Initialize model tracking if needed
        if self.current_model not in self.counts["by_model"]:
            self.counts["by_model"][self.current_model] = {
                "prompt_tokens": 0,
                "completion_tokens": 0,
                "total_tokens": 0,
                "calls": 0,
                "provider": self.current_provider,
            }

        # Increment call count
        self.counts["by_model"][self.current_model]["calls"] += 1

    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        """Called when LLM ends running.

        Extracts token usage from, in order of preference:
        ``response.llm_output`` (``token_usage``/``usage``), each
        generation's ``usage_metadata``, or its Ollama-specific
        ``response_metadata`` (``prompt_eval_count``/``eval_count``).
        Updates in-memory counts and persists via ``_save_to_db`` when a
        ``research_id`` is present.
        """
        # Phase 1 Enhancement: Calculate response time
        if self.start_time:
            self.response_time_ms = int((time.time() - self.start_time) * 1000)

        # Extract token usage from response
        token_usage = None

        # Check multiple locations for token usage
        if hasattr(response, "llm_output") and response.llm_output:
            token_usage = response.llm_output.get(
                "token_usage"
            ) or response.llm_output.get("usage", {})

        # Check for usage metadata in generations (Ollama specific)
        if not token_usage and hasattr(response, "generations"):
            for generation_list in response.generations:
                for generation in generation_list:
                    if hasattr(generation, "message") and hasattr(
                        generation.message, "usage_metadata"
                    ):
                        usage_meta = generation.message.usage_metadata
                        if usage_meta:  # Check if usage_metadata is not None
                            token_usage = {
                                "prompt_tokens": usage_meta.get(
                                    "input_tokens", 0
                                ),
                                "completion_tokens": usage_meta.get(
                                    "output_tokens", 0
                                ),
                                "total_tokens": usage_meta.get(
                                    "total_tokens", 0
                                ),
                            }
                            break
                    # Also check response_metadata
                    if hasattr(generation, "message") and hasattr(
                        generation.message, "response_metadata"
                    ):
                        resp_meta = generation.message.response_metadata
                        if resp_meta.get("prompt_eval_count") or resp_meta.get(
                            "eval_count"
                        ):
                            # Capture raw Ollama metrics
                            self.ollama_metrics = {
                                "prompt_eval_count": resp_meta.get(
                                    "prompt_eval_count"
                                ),
                                "eval_count": resp_meta.get("eval_count"),
                                "total_duration": resp_meta.get(
                                    "total_duration"
                                ),
                                "load_duration": resp_meta.get("load_duration"),
                                "prompt_eval_duration": resp_meta.get(
                                    "prompt_eval_duration"
                                ),
                                "eval_duration": resp_meta.get("eval_duration"),
                            }

                            # Check for context overflow
                            prompt_eval_count = resp_meta.get(
                                "prompt_eval_count", 0
                            )
                            if self.context_limit and prompt_eval_count > 0:
                                # Check if we're near or at the context limit
                                if (
                                    prompt_eval_count
                                    >= self.context_limit * 0.95
                                ):  # 95% threshold
                                    self.context_truncated = True

                                    # Estimate tokens truncated
                                    if (
                                        self.original_prompt_estimate
                                        > prompt_eval_count
                                    ):
                                        self.tokens_truncated = max(
                                            0,
                                            self.original_prompt_estimate
                                            - prompt_eval_count,
                                        )
                                        self.truncation_ratio = (
                                            self.tokens_truncated
                                            / self.original_prompt_estimate
                                            if self.original_prompt_estimate > 0
                                            else 0
                                        )
                                        logger.warning(
                                            f"Context overflow detected! "
                                            f"Prompt tokens: {prompt_eval_count}/{self.context_limit} "
                                            f"(estimated {self.tokens_truncated} tokens truncated, "
                                            f"{self.truncation_ratio:.1%} of prompt)"
                                        )

                            token_usage = {
                                "prompt_tokens": resp_meta.get(
                                    "prompt_eval_count", 0
                                ),
                                "completion_tokens": resp_meta.get(
                                    "eval_count", 0
                                ),
                                "total_tokens": resp_meta.get(
                                    "prompt_eval_count", 0
                                )
                                + resp_meta.get("eval_count", 0),
                            }
                            break
                if token_usage:
                    break

        if token_usage and isinstance(token_usage, dict):
            prompt_tokens = token_usage.get("prompt_tokens", 0)
            completion_tokens = token_usage.get("completion_tokens", 0)
            total_tokens = token_usage.get(
                "total_tokens", prompt_tokens + completion_tokens
            )

            # Update in-memory counts
            self.counts["total_prompt_tokens"] += prompt_tokens
            self.counts["total_completion_tokens"] += completion_tokens
            self.counts["total_tokens"] += total_tokens

            if self.current_model:
                self.counts["by_model"][self.current_model][
                    "prompt_tokens"
                ] += prompt_tokens
                self.counts["by_model"][self.current_model][
                    "completion_tokens"
                ] += completion_tokens
                self.counts["by_model"][self.current_model]["total_tokens"] += (
                    total_tokens
                )

            # Save to database if we have a research_id
            if self.research_id:
                self._save_to_db(prompt_tokens, completion_tokens)

    def on_llm_error(self, error, **kwargs: Any) -> None:
        """Called when LLM encounters an error.

        Records the elapsed time and the exception class name, then saves
        a zero-token row so failed calls are still tracked.
        """
        # Phase 1 Enhancement: Track errors
        if self.start_time:
            self.response_time_ms = int((time.time() - self.start_time) * 1000)

        self.success_status = "error"
        self.error_type = str(type(error).__name__)

        # Still save to database to track failed calls
        if self.research_id:
            self._save_to_db(0, 0)

    def _get_context_overflow_fields(self) -> Dict[str, Any]:
        """Get context overflow detection fields for database saving.

        Truncation numbers are only meaningful when ``context_truncated``
        is set, so they are nulled out otherwise.
        """
        return {
            "context_limit": self.context_limit,
            "context_truncated": self.context_truncated,  # Now Boolean
            "tokens_truncated": self.tokens_truncated
            if self.context_truncated
            else None,
            "truncation_ratio": self.truncation_ratio
            if self.context_truncated
            else None,
            # Raw Ollama metrics
            "ollama_prompt_eval_count": self.ollama_metrics.get(
                "prompt_eval_count"
            ),
            "ollama_eval_count": self.ollama_metrics.get("eval_count"),
            "ollama_total_duration": self.ollama_metrics.get("total_duration"),
            "ollama_load_duration": self.ollama_metrics.get("load_duration"),
            "ollama_prompt_eval_duration": self.ollama_metrics.get(
                "prompt_eval_duration"
            ),
            "ollama_eval_duration": self.ollama_metrics.get("eval_duration"),
        }

    def _save_to_db(self, prompt_tokens: int, completion_tokens: int) -> None:
        """Save token usage to the database.

        In a background thread, writes via the thread-safe encrypted
        metrics writer (requires ``username`` and ``user_password`` in the
        research context). In the main thread, writes directly through the
        Flask-session-scoped user DB session. All failures are logged and
        swallowed so metrics never break the research flow.

        Args:
            prompt_tokens: Prompt token count for this call.
            completion_tokens: Completion token count for this call.
        """
        # Check if we're in a thread - if so, queue the save for later
        import threading

        if threading.current_thread().name != "MainThread":
            # Use thread-safe metrics database for background threads
            username = (
                self.research_context.get("username")
                if self.research_context
                else None
            )

            if not username:
                logger.warning(
                    f"Cannot save token metrics - no username in research context. "
                    f"Token usage: prompt={prompt_tokens}, completion={completion_tokens}, "
                    f"Research context: {self.research_context}"
                )
                return

            # Import the thread-safe metrics database

            # Prepare token data
            token_data = {
                "model_name": self.current_model,
                "provider": self.current_provider,
                "prompt_tokens": prompt_tokens,
                "completion_tokens": completion_tokens,
                "research_query": self.research_context.get("research_query"),
                "research_mode": self.research_context.get("research_mode"),
                "research_phase": self.research_context.get("research_phase"),
                "search_iteration": self.research_context.get(
                    "search_iteration"
                ),
                "response_time_ms": self.response_time_ms,
                "success_status": self.success_status,
                "error_type": self.error_type,
                "search_engines_planned": self.research_context.get(
                    "search_engines_planned"
                ),
                "search_engine_selected": self.research_context.get(
                    "search_engine_selected"
                ),
                "calling_file": self.calling_file,
                "calling_function": self.calling_function,
                "call_stack": self.call_stack,
                # Add context overflow fields using helper method
                **self._get_context_overflow_fields(),
            }

            # Convert list to JSON string if needed
            if isinstance(token_data.get("search_engines_planned"), list):
                token_data["search_engines_planned"] = json.dumps(
                    token_data["search_engines_planned"]
                )

            # Get password from research context
            password = self.research_context.get("user_password")
            if not password:
                logger.warning(
                    f"Cannot save token metrics - no password in research context. "
                    f"Username: {username}, Token usage: prompt={prompt_tokens}, completion={completion_tokens}"
                )
                return

            # Write metrics directly using thread-safe database
            try:
                from ..database.thread_metrics import metrics_writer

                # Set password for this thread
                metrics_writer.set_user_password(username, password)

                # Write metrics to encrypted database
                metrics_writer.write_token_metrics(
                    username, self.research_id, token_data
                )
            except Exception:
                logger.exception("Failed to write metrics from thread")
            return

        # In MainThread, save directly
        try:
            from flask import session as flask_session
            from ..database.session_context import get_user_db_session

            username = flask_session.get("username")
            if not username:
                logger.debug("No user session, skipping token metrics save")
                return

            with get_user_db_session(username) as session:
                # Phase 1 Enhancement: Prepare additional context
                research_query = self.research_context.get("research_query")
                research_mode = self.research_context.get("research_mode")
                research_phase = self.research_context.get("research_phase")
                search_iteration = self.research_context.get("search_iteration")
                search_engines_planned = self.research_context.get(
                    "search_engines_planned"
                )
                search_engine_selected = self.research_context.get(
                    "search_engine_selected"
                )

                # Debug logging for search engine context
                if search_engines_planned or search_engine_selected:
                    logger.info(
                        f"Token tracking - Search context: planned={search_engines_planned}, selected={search_engine_selected}, phase={research_phase}"
                    )
                else:
                    logger.debug(
                        f"Token tracking - No search engine context yet, phase={research_phase}"
                    )

                # Convert list to JSON string if needed
                if isinstance(search_engines_planned, list):
                    search_engines_planned = json.dumps(search_engines_planned)

                # Log context overflow detection values before saving
                logger.debug(
                    f"Saving TokenUsage - context_limit: {self.context_limit}, "
                    f"context_truncated: {self.context_truncated}, "
                    f"tokens_truncated: {self.tokens_truncated}, "
                    f"ollama_prompt_eval_count: {self.ollama_metrics.get('prompt_eval_count')}, "
                    f"prompt_tokens: {prompt_tokens}, "
                    f"completion_tokens: {completion_tokens}"
                )

                # Add token usage record with enhanced fields
                token_usage = TokenUsage(
                    research_id=self.research_id,
                    model_name=self.current_model,
                    model_provider=self.current_provider,  # Added provider
                    # for accurate cost tracking
                    prompt_tokens=prompt_tokens,
                    completion_tokens=completion_tokens,
                    total_tokens=prompt_tokens + completion_tokens,
                    # Phase 1 Enhancement: Research context
                    research_query=research_query,
                    research_mode=research_mode,
                    research_phase=research_phase,
                    search_iteration=search_iteration,
                    # Phase 1 Enhancement: Performance metrics
                    response_time_ms=self.response_time_ms,
                    success_status=self.success_status,
                    error_type=self.error_type,
                    # Phase 1 Enhancement: Search engine context
                    search_engines_planned=search_engines_planned,
                    search_engine_selected=search_engine_selected,
                    # Phase 1 Enhancement: Call stack tracking
                    calling_file=self.calling_file,
                    calling_function=self.calling_function,
                    call_stack=self.call_stack,
                    # Add context overflow fields using helper method
                    **self._get_context_overflow_fields(),
                )
                session.add(token_usage)

                # Update or create model usage statistics
                model_usage = (
                    session.query(ModelUsage)
                    .filter_by(
                        model_name=self.current_model,
                    )
                    .first()
                )

                if model_usage:
                    model_usage.total_tokens += (
                        prompt_tokens + completion_tokens
                    )
                    model_usage.total_calls += 1
                else:
                    model_usage = ModelUsage(
                        model_name=self.current_model,
                        model_provider=self.current_provider,
                        total_tokens=prompt_tokens + completion_tokens,
                        total_calls=1,
                    )
                    session.add(model_usage)

                # Commit the transaction
                session.commit()

        except Exception:
            logger.exception("Error saving token usage to database")

    def get_counts(self) -> Dict[str, Any]:
        """Get the current token counts.

        Returns:
            The live in-memory counts dict (not a copy) with keys
            ``total_tokens``, ``total_prompt_tokens``,
            ``total_completion_tokens`` and ``by_model``.
        """
        return self.counts
590class TokenCounter:
591 """Manager class for token counting across the application."""
593 def __init__(self):
594 """Initialize the token counter."""
596 def create_callback(
597 self,
598 research_id: Optional[str] = None,
599 research_context: Optional[Dict[str, Any]] = None,
600 ) -> TokenCountingCallback:
601 """Create a new token counting callback.
603 Args:
604 research_id: The ID of the research to track tokens for
605 research_context: Additional research context for enhanced tracking
607 Returns:
608 A new TokenCountingCallback instance
609 """
610 return TokenCountingCallback(
611 research_id=research_id, research_context=research_context
612 )
614 def get_research_metrics(self, research_id: str) -> Dict[str, Any]:
615 """Get token metrics for a specific research.
617 Args:
618 research_id: The ID of the research
620 Returns:
621 Dictionary containing token usage metrics
622 """
623 from flask import session as flask_session
625 from ..database.session_context import get_user_db_session
627 username = flask_session.get("username")
628 if not username:
629 return {
630 "research_id": research_id,
631 "total_tokens": 0,
632 "total_calls": 0,
633 "model_usage": [],
634 }
636 with get_user_db_session(username) as session:
637 # Get token usage for this research from TokenUsage table
638 from sqlalchemy import func
640 token_usages = (
641 session.query(
642 TokenUsage.model_name,
643 TokenUsage.model_provider,
644 func.sum(TokenUsage.prompt_tokens).label("prompt_tokens"),
645 func.sum(TokenUsage.completion_tokens).label(
646 "completion_tokens"
647 ),
648 func.sum(TokenUsage.total_tokens).label("total_tokens"),
649 func.count().label("calls"),
650 )
651 .filter_by(research_id=research_id)
652 .group_by(TokenUsage.model_name, TokenUsage.model_provider)
653 .order_by(func.sum(TokenUsage.total_tokens).desc())
654 .all()
655 )
657 model_usage = []
658 total_tokens = 0
659 total_calls = 0
661 for usage in token_usages:
662 model_usage.append(
663 {
664 "model": usage.model_name,
665 "provider": usage.model_provider,
666 "tokens": usage.total_tokens or 0,
667 "calls": usage.calls or 0,
668 "prompt_tokens": usage.prompt_tokens or 0,
669 "completion_tokens": usage.completion_tokens or 0,
670 }
671 )
672 total_tokens += usage.total_tokens or 0
673 total_calls += usage.calls or 0
675 return {
676 "research_id": research_id,
677 "total_tokens": total_tokens,
678 "total_calls": total_calls,
679 "model_usage": model_usage,
680 }
682 def get_overall_metrics(
683 self, period: str = "30d", research_mode: str = "all"
684 ) -> Dict[str, Any]:
685 """Get overall token metrics across all researches.
687 Args:
688 period: Time period to filter by ('7d', '30d', '3m', '1y', 'all')
689 research_mode: Research mode to filter by ('quick', 'detailed', 'all')
691 Returns:
692 Dictionary containing overall metrics
693 """
694 return self._get_metrics_from_encrypted_db(period, research_mode)
696 def _get_metrics_from_encrypted_db(
697 self, period: str, research_mode: str
698 ) -> Dict[str, Any]:
699 """Get metrics from user's encrypted database."""
700 from flask import session as flask_session
702 from ..database.session_context import get_user_db_session
704 username = flask_session.get("username")
705 if not username:
706 return self._get_empty_metrics()
708 try:
709 with get_user_db_session(username) as session:
710 # Build base query with filters
711 query = session.query(TokenUsage)
713 # Apply time filter
714 time_condition = get_time_filter_condition(
715 period, TokenUsage.timestamp
716 )
717 if time_condition is not None:
718 query = query.filter(time_condition)
720 # Apply research mode filter
721 mode_condition = get_research_mode_condition(
722 research_mode, TokenUsage.research_mode
723 )
724 if mode_condition is not None:
725 query = query.filter(mode_condition)
727 # Total tokens from TokenUsage
728 total_tokens = (
729 query.with_entities(
730 func.sum(TokenUsage.total_tokens)
731 ).scalar()
732 or 0
733 )
735 # Import ResearchHistory model
736 from ..database.models.research import ResearchHistory
738 # Count researches from ResearchHistory table
739 research_query = session.query(func.count(ResearchHistory.id))
741 # Debug: Check if any research history records exist at all
742 all_research_count = (
743 session.query(func.count(ResearchHistory.id)).scalar() or 0
744 )
745 logger.debug(
746 f"Total ResearchHistory records in database: {all_research_count}"
747 )
749 # Debug: List first few research IDs and their timestamps
750 sample_researches = (
751 session.query(
752 ResearchHistory.id,
753 ResearchHistory.created_at,
754 ResearchHistory.mode,
755 )
756 .limit(5)
757 .all()
758 )
759 if sample_researches: 759 ↛ 760line 759 didn't jump to line 760 because the condition on line 759 was never true
760 logger.debug("Sample ResearchHistory records:")
761 for r_id, r_created, r_mode in sample_researches:
762 logger.debug(
763 f" - ID: {r_id}, Created: {r_created}, Mode: {r_mode}"
764 )
765 else:
766 logger.debug("No ResearchHistory records found in database")
768 # Get time filter conditions for ResearchHistory query
769 start_time, end_time = None, None
770 if period != "all":
771 if period == "today": 771 ↛ 772line 771 didn't jump to line 772 because the condition on line 771 was never true
772 start_time = datetime.now(UTC).replace(
773 hour=0, minute=0, second=0, microsecond=0
774 )
775 elif period == "week": 775 ↛ 776line 775 didn't jump to line 776 because the condition on line 775 was never true
776 start_time = datetime.now(UTC) - timedelta(days=7)
777 elif period == "month": 777 ↛ 778line 777 didn't jump to line 778 because the condition on line 777 was never true
778 start_time = datetime.now(UTC) - timedelta(days=30)
780 if start_time: 780 ↛ 781line 780 didn't jump to line 781 because the condition on line 780 was never true
781 end_time = datetime.now(UTC)
783 # Apply time filter if specified
784 if start_time and end_time: 784 ↛ 785line 784 didn't jump to line 785 because the condition on line 784 was never true
785 research_query = research_query.filter(
786 ResearchHistory.created_at >= start_time.isoformat(),
787 ResearchHistory.created_at <= end_time.isoformat(),
788 )
790 # Apply mode filter if specified
791 mode_filter = research_mode if research_mode != "all" else None
792 if mode_filter:
793 logger.debug(f"Applying mode filter: {mode_filter}")
794 research_query = research_query.filter(
795 ResearchHistory.mode == mode_filter
796 )
798 total_researches = research_query.scalar() or 0
799 logger.debug(
800 f"Final filtered research count: {total_researches}"
801 )
803 # Also check distinct research_ids in TokenUsage for comparison
804 token_research_count = (
805 session.query(
806 func.count(func.distinct(TokenUsage.research_id))
807 ).scalar()
808 or 0
809 )
810 logger.debug(
811 f"Distinct research_ids in TokenUsage: {token_research_count}"
812 )
814 # Model statistics using ORM aggregation
815 model_stats_query = session.query(
816 TokenUsage.model_name,
817 func.sum(TokenUsage.total_tokens).label("tokens"),
818 func.count().label("calls"),
819 func.sum(TokenUsage.prompt_tokens).label("prompt_tokens"),
820 func.sum(TokenUsage.completion_tokens).label(
821 "completion_tokens"
822 ),
823 ).filter(TokenUsage.model_name.isnot(None))
825 # Apply same filters to model stats
826 if time_condition is not None:
827 model_stats_query = model_stats_query.filter(time_condition)
828 if mode_condition is not None:
829 model_stats_query = model_stats_query.filter(mode_condition)
831 model_stats = (
832 model_stats_query.group_by(TokenUsage.model_name)
833 .order_by(func.sum(TokenUsage.total_tokens).desc())
834 .all()
835 )
837 # Batch load provider info from ModelUsage table (fix N+1)
838 model_names = [stat.model_name for stat in model_stats]
839 provider_map = {}
840 if model_names: 840 ↛ 841line 840 didn't jump to line 841 because the condition on line 840 was never true
841 provider_results = (
842 session.query(
843 ModelUsage.model_name, ModelUsage.model_provider
844 )
845 .filter(ModelUsage.model_name.in_(model_names))
846 .order_by(ModelUsage.id)
847 .all()
848 )
849 for model_name, model_provider in provider_results:
850 provider_map.setdefault(model_name, model_provider)
852 by_model = []
853 for stat in model_stats: 853 ↛ 854line 853 didn't jump to line 854 because the loop on line 853 never started
854 provider = provider_map.get(stat.model_name, "unknown")
856 by_model.append(
857 {
858 "model": stat.model_name,
859 "provider": provider,
860 "tokens": stat.tokens,
861 "calls": stat.calls,
862 "prompt_tokens": stat.prompt_tokens,
863 "completion_tokens": stat.completion_tokens,
864 }
865 )
867 # Get recent researches with token usage
868 # Note: This requires research_history table - for now we'll use available data
869 recent_research_query = session.query(
870 TokenUsage.research_id,
871 func.sum(TokenUsage.total_tokens).label("token_count"),
872 func.max(TokenUsage.timestamp).label("latest_timestamp"),
873 ).filter(TokenUsage.research_id.isnot(None))
875 if time_condition is not None:
876 recent_research_query = recent_research_query.filter(
877 time_condition
878 )
879 if mode_condition is not None:
880 recent_research_query = recent_research_query.filter(
881 mode_condition
882 )
884 recent_research_data = (
885 recent_research_query.group_by(TokenUsage.research_id)
886 .order_by(func.max(TokenUsage.timestamp).desc())
887 .limit(10)
888 .all()
889 )
891 # Batch load research queries for recent researches (fix N+1)
892 recent_research_ids = [
893 r.research_id for r in recent_research_data
894 ]
895 research_query_map = {}
896 if recent_research_ids: 896 ↛ 898line 896 didn't jump to line 898 because the condition on line 896 was never true
897 # Get first non-null research_query for each research_id
898 query_results = (
899 session.query(
900 TokenUsage.research_id, TokenUsage.research_query
901 )
902 .filter(
903 TokenUsage.research_id.in_(recent_research_ids),
904 TokenUsage.research_query.isnot(None),
905 )
906 .order_by(TokenUsage.id)
907 .all()
908 )
909 for research_id, research_query in query_results:
910 if research_id not in research_query_map:
911 research_query_map[research_id] = research_query
913 recent_researches = []
914 for research_data in recent_research_data: 914 ↛ 915line 914 didn't jump to line 915 because the loop on line 914 never started
915 query_text = research_query_map.get(
916 research_data.research_id,
917 f"Research {research_data.research_id}",
918 )
920 recent_researches.append(
921 {
922 "id": research_data.research_id,
923 "query": query_text,
924 "tokens": research_data.token_count or 0,
925 "created_at": research_data.latest_timestamp,
926 }
927 )
929 # Token breakdown statistics
930 breakdown_query = query.with_entities(
931 func.sum(TokenUsage.prompt_tokens).label(
932 "total_input_tokens"
933 ),
934 func.sum(TokenUsage.completion_tokens).label(
935 "total_output_tokens"
936 ),
937 func.avg(TokenUsage.prompt_tokens).label(
938 "avg_input_tokens"
939 ),
940 func.avg(TokenUsage.completion_tokens).label(
941 "avg_output_tokens"
942 ),
943 func.avg(TokenUsage.total_tokens).label("avg_total_tokens"),
944 )
945 token_breakdown = breakdown_query.first()
947 # Get rate limiting metrics
948 from ..database.models import (
949 RateLimitAttempt,
950 RateLimitEstimate,
951 )
953 # Get rate limit attempts
954 rate_limit_query = session.query(RateLimitAttempt)
956 # Apply time filter
957 if time_condition is not None:
958 # RateLimitAttempt uses timestamp as float, not datetime
959 if period == "7d":
960 cutoff_time = time.time() - (7 * 24 * 3600)
961 elif period == "30d": 961 ↛ 963line 961 didn't jump to line 963 because the condition on line 961 was always true
962 cutoff_time = time.time() - (30 * 24 * 3600)
963 elif period == "3m":
964 cutoff_time = time.time() - (90 * 24 * 3600)
965 elif period == "1y":
966 cutoff_time = time.time() - (365 * 24 * 3600)
967 else: # all
968 cutoff_time = 0
970 if cutoff_time > 0: 970 ↛ 976line 970 didn't jump to line 976 because the condition on line 970 was always true
971 rate_limit_query = rate_limit_query.filter(
972 RateLimitAttempt.timestamp >= cutoff_time
973 )
975 # Get rate limit statistics
976 total_attempts = rate_limit_query.count()
977 successful_attempts = rate_limit_query.filter(
978 RateLimitAttempt.success
979 ).count()
980 failed_attempts = total_attempts - successful_attempts
982 # Count rate limiting events (failures with RateLimitError)
983 rate_limit_events = rate_limit_query.filter(
984 ~RateLimitAttempt.success,
985 RateLimitAttempt.error_type == "RateLimitError",
986 ).count()
988 logger.debug(
989 f"Rate limit attempts in database: total={total_attempts}, successful={successful_attempts}"
990 )
992 # Get all attempts for detailed calculations
993 attempts = rate_limit_query.all()
995 # Calculate average wait times
996 if attempts: 996 ↛ 997line 996 didn't jump to line 997 because the condition on line 996 was never true
997 avg_wait_time = sum(a.wait_time for a in attempts) / len(
998 attempts
999 )
1000 successful_wait_times = [
1001 a.wait_time for a in attempts if a.success
1002 ]
1003 avg_successful_wait = (
1004 sum(successful_wait_times) / len(successful_wait_times)
1005 if successful_wait_times
1006 else 0
1007 )
1008 else:
1009 avg_wait_time = 0
1010 avg_successful_wait = 0
1012 # Get tracked engines - count distinct engine types from attempts
1013 tracked_engines_query = session.query(
1014 func.count(func.distinct(RateLimitAttempt.engine_type))
1015 )
1016 if cutoff_time > 0: 1016 ↛ 1020line 1016 didn't jump to line 1020 because the condition on line 1016 was always true
1017 tracked_engines_query = tracked_engines_query.filter(
1018 RateLimitAttempt.timestamp >= cutoff_time
1019 )
1020 tracked_engines = tracked_engines_query.scalar() or 0
1022 # Get engine-specific stats from attempts
1023 engine_stats = []
1025 # Get distinct engine types from attempts
1026 engine_types_query = session.query(
1027 RateLimitAttempt.engine_type
1028 ).distinct()
1029 if cutoff_time > 0: 1029 ↛ 1033line 1029 didn't jump to line 1033 because the condition on line 1029 was always true
1030 engine_types_query = engine_types_query.filter(
1031 RateLimitAttempt.timestamp >= cutoff_time
1032 )
1033 engine_types = [
1034 row.engine_type for row in engine_types_query.all()
1035 ]
1037 # Batch-load all estimates (fix N+1 query)
1038 estimates_map = {}
1039 if engine_types: 1039 ↛ 1040line 1039 didn't jump to line 1040 because the condition on line 1039 was never true
1040 all_estimates = (
1041 session.query(RateLimitEstimate)
1042 .filter(RateLimitEstimate.engine_type.in_(engine_types))
1043 .all()
1044 )
1045 estimates_map = {e.engine_type: e for e in all_estimates}
1047 for engine_type in engine_types: 1047 ↛ 1048line 1047 didn't jump to line 1048 because the loop on line 1047 never started
1048 engine_attempts_list = [
1049 a for a in attempts if a.engine_type == engine_type
1050 ]
1051 engine_attempts = len(engine_attempts_list)
1052 engine_success = len(
1053 [a for a in engine_attempts_list if a.success]
1054 )
1056 # Get estimate if exists
1057 estimate = estimates_map.get(engine_type)
1059 # Calculate recent success rate
1060 recent_success_rate = (
1061 (engine_success / engine_attempts * 100)
1062 if engine_attempts > 0
1063 else 0
1064 )
1066 # Determine status based on success rate
1067 if estimate:
1068 status = (
1069 "healthy"
1070 if estimate.success_rate > 0.8
1071 else "degraded"
1072 if estimate.success_rate > 0.5
1073 else "poor"
1074 )
1075 else:
1076 status = (
1077 "healthy"
1078 if recent_success_rate > 80
1079 else "degraded"
1080 if recent_success_rate > 50
1081 else "poor"
1082 )
1084 engine_stat = {
1085 "engine": engine_type,
1086 "base_wait": estimate.base_wait_seconds
1087 if estimate
1088 else 0.0,
1089 "base_wait_seconds": round(
1090 estimate.base_wait_seconds if estimate else 0.0, 2
1091 ),
1092 "min_wait_seconds": round(
1093 estimate.min_wait_seconds if estimate else 0.0, 2
1094 ),
1095 "max_wait_seconds": round(
1096 estimate.max_wait_seconds if estimate else 0.0, 2
1097 ),
1098 "success_rate": round(estimate.success_rate * 100, 1)
1099 if estimate
1100 else recent_success_rate,
1101 "total_attempts": estimate.total_attempts
1102 if estimate
1103 else engine_attempts,
1104 "recent_attempts": engine_attempts,
1105 "recent_success_rate": round(recent_success_rate, 1),
1106 "attempts": engine_attempts,
1107 "status": status,
1108 }
1110 if estimate:
1111 engine_stat["last_updated"] = datetime.fromtimestamp(
1112 estimate.last_updated
1113 ).strftime("%Y-%m-%d %H:%M:%S")
1114 else:
1115 engine_stat["last_updated"] = "Never"
1117 engine_stats.append(engine_stat)
1119 logger.debug(
1120 f"Tracked engines: {tracked_engines}, engine_stats: {engine_stats}"
1121 )
1123 result = {
1124 "total_tokens": total_tokens,
1125 "total_researches": total_researches,
1126 "by_model": by_model,
1127 "recent_researches": recent_researches,
1128 "token_breakdown": {
1129 "total_input_tokens": int(
1130 token_breakdown.total_input_tokens or 0
1131 ),
1132 "total_output_tokens": int(
1133 token_breakdown.total_output_tokens or 0
1134 ),
1135 "avg_input_tokens": int(
1136 token_breakdown.avg_input_tokens or 0
1137 ),
1138 "avg_output_tokens": int(
1139 token_breakdown.avg_output_tokens or 0
1140 ),
1141 "avg_total_tokens": int(
1142 token_breakdown.avg_total_tokens or 0
1143 ),
1144 },
1145 "rate_limiting": {
1146 "total_attempts": total_attempts,
1147 "successful_attempts": successful_attempts,
1148 "failed_attempts": failed_attempts,
1149 "success_rate": (
1150 successful_attempts / total_attempts * 100
1151 )
1152 if total_attempts > 0
1153 else 0,
1154 "rate_limit_events": rate_limit_events,
1155 "avg_wait_time": round(float(avg_wait_time), 2),
1156 "avg_successful_wait": round(
1157 float(avg_successful_wait), 2
1158 ),
1159 "tracked_engines": tracked_engines,
1160 "engine_stats": engine_stats,
1161 "total_engines_tracked": tracked_engines,
1162 "healthy_engines": len(
1163 [
1164 s
1165 for s in engine_stats
1166 if s["status"] == "healthy"
1167 ]
1168 ),
1169 "degraded_engines": len(
1170 [
1171 s
1172 for s in engine_stats
1173 if s["status"] == "degraded"
1174 ]
1175 ),
1176 "poor_engines": len(
1177 [s for s in engine_stats if s["status"] == "poor"]
1178 ),
1179 },
1180 }
1182 logger.debug(
1183 f"Returning from _get_metrics_from_encrypted_db - total_researches: {result['total_researches']}"
1184 )
1185 return result
1186 except Exception:
1187 logger.exception(
1188 "CRITICAL ERROR accessing encrypted database for metrics"
1189 )
1190 return self._get_empty_metrics()
1192 def _get_empty_metrics(self) -> Dict[str, Any]:
1193 """Return empty metrics structure when no data is available."""
1194 return {
1195 "total_tokens": 0,
1196 "total_researches": 0,
1197 "by_model": [],
1198 "recent_researches": [],
1199 "token_breakdown": {
1200 "prompt_tokens": 0,
1201 "completion_tokens": 0,
1202 "avg_prompt_tokens": 0,
1203 "avg_completion_tokens": 0,
1204 "avg_total_tokens": 0,
1205 },
1206 }
    def get_enhanced_metrics(
        self, period: str = "30d", research_mode: str = "all"
    ) -> Dict[str, Any]:
        """Get enhanced Phase 1 tracking metrics.

        Resolves the current user from the Flask session, opens that user's
        database, and aggregates ``TokenUsage`` rows into chart-ready
        structures: a cumulative token time series, latency/outcome stats,
        and breakdowns by research mode, search engine, research phase, and
        calling file/function.

        Args:
            period: Time period to filter by ('7d', '30d', '3m', '1y', 'all')
            research_mode: Research mode to filter by ('quick', 'detailed', 'all')

        Returns:
            Dictionary containing enhanced metrics data including time series.
            When no user session exists, or any query raises, an all-empty
            structure with the same keys is returned instead of propagating
            the error.
        """
        from flask import session as flask_session

        from ..database.session_context import get_user_db_session

        username = flask_session.get("username")
        if not username:
            # Return empty metrics structure when no user session
            return {
                "recent_enhanced_data": [],
                "performance_stats": {
                    "avg_response_time": 0,
                    "min_response_time": 0,
                    "max_response_time": 0,
                    "success_rate": 0,
                    "error_rate": 0,
                    "total_enhanced_calls": 0,
                },
                "mode_breakdown": [],
                "search_engine_stats": [],
                "phase_breakdown": [],
                "time_series_data": [],
                "call_stack_analysis": {
                    "by_file": [],
                    "by_function": [],
                },
            }

        try:
            with get_user_db_session(username) as session:
                # Build base query with filters; every breakdown below is
                # derived from this same filtered query.
                query = session.query(TokenUsage)

                # Apply time filter
                time_condition = get_time_filter_condition(
                    period, TokenUsage.timestamp
                )
                if time_condition is not None:
                    query = query.filter(time_condition)

                # Apply research mode filter
                mode_condition = get_research_mode_condition(
                    research_mode, TokenUsage.research_mode
                )
                if mode_condition is not None:
                    query = query.filter(mode_condition)

                # Get time series data for the chart - most important for "Token Consumption Over Time"
                # Rows without a timestamp or with zero tokens are excluded.
                time_series_query = query.filter(
                    TokenUsage.timestamp.isnot(None),
                    TokenUsage.total_tokens > 0,
                ).order_by(TokenUsage.timestamp.asc())

                # Limit to recent data for performance (cap at 200 rows for
                # any bounded period).
                if period != "all":
                    time_series_query = time_series_query.limit(200)

                time_series_data = time_series_query.all()

                # Format time series data with cumulative calculations —
                # each point carries both its own counts and running totals.
                time_series = []
                cumulative_tokens = 0
                cumulative_prompt_tokens = 0
                cumulative_completion_tokens = 0

                for usage in time_series_data:
                    cumulative_tokens += usage.total_tokens or 0
                    cumulative_prompt_tokens += usage.prompt_tokens or 0
                    cumulative_completion_tokens += usage.completion_tokens or 0

                    time_series.append(
                        {
                            "timestamp": str(usage.timestamp)
                            if usage.timestamp
                            else None,
                            "tokens": usage.total_tokens or 0,
                            "prompt_tokens": usage.prompt_tokens or 0,
                            "completion_tokens": usage.completion_tokens or 0,
                            "cumulative_tokens": cumulative_tokens,
                            "cumulative_prompt_tokens": cumulative_prompt_tokens,
                            "cumulative_completion_tokens": cumulative_completion_tokens,
                            "research_id": usage.research_id,
                        }
                    )

                # Basic performance stats using ORM — only rows that
                # recorded a response time are considered.
                performance_query = query.filter(
                    TokenUsage.response_time_ms.isnot(None)
                )
                total_calls = performance_query.count()

                if total_calls > 0:
                    # NOTE(review): avg/min/max each issue a separate
                    # aggregate query; they could be combined into a single
                    # with_entities(...) call if this becomes a hot path.
                    avg_response_time = (
                        performance_query.with_entities(
                            func.avg(TokenUsage.response_time_ms)
                        ).scalar()
                        or 0
                    )
                    min_response_time = (
                        performance_query.with_entities(
                            func.min(TokenUsage.response_time_ms)
                        ).scalar()
                        or 0
                    )
                    max_response_time = (
                        performance_query.with_entities(
                            func.max(TokenUsage.response_time_ms)
                        ).scalar()
                        or 0
                    )
                    success_count = performance_query.filter(
                        TokenUsage.success_status == "success"
                    ).count()
                    error_count = performance_query.filter(
                        TokenUsage.success_status == "error"
                    ).count()

                    perf_stats = {
                        "avg_response_time": round(avg_response_time),
                        "min_response_time": min_response_time,
                        "max_response_time": max_response_time,
                        "success_rate": (
                            round((success_count / total_calls * 100), 1)
                            if total_calls > 0
                            else 0
                        ),
                        "error_rate": (
                            round((error_count / total_calls * 100), 1)
                            if total_calls > 0
                            else 0
                        ),
                        "total_enhanced_calls": total_calls,
                    }
                else:
                    perf_stats = {
                        "avg_response_time": 0,
                        "min_response_time": 0,
                        "max_response_time": 0,
                        "success_rate": 0,
                        "error_rate": 0,
                        "total_enhanced_calls": 0,
                    }

                # Research mode breakdown using ORM
                mode_stats = (
                    query.filter(TokenUsage.research_mode.isnot(None))
                    .with_entities(
                        TokenUsage.research_mode,
                        func.count().label("count"),
                        func.avg(TokenUsage.total_tokens).label("avg_tokens"),
                        func.avg(TokenUsage.response_time_ms).label(
                            "avg_response_time"
                        ),
                    )
                    .group_by(TokenUsage.research_mode)
                    .all()
                )

                modes = [
                    {
                        "mode": stat.research_mode,
                        "count": stat.count,
                        "avg_tokens": round(stat.avg_tokens or 0),
                        "avg_response_time": round(stat.avg_response_time or 0),
                    }
                    for stat in mode_stats
                ]

                # Recent enhanced data (simplified) — newest 50 rows that
                # carry a research query.
                recent_enhanced_query = (
                    query.filter(TokenUsage.research_query.isnot(None))
                    .order_by(TokenUsage.timestamp.desc())
                    .limit(50)
                )

                recent_enhanced_data = recent_enhanced_query.all()
                recent_enhanced = [
                    {
                        "research_query": usage.research_query,
                        "research_mode": usage.research_mode,
                        "research_phase": usage.research_phase,
                        "search_iteration": usage.search_iteration,
                        "response_time_ms": usage.response_time_ms,
                        "success_status": usage.success_status,
                        "error_type": usage.error_type,
                        "search_engines_planned": usage.search_engines_planned,
                        "search_engine_selected": usage.search_engine_selected,
                        "total_tokens": usage.total_tokens,
                        "prompt_tokens": usage.prompt_tokens,
                        "completion_tokens": usage.completion_tokens,
                        "timestamp": str(usage.timestamp)
                        if usage.timestamp
                        else None,
                        "research_id": usage.research_id,
                        "calling_file": usage.calling_file,
                        "calling_function": usage.calling_function,
                        "call_stack": usage.call_stack,
                    }
                    for usage in recent_enhanced_data
                ]

                # Search engine breakdown using ORM
                search_engine_stats = (
                    query.filter(TokenUsage.search_engine_selected.isnot(None))
                    .with_entities(
                        TokenUsage.search_engine_selected,
                        func.count().label("count"),
                        func.avg(TokenUsage.total_tokens).label("avg_tokens"),
                        func.avg(TokenUsage.response_time_ms).label(
                            "avg_response_time"
                        ),
                    )
                    .group_by(TokenUsage.search_engine_selected)
                    .all()
                )

                search_engines = [
                    {
                        "search_engine": stat.search_engine_selected,
                        "count": stat.count,
                        "avg_tokens": round(stat.avg_tokens or 0),
                        "avg_response_time": round(stat.avg_response_time or 0),
                    }
                    for stat in search_engine_stats
                ]

                # Research phase breakdown using ORM
                phase_stats = (
                    query.filter(TokenUsage.research_phase.isnot(None))
                    .with_entities(
                        TokenUsage.research_phase,
                        func.count().label("count"),
                        func.avg(TokenUsage.total_tokens).label("avg_tokens"),
                        func.avg(TokenUsage.response_time_ms).label(
                            "avg_response_time"
                        ),
                    )
                    .group_by(TokenUsage.research_phase)
                    .all()
                )

                phases = [
                    {
                        "phase": stat.research_phase,
                        "count": stat.count,
                        "avg_tokens": round(stat.avg_tokens or 0),
                        "avg_response_time": round(stat.avg_response_time or 0),
                    }
                    for stat in phase_stats
                ]

                # Call stack analysis using ORM — top 10 calling files by
                # number of tracked LLM calls.
                file_stats = (
                    query.filter(TokenUsage.calling_file.isnot(None))
                    .with_entities(
                        TokenUsage.calling_file,
                        func.count().label("count"),
                        func.avg(TokenUsage.total_tokens).label("avg_tokens"),
                    )
                    .group_by(TokenUsage.calling_file)
                    .order_by(func.count().desc())
                    .limit(10)
                    .all()
                )

                files = [
                    {
                        "file": stat.calling_file,
                        "count": stat.count,
                        "avg_tokens": round(stat.avg_tokens or 0),
                    }
                    for stat in file_stats
                ]

                # Top 10 calling functions, mirroring the per-file breakdown.
                function_stats = (
                    query.filter(TokenUsage.calling_function.isnot(None))
                    .with_entities(
                        TokenUsage.calling_function,
                        func.count().label("count"),
                        func.avg(TokenUsage.total_tokens).label("avg_tokens"),
                    )
                    .group_by(TokenUsage.calling_function)
                    .order_by(func.count().desc())
                    .limit(10)
                    .all()
                )

                functions = [
                    {
                        "function": stat.calling_function,
                        "count": stat.count,
                        "avg_tokens": round(stat.avg_tokens or 0),
                    }
                    for stat in function_stats
                ]

                return {
                    "recent_enhanced_data": recent_enhanced,
                    "performance_stats": perf_stats,
                    "mode_breakdown": modes,
                    "search_engine_stats": search_engines,
                    "phase_breakdown": phases,
                    "time_series_data": time_series,
                    "call_stack_analysis": {
                        "by_file": files,
                        "by_function": functions,
                    },
                }
        except Exception:
            logger.exception("Error in get_enhanced_metrics")
            # Return simplified response without non-existent columns
            return {
                "recent_enhanced_data": [],
                "performance_stats": {
                    "avg_response_time": 0,
                    "min_response_time": 0,
                    "max_response_time": 0,
                    "success_rate": 0,
                    "error_rate": 0,
                    "total_enhanced_calls": 0,
                },
                "mode_breakdown": [],
                "search_engine_stats": [],
                "phase_breakdown": [],
                "time_series_data": [],
                "call_stack_analysis": {
                    "by_file": [],
                    "by_function": [],
                },
            }
1550 def get_research_timeline_metrics(self, research_id: str) -> Dict[str, Any]:
1551 """Get timeline metrics for a specific research.
1553 Args:
1554 research_id: The ID of the research
1556 Returns:
1557 Dictionary containing timeline metrics for the research
1558 """
1559 from flask import session as flask_session
1561 from ..database.session_context import get_user_db_session
1563 username = flask_session.get("username")
1564 if not username:
1565 return {
1566 "research_id": research_id,
1567 "research_details": {},
1568 "timeline": [],
1569 "summary": {
1570 "total_calls": 0,
1571 "total_tokens": 0,
1572 "total_prompt_tokens": 0,
1573 "total_completion_tokens": 0,
1574 "avg_response_time": 0,
1575 "success_rate": 0,
1576 },
1577 "phase_stats": {},
1578 }
1580 with get_user_db_session(username) as session:
1581 # Get all token usage for this research ordered by time including call stack
1582 timeline_data = session.execute(
1583 text(
1584 """
1585 SELECT
1586 timestamp,
1587 total_tokens,
1588 prompt_tokens,
1589 completion_tokens,
1590 response_time_ms,
1591 success_status,
1592 error_type,
1593 research_phase,
1594 search_iteration,
1595 search_engine_selected,
1596 model_name,
1597 calling_file,
1598 calling_function,
1599 call_stack
1600 FROM token_usage
1601 WHERE research_id = :research_id
1602 ORDER BY timestamp ASC
1603 """
1604 ),
1605 {"research_id": research_id},
1606 ).fetchall()
1608 # Format timeline data with cumulative tokens
1609 timeline = []
1610 cumulative_tokens = 0
1611 cumulative_prompt_tokens = 0
1612 cumulative_completion_tokens = 0
1614 for row in timeline_data:
1615 cumulative_tokens += row[1] or 0
1616 cumulative_prompt_tokens += row[2] or 0
1617 cumulative_completion_tokens += row[3] or 0
1619 timeline.append(
1620 {
1621 "timestamp": str(row[0]) if row[0] else None,
1622 "tokens": row[1] or 0,
1623 "prompt_tokens": row[2] or 0,
1624 "completion_tokens": row[3] or 0,
1625 "cumulative_tokens": cumulative_tokens,
1626 "cumulative_prompt_tokens": cumulative_prompt_tokens,
1627 "cumulative_completion_tokens": cumulative_completion_tokens,
1628 "response_time_ms": row[4],
1629 "success_status": row[5],
1630 "error_type": row[6],
1631 "research_phase": row[7],
1632 "search_iteration": row[8],
1633 "search_engine_selected": row[9],
1634 "model_name": row[10],
1635 "calling_file": row[11],
1636 "calling_function": row[12],
1637 "call_stack": row[13],
1638 }
1639 )
1641 # Get research basic info
1642 research_info = session.execute(
1643 text(
1644 """
1645 SELECT query, mode, status, created_at, completed_at
1646 FROM research_history
1647 WHERE id = :research_id
1648 """
1649 ),
1650 {"research_id": research_id},
1651 ).fetchone()
1653 research_details = {}
1654 if research_info:
1655 research_details = {
1656 "query": research_info[0],
1657 "mode": research_info[1],
1658 "status": research_info[2],
1659 "created_at": str(research_info[3])
1660 if research_info[3]
1661 else None,
1662 "completed_at": str(research_info[4])
1663 if research_info[4]
1664 else None,
1665 }
1667 # Calculate summary stats
1668 total_calls = len(timeline_data)
1669 total_tokens = cumulative_tokens
1670 avg_response_time = sum(row[4] or 0 for row in timeline_data) / max(
1671 total_calls, 1
1672 )
1673 success_rate = (
1674 sum(1 for row in timeline_data if row[5] == "success")
1675 / max(total_calls, 1)
1676 * 100
1677 )
1679 # Phase breakdown for this research
1680 phase_stats = {}
1681 for row in timeline_data:
1682 phase = row[7] or "unknown"
1683 if phase not in phase_stats:
1684 phase_stats[phase] = {
1685 "count": 0,
1686 "tokens": 0,
1687 "avg_response_time": 0,
1688 }
1689 phase_stats[phase]["count"] += 1
1690 phase_stats[phase]["tokens"] += row[1] or 0
1691 if row[4]: 1691 ↛ 1681line 1691 didn't jump to line 1681 because the condition on line 1691 was always true
1692 phase_stats[phase]["avg_response_time"] += row[4]
1694 # Calculate averages for phases
1695 for phase in phase_stats:
1696 if phase_stats[phase]["count"] > 0: 1696 ↛ 1695line 1696 didn't jump to line 1695 because the condition on line 1696 was always true
1697 phase_stats[phase]["avg_response_time"] = round(
1698 phase_stats[phase]["avg_response_time"]
1699 / phase_stats[phase]["count"]
1700 )
1702 return {
1703 "research_id": research_id,
1704 "research_details": research_details,
1705 "timeline": timeline,
1706 "summary": {
1707 "total_calls": total_calls,
1708 "total_tokens": total_tokens,
1709 "total_prompt_tokens": cumulative_prompt_tokens,
1710 "total_completion_tokens": cumulative_completion_tokens,
1711 "avg_response_time": round(avg_response_time),
1712 "success_rate": round(success_rate, 1),
1713 },
1714 "phase_stats": phase_stats,
1715 }