Coverage for src / local_deep_research / web / routes / research_routes_orm.py: 66%

180 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1""" 

2Research routes using SQLAlchemy ORM instead of raw SQL. 

3This is a converted version showing how to replace raw SQL with ORM queries. 

4""" 

5 

6import json 

7from datetime import datetime, UTC 

8from pathlib import Path 

9 

10from flask import ( 

11 Blueprint, 

12 jsonify, 

13 request, 

14) 

15from loguru import logger 

16 

17from ...config.paths import get_research_outputs_directory 

18from ...constants import ResearchStatus 

19from ...database.models import ResearchHistory 

20from ...database.session_context import get_user_db_session 

21from ...security import filter_research_metadata, strip_settings_snapshot 

22from ..auth.decorators import login_required 

23from ..models.database import calculate_duration 

24from .globals import active_research, termination_flags 

25 

# Flask blueprint on which the ORM-based research routes in this module are
# registered (see the @research_bp.route decorators below)
research_bp = Blueprint("research", __name__)

# Output directory for research results; resolved once, when this module is
# imported
OUTPUT_DIR = get_research_outputs_directory()

31 

32 

33# Example conversions from the original file: 

34 

35 

def check_research_status_orm(research_id):
    """
    Look up the status of a research record through the ORM.

    Equivalent raw SQL:
        SELECT status FROM research_history WHERE id = ?

    Args:
        research_id: Value matched against ``ResearchHistory.id``.

    Returns:
        The ``status`` of the first matching row, or ``None`` when no row
        matches.
    """
    with get_user_db_session() as session:
        record = (
            session.query(ResearchHistory).filter_by(id=research_id).first()
        )
        if not record:
            return None
        return record.status

48 

49 

def update_research_status_orm(research_id, new_status):
    """
    Set the status of a research record through the ORM.

    Equivalent raw SQL:
        UPDATE research_history SET status = ? WHERE id = ?

    Args:
        research_id: Value matched against ``ResearchHistory.id``.
        new_status: Value assigned to the row's ``status`` attribute.

    Returns:
        ``True`` when a matching row was found, updated and committed;
        ``False`` when no row matches.
    """
    with get_user_db_session() as session:
        record = (
            session.query(ResearchHistory).filter_by(id=research_id).first()
        )
        # Guard clause: nothing to update
        if not record:
            return False

        record.status = new_status
        session.commit()
        return True

66 

67 

def update_progress_log_orm(research_id, progress_log):
    """
    Replace the progress log of a research record through the ORM.

    Equivalent raw SQL:
        UPDATE research_history SET progress_log = ? WHERE id = ?

    Args:
        research_id: Value matched against ``ResearchHistory.id``.
        progress_log: Value assigned to the row's ``progress_log`` attribute.

    Returns:
        ``True`` when a matching row was found, updated and committed;
        ``False`` when no row matches.
    """
    with get_user_db_session() as session:
        record = (
            session.query(ResearchHistory).filter_by(id=research_id).first()
        )
        # Guard clause: nothing to update
        if not record:
            return False

        record.progress_log = progress_log
        session.commit()
        return True

84 

85 

