Coverage for src / local_deep_research / metrics / token_counter.py: 86%

465 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1"""Token counting functionality for LLM usage tracking.""" 

2 

3import inspect 

4import json 

5import time 

6from datetime import datetime, timedelta, UTC 

7from pathlib import Path 

8from typing import Any, Dict, List, Optional 

9 

10from langchain_core.callbacks import BaseCallbackHandler 

11from langchain_core.outputs import LLMResult 

12from loguru import logger 

13from sqlalchemy import func, text 

14 

15from ..database.models import ModelUsage, TokenUsage 

16from .query_utils import get_research_mode_condition, get_time_filter_condition 

17 

18 

19class TokenCountingCallback(BaseCallbackHandler): 

20 """Callback handler for counting tokens across different models.""" 

21 

22 def __init__( 

23 self, 

24 research_id: Optional[str] = None, 

25 research_context: Optional[Dict[str, Any]] = None, 

26 ): 

27 """Initialize the token counting callback. 

28 

29 Args: 

30 research_id: The ID of the research to track tokens for 

31 research_context: Additional research context for enhanced tracking 

32 """ 

33 super().__init__() 

34 self.research_id = research_id 

35 self.research_context = research_context or {} 

36 self.current_model = None 

37 self.current_provider = None 

38 self.preset_model = None # Model name set during callback creation 

39 self.preset_provider = None # Provider set during callback creation 

40 

41 # Phase 1 Enhancement: Track timing and context 

42 self.start_time = None 

43 self.response_time_ms = None 

44 self.success_status = "success" 

45 self.error_type = None 

46 

47 # Call stack tracking 

48 self.calling_file = None 

49 self.calling_function = None 

50 self.call_stack = None 

51 

52 # Context overflow tracking 

53 self.context_limit = None 

54 self.context_truncated = False 

55 self.tokens_truncated = 0 

56 self.truncation_ratio = 0.0 

57 self.original_prompt_estimate = 0 

58 

59 # Raw Ollama response metrics 

60 self.ollama_metrics = {} 

61 

62 # Track token counts in memory 

63 self.counts = { 

64 "total_tokens": 0, 

65 "total_prompt_tokens": 0, 

66 "total_completion_tokens": 0, 

67 "by_model": {}, 

68 } 

69 

70 def on_llm_start( 

71 self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any 

72 ) -> None: 

73 """Called when LLM starts running.""" 

74 # Phase 1 Enhancement: Start timing 

75 self.start_time = time.time() 

76 

77 # Estimate original prompt size (rough estimate: ~4 chars per token) 

78 if prompts: 

79 total_chars = sum(len(prompt) for prompt in prompts) 

80 self.original_prompt_estimate = total_chars // 4 

81 logger.debug( 

82 f"Estimated prompt tokens: {self.original_prompt_estimate} (from {total_chars} chars)" 

83 ) 

84 

85 # Get context limit from research context (will be set from settings) 

86 self.context_limit = self.research_context.get("context_limit") 

87 

88 # Phase 1 Enhancement: Capture call stack information 

89 try: 

90 stack = inspect.stack() 

91 

92 # Skip the first few frames (this method, langchain internals) 

93 # Look for the first frame that's in our project directory 

94 for frame_info in stack[1:]: 

95 file_path = frame_info.filename 

96 # Look for any frame containing local_deep_research project 

97 if ( 

98 "local_deep_research" in file_path 

99 and "site-packages" not in file_path 

100 and "venv" not in file_path 

101 ): 

102 # Extract relative path from local_deep_research 

103 if "src/local_deep_research" in file_path: 

104 relative_path = file_path.split( 

105 "src/local_deep_research" 

106 )[-1].lstrip("/") 

107 elif "local_deep_research/src" in file_path: 107 ↛ 111line 107 didn't jump to line 111 because the condition on line 107 was always true

108 relative_path = file_path.split( 

109 "local_deep_research/src" 

110 )[-1].lstrip("/") 

111 elif "local_deep_research" in file_path: 

112 # Get everything after local_deep_research 

113 relative_path = file_path.split("local_deep_research")[ 

114 -1 

115 ].lstrip("/") 

116 else: 

117 relative_path = Path(file_path).name 

118 

119 self.calling_file = relative_path 

120 self.calling_function = frame_info.function 

121 

122 # Capture a simplified call stack (just the relevant frames) 

123 call_stack_frames = [] 

124 for frame in stack[1:6]: # Limit to 5 frames 

125 if ( 

126 "local_deep_research" in frame.filename 

127 and "site-packages" not in frame.filename 

128 and "venv" not in frame.filename 

129 ): 

130 frame_name = f"{Path(frame.filename).name}:{frame.function}:{frame.lineno}" 

131 call_stack_frames.append(frame_name) 

132 

133 self.call_stack = ( 

134 " -> ".join(call_stack_frames) 

135 if call_stack_frames 

136 else None 

137 ) 

138 break 

139 except Exception: 

140 logger.warning("Error capturing call stack") 

141 # Continue without call stack info if there's an error 

142 

143 # First, use preset values if available 

144 if self.preset_model: 

145 self.current_model = self.preset_model 

146 else: 

147 # Try multiple locations for model name 

148 model_name = None 

149 

150 # First check invocation_params 

151 invocation_params = kwargs.get("invocation_params", {}) 

152 model_name = invocation_params.get( 

153 "model" 

154 ) or invocation_params.get("model_name") 

155 

156 # Check kwargs directly 

157 if not model_name: 

158 model_name = kwargs.get("model") or kwargs.get("model_name") 

159 

160 # Check serialized data 

161 if not model_name and "kwargs" in serialized: 

162 model_name = serialized["kwargs"].get("model") or serialized[ 

163 "kwargs" 

164 ].get("model_name") 

165 

166 # Check for name in serialized data 

167 if not model_name and "name" in serialized: 

168 model_name = serialized["name"] 

169 

170 # If still not found and we have Ollama, try to extract from the instance 

171 if ( 

172 not model_name 

173 and "_type" in serialized 

174 and "ChatOllama" in serialized["_type"] 

175 ): 

176 # For Ollama, the model name might be in the serialized kwargs 

177 if "kwargs" in serialized and "model" in serialized["kwargs"]: 177 ↛ 178line 177 didn't jump to line 178 because the condition on line 177 was never true

178 model_name = serialized["kwargs"]["model"] 

179 else: 

180 # Default to the type if we can't find the actual model 

181 model_name = "ollama" 

182 

183 # Final fallback 

184 if not model_name: 

185 if "_type" in serialized: 

186 model_name = serialized["_type"] 

187 else: 

188 model_name = "unknown" 

189 

190 self.current_model = model_name 

191 

192 # Use preset provider if available 

193 if self.preset_provider: 

194 self.current_provider = self.preset_provider 

195 else: 

196 # Extract provider from serialized type or kwargs 

197 if "_type" in serialized: 

198 type_str = serialized["_type"] 

199 if "ChatOllama" in type_str: 

200 self.current_provider = "ollama" 

201 elif "ChatOpenAI" in type_str: 

202 self.current_provider = "openai" 

203 elif "ChatAnthropic" in type_str: 

204 self.current_provider = "anthropic" 

205 else: 

