Coverage for src/local_deep_research/utilities/llm_utils.py: 100%
97 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1# utilities/llm_utils.py
2"""
3LLM utilities for Local Deep Research.
5This module provides utility functions for working with language models
6when the user's llm_config.py is missing or incomplete.
7"""
9import asyncio
10import threading
11from typing import Any, Optional, Dict
13from loguru import logger
15from ..config.constants import DEFAULT_OLLAMA_URL
16from ..config.thread_settings import get_setting_from_snapshot
# Names exported by ``from ... import *``. The underscore-prefixed helpers
# defined in this module are intentionally not listed here.
__all__ = [
    "get_model_identifier",
    "get_ollama_base_url",
    "get_server_url",
    "fetch_ollama_models",
]
def get_model_identifier(llm: Any) -> str:
    """Derive a stable, human-readable identifier for an LLM instance.

    The result serves as a cache key: ``Journal.quality_model`` stores
    which LLM produced a cached journal score, and lookups filter on it so
    that scores from a superseded model are not served.

    Resolution steps:

    1. If the object exposes ``.base_llm`` (e.g. ``ProcessingLLMWrapper``),
       inspect the wrapped model rather than the wrapper itself.
    2. Use the first truthy value among ``model_name`` and ``model``
       (in that order), converted with ``str()``.
    3. Otherwise fall back to the class name of the (unwrapped) object, so
       an ephemeral ``repr(object)`` can never end up in the cache.

    Args:
        llm: An LLM instance or a wrapper around one.

    Returns:
        A plain string; never ``None``. Identifiers written by earlier
        versions via ``getattr(llm, "name", str(llm))`` (such as
        ``"<ProcessingLLMWrapper object at 0x…>"``) simply miss the cache
        and trigger a single re-score.
    """
    underlying = getattr(llm, "base_llm", llm)

    # Lazy generator: ``model`` is only looked up when ``model_name`` is
    # absent or falsy, exactly like a sequential attribute probe.
    attribute_values = (
        getattr(underlying, attribute, None)
        for attribute in ("model_name", "model")
    )
    identifier = next((value for value in attribute_values if value), None)

    if identifier is None:
        return type(underlying).__name__
    return str(identifier)
def _pending_httpx_client(owner, attr_name, close_method):
    """Return the still-open httpx client behind ``owner.<attr_name>``.

    ``None`` is returned when the attribute is missing, when the object it
    holds is not defined in an ``ollama`` module, when that object has no
    inner ``_client``, when the inner client is already flagged with
    ``_ldr_closed is True``, or when the inner client lacks
    ``close_method``.
    """
    ollama_client = getattr(owner, attr_name, None)
    if ollama_client is None:
        return None
    if not type(ollama_client).__module__.startswith("ollama"):
        return None

    httpx_client = getattr(ollama_client, "_client", None)
    if httpx_client is None:
        return None
    # Compare against ``True`` by identity (not truthiness): an unspecced
    # Mock auto-creates a truthy child Mock for any attribute access and
    # would otherwise look "already closed".
    if getattr(httpx_client, "_ldr_closed", None) is True:
        return None
    if not hasattr(httpx_client, close_method):
        return None
    return httpx_client


