Coverage for src / local_deep_research / utilities / log_utils.py: 96%

131 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2Utilities for logging. 

3""" 

4 

5# Needed for loguru annotations 

6from __future__ import annotations 

7 

8import inspect 

9 

10# import logging - needed for InterceptHandler compatibility 

11import logging 

12import os 

13import queue 

14import sys 

15import threading 

16from functools import wraps 

17from typing import Any, Callable 

18 

19import loguru 

20from flask import g, has_app_context 

21from loguru import logger 

22 

23from ..config.paths import get_logs_directory 

24from ..database.models import ResearchLog 

25from ..web.services.socket_service import SocketIOService 

26 

# Default log directory to use; created eagerly so sinks can write to it
# immediately.
_LOG_DIR = get_logs_directory()
_LOG_DIR.mkdir(parents=True, exist_ok=True)

# Thread-safe queue for database logs emitted from background threads.
# Bounded so a stalled consumer cannot grow memory without limit.
_log_queue = queue.Queue(maxsize=1000)
_queue_processor_thread = None
_stop_queue = threading.Event()

37 

38 

class InterceptHandler(logging.Handler):
    """
    Bridges the standard ``logging`` module into Loguru.

    Install an instance on the root logger so that records produced via
    ``logging`` are re-emitted through Loguru with the matching level and
    the correct caller depth.
    """

    def emit(self, record: logging.LogRecord) -> None:
        # Map the stdlib level name onto Loguru's; fall back to the raw
        # numeric level when Loguru has no level of that name.
        try:
            level: str | int = logger.level(record.levelname).name
        except ValueError:
            level = record.levelno

        # Walk up the stack past logging internals (and frozen importlib
        # bootstrap frames) so Loguru reports the original call site.
        depth = 0
        frame = inspect.currentframe()
        while frame:
            source = frame.f_code.co_filename
            in_logging = source == logging.__file__
            in_bootstrap = "importlib" in source and "_bootstrap" in source
            if depth > 0 and not (in_logging or in_bootstrap):
                break
            frame = frame.f_back
            depth += 1

        logger.opt(depth=depth, exception=record.exc_info).log(
            level, record.getMessage()
        )

65 

66 

def log_for_research(
    to_wrap: Callable[..., Any],
) -> Callable[..., Any]:
    """
    Decorator for a function that's part of the research process. It expects
    the function to take the research ID (UUID) as the first parameter, and
    configures all log messages made during this request to include the
    research ID (via ``flask.g``).

    Args:
        to_wrap: The function to wrap. Should take the research ID as the
            first parameter.

    Returns:
        The wrapped function.

    """

    @wraps(to_wrap)
    def wrapped(research_id: str, *args: Any, **kwargs: Any) -> Any:
        g.research_id = research_id
        try:
            return to_wrap(research_id, *args, **kwargs)
        finally:
            # Always clear the ID, even when to_wrap raises, so a stale
            # research ID cannot leak into later work on this context.
            g.pop("research_id")

    return wrapped

91 

92 

93def _get_research_id(record=None) -> str | None: 

94 """ 

95 Gets the current research ID (UUID), if present. 

96 

97 Args: 

98 record: Optional loguru record that might contain bound research_id 

99 

100 Returns: 

101 The current research ID (UUID), or None if it does not exist. 

102 

