Coverage for src / local_deep_research / web / services / socket_service.py: 88%
108 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1from threading import Lock
2from typing import Any, NoReturn
4from flask import Flask, request
5from flask_socketio import SocketIO
6from loguru import logger
8from ..routes.globals import get_globals
class SocketIOService:
    """
    Singleton class for managing SocketIO connections and subscriptions.

    Subscriptions are tracked as a mapping of research ID to the set of
    socket session IDs (``sid``) subscribed to that research.
    """

    _instance = None

    def __new__(cls, *args: Any, app: Flask | None = None, **kwargs: Any):
        """
        Args:
            app: The Flask app to bind this service to. It must be specified
                the first time this is called and the singleton instance is
                created, but will be ignored after that.
            *args: Arguments to pass to the superclass's __new__ method.
            **kwargs: Keyword arguments to pass to the superclass's __new__ method.

        Returns:
            The singleton instance.

        Raises:
            ValueError: If no instance exists yet and `app` was not given.
        """
        if cls._instance is None:
            if app is None:
                raise ValueError(
                    "Flask app must be specified to create a SocketIOService instance."
                )
            instance = super(SocketIOService, cls).__new__(cls, *args, **kwargs)
            instance.__init_singleton(app)
            # Publish the instance only after it is fully initialized, so a
            # failure during initialization does not leave a half-built
            # singleton behind that every later call would return.
            cls._instance = instance
        return cls._instance

    def __init_singleton(self, app: Flask) -> None:
        """
        Initializes the singleton instance.

        Args:
            app: The app to bind this service to.

        """
        self.__app = app  # Store the Flask app reference
        self.__socketio = SocketIO(
            app,
            cors_allowed_origins="*",
            async_mode="threading",
            path="/socket.io",
            logger=False,
            engineio_logger=False,
            ping_timeout=20,
            ping_interval=5,
        )

        # Socket subscription tracking: research ID -> set of subscribed sids.
        self.__socket_subscriptions = {}
        # Set to false to disable logging in the event handlers. This can
        # be necessary because it will sometimes run the handlers directly
        # during a call to `emit` that was made in a logging handler.
        self.__logging_enabled = True
        # Protects access to shared state.
        self.__lock = Lock()

        # Register events.
        @self.__socketio.on("connect")
        def on_connect():
            self.__handle_connect(request)

        @self.__socketio.on("disconnect")
        def on_disconnect(reason: str):
            self.__handle_disconnect(request, reason)

        @self.__socketio.on("subscribe_to_research")
        def on_subscribe(data):
            globals_dict = get_globals()
            active_research = globals_dict.get("active_research", {})
            self.__handle_subscribe(data, request, active_research)

        @self.__socketio.on_error
        def on_error(e):
            return self.__handle_socket_error(e)

        @self.__socketio.on_error_default
        def on_default_error(e):
            return self.__handle_default_error(e)

    def __log_info(self, message: str, *args: Any, **kwargs: Any) -> None:
        """Log an info message, unless logging is currently disabled."""
        if self.__logging_enabled:
            logger.info(message, *args, **kwargs)

    def __log_error(self, message: str, *args: Any, **kwargs: Any) -> None:
        """Log an error message, unless logging is currently disabled."""
        if self.__logging_enabled:
            logger.error(message, *args, **kwargs)

    def __log_exception(self, message: str, *args: Any, **kwargs: Any) -> None:
        """Log an exception, unless logging is currently disabled."""
        if self.__logging_enabled:
            logger.exception(message, *args, **kwargs)

    def emit_socket_event(self, event, data, room=None):
        """
        Emit a socket event to clients.

        Args:
            event: The event name to emit
            data: The data to send with the event
            room: Optional room ID to send to specific client

        Returns:
            bool: True if emission was successful, False otherwise
        """
        try:
            # If room is specified, only emit to that room
            if room:
                self.__socketio.emit(event, data, room=room)
            else:
                # Otherwise broadcast to all
                self.__socketio.emit(event, data)
            return True
        except Exception as e:
            logger.exception(f"Error emitting socket event {event}: {str(e)}")
            return False

    def emit_to_subscribers(
        self, event_base, research_id, data, enable_logging: bool = True
    ):
        """
        Emit an event to all subscribers of a specific research.

        Args:
            event_base: Base event name (will be formatted with research_id)
            research_id: ID of the research
            data: The data to send with the event
            enable_logging: If set to false, this will disable all logging,
                which is useful if we are calling this inside of a logging
                handler.

        Returns:
            bool: True if emission was successful, False otherwise

        """
        if not enable_logging:
            self.__logging_enabled = False

        try:
            # Emit to the general channel for the research
            full_event = f"{event_base}_{research_id}"
            self.__socketio.emit(full_event, data)

            # Snapshot the subscribers while holding the lock. Iterating the
            # live set after releasing the lock could fail with "Set changed
            # size during iteration" if a client subscribes or disconnects
            # concurrently.
            with self.__lock:
                subscribers = tuple(
                    self.__socket_subscriptions.get(research_id, ())
                )

            # Emit to specific subscribers
            for sid in subscribers:
                try:
                    self.__socketio.emit(full_event, data, room=sid)
                except Exception:
                    self.__log_exception(f"Error emitting to subscriber {sid}")

            return True
        except Exception:
            self.__log_exception(
                f"Error emitting to subscribers for research {research_id}"
            )
            return False
        finally:
            # Always re-enable logging, whether or not it was disabled above.
            self.__logging_enabled = True

    def __handle_connect(self, request):
        """Handle client connection"""
        self.__log_info(f"Client connected: {request.sid}")

    def __handle_disconnect(self, request, reason: str):
        """
        Handle client disconnection.

        Removes the client's sid from every research it was subscribed to and
        drops research entries that are left without subscribers.

        Args:
            request: The request of the disconnecting client; its `sid` is read.
            reason: The reason for the disconnection (only logged).
        """
        try:
            sid = request.sid
            self.__log_info(f"Client {sid} disconnected because: {reason}")

            # Clean up subscriptions for this client. Subscriptions are keyed
            # by research ID, so the sid has to be removed from each set.
            removed = False
            with self.__lock:
                for research_id in list(self.__socket_subscriptions):
                    subscribers = self.__socket_subscriptions[research_id]
                    if sid in subscribers:
                        subscribers.discard(sid)
                        removed = True
                        if not subscribers:
                            del self.__socket_subscriptions[research_id]

            # Log after releasing the lock: a log sink that emits to
            # subscribers would otherwise try to re-acquire the
            # non-reentrant lock on this thread.
            if removed:
                self.__log_info(f"Removed subscription for client {sid}")
        except Exception as e:
            self.__log_exception(f"Error handling disconnect: {e}")

    def __handle_subscribe(self, data, request, active_research=None):
        """
        Handle client subscription to research updates.

        Args:
            data: The event payload; its "research_id" entry is read.
            request: The request of the subscribing client; its `sid` is read.
            active_research: Optional mapping of research ID to its state,
                from which the "progress" and "log" entries are read.
        """
        research_id = data.get("research_id")
        if not research_id:
            return

        # Add this client to the subscribers, creating the set if needed.
        with self.__lock:
            self.__socket_subscriptions.setdefault(research_id, set()).add(
                request.sid
            )
        # Logged outside the lock so a log sink that emits to subscribers
        # cannot block on the lock held by this thread.
        self.__log_info(
            f"Client {request.sid} subscribed to research {research_id}"
        )

        # Send current status immediately if available in active research
        if not active_research or research_id not in active_research:
            return

        research = active_research[research_id]
        progress = research["progress"]
        latest_log = research["log"][-1] if research["log"] else None

        if latest_log:
            self.emit_socket_event(
                f"research_progress_{research_id}",
                {
                    "progress": progress,
                    "message": latest_log.get("message", "Processing..."),
                    "status": "in_progress",
                    "log_entry": latest_log,
                },
                room=request.sid,
            )

    def __handle_socket_error(self, e):
        """Handle Socket.IO errors"""
        self.__log_exception(f"Socket.IO error: {str(e)}")
        # Don't propagate exceptions to avoid crashing the server
        return False

    def __handle_default_error(self, e):
        """Handle unhandled Socket.IO errors"""
        self.__log_exception(f"Unhandled Socket.IO error: {str(e)}")
        # Don't propagate exceptions to avoid crashing the server
        return False

    def run(self, host: str, port: int, debug: bool = False) -> NoReturn:
        """
        Runs the SocketIO server.

        Args:
            host: The hostname to bind the server to.
            port: The port number to listen on.
            debug: Whether to run in debug mode. Defaults to False.

        """
        logger.info(f"Starting web server on {host}:{port} (debug: {debug})")
        self.__socketio.run(
            self.__app,  # Use the stored Flask app reference
            debug=debug,
            host=host,
            port=port,
            allow_unsafe_werkzeug=True,
            use_reloader=False,
        )