206 self.current_provider = kwargs.get("provider", "unknown") 

207 else: 

208 self.current_provider = kwargs.get("provider", "unknown") 

209 

210 # Initialize model tracking if needed 

211 if self.current_model not in self.counts["by_model"]: 

212 self.counts["by_model"][self.current_model] = { 

213 "prompt_tokens": 0, 

214 "completion_tokens": 0, 

215 "total_tokens": 0, 

216 "calls": 0, 

217 "provider": self.current_provider, 

218 } 

219 

220 # Increment call count 

221 self.counts["by_model"][self.current_model]["calls"] += 1 

222 

    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        """Called when LLM ends running.

        Extracts token usage from the response — checking ``llm_output``
        first, then per-generation ``usage_metadata`` and Ollama-style
        ``response_metadata`` — updates the in-memory counters, performs
        context-overflow detection, and persists the usage to the database
        when a ``research_id`` is set.
        """
        # Phase 1 Enhancement: Calculate response time (ms since on_llm_start)
        if self.start_time:
            self.response_time_ms = int((time.time() - self.start_time) * 1000)

        # Extract token usage from response
        token_usage = None

        # Location 1: provider-reported usage in llm_output.
        # Note: the "usage" fallback may yield {}, which is falsy and will
        # cause the generation-metadata scan below to run.
        if hasattr(response, "llm_output") and response.llm_output:
            token_usage = response.llm_output.get(
                "token_usage"
            ) or response.llm_output.get("usage", {})

        # Location 2: usage metadata attached to generations (Ollama specific)
        if not token_usage and hasattr(response, "generations"):
            for generation_list in response.generations:
                for generation in generation_list:
                    if hasattr(generation, "message") and hasattr(
                        generation.message, "usage_metadata"
                    ):
                        usage_meta = generation.message.usage_metadata
                        if usage_meta:  # Check if usage_metadata is not None
                            # Normalize langchain's input/output naming to
                            # prompt/completion naming used everywhere else.
                            token_usage = {
                                "prompt_tokens": usage_meta.get(
                                    "input_tokens", 0
                                ),
                                "completion_tokens": usage_meta.get(
                                    "output_tokens", 0
                                ),
                                "total_tokens": usage_meta.get(
                                    "total_tokens", 0
                                ),
                            }
                            # Skips the response_metadata branch for this
                            # generation; usage_metadata takes precedence.
                            break
                    # Location 3: raw Ollama counters in response_metadata
                    if hasattr(generation, "message") and hasattr(
                        generation.message, "response_metadata"
                    ):
                        resp_meta = generation.message.response_metadata
                        if resp_meta.get("prompt_eval_count") or resp_meta.get(
                            "eval_count"
                        ):
                            # Capture raw Ollama metrics (counts + durations)
                            self.ollama_metrics = {
                                "prompt_eval_count": resp_meta.get(
                                    "prompt_eval_count"
                                ),
                                "eval_count": resp_meta.get("eval_count"),
                                "total_duration": resp_meta.get(
                                    "total_duration"
                                ),
                                "load_duration": resp_meta.get("load_duration"),
                                "prompt_eval_duration": resp_meta.get(
                                    "prompt_eval_duration"
                                ),
                                "eval_duration": resp_meta.get("eval_duration"),
                            }

                            # Check for context overflow: compare the tokens
                            # the model actually evaluated against the
                            # configured context window.
                            prompt_eval_count = resp_meta.get(
                                "prompt_eval_count", 0
                            )
                            if self.context_limit and prompt_eval_count > 0:
                                # Check if we're near or at the context limit
                                if (
                                    prompt_eval_count
                                    >= self.context_limit * 0.95
                                ):  # 95% threshold
                                    self.context_truncated = True

                                    # Estimate tokens truncated using the
                                    # char-based estimate from on_llm_start.
                                    if (
                                        self.original_prompt_estimate
                                        > prompt_eval_count
                                    ):
                                        self.tokens_truncated = max(
                                            0,
                                            self.original_prompt_estimate
                                            - prompt_eval_count,
                                        )
                                        self.truncation_ratio = (
                                            self.tokens_truncated
                                            / self.original_prompt_estimate
                                            if self.original_prompt_estimate > 0
                                            else 0
                                        )
                                        logger.warning(
                                            f"Context overflow detected! "
                                            f"Prompt tokens: {prompt_eval_count}/{self.context_limit} "
                                            f"(estimated {self.tokens_truncated} tokens truncated, "
                                            f"{self.truncation_ratio:.1%} of prompt)"
                                        )

                            # Build usage from the raw Ollama counters.
                            token_usage = {
                                "prompt_tokens": resp_meta.get(
                                    "prompt_eval_count", 0
                                ),
                                "completion_tokens": resp_meta.get(
                                    "eval_count", 0
                                ),
                                "total_tokens": resp_meta.get(
                                    "prompt_eval_count", 0
                                )
                                + resp_meta.get("eval_count", 0),
                            }
                            break
                # Stop scanning once any generation yielded usage data.
                if token_usage:
                    break

        if token_usage and isinstance(token_usage, dict):
            prompt_tokens = token_usage.get("prompt_tokens", 0)
            completion_tokens = token_usage.get("completion_tokens", 0)
            # Fall back to the sum when the provider omits total_tokens.
            total_tokens = token_usage.get(
                "total_tokens", prompt_tokens + completion_tokens
            )

            # Update in-memory counts
            self.counts["total_prompt_tokens"] += prompt_tokens
            self.counts["total_completion_tokens"] += completion_tokens
            self.counts["total_tokens"] += total_tokens

            # Per-model breakdown (entry created in on_llm_start).
            if self.current_model:
                self.counts["by_model"][self.current_model][
                    "prompt_tokens"
                ] += prompt_tokens
                self.counts["by_model"][self.current_model][
                    "completion_tokens"
                ] += completion_tokens
                self.counts["by_model"][self.current_model]["total_tokens"] += (
                    total_tokens
                )

            # Save to database if we have a research_id
            if self.research_id:
                self._save_to_db(prompt_tokens, completion_tokens)

360 

361 def on_llm_error(self, error, **kwargs: Any) -> None: 

362 """Called when LLM encounters an error.""" 

363 # Phase 1 Enhancement: Track errors 

364 if self.start_time: 

365 self.response_time_ms = int((time.time() - self.start_time) * 1000) 

366 

367 self.success_status = "error" 

368 self.error_type = str(type(error).__name__) 

369 

370 # Still save to database to track failed calls 

371 if self.research_id: 

372 self._save_to_db(0, 0) 

373 

374 def _get_context_overflow_fields(self) -> Dict[str, Any]: 

375 """Get context overflow detection fields for database saving.""" 

376 return { 

377 "context_limit": self.context_limit, 

378 "context_truncated": self.context_truncated, # Now Boolean 

379 "tokens_truncated": self.tokens_truncated 

380 if self.context_truncated 

381 else None, 

382 "truncation_ratio": self.truncation_ratio 

383 if self.context_truncated 

384 else None, 

385 # Raw Ollama metrics 

386 "ollama_prompt_eval_count": self.ollama_metrics.get( 

387 "prompt_eval_count" 

388 ), 

389 "ollama_eval_count": self.ollama_metrics.get("eval_count"), 

390 "ollama_total_duration": self.ollama_metrics.get("total_duration"), 

391 "ollama_load_duration": self.ollama_metrics.get("load_duration"), 

392 "ollama_prompt_eval_duration": self.ollama_metrics.get( 

393 "prompt_eval_duration" 

394 ), 

395 "ollama_eval_duration": self.ollama_metrics.get("eval_duration"), 

396 } 

