Coverage for src/local_deep_research/metrics/token_counter.py: 59%

506 statements  

coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1"""Token counting functionality for LLM usage tracking.""" 

2 

3import inspect 

4import json 

5import time 

6from datetime import datetime, timedelta, UTC 

7from pathlib import Path 

8from typing import Any, Dict, List, Optional 

9 

10from langchain_core.callbacks import BaseCallbackHandler 

11from langchain_core.outputs import LLMResult 

12from loguru import logger 

13from sqlalchemy import func, text 

14 

15from ..database.models import ModelUsage, TokenUsage 

16from .query_utils import get_research_mode_condition, get_time_filter_condition 

17 

18 

19class TokenCountingCallback(BaseCallbackHandler): 

20 """Callback handler for counting tokens across different models.""" 

21 

22 def __init__( 

23 self, 

24 research_id: Optional[str] = None, 

25 research_context: Optional[Dict[str, Any]] = None, 

26 ): 

27 """Initialize the token counting callback. 

28 

29 Args: 

30 research_id: The ID of the research to track tokens for 

31 research_context: Additional research context for enhanced tracking 

32 """ 

33 super().__init__() 

34 self.research_id = research_id 

35 self.research_context = research_context or {} 

36 self.current_model = None 

37 self.current_provider = None 

38 self.preset_model = None # Model name set during callback creation 

39 self.preset_provider = None # Provider set during callback creation 

40 

41 # Phase 1 Enhancement: Track timing and context 

42 self.start_time = None 

43 self.response_time_ms = None 

44 self.success_status = "success" 

45 self.error_type = None 

46 

47 # Call stack tracking 

48 self.calling_file = None 

49 self.calling_function = None 

50 self.call_stack = None 

51 

52 # Context overflow tracking 

53 self.context_limit = None 

54 self.context_truncated = False 

55 self.tokens_truncated = 0 

56 self.truncation_ratio = 0.0 

57 self.original_prompt_estimate = 0 

58 

59 # Raw Ollama response metrics 

60 self.ollama_metrics = {} 

61 

62 # Track token counts in memory 

63 self.counts = { 

64 "total_tokens": 0, 

65 "total_prompt_tokens": 0, 

66 "total_completion_tokens": 0, 

67 "by_model": {}, 

68 } 

69 

70 def on_llm_start( 

71 self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any 

72 ) -> None: 

73 """Called when LLM starts running.""" 

74 # Phase 1 Enhancement: Start timing 

75 self.start_time = time.time() 

76 

77 # Estimate original prompt size (rough estimate: ~4 chars per token) 

78 if prompts:  # 78 ↛ 86: condition was always true

79 total_chars = sum(len(prompt) for prompt in prompts) 

80 self.original_prompt_estimate = total_chars // 4 

81 logger.debug( 

82 f"Estimated prompt tokens: {self.original_prompt_estimate} (from {total_chars} chars)" 

83 ) 

84 

85 # Get context limit from research context (will be set from settings) 

86 self.context_limit = self.research_context.get("context_limit") 

87 

88 # Phase 1 Enhancement: Capture call stack information 

89 try: 

90 stack = inspect.stack() 

91 

92 # Skip the first few frames (this method, langchain internals) 

93 # Look for the first frame that's in our project directory 

94 for frame_info in stack[1:]: 

95 file_path = frame_info.filename 

96 # Look for any frame containing local_deep_research project 

97 if (  # 97 ↛ 103: condition was never true

98 "local_deep_research" in file_path 

99 and "site-packages" not in file_path 

100 and "venv" not in file_path 

101 ): 

102 # Extract relative path from local_deep_research 

103 if "src/local_deep_research" in file_path: 

104 relative_path = file_path.split( 

105 "src/local_deep_research" 

106 )[-1].lstrip("/") 

107 elif "local_deep_research/src" in file_path: 

108 relative_path = file_path.split( 

109 "local_deep_research/src" 

110 )[-1].lstrip("/") 

111 elif "local_deep_research" in file_path: 

112 # Get everything after local_deep_research 

113 relative_path = file_path.split("local_deep_research")[ 

114 -1 

115 ].lstrip("/") 

116 else: 

117 relative_path = Path(file_path).name 

118 

119 self.calling_file = relative_path 

120 self.calling_function = frame_info.function 

121 

122 # Capture a simplified call stack (just the relevant frames) 

123 call_stack_frames = [] 

124 for frame in stack[1:6]: # Limit to 5 frames 

125 if ( 

126 "local_deep_research" in frame.filename 

127 and "site-packages" not in frame.filename 

128 and "venv" not in frame.filename 

129 ): 

130 frame_name = f"{Path(frame.filename).name}:{frame.function}:{frame.lineno}" 

131 call_stack_frames.append(frame_name) 

132 

133 self.call_stack = ( 

134 " -> ".join(call_stack_frames) 

135 if call_stack_frames 

136 else None 

137 ) 

138 break 

139 except Exception as e: 

140 logger.debug(f"Error capturing call stack: {e}") 

141 # Continue without call stack info if there's an error 

142 

143 # Debug logging removed to reduce log clutter 

144 # Uncomment below if you need to debug token counting 

145 # logger.debug(f"on_llm_start serialized: {serialized}") 

146 # logger.debug(f"on_llm_start kwargs keys: {list(kwargs.keys()) if kwargs else []}") 

147 

148 # First, use preset values if available 

149 if self.preset_model: 

150 self.current_model = self.preset_model 

151 else: 

152 # Try multiple locations for model name 

153 model_name = None 

154 

155 # First check invocation_params 

156 invocation_params = kwargs.get("invocation_params", {}) 

157 model_name = invocation_params.get( 

158 "model" 

159 ) or invocation_params.get("model_name") 

160 

161 # Check kwargs directly 

162 if not model_name: 

163 model_name = kwargs.get("model") or kwargs.get("model_name") 

164 

165 # Check serialized data 

166 if not model_name and "kwargs" in serialized: 

167 model_name = serialized["kwargs"].get("model") or serialized[ 

168 "kwargs" 

169 ].get("model_name") 

170 

171 # Check for name in serialized data 

172 if not model_name and "name" in serialized:  # 172 ↛ 173: condition was never true

173 model_name = serialized["name"] 

174 

175 # If still not found and we have Ollama, try to extract from the instance 

176 if ( 

177 not model_name 

178 and "_type" in serialized 

179 and "ChatOllama" in serialized["_type"] 

180 ): 

181 # For Ollama, the model name might be in the serialized kwargs 

182 if "kwargs" in serialized and "model" in serialized["kwargs"]:  # 182 ↛ 183: condition was never true

183 model_name = serialized["kwargs"]["model"] 

184 else: 

185 # Default to the type if we can't find the actual model 

186 model_name = "ollama" 

187 

188 # Final fallback 

189 if not model_name: 

190 if "_type" in serialized: 

191 model_name = serialized["_type"] 

192 else: 

193 model_name = "unknown" 

194 

195 self.current_model = model_name 

196 

197 # Use preset provider if available 

198 if self.preset_provider: 

199 self.current_provider = self.preset_provider 

200 else: 

201 # Extract provider from serialized type or kwargs 

202 if "_type" in serialized: 

203 type_str = serialized["_type"] 

204 if "ChatOllama" in type_str: 

205 self.current_provider = "ollama" 

206 elif "ChatOpenAI" in type_str: 

207 self.current_provider = "openai" 

208 elif "ChatAnthropic" in type_str:  # 208 ↛ 211: condition was always true

209 self.current_provider = "anthropic" 

210 else: 

211 self.current_provider = kwargs.get("provider", "unknown") 

212 else: 

213 self.current_provider = kwargs.get("provider", "unknown") 

214 

215 # Initialize model tracking if needed 

216 if self.current_model not in self.counts["by_model"]: 

217 self.counts["by_model"][self.current_model] = { 

218 "prompt_tokens": 0, 

219 "completion_tokens": 0, 

220 "total_tokens": 0, 

221 "calls": 0, 

222 "provider": self.current_provider, 

223 } 

224 

225 # Increment call count 

226 self.counts["by_model"][self.current_model]["calls"] += 1 

227 

228 def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None: 

229 """Called when LLM ends running.""" 

230 # Phase 1 Enhancement: Calculate response time 

231 if self.start_time: 

232 self.response_time_ms = int((time.time() - self.start_time) * 1000) 

233 

234 # Extract token usage from response 

235 token_usage = None 

236 

237 # Check multiple locations for token usage 

238 if hasattr(response, "llm_output") and response.llm_output: 

239 token_usage = response.llm_output.get( 

240 "token_usage" 

241 ) or response.llm_output.get("usage", {}) 

242 

243 # Check for usage metadata in generations (Ollama specific) 

244 if not token_usage and hasattr(response, "generations"): 

245 for generation_list in response.generations:  # 245 ↛ 339: loop didn't complete

246 for generation in generation_list:  # 246 ↛ 336: loop didn't complete

247 if hasattr(generation, "message") and hasattr(  # 247 ↛ 265: condition was always true

248 generation.message, "usage_metadata" 

249 ): 

250 usage_meta = generation.message.usage_metadata 

251 if usage_meta: # Check if usage_metadata is not None 

252 token_usage = { 

253 "prompt_tokens": usage_meta.get( 

254 "input_tokens", 0 

255 ), 

256 "completion_tokens": usage_meta.get( 

257 "output_tokens", 0 

258 ), 

259 "total_tokens": usage_meta.get( 

260 "total_tokens", 0 

261 ), 

262 } 

263 break 

264 # Also check response_metadata 

265 if hasattr(generation, "message") and hasattr(  # 265 ↛ 246: condition was always true

266 generation.message, "response_metadata" 

267 ): 

268 resp_meta = generation.message.response_metadata 

269 if resp_meta.get("prompt_eval_count") or resp_meta.get(  # 269 ↛ 246: condition was always true

270 "eval_count" 

271 ): 

272 # Capture raw Ollama metrics 

273 self.ollama_metrics = { 

274 "prompt_eval_count": resp_meta.get( 

275 "prompt_eval_count" 

276 ), 

277 "eval_count": resp_meta.get("eval_count"), 

278 "total_duration": resp_meta.get( 

279 "total_duration" 

280 ), 

281 "load_duration": resp_meta.get("load_duration"), 

282 "prompt_eval_duration": resp_meta.get( 

283 "prompt_eval_duration" 

284 ), 

285 "eval_duration": resp_meta.get("eval_duration"), 

286 } 

287 

288 # Check for context overflow 

289 prompt_eval_count = resp_meta.get( 

290 "prompt_eval_count", 0 

291 ) 

292 if self.context_limit and prompt_eval_count > 0: 

293 # Check if we're near or at the context limit 

294 if ( 

295 prompt_eval_count 

296 >= self.context_limit * 0.95 

297 ): # 95% threshold 

298 self.context_truncated = True 

299 

300 # Estimate tokens truncated 

301 if (  # 301 ↛ 323: condition was always true

302 self.original_prompt_estimate 

303 > prompt_eval_count 

304 ): 

305 self.tokens_truncated = max( 

306 0, 

307 self.original_prompt_estimate 

308 - prompt_eval_count, 

309 ) 

310 self.truncation_ratio = ( 

311 self.tokens_truncated 

312 / self.original_prompt_estimate 

313 if self.original_prompt_estimate > 0 

314 else 0 

315 ) 

316 logger.warning( 

317 f"Context overflow detected! " 

318 f"Prompt tokens: {prompt_eval_count}/{self.context_limit} " 

319 f"(estimated {self.tokens_truncated} tokens truncated, " 

320 f"{self.truncation_ratio:.1%} of prompt)" 

321 ) 

322 

323 token_usage = { 

324 "prompt_tokens": resp_meta.get( 

325 "prompt_eval_count", 0 

326 ), 

327 "completion_tokens": resp_meta.get( 

328 "eval_count", 0 

329 ), 

330 "total_tokens": resp_meta.get( 

331 "prompt_eval_count", 0 

332 ) 

333 + resp_meta.get("eval_count", 0), 

334 } 

335 break 

336 if token_usage:  # 336 ↛ 245: condition was always true

337 break 

338 

339 if token_usage and isinstance(token_usage, dict):  # 339 ↛ exit: condition was always true

340 prompt_tokens = token_usage.get("prompt_tokens", 0) 

341 completion_tokens = token_usage.get("completion_tokens", 0) 

342 total_tokens = token_usage.get( 

343 "total_tokens", prompt_tokens + completion_tokens 

344 ) 

345 

346 # Update in-memory counts 

347 self.counts["total_prompt_tokens"] += prompt_tokens 

348 self.counts["total_completion_tokens"] += completion_tokens 

349 self.counts["total_tokens"] += total_tokens 

350 

351 if self.current_model: 

352 self.counts["by_model"][self.current_model][ 

353 "prompt_tokens" 

354 ] += prompt_tokens 

355 self.counts["by_model"][self.current_model][ 

356 "completion_tokens" 

357 ] += completion_tokens 

358 self.counts["by_model"][self.current_model]["total_tokens"] += ( 

359 total_tokens 

360 ) 

361 

362 # Save to database if we have a research_id 

363 if self.research_id: 

364 self._save_to_db(prompt_tokens, completion_tokens) 

365 
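For illustration, a minimal sketch of the overflow arithmetic in on_llm_end above, using made-up numbers (a 4096-token context limit and an ~8000-token prompt estimate); the helper name is hypothetical and not part of the measured file:

def estimate_truncation(original_prompt_estimate: int, prompt_eval_count: int, context_limit: int):
    """Mirror the 95%-of-context check used in on_llm_end."""
    if context_limit and prompt_eval_count >= context_limit * 0.95:
        tokens_truncated = max(0, original_prompt_estimate - prompt_eval_count)
        ratio = tokens_truncated / original_prompt_estimate if original_prompt_estimate else 0.0
        return True, tokens_truncated, ratio
    return False, 0, 0.0

# estimate_truncation(8000, 4096, 4096) -> (True, 3904, 0.488): roughly half the prompt was dropped.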

366 def on_llm_error(self, error, **kwargs: Any) -> None: 

367 """Called when LLM encounters an error.""" 

368 # Phase 1 Enhancement: Track errors 

369 if self.start_time:  # 369 ↛ 372: condition was always true

370 self.response_time_ms = int((time.time() - self.start_time) * 1000) 

371 

372 self.success_status = "error" 

373 self.error_type = str(type(error).__name__) 

374 

375 # Still save to database to track failed calls 

376 if self.research_id:  # 376 ↛ 377: condition was never true

377 self._save_to_db(0, 0) 

378 

379 def _get_context_overflow_fields(self) -> Dict[str, Any]: 

380 """Get context overflow detection fields for database saving.""" 

381 return { 

382 "context_limit": self.context_limit, 

383 "context_truncated": self.context_truncated, # Now Boolean 

384 "tokens_truncated": self.tokens_truncated 

385 if self.context_truncated 

386 else None, 

387 "truncation_ratio": self.truncation_ratio 

388 if self.context_truncated 

389 else None, 

390 # Raw Ollama metrics 

391 "ollama_prompt_eval_count": self.ollama_metrics.get( 

392 "prompt_eval_count" 

393 ), 

394 "ollama_eval_count": self.ollama_metrics.get("eval_count"), 

395 "ollama_total_duration": self.ollama_metrics.get("total_duration"), 

396 "ollama_load_duration": self.ollama_metrics.get("load_duration"), 

397 "ollama_prompt_eval_duration": self.ollama_metrics.get( 

398 "prompt_eval_duration" 

399 ), 

400 "ollama_eval_duration": self.ollama_metrics.get("eval_duration"), 

401 } 

402 

403 def _save_to_db(self, prompt_tokens: int, completion_tokens: int): 

404 """Save token usage to the database.""" 

405 # Check if we're in a thread - if so, queue the save for later 

406 import threading 

407 

408 if threading.current_thread().name != "MainThread": 

409 # Use thread-safe metrics database for background threads 

410 username = ( 

411 self.research_context.get("username") 

412 if self.research_context 

413 else None 

414 ) 

415 

416 if not username: 

417 logger.warning( 

418 f"Cannot save token metrics - no username in research context. " 

419 f"Token usage: prompt={prompt_tokens}, completion={completion_tokens}, " 

420 f"Research context: {self.research_context}" 

421 ) 

422 return 

423 

424 # Import the thread-safe metrics database 

425 

426 # Prepare token data 

427 token_data = { 

428 "model_name": self.current_model, 

429 "provider": self.current_provider, 

430 "prompt_tokens": prompt_tokens, 

431 "completion_tokens": completion_tokens, 

432 "research_query": self.research_context.get("research_query"), 

433 "research_mode": self.research_context.get("research_mode"), 

434 "research_phase": self.research_context.get("research_phase"), 

435 "search_iteration": self.research_context.get( 

436 "search_iteration" 

437 ), 

438 "response_time_ms": self.response_time_ms, 

439 "success_status": self.success_status, 

440 "error_type": self.error_type, 

441 "search_engines_planned": self.research_context.get( 

442 "search_engines_planned" 

443 ), 

444 "search_engine_selected": self.research_context.get( 

445 "search_engine_selected" 

446 ), 

447 "calling_file": self.calling_file, 

448 "calling_function": self.calling_function, 

449 "call_stack": self.call_stack, 

450 # Add context overflow fields using helper method 

451 **self._get_context_overflow_fields(), 

452 } 

453 

454 # Convert list to JSON string if needed 

455 if isinstance(token_data.get("search_engines_planned"), list):  # 455 ↛ 456: condition was never true

456 token_data["search_engines_planned"] = json.dumps( 

457 token_data["search_engines_planned"] 

458 ) 

459 

460 # Get password from research context 

461 password = self.research_context.get("user_password") 

462 if not password:  # 462 ↛ 463: condition was never true

463 logger.warning( 

464 f"Cannot save token metrics - no password in research context. " 

465 f"Username: {username}, Token usage: prompt={prompt_tokens}, completion={completion_tokens}" 

466 ) 

467 return 

468 

469 # Write metrics directly using thread-safe database 

470 try: 

471 from ..database.thread_metrics import metrics_writer 

472 

473 # Set password for this thread 

474 metrics_writer.set_user_password(username, password) 

475 

476 # Write metrics to encrypted database 

477 metrics_writer.write_token_metrics( 

478 username, self.research_id, token_data 

479 ) 

480 except Exception: 

481 logger.exception("Failed to write metrics from thread") 

482 return 

483 

484 # In MainThread, save directly 

485 try: 

486 from flask import session as flask_session 

487 from ..database.session_context import get_user_db_session 

488 

489 username = flask_session.get("username") 

490 if not username: 

491 logger.debug("No user session, skipping token metrics save") 

492 return 

493 

494 with get_user_db_session(username) as session: 

495 # Phase 1 Enhancement: Prepare additional context 

496 research_query = self.research_context.get("research_query") 

497 research_mode = self.research_context.get("research_mode") 

498 research_phase = self.research_context.get("research_phase") 

499 search_iteration = self.research_context.get("search_iteration") 

500 search_engines_planned = self.research_context.get( 

501 "search_engines_planned" 

502 ) 

503 search_engine_selected = self.research_context.get( 

504 "search_engine_selected" 

505 ) 

506 

507 # Debug logging for search engine context 

508 if search_engines_planned or search_engine_selected: 

509 logger.info( 

510 f"Token tracking - Search context: planned={search_engines_planned}, selected={search_engine_selected}, phase={research_phase}" 

511 ) 

512 else: 

513 logger.debug( 

514 f"Token tracking - No search engine context yet, phase={research_phase}" 

515 ) 

516 

517 # Convert list to JSON string if needed 

518 if isinstance(search_engines_planned, list): 

519 search_engines_planned = json.dumps(search_engines_planned) 

520 

521 # Log context overflow detection values before saving 

522 logger.debug( 

523 f"Saving TokenUsage - context_limit: {self.context_limit}, " 

524 f"context_truncated: {self.context_truncated}, " 

525 f"tokens_truncated: {self.tokens_truncated}, " 

526 f"ollama_prompt_eval_count: {self.ollama_metrics.get('prompt_eval_count')}, " 

527 f"prompt_tokens: {prompt_tokens}, " 

528 f"completion_tokens: {completion_tokens}" 

529 ) 

530 

531 # Add token usage record with enhanced fields 

532 token_usage = TokenUsage( 

533 research_id=self.research_id, 

534 model_name=self.current_model, 

535 model_provider=self.current_provider, # Added provider 

536 # for accurate cost tracking 

537 prompt_tokens=prompt_tokens, 

538 completion_tokens=completion_tokens, 

539 total_tokens=prompt_tokens + completion_tokens, 

540 # Phase 1 Enhancement: Research context 

541 research_query=research_query, 

542 research_mode=research_mode, 

543 research_phase=research_phase, 

544 search_iteration=search_iteration, 

545 # Phase 1 Enhancement: Performance metrics 

546 response_time_ms=self.response_time_ms, 

547 success_status=self.success_status, 

548 error_type=self.error_type, 

549 # Phase 1 Enhancement: Search engine context 

550 search_engines_planned=search_engines_planned, 

551 search_engine_selected=search_engine_selected, 

552 # Phase 1 Enhancement: Call stack tracking 

553 calling_file=self.calling_file, 

554 calling_function=self.calling_function, 

555 call_stack=self.call_stack, 

556 # Add context overflow fields using helper method 

557 **self._get_context_overflow_fields(), 

558 ) 

559 session.add(token_usage) 

560 

561 # Update or create model usage statistics 

562 model_usage = ( 

563 session.query(ModelUsage) 

564 .filter_by( 

565 model_name=self.current_model, 

566 ) 

567 .first() 

568 ) 

569 

570 if model_usage: 

571 model_usage.total_tokens += ( 

572 prompt_tokens + completion_tokens 

573 ) 

574 model_usage.total_calls += 1 

575 else: 

576 model_usage = ModelUsage( 

577 model_name=self.current_model, 

578 model_provider=self.current_provider, 

579 total_tokens=prompt_tokens + completion_tokens, 

580 total_calls=1, 

581 ) 

582 session.add(model_usage) 

583 

584 # Commit the transaction 

585 session.commit() 

586 

587 except Exception: 

588 logger.exception("Error saving token usage to database") 

589 

590 def get_counts(self) -> Dict[str, Any]: 

591 """Get the current token counts.""" 

592 return self.counts 

593 

594 
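For reference, a minimal usage sketch (not part of the measured file): the callback is attached to a LangChain model invocation via the callbacks config. The model name, research_id, and context values below are placeholders, and langchain-ollama is only an assumed provider package; any LangChain chat model works the same way.

from langchain_ollama import ChatOllama  # assumed provider package

callback = TokenCountingCallback(
    research_id="demo-research-1",
    research_context={"research_query": "example query", "context_limit": 4096},
)
llm = ChatOllama(model="llama3")
llm.invoke("Summarize the benefits of unit tests.", config={"callbacks": [callback]})
print(callback.get_counts()["total_tokens"])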

595class TokenCounter: 

596 """Manager class for token counting across the application.""" 

597 

598 def __init__(self): 

599 """Initialize the token counter.""" 

600 # No longer need to store database reference 

601 self._thread_metrics_db = None 

602 

603 @property 

604 def thread_metrics_db(self): 

605 """Lazy load thread metrics writer.""" 

606 if self._thread_metrics_db is None: 

607 try: 

608 from ..database.thread_metrics import metrics_writer 

609 

610 self._thread_metrics_db = metrics_writer 

611 except ImportError: 

612 logger.warning("Thread metrics writer not available") 

613 return self._thread_metrics_db 

614 

615 def create_callback( 

616 self, 

617 research_id: Optional[str] = None, 

618 research_context: Optional[Dict[str, Any]] = None, 

619 ) -> TokenCountingCallback: 

620 """Create a new token counting callback. 

621 

622 Args: 

623 research_id: The ID of the research to track tokens for 

624 research_context: Additional research context for enhanced tracking 

625 

626 Returns: 

627 A new TokenCountingCallback instance 

628 """ 

629 return TokenCountingCallback( 

630 research_id=research_id, research_context=research_context 

631 ) 

632 
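The callback only reads research_context keys it knows about. A sketch of a context dict covering the fields referenced in this module (all values are placeholders):

counter = TokenCounter()
callback = counter.create_callback(
    research_id="demo-research-1",
    research_context={
        "research_query": "history of transformers",  # stored with each TokenUsage row
        "research_mode": "quick",
        "research_phase": "search",
        "search_iteration": 1,
        "context_limit": 4096,              # enables context-overflow detection
        "username": "alice",                # required when saving from a background thread
        "user_password": "example-password",  # required to open the encrypted metrics DB
    },
)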

633 def get_research_metrics(self, research_id: str) -> Dict[str, Any]: 

634 """Get token metrics for a specific research. 

635 

636 Args: 

637 research_id: The ID of the research 

638 

639 Returns: 

640 Dictionary containing token usage metrics 

641 """ 

642 from flask import session as flask_session 

643 

644 from ..database.session_context import get_user_db_session 

645 

646 username = flask_session.get("username") 

647 if not username: 

648 return { 

649 "research_id": research_id, 

650 "total_tokens": 0, 

651 "total_calls": 0, 

652 "model_usage": [], 

653 } 

654 

655 with get_user_db_session(username) as session: 

656 # Get token usage for this research from TokenUsage table 

657 from sqlalchemy import func 

658 

659 token_usages = ( 

660 session.query( 

661 TokenUsage.model_name, 

662 TokenUsage.model_provider, 

663 func.sum(TokenUsage.prompt_tokens).label("prompt_tokens"), 

664 func.sum(TokenUsage.completion_tokens).label( 

665 "completion_tokens" 

666 ), 

667 func.sum(TokenUsage.total_tokens).label("total_tokens"), 

668 func.count().label("calls"), 

669 ) 

670 .filter_by(research_id=research_id) 

671 .group_by(TokenUsage.model_name, TokenUsage.model_provider) 

672 .order_by(func.sum(TokenUsage.total_tokens).desc()) 

673 .all() 

674 ) 

675 

676 model_usage = [] 

677 total_tokens = 0 

678 total_calls = 0 

679 

680 for usage in token_usages: 

681 model_usage.append( 

682 { 

683 "model": usage.model_name, 

684 "provider": usage.model_provider, 

685 "tokens": usage.total_tokens or 0, 

686 "calls": usage.calls or 0, 

687 "prompt_tokens": usage.prompt_tokens or 0, 

688 "completion_tokens": usage.completion_tokens or 0, 

689 } 

690 ) 

691 total_tokens += usage.total_tokens or 0 

692 total_calls += usage.calls or 0 

693 

694 return { 

695 "research_id": research_id, 

696 "total_tokens": total_tokens, 

697 "total_calls": total_calls, 

698 "model_usage": model_usage, 

699 } 

700 

701 def get_overall_metrics( 

702 self, period: str = "30d", research_mode: str = "all" 

703 ) -> Dict[str, Any]: 

704 """Get overall token metrics across all researches. 

705 

706 Args: 

707 period: Time period to filter by ('7d', '30d', '3m', '1y', 'all') 

708 research_mode: Research mode to filter by ('quick', 'detailed', 'all') 

709 

710 Returns: 

711 Dictionary containing overall metrics 

712 """ 

713 # First get metrics from user's encrypted database 

714 encrypted_metrics = self._get_metrics_from_encrypted_db( 

715 period, research_mode 

716 ) 

717 

718 # Then get metrics from thread-safe metrics database 

719 thread_metrics = self._get_metrics_from_thread_db(period, research_mode) 

720 

721 # Merge the results 

722 return self._merge_metrics(encrypted_metrics, thread_metrics) 

723 

724 def _get_metrics_from_encrypted_db( 

725 self, period: str, research_mode: str 

726 ) -> Dict[str, Any]: 

727 """Get metrics from user's encrypted database.""" 

728 from flask import session as flask_session 

729 

730 from ..database.session_context import get_user_db_session 

731 

732 username = flask_session.get("username") 

733 if not username:  # 733 ↛ 734: condition was never true

734 return self._get_empty_metrics() 

735 

736 try: 

737 with get_user_db_session(username) as session: 

738 # Build base query with filters 

739 query = session.query(TokenUsage) 

740 

741 # Apply time filter 

742 time_condition = get_time_filter_condition( 

743 period, TokenUsage.timestamp 

744 ) 

745 if time_condition is not None:  # 745 ↛ 749: condition was always true

746 query = query.filter(time_condition) 

747 

748 # Apply research mode filter 

749 mode_condition = get_research_mode_condition( 

750 research_mode, TokenUsage.research_mode 

751 ) 

752 if mode_condition is not None:  # 752 ↛ 753: condition was never true

753 query = query.filter(mode_condition) 

754 

755 # Total tokens from TokenUsage 

756 total_tokens = ( 

757 query.with_entities( 

758 func.sum(TokenUsage.total_tokens) 

759 ).scalar() 

760 or 0 

761 ) 

762 

763 # Import ResearchHistory model 

764 from ..database.models.research import ResearchHistory 

765 

766 # Count researches from ResearchHistory table 

767 research_query = session.query(func.count(ResearchHistory.id)) 

768 

769 # Debug: Check if any research history records exist at all 

770 all_research_count = ( 

771 session.query(func.count(ResearchHistory.id)).scalar() or 0 

772 ) 

773 logger.warning( 

774 f"DEBUG: Total ResearchHistory records in database: {all_research_count}" 

775 ) 

776 

777 # Debug: List first few research IDs and their timestamps 

778 sample_researches = ( 

779 session.query( 

780 ResearchHistory.id, 

781 ResearchHistory.created_at, 

782 ResearchHistory.mode, 

783 ) 

784 .limit(5) 

785 .all() 

786 ) 

787 if sample_researches:  # 787 ↛ 788: condition was never true

788 logger.warning("DEBUG: Sample ResearchHistory records:") 

789 for r_id, r_created, r_mode in sample_researches: 

790 logger.warning( 

791 f" - ID: {r_id}, Created: {r_created}, Mode: {r_mode}" 

792 ) 

793 else: 

794 logger.warning( 

795 "DEBUG: No ResearchHistory records found in database!" 

796 ) 

797 

798 # Get time filter conditions for ResearchHistory query 

799 start_time, end_time = None, None 

800 if period != "all":  # 800 ↛ 814: condition was always true

801 if period == "today":  # 801 ↛ 802: condition was never true

802 start_time = datetime.now(UTC).replace( 

803 hour=0, minute=0, second=0, microsecond=0 

804 ) 

805 elif period == "week":  # 805 ↛ 806: condition was never true

806 start_time = datetime.now(UTC) - timedelta(days=7) 

807 elif period == "month":  # 807 ↛ 808: condition was never true

808 start_time = datetime.now(UTC) - timedelta(days=30) 

809 

810 if start_time:  # 810 ↛ 811: condition was never true

811 end_time = datetime.now(UTC) 

812 

813 # Apply time filter if specified 

814 if start_time and end_time:  # 814 ↛ 815: condition was never true

815 research_query = research_query.filter( 

816 ResearchHistory.created_at >= start_time.isoformat(), 

817 ResearchHistory.created_at <= end_time.isoformat(), 

818 ) 

819 

820 # Apply mode filter if specified 

821 mode_filter = research_mode if research_mode != "all" else None 

822 if mode_filter:  # 822 ↛ 823: condition was never true

823 logger.warning( 

824 f"DEBUG: Applying mode filter: {mode_filter}" 

825 ) 

826 research_query = research_query.filter( 

827 ResearchHistory.mode == mode_filter 

828 ) 

829 

830 total_researches = research_query.scalar() or 0 

831 logger.warning( 

832 f"DEBUG: Final filtered research count: {total_researches}" 

833 ) 

834 

835 # Also check distinct research_ids in TokenUsage for comparison 

836 token_research_count = ( 

837 session.query( 

838 func.count(func.distinct(TokenUsage.research_id)) 

839 ).scalar() 

840 or 0 

841 ) 

842 logger.warning( 

843 f"DEBUG: Distinct research_ids in TokenUsage: {token_research_count}" 

844 ) 

845 

846 # Model statistics using ORM aggregation 

847 model_stats_query = session.query( 

848 TokenUsage.model_name, 

849 func.sum(TokenUsage.total_tokens).label("tokens"), 

850 func.count().label("calls"), 

851 func.sum(TokenUsage.prompt_tokens).label("prompt_tokens"), 

852 func.sum(TokenUsage.completion_tokens).label( 

853 "completion_tokens" 

854 ), 

855 ).filter(TokenUsage.model_name.isnot(None)) 

856 

857 # Apply same filters to model stats 

858 if time_condition is not None:  # 858 ↛ 860: condition was always true

859 model_stats_query = model_stats_query.filter(time_condition) 

860 if mode_condition is not None:  # 860 ↛ 861: condition was never true

861 model_stats_query = model_stats_query.filter(mode_condition) 

862 

863 model_stats = ( 

864 model_stats_query.group_by(TokenUsage.model_name) 

865 .order_by(func.sum(TokenUsage.total_tokens).desc()) 

866 .all() 

867 ) 

868 

869 # Get provider info from ModelUsage table 

870 by_model = [] 

871 for stat in model_stats:  # 871 ↛ 873: loop never started

872 # Try to get provider from ModelUsage table 

873 provider_info = ( 

874 session.query(ModelUsage.model_provider) 

875 .filter(ModelUsage.model_name == stat.model_name) 

876 .first() 

877 ) 

878 provider = ( 

879 provider_info.model_provider 

880 if provider_info 

881 else "unknown" 

882 ) 

883 

884 by_model.append( 

885 { 

886 "model": stat.model_name, 

887 "provider": provider, 

888 "tokens": stat.tokens, 

889 "calls": stat.calls, 

890 "prompt_tokens": stat.prompt_tokens, 

891 "completion_tokens": stat.completion_tokens, 

892 } 

893 ) 

894 

895 # Get recent researches with token usage 

896 # Note: This requires research_history table - for now we'll use available data 

897 recent_research_query = session.query( 

898 TokenUsage.research_id, 

899 func.sum(TokenUsage.total_tokens).label("token_count"), 

900 func.max(TokenUsage.timestamp).label("latest_timestamp"), 

901 ).filter(TokenUsage.research_id.isnot(None)) 

902 

903 if time_condition is not None:  # 903 ↛ 907: condition was always true

904 recent_research_query = recent_research_query.filter( 

905 time_condition 

906 ) 

907 if mode_condition is not None:  # 907 ↛ 908: condition was never true

908 recent_research_query = recent_research_query.filter( 

909 mode_condition 

910 ) 

911 

912 recent_research_data = ( 

913 recent_research_query.group_by(TokenUsage.research_id) 

914 .order_by(func.max(TokenUsage.timestamp).desc()) 

915 .limit(10) 

916 .all() 

917 ) 

918 

919 recent_researches = [] 

920 for research_data in recent_research_data:  # 920 ↛ 922: loop never started

921 # Get research query from token_usage table if available 

922 research_query_data = ( 

923 session.query(TokenUsage.research_query) 

924 .filter( 

925 TokenUsage.research_id == research_data.research_id, 

926 TokenUsage.research_query.isnot(None), 

927 ) 

928 .first() 

929 ) 

930 

931 query_text = ( 

932 research_query_data.research_query 

933 if research_query_data 

934 else f"Research {research_data.research_id}" 

935 ) 

936 

937 recent_researches.append( 

938 { 

939 "id": research_data.research_id, 

940 "query": query_text, 

941 "tokens": research_data.token_count or 0, 

942 "created_at": research_data.latest_timestamp, 

943 } 

944 ) 

945 

946 # Token breakdown statistics 

947 breakdown_query = query.with_entities( 

948 func.sum(TokenUsage.prompt_tokens).label( 

949 "total_input_tokens" 

950 ), 

951 func.sum(TokenUsage.completion_tokens).label( 

952 "total_output_tokens" 

953 ), 

954 func.avg(TokenUsage.prompt_tokens).label( 

955 "avg_input_tokens" 

956 ), 

957 func.avg(TokenUsage.completion_tokens).label( 

958 "avg_output_tokens" 

959 ), 

960 func.avg(TokenUsage.total_tokens).label("avg_total_tokens"), 

961 ) 

962 token_breakdown = breakdown_query.first() 

963 

964 # Get rate limiting metrics 

965 from ..database.models import ( 

966 RateLimitAttempt, 

967 RateLimitEstimate, 

968 ) 

969 

970 # Get rate limit attempts 

971 rate_limit_query = session.query(RateLimitAttempt) 

972 

973 # Apply time filter 

974 if time_condition is not None:  # 974 ↛ 993: condition was always true

975 # RateLimitAttempt uses timestamp as float, not datetime 

976 if period == "7d":  # 976 ↛ 977: condition was never true

977 cutoff_time = time.time() - (7 * 24 * 3600) 

978 elif period == "30d":  # 978 ↛ 980: condition was always true

979 cutoff_time = time.time() - (30 * 24 * 3600) 

980 elif period == "3m": 

981 cutoff_time = time.time() - (90 * 24 * 3600) 

982 elif period == "1y": 

983 cutoff_time = time.time() - (365 * 24 * 3600) 

984 else: # all 

985 cutoff_time = 0 

986 

987 if cutoff_time > 0:  # 987 ↛ 993: condition was always true

988 rate_limit_query = rate_limit_query.filter( 

989 RateLimitAttempt.timestamp >= cutoff_time 

990 ) 

991 

992 # Get rate limit statistics 

993 total_attempts = rate_limit_query.count() 

994 successful_attempts = rate_limit_query.filter( 

995 RateLimitAttempt.success 

996 ).count() 

997 failed_attempts = total_attempts - successful_attempts 

998 

999 # Count rate limiting events (failures with RateLimitError) 

1000 rate_limit_events = rate_limit_query.filter( 

1001 ~RateLimitAttempt.success, 

1002 RateLimitAttempt.error_type == "RateLimitError", 

1003 ).count() 

1004 

1005 logger.warning( 

1006 f"DEBUG: Rate limit attempts in database: total={total_attempts}, successful={successful_attempts}" 

1007 ) 

1008 

1009 # Get all attempts for detailed calculations 

1010 attempts = rate_limit_query.all() 

1011 

1012 # Calculate average wait times 

1013 if attempts:  # 1013 ↛ 1014: condition was never true

1014 avg_wait_time = sum(a.wait_time for a in attempts) / len( 

1015 attempts 

1016 ) 

1017 successful_wait_times = [ 

1018 a.wait_time for a in attempts if a.success 

1019 ] 

1020 avg_successful_wait = ( 

1021 sum(successful_wait_times) / len(successful_wait_times) 

1022 if successful_wait_times 

1023 else 0 

1024 ) 

1025 else: 

1026 avg_wait_time = 0 

1027 avg_successful_wait = 0 

1028 

1029 # Get tracked engines - count distinct engine types from attempts 

1030 tracked_engines_query = session.query( 

1031 func.count(func.distinct(RateLimitAttempt.engine_type)) 

1032 ) 

1033 if cutoff_time > 0:  # 1033 ↛ 1037: condition was always true

1034 tracked_engines_query = tracked_engines_query.filter( 

1035 RateLimitAttempt.timestamp >= cutoff_time 

1036 ) 

1037 tracked_engines = tracked_engines_query.scalar() or 0 

1038 

1039 # Get engine-specific stats from attempts 

1040 engine_stats = [] 

1041 

1042 # Get distinct engine types from attempts 

1043 engine_types_query = session.query( 

1044 RateLimitAttempt.engine_type 

1045 ).distinct() 

1046 if cutoff_time > 0:  # 1046 ↛ 1050: condition was always true

1047 engine_types_query = engine_types_query.filter( 

1048 RateLimitAttempt.timestamp >= cutoff_time 

1049 ) 

1050 engine_types = [ 

1051 row.engine_type for row in engine_types_query.all() 

1052 ] 

1053 

1054 for engine_type in engine_types:  # 1054 ↛ 1055: loop never started

1055 engine_attempts_list = [ 

1056 a for a in attempts if a.engine_type == engine_type 

1057 ] 

1058 engine_attempts = len(engine_attempts_list) 

1059 engine_success = len( 

1060 [a for a in engine_attempts_list if a.success] 

1061 ) 

1062 

1063 # Get estimate if exists 

1064 estimate = ( 

1065 session.query(RateLimitEstimate) 

1066 .filter(RateLimitEstimate.engine_type == engine_type) 

1067 .first() 

1068 ) 

1069 

1070 # Calculate recent success rate 

1071 recent_success_rate = ( 

1072 (engine_success / engine_attempts * 100) 

1073 if engine_attempts > 0 

1074 else 0 

1075 ) 

1076 

1077 # Determine status based on success rate 

1078 if estimate: 

1079 status = ( 

1080 "healthy" 

1081 if estimate.success_rate > 0.8 

1082 else "degraded" 

1083 if estimate.success_rate > 0.5 

1084 else "poor" 

1085 ) 

1086 else: 

1087 status = ( 

1088 "healthy" 

1089 if recent_success_rate > 80 

1090 else "degraded" 

1091 if recent_success_rate > 50 

1092 else "poor" 

1093 ) 

1094 

1095 engine_stat = { 

1096 "engine": engine_type, 

1097 "base_wait": estimate.base_wait_seconds 

1098 if estimate 

1099 else 0.0, 

1100 "base_wait_seconds": round( 

1101 estimate.base_wait_seconds if estimate else 0.0, 2 

1102 ), 

1103 "min_wait_seconds": round( 

1104 estimate.min_wait_seconds if estimate else 0.0, 2 

1105 ), 

1106 "max_wait_seconds": round( 

1107 estimate.max_wait_seconds if estimate else 0.0, 2 

1108 ), 

1109 "success_rate": round(estimate.success_rate * 100, 1) 

1110 if estimate 

1111 else recent_success_rate, 

1112 "total_attempts": estimate.total_attempts 

1113 if estimate 

1114 else engine_attempts, 

1115 "recent_attempts": engine_attempts, 

1116 "recent_success_rate": round(recent_success_rate, 1), 

1117 "attempts": engine_attempts, 

1118 "status": status, 

1119 } 

1120 

1121 if estimate: 

1122 engine_stat["last_updated"] = datetime.fromtimestamp( 

1123 estimate.last_updated 

1124 ).strftime("%Y-%m-%d %H:%M:%S") 

1125 else: 

1126 engine_stat["last_updated"] = "Never" 

1127 

1128 engine_stats.append(engine_stat) 

1129 

1130 logger.warning( 

1131 f"DEBUG: Tracked engines: {tracked_engines}, engine_stats: {engine_stats}" 

1132 ) 

1133 

1134 result = { 

1135 "total_tokens": total_tokens, 

1136 "total_researches": total_researches, 

1137 "by_model": by_model, 

1138 "recent_researches": recent_researches, 

1139 "token_breakdown": { 

1140 "total_input_tokens": int( 

1141 token_breakdown.total_input_tokens or 0 

1142 ), 

1143 "total_output_tokens": int( 

1144 token_breakdown.total_output_tokens or 0 

1145 ), 

1146 "avg_input_tokens": int( 

1147 token_breakdown.avg_input_tokens or 0 

1148 ), 

1149 "avg_output_tokens": int( 

1150 token_breakdown.avg_output_tokens or 0 

1151 ), 

1152 "avg_total_tokens": int( 

1153 token_breakdown.avg_total_tokens or 0 

1154 ), 

1155 }, 

1156 "rate_limiting": { 

1157 "total_attempts": total_attempts, 

1158 "successful_attempts": successful_attempts, 

1159 "failed_attempts": failed_attempts, 

1160 "success_rate": ( 

1161 successful_attempts / total_attempts * 100 

1162 ) 

1163 if total_attempts > 0 

1164 else 0, 

1165 "rate_limit_events": rate_limit_events, 

1166 "avg_wait_time": round(float(avg_wait_time), 2), 

1167 "avg_successful_wait": round( 

1168 float(avg_successful_wait), 2 

1169 ), 

1170 "tracked_engines": tracked_engines, 

1171 "engine_stats": engine_stats, 

1172 "total_engines_tracked": tracked_engines, 

1173 "healthy_engines": len( 

1174 [ 

1175 s 

1176 for s in engine_stats 

1177 if s["status"] == "healthy" 

1178 ] 

1179 ), 

1180 "degraded_engines": len( 

1181 [ 

1182 s 

1183 for s in engine_stats 

1184 if s["status"] == "degraded" 

1185 ] 

1186 ), 

1187 "poor_engines": len( 

1188 [s for s in engine_stats if s["status"] == "poor"] 

1189 ), 

1190 }, 

1191 } 

1192 

1193 logger.warning( 

1194 f"DEBUG: Returning from _get_metrics_from_encrypted_db - total_researches: {result['total_researches']}" 

1195 ) 

1196 return result 

1197 except Exception as e: 

1198 logger.exception( 

1199 f"CRITICAL ERROR accessing encrypted database for metrics: {e}" 

1200 ) 

1201 return self._get_empty_metrics() 

1202 
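The engine health buckets above reduce to a single threshold rule; a standalone sketch on a 0-100 scale (the function name is illustrative, not part of the measured file):

def classify_engine_status(success_rate_pct: float) -> str:
    """Same thresholds as in _get_metrics_from_encrypted_db, expressed as percentages."""
    if success_rate_pct > 80:
        return "healthy"
    if success_rate_pct > 50:
        return "degraded"
    return "poor"

# classify_engine_status(92.0) -> "healthy"; classify_engine_status(55.0) -> "degraded"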

1203 def _get_metrics_from_thread_db( 

1204 self, period: str, research_mode: str 

1205 ) -> Dict[str, Any]: 

1206 """Get metrics from thread-safe metrics database.""" 

1207 if not self.thread_metrics_db:  # 1207 ↛ 1208: condition was never true

1208 return { 

1209 "total_tokens": 0, 

1210 "total_researches": 0, 

1211 "by_model": [], 

1212 "recent_researches": [], 

1213 "token_breakdown": { 

1214 "total_input_tokens": 0, 

1215 "total_output_tokens": 0, 

1216 "avg_input_tokens": 0, 

1217 "avg_output_tokens": 0, 

1218 "avg_total_tokens": 0, 

1219 }, 

1220 } 

1221 

1222 try: 

1223 with self.thread_metrics_db.get_session() as session: 

1224 # Build base query with filters 

1225 query = session.query(TokenUsage) 

1226 

1227 # Apply time filter 

1228 time_condition = get_time_filter_condition( 

1229 period, TokenUsage.timestamp 

1230 ) 

1231 if time_condition is not None: 

1232 query = query.filter(time_condition) 

1233 

1234 # Apply research mode filter 

1235 mode_condition = get_research_mode_condition( 

1236 research_mode, TokenUsage.research_mode 

1237 ) 

1238 if mode_condition is not None: 

1239 query = query.filter(mode_condition) 

1240 

1241 # Get totals 

1242 total_tokens = ( 

1243 query.with_entities( 

1244 func.sum(TokenUsage.total_tokens) 

1245 ).scalar() 

1246 or 0 

1247 ) 

1248 total_researches = ( 

1249 query.with_entities( 

1250 func.count(func.distinct(TokenUsage.research_id)) 

1251 ).scalar() 

1252 or 0 

1253 ) 

1254 

1255 # Get model statistics 

1256 model_stats = ( 

1257 query.with_entities( 

1258 TokenUsage.model_name, 

1259 func.sum(TokenUsage.total_tokens).label("tokens"), 

1260 func.count().label("calls"), 

1261 func.sum(TokenUsage.prompt_tokens).label( 

1262 "prompt_tokens" 

1263 ), 

1264 func.sum(TokenUsage.completion_tokens).label( 

1265 "completion_tokens" 

1266 ), 

1267 ) 

1268 .filter(TokenUsage.model_name.isnot(None)) 

1269 .group_by(TokenUsage.model_name) 

1270 .all() 

1271 ) 

1272 

1273 by_model = [] 

1274 for stat in model_stats: 

1275 by_model.append( 

1276 { 

1277 "model": stat.model_name, 

1278 "provider": "unknown", # Provider info might not be in thread DB 

1279 "tokens": stat.tokens, 

1280 "calls": stat.calls, 

1281 "prompt_tokens": stat.prompt_tokens, 

1282 "completion_tokens": stat.completion_tokens, 

1283 } 

1284 ) 

1285 

1286 # Token breakdown 

1287 breakdown = query.with_entities( 

1288 func.sum(TokenUsage.prompt_tokens).label( 

1289 "total_input_tokens" 

1290 ), 

1291 func.sum(TokenUsage.completion_tokens).label( 

1292 "total_output_tokens" 

1293 ), 

1294 ).first() 

1295 

1296 return { 

1297 "total_tokens": total_tokens, 

1298 "total_researches": total_researches, 

1299 "by_model": by_model, 

1300 "recent_researches": [], # Skip for thread DB 

1301 "token_breakdown": { 

1302 "total_input_tokens": int( 

1303 breakdown.total_input_tokens or 0 

1304 ), 

1305 "total_output_tokens": int( 

1306 breakdown.total_output_tokens or 0 

1307 ), 

1308 "avg_input_tokens": 0, 

1309 "avg_output_tokens": 0, 

1310 "avg_total_tokens": 0, 

1311 }, 

1312 } 

1313 except Exception: 

1314 logger.exception("Error reading thread metrics database") 

1315 return { 

1316 "total_tokens": 0, 

1317 "total_researches": 0, 

1318 "by_model": [], 

1319 "recent_researches": [], 

1320 "token_breakdown": { 

1321 "total_input_tokens": 0, 

1322 "total_output_tokens": 0, 

1323 "avg_input_tokens": 0, 

1324 "avg_output_tokens": 0, 

1325 "avg_total_tokens": 0, 

1326 }, 

1327 } 

1328 

1329 def _merge_metrics( 

1330 self, encrypted: Dict[str, Any], thread: Dict[str, Any] 

1331 ) -> Dict[str, Any]: 

1332 """Merge metrics from both databases.""" 

1333 # Combine totals 

1334 total_tokens = encrypted.get("total_tokens", 0) + thread.get( 

1335 "total_tokens", 0 

1336 ) 

1337 total_researches = max( 

1338 encrypted.get("total_researches", 0), 

1339 thread.get("total_researches", 0), 

1340 ) 

1341 logger.warning( 

1342 f"DEBUG: Merged metrics - encrypted researches: {encrypted.get('total_researches', 0)}, thread researches: {thread.get('total_researches', 0)}, final: {total_researches}" 

1343 ) 

1344 

1345 # Merge model usage 

1346 model_map = {} 

1347 for model_data in encrypted.get("by_model", []): 

1348 key = model_data["model"] 

1349 model_map[key] = model_data 

1350 

1351 for model_data in thread.get("by_model", []): 

1352 key = model_data["model"] 

1353 if key in model_map:  # 1353 ↛ 1362: condition was always true

1354 # Merge with existing 

1355 model_map[key]["tokens"] += model_data["tokens"] 

1356 model_map[key]["calls"] += model_data["calls"] 

1357 model_map[key]["prompt_tokens"] += model_data["prompt_tokens"] 

1358 model_map[key]["completion_tokens"] += model_data[ 

1359 "completion_tokens" 

1360 ] 

1361 else: 

1362 model_map[key] = model_data 

1363 

1364 by_model = sorted( 

1365 model_map.values(), key=lambda x: x["tokens"], reverse=True 

1366 ) 

1367 

1368 # Merge token breakdown 

1369 token_breakdown = { 

1370 "total_input_tokens": ( 

1371 encrypted.get("token_breakdown", {}).get( 

1372 "total_input_tokens", 0 

1373 ) 

1374 + thread.get("token_breakdown", {}).get("total_input_tokens", 0) 

1375 ), 

1376 "total_output_tokens": ( 

1377 encrypted.get("token_breakdown", {}).get( 

1378 "total_output_tokens", 0 

1379 ) 

1380 + thread.get("token_breakdown", {}).get( 

1381 "total_output_tokens", 0 

1382 ) 

1383 ), 

1384 "avg_input_tokens": encrypted.get("token_breakdown", {}).get( 

1385 "avg_input_tokens", 0 

1386 ), 

1387 "avg_output_tokens": encrypted.get("token_breakdown", {}).get( 

1388 "avg_output_tokens", 0 

1389 ), 

1390 "avg_total_tokens": encrypted.get("token_breakdown", {}).get( 

1391 "avg_total_tokens", 0 

1392 ), 

1393 } 

1394 

1395 result = { 

1396 "total_tokens": total_tokens, 

1397 "total_researches": total_researches, 

1398 "by_model": by_model, 

1399 "recent_researches": encrypted.get("recent_researches", []), 

1400 "token_breakdown": token_breakdown, 

1401 } 

1402 

1403 logger.warning( 

1404 f"DEBUG: Final get_token_metrics result - total_researches: {result['total_researches']}" 

1405 ) 

1406 return result 

1407 
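A small worked example of the merge rules above, calling the private helper directly just for illustration (all numbers are made up):

counter = TokenCounter()
merged = counter._merge_metrics(
    {"total_tokens": 900, "total_researches": 2,
     "by_model": [{"model": "llama3", "tokens": 900, "calls": 3, "prompt_tokens": 700, "completion_tokens": 200}]},
    {"total_tokens": 300, "total_researches": 1,
     "by_model": [{"model": "llama3", "tokens": 300, "calls": 1, "prompt_tokens": 250, "completion_tokens": 50}]},
)
# merged["total_tokens"] == 1200, merged["total_researches"] == 2 (max, not sum, since both
# databases may hold rows for the same research_id), and the single llama3 entry now reports
# tokens=1200 and calls=4.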

1408 def _get_empty_metrics(self) -> Dict[str, Any]: 

1409 """Return empty metrics structure when no data is available.""" 

1410 return { 

1411 "total_tokens": 0, 

1412 "total_researches": 0, 

1413 "by_model": [], 

1414 "recent_researches": [], 

1415 "token_breakdown": { 

1416 "prompt_tokens": 0, 

1417 "completion_tokens": 0, 

1418 "avg_prompt_tokens": 0, 

1419 "avg_completion_tokens": 0, 

1420 "avg_total_tokens": 0, 

1421 }, 

1422 } 

1423 

1424 def get_enhanced_metrics( 

1425 self, period: str = "30d", research_mode: str = "all" 

1426 ) -> Dict[str, Any]: 

1427 """Get enhanced Phase 1 tracking metrics. 

1428 

1429 Args: 

1430 period: Time period to filter by ('7d', '30d', '3m', '1y', 'all') 

1431 research_mode: Research mode to filter by ('quick', 'detailed', 'all') 

1432 

1433 Returns: 

1434 Dictionary containing enhanced metrics data including time series 

1435 """ 

1436 from flask import session as flask_session 

1437 

1438 from ..database.session_context import get_user_db_session 

1439 

1440 username = flask_session.get("username") 

1441 if not username:  # 1441 ↛ 1443: condition was never true

1442 # Return empty metrics structure when no user session 

1443 return { 

1444 "recent_enhanced_data": [], 

1445 "performance_stats": { 

1446 "avg_response_time": 0, 

1447 "min_response_time": 0, 

1448 "max_response_time": 0, 

1449 "success_rate": 0, 

1450 "error_rate": 0, 

1451 "total_enhanced_calls": 0, 

1452 }, 

1453 "mode_breakdown": [], 

1454 "search_engine_stats": [], 

1455 "phase_breakdown": [], 

1456 "time_series_data": [], 

1457 "call_stack_analysis": { 

1458 "by_file": [], 

1459 "by_function": [], 

1460 }, 

1461 } 

1462 

1463 try: 

1464 with get_user_db_session(username) as session: 

1465 # Build base query with filters 

1466 query = session.query(TokenUsage) 

1467 

1468 # Apply time filter 

1469 time_condition = get_time_filter_condition( 

1470 period, TokenUsage.timestamp 

1471 ) 

1472 if time_condition is not None:  # 1472 ↛ 1476: condition was always true

1473 query = query.filter(time_condition) 

1474 

1475 # Apply research mode filter 

1476 mode_condition = get_research_mode_condition( 

1477 research_mode, TokenUsage.research_mode 

1478 ) 

1479 if mode_condition is not None:  # 1479 ↛ 1480: condition was never true

1480 query = query.filter(mode_condition) 

1481 

1482 # Get time series data for the chart - most important for "Token Consumption Over Time" 

1483 time_series_query = query.filter( 

1484 TokenUsage.timestamp.isnot(None), 

1485 TokenUsage.total_tokens > 0, 

1486 ).order_by(TokenUsage.timestamp.asc()) 

1487 

1488 # Limit to recent data for performance 

1489 if period != "all":  # 1489 ↛ 1492: condition was always true

1490 time_series_query = time_series_query.limit(200) 

1491 

1492 time_series_data = time_series_query.all() 

1493 

1494 # Format time series data with cumulative calculations 

1495 time_series = [] 

1496 cumulative_tokens = 0 

1497 cumulative_prompt_tokens = 0 

1498 cumulative_completion_tokens = 0 

1499 

1500 for usage in time_series_data:  # 1500 ↛ 1501: loop never started

1501 cumulative_tokens += usage.total_tokens or 0 

1502 cumulative_prompt_tokens += usage.prompt_tokens or 0 

1503 cumulative_completion_tokens += usage.completion_tokens or 0 

1504 

1505 time_series.append( 

1506 { 

1507 "timestamp": str(usage.timestamp) 

1508 if usage.timestamp 

1509 else None, 

1510 "tokens": usage.total_tokens or 0, 

1511 "prompt_tokens": usage.prompt_tokens or 0, 

1512 "completion_tokens": usage.completion_tokens or 0, 

1513 "cumulative_tokens": cumulative_tokens, 

1514 "cumulative_prompt_tokens": cumulative_prompt_tokens, 

1515 "cumulative_completion_tokens": cumulative_completion_tokens, 

1516 "research_id": usage.research_id, 

1517 } 

1518 ) 

1519 

1520 # Basic performance stats using ORM 

1521 performance_query = query.filter( 

1522 TokenUsage.response_time_ms.isnot(None) 

1523 ) 

1524 total_calls = performance_query.count() 

1525 

1526 if total_calls > 0:  # 1526 ↛ 1527: condition was never true

1527 avg_response_time = ( 

1528 performance_query.with_entities( 

1529 func.avg(TokenUsage.response_time_ms) 

1530 ).scalar() 

1531 or 0 

1532 ) 

1533 min_response_time = ( 

1534 performance_query.with_entities( 

1535 func.min(TokenUsage.response_time_ms) 

1536 ).scalar() 

1537 or 0 

1538 ) 

1539 max_response_time = ( 

1540 performance_query.with_entities( 

1541 func.max(TokenUsage.response_time_ms) 

1542 ).scalar() 

1543 or 0 

1544 ) 

1545 success_count = performance_query.filter( 

1546 TokenUsage.success_status == "success" 

1547 ).count() 

1548 error_count = performance_query.filter( 

1549 TokenUsage.success_status == "error" 

1550 ).count() 

1551 

1552 perf_stats = { 

1553 "avg_response_time": round(avg_response_time), 

1554 "min_response_time": min_response_time, 

1555 "max_response_time": max_response_time, 

1556 "success_rate": ( 

1557 round((success_count / total_calls * 100), 1) 

1558 if total_calls > 0 

1559 else 0 

1560 ), 

1561 "error_rate": ( 

1562 round((error_count / total_calls * 100), 1) 

1563 if total_calls > 0 

1564 else 0 

1565 ), 

1566 "total_enhanced_calls": total_calls, 

1567 } 

1568 else: 

1569 perf_stats = { 

1570 "avg_response_time": 0, 

1571 "min_response_time": 0, 

1572 "max_response_time": 0, 

1573 "success_rate": 0, 

1574 "error_rate": 0, 

1575 "total_enhanced_calls": 0, 

1576 } 

1577 

1578 # Research mode breakdown using ORM 

1579 mode_stats = ( 

1580 query.filter(TokenUsage.research_mode.isnot(None)) 

1581 .with_entities( 

1582 TokenUsage.research_mode, 

1583 func.count().label("count"), 

1584 func.avg(TokenUsage.total_tokens).label("avg_tokens"), 

1585 func.avg(TokenUsage.response_time_ms).label( 

1586 "avg_response_time" 

1587 ), 

1588 ) 

1589 .group_by(TokenUsage.research_mode) 

1590 .all() 

1591 ) 

1592 

1593 modes = [ 

1594 { 

1595 "mode": stat.research_mode, 

1596 "count": stat.count, 

1597 "avg_tokens": round(stat.avg_tokens or 0), 

1598 "avg_response_time": round(stat.avg_response_time or 0), 

1599 } 

1600 for stat in mode_stats 

1601 ] 

1602 

1603 # Recent enhanced data (simplified) 

1604 recent_enhanced_query = ( 

1605 query.filter(TokenUsage.research_query.isnot(None)) 

1606 .order_by(TokenUsage.timestamp.desc()) 

1607 .limit(50) 

1608 ) 

1609 

1610 recent_enhanced_data = recent_enhanced_query.all() 

1611 recent_enhanced = [ 

1612 { 

1613 "research_query": usage.research_query, 

1614 "research_mode": usage.research_mode, 

1615 "research_phase": usage.research_phase, 

1616 "search_iteration": usage.search_iteration, 

1617 "response_time_ms": usage.response_time_ms, 

1618 "success_status": usage.success_status, 

1619 "error_type": usage.error_type, 

1620 "search_engines_planned": usage.search_engines_planned, 

1621 "search_engine_selected": usage.search_engine_selected, 

1622 "total_tokens": usage.total_tokens, 

1623 "prompt_tokens": usage.prompt_tokens, 

1624 "completion_tokens": usage.completion_tokens, 

1625 "timestamp": str(usage.timestamp) 

1626 if usage.timestamp 

1627 else None, 

1628 "research_id": usage.research_id, 

1629 "calling_file": usage.calling_file, 

1630 "calling_function": usage.calling_function, 

1631 "call_stack": usage.call_stack, 

1632 } 

1633 for usage in recent_enhanced_data 

1634 ] 

1635 

1636 # Search engine breakdown using ORM 

1637 search_engine_stats = ( 

1638 query.filter(TokenUsage.search_engine_selected.isnot(None)) 

1639 .with_entities( 

1640 TokenUsage.search_engine_selected, 

1641 func.count().label("count"), 

1642 func.avg(TokenUsage.total_tokens).label("avg_tokens"), 

1643 func.avg(TokenUsage.response_time_ms).label( 

1644 "avg_response_time" 

1645 ), 

1646 ) 

1647 .group_by(TokenUsage.search_engine_selected) 

1648 .all() 

1649 ) 

1650 

1651 search_engines = [ 

1652 { 

1653 "search_engine": stat.search_engine_selected, 

1654 "count": stat.count, 

1655 "avg_tokens": round(stat.avg_tokens or 0), 

1656 "avg_response_time": round(stat.avg_response_time or 0), 

1657 } 

1658 for stat in search_engine_stats 

1659 ] 

1660 

1661 # Research phase breakdown using ORM 

1662 phase_stats = ( 

1663 query.filter(TokenUsage.research_phase.isnot(None)) 

1664 .with_entities( 

1665 TokenUsage.research_phase, 

1666 func.count().label("count"), 

1667 func.avg(TokenUsage.total_tokens).label("avg_tokens"), 

1668 func.avg(TokenUsage.response_time_ms).label( 

1669 "avg_response_time" 

1670 ), 

1671 ) 

1672 .group_by(TokenUsage.research_phase) 

1673 .all() 

1674 ) 

1675 

1676 phases = [ 

1677 { 

1678 "phase": stat.research_phase, 

1679 "count": stat.count, 

1680 "avg_tokens": round(stat.avg_tokens or 0), 

1681 "avg_response_time": round(stat.avg_response_time or 0), 

1682 } 

1683 for stat in phase_stats 

1684 ] 

1685 

1686 # Call stack analysis using ORM 

1687 file_stats = ( 

1688 query.filter(TokenUsage.calling_file.isnot(None)) 

1689 .with_entities( 

1690 TokenUsage.calling_file, 

1691 func.count().label("count"), 

1692 func.avg(TokenUsage.total_tokens).label("avg_tokens"), 

1693 ) 

1694 .group_by(TokenUsage.calling_file) 

1695 .order_by(func.count().desc()) 

1696 .limit(10) 

1697 .all() 

1698 ) 

1699 

1700 files = [ 

1701 { 

1702 "file": stat.calling_file, 

1703 "count": stat.count, 

1704 "avg_tokens": round(stat.avg_tokens or 0), 

1705 } 

1706 for stat in file_stats 

1707 ] 

1708 

1709 function_stats = ( 

1710 query.filter(TokenUsage.calling_function.isnot(None)) 

1711 .with_entities( 

1712 TokenUsage.calling_function, 

1713 func.count().label("count"), 

1714 func.avg(TokenUsage.total_tokens).label("avg_tokens"), 

1715 ) 

1716 .group_by(TokenUsage.calling_function) 

1717 .order_by(func.count().desc()) 

1718 .limit(10) 

1719 .all() 

1720 ) 

1721 

1722 functions = [ 

1723 { 

1724 "function": stat.calling_function, 

1725 "count": stat.count, 

1726 "avg_tokens": round(stat.avg_tokens or 0), 

1727 } 

1728 for stat in function_stats 

1729 ] 

1730 

1731 return { 

1732 "recent_enhanced_data": recent_enhanced, 

1733 "performance_stats": perf_stats, 

1734 "mode_breakdown": modes, 

1735 "search_engine_stats": search_engines, 

1736 "phase_breakdown": phases, 

1737 "time_series_data": time_series, 

1738 "call_stack_analysis": { 

1739 "by_file": files, 

1740 "by_function": functions, 

1741 }, 

1742 } 

1743 except Exception: 

1744 logger.exception("Error in get_enhanced_metrics") 

1745 # Fall back to an empty response (avoids referencing columns that may not exist)

1746 return { 

1747 "recent_enhanced_data": [], 

1748 "performance_stats": { 

1749 "avg_response_time": 0, 

1750 "min_response_time": 0, 

1751 "max_response_time": 0, 

1752 "success_rate": 0, 

1753 "error_rate": 0, 

1754 "total_enhanced_calls": 0, 

1755 }, 

1756 "mode_breakdown": [], 

1757 "search_engine_stats": [], 

1758 "phase_breakdown": [], 

1759 "time_series_data": [], 

1760 "call_stack_analysis": { 

1761 "by_file": [], 

1762 "by_function": [], 

1763 }, 

1764 } 

1765 
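For context, a minimal sketch (illustrative only; not part of the covered module) of how a caller might summarise the dictionary returned by get_enhanced_metrics above; the helper name summarize_enhanced_metrics and the sample figures are hypothetical:

def summarize_enhanced_metrics(metrics: dict) -> str:
    """Build a one-line summary from the dict returned by get_enhanced_metrics."""
    perf = metrics.get("performance_stats", {})
    modes = metrics.get("mode_breakdown", [])
    mode_part = ", ".join(
        f"{m['mode']}: {m['count']} calls (avg {m['avg_tokens']} tokens)"
        for m in modes
    )
    summary = (
        f"{perf.get('total_enhanced_calls', 0)} calls, "
        f"{perf.get('success_rate', 0)}% success, "
        f"avg {perf.get('avg_response_time', 0)} ms"
    )
    return f"{summary} | {mode_part}" if mode_part else summary

# Hypothetical sample shaped like the return value above
sample = {
    "performance_stats": {
        "total_enhanced_calls": 12,
        "success_rate": 91.7,
        "avg_response_time": 842,
    },
    "mode_breakdown": [
        {"mode": "quick", "count": 8, "avg_tokens": 512, "avg_response_time": 640},
    ],
}
print(summarize_enhanced_metrics(sample))
# -> 12 calls, 91.7% success, avg 842 ms | quick: 8 calls (avg 512 tokens)
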

1766 def get_research_timeline_metrics(self, research_id: str) -> Dict[str, Any]: 

1767 """Get timeline metrics for a specific research. 

1768 

1769 Args: 

1770 research_id: The ID of the research 

1771 

1772 Returns: 

1773 Dictionary containing timeline metrics for the research 

1774 """ 

1775 from flask import session as flask_session 

1776 

1777 from ..database.session_context import get_user_db_session 

1778 

1779 username = flask_session.get("username") 

1780 if not username: 

1781 return { 

1782 "research_id": research_id, 

1783 "research_details": {}, 

1784 "timeline": [], 

1785 "summary": { 

1786 "total_calls": 0, 

1787 "total_tokens": 0, 

1788 "total_prompt_tokens": 0, 

1789 "total_completion_tokens": 0, 

1790 "avg_response_time": 0, 

1791 "success_rate": 0, 

1792 }, 

1793 "phase_stats": {}, 

1794 } 

1795 

1796 with get_user_db_session(username) as session: 

1797 # Get all token usage for this research, ordered by time, including call stack info

1798 timeline_data = session.execute( 

1799 text( 

1800 """ 

1801 SELECT 

1802 timestamp, 

1803 total_tokens, 

1804 prompt_tokens, 

1805 completion_tokens, 

1806 response_time_ms, 

1807 success_status, 

1808 error_type, 

1809 research_phase, 

1810 search_iteration, 

1811 search_engine_selected, 

1812 model_name, 

1813 calling_file, 

1814 calling_function, 

1815 call_stack 

1816 FROM token_usage 

1817 WHERE research_id = :research_id 

1818 ORDER BY timestamp ASC 

1819 """ 

1820 ), 

1821 {"research_id": research_id}, 

1822 ).fetchall() 

1823 

1824 # Format timeline data with cumulative tokens 

1825 timeline = [] 

1826 cumulative_tokens = 0 

1827 cumulative_prompt_tokens = 0 

1828 cumulative_completion_tokens = 0 

1829 

1830 for row in timeline_data: 

1831 cumulative_tokens += row[1] or 0 

1832 cumulative_prompt_tokens += row[2] or 0 

1833 cumulative_completion_tokens += row[3] or 0 

1834 

1835 timeline.append( 

1836 { 

1837 "timestamp": str(row[0]) if row[0] else None, 

1838 "tokens": row[1] or 0, 

1839 "prompt_tokens": row[2] or 0, 

1840 "completion_tokens": row[3] or 0, 

1841 "cumulative_tokens": cumulative_tokens, 

1842 "cumulative_prompt_tokens": cumulative_prompt_tokens, 

1843 "cumulative_completion_tokens": cumulative_completion_tokens, 

1844 "response_time_ms": row[4], 

1845 "success_status": row[5], 

1846 "error_type": row[6], 

1847 "research_phase": row[7], 

1848 "search_iteration": row[8], 

1849 "search_engine_selected": row[9], 

1850 "model_name": row[10], 

1851 "calling_file": row[11], 

1852 "calling_function": row[12], 

1853 "call_stack": row[13], 

1854 } 

1855 ) 

1856 

1857 # Get basic research info

1858 research_info = session.execute( 

1859 text( 

1860 """ 

1861 SELECT query, mode, status, created_at, completed_at 

1862 FROM research_history 

1863 WHERE id = :research_id 

1864 """ 

1865 ), 

1866 {"research_id": research_id}, 

1867 ).fetchone() 

1868 

1869 research_details = {} 

1870 if research_info: 

1871 research_details = { 

1872 "query": research_info[0], 

1873 "mode": research_info[1], 

1874 "status": research_info[2], 

1875 "created_at": str(research_info[3]) 

1876 if research_info[3] 

1877 else None, 

1878 "completed_at": str(research_info[4]) 

1879 if research_info[4] 

1880 else None, 

1881 } 

1882 

1883 # Calculate summary stats 

1884 total_calls = len(timeline_data) 

1885 total_tokens = cumulative_tokens 

1886 avg_response_time = sum(row[4] or 0 for row in timeline_data) / max( 

1887 total_calls, 1 

1888 ) 

1889 success_rate = ( 

1890 sum(1 for row in timeline_data if row[5] == "success") 

1891 / max(total_calls, 1) 

1892 * 100 

1893 ) 

1894 

1895 # Phase breakdown for this research 

1896 phase_stats = {} 

1897 for row in timeline_data: 

1898 phase = row[7] or "unknown" 

1899 if phase not in phase_stats: 

1900 phase_stats[phase] = { 

1901 "count": 0, 

1902 "tokens": 0, 

1903 "avg_response_time": 0, 

1904 } 

1905 phase_stats[phase]["count"] += 1 

1906 phase_stats[phase]["tokens"] += row[1] or 0 

1907 if row[4]: 

1908 phase_stats[phase]["avg_response_time"] += row[4] 

1909 

1910 # Calculate averages for phases 

1911 for phase in phase_stats: 

1912 if phase_stats[phase]["count"] > 0: 

1913 phase_stats[phase]["avg_response_time"] = round( 

1914 phase_stats[phase]["avg_response_time"] 

1915 / phase_stats[phase]["count"] 

1916 ) 

1917 

1918 return { 

1919 "research_id": research_id, 

1920 "research_details": research_details, 

1921 "timeline": timeline, 

1922 "summary": { 

1923 "total_calls": total_calls, 

1924 "total_tokens": total_tokens, 

1925 "total_prompt_tokens": cumulative_prompt_tokens, 

1926 "total_completion_tokens": cumulative_completion_tokens, 

1927 "avg_response_time": round(avg_response_time), 

1928 "success_rate": round(success_rate, 1), 

1929 }, 

1930 "phase_stats": phase_stats, 

1931 }
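
Similarly, a minimal sketch (illustrative only; not part of the covered module) of post-processing the timeline list returned by get_research_timeline_metrics into per-phase token shares; the helper name phase_token_share and the sample rows are hypothetical:

from typing import Any, Dict, List

def phase_token_share(timeline: List[Dict[str, Any]]) -> Dict[str, float]:
    """Return each research phase's percentage share of total tokens."""
    totals: Dict[str, int] = {}
    for entry in timeline:
        phase = entry.get("research_phase") or "unknown"
        totals[phase] = totals.get(phase, 0) + (entry.get("tokens") or 0)
    grand_total = sum(totals.values()) or 1
    return {
        phase: round(tokens / grand_total * 100, 1)
        for phase, tokens in totals.items()
    }

# Hypothetical rows shaped like the timeline entries built above
sample_timeline = [
    {"research_phase": "search", "tokens": 1200},
    {"research_phase": "synthesis", "tokens": 800},
    {"research_phase": None, "tokens": 100},
]
print(phase_token_share(sample_timeline))
# -> {'search': 57.1, 'synthesis': 38.1, 'unknown': 4.8}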