Coverage for src/local_deep_research/utilities/llm_utils.py: 100%

97 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1# utilities/llm_utils.py 

2""" 

3LLM utilities for Local Deep Research. 

4 

5This module provides utility functions for working with language models 

6when the user's llm_config.py is missing or incomplete. 

7""" 

8 

9import asyncio 

10import threading 

11from typing import Any, Optional, Dict 

12 

13from loguru import logger 

14 

15from ..config.constants import DEFAULT_OLLAMA_URL 

16from ..config.thread_settings import get_setting_from_snapshot 

17 

18 

# Names exported by ``from ... import *``. The underscore-prefixed helpers in
# this module (``_close_base_llm`` and ``_close_inner_ollama_clients``) are
# not listed here.
__all__ = [
    "get_model_identifier",
    "get_ollama_base_url",
    "get_server_url",
    "fetch_ollama_models",
]

25 

26 

def get_model_identifier(llm: Any) -> str:
    """Build a stable, human-readable identifier for an LLM object.

    The result serves as a cache key: ``Journal.quality_model`` stores which
    LLM produced a cached journal score, and lookups filter on that value so
    a score from a replaced model is not reused.

    Resolution steps:
        1. If the object exposes ``.base_llm`` (e.g. ``ProcessingLLMWrapper``),
           inspect that inner object instead, so the key reflects the real
           model rather than the wrapper.
        2. Use the first truthy value among ``model_name`` and ``model``
           (probed in that order), converted with ``str()``.
        3. Otherwise fall back to the class name of the inspected object, so
           an address-bearing ``repr`` never ends up in the cache.

    Args:
        llm: An LLM instance, or a wrapper around one.

    Returns:
        A plain string; never ``None``. Identifiers written by older code via
        ``getattr(llm, "name", str(llm))`` simply will not match and cause a
        one-time re-score.
    """
    target = getattr(llm, "base_llm", llm)

    # Generator keeps the probing lazy: ``model`` is only read when
    # ``model_name`` is missing or falsy.
    probed = (getattr(target, field, None) for field in ("model_name", "model"))
    chosen = next((value for value in probed if value), None)

    # ``None`` can never be a truthy hit, so it doubles as "nothing found".
    if chosen is None:
        return type(target).__name__
    return str(chosen)

52 

53 

54def _close_base_llm(llm): 

55 """Close per-instance HTTP clients on a raw LLM or embeddings instance. 

56 Internal use only. 

57 

58 Applies to every ``langchain_ollama`` class — ``ChatOllama``, 

59 ``OllamaLLM``, and ``OllamaEmbeddings`` all carry the same 

60 ``_client`` / ``_async_client`` shape, eagerly constructed at instance 

61 init by a Pydantic ``@model_validator(mode="after")``. ``ChatAnthropic`` 

62 and ``ChatOpenAI`` use ``@lru_cache``'d shared httpx clients that must 

63 NOT be closed; the ``ollama``-module check below short-circuits cleanly 

64 for them. Same skip applies to local providers like 

65 ``HuggingFaceEmbeddings`` / ``SentenceTransformerEmbeddings`` (no 

66 per-instance httpx client at all). 

67 

68 Each Ollama class owns both ``_client`` (sync ``ollama.Client`` wrapping 

69 ``httpx.Client``) and ``_async_client`` (async ``ollama.AsyncClient`` 

70 wrapping ``httpx.AsyncClient``). Async paths via ``ainvoke()`` 

71 (exercised by browsecomp_entity_strategy, llm_driven_modular_strategy, 

72 modular_strategy) leak the async transport per call if only the sync 

73 side is released — investigated in #3816 where ~72% of leaked FDs were 

74 ``a_inode [eventpoll]`` selectors bound to those async clients. The 

75 same shape reappeared for embeddings after the langchain_community → 

76 langchain_ollama migration (#4352/#4353); see the resource-cleanup doc 

77 for the post-mortem. 

78 

79 Idempotent via an ``_ldr_closed`` sentinel on the inner httpx clients. 

80 

81 The async path always runs ``aclose()`` to completion: when no event 

82 loop is currently running we use ``asyncio.run()`` directly; when a 

83 loop is running in the calling thread (e.g. ``_close_base_llm`` is 

84 invoked inside async code or in a ``finally`` block reached through 

85 LangGraph/LangChain async dispatch) we hand the close off to a brief 

86 daemon thread whose own ``asyncio.run()`` is unaffected by the 

87 caller's loop state. A prior implementation skipped the close in 

88 that case and relied on the "loop owner" to close — but no loop 

89 owner code actually does, so the ``httpx.AsyncClient`` and its 

90 ``epoll_create`` FD were silently leaked. See the regression history 

91 in ``docs/developing/resource-cleanup.md`` (this is the gap left by 

92 #3855 when reaching for the in-async-context close). 

93 """ 

