Coverage for src/local_deep_research/utilities/log_utils.py: 83%
217 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1"""
2Utilities for logging.
3"""
5# Needed for loguru annotations
6from __future__ import annotations
8import inspect
10# import logging - needed for InterceptHandler compatibility
11import logging
12import os
13import queue
14import sys
15import threading
16from functools import wraps
17from typing import Any, Callable
19import loguru
20from flask import g, has_app_context
21from loguru import logger
23from ..config.paths import get_logs_directory
24from ..database.models import ResearchLog
25from ..web.services.socket_service import SocketIOService
# Default log directory to use. Created eagerly at import time so that file
# sinks added later never fail on a missing parent directory.
_LOG_DIR = get_logs_directory()
_LOG_DIR.mkdir(parents=True, exist_ok=True)

# Thread-safe queue for database logs from background threads. Bounded so
# that a stalled consumer cannot grow memory without limit; producers drop
# entries when it is full.
_log_queue: queue.Queue = queue.Queue(maxsize=1000)
# The daemon thread draining ``_log_queue`` (None while no daemon is running).
_queue_processor_thread: threading.Thread | None = None
# Guards reads/writes of ``_queue_processor_thread``.
_queue_processor_lock = threading.Lock()
# Set to ask the queue processor loop to exit.
_stop_queue = threading.Event()
# Upper bound on how much of one log record's message is shipped to the
# browser over socket.io. Some diagnostic lines (e.g. ``[FETCH] page_text``)
# inline the full extracted page body — up to ~10 KB per call — which is
# useless in the UI (one massive blob fills the viewport) and inflates both
# wire traffic and client-side state. The container-log/stderr, file, and DB
# sinks are not affected, so full diagnostics stay available for grep/DB
# queries. The cap applies to the *prefix* kept from the original message;
# the wire payload is that prefix plus a short truncation indicator (~100
# bytes), so it can exceed this value by that fixed overhead.
FRONTEND_MESSAGE_MAX_LENGTH = 2_000
class InterceptHandler(logging.Handler):
    """
    Bridge from the standard ``logging`` module to Loguru.

    Every record handed to this handler is re-emitted through Loguru's
    ``logger`` with the matching level, exception info, and caller depth.
    """

    def emit(self, record: logging.LogRecord) -> None:
        # Prefer the Loguru level with the same name; if Loguru does not
        # know that name, fall back to the numeric stdlib level.
        try:
            level: str | int = logger.level(record.levelname).name
        except ValueError:
            level = record.levelno

        # Walk up the stack past this frame, the ``logging`` module's own
        # frames, and frozen importlib bootstrap frames, counting how deep
        # the real caller is so Loguru attributes the message to it.
        depth = 0
        frame = inspect.currentframe()
        while frame:
            source_file = frame.f_code.co_filename
            from_logging = source_file == logging.__file__
            from_bootstrap = (
                "importlib" in source_file and "_bootstrap" in source_file
            )
            if depth > 0 and not from_logging and not from_bootstrap:
                break
            frame = frame.f_back
            depth += 1

        logger.opt(depth=depth, exception=record.exc_info).log(
            level, record.getMessage()
        )
def log_for_research(
    to_wrap: Callable[..., Any],
) -> Callable[..., Any]:
    """
    Decorator for a function that's part of the research process. It expects
    the function to take the research ID (UUID) as the first parameter, and
    configures all log messages made during this request to include the
    research ID.

    The research ID is stored on Flask's ``g`` for the duration of the call
    and is always removed afterwards, including when the wrapped function
    raises.

    Args:
        to_wrap: The function to wrap. Should take the research ID as the
            first parameter.

    Returns:
        The wrapped function.

    """

    @wraps(to_wrap)
    def wrapped(research_id: str, *args: Any, **kwargs: Any) -> Any:
        g.research_id = research_id
        try:
            return to_wrap(research_id, *args, **kwargs)
        finally:
            # Clean up even on exceptions so a failed call cannot leave a
            # stale research ID attached to later log records. The default
            # keeps cleanup from raising KeyError if the wrapped function
            # already removed the attribute itself.
            g.pop("research_id", None)

    return wrapped
def _get_research_context_fallback() -> dict | None:
    """Return the per-thread research context, or None when unavailable.

    This is the fallback for log calls that did not bind research_id/
    username through ``logger.bind``. The research thread registers its
    context once at startup via ``set_search_context``, so later log calls
    from that thread pick up research_id, username, and user_password
    without every call site having to bind them.

    Any failure while importing or reading the context is swallowed and
    reported as "no context" (None).
    """
    try:
        from .thread_context import get_search_context

        context = get_search_context()
    except Exception:
        return None
    return context
