Coverage for src/local_deep_research/utilities/log_utils.py: 83%

217 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1""" 

2Utilities for logging. 

3""" 

4 

5# Needed for loguru annotations 

6from __future__ import annotations 

7 

8import inspect 

9 

10# import logging - needed for InterceptHandler compatibility 

11import logging 

12import os 

13import queue 

14import sys 

15import threading 

16from functools import wraps 

17from typing import Any, Callable 

18 

19import loguru 

20from flask import g, has_app_context 

21from loguru import logger 

22 

23from ..config.paths import get_logs_directory 

24from ..database.models import ResearchLog 

25from ..web.services.socket_service import SocketIOService 

26 

# Default log directory to use. It is created eagerly at import time so the
# optional file sink added by ``config_logger`` always has somewhere to write.
_LOG_DIR = get_logs_directory()
_LOG_DIR.mkdir(parents=True, exist_ok=True)

# Thread-safe queue for database logs from background threads. It is bounded:
# producers use ``put_nowait`` and drop the entry when the queue is full.
_log_queue = queue.Queue(maxsize=1000)
# The daemon thread draining ``_log_queue`` (None while no daemon is running).
_queue_processor_thread = None
# Guards reads/writes of ``_queue_processor_thread``.
_queue_processor_lock = threading.Lock()
# Set to ask the queue-processor loop to exit.
_stop_queue = threading.Event()

38 

# Upper bound on how much of one log message is pushed to the browser over
# socket.io. Certain diagnostic lines (e.g. ``[FETCH] page_text``) embed an
# entire extracted page body — up to ~10 KB per call. Shown in full they are
# useless in the UI (one huge blob fills the viewport) and they bloat both
# wire traffic and client-side state. Only the frontend sink is affected:
# container-log/stderr, file, and DB sinks still receive the full text, so
# nothing is lost for grep/DB queries. Note that this value limits the
# *prefix* kept from the original message; a short truncation indicator
# (~100 bytes) is appended on top, so the wire payload can exceed the cap by
# that fixed overhead.
FRONTEND_MESSAGE_MAX_LENGTH = 2_000

49 

50 

class InterceptHandler(logging.Handler):
    """
    Standard-library logging handler that re-emits every record via Loguru.
    """

    def emit(self, record: logging.LogRecord) -> None:
        """
        Forward one stdlib log record to Loguru's logger.

        Args:
            record: The stdlib record to forward.

        """
        # Prefer the Loguru level with the same name; fall back to the
        # numeric stdlib level when Loguru does not know that name.
        try:
            level: str | int = logger.level(record.levelname).name
        except ValueError:
            level = record.levelno

        # Walk up the stack past this handler, the ``logging`` module and
        # the frozen importlib bootstrap frames so Loguru attributes the
        # message to the code that actually issued the log call.
        depth = 0
        frame = inspect.currentframe()
        while frame is not None:
            source_file = frame.f_code.co_filename
            from_logging = source_file == logging.__file__
            from_bootstrap = (
                "importlib" in source_file and "_bootstrap" in source_file
            )
            if depth > 0 and not from_logging and not from_bootstrap:
                break
            frame, depth = frame.f_back, depth + 1

        forwarder = logger.opt(depth=depth, exception=record.exc_info)
        forwarder.log(level, record.getMessage())

77 

78 

def log_for_research(
    to_wrap: Callable[..., Any],
) -> Callable[..., Any]:
    """
    Decorator for a function that's part of the research process. It expects
    the function to take the research ID (UUID) as the first parameter, and
    configures all log messages made during this request to include the
    research ID.

    The research ID is stored on Flask's ``g`` for the duration of the call
    and is always removed afterwards — including when the wrapped function
    raises — so a failed call cannot leak its ID into unrelated log messages
    emitted later in the same application context.

    Args:
        to_wrap: The function to wrap. Should take the research ID as the
            first parameter.

    Returns:
        The wrapped function.

    """

    @wraps(to_wrap)
    def wrapped(research_id: str, *args: Any, **kwargs: Any) -> Any:
        g.research_id = research_id
        try:
            return to_wrap(research_id, *args, **kwargs)
        finally:
            # Use a default so cleanup never raises KeyError (and never masks
            # the original exception) if the wrapped function already
            # removed the attribute itself.
            g.pop("research_id", None)

    return wrapped

103 

104 

