Coverage for src / local_deep_research / news / api.py: 87%

404 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2Direct API functions for news system. 

3These functions can be called directly by scheduler or wrapped by Flask endpoints. 

4""" 

5 

6from typing import Dict, Any, Optional 

7from datetime import datetime, timezone, UTC 

8from loguru import logger 

9import re 

10import json 

11 

12from ..constants import ResearchStatus 

13from ..llm.providers.base import normalize_provider 

14from .recommender.topic_based import TopicBasedRecommender 

15from .exceptions import ( 

16 InvalidLimitException, 

17 SubscriptionNotFoundException, 

18 SubscriptionCreationException, 

19 SubscriptionUpdateException, 

20 SubscriptionDeletionException, 

21 DatabaseAccessException, 

22 NewsFeedGenerationException, 

23 NotImplementedException, 

24 NewsAPIException, 

25) 

26# Removed welcome feed import - no placeholders 

27# get_db_setting not available in merged codebase 

28 

29 

30# Global recommender instance (can be reused) 

31_recommender = None 

32 

33 

def get_recommender():
    """Return the shared recommender, lazily constructing it on first use."""
    global _recommender
    recommender = _recommender
    if recommender is None:
        recommender = TopicBasedRecommender()
        _recommender = recommender
    return recommender

40 

41 

def _notify_scheduler_about_subscription_change(
    action: str, user_id: Optional[str] = None
):
    """
    Tell the news scheduler that a subscription changed so it can reschedule.

    Args:
        action: The action performed (created, updated, deleted)
        user_id: Optional user_id to use as fallback for username
    """
    try:
        from flask import session as flask_session
        from .subscription_manager.scheduler import get_news_scheduler

        scheduler = get_news_scheduler()
        if not scheduler.is_running:
            # Nothing to reschedule when the scheduler isn't running.
            return

        # Prefer the session's username; fall back to the supplied user_id.
        username = flask_session.get("username")
        if not username and user_id:
            username = user_id

        # Look up the password from the session password store.
        from ..database.session_passwords import session_password_store

        session_id = flask_session.get("session_id")
        password = (
            session_password_store.get_session_password(username, session_id)
            if session_id and username
            else None
        )

        if username and password:
            # Reschedule this user's subscriptions.
            scheduler.update_user_info(username, password)
            logger.info(
                f"Scheduler notified about {action} subscription for {username}"
            )
        else:
            logger.warning(
                f"Could not notify scheduler - no password available{' for ' + username if username else ''}"
            )
    except Exception:
        # Best-effort notification: never let scheduler wiring break the API call.
        logger.exception(
            f"Could not notify scheduler about {action} subscription"
        )

87 

88 

def _parse_research_meta(row: Dict[str, Any]) -> Dict[str, Any]:
    """Parse a row's research_meta (dict or JSON string) into a dict; {} on failure."""
    raw = row.get("research_meta")
    if not raw:
        return {}
    if isinstance(raw, dict):
        return raw
    try:
        return json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        logger.exception("Error parsing metadata")
        return {}


def _extract_top_links(content: str) -> list:
    """Extract up to three source links ({url, title}) from report text.

    Scans the first 100 lines for "URL:" markers; the link title is taken
    from the preceding line (with leading citation markers like "[12, 26]"
    stripped) or, failing that, from the URL's domain. Best-effort: any
    error is logged and whatever was collected so far is returned.
    """
    links = []
    try:
        report_lines = content.split("\n")
        for i, line in enumerate(report_lines[:100]):
            if "URL:" not in line:
                continue
            url = line.split("URL:", 1)[1].strip()
            if not url.startswith("http"):
                continue

            # Title from the previous line, minus citation prefixes.
            title = ""
            if i > 0:
                title_line = report_lines[i - 1].strip()
                title = re.sub(r"^\[[^\]]+\]\s*", "", title_line).strip()
            if not title:
                # Fall back to the bare domain.
                domain = url.split("//")[-1].split("/")[0]
                title = domain.replace("www.", "")

            links.append(
                {
                    "url": url,
                    "title": title[:50] + "..." if len(title) > 50 else title,
                }
            )
            logger.debug(f"Found link: {title} - {url}")
            if len(links) >= 3:
                break
    except Exception:
        logger.exception("Error extracting links from database content")
    return links