def _get_research_id(record=None) -> str | None:
    """
    Look up the current research ID (UUID), if there is one.

    Sources are consulted in order: the ``research_id`` bound on the log
    record, then Flask's ``g``, then the per-thread research context.

    Args:
        record: Optional loguru record that might contain a bound research_id

    Returns:
        The current research ID (UUID), or None if it does not exist.

    """
    # 1. An explicit binding on the record wins (even if its value is None).
    if record and "extra" in record:
        bound = record["extra"]
        if "research_id" in bound:
            return bound["research_id"]

    # 2. Flask application context, when one is active.
    if has_app_context():
        from_flask = g.get("research_id")
        if from_flask:
            return from_flask

    # 3. Per-thread research context, so research-thread logger calls made
    #    without an explicit bind are still attributed correctly.
    thread_ctx = _get_research_context_fallback()
    return thread_ctx.get("research_id") if thread_ctx else None
# Per-call-site counters for exceptions swallowed in the logging path. A
# bare ``except: pass`` is required there (logging errors must not propagate
# or recurse), so each new occurrence is surfaced as a stderr line instead
# of vanishing silently — only when LDR_APP_DEBUG=true, so production
# stderr stays clean.
_silent_exc_counts: dict[str, int] = {}


def _report_silent_exception(
    where: str,
    exc_type_name: str,
    username: str | None = None,
    research_id: str | None = None,
    level: str | None = None,
) -> None:
    """Write a one-line notice about a swallowed logging-path exception.

    Output goes straight to stderr rather than through ``logger`` so it
    cannot recurse back into ``database_sink``. For each ``where`` key only
    the first occurrence and every 100th repeat are printed, so a persistent
    failure mode doesn't flood the console. Nothing is counted or printed
    unless ``LDR_APP_DEBUG`` is one of ``1``/``true``/``yes``
    (case-insensitive).

    Note: the exception is passed as its TYPE NAME string (the caller does
    ``type(exc).__name__``), never as the exception object. This is
    deliberate — CodeQL's taint analyzer treats any function frame holding a
    ``BaseException`` captured from a password-bearing call site as tainted
    and flags every stderr write inside that frame. Receiving only a
    type-name string severs the flow at the boundary.

    Args:
        where: Identifier of the call site; also the rate-limit key.
        exc_type_name: Name of the swallowed exception's type.
        username: Optional username to include in the notice.
        research_id: Optional research ID to include in the notice.
        level: Optional log level name to include in the notice.
    """
    debug_flag = os.environ.get("LDR_APP_DEBUG", "").lower()
    if debug_flag not in ("1", "true", "yes"):
        return

    n = _silent_exc_counts.get(where, 0) + 1
    _silent_exc_counts[where] = n
    # Rate limit: report only the first hit and every 100th one after that.
    if not (n == 1 or n % 100 == 0):
        return

    details: list[str] = []
    if username is not None:
        details.append(f"username={username!r}")
    if research_id is not None:
        details.append(f"research_id={research_id!r}")
    if level is not None:
        details.append(f"level={level!r}")
    ctx = " ".join(details)

    # CodeQL's py/clear-text-logging-sensitive-data may flag this stderr
    # write because this frame is reachable from _write_log_to_database,
    # which holds user_password locally. What is actually written is only
    # plain strings — `where` (literal), `exc_type_name`
    # (`type(exc).__name__`), and the repr of `username/research_id/level`
    # taken from the queue entry. No password value ever reaches the
    # formatter; the signature deliberately accepts only typed primitives.
    # If CodeQL flags this line, dismiss the alert as a false positive in
    # the Security tab with that justification.
    sys.stderr.write(
        f"[log-utils] {where} swallowed (count={n}): "
        f"{exc_type_name}{(' ' + ctx) if ctx else ''}\n"
    )
    sys.stderr.flush()