397 

    def _save_to_db(
        self, prompt_tokens: int, completion_tokens: int
    ) -> None:
        """Save token usage to the database.

        Two paths exist:
        * Background thread: write through the thread-safe metrics writer,
          which requires a username and password from ``research_context``
          to open the user's encrypted database.
        * Main thread: resolve the user from the Flask session and write a
          ``TokenUsage`` row plus an aggregated ``ModelUsage`` row directly.

        All failures are logged and swallowed — metrics must never break an
        LLM call.

        Args:
            prompt_tokens: Prompt tokens counted for this call.
            completion_tokens: Completion tokens counted for this call.
        """
        # Flask session objects are not available off the main thread, so
        # detect where we are and pick the write path accordingly.
        import threading

        if threading.current_thread().name != "MainThread":
            # Use thread-safe metrics database for background threads
            username = (
                self.research_context.get("username")
                if self.research_context
                else None
            )

            if not username:
                logger.warning(
                    f"Cannot save token metrics - no username in research context. "
                    f"Token usage: prompt={prompt_tokens}, completion={completion_tokens}, "
                    f"Research context: {self.research_context}"
                )
                return

            # Flatten everything (identity, context, performance, call-site,
            # overflow fields) into one dict for the metrics writer.
            token_data = {
                "model_name": self.current_model,
                "provider": self.current_provider,
                "prompt_tokens": prompt_tokens,
                "completion_tokens": completion_tokens,
                "research_query": self.research_context.get("research_query"),
                "research_mode": self.research_context.get("research_mode"),
                "research_phase": self.research_context.get("research_phase"),
                "search_iteration": self.research_context.get(
                    "search_iteration"
                ),
                "response_time_ms": self.response_time_ms,
                "success_status": self.success_status,
                "error_type": self.error_type,
                "search_engines_planned": self.research_context.get(
                    "search_engines_planned"
                ),
                "search_engine_selected": self.research_context.get(
                    "search_engine_selected"
                ),
                "calling_file": self.calling_file,
                "calling_function": self.calling_function,
                "call_stack": self.call_stack,
                # Add context overflow fields using helper method
                **self._get_context_overflow_fields(),
            }

            # Lists are not directly storable; serialize to a JSON string.
            if isinstance(token_data.get("search_engines_planned"), list):
                token_data["search_engines_planned"] = json.dumps(
                    token_data["search_engines_planned"]
                )

            # Password is needed to open the user's encrypted database.
            password = self.research_context.get("user_password")
            if not password:
                logger.warning(
                    f"Cannot save token metrics - no password in research context. "
                    f"Username: {username}, Token usage: prompt={prompt_tokens}, completion={completion_tokens}"
                )
                return

            # Write metrics directly using the thread-safe database writer.
            try:
                from ..database.thread_metrics import metrics_writer

                # Set password for this thread
                metrics_writer.set_user_password(username, password)

                # Write metrics to encrypted database
                metrics_writer.write_token_metrics(
                    username, self.research_id, token_data
                )
            except Exception:
                logger.exception("Failed to write metrics from thread")
            return

        # In MainThread, save directly via the Flask session user.
        try:
            from flask import session as flask_session
            from ..database.session_context import get_user_db_session

            username = flask_session.get("username")
            if not username:
                logger.debug("No user session, skipping token metrics save")
                return

            with get_user_db_session(username) as session:
                # Phase 1 Enhancement: Prepare additional context
                research_query = self.research_context.get("research_query")
                research_mode = self.research_context.get("research_mode")
                research_phase = self.research_context.get("research_phase")
                search_iteration = self.research_context.get("search_iteration")
                search_engines_planned = self.research_context.get(
                    "search_engines_planned"
                )
                search_engine_selected = self.research_context.get(
                    "search_engine_selected"
                )

                # Debug logging for search engine context
                if search_engines_planned or search_engine_selected:
                    logger.info(
                        f"Token tracking - Search context: planned={search_engines_planned}, selected={search_engine_selected}, phase={research_phase}"
                    )
                else:
                    logger.debug(
                        f"Token tracking - No search engine context yet, phase={research_phase}"
                    )

                # Lists are not directly storable; serialize to JSON string.
                if isinstance(search_engines_planned, list):
                    search_engines_planned = json.dumps(search_engines_planned)

                # Log context overflow detection values before saving
                logger.debug(
                    f"Saving TokenUsage - context_limit: {self.context_limit}, "
                    f"context_truncated: {self.context_truncated}, "
                    f"tokens_truncated: {self.tokens_truncated}, "
                    f"ollama_prompt_eval_count: {self.ollama_metrics.get('prompt_eval_count')}, "
                    f"prompt_tokens: {prompt_tokens}, "
                    f"completion_tokens: {completion_tokens}"
                )

                # Add one TokenUsage row for this specific LLM call.
                token_usage = TokenUsage(
                    research_id=self.research_id,
                    model_name=self.current_model,
                    model_provider=self.current_provider,  # provider stored
                    # for accurate cost tracking
                    prompt_tokens=prompt_tokens,
                    completion_tokens=completion_tokens,
                    total_tokens=prompt_tokens + completion_tokens,
                    # Phase 1 Enhancement: Research context
                    research_query=research_query,
                    research_mode=research_mode,
                    research_phase=research_phase,
                    search_iteration=search_iteration,
                    # Phase 1 Enhancement: Performance metrics
                    response_time_ms=self.response_time_ms,
                    success_status=self.success_status,
                    error_type=self.error_type,
                    # Phase 1 Enhancement: Search engine context
                    search_engines_planned=search_engines_planned,
                    search_engine_selected=search_engine_selected,
                    # Phase 1 Enhancement: Call stack tracking
                    calling_file=self.calling_file,
                    calling_function=self.calling_function,
                    call_stack=self.call_stack,
                    # Add context overflow fields using helper method
                    **self._get_context_overflow_fields(),
                )
                session.add(token_usage)

                # Update (or create) the per-model aggregate statistics row.
                model_usage = (
                    session.query(ModelUsage)
                    .filter_by(
                        model_name=self.current_model,
                    )
                    .first()
                )

                if model_usage:
                    model_usage.total_tokens += (
                        prompt_tokens + completion_tokens
                    )
                    model_usage.total_calls += 1
                else:
                    model_usage = ModelUsage(
                        model_name=self.current_model,
                        model_provider=self.current_provider,
                        total_tokens=prompt_tokens + completion_tokens,
                        total_calls=1,
                    )
                    session.add(model_usage)

                # Commit the transaction
                session.commit()

        except Exception:
            logger.exception("Error saving token usage to database")

584 

585 def get_counts(self) -> Dict[str, Any]: 

586 """Get the current token counts.""" 

587 return self.counts 

588 

589 

590class TokenCounter: 

