Coverage for src / local_deep_research / web / services / socket_service.py: 88%

108 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1from threading import Lock 

2from typing import Any, NoReturn 

3 

4from flask import Flask, request 

5from flask_socketio import SocketIO 

6from loguru import logger 

7 

8from ..routes.globals import get_globals 

9 

10 

11class SocketIOService: 

12 """ 

13 Singleton class for managing SocketIO connections and subscriptions. 

14 """ 

15 

16 _instance = None 

17 

18 def __new__(cls, *args: Any, app: Flask | None = None, **kwargs: Any): 

19 """ 

20 Args: 

21 app: The Flask app to bind this service to. It must be specified 

22 the first time this is called and the singleton instance is 

23 created, but will be ignored after that. 

24 *args: Arguments to pass to the superclass's __new__ method. 

25 **kwargs: Keyword arguments to pass to the superclass's __new__ method. 

26 """ 

27 if not cls._instance: 

28 if app is None: 

29 raise ValueError( 

30 "Flask app must be specified to create a SocketIOService instance." 

31 ) 

32 cls._instance = super(SocketIOService, cls).__new__( 

33 cls, *args, **kwargs 

34 ) 

35 cls._instance.__init_singleton(app) 

36 return cls._instance 

37 

38 def __init_singleton(self, app: Flask) -> None: 

39 """ 

40 Initializes the singleton instance. 

41 

42 Args: 

43 app: The app to bind this service to. 

44 

45 """ 

46 self.__app = app # Store the Flask app reference 

47 self.__socketio = SocketIO( 

48 app, 

49 cors_allowed_origins="*", 

50 async_mode="threading", 

51 path="/socket.io", 

52 logger=False, 

53 engineio_logger=False, 

54 ping_timeout=20, 

55 ping_interval=5, 

56 ) 

57 

58 # Socket subscription tracking. 

59 self.__socket_subscriptions = {} 

60 # Set to false to disable logging in the event handlers. This can 

61 # be necessary because it will sometimes run the handlers directly 

62 # during a call to `emit` that was made in a logging handler. 

63 self.__logging_enabled = True 

64 # Protects access to shared state. 

65 self.__lock = Lock() 

66 

67 # Register events. 

68 @self.__socketio.on("connect") 

69 def on_connect(): 

70 self.__handle_connect(request) 

71 

72 @self.__socketio.on("disconnect") 

73 def on_disconnect(reason: str): 

74 self.__handle_disconnect(request, reason) 

75 

76 @self.__socketio.on("subscribe_to_research") 

77 def on_subscribe(data): 

78 globals_dict = get_globals() 

79 active_research = globals_dict.get("active_research", {}) 

80 self.__handle_subscribe(data, request, active_research) 

81 

82 @self.__socketio.on_error 

83 def on_error(e): 

84 return self.__handle_socket_error(e) 

85 

86 @self.__socketio.on_error_default 

87 def on_default_error(e): 

88 return self.__handle_default_error(e) 

89 

90 def __log_info(self, message: str, *args: Any, **kwargs: Any) -> None: 

91 """Log an info message.""" 

92 if self.__logging_enabled: 

93 logger.info(message, *args, **kwargs) 

94 

95 def __log_error(self, message: str, *args: Any, **kwargs: Any) -> None: 

96 """Log an error message.""" 

97 if self.__logging_enabled: 

98 logger.error(message, *args, **kwargs) 

99 

100 def __log_exception(self, message: str, *args: Any, **kwargs: Any) -> None: 

101 """Log an exception.""" 

102 if self.__logging_enabled: 

103 logger.exception(message, *args, **kwargs) 

104 

105 def emit_socket_event(self, event, data, room=None): 

106 """ 

107 Emit a socket event to clients. 

108 

109 Args: 

110 event: The event name to emit 

111 data: The data to send with the event 

112 room: Optional room ID to send to specific client 

113 

114 Returns: 

115 bool: True if emission was successful, False otherwise 

116 """ 

117 try: 

118 # If room is specified, only emit to that room 

119 if room: 

120 self.__socketio.emit(event, data, room=room) 

121 else: 

122 # Otherwise broadcast to all 

123 self.__socketio.emit(event, data) 

124 return True 

125 except Exception as e: 

126 logger.exception(f"Error emitting socket event {event}: {str(e)}") 

127 return False 

128 

129 def emit_to_subscribers( 

130 self, event_base, research_id, data, enable_logging: bool = True 

131 ): 