103 """ 

104 research_id = None 

105 

106 # First check if research_id is bound to the log record 

107 if record and "extra" in record and "research_id" in record["extra"]: 

108 research_id = record["extra"]["research_id"] 

109 # Then check Flask context 

110 elif has_app_context(): 

111 research_id = g.get("research_id") 

112 

113 return research_id 

114 

115 

def _process_log_queue():
    """
    Drain logs from the module queue on a dedicated thread.

    Runs until ``_stop_queue`` is set. Entries are written to the database
    only when a Flask app context is available; otherwise they are pushed
    back onto the queue for a later attempt. Runs in the main thread
    context to avoid SQLite thread safety issues.
    """
    while not _stop_queue.is_set():
        try:
            # Short timeout so the stop flag is polled regularly.
            entry = _log_queue.get(timeout=0.1)

            if entry is None:
                continue

            if has_app_context():
                _write_log_to_database(entry)
            else:
                # No app context yet: re-queue the entry for later.
                # NOTE(review): this can spin while entries are queued but
                # no context exists — confirm that is acceptable here.
                try:
                    _log_queue.put_nowait(entry)
                except queue.Full:
                    pass  # Drop the log rather than block.

        except queue.Empty:
            continue
        except Exception:
            # Logging must never crash the processor thread.
            pass

144 

145 

def _write_log_to_database(log_entry: dict) -> None:
    """
    Persist one log entry to the user's database.

    Should only be called from the main thread. All errors are swallowed
    deliberately: a failure in the logging path must never propagate or
    recurse back into logging.
    """
    # Deferred import — presumably to avoid an import cycle at module
    # load time; confirm before moving it to the top of the file.
    from ..database.session_context import get_user_db_session

    try:
        with get_user_db_session(log_entry.get("username")) as db_session:
            if not db_session:
                return
            db_session.add(
                ResearchLog(
                    timestamp=log_entry["timestamp"],
                    message=log_entry["message"],
                    module=log_entry["module"],
                    function=log_entry["function"],
                    line_no=log_entry["line_no"],
                    level=log_entry["level"],
                    research_id=log_entry["research_id"],
                )
            )
            db_session.commit()
    except Exception:
        pass  # noqa: silent-exception — DB errors in the logging path must not propagate or recurse

170 

171 

def database_sink(message: loguru.Message) -> None:
    """
    Sink that saves messages to the database.

    Main-thread messages with an app context are written immediately;
    anything else is queued for later processing.

    Args:
        message: The log message to save.

    """
    record = message.record

    # Flatten the record into a plain dict so it can safely cross thread
    # boundaries via the queue.
    log_entry = {
        "timestamp": record["time"],
        "message": record["message"],
        "module": record["name"],
        "function": record["function"],
        "line_no": int(record["line"]),
        "level": record["level"].name,
        "research_id": _get_research_id(record),
        "username": record.get("extra", {}).get("username"),
    }

    # Socket.IO handlers run in separate threads even with app context,
    # so only main-thread + app-context messages are written directly.
    on_main_thread = threading.current_thread().name == "MainThread"
    if has_app_context() and on_main_thread:
        _write_log_to_database(log_entry)
    else:
        try:
            _log_queue.put_nowait(log_entry)
        except queue.Full:
            pass  # Drop the log rather than block the caller.

208 

209 

def frontend_progress_sink(message: loguru.Message) -> None:
    """
    Sink that forwards log messages to the frontend over Socket.IO.

    Messages with no associated research ID are dropped silently.

    Args:
        message: The log message to send.

    """
    record = message.record
    research_id = _get_research_id(record)
    if research_id is None:
        # Without a research ID there is nobody to route to. We cannot
        # report this through the logger: doing so would deadlock.
        return

    payload = {
        "log_entry": {
            "message": record["message"],
            "type": record["level"].name,  # Keep original case
            "time": record["time"].isoformat(),
        },
    }
    SocketIOService().emit_to_subscribers(
        "progress", research_id, payload, enable_logging=False
    )

235 

236 

def flush_log_queue():
    """
    Drain every pending log entry from the queue into the database.

    This should be called from a Flask request context, where database
    writes are possible.
    """
    written = 0
    while True:
        try:
            entry = _log_queue.get_nowait()
        except queue.Empty:
            break
        try:
            _write_log_to_database(entry)
            written += 1
        except Exception:
            pass  # noqa: silent-exception — DB errors during log flush must not propagate

    if written > 0:
        logger.debug(f"Flushed {written} queued log entries to database")

255 

256 

def config_logger(name: str, debug: bool = False) -> None:
    """
    Configures the default logger.

    Args:
        name: The name to use for the log file.
        debug: Whether to enable unsafe debug logging.

    """
    from ..security.log_sanitizer import strip_control_chars

    def _scrub(record):
        # Strip control characters from every message before any sink
        # sees it — presumably to guard against log injection.
        record["message"] = strip_control_chars(record["message"])

    logger.configure(patcher=_scrub)

    logger.enable("local_deep_research")
    logger.remove()

    # Console (stderr), database, and frontend sinks.
    logger.add(
        sys.stderr, level="DEBUG" if debug else "INFO", diagnose=debug
    )
    logger.add(database_sink, level="DEBUG", diagnose=debug)
    logger.add(frontend_progress_sink, diagnose=debug)

    if debug:
        logger.warning(
            "DEBUG logging is enabled (LDR_APP_DEBUG=true). "
            "Logs may contain sensitive data (queries, answers, API responses). "
            "Do NOT use in production."
        )

    # File logging is opt-in via environment variable only (database
    # settings are not available at logger initialization time) and is
    # disabled by default because log files are written unencrypted.
    file_logging = (
        os.environ.get("LDR_ENABLE_FILE_LOGGING", "").lower() == "true"
    )
    if file_logging:
        log_file = _LOG_DIR / f"{name}.log"
        logger.add(
            log_file,
            level="DEBUG",
            rotation="10 MB",
            retention="7 days",
            compression="zip",
            diagnose=debug,
        )
        logger.warning(
            f"File logging enabled - logs will be written to {log_file}. "
            "WARNING: Log files are unencrypted and may contain sensitive data!"
        )

    # Register the custom MILESTONE level; ignore if already registered.
    try:
        logger.level("MILESTONE", no=26, color="<magenta><bold>")
    except ValueError:
        pass