Coverage for src / local_deep_research / benchmarks / efficiency / resource_monitor.py: 12%

133 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Resource monitoring tools for Local Deep Research. 

3 

4This module provides functionality for tracking CPU, memory and other 

5system resource usage during the research process. 

6""" 

7 

8import threading 

9import time 

10from contextlib import contextmanager 

11from typing import Any, Dict 

12 

13from loguru import logger 

14 

15# Try to import psutil, but don't fail if not available 

16try: 

17 import psutil 

18 

19 PSUTIL_AVAILABLE = True 

20except ImportError: 

21 PSUTIL_AVAILABLE = False 

22 logger.warning("psutil not available, resource monitoring will be limited") 

23 

24 

class ResourceMonitor:
    """
    Monitor system resource usage during research.

    A background daemon thread periodically samples CPU, memory, thread and
    disk figures through psutil and stores each sample as a dictionary.
    The collected samples can then be summarised (``get_*_stats``), printed
    (``print_summary``) or exported raw (``export_data``).

    When psutil is not installed the monitor degrades to a no-op: ``start``
    logs a warning and nothing is ever sampled.
    """

    def __init__(
        self,
        sampling_interval: float = 1.0,
        track_process: bool = True,
        track_system: bool = True,
    ):
        """
        Initialize the resource monitor.

        Args:
            sampling_interval: Seconds between resource usage measurements
            track_process: Whether to track this process's resource usage
            track_system: Whether to track overall system resource usage
        """
        self.sampling_interval = sampling_interval
        self.track_process = track_process
        self.track_system = track_system

        self.monitoring = False
        self.monitor_thread = None
        # Set by stop() so that a sampling thread which is waiting between
        # samples wakes up immediately instead of finishing its sleep.
        self._stop_event = threading.Event()

        # Resource usage data
        self.process_data = []
        self.system_data = []
        self.start_time = None
        self.end_time = None

        # Check if we can monitor resources
        self.can_monitor = PSUTIL_AVAILABLE
        if not self.can_monitor:
            logger.warning(
                "Resource monitoring requires psutil. Install with: pip install psutil"
            )

    def start(self):
        """
        Start monitoring resource usage.

        Clears the samples of any previous run and launches the sampling
        thread. Does nothing (apart from logging a warning) when psutil is
        unavailable or monitoring is already running.
        """
        if not self.can_monitor:
            logger.warning(
                "Resource monitoring not available (psutil not installed)"
            )
            return

        if self.monitoring:
            logger.warning("Resource monitoring already started")
            return

        self.process_data = []
        self.system_data = []
        self.start_time = time.time()
        # Forget the previous run's end time; otherwise stats requested
        # while this run is active would report a negative duration.
        self.end_time = None
        self.monitoring = True

        # Each run gets its own event, so a thread left over from an earlier
        # run can never be revived by `monitoring` flipping back to True.
        run_stop_event = threading.Event()
        self._stop_event = run_stop_event

        # Start monitoring in a background thread
        self.monitor_thread = threading.Thread(
            target=self._monitor_resources,
            args=(run_stop_event,),
            daemon=True,
        )
        self.monitor_thread.start()

        logger.info(
            f"Resource monitoring started with {self.sampling_interval}s interval"
        )

    def stop(self):
        """
        Stop monitoring resource usage.

        Records the end time, signals the sampling thread and waits up to
        two seconds for it to exit. Does nothing when monitoring is not
        running.
        """
        if not self.monitoring:
            return

        self.monitoring = False
        self.end_time = time.time()
        # Wake the sampling thread if it is waiting for the next interval.
        self._stop_event.set()

        # Wait for the monitoring thread to finish
        worker = self.monitor_thread
        if worker:
            worker.join(timeout=2.0)
            self.monitor_thread = None

        logger.info("Resource monitoring stopped")

    def _monitor_resources(self, stop_event=None):
        """
        Background thread that collects resource usage data.

        Args:
            stop_event: ``threading.Event`` that ends the loop once set.
                Defaults to the monitor's current event so the method can
                still be invoked without arguments.
        """
        if not PSUTIL_AVAILABLE:
            return

        if stop_event is None:
            stop_event = self._stop_event

        # Get this process
        current_process = psutil.Process()

        while self.monitoring and not stop_event.is_set():
            timestamp = time.time()

            try:
                # Monitor this process
                if self.track_process:
                    process_cpu = current_process.cpu_percent(interval=None)
                    process_memory = current_process.memory_info()

                    self.process_data.append(
                        {
                            "timestamp": timestamp,
                            "cpu_percent": process_cpu,
                            "memory_rss": process_memory.rss,  # Resident Set Size in bytes
                            "memory_vms": process_memory.vms,  # Virtual Memory Size in bytes
                            # Not every platform reports a `shared` field.
                            "memory_shared": getattr(
                                process_memory, "shared", 0
                            ),
                            "num_threads": current_process.num_threads(),
                            "open_files": len(current_process.open_files()),
                            "status": current_process.status(),
                        }
                    )

                # Monitor overall system
                if self.track_system:
                    system_cpu = psutil.cpu_percent(interval=None)
                    system_memory = psutil.virtual_memory()
                    system_disk = psutil.disk_usage("/")

                    self.system_data.append(
                        {
                            "timestamp": timestamp,
                            "cpu_percent": system_cpu,
                            "memory_total": system_memory.total,
                            "memory_available": system_memory.available,
                            "memory_used": system_memory.used,
                            "memory_percent": system_memory.percent,
                            "disk_total": system_disk.total,
                            "disk_used": system_disk.used,
                            "disk_percent": system_disk.percent,
                        }
                    )

            except Exception as e:
                # Best effort: a failed sample is logged and skipped so the
                # monitor keeps running.
                logger.exception(f"Error monitoring resources: {e!s}")

            # Wait until the next sampling interval; unlike time.sleep this
            # returns early (with True) as soon as stop() sets the event.
            if stop_event.wait(self.sampling_interval):
                break

    @contextmanager
    def monitor(self):
        """
        Context manager for monitoring resources during a block of code.

        Monitoring is stopped even when the block raises.

        Example:
            with resource_monitor.monitor():
                # Code to monitor
                do_something_resource_intensive()
        """
        self.start()
        try:
            yield
        finally:
            self.stop()

    def _duration(self):
        """
        Return the monitored duration in seconds.

        Returns:
            ``end_time - start_time``, or None while either timestamp is
            still unset (never started, or started but not yet stopped).
        """
        if self.start_time is None or self.end_time is None:
            return None
        return self.end_time - self.start_time

    @staticmethod
    def _min_max_avg(values):
        """Return ``(min, max, mean)`` of a non-empty list of numbers."""
        return min(values), max(values), sum(values) / len(values)

    def get_process_stats(self) -> Dict[str, Any]:
        """
        Get statistics about this process's resource usage.

        Returns:
            Dictionary with process resource usage statistics (CPU percent,
            resident memory in MB, peak thread count), or an empty
            dictionary when no process samples have been collected.
        """
        # Snapshot once so every figure below describes the same samples
        # even if the sampling thread is still appending.
        samples = list(self.process_data)
        if not samples:
            return {}

        # Extract data series
        cpu_values = [s["cpu_percent"] for s in samples]
        memory_values = [
            s["memory_rss"] / (1024 * 1024) for s in samples
        ]  # Convert to MB

        cpu_min, cpu_max, cpu_avg = self._min_max_avg(cpu_values)
        mem_min, mem_max, mem_avg = self._min_max_avg(memory_values)

        return {
            "start_time": self.start_time,
            "end_time": self.end_time,
            "duration": self._duration(),
            "sample_count": len(samples),
            "cpu_min": cpu_min,
            "cpu_max": cpu_max,
            "cpu_avg": cpu_avg,
            "memory_min_mb": mem_min,
            "memory_max_mb": mem_max,
            "memory_avg_mb": mem_avg,
            "thread_max": max(s["num_threads"] for s in samples),
        }

    def get_system_stats(self) -> Dict[str, Any]:
        """
        Get statistics about overall system resource usage.

        Returns:
            Dictionary with system resource usage statistics (CPU, memory
            and disk percentages plus total memory/disk in GB), or an empty
            dictionary when no system samples have been collected.
        """
        # Snapshot once so every figure below describes the same samples
        # even if the sampling thread is still appending.
        samples = list(self.system_data)
        if not samples:
            return {}

        # Extract data series
        cpu_min, cpu_max, cpu_avg = self._min_max_avg(
            [s["cpu_percent"] for s in samples]
        )
        mem_min, mem_max, mem_avg = self._min_max_avg(
            [s["memory_percent"] for s in samples]
        )
        disk_min, disk_max, disk_avg = self._min_max_avg(
            [s["disk_percent"] for s in samples]
        )

        # Totals are taken from the first sample.
        first = samples[0]

        return {
            "start_time": self.start_time,
            "end_time": self.end_time,
            "duration": self._duration(),
            "sample_count": len(samples),
            "cpu_min": cpu_min,
            "cpu_max": cpu_max,
            "cpu_avg": cpu_avg,
            "memory_min_percent": mem_min,
            "memory_max_percent": mem_max,
            "memory_avg_percent": mem_avg,
            "disk_min_percent": disk_min,
            "disk_max_percent": disk_max,
            "disk_avg_percent": disk_avg,
            "memory_total_gb": first["memory_total"] / (1024**3),
            "disk_total_gb": first["disk_total"] / (1024**3),
        }

    def get_combined_stats(self) -> Dict[str, Any]:
        """
        Get combined resource usage statistics.

        Returns:
            Dictionary with the shared timing fields, every process
            statistic under a ``process_`` prefix, every system statistic
            under a ``system_`` prefix and, when both are available,
            ``process_memory_percent`` (peak process memory as a percentage
            of total system memory).
        """
        process_stats = self.get_process_stats()
        system_stats = self.get_system_stats()

        # Timing fields appear once, unprefixed.
        shared_keys = ("start_time", "end_time", "duration")

        # Combine stats
        stats = {
            "start_time": self.start_time,
            "end_time": self.end_time,
            "duration": self._duration(),
        }

        # Add process stats with 'process_' prefix
        for key, value in process_stats.items():
            if key not in shared_keys:
                stats[f"process_{key}"] = value

        # Add system stats with 'system_' prefix
        for key, value in system_stats.items():
            if key not in shared_keys:
                stats[f"system_{key}"] = value

        # Calculate derived metrics
        peak_process_mb = process_stats.get("memory_max_mb")
        total_system_gb = system_stats.get("memory_total_gb")
        if peak_process_mb is not None and total_system_gb is not None:
            # Process memory as percentage of total system memory
            system_memory_mb = total_system_gb * 1024
            stats["process_memory_percent"] = (
                (peak_process_mb / system_memory_mb) * 100
                if system_memory_mb > 0
                else 0
            )

        return stats

    def print_summary(self):
        """Print a formatted summary of resource usage to stdout."""
        process_stats = self.get_process_stats()
        system_stats = self.get_system_stats()

        print("\n===== RESOURCE USAGE SUMMARY =====")

        if process_stats:
            print("\n--- Process Resources ---")
            print(
                f"CPU usage: {process_stats.get('cpu_avg', 0):.1f}% avg, "
                f"{process_stats.get('cpu_max', 0):.1f}% peak"
            )
            print(
                f"Memory usage: {process_stats.get('memory_avg_mb', 0):.1f} MB avg, "
                f"{process_stats.get('memory_max_mb', 0):.1f} MB peak"
            )
            print(f"Threads: {process_stats.get('thread_max', 0)} max")

        if system_stats:
            print("\n--- System Resources ---")
            print(
                f"CPU usage: {system_stats.get('cpu_avg', 0):.1f}% avg, "
                f"{system_stats.get('cpu_max', 0):.1f}% peak"
            )
            print(
                f"Memory usage: {system_stats.get('memory_avg_percent', 0):.1f}% avg, "
                f"{system_stats.get('memory_max_percent', 0):.1f}% peak "
                f"(Total: {system_stats.get('memory_total_gb', 0):.1f} GB)"
            )
            print(
                f"Disk usage: {system_stats.get('disk_avg_percent', 0):.1f}% avg "
                f"(Total: {system_stats.get('disk_total_gb', 0):.1f} GB)"
            )

        print("\n===================================")

    def export_data(self) -> Dict[str, Any]:
        """
        Export all collected data.

        Returns:
            Dictionary with the start/end timestamps, the sampling interval
            and the raw process and system sample lists (the monitor's own
            list objects, not copies).
        """
        return {
            "start_time": self.start_time,
            "end_time": self.end_time,
            "sampling_interval": self.sampling_interval,
            "process_data": self.process_data,
            "system_data": self.system_data,
        }

378 

379 

def check_system_resources() -> Dict[str, Any]:
    """
    Take a one-off snapshot of the machine's CPU, memory and disk state.

    Returns:
        On success, a dictionary with ``available`` set to True plus CPU
        core counts, CPU utilisation, memory figures (GB and percent) and
        usage of the filesystem mounted at ``/``. When psutil is missing or
        any query fails, a dictionary with ``available`` set to False and
        an ``error`` message instead.
    """
    if not PSUTIL_AVAILABLE:
        return {"error": "psutil not available", "available": False}

    # Divisor used to express byte counts in GB (1024**3 bytes).
    bytes_per_gb = 1024**3

    try:
        logical_cores = psutil.cpu_count(logical=True)
        physical_cores = psutil.cpu_count(logical=False)
        # Utilisation is measured over a 0.1 s window.
        cpu_load = psutil.cpu_percent(interval=0.1)

        mem = psutil.virtual_memory()
        root_disk = psutil.disk_usage("/")

        return {
            "available": True,
            "cpu_count": logical_cores,
            "cpu_physical": physical_cores,
            "cpu_percent": cpu_load,
            "memory_total_gb": mem.total / bytes_per_gb,
            "memory_available_gb": mem.available / bytes_per_gb,
            "memory_used_gb": mem.used / bytes_per_gb,
            "memory_percent": mem.percent,
            "disk_total_gb": root_disk.total / bytes_per_gb,
            "disk_free_gb": root_disk.free / bytes_per_gb,
            "disk_percent": root_disk.percent,
        }

    except Exception as exc:
        # Report the failure to the caller instead of raising.
        logger.exception(f"Error checking system resources: {exc!s}")
        return {"error": str(exc), "available": False}

418 return {"error": str(e), "available": False}