94 # If the llm is another wrapper with its own close(), delegate. 

95 # NOTE: if a future ChatOllama version adds a public close() method, 

96 # this short-circuit fires and the introspection below is skipped — 

97 # that future close() must then handle BOTH sync AND async clients. 

98 if hasattr(type(llm), "close"): 

99 llm.close() 

100 return 

101 

102 # Sync side: ollama.Client._client is an httpx.Client. 

103 # ``_ldr_closed is True`` (not just truthy) so we don't trip on Mock 

104 # objects without a spec, where attribute access auto-generates a child 

105 # Mock that is truthy by default. 

106 sync_ollama = getattr(llm, "_client", None) 

107 if sync_ollama is not None and type(sync_ollama).__module__.startswith( 

108 "ollama" 

109 ): 

110 sync_httpx = getattr(sync_ollama, "_client", None) 

111 if ( 

112 sync_httpx is not None 

113 and getattr(sync_httpx, "_ldr_closed", None) is not True 

114 and hasattr(sync_httpx, "close") 

115 ): 

116 try: 

117 sync_httpx.close() 

118 except Exception: 

119 logger.warning("Failed to close Ollama sync httpx client") 

120 sync_httpx._ldr_closed = True 

121 

122 # Async side: ollama.AsyncClient._client is an httpx.AsyncClient 

123 async_ollama = getattr(llm, "_async_client", None) 

124 if async_ollama is not None and type(async_ollama).__module__.startswith( 

125 "ollama" 

126 ): 

127 async_httpx = getattr(async_ollama, "_client", None) 

128 if ( 

129 async_httpx is not None 

130 and getattr(async_httpx, "_ldr_closed", None) is not True 

131 and hasattr(async_httpx, "aclose") 

132 ): 

133 try: 

134 asyncio.get_running_loop() 

135 except RuntimeError: 

136 # No running loop in this thread: spin a temporary one 

137 # to await aclose() right here. 

138 try: 

139 asyncio.run(async_httpx.aclose()) 

140 except Exception: 

141 logger.warning("Failed to close Ollama async httpx client") 

142 # Mark closed unconditionally — matches the sync-side 

143 # invariant: on a known-broken close we don't want to 

144 # retry endlessly. The WARNING above is the signal. 

145 async_httpx._ldr_closed = True 

146 else: 

147 # A loop is running in this thread. ``asyncio.run`` cannot 

148 # be called here and fire-and-forget tasks scheduled on the 

149 # caller's loop from a finally block are unreliable (the 

150 # loop may exit before the task is awaited). Hand the close 

151 # off to a brief daemon thread whose own loop is 

152 # independent of ours; cap with a bounded ``join`` so a 

153 # stuck server can't hold up shutdown. 

154 

155 def _close_in_thread() -> None: 

156 try: 

157 asyncio.run(async_httpx.aclose()) 

158 except Exception: 