def _get_research_context_fallback() -> dict | None:
    """Return the current thread's research context, or None.

    This is the fallback for log calls that did not attach research_id /
    username with ``logger.bind``. The research thread registers the context
    once at startup through ``set_search_context``; afterwards every log call
    made from that thread can pick up research_id, username, and
    user_password without each call site having to bind them.

    Any failure — importing the helper or calling it — is swallowed and
    reported as "no context", since this runs inside the logging path.
    """
    context = None
    try:
        from .thread_context import get_search_context as _lookup_context

        context = _lookup_context()
    except Exception:
        context = None
    return context

120 

121 

def _get_research_id(record=None) -> str | None:
    """
    Resolves the research ID (UUID) that applies to the current log call.

    Sources are consulted in this order: the ``research_id`` bound on the
    record's extras, then Flask's ``g`` (only inside an app context), then
    the per-thread research context.

    Args:
        record: Optional loguru record that might contain bound research_id

    Returns:
        The current research ID (UUID), or None if it does not exist.

    """
    # An explicit bind on the record wins, whatever value it carries.
    if record and "extra" in record:
        if "research_id" in record["extra"]:
            return record["extra"]["research_id"]

    # Next: the ID stored on Flask's ``g``; an empty value falls through.
    if has_app_context():
        from_flask = g.get("research_id")
        if from_flask:
            return from_flask

    # Last resort: the per-thread research context, so logger calls made on
    # a research thread without an explicit bind are still attributed.
    thread_ctx = _get_research_context_fallback()
    return thread_ctx.get("research_id") if thread_ctx else None

147 

148 

# Per-``where`` tally of exceptions swallowed inside the logging path. The
# logging code has to swallow them (a logging error must neither propagate
# nor recurse), so each new occurrence is echoed to stderr instead of
# vanishing — but only when LDR_APP_DEBUG is enabled, which keeps production
# stderr clean.
_silent_exc_counts: dict[str, int] = {}


def _report_silent_exception(
    where: str,
    exc_type_name: str,
    username: str | None = None,
    research_id: str | None = None,
    level: str | None = None,
) -> None:
    """Write a one-line notice about a swallowed logging-path exception.

    The notice goes straight to stderr rather than through ``logger`` so it
    cannot recurse back into ``database_sink``. Output is rate-limited per
    ``where`` key: the first occurrence is printed, then every 100th, so a
    persistent failure does not flood the console. Nothing is counted or
    printed unless LDR_APP_DEBUG is one of "1", "true", "yes"
    (case-insensitive).

    The exception is passed as its TYPE NAME string (the caller computes
    ``type(exc).__name__``), never as the exception object. That is on
    purpose: CodeQL's taint analyzer treats a frame holding a
    ``BaseException`` captured at a password-bearing call site as tainted
    and flags every stderr write in it; accepting only a plain string cuts
    that flow at the function boundary.

    Args:
        where: Key naming the call site that swallowed the exception.
        exc_type_name: ``type(exc).__name__`` of the swallowed exception.
        username: Optional username to include in the notice.
        research_id: Optional research ID to include in the notice.
        level: Optional log level name to include in the notice.
    """
    debug_flag = os.environ.get("LDR_APP_DEBUG", "").lower()
    if debug_flag not in ("1", "true", "yes"):
        return

    occurrence = _silent_exc_counts.get(where, 0) + 1
    _silent_exc_counts[where] = occurrence
    if not (occurrence == 1 or occurrence % 100 == 0):
        return

    labelled = (
        ("username", username),
        ("research_id", research_id),
        ("level", level),
    )
    ctx = " ".join(
        f"{label}={value!r}" for label, value in labelled if value is not None
    )
    trailer = f" {ctx}" if ctx else ""

    # CodeQL's py/clear-text-logging-sensitive-data may flag the write below
    # because this frame is reachable from _write_log_to_database, which
    # holds user_password locally. Only plain strings are written: `where`
    # (a literal), `exc_type_name`, and the repr of username / research_id /
    # level. No password value reaches the formatter — the signature
    # deliberately accepts only typed primitives. If CodeQL flags this line,
    # dismiss the alert as a false positive in the Security tab with that
    # justification.
    sys.stderr.write(
        f"[log-utils] {where} swallowed (count={occurrence}): "
        f"{exc_type_name}{trailer}\n"
    )
    sys.stderr.flush()

206 

207 