591 """Manager class for token counting across the application.""" 

592 

593 def __init__(self): 

594 """Initialize the token counter.""" 

595 

596 def create_callback( 

597 self, 

598 research_id: Optional[str] = None, 

599 research_context: Optional[Dict[str, Any]] = None, 

600 ) -> TokenCountingCallback: 

601 """Create a new token counting callback. 

602 

603 Args: 

604 research_id: The ID of the research to track tokens for 

605 research_context: Additional research context for enhanced tracking 

606 

607 Returns: 

608 A new TokenCountingCallback instance 

609 """ 

610 return TokenCountingCallback( 

611 research_id=research_id, research_context=research_context 

612 ) 

613 

614 def get_research_metrics(self, research_id: str) -> Dict[str, Any]: 

615 """Get token metrics for a specific research. 

616 

617 Args: 

618 research_id: The ID of the research 

619 

620 Returns: 

621 Dictionary containing token usage metrics 

622 """ 

623 from flask import session as flask_session 

624 

625 from ..database.session_context import get_user_db_session 

626 

627 username = flask_session.get("username") 

628 if not username: 

629 return { 

630 "research_id": research_id, 

631 "total_tokens": 0, 

632 "total_calls": 0, 

633 "model_usage": [], 

634 } 

635 

636 with get_user_db_session(username) as session: 

637 # Get token usage for this research from TokenUsage table 

638 from sqlalchemy import func 

639 

640 token_usages = ( 

641 session.query( 

642 TokenUsage.model_name, 

643 TokenUsage.model_provider, 

644 func.sum(TokenUsage.prompt_tokens).label("prompt_tokens"), 

645 func.sum(TokenUsage.completion_tokens).label( 

646 "completion_tokens" 

647 ), 

648 func.sum(TokenUsage.total_tokens).label("total_tokens"), 

649 func.count().label("calls"), 

650 ) 

651 .filter_by(research_id=research_id) 

652 .group_by(TokenUsage.model_name, TokenUsage.model_provider) 

653 .order_by(func.sum(TokenUsage.total_tokens).desc()) 

654 .all() 

655 ) 

656 

657 model_usage = [] 

658 total_tokens = 0 

659 total_calls = 0 

660 

661 for usage in token_usages: 

662 model_usage.append( 

663 { 

664 "model": usage.model_name, 

665 "provider": usage.model_provider, 

666 "tokens": usage.total_tokens or 0, 

667 "calls": usage.calls or 0, 

668 "prompt_tokens": usage.prompt_tokens or 0, 

669 "completion_tokens": usage.completion_tokens or 0, 

670 } 

671 ) 

672 total_tokens += usage.total_tokens or 0 

673 total_calls += usage.calls or 0 

674 

675 return { 

676 "research_id": research_id, 

677 "total_tokens": total_tokens, 

678 "total_calls": total_calls, 

679 "model_usage": model_usage, 

680 } 

681 

682 def get_overall_metrics( 

683 self, period: str = "30d", research_mode: str = "all" 

684 ) -> Dict[str, Any]: 

685 """Get overall token metrics across all researches. 

686 

687 Args: 

688 period: Time period to filter by ('7d', '30d', '3m', '1y', 'all') 

689 research_mode: Research mode to filter by ('quick', 'detailed', 'all') 

690 

691 Returns: 

692 Dictionary containing overall metrics 

693 """ 

694 return self._get_metrics_from_encrypted_db(period, research_mode) 

695 

696 def _get_metrics_from_encrypted_db( 

697 self, period: str, research_mode: str 

698 ) -> Dict[str, Any]: 

699 """Get metrics from user's encrypted database.""" 

700 from flask import session as flask_session 

701 

702 from ..database.session_context import get_user_db_session 

703 

704 username = flask_session.get("username") 

705 if not username: 

706 return self._get_empty_metrics() 

707 

708 try: 

709 with get_user_db_session(username) as session: 

710 # Build base query with filters 

711 query = session.query(TokenUsage) 

712 

713 # Apply time filter 

714 time_condition = get_time_filter_condition( 

715 period, TokenUsage.timestamp 

716 ) 

717 if time_condition is not None: 

718 query = query.filter(time_condition) 

719 

720 # Apply research mode filter 

721 mode_condition = get_research_mode_condition( 

722 research_mode, TokenUsage.research_mode 

723 ) 

724 if mode_condition is not None: 

725 query = query.filter(mode_condition) 

726 

727 # Total tokens from TokenUsage 

728 total_tokens = ( 

729 query.with_entities( 

730 func.sum(TokenUsage.total_tokens) 

731 ).scalar() 

732 or 0 

733 ) 

734 

735 # Import ResearchHistory model 

736 from ..database.models.research import ResearchHistory 

737 

738 # Count researches from ResearchHistory table 

739 research_query = session.query(func.count(ResearchHistory.id)) 

740 

741 # Debug: Check if any research history records exist at all 

742 all_research_count = ( 

743 session.query(func.count(ResearchHistory.id)).scalar() or 0 

744 ) 

745 logger.debug( 

746 f"Total ResearchHistory records in database: {all_research_count}" 

747 ) 

748 

749 # Debug: List first few research IDs and their timestamps 

750 sample_researches = ( 

751 session.query( 

752 ResearchHistory.id, 

753 ResearchHistory.created_at, 

754 ResearchHistory.mode, 

755 ) 

756 .limit(5) 

757 .all() 

758 ) 

759 if sample_researches: 759 ↛ 760line 759 didn't jump to line 760 because the condition on line 759 was never true

760 logger.debug("Sample ResearchHistory records:") 

761 for r_id, r_created, r_mode in sample_researches: 

762 logger.debug( 

763 f" - ID: {r_id}, Created: {r_created}, Mode: {r_mode}" 

764 ) 

765 else: 

766 logger.debug("No ResearchHistory records found in database") 

767 

768 # Get time filter conditions for ResearchHistory query 

769 start_time, end_time = None, None 

770 if period != "all": 

771 if period == "today": 771 ↛ 772line 771 didn't jump to line 772 because the condition on line 771 was never true

772 start_time = datetime.now(UTC).replace( 

773 hour=0, minute=0, second=0, microsecond=0 

774 ) 

775 elif period == "week": 775 ↛ 776line 775 didn't jump to line 776 because the condition on line 775 was never true

776 start_time = datetime.now(UTC) - timedelta(days=7) 

777 elif period == "month": 777 ↛ 778line 777 didn't jump to line 778 because the condition on line 777 was never true

778 start_time = datetime.now(UTC) - timedelta(days=30) 

779 

780 if start_time: 780 ↛ 781line 780 didn't jump to line 781 because the condition on line 780 was never true

781 end_time = datetime.now(UTC) 

782 

783 # Apply time filter if specified 

784 if start_time and end_time: 784 ↛ 785line 784 didn't jump to line 785 because the condition on line 784 was never true

785 research_query = research_query.filter( 

786 ResearchHistory.created_at >= start_time.isoformat(), 

787 ResearchHistory.created_at <= end_time.isoformat(), 

788 ) 

789 

790 # Apply mode filter if specified 

791 mode_filter = research_mode if research_mode != "all" else None 

792 if mode_filter: 