159 logger.warning( 

160 "Failed to close Ollama async httpx client " 

161 "in cleanup thread" 

162 ) 

163 

164 t = threading.Thread( 

165 target=_close_in_thread, 

166 daemon=True, 

167 name="ldr-async-llm-close", 

168 ) 

169 t.start() 

170 t.join(timeout=5) 

171 if t.is_alive(): 

172 # Don't set ``_ldr_closed`` — the cleanup thread is 

173 # still running and the FD is therefore still open. 

174 # Surface at WARNING so operators can correlate 

175 # against rising eventpoll-FD counts. A subsequent 

176 # call to _close_base_llm will retry the close. 

177 logger.warning( 

178 "Async httpx close exceeded 5s; abandoning to GC. " 

179 "If this fires repeatedly, check Ollama server " 

180 "responsiveness and look for rising " 

181 "anon_inode:[eventpoll] FDs on the process." 

182 ) 

183 else: 

184 # Thread completed (with or without an inner 

185 # exception). Mark closed to match the sync-side 

186 # invariant; the inner exception, if any, was 

187 # already logged from inside the thread. 

188 async_httpx._ldr_closed = True 

189 

190 

def _close_inner_ollama_clients(sync_client, async_client):
    """Close an inner sync/async ``ollama.Client`` pair on its own.

    A ``weakref.finalize`` callback must not hold a strong reference to the
    wrapping LLM or embeddings instance: the registry's reference would then
    keep that instance alive forever and defeat the finalizer. Callers (the
    Ollama provider factory's safety net) therefore pass the inner clients
    directly. This shim places them on a throwaway object shaped the way
    ``_close_base_llm`` expects (``_client`` / ``_async_client``
    attributes), so the idempotent sync+async close logic, including its
    asyncio/eventpoll handling, is reused rather than duplicated.

    Args:
        sync_client: The object to expose as ``_client``.
        async_client: The object to expose as ``_async_client``.
    """

    class _ClientPair:
        # The class deliberately has no ``close`` attribute, so
        # ``_close_base_llm`` inspects the two attributes set below instead
        # of delegating to a close() method.
        def __init__(self, sync_side, async_side):
            self._client = sync_side
            self._async_client = async_side

    _close_base_llm(_ClientPair(sync_client, async_client))

210 

211 

def get_ollama_base_url(
    settings_snapshot: Optional[Dict[str, Any]] = None,
) -> str:
    """
    Resolve and normalize the Ollama base URL from settings.

    Lookup order:
        1. ``embeddings.ollama.url``
        2. ``llm.ollama.url``
        3. ``DEFAULT_OLLAMA_URL``

    Args:
        settings_snapshot: Optional settings snapshot

    Returns:
        The resolved URL passed through ``normalize_url``, or
        ``DEFAULT_OLLAMA_URL`` unchanged when the resolved value is falsy.
    """
    from .url_utils import normalize_url

    # The LLM-level setting is resolved first because its value is the
    # default for the embeddings-level lookup.
    llm_level_url = get_setting_from_snapshot(
        "llm.ollama.url",
        default=DEFAULT_OLLAMA_URL,
        settings_snapshot=settings_snapshot,
    )
    resolved_url = get_setting_from_snapshot(
        "embeddings.ollama.url",
        default=llm_level_url,
        settings_snapshot=settings_snapshot,
    )

    if not resolved_url:
        return DEFAULT_OLLAMA_URL
    return normalize_url(resolved_url)

239 

240 