def _process_log_queue():
    """
    Process logs from the queue in a dedicated daemon thread.

    Loops until ``_stop_queue`` is set. Each entry taken from ``_log_queue``
    is written with ``_write_log_to_database`` when an app context is
    available; otherwise it is put back on the queue (or dropped if the
    queue is full) and the loop backs off briefly before retrying.

    Safe to run off the main thread: ``_write_log_to_database`` uses
    ``get_user_db_session`` which yields a thread-local SQLAlchemy session,
    and the underlying SQLite engines are opened with
    ``check_same_thread=False``.
    """
    while not _stop_queue.is_set():
        try:
            # Wait for logs with a timeout so the stop flag is re-checked.
            log_entry = _log_queue.get(timeout=0.1)

            # Skip if no entry
            if log_entry is None:
                continue

            # Write to database if we have app context
            if has_app_context():
                _write_log_to_database(log_entry)
                continue

            # No app context: put the entry back in the queue for later.
            try:
                _log_queue.put_nowait(log_entry)
            except queue.Full:
                pass  # Drop log if queue is full

            # Back off before retrying. Without this pause the entry that
            # was just re-queued is dequeued again immediately, turning the
            # loop into a busy spin for as long as no app context exists.
            # Waiting on the stop event keeps shutdown responsive.
            _stop_queue.wait(0.1)

        except queue.Empty:
            continue
        except Exception as exc:
            # noqa: silent-exception — must not let logging errors crash the log processor thread.
            # Wrap the report itself: if stderr is closed (broken pipe etc.)
            # an IOError from inside an except handler propagates and would
            # kill the daemon thread, silently breaking all subsequent log
            # persistence for the rest of the process lifetime.
            try:
                _report_silent_exception(
                    "process_log_queue", type(exc).__name__
                )
            except Exception:
                pass  # noqa: silent-exception — broken stderr must not kill the daemon

250 

251 

def _write_log_to_database(log_entry: dict) -> None:
    """
    Persist one queued log entry as a ``ResearchLog`` row.

    Called both directly by ``database_sink`` and by the queue-processing
    code. Every failure is swallowed (and optionally reported through
    ``_report_silent_exception``) so database problems never propagate out
    of the logging path.

    Args:
        log_entry: Dict built by ``database_sink``; must contain timestamp,
            message, module, function, line_no, level and research_id, and
            may contain username and user_password.
    """
    from ..database.session_context import get_user_db_session

    # Columns copied verbatim from the entry onto the ResearchLog row.
    row_fields = (
        "timestamp",
        "message",
        "module",
        "function",
        "line_no",
        "level",
        "research_id",
    )

    try:
        username = log_entry.get("username")
        # The credential was captured by the emitting thread (database_sink)
        # from ContextVar storage; the queue daemon thread cannot read that
        # storage itself.
        pw = log_entry.get("user_password")  # gitleaks:allow

        with get_user_db_session(
            username, password=pw
        ) as db_session:  # gitleaks:allow
            if not db_session:
                return
            db_session.add(
                ResearchLog(**{name: log_entry[name] for name in row_fields})
            )
            db_session.commit()
    except Exception as exc:
        # noqa: silent-exception — DB errors in the logging path must not propagate or recurse.
        # The report is guarded too, so an IOError from a broken stderr
        # cannot escape and be re-caught by a logging-aware handler upstream.
        try:
            _report_silent_exception(
                "write_log_to_database",
                type(exc).__name__,
                username=log_entry.get("username"),
                research_id=log_entry.get("research_id"),
                level=log_entry.get("level"),
            )
        except Exception:
            pass  # noqa: silent-exception — broken stderr must not bubble out of logging path

293 

294 

def database_sink(message: loguru.Message) -> None:
    """
    Loguru sink that persists log records to the database.

    Records emitted on the main thread inside an app context are written
    immediately. Everything else is placed on ``_log_queue`` for later
    processing and is dropped if the queue is full.

    Args:
        message: The log message to save.

    """
    record = message.record
    research_id = _get_research_id(record)

    # Username and password have to be resolved here, in the emitting
    # thread: the queue daemon can read neither the research thread's
    # ContextVar storage nor a Flask request context.
    #
    # Sources, highest priority first:
    #   1. extras attached with logger.bind(...) on the record itself
    #   2. the per-thread research context (set once when the research
    #      thread starts, so later log calls inherit it without a bind)
    #   3. the Flask request session (request-handler threads that tagged
    #      research_id via @log_for_research but never bound a username —
    #      common for /api/research/<id>/* routes)
    extras = record.get("extra", {})
    username = extras.get("username")
    user_password = None

    thread_ctx = _get_research_context_fallback()
    if thread_ctx:
        username = username or thread_ctx.get("username")
        user_password = thread_ctx.get("user_password")

    # Flask request state is consulted only for logs that already carry a
    # research_id. ResearchLog is research-scoped by design — auth and other
    # system DEBUG logs (research_id=None) do not belong there. Attaching a
    # session username to them would only make them churn through the queue
    # and fail at the daemon (where the encrypted DB may not even be open
    # yet for that user — e.g. right after a server restart with a
    # still-valid session cookie).
    if research_id is not None and has_app_context():
        try:
            from flask import has_request_context, session as flask_session

            if not username and has_request_context():
                username = flask_session.get("username")
            # The request middleware stores the password on g.user_password
            # after authentication. The daemon thread cannot see it, so it
            # is captured here in the request thread.
            if not user_password and hasattr(g, "user_password"):
                user_password = g.user_password
        except Exception:
            pass  # noqa: silent-exception — must not fail logging on session lookup

    # System logs with neither a research ID nor a username cannot be
    # written to any per-user encrypted DB; skip them instead of letting
    # them churn through the queue + daemon for no useful end state.
    if research_id is None and username is None:
        return

    log_entry = {
        "timestamp": record["time"],
        "message": record["message"],
        "module": record["name"],
        "function": record["function"],
        "line_no": int(record["line"]),
        "level": record["level"].name,
        "research_id": research_id,
        "username": username,
        "user_password": user_password,
    }

    # Direct writes are reserved for the main thread with an app context.
    # Note: Socket.IO handlers run in separate threads even with app
    # context, so they take the queue path below.
    if has_app_context() and threading.current_thread().name == "MainThread":
        _write_log_to_database(log_entry)
        return

    # Background thread (or no app context): queue for later processing.
    try:
        _log_queue.put_nowait(log_entry)
    except queue.Full:
        # Drop the log rather than block the caller.
        pass

