Coverage for src / local_deep_research / news / flask_api.py: 19%

633 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1""" 

2Flask API endpoints for news system. 

3Converted from FastAPI to match LDR's Flask architecture. 

4""" 

5 

6from typing import Dict, Any 

7from flask import Blueprint, request, jsonify 

8from loguru import logger 

9 

10from . import api 

11from .folder_manager import FolderManager 

12from ..database.models import SubscriptionFolder 

13from ..web.auth.decorators import login_required 

14from ..database.session_context import get_user_db_session 

15from ..utilities.db_utils import get_settings_manager 

16from ..security import safe_post 

17 

18 

def safe_error_message(e: Exception, context: str = "") -> str:
    """
    Return a safe error message that doesn't expose internal details.

    The exception is logged in full for debugging; the caller only receives
    a generic message chosen from the exception's type.

    Args:
        e: The exception
        context: Optional context about what was being attempted

    Returns:
        A generic error message safe for external users
    """
    # Log the exception that was handed in instead of relying on
    # sys.exc_info(): some callers wrap an error string in a fresh Exception
    # outside of any ``except`` block, where logger.exception() has no active
    # exception to report and the real error text would be lost.
    logger.opt(exception=e).error(f"Error in {context}")

    # Return generic messages based on exception type
    if isinstance(e, ValueError):
        return "Invalid input provided"
    if isinstance(e, KeyError):
        return "Required data missing"
    if isinstance(e, TypeError):
        return "Invalid data format"

    # Generic message for everything else
    return f"An error occurred{f' while {context}' if context else ''}"

43 

44 

# Create Blueprint - only the /api prefix is set here; per the original note
# the parent blueprint already contributes /news.
news_api_bp = Blueprint(
    "news_api",
    __name__,
    url_prefix="/api",
)

47 

48# Components are initialized in api.py 

49 

50 

def get_user_id():
    """Return the current username from the session, or None if there is none."""
    from ..web.auth.decorators import current_user

    # News endpoints need an authenticated user, so every falsy result
    # (missing, empty) is normalised to None.
    return current_user() or None

62 

63 

@news_api_bp.route("/feed", methods=["GET"])
@login_required
def get_news_feed() -> Dict[str, Any]:
    """
    Get personalized news feed for the logged-in user.

    Query params:
        limit: Maximum number of cards to return
            (default: the "news.feed.default_limit" setting)
        use_cache: Whether to use cached news (default: true)
        strategy: Override default recommendation strategy
        focus: Optional focus area for news
        subscription_id: Optional subscription ID, forwarded to the news API

    Returns:
        JSON with the feed on success. On failure a JSON object with
        "error" and an empty "news_items" list, with status 400 for invalid
        input and 500 otherwise.
    """
    try:
        # Get current user (login_required ensures we have one)
        user_id = get_user_id()
        logger.info(f"News feed requested by user: {user_id}")

        # Get query parameters
        settings_manager = get_settings_manager()
        default_limit = settings_manager.get_setting("news.feed.default_limit")

        raw_limit = request.args.get("limit")
        if raw_limit is None:
            limit = int(default_limit)
        else:
            # A malformed client-supplied limit is a client error, not a
            # server failure, so answer 400 instead of falling through to
            # the catch-all 500 handler.
            try:
                limit = int(raw_limit)
            except ValueError:
                return jsonify(
                    {"error": "limit must be an integer", "news_items": []}
                ), 400

        use_cache = request.args.get("use_cache", "true").lower() == "true"
        strategy = request.args.get("strategy")
        focus = request.args.get("focus")
        subscription_id = request.args.get("subscription_id")

        logger.info(
            f"News feed params: limit={limit}, subscription_id={subscription_id}, focus={focus}"
        )

        # Call the direct API function (synchronous)
        result = api.get_news_feed(
            user_id=user_id,
            limit=limit,
            use_cache=use_cache,
            focus=focus,
            search_strategy=strategy,
            subscription_id=subscription_id,
        )

        # The API reports failures in-band: an "error" key plus an empty list
        if "error" in result and result.get("news_items") == []:
            # Sanitize error message before returning to client
            safe_msg = safe_error_message(
                Exception(result["error"]), context="get_news_feed"
            )
            # Range-validation failures are the caller's fault (400);
            # anything else is treated as a server-side failure (500).
            status = 400 if "must be between" in result["error"] else 500
            return jsonify({"error": safe_msg, "news_items": []}), status

        logger.info(
            f"API returning {len(result.get('news_items', []))} news items"
        )
        if result.get("news_items"):
            logger.info(
                f"First item ID: {result['news_items'][0].get('id', 'NO_ID')}"
            )

        return jsonify(result)

    except Exception as e:
        return jsonify(
            {
                "error": safe_error_message(e, "getting news feed"),
                "news_items": [],
            }
        ), 500

133 

134 

@news_api_bp.route("/subscribe", methods=["POST"])
@login_required
def create_subscription() -> Dict[str, Any]:
    """
    Create a new subscription for the current user.

    JSON body:
        query: Search query or topic (required)
        subscription_type: "search" or "topic" (default: "search")
        refresh_minutes: Refresh interval in minutes (optional)
        model_provider, model, search_strategy, custom_endpoint, name,
        folder_id, is_active, search_engine, search_iterations,
        questions_per_iteration: optional settings passed through to
        api.create_subscription

    Responds 400 for unparsable/empty JSON, a missing query, or a
    ValueError; 500 for any other exception.
    """
    # The body is parsed regardless of Content-Type (force=True)
    try:
        payload = request.get_json(force=True)
    except Exception:
        return jsonify({"error": "Invalid JSON data"}), 400

    try:
        if not payload:
            return jsonify({"error": "No JSON data provided"}), 400

        owner = get_user_id()

        # The only mandatory field
        query = payload.get("query")
        if not query:
            return jsonify({"error": "query is required"}), 400

        # Everything else is optional and handed straight to the API layer
        created = api.create_subscription(
            user_id=owner,
            query=query,
            subscription_type=payload.get("subscription_type", "search"),
            refresh_minutes=payload.get("refresh_minutes"),
            model_provider=payload.get("model_provider"),
            model=payload.get("model"),
            search_strategy=payload.get("search_strategy", "news_aggregation"),
            custom_endpoint=payload.get("custom_endpoint"),
            name=payload.get("name"),
            folder_id=payload.get("folder_id"),
            is_active=payload.get("is_active", True),
            search_engine=payload.get("search_engine"),
            search_iterations=payload.get("search_iterations"),
            questions_per_iteration=payload.get("questions_per_iteration"),
        )
        return jsonify(created)

    except Exception as exc:
        # ValueError signals bad input (400); everything else is a 500
        status = 400 if isinstance(exc, ValueError) else 500
        return jsonify(
            {"error": safe_error_message(exc, "creating subscription")}
        ), status