@research_bp.route("/api/start_research", methods=["POST"])
@login_required
def start_research():
    """
    Create a new in-progress research record and report its id.

    Reads ``query``, ``mode`` (default ``"quick"``), ``model_provider``
    (default ``"OLLAMA"``) and ``model`` from the JSON request body. Before
    inserting, every entry of ``active_research`` whose database row is
    missing, whose status is not in progress, or whose ``"thread"`` is absent
    or no longer alive is removed from ``active_research`` and
    ``termination_flags``.

    Returns:
        JSON ``{"status": "success", "research_id": <id of the new row>}``.
    """
    payload = request.json
    query = payload.get("query")
    mode = payload.get("mode", "quick")

    # ... validation code ...

    # Prune in-memory bookkeeping for research that is no longer running
    if active_research:
        stale_ids = []
        with get_user_db_session() as db_session:
            # Loop over a snapshot of the items rather than the live dict
            for tracked_id, tracked in list(active_research.items()):
                record = (
                    db_session.query(ResearchHistory)
                    .filter_by(id=tracked_id)
                    .first()
                )

                # No row, or a row that is not in progress: stale
                if not record or record.status != ResearchStatus.IN_PROGRESS:
                    stale_ids.append(tracked_id)
                    continue

                # Row is in progress but the worker thread is gone: stale
                worker = tracked.get("thread")
                if not worker or not worker.is_alive():
                    stale_ids.append(tracked_id)

        for stale_id in stale_ids:
            logger.info(f"Cleaning up stale research process: {stale_id}")
            for registry in (active_research, termination_flags):
                if stale_id in registry:
                    del registry[stale_id]

    # Timestamp shared by the row and its first progress-log entry
    started_at = datetime.now(UTC).isoformat()

    # Research settings are stored in the row's metadata field
    research_settings = {
        "model_provider": payload.get("model_provider", "OLLAMA"),
        "model": payload.get("model"),
        # ... other settings ...
    }

    initial_log = [{"time": started_at, "progress": 0}]
    with get_user_db_session() as db_session:
        record = ResearchHistory(
            query=query,
            mode=mode,
            status=ResearchStatus.IN_PROGRESS,
            created_at=started_at,
            progress_log=initial_log,
            research_meta=research_settings,
        )
        db_session.add(record)
        db_session.commit()
        # Read the generated id while the session is still open
        research_id = record.id

    # Start the research process
    # ... rest of the function ...

    return jsonify({"status": "success", "research_id": research_id})

155 

156 

@research_bp.route("/api/terminate/<string:research_id>", methods=["POST"])
@login_required
def terminate_research(research_id):
    """
    Request termination of a research and mark its record as suspended.

    Args:
        research_id: Id of the research, taken from the URL.

    Returns:
        * ``404`` JSON error when no ``ResearchHistory`` row has this id.
        * ``400`` JSON error when the row's status is not in progress.
        * JSON success otherwise. When the research is not tracked in
          ``active_research`` only the row's status is changed; when it is
          tracked, its termination flag is set and a termination entry is
          appended to both the in-memory log and the stored progress log.
    """
    # Pre-bind the name so the ``finally`` clause cannot fail with an unbound
    # local (masking the real error) if acquiring the session itself raises.
    db_session = None
    try:
        with get_user_db_session() as db_session:
            research = (
                db_session.query(ResearchHistory)
                .filter_by(id=research_id)
                .first()
            )

            if not research:
                return jsonify(
                    {"status": "error", "message": "Research not found"}
                ), 404

            # Only research that is in progress can be terminated
            if research.status != ResearchStatus.IN_PROGRESS:
                return (
                    jsonify(
                        {
                            "status": "error",
                            "message": "Research is not in progress",
                        }
                    ),
                    400,
                )

            # Not tracked in memory: there is no worker to signal, so just
            # record the new status
            if research_id not in active_research:
                research.status = ResearchStatus.SUSPENDED
                db_session.commit()
                return jsonify(
                    {"status": "success", "message": "Research terminated"}
                )

            # Set the termination flag
            termination_flags[research_id] = True

            # Describe the termination request
            timestamp = datetime.now(UTC).isoformat()
            termination_message = "Research termination requested by user"
            current_progress = active_research[research_id]["progress"]

            log_entry = {
                "time": timestamp,
                "message": termination_message,
                "progress": current_progress,
                "metadata": {"phase": "termination"},
            }

            # Add to in-memory log
            active_research[research_id]["log"].append(log_entry)

            # Load the stored log; it may be held as a JSON string
            stored_log = research.progress_log
            if isinstance(stored_log, str):
                try:
                    stored_log = json.loads(stored_log)
                except ValueError:
                    # Best effort: an unparsable log is replaced, not fatal
                    stored_log = []
            if not isinstance(stored_log, list):
                stored_log = []

            # Assign a NEW list instead of appending to the loaded one:
            # re-assigning an object that was mutated in place compares equal
            # to the loaded value, so (unless mutation tracking is configured
            # on the column) the ORM may not write the change.
            research.progress_log = [*stored_log, log_entry]
            research.status = ResearchStatus.SUSPENDED
            db_session.commit()

        logger.log("MILESTONE", f"Research ended: {termination_message}")

        return jsonify({"status": "success", "message": "Research terminated"})

    finally:
        if db_session is not None:
            db_session.close()

