Coverage for src / local_deep_research / web / api.py: 57%
164 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
1"""
2REST API for Local Deep Research.
3Provides HTTP access to programmatic search and research capabilities.
4"""
6import time
7from functools import wraps
8from typing import Dict, Any
10from flask import Blueprint, jsonify, request, Response
11from loguru import logger
13from ..api.research_functions import analyze_documents
14from ..constants import DEFAULT_RATE_LIMIT, RATE_LIMIT_WINDOW_SECONDS
15from ..database.session_context import get_user_db_session
16from ..utilities.db_utils import get_settings_manager
# All REST endpoints in this module are registered on this blueprint and are
# served under the /api/v1 URL prefix.
api_blueprint = Blueprint("api_v1", __name__, url_prefix="/api/v1")

# In-memory request log used for rate limiting, keyed by client IP:
# {ip_address: [timestamp1, timestamp2, ...]}
rate_limit_data: Dict[str, Any] = {}
def api_access_control(f):
    """
    Decorator to enforce API access control:
    - Check if API is enabled
    - Enforce rate limiting

    The username is read from ``g.current_user`` when that attribute exists,
    otherwise from the Flask session. When a username and a database session
    are available, the ``app.enable_api`` and ``app.api_rate_limit`` settings
    are looked up for that user; otherwise the API is treated as enabled and
    ``DEFAULT_RATE_LIMIT`` applies.

    Rate limiting is tracked per ``request.remote_addr`` in the module-level
    ``rate_limit_data`` dict over a window of ``RATE_LIMIT_WINDOW_SECONDS``.

    Args:
        f: The view function to protect.

    Returns:
        The wrapped view. It responds with 403 when the API is disabled,
        429 when the rate limit is exceeded, and otherwise returns whatever
        ``f`` returns.
    """

    @wraps(f)
    def decorated_function(*args, **kwargs):
        # Get username from session
        from flask import g, session

        username = (
            g.current_user
            if hasattr(g, "current_user")
            else session.get("username")
        )

        # Defaults used when there is no authenticated user or no DB session
        api_enabled = True
        rate_limit = DEFAULT_RATE_LIMIT

        # Only try to get settings if there's an authenticated user
        if username:
            with get_user_db_session(username) as db_session:
                if db_session:
                    settings_manager = get_settings_manager(
                        db_session, username
                    )
                    api_enabled = settings_manager.get_setting(
                        "app.enable_api", True
                    )
                    # Fall back to the same default as the unauthenticated
                    # path instead of a separate hard-coded number.
                    rate_limit = settings_manager.get_setting(
                        "app.api_rate_limit", DEFAULT_RATE_LIMIT
                    )

        if not api_enabled:
            return jsonify({"error": "API access is disabled"}), 403

        # A falsy rate limit (0 / None) disables rate limiting entirely
        if rate_limit:
            client_ip = request.remote_addr
            current_time = time.time()

            # Keep only the timestamps that are still inside the window
            recent_requests = [
                ts
                for ts in rate_limit_data.get(client_ip, [])
                if current_time - ts < RATE_LIMIT_WINDOW_SECONDS
            ]
            rate_limit_data[client_ip] = recent_requests

            # Check if rate limit is exceeded
            if len(recent_requests) >= rate_limit:
                return (
                    jsonify(
                        {
                            "error": f"Rate limit exceeded. Maximum {rate_limit} requests per {RATE_LIMIT_WINDOW_SECONDS} seconds allowed."
                        }
                    ),
                    429,
                )

            # Record this request
            recent_requests.append(current_time)

        return f(*args, **kwargs)

    return decorated_function