212 

213 

@news_api_bp.route("/vote", methods=["POST"])
@login_required
def vote_on_news() -> Dict[str, Any]:
    """
    Record the current user's vote on a news item.

    JSON body:
        card_id: ID of the news card
        vote: "up" or "down"

    Responds 400 when the body or either field is missing, 404 when the
    API raises a ValueError mentioning "not found", 400 for other
    ValueErrors and 500 for anything else.
    """
    try:
        body = request.get_json()
        if not body:
            return jsonify({"error": "No JSON data provided"}), 400

        voter = get_user_id()
        card_id = body.get("card_id")
        vote = body.get("vote")

        # Both fields must be present and non-empty
        if not (card_id and vote):
            return jsonify({"error": "card_id and vote are required"}), 400

        outcome = api.submit_feedback(card_id=card_id, user_id=voter, vote=vote)
        return jsonify(outcome)

    except ValueError as e:
        if "not found" in str(e).lower():
            return jsonify({"error": "Resource not found"}), 404
        return jsonify({"error": safe_error_message(e, "submitting vote")}), 400
    except Exception as e:
        return jsonify({"error": safe_error_message(e, "submitting vote")}), 500

256 

257 

@news_api_bp.route("/feedback/batch", methods=["POST"])
@login_required
def get_batch_feedback() -> Dict[str, Any]:
    """
    Get feedback (votes) for multiple news cards.

    JSON body:
        card_ids: List of card IDs

    Returns:
        The result of api.get_votes_for_cards as JSON, or {"votes": {}}
        when no card IDs were supplied. Responds 400 for a missing body or
        a ValueError, 404 for a ValueError mentioning "not found", and 500
        for any other exception.
    """
    try:
        data = request.get_json()
        if not data:
            return jsonify({"error": "No JSON data provided"}), 400

        card_ids = data.get("card_ids", [])
        if not card_ids:
            # Nothing to look up; skip the API call
            return jsonify({"votes": {}})

        # Get current user
        user_id = get_user_id()

        # Call the direct API function
        result = api.get_votes_for_cards(card_ids=card_ids, user_id=user_id)

        return jsonify(result)

    except ValueError as e:
        error_msg = str(e)
        if "not found" in error_msg.lower():
            return jsonify({"error": "Resource not found"}), 404
        return jsonify({"error": safe_error_message(e, "getting votes")}), 400
    except Exception as e:
        # safe_error_message already logs the exception; logging it here as
        # well would duplicate the entry, unlike the sibling handlers.
        return jsonify({"error": safe_error_message(e, "getting votes")}), 500

291 

292 

@news_api_bp.route("/feedback/<card_id>", methods=["POST"])
@login_required
def submit_feedback(card_id: str) -> Dict[str, Any]:
    """
    Record the current user's vote for the news card named in the URL.

    JSON body:
        vote: "up" or "down"

    Responds 400 for a missing body or vote, 404 / 400 depending on the
    text of a ValueError raised by the API, and 500 for anything else.
    """
    try:
        body = request.get_json()
        if not body:
            return jsonify({"error": "No JSON data provided"}), 400

        voter = get_user_id()
        vote = body.get("vote")
        if not vote:
            return jsonify({"error": "vote is required"}), 400

        return jsonify(
            api.submit_feedback(card_id=card_id, user_id=voter, vote=vote)
        )

    except ValueError as e:
        # The status and message are chosen from the wording of the error
        lowered = str(e).lower()
        if "not found" in lowered:
            return jsonify({"error": "Resource not found"}), 404
        if "must be" in lowered:
            return jsonify({"error": "Invalid input value"}), 400
        return jsonify(
            {"error": safe_error_message(e, "submitting feedback")}
        ), 400
    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "submitting feedback")}
        ), 500

336 

337 

@news_api_bp.route("/research/<card_id>", methods=["POST"])
@login_required
def research_news_item(card_id: str) -> Dict[str, Any]:
    """
    Perform deeper research on a news item.

    JSON body (optional):
        depth: "quick", "detailed", or "report" (default: "quick")

    Returns:
        The result of api.research_news_item as JSON, or a sanitized error
        with status 500 if anything raises.
    """
    try:
        # The body is optional. silent=True makes a missing, non-JSON or
        # malformed body yield None instead of raising, so the request
        # falls back to the default depth rather than becoming a 500.
        data = request.get_json(silent=True) or {}
        depth = data.get("depth", "quick")

        # Call the API function which handles the research
        result = api.research_news_item(card_id, depth)

        return jsonify(result)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "researching news item")}
        ), 500

360 

361 

@news_api_bp.route("/subscriptions/current", methods=["GET"])
@login_required
def get_current_user_subscriptions() -> Dict[str, Any]:
    """
    Get all subscriptions for current user.

    Returns:
        The result of api.get_subscriptions as JSON. If that result carries
        an "error" key, or if anything raises, a generic error with status
        500 is returned and the details are only logged.
    """
    try:
        # Get current user
        user_id = get_user_id()

        # Fixed copy-pasted message that referred to the news feed
        logger.debug(f"Getting subscriptions for user {user_id}")

        # Use the API function
        result = api.get_subscriptions(user_id)
        if "error" in result:
            # Keep the underlying error in the log, not in the response
            logger.error(
                f"Error getting subscriptions for user {user_id}: {result['error']}"
            )
            return jsonify({"error": "Failed to retrieve subscriptions"}), 500
        return jsonify(result)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "getting subscriptions")}
        ), 500

387 

388 

