Coverage for src / local_deep_research / benchmarks / efficiency / resource_monitor.py: 12%
133 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""
2Resource monitoring tools for Local Deep Research.
4This module provides functionality for tracking CPU, memory and other
5system resource usage during the research process.
6"""
8import threading
9import time
10from contextlib import contextmanager
11from typing import Any, Dict
13from loguru import logger
15# Try to import psutil, but don't fail if not available
16try:
17 import psutil
19 PSUTIL_AVAILABLE = True
20except ImportError:
21 PSUTIL_AVAILABLE = False
22 logger.warning("psutil not available, resource monitoring will be limited")
class ResourceMonitor:
    """
    Monitor system resource usage during research.

    A background daemon thread periodically samples this process and/or the
    whole system through psutil and stores one dictionary per sample. The
    collected samples can then be summarised (min / max / average), printed,
    or exported raw. When psutil is not installed the monitor degrades to a
    no-op: ``start()`` only logs a warning and every stats method returns an
    empty result.
    """

    def __init__(
        self,
        sampling_interval: float = 1.0,
        track_process: bool = True,
        track_system: bool = True,
    ):
        """
        Initialize the resource monitor.

        Args:
            sampling_interval: Seconds between resource usage measurements
            track_process: Whether to track this process's resource usage
            track_system: Whether to track overall system resource usage
        """
        self.sampling_interval = sampling_interval
        self.track_process = track_process
        self.track_system = track_system

        self.monitoring = False
        self.monitor_thread = None
        # Wakes the sampler thread out of its inter-sample wait. A fresh
        # event is created for every run (see start()) so that a thread left
        # over from a previous run can never be revived by a later start().
        self._stop_event = threading.Event()

        # Resource usage data
        self.process_data = []
        self.system_data = []
        self.start_time = None
        self.end_time = None

        # Check if we can monitor resources
        self.can_monitor = PSUTIL_AVAILABLE
        if not self.can_monitor:
            logger.warning(
                "Resource monitoring requires psutil. Install with: pip install psutil"
            )

    def start(self):
        """
        Start monitoring resource usage in a background daemon thread.

        Discards samples from any previous run. Does nothing (apart from
        logging a warning) when psutil is unavailable or monitoring is
        already active.
        """
        if not self.can_monitor:
            logger.warning(
                "Resource monitoring not available (psutil not installed)"
            )
            return

        if self.monitoring:
            logger.warning("Resource monitoring already started")
            return

        self.process_data = []
        self.system_data = []
        self.start_time = time.time()
        # Clear the previous run's end time; otherwise stats requested while
        # this run is in progress would report a stale (negative) duration.
        self.end_time = None
        self.monitoring = True

        # Each run gets its own stop event; the sampler thread captures it
        # when it starts, so setting a new event here cannot affect (or
        # restart) a thread that belongs to an earlier run.
        self._stop_event = threading.Event()

        # Start monitoring in a background thread
        self.monitor_thread = threading.Thread(
            target=self._monitor_resources, daemon=True
        )
        self.monitor_thread.start()

        logger.info(
            f"Resource monitoring started with {self.sampling_interval}s interval"
        )

    def stop(self):
        """
        Stop monitoring resource usage.

        Records the end time, signals the sampler thread and waits up to two
        seconds for it to finish. Calling this while not monitoring is a
        no-op.
        """
        if not self.monitoring:
            return

        self.monitoring = False
        self.end_time = time.time()
        # Interrupt the sampler's wait so it exits now instead of after the
        # remainder of its sampling interval.
        self._stop_event.set()

        # Wait for the monitoring thread to finish
        if self.monitor_thread:
            self.monitor_thread.join(timeout=2.0)
            self.monitor_thread = None

        logger.info("Resource monitoring stopped")

    @staticmethod
    def _sample_process(process, timestamp: float) -> Dict[str, Any]:
        """Build one sample dictionary for the given psutil process."""
        process_cpu = process.cpu_percent(interval=None)
        process_memory = process.memory_info()
        return {
            "timestamp": timestamp,
            "cpu_percent": process_cpu,
            "memory_rss": process_memory.rss,  # Resident Set Size in bytes
            "memory_vms": process_memory.vms,  # Virtual Memory Size in bytes
            # Not every platform reports a "shared" field; default to 0.
            "memory_shared": getattr(process_memory, "shared", 0),
            "num_threads": process.num_threads(),
            "open_files": len(process.open_files()),
            "status": process.status(),
        }

    @staticmethod
    def _sample_system(timestamp: float) -> Dict[str, Any]:
        """Build one sample dictionary for system-wide CPU, memory and disk."""
        system_cpu = psutil.cpu_percent(interval=None)
        system_memory = psutil.virtual_memory()
        system_disk = psutil.disk_usage("/")
        return {
            "timestamp": timestamp,
            "cpu_percent": system_cpu,
            "memory_total": system_memory.total,
            "memory_available": system_memory.available,
            "memory_used": system_memory.used,
            "memory_percent": system_memory.percent,
            "disk_total": system_disk.total,
            "disk_used": system_disk.used,
            "disk_percent": system_disk.percent,
        }

    def _monitor_resources(self):
        """
        Background thread body that collects resource usage samples.

        Loops until ``self.monitoring`` is cleared or this run's stop event
        is set. A failure while sampling is logged and that sample is
        skipped; the loop keeps running.
        """
        if not PSUTIL_AVAILABLE:
            return

        # Capture this run's event once: start() replaces self._stop_event
        # for the next run, and this thread must keep honouring its own.
        stop_event = self._stop_event

        # Get this process
        current_process = psutil.Process()

        # NOTE(review): psutil documents that the first cpu_percent() call
        # with interval=None returns a meaningless 0.0, so the CPU reading
        # of the first sample is not a real measurement.
        while self.monitoring and not stop_event.is_set():
            timestamp = time.time()

            try:
                # Monitor this process
                if self.track_process:
                    self.process_data.append(
                        self._sample_process(current_process, timestamp)
                    )

                # Monitor overall system
                if self.track_system:
                    self.system_data.append(self._sample_system(timestamp))

            except Exception as e:
                logger.exception(f"Error monitoring resources: {e!s}")

            # Wait for the next sampling interval; returns early as soon as
            # stop() sets the event.
            stop_event.wait(self.sampling_interval)

    @contextmanager
    def monitor(self):
        """
        Context manager for monitoring resources during a block of code.

        Monitoring is stopped even if the block raises.

        Example:
            with resource_monitor.monitor():
                # Code to monitor
                do_something_resource_intensive()
        """
        self.start()
        try:
            yield
        finally:
            self.stop()

    def _duration(self):
        """Return the run length in seconds, or None if the run has not both started and ended."""
        if self.start_time is None or self.end_time is None:
            return None
        return self.end_time - self.start_time

    def get_process_stats(self) -> Dict[str, Any]:
        """
        Get statistics about this process's resource usage.

        Returns:
            Dictionary with min / max / average CPU percent and resident
            memory (in MB), the peak thread count, the sample count and the
            run's timing; empty if no process samples were collected.
        """
        # Work on a snapshot so a sampler thread that is still appending
        # cannot change the data between the individual aggregations.
        samples = list(self.process_data)
        if not samples:
            return {}

        # Extract data series
        cpu_values = [s["cpu_percent"] for s in samples]
        memory_values = [
            s["memory_rss"] / (1024 * 1024) for s in samples
        ]  # Convert to MB

        return {
            "start_time": self.start_time,
            "end_time": self.end_time,
            "duration": self._duration(),
            "sample_count": len(samples),
            "cpu_min": min(cpu_values),
            "cpu_max": max(cpu_values),
            "cpu_avg": sum(cpu_values) / len(cpu_values),
            "memory_min_mb": min(memory_values),
            "memory_max_mb": max(memory_values),
            "memory_avg_mb": sum(memory_values) / len(memory_values),
            "thread_max": max(s["num_threads"] for s in samples),
        }

    def get_system_stats(self) -> Dict[str, Any]:
        """
        Get statistics about overall system resource usage.

        Returns:
            Dictionary with min / max / average CPU, memory and disk usage
            percentages, total memory and disk size (in GB, taken from the
            first sample), the sample count and the run's timing; empty if
            no system samples were collected.
        """
        # Snapshot for the same reason as in get_process_stats().
        samples = list(self.system_data)
        if not samples:
            return {}

        # Extract data series
        cpu_values = [s["cpu_percent"] for s in samples]
        memory_values = [s["memory_percent"] for s in samples]
        disk_values = [s["disk_percent"] for s in samples]

        return {
            "start_time": self.start_time,
            "end_time": self.end_time,
            "duration": self._duration(),
            "sample_count": len(samples),
            "cpu_min": min(cpu_values),
            "cpu_max": max(cpu_values),
            "cpu_avg": sum(cpu_values) / len(cpu_values),
            "memory_min_percent": min(memory_values),
            "memory_max_percent": max(memory_values),
            "memory_avg_percent": sum(memory_values) / len(memory_values),
            "disk_min_percent": min(disk_values),
            "disk_max_percent": max(disk_values),
            "disk_avg_percent": sum(disk_values) / len(disk_values),
            "memory_total_gb": samples[0]["memory_total"] / (1024**3),
            "disk_total_gb": samples[0]["disk_total"] / (1024**3),
        }

    def get_combined_stats(self) -> Dict[str, Any]:
        """
        Get combined resource usage statistics.

        Returns:
            Dictionary with the run's timing plus every process statistic
            under a ``process_`` prefix and every system statistic under a
            ``system_`` prefix. When both are available it also contains
            ``process_memory_percent``: peak process memory as a percentage
            of total system memory.
        """
        process_stats = self.get_process_stats()
        system_stats = self.get_system_stats()

        # Timing is reported once at the top level, not once per prefix.
        timing_keys = ("start_time", "end_time", "duration")

        stats = {
            "start_time": self.start_time,
            "end_time": self.end_time,
            "duration": self._duration(),
        }

        # Add process stats with 'process_' prefix
        for key, value in process_stats.items():
            if key not in timing_keys:
                stats[f"process_{key}"] = value

        # Add system stats with 'system_' prefix
        for key, value in system_stats.items():
            if key not in timing_keys:
                stats[f"system_{key}"] = value

        # Calculate derived metrics
        if (
            process_stats.get("memory_max_mb") is not None
            and system_stats.get("memory_total_gb") is not None
        ):
            # Process memory as percentage of total system memory
            system_memory_mb = system_stats["memory_total_gb"] * 1024
            stats["process_memory_percent"] = (
                (process_stats["memory_max_mb"] / system_memory_mb) * 100
                if system_memory_mb > 0
                else 0
            )

        return stats

    def print_summary(self):
        """Print a formatted summary of resource usage to standard output."""
        process_stats = self.get_process_stats()
        system_stats = self.get_system_stats()

        print("\n===== RESOURCE USAGE SUMMARY =====")

        if process_stats:
            print("\n--- Process Resources ---")
            print(
                f"CPU usage: {process_stats.get('cpu_avg', 0):.1f}% avg, "
                f"{process_stats.get('cpu_max', 0):.1f}% peak"
            )
            print(
                f"Memory usage: {process_stats.get('memory_avg_mb', 0):.1f} MB avg, "
                f"{process_stats.get('memory_max_mb', 0):.1f} MB peak"
            )
            print(f"Threads: {process_stats.get('thread_max', 0)} max")

        if system_stats:
            print("\n--- System Resources ---")
            print(
                f"CPU usage: {system_stats.get('cpu_avg', 0):.1f}% avg, "
                f"{system_stats.get('cpu_max', 0):.1f}% peak"
            )
            print(
                f"Memory usage: {system_stats.get('memory_avg_percent', 0):.1f}% avg, "
                f"{system_stats.get('memory_max_percent', 0):.1f}% peak "
                f"(Total: {system_stats.get('memory_total_gb', 0):.1f} GB)"
            )
            print(
                f"Disk usage: {system_stats.get('disk_avg_percent', 0):.1f}% avg "
                f"(Total: {system_stats.get('disk_total_gb', 0):.1f} GB)"
            )

        print("\n===================================")

    def export_data(self) -> Dict[str, Any]:
        """
        Export all collected data.

        Returns:
            Dictionary with the run's start and end time, the sampling
            interval and the raw per-sample lists. The lists are the
            monitor's own objects, not copies.
        """
        return {
            "start_time": self.start_time,
            "end_time": self.end_time,
            "sampling_interval": self.sampling_interval,
            "process_data": self.process_data,
            "system_data": self.system_data,
        }
def check_system_resources() -> Dict[str, Any]:
    """
    Take a one-off snapshot of the machine's CPU, memory and root-disk usage.

    Returns:
        On success, a dictionary with ``available`` set to True, the logical
        and physical CPU counts, the current CPU percentage, and memory and
        disk figures converted to GB. If psutil is missing or any query
        fails, a dictionary with ``available`` set to False and an ``error``
        message instead.
    """
    if not PSUTIL_AVAILABLE:
        return {"error": "psutil not available", "available": False}

    bytes_per_gb = 1024**3

    try:
        # CPU figures; the CPU percentage is measured over a 0.1 s window.
        logical_cpus = psutil.cpu_count(logical=True)
        physical_cpus = psutil.cpu_count(logical=False)
        cpu_load = psutil.cpu_percent(interval=0.1)

        mem = psutil.virtual_memory()
        root_disk = psutil.disk_usage("/")

        return {
            "available": True,
            "cpu_count": logical_cpus,
            "cpu_physical": physical_cpus,
            "cpu_percent": cpu_load,
            "memory_total_gb": mem.total / bytes_per_gb,
            "memory_available_gb": mem.available / bytes_per_gb,
            "memory_used_gb": mem.used / bytes_per_gb,
            "memory_percent": mem.percent,
            "disk_total_gb": root_disk.total / bytes_per_gb,
            "disk_free_gb": root_disk.free / bytes_per_gb,
            "disk_percent": root_disk.percent,
        }

    except Exception as e:
        # Best-effort probe: report the failure to the caller, do not raise.
        logger.exception(f"Error checking system resources: {e!s}")
        return {"error": str(e), "available": False}