def _process_log_queue():
    """
    Process logs from the queue in a dedicated daemon thread.

    Runs until ``_stop_queue`` is set. Each entry taken from ``_log_queue``
    is written with ``_write_log_to_database`` when a Flask app context is
    active; otherwise it is put back on the queue (or dropped if the queue
    is full) and the loop pauses briefly before retrying.

    Safe to run off the main thread: ``_write_log_to_database`` uses
    ``get_user_db_session`` which yields a thread-local SQLAlchemy session,
    and the underlying SQLite engines are opened with
    ``check_same_thread=False``.
    """
    while not _stop_queue.is_set():
        try:
            # Wait for logs with timeout to check stop flag
            log_entry = _log_queue.get(timeout=0.1)

            # Skip if no entry
            if log_entry is None:
                continue

            # Write to database if we have app context
            if has_app_context():
                _write_log_to_database(log_entry)
            else:
                # If no app context, put it back in queue for later
                try:
                    _log_queue.put_nowait(log_entry)
                except queue.Full:
                    pass  # Drop log if queue is full
                # Back off before retrying. Without this pause the entry
                # just re-queued is fetched again immediately and the loop
                # spins at full CPU until an app context appears. Waiting
                # on the stop event keeps shutdown responsive.
                _stop_queue.wait(timeout=0.1)

        except queue.Empty:
            continue
        except Exception as exc:
            # noqa: silent-exception — must not let logging errors crash the log processor thread.
            # Wrap the report itself: if stderr is closed (broken pipe etc.)
            # an IOError from inside an except handler propagates and would
            # kill the daemon thread, silently breaking all subsequent log
            # persistence for the rest of the process lifetime.
            try:
                _report_silent_exception(
                    "process_log_queue", type(exc).__name__
                )
            except Exception:
                pass  # noqa: silent-exception — broken stderr must not kill the daemon
def _write_log_to_database(log_entry: dict) -> None:
    """
    Persist one queued log entry as a ``ResearchLog`` row.

    Called directly by ``database_sink`` and, for queued entries, by
    ``_process_log_queue`` and ``flush_log_queue``. Never raises: any
    exception is swallowed and reported via ``_report_silent_exception``.

    Args:
        log_entry: Dict built by ``database_sink``. ``username`` and
            ``user_password`` select the user's database session; the
            remaining keys become the ``ResearchLog`` columns.
    """
    from ..database.session_context import get_user_db_session

    column_keys = (
        "timestamp",
        "message",
        "module",
        "function",
        "line_no",
        "level",
        "research_id",
    )

    try:
        username = log_entry.get("username")
        # Captured in the emitting thread (database_sink) from ContextVar
        # storage; the queue daemon thread can't read that itself.
        pw = log_entry.get("user_password")  # gitleaks:allow

        with get_user_db_session(
            username, password=pw
        ) as db_session:  # gitleaks:allow
            if not db_session:
                return
            db_session.add(
                ResearchLog(**{key: log_entry[key] for key in column_keys})
            )
            db_session.commit()
    except Exception as exc:
        # noqa: silent-exception — DB errors in the logging path must not propagate or recurse.
        # The report is wrapped too, so a broken-stderr IOError can't escape
        # and be re-caught by an outer logging-aware handler upstream.
        try:
            _report_silent_exception(
                "write_log_to_database",
                type(exc).__name__,
                username=log_entry.get("username"),
                research_id=log_entry.get("research_id"),
                level=log_entry.get("level"),
            )
        except Exception:
            pass  # noqa: silent-exception — broken stderr must not bubble out of logging path
def database_sink(message: loguru.Message) -> None:
    """
    Loguru sink that persists messages to the database.

    Logs emitted outside the main thread (or without an app context) are
    queued for the background processor; main-thread logs with an app
    context are written immediately. Records with neither a research ID nor
    a username are not persisted at all.

    Args:
        message: The log message to save.

    """
    record = message.record
    research_id = _get_research_id(record)

    # Resolve username + password here, in the emitting thread: the queue
    # daemon thread can't read the research thread's ContextVar storage and
    # has no Flask request context.
    #
    # Source priority:
    #   1. logger.bind(...) extras on the record itself
    #   2. per-thread research context (set once when the research thread
    #      starts, so every later log call inherits it without an explicit
    #      bind)
    #   3. Flask request session (for request-handler threads that tagged
    #      research_id via the @log_for_research decorator but didn't bind
    #      username — common for /api/research/<id>/* routes)
    extras = record.get("extra", {})
    username = extras.get("username")
    user_password = None

    ctx = _get_research_context_fallback()
    if ctx:
        if not username:
            username = ctx.get("username")
        user_password = ctx.get("user_password")

    # Flask request state is consulted only when the log already carries a
    # research_id. ResearchLog is research-scoped by design — auth and other
    # system DEBUG logs (research_id=None) don't belong there. Attaching a
    # username to them via flask_session would just make them churn through
    # the queue and fail at the daemon (where the encrypted DB may not even
    # be open yet for that user — e.g. right after a server restart with a
    # still-valid session cookie).
    if research_id is not None and has_app_context():
        try:
            from flask import session as flask_session, has_request_context

            if not username and has_request_context():
                username = flask_session.get("username")
            # The request middleware sets g.user_password after
            # authentication. The daemon thread can't read it, so it is
            # captured here in the request thread.
            if not user_password and hasattr(g, "user_password"):
                user_password = g.user_password
        except Exception:
            pass  # noqa: silent-exception — must not fail logging on session lookup

    # System logs without any research context can't be written to a
    # per-user encrypted DB and would only churn through the queue + daemon
    # for no useful end state, so skip them.
    if research_id is None and username is None:
        return

    log_entry = {
        "timestamp": record["time"],
        "message": record["message"],
        "module": record["name"],
        "function": record["function"],
        "line_no": int(record["line"]),
        "level": record["level"].name,
        "research_id": research_id,
        "username": username,
        "user_password": user_password,
    }

    # Main thread with an app context: write straight to the database.
    if has_app_context() and threading.current_thread().name == "MainThread":
        _write_log_to_database(log_entry)
        return

    # Background thread (note: Socket.IO handlers run in separate threads
    # even with an app context): queue the entry for later processing and
    # drop it if the queue is full, to avoid blocking the caller.
    try:
        _log_queue.put_nowait(log_entry)
    except queue.Full:
        pass