def _close_base_llm(llm):
    """Release the per-instance HTTP clients of a raw LLM/embeddings object.

    Internal use only.

    Which objects are affected: every ``langchain_ollama`` class
    (``ChatOllama``, ``OllamaLLM``, ``OllamaEmbeddings``) carries the same
    ``_client`` / ``_async_client`` pair, built eagerly at instance init by
    a Pydantic ``@model_validator(mode="after")``. ``ChatAnthropic`` and
    ``ChatOpenAI`` rely on ``@lru_cache``'d *shared* httpx clients that
    must NOT be closed; the ``ollama``-module check skips them cleanly.
    Local providers such as ``HuggingFaceEmbeddings`` /
    ``SentenceTransformerEmbeddings`` are skipped as well (they have no
    per-instance httpx client).

    Why both sides are closed: each Ollama class owns ``_client`` (sync
    ``ollama.Client`` wrapping ``httpx.Client``) and ``_async_client``
    (async ``ollama.AsyncClient`` wrapping ``httpx.AsyncClient``). Async
    paths through ``ainvoke()`` (browsecomp_entity_strategy,
    llm_driven_modular_strategy, modular_strategy) leak the async transport
    per call when only the sync side is released — see #3816, where ~72% of
    leaked FDs were ``a_inode [eventpoll]`` selectors bound to those async
    clients. The same pattern came back for embeddings after the
    langchain_community → langchain_ollama migration (#4352/#4353); the
    resource-cleanup doc has the post-mortem.

    Idempotence: an ``_ldr_closed`` sentinel is stored on the inner httpx
    clients, so repeated calls do not close them again.

    Async close strategy: ``aclose()`` is always driven to completion.
    Without a running event loop in the calling thread, ``asyncio.run()``
    is used directly. With a running loop (e.g. when called from async
    code, or from a ``finally`` reached through LangGraph/LangChain async
    dispatch), the close is handed to a short-lived daemon thread whose own
    ``asyncio.run()`` is independent of the caller's loop. An earlier
    implementation skipped the close in that situation and expected the
    "loop owner" to do it — nothing did, so the ``httpx.AsyncClient`` and
    its ``epoll_create`` FD leaked silently. See the regression history in
    ``docs/developing/resource-cleanup.md`` (the gap left by #3855).

    Args:
        llm: The LLM, embeddings instance, or wrapper to clean up.
    """
    # Anything whose *type* defines close() is treated as a wrapper that
    # knows how to clean itself up.
    # NOTE: should a future ChatOllama grow a public close(), this branch
    # fires and the introspection below never runs — that close() must
    # then release BOTH the sync and the async client.
    if hasattr(type(llm), "close"):
        llm.close()
        return

    # --- Sync side: ollama.Client._client is an httpx.Client -------------
    sync_httpx = _pending_httpx_client(llm, "_client", "close")
    if sync_httpx is not None:
        try:
            sync_httpx.close()
        except Exception:
            logger.warning("Failed to close Ollama sync httpx client")
        # Flag it even after a failed close so a broken client is not
        # retried forever.
        sync_httpx._ldr_closed = True

    # --- Async side: ollama.AsyncClient._client is an httpx.AsyncClient --
    async_httpx = _pending_httpx_client(llm, "_async_client", "aclose")
    if async_httpx is None:
        return

    try:
        asyncio.get_running_loop()
    except RuntimeError:
        loop_is_running = False
    else:
        loop_is_running = True

    if not loop_is_running:
        # No loop in this thread: spin up a temporary one and await
        # aclose() right here.
        try:
            asyncio.run(async_httpx.aclose())
        except Exception:
            logger.warning("Failed to close Ollama async httpx client")
        # Same invariant as the sync side: mark closed unconditionally,
        # the WARNING above is the signal for a failed close.
        async_httpx._ldr_closed = True
        return

    # A loop is already running in this thread. ``asyncio.run`` is not
    # allowed here, and a fire-and-forget task scheduled on the caller's
    # loop from a finally block is unreliable (the loop may exit before the
    # task is awaited). Delegate to a brief daemon thread with its own
    # loop, and bound the wait so a stuck server cannot stall shutdown.
    def _aclose_worker() -> None:
        try:
            asyncio.run(async_httpx.aclose())
        except Exception:
            logger.warning(
                "Failed to close Ollama async httpx client "
                "in cleanup thread"
            )

    closer = threading.Thread(
        target=_aclose_worker,
        daemon=True,
        name="ldr-async-llm-close",
    )
    closer.start()
    closer.join(timeout=5)

    if not closer.is_alive():
        # The worker finished (any inner failure was already logged from
        # inside the thread), so mark the client as closed.
        async_httpx._ldr_closed = True
        return

    # Still running: the FD is still open, so ``_ldr_closed`` is NOT set
    # and a later call will retry. Logged at WARNING so operators can
    # correlate with rising eventpoll-FD counts.
    logger.warning(
        "Async httpx close exceeded 5s; abandoning to GC. "
        "If this fires repeatedly, check Ollama server "
        "responsiveness and look for rising "
        "anon_inode:[eventpoll] FDs on the process."
    )
def _close_inner_ollama_clients(sync_client, async_client):
    """Close an inner sync/async ``ollama`` client pair on its own.

    A ``weakref.finalize`` callback holding a strong reference to the
    wrapping LLM or embeddings instance would be self-defeating: the
    registry's reference would keep that instance alive forever. Callers
    (the Ollama provider factory's safety net) therefore hand over the
    inner clients directly. They are placed on a throwaway object shaped
    like what ``_close_base_llm`` inspects (``_client`` and
    ``_async_client`` attributes), so the same idempotent sync+async close
    logic is reused instead of duplicating its asyncio/eventpoll handling.

    Args:
        sync_client: The inner sync client (stored as ``_client``).
        async_client: The inner async client (stored as ``_async_client``).
    """

    class _ClientPair:
        """Minimal attribute holder mimicking an Ollama LLM instance."""

        def __init__(self, sync_side, async_side):
            self._client = sync_side
            self._async_client = async_side

    _close_base_llm(_ClientPair(sync_client, async_client))
