Coverage for src/local_deep_research/utilities/log_utils.py: 82%

124 statements  

coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Utilities for logging. 

3""" 

4 

5# Needed for loguru annotations 

6from __future__ import annotations 

7 

8import inspect 

9 

10# import logging - needed for InterceptHandler compatibility 

11import logging 

12import os 

13import queue 

14import sys 

15import threading 

16from functools import wraps 

17from typing import Any, Callable 

18 

19import loguru 

20from flask import g, has_app_context 

21from loguru import logger 

22 

23from ..config.paths import get_logs_directory 

24from ..database.models import ResearchLog 

25from ..web.services.socket_service import SocketIOService 

26 

27_LOG_DIR = get_logs_directory() 

28_LOG_DIR.mkdir(parents=True, exist_ok=True) 

29 

30# Thread-safe queue for database logs from background threads 

31_log_queue = queue.Queue(maxsize=1000) 

32_queue_processor_thread = None 

33_stop_queue = threading.Event() 

34""" 

35Default log directory to use. 

36""" 

37 

38 

class InterceptHandler(logging.Handler):
    """
    Intercepts standard logging messages and forwards them to Loguru's logger.
    """

    def emit(self, record: logging.LogRecord) -> None:
        # Get the corresponding Loguru level if it exists.
        try:
            level: str | int = logger.level(record.levelname).name
        except ValueError:
            level = record.levelno

        # Find the caller from which the logged message originated.
        frame, depth = inspect.currentframe(), 0
        while frame:
            filename = frame.f_code.co_filename
            is_logging = filename == logging.__file__
            is_frozen = "importlib" in filename and "_bootstrap" in filename
            if depth > 0 and not (is_logging or is_frozen):
                break
            frame = frame.f_back
            depth += 1

        logger.opt(depth=depth, exception=record.exc_info).log(
            level, record.getMessage()
        )

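# Usage sketch (an assumption about application wiring, not part of this
# module): InterceptHandler is typically installed by replacing the stdlib
# root handlers so that third-party `logging` calls flow into Loguru:
#
#     logging.basicConfig(handlers=[InterceptHandler()], level=0, force=True)
#
# Every stdlib record is then re-emitted through whatever sinks
# `config_logger()` has registered.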

def log_for_research(
    to_wrap: Callable[..., Any],
) -> Callable[..., Any]:
    """
    Decorator for a function that's part of the research process. It expects
    the function to take the research ID (UUID) as its first parameter, and
    configures all log messages made during the call to include that
    research ID.

    Args:
        to_wrap: The function to wrap. Should take the research ID as the
            first parameter.

    Returns:
        The wrapped function.

    """

    @wraps(to_wrap)
    def wrapped(research_id: str, *args: Any, **kwargs: Any) -> Any:
        g.research_id = research_id
        try:
            return to_wrap(research_id, *args, **kwargs)
        finally:
            # Clean up even if the wrapped function raises.
            g.pop("research_id")

    return wrapped

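# Usage sketch (the function name below is hypothetical):
#
#     @log_for_research
#     def run_research(research_id: str, query: str) -> None:
#         logger.info("Starting research")  # record resolves to research_id
#
# Because the ID is stored on Flask's `g`, the wrapped function must be
# called inside an application context.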

def _get_research_id(record=None) -> str | None:
    """
    Gets the current research ID (UUID), if present.

    Args:
        record: Optional loguru record that might contain a bound
            research_id.

    Returns:
        The current research ID (UUID), or None if it does not exist.

    """
    research_id = None

    # First, check if a research_id is bound to the log record.
    if record and "extra" in record and "research_id" in record["extra"]:
        research_id = record["extra"]["research_id"]
    # Then, check the Flask context.
    elif has_app_context():
        research_id = g.get("research_id")

    return research_id

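# The "extra" lookup above pairs with Loguru's binding APIs. A sketch of how
# a background worker might attach the ID (an assumption about caller
# behavior, not something this module enforces):
#
#     with logger.contextualize(research_id=research_id):
#         logger.info("Running outside the Flask request context")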

def _process_log_queue():
    """
    Process logs from the queue in a dedicated thread with app context.
    This runs in the main thread context to avoid SQLite thread-safety
    issues.
    """
    while not _stop_queue.is_set():
        try:
            # Wait for logs with a timeout so the stop flag is checked
            # periodically.
            log_entry = _log_queue.get(timeout=0.1)

            # Skip if there is no entry.
            if log_entry is None:
                continue

            # Write to the database if we have an app context.
            if has_app_context():
                _write_log_to_database(log_entry)
            else:
                # If there is no app context, put it back in the queue for
                # later.
                try:
                    _log_queue.put_nowait(log_entry)
                except queue.Full:
                    pass  # Drop the log if the queue is full.

        except queue.Empty:
            continue
        except Exception:
            # Don't let logging errors crash the processor.
            pass

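# `_queue_processor_thread` is declared above but never started in this
# module. A sketch of how the processor might be launched (an assumption
# about the application's startup code):
#
#     _queue_processor_thread = threading.Thread(
#         target=_process_log_queue, daemon=True
#     )
#     _queue_processor_thread.start()
#
# Setting `_stop_queue` then signals the loop to exit during shutdown.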

def _write_log_to_database(log_entry: dict) -> None:
    """
    Write a log entry to the database. Should only be called from the main
    thread.
    """
    from ..database.session_context import get_user_db_session

    try:
        username = log_entry.get("username")

        with get_user_db_session(username) as db_session:
            if db_session:
                db_log = ResearchLog(
                    timestamp=log_entry["timestamp"],
                    message=log_entry["message"],
                    module=log_entry["module"],
                    function=log_entry["function"],
                    line_no=log_entry["line_no"],
                    level=log_entry["level"],
                    research_id=log_entry["research_id"],
                )
                db_session.add(db_log)
                db_session.commit()
    except Exception:
        # Ignore database errors in logging.
        pass


def database_sink(message: loguru.Message) -> None:
    """
    Sink that saves messages to the database.
    Queues logs from background threads for later processing.

    Args:
        message: The log message to save.

    """
    record = message.record
    research_id = _get_research_id(record)

    # Create the log entry dict.
    log_entry = {
        "timestamp": record["time"],
        "message": record["message"],
        "module": record["name"],
        "function": record["function"],
        "line_no": int(record["line"]),
        "level": record["level"].name,
        "research_id": research_id,
        "username": record.get("extra", {}).get("username"),
    }

    # Check whether we're in a background thread.
    # Note: Socket.IO handlers run in separate threads even with an app
    # context.
    if not has_app_context() or threading.current_thread().name != "MainThread":
        # Queue the log for later processing.
        try:
            _log_queue.put_nowait(log_entry)
        except queue.Full:
            # Drop the log if the queue is full, to avoid blocking.
            pass
    else:
        # We're in the main thread with an app context - write directly.
        _write_log_to_database(log_entry)

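# The "username" field above is read from the record's extras, so callers
# that want their logs routed to a specific user's database are expected to
# bind it. A sketch (the username value is illustrative):
#
#     logger.bind(username="alice").info("Saved research results")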

def frontend_progress_sink(message: loguru.Message) -> None:
    """
    Sink that sends messages to the frontend.

    Args:
        message: The log message to send.

    """
    record = message.record
    research_id = _get_research_id(record)
    if research_id is None:
        # If we don't have a research ID, don't send anything.
        # We can't use the logger here, as doing so causes a deadlock.
        return

    frontend_log = dict(
        log_entry=dict(
            message=record["message"],
            type=record["level"].name,  # Keep the original case.
            time=record["time"].isoformat(),
        ),
    )
    SocketIOService().emit_to_subscribers(
        "progress", research_id, frontend_log, enable_logging=False
    )


def flush_log_queue():
    """
    Flush all pending logs from the queue to the database.
    This should be called from a Flask request context.
    """
    flushed = 0
    while not _log_queue.empty():
        try:
            log_entry = _log_queue.get_nowait()
            _write_log_to_database(log_entry)
            flushed += 1
        except queue.Empty:
            break
        except Exception:
            pass

    if flushed > 0:
        logger.debug(f"Flushed {flushed} queued log entries to database")

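# Usage sketch (the teardown hook is an assumption about how an application
# might integrate this; the module itself does not register one):
#
#     @app.teardown_request
#     def _flush_logs(_exc):
#         flush_log_queue()
#
# Calling it inside a request guarantees an app context, so queued entries
# can be written synchronously.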

def config_logger(name: str, debug: bool = False) -> None:
    """
    Configures the default logger.

    Args:
        name: The name to use for the log file.
        debug: Whether to enable unsafe debug logging.

    """
    logger.enable("local_deep_research")
    logger.remove()

    # Log to the console (stderr) and the database.
    logger.add(sys.stderr, level="INFO", diagnose=debug)
    logger.add(database_sink, level="DEBUG", diagnose=debug)
    logger.add(frontend_progress_sink, diagnose=debug)

    # Optionally log to a file if enabled (disabled by default for security).
    # File logging is controlled only by this environment variable, for
    # simplicity; database settings are not available at logger
    # initialization time.
    enable_file_logging = (
        os.environ.get("LDR_ENABLE_FILE_LOGGING", "").lower() == "true"
    )

    if enable_file_logging:
        log_file = _LOG_DIR / f"{name}.log"
        logger.add(
            log_file,
            level="DEBUG",
            rotation="10 MB",
            retention="7 days",
            compression="zip",
            diagnose=debug,
        )
        logger.warning(
            f"File logging enabled - logs will be written to {log_file}. "
            "WARNING: Log files are unencrypted and may contain sensitive data!"
        )

    # Add a special log level for milestones.
    try:
        logger.level("MILESTONE", no=26, color="<magenta><bold>")
    except ValueError:
        # The level already exists; that's fine.
        pass

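# Usage sketch (the call site and name are assumptions; this module only
# defines the function):
#
#     config_logger("app")
#     logger.log("MILESTONE", "First research cycle complete")
#
# MILESTONE sits at severity 26, between INFO (20) and WARNING (30), so it
# passes the stderr sink's INFO threshold.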