def _truncate_for_frontend(message: str) -> str:
    """Limit the size of a log message sent to the browser.

    Messages no longer than ``FRONTEND_MESSAGE_MAX_LENGTH`` are returned
    unchanged. Longer ones are cut to that many leading characters and get
    a short indicator appended that states the original length and points
    the user at the server-side logs for the full text. The result is
    therefore ``FRONTEND_MESSAGE_MAX_LENGTH`` plus the fixed indicator
    overhead (~100 bytes).

    Verbose diagnostic logs (e.g. ``[FETCH] page_text``, which inlines the
    full extracted page body) are useless in the UI when shown in full and
    inflate socket payloads + client-side memory; container-log/stderr,
    file, and DB sinks remain unchanged.
    """
    if len(message) <= FRONTEND_MESSAGE_MAX_LENGTH:
        return message

    kept_prefix = message[:FRONTEND_MESSAGE_MAX_LENGTH]
    suffix = (
        f"… (truncated; full message in server logs; "
        f"original length: {len(message)} chars)"
    )
    return kept_prefix + suffix
def frontend_progress_sink(message: loguru.Message) -> None:
    """
    Loguru sink that forwards messages to the frontend over Socket.IO.

    Records that cannot be attributed to a research ID are ignored.

    Args:
        message: The log message to send.

    """
    record = message.record
    research_id = _get_research_id(record)
    if research_id is None:
        # Without a research ID there is nobody to send this to.
        # Can't use logger here as it causes deadlock
        return

    log_entry = {
        "message": _truncate_for_frontend(record["message"]),
        "type": record["level"].name,  # Keep original case
        "time": record["time"].isoformat(),
    }
    SocketIOService().emit_to_subscribers(
        "progress",
        research_id,
        {"log_entry": log_entry},
        enable_logging=False,
    )
def flush_log_queue():
    """
    Write every log entry still waiting in the queue to the database.

    Used at process exit (see ``flush_logs_on_exit`` in ``web/app.py``) to
    drain whatever the background daemon did not get to before it was
    stopped. The request path no longer calls this — the
    ``start_log_queue_processor`` daemon handles steady-state drainage so
    requests never block on DB writes.
    """
    flushed = 0
    while not _log_queue.empty():
        try:
            pending = _log_queue.get_nowait()
            _write_log_to_database(pending)
        except queue.Empty:
            # Another consumer emptied the queue between the check and the get.
            break
        except Exception:
            pass  # noqa: silent-exception — DB errors during log flush must not propagate
        else:
            flushed += 1

    if flushed:
        logger.debug(f"Flushed {flushed} queued log entries to database")