132 """ 

133 Emit an event to all subscribers of a specific research. 

134 

135 Args: 

136 event_base: Base event name (will be formatted with research_id) 

137 research_id: ID of the research 

138 data: The data to send with the event 

139 enable_logging: If set to false, this will disable all logging, 

140 which is useful if we are calling this inside of a logging 

141 handler. 

142 

143 Returns: 

144 bool: True if emission was successful, False otherwise 

145 

146 """ 

147 if not enable_logging: 

148 self.__logging_enabled = False 

149 

150 try: 

151 # Emit to the general channel for the research 

152 full_event = f"{event_base}_{research_id}" 

153 self.__socketio.emit(full_event, data) 

154 

155 # Emit to specific subscribers 

156 with self.__lock: 

157 subscriptions = self.__socket_subscriptions.get(research_id) 

158 if subscriptions is not None: 

159 for sid in subscriptions: 

160 try: 

161 self.__socketio.emit(full_event, data, room=sid) 

162 except Exception: 

163 self.__log_exception( 

164 f"Error emitting to subscriber {sid}" 

165 ) 

166 

167 return True 

168 except Exception: 

169 self.__log_exception( 

170 f"Error emitting to subscribers for research {research_id}" 

171 ) 

172 return False 

173 finally: 

174 self.__logging_enabled = True 

175 

176 def __handle_connect(self, request): 

177 """Handle client connection""" 

178 self.__log_info(f"Client connected: {request.sid}") 

179 

180 def __handle_disconnect(self, request, reason: str): 

181 """Handle client disconnection""" 

182 try: 

183 self.__log_info( 

184 f"Client {request.sid} disconnected because: {reason}" 

185 ) 

186 # Clean up subscriptions for this client 

187 with self.__lock: 

188 if request.sid in self.__socket_subscriptions: 

189 del self.__socket_subscriptions[request.sid] 

190 self.__log_info(f"Removed subscription for client {request.sid}") 

191 except Exception as e: 

192 self.__log_exception(f"Error handling disconnect: {e}") 

193 

194 def __handle_subscribe(self, data, request, active_research=None): 

195 """Handle client subscription to research updates""" 

196 research_id = data.get("research_id") 

197 if research_id: 

198 # Initialize subscription set if needed 

199 with self.__lock: 

200 if research_id not in self.__socket_subscriptions: 200 ↛ 204line 200 didn't jump to line 204 because the condition on line 200 was always true

201 self.__socket_subscriptions[research_id] = set() 

202 

203 # Add this client to the subscribers 

204 self.__socket_subscriptions[research_id].add(request.sid) 

205 self.__log_info( 

206 f"Client {request.sid} subscribed to research {research_id}" 

207 ) 

208 

209 # Send current status immediately if available in active research 

210 if active_research and research_id in active_research: 210 ↛ 211line 210 didn't jump to line 211 because the condition on line 210 was never true

211 progress = active_research[research_id]["progress"] 

212 latest_log = ( 

213 active_research[research_id]["log"][-1] 

214 if active_research[research_id]["log"] 

215 else None 

216 ) 

217 

218 if latest_log: 

219 self.emit_socket_event( 

220 f"research_progress_{research_id}", 

221 { 

222 "progress": progress, 

223 "message": latest_log.get( 

224 "message", "Processing..." 

225 ), 

226 "status": "in_progress", 

227 "log_entry": latest_log, 

228 }, 

229 room=request.sid, 

230 ) 

231 

232 def __handle_socket_error(self, e): 

233 """Handle Socket.IO errors""" 

234 self.__log_exception(f"Socket.IO error: {str(e)}") 

235 # Don't propagate exceptions to avoid crashing the server 

236 return False 

237 

238 def __handle_default_error(self, e): 

239 """Handle unhandled Socket.IO errors""" 

240 self.__log_exception(f"Unhandled Socket.IO error: {str(e)}") 

241 # Don't propagate exceptions to avoid crashing the server 

242 return False 

243 

244 def run(self, host: str, port: int, debug: bool = False) -> NoReturn: 

245 """ 

246 Runs the SocketIO server. 

247 

248 Args: 

249 host: The hostname to bind the server to. 

250 port: The port number to listen on. 

251 debug: Whether to run in debug mode. Defaults to False. 

252 

253 """ 

254 logger.info(f"Starting web server on {host}:{port} (debug: {debug})") 

255 self.__socketio.run( 

256 self.__app, # Use the stored Flask app reference 

257 debug=debug, 

258 host=host, 

259 port=port, 

260 allow_unsafe_werkzeug=True, 

261 use_reloader=False, 

262 )