237 

238 

@research_bp.route("/api/delete/<string:research_id>", methods=["DELETE"])
@login_required
def delete_research(research_id):
    """
    Delete a research record and, if present, its report file.

    Args:
        research_id: Id of the research, taken from the URL.

    Returns:
        * ``404`` JSON error when no ``ResearchHistory`` row has this id.
        * JSON success once the row is deleted and committed. A failure to
          remove the report file is logged and does not change the response.
        * ``500`` JSON error on any other exception, after a rollback.
    """
    # Acquire the session with ``with ... as``, the same way
    # terminate_research and api_get_research do, instead of treating the
    # bare return value of get_user_db_session() as the session.
    with get_user_db_session() as db_session:
        try:
            research = (
                db_session.query(ResearchHistory)
                .filter_by(id=research_id)
                .first()
            )

            if not research:
                return jsonify(
                    {"status": "error", "message": "Research not found"}
                ), 404

            # Read the path before the row is deleted
            report_path = research.report_path

            db_session.delete(research)
            db_session.commit()

            # Remove the report only after the deletion is committed
            if report_path and Path(report_path).exists():
                try:
                    Path(report_path).unlink()
                    logger.info(f"Deleted report file: {report_path}")
                except Exception:
                    # Best effort: the record is already gone
                    logger.exception("Failed to delete report file")

            return jsonify(
                {
                    "status": "success",
                    "message": "Research deleted successfully",
                }
            )

        except Exception:
            db_session.rollback()
            logger.exception("Error deleting research")
            return jsonify(
                {
                    "status": "error",
                    "message": "An internal error occurred while deleting the research.",
                }
            ), 500
        finally:
            db_session.close()

286 

287 

@research_bp.route("/api/clear_history", methods=["POST"])
@login_required
def clear_history():
    """
    Delete every research record and the report files they reference.

    Returns:
        * JSON success whose message states how many records and how many
          report files were deleted. A file that cannot be removed is logged
          and not counted.
        * ``500`` JSON error on any other exception, after a rollback.
    """
    # Acquire the session with ``with ... as``, the same way
    # terminate_research and api_get_research do, instead of treating the
    # bare return value of get_user_db_session() as the session.
    with get_user_db_session() as db_session:
        try:
            # Copy the paths out as plain values before the rows are deleted
            report_paths = [
                research.report_path
                for research in db_session.query(ResearchHistory).all()
                if research.report_path
            ]

            # Delete all records
            deleted_count = db_session.query(ResearchHistory).delete()
            db_session.commit()

            # Remove files only after the commit (same order as
            # delete_research), so a failed commit cannot leave rows that
            # point at already-deleted reports
            deleted_files = 0
            for report_path in report_paths:
                report_file = Path(report_path)
                if not report_file.exists():
                    continue
                try:
                    report_file.unlink()
                    deleted_files += 1
                except Exception:
                    # Best effort: keep going with the remaining files
                    logger.exception(f"Failed to delete file {report_path}")

            logger.info(
                f"Cleared history: {deleted_count} records, {deleted_files} files"
            )

            return jsonify(
                {
                    "status": "success",
                    "message": f"Deleted {deleted_count} research records and {deleted_files} report files",
                }
            )

        except Exception:
            db_session.rollback()
            logger.exception("Error clearing history")
            return jsonify(
                {
                    "status": "error",
                    "message": "An internal error occurred while clearing the history.",
                }
            ), 500
        finally:
            db_session.close()