def start_log_queue_processor(app) -> threading.Thread:
    """Launch the background daemon that drains the log queue into the DB.

    ``_process_log_queue`` runs inside an application context so writes
    have a Flask context and the daemon can work independently of any
    in-flight request. Idempotent: if a daemon is already alive, it is
    returned and nothing new is started.

    Args:
        app: The Flask app whose context the daemon should push.

    Returns:
        The daemon thread (running).
    """
    global _queue_processor_thread

    def _run():
        # Keep an app context pushed for the daemon's whole lifetime so the
        # queue processor can call into DB code that requires Flask g.
        with app.app_context():
            _process_log_queue()

    with _queue_processor_lock:
        existing = _queue_processor_thread
        if existing is not None and existing.is_alive():
            return existing

        _stop_queue.clear()

        worker = threading.Thread(
            target=_run,
            name="log-queue-processor",
            daemon=True,
        )
        _queue_processor_thread = worker
        worker.start()

    logger.info("Log queue processor daemon started")
    return worker
def stop_log_queue_processor(timeout: float = 2.0) -> None:
    """Ask the log queue processor to stop and wait up to ``timeout`` seconds.

    Args:
        timeout: Maximum time, in seconds, to wait for the daemon to exit.
    """
    global _queue_processor_thread

    _stop_queue.set()
    with _queue_processor_lock:
        thread = _queue_processor_thread
    if thread is None:
        return

    # Join outside the lock so a concurrent start is not blocked meanwhile.
    thread.join(timeout=timeout)

    # Forget the thread only if it actually exited. If the join timed out
    # the daemon is still running, and clearing the reference would let a
    # later start_log_queue_processor() spawn a second daemon draining the
    # same queue concurrently.
    if thread.is_alive():
        return

    # Re-check identity under the lock so a fresh thread spawned by another
    # start in the meantime is not nulled out by accident.
    with _queue_processor_lock:
        if _queue_processor_thread is thread:
            _queue_processor_thread = None
def config_logger(name: str, debug: bool = False) -> None:
    """
    Set up the default Loguru logger and its sinks.

    Installs a message sanitizer, replaces all existing sinks with the
    stderr, database, and frontend sinks, optionally adds a rotating file
    sink (``LDR_ENABLE_FILE_LOGGING=true``), and registers the
    ``MILESTONE`` level.

    Args:
        name: The name to use for the log file.
        debug: Whether to enable unsafe debug logging.

    """
    from ..security.log_sanitizer import strip_control_chars

    def _sanitize_record(record):
        # Applied to every record before it reaches any sink.
        record["message"] = strip_control_chars(record["message"])

    logger.configure(patcher=_sanitize_record)

    logger.enable("local_deep_research")
    logger.remove()

    # Console (stderr) verbosity follows the debug flag.
    if debug:
        console_level = "DEBUG"
    else:
        console_level = "INFO"

    # loguru's diagnose=True renders repr() of every local variable in every
    # traceback frame on exceptions. Under LDR_APP_DEBUG that would dump
    # credentials living in frame locals (api_key, SQLCipher password,
    # Authorization headers) into every sink. So diagnose sits behind its
    # own explicit opt-in: enabling LDR_APP_DEBUG for general debug output
    # does not also enable localvar dumps. Default OFF even when debug is on.
    truthy_values = ("1", "true", "yes")
    diagnose = debug and (
        os.environ.get("LDR_LOGURU_DIAGNOSE", "").lower() in truthy_values
    )

    # Log to console (stderr), database, and frontend.
    logger.add(sys.stderr, level=console_level, diagnose=diagnose)
    logger.add(database_sink, level="DEBUG", diagnose=diagnose)
    logger.add(frontend_progress_sink, diagnose=diagnose)

    if debug:
        logger.warning(
            "DEBUG logging is enabled (LDR_APP_DEBUG=true). "
            "Logs may contain sensitive data (queries, answers, API responses). "
            "Do NOT use in production."
        )

    if diagnose:
        logger.warning(
            "LDR_LOGURU_DIAGNOSE is enabled: exception tracebacks will include "
            "local variable values, which may contain credentials (API keys, "
            "passwords, tokens). Do NOT use in production."
        )

    # File logging is off by default for security and is controlled only by
    # an environment variable: database settings are not available at logger
    # initialization time.
    file_logging_flag = os.environ.get("LDR_ENABLE_FILE_LOGGING", "").lower()
    if file_logging_flag == "true":
        log_file = _LOG_DIR / f"{name}.log"
        logger.add(
            log_file,
            level="DEBUG",
            rotation="10 MB",
            retention="7 days",
            compression="zip",
            diagnose=diagnose,
        )
        logger.warning(
            f"File logging enabled - logs will be written to {log_file}. "
            "WARNING: Log files are unencrypted and may contain sensitive data!"
        )

    # Register the special log level used for milestones.
    try:
        logger.level("MILESTONE", no=26, color="<magenta><bold>")
    except ValueError:
        # Level already exists, that's fine
        pass