Coverage for src / local_deep_research / news / flask_api.py: 15%

642 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Flask API endpoints for news system. 

3Converted from FastAPI to match LDR's Flask architecture. 

4""" 

5 

6from typing import Dict, Any 

7from flask import Blueprint, request, jsonify 

8from loguru import logger 

9 

10from . import api 

11from .folder_manager import FolderManager 

12from ..database.models import SubscriptionFolder 

13from ..web.auth.decorators import login_required 

14from ..database.session_context import get_user_db_session 

15from ..utilities.db_utils import get_settings_manager 

16from ..security import safe_post 

17 

18 

19def safe_error_message(e: Exception, context: str = "") -> str: 

20 """ 

21 Return a safe error message that doesn't expose internal details. 

22 

23 Args: 

24 e: The exception 

25 context: Optional context about what was being attempted 

26 

27 Returns: 

28 A generic error message safe for external users 

29 """ 

30 # Log the actual error for debugging 

31 logger.exception(f"Error in {context}") 

32 

33 # Return generic messages based on exception type 

34 if isinstance(e, ValueError): 

35 return "Invalid input provided" 

36 elif isinstance(e, KeyError): 

37 return "Required data missing" 

38 elif isinstance(e, TypeError): 

39 return "Invalid data format" 

40 else: 

41 # Generic message for production 

42 return f"An error occurred{f' while {context}' if context else ''}" 

43 

44 

45# Create Blueprint - no url_prefix here since parent blueprint already has /news 

46news_api_bp = Blueprint("news_api", __name__, url_prefix="/api") 

47 

48# Components are initialized in api.py 

49 

50 

51def get_user_id(): 

52 """Get current user ID from session""" 

53 from ..web.auth.decorators import current_user 

54 

55 username = current_user() 

56 

57 if not username: 

58 # For news, we need authenticated users 

59 return None 

60 

61 return username 

62 

63 

64@news_api_bp.route("/feed", methods=["GET"]) 

65@login_required 

66def get_news_feed() -> Dict[str, Any]: 

67 """ 

68 Get personalized news feed for user. 

69 

70 Query params: 

71 user_id: User identifier (default: anonymous) 

72 limit: Maximum number of cards to return (default: 20) 

73 use_cache: Whether to use cached news (default: true) 

74 strategy: Override default recommendation strategy 

75 focus: Optional focus area for news 

76 """ 

77 try: 

78 # Get current user (login_required ensures we have one) 

79 user_id = get_user_id() 

80 logger.info(f"News feed requested by user: {user_id}") 

81 

82 # Get query parameters 

83 settings_manager = get_settings_manager() 

84 default_limit = settings_manager.get_setting("news.feed.default_limit") 

85 limit = int(request.args.get("limit", default_limit)) 

86 use_cache = request.args.get("use_cache", "true").lower() == "true" 

87 strategy = request.args.get("strategy") 

88 focus = request.args.get("focus") 

89 subscription_id = request.args.get("subscription_id") 

90 

91 logger.info( 

92 f"News feed params: limit={limit}, subscription_id={subscription_id}, focus={focus}" 

93 ) 

94 

95 # Call the direct API function (now synchronous) 

96 result = api.get_news_feed( 

97 user_id=user_id, 

98 limit=limit, 

99 use_cache=use_cache, 

100 focus=focus, 

101 search_strategy=strategy, 

102 subscription_id=subscription_id, 

103 ) 

104 

105 # Check for errors in result 

106 if "error" in result and result.get("news_items") == []: 

107 # Sanitize error message before returning to client 

108 safe_msg = safe_error_message( 

109 Exception(result["error"]), context="get_news_feed" 

110 ) 

111 return jsonify( 

112 {"error": safe_msg, "news_items": []} 

113 ), 400 if "must be between" in result["error"] else 500 

114 

115 # Debug: Log the result before returning 

116 logger.info( 

117 f"API returning {len(result.get('news_items', []))} news items" 

118 ) 

119 if result.get("news_items"): 

120 logger.info( 

121 f"First item ID: {result['news_items'][0].get('id', 'NO_ID')}" 

122 ) 

123 

124 return jsonify(result) 

125 

126 except Exception as e: 

127 return jsonify( 

128 { 

129 "error": safe_error_message(e, "getting news feed"), 

130 "news_items": [], 

131 } 

132 ), 500 

133 

134 

135@news_api_bp.route("/subscribe", methods=["POST"]) 

136@login_required 

137def create_subscription() -> Dict[str, Any]: 

138 """ 

139 Create a new subscription for user. 

140 

141 JSON body: 

142 query: Search query or topic 

143 subscription_type: "search" or "topic" (default: "search") 

144 refresh_minutes: Refresh interval in minutes (default: from settings) 

145 """ 

146 try: 

147 data = request.get_json(force=True) 

148 except Exception: 

149 # Handle invalid JSON 

150 return jsonify({"error": "Invalid JSON data"}), 400 

151 

152 try: 

153 if not data: 

154 return jsonify({"error": "No JSON data provided"}), 400 

155 

156 # Get current user 

157 user_id = get_user_id() 

158 

159 # Extract parameters 

160 query = data.get("query") 

161 subscription_type = data.get("subscription_type", "search") 

162 refresh_minutes = data.get( 

163 "refresh_minutes" 

164 ) # Will use default from api.py 

165 

166 # Extract model configuration (optional) 

167 model_provider = data.get("model_provider") 

168 model = data.get("model") 

169 search_strategy = data.get("search_strategy", "news_aggregation") 

170 custom_endpoint = data.get("custom_endpoint") 

171 

172 # Extract additional fields 

173 name = data.get("name") 

174 folder_id = data.get("folder_id") 

175 is_active = data.get("is_active", True) 

176 search_engine = data.get("search_engine") 

177 search_iterations = data.get("search_iterations") 

178 questions_per_iteration = data.get("questions_per_iteration") 

179 

180 # Validate required fields 

181 if not query: 

182 return jsonify({"error": "query is required"}), 400 

183 

184 # Call the direct API function 

185 result = api.create_subscription( 

186 user_id=user_id, 

187 query=query, 

188 subscription_type=subscription_type, 

189 refresh_minutes=refresh_minutes, 

190 model_provider=model_provider, 

191 model=model, 

192 search_strategy=search_strategy, 

193 custom_endpoint=custom_endpoint, 

194 name=name, 

195 folder_id=folder_id, 

196 is_active=is_active, 

197 search_engine=search_engine, 

198 search_iterations=search_iterations, 

199 questions_per_iteration=questions_per_iteration, 

200 ) 

201 

202 return jsonify(result) 

203 

204 except ValueError as e: 

205 return jsonify( 

206 {"error": safe_error_message(e, "creating subscription")} 

207 ), 400 

208 except Exception as e: 

209 return jsonify( 

210 {"error": safe_error_message(e, "creating subscription")} 

211 ), 500 

212 

213 

214@news_api_bp.route("/vote", methods=["POST"]) 

215@login_required 

216def vote_on_news() -> Dict[str, Any]: 

217 """ 

