Coverage for src/local_deep_research/metrics/token_counter.py: 59%
506 statements
coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""Token counting functionality for LLM usage tracking."""
3import inspect
4import json
5import time
6from datetime import datetime, timedelta, UTC
7from pathlib import Path
8from typing import Any, Dict, List, Optional
10from langchain_core.callbacks import BaseCallbackHandler
11from langchain_core.outputs import LLMResult
12from loguru import logger
13from sqlalchemy import func, text
15from ..database.models import ModelUsage, TokenUsage
16from .query_utils import get_research_mode_condition, get_time_filter_condition
19class TokenCountingCallback(BaseCallbackHandler):
20 """Callback handler for counting tokens across different models."""
22 def __init__(
23 self,
24 research_id: Optional[str] = None,
25 research_context: Optional[Dict[str, Any]] = None,
26 ):
27 """Initialize the token counting callback.
29 Args:
30 research_id: The ID of the research to track tokens for
31 research_context: Additional research context for enhanced tracking
32 """
33 super().__init__()
34 self.research_id = research_id
35 self.research_context = research_context or {}
36 self.current_model = None
37 self.current_provider = None
38 self.preset_model = None # Model name set during callback creation
39 self.preset_provider = None # Provider set during callback creation
41 # Phase 1 Enhancement: Track timing and context
42 self.start_time = None
43 self.response_time_ms = None
44 self.success_status = "success"
45 self.error_type = None
47 # Call stack tracking
48 self.calling_file = None
49 self.calling_function = None
50 self.call_stack = None
52 # Context overflow tracking
53 self.context_limit = None
54 self.context_truncated = False
55 self.tokens_truncated = 0
56 self.truncation_ratio = 0.0
57 self.original_prompt_estimate = 0
59 # Raw Ollama response metrics
60 self.ollama_metrics = {}
62 # Track token counts in memory
63 self.counts = {
64 "total_tokens": 0,
65 "total_prompt_tokens": 0,
66 "total_completion_tokens": 0,
67 "by_model": {},
68 }
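# --- Example (editor's sketch, not part of the original module) --------------
# How this callback might be attached to a LangChain chat model so that token
# counts accumulate in `counts`. `ChatOllama`, the model name, and the
# research_id below are illustrative assumptions, not values from this file;
# any chat model that reports token usage to callbacks should behave the same.
#
#     from langchain_ollama import ChatOllama
#
#     cb = TokenCountingCallback(
#         research_id="research-123",  # hypothetical ID
#         research_context={"research_phase": "search", "context_limit": 8192},
#     )
#     llm = ChatOllama(model="llama3", callbacks=[cb])
#     llm.invoke("Summarize the benefits of token tracking.")
#     print(cb.get_counts()["total_tokens"])
# -----------------------------------------------------------------------------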
70 def on_llm_start(
71 self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
72 ) -> None:
73 """Called when LLM starts running."""
74 # Phase 1 Enhancement: Start timing
75 self.start_time = time.time()
77 # Estimate original prompt size (rough estimate: ~4 chars per token)
78 if prompts:  # coverage: 78 ↛ 86 (condition always true)
79 total_chars = sum(len(prompt) for prompt in prompts)
80 self.original_prompt_estimate = total_chars // 4
81 logger.debug(
82 f"Estimated prompt tokens: {self.original_prompt_estimate} (from {total_chars} chars)"
83 )
85 # Get context limit from research context (will be set from settings)
86 self.context_limit = self.research_context.get("context_limit")
88 # Phase 1 Enhancement: Capture call stack information
89 try:
90 stack = inspect.stack()
92 # Skip the first few frames (this method, langchain internals)
93 # Look for the first frame that's in our project directory
94 for frame_info in stack[1:]:
95 file_path = frame_info.filename
96 # Look for any frame containing local_deep_research project
97 if (  # coverage: 97 ↛ 103 (condition never true)
98 "local_deep_research" in file_path
99 and "site-packages" not in file_path
100 and "venv" not in file_path
101 ):
102 # Extract relative path from local_deep_research
103 if "src/local_deep_research" in file_path:
104 relative_path = file_path.split(
105 "src/local_deep_research"
106 )[-1].lstrip("/")
107 elif "local_deep_research/src" in file_path:
108 relative_path = file_path.split(
109 "local_deep_research/src"
110 )[-1].lstrip("/")
111 elif "local_deep_research" in file_path:
112 # Get everything after local_deep_research
113 relative_path = file_path.split("local_deep_research")[
114 -1
115 ].lstrip("/")
116 else:
117 relative_path = Path(file_path).name
119 self.calling_file = relative_path
120 self.calling_function = frame_info.function
122 # Capture a simplified call stack (just the relevant frames)
123 call_stack_frames = []
124 for frame in stack[1:6]: # Limit to 5 frames
125 if (
126 "local_deep_research" in frame.filename
127 and "site-packages" not in frame.filename
128 and "venv" not in frame.filename
129 ):
130 frame_name = f"{Path(frame.filename).name}:{frame.function}:{frame.lineno}"
131 call_stack_frames.append(frame_name)
133 self.call_stack = (
134 " -> ".join(call_stack_frames)
135 if call_stack_frames
136 else None
137 )
138 break
139 except Exception as e:
140 logger.debug(f"Error capturing call stack: {e}")
141 # Continue without call stack info if there's an error
143 # Debug logging removed to reduce log clutter
144 # Uncomment below if you need to debug token counting
145 # logger.debug(f"on_llm_start serialized: {serialized}")
146 # logger.debug(f"on_llm_start kwargs keys: {list(kwargs.keys()) if kwargs else []}")
148 # First, use preset values if available
149 if self.preset_model:
150 self.current_model = self.preset_model
151 else:
152 # Try multiple locations for model name
153 model_name = None
155 # First check invocation_params
156 invocation_params = kwargs.get("invocation_params", {})
157 model_name = invocation_params.get(
158 "model"
159 ) or invocation_params.get("model_name")
161 # Check kwargs directly
162 if not model_name:
163 model_name = kwargs.get("model") or kwargs.get("model_name")
165 # Check serialized data
166 if not model_name and "kwargs" in serialized:
167 model_name = serialized["kwargs"].get("model") or serialized[
168 "kwargs"
169 ].get("model_name")
171 # Check for name in serialized data
172 if not model_name and "name" in serialized:  # coverage: 172 ↛ 173 (condition never true)
173 model_name = serialized["name"]
175 # If still not found and we have Ollama, try to extract from the instance
176 if (
177 not model_name
178 and "_type" in serialized
179 and "ChatOllama" in serialized["_type"]
180 ):
181 # For Ollama, the model name might be in the serialized kwargs
182 if "kwargs" in serialized and "model" in serialized["kwargs"]: 182 ↛ 183line 182 didn't jump to line 183 because the condition on line 182 was never true
183 model_name = serialized["kwargs"]["model"]
184 else:
185 # Default to the type if we can't find the actual model
186 model_name = "ollama"
188 # Final fallback
189 if not model_name:
190 if "_type" in serialized:
191 model_name = serialized["_type"]
192 else:
193 model_name = "unknown"
195 self.current_model = model_name
197 # Use preset provider if available
198 if self.preset_provider:
199 self.current_provider = self.preset_provider
200 else:
201 # Extract provider from serialized type or kwargs
202 if "_type" in serialized:
203 type_str = serialized["_type"]
204 if "ChatOllama" in type_str:
205 self.current_provider = "ollama"
206 elif "ChatOpenAI" in type_str:
207 self.current_provider = "openai"
208 elif "ChatAnthropic" in type_str: 208 ↛ 211line 208 didn't jump to line 211 because the condition on line 208 was always true
209 self.current_provider = "anthropic"
210 else:
211 self.current_provider = kwargs.get("provider", "unknown")
212 else:
213 self.current_provider = kwargs.get("provider", "unknown")
215 # Initialize model tracking if needed
216 if self.current_model not in self.counts["by_model"]:
217 self.counts["by_model"][self.current_model] = {
218 "prompt_tokens": 0,
219 "completion_tokens": 0,
220 "total_tokens": 0,
221 "calls": 0,
222 "provider": self.current_provider,
223 }
225 # Increment call count
226 self.counts["by_model"][self.current_model]["calls"] += 1
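# --- Example (editor's sketch, not part of the original module) --------------
# Rough illustration of the model-name lookup order used above: preset value,
# then invocation_params, then kwargs, then the serialized constructor kwargs,
# then serialized["name"]/_type, and finally "unknown". The payload below is
# invented for illustration, not captured from a real LangChain call.
#
#     cb = TokenCountingCallback()
#     cb.on_llm_start(
#         serialized={"_type": "ChatOllama", "kwargs": {}},
#         prompts=["hello"],
#         invocation_params={"model": "llama3"},   # found here first
#     )
#     assert cb.current_model == "llama3"
#     assert cb.current_provider == "ollama"       # inferred from "_type"
# -----------------------------------------------------------------------------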
228 def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
229 """Called when LLM ends running."""
230 # Phase 1 Enhancement: Calculate response time
231 if self.start_time:
232 self.response_time_ms = int((time.time() - self.start_time) * 1000)
234 # Extract token usage from response
235 token_usage = None
237 # Check multiple locations for token usage
238 if hasattr(response, "llm_output") and response.llm_output:
239 token_usage = response.llm_output.get(
240 "token_usage"
241 ) or response.llm_output.get("usage", {})
243 # Check for usage metadata in generations (Ollama specific)
244 if not token_usage and hasattr(response, "generations"):
245 for generation_list in response.generations:  # coverage: 245 ↛ 339 (loop didn't complete)
246 for generation in generation_list:  # coverage: 246 ↛ 336 (loop didn't complete)
247 if hasattr(generation, "message") and hasattr(  # coverage: 247 ↛ 265 (condition always true)
248 generation.message, "usage_metadata"
249 ):
250 usage_meta = generation.message.usage_metadata
251 if usage_meta: # Check if usage_metadata is not None
252 token_usage = {
253 "prompt_tokens": usage_meta.get(
254 "input_tokens", 0
255 ),
256 "completion_tokens": usage_meta.get(
257 "output_tokens", 0
258 ),
259 "total_tokens": usage_meta.get(
260 "total_tokens", 0
261 ),
262 }
263 break
264 # Also check response_metadata
265 if hasattr(generation, "message") and hasattr(  # coverage: 265 ↛ 246 (condition always true)
266 generation.message, "response_metadata"
267 ):
268 resp_meta = generation.message.response_metadata
269 if resp_meta.get("prompt_eval_count") or resp_meta.get(  # coverage: 269 ↛ 246 (condition always true)
270 "eval_count"
271 ):
272 # Capture raw Ollama metrics
273 self.ollama_metrics = {
274 "prompt_eval_count": resp_meta.get(
275 "prompt_eval_count"
276 ),
277 "eval_count": resp_meta.get("eval_count"),
278 "total_duration": resp_meta.get(
279 "total_duration"
280 ),
281 "load_duration": resp_meta.get("load_duration"),
282 "prompt_eval_duration": resp_meta.get(
283 "prompt_eval_duration"
284 ),
285 "eval_duration": resp_meta.get("eval_duration"),
286 }
288 # Check for context overflow
289 prompt_eval_count = resp_meta.get(
290 "prompt_eval_count", 0
291 )
292 if self.context_limit and prompt_eval_count > 0:
293 # Check if we're near or at the context limit
294 if (
295 prompt_eval_count
296 >= self.context_limit * 0.95
297 ): # 95% threshold
298 self.context_truncated = True
300 # Estimate tokens truncated
301 if (  # coverage: 301 ↛ 323 (condition always true)
302 self.original_prompt_estimate
303 > prompt_eval_count
304 ):
305 self.tokens_truncated = max(
306 0,
307 self.original_prompt_estimate
308 - prompt_eval_count,
309 )
310 self.truncation_ratio = (
311 self.tokens_truncated
312 / self.original_prompt_estimate
313 if self.original_prompt_estimate > 0
314 else 0
315 )
316 logger.warning(
317 f"Context overflow detected! "
318 f"Prompt tokens: {prompt_eval_count}/{self.context_limit} "
319 f"(estimated {self.tokens_truncated} tokens truncated, "
320 f"{self.truncation_ratio:.1%} of prompt)"
321 )
323 token_usage = {
324 "prompt_tokens": resp_meta.get(
325 "prompt_eval_count", 0
326 ),
327 "completion_tokens": resp_meta.get(
328 "eval_count", 0
329 ),
330 "total_tokens": resp_meta.get(
331 "prompt_eval_count", 0
332 )
333 + resp_meta.get("eval_count", 0),
334 }
335 break
336 if token_usage:  # coverage: 336 ↛ 245 (condition always true)
337 break
339 if token_usage and isinstance(token_usage, dict):  # coverage: 339 ↛ exit (condition always true)
340 prompt_tokens = token_usage.get("prompt_tokens", 0)
341 completion_tokens = token_usage.get("completion_tokens", 0)
342 total_tokens = token_usage.get(
343 "total_tokens", prompt_tokens + completion_tokens
344 )
346 # Update in-memory counts
347 self.counts["total_prompt_tokens"] += prompt_tokens
348 self.counts["total_completion_tokens"] += completion_tokens
349 self.counts["total_tokens"] += total_tokens
351 if self.current_model:
352 self.counts["by_model"][self.current_model][
353 "prompt_tokens"
354 ] += prompt_tokens
355 self.counts["by_model"][self.current_model][
356 "completion_tokens"
357 ] += completion_tokens
358 self.counts["by_model"][self.current_model]["total_tokens"] += (
359 total_tokens
360 )
362 # Save to database if we have a research_id
363 if self.research_id:
364 self._save_to_db(prompt_tokens, completion_tokens)
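# --- Example (editor's sketch, not part of the original module) --------------
# How the Ollama branch above maps response_metadata to token counts, and how
# the context-overflow estimate is derived. The numbers are made up: with a
# prompt estimated at 10,000 tokens (total_chars // 4), a context_limit of
# 8,192 and prompt_eval_count of 8,100 (>= 95% of the limit), the handler
# would record:
#
#     tokens_truncated  = 10_000 - 8_100        # = 1_900
#     truncation_ratio  = 1_900 / 10_000        # = 0.19 (19% of the prompt)
#     prompt_tokens     = prompt_eval_count     # 8_100
#     completion_tokens = eval_count            # e.g. 512
#     total_tokens      = 8_100 + 512           # 8_612
#
# The 95% threshold and the 4-chars-per-token estimate are heuristics taken
# from the code above, not exact measurements.
# -----------------------------------------------------------------------------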
366 def on_llm_error(self, error, **kwargs: Any) -> None:
367 """Called when LLM encounters an error."""
368 # Phase 1 Enhancement: Track errors
369 if self.start_time:  # coverage: 369 ↛ 372 (condition always true)
370 self.response_time_ms = int((time.time() - self.start_time) * 1000)
372 self.success_status = "error"
373 self.error_type = str(type(error).__name__)
375 # Still save to database to track failed calls
376 if self.research_id:  # coverage: 376 ↛ 377 (condition never true)
377 self._save_to_db(0, 0)
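# --- Example (editor's sketch, not part of the original module) --------------
# On failure the handler still records the call: response time is measured,
# success_status flips to "error", and the exception class name is stored.
# With an invented error and no research_id (so nothing is written to the DB):
#
#     cb = TokenCountingCallback()
#     cb.start_time = time.time()
#     cb.on_llm_error(TimeoutError("model did not respond"))
#     cb.success_status  # "error"
#     cb.error_type      # "TimeoutError"
# -----------------------------------------------------------------------------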
379 def _get_context_overflow_fields(self) -> Dict[str, Any]:
380 """Get context overflow detection fields for database saving."""
381 return {
382 "context_limit": self.context_limit,
383 "context_truncated": self.context_truncated, # Now Boolean
384 "tokens_truncated": self.tokens_truncated
385 if self.context_truncated
386 else None,
387 "truncation_ratio": self.truncation_ratio
388 if self.context_truncated
389 else None,
390 # Raw Ollama metrics
391 "ollama_prompt_eval_count": self.ollama_metrics.get(
392 "prompt_eval_count"
393 ),
394 "ollama_eval_count": self.ollama_metrics.get("eval_count"),
395 "ollama_total_duration": self.ollama_metrics.get("total_duration"),
396 "ollama_load_duration": self.ollama_metrics.get("load_duration"),
397 "ollama_prompt_eval_duration": self.ollama_metrics.get(
398 "prompt_eval_duration"
399 ),
400 "ollama_eval_duration": self.ollama_metrics.get("eval_duration"),
401 }
403 def _save_to_db(self, prompt_tokens: int, completion_tokens: int):
404 """Save token usage to the database."""
405 # Check if we're in a thread - if so, queue the save for later
406 import threading
408 if threading.current_thread().name != "MainThread":
409 # Use thread-safe metrics database for background threads
410 username = (
411 self.research_context.get("username")
412 if self.research_context
413 else None
414 )
416 if not username:
417 logger.warning(
418 f"Cannot save token metrics - no username in research context. "
419 f"Token usage: prompt={prompt_tokens}, completion={completion_tokens}, "
420 f"Research context: {self.research_context}"
421 )
422 return
424 # Import the thread-safe metrics database
426 # Prepare token data
427 token_data = {
428 "model_name": self.current_model,
429 "provider": self.current_provider,
430 "prompt_tokens": prompt_tokens,
431 "completion_tokens": completion_tokens,
432 "research_query": self.research_context.get("research_query"),
433 "research_mode": self.research_context.get("research_mode"),
434 "research_phase": self.research_context.get("research_phase"),
435 "search_iteration": self.research_context.get(
436 "search_iteration"
437 ),
438 "response_time_ms": self.response_time_ms,
439 "success_status": self.success_status,
440 "error_type": self.error_type,
441 "search_engines_planned": self.research_context.get(
442 "search_engines_planned"
443 ),
444 "search_engine_selected": self.research_context.get(
445 "search_engine_selected"
446 ),
447 "calling_file": self.calling_file,
448 "calling_function": self.calling_function,
449 "call_stack": self.call_stack,
450 # Add context overflow fields using helper method
451 **self._get_context_overflow_fields(),
452 }
454 # Convert list to JSON string if needed
455 if isinstance(token_data.get("search_engines_planned"), list):  # coverage: 455 ↛ 456 (condition never true)
456 token_data["search_engines_planned"] = json.dumps(
457 token_data["search_engines_planned"]
458 )
460 # Get password from research context
461 password = self.research_context.get("user_password")
462 if not password:  # coverage: 462 ↛ 463 (condition never true)
463 logger.warning(
464 f"Cannot save token metrics - no password in research context. "
465 f"Username: {username}, Token usage: prompt={prompt_tokens}, completion={completion_tokens}"
466 )
467 return
469 # Write metrics directly using thread-safe database
470 try:
471 from ..database.thread_metrics import metrics_writer
473 # Set password for this thread
474 metrics_writer.set_user_password(username, password)
476 # Write metrics to encrypted database
477 metrics_writer.write_token_metrics(
478 username, self.research_id, token_data
479 )
480 except Exception:
481 logger.exception("Failed to write metrics from thread")
482 return
484 # In MainThread, save directly
485 try:
486 from flask import session as flask_session
487 from ..database.session_context import get_user_db_session
489 username = flask_session.get("username")
490 if not username:
491 logger.debug("No user session, skipping token metrics save")
492 return
494 with get_user_db_session(username) as session:
495 # Phase 1 Enhancement: Prepare additional context
496 research_query = self.research_context.get("research_query")
497 research_mode = self.research_context.get("research_mode")
498 research_phase = self.research_context.get("research_phase")
499 search_iteration = self.research_context.get("search_iteration")
500 search_engines_planned = self.research_context.get(
501 "search_engines_planned"
502 )
503 search_engine_selected = self.research_context.get(
504 "search_engine_selected"
505 )
507 # Debug logging for search engine context
508 if search_engines_planned or search_engine_selected:
509 logger.info(
510 f"Token tracking - Search context: planned={search_engines_planned}, selected={search_engine_selected}, phase={research_phase}"
511 )
512 else:
513 logger.debug(
514 f"Token tracking - No search engine context yet, phase={research_phase}"
515 )
517 # Convert list to JSON string if needed
518 if isinstance(search_engines_planned, list):
519 search_engines_planned = json.dumps(search_engines_planned)
521 # Log context overflow detection values before saving
522 logger.debug(
523 f"Saving TokenUsage - context_limit: {self.context_limit}, "
524 f"context_truncated: {self.context_truncated}, "
525 f"tokens_truncated: {self.tokens_truncated}, "
526 f"ollama_prompt_eval_count: {self.ollama_metrics.get('prompt_eval_count')}, "
527 f"prompt_tokens: {prompt_tokens}, "
528 f"completion_tokens: {completion_tokens}"
529 )
531 # Add token usage record with enhanced fields
532 token_usage = TokenUsage(
533 research_id=self.research_id,
534 model_name=self.current_model,
535 model_provider=self.current_provider, # Added provider
536 # for accurate cost tracking
537 prompt_tokens=prompt_tokens,
538 completion_tokens=completion_tokens,
539 total_tokens=prompt_tokens + completion_tokens,
540 # Phase 1 Enhancement: Research context
541 research_query=research_query,
542 research_mode=research_mode,
543 research_phase=research_phase,
544 search_iteration=search_iteration,
545 # Phase 1 Enhancement: Performance metrics
546 response_time_ms=self.response_time_ms,
547 success_status=self.success_status,
548 error_type=self.error_type,
549 # Phase 1 Enhancement: Search engine context
550 search_engines_planned=search_engines_planned,
551 search_engine_selected=search_engine_selected,
552 # Phase 1 Enhancement: Call stack tracking
553 calling_file=self.calling_file,
554 calling_function=self.calling_function,
555 call_stack=self.call_stack,
556 # Add context overflow fields using helper method
557 **self._get_context_overflow_fields(),
558 )
559 session.add(token_usage)
561 # Update or create model usage statistics
562 model_usage = (
563 session.query(ModelUsage)
564 .filter_by(
565 model_name=self.current_model,
566 )
567 .first()
568 )
570 if model_usage:
571 model_usage.total_tokens += (
572 prompt_tokens + completion_tokens
573 )
574 model_usage.total_calls += 1
575 else:
576 model_usage = ModelUsage(
577 model_name=self.current_model,
578 model_provider=self.current_provider,
579 total_tokens=prompt_tokens + completion_tokens,
580 total_calls=1,
581 )
582 session.add(model_usage)
584 # Commit the transaction
585 session.commit()
587 except Exception:
588 logger.exception("Error saving token usage to database")
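# --- Example (editor's sketch, not part of the original module) --------------
# The save path above branches on the current thread: background threads go
# through the thread-safe metrics writer (which needs username and password
# from research_context), while the main thread writes through the Flask
# session's per-user database. A condensed view of that decision, for
# orientation only:
#
#     if threading.current_thread().name != "MainThread":
#         # needs research_context["username"] and ["user_password"]
#         metrics_writer.set_user_password(username, password)
#         metrics_writer.write_token_metrics(username, research_id, token_data)
#     else:
#         # needs flask_session["username"]; writes TokenUsage + ModelUsage
#         with get_user_db_session(username) as session:
#             session.add(token_usage)
#             session.commit()
# -----------------------------------------------------------------------------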
590 def get_counts(self) -> Dict[str, Any]:
591 """Get the current token counts."""
592 return self.counts
595 class TokenCounter:
596 """Manager class for token counting across the application."""
598 def __init__(self):
599 """Initialize the token counter."""
600 # No longer need to store database reference
601 self._thread_metrics_db = None
603 @property
604 def thread_metrics_db(self):
605 """Lazy load thread metrics writer."""
606 if self._thread_metrics_db is None:
607 try:
608 from ..database.thread_metrics import metrics_writer
610 self._thread_metrics_db = metrics_writer
611 except ImportError:
612 logger.warning("Thread metrics writer not available")
613 return self._thread_metrics_db
615 def create_callback(
616 self,
617 research_id: Optional[str] = None,
618 research_context: Optional[Dict[str, Any]] = None,
619 ) -> TokenCountingCallback:
620 """Create a new token counting callback.
622 Args:
623 research_id: The ID of the research to track tokens for
624 research_context: Additional research context for enhanced tracking
626 Returns:
627 A new TokenCountingCallback instance
628 """
629 return TokenCountingCallback(
630 research_id=research_id, research_context=research_context
631 )
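# --- Example (editor's sketch, not part of the original module) --------------
# Typical use of the factory: create one callback per research run and pass it
# to the LLM call. The LLM class and identifiers below are assumptions for
# illustration only.
#
#     from langchain_openai import ChatOpenAI
#
#     counter = TokenCounter()
#     cb = counter.create_callback(
#         research_id="research-123",
#         research_context={"research_mode": "quick", "research_phase": "init"},
#     )
#     llm = ChatOpenAI(model="gpt-4o-mini")
#     llm.invoke("What is a token?", config={"callbacks": [cb]})
#     print(cb.get_counts()["by_model"])
# -----------------------------------------------------------------------------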
633 def get_research_metrics(self, research_id: str) -> Dict[str, Any]:
634 """Get token metrics for a specific research.
636 Args:
637 research_id: The ID of the research
639 Returns:
640 Dictionary containing token usage metrics
641 """
642 from flask import session as flask_session
644 from ..database.session_context import get_user_db_session
646 username = flask_session.get("username")
647 if not username:
648 return {
649 "research_id": research_id,
650 "total_tokens": 0,
651 "total_calls": 0,
652 "model_usage": [],
653 }
655 with get_user_db_session(username) as session:
656 # Get token usage for this research from TokenUsage table
657 from sqlalchemy import func
659 token_usages = (
660 session.query(
661 TokenUsage.model_name,
662 TokenUsage.model_provider,
663 func.sum(TokenUsage.prompt_tokens).label("prompt_tokens"),
664 func.sum(TokenUsage.completion_tokens).label(
665 "completion_tokens"
666 ),
667 func.sum(TokenUsage.total_tokens).label("total_tokens"),
668 func.count().label("calls"),
669 )
670 .filter_by(research_id=research_id)
671 .group_by(TokenUsage.model_name, TokenUsage.model_provider)
672 .order_by(func.sum(TokenUsage.total_tokens).desc())
673 .all()
674 )
676 model_usage = []
677 total_tokens = 0
678 total_calls = 0
680 for usage in token_usages:
681 model_usage.append(
682 {
683 "model": usage.model_name,
684 "provider": usage.model_provider,
685 "tokens": usage.total_tokens or 0,
686 "calls": usage.calls or 0,
687 "prompt_tokens": usage.prompt_tokens or 0,
688 "completion_tokens": usage.completion_tokens or 0,
689 }
690 )
691 total_tokens += usage.total_tokens or 0
692 total_calls += usage.calls or 0
694 return {
695 "research_id": research_id,
696 "total_tokens": total_tokens,
697 "total_calls": total_calls,
698 "model_usage": model_usage,
699 }
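# --- Example (editor's sketch, not part of the original module) --------------
# Illustrative shape of the dictionary returned by get_research_metrics();
# the concrete values are invented:
#
#     {
#         "research_id": "research-123",
#         "total_tokens": 8612,
#         "total_calls": 3,
#         "model_usage": [
#             {"model": "llama3", "provider": "ollama", "tokens": 8612,
#              "calls": 3, "prompt_tokens": 8100, "completion_tokens": 512},
#         ],
#     }
# -----------------------------------------------------------------------------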
701 def get_overall_metrics(
702 self, period: str = "30d", research_mode: str = "all"
703 ) -> Dict[str, Any]:
704 """Get overall token metrics across all researches.
706 Args:
707 period: Time period to filter by ('7d', '30d', '3m', '1y', 'all')
708 research_mode: Research mode to filter by ('quick', 'detailed', 'all')
710 Returns:
711 Dictionary containing overall metrics
712 """
713 # First get metrics from user's encrypted database
714 encrypted_metrics = self._get_metrics_from_encrypted_db(
715 period, research_mode
716 )
718 # Then get metrics from thread-safe metrics database
719 thread_metrics = self._get_metrics_from_thread_db(period, research_mode)
721 # Merge the results
722 return self._merge_metrics(encrypted_metrics, thread_metrics)
724 def _get_metrics_from_encrypted_db(
725 self, period: str, research_mode: str
726 ) -> Dict[str, Any]:
727 """Get metrics from user's encrypted database."""
728 from flask import session as flask_session
730 from ..database.session_context import get_user_db_session
732 username = flask_session.get("username")
733 if not username:  # coverage: 733 ↛ 734 (condition never true)
734 return self._get_empty_metrics()
736 try:
737 with get_user_db_session(username) as session:
738 # Build base query with filters
739 query = session.query(TokenUsage)
741 # Apply time filter
742 time_condition = get_time_filter_condition(
743 period, TokenUsage.timestamp
744 )
745 if time_condition is not None:  # coverage: 745 ↛ 749 (condition always true)
746 query = query.filter(time_condition)
748 # Apply research mode filter
749 mode_condition = get_research_mode_condition(
750 research_mode, TokenUsage.research_mode
751 )
752 if mode_condition is not None:  # coverage: 752 ↛ 753 (condition never true)
753 query = query.filter(mode_condition)
755 # Total tokens from TokenUsage
756 total_tokens = (
757 query.with_entities(
758 func.sum(TokenUsage.total_tokens)
759 ).scalar()
760 or 0
761 )
763 # Import ResearchHistory model
764 from ..database.models.research import ResearchHistory
766 # Count researches from ResearchHistory table
767 research_query = session.query(func.count(ResearchHistory.id))
769 # Debug: Check if any research history records exist at all
770 all_research_count = (
771 session.query(func.count(ResearchHistory.id)).scalar() or 0
772 )
773 logger.warning(
774 f"DEBUG: Total ResearchHistory records in database: {all_research_count}"
775 )
777 # Debug: List first few research IDs and their timestamps
778 sample_researches = (
779 session.query(
780 ResearchHistory.id,
781 ResearchHistory.created_at,
782 ResearchHistory.mode,
783 )
784 .limit(5)
785 .all()
786 )
787 if sample_researches:  # coverage: 787 ↛ 788 (condition never true)
788 logger.warning("DEBUG: Sample ResearchHistory records:")
789 for r_id, r_created, r_mode in sample_researches:
790 logger.warning(
791 f" - ID: {r_id}, Created: {r_created}, Mode: {r_mode}"
792 )
793 else:
794 logger.warning(
795 "DEBUG: No ResearchHistory records found in database!"
796 )
798 # Get time filter conditions for ResearchHistory query
799 start_time, end_time = None, None
800 if period != "all": 800 ↛ 814line 800 didn't jump to line 814 because the condition on line 800 was always true
801 if period == "today": 801 ↛ 802line 801 didn't jump to line 802 because the condition on line 801 was never true
802 start_time = datetime.now(UTC).replace(
803 hour=0, minute=0, second=0, microsecond=0
804 )
805 elif period == "week": 805 ↛ 806line 805 didn't jump to line 806 because the condition on line 805 was never true
806 start_time = datetime.now(UTC) - timedelta(days=7)
807 elif period == "month": 807 ↛ 808line 807 didn't jump to line 808 because the condition on line 807 was never true
808 start_time = datetime.now(UTC) - timedelta(days=30)
810 if start_time:  # coverage: 810 ↛ 811 (condition never true)
811 end_time = datetime.now(UTC)
813 # Apply time filter if specified
814 if start_time and end_time:  # coverage: 814 ↛ 815 (condition never true)
815 research_query = research_query.filter(
816 ResearchHistory.created_at >= start_time.isoformat(),
817 ResearchHistory.created_at <= end_time.isoformat(),
818 )
820 # Apply mode filter if specified
821 mode_filter = research_mode if research_mode != "all" else None
822 if mode_filter:  # coverage: 822 ↛ 823 (condition never true)
823 logger.warning(
824 f"DEBUG: Applying mode filter: {mode_filter}"
825 )
826 research_query = research_query.filter(
827 ResearchHistory.mode == mode_filter
828 )
830 total_researches = research_query.scalar() or 0
831 logger.warning(
832 f"DEBUG: Final filtered research count: {total_researches}"
833 )
835 # Also check distinct research_ids in TokenUsage for comparison
836 token_research_count = (
837 session.query(
838 func.count(func.distinct(TokenUsage.research_id))
839 ).scalar()
840 or 0
841 )
842 logger.warning(
843 f"DEBUG: Distinct research_ids in TokenUsage: {token_research_count}"
844 )
846 # Model statistics using ORM aggregation
847 model_stats_query = session.query(
848 TokenUsage.model_name,
849 func.sum(TokenUsage.total_tokens).label("tokens"),
850 func.count().label("calls"),
851 func.sum(TokenUsage.prompt_tokens).label("prompt_tokens"),
852 func.sum(TokenUsage.completion_tokens).label(
853 "completion_tokens"
854 ),
855 ).filter(TokenUsage.model_name.isnot(None))
857 # Apply same filters to model stats
858 if time_condition is not None:  # coverage: 858 ↛ 860 (condition always true)
859 model_stats_query = model_stats_query.filter(time_condition)
860 if mode_condition is not None:  # coverage: 860 ↛ 861 (condition never true)
861 model_stats_query = model_stats_query.filter(mode_condition)
863 model_stats = (
864 model_stats_query.group_by(TokenUsage.model_name)
865 .order_by(func.sum(TokenUsage.total_tokens).desc())
866 .all()
867 )
869 # Get provider info from ModelUsage table
870 by_model = []
871 for stat in model_stats:  # coverage: 871 ↛ 873 (loop never started)
872 # Try to get provider from ModelUsage table
873 provider_info = (
874 session.query(ModelUsage.model_provider)
875 .filter(ModelUsage.model_name == stat.model_name)
876 .first()
877 )
878 provider = (
879 provider_info.model_provider
880 if provider_info
881 else "unknown"
882 )
884 by_model.append(
885 {
886 "model": stat.model_name,
887 "provider": provider,
888 "tokens": stat.tokens,
889 "calls": stat.calls,
890 "prompt_tokens": stat.prompt_tokens,
891 "completion_tokens": stat.completion_tokens,
892 }
893 )
895 # Get recent researches with token usage
896 # Note: This requires research_history table - for now we'll use available data
897 recent_research_query = session.query(
898 TokenUsage.research_id,
899 func.sum(TokenUsage.total_tokens).label("token_count"),
900 func.max(TokenUsage.timestamp).label("latest_timestamp"),
901 ).filter(TokenUsage.research_id.isnot(None))
903 if time_condition is not None:  # coverage: 903 ↛ 907 (condition always true)
904 recent_research_query = recent_research_query.filter(
905 time_condition
906 )
907 if mode_condition is not None:  # coverage: 907 ↛ 908 (condition never true)
908 recent_research_query = recent_research_query.filter(
909 mode_condition
910 )
912 recent_research_data = (
913 recent_research_query.group_by(TokenUsage.research_id)
914 .order_by(func.max(TokenUsage.timestamp).desc())
915 .limit(10)
916 .all()
917 )
919 recent_researches = []
920 for research_data in recent_research_data:  # coverage: 920 ↛ 922 (loop never started)
921 # Get research query from token_usage table if available
922 research_query_data = (
923 session.query(TokenUsage.research_query)
924 .filter(
925 TokenUsage.research_id == research_data.research_id,
926 TokenUsage.research_query.isnot(None),
927 )
928 .first()
929 )
931 query_text = (
932 research_query_data.research_query
933 if research_query_data
934 else f"Research {research_data.research_id}"
935 )
937 recent_researches.append(
938 {
939 "id": research_data.research_id,
940 "query": query_text,
941 "tokens": research_data.token_count or 0,
942 "created_at": research_data.latest_timestamp,
943 }
944 )
946 # Token breakdown statistics
947 breakdown_query = query.with_entities(
948 func.sum(TokenUsage.prompt_tokens).label(
949 "total_input_tokens"
950 ),
951 func.sum(TokenUsage.completion_tokens).label(
952 "total_output_tokens"
953 ),
954 func.avg(TokenUsage.prompt_tokens).label(
955 "avg_input_tokens"
956 ),
957 func.avg(TokenUsage.completion_tokens).label(
958 "avg_output_tokens"
959 ),
960 func.avg(TokenUsage.total_tokens).label("avg_total_tokens"),
961 )
962 token_breakdown = breakdown_query.first()
964 # Get rate limiting metrics
965 from ..database.models import (
966 RateLimitAttempt,
967 RateLimitEstimate,
968 )
970 # Get rate limit attempts
971 rate_limit_query = session.query(RateLimitAttempt)
973 # Apply time filter
974 if time_condition is not None:  # coverage: 974 ↛ 993 (condition always true)
975 # RateLimitAttempt uses timestamp as float, not datetime
976 if period == "7d": 976 ↛ 977line 976 didn't jump to line 977 because the condition on line 976 was never true
977 cutoff_time = time.time() - (7 * 24 * 3600)
978 elif period == "30d": 978 ↛ 980line 978 didn't jump to line 980 because the condition on line 978 was always true
979 cutoff_time = time.time() - (30 * 24 * 3600)
980 elif period == "3m":
981 cutoff_time = time.time() - (90 * 24 * 3600)
982 elif period == "1y":
983 cutoff_time = time.time() - (365 * 24 * 3600)
984 else: # all
985 cutoff_time = 0
987 if cutoff_time > 0:  # coverage: 987 ↛ 993 (condition always true)
988 rate_limit_query = rate_limit_query.filter(
989 RateLimitAttempt.timestamp >= cutoff_time
990 )
992 # Get rate limit statistics
993 total_attempts = rate_limit_query.count()
994 successful_attempts = rate_limit_query.filter(
995 RateLimitAttempt.success
996 ).count()
997 failed_attempts = total_attempts - successful_attempts
999 # Count rate limiting events (failures with RateLimitError)
1000 rate_limit_events = rate_limit_query.filter(
1001 ~RateLimitAttempt.success,
1002 RateLimitAttempt.error_type == "RateLimitError",
1003 ).count()
1005 logger.warning(
1006 f"DEBUG: Rate limit attempts in database: total={total_attempts}, successful={successful_attempts}"
1007 )
1009 # Get all attempts for detailed calculations
1010 attempts = rate_limit_query.all()
1012 # Calculate average wait times
1013 if attempts:  # coverage: 1013 ↛ 1014 (condition never true)
1014 avg_wait_time = sum(a.wait_time for a in attempts) / len(
1015 attempts
1016 )
1017 successful_wait_times = [
1018 a.wait_time for a in attempts if a.success
1019 ]
1020 avg_successful_wait = (
1021 sum(successful_wait_times) / len(successful_wait_times)
1022 if successful_wait_times
1023 else 0
1024 )
1025 else:
1026 avg_wait_time = 0
1027 avg_successful_wait = 0
1029 # Get tracked engines - count distinct engine types from attempts
1030 tracked_engines_query = session.query(
1031 func.count(func.distinct(RateLimitAttempt.engine_type))
1032 )
1033 if cutoff_time > 0:  # coverage: 1033 ↛ 1037 (condition always true)
1034 tracked_engines_query = tracked_engines_query.filter(
1035 RateLimitAttempt.timestamp >= cutoff_time
1036 )
1037 tracked_engines = tracked_engines_query.scalar() or 0
1039 # Get engine-specific stats from attempts
1040 engine_stats = []
1042 # Get distinct engine types from attempts
1043 engine_types_query = session.query(
1044 RateLimitAttempt.engine_type
1045 ).distinct()
1046 if cutoff_time > 0:  # coverage: 1046 ↛ 1050 (condition always true)
1047 engine_types_query = engine_types_query.filter(
1048 RateLimitAttempt.timestamp >= cutoff_time
1049 )
1050 engine_types = [
1051 row.engine_type for row in engine_types_query.all()
1052 ]
1054 for engine_type in engine_types:  # coverage: 1054 ↛ 1055 (loop never started)
1055 engine_attempts_list = [
1056 a for a in attempts if a.engine_type == engine_type
1057 ]
1058 engine_attempts = len(engine_attempts_list)
1059 engine_success = len(
1060 [a for a in engine_attempts_list if a.success]
1061 )
1063 # Get estimate if exists
1064 estimate = (
1065 session.query(RateLimitEstimate)
1066 .filter(RateLimitEstimate.engine_type == engine_type)
1067 .first()
1068 )
1070 # Calculate recent success rate
1071 recent_success_rate = (
1072 (engine_success / engine_attempts * 100)
1073 if engine_attempts > 0
1074 else 0
1075 )
1077 # Determine status based on success rate
1078 if estimate:
1079 status = (
1080 "healthy"
1081 if estimate.success_rate > 0.8
1082 else "degraded"
1083 if estimate.success_rate > 0.5
1084 else "poor"
1085 )
1086 else:
1087 status = (
1088 "healthy"
1089 if recent_success_rate > 80
1090 else "degraded"
1091 if recent_success_rate > 50
1092 else "poor"
1093 )
1095 engine_stat = {
1096 "engine": engine_type,
1097 "base_wait": estimate.base_wait_seconds
1098 if estimate
1099 else 0.0,
1100 "base_wait_seconds": round(
1101 estimate.base_wait_seconds if estimate else 0.0, 2
1102 ),
1103 "min_wait_seconds": round(
1104 estimate.min_wait_seconds if estimate else 0.0, 2
1105 ),
1106 "max_wait_seconds": round(
1107 estimate.max_wait_seconds if estimate else 0.0, 2
1108 ),
1109 "success_rate": round(estimate.success_rate * 100, 1)
1110 if estimate
1111 else recent_success_rate,
1112 "total_attempts": estimate.total_attempts
1113 if estimate
1114 else engine_attempts,
1115 "recent_attempts": engine_attempts,
1116 "recent_success_rate": round(recent_success_rate, 1),
1117 "attempts": engine_attempts,
1118 "status": status,
1119 }
1121 if estimate:
1122 engine_stat["last_updated"] = datetime.fromtimestamp(
1123 estimate.last_updated
1124 ).strftime("%Y-%m-%d %H:%M:%S")
1125 else:
1126 engine_stat["last_updated"] = "Never"
1128 engine_stats.append(engine_stat)
1130 logger.warning(
1131 f"DEBUG: Tracked engines: {tracked_engines}, engine_stats: {engine_stats}"
1132 )
1134 result = {
1135 "total_tokens": total_tokens,
1136 "total_researches": total_researches,
1137 "by_model": by_model,
1138 "recent_researches": recent_researches,
1139 "token_breakdown": {
1140 "total_input_tokens": int(
1141 token_breakdown.total_input_tokens or 0
1142 ),
1143 "total_output_tokens": int(
1144 token_breakdown.total_output_tokens or 0
1145 ),
1146 "avg_input_tokens": int(
1147 token_breakdown.avg_input_tokens or 0
1148 ),
1149 "avg_output_tokens": int(
1150 token_breakdown.avg_output_tokens or 0
1151 ),
1152 "avg_total_tokens": int(
1153 token_breakdown.avg_total_tokens or 0
1154 ),
1155 },
1156 "rate_limiting": {
1157 "total_attempts": total_attempts,
1158 "successful_attempts": successful_attempts,
1159 "failed_attempts": failed_attempts,
1160 "success_rate": (
1161 successful_attempts / total_attempts * 100
1162 )
1163 if total_attempts > 0
1164 else 0,
1165 "rate_limit_events": rate_limit_events,
1166 "avg_wait_time": round(float(avg_wait_time), 2),
1167 "avg_successful_wait": round(
1168 float(avg_successful_wait), 2
1169 ),
1170 "tracked_engines": tracked_engines,
1171 "engine_stats": engine_stats,
1172 "total_engines_tracked": tracked_engines,
1173 "healthy_engines": len(
1174 [
1175 s
1176 for s in engine_stats
1177 if s["status"] == "healthy"
1178 ]
1179 ),
1180 "degraded_engines": len(
1181 [
1182 s
1183 for s in engine_stats
1184 if s["status"] == "degraded"
1185 ]
1186 ),
1187 "poor_engines": len(
1188 [s for s in engine_stats if s["status"] == "poor"]
1189 ),
1190 },
1191 }
1193 logger.warning(
1194 f"DEBUG: Returning from _get_metrics_from_encrypted_db - total_researches: {result['total_researches']}"
1195 )
1196 return result
1197 except Exception as e:
1198 logger.exception(
1199 f"CRITICAL ERROR accessing encrypted database for metrics: {e}"
1200 )
1201 return self._get_empty_metrics()
1203 def _get_metrics_from_thread_db(
1204 self, period: str, research_mode: str
1205 ) -> Dict[str, Any]:
1206 """Get metrics from thread-safe metrics database."""
1207 if not self.thread_metrics_db:  # coverage: 1207 ↛ 1208 (condition never true)
1208 return {
1209 "total_tokens": 0,
1210 "total_researches": 0,
1211 "by_model": [],
1212 "recent_researches": [],
1213 "token_breakdown": {
1214 "total_input_tokens": 0,
1215 "total_output_tokens": 0,
1216 "avg_input_tokens": 0,
1217 "avg_output_tokens": 0,
1218 "avg_total_tokens": 0,
1219 },
1220 }
1222 try:
1223 with self.thread_metrics_db.get_session() as session:
1224 # Build base query with filters
1225 query = session.query(TokenUsage)
1227 # Apply time filter
1228 time_condition = get_time_filter_condition(
1229 period, TokenUsage.timestamp
1230 )
1231 if time_condition is not None:
1232 query = query.filter(time_condition)
1234 # Apply research mode filter
1235 mode_condition = get_research_mode_condition(
1236 research_mode, TokenUsage.research_mode
1237 )
1238 if mode_condition is not None:
1239 query = query.filter(mode_condition)
1241 # Get totals
1242 total_tokens = (
1243 query.with_entities(
1244 func.sum(TokenUsage.total_tokens)
1245 ).scalar()
1246 or 0
1247 )
1248 total_researches = (
1249 query.with_entities(
1250 func.count(func.distinct(TokenUsage.research_id))
1251 ).scalar()
1252 or 0
1253 )
1255 # Get model statistics
1256 model_stats = (
1257 query.with_entities(
1258 TokenUsage.model_name,
1259 func.sum(TokenUsage.total_tokens).label("tokens"),
1260 func.count().label("calls"),
1261 func.sum(TokenUsage.prompt_tokens).label(
1262 "prompt_tokens"
1263 ),
1264 func.sum(TokenUsage.completion_tokens).label(
1265 "completion_tokens"
1266 ),
1267 )
1268 .filter(TokenUsage.model_name.isnot(None))
1269 .group_by(TokenUsage.model_name)
1270 .all()
1271 )
1273 by_model = []
1274 for stat in model_stats:
1275 by_model.append(
1276 {
1277 "model": stat.model_name,
1278 "provider": "unknown", # Provider info might not be in thread DB
1279 "tokens": stat.tokens,
1280 "calls": stat.calls,
1281 "prompt_tokens": stat.prompt_tokens,
1282 "completion_tokens": stat.completion_tokens,
1283 }
1284 )
1286 # Token breakdown
1287 breakdown = query.with_entities(
1288 func.sum(TokenUsage.prompt_tokens).label(
1289 "total_input_tokens"
1290 ),
1291 func.sum(TokenUsage.completion_tokens).label(
1292 "total_output_tokens"
1293 ),
1294 ).first()
1296 return {
1297 "total_tokens": total_tokens,
1298 "total_researches": total_researches,
1299 "by_model": by_model,
1300 "recent_researches": [], # Skip for thread DB
1301 "token_breakdown": {
1302 "total_input_tokens": int(
1303 breakdown.total_input_tokens or 0
1304 ),
1305 "total_output_tokens": int(
1306 breakdown.total_output_tokens or 0
1307 ),
1308 "avg_input_tokens": 0,
1309 "avg_output_tokens": 0,
1310 "avg_total_tokens": 0,
1311 },
1312 }
1313 except Exception:
1314 logger.exception("Error reading thread metrics database")
1315 return {
1316 "total_tokens": 0,
1317 "total_researches": 0,
1318 "by_model": [],
1319 "recent_researches": [],
1320 "token_breakdown": {
1321 "total_input_tokens": 0,
1322 "total_output_tokens": 0,
1323 "avg_input_tokens": 0,
1324 "avg_output_tokens": 0,
1325 "avg_total_tokens": 0,
1326 },
1327 }
1329 def _merge_metrics(
1330 self, encrypted: Dict[str, Any], thread: Dict[str, Any]
1331 ) -> Dict[str, Any]:
1332 """Merge metrics from both databases."""
1333 # Combine totals
1334 total_tokens = encrypted.get("total_tokens", 0) + thread.get(
1335 "total_tokens", 0
1336 )
1337 total_researches = max(
1338 encrypted.get("total_researches", 0),
1339 thread.get("total_researches", 0),
1340 )
1341 logger.warning(
1342 f"DEBUG: Merged metrics - encrypted researches: {encrypted.get('total_researches', 0)}, thread researches: {thread.get('total_researches', 0)}, final: {total_researches}"
1343 )
1345 # Merge model usage
1346 model_map = {}
1347 for model_data in encrypted.get("by_model", []):
1348 key = model_data["model"]
1349 model_map[key] = model_data
1351 for model_data in thread.get("by_model", []):
1352 key = model_data["model"]
1353 if key in model_map:  # coverage: 1353 ↛ 1362 (condition always true)
1354 # Merge with existing
1355 model_map[key]["tokens"] += model_data["tokens"]
1356 model_map[key]["calls"] += model_data["calls"]
1357 model_map[key]["prompt_tokens"] += model_data["prompt_tokens"]
1358 model_map[key]["completion_tokens"] += model_data[
1359 "completion_tokens"
1360 ]
1361 else:
1362 model_map[key] = model_data
1364 by_model = sorted(
1365 model_map.values(), key=lambda x: x["tokens"], reverse=True
1366 )
1368 # Merge token breakdown
1369 token_breakdown = {
1370 "total_input_tokens": (
1371 encrypted.get("token_breakdown", {}).get(
1372 "total_input_tokens", 0
1373 )
1374 + thread.get("token_breakdown", {}).get("total_input_tokens", 0)
1375 ),
1376 "total_output_tokens": (
1377 encrypted.get("token_breakdown", {}).get(
1378 "total_output_tokens", 0
1379 )
1380 + thread.get("token_breakdown", {}).get(
1381 "total_output_tokens", 0
1382 )
1383 ),
1384 "avg_input_tokens": encrypted.get("token_breakdown", {}).get(
1385 "avg_input_tokens", 0
1386 ),
1387 "avg_output_tokens": encrypted.get("token_breakdown", {}).get(
1388 "avg_output_tokens", 0
1389 ),
1390 "avg_total_tokens": encrypted.get("token_breakdown", {}).get(
1391 "avg_total_tokens", 0
1392 ),
1393 }
1395 result = {
1396 "total_tokens": total_tokens,
1397 "total_researches": total_researches,
1398 "by_model": by_model,
1399 "recent_researches": encrypted.get("recent_researches", []),
1400 "token_breakdown": token_breakdown,
1401 }
1403 logger.warning(
1404 f"DEBUG: Final get_token_metrics result - total_researches: {result['total_researches']}"
1405 )
1406 return result
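# --- Example (editor's sketch, not part of the original module) --------------
# Worked example of _merge_metrics with invented inputs: token totals are
# summed, research counts take the maximum (the same research may appear in
# both databases), and per-model rows with the same model name are combined.
#
#     encrypted = {"total_tokens": 1000, "total_researches": 2,
#                  "by_model": [{"model": "llama3", "tokens": 1000, "calls": 4,
#                                "prompt_tokens": 800, "completion_tokens": 200}]}
#     thread    = {"total_tokens": 500, "total_researches": 1,
#                  "by_model": [{"model": "llama3", "tokens": 500, "calls": 2,
#                                "prompt_tokens": 400, "completion_tokens": 100}]}
#
#     merged = TokenCounter()._merge_metrics(encrypted, thread)
#     merged["total_tokens"]           # 1500
#     merged["total_researches"]       # 2 (max, not 2 + 1)
#     merged["by_model"][0]["tokens"]  # 1500 (llama3 rows combined)
# -----------------------------------------------------------------------------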
1408 def _get_empty_metrics(self) -> Dict[str, Any]:
1409 """Return empty metrics structure when no data is available."""
1410 return {
1411 "total_tokens": 0,
1412 "total_researches": 0,
1413 "by_model": [],
1414 "recent_researches": [],
1415 "token_breakdown": {
1416 "prompt_tokens": 0,
1417 "completion_tokens": 0,
1418 "avg_prompt_tokens": 0,
1419 "avg_completion_tokens": 0,
1420 "avg_total_tokens": 0,
1421 },
1422 }
1424 def get_enhanced_metrics(
1425 self, period: str = "30d", research_mode: str = "all"
1426 ) -> Dict[str, Any]:
1427 """Get enhanced Phase 1 tracking metrics.
1429 Args:
1430 period: Time period to filter by ('7d', '30d', '3m', '1y', 'all')
1431 research_mode: Research mode to filter by ('quick', 'detailed', 'all')
1433 Returns:
1434 Dictionary containing enhanced metrics data including time series
1435 """
1436 from flask import session as flask_session
1438 from ..database.session_context import get_user_db_session
1440 username = flask_session.get("username")
1441 if not username:  # coverage: 1441 ↛ 1443 (condition never true)
1442 # Return empty metrics structure when no user session
1443 return {
1444 "recent_enhanced_data": [],
1445 "performance_stats": {
1446 "avg_response_time": 0,
1447 "min_response_time": 0,
1448 "max_response_time": 0,
1449 "success_rate": 0,
1450 "error_rate": 0,
1451 "total_enhanced_calls": 0,
1452 },
1453 "mode_breakdown": [],
1454 "search_engine_stats": [],
1455 "phase_breakdown": [],
1456 "time_series_data": [],
1457 "call_stack_analysis": {
1458 "by_file": [],
1459 "by_function": [],
1460 },
1461 }
1463 try:
1464 with get_user_db_session(username) as session:
1465 # Build base query with filters
1466 query = session.query(TokenUsage)
1468 # Apply time filter
1469 time_condition = get_time_filter_condition(
1470 period, TokenUsage.timestamp
1471 )
1472 if time_condition is not None:  # coverage: 1472 ↛ 1476 (condition always true)
1473 query = query.filter(time_condition)
1475 # Apply research mode filter
1476 mode_condition = get_research_mode_condition(
1477 research_mode, TokenUsage.research_mode
1478 )
1479 if mode_condition is not None:  # coverage: 1479 ↛ 1480 (condition never true)
1480 query = query.filter(mode_condition)
1482 # Get time series data for the chart - most important for "Token Consumption Over Time"
1483 time_series_query = query.filter(
1484 TokenUsage.timestamp.isnot(None),
1485 TokenUsage.total_tokens > 0,
1486 ).order_by(TokenUsage.timestamp.asc())
1488 # Limit to recent data for performance
1489 if period != "all": 1489 ↛ 1492line 1489 didn't jump to line 1492 because the condition on line 1489 was always true
1490 time_series_query = time_series_query.limit(200)
1492 time_series_data = time_series_query.all()
1494 # Format time series data with cumulative calculations
1495 time_series = []
1496 cumulative_tokens = 0
1497 cumulative_prompt_tokens = 0
1498 cumulative_completion_tokens = 0
1500 for usage in time_series_data:  # coverage: 1500 ↛ 1501 (loop never started)
1501 cumulative_tokens += usage.total_tokens or 0
1502 cumulative_prompt_tokens += usage.prompt_tokens or 0
1503 cumulative_completion_tokens += usage.completion_tokens or 0
1505 time_series.append(
1506 {
1507 "timestamp": str(usage.timestamp)
1508 if usage.timestamp
1509 else None,
1510 "tokens": usage.total_tokens or 0,
1511 "prompt_tokens": usage.prompt_tokens or 0,
1512 "completion_tokens": usage.completion_tokens or 0,
1513 "cumulative_tokens": cumulative_tokens,
1514 "cumulative_prompt_tokens": cumulative_prompt_tokens,
1515 "cumulative_completion_tokens": cumulative_completion_tokens,
1516 "research_id": usage.research_id,
1517 }
1518 )
1520 # Basic performance stats using ORM
1521 performance_query = query.filter(
1522 TokenUsage.response_time_ms.isnot(None)
1523 )
1524 total_calls = performance_query.count()
1526 if total_calls > 0:  # coverage: 1526 ↛ 1527 (condition never true)
1527 avg_response_time = (
1528 performance_query.with_entities(
1529 func.avg(TokenUsage.response_time_ms)
1530 ).scalar()
1531 or 0
1532 )
1533 min_response_time = (
1534 performance_query.with_entities(
1535 func.min(TokenUsage.response_time_ms)
1536 ).scalar()
1537 or 0
1538 )
1539 max_response_time = (
1540 performance_query.with_entities(
1541 func.max(TokenUsage.response_time_ms)
1542 ).scalar()
1543 or 0
1544 )
1545 success_count = performance_query.filter(
1546 TokenUsage.success_status == "success"
1547 ).count()
1548 error_count = performance_query.filter(
1549 TokenUsage.success_status == "error"
1550 ).count()
1552 perf_stats = {
1553 "avg_response_time": round(avg_response_time),
1554 "min_response_time": min_response_time,
1555 "max_response_time": max_response_time,
1556 "success_rate": (
1557 round((success_count / total_calls * 100), 1)
1558 if total_calls > 0
1559 else 0
1560 ),
1561 "error_rate": (
1562 round((error_count / total_calls * 100), 1)
1563 if total_calls > 0
1564 else 0
1565 ),
1566 "total_enhanced_calls": total_calls,
1567 }
1568 else:
1569 perf_stats = {
1570 "avg_response_time": 0,
1571 "min_response_time": 0,
1572 "max_response_time": 0,
1573 "success_rate": 0,
1574 "error_rate": 0,
1575 "total_enhanced_calls": 0,
1576 }
1578 # Research mode breakdown using ORM
1579 mode_stats = (
1580 query.filter(TokenUsage.research_mode.isnot(None))
1581 .with_entities(
1582 TokenUsage.research_mode,
1583 func.count().label("count"),
1584 func.avg(TokenUsage.total_tokens).label("avg_tokens"),
1585 func.avg(TokenUsage.response_time_ms).label(
1586 "avg_response_time"
1587 ),
1588 )
1589 .group_by(TokenUsage.research_mode)
1590 .all()
1591 )
1593 modes = [
1594 {
1595 "mode": stat.research_mode,
1596 "count": stat.count,
1597 "avg_tokens": round(stat.avg_tokens or 0),
1598 "avg_response_time": round(stat.avg_response_time or 0),
1599 }
1600 for stat in mode_stats
1601 ]
1603 # Recent enhanced data (simplified)
1604 recent_enhanced_query = (
1605 query.filter(TokenUsage.research_query.isnot(None))
1606 .order_by(TokenUsage.timestamp.desc())
1607 .limit(50)
1608 )
1610 recent_enhanced_data = recent_enhanced_query.all()
1611 recent_enhanced = [
1612 {
1613 "research_query": usage.research_query,
1614 "research_mode": usage.research_mode,
1615 "research_phase": usage.research_phase,
1616 "search_iteration": usage.search_iteration,
1617 "response_time_ms": usage.response_time_ms,
1618 "success_status": usage.success_status,
1619 "error_type": usage.error_type,
1620 "search_engines_planned": usage.search_engines_planned,
1621 "search_engine_selected": usage.search_engine_selected,
1622 "total_tokens": usage.total_tokens,
1623 "prompt_tokens": usage.prompt_tokens,
1624 "completion_tokens": usage.completion_tokens,
1625 "timestamp": str(usage.timestamp)
1626 if usage.timestamp
1627 else None,
1628 "research_id": usage.research_id,
1629 "calling_file": usage.calling_file,
1630 "calling_function": usage.calling_function,
1631 "call_stack": usage.call_stack,
1632 }
1633 for usage in recent_enhanced_data
1634 ]
1636 # Search engine breakdown using ORM
1637 search_engine_stats = (
1638 query.filter(TokenUsage.search_engine_selected.isnot(None))
1639 .with_entities(
1640 TokenUsage.search_engine_selected,
1641 func.count().label("count"),
1642 func.avg(TokenUsage.total_tokens).label("avg_tokens"),
1643 func.avg(TokenUsage.response_time_ms).label(
1644 "avg_response_time"
1645 ),
1646 )
1647 .group_by(TokenUsage.search_engine_selected)
1648 .all()
1649 )
1651 search_engines = [
1652 {
1653 "search_engine": stat.search_engine_selected,
1654 "count": stat.count,
1655 "avg_tokens": round(stat.avg_tokens or 0),
1656 "avg_response_time": round(stat.avg_response_time or 0),
1657 }
1658 for stat in search_engine_stats
1659 ]
1661 # Research phase breakdown using ORM
1662 phase_stats = (
1663 query.filter(TokenUsage.research_phase.isnot(None))
1664 .with_entities(
1665 TokenUsage.research_phase,
1666 func.count().label("count"),
1667 func.avg(TokenUsage.total_tokens).label("avg_tokens"),
1668 func.avg(TokenUsage.response_time_ms).label(
1669 "avg_response_time"
1670 ),
1671 )
1672 .group_by(TokenUsage.research_phase)
1673 .all()
1674 )
1676 phases = [
1677 {
1678 "phase": stat.research_phase,
1679 "count": stat.count,
1680 "avg_tokens": round(stat.avg_tokens or 0),
1681 "avg_response_time": round(stat.avg_response_time or 0),
1682 }
1683 for stat in phase_stats
1684 ]
1686 # Call stack analysis using ORM
1687 file_stats = (
1688 query.filter(TokenUsage.calling_file.isnot(None))
1689 .with_entities(
1690 TokenUsage.calling_file,
1691 func.count().label("count"),
1692 func.avg(TokenUsage.total_tokens).label("avg_tokens"),
1693 )
1694 .group_by(TokenUsage.calling_file)
1695 .order_by(func.count().desc())
1696 .limit(10)
1697 .all()
1698 )
1700 files = [
1701 {
1702 "file": stat.calling_file,
1703 "count": stat.count,
1704 "avg_tokens": round(stat.avg_tokens or 0),
1705 }
1706 for stat in file_stats
1707 ]
1709 function_stats = (
1710 query.filter(TokenUsage.calling_function.isnot(None))
1711 .with_entities(
1712 TokenUsage.calling_function,
1713 func.count().label("count"),
1714 func.avg(TokenUsage.total_tokens).label("avg_tokens"),
1715 )
1716 .group_by(TokenUsage.calling_function)
1717 .order_by(func.count().desc())
1718 .limit(10)
1719 .all()
1720 )
1722 functions = [
1723 {
1724 "function": stat.calling_function,
1725 "count": stat.count,
1726 "avg_tokens": round(stat.avg_tokens or 0),
1727 }
1728 for stat in function_stats
1729 ]
1731 return {
1732 "recent_enhanced_data": recent_enhanced,
1733 "performance_stats": perf_stats,
1734 "mode_breakdown": modes,
1735 "search_engine_stats": search_engines,
1736 "phase_breakdown": phases,
1737 "time_series_data": time_series,
1738 "call_stack_analysis": {
1739 "by_file": files,
1740 "by_function": functions,
1741 },
1742 }
1743 except Exception:
1744 logger.exception("Error in get_enhanced_metrics")
1745 # Return simplified response without non-existent columns
1746 return {
1747 "recent_enhanced_data": [],
1748 "performance_stats": {
1749 "avg_response_time": 0,
1750 "min_response_time": 0,
1751 "max_response_time": 0,
1752 "success_rate": 0,
1753 "error_rate": 0,
1754 "total_enhanced_calls": 0,
1755 },
1756 "mode_breakdown": [],
1757 "search_engine_stats": [],
1758 "phase_breakdown": [],
1759 "time_series_data": [],
1760 "call_stack_analysis": {
1761 "by_file": [],
1762 "by_function": [],
1763 },
1764 }
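# --- Example (editor's sketch, not part of the original module) --------------
# The "time_series_data" returned above already carries running totals, so a
# caller (for example, the "Token Consumption Over Time" chart) can plot it
# without re-aggregating. A minimal consumer, assuming a logged-in Flask
# session and using invented variable names:
#
#     enhanced = TokenCounter().get_enhanced_metrics(period="30d")
#     points = [
#         (row["timestamp"], row["cumulative_tokens"])
#         for row in enhanced["time_series_data"]
#     ]
#     # points is an ordered list of (timestamp, running total) pairs
# -----------------------------------------------------------------------------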
1766 def get_research_timeline_metrics(self, research_id: str) -> Dict[str, Any]:
1767 """Get timeline metrics for a specific research.
1769 Args:
1770 research_id: The ID of the research
1772 Returns:
1773 Dictionary containing timeline metrics for the research
1774 """
1775 from flask import session as flask_session
1777 from ..database.session_context import get_user_db_session
1779 username = flask_session.get("username")
1780 if not username:
1781 return {
1782 "research_id": research_id,
1783 "research_details": {},
1784 "timeline": [],
1785 "summary": {
1786 "total_calls": 0,
1787 "total_tokens": 0,
1788 "total_prompt_tokens": 0,
1789 "total_completion_tokens": 0,
1790 "avg_response_time": 0,
1791 "success_rate": 0,
1792 },
1793 "phase_stats": {},
1794 }
1796 with get_user_db_session(username) as session:
1797 # Get all token usage for this research ordered by time including call stack
1798 timeline_data = session.execute(
1799 text(
1800 """
1801 SELECT
1802 timestamp,
1803 total_tokens,
1804 prompt_tokens,
1805 completion_tokens,
1806 response_time_ms,
1807 success_status,
1808 error_type,
1809 research_phase,
1810 search_iteration,
1811 search_engine_selected,
1812 model_name,
1813 calling_file,
1814 calling_function,
1815 call_stack
1816 FROM token_usage
1817 WHERE research_id = :research_id
1818 ORDER BY timestamp ASC
1819 """
1820 ),
1821 {"research_id": research_id},
1822 ).fetchall()
1824 # Format timeline data with cumulative tokens
1825 timeline = []
1826 cumulative_tokens = 0
1827 cumulative_prompt_tokens = 0
1828 cumulative_completion_tokens = 0
1830 for row in timeline_data:
1831 cumulative_tokens += row[1] or 0
1832 cumulative_prompt_tokens += row[2] or 0
1833 cumulative_completion_tokens += row[3] or 0
1835 timeline.append(
1836 {
1837 "timestamp": str(row[0]) if row[0] else None,
1838 "tokens": row[1] or 0,
1839 "prompt_tokens": row[2] or 0,
1840 "completion_tokens": row[3] or 0,
1841 "cumulative_tokens": cumulative_tokens,
1842 "cumulative_prompt_tokens": cumulative_prompt_tokens,
1843 "cumulative_completion_tokens": cumulative_completion_tokens,
1844 "response_time_ms": row[4],
1845 "success_status": row[5],
1846 "error_type": row[6],
1847 "research_phase": row[7],
1848 "search_iteration": row[8],
1849 "search_engine_selected": row[9],
1850 "model_name": row[10],
1851 "calling_file": row[11],
1852 "calling_function": row[12],
1853 "call_stack": row[13],
1854 }
1855 )
1857 # Get research basic info
1858 research_info = session.execute(
1859 text(
1860 """
1861 SELECT query, mode, status, created_at, completed_at
1862 FROM research_history
1863 WHERE id = :research_id
1864 """
1865 ),
1866 {"research_id": research_id},
1867 ).fetchone()
1869 research_details = {}
1870 if research_info:
1871 research_details = {
1872 "query": research_info[0],
1873 "mode": research_info[1],
1874 "status": research_info[2],
1875 "created_at": str(research_info[3])
1876 if research_info[3]
1877 else None,
1878 "completed_at": str(research_info[4])
1879 if research_info[4]
1880 else None,
1881 }
1883 # Calculate summary stats
1884 total_calls = len(timeline_data)
1885 total_tokens = cumulative_tokens
1886 avg_response_time = sum(row[4] or 0 for row in timeline_data) / max(
1887 total_calls, 1
1888 )
1889 success_rate = (
1890 sum(1 for row in timeline_data if row[5] == "success")
1891 / max(total_calls, 1)
1892 * 100
1893 )
1895 # Phase breakdown for this research
1896 phase_stats = {}
1897 for row in timeline_data:
1898 phase = row[7] or "unknown"
1899 if phase not in phase_stats:
1900 phase_stats[phase] = {
1901 "count": 0,
1902 "tokens": 0,
1903 "avg_response_time": 0,
1904 }
1905 phase_stats[phase]["count"] += 1
1906 phase_stats[phase]["tokens"] += row[1] or 0
1907 if row[4]:
1908 phase_stats[phase]["avg_response_time"] += row[4]
1910 # Calculate averages for phases
1911 for phase in phase_stats:
1912 if phase_stats[phase]["count"] > 0:
1913 phase_stats[phase]["avg_response_time"] = round(
1914 phase_stats[phase]["avg_response_time"]
1915 / phase_stats[phase]["count"]
1916 )
1918 return {
1919 "research_id": research_id,
1920 "research_details": research_details,
1921 "timeline": timeline,
1922 "summary": {
1923 "total_calls": total_calls,
1924 "total_tokens": total_tokens,
1925 "total_prompt_tokens": cumulative_prompt_tokens,
1926 "total_completion_tokens": cumulative_completion_tokens,
1927 "avg_response_time": round(avg_response_time),
1928 "success_rate": round(success_rate, 1),
1929 },
1930 "phase_stats": phase_stats,
1931 }
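# --- Example (editor's sketch, not part of the original module) --------------
# Sketch of reading the per-research timeline produced above. It requires a
# Flask request context with a logged-in user, since the method looks up the
# username from the Flask session; the research_id is a placeholder.
#
#     timeline = TokenCounter().get_research_timeline_metrics("research-123")
#     print(timeline["summary"]["total_tokens"])
#     for phase, stats in timeline["phase_stats"].items():
#         print(phase, stats["tokens"], stats["avg_response_time"], "ms")
# -----------------------------------------------------------------------------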