@api_blueprint.route("/", methods=["GET"])
@api_access_control
def api_documentation():
    """
    Return a JSON description of the documented API endpoints.
    """

    def describe(path, summary, parameters):
        # Every endpoint documented here is a POST route.
        return {
            "path": path,
            "method": "POST",
            "description": summary,
            "parameters": parameters,
        }

    quick_summary_doc = describe(
        "/api/v1/quick_summary",
        "Generate a quick research summary",
        {
            "query": "Research query (required)",
            "search_tool": "Search engine to use (optional)",
            "iterations": "Number of search iterations (optional)",
            "temperature": "LLM temperature (optional)",
        },
    )
    generate_report_doc = describe(
        "/api/v1/generate_report",
        "Generate a comprehensive research report",
        {
            "query": "Research query (required)",
            "output_file": "Path to save report (optional)",
            "searches_per_section": "Searches per report section (optional)",
            "model_name": "LLM model to use (optional)",
            "temperature": "LLM temperature (optional)",
        },
    )
    analyze_documents_doc = describe(
        "/api/v1/analyze_documents",
        "Search and analyze documents in a local collection",
        {
            "query": "Search query (required)",
            "collection_name": "Local collection name (required)",
            "max_results": "Maximum results to return (optional)",
            "temperature": "LLM temperature (optional)",
            "force_reindex": "Force collection reindexing (optional)",
        },
    )

    return jsonify(
        {
            "api_version": "v1",
            "description": "REST API for Local Deep Research",
            "endpoints": [
                quick_summary_doc,
                generate_report_doc,
                analyze_documents_doc,
            ],
        }
    )
@api_blueprint.route("/health", methods=["GET"])
def health_check():
    """Report that the API is running, along with the current server time."""
    payload = {"status": "ok", "message": "API is running"}
    payload["timestamp"] = time.time()
    return jsonify(payload)
@api_blueprint.route("/quick_summary_test", methods=["POST"])
@api_access_control
def api_quick_summary_test():
    """
    Test endpoint using programmatic access with minimal parameters for fast testing.

    Expects a JSON object with a string ``query``. Runs ``quick_summary`` with
    fixed parameters (Wikipedia search, one iteration, temperature 0.7).

    Returns:
        The ``quick_summary`` result as JSON; 400 if the body is not a JSON
        object with a string ``query``; 500 if the research call raises.
    """
    data = request.json
    # The body must be a JSON object. A JSON string or array could satisfy a
    # bare `"query" in data` membership test and then break on `.get()`.
    if not isinstance(data, dict) or "query" not in data:
        return jsonify({"error": "Query parameter is required"}), 400

    query = data.get("query")
    if not isinstance(query, str):
        return jsonify({"error": "Query must be a string"}), 400

    try:
        # Import here to avoid circular imports
        from ..api.research_functions import quick_summary

        logger.info(f"Processing quick_summary_test request: query='{query}'")

        # Use minimal parameters for faster testing
        result = quick_summary(
            query=query,
            search_tool="wikipedia",  # Use fast Wikipedia search for testing
            iterations=1,  # Single iteration for speed
            temperature=0.7,
        )

        return jsonify(result)
    except Exception:
        # Log the details server-side; return only a generic message
        logger.exception("Error in quick_summary_test API")
        return (
            jsonify(
                {
                    "error": "An internal error has occurred. Please try again later."
                }
            ),
            500,
        )
def _serialize_finding(finding: Any) -> Any:
    """Return a copy of ``finding`` whose ``documents`` are plain dicts."""
    if not isinstance(finding, dict) or not finding.get("documents"):
        # Nothing to convert; hand the finding back untouched.
        return finding

    converted = finding.copy()
    converted["documents"] = [
        {"metadata": document.metadata, "content": document.page_content}
        if hasattr(document, "page_content")
        else document  # not a Document-like object; pass it through as-is
        for document in finding["documents"]
    ]
    return converted


def _serialize_results(results: Dict[str, Any]) -> Response:
    """
    Converts the results dictionary into a JSON response.

    Objects under ``findings[*]["documents"]`` that expose ``page_content``
    are replaced by ``{"metadata": ..., "content": ...}`` dicts. The
    conversion builds new containers, so the ``results`` dictionary passed in
    (and the findings inside it) are not modified.

    Args:
        results: The results dictionary.

    Returns:
        A Flask JSON response built from the converted results.

    """
    # The main thing that needs to be handled here is the `Document` instances.
    converted_results = results.copy()
    findings = results.get("findings")
    if findings:
        converted_results["findings"] = [
            _serialize_finding(finding) for finding in findings
        ]

    return jsonify(converted_results)