336 

337 

@research_bp.route("/api/history", methods=["GET"])
@login_required
def api_get_history():
    """
    Return one page of research history, newest first.

    Query parameters:
        page: 1-based page number; default 1, values below 1 become 1.
        per_page: Page size; default 50, clamped to the range 1..500.

    Returns:
        * JSON with ``history`` (list of record dicts), ``total``, ``page``,
          ``per_page`` and ``total_pages``.
        * ``500`` JSON error on any exception.
    """
    page = max(1, request.args.get("page", 1, type=int))
    per_page = request.args.get("per_page", 50, type=int)
    per_page = max(1, min(per_page, 500))

    # Acquire the session with ``with ... as``, the same way
    # terminate_research and api_get_research do, instead of treating the
    # bare return value of get_user_db_session() as the session.
    with get_user_db_session() as db_session:
        try:
            query = db_session.query(ResearchHistory).order_by(
                ResearchHistory.created_at.desc()
            )

            # Total row count, independent of the requested page
            total = query.count()

            research_items = (
                query.offset((page - 1) * per_page).limit(per_page).all()
            )

            # Convert to dictionaries
            history_data = []
            for item in research_items:
                data = {
                    "id": item.id,
                    "query": item.query,
                    "mode": item.mode,
                    "status": item.status,
                    "created_at": item.created_at,
                    "completed_at": item.completed_at,
                    "duration_seconds": item.duration_seconds,
                    "metadata": filter_research_metadata(item.research_meta),
                    "progress": item.progress,
                    "title": item.title,
                }

                # Fill in the duration when the row does not carry one
                if not data["duration_seconds"] and data["created_at"]:
                    data["duration_seconds"] = calculate_duration(
                        data["created_at"], data["completed_at"]
                    )

                history_data.append(data)

            return jsonify(
                {
                    "history": history_data,
                    "total": total,
                    "page": page,
                    "per_page": per_page,
                    # Ceiling division
                    "total_pages": (total + per_page - 1) // per_page,
                }
            )

        except Exception:
            logger.exception("Error fetching history")
            return jsonify(
                {
                    "status": "error",
                    "message": "An internal error occurred while fetching the history.",
                }
            ), 500
        finally:
            db_session.close()

406 

407 

@research_bp.route("/api/research/<string:research_id>")
@login_required
def api_get_research(research_id):
    """
    Return the details of a single research record as JSON.

    Args:
        research_id: Id of the research, taken from the URL.

    Returns:
        * ``404`` JSON error when no ``ResearchHistory`` row has this id.
        * JSON with the record's fields; a ``logs`` key is added when the
          research is tracked in ``active_research``.
        * ``500`` JSON error on any exception.
    """
    try:
        with get_user_db_session() as session:
            record = (
                session.query(ResearchHistory)
                .filter_by(id=research_id)
                .first()
            )

            if not record:
                not_found = {"status": "error", "message": "Research not found"}
                return jsonify(not_found), 404

            # Plain-dict view of the row
            details = {
                "id": record.id,
                "query": record.query,
                "mode": record.mode,
                "status": record.status,
                "created_at": record.created_at,
                "completed_at": record.completed_at,
                "duration_seconds": record.duration_seconds,
                "metadata": strip_settings_snapshot(record.research_meta),
                "progress": record.progress,
                "title": record.title,
            }

            # Attach the in-memory log when the research is tracked
            if research_id in active_research:
                details["logs"] = active_research[research_id].get("log", [])

            return jsonify(details)

    except Exception:
        logger.exception("Error fetching research")
        failure = {
            "status": "error",
            "message": "An internal error occurred while fetching the research.",
        }
        return jsonify(failure), 500

453 

454 

455# Add more converted routes as needed...