Coverage for src / local_deep_research / web / api.py: 57%

164 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1""" 

2REST API for Local Deep Research. 

3Provides HTTP access to programmatic search and research capabilities. 

4""" 

5 

6import time 

7from functools import wraps 

8from typing import Dict, Any 

9 

10from flask import Blueprint, jsonify, request, Response 

11from loguru import logger 

12 

13from ..api.research_functions import analyze_documents 

14from ..constants import DEFAULT_RATE_LIMIT, RATE_LIMIT_WINDOW_SECONDS 

15from ..database.session_context import get_user_db_session 

16from ..utilities.db_utils import get_settings_manager 

17 

# Create a blueprint for the API. Every route registered on it is served
# under the /api/v1 URL prefix.
api_blueprint = Blueprint("api_v1", __name__, url_prefix="/api/v1")

# Rate limiting data store: {ip_address: [timestamp1, timestamp2, ...]}
# Module-level dict that api_access_control reads and rewrites on each
# rate-limited request; timestamps are time.time() values.
rate_limit_data = {}

23 

24 

def api_access_control(f):
    """
    Decorator to enforce API access control on a view function.

    For every request the wrapper:
    - resolves the username from ``g.current_user`` (when that attribute
      exists) or from the Flask session,
    - reads the ``app.enable_api`` and ``app.api_rate_limit`` settings for
      that user; the defaults (enabled, ``DEFAULT_RATE_LIMIT``) apply when
      there is no username or no database session,
    - responds 403 when the API is disabled,
    - responds 429 when the client address already has ``rate_limit``
      requests recorded within the last ``RATE_LIMIT_WINDOW_SECONDS``.

    Args:
        f: The view function to protect.

    Returns:
        The wrapped view function.
    """

    @wraps(f)
    def decorated_function(*args, **kwargs):
        # Get username from session
        from flask import g, session

        username = (
            g.current_user
            if hasattr(g, "current_user")
            else session.get("username")
        )

        api_enabled = True  # Default to enabled
        rate_limit = DEFAULT_RATE_LIMIT

        # Only try to get settings if there's an authenticated user
        if username:
            with get_user_db_session(username) as db_session:
                if db_session:
                    settings_manager = get_settings_manager(
                        db_session, username
                    )
                    api_enabled = settings_manager.get_setting(
                        "app.enable_api", True
                    )
                    # Fall back to the shared constant instead of a second,
                    # hard-coded default.
                    rate_limit = settings_manager.get_setting(
                        "app.api_rate_limit", DEFAULT_RATE_LIMIT
                    )

        if not api_enabled:
            return jsonify({"error": "API access is disabled"}), 403

        # The stored setting may not be an int (e.g. a numeric string).
        # Comparing len() against a non-number would raise TypeError, so
        # normalise it here. A falsy value keeps rate limiting disabled.
        try:
            rate_limit = int(rate_limit) if rate_limit else 0
        except (TypeError, ValueError):
            logger.warning(
                "Invalid app.api_rate_limit value {!r}; using default",
                rate_limit,
            )
            rate_limit = DEFAULT_RATE_LIMIT

        if rate_limit:
            client_ip = request.remote_addr
            current_time = time.time()

            # Keep only the timestamps still inside the rate limit window.
            recent_requests = [
                ts
                for ts in rate_limit_data.get(client_ip, ())
                if current_time - ts < RATE_LIMIT_WINDOW_SECONDS
            ]
            rate_limit_data[client_ip] = recent_requests

            if len(recent_requests) >= rate_limit:
                return (
                    jsonify(
                        {
                            "error": f"Rate limit exceeded. Maximum {rate_limit} requests per {RATE_LIMIT_WINDOW_SECONDS} seconds allowed."
                        }
                    ),
                    429,
                )

            # Record this request only once it has been admitted.
            recent_requests.append(current_time)

        return f(*args, **kwargs)

    return decorated_function

97 

98 