218 Submit vote on a news item. 

219 

220 JSON body: 

221 card_id: ID of the news card 

222 vote: "up" or "down" 

223 """ 

224 try: 

225 data = request.get_json() 

226 if not data: 

227 return jsonify({"error": "No JSON data provided"}), 400 

228 

229 # Get current user 

230 user_id = get_user_id() 

231 

232 card_id = data.get("card_id") 

233 vote = data.get("vote") 

234 

235 # Validate 

236 if not all([card_id, vote]): 

237 return jsonify({"error": "card_id and vote are required"}), 400 

238 

239 # Call the direct API function 

240 result = api.submit_feedback( 

241 card_id=card_id, user_id=user_id, vote=vote 

242 ) 

243 

244 return jsonify(result) 

245 

246 except ValueError as e: 

247 error_msg = str(e) 

248 if "not found" in error_msg.lower(): 

249 return jsonify({"error": "Resource not found"}), 404 

250 else: 

251 return jsonify( 

252 {"error": safe_error_message(e, "submitting vote")} 

253 ), 400 

254 except Exception as e: 

255 return jsonify({"error": safe_error_message(e, "submitting vote")}), 500 

256 

257 

258@news_api_bp.route("/feedback/batch", methods=["POST"]) 

259@login_required 

260def get_batch_feedback() -> Dict[str, Any]: 

261 """ 

262 Get feedback (votes) for multiple news cards. 

263 JSON body: 

264 card_ids: List of card IDs 

265 """ 

266 try: 

267 data = request.get_json() 

268 if not data: 

269 return jsonify({"error": "No JSON data provided"}), 400 

270 

271 card_ids = data.get("card_ids", []) 

272 if not card_ids: 

273 return jsonify({"votes": {}}) 

274 

275 # Get current user 

276 user_id = get_user_id() 

277 

278 # Call the direct API function 

279 result = api.get_votes_for_cards(card_ids=card_ids, user_id=user_id) 

280 

281 return jsonify(result) 

282 

283 except ValueError as e: 

284 error_msg = str(e) 

285 if "not found" in error_msg.lower(): 

286 return jsonify({"error": "Resource not found"}), 404 

287 return jsonify({"error": safe_error_message(e, "getting votes")}), 400 

288 except Exception as e: 

289 logger.exception("Error getting batch feedback") 

290 return jsonify({"error": safe_error_message(e, "getting votes")}), 500 

291 

292 

293@news_api_bp.route("/feedback/<card_id>", methods=["POST"]) 

294@login_required 

295def submit_feedback(card_id: str) -> Dict[str, Any]: 

296 """ 

297 Submit feedback (vote) for a news card. 

298 

299 JSON body: 

300 vote: "up" or "down" 

301 """ 

302 try: 

303 data = request.get_json() 

304 if not data: 

305 return jsonify({"error": "No JSON data provided"}), 400 

306 

307 # Get current user 

308 user_id = get_user_id() 

309 vote = data.get("vote") 

310 

311 # Validate 

312 if not vote: 

313 return jsonify({"error": "vote is required"}), 400 

314 

315 # Call the direct API function 

316 result = api.submit_feedback( 

317 card_id=card_id, user_id=user_id, vote=vote 

318 ) 

319 

320 return jsonify(result) 

321 

322 except ValueError as e: 

323 error_msg = str(e) 

324 if "not found" in error_msg.lower(): 

325 return jsonify({"error": "Resource not found"}), 404 

326 elif "must be" in error_msg.lower(): 

327 return jsonify({"error": "Invalid input value"}), 400 

328 else: 

329 return jsonify( 

330 {"error": safe_error_message(e, "submitting feedback")} 

331 ), 400 

332 except Exception as e: 

333 return jsonify( 

334 {"error": safe_error_message(e, "submitting feedback")} 

335 ), 500 

336 

337 

338@news_api_bp.route("/research/<card_id>", methods=["POST"]) 

339@login_required 

340def research_news_item(card_id: str) -> Dict[str, Any]: 

341 """ 

342 Perform deeper research on a news item. 

343 

344 JSON body: 

345 depth: "quick", "detailed", or "report" (default: "quick") 