@news_api_bp.route("/subscriptions/<subscription_id>", methods=["GET"])
@login_required
def get_subscription(subscription_id: str) -> Dict[str, Any]:
    """Return one subscription by ID (400 for placeholder IDs, 404 if absent)."""
    try:
        # Reject empty IDs and the literal "null"/"undefined" placeholders
        if not subscription_id or subscription_id in ("null", "undefined"):
            return jsonify({"error": "Invalid subscription ID"}), 400

        found = api.get_subscription(subscription_id)
        if found:
            return jsonify(found)
        return jsonify({"error": "Subscription not found"}), 404

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "getting subscription")}
        ), 500

414 

415 

@news_api_bp.route("/subscriptions/<subscription_id>", methods=["PUT"])
@login_required
def update_subscription(subscription_id: str) -> Dict[str, Any]:
    """
    Update a subscription from a JSON body.

    Only recognised request fields are forwarded, renamed to their storage
    names. Responds 400 for unparsable/empty JSON, 404 or 400 when the API
    result carries an "error", and 500 when anything raises.
    """
    try:
        payload = request.get_json(force=True)
    except Exception:
        return jsonify({"error": "Invalid JSON data"}), 400

    try:
        if not payload:
            return jsonify({"error": "No JSON data provided"}), 400

        # Request field name -> storage field name
        request_to_storage = {
            "query": "query_or_topic",
            "name": "name",
            "refresh_minutes": "refresh_interval_minutes",
            "is_active": "is_active",
            "folder_id": "folder_id",
            "model_provider": "model_provider",
            "model": "model",
            "search_strategy": "search_strategy",
            "custom_endpoint": "custom_endpoint",
            "search_engine": "search_engine",
            "search_iterations": "search_iterations",
            "questions_per_iteration": "questions_per_iteration",
        }

        # Copy across only the fields the client actually sent
        changes = {
            stored: payload[sent]
            for sent, stored in request_to_storage.items()
            if sent in payload
        }

        outcome = api.update_subscription(subscription_id, changes)

        if "error" not in outcome:
            return jsonify(outcome)

        # Replace the raw error with a sanitized one before it leaves the
        # server; the raw text still decides the status code.
        raw_error = outcome["error"]
        outcome["error"] = safe_error_message(
            Exception(raw_error), "updating subscription"
        )
        status = 404 if "not found" in raw_error.lower() else 400
        return jsonify(outcome), status

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "updating subscription")}
        ), 500

472 

473 

@news_api_bp.route("/subscriptions/<subscription_id>", methods=["DELETE"])
@login_required
def delete_subscription(subscription_id: str) -> Dict[str, Any]:
    """Delete a subscription; 404 when the API reports nothing was deleted."""
    try:
        deleted = api.delete_subscription(subscription_id)
        if not deleted:
            return jsonify({"error": "Subscription not found"}), 404

        confirmation = {
            "status": "success",
            "message": f"Subscription {subscription_id} deleted",
        }
        return jsonify(confirmation)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "deleting subscription")}
        ), 500

496 

497 

@news_api_bp.route("/subscriptions/<subscription_id>/run", methods=["POST"])
@login_required
def run_subscription_now(subscription_id: str) -> Dict[str, Any]:
    """
    Manually trigger a subscription to run now.

    Looks the subscription up among the current user's subscriptions,
    substitutes today's date for the "YYYY-MM-DD" placeholder in its query,
    and POSTs a quick research request to this server's own
    /research/api/start endpoint.

    Returns:
        JSON with "status", "message", "research_id" and a progress "url"
        when the research was started or queued. 404 if the subscription is
        not found, the upstream status code if the internal request fails,
        and 500 for any other failure.
    """
    try:
        # Look up the subscription for the logged-in user, as the other
        # endpoints do, instead of a hard-coded "anonymous" user.
        subscription_data = api.get_subscriptions(get_user_id())

        # Find the specific subscription
        subscription = next(
            (
                sub
                for sub in subscription_data.get("subscriptions", [])
                if sub["id"] == subscription_id
            ),
            None,
        )

        if not subscription:
            return jsonify({"error": "Subscription not found"}), 404

        # Get timezone-aware current date using settings
        from flask import session
        from .core.utils import get_local_date_string
        from ..settings.manager import SettingsManager

        username = session.get("username", "anonymous")
        with get_user_db_session(username) as db:
            settings_manager = SettingsManager(db)
            current_date = get_local_date_string(settings_manager)

        # Replace the YYYY-MM-DD placeholder ONLY (not all dates)
        original_query = subscription["query"]
        query = original_query.replace("YYYY-MM-DD", current_date)

        # Build request data similar to news page
        request_data = {
            "query": query,
            "mode": "quick",
            # Use subscription's model configuration if available
            "model_provider": subscription.get("model_provider", "OLLAMA"),
            "model": subscription.get("model", "llama3"),
            "strategy": subscription.get("search_strategy", "news_aggregation"),
            "metadata": {
                "is_news_search": True,
                "search_type": "news_analysis",
                "display_in": "news_feed",
                "subscription_id": subscription_id,
                "triggered_by": "manual",
                # Original query still containing the placeholder
                "original_query": original_query,
                # Query with the placeholder replaced by the date
                "processed_query": query,
                # The actual date used
                "news_date": current_date,
                "title": subscription.get("name") or None,
            },
        }

        # Add custom endpoint if specified
        if subscription.get("custom_endpoint"):
            request_data["custom_endpoint"] = subscription["custom_endpoint"]

        # Use request.host_url to get the actual URL the server is responding on
        base_url = request.host_url.rstrip("/")

        # Forward the caller's session cookie so the internal request is
        # authenticated, matching check_overdue_subscriptions.
        headers = {"Content-Type": "application/json"}
        session_cookie = request.cookies.get("session")
        if session_cookie:
            headers["Cookie"] = f"session={session_cookie}"

        response = safe_post(
            f"{base_url}/research/api/start",
            json=request_data,
            headers=headers,
            # Bound the wait so a stalled internal call cannot hang this
            # request indefinitely (same value as the overdue check).
            timeout=30,
            allow_localhost=True,
            allow_private_ips=True,
        )

        if not response.ok:
            return jsonify(
                {"error": f"Failed to start research: {response.status_code}"}
            ), response.status_code

        data = response.json()
        if data.get("status") in ("success", "queued"):
            return jsonify(
                {
                    "status": "success",
                    "message": "Research started",
                    "research_id": data.get("research_id"),
                    "url": f"/progress/{data.get('research_id')}",
                }
            )

        return jsonify(
            {"error": data.get("message", "Failed to start research")}
        ), 500

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "running subscription")}
        ), 500