@api_blueprint.route("/", methods=["GET"])
@api_access_control
def api_documentation():
    """
    Return a JSON description of the v1 API.

    The payload lists the three POST endpoints (quick_summary,
    generate_report, analyze_documents) together with a short description
    of each accepted parameter.
    """

    def describe(path, description, parameters):
        # Every documented endpoint is a POST route.
        return {
            "path": path,
            "method": "POST",
            "description": description,
            "parameters": parameters,
        }

    quick_summary_doc = describe(
        "/api/v1/quick_summary",
        "Generate a quick research summary",
        {
            "query": "Research query (required)",
            "search_tool": "Search engine to use (optional)",
            "iterations": "Number of search iterations (optional)",
            "temperature": "LLM temperature (optional)",
        },
    )
    generate_report_doc = describe(
        "/api/v1/generate_report",
        "Generate a comprehensive research report",
        {
            "query": "Research query (required)",
            "output_file": "Path to save report (optional)",
            "searches_per_section": "Searches per report section (optional)",
            "model_name": "LLM model to use (optional)",
            "temperature": "LLM temperature (optional)",
        },
    )
    analyze_documents_doc = describe(
        "/api/v1/analyze_documents",
        "Search and analyze documents in a local collection",
        {
            "query": "Search query (required)",
            "collection_name": "Local collection name (required)",
            "max_results": "Maximum results to return (optional)",
            "temperature": "LLM temperature (optional)",
            "force_reindex": "Force collection reindexing (optional)",
        },
    )

    return jsonify(
        {
            "api_version": "v1",
            "description": "REST API for Local Deep Research",
            "endpoints": [
                quick_summary_doc,
                generate_report_doc,
                analyze_documents_doc,
            ],
        }
    )

148 

149 

@api_blueprint.route("/health", methods=["GET"])
def health_check():
    """
    Simple health check endpoint.

    Returns a JSON object with a fixed "ok" status, a fixed message and the
    current ``time.time()`` value. This route is not wrapped by
    ``api_access_control``.
    """
    payload = {
        "status": "ok",
        "message": "API is running",
        "timestamp": time.time(),
    }
    return jsonify(payload)

156 

157 

@api_blueprint.route("/quick_summary_test", methods=["POST"])
@api_access_control
def api_quick_summary_test():
    """
    Test endpoint using programmatic access with minimal parameters.

    Expects a JSON object with a string ``query``. The search runs with
    fixed settings (``wikipedia`` search tool, one iteration, temperature
    0.7); any other keys in the body are ignored.

    Returns:
        The ``quick_summary`` result as JSON, 400 when ``query`` is missing
        or not a string, or 500 when the research call raises.
    """
    data = request.json
    # A JSON body may be any JSON value. Only an object can carry "query";
    # without this check a string or list body would reach data.get() and
    # raise outside the error handler below.
    if not isinstance(data, dict) or "query" not in data:
        return jsonify({"error": "Query parameter is required"}), 400

    query = data["query"]
    # Same type check as the /quick_summary endpoint.
    if not isinstance(query, str):
        return jsonify({"error": "Query must be a string"}), 400

    try:
        # Import here to avoid circular imports
        from ..api.research_functions import quick_summary

        logger.info(f"Processing quick_summary_test request: query='{query}'")

        # Use minimal parameters for faster testing
        result = quick_summary(
            query=query,
            search_tool="wikipedia",  # Use fast Wikipedia search for testing
            iterations=1,  # Single iteration for speed
            temperature=0.7,
        )

        return jsonify(result)
    except Exception:
        logger.exception("Error in quick_summary_test API")
        return (
            jsonify(
                {
                    "error": "An internal error has occurred. Please try again later."
                }
            ),
            500,
        )

193 

194 

def _serialize_results(results: Dict[str, Any]) -> Response:
    """
    Convert a research results dictionary into a JSON response.

    Objects under ``results["findings"][i]["documents"]`` are replaced by
    plain ``{"metadata": ..., "content": ...}`` dictionaries built from
    their ``metadata`` and ``page_content`` attributes. The conversion
    works on copies, so ``results`` and the containers nested inside it are
    left unmodified.

    Args:
        results: The results dictionary.

    Returns:
        A Flask response whose body is the JSON-encoded results.

    """
    # The main thing that needs to be handled here is the `Document` instances.
    converted_results = dict(results)

    findings = results.get("findings")
    if findings:
        converted_findings = []
        for finding in findings:
            # Copy each finding so the caller's dict keeps its documents.
            converted_finding = dict(finding)
            documents = finding.get("documents")
            if documents:
                converted_finding["documents"] = [
                    {
                        "metadata": document.metadata,
                        "content": document.page_content,
                    }
                    for document in documents
                ]
            converted_findings.append(converted_finding)
        converted_results["findings"] = converted_findings

    return jsonify(converted_results)