793 logger.debug(f"Applying mode filter: {mode_filter}") 

794 research_query = research_query.filter( 

795 ResearchHistory.mode == mode_filter 

796 ) 

797 

798 total_researches = research_query.scalar() or 0 

799 logger.debug( 

800 f"Final filtered research count: {total_researches}" 

801 ) 

802 

803 # Also check distinct research_ids in TokenUsage for comparison 

804 token_research_count = ( 

805 session.query( 

806 func.count(func.distinct(TokenUsage.research_id)) 

807 ).scalar() 

808 or 0 

809 ) 

810 logger.debug( 

811 f"Distinct research_ids in TokenUsage: {token_research_count}" 

812 ) 

813 

814 # Model statistics using ORM aggregation 

815 model_stats_query = session.query( 

816 TokenUsage.model_name, 

817 func.sum(TokenUsage.total_tokens).label("tokens"), 

818 func.count().label("calls"), 

819 func.sum(TokenUsage.prompt_tokens).label("prompt_tokens"), 

820 func.sum(TokenUsage.completion_tokens).label( 

821 "completion_tokens" 

822 ), 

823 ).filter(TokenUsage.model_name.isnot(None)) 

824 

825 # Apply same filters to model stats 

826 if time_condition is not None: 

827 model_stats_query = model_stats_query.filter(time_condition) 

828 if mode_condition is not None: 

829 model_stats_query = model_stats_query.filter(mode_condition) 

830 

831 model_stats = ( 

832 model_stats_query.group_by(TokenUsage.model_name) 

833 .order_by(func.sum(TokenUsage.total_tokens).desc()) 

834 .all() 

835 ) 

836 

837 # Batch load provider info from ModelUsage table (fix N+1) 

838 model_names = [stat.model_name for stat in model_stats] 

839 provider_map = {} 

840 if model_names: 840 ↛ 841line 840 didn't jump to line 841 because the condition on line 840 was never true

841 provider_results = ( 

842 session.query( 

843 ModelUsage.model_name, ModelUsage.model_provider 

844 ) 

845 .filter(ModelUsage.model_name.in_(model_names)) 

846 .order_by(ModelUsage.id) 

847 .all() 

848 ) 

849 for model_name, model_provider in provider_results: 

850 provider_map.setdefault(model_name, model_provider) 

851 

852 by_model = [] 

853 for stat in model_stats: 853 ↛ 854line 853 didn't jump to line 854 because the loop on line 853 never started

854 provider = provider_map.get(stat.model_name, "unknown") 

855 

856 by_model.append( 

857 { 

858 "model": stat.model_name, 

859 "provider": provider, 

860 "tokens": stat.tokens, 

861 "calls": stat.calls, 

862 "prompt_tokens": stat.prompt_tokens, 

863 "completion_tokens": stat.completion_tokens, 

864 } 

865 ) 

866 

867 # Get recent researches with token usage 

868 # Note: This requires research_history table - for now we'll use available data 

869 recent_research_query = session.query( 

870 TokenUsage.research_id, 

871 func.sum(TokenUsage.total_tokens).label("token_count"), 

872 func.max(TokenUsage.timestamp).label("latest_timestamp"), 

873 ).filter(TokenUsage.research_id.isnot(None)) 

874 

875 if time_condition is not None: 

876 recent_research_query = recent_research_query.filter( 

877 time_condition 

878 ) 

879 if mode_condition is not None: 

880 recent_research_query = recent_research_query.filter( 

881 mode_condition 

882 ) 

883 

884 recent_research_data = ( 

885 recent_research_query.group_by(TokenUsage.research_id) 

886 .order_by(func.max(TokenUsage.timestamp).desc()) 

887 .limit(10) 

888 .all() 

889 ) 

890 

891 # Batch load research queries for recent researches (fix N+1) 

892 recent_research_ids = [ 

893 r.research_id for r in recent_research_data 

894 ] 

895 research_query_map = {} 

896 if recent_research_ids: 896 ↛ 898line 896 didn't jump to line 898 because the condition on line 896 was never true

897 # Get first non-null research_query for each research_id 

898 query_results = ( 

899 session.query( 

900 TokenUsage.research_id, TokenUsage.research_query 

901 ) 

902 .filter( 

903 TokenUsage.research_id.in_(recent_research_ids), 

904 TokenUsage.research_query.isnot(None), 

905 ) 

906 .order_by(TokenUsage.id) 

907 .all() 

908 ) 

909 for research_id, research_query in query_results: 

910 if research_id not in research_query_map: 

911 research_query_map[research_id] = research_query 

912 

913 recent_researches = [] 

914 for research_data in recent_research_data: 914 ↛ 915line 914 didn't jump to line 915 because the loop on line 914 never started

915 query_text = research_query_map.get( 

916 research_data.research_id, 

917 f"Research {research_data.research_id}", 

918 ) 

919 

920 recent_researches.append( 

921 { 

922 "id": research_data.research_id, 

923 "query": query_text, 

924 "tokens": research_data.token_count or 0, 

925 "created_at": research_data.latest_timestamp, 

926 } 

927 ) 

928 

929 # Token breakdown statistics 

930 breakdown_query = query.with_entities( 

931 func.sum(TokenUsage.prompt_tokens).label( 

932 "total_input_tokens" 

933 ), 

934 func.sum(TokenUsage.completion_tokens).label( 

935 "total_output_tokens" 

936 ), 

937 func.avg(TokenUsage.prompt_tokens).label( 

938 "avg_input_tokens" 

939 ), 

940 func.avg(TokenUsage.completion_tokens).label( 

941 "avg_output_tokens" 

942 ), 

943 func.avg(TokenUsage.total_tokens).label("avg_total_tokens"), 

944 ) 

945 token_breakdown = breakdown_query.first() 

946 

947 # Get rate limiting metrics 

948 from ..database.models import ( 

949 RateLimitAttempt, 

950 RateLimitEstimate, 

951 ) 

952 

953 # Get rate limit attempts 

954 rate_limit_query = session.query(RateLimitAttempt) 

955 

956 # Apply time filter 

957 if time_condition is not None: 

958 # RateLimitAttempt uses timestamp as float, not datetime 

959 if period == "7d": 

960 cutoff_time = time.time() - (7 * 24 * 3600) 

961 elif period == "30d": 961 ↛ 963line 961 didn't jump to line 963 because the condition on line 961 was always true

962 cutoff_time = time.time() - (30 * 24 * 3600) 

963 elif period == "3m": 

964 cutoff_time = time.time() - (90 * 24 * 3600) 

965 elif period == "1y": 

966 cutoff_time = time.time() - (365 * 24 * 3600) 

967 else: # all 

968 cutoff_time = 0 

969 

970 if cutoff_time > 0: 970 ↛ 976line 970 didn't jump to line 976 because the condition on line 970 was always true

971 rate_limit_query = rate_limit_query.filter( 

972 RateLimitAttempt.timestamp >= cutoff_time 

973 ) 

974 

975 # Get rate limit statistics 

976 total_attempts = rate_limit_query.count() 

977 successful_attempts = rate_limit_query.filter( 

978 RateLimitAttempt.success 

979 ).count() 

980 failed_attempts = total_attempts - successful_attempts 