600 

601 

@news_api_bp.route("/subscriptions/<subscription_id>/history", methods=["GET"])
@login_required
def get_subscription_history(subscription_id: str) -> Dict[str, Any]:
    """
    Get research history for a subscription.

    Query params:
        limit: Maximum number of history entries
            (default: the "news.feed.default_limit" setting)

    Returns:
        The result of api.get_subscription_history as JSON. 400 when
        ``limit`` is not an integer; 500 with a generic error when the API
        result carries an "error" key or anything raises.
    """
    try:
        settings_manager = get_settings_manager()
        default_limit = settings_manager.get_setting("news.feed.default_limit")

        raw_limit = request.args.get("limit")
        if raw_limit is None:
            limit = int(default_limit)
        else:
            # A malformed client-supplied limit is a client error (400),
            # not a server failure.
            try:
                limit = int(raw_limit)
            except ValueError:
                return jsonify(
                    {"error": "limit must be an integer", "history": []}
                ), 400

        result = api.get_subscription_history(subscription_id, limit)
        if "error" in result:
            # Keep the underlying error in the log, not in the response
            logger.error(
                f"Error getting subscription history: {result['error']}"
            )
            return jsonify(
                {
                    "error": "Failed to retrieve subscription history",
                    "history": [],
                }
            ), 500
        return jsonify(result)
    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "getting subscription history")}
        ), 500

626 

627 

@news_api_bp.route("/preferences", methods=["POST"])
@login_required
def save_preferences() -> Dict[str, Any]:
    """
    Store the current user's news preferences.

    JSON body:
        preferences: Mapping of preference values (default: empty)
    """
    try:
        body = request.get_json()
        if not body:
            return jsonify({"error": "No JSON data provided"}), 400

        owner = get_user_id()
        prefs = body.get("preferences", {})

        return jsonify(api.save_news_preferences(owner, prefs))

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "saving preferences")}
        ), 500

650 

651 

# NOTE(review): unlike the sibling routes this one has no @login_required -
# confirm that exposing it without authentication is intentional.
@news_api_bp.route("/categories", methods=["GET"])
def get_categories() -> Dict[str, Any]:
    """Return the news category distribution reported by the news API."""
    try:
        categories = api.get_news_categories()
        return jsonify(categories)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "getting categories")}
        ), 500

665 

666 

@news_api_bp.route("/scheduler/status", methods=["GET"])
@login_required
def get_scheduler_status() -> Dict[str, Any]:
    """
    Report the state of the news scheduler.

    The response contains the running flag, a copy of the scheduler config,
    the number of tracked user sessions, the total of their scheduled jobs
    and, when an APScheduler instance is attached, its job count plus
    details of at most ten jobs.
    """
    try:
        logger.info("Scheduler status endpoint called")
        from .subscription_manager.scheduler import get_news_scheduler

        scheduler = get_news_scheduler()
        logger.info(
            f"Scheduler instance obtained: is_running={scheduler.is_running}"
        )

        has_sessions = hasattr(scheduler, "user_sessions")

        # Status is assembled by hand (the original notes this avoids a
        # potential deadlock).
        status = {
            "scheduler_available": True,
            "is_running": scheduler.is_running,
            "config": scheduler.config.copy()
            if hasattr(scheduler, "config")
            else {},
            "active_users": len(scheduler.user_sessions) if has_sessions else 0,
            "total_scheduled_jobs": 0,
        }

        # Sum the jobs recorded against each user session
        if has_sessions:
            status["total_scheduled_jobs"] = sum(
                len(entry.get("scheduled_jobs", set()))
                for entry in scheduler.user_sessions.values()
            )

        # Add what APScheduler itself knows about, if it is attached
        if hasattr(scheduler, "scheduler") and scheduler.scheduler:
            try:
                jobs = scheduler.scheduler.get_jobs()
                status["apscheduler_job_count"] = len(jobs)

                # Only the first 10 jobs are listed for display
                job_details = []
                for job in jobs[:10]:
                    next_run = (
                        job.next_run_time.isoformat()
                        if job.next_run_time
                        else None
                    )
                    job_details.append(
                        {"id": job.id, "name": job.name, "next_run": next_run}
                    )
                status["apscheduler_jobs"] = job_details
            except Exception:
                logger.exception("Error getting APScheduler jobs")
                status["apscheduler_job_count"] = 0

        logger.info(f"Status built: {list(status.keys())}")

        # Alias under the field name the JS front end expects
        status["scheduled_jobs"] = status.get("total_scheduled_jobs", 0)

        logger.info(
            f"Returning status: is_running={status.get('is_running')}, active_users={status.get('active_users')}"
        )
        return jsonify(status)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "getting scheduler status")}
        ), 500

737 

738 

@news_api_bp.route("/scheduler/start", methods=["POST"])
@login_required
def start_scheduler() -> Dict[str, Any]:
    """Start the news scheduler unless it is already running."""
    try:
        from flask import current_app
        from .subscription_manager.scheduler import get_news_scheduler

        scheduler = get_news_scheduler()

        # Nothing to do if it is already up
        if scheduler.is_running:
            return jsonify({"message": "Scheduler is already running"}), 200

        scheduler.start()

        # Remember the instance on the app so other endpoints can find it
        current_app.news_scheduler = scheduler
        logger.info("News scheduler started via API")

        payload = {
            "status": "success",
            "message": "Scheduler started",
            "active_users": len(scheduler.user_sessions),
        }
        return jsonify(payload)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "starting scheduler")}
        ), 500

772 

773 