346 """ 

347 try: 

348 data = request.get_json() or {} 

349 depth = data.get("depth", "quick") 

350 

351 # Call the API function which handles the research 

352 result = api.research_news_item(card_id, depth) 

353 

354 return jsonify(result) 

355 

356 except Exception as e: 

357 return jsonify( 

358 {"error": safe_error_message(e, "researching news item")} 

359 ), 500 

360 

361 

362@news_api_bp.route("/subscriptions/current", methods=["GET"]) 

363@login_required 

364def get_current_user_subscriptions() -> Dict[str, Any]: 

365 """Get all subscriptions for current user.""" 

366 try: 

367 # Get current user 

368 user_id = get_user_id() 

369 

370 # Ensure we have a database session for the user 

371 # This will trigger register_activity 

372 logger.debug(f"Getting news feed for user {user_id}") 

373 

374 # Use the API function 

375 result = api.get_subscriptions(user_id) 

376 if "error" in result: 

377 logger.error( 

378 f"Error getting subscriptions for user {user_id}: {result['error']}" 

379 ) 

380 return jsonify({"error": "Failed to retrieve subscriptions"}), 500 

381 return jsonify(result) 

382 

383 except Exception as e: 

384 return jsonify( 

385 {"error": safe_error_message(e, "getting subscriptions")} 

386 ), 500 

387 

388 

389@news_api_bp.route("/subscriptions/<subscription_id>", methods=["GET"]) 

390@login_required 

391def get_subscription(subscription_id: str) -> Dict[str, Any]: 

392 """Get a single subscription by ID.""" 

393 try: 

394 # Handle null or invalid subscription IDs 

395 if ( 

396 subscription_id == "null" 

397 or subscription_id == "undefined" 

398 or not subscription_id 

399 ): 

400 return jsonify({"error": "Invalid subscription ID"}), 400 

401 

402 # Get the subscription 

403 subscription = api.get_subscription(subscription_id) 

404 

405 if not subscription: 

406 return jsonify({"error": "Subscription not found"}), 404 

407 

408 return jsonify(subscription) 

409 

410 except Exception as e: 

411 return jsonify( 

412 {"error": safe_error_message(e, "getting subscription")} 

413 ), 500 

414 

415 

416@news_api_bp.route("/subscriptions/<subscription_id>", methods=["PUT"]) 

417@login_required 

418def update_subscription(subscription_id: str) -> Dict[str, Any]: 

419 """Update a subscription.""" 

420 try: 

421 data = request.get_json(force=True) 

422 except Exception: 

423 return jsonify({"error": "Invalid JSON data"}), 400 

424 

425 try: 

426 if not data: 

427 return jsonify({"error": "No JSON data provided"}), 400 

428 

429 # Prepare update data 

430 update_data = {} 

431 

432 # Map fields from request to storage format 

433 field_mapping = { 

434 "query": "query_or_topic", 

435 "name": "name", 

436 "refresh_minutes": "refresh_interval_minutes", 

437 "is_active": "is_active", 

438 "folder_id": "folder_id", 

439 "model_provider": "model_provider", 

440 "model": "model", 

441 "search_strategy": "search_strategy", 

442 "custom_endpoint": "custom_endpoint", 

443 "search_engine": "search_engine", 

444 "search_iterations": "search_iterations", 

445 "questions_per_iteration": "questions_per_iteration", 

446 } 

447 

448 for request_field, storage_field in field_mapping.items(): 

449 if request_field in data: 

450 update_data[storage_field] = data[request_field] 

451 

452 # Update subscription 

453 result = api.update_subscription(subscription_id, update_data) 

454 

455 if "error" in result: 

456 # Sanitize error message before returning to client 

457 original_error = result["error"] 

458 result["error"] = safe_error_message( 

459 Exception(original_error), "updating subscription" 

460 ) 

461 if "not found" in original_error.lower(): 

462 return jsonify(result), 404 

463 else: 

464 return jsonify(result), 400 

465 

466 return jsonify(result) 

467 

468 except Exception as e: 

469 return jsonify( 

470 {"error": safe_error_message(e, "updating subscription")} 

471 ), 500 

472 

473 

474@news_api_bp.route("/subscriptions/<subscription_id>", methods=["DELETE"]) 

475@login_required 

476def delete_subscription(subscription_id: str) -> Dict[str, Any]: 

477 """Delete a subscription.""" 

478 try: 

479 # Call the direct API function 

480 success = api.delete_subscription(subscription_id) 

481 

482 if success: 

483 return jsonify( 

484 { 

485 "status": "success", 

486 "message": f"Subscription {subscription_id} deleted", 

487 } 

488 ) 

489 else: 

490 return jsonify({"error": "Subscription not found"}), 404 

491 

492 except Exception as e: 

493 return jsonify( 

494 {"error": safe_error_message(e, "deleting subscription")} 

495 ), 500 

496 

497 

498@news_api_bp.route("/subscriptions/<subscription_id>/run", methods=["POST"]) 

499@login_required 

500def run_subscription_now(subscription_id: str) -> Dict[str, Any]: 

501 """Manually trigger a subscription to run now.""" 

502 try: 

503 # Get the subscription from the API 

504 subscription_data = api.get_subscriptions("anonymous") 

505 

506 # Find the specific subscription 

507 subscription = None 

508 for sub in subscription_data.get("subscriptions", []): 

509 if sub["id"] == subscription_id: 

510 subscription = sub 

511 break 

512 

513 if not subscription: 

514 return jsonify({"error": "Subscription not found"}), 404 

515 

516 # Get timezone-aware current date using settings 

517 from flask import session 

518 from .core.utils import get_local_date_string 

519 from ..database.session_context import get_user_db_session 

520 from ..settings.manager import SettingsManager 

521 

522 username = session.get("username", "anonymous") 

523 with get_user_db_session(username) as db: 

524 settings_manager = SettingsManager(db) 

525 current_date = get_local_date_string(settings_manager) 

526 

527 # Get the query and update dates 

528 query = subscription["query"] 

529 

530 # Replace YYYY-MM-DD placeholder ONLY (not all dates) 

531 query = query.replace("YYYY-MM-DD", current_date) 

532 

533 # Build request data similar to news page 

534 request_data = { 

535 "query": query, 

536 "mode": "quick", 

537 # Use subscription's model configuration if available 

538 "model_provider": subscription.get( 

539 "model_provider", "OLLAMA" 

540 ), # Default: llm.provider 

541 "model": subscription.get("model", "llama3"), # Default: llm.model 

542 "strategy": subscription.get("search_strategy", "news_aggregation"), 

543 "metadata": { 

544 "is_news_search": True, 

545 "search_type": "news_analysis", 

546 "display_in": "news_feed", 

547 "subscription_id": subscription_id, 

548 "triggered_by": "manual", 

549 "original_query": subscription[ 

550 "query" 

551 ], # Store original query with placeholder 

552 "processed_query": query, # Store processed query with replaced date 

553 "news_date": current_date, # Store the actual date used 

554 "title": subscription.get("name") 

555 if subscription.get("name") 

556 else None, 

557 }, 

558 } 

559 

560 # Add custom endpoint if specified 

561 if subscription.get("custom_endpoint"): 

562 request_data["custom_endpoint"] = subscription["custom_endpoint"] 

563 

564 # Call the main research API endpoint (use the one from research blueprint) 

565 # Use request.host_url to get the actual URL the server is responding on 

566 base_url = request.host_url.rstrip("/") 

567 

568 response = safe_post( 

569 f"{base_url}/research/api/start_research", 

570 json=request_data, 

571 headers={"Content-Type": "application/json"}, 

572 allow_localhost=True, 

573 allow_private_ips=True, 

574 ) 

575 

576 if response.ok: 

577 data = response.json() 

578 if data.get("status") == "success": 

579 return jsonify( 

580 { 

581 "status": "success", 

582 "message": "Research started", 

583 "research_id": data.get("research_id"), 

584 "url": f"/progress/{data.get('research_id')}", 

585 } 

586 ) 

587 else: 

588 return jsonify( 

589 {"error": data.get("message", "Failed to start research")} 

590 ), 500 

591 else: 

592 return jsonify( 

593 {"error": f"Failed to start research: {response.status_code}"} 

594 ), response.status_code 

595 

596 except Exception as e: 

597 return jsonify( 

598 {"error": safe_error_message(e, "running subscription")} 

599 ), 500 

600 

601 

602@news_api_bp.route("/subscriptions/<subscription_id>/history", methods=["GET"]) 

603@login_required 

604def get_subscription_history(subscription_id: str) -> Dict[str, Any]: 

605 """Get research history for a subscription.""" 

606 try: 

607 settings_manager = get_settings_manager() 

608 default_limit = settings_manager.get_setting("news.feed.default_limit") 

609 limit = int(request.args.get("limit", default_limit)) 

610 result = api.get_subscription_history(subscription_id, limit) 

611 if "error" in result: 

612 logger.error( 

613 f"Error getting subscription history: {result['error']}" 

614 ) 

615 return jsonify( 

616 { 

617 "error": "Failed to retrieve subscription history", 

618 "history": [], 

619 } 

620 ), 500 

621 return jsonify(result) 

622 except Exception as e: 

623 return jsonify( 

624 {"error": safe_error_message(e, "getting subscription history")} 

625 ), 500 

626 

627 

628@news_api_bp.route("/preferences", methods=["POST"]) 

629@login_required 

630def save_preferences() -> Dict[str, Any]: 

631 """Save user preferences for news.""" 

632 try: 

633 data = request.get_json() 

634 if not data: 

635 return jsonify({"error": "No JSON data provided"}), 400 

636 

637 # Get current user 

638 user_id = get_user_id() 

639 preferences = data.get("preferences", {}) 

640 

641 # Call the direct API function 

642 result = api.save_news_preferences(user_id, preferences) 

643 

644 return jsonify(result) 

645 

646 except Exception as e: 

647 return jsonify( 

648 {"error": safe_error_message(e, "saving preferences")} 

649 ), 500 

650 

651 

652@news_api_bp.route("/categories", methods=["GET"]) 

653def get_categories() -> Dict[str, Any]: 

654 """Get news category distribution.""" 

655 try: 

656 # Call the direct API function 

657 result = api.get_news_categories() 

658 

659 return jsonify(result) 

660 

661 except Exception as e: 

662 return jsonify( 

663 {"error": safe_error_message(e, "getting categories")} 

664 ), 500 

665 

666 

667@news_api_bp.route("/scheduler/status", methods=["GET"]) 

668def get_scheduler_status() -> Dict[str, Any]: 

669 """Get activity-based scheduler status.""" 

670 try: 

671 logger.info("Scheduler status endpoint called") 

672 from .subscription_manager.scheduler import get_news_scheduler 

673 

674 # Get scheduler instance 

675 scheduler = get_news_scheduler() 

676 logger.info( 

677 f"Scheduler instance obtained: is_running={scheduler.is_running}" 

678 ) 

679 

680 # Build status manually to avoid potential deadlock 

681 status = { 

682 "scheduler_available": True, # APScheduler is installed and working 

683 "is_running": scheduler.is_running, 

684 "config": scheduler.config.copy() 

685 if hasattr(scheduler, "config") 

686 else {}, 

687 "active_users": len(scheduler.user_sessions) 

688 if hasattr(scheduler, "user_sessions") 

689 else 0, 

690 "total_scheduled_jobs": 0, 

691 } 

692 

693 # Count scheduled jobs 

694 if hasattr(scheduler, "user_sessions"): 

695 total_jobs = sum( 

696 len(session.get("scheduled_jobs", set())) 

697 for session in scheduler.user_sessions.values() 

698 ) 

699 status["total_scheduled_jobs"] = total_jobs 

700 

701 # Also count actual APScheduler jobs 

702 if hasattr(scheduler, "scheduler") and scheduler.scheduler: 

703 try: 

704 apscheduler_jobs = scheduler.scheduler.get_jobs() 

705 status["apscheduler_job_count"] = len(apscheduler_jobs) 

706 status["apscheduler_jobs"] = [ 

707 { 

708 "id": job.id, 

709 "name": job.name, 

710 "next_run": job.next_run_time.isoformat() 

711 if job.next_run_time 

712 else None, 

713 } 

714 for job in apscheduler_jobs[ 

715 :10 

716 ] # Limit to first 10 for display 

717 ] 

718 except Exception: 

719 logger.exception("Error getting APScheduler jobs") 

720 status["apscheduler_job_count"] = 0 

721 

722 logger.info(f"Status built: {list(status.keys())}") 

723 

724 # Add scheduled_jobs field that JS expects 

725 status["scheduled_jobs"] = status.get("total_scheduled_jobs", 0) 

726 

727 logger.info( 

728 f"Returning status: is_running={status.get('is_running')}, active_users={status.get('active_users')}" 

729 ) 

730 return jsonify(status) 

731 

732 except Exception as e: 

733 return jsonify( 

734 {"error": safe_error_message(e, "getting scheduler status")} 

735 ), 500 

736 

737 

738@news_api_bp.route("/scheduler/start", methods=["POST"]) 

739@login_required 

740def start_scheduler() -> Dict[str, Any]: 

741 """Start the subscription scheduler.""" 

742 try: 

743 from flask import current_app 

744 from .subscription_manager.scheduler import get_news_scheduler 

745 

746 # Get scheduler instance 

747 scheduler = get_news_scheduler() 

748 

749 if scheduler.is_running: 

750 return jsonify({"message": "Scheduler is already running"}), 200 

751 

752 # Start the scheduler 

753 scheduler.start() 

754 

755 # Update app reference 

756 current_app.news_scheduler = scheduler 

757 

758 logger.info("News scheduler started via API") 

759 return jsonify( 

760 { 

761 "status": "success", 

762 "message": "Scheduler started", 

763 "active_users": len(scheduler.user_sessions), 

764 } 

765 ) 

766 

767 except Exception as e: 

768 return jsonify( 

769 {"error": safe_error_message(e, "starting scheduler")} 

770 ), 500 

771 

772 

773@news_api_bp.route("/scheduler/stop", methods=["POST"]) 

774def stop_scheduler() -> Dict[str, Any]: 

775 """Stop the subscription scheduler.""" 

776 try: 

777 from flask import current_app 

778 

779 if ( 

780 hasattr(current_app, "news_scheduler") 

781 and current_app.news_scheduler 

782 ): 

783 scheduler = current_app.news_scheduler 

784 if scheduler.is_running: 

785 scheduler.stop() 

786 logger.info("News scheduler stopped via API") 

787 return jsonify( 

788 {"status": "success", "message": "Scheduler stopped"} 

789 ) 

790 else: 

791 return jsonify({"message": "Scheduler is not running"}), 200 

792 else: 

793 return jsonify({"message": "No scheduler instance found"}), 404 

794 

795 except Exception as e: 

796 return jsonify( 

797 {"error": safe_error_message(e, "stopping scheduler")} 

798 ), 500 

799 

800 

801@news_api_bp.route("/scheduler/check-now", methods=["POST"]) 

802def check_subscriptions_now() -> Dict[str, Any]: 

803 """Manually trigger subscription checking.""" 

804 try: 

805 from flask import current_app 

806 

807 if ( 

808 not hasattr(current_app, "news_scheduler") 

809 or not current_app.news_scheduler 

810 ): 

811 return jsonify({"error": "Scheduler not initialized"}), 503 

812 

813 scheduler = current_app.news_scheduler 

814 if not scheduler.is_running: 

815 return jsonify({"error": "Scheduler is not running"}), 503 

816 

817 # Run the check subscriptions task immediately 

818 scheduler_instance = current_app.news_scheduler 

819 

820 # Get count of due subscriptions 

821 from ..database.models import NewsSubscription as BaseSubscription 

822 from datetime import datetime, timezone 

823 

824 with get_user_db_session() as session: 

825 now = datetime.now(timezone.utc) 

826 count = ( 

827 session.query(BaseSubscription) 

828 .filter( 

829 BaseSubscription.status == "active", 

830 (BaseSubscription.next_refresh.is_(None)) 

831 | (BaseSubscription.next_refresh <= now), 

832 ) 

833 .count() 

834 ) 

835 

836 # Trigger the check asynchronously 

837 import threading 

838 

839 check_thread = threading.Thread( 

840 target=scheduler_instance._check_subscriptions 

841 ) 

842 check_thread.daemon = True 

843 check_thread.start() 

844 

845 return jsonify( 

846 { 

847 "status": "success", 

848 "message": f"Checking {count} due subscriptions", 

849 "count": count, 

850 } 

851 ) 

852 

853 except Exception as e: 

854 return jsonify( 

855 {"error": safe_error_message(e, "checking subscriptions")} 

856 ), 500 

857 

858 

859@news_api_bp.route("/scheduler/cleanup-now", methods=["POST"]) 

860@login_required 

861def trigger_cleanup() -> Dict[str, Any]: 

862 """Manually trigger cleanup job.""" 

863 try: 

864 from .subscription_manager.scheduler import get_news_scheduler 

865 from datetime import datetime, UTC, timedelta 

866 

867 scheduler = get_news_scheduler() 

868 

869 if not scheduler.is_running: 

870 return jsonify({"error": "Scheduler is not running"}), 400 

871 

872 # Schedule cleanup to run in 1 second 

873 scheduler.scheduler.add_job( 

874 scheduler._run_cleanup_with_tracking, 

875 "date", 

876 run_date=datetime.now(UTC) + timedelta(seconds=1), 

877 id="manual_cleanup_trigger", 

878 ) 

879 

880 return jsonify( 

881 { 

882 "status": "triggered", 

883 "message": "Cleanup job will run within seconds", 

884 } 

885 ) 

886 

887 except Exception as e: 

888 return jsonify( 

889 {"error": safe_error_message(e, "triggering cleanup")} 

890 ), 500 

891 

892 

893@news_api_bp.route("/scheduler/users", methods=["GET"]) 

894@login_required 

895def get_active_users() -> Dict[str, Any]: 

896 """Get summary of active user sessions.""" 

897 try: 

898 from .subscription_manager.scheduler import get_news_scheduler 

899 

900 scheduler = get_news_scheduler() 

901 users_summary = scheduler.get_user_sessions_summary() 

902 

903 return jsonify( 

904 {"active_users": len(users_summary), "users": users_summary} 

905 ) 

906 

907 except Exception as e: 

908 return jsonify( 

909 {"error": safe_error_message(e, "getting active users")} 

910 ), 500 

911 

912 

913@news_api_bp.route("/scheduler/stats", methods=["GET"]) 

914@login_required 

915def scheduler_stats() -> Dict[str, Any]: 

916 """Get scheduler statistics and state.""" 

917 try: 

918 from .subscription_manager.scheduler import get_news_scheduler 

919 from flask import session 

920 

921 scheduler = get_news_scheduler() 

922 username = session.get("username") 

923 

924 # Debug info 

925 debug_info = { 

926 "current_user": username, 

927 "scheduler_running": scheduler.is_running, 

928 "user_sessions": {}, 

929 "apscheduler_jobs": [], 

930 } 

931 

932 # Get user session info 

933 if hasattr(scheduler, "user_sessions"): 

934 for user, session_info in scheduler.user_sessions.items(): 

935 debug_info["user_sessions"][user] = { 

936 "has_password": bool(session_info.get("password")), 

937 "last_activity": session_info.get( 

938 "last_activity" 

939 ).isoformat() 

940 if session_info.get("last_activity") 

941 else None, 

942 "scheduled_jobs_count": len( 

943 session_info.get("scheduled_jobs", set()) 

944 ), 

945 } 

946 

947 # Get APScheduler jobs 

948 if hasattr(scheduler, "scheduler") and scheduler.scheduler: 

949 jobs = scheduler.scheduler.get_jobs() 

950 debug_info["apscheduler_jobs"] = [ 

951 { 

952 "id": job.id, 

953 "name": job.name, 

954 "next_run": job.next_run_time.isoformat() 

955 if job.next_run_time 

956 else None, 

957 "trigger": str(job.trigger), 

958 } 

959 for job in jobs 

960 ] 

961 

962 # Force schedule for current user 

963 if username and username in scheduler.user_sessions: 

964 logger.info(f"Forcing schedule update for {username}") 

965 scheduler._schedule_user_subscriptions(username) 

966 debug_info["forced_schedule"] = True 

967 

968 return jsonify(debug_info) 

969 

970 except Exception as e: 

971 return jsonify( 

972 {"error": safe_error_message(e, "getting scheduler stats")} 

973 ), 500 

974 

975 

976@news_api_bp.route("/check-overdue", methods=["POST"]) 

977@login_required 

978def check_overdue_subscriptions(): 

979 """Check and run all overdue subscriptions for the current user.""" 

980 try: 

981 from flask import session 

982 from ..database.session_context import get_user_db_session 

983 from ..database.models.news import NewsSubscription 

984 from datetime import datetime, UTC, timedelta 

985 

986 username = session.get("username", "anonymous") 

987 

988 # Get overdue subscriptions 

989 overdue_count = 0 

990 results = [] 

991 with get_user_db_session(username) as db: 

992 now = datetime.now(UTC) 

993 overdue_subs = ( 

994 db.query(NewsSubscription) 

995 .filter( 

996 NewsSubscription.status == "active", 

997 NewsSubscription.next_refresh <= now, 

998 ) 

999 .all() 

1000 ) 

1001 

1002 logger.info( 

1003 f"Found {len(overdue_subs)} overdue subscriptions for {username}" 

1004 ) 

1005 

1006 # Get timezone-aware current date using settings 

1007 from .core.utils import get_local_date_string 

1008 from ..settings.manager import SettingsManager 

1009 

1010 settings_manager = SettingsManager(db) 

1011 current_date = get_local_date_string(settings_manager) 

1012 

1013 for sub in overdue_subs: 

1014 try: 

1015 # Run the subscription using the same pattern as run_subscription_now 

1016 logger.info( 

1017 f"Running overdue subscription: {sub.name or sub.query_or_topic[:30]}" 

1018 ) 

1019 

1020 # Update any date placeholders with current date in user's timezone 

1021 query = sub.query_or_topic.replace( 

1022 "YYYY-MM-DD", current_date 

1023 ) 

1024 

1025 # Build request data 

1026 request_data = { 

1027 "query": query, 

1028 "mode": "quick", 

1029 "model_provider": sub.model_provider or "OLLAMA", 

1030 "model": sub.model or "llama3", 

1031 "strategy": sub.search_strategy or "news_aggregation", 

1032 "metadata": { 

1033 "is_news_search": True, 

1034 "search_type": "news_analysis", 

1035 "display_in": "news_feed", 

1036 "subscription_id": str(sub.id), 

1037 "triggered_by": "overdue_check", 

1038 "original_query": sub.query_or_topic, 

1039 "processed_query": query, 

1040 "news_date": current_date, 

1041 "title": sub.name if sub.name else None, 

1042 }, 

1043 } 

1044 

1045 # Add optional search parameters 

1046 if sub.search_engine: 

1047 request_data["search_engine"] = sub.search_engine 

1048 if sub.custom_endpoint: 

1049 request_data["custom_endpoint"] = sub.custom_endpoint 

1050 

1051 # Start research using HTTP request like run_subscription_now 

1052 logger.info( 

1053 f"Running overdue subscription: {sub.name or sub.query_or_topic[:30]}" 

1054 ) 

1055 

1056 # Make HTTP request to research API 

1057 from flask import request 

1058 

1059 # Use request.host_url to get the actual URL the server is responding on 

1060 base_url = request.host_url.rstrip("/") 

1061 

1062 # Use the session from the current request to maintain authentication 

1063 session_cookie = request.cookies.get("session") 

1064 

1065 response = safe_post( 

1066 f"{base_url}/research/api/start_research", 

1067 json=request_data, 

1068 headers={ 

1069 "Content-Type": "application/json", 

1070 "Cookie": f"session={session_cookie}" 

1071 if session_cookie 

1072 else "", 

1073 }, 

1074 timeout=30, 

1075 allow_localhost=True, 

1076 allow_private_ips=True, 

1077 ) 

1078 

1079 if response.ok: 

1080 result = response.json() 

1081 else: 

1082 result = { 

1083 "status": "error", 

1084 "error": f"HTTP {response.status_code}: {response.text}", 

1085 } 

1086 

1087 if result.get("status") == "success": 

1088 overdue_count += 1 

1089 

1090 # Update subscription's last/next refresh times 

1091 sub.last_refresh = datetime.now(UTC) 

1092 sub.next_refresh = datetime.now(UTC) + timedelta( 

1093 minutes=sub.refresh_interval_minutes 

1094 ) 

1095 db.commit() 

1096 

1097 results.append( 

1098 { 

1099 "id": str(sub.id), 

1100 "name": sub.name or sub.query_or_topic[:50], 

1101 "research_id": result.get("research_id"), 

1102 } 

1103 ) 

1104 else: 

1105 results.append( 

1106 { 

1107 "id": str(sub.id), 

1108 "name": sub.name or sub.query_or_topic[:50], 

1109 "error": result.get( 

1110 "error", "Failed to start research" 

1111 ), 

1112 } 

1113 ) 

1114 except Exception as e: 

1115 logger.exception( 

1116 f"Error running subscription {sub.id}: {e}" 

1117 ) 

1118 results.append( 

1119 { 

1120 "id": str(sub.id), 

1121 "name": sub.name or sub.query_or_topic[:50], 

1122 "error": safe_error_message( 

1123 e, "running subscription" 

1124 ), 

1125 } 

1126 ) 

1127 

1128 return jsonify( 

1129 { 

1130 "status": "success", 

1131 "overdue_found": len(overdue_subs), 

1132 "started": overdue_count, 

1133 "results": results, 

1134 } 

1135 ) 

1136 

1137 except Exception as e: 

1138 return jsonify( 

1139 {"error": safe_error_message(e, "checking overdue subscriptions")} 

1140 ), 500 

1141 

1142 

1143# Folder and subscription management routes 

1144@news_api_bp.route("/subscription/folders", methods=["GET"]) 

1145@login_required 

1146def get_folders(): 

1147 """Get all folders for the current user""" 

1148 try: 

1149 user_id = get_user_id() 

1150 

1151 with get_user_db_session() as session: 

1152 manager = FolderManager(session) 

1153 folders = manager.get_user_folders(user_id) 

1154 

1155 return jsonify([folder.to_dict() for folder in folders]) 

1156 

1157 except Exception as e: 

1158 return jsonify({"error": safe_error_message(e, "getting folders")}), 500 

1159 

1160 

1161@news_api_bp.route("/subscription/folders", methods=["POST"]) 

1162@login_required 

1163def create_folder(): 

1164 """Create a new folder""" 

1165 try: 

1166 data = request.json 

1167 

1168 if not data.get("name"): 

1169 return jsonify({"error": "Folder name is required"}), 400 

1170 

1171 with get_user_db_session() as session: 

1172 manager = FolderManager(session) 

1173 

1174 # Check if folder already exists 

1175 existing = ( 

1176 session.query(SubscriptionFolder) 

1177 .filter_by(name=data["name"]) 

1178 .first() 

1179 ) 

1180 if existing: 

1181 return jsonify({"error": "Folder already exists"}), 409 

1182 

1183 folder = manager.create_folder( 

1184 name=data["name"], 

1185 description=data.get("description"), 

1186 ) 

1187 

1188 return jsonify(folder.to_dict()), 201 

1189 

1190 except Exception as e: 

1191 return jsonify({"error": safe_error_message(e, "creating folder")}), 500 

1192 

1193 

1194@news_api_bp.route("/subscription/folders/<folder_id>", methods=["PUT"]) 

1195@login_required 

1196def update_folder(folder_id): 

1197 """Update a folder""" 

1198 try: 

1199 data = request.json 

1200 

1201 with get_user_db_session() as session: 

1202 manager = FolderManager(session) 

1203 folder = manager.update_folder(folder_id, **data) 

1204 

1205 if not folder: 

1206 return jsonify({"error": "Folder not found"}), 404 

1207 

1208 return jsonify(folder.to_dict()) 

1209 

1210 except Exception as e: 

1211 return jsonify({"error": safe_error_message(e, "updating folder")}), 500 

1212 

1213 

1214@news_api_bp.route("/subscription/folders/<folder_id>", methods=["DELETE"]) 

1215@login_required 

1216def delete_folder(folder_id): 

1217 """Delete a folder""" 

1218 try: 

1219 move_to = request.args.get("move_to") 

1220 

1221 with get_user_db_session() as session: 

1222 manager = FolderManager(session) 

1223 success = manager.delete_folder(folder_id, move_to) 

1224 

1225 if not success: 

1226 return jsonify({"error": "Folder not found"}), 404 

1227 

1228 return jsonify({"status": "deleted"}), 200 

1229 

1230 except Exception as e: 

1231 return jsonify({"error": safe_error_message(e, "deleting folder")}), 500 

1232 

1233 

1234@news_api_bp.route("/subscription/subscriptions/organized", methods=["GET"]) 

1235@login_required 

1236def get_subscriptions_organized(): 

1237 """Get subscriptions organized by folder""" 

1238 try: 

1239 user_id = get_user_id() 

1240 

1241 with get_user_db_session() as session: 

1242 manager = FolderManager(session) 

1243 organized = manager.get_subscriptions_by_folder(user_id) 

1244 

1245 # Convert to JSON-friendly format 

1246 result = {} 

1247 for folder, subs in organized.items(): 

1248 result[folder] = [sub.to_dict() for sub in subs] 

1249 

1250 return jsonify(result) 

1251 

1252 except Exception as e: 

1253 return jsonify( 

1254 {"error": safe_error_message(e, "getting organized subscriptions")} 

1255 ), 500 

1256 

1257 

1258@news_api_bp.route( 

1259 "/subscription/subscriptions/<subscription_id>", methods=["PUT"] 

1260) 

1261@login_required 

1262def update_subscription_folder(subscription_id): 

1263 """Update a subscription (mainly for folder assignment)""" 

1264 try: 

1265 data = request.json 

1266 logger.info( 

1267 f"Updating subscription {subscription_id} with data: {data}" 

1268 ) 

1269 

1270 with get_user_db_session() as session: 

1271 # Manually handle the update to ensure next_refresh is recalculated 

1272 from ...database.models import NewsSubscription as BaseSubscription 

1273 from datetime import datetime, timedelta, timezone 

1274 

1275 sub = ( 

1276 session.query(BaseSubscription) 

1277 .filter_by(id=subscription_id) 

1278 .first() 

1279 ) 

1280 if not sub: 

1281 return jsonify({"error": "Subscription not found"}), 404 

1282 

1283 # Update fields 

1284 for key, value in data.items(): 

1285 if hasattr(sub, key) and key not in [ 

1286 "id", 

1287 "user_id", 

1288 "created_at", 

1289 ]: 

1290 setattr(sub, key, value) 

1291 

1292 # Recalculate next_refresh if refresh_interval_minutes changed 

1293 if "refresh_interval_minutes" in data: 

1294 new_minutes = data["refresh_interval_minutes"] 

1295 if sub.last_refresh: 

1296 sub.next_refresh = sub.last_refresh + timedelta( 

1297 minutes=new_minutes 

1298 ) 

1299 else: 

1300 sub.next_refresh = datetime.now(timezone.utc) + timedelta( 

1301 minutes=new_minutes 

1302 ) 

1303 logger.info(f"Recalculated next_refresh: {sub.next_refresh}") 

1304 

1305 sub.updated_at = datetime.now(timezone.utc) 

1306 session.commit() 

1307 

1308 result = sub.to_dict() 

1309 logger.info( 

1310 f"Updated subscription result: refresh_interval_minutes={result.get('refresh_interval_minutes')}, next_refresh={result.get('next_refresh')}" 

1311 ) 

1312 return jsonify(result) 

1313 # Force reload: v2 

1314 

1315 except Exception as e: 

1316 return jsonify( 

1317 {"error": safe_error_message(e, "updating subscription")} 

1318 ), 500 

1319 

1320 

1321@news_api_bp.route("/subscription/stats", methods=["GET"]) 

1322@login_required 

1323def get_subscription_stats(): 

1324 """Get subscription statistics""" 

1325 try: 

1326 user_id = get_user_id() 

1327 

1328 with get_user_db_session() as session: 

1329 manager = FolderManager(session) 

1330 stats = manager.get_subscription_stats(user_id) 

1331 

1332 return jsonify(stats) 

1333 

1334 except Exception as e: 

1335 return jsonify({"error": safe_error_message(e, "getting stats")}), 500 

1336 

1337 

1338# Error handlers 

1339@news_api_bp.errorhandler(400) 

1340def bad_request(e): 

1341 return jsonify({"error": "Bad request"}), 400 

1342 

1343 

1344@news_api_bp.errorhandler(404) 

1345def not_found(e): 

1346 return jsonify({"error": "Resource not found"}), 404 

1347 

1348 

1349@news_api_bp.errorhandler(500) 

1350def internal_error(e): 

1351 return jsonify({"error": "Internal server error"}), 500 

1352 

1353 

1354@news_api_bp.route("/search-history", methods=["GET"]) 

1355@login_required 

1356def get_search_history(): 

1357 """Get search history for current user.""" 

1358 try: 

1359 # Get username from session 

1360 from ..web.auth.decorators import current_user 

1361 

1362 username = current_user() 

1363 if not username: 

1364 # Not authenticated, return empty history 

1365 return jsonify({"search_history": []}) 

1366 

1367 # Get search history from user's encrypted database 

1368 from ..database.session_context import get_user_db_session 

1369 from ..database.models import UserNewsSearchHistory 

1370 

1371 # Get password from Flask g object (set by middleware) 

1372 from flask import g 

1373 

1374 password = getattr(g, "user_password", None) 

1375 

1376 with get_user_db_session(username, password) as db_session: 

1377 history = ( 

1378 db_session.query(UserNewsSearchHistory) 

1379 .order_by(UserNewsSearchHistory.created_at.desc()) 

1380 .limit(20) 

1381 .all() 

1382 ) 

1383 

1384 return jsonify( 

1385 {"search_history": [item.to_dict() for item in history]} 

1386 ) 

1387 

1388 except Exception as e: 

1389 return jsonify( 

1390 {"error": safe_error_message(e, "getting search history")} 

1391 ), 500 

1392 

1393 

1394@news_api_bp.route("/search-history", methods=["POST"]) 

1395@login_required 

1396def add_search_history(): 

1397 """Add a search to the history.""" 

1398 try: 

1399 # Get username from session 

1400 from ..web.auth.decorators import current_user 

1401 

1402 username = current_user() 

1403 if not username: 

1404 # Not authenticated 

1405 return jsonify({"error": "Authentication required"}), 401 

1406 

1407 data = request.get_json() 

1408 logger.info(f"add_search_history received data: {data}") 

1409 if not data or not data.get("query"): 

1410 logger.warning(f"Invalid search history data: {data}") 

1411 return jsonify({"error": "query is required"}), 400 

1412 

1413 # Add to user's encrypted database 

1414 from ..database.session_context import get_user_db_session 

1415 from ..database.models import UserNewsSearchHistory 

1416 

1417 # Get password from Flask g object (set by middleware) 

1418 from flask import g 

1419 

1420 password = getattr(g, "user_password", None) 

1421 

1422 with get_user_db_session(username, password) as db_session: 

1423 search_history = UserNewsSearchHistory( 

1424 query=data["query"], 

1425 search_type=data.get("type", "filter"), 

1426 result_count=data.get("resultCount", 0), 

1427 ) 

1428 db_session.add(search_history) 

1429 db_session.commit() 

1430 

1431 return jsonify({"status": "success", "id": search_history.id}) 

1432 

1433 except Exception as e: 

1434 logger.exception("Error adding search history") 

1435 return jsonify( 

1436 {"error": safe_error_message(e, "adding search history")} 

1437 ), 500 

1438 

1439 

1440@news_api_bp.route("/search-history", methods=["DELETE"]) 

1441@login_required 

1442def clear_search_history(): 

1443 """Clear all search history for current user.""" 

1444 try: 

1445 # Get username from session 

1446 from ..web.auth.decorators import current_user 

1447 

1448 username = current_user() 

1449 if not username: 

1450 return jsonify({"status": "success"}) 

1451 

1452 # Clear from user's encrypted database 

1453 from ..database.session_context import get_user_db_session 

1454 from ..database.models import UserNewsSearchHistory 

1455 

1456 # Get password from Flask g object (set by middleware) 

1457 from flask import g 

1458 

1459 password = getattr(g, "user_password", None) 

1460 

1461 with get_user_db_session(username, password) as db_session: 

1462 db_session.query(UserNewsSearchHistory).delete() 

1463 db_session.commit() 

1464 

1465 return jsonify({"status": "success"}) 

1466 

1467 except Exception as e: 

1468 return jsonify( 

1469 {"error": safe_error_message(e, "clearing search history")} 

1470 ), 500 

1471 

1472 

1473@news_api_bp.route("/debug", methods=["GET"]) 

1474def debug_database(): 

1475 """Debug endpoint to check database content.""" 

1476 try: 

1477 user_id = get_user_id() 

1478 result = api.debug_research_items(user_id) 

1479 if "error" in result: 

1480 logger.error( 

1481 f"Debug endpoint error for user {user_id}: {result['error']}" 

1482 ) 

1483 return jsonify({"error": "Internal server error"}), 500 

1484 return jsonify(result) 

1485 except Exception: 

1486 logger.exception("Exception in debug endpoint") 

1487 return jsonify({"error": "Internal server error"}), 500