981 

982 # Count rate limiting events (failures with RateLimitError) 

983 rate_limit_events = rate_limit_query.filter( 

984 ~RateLimitAttempt.success, 

985 RateLimitAttempt.error_type == "RateLimitError", 

986 ).count() 

987 

988 logger.debug( 

989 f"Rate limit attempts in database: total={total_attempts}, successful={successful_attempts}" 

990 ) 

991 

992 # Get all attempts for detailed calculations 

993 attempts = rate_limit_query.all() 

994 

995 # Calculate average wait times 

996 if attempts: 996 ↛ 997line 996 didn't jump to line 997 because the condition on line 996 was never true

997 avg_wait_time = sum(a.wait_time for a in attempts) / len( 

998 attempts 

999 ) 

1000 successful_wait_times = [ 

1001 a.wait_time for a in attempts if a.success 

1002 ] 

1003 avg_successful_wait = ( 

1004 sum(successful_wait_times) / len(successful_wait_times) 

1005 if successful_wait_times 

1006 else 0 

1007 ) 

1008 else: 

1009 avg_wait_time = 0 

1010 avg_successful_wait = 0 

1011 

1012 # Get tracked engines - count distinct engine types from attempts 

1013 tracked_engines_query = session.query( 

1014 func.count(func.distinct(RateLimitAttempt.engine_type)) 

1015 ) 

1016 if cutoff_time > 0: 1016 ↛ 1020line 1016 didn't jump to line 1020 because the condition on line 1016 was always true

1017 tracked_engines_query = tracked_engines_query.filter( 

1018 RateLimitAttempt.timestamp >= cutoff_time 

1019 ) 

1020 tracked_engines = tracked_engines_query.scalar() or 0 

1021 

1022 # Get engine-specific stats from attempts 

1023 engine_stats = [] 

1024 

1025 # Get distinct engine types from attempts 

1026 engine_types_query = session.query( 

1027 RateLimitAttempt.engine_type 

1028 ).distinct() 

1029 if cutoff_time > 0: 1029 ↛ 1033line 1029 didn't jump to line 1033 because the condition on line 1029 was always true

1030 engine_types_query = engine_types_query.filter( 

1031 RateLimitAttempt.timestamp >= cutoff_time 

1032 ) 

1033 engine_types = [ 

1034 row.engine_type for row in engine_types_query.all() 

1035 ] 

1036 

1037 # Batch-load all estimates (fix N+1 query) 

1038 estimates_map = {} 

1039 if engine_types: 1039 ↛ 1040line 1039 didn't jump to line 1040 because the condition on line 1039 was never true

1040 all_estimates = ( 

1041 session.query(RateLimitEstimate) 

1042 .filter(RateLimitEstimate.engine_type.in_(engine_types)) 

1043 .all() 

1044 ) 

1045 estimates_map = {e.engine_type: e for e in all_estimates} 

1046 

1047 for engine_type in engine_types: 1047 ↛ 1048line 1047 didn't jump to line 1048 because the loop on line 1047 never started

1048 engine_attempts_list = [ 

1049 a for a in attempts if a.engine_type == engine_type 

1050 ] 

1051 engine_attempts = len(engine_attempts_list) 

1052 engine_success = len( 

1053 [a for a in engine_attempts_list if a.success] 

1054 ) 

1055 

1056 # Get estimate if exists 

1057 estimate = estimates_map.get(engine_type) 

1058 

1059 # Calculate recent success rate 

1060 recent_success_rate = ( 

1061 (engine_success / engine_attempts * 100) 

1062 if engine_attempts > 0 

1063 else 0 

1064 ) 

1065 

1066 # Determine status based on success rate 

1067 if estimate: 

1068 status = ( 

1069 "healthy" 

1070 if estimate.success_rate > 0.8 

1071 else "degraded" 

1072 if estimate.success_rate > 0.5 

1073 else "poor" 

1074 ) 

1075 else: 

1076 status = ( 

1077 "healthy" 

1078 if recent_success_rate > 80 

1079 else "degraded" 

1080 if recent_success_rate > 50 

1081 else "poor" 

1082 ) 

1083 

1084 engine_stat = { 

1085 "engine": engine_type, 

1086 "base_wait": estimate.base_wait_seconds 

1087 if estimate 

1088 else 0.0, 

1089 "base_wait_seconds": round( 

1090 estimate.base_wait_seconds if estimate else 0.0, 2 

1091 ), 

1092 "min_wait_seconds": round( 

1093 estimate.min_wait_seconds if estimate else 0.0, 2 

1094 ), 

1095 "max_wait_seconds": round( 

1096 estimate.max_wait_seconds if estimate else 0.0, 2 

1097 ), 

1098 "success_rate": round(estimate.success_rate * 100, 1) 

1099 if estimate 

1100 else recent_success_rate, 

1101 "total_attempts": estimate.total_attempts 

1102 if estimate 

1103 else engine_attempts, 

1104 "recent_attempts": engine_attempts, 

1105 "recent_success_rate": round(recent_success_rate, 1), 

1106 "attempts": engine_attempts, 

1107 "status": status, 

1108 } 

1109 

1110 if estimate: 

1111 engine_stat["last_updated"] = datetime.fromtimestamp( 

1112 estimate.last_updated 

1113 ).strftime("%Y-%m-%d %H:%M:%S") 

1114 else: 

1115 engine_stat["last_updated"] = "Never" 

1116 

1117 engine_stats.append(engine_stat) 

1118 

1119 logger.debug( 

1120 f"Tracked engines: {tracked_engines}, engine_stats: {engine_stats}" 

1121 ) 

1122 

1123 result = { 

1124 "total_tokens": total_tokens, 

1125 "total_researches": total_researches, 

1126 "by_model": by_model, 

1127 "recent_researches": recent_researches, 

1128 "token_breakdown": { 

1129 "total_input_tokens": int( 

1130 token_breakdown.total_input_tokens or 0 

1131 ), 

1132 "total_output_tokens": int( 

1133 token_breakdown.total_output_tokens or 0 

1134 ), 

1135 "avg_input_tokens": int( 

1136 token_breakdown.avg_input_tokens or 0 

1137 ), 

1138 "avg_output_tokens": int( 

1139 token_breakdown.avg_output_tokens or 0 

1140 ), 

1141 "avg_total_tokens": int( 

1142 token_breakdown.avg_total_tokens or 0 

1143 ), 

1144 }, 

1145 "rate_limiting": { 

1146 "total_attempts": total_attempts, 

1147 "successful_attempts": successful_attempts, 

1148 "failed_attempts": failed_attempts, 

1149 "success_rate": ( 

1150 successful_attempts / total_attempts * 100 

1151 ) 

1152 if total_attempts > 0 

1153 else 0, 

1154 "rate_limit_events": rate_limit_events, 

1155 "avg_wait_time": round(float(avg_wait_time), 2), 

1156 "avg_successful_wait": round( 

1157 float(avg_successful_wait), 2 

1158 ), 

1159 "tracked_engines": tracked_engines, 

1160 "engine_stats": engine_stats, 

1161 "total_engines_tracked": tracked_engines, 

1162 "healthy_engines": len( 

1163 [ 

1164 s 

1165 for s in engine_stats 

1166 if s["status"] == "healthy" 

1167 ] 

1168 ), 

1169 "degraded_engines": len( 

1170 [ 

1171 s 

1172 for s in engine_stats 

1173 if s["status"] == "degraded" 

1174 ] 

1175 ), 

1176 "poor_engines": len( 

1177 [s for s in engine_stats if s["status"] == "poor"] 

1178 ), 

1179 }, 

1180 } 