@news_api_bp.route("/scheduler/stop", methods=["POST"])
@login_required
def stop_scheduler() -> Dict[str, Any]:
    """Stop the scheduler stored on the Flask app, if there is a running one."""
    try:
        from flask import current_app

        # A missing attribute and a falsy attribute are treated alike
        scheduler = getattr(current_app, "news_scheduler", None)
        if not scheduler:
            return jsonify({"message": "No scheduler instance found"}), 404

        if not scheduler.is_running:
            return jsonify({"message": "Scheduler is not running"}), 200

        scheduler.stop()
        logger.info("News scheduler stopped via API")
        return jsonify({"status": "success", "message": "Scheduler stopped"})

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "stopping scheduler")}
        ), 500

801 

802 

@news_api_bp.route("/scheduler/check-now", methods=["POST"])
@login_required
def check_subscriptions_now() -> Dict[str, Any]:
    """
    Kick off a subscription check on a background thread.

    Responds 503 when no scheduler is stored on the app or it is not
    running. Otherwise counts the active subscriptions whose next refresh
    is unset or already due, starts the scheduler's check in a daemon
    thread, and returns that count.
    """
    try:
        from flask import current_app

        scheduler = getattr(current_app, "news_scheduler", None)
        if not scheduler:
            return jsonify({"error": "Scheduler not initialized"}), 503
        if not scheduler.is_running:
            return jsonify({"error": "Scheduler is not running"}), 503

        from ..database.models import NewsSubscription as BaseSubscription
        from datetime import datetime, timezone

        # Count subscriptions that are due: never refreshed, or past due
        with get_user_db_session() as db:
            now = datetime.now(timezone.utc)
            is_due = (BaseSubscription.next_refresh.is_(None)) | (
                BaseSubscription.next_refresh <= now
            )
            count = (
                db.query(BaseSubscription)
                .filter(BaseSubscription.status == "active", is_due)
                .count()
            )

        # Run the check in the background so the request returns at once
        import threading

        worker = threading.Thread(
            target=scheduler._check_subscriptions, daemon=True
        )
        worker.start()

        return jsonify(
            {
                "status": "success",
                "message": f"Checking {count} due subscriptions",
                "count": count,
            }
        )

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "checking subscriptions")}
        ), 500

860 

861 

@news_api_bp.route("/scheduler/cleanup-now", methods=["POST"])
@login_required
def trigger_cleanup() -> Dict[str, Any]:
    """
    Manually trigger cleanup job.

    Schedules the scheduler's cleanup routine as a one-off job one second
    from now.

    Returns:
        JSON with status "triggered", 400 if the scheduler is not running,
        or a sanitized error with status 500 if scheduling fails.
    """
    try:
        from .subscription_manager.scheduler import get_news_scheduler
        from datetime import datetime, UTC, timedelta

        scheduler = get_news_scheduler()

        if not scheduler.is_running:
            return jsonify({"error": "Scheduler is not running"}), 400

        # Schedule cleanup to run in 1 second. The job id is fixed, so a
        # second trigger arriving before the first job has run would
        # otherwise fail with a conflicting-id error; replace_existing
        # simply reschedules the pending job instead.
        scheduler.scheduler.add_job(
            scheduler._run_cleanup_with_tracking,
            "date",
            run_date=datetime.now(UTC) + timedelta(seconds=1),
            id="manual_cleanup_trigger",
            replace_existing=True,
        )

        return jsonify(
            {
                "status": "triggered",
                "message": "Cleanup job will run within seconds",
            }
        )

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "triggering cleanup")}
        ), 500

894 

895 

@news_api_bp.route("/scheduler/users", methods=["GET"])
@login_required
def get_active_users() -> Dict[str, Any]:
    """Return the scheduler's user-session summary together with its size."""
    try:
        from .subscription_manager.scheduler import get_news_scheduler

        summary = get_news_scheduler().get_user_sessions_summary()
        payload = {"active_users": len(summary), "users": summary}
        return jsonify(payload)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "getting active users")}
        ), 500

914 

915 

@news_api_bp.route("/scheduler/stats", methods=["GET"])
@login_required
def scheduler_stats() -> Dict[str, Any]:
    """
    Return debugging information about the news scheduler.

    Includes the session username, the running flag, a per-user summary of
    tracked sessions and every APScheduler job. As a side effect, if the
    current user has a tracked session their subscriptions are rescheduled.
    """
    try:
        from .subscription_manager.scheduler import get_news_scheduler
        from flask import session

        scheduler = get_news_scheduler()
        username = session.get("username")

        debug_info = {
            "current_user": username,
            "scheduler_running": scheduler.is_running,
            "user_sessions": {},
            "apscheduler_jobs": [],
        }

        # Summarise each tracked session; only whether a password is held
        # is reported, never the password itself.
        if hasattr(scheduler, "user_sessions"):
            for user, info in scheduler.user_sessions.items():
                last_seen = (
                    info.get("last_activity").isoformat()
                    if info.get("last_activity")
                    else None
                )
                debug_info["user_sessions"][user] = {
                    "has_password": bool(info.get("password")),
                    "last_activity": last_seen,
                    "scheduled_jobs_count": len(
                        info.get("scheduled_jobs", set())
                    ),
                }

        # List every job APScheduler holds, if it is attached
        if hasattr(scheduler, "scheduler") and scheduler.scheduler:
            job_rows = []
            for job in scheduler.scheduler.get_jobs():
                job_rows.append(
                    {
                        "id": job.id,
                        "name": job.name,
                        "next_run": job.next_run_time.isoformat()
                        if job.next_run_time
                        else None,
                        "trigger": str(job.trigger),
                    }
                )
            debug_info["apscheduler_jobs"] = job_rows

        # Force a reschedule for the caller when they have a tracked session
        if username and username in scheduler.user_sessions:
            logger.info(f"Forcing schedule update for {username}")
            scheduler._schedule_user_subscriptions(username)
            debug_info["forced_schedule"] = True

        return jsonify(debug_info)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "getting scheduler stats")}
        ), 500

977 

978 