def get_server_url(settings_snapshot: Optional[Dict[str, Any]] = None) -> str:
    """
    Get server URL from settings with fallback logic.

    Checks multiple sources in order:
    1. Direct ``server_url`` key in the settings snapshot
    2. ``server_url`` inside the snapshot's ``system`` mapping
    3. Constructs from ``web.host``, ``web.port`` and ``web.use_https``
    4. Fallback to http://127.0.0.1:5000/

    Sources 1-3 are only consulted when ``settings_snapshot`` is truthy.

    Args:
        settings_snapshot: Optional settings snapshot

    Returns:
        The server URL. URLs built in step 3 and the step 4 default end with
        a trailing slash; a URL found in steps 1 or 2 is returned exactly as
        stored.
    """
    server_url = None

    if settings_snapshot:
        # Try the top-level key first (research metadata stores it there).
        server_url = settings_snapshot.get("server_url")

        # If not found, try system settings. Only dict values are read, so
        # a ``None`` or scalar stored under "system" falls through to the
        # next source instead of raising AttributeError.
        if not server_url:
            system_settings = settings_snapshot.get("system")
            if isinstance(system_settings, dict):
                server_url = system_settings.get("server_url")

        # If not found, build the URL from the web.* settings. Arguments
        # are passed by keyword, the same spelling get_ollama_base_url
        # uses, so the default and the snapshot cannot be bound to the
        # wrong positional parameter.
        if not server_url:
            host = get_setting_from_snapshot(
                "web.host",
                default="127.0.0.1",
                settings_snapshot=settings_snapshot,
            )
            port = get_setting_from_snapshot(
                "web.port",
                default=5000,
                settings_snapshot=settings_snapshot,
            )
            use_https = get_setting_from_snapshot(
                "web.use_https",
                default=True,
                settings_snapshot=settings_snapshot,
            )

            # Use localhost for 0.0.0.0 bindings as that's what users will use
            if host == "0.0.0.0":
                host = "127.0.0.1"

            scheme = "https" if use_https else "http"
            server_url = f"{scheme}://{host}:{port}/"

    # Fallback to default if still not found
    if not server_url:
        server_url = "http://127.0.0.1:5000/"
        logger.warning("Could not determine server URL, using default")

    return server_url

294 

295 

def fetch_ollama_models(
    base_url: str,
    timeout: float = 3.0,
    auth_headers: Optional[Dict[str, str]] = None,
) -> list[Dict[str, str]]:
    """
    Fetch available models from Ollama API.

    Centralized function to avoid duplication between LLM and embedding providers.

    Issues a GET to ``<base_url>/api/tags`` through ``safe_get`` with
    localhost and private IPs allowed. This is a best-effort call: any
    exception is logged and whatever was collected so far (usually nothing)
    is returned.

    Args:
        base_url: Ollama base URL (should be normalized). Trailing slashes
            are stripped before the path is appended, so
            ``http://host:11434/`` does not turn into ``//api/tags``.
        timeout: Request timeout in seconds
        auth_headers: Optional authentication headers

    Returns:
        List of model dicts with 'value' (model name) and 'label' (display name) keys.
        Entries that are not dicts, or that have no truthy ``name``, are
        skipped. Returns empty list on error or non-200 response.
    """
    from ..security import safe_get

    models: list[Dict[str, str]] = []

    try:
        # Built inside the try so a non-string base_url is handled by the
        # same best-effort path as any other failure.
        tags_url = base_url.rstrip("/") + "/api/tags"

        response = safe_get(
            tags_url,
            timeout=timeout,
            headers=auth_headers or {},
            allow_localhost=True,
            allow_private_ips=True,
        )

        if response.status_code == 200:
            data = response.json()

            # Handle both newer ({"models": [...]}) and older (bare list)
            # Ollama API formats; a null/missing "models" means no entries.
            entries = data.get("models") if isinstance(data, dict) else data

            for entry in entries or []:
                # One malformed entry must not discard the remaining models.
                if not isinstance(entry, dict):
                    continue
                model_name = entry.get("name", "")
                if model_name:
                    models.append({"value": model_name, "label": model_name})

            logger.info(f"Found {len(models)} Ollama models")
        else:
            logger.warning(
                f"Failed to fetch Ollama models: HTTP {response.status_code}"
            )

    except Exception:
        logger.exception("Error fetching Ollama models")

    return models