216 

217 

def _load_settings_snapshot(username):
    """
    Return the user's settings flattened to ``{key: value}``.

    Settings stored as a dict containing a ``"value"`` entry are reduced to
    that value; anything else is passed through unchanged. Returns ``None``
    when no database session is available for the user.
    """
    with get_user_db_session(username) as db_session:
        if not db_session:
            logger.warning(f"No database session for user: {username}")
            return None

        settings_manager = get_settings_manager(db_session, username)
        all_settings = settings_manager.get_all_settings()
        return {
            key: (
                setting["value"]
                if isinstance(setting, dict) and "value" in setting
                else setting
            )
            for key, setting in all_settings.items()
        }


@api_blueprint.route("/quick_summary", methods=["POST"])
@api_access_control
def api_quick_summary():
    """
    Generate a quick research summary via REST API.

    POST /api/v1/quick_summary
    {
        "query": "Advances in fusion energy research",
        "search_tool": "auto",  # Optional: search engine to use
        "iterations": 2,  # Optional: number of search iterations
        "temperature": 0.7  # Optional: LLM temperature
    }

    Other keys in the body are forwarded to ``quick_summary`` as keyword
    arguments, except ``username`` and ``settings_snapshot``: those are
    always derived on the server and client-supplied values are discarded.

    Returns:
        The serialized research result, 400 for a missing or non-string
        query, 504 on ``TimeoutError``, or 500 on any other error.
    """
    logger.debug("API quick_summary endpoint called")
    data = request.json
    # Only a JSON object can carry parameters; a list or string body must
    # not reach the .keys()/.items() calls below.
    is_object = isinstance(data, dict)
    logger.debug(
        f"Request data keys: {list(data.keys()) if is_object and data else 'None'}"
    )

    if not is_object or "query" not in data:
        logger.debug("Missing query parameter")
        return jsonify({"error": "Query parameter is required"}), 400

    # Extract query and validate type
    query = data["query"]
    if not isinstance(query, str):
        return jsonify({"error": "Query must be a string"}), 400

    # These keys are set by the server below; never trust the request body
    # for them.
    server_controlled = ("query", "username", "settings_snapshot")
    params = {k: v for k, v in data.items() if k not in server_controlled}
    logger.debug(
        f"Query length: {len(query)}, params keys: {list(params.keys()) if params else 'None'}"
    )

    # Get username from session or g object
    from flask import g, session

    username = (
        g.current_user
        if hasattr(g, "current_user")
        else session.get("username")
    )
    if username:
        params["username"] = username

    try:
        # Import here to avoid circular imports
        from ..api.research_functions import quick_summary

        logger.info(
            f"Processing quick_summary request: query='{query}' for user='{username}'"
        )

        # Set reasonable defaults for API use
        params.setdefault("temperature", 0.7)
        params.setdefault("search_tool", "auto")
        params.setdefault("iterations", 1)

        # Get settings snapshot for the user
        if username:
            try:
                logger.debug(f"Getting settings snapshot for user: {username}")
                settings_snapshot = _load_settings_snapshot(username)
                if settings_snapshot is not None:
                    params["settings_snapshot"] = settings_snapshot
                    logger.debug(
                        f"Got settings snapshot with {len(settings_snapshot)} settings"
                    )
            except Exception as e:
                # loguru treats keyword arguments as format arguments, so
                # the traceback is attached with opt(exception=True) and
                # the error text is passed as a positional format argument.
                logger.opt(exception=True).warning(
                    "Failed to get settings snapshot: {}", e
                )
                # Continue with empty snapshot rather than failing
                params["settings_snapshot"] = {}
        else:
            logger.debug("No username in session, skipping settings snapshot")
            params["settings_snapshot"] = {}

        # Call the actual research function
        result = quick_summary(query, **params)

        return _serialize_results(result)
    except TimeoutError:
        logger.exception("Request timed out")
        return (
            jsonify(
                {
                    "error": "Request timed out. Please try with a simpler query or fewer iterations."
                }
            ),
            504,
        )
    except Exception:
        logger.exception("Error in quick_summary API")
        return (
            jsonify(
                {
                    "error": "An internal error has occurred. Please try again later."
                }
            ),
            500,
        )