def _build_overdue_request_data(sub, query, current_date):
    """Build the JSON payload posted to the research API for one subscription.

    Args:
        sub: Subscription row being refreshed. Its model, strategy and
            optional search settings are read from it.
        query: The subscription query with the date placeholder already
            substituted.
        current_date: The date string used for that substitution; recorded
            in the payload metadata as ``news_date``.

    Returns:
        A dict suitable for sending as the JSON body of the start request.
    """
    request_data = {
        "query": query,
        "mode": "quick",
        "model_provider": sub.model_provider or "OLLAMA",
        "model": sub.model or "llama3",
        "strategy": sub.search_strategy or "news_aggregation",
        "metadata": {
            "is_news_search": True,
            "search_type": "news_analysis",
            "display_in": "news_feed",
            "subscription_id": str(sub.id),
            "triggered_by": "overdue_check",
            "original_query": sub.query_or_topic,
            "processed_query": query,
            "news_date": current_date,
            "title": sub.name if sub.name else None,
        },
    }

    # Optional search parameters are only sent when the subscription sets them.
    if sub.search_engine:
        request_data["search_engine"] = sub.search_engine
    if sub.custom_endpoint:
        request_data["custom_endpoint"] = sub.custom_endpoint

    return request_data


@news_api_bp.route("/check-overdue", methods=["POST"])
@login_required
def check_overdue_subscriptions():
    """Check and run all overdue subscriptions for the current user.

    Active subscriptions whose ``next_refresh`` is not in the future are
    each submitted to ``/research/api/start`` on this same server, reusing
    the caller's session cookie. When a run is accepted (status ``success``
    or ``queued``) the subscription's ``last_refresh`` / ``next_refresh``
    are advanced and committed.

    Returns:
        JSON with ``status``, ``overdue_found``, ``started`` and a
        ``results`` list holding one entry per subscription (either a
        ``research_id`` or an ``error``). Unexpected failures outside the
        per-subscription handling produce a 500 with a generic message.
    """
    try:
        from datetime import UTC, datetime, timedelta

        from flask import request, session

        from ..database.models.news import NewsSubscription
        from ..database.session_context import get_user_db_session
        from ..settings.manager import SettingsManager
        from .core.utils import get_local_date_string

        username = session.get("username", "anonymous")

        # These depend only on the incoming request, so resolve them once
        # instead of on every loop iteration.
        # request.host_url is the URL the server is actually responding on.
        start_url = request.host_url.rstrip("/") + "/research/api/start"
        # Forward the caller's session cookie to stay authenticated.
        session_cookie = request.cookies.get("session")
        cookie_header = f"session={session_cookie}" if session_cookie else ""

        overdue_count = 0
        results = []
        with get_user_db_session(username) as db:
            now = datetime.now(UTC)
            overdue_subs = (
                db.query(NewsSubscription)
                .filter(
                    NewsSubscription.status == "active",
                    NewsSubscription.next_refresh <= now,
                )
                .all()
            )

            logger.info(
                f"Found {len(overdue_subs)} overdue subscriptions for {username}"
            )

            # Timezone-aware current date, resolved through the settings.
            settings_manager = SettingsManager(db)
            current_date = get_local_date_string(settings_manager)

            for sub in overdue_subs:
                try:
                    logger.info(
                        f"Running overdue subscription: {sub.name or sub.query_or_topic[:30]}"
                    )

                    # Replace the date placeholder with the current date.
                    query = sub.query_or_topic.replace(
                        "YYYY-MM-DD", current_date
                    )
                    request_data = _build_overdue_request_data(
                        sub, query, current_date
                    )

                    response = safe_post(
                        start_url,
                        json=request_data,
                        headers={
                            "Content-Type": "application/json",
                            "Cookie": cookie_header,
                        },
                        timeout=30,
                        allow_localhost=True,
                        allow_private_ips=True,
                    )

                    if response.ok:
                        result = response.json()
                    else:
                        result = {
                            "status": "error",
                            "error": f"HTTP {response.status_code}: {response.text}",
                        }

                    if result.get("status") in ("success", "queued"):
                        overdue_count += 1

                        # Advance the schedule so this subscription is not
                        # picked up again by the next overdue check.
                        sub.last_refresh = datetime.now(UTC)
                        sub.next_refresh = datetime.now(UTC) + timedelta(
                            minutes=sub.refresh_interval_minutes
                        )
                        db.commit()

                        results.append(
                            {
                                "id": str(sub.id),
                                "name": sub.name or sub.query_or_topic[:50],
                                "research_id": result.get("research_id"),
                            }
                        )
                    else:
                        results.append(
                            {
                                "id": str(sub.id),
                                "name": sub.name or sub.query_or_topic[:50],
                                "error": result.get(
                                    "error", "Failed to start research"
                                ),
                            }
                        )
                except Exception as e:
                    # One failing subscription must not abort the others.
                    logger.exception(f"Error running subscription {sub.id}")
                    results.append(
                        {
                            "id": str(sub.id),
                            "name": sub.name or sub.query_or_topic[:50],
                            "error": safe_error_message(
                                e, "running subscription"
                            ),
                        }
                    )

            return jsonify(
                {
                    "status": "success",
                    "overdue_found": len(overdue_subs),
                    "started": overdue_count,
                    "results": results,
                }
            )

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "checking overdue subscriptions")}
        ), 500

1142 

1143 

1144# Folder and subscription management routes 

@news_api_bp.route("/subscription/folders", methods=["GET"])
@login_required
def get_folders():
    """Return the current user's subscription folders as a JSON list.

    Each folder reported by ``FolderManager.get_user_folders`` is
    serialised with its ``to_dict`` method. Any failure yields a 500 with
    a generic error message.
    """
    try:
        owner_id = get_user_id()

        with get_user_db_session() as db_session:
            user_folders = FolderManager(db_session).get_user_folders(owner_id)
            serialised = [entry.to_dict() for entry in user_folders]
            return jsonify(serialised)

    except Exception as exc:
        message = safe_error_message(exc, "getting folders")
        return jsonify({"error": message}), 500

1160 

1161 

