Coverage for src / local_deep_research / utilities / log_utils.py: 96%
131 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""
2Utilities for logging.
3"""
5# Needed for loguru annotations
6from __future__ import annotations
8import inspect
10# import logging - needed for InterceptHandler compatibility
11import logging
12import os
13import queue
14import sys
15import threading
16from functools import wraps
17from typing import Any, Callable
19import loguru
20from flask import g, has_app_context
21from loguru import logger
23from ..config.paths import get_logs_directory
24from ..database.models import ResearchLog
25from ..web.services.socket_service import SocketIOService
# Directory where log files are written; created eagerly at import time.
_LOG_DIR = get_logs_directory()
_LOG_DIR.mkdir(parents=True, exist_ok=True)

# Thread-safe queue for database logs from background threads
# (bounded so a stalled consumer cannot grow memory without limit).
_log_queue = queue.Queue(maxsize=1000)
# NOTE(review): never assigned in this file view — presumably set by code
# that starts the _process_log_queue thread; confirm it is still used.
_queue_processor_thread = None
# Event used to signal the queue-processor thread to shut down.
_stop_queue = threading.Event()
# NOTE(review): this bare string appears displaced — it describes _LOG_DIR
# above, not _stop_queue.
"""
Default log directory to use.
"""
class InterceptHandler(logging.Handler):
    """
    Bridges the standard ``logging`` module into Loguru.

    Records emitted through stdlib ``logging`` are re-dispatched to the
    Loguru ``logger`` with the matching level and the correct caller
    depth, so Loguru reports the original call site.
    """

    def emit(self, record: logging.LogRecord) -> None:
        # Map the stdlib level name onto a Loguru level; fall back to the
        # raw numeric level when Loguru does not know the name.
        try:
            loguru_level: str | int = logger.level(record.levelname).name
        except ValueError:
            loguru_level = record.levelno

        # Walk up the stack until we leave logging internals (and frozen
        # importlib bootstrap frames) to determine the real caller depth.
        caller_frame = inspect.currentframe()
        stack_depth = 0
        while caller_frame:
            code_file = caller_frame.f_code.co_filename
            inside_logging = code_file == logging.__file__
            inside_bootstrap = (
                "importlib" in code_file and "_bootstrap" in code_file
            )
            if stack_depth > 0 and not (inside_logging or inside_bootstrap):
                break
            caller_frame = caller_frame.f_back
            stack_depth += 1

        logger.opt(depth=stack_depth, exception=record.exc_info).log(
            loguru_level, record.getMessage()
        )
def log_for_research(
    to_wrap: Callable[[str, ...], Any],
) -> Callable[[str, ...], Any]:
    """
    Decorator for a function that's part of the research process. It expects the function to
    take the research ID (UUID) as the first parameter, and configures all log
    messages made during this request to include the research ID.

    Args:
        to_wrap: The function to wrap. Should take the research ID as the first parameter.

    Returns:
        The wrapped function.

    """

    @wraps(to_wrap)
    def wrapped(research_id: str, *args: Any, **kwargs: Any) -> Any:
        g.research_id = research_id
        try:
            return to_wrap(research_id, *args, **kwargs)
        finally:
            # Always clear the ID, even when the wrapped function raises,
            # so a failed research run cannot leak its ID into subsequent
            # work on the same context. The default avoids a KeyError if
            # it was already removed.
            g.pop("research_id", None)

    return wrapped
93def _get_research_id(record=None) -> str | None:
94 """
95 Gets the current research ID (UUID), if present.
97 Args:
98 record: Optional loguru record that might contain bound research_id
100 Returns:
101 The current research ID (UUID), or None if it does not exist.
103 """
104 research_id = None
106 # First check if research_id is bound to the log record
107 if record and "extra" in record and "research_id" in record["extra"]:
108 research_id = record["extra"]["research_id"]
109 # Then check Flask context
110 elif has_app_context():
111 research_id = g.get("research_id")
113 return research_id
def _process_log_queue():
    """
    Process logs from the queue in a dedicated thread with app context.
    This runs in the main thread context to avoid SQLite thread safety issues.
    """
    while not _stop_queue.is_set():
        try:
            # Wait for logs with timeout to check stop flag
            log_entry = _log_queue.get(timeout=0.1)

            # Skip if no entry
            if log_entry is None:
                continue

            # Write to database if we have app context
            if has_app_context():
                _write_log_to_database(log_entry)
            else:
                # If no app context, put it back in queue for later
                try:
                    _log_queue.put_nowait(log_entry)
                except queue.Full:
                    pass  # Drop log if queue is full

                # Back off briefly: without this, re-queueing the entry
                # and immediately re-reading it busy-spins the CPU until
                # an app context appears. Waiting on the stop event also
                # returns early when shutdown is requested.
                _stop_queue.wait(0.1)

        except queue.Empty:
            continue
        except Exception:
            pass  # noqa: silent-exception — must not let logging errors crash the log processor thread
def _write_log_to_database(log_entry: dict) -> None:
    """
    Persist a single queued log entry as a ``ResearchLog`` row.

    Should only be called from the main thread. Any database error is
    swallowed so a failure in the logging path can never propagate (or
    recurse back into logging).
    """
    from ..database.session_context import get_user_db_session

    try:
        with get_user_db_session(log_entry.get("username")) as db_session:
            if not db_session:
                return
            db_session.add(
                ResearchLog(
                    timestamp=log_entry["timestamp"],
                    message=log_entry["message"],
                    module=log_entry["module"],
                    function=log_entry["function"],
                    line_no=log_entry["line_no"],
                    level=log_entry["level"],
                    research_id=log_entry["research_id"],
                )
            )
            db_session.commit()
    except Exception:
        pass  # noqa: silent-exception — DB errors in the logging path must not propagate or recurse
def database_sink(message: loguru.Message) -> None:
    """
    Sink that saves messages to the database.
    Queues logs from background threads for later processing.

    Args:
        message: The log message to save.

    """
    record = message.record

    # Snapshot everything the DB writer needs into a plain dict so the
    # entry can safely cross threads via the queue.
    log_entry = {
        "timestamp": record["time"],
        "message": record["message"],
        "module": record["name"],
        "function": record["function"],
        "line_no": int(record["line"]),
        "level": record["level"].name,
        "research_id": _get_research_id(record),
        "username": record.get("extra", {}).get("username"),
    }

    # Socket.IO handlers run in separate threads even with app context,
    # so only the main thread with app context may touch SQLite directly;
    # everything else is deferred through the queue.
    on_main_thread = threading.current_thread().name == "MainThread"
    if has_app_context() and on_main_thread:
        _write_log_to_database(log_entry)
    else:
        try:
            _log_queue.put_nowait(log_entry)
        except queue.Full:
            # Drop log if queue is full to avoid blocking
            pass
def frontend_progress_sink(message: loguru.Message) -> None:
    """
    Sink that sends messages to the frontend.

    Args:
        message: The log message to send.

    """
    record = message.record
    research_id = _get_research_id(record)
    if research_id is None:
        # Without a research ID there are no subscribers to notify. We
        # cannot log about it either: using the logger inside a sink
        # causes a deadlock.
        return

    payload = {
        "log_entry": {
            "message": record["message"],
            "type": record["level"].name,  # Keep original case
            "time": record["time"].isoformat(),
        },
    }
    SocketIOService().emit_to_subscribers(
        "progress", research_id, payload, enable_logging=False
    )
def flush_log_queue():
    """
    Flush all pending logs from the queue to the database.
    This should be called from a Flask request context.
    """
    drained = 0
    while True:
        try:
            entry = _log_queue.get_nowait()
        except queue.Empty:
            break
        try:
            _write_log_to_database(entry)
            drained += 1
        except Exception:
            pass  # noqa: silent-exception — DB errors during log flush must not propagate

    if drained:
        logger.debug(f"Flushed {drained} queued log entries to database")
def config_logger(name: str, debug: bool = False) -> None:
    """
    Configures the default logger.

    Sink registration order is significant: stderr first, then the
    database sink, then the frontend progress sink.

    Args:
        name: The name to use for the log file.
        debug: Whether to enable unsafe debug logging.

    """
    from ..security.log_sanitizer import strip_control_chars

    def _scrub_message(record):
        # Strip control characters so log injection cannot forge entries.
        record["message"] = strip_control_chars(record["message"])

    logger.configure(patcher=_scrub_message)
    logger.enable("local_deep_research")
    logger.remove()

    # Log to console (stderr) and database
    logger.add(
        sys.stderr, level="DEBUG" if debug else "INFO", diagnose=debug
    )
    logger.add(database_sink, level="DEBUG", diagnose=debug)
    logger.add(frontend_progress_sink, diagnose=debug)

    if debug:
        logger.warning(
            "DEBUG logging is enabled (LDR_APP_DEBUG=true). "
            "Logs may contain sensitive data (queries, answers, API responses). "
            "Do NOT use in production."
        )

    # File logging is opt-in (disabled by default for security) and is
    # controlled only by this environment variable — database settings
    # are not available at logger initialization time.
    if os.environ.get("LDR_ENABLE_FILE_LOGGING", "").lower() == "true":
        log_file = _LOG_DIR / f"{name}.log"
        logger.add(
            log_file,
            level="DEBUG",
            rotation="10 MB",
            retention="7 days",
            compression="zip",
            diagnose=debug,
        )
        logger.warning(
            f"File logging enabled - logs will be written to {log_file}. "
            "WARNING: Log files are unencrypted and may contain sensitive data!"
        )

    # Add a special log level for milestones.
    try:
        logger.level("MILESTONE", no=26, color="<magenta><bold>")
    except ValueError:
        # Level already exists, that's fine
        pass