Coverage for src / local_deep_research / benchmarks / efficiency / resource_monitor.py: 98%
130 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""
2Resource monitoring tools for Local Deep Research.
4This module provides functionality for tracking CPU, memory and other
5system resource usage during the research process.
6"""
8import threading
9import time
10from contextlib import contextmanager
11from typing import Any, Dict
13from loguru import logger
# psutil is an optional dependency. Record whether it could be imported so the
# rest of this module can degrade gracefully instead of failing at import time.
try:
    import psutil
except ImportError:
    PSUTIL_AVAILABLE = False
    logger.warning("psutil not available, resource monitoring will be limited")
else:
    PSUTIL_AVAILABLE = True
class ResourceMonitor:
    """
    Monitor system resource usage during research.

    This class provides methods for tracking CPU, memory, and disk usage
    during system execution. It can be used to identify resource bottlenecks
    and optimize configurations for different hardware environments.

    Samples are collected by a daemon thread that is launched by ``start()``
    and signalled to exit by ``stop()``. Collected samples are kept in
    ``process_data`` and ``system_data`` (lists of dicts) and summarised by
    the ``get_*_stats`` methods.
    """

    def __init__(
        self,
        sampling_interval: float = 1.0,
        track_process: bool = True,
        track_system: bool = True,
    ):
        """
        Initialize the resource monitor.

        Args:
            sampling_interval: Seconds between resource usage measurements
            track_process: Whether to track this process's resource usage
            track_system: Whether to track overall system resource usage
        """
        self.sampling_interval = sampling_interval
        self.track_process = track_process
        self.track_system = track_system

        self.monitoring = False
        self.monitor_thread = None
        # Wakes the sampling thread as soon as stop() is called. A fresh
        # event is created for every run so that a thread left over from a
        # previous run can never be revived by a later start().
        self._stop_event = threading.Event()

        # Resource usage data
        self.process_data = []
        self.system_data = []
        self.start_time = None
        self.end_time = None

        # Check if we can monitor resources
        self.can_monitor = PSUTIL_AVAILABLE
        if not self.can_monitor:
            logger.warning(
                "Resource monitoring requires psutil. Install with: pip install psutil"
            )

    def start(self) -> None:
        """
        Start monitoring resource usage.

        Does nothing (apart from logging a warning) when psutil is not
        available or when monitoring is already running. Otherwise any
        previously collected samples are discarded and a background daemon
        thread begins sampling.
        """
        if not self.can_monitor:
            logger.warning(
                "Resource monitoring not available (psutil not installed)"
            )
            return

        if self.monitoring:
            logger.warning("Resource monitoring already started")
            return

        self.process_data = []
        self.system_data = []
        self.start_time = time.time()
        # Forget the previous run's end time; otherwise stats requested while
        # this run is in progress would pair the new start time with a stale
        # end time and report a negative duration.
        self.end_time = None
        self._stop_event = threading.Event()
        self.monitoring = True

        # Start monitoring in a background thread
        self.monitor_thread = threading.Thread(
            target=self._monitor_resources, daemon=True
        )
        self.monitor_thread.start()

        logger.info(
            f"Resource monitoring started with {self.sampling_interval}s interval"
        )

    def stop(self) -> None:
        """
        Stop monitoring resource usage.

        Records the end time, signals the sampling thread to exit and waits
        up to two seconds for it to finish. Calling this when monitoring is
        not running is a no-op.
        """
        if not self.monitoring:
            return

        self.monitoring = False
        self.end_time = time.time()
        # Interrupt the thread's inter-sample wait instead of letting it
        # sleep out the rest of the sampling interval.
        self._stop_event.set()

        # Wait for the monitoring thread to finish
        if self.monitor_thread:
            self.monitor_thread.join(timeout=2.0)
            self.monitor_thread = None

        logger.info("Resource monitoring stopped")

    def _monitor_resources(self) -> None:
        """
        Background thread that collects resource usage data.

        Appends one dict per sampling interval to ``process_data`` and/or
        ``system_data`` (depending on the ``track_*`` flags) until monitoring
        is stopped. Errors raised while sampling are logged and the loop
        carries on with the next interval.
        """
        if not PSUTIL_AVAILABLE:
            return

        # Bind this run's stop event locally: if start() is called again
        # before this thread has exited, the new run gets a new event and
        # this thread still sees its own (already set) one.
        stop_event = self._stop_event

        # Get this process
        current_process = psutil.Process()

        # NOTE(review): psutil documents that the first cpu_percent() call
        # with interval=None returns a meaningless 0.0, so the first sample's
        # CPU figures should be read with that in mind.
        while self.monitoring and not stop_event.is_set():
            timestamp = time.time()

            try:
                # Monitor this process
                if self.track_process:
                    process_cpu = current_process.cpu_percent(interval=None)
                    process_memory = current_process.memory_info()

                    self.process_data.append(
                        {
                            "timestamp": timestamp,
                            "cpu_percent": process_cpu,
                            # Resident Set Size in bytes
                            "memory_rss": process_memory.rss,
                            # Virtual Memory Size in bytes
                            "memory_vms": process_memory.vms,
                            # Not every platform reports a "shared" field
                            "memory_shared": getattr(
                                process_memory, "shared", 0
                            ),
                            "num_threads": current_process.num_threads(),
                            "open_files": len(current_process.open_files()),
                            "status": current_process.status(),
                        }
                    )

                # Monitor overall system
                if self.track_system:
                    system_cpu = psutil.cpu_percent(interval=None)
                    system_memory = psutil.virtual_memory()
                    system_disk = psutil.disk_usage("/")

                    self.system_data.append(
                        {
                            "timestamp": timestamp,
                            "cpu_percent": system_cpu,
                            "memory_total": system_memory.total,
                            "memory_available": system_memory.available,
                            "memory_used": system_memory.used,
                            "memory_percent": system_memory.percent,
                            "disk_total": system_disk.total,
                            "disk_used": system_disk.used,
                            "disk_percent": system_disk.percent,
                        }
                    )

            except Exception:
                logger.exception("Error monitoring resources")

            # Wait until the next sampling interval, returning early as soon
            # as stop() sets the event.
            stop_event.wait(self.sampling_interval)

    @contextmanager
    def monitor(self):
        """
        Context manager for monitoring resources during a block of code.

        Monitoring is stopped when the block exits, whether normally or via
        an exception.

        Example:
            with resource_monitor.monitor():
                # Code to monitor
                do_something_resource_intensive()
        """
        self.start()
        try:
            yield
        finally:
            self.stop()

    def _duration(self):
        """Return seconds between start and end, or None if not finished."""
        if not self.end_time or self.start_time is None:
            return None
        return self.end_time - self.start_time

    def get_process_stats(self) -> Dict[str, Any]:
        """
        Get statistics about this process's resource usage.

        Returns:
            Dictionary with process resource usage statistics (min/max/avg
            CPU percent, min/max/avg RSS in MB, peak thread count, timing),
            or an empty dictionary when no process samples were collected.
        """
        # Work on a snapshot so every statistic is derived from the same set
        # of samples even if the sampling thread is still appending.
        samples = list(self.process_data)
        if not samples:
            return {}

        # Extract data series
        cpu_values = [d["cpu_percent"] for d in samples]
        # Convert bytes to MB
        memory_values = [d["memory_rss"] / (1024 * 1024) for d in samples]

        # Calculate statistics
        return {
            "start_time": self.start_time,
            "end_time": self.end_time,
            "duration": self._duration(),
            "sample_count": len(samples),
            "cpu_min": min(cpu_values),
            "cpu_max": max(cpu_values),
            "cpu_avg": sum(cpu_values) / len(cpu_values),
            "memory_min_mb": min(memory_values),
            "memory_max_mb": max(memory_values),
            "memory_avg_mb": sum(memory_values) / len(memory_values),
            "thread_max": max(d["num_threads"] for d in samples),
        }

    def get_system_stats(self) -> Dict[str, Any]:
        """
        Get statistics about overall system resource usage.

        Returns:
            Dictionary with system resource usage statistics (min/max/avg
            CPU, memory and disk percentages, total memory and disk size in
            GB, timing), or an empty dictionary when no system samples were
            collected.
        """
        # Snapshot for the same reason as in get_process_stats().
        samples = list(self.system_data)
        if not samples:
            return {}

        # Extract data series
        cpu_values = [d["cpu_percent"] for d in samples]
        memory_values = [d["memory_percent"] for d in samples]
        disk_values = [d["disk_percent"] for d in samples]

        # Totals are taken from the first sample
        first = samples[0]
        bytes_per_gb = 1024**3

        # Calculate statistics
        return {
            "start_time": self.start_time,
            "end_time": self.end_time,
            "duration": self._duration(),
            "sample_count": len(samples),
            "cpu_min": min(cpu_values),
            "cpu_max": max(cpu_values),
            "cpu_avg": sum(cpu_values) / len(cpu_values),
            "memory_min_percent": min(memory_values),
            "memory_max_percent": max(memory_values),
            "memory_avg_percent": sum(memory_values) / len(memory_values),
            "disk_min_percent": min(disk_values),
            "disk_max_percent": max(disk_values),
            "disk_avg_percent": sum(disk_values) / len(disk_values),
            "memory_total_gb": first["memory_total"] / bytes_per_gb,
            "disk_total_gb": first["disk_total"] / bytes_per_gb,
        }

    def get_combined_stats(self) -> Dict[str, Any]:
        """
        Get combined resource usage statistics.

        Returns:
            Dictionary with both process and system statistics. Process keys
            are prefixed with ``process_`` and system keys with ``system_``;
            the timing keys appear once, unprefixed. When both memory figures
            are available, ``process_memory_percent`` gives peak process RSS
            as a percentage of total system memory.
        """
        process_stats = self.get_process_stats()
        system_stats = self.get_system_stats()
        shared_keys = ("start_time", "end_time", "duration")

        # Combine stats
        stats = {
            "start_time": self.start_time,
            "end_time": self.end_time,
            "duration": self._duration(),
        }

        # Add process stats with 'process_' prefix
        for key, value in process_stats.items():
            if key not in shared_keys:
                stats[f"process_{key}"] = value

        # Add system stats with 'system_' prefix
        for key, value in system_stats.items():
            if key not in shared_keys:
                stats[f"system_{key}"] = value

        # Calculate derived metrics
        if (
            process_stats.get("memory_max_mb") is not None
            and system_stats.get("memory_total_gb") is not None
        ):
            # Process memory as percentage of total system memory
            system_memory_mb = system_stats["memory_total_gb"] * 1024
            stats["process_memory_percent"] = (
                (process_stats["memory_max_mb"] / system_memory_mb) * 100
                if system_memory_mb > 0
                else 0
            )

        return stats

    def print_summary(self) -> None:
        """Print a formatted summary of resource usage to stdout."""
        process_stats = self.get_process_stats()
        system_stats = self.get_system_stats()

        print("\n===== RESOURCE USAGE SUMMARY =====")

        if process_stats:
            print("\n--- Process Resources ---")
            print(
                f"CPU usage: {process_stats.get('cpu_avg', 0):.1f}% avg, "
                f"{process_stats.get('cpu_max', 0):.1f}% peak"
            )
            print(
                f"Memory usage: {process_stats.get('memory_avg_mb', 0):.1f} MB avg, "
                f"{process_stats.get('memory_max_mb', 0):.1f} MB peak"
            )
            print(f"Threads: {process_stats.get('thread_max', 0)} max")

        if system_stats:
            print("\n--- System Resources ---")
            print(
                f"CPU usage: {system_stats.get('cpu_avg', 0):.1f}% avg, "
                f"{system_stats.get('cpu_max', 0):.1f}% peak"
            )
            print(
                f"Memory usage: {system_stats.get('memory_avg_percent', 0):.1f}% avg, "
                f"{system_stats.get('memory_max_percent', 0):.1f}% peak "
                f"(Total: {system_stats.get('memory_total_gb', 0):.1f} GB)"
            )
            print(
                f"Disk usage: {system_stats.get('disk_avg_percent', 0):.1f}% avg "
                f"(Total: {system_stats.get('disk_total_gb', 0):.1f} GB)"
            )

        print("\n===================================")

    def export_data(self) -> Dict[str, Any]:
        """
        Export all collected data.

        Returns:
            Dictionary with all collected resource usage data: the start and
            end times, the sampling interval and the raw per-sample lists
            (the lists themselves, not copies).
        """
        return {
            "start_time": self.start_time,
            "end_time": self.end_time,
            "sampling_interval": self.sampling_interval,
            "process_data": self.process_data,
            "system_data": self.system_data,
        }
def check_system_resources(disk_path: str = "/") -> Dict[str, Any]:
    """
    Check current system resources.

    Args:
        disk_path: Filesystem path whose disk usage is reported. Defaults to
            "/", the value that was previously hard-coded, so existing
            callers are unaffected.

    Returns:
        Dictionary with current resource usage information. On success
        ``available`` is True and the CPU, memory and disk figures are
        present (sizes in GB). When psutil is missing or a query fails,
        ``available`` is False and ``error`` holds the reason.
    """
    if not PSUTIL_AVAILABLE:
        return {"error": "psutil not available", "available": False}

    bytes_per_gb = 1024**3

    try:
        # Get basic system information
        cpu_count = psutil.cpu_count(logical=True)
        cpu_physical = psutil.cpu_count(logical=False)
        # A short blocking interval gives a real reading on the first call
        cpu_percent = psutil.cpu_percent(interval=0.1)

        memory = psutil.virtual_memory()
        disk = psutil.disk_usage(disk_path)

        # Format results
        return {
            "available": True,
            "cpu_count": cpu_count,
            "cpu_physical": cpu_physical,
            "cpu_percent": cpu_percent,
            "memory_total_gb": memory.total / bytes_per_gb,
            "memory_available_gb": memory.available / bytes_per_gb,
            "memory_used_gb": memory.used / bytes_per_gb,
            "memory_percent": memory.percent,
            "disk_total_gb": disk.total / bytes_per_gb,
            "disk_free_gb": disk.free / bytes_per_gb,
            "disk_percent": disk.percent,
        }

    except Exception as e:
        # Best-effort probe: report the failure to the caller instead of
        # raising, and keep the traceback in the log.
        logger.exception("Error checking system resources")
        return {"error": str(e), "available": False}