@news_api_bp.route("/subscription/folders", methods=["POST"])
@login_required
def create_folder():
    """Create a new subscription folder from a JSON request body.

    The body must be a JSON object with a non-empty ``name``; an optional
    ``description`` is passed through to the folder manager.

    Returns:
        201 with the created folder on success, 400 when the body is not a
        JSON object or has no ``name``, 409 when a folder with that name
        already exists, and 500 with a generic message on unexpected errors.
    """
    try:
        # silent=True makes a missing or malformed body come back as None.
        # Otherwise it raises an HTTP error that the blanket handler below
        # would turn into a misleading 500.
        data = request.get_json(silent=True)

        if not isinstance(data, dict) or not data.get("name"):
            return jsonify({"error": "Folder name is required"}), 400

        with get_user_db_session() as session:
            manager = FolderManager(session)

            # Check if folder already exists
            existing = (
                session.query(SubscriptionFolder)
                .filter_by(name=data["name"])
                .first()
            )
            if existing:
                return jsonify({"error": "Folder already exists"}), 409

            folder = manager.create_folder(
                name=data["name"],
                description=data.get("description"),
            )

            return jsonify(folder.to_dict()), 201

    except Exception as e:
        return jsonify({"error": safe_error_message(e, "creating folder")}), 500

1193 

1194 

@news_api_bp.route("/subscription/folders/<folder_id>", methods=["PUT"])
@login_required
def update_folder(folder_id):
    """Update a folder with the fields supplied in the JSON request body.

    Args:
        folder_id: Identifier of the folder, taken from the URL.

    Returns:
        The updated folder as JSON, 400 when the body is not a JSON object,
        404 when the folder manager finds no such folder, and 500 with a
        generic message on unexpected errors.
    """
    try:
        # silent=True: report an unparsable body as a client error below
        # instead of letting the blanket handler turn it into a 500.
        data = request.get_json(silent=True)

        # The fields are expanded as keyword arguments, so anything other
        # than a JSON object cannot be applied.
        if not isinstance(data, dict):
            return jsonify({"error": "Bad request"}), 400

        with get_user_db_session() as session:
            manager = FolderManager(session)
            folder = manager.update_folder(folder_id, **data)

            if not folder:
                return jsonify({"error": "Folder not found"}), 404

            return jsonify(folder.to_dict())

    except Exception as e:
        return jsonify({"error": safe_error_message(e, "updating folder")}), 500

1213 

1214 

@news_api_bp.route("/subscription/folders/<folder_id>", methods=["DELETE"])
@login_required
def delete_folder(folder_id):
    """Delete the folder identified by ``folder_id``.

    The optional ``move_to`` query parameter is handed to
    ``FolderManager.delete_folder`` together with the folder id. Responds
    with 404 when the manager reports failure and 500 on any exception.
    """
    try:
        destination = request.args.get("move_to")

        with get_user_db_session() as db_session:
            folder_manager = FolderManager(db_session)
            removed = folder_manager.delete_folder(folder_id, destination)

            if removed:
                return jsonify({"status": "deleted"}), 200

            return jsonify({"error": "Folder not found"}), 404

    except Exception as exc:
        message = safe_error_message(exc, "deleting folder")
        return jsonify({"error": message}), 500

1233 

1234 

@news_api_bp.route("/subscription/subscriptions/organized", methods=["GET"])
@login_required
def get_subscriptions_organized():
    """Return the current user's subscriptions grouped by folder.

    The response is a JSON object mapping each folder key returned by
    ``FolderManager.get_subscriptions_by_folder`` to a list of serialised
    subscriptions. Any failure yields a 500 with a generic message.
    """
    try:
        owner_id = get_user_id()

        with get_user_db_session() as db_session:
            grouped = FolderManager(db_session).get_subscriptions_by_folder(
                owner_id
            )

            # Serialise every subscription so the mapping is JSON-friendly.
            payload = {
                folder_key: [entry.to_dict() for entry in members]
                for folder_key, members in grouped.items()
            }

            return jsonify(payload)

    except Exception as exc:
        message = safe_error_message(exc, "getting organized subscriptions")
        return jsonify({"error": message}), 500

1257 

1258 

@news_api_bp.route(
    "/subscription/subscriptions/<subscription_id>", methods=["PUT"]
)
@login_required
def update_subscription_folder(subscription_id):
    """Update a subscription (mainly for folder assignment).

    Fields from the JSON body are copied onto the subscription, except for
    ``id``, ``user_id`` and ``created_at``. When ``refresh_interval_minutes``
    is supplied, ``next_refresh`` is recalculated from ``last_refresh`` (or
    from the current time if the subscription has never been refreshed).

    Args:
        subscription_id: Identifier of the subscription, taken from the URL.

    Returns:
        The updated subscription as JSON, 400 when the body is not a JSON
        object or the interval is not numeric, 404 when the subscription
        does not exist, and 500 with a generic message on unexpected errors.
    """
    try:
        from datetime import datetime, timedelta, timezone

        # Two leading dots, matching the file's other relative imports.
        # A third dot climbs above the top-level package and makes the
        # import, and therefore every call to this route, fail.
        from ..database.models.news import NewsSubscription

        # silent=True: an unparsable body is reported as a 400 below rather
        # than raising into the blanket handler and becoming a 500.
        data = request.get_json(silent=True)
        logger.info(
            f"Updating subscription {subscription_id} with data: {data}"
        )
        if not isinstance(data, dict):
            return jsonify({"error": "Bad request"}), 400

        # Validate the interval before touching the row so a bad value is a
        # client error, not a TypeError after fields were already modified.
        interval_changed = "refresh_interval_minutes" in data
        new_minutes = data.get("refresh_interval_minutes")
        if interval_changed and (
            isinstance(new_minutes, bool)
            or not isinstance(new_minutes, (int, float))
        ):
            return jsonify(
                {"error": "refresh_interval_minutes must be a number"}
            ), 400

        protected_fields = {"id", "user_id", "created_at"}

        with get_user_db_session() as session:
            sub = (
                session.query(NewsSubscription)
                .filter_by(id=subscription_id)
                .first()
            )
            if not sub:
                return jsonify({"error": "Subscription not found"}), 404

            # Update fields. Keys come straight from the client, so never
            # let them overwrite private state or methods (e.g. to_dict).
            for key, value in data.items():
                if key in protected_fields or key.startswith("_"):
                    continue
                if not hasattr(sub, key) or callable(getattr(sub, key)):
                    continue
                setattr(sub, key, value)

            # Recalculate next_refresh if refresh_interval_minutes changed
            if interval_changed:
                anchor = sub.last_refresh or datetime.now(timezone.utc)
                sub.next_refresh = anchor + timedelta(minutes=new_minutes)
                logger.info(f"Recalculated next_refresh: {sub.next_refresh}")

            sub.updated_at = datetime.now(timezone.utc)
            session.commit()

            result = sub.to_dict()
            logger.info(
                f"Updated subscription result: refresh_interval_minutes={result.get('refresh_interval_minutes')}, next_refresh={result.get('next_refresh')}"
            )
            return jsonify(result)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "updating subscription")}
        ), 500