def get_news_feed(
    user_id: str = "anonymous",
    limit: int = 20,
    use_cache: bool = True,
    focus: Optional[str] = None,
    search_strategy: Optional[str] = None,
    subscription_id: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Get personalized news feed by pulling news-flavoured research runs
    from the user's research history.

    Args:
        user_id: User identifier
        limit: Maximum number of cards to return (must be >= 1)
        use_cache: Whether to use cached news (currently unused)
        focus: Optional focus area for news (echoed back in the response)
        search_strategy: Override default recommendation strategy (echoed back)
        subscription_id: If given (and not "all"), restrict results to items
            whose research_meta references this subscription

    Returns:
        Dictionary with news items and metadata

    Raises:
        InvalidLimitException: If limit < 1.
        DatabaseAccessException: If the research history query fails.
        NewsFeedGenerationException: For any other unexpected error.
    """
    # Validate limit - allow any positive number
    if limit < 1:
        raise InvalidLimitException(limit)

    try:
        logger.info(
            f"get_news_feed called with user_id={user_id}, limit={limit}"
        )

        # News is always enabled for now - per-user settings will be handled later

        # Import database functions lazily (avoids import cycles at module load)
        from ..database.session_context import get_user_db_session
        from ..database.models import ResearchHistory
        from sqlalchemy import or_

        news_items = []
        remaining_limit = limit

        # Query research history from user's database for news items
        logger.info("Getting news items from research history")
        try:
            with get_user_db_session(user_id) as db_session:
                query = db_session.query(ResearchHistory).filter(
                    ResearchHistory.status == ResearchStatus.COMPLETED
                )

                # Filter by subscription if provided. Match BOTH the compact
                # ('"subscription_id":"x"') and json.dumps default
                # ('"subscription_id": "x"', note the space) serializations.
                # get_subscription_history documents that stored metadata uses
                # the spaced form, so matching only the compact form would
                # silently drop every row.
                if subscription_id and subscription_id != "all":
                    query = query.filter(
                        or_(
                            ResearchHistory.research_meta.like(
                                f'%"subscription_id":"{subscription_id}"%'
                            ),
                            ResearchHistory.research_meta.like(
                                f'%"subscription_id": "{subscription_id}"%'
                            ),
                        )
                    )

                # Newest first; over-fetch (2x) because rows are filtered
                # further below (not news, no content, no headline, ...)
                results = (
                    query.order_by(ResearchHistory.created_at.desc())
                    .limit(remaining_limit * 2)
                    .all()
                )

                # Convert ORM objects to dictionaries for compatibility
                results = [
                    {
                        "id": r.id,
                        "uuid_id": r.id,  # In ResearchHistory, id is the UUID
                        "query": r.query,
                        "title": r.title if hasattr(r, "title") else None,
                        "created_at": r.created_at if r.created_at else None,
                        "completed_at": r.completed_at
                        if r.completed_at
                        else None,
                        "duration_seconds": r.duration_seconds
                        if hasattr(r, "duration_seconds")
                        else None,
                        "report_path": r.report_path
                        if hasattr(r, "report_path")
                        else None,
                        "report_content": r.report_content
                        if hasattr(r, "report_content")
                        else None,  # Include database content
                        "research_meta": r.research_meta,
                        "status": r.status,
                    }
                    for r in results
                ]

                logger.info(f"Database returned {len(results)} research items")
                if results:
                    logger.info(f"First row keys: {list(results[0].keys())}")
                    # Log first few items' metadata
                    for i, row in enumerate(results[:3]):
                        logger.info(
                            f"Item {i}: query='{row['query'][:50]}...', has meta={bool(row.get('research_meta'))}"
                        )

                # Process results to find news items
                processed_count = 0
                error_count = 0

                for row in results:
                    try:
                        metadata = _parse_research_meta(row)

                        # News-ness heuristic: explicit metadata first, then
                        # keyword matching on the original query text.
                        has_news_metadata = (
                            metadata.get("generated_headline") is not None
                            or metadata.get("generated_topics") is not None
                        )

                        query_lower = row["query"].lower()
                        is_news_query = (
                            has_news_metadata
                            or metadata.get("is_news_search")
                            or metadata.get("search_type") == "news_analysis"
                            or "breaking news" in query_lower
                            or "news stories" in query_lower
                            or (
                                "today" in query_lower
                                and (
                                    "news" in query_lower
                                    or "breaking" in query_lower
                                )
                            )
                            or "latest news" in query_lower
                        )

                        # Log the decision for first few items
                        if processed_count < 3 or error_count < 3:
                            logger.info(
                                f"Item check: query='{row['query'][:30]}...', is_news_search={metadata.get('is_news_search')}, "
                                f"has_news_metadata={has_news_metadata}, is_news_query={is_news_query}"
                            )

                        # Only show items that have news metadata or are news queries
                        if not is_news_query:
                            continue

                        processed_count += 1
                        logger.info(
                            f"Processing research item #{processed_count}: {row['query'][:50]}..."
                        )

                        # Always use database content
                        findings = ""
                        summary = ""
                        content = row.get("report_content")
                        if content:
                            logger.debug(
                                f"Using database content for research {row['id']}"
                            )
                            # Full report text becomes the findings; summary is
                            # the first non-empty, non-heading line.
                            findings = content
                            for line in content.split("\n"):
                                if line.strip() and not line.startswith("#"):
                                    summary = line.strip()
                                    break
                        else:
                            logger.debug(
                                f"No database content for research {row['id']}"
                            )

                        # Keep original query for reference in the card
                        original_query = row["query"]

                        # Check for headline - first try database title, then metadata
                        headline = row.get("title") or metadata.get(
                            "generated_headline"
                        )

                        # For subscription results, generate headline from query if needed
                        if not headline and metadata.get("is_news_search"):
                            subscription_name = metadata.get(
                                "subscription_name"
                            )
                            if subscription_name:
                                headline = (
                                    f"News Update: {subscription_name}"
                                )
                            else:
                                # Generate headline from query
                                headline = f"News: {row['query'][:60]}..."

                        # Skip items without meaningful headlines
                        if (
                            not headline
                            or headline == "[No headline available]"
                        ):
                            logger.debug(
                                f"Skipping item without headline: {row['id']}"
                            )
                            continue

                        # Skip items that are still in progress or suspended
                        if row["status"] in (
                            ResearchStatus.IN_PROGRESS,
                            ResearchStatus.SUSPENDED,
                        ):
                            logger.debug(
                                f"Skipping incomplete item: {row['id']} (status: {row['status']})"
                            )
                            continue

                        # Skip items without content
                        if not content:
                            logger.debug(
                                f"Skipping item without content: {row['id']}"
                            )
                            continue

                        # Use ID properly, preferring uuid_id
                        research_id = row.get("uuid_id") or str(row["id"])

                        # Use stored category and topics - explicit placeholders
                        # rather than silent defaults
                        category = metadata.get("category") or "[Uncategorized]"
                        topics = metadata.get("generated_topics") or [
                            "[No topics]"
                        ]

                        # Extract top 3 links from the database content
                        links = _extract_top_links(content)

                        # Create news item from research
                        news_item = {
                            "id": f"news-{research_id}",
                            "headline": headline,
                            "category": category,
                            "summary": summary
                            or f"Research analysis for: {headline[:100]}",
                            "findings": findings,
                            "impact_score": metadata.get(
                                "impact_score", 0
                            ),  # 0 indicates missing
                            "time_ago": _format_time_ago(row["created_at"]),
                            "upvotes": metadata.get("upvotes", 0),
                            "downvotes": metadata.get("downvotes", 0),
                            "source_url": f"/results/{research_id}",
                            "topics": topics,  # Use generated topics
                            "links": links,  # Add extracted links
                            "research_id": research_id,
                            "created_at": row["created_at"],
                            "duration_seconds": row.get(
                                "duration_seconds", 0
                            ),
                            "original_query": original_query,
                            "is_news": metadata.get(
                                "is_news_search", False
                            ),  # Flag for news searches
                            "news_date": metadata.get("news_date"),
                            "news_source": metadata.get("news_source"),
                            "priority": metadata.get("priority", "normal"),
                        }

                        news_items.append(news_item)
                        logger.info(f"Added news item: {headline[:50]}...")

                        if len(news_items) >= limit:
                            break

                    except Exception:
                        error_count += 1
                        logger.exception(
                            f"Error processing research item with query: {row.get('query', 'UNKNOWN')[:100]}"
                        )
                        continue

                logger.info(
                    f"Processing summary: total_results={len(results)}, processed={processed_count}, "
                    f"errors={error_count}, added={len(news_items)}"
                )

                # Log subscription-specific items if we were filtering
                if subscription_id and subscription_id != "all":
                    sub_items = [
                        item
                        for item in news_items
                        if item.get("is_news", False)
                    ]
                    logger.info(
                        f"Subscription {subscription_id}: found {len(sub_items)} items"
                    )

        except Exception as db_error:
            logger.exception(f"Database error in research history: {db_error}")
            # Chain the cause so the original traceback survives.
            raise DatabaseAccessException(
                "research_history_query", str(db_error)
            ) from db_error

        if not news_items:
            logger.info("No news items found, returning empty list")

        logger.info(f"Returning {len(news_items)} news items to client")

        # Determine the source
        source = (
            "news_items"
            if any(item.get("is_news", False) for item in news_items)
            else "research_history"
        )

        return {
            "news_items": news_items[:limit],
            "generated_at": datetime.now(timezone.utc).isoformat(),
            "focus": focus,
            "search_strategy": search_strategy or "default",
            "total_items": len(news_items),
            "source": source,
        }

    except NewsAPIException:
        # Re-raise our custom exceptions
        raise
    except Exception as e:
        logger.exception("Error getting news feed")
        raise NewsFeedGenerationException(str(e), user_id=user_id) from e

487 

488 

def get_subscription_history(
    subscription_id: str, limit: int = 20
) -> Dict[str, Any]:
    """
    Get research history for a specific subscription.

    Args:
        subscription_id: The subscription UUID
        limit: Maximum number of history items to return

    Returns:
        Dict containing subscription info and its research history

    Raises:
        SubscriptionNotFoundException: If no subscription has this id.
        DatabaseAccessException: For any other database failure.
    """
    try:
        from ..database.session_context import get_user_db_session
        from ..database.models import ResearchHistory
        from ..database.models.news import NewsSubscription

        # Get subscription details using ORM from user's encrypted database
        with get_user_db_session() as session:
            subscription = (
                session.query(NewsSubscription)
                .filter_by(id=subscription_id)
                .first()
            )

            if not subscription:
                raise SubscriptionNotFoundException(subscription_id)  # noqa: TRY301 — re-raised by except NewsAPIException

            # Resolve the owning user while the ORM object is still attached.
            # NOTE(review): the dict built below never contains a "user_id"
            # key, so the previous subscription_dict.get("user_id", ...)
            # lookup always degraded to "anonymous". Read the attribute off
            # the model instead, keeping "anonymous" as the fallback if the
            # model has no such column — confirm against the schema.
            sub_user_id = getattr(subscription, "user_id", None) or "anonymous"

            # Convert to dict for response
            subscription_dict = {
                "id": subscription.id,
                "query_or_topic": subscription.query_or_topic,
                "subscription_type": subscription.subscription_type,
                "refresh_interval_minutes": subscription.refresh_interval_minutes,
                "refresh_count": subscription.refresh_count or 0,
                "created_at": subscription.created_at.isoformat()
                if subscription.created_at
                else None,
                "next_refresh": subscription.next_refresh.isoformat()
                if subscription.next_refresh
                else None,
            }

        # Now get research history from the research database
        with get_user_db_session(sub_user_id) as db_session:
            # Get all research runs that were triggered by this subscription.
            # Look for subscription_id in the research_meta JSON.
            # Note: JSON format has space after colon.
            like_pattern = f'%"subscription_id": "{subscription_id}"%'
            logger.info(
                f"Searching for research history with pattern: {like_pattern}"
            )

            history_items = (
                db_session.query(ResearchHistory)
                .filter(ResearchHistory.research_meta.like(like_pattern))
                .order_by(ResearchHistory.created_at.desc())
                .limit(limit)
                .all()
            )

            # Convert to dict format for compatibility
            history_items = [
                {
                    "id": h.id,
                    "uuid_id": h.uuid_id,
                    "query": h.query,
                    "status": h.status,
                    "created_at": h.created_at.isoformat()
                    if h.created_at
                    else None,
                    "completed_at": h.completed_at.isoformat()
                    if h.completed_at
                    else None,
                    "duration_seconds": h.duration_seconds,
                    "research_meta": h.research_meta,
                    "report_path": h.report_path,
                }
                for h in history_items
            ]

        # Shape the plain dicts into the API response (no session needed).
        processed_history = []
        for item in history_items:
            processed_item = {
                "research_id": item.get("uuid_id") or str(item.get("id")),
                "query": item["query"],
                "status": item["status"],
                "created_at": item["created_at"],
                "completed_at": item.get("completed_at"),
                "duration_seconds": item.get("duration_seconds", 0),
                "url": f"/progress/{item.get('uuid_id') or item.get('id')}",
            }

            # Parse metadata if available to get headline and topics
            if item.get("research_meta"):
                try:
                    meta = json.loads(item["research_meta"])
                    processed_item["triggered_by"] = meta.get(
                        "triggered_by", "subscription"
                    )
                    # Add headline and topics from metadata
                    processed_item["headline"] = meta.get(
                        "generated_headline", "[No headline]"
                    )
                    processed_item["topics"] = meta.get(
                        "generated_topics", []
                    )
                except Exception:
                    processed_item["headline"] = "[No headline]"
                    processed_item["topics"] = []
            else:
                processed_item["headline"] = "[No headline]"
                processed_item["topics"] = []

            processed_history.append(processed_item)

        return {
            "subscription": subscription_dict,
            "history": processed_history,
            "total_runs": len(processed_history),
        }

    except NewsAPIException:
        # Re-raise our custom exceptions
        raise
    except Exception as e:
        logger.exception("Error getting subscription history")
        raise DatabaseAccessException(
            "get_subscription_history", str(e)
        ) from e

620 

621 

def _format_time_ago(timestamp: "str | datetime") -> str:
    """Format a timestamp as a human-readable 'X hours ago' string.

    Args:
        timestamp: An ISO-style timestamp string (or an already-parsed
            datetime). Naive values are assumed to be UTC.

    Returns:
        A relative-time string such as "3 hours ago", "Just now" for
        anything under a minute (or in the future), or "Recently" when
        the timestamp cannot be parsed.
    """
    try:
        if isinstance(timestamp, datetime):
            dt = timestamp
        else:
            try:
                # dateutil handles a wider range of formats when installed...
                from dateutil import parser

                dt = parser.parse(timestamp)
            except ImportError:
                # ...otherwise fall back to the stdlib ISO parser.
                dt = datetime.fromisoformat(timestamp)

        # If dt is naive, assume it's in UTC
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)

        # Use total_seconds(): the old diff.seconds ignored the day component
        # and misfired at exact boundaries (3600s rendered "60 minutes ago");
        # negative totals (future timestamps) now fall through to "Just now".
        total = int((datetime.now(timezone.utc) - dt).total_seconds())

        if total >= 86400:
            days = total // 86400
            return f"{days} day{'s' if days > 1 else ''} ago"
        if total >= 3600:
            hours = total // 3600
            return f"{hours} hour{'s' if hours > 1 else ''} ago"
        if total >= 60:
            minutes = total // 60
            return f"{minutes} minute{'s' if minutes > 1 else ''} ago"
        return "Just now"
    except Exception:
        logger.exception(f"Error parsing timestamp '{timestamp}'")
        return "Recently"

649 

650 

def get_subscription(subscription_id: str) -> Dict[str, Any]:
    """
    Get a single subscription by ID.

    Args:
        subscription_id: Subscription identifier

    Returns:
        Dictionary with subscription data in the template-facing format.

    Raises:
        SubscriptionNotFoundException: If the id matches no subscription
            (this function raises rather than returning None).
        DatabaseAccessException: For any other database failure.
    """
    try:
        # Get subscription directly from user's encrypted database
        from ..database.session_context import get_user_db_session
        from ..database.models.news import NewsSubscription

        with get_user_db_session() as db_session:
            subscription = (
                db_session.query(NewsSubscription)
                .filter_by(id=subscription_id)
                .first()
            )

            if not subscription:
                raise SubscriptionNotFoundException(subscription_id)  # noqa: TRY301 — re-raised by except NewsAPIException

            # Convert to API format matching the template expectations
            return {
                "id": subscription.id,
                "name": subscription.name or "",
                "query_or_topic": subscription.query_or_topic,
                "subscription_type": subscription.subscription_type,
                "refresh_interval_minutes": subscription.refresh_interval_minutes,
                "is_active": subscription.status == "active",
                "status": subscription.status,
                "folder_id": subscription.folder_id,
                "model_provider": subscription.model_provider,
                "model": subscription.model,
                "search_strategy": subscription.search_strategy,
                "custom_endpoint": subscription.custom_endpoint,
                "search_engine": subscription.search_engine,
                "search_iterations": subscription.search_iterations or 3,
                "questions_per_iteration": subscription.questions_per_iteration
                or 5,
                "created_at": subscription.created_at.isoformat()
                if subscription.created_at
                else None,
                "updated_at": subscription.updated_at.isoformat()
                if subscription.updated_at
                else None,
            }

    except NewsAPIException:
        # Re-raise our custom exceptions
        raise
    except Exception as e:
        logger.exception(f"Error getting subscription {subscription_id}")
        raise DatabaseAccessException("get_subscription", str(e)) from e

708 

709 

def get_subscriptions(user_id: str) -> Dict[str, Any]:
    """
    Get all subscriptions for a user.

    Args:
        user_id: User identifier

    Returns:
        Dictionary with subscriptions list and total count.

    Raises:
        DatabaseAccessException: If the query fails.
    """
    try:
        # Get subscriptions directly from user's encrypted database
        from ..database.session_context import get_user_db_session
        from ..database.models import ResearchHistory
        from ..database.models.news import NewsSubscription
        from sqlalchemy import func, or_

        sub_list = []

        with get_user_db_session(user_id) as db_session:
            # Query all subscriptions for this user
            subscriptions = db_session.query(NewsSubscription).all()

            for sub in subscriptions:
                # Count actual research runs for this subscription. Match
                # both the json.dumps (spaced) and compact serializations of
                # the subscription_id key so no runs are missed.
                total_runs = (
                    db_session.query(func.count(ResearchHistory.id))
                    .filter(
                        or_(
                            ResearchHistory.research_meta.like(
                                f'%"subscription_id": "{sub.id}"%'
                            ),
                            ResearchHistory.research_meta.like(
                                f'%"subscription_id":"{sub.id}"%'
                            ),
                        )
                    )
                    .scalar()
                    or 0
                )

                # Convert ORM object to API format
                sub_dict = {
                    "id": sub.id,
                    "query": sub.query_or_topic,
                    "type": sub.subscription_type,
                    "refresh_minutes": sub.refresh_interval_minutes,
                    "created_at": sub.created_at.isoformat()
                    if sub.created_at
                    else None,
                    "next_refresh": sub.next_refresh.isoformat()
                    if sub.next_refresh
                    else None,
                    "last_refreshed": sub.last_refresh.isoformat()
                    if sub.last_refresh
                    else None,
                    "is_active": sub.status == "active",
                    "total_runs": total_runs,  # Use actual count from research_history
                    "name": sub.name or "",
                    "folder_id": sub.folder_id,
                }
                sub_list.append(sub_dict)

        return {"subscriptions": sub_list, "total": len(sub_list)}

    except Exception as e:
        logger.exception("Error getting subscriptions")
        raise DatabaseAccessException("get_subscriptions", str(e)) from e

770 

771 

def update_subscription(
    subscription_id: str, data: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Update an existing subscription.

    Args:
        subscription_id: Subscription identifier
        data: Dictionary with fields to update. Simple columns are copied
            verbatim; "refresh_interval_minutes" also reschedules
            next_refresh, "is_active" maps to status ("active"/"paused"),
            an explicit "status" overrides "is_active", and
            "model_provider" is normalized before storage.

    Returns:
        Dictionary with updated subscription data

    Raises:
        SubscriptionNotFoundException: If the id matches no subscription.
        SubscriptionUpdateException: For any other failure.
    """
    try:
        from ..database.session_context import get_user_db_session
        from ..database.models.news import NewsSubscription
        from datetime import datetime, timedelta

        # Columns copied verbatim from the request payload when present.
        simple_fields = (
            "name",
            "query_or_topic",
            "subscription_type",
            "folder_id",
            "model",
            "search_strategy",
            "custom_endpoint",
            "search_engine",
            "search_iterations",
            "questions_per_iteration",
        )

        with get_user_db_session() as db_session:
            # Get existing subscription
            subscription = (
                db_session.query(NewsSubscription)
                .filter_by(id=subscription_id)
                .first()
            )
            if not subscription:
                raise SubscriptionNotFoundException(subscription_id)  # noqa: TRY301 — re-raised by except NewsAPIException

            for field in simple_fields:
                if field in data:
                    setattr(subscription, field, data[field])

            if "refresh_interval_minutes" in data:
                old_interval = subscription.refresh_interval_minutes
                subscription.refresh_interval_minutes = data[
                    "refresh_interval_minutes"
                ]
                # Recalculate next_refresh only if the interval changed
                if old_interval != subscription.refresh_interval_minutes:
                    subscription.next_refresh = datetime.now(UTC) + timedelta(
                        minutes=subscription.refresh_interval_minutes
                    )

            if "is_active" in data:
                subscription.status = (
                    "active" if data["is_active"] else "paused"
                )
            # An explicit status wins over the is_active shorthand.
            if "status" in data:
                subscription.status = data["status"]

            if "model_provider" in data:
                subscription.model_provider = normalize_provider(
                    data["model_provider"]
                )

            # Update timestamp
            subscription.updated_at = datetime.now(UTC)

            # Commit changes
            db_session.commit()

            # Notify scheduler about updated subscription
            _notify_scheduler_about_subscription_change("updated")

            # Convert to API format
            return {
                "status": "success",
                "subscription": {
                    "id": subscription.id,
                    "name": subscription.name or "",
                    "query_or_topic": subscription.query_or_topic,
                    "subscription_type": subscription.subscription_type,
                    "refresh_interval_minutes": subscription.refresh_interval_minutes,
                    "is_active": subscription.status == "active",
                    "status": subscription.status,
                    "folder_id": subscription.folder_id,
                    "model_provider": subscription.model_provider,
                    "model": subscription.model,
                    "search_strategy": subscription.search_strategy,
                    "custom_endpoint": subscription.custom_endpoint,
                    "search_engine": subscription.search_engine,
                    "search_iterations": subscription.search_iterations or 3,
                    "questions_per_iteration": subscription.questions_per_iteration
                    or 5,
                },
            }

    except NewsAPIException:
        # Re-raise our custom exceptions
        raise
    except Exception as e:
        logger.exception("Error updating subscription")
        raise SubscriptionUpdateException(subscription_id, str(e)) from e

882 

883 

def create_subscription(
    user_id: str,
    query: str,
    subscription_type: str = "search",
    refresh_minutes: Optional[int] = None,
    source_research_id: Optional[str] = None,
    model_provider: Optional[str] = None,
    model: Optional[str] = None,
    search_strategy: Optional[str] = None,
    custom_endpoint: Optional[str] = None,
    name: Optional[str] = None,
    folder_id: Optional[str] = None,
    is_active: bool = True,
    search_engine: Optional[str] = None,
    search_iterations: Optional[int] = None,
    questions_per_iteration: Optional[int] = None,
) -> Dict[str, Any]:
    """
    Create a new subscription for user.

    Args:
        user_id: User identifier
        query: Search query or topic
        subscription_type: "search" or "topic"
        refresh_minutes: Refresh interval in minutes; when None it is read
            from the "news.subscription.refresh_minutes" setting, falling
            back to 240 (4 hours) if the settings store is unavailable
        source_research_id: Research run this subscription was created from
        model_provider: LLM provider name (normalized via normalize_provider)
        model: LLM model name
        search_strategy: Strategy name; defaults to "news_aggregation"
        custom_endpoint: Custom LLM endpoint URL, if any
        name: Human-readable subscription name
        folder_id: Folder to file the subscription under
        is_active: Whether the subscription starts "active" (else "paused")
        search_engine: Search engine override
        search_iterations: Number of search iterations per run
        questions_per_iteration: Questions generated per iteration

    Returns:
        Dictionary with subscription details

    Raises:
        SubscriptionCreationException: If the subscription cannot be stored.
    """
    try:
        from ..database.session_context import get_user_db_session
        from ..database.models.news import NewsSubscription
        from datetime import datetime, timedelta
        import uuid

        # Get default refresh interval from settings if not provided
        # NOTE: This API function accesses the settings DB for convenience when used
        # within the Flask application context. For programmatic API access outside
        # the web context, callers should provide refresh_minutes explicitly to avoid
        # dependency on the settings database being initialized.

        with get_user_db_session(user_id) as db_session:
            if refresh_minutes is None:
                try:
                    from ..utilities.db_utils import get_settings_manager

                    settings_manager = get_settings_manager(db_session, user_id)
                    refresh_minutes = settings_manager.get_setting(
                        "news.subscription.refresh_minutes", 240
                    )
                except (ImportError, AttributeError, TypeError):
                    # Fallback for when settings DB is not available (e.g., programmatic API usage)
                    logger.debug(
                        "Settings manager not available, using default refresh_minutes"
                    )
                    refresh_minutes = 240  # Default to 4 hours

            # Create new subscription
            subscription = NewsSubscription(
                id=str(uuid.uuid4()),
                name=name,
                query_or_topic=query,
                subscription_type=subscription_type,
                refresh_interval_minutes=refresh_minutes,
                status="active" if is_active else "paused",
                model_provider=normalize_provider(model_provider),
                model=model,
                search_strategy=search_strategy or "news_aggregation",
                custom_endpoint=custom_endpoint,
                folder_id=folder_id,
                search_engine=search_engine,
                search_iterations=search_iterations,
                questions_per_iteration=questions_per_iteration,
                created_at=datetime.now(UTC),
                updated_at=datetime.now(UTC),
                last_refresh=None,
                next_refresh=datetime.now(UTC)
                + timedelta(minutes=refresh_minutes),
                source_id=source_research_id,
            )

            # Add to database
            db_session.add(subscription)
            db_session.commit()

            # Notify scheduler about new subscription
            _notify_scheduler_about_subscription_change("created", user_id)

            return {
                "status": "success",
                "subscription_id": subscription.id,
                "type": subscription_type,
                "query": query,
                "refresh_minutes": refresh_minutes,
            }

    except Exception as e:
        logger.exception("Error creating subscription")
        raise SubscriptionCreationException(
            str(e), {"query": query, "type": subscription_type}
        ) from e

984 

985 

def delete_subscription(
    subscription_id: str, user_id: Optional[str] = None
) -> Dict[str, Any]:
    """
    Delete a subscription.

    Args:
        subscription_id: ID of subscription to delete
        user_id: Optional user identifier. Forwarded to the database
            session and the scheduler notification as a username fallback,
            consistent with create_subscription. Defaults to None for
            backward compatibility with existing callers.

    Returns:
        Dictionary with status

    Raises:
        SubscriptionNotFoundException: If no subscription has this id.
        SubscriptionDeletionException: On any other failure.
    """
    try:
        from ..database.session_context import get_user_db_session
        from ..database.models.news import NewsSubscription

        with get_user_db_session(user_id) as db_session:
            subscription = (
                db_session.query(NewsSubscription)
                .filter_by(id=subscription_id)
                .first()
            )
            if subscription:
                db_session.delete(subscription)
                db_session.commit()

                # Notify scheduler about deleted subscription; pass user_id
                # so the scheduler can resolve a username outside a request
                # context (same fallback the create path uses).
                _notify_scheduler_about_subscription_change("deleted", user_id)

                return {"status": "success", "deleted": subscription_id}
            raise SubscriptionNotFoundException(subscription_id)  # noqa: TRY301 — re-raised by except NewsAPIException
    except NewsAPIException:
        # Re-raise our custom exceptions
        raise
    except Exception as e:
        logger.exception("Error deleting subscription")
        raise SubscriptionDeletionException(subscription_id, str(e))

1021 

1022 

def get_votes_for_cards(card_ids: list, user_id: str) -> Dict[str, Any]:
    """
    Get vote counts and user's votes for multiple news cards.

    Args:
        card_ids: List of card IDs to get votes for
        user_id: User identifier (not used - per-user database)

    Returns:
        Dictionary with vote information for each card

    Raises:
        ValueError: If no username can be resolved from the session or
            the user_id argument.
    """
    from flask import session as flask_session, has_request_context
    from ..database.models.news import UserRating, RatingType
    from ..database.session_context import get_user_db_session

    # Resolve the acting username before opening any database session.
    if has_request_context():
        username = flask_session.get("username")
        if not username:
            raise ValueError("No username in session")
    else:
        # Outside a request context (e.g., in tests) fall back to user_id.
        username = user_id if user_id else None
        if not username:
            raise ValueError("No username provided and no request context")

    try:
        with get_user_db_session(username) as db:

            def count_votes(card_id, value):
                # Number of RELEVANCE ratings on this card with the given value.
                return (
                    db.query(UserRating)
                    .filter_by(
                        card_id=card_id,
                        rating_type=RatingType.RELEVANCE,
                        rating_value=value,
                    )
                    .count()
                )

            results = {}
            for card_id in card_ids:
                # The user's own vote on this card, if any.
                own_rating = (
                    db.query(UserRating)
                    .filter_by(
                        card_id=card_id, rating_type=RatingType.RELEVANCE
                    )
                    .first()
                )
                results[card_id] = {
                    "upvotes": count_votes(card_id, "up"),
                    "downvotes": count_votes(card_id, "down"),
                    "user_vote": own_rating.rating_value
                    if own_rating
                    else None,
                }

            return {"success": True, "votes": results}

    except Exception:
        logger.exception("Error getting votes for cards")
        raise

1097 

1098 

def submit_feedback(card_id: str, user_id: str, vote: str) -> Dict[str, Any]:
    """
    Submit feedback (vote) for a news card.

    Args:
        card_id: ID of the news card
        user_id: User identifier (not used - per-user database)
        vote: "up" or "down"

    Returns:
        Dictionary with updated vote counts

    Raises:
        ValueError: If vote is not "up"/"down" or no username can be
            resolved.
    """
    from flask import session as flask_session, has_request_context
    from sqlalchemy_utc import utcnow
    from ..database.models.news import UserRating, RatingType
    from ..database.session_context import get_user_db_session

    # Reject anything other than the two supported vote values.
    if vote not in ("up", "down"):
        raise ValueError(f"Invalid vote type: {vote}")

    # Resolve the acting username before opening any database session.
    if has_request_context():
        username = flask_session.get("username")
        if not username:
            raise ValueError("No username in session")
    else:
        # Outside a request context (e.g., in tests) fall back to user_id.
        username = user_id if user_id else None
        if not username:
            raise ValueError("No username provided and no request context")

    try:
        with get_user_db_session(username) as db:
            # News items are generated dynamically and may not be stored as
            # NewsCard entries, so no existence check is performed here.

            # One RELEVANCE rating per card in this per-user database.
            existing = (
                db.query(UserRating)
                .filter_by(card_id=card_id, rating_type=RatingType.RELEVANCE)
                .first()
            )

            if existing is None:
                db.add(
                    UserRating(
                        card_id=card_id,
                        rating_type=RatingType.RELEVANCE,
                        rating_value=vote,
                    )
                )
            else:
                # Overwrite the previous vote and refresh its timestamp.
                existing.rating_value = vote
                existing.created_at = utcnow()

            db.commit()

            def tally(value):
                # Total RELEVANCE ratings on this card with the given value.
                return (
                    db.query(UserRating)
                    .filter_by(
                        card_id=card_id,
                        rating_type=RatingType.RELEVANCE,
                        rating_value=value,
                    )
                    .count()
                )

            upvotes = tally("up")
            downvotes = tally("down")

            logger.info(
                f"Feedback submitted for card {card_id}: {vote} (up: {upvotes}, down: {downvotes})"
            )

            return {
                "success": True,
                "card_id": card_id,
                "vote": vote,
                "upvotes": upvotes,
                "downvotes": downvotes,
            }

    except Exception:
        logger.exception(f"Error submitting feedback for card {card_id}")
        raise

1196 

1197 

def research_news_item(card_id: str, depth: str = "quick") -> Dict[str, Any]:
    """
    Perform deeper research on a news item.

    Args:
        card_id: ID of the news card to research
        depth: Research depth - "quick", "detailed", or "report"

    Returns:
        Dictionary with research results

    Raises:
        NotImplementedException: Always, until per-user card storage lands.
    """
    # TODO: Implement with per-user database for cards
    logger.warning("research_news_item not yet implemented with per-user databases")
    raise NotImplementedException("research_news_item")

1214 

1215 

def save_news_preferences(
    user_id: str, preferences: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Save user preferences for news.

    Args:
        user_id: User identifier
        preferences: Dictionary of preferences to save

    Returns:
        Dictionary with status and message

    Raises:
        NotImplementedException: Always, until per-user preference storage
            lands.
    """
    # TODO: Implement with per-user database for preferences
    logger.warning("save_news_preferences not yet implemented with per-user databases")
    raise NotImplementedException("save_news_preferences")

1234 

1235 

def get_news_categories() -> Dict[str, Any]:
    """
    Get available news categories with counts.

    Returns:
        Dictionary with categories and statistics

    Raises:
        NotImplementedException: Always, until per-user category storage
            lands.
    """
    # TODO: Implement with per-user database for categories
    logger.warning("get_news_categories not yet implemented with per-user databases")
    raise NotImplementedException("get_news_categories")