Coverage for src/local_deep_research/utilities/log_utils.py: 82%
124 statements
coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""
2Utilities for logging.
3"""
5# Needed for loguru annotations
6from __future__ import annotations
8import inspect
10# import logging - needed for InterceptHandler compatibility
11import logging
12import os
13import queue
14import sys
15import threading
16from functools import wraps
17from typing import Any, Callable
19import loguru
20from flask import g, has_app_context
21from loguru import logger
23from ..config.paths import get_logs_directory
24from ..database.models import ResearchLog
25from ..web.services.socket_service import SocketIOService

_LOG_DIR = get_logs_directory()
"""
Default log directory to use.
"""
_LOG_DIR.mkdir(parents=True, exist_ok=True)

# Thread-safe queue for database logs from background threads
_log_queue = queue.Queue(maxsize=1000)
_queue_processor_thread = None
_stop_queue = threading.Event()


class InterceptHandler(logging.Handler):
    """
    Intercepts logging messages and forwards them to Loguru's logger.
    """

    def emit(self, record: logging.LogRecord) -> None:
        # Get the corresponding Loguru level if it exists.
        try:
            level: str | int = logger.level(record.levelname).name
        except ValueError:
            level = record.levelno

        # Find the caller from which the logged message originated.
        frame, depth = inspect.currentframe(), 0
        # coverage: this loop always exits via "break"; the condition never became false.
        while frame:
            filename = frame.f_code.co_filename
            is_logging = filename == logging.__file__
            is_frozen = "importlib" in filename and "_bootstrap" in filename
            if depth > 0 and not (is_logging or is_frozen):
                break
            frame = frame.f_back
            depth += 1

        logger.opt(depth=depth, exception=record.exc_info).log(
            level, record.getMessage()
        )
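

# Illustrative sketch (not part of the original module): how InterceptHandler is
# typically installed so that records from the stdlib `logging` module are routed
# through Loguru. The level (0) and `force=True` here are assumptions, not taken
# from this file.
def _example_install_intercept_handler() -> None:  # pragma: no cover - example only
    logging.basicConfig(handlers=[InterceptHandler()], level=0, force=True)
    # Any stdlib logger now ends up in the Loguru sinks configured by config_logger().
    logging.getLogger("third_party.lib").warning("Handled by Loguru")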


def log_for_research(
    to_wrap: Callable[[str, ...], Any],
) -> Callable[[str, ...], Any]:
    """
    Decorator for a function that is part of the research process. It expects
    the function to take the research ID (UUID) as its first parameter, and
    configures all log messages made during the call to include that research ID.

    Args:
        to_wrap: The function to wrap. Should take the research ID as the first parameter.

    Returns:
        The wrapped function.
    """

    @wraps(to_wrap)
    def wrapped(research_id: str, *args: Any, **kwargs: Any) -> Any:
        g.research_id = research_id
        result = to_wrap(research_id, *args, **kwargs)
        g.pop("research_id")
        return result

    return wrapped
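

# Illustrative sketch (not part of the original module): applying the decorator to a
# research entry point. The function name and body are hypothetical; the only
# requirement, per the docstring above, is that the research ID is the first argument.
@log_for_research
def _example_run_research(research_id: str, query: str) -> None:  # pragma: no cover - example only
    # Every log emitted during this call can be associated with the research run,
    # because g.research_id is set for the duration of the call.
    logger.info(f"Starting research for query: {query}")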


def _get_research_id(record=None) -> str | None:
    """
    Gets the current research ID (UUID), if present.

    Args:
        record: Optional loguru record that might contain bound research_id

    Returns:
        The current research ID (UUID), or None if it does not exist.
    """
    research_id = None

    # First check if research_id is bound to the log record
    if record and "extra" in record and "research_id" in record["extra"]:
        research_id = record["extra"]["research_id"]
    # Then check Flask context
    elif has_app_context():
        research_id = g.get("research_id")

    return research_id
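

# Illustrative sketch (not part of the original module): binding a research ID
# directly on the logger. This puts it into record["extra"], which is the first
# place _get_research_id() looks, and works even without a Flask app context.
def _example_bind_research_id(research_id: str) -> None:  # pragma: no cover - example only
    logger.bind(research_id=research_id).info("Progress update")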


def _process_log_queue():
    """
    Process logs from the queue in a dedicated thread with app context.
    This runs in the main thread context to avoid SQLite thread safety issues.
    """
    while not _stop_queue.is_set():
        try:
            # Wait for logs with timeout to check stop flag
            log_entry = _log_queue.get(timeout=0.1)

            # Skip if no entry
            if log_entry is None:
                continue

            # Write to database if we have app context
            if has_app_context():
                _write_log_to_database(log_entry)
            else:
                # If no app context, put it back in queue for later
                try:
                    _log_queue.put_nowait(log_entry)
                except queue.Full:
                    pass  # Drop log if queue is full
        except queue.Empty:
            continue
        except Exception:
            # Don't let logging errors crash the processor
            pass
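

# Illustrative sketch (not part of the original module): this file declares
# _queue_processor_thread and _stop_queue but does not show the startup code, so the
# following is only an assumption about how the processor might be run and stopped.
def _example_start_queue_processor() -> threading.Thread:  # pragma: no cover - example only
    # Shut down later with: _stop_queue.set(); thread.join(timeout=1.0)
    thread = threading.Thread(
        target=_process_log_queue, daemon=True, name="log-queue-processor"
    )
    thread.start()
    return thread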


def _write_log_to_database(log_entry: dict) -> None:
    """
    Write a log entry to the database. Should only be called from main thread.
    """
    from ..database.session_context import get_user_db_session

    try:
        username = log_entry.get("username")

        with get_user_db_session(username) as db_session:
            # coverage: db_session was always truthy in the measured runs.
            if db_session:
                db_log = ResearchLog(
                    timestamp=log_entry["timestamp"],
                    message=log_entry["message"],
                    module=log_entry["module"],
                    function=log_entry["function"],
                    line_no=log_entry["line_no"],
                    level=log_entry["level"],
                    research_id=log_entry["research_id"],
                )
                db_session.add(db_log)
                db_session.commit()
    except Exception:
        # Ignore database errors in logging
        pass


def database_sink(message: loguru.Message) -> None:
    """
    Sink that saves messages to the database.
    Queues logs from background threads for later processing.

    Args:
        message: The log message to save.
    """
    record = message.record
    research_id = _get_research_id(record)

    # Create log entry dict
    log_entry = {
        "timestamp": record["time"],
        "message": record["message"],
        "module": record["name"],
        "function": record["function"],
        "line_no": int(record["line"]),
        "level": record["level"].name,
        "research_id": research_id,
        "username": record.get("extra", {}).get("username"),
    }

    # Check if we're in a background thread
    # Note: Socket.IO handlers run in separate threads even with app context
    if not has_app_context() or threading.current_thread().name != "MainThread":
        # Queue the log for later processing
        try:
            _log_queue.put_nowait(log_entry)
        except queue.Full:
            # Drop log if queue is full to avoid blocking
            pass
    else:
        # We're in the main thread with app context - write directly
        _write_log_to_database(log_entry)
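

# Illustrative sketch (not part of the original module): a background worker whose
# logs flow through database_sink. Because the thread is not "MainThread", the entry
# is queued rather than written directly; flush_log_queue() below drains it later.
# The username value is a placeholder.
def _example_background_worker(research_id: str) -> None:  # pragma: no cover - example only
    logger.bind(research_id=research_id, username="demo-user").info("Worker started")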


def frontend_progress_sink(message: loguru.Message) -> None:
    """
    Sink that sends messages to the frontend.

    Args:
        message: The log message to send.
    """
    record = message.record
    research_id = _get_research_id(record)
    if research_id is None:
        # If we don't have a research ID, don't send anything.
        # Can't use logger here as it causes deadlock
        return

    frontend_log = dict(
        log_entry=dict(
            message=record["message"],
            type=record["level"].name,  # Keep original case
            time=record["time"].isoformat(),
        ),
    )
    SocketIOService().emit_to_subscribers(
        "progress", research_id, frontend_log, enable_logging=False
    )


def flush_log_queue():
    """
    Flush all pending logs from the queue to the database.
    This should be called from a Flask request context.
    """
    flushed = 0
    while not _log_queue.empty():
        try:
            log_entry = _log_queue.get_nowait()
            _write_log_to_database(log_entry)
            flushed += 1
        except queue.Empty:
            break
        except Exception:
            pass

    if flushed > 0:
        logger.debug(f"Flushed {flushed} queued log entries to database")
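

# Illustrative sketch (not part of the original module): flush_log_queue() expects a
# Flask request context, so one plausible place to call it is a request teardown hook.
# The `app` argument is a hypothetical Flask application instance; this file does not
# create one.
def _example_register_log_flush(app) -> None:  # pragma: no cover - example only
    @app.teardown_request
    def _flush_queued_logs(exc):
        flush_log_queue()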


def config_logger(name: str, debug: bool = False) -> None:
    """
    Configures the default logger.

    Args:
        name: The name to use for the log file.
        debug: Whether to enable unsafe debug logging.
    """
    logger.enable("local_deep_research")
    logger.remove()

    # Log to the console (stderr), the database, and the frontend.
    logger.add(sys.stderr, level="INFO", diagnose=debug)
    logger.add(database_sink, level="DEBUG", diagnose=debug)
    logger.add(frontend_progress_sink, diagnose=debug)

    # Optionally log to a file (disabled by default for security). File logging is
    # controlled only by this environment variable, because database settings are
    # not available at logger initialization time.
    enable_file_logging = (
        os.environ.get("LDR_ENABLE_FILE_LOGGING", "").lower() == "true"
    )

    if enable_file_logging:
        log_file = _LOG_DIR / f"{name}.log"
        logger.add(
            log_file,
            level="DEBUG",
            rotation="10 MB",
            retention="7 days",
            compression="zip",
            diagnose=debug,
        )
        logger.warning(
            f"File logging enabled - logs will be written to {log_file}. "
            "WARNING: Log files are unencrypted and may contain sensitive data!"
        )

    # Add a special log level for milestones.
    try:
        logger.level("MILESTONE", no=26, color="<magenta><bold>")
    except ValueError:
        # Level already exists, that's fine
        pass
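

# Illustrative sketch (not part of the original module): typical startup usage. The
# log-file name "web" and the milestone message are assumptions.
def _example_configure_logging() -> None:  # pragma: no cover - example only
    config_logger("web", debug=False)
    # The custom MILESTONE level (no=26) sits between INFO (20) and WARNING (30).
    logger.log("MILESTONE", "Research pipeline initialized")
    # File logging stays off unless LDR_ENABLE_FILE_LOGGING=true is set in the environment.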