def get_ollama_base_url(
    settings_snapshot: Optional[Dict[str, Any]] = None,
) -> str:
    """
    Resolve the Ollama base URL from settings and normalize it.

    Lookup order: ``embeddings.ollama.url`` first, then ``llm.ollama.url``,
    and finally ``DEFAULT_OLLAMA_URL``.

    Args:
        settings_snapshot: Optional settings snapshot

    Returns:
        The normalized URL, or ``DEFAULT_OLLAMA_URL`` unchanged when the
        resolved setting is empty/falsy.
    """
    from .url_utils import normalize_url

    # The LLM-level URL doubles as the default for the embeddings-level
    # lookup, so it is resolved first.
    llm_level_url = get_setting_from_snapshot(
        "llm.ollama.url",
        default=DEFAULT_OLLAMA_URL,
        settings_snapshot=settings_snapshot,
    )
    configured_url = get_setting_from_snapshot(
        "embeddings.ollama.url",
        default=llm_level_url,
        settings_snapshot=settings_snapshot,
    )

    if not configured_url:
        return DEFAULT_OLLAMA_URL
    return normalize_url(configured_url)
def get_server_url(settings_snapshot: Optional[Dict[str, Any]] = None) -> str:
    """
    Get server URL from settings with fallback logic.

    Checks multiple sources in order:
    1. Direct ``server_url`` key in the settings snapshot
    2. ``server_url`` inside the snapshot's ``system`` mapping
    3. A URL built from the ``web.host``, ``web.port`` and
       ``web.use_https`` settings
    4. Fallback to http://127.0.0.1:5000/ (with a warning) when no
       snapshot is supplied or the snapshot is empty

    Args:
        settings_snapshot: Optional settings snapshot

    Returns:
        The resolved server URL. URLs built in step 3 and the step 4
        fallback end with a trailing slash; a URL found in step 1 or 2 is
        returned exactly as stored.
    """
    server_url = None

    if settings_snapshot:
        # Research metadata first (where the URL is injected), then the
        # nested system settings.
        server_url = settings_snapshot.get(
            "server_url"
        ) or settings_snapshot.get("system", {}).get("server_url")

        if not server_url:
            # ``default`` and ``settings_snapshot`` are passed by keyword,
            # the same way get_ollama_base_url calls this helper, so each
            # value binds to the intended parameter regardless of the
            # helper's positional parameter order.
            host = get_setting_from_snapshot(
                "web.host",
                default="127.0.0.1",
                settings_snapshot=settings_snapshot,
            )
            port = get_setting_from_snapshot(
                "web.port",
                default=5000,
                settings_snapshot=settings_snapshot,
            )
            use_https = get_setting_from_snapshot(
                "web.use_https",
                default=True,
                settings_snapshot=settings_snapshot,
            )

            # Use localhost for 0.0.0.0 bindings as that's what users
            # will actually type into a browser.
            if host == "0.0.0.0":
                host = "127.0.0.1"

            scheme = "https" if use_https else "http"
            server_url = f"{scheme}://{host}:{port}/"

    # Fallback to default if still not found
    if not server_url:
        server_url = "http://127.0.0.1:5000/"
        logger.warning("Could not determine server URL, using default")

    return server_url
def fetch_ollama_models(
    base_url: str,
    timeout: float = 3.0,
    auth_headers: Optional[Dict[str, str]] = None,
) -> list[Dict[str, str]]:
    """
    List the models an Ollama server reports via ``<base_url>/api/tags``.

    Shared by the LLM and embedding providers so the request/parsing logic
    lives in one place.

    Args:
        base_url: Ollama base URL (should be normalized)
        timeout: Request timeout in seconds
        auth_headers: Optional authentication headers

    Returns:
        List of model dicts with 'value' (model name) and 'label' (display
        name) keys, both set to the model name. An empty list is returned
        on a non-200 response. On an exception the error is logged and
        whatever was collected so far (normally nothing) is returned.
    """
    from ..security import safe_get

    found = []

    try:
        reply = safe_get(
            f"{base_url}/api/tags",
            timeout=timeout,
            headers=auth_headers or {},
            allow_localhost=True,
            allow_private_ips=True,
        )

        if reply.status_code != 200:
            logger.warning(
                f"Failed to fetch Ollama models: HTTP {reply.status_code}"
            )
            return found

        payload = reply.json()

        # Newer API versions wrap the list in {"models": [...]}; older
        # ones return the list directly.
        if isinstance(payload, dict):
            entries = payload.get("models", [])
        else:
            entries = payload

        for entry in entries:
            name = entry.get("name", "")
            if not name:
                continue
            found.append({"value": name, "label": name})

        logger.info(f"Found {len(found)} Ollama models")

    except Exception:
        logger.exception("Error fetching Ollama models")

    return found