Coverage for src / local_deep_research / web / api.py: 52%

161 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2REST API for Local Deep Research. 

3Provides HTTP access to programmatic search and research capabilities. 

4""" 

5 

6import time 

7from functools import wraps 

8from typing import Dict, Any 

9 

10from flask import Blueprint, jsonify, request, Response 

11from loguru import logger 

12 

13from ..api.research_functions import analyze_documents 

14from ..database.session_context import get_user_db_session 

15from ..utilities.db_utils import get_settings_manager 

16 

# Blueprint under which every v1 REST endpoint below is registered
api_blueprint = Blueprint(
    "api_v1",
    __name__,
    url_prefix="/api/v1",
)

19 

# Rate limiting data store: {ip_address: [timestamp1, timestamp2, ...]}
rate_limit_data = {}

# Length of the sliding rate-limit window, in seconds.
_RATE_LIMIT_WINDOW = 60

# Once more than this many client IPs are tracked, idle entries are evicted so
# the store cannot grow without bound.
_RATE_LIMIT_MAX_CLIENTS = 10000


def _register_request(
    client_ip, rate_limit, current_time, max_clients=_RATE_LIMIT_MAX_CLIENTS
):
    """
    Record one request from ``client_ip`` unless it is over its limit.

    Args:
        client_ip: Key identifying the client in ``rate_limit_data``.
        rate_limit: Maximum number of requests allowed per window.
        current_time: Timestamp of the request, in seconds.
        max_clients: Number of tracked clients above which idle entries
            (no request inside the window) are dropped.

    Returns:
        True if the request was recorded, False if the client already made
        ``rate_limit`` requests inside the window.
    """
    # Keep only the timestamps that are still inside the window.
    recent = [
        ts
        for ts in rate_limit_data.get(client_ip, ())
        if current_time - ts < _RATE_LIMIT_WINDOW
    ]
    rate_limit_data[client_ip] = recent

    if len(recent) >= rate_limit:
        return False

    recent.append(current_time)

    # Timestamps are appended in order, so the last one is the newest: an
    # entry whose newest timestamp left the window carries no information.
    if len(rate_limit_data) > max_clients:
        for ip, stamps in list(rate_limit_data.items()):
            if ip != client_ip and (
                not stamps or current_time - stamps[-1] >= _RATE_LIMIT_WINDOW
            ):
                rate_limit_data.pop(ip, None)

    return True


def api_access_control(f):
    """
    Decorator to enforce API access control:
    - Check if API is enabled
    - Enforce rate limiting

    For an authenticated user the ``app.enable_api`` and
    ``app.api_rate_limit`` settings are read through that user's settings
    manager; otherwise the API is treated as enabled with a limit of 60
    requests per minute. A falsy rate limit disables rate limiting.

    Args:
        f: The view function to protect.

    Returns:
        The wrapped view. It answers 403 when the API is disabled and 429
        when the client IP exceeded its limit; otherwise it returns whatever
        ``f`` returns.
    """

    @wraps(f)
    def decorated_function(*args, **kwargs):
        # Get username from session
        from flask import g, session

        username = (
            g.current_user
            if hasattr(g, "current_user")
            else session.get("username")
        )

        # Check if API is enabled
        api_enabled = True  # Default to enabled
        rate_limit = 60  # Default 60 requests per minute

        # Only try to get settings if there's an authenticated user
        if username:
            with get_user_db_session(username) as db_session:
                if db_session:
                    settings_manager = get_settings_manager(
                        db_session, username
                    )
                    api_enabled = settings_manager.get_setting(
                        "app.enable_api", True
                    )
                    rate_limit = settings_manager.get_setting(
                        "app.api_rate_limit", 60
                    )

        if not api_enabled:
            return jsonify({"error": "API access is disabled"}), 403

        # Implement rate limiting
        if rate_limit and not _register_request(
            request.remote_addr, rate_limit, time.time()
        ):
            return (
                jsonify(
                    {
                        "error": f"Rate limit exceeded. Maximum {rate_limit} requests per minute allowed."
                    }
                ),
                429,
            )

        return f(*args, **kwargs)

    return decorated_function

96 

97 

@api_blueprint.route("/", methods=["GET"])
@api_access_control
def api_documentation():
    """
    Describe the documented v1 endpoints as a JSON payload.

    The payload carries the API version, a short description and, for each
    listed endpoint, its path, HTTP method, description and parameters.
    """
    quick_summary_endpoint = {
        "path": "/api/v1/quick_summary",
        "method": "POST",
        "description": "Generate a quick research summary",
        "parameters": {
            "query": "Research query (required)",
            "search_tool": "Search engine to use (optional)",
            "iterations": "Number of search iterations (optional)",
            "temperature": "LLM temperature (optional)",
        },
    }

    generate_report_endpoint = {
        "path": "/api/v1/generate_report",
        "method": "POST",
        "description": "Generate a comprehensive research report",
        "parameters": {
            "query": "Research query (required)",
            "output_file": "Path to save report (optional)",
            "searches_per_section": "Searches per report section (optional)",
            "model_name": "LLM model to use (optional)",
            "temperature": "LLM temperature (optional)",
        },
    }

    analyze_documents_endpoint = {
        "path": "/api/v1/analyze_documents",
        "method": "POST",
        "description": "Search and analyze documents in a local collection",
        "parameters": {
            "query": "Search query (required)",
            "collection_name": "Local collection name (required)",
            "max_results": "Maximum results to return (optional)",
            "temperature": "LLM temperature (optional)",
            "force_reindex": "Force collection reindexing (optional)",
        },
    }

    return jsonify(
        {
            "api_version": "v1",
            "description": "REST API for Local Deep Research",
            "endpoints": [
                quick_summary_endpoint,
                generate_report_endpoint,
                analyze_documents_endpoint,
            ],
        }
    )

147 

148 

@api_blueprint.route("/health", methods=["GET"])
def health_check():
    """Report that the API is running, together with the current time."""
    payload = {
        "status": "ok",
        "message": "API is running",
        "timestamp": time.time(),
    }
    return jsonify(payload)

155 

156 

@api_blueprint.route("/quick_summary_test", methods=["POST"])
@api_access_control
def api_quick_summary_test():
    """
    Test endpoint using programmatic access with minimal parameters for fast testing.

    Expects a JSON object with a "query" key and runs ``quick_summary`` with a
    fixed search tool, a single iteration and a fixed temperature.

    Returns:
        The JSON-encoded result, a 400 error when the body is not a JSON
        object containing "query", or a generic 500 error when the research
        call raises.
    """
    data = request.json
    # A JSON array or string can also "contain" "query", but only an object
    # can be read as a mapping; reject anything else up front.
    if not isinstance(data, dict) or "query" not in data:
        return jsonify({"error": "Query parameter is required"}), 400

    query = data["query"]

    try:
        # Import here to avoid circular imports
        from ..api.research_functions import quick_summary

        logger.info(f"Processing quick_summary_test request: query='{query}'")

        # Use minimal parameters for faster testing
        result = quick_summary(
            query=query,
            search_tool="wikipedia",  # Use fast Wikipedia search for testing
            iterations=1,  # Single iteration for speed
            temperature=0.7,
        )

        return jsonify(result)
    except Exception:
        # logger.exception already attaches the active exception and traceback.
        logger.exception("Error in quick_summary_test API")
        return (
            jsonify(
                {
                    "error": "An internal error has occurred. Please try again later."
                }
            ),
            500,
        )

192 

193 

def _jsonable_results(results):
    """Return a copy of ``results`` whose finding documents are plain dicts."""
    converted_results = dict(results)
    findings = results.get("findings")
    if not findings:
        return converted_results

    converted_findings = []
    for finding in findings:
        documents = finding.get("documents")
        if documents:
            # Copy the finding so the caller's dict and list stay untouched.
            finding = dict(finding)
            finding["documents"] = [
                {
                    "metadata": document.metadata,
                    "content": document.page_content,
                }
                for document in documents
            ]
        converted_findings.append(finding)

    converted_results["findings"] = converted_findings
    return converted_results


def _serialize_results(results: Dict[str, Any]) -> "Response":
    """
    Converts the results dictionary into a JSON response.

    The main thing that needs to be handled here is the `Document` instances
    under each finding's "documents" key: each one is replaced by a dict with
    its ``metadata`` and ``page_content``. The input dictionary and its
    nested findings are not modified.

    Args:
        results: The results dictionary.

    Returns:
        The response produced by ``jsonify`` for the converted results.

    """
    return jsonify(_jsonable_results(results))

215 

216 

@api_blueprint.route("/quick_summary", methods=["POST"])
@api_access_control
def api_quick_summary():
    """
    Generate a quick research summary via REST API.

    POST /api/v1/quick_summary
    {
        "query": "Advances in fusion energy research",
        "search_tool": "auto",  # Optional: search engine to use
        "iterations": 2,        # Optional: number of search iterations
        "temperature": 0.7      # Optional: LLM temperature
    }

    Every key other than "query" is forwarded to ``quick_summary`` as a
    keyword argument. A ``settings_snapshot`` is always passed: the user's
    setting values when a user is known and their settings can be read,
    otherwise an empty dict.

    Returns:
        The serialized research result, a 400 error when the body is not a
        JSON object containing "query", 504 on ``TimeoutError`` and a generic
        500 error on any other failure.
    """
    logger.debug("API quick_summary endpoint called")
    data = request.json
    logger.debug(f"Request data: {data}")

    # A JSON array or string can also "contain" "query", but only an object
    # can be unpacked into parameters below.
    if not isinstance(data, dict) or "query" not in data:
        logger.debug("Missing query parameter")
        return jsonify({"error": "Query parameter is required"}), 400

    # Extract query and optional parameters
    query = data["query"]
    params = {k: v for k, v in data.items() if k != "query"}
    logger.debug(f"Query: {query}, params: {params}")

    # Get username from session or g object
    from flask import g, session

    username = (
        g.current_user
        if hasattr(g, "current_user")
        else session.get("username")
    )
    # NOTE(review): without a session user, a "username" key supplied in the
    # request body is still forwarded to quick_summary - confirm that is
    # intended.
    if username:
        params["username"] = username

    try:
        # Import here to avoid circular imports
        from ..api.research_functions import quick_summary
        from ..database.session_context import get_user_db_session
        from ..utilities.db_utils import get_settings_manager

        logger.info(
            f"Processing quick_summary request: query='{query}' for user='{username}'"
        )

        # Set reasonable defaults for API use
        params.setdefault("temperature", 0.7)
        params.setdefault("search_tool", "auto")
        params.setdefault("iterations", 1)

        # Get settings snapshot for the user; stays empty on every path that
        # cannot produce one.
        settings_snapshot = {}
        if username:
            try:
                logger.debug(f"Getting settings snapshot for user: {username}")
                with get_user_db_session(username) as db_session:
                    if db_session:
                        settings_manager = get_settings_manager(
                            db_session, username
                        )
                        all_settings = settings_manager.get_all_settings()
                        # Extract just the values for the settings snapshot
                        settings_snapshot = {
                            key: (
                                setting["value"]
                                if isinstance(setting, dict)
                                and "value" in setting
                                else setting
                            )
                            for key, setting in all_settings.items()
                        }
                        logger.debug(
                            f"Got settings snapshot with {len(settings_snapshot)} settings"
                        )
                    else:
                        logger.warning(
                            f"No database session for user: {username}"
                        )
            except Exception as e:
                # loguru has no exc_info keyword: opt(exception=True) attaches
                # the traceback, and passing the error as a format argument
                # keeps braces in its text from being parsed as placeholders.
                logger.opt(exception=True).warning(
                    "Failed to get settings snapshot: {}", e
                )
                # Continue with empty snapshot rather than failing
                settings_snapshot = {}
        else:
            logger.debug("No username in session, skipping settings snapshot")

        params["settings_snapshot"] = settings_snapshot

        # Call the actual research function
        result = quick_summary(query, **params)

        return _serialize_results(result)
    except TimeoutError:
        logger.exception("Request timed out")
        return (
            jsonify(
                {
                    "error": "Request timed out. Please try with a simpler query or fewer iterations."
                }
            ),
            504,
        )
    except Exception:
        logger.exception("Error in quick_summary API")
        return (
            jsonify(
                {
                    "error": "An internal error has occurred. Please try again later."
                }
            ),
            500,
        )

339 

340 

@api_blueprint.route("/generate_report", methods=["POST"])
@api_access_control
def api_generate_report():
    """
    Generate a comprehensive research report via REST API.

    POST /api/v1/generate_report
    {
        "query": "Impact of climate change on agriculture",
        "output_file": "/path/to/save/report.md",  # Optional
        "searches_per_section": 2,                 # Optional
        "model_name": "gpt-4",                     # Optional
        "temperature": 0.5                         # Optional
    }

    Every key other than "query" is forwarded to ``generate_report`` as a
    keyword argument. String content longer than 10000 characters is cut to
    its first 2000 characters and flagged with ``content_truncated``.

    Returns:
        The JSON-encoded report, a 400 error when the body is not a JSON
        object containing "query", 504 on ``TimeoutError`` and a generic 500
        error on any other failure.
    """
    data = request.json
    # A JSON array or string can also "contain" "query", but only an object
    # can be unpacked into parameters below.
    if not isinstance(data, dict) or "query" not in data:
        return jsonify({"error": "Query parameter is required"}), 400

    query = data["query"]
    # NOTE(review): "output_file" is client-controlled and forwarded
    # unchanged - confirm generate_report restricts where it may write.
    params = {k: v for k, v in data.items() if k != "query"}

    try:
        # Import here to avoid circular imports
        from ..api.research_functions import generate_report

        # Set reasonable defaults for API use
        params.setdefault("searches_per_section", 1)
        params.setdefault("temperature", 0.7)

        logger.info(
            f"Processing generate_report request: query='{query}', params={params}"
        )

        result = generate_report(query, **params)

        # Don't return the full content for large reports
        if (
            result
            and "content" in result
            and isinstance(result["content"], str)
            and len(result["content"]) > 10000
        ):
            # Include a summary of the report content
            content_preview = (
                result["content"][:2000] + "... [Content truncated]"
            )
            result["content"] = content_preview
            result["content_truncated"] = True

        return jsonify(result)
    except TimeoutError:
        logger.exception("Request timed out")
        return (
            jsonify(
                {"error": "Request timed out. Please try with a simpler query."}
            ),
            504,
        )
    except Exception:
        # logger.exception already attaches the active exception and traceback.
        logger.exception("Error in generate_report API")
        return (
            jsonify(
                {
                    "error": "An internal error has occurred. Please try again later."
                }
            ),
            500,
        )

410 

411 

@api_blueprint.route("/analyze_documents", methods=["POST"])
@api_access_control
def api_analyze_documents():
    """
    Search and analyze documents in a local collection via REST API.

    POST /api/v1/analyze_documents
    {
        "query": "neural networks in medicine",
        "collection_name": "research_papers",  # Required: local collection name
        "max_results": 20,                     # Optional: max results to return
        "temperature": 0.7,                    # Optional: LLM temperature
        "force_reindex": false                 # Optional: force reindexing
    }

    Every key other than "query" and "collection_name" is forwarded to
    ``analyze_documents`` as a keyword argument.

    Returns:
        The JSON-encoded analysis, a 400 error when the body is not a JSON
        object containing both required keys, or a generic 500 error when the
        analysis raises.
    """
    data = request.json
    # A JSON array or string can also "contain" the keys, but only an object
    # can be unpacked into parameters below.
    if (
        not isinstance(data, dict)
        or "query" not in data
        or "collection_name" not in data
    ):
        return (
            jsonify(
                {
                    "error": "Both query and collection_name parameters are required"
                }
            ),
            400,
        )

    query = data["query"]
    collection_name = data["collection_name"]
    params = {
        k: v for k, v in data.items() if k not in ("query", "collection_name")
    }

    try:
        result = analyze_documents(query, collection_name, **params)
        return jsonify(result)
    except Exception:
        # logger.exception already attaches the active exception and traceback.
        logger.exception("Error in analyze_documents API")
        return (
            jsonify(
                {
                    "error": "An internal error has occurred. Please try again later."
                }
            ),
            500,
        )