344 

345 

@api_blueprint.route("/generate_report", methods=["POST"])
@api_access_control
def api_generate_report():
    """
    Generate a comprehensive research report via REST API.

    POST /api/v1/generate_report
    {
        "query": "Impact of climate change on agriculture",
        "output_file": "/path/to/save/report.md",  # Optional
        "searches_per_section": 2,  # Optional
        "model_name": "gpt-4",  # Optional
        "temperature": 0.5  # Optional
    }

    Every key other than ``query`` is forwarded to ``generate_report`` as a
    keyword argument. When the report's ``content`` is a string longer than
    10000 characters, the response carries only its first 2000 characters
    plus a truncation marker and ``content_truncated: true``.

    Returns:
        The report as JSON, 400 for a missing or non-string query, 504 on
        ``TimeoutError``, or 500 on any other error.
    """
    data = request.json
    # Only a JSON object can carry parameters; other JSON values would fail
    # on data.items() outside the error handler below.
    if not isinstance(data, dict) or "query" not in data:
        return jsonify({"error": "Query parameter is required"}), 400

    query = data["query"]
    # Same type check as the /quick_summary endpoint.
    if not isinstance(query, str):
        return jsonify({"error": "Query must be a string"}), 400

    # NOTE(review): "output_file" is a client-controlled path handed to
    # generate_report unchanged. If that function writes to it, callers can
    # choose where the server writes - confirm and restrict if so.
    params = {k: v for k, v in data.items() if k != "query"}

    try:
        # Import here to avoid circular imports
        from ..api.research_functions import generate_report

        # Set reasonable defaults for API use
        params.setdefault("searches_per_section", 1)
        params.setdefault("temperature", 0.7)

        logger.info(
            f"Processing generate_report request: query='{query}', params={params}"
        )

        result = generate_report(query, **params)

        # Don't return the full content for large reports
        if isinstance(result, dict):
            content = result.get("content")
            if isinstance(content, str) and len(content) > 10000:
                # Include a summary of the report content
                result["content"] = content[:2000] + "... [Content truncated]"
                result["content_truncated"] = True

        return jsonify(result)
    except TimeoutError:
        logger.exception("Request timed out")
        return (
            jsonify(
                {"error": "Request timed out. Please try with a simpler query."}
            ),
            504,
        )
    except Exception:
        logger.exception("Error in generate_report API")
        return (
            jsonify(
                {
                    "error": "An internal error has occurred. Please try again later."
                }
            ),
            500,
        )

415 

416 

@api_blueprint.route("/analyze_documents", methods=["POST"])
@api_access_control
def api_analyze_documents():
    """
    Search and analyze documents in a local collection via REST API.

    POST /api/v1/analyze_documents
    {
        "query": "neural networks in medicine",
        "collection_name": "research_papers",  # Required: local collection name
        "max_results": 20,  # Optional: max results to return
        "temperature": 0.7,  # Optional: LLM temperature
        "force_reindex": false  # Optional: force reindexing
    }

    Every key other than ``query`` and ``collection_name`` is forwarded to
    ``analyze_documents`` as a keyword argument.

    Returns:
        The analysis result as JSON, 400 when a required parameter is
        missing or not a string, or 500 when the analysis raises.
    """
    data = request.json
    # Only a JSON object can carry parameters; other JSON values would fail
    # on data.get() outside the error handler below.
    if (
        not isinstance(data, dict)
        or "query" not in data
        or "collection_name" not in data
    ):
        return (
            jsonify(
                {
                    "error": "Both query and collection_name parameters are required"
                }
            ),
            400,
        )

    query = data["query"]
    collection_name = data["collection_name"]
    # Same type check as the /quick_summary endpoint, applied to both
    # required parameters.
    if not isinstance(query, str):
        return jsonify({"error": "Query must be a string"}), 400
    if not isinstance(collection_name, str):
        return jsonify({"error": "collection_name must be a string"}), 400

    params = {
        k: v for k, v in data.items() if k not in ("query", "collection_name")
    }

    try:
        result = analyze_documents(query, collection_name, **params)
        return jsonify(result)
    except Exception:
        logger.exception("Error in analyze_documents API")
        return (
            jsonify(
                {
                    "error": "An internal error has occurred. Please try again later."
                }
            ),
            500,
        )