Coverage for src / local_deep_research / benchmarks / efficiency / resource_monitor.py: 98%

130 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2Resource monitoring tools for Local Deep Research. 

3 

4This module provides functionality for tracking CPU, memory and other 

5system resource usage during the research process. 

6""" 

7 

8import threading 

9import time 

10from contextlib import contextmanager 

11from typing import Any, Dict 

12 

13from loguru import logger 

14 

15# Try to import psutil, but don't fail if not available 

16try: 

17 import psutil 

18 

19 PSUTIL_AVAILABLE = True 

20except ImportError: 

21 PSUTIL_AVAILABLE = False 

22 logger.warning("psutil not available, resource monitoring will be limited") 

23 

24 

class ResourceMonitor:
    """
    Monitor system resource usage during research.

    A background daemon thread samples psutil every ``sampling_interval``
    seconds and appends one dictionary per sample to ``process_data`` (this
    process) and/or ``system_data`` (the whole machine). The ``get_*_stats``
    methods reduce those samples to min/max/avg summaries.

    When psutil could not be imported, ``can_monitor`` is False and
    ``start()`` only logs a warning; the statistics methods then return
    empty results because no samples are ever collected.
    """

    # Timing keys present in both per-scope summaries. get_combined_stats()
    # reports them once at the top level instead of once per prefix.
    _TIMING_KEYS = ("start_time", "end_time", "duration")

    def __init__(
        self,
        sampling_interval: float = 1.0,
        track_process: bool = True,
        track_system: bool = True,
    ):
        """
        Initialize the resource monitor.

        Args:
            sampling_interval: Seconds between resource usage measurements
            track_process: Whether to track this process's resource usage
            track_system: Whether to track overall system resource usage
        """
        self.sampling_interval = sampling_interval
        self.track_process = track_process
        self.track_system = track_system

        # Loop flag read by the sampler thread; cleared by stop().
        self.monitoring = False
        self.monitor_thread = None

        # Resource usage data (one dict per sample, oldest first)
        self.process_data = []
        self.system_data = []
        self.start_time = None
        self.end_time = None

        # Check if we can monitor resources
        self.can_monitor = PSUTIL_AVAILABLE
        if not self.can_monitor:
            logger.warning(
                "Resource monitoring requires psutil. Install with: pip install psutil"
            )

    def start(self):
        """
        Start monitoring resource usage.

        Discards samples from any previous run and launches the sampler
        thread. Does nothing except log a warning when psutil is missing or
        monitoring is already running.
        """
        if not self.can_monitor:
            logger.warning(
                "Resource monitoring not available (psutil not installed)"
            )
            return

        if self.monitoring:
            logger.warning("Resource monitoring already started")
            return

        self.process_data = []
        self.system_data = []
        self.start_time = time.time()
        # Forget the previous run's end time; otherwise stats requested while
        # this run is still in progress would pair the new start time with a
        # stale end time and report a negative duration.
        self.end_time = None
        self.monitoring = True

        # Start monitoring in a background thread
        self.monitor_thread = threading.Thread(
            target=self._monitor_resources, daemon=True
        )
        self.monitor_thread.start()

        logger.info(
            f"Resource monitoring started with {self.sampling_interval}s interval"
        )

    def stop(self):
        """
        Stop monitoring resource usage.

        Records the end time and waits up to two seconds for the sampler
        thread. No-op when monitoring is not running.
        """
        if not self.monitoring:
            return

        self.monitoring = False
        self.end_time = time.time()

        # Wait for the monitoring thread to finish.
        # NOTE(review): the sampler sleeps for sampling_interval between
        # checks of the flag, so with an interval above the 2 s join timeout
        # the thread can still be alive when this method returns.
        if self.monitor_thread:
            self.monitor_thread.join(timeout=2.0)
            self.monitor_thread = None

        logger.info("Resource monitoring stopped")

    def _monitor_resources(self):
        """
        Background thread that collects resource usage data.

        Loops while ``self.monitoring`` is true, taking one sample per
        iteration and then sleeping for ``sampling_interval`` seconds. Any
        exception raised while sampling is logged and the loop continues.
        """
        if not PSUTIL_AVAILABLE:
            return

        # Get this process
        current_process = psutil.Process()

        while self.monitoring:
            timestamp = time.time()

            try:
                # Monitor this process
                if self.track_process:
                    # NOTE(review): with interval=None psutil measures CPU
                    # since the previous call; per the psutil docs the first
                    # reading is a meaningless 0.0 - confirm whether callers
                    # rely on cpu_min.
                    process_cpu = current_process.cpu_percent(interval=None)
                    process_memory = current_process.memory_info()

                    self.process_data.append(
                        {
                            "timestamp": timestamp,
                            "cpu_percent": process_cpu,
                            "memory_rss": process_memory.rss,  # Resident Set Size in bytes
                            "memory_vms": process_memory.vms,  # Virtual Memory Size in bytes
                            # Not every memory_info() result has a "shared"
                            # field; record 0 when it is absent.
                            "memory_shared": getattr(
                                process_memory, "shared", 0
                            ),
                            "num_threads": current_process.num_threads(),
                            "open_files": len(current_process.open_files()),
                            "status": current_process.status(),
                        }
                    )

                # Monitor overall system
                if self.track_system:
                    system_cpu = psutil.cpu_percent(interval=None)
                    system_memory = psutil.virtual_memory()
                    system_disk = psutil.disk_usage("/")

                    self.system_data.append(
                        {
                            "timestamp": timestamp,
                            "cpu_percent": system_cpu,
                            "memory_total": system_memory.total,
                            "memory_available": system_memory.available,
                            "memory_used": system_memory.used,
                            "memory_percent": system_memory.percent,
                            "disk_total": system_disk.total,
                            "disk_used": system_disk.used,
                            "disk_percent": system_disk.percent,
                        }
                    )

            except Exception:
                # Best effort: a failed sample must not kill the sampler.
                logger.exception("Error monitoring resources")

            # Sleep until next sampling interval
            time.sleep(self.sampling_interval)

    @contextmanager
    def monitor(self):
        """
        Context manager for monitoring resources during a block of code.

        Monitoring is stopped even if the block raises.

        Example:
            with resource_monitor.monitor():
                # Code to monitor
                do_something_resource_intensive()
        """
        self.start()
        try:
            yield
        finally:
            self.stop()

    def _duration(self):
        """
        Return the length of the last completed run in seconds, or None.

        None is returned while no end time is recorded, when no start time
        is recorded, or when the recorded end time precedes the start time
        (timestamps that do not belong to the same run).
        """
        if self.start_time is None or not self.end_time:
            return None
        if self.end_time < self.start_time:
            return None
        return self.end_time - self.start_time

    @staticmethod
    def _summarize(values):
        """Return ``(min, max, mean)`` of a non-empty list of numbers."""
        return min(values), max(values), sum(values) / len(values)

    def get_process_stats(self) -> Dict[str, Any]:
        """
        Get statistics about this process's resource usage.

        Returns:
            Dictionary with process resource usage statistics (CPU percent,
            resident memory in MB, peak thread count, timing), or an empty
            dictionary when no process samples were collected.
        """
        # Snapshot once so every figure below is derived from the same
        # samples even if the sampler thread appends in the meantime.
        samples = list(self.process_data)
        if not samples:
            return {}

        cpu_min, cpu_max, cpu_avg = self._summarize(
            [sample["cpu_percent"] for sample in samples]
        )
        # Convert resident set size from bytes to MB
        mem_min, mem_max, mem_avg = self._summarize(
            [sample["memory_rss"] / (1024 * 1024) for sample in samples]
        )

        return {
            "start_time": self.start_time,
            "end_time": self.end_time,
            "duration": self._duration(),
            "sample_count": len(samples),
            "cpu_min": cpu_min,
            "cpu_max": cpu_max,
            "cpu_avg": cpu_avg,
            "memory_min_mb": mem_min,
            "memory_max_mb": mem_max,
            "memory_avg_mb": mem_avg,
            "thread_max": max(sample["num_threads"] for sample in samples),
        }

    def get_system_stats(self) -> Dict[str, Any]:
        """
        Get statistics about overall system resource usage.

        Returns:
            Dictionary with system resource usage statistics (CPU, memory
            and disk percentages, total memory/disk in GB, timing), or an
            empty dictionary when no system samples were collected.
        """
        # Snapshot once; see get_process_stats().
        samples = list(self.system_data)
        if not samples:
            return {}

        cpu_min, cpu_max, cpu_avg = self._summarize(
            [sample["cpu_percent"] for sample in samples]
        )
        mem_min, mem_max, mem_avg = self._summarize(
            [sample["memory_percent"] for sample in samples]
        )
        disk_min, disk_max, disk_avg = self._summarize(
            [sample["disk_percent"] for sample in samples]
        )

        # Totals are taken from the first sample.
        first = samples[0]

        return {
            "start_time": self.start_time,
            "end_time": self.end_time,
            "duration": self._duration(),
            "sample_count": len(samples),
            "cpu_min": cpu_min,
            "cpu_max": cpu_max,
            "cpu_avg": cpu_avg,
            "memory_min_percent": mem_min,
            "memory_max_percent": mem_max,
            "memory_avg_percent": mem_avg,
            "disk_min_percent": disk_min,
            "disk_max_percent": disk_max,
            "disk_avg_percent": disk_avg,
            "memory_total_gb": first["memory_total"] / (1024**3),
            "disk_total_gb": first["disk_total"] / (1024**3),
        }

    def get_combined_stats(self) -> Dict[str, Any]:
        """
        Get combined resource usage statistics.

        Returns:
            Dictionary with the timing keys at the top level, every process
            statistic under a ``process_`` prefix and every system statistic
            under a ``system_`` prefix. When both summaries are available it
            also contains ``process_memory_percent``: peak process memory as
            a percentage of total system memory.
        """
        process_stats = self.get_process_stats()
        system_stats = self.get_system_stats()

        # Combine stats
        stats = {
            "start_time": self.start_time,
            "end_time": self.end_time,
            "duration": self._duration(),
        }

        # Add process stats with 'process_' prefix
        for key, value in process_stats.items():
            if key not in self._TIMING_KEYS:
                stats[f"process_{key}"] = value

        # Add system stats with 'system_' prefix
        for key, value in system_stats.items():
            if key not in self._TIMING_KEYS:
                stats[f"system_{key}"] = value

        # Calculate derived metrics
        if (
            process_stats.get("memory_max_mb") is not None
            and system_stats.get("memory_total_gb") is not None
        ):
            # Process memory as percentage of total system memory
            system_memory_mb = system_stats["memory_total_gb"] * 1024
            stats["process_memory_percent"] = (
                (process_stats["memory_max_mb"] / system_memory_mb) * 100
                if system_memory_mb > 0
                else 0
            )

        return stats

    def print_summary(self):
        """
        Print a formatted summary of resource usage.

        Sections for which no samples were collected are omitted.
        """
        process_stats = self.get_process_stats()
        system_stats = self.get_system_stats()

        print("\n===== RESOURCE USAGE SUMMARY =====")

        if process_stats:
            print("\n--- Process Resources ---")
            print(
                f"CPU usage: {process_stats.get('cpu_avg', 0):.1f}% avg, "
                f"{process_stats.get('cpu_max', 0):.1f}% peak"
            )
            print(
                f"Memory usage: {process_stats.get('memory_avg_mb', 0):.1f} MB avg, "
                f"{process_stats.get('memory_max_mb', 0):.1f} MB peak"
            )
            print(f"Threads: {process_stats.get('thread_max', 0)} max")

        if system_stats:
            print("\n--- System Resources ---")
            print(
                f"CPU usage: {system_stats.get('cpu_avg', 0):.1f}% avg, "
                f"{system_stats.get('cpu_max', 0):.1f}% peak"
            )
            print(
                f"Memory usage: {system_stats.get('memory_avg_percent', 0):.1f}% avg, "
                f"{system_stats.get('memory_max_percent', 0):.1f}% peak "
                f"(Total: {system_stats.get('memory_total_gb', 0):.1f} GB)"
            )
            print(
                f"Disk usage: {system_stats.get('disk_avg_percent', 0):.1f}% avg "
                f"(Total: {system_stats.get('disk_total_gb', 0):.1f} GB)"
            )

        print("\n===================================")

    def export_data(self) -> Dict[str, Any]:
        """
        Export all collected data.

        Returns:
            Dictionary with the timing information, the sampling interval
            and the raw per-sample lists (the live lists, not copies).
        """
        return {
            "start_time": self.start_time,
            "end_time": self.end_time,
            "sampling_interval": self.sampling_interval,
            "process_data": self.process_data,
            "system_data": self.system_data,
        }

374 

375 

def check_system_resources() -> Dict[str, Any]:
    """
    Take a one-off snapshot of the machine's CPU, memory and disk state.

    Returns:
        On success, a dictionary with ``available`` set to True plus the
        logical/physical core counts, the CPU load measured over a 0.1 s
        window, memory figures in GB with the used percentage, and the
        total/free space and used percentage of the ``/`` filesystem.
        When psutil is missing or any probe raises, a dictionary with
        ``available`` set to False and an ``error`` message instead.
    """
    if not PSUTIL_AVAILABLE:
        return {"error": "psutil not available", "available": False}

    bytes_per_gb = 1024**3

    try:
        # Probe order: core counts, CPU load, memory, then disk.
        logical_cores = psutil.cpu_count(logical=True)
        physical_cores = psutil.cpu_count(logical=False)
        cpu_load = psutil.cpu_percent(interval=0.1)
        mem = psutil.virtual_memory()
        root_disk = psutil.disk_usage("/")

        snapshot: Dict[str, Any] = {"available": True}
        snapshot["cpu_count"] = logical_cores
        snapshot["cpu_physical"] = physical_cores
        snapshot["cpu_percent"] = cpu_load
        snapshot["memory_total_gb"] = mem.total / bytes_per_gb
        snapshot["memory_available_gb"] = mem.available / bytes_per_gb
        snapshot["memory_used_gb"] = mem.used / bytes_per_gb
        snapshot["memory_percent"] = mem.percent
        snapshot["disk_total_gb"] = root_disk.total / bytes_per_gb
        snapshot["disk_free_gb"] = root_disk.free / bytes_per_gb
        snapshot["disk_percent"] = root_disk.percent
        return snapshot

    except Exception as exc:
        # Report the failure to the caller rather than raising.
        logger.exception("Error checking system resources")
        return {"error": str(exc), "available": False}