1181 

1182 logger.debug( 

1183 f"Returning from _get_metrics_from_encrypted_db - total_researches: {result['total_researches']}" 

1184 ) 

1185 return result 

1186 except Exception: 

1187 logger.exception( 

1188 "CRITICAL ERROR accessing encrypted database for metrics" 

1189 ) 

1190 return self._get_empty_metrics() 

1191 

def _get_empty_metrics(self) -> Dict[str, Any]:
    """Return a zeroed metrics structure when no data is available.

    The shape mirrors the successful result of the encrypted-db metrics
    query (including the ``rate_limiting`` section) so callers can index
    the same keys on both the success and the fallback path.  The
    previous version used mismatched ``token_breakdown`` key names
    (``prompt_tokens`` instead of ``total_input_tokens`` etc.) and
    omitted ``rate_limiting`` entirely, which made the error path
    KeyError for consumers written against the success shape.
    """
    return {
        "total_tokens": 0,
        "total_researches": 0,
        "by_model": [],
        "recent_researches": [],
        "token_breakdown": {
            "total_input_tokens": 0,
            "total_output_tokens": 0,
            "avg_input_tokens": 0,
            "avg_output_tokens": 0,
            "avg_total_tokens": 0,
        },
        "rate_limiting": {
            "total_attempts": 0,
            "successful_attempts": 0,
            "failed_attempts": 0,
            "success_rate": 0,
            "rate_limit_events": 0,
            "avg_wait_time": 0.0,
            "avg_successful_wait": 0.0,
            "tracked_engines": 0,
            "engine_stats": [],
            "total_engines_tracked": 0,
            "healthy_engines": 0,
            "degraded_engines": 0,
            "poor_engines": 0,
        },
    }

1207 

def get_enhanced_metrics(
    self, period: str = "30d", research_mode: str = "all"
) -> Dict[str, Any]:
    """Get enhanced Phase 1 tracking metrics.

    Args:
        period: Time period to filter by ('7d', '30d', '3m', '1y', 'all')
        research_mode: Research mode to filter by ('quick', 'detailed', 'all')

    Returns:
        Dictionary with time-series data, performance stats, per-mode /
        per-engine / per-phase breakdowns and call-stack analysis.  A
        zeroed structure of the same shape is returned when there is no
        user session or the query fails.
    """
    from flask import session as flask_session

    from ..database.session_context import get_user_db_session

    def _empty_payload() -> Dict[str, Any]:
        # Single definition of the zeroed fallback shape (previously
        # duplicated verbatim in the no-session and exception paths).
        return {
            "recent_enhanced_data": [],
            "performance_stats": {
                "avg_response_time": 0,
                "min_response_time": 0,
                "max_response_time": 0,
                "success_rate": 0,
                "error_rate": 0,
                "total_enhanced_calls": 0,
            },
            "mode_breakdown": [],
            "search_engine_stats": [],
            "phase_breakdown": [],
            "time_series_data": [],
            "call_stack_analysis": {
                "by_file": [],
                "by_function": [],
            },
        }

    username = flask_session.get("username")
    if not username:
        # No user session - nothing to report.
        return _empty_payload()

    try:
        with get_user_db_session(username) as session:
            # Base query with time and research-mode filters applied.
            query = session.query(TokenUsage)

            time_condition = get_time_filter_condition(
                period, TokenUsage.timestamp
            )
            if time_condition is not None:
                query = query.filter(time_condition)

            mode_condition = get_research_mode_condition(
                research_mode, TokenUsage.research_mode
            )
            if mode_condition is not None:
                query = query.filter(mode_condition)

            # Time series for the "Token Consumption Over Time" chart.
            time_series_query = query.filter(
                TokenUsage.timestamp.isnot(None),
                TokenUsage.total_tokens > 0,
            ).order_by(TokenUsage.timestamp.asc())

            # Limit to recent data for performance.
            if period != "all":
                time_series_query = time_series_query.limit(200)

            time_series = []
            cumulative_tokens = 0
            cumulative_prompt_tokens = 0
            cumulative_completion_tokens = 0

            for usage in time_series_query.all():
                cumulative_tokens += usage.total_tokens or 0
                cumulative_prompt_tokens += usage.prompt_tokens or 0
                cumulative_completion_tokens += usage.completion_tokens or 0

                time_series.append(
                    {
                        "timestamp": str(usage.timestamp)
                        if usage.timestamp
                        else None,
                        "tokens": usage.total_tokens or 0,
                        "prompt_tokens": usage.prompt_tokens or 0,
                        "completion_tokens": usage.completion_tokens or 0,
                        "cumulative_tokens": cumulative_tokens,
                        "cumulative_prompt_tokens": cumulative_prompt_tokens,
                        "cumulative_completion_tokens": cumulative_completion_tokens,
                        "research_id": usage.research_id,
                    }
                )

            # Performance stats over calls that recorded a response time.
            performance_query = query.filter(
                TokenUsage.response_time_ms.isnot(None)
            )
            total_calls = performance_query.count()

            if total_calls > 0:
                # One round trip for avg/min/max (previously three
                # separate scalar queries against the same filter).
                avg_rt, min_rt, max_rt = performance_query.with_entities(
                    func.avg(TokenUsage.response_time_ms),
                    func.min(TokenUsage.response_time_ms),
                    func.max(TokenUsage.response_time_ms),
                ).one()
                success_count = performance_query.filter(
                    TokenUsage.success_status == "success"
                ).count()
                error_count = performance_query.filter(
                    TokenUsage.success_status == "error"
                ).count()

                perf_stats = {
                    "avg_response_time": round(avg_rt or 0),
                    "min_response_time": min_rt or 0,
                    "max_response_time": max_rt or 0,
                    "success_rate": round(
                        success_count / total_calls * 100, 1
                    ),
                    "error_rate": round(error_count / total_calls * 100, 1),
                    "total_enhanced_calls": total_calls,
                }
            else:
                perf_stats = _empty_payload()["performance_stats"]

            def _breakdown(column, key_name):
                # Shared count/avg-token/avg-response aggregation used by
                # the mode, search-engine and phase breakdowns.
                stats = (
                    query.filter(column.isnot(None))
                    .with_entities(
                        column,
                        func.count().label("count"),
                        func.avg(TokenUsage.total_tokens).label(
                            "avg_tokens"
                        ),
                        func.avg(TokenUsage.response_time_ms).label(
                            "avg_response_time"
                        ),
                    )
                    .group_by(column)
                    .all()
                )
                return [
                    {
                        key_name: getattr(stat, column.key),
                        "count": stat.count,
                        "avg_tokens": round(stat.avg_tokens or 0),
                        "avg_response_time": round(
                            stat.avg_response_time or 0
                        ),
                    }
                    for stat in stats
                ]

            modes = _breakdown(TokenUsage.research_mode, "mode")
            search_engines = _breakdown(
                TokenUsage.search_engine_selected, "search_engine"
            )
            phases = _breakdown(TokenUsage.research_phase, "phase")

            # Most recent calls that carried research context.
            recent_enhanced = [
                {
                    "research_query": usage.research_query,
                    "research_mode": usage.research_mode,
                    "research_phase": usage.research_phase,
                    "search_iteration": usage.search_iteration,
                    "response_time_ms": usage.response_time_ms,
                    "success_status": usage.success_status,
                    "error_type": usage.error_type,
                    "search_engines_planned": usage.search_engines_planned,
                    "search_engine_selected": usage.search_engine_selected,
                    "total_tokens": usage.total_tokens,
                    "prompt_tokens": usage.prompt_tokens,
                    "completion_tokens": usage.completion_tokens,
                    "timestamp": str(usage.timestamp)
                    if usage.timestamp
                    else None,
                    "research_id": usage.research_id,
                    "calling_file": usage.calling_file,
                    "calling_function": usage.calling_function,
                    "call_stack": usage.call_stack,
                }
                for usage in (
                    query.filter(TokenUsage.research_query.isnot(None))
                    .order_by(TokenUsage.timestamp.desc())
                    .limit(50)
                    .all()
                )
            ]

            def _top_callers(column, key_name):
                # Top-10 call sites by call count with average token use.
                stats = (
                    query.filter(column.isnot(None))
                    .with_entities(
                        column,
                        func.count().label("count"),
                        func.avg(TokenUsage.total_tokens).label(
                            "avg_tokens"
                        ),
                    )
                    .group_by(column)
                    .order_by(func.count().desc())
                    .limit(10)
                    .all()
                )
                return [
                    {
                        key_name: getattr(stat, column.key),
                        "count": stat.count,
                        "avg_tokens": round(stat.avg_tokens or 0),
                    }
                    for stat in stats
                ]

            return {
                "recent_enhanced_data": recent_enhanced,
                "performance_stats": perf_stats,
                "mode_breakdown": modes,
                "search_engine_stats": search_engines,
                "phase_breakdown": phases,
                "time_series_data": time_series,
                "call_stack_analysis": {
                    "by_file": _top_callers(
                        TokenUsage.calling_file, "file"
                    ),
                    "by_function": _top_callers(
                        TokenUsage.calling_function, "function"
                    ),
                },
            }
    except Exception:
        logger.exception("Error in get_enhanced_metrics")
        return _empty_payload()