1320 

1321 

@news_api_bp.route("/subscription/stats", methods=["GET"])
@login_required
def get_subscription_stats():
    """Return subscription statistics for the current user as JSON.

    The statistics come from ``FolderManager.get_subscription_stats`` and
    are returned unchanged. Any failure yields a 500 with a generic message.
    """
    try:
        owner_id = get_user_id()

        with get_user_db_session() as db_session:
            folder_manager = FolderManager(db_session)
            return jsonify(folder_manager.get_subscription_stats(owner_id))

    except Exception as exc:
        message = safe_error_message(exc, "getting stats")
        return jsonify({"error": message}), 500

1337 

1338 

1339# Error handlers 

@news_api_bp.errorhandler(400)
def bad_request(e):
    """Blueprint handler for HTTP 400: reply with a generic JSON error."""
    body = {"error": "Bad request"}
    return jsonify(body), 400

1343 

1344 

@news_api_bp.errorhandler(404)
def not_found(e):
    """Blueprint handler for HTTP 404: reply with a generic JSON error."""
    body = {"error": "Resource not found"}
    return jsonify(body), 404

1348 

1349 

@news_api_bp.errorhandler(500)
def internal_error(e):
    """Blueprint handler for HTTP 500: reply with a generic JSON error."""
    body = {"error": "Internal server error"}
    return jsonify(body), 500

1353 

1354 

@news_api_bp.route("/search-history", methods=["GET"])
@login_required
def get_search_history():
    """Return the 20 most recent news searches for the current user.

    The rows are read from the user's database, newest first, and returned
    under the ``search_history`` key. When no username can be resolved the
    list is empty. Any failure yields a 500 with a generic message.
    """
    try:
        from ..web.auth.decorators import current_user

        active_user = current_user()
        if not active_user:
            # Not authenticated, return empty history
            return jsonify({"search_history": []})

        from flask import g

        from ..database.models import UserNewsSearchHistory
        from ..database.session_context import get_user_db_session

        # The password is read from Flask's g object (set by middleware).
        db_password = getattr(g, "user_password", None)

        with get_user_db_session(active_user, db_password) as db_session:
            newest_first = db_session.query(UserNewsSearchHistory).order_by(
                UserNewsSearchHistory.created_at.desc()
            )
            recent = newest_first.limit(20).all()
            entries = [row.to_dict() for row in recent]

            return jsonify({"search_history": entries})

    except Exception as exc:
        message = safe_error_message(exc, "getting search history")
        return jsonify({"error": message}), 500

1393 

1394 

@news_api_bp.route("/search-history", methods=["POST"])
@login_required
def add_search_history():
    """Add a search to the current user's history.

    The JSON body must be an object with a non-empty ``query``. The optional
    ``type`` (default ``"filter"``) and ``resultCount`` (default ``0``) are
    stored alongside it.

    Returns:
        JSON with ``status`` and the new row's ``id`` on success, 401 when
        no username can be resolved, 400 when the body is not a JSON object
        or lacks ``query``, and 500 with a generic message on unexpected
        errors.
    """
    try:
        # Get username from session
        from ..web.auth.decorators import current_user

        username = current_user()
        if not username:
            # Not authenticated
            return jsonify({"error": "Authentication required"}), 401

        # silent=True: a missing or malformed body becomes None and is
        # rejected with a 400 below, instead of raising into the blanket
        # handler and being reported as a 500.
        data = request.get_json(silent=True)
        has_fields = isinstance(data, dict) and bool(data)
        logger.info(
            f"add_search_history received data keys: {list(data.keys()) if has_fields else 'None'}"
        )
        if not has_fields or not data.get("query"):
            logger.warning("Invalid search history data: missing query")
            return jsonify({"error": "query is required"}), 400

        # Add to user's encrypted database
        from flask import g

        from ..database.models import UserNewsSearchHistory
        from ..database.session_context import get_user_db_session

        # Get password from Flask g object (set by middleware)
        password = getattr(g, "user_password", None)

        with get_user_db_session(username, password) as db_session:
            search_history = UserNewsSearchHistory(
                query=data["query"],
                search_type=data.get("type", "filter"),
                result_count=data.get("resultCount", 0),
            )
            db_session.add(search_history)
            db_session.commit()

            # Read the generated id while the session is still open.
            new_id = search_history.id

        return jsonify({"status": "success", "id": new_id})

    except Exception as e:
        # safe_error_message already logs the exception with its traceback,
        # so no separate logger.exception call is made here.
        return jsonify(
            {"error": safe_error_message(e, "adding search history")}
        ), 500

1441 

1442 

@news_api_bp.route("/search-history", methods=["DELETE"])
@login_required
def clear_search_history():
    """Delete every stored news search for the current user.

    Responds with ``{"status": "success"}`` both after deleting the rows
    and when no username can be resolved (in which case nothing is
    deleted). Any failure yields a 500 with a generic message.
    """
    done = {"status": "success"}
    try:
        from ..web.auth.decorators import current_user

        active_user = current_user()
        if not active_user:
            return jsonify(done)

        from flask import g

        from ..database.models import UserNewsSearchHistory
        from ..database.session_context import get_user_db_session

        # The password is read from Flask's g object (set by middleware).
        db_password = getattr(g, "user_password", None)

        with get_user_db_session(active_user, db_password) as db_session:
            history_rows = db_session.query(UserNewsSearchHistory)
            history_rows.delete()
            db_session.commit()

        return jsonify(done)

    except Exception as exc:
        message = safe_error_message(exc, "clearing search history")
        return jsonify({"error": message}), 500