def _load_settings_snapshot(username):
    """
    Build a flat ``{key: value}`` snapshot of the given user's settings.

    Entries returned by ``get_all_settings()`` that are dicts with a
    ``"value"`` key are reduced to that value; anything else is kept as-is.
    This is best-effort: if there is no database session, or anything goes
    wrong while reading settings, the problem is logged and an empty
    snapshot is returned.
    """
    try:
        logger.debug(f"Getting settings snapshot for user: {username}")
        with get_user_db_session(username) as db_session:
            if not db_session:
                logger.warning(f"No database session for user: {username}")
                return {}

            settings_manager = get_settings_manager(db_session, username)
            all_settings = settings_manager.get_all_settings()
            # Extract just the values for the settings snapshot
            settings_snapshot = {
                key: (
                    setting["value"]
                    if isinstance(setting, dict) and "value" in setting
                    else setting
                )
                for key, setting in all_settings.items()
            }
            logger.debug(
                f"Got settings snapshot with {len(settings_snapshot)} settings"
            )
            return settings_snapshot
    except Exception as e:
        # loguru attaches the traceback via opt(exception=True); passing the
        # stdlib-style exc_info=True keyword does not do that.
        logger.opt(exception=True).warning(
            f"Failed to get settings snapshot: {e}"
        )
        # Continue with empty snapshot rather than failing
        return {}


@api_blueprint.route("/quick_summary", methods=["POST"])
@api_access_control
def api_quick_summary():
    """
    Generate a quick research summary via REST API.

    POST /api/v1/quick_summary
    {
        "query": "Advances in fusion energy research",
        "search_tool": "auto",  # Optional: search engine to use
        "iterations": 2,        # Optional: number of search iterations
        "temperature": 0.7      # Optional: LLM temperature
    }

    Any other keys in the body are forwarded to ``quick_summary`` as keyword
    arguments, except ``username`` and ``settings_snapshot``, which are always
    set by the server.

    Returns:
        The serialized research result; 400 if the body is not a JSON object
        with a string ``query``; 504 on ``TimeoutError``; 500 on any other
        error raised while processing.
    """
    logger.debug("API quick_summary endpoint called")
    data = request.json

    # The body must be a JSON object. A JSON string or array could satisfy a
    # bare `"query" in data` membership test and then break on dict methods.
    if not isinstance(data, dict) or "query" not in data:
        logger.debug("Missing query parameter")
        return jsonify({"error": "Query parameter is required"}), 400
    logger.debug(f"Request data keys: {list(data.keys())}")

    # Extract query and validate type
    query = data.get("query")
    if not isinstance(query, str):
        return jsonify({"error": "Query must be a string"}), 400

    # These keyword arguments are decided by the server below; values for
    # them in the request body must not reach the research function.
    server_controlled = ("username", "settings_snapshot")
    params = {
        k: v
        for k, v in data.items()
        if k != "query" and k not in server_controlled
    }
    logger.debug(
        f"Query length: {len(query)}, params keys: {list(params.keys()) if params else 'None'}"
    )

    # Get username from session or g object
    from flask import g, session

    username = (
        g.current_user
        if hasattr(g, "current_user")
        else session.get("username")
    )
    if username:
        params["username"] = username

    try:
        # Import here to avoid circular imports
        from ..api.research_functions import quick_summary

        logger.info(
            f"Processing quick_summary request: query='{query}' for user='{username}'"
        )

        # Set reasonable defaults for API use
        params.setdefault("temperature", 0.7)
        params.setdefault("search_tool", "auto")
        params.setdefault("iterations", 1)

        # Get settings snapshot for the user
        if username:
            params["settings_snapshot"] = _load_settings_snapshot(username)
        else:
            logger.debug("No username in session, skipping settings snapshot")
            params["settings_snapshot"] = {}

        # Call the actual research function
        result = quick_summary(query, **params)

        return _serialize_results(result)
    except TimeoutError:
        logger.exception("Request timed out")
        return (
            jsonify(
                {
                    "error": "Request timed out. Please try with a simpler query or fewer iterations."
                }
            ),
            504,
        )
    except Exception:
        # Log the details server-side; return only a generic message
        logger.exception("Error in quick_summary API")
        return (
            jsonify(
                {
                    "error": "An internal error has occurred. Please try again later."
                }
            ),
            500,
        )