1549 

def get_research_timeline_metrics(self, research_id: str) -> Dict[str, Any]:
    """Get timeline metrics for a specific research.

    Args:
        research_id: The ID of the research

    Returns:
        Dictionary containing timeline metrics for the research
    """
    from flask import session as flask_session

    from ..database.session_context import get_user_db_session

    username = flask_session.get("username")
    if not username:
        # No user session - return an all-zero structure of the same shape.
        return {
            "research_id": research_id,
            "research_details": {},
            "timeline": [],
            "summary": {
                "total_calls": 0,
                "total_tokens": 0,
                "total_prompt_tokens": 0,
                "total_completion_tokens": 0,
                "avg_response_time": 0,
                "success_rate": 0,
            },
            "phase_stats": {},
        }

    with get_user_db_session(username) as session:
        # Fetch every token-usage row for this research, oldest first,
        # including the call-stack columns used by the timeline view.
        rows = session.execute(
            text(
                """
                SELECT
                    timestamp,
                    total_tokens,
                    prompt_tokens,
                    completion_tokens,
                    response_time_ms,
                    success_status,
                    error_type,
                    research_phase,
                    search_iteration,
                    search_engine_selected,
                    model_name,
                    calling_file,
                    calling_function,
                    call_stack
                FROM token_usage
                WHERE research_id = :research_id
                ORDER BY timestamp ASC
                """
            ),
            {"research_id": research_id},
        ).fetchall()

        # Build the timeline, tracking running token totals as we go.
        timeline = []
        running_total = 0
        running_prompt = 0
        running_completion = 0

        for (
            ts,
            total_toks,
            prompt_toks,
            completion_toks,
            resp_ms,
            status,
            err_type,
            phase,
            iteration,
            engine,
            model,
            src_file,
            src_func,
            stack,
        ) in rows:
            running_total += total_toks or 0
            running_prompt += prompt_toks or 0
            running_completion += completion_toks or 0

            timeline.append(
                {
                    "timestamp": str(ts) if ts else None,
                    "tokens": total_toks or 0,
                    "prompt_tokens": prompt_toks or 0,
                    "completion_tokens": completion_toks or 0,
                    "cumulative_tokens": running_total,
                    "cumulative_prompt_tokens": running_prompt,
                    "cumulative_completion_tokens": running_completion,
                    "response_time_ms": resp_ms,
                    "success_status": status,
                    "error_type": err_type,
                    "research_phase": phase,
                    "search_iteration": iteration,
                    "search_engine_selected": engine,
                    "model_name": model,
                    "calling_file": src_file,
                    "calling_function": src_func,
                    "call_stack": stack,
                }
            )

        # Basic details about the research itself.
        info_row = session.execute(
            text(
                """
                SELECT query, mode, status, created_at, completed_at
                FROM research_history
                WHERE id = :research_id
                """
            ),
            {"research_id": research_id},
        ).fetchone()

        research_details = {}
        if info_row:
            query_text, mode, status, created_at, completed_at = info_row
            research_details = {
                "query": query_text,
                "mode": mode,
                "status": status,
                "created_at": str(created_at) if created_at else None,
                "completed_at": str(completed_at) if completed_at else None,
            }

        # Summary statistics; guard against division by zero on empty data.
        call_count = len(timeline)
        denominator = max(call_count, 1)
        avg_response_time = (
            sum(entry["response_time_ms"] or 0 for entry in timeline)
            / denominator
        )
        success_rate = (
            sum(
                1
                for entry in timeline
                if entry["success_status"] == "success"
            )
            / denominator
            * 100
        )

        # Per-phase breakdown for this research.
        phase_stats = {}
        for entry in timeline:
            phase_key = entry["research_phase"] or "unknown"
            bucket = phase_stats.setdefault(
                phase_key,
                {"count": 0, "tokens": 0, "avg_response_time": 0},
            )
            bucket["count"] += 1
            bucket["tokens"] += entry["tokens"]
            # Only rows that reported a response time contribute to the
            # sum; the average below still divides by the full phase
            # count, matching the original accumulation.
            if entry["response_time_ms"]:
                bucket["avg_response_time"] += entry["response_time_ms"]

        for bucket in phase_stats.values():
            if bucket["count"] > 0:
                bucket["avg_response_time"] = round(
                    bucket["avg_response_time"] / bucket["count"]
                )

        return {
            "research_id": research_id,
            "research_details": research_details,
            "timeline": timeline,
            "summary": {
                "total_calls": call_count,
                "total_tokens": running_total,
                "total_prompt_tokens": running_prompt,
                "total_completion_tokens": running_completion,
                "avg_response_time": round(avg_response_time),
                "success_rate": round(success_rate, 1),
            },
            "phase_stats": phase_stats,
        }