378 

379 

def _truncate_for_frontend(message: str) -> str:
    """Limit how large a log message is when sent to the browser.

    Messages of at most ``FRONTEND_MESSAGE_MAX_LENGTH`` characters are
    returned unchanged. Longer ones keep only that many leading characters,
    followed by a short indicator that states the original length and
    points the user at the server-side logs for the full text. The result
    is therefore ``FRONTEND_MESSAGE_MAX_LENGTH`` plus the indicator's fixed
    overhead (~100 bytes).

    Rationale: verbose diagnostic logs (e.g. ``[FETCH] page_text``, which
    inlines the full extracted page body) are useless in the UI when shown
    in full and inflate socket payloads + client-side memory. The
    container-log/stderr, file, and DB sinks remain unchanged.

    Args:
        message: The full log message.

    Returns:
        The message, truncated with an indicator if it was too long.
    """
    original_length = len(message)
    if original_length <= FRONTEND_MESSAGE_MAX_LENGTH:
        return message

    kept_prefix = message[:FRONTEND_MESSAGE_MAX_LENGTH]
    return (
        f"{kept_prefix}… (truncated; full message in server logs; "
        f"original length: {original_length} chars)"
    )

400 

401 

def frontend_progress_sink(message: loguru.Message) -> None:
    """
    Loguru sink that pushes log records to subscribed frontends.

    Records that cannot be attributed to a research ID are ignored.

    Args:
        message: The log message to send.

    """
    record = message.record
    research_id = _get_research_id(record)

    # Without a research ID there is nobody to send to. This is not
    # reported through ``logger``: logging from inside a sink deadlocks.
    if research_id is not None:
        log_entry = {
            "message": _truncate_for_frontend(record["message"]),
            "type": record["level"].name,  # Keep original case
            "time": record["time"].isoformat(),
        }
        payload = {"log_entry": log_entry}
        SocketIOService().emit_to_subscribers(
            "progress", research_id, payload, enable_logging=False
        )

427 

428 

def flush_log_queue():
    """
    Write every log entry still waiting in the queue to the database.

    Intended for process exit (see ``flush_logs_on_exit`` in ``web/app.py``),
    to pick up whatever the background daemon had not processed when it was
    stopped. The request path no longer calls this — steady-state drainage
    is the job of the daemon started by ``start_log_queue_processor``, so
    requests never block on DB writes.
    """
    drained = 0
    while not _log_queue.empty():
        try:
            pending = _log_queue.get_nowait()
            _write_log_to_database(pending)
        except queue.Empty:
            # Another consumer emptied the queue between the check and get.
            break
        except Exception:
            # noqa: silent-exception — DB errors during log flush must not propagate
            continue
        else:
            drained += 1

    if drained > 0:
        logger.debug(f"Flushed {drained} queued log entries to database")

452 

453 

def start_log_queue_processor(app) -> threading.Thread:
    """Launch the daemon thread that moves queued logs into the database.

    The thread runs ``_process_log_queue`` with an application context
    pushed for its whole lifetime, so DB writes have a Flask context and
    the daemon does not depend on any in-flight request. Idempotent: if a
    daemon is already alive, it is returned and nothing new is started.

    Args:
        app: The Flask app whose context the daemon should push.

    Returns:
        The daemon thread (running).
    """
    global _queue_processor_thread

    def _run():
        # Keep an app context pushed as long as the daemon lives, because
        # the queue processor calls into DB code that requires Flask g.
        with app.app_context():
            _process_log_queue()

    with _queue_processor_lock:
        existing = _queue_processor_thread
        if existing is not None and existing.is_alive():
            return existing

        # Reset a stop request left over from an earlier shutdown.
        _stop_queue.clear()

        worker = threading.Thread(
            target=_run,
            name="log-queue-processor",
            daemon=True,
        )
        _queue_processor_thread = worker
        worker.start()

    logger.info("Log queue processor daemon started")
    return worker

493 

494 

def stop_log_queue_processor(timeout: float = 2.0) -> None:
    """Ask the log queue processor to stop and wait up to ``timeout`` for it.

    Args:
        timeout: Maximum number of seconds to wait for the daemon to exit.
    """
    global _queue_processor_thread

    _stop_queue.set()
    with _queue_processor_lock:
        worker = _queue_processor_thread
    if worker is None:
        return

    worker.join(timeout=timeout)
    if worker.is_alive():
        # The join timed out and the daemon is still running. Keep the
        # reference: clearing it would let a later
        # start_log_queue_processor() spawn a second daemon draining the
        # same queue concurrently.
        return

    # The thread has exited. Compare identity under the lock before
    # clearing, so a fresh thread that another start spawned in the
    # meantime is not nulled out by accident.
    with _queue_processor_lock:
        if _queue_processor_thread is worker:
            _queue_processor_thread = None

513 

514 

def config_logger(name: str, debug: bool = False) -> None:
    """
    Sets up the default loguru logger: sanitizing patcher, sinks, and the
    custom MILESTONE level.

    Args:
        name: The name to use for the log file.
        debug: Whether to enable unsafe debug logging.

    """
    from ..security.log_sanitizer import strip_control_chars

    def _strip_message(record):
        # Patcher: pass every message through strip_control_chars.
        record["message"] = strip_control_chars(record["message"])

    logger.configure(patcher=_strip_message)

    logger.enable("local_deep_research")
    logger.remove()

    # Console (stderr) verbosity follows the debug flag; the database sink
    # always receives DEBUG and above.
    if debug:
        stderr_level = "DEBUG"
    else:
        stderr_level = "INFO"

    # With diagnose=True loguru renders repr() of every local variable in
    # every traceback frame on exceptions. Under LDR_APP_DEBUG that would
    # dump credentials living in frame locals (api_key, SQLCipher password,
    # Authorization headers) into every sink. It is therefore gated behind
    # its own explicit opt-in, so turning on LDR_APP_DEBUG for general debug
    # output does not also turn on local-variable dumps. Default OFF even
    # when debug is on.
    truthy_values = ("1", "true", "yes")
    diagnose = (
        debug
        and os.environ.get("LDR_LOGURU_DIAGNOSE", "").lower() in truthy_values
    )

    logger.add(sys.stderr, level=stderr_level, diagnose=diagnose)
    logger.add(database_sink, level="DEBUG", diagnose=diagnose)
    logger.add(frontend_progress_sink, diagnose=diagnose)

    if debug:
        logger.warning(
            "DEBUG logging is enabled (LDR_APP_DEBUG=true). "
            "Logs may contain sensitive data (queries, answers, API responses). "
            "Do NOT use in production."
        )

    if diagnose:
        logger.warning(
            "LDR_LOGURU_DIAGNOSE is enabled: exception tracebacks will include "
            "local variable values, which may contain credentials (API keys, "
            "passwords, tokens). Do NOT use in production."
        )

    # File logging is opt-in (disabled by default for security) and is
    # controlled only by the LDR_ENABLE_FILE_LOGGING environment variable:
    # database settings are not available at logger initialization time.
    file_logging_flag = os.environ.get("LDR_ENABLE_FILE_LOGGING", "")
    if file_logging_flag.lower() == "true":
        log_file = _LOG_DIR / f"{name}.log"
        logger.add(
            log_file,
            level="DEBUG",
            rotation="10 MB",
            retention="7 days",
            compression="zip",
            diagnose=diagnose,
        )
        logger.warning(
            f"File logging enabled - logs will be written to {log_file}. "
            "WARNING: Log files are unencrypted and may contain sensitive data!"
        )

    # Register the special log level used for milestones.
    try:
        logger.level("MILESTONE", no=26, color="<magenta><bold>")
    except ValueError:
        # Level already exists, that's fine
        pass