@api_blueprint.route("/generate_report", methods=["POST"])
@api_access_control
def api_generate_report():
    """
    Generate a comprehensive research report via REST API.

    POST /api/v1/generate_report
    {
        "query": "Impact of climate change on agriculture",
        "output_file": "/path/to/save/report.md",  # Optional
        "searches_per_section": 2,                 # Optional
        "model_name": "gpt-4",                     # Optional
        "temperature": 0.5                         # Optional
    }

    Every key other than ``query`` is forwarded to ``generate_report`` as a
    keyword argument. If the resulting ``content`` string is longer than
    10000 characters, only the first 2000 characters are returned and
    ``content_truncated`` is set to true.

    Returns:
        The report result as JSON; 400 if the body is not a JSON object with
        a string ``query``; 504 on ``TimeoutError``; 500 on any other error.
    """
    data = request.json
    # The body must be a JSON object. A JSON string or array could satisfy a
    # bare `"query" in data` membership test and then break on dict methods.
    if not isinstance(data, dict) or "query" not in data:
        return jsonify({"error": "Query parameter is required"}), 400

    query = data.get("query")
    if not isinstance(query, str):
        return jsonify({"error": "Query must be a string"}), 400

    # NOTE(review): "output_file" comes straight from the request body and is
    # forwarded unchanged. Confirm generate_report restricts where it may
    # write before exposing this endpoint to untrusted clients.
    params = {k: v for k, v in data.items() if k != "query"}

    try:
        # Import here to avoid circular imports
        from ..api.research_functions import generate_report

        # Set reasonable defaults for API use
        params.setdefault("searches_per_section", 1)
        params.setdefault("temperature", 0.7)

        logger.info(
            f"Processing generate_report request: query='{query}', params={params}"
        )

        result = generate_report(query, **params)

        # Don't return the full content for large reports
        if (
            result
            and "content" in result
            and isinstance(result["content"], str)
            and len(result["content"]) > 10000
        ):
            # Include a preview of the report content
            content_preview = (
                result["content"][:2000] + "... [Content truncated]"
            )
            result["content"] = content_preview
            result["content_truncated"] = True

        return jsonify(result)
    except TimeoutError:
        logger.exception("Request timed out")
        return (
            jsonify(
                {"error": "Request timed out. Please try with a simpler query."}
            ),
            504,
        )
    except Exception:
        # Log the details server-side; return only a generic message
        logger.exception("Error in generate_report API")
        return (
            jsonify(
                {
                    "error": "An internal error has occurred. Please try again later."
                }
            ),
            500,
        )
@api_blueprint.route("/analyze_documents", methods=["POST"])
@api_access_control
def api_analyze_documents():
    """
    Search and analyze documents in a local collection via REST API.

    POST /api/v1/analyze_documents
    {
        "query": "neural networks in medicine",
        "collection_name": "research_papers",  # Required: local collection name
        "max_results": 20,                     # Optional: max results to return
        "temperature": 0.7,                    # Optional: LLM temperature
        "force_reindex": false                 # Optional: force reindexing
    }

    Every key other than ``query`` and ``collection_name`` is forwarded to
    ``analyze_documents`` as a keyword argument.

    Returns:
        The analysis result as JSON; 400 if the body is not a JSON object
        containing both required keys or ``query`` is not a string; 500 if
        the analysis raises.
    """
    data = request.json
    # The body must be a JSON object. A JSON string or array could satisfy
    # bare membership tests and then break on dict methods.
    if (
        not isinstance(data, dict)
        or "query" not in data
        or "collection_name" not in data
    ):
        return (
            jsonify(
                {
                    "error": "Both query and collection_name parameters are required"
                }
            ),
            400,
        )

    query = data.get("query")
    if not isinstance(query, str):
        return jsonify({"error": "Query must be a string"}), 400

    collection_name = data.get("collection_name")
    params = {
        k: v
        for k, v in data.items()
        if k not in ("query", "collection_name")
    }

    try:
        result = analyze_documents(query, collection_name, **params)
        return jsonify(result)
    except Exception:
        # Log the details server-side; return only a generic message
        logger.exception("Error in analyze_documents API")
        return (
            jsonify(
                {
                    "error": "An internal error has occurred. Please try again later."
                }
            ),
            500,
        )