Coverage for src / local_deep_research / news / api.py: 19%

417 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Direct API functions for news system. 

3These functions can be called directly by scheduler or wrapped by Flask endpoints. 

4""" 

5 

6from typing import Dict, Any, Optional 

7from datetime import datetime, timezone, UTC 

8from loguru import logger 

9import re 

10import json 

11 

12from .recommender.topic_based import TopicBasedRecommender 

13from .exceptions import ( 

14 InvalidLimitException, 

15 SubscriptionNotFoundException, 

16 SubscriptionCreationException, 

17 SubscriptionUpdateException, 

18 SubscriptionDeletionException, 

19 DatabaseAccessException, 

20 NewsFeedGenerationException, 

21 NotImplementedException, 

22 NewsAPIException, 

23) 

24# Removed welcome feed import - no placeholders 

25# get_db_setting not available in merged codebase 

26 

27 

# Module-level cache for the recommender so repeated calls share one instance.
_recommender = None


def get_recommender():
    """Return the shared TopicBasedRecommender, building it on first use."""
    global _recommender
    if _recommender is not None:
        return _recommender
    _recommender = TopicBasedRecommender()
    return _recommender

38 

39 

def _notify_scheduler_about_subscription_change(
    action: str, user_id: str = None
):
    """
    Notify the scheduler about subscription changes.

    Best-effort: any failure (missing Flask context, scheduler not
    importable, etc.) is logged and swallowed so CRUD operations never
    fail because of scheduler notification problems.

    Args:
        action: The action performed (created, updated, deleted)
        user_id: Optional user_id to use as fallback for username
    """
    try:
        from flask import session as flask_session
        from .subscription_manager.scheduler import get_news_scheduler

        scheduler = get_news_scheduler()
        if not scheduler.is_running:
            # Nothing to reschedule when the scheduler is stopped.
            return

        # Resolve the username from the Flask session, falling back to
        # the caller-supplied user_id when the session has none.
        username = flask_session.get("username")
        if not username and user_id:
            username = user_id

        # The scheduler needs the user's password to open their encrypted
        # database; it lives in the per-session password store.
        from ..database.session_passwords import session_password_store

        session_id = flask_session.get("session_id")
        password = None
        if session_id and username:
            password = session_password_store.get_session_password(
                username, session_id
            )

        if not password:
            logger.warning(
                f"Could not notify scheduler - no password available{' for ' + username if username else ''}"
            )
            return

        # Update scheduler to reschedule subscriptions
        scheduler.update_user_info(username, password)
        logger.info(
            f"Scheduler notified about {action} subscription for {username}"
        )
    except Exception:
        logger.exception(
            f"Could not notify scheduler about {action} subscription"
        )

85 

86 

def get_news_feed(
    user_id: str = "anonymous",
    limit: int = 20,
    use_cache: bool = True,
    focus: Optional[str] = None,
    search_strategy: Optional[str] = None,
    subscription_id: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Get personalized news feed by pulling from news_items table first, then research history.

    Args:
        user_id: User identifier (selects which user database to read)
        limit: Maximum number of cards to return (must be >= 1)
        use_cache: Whether to use cached news
            NOTE(review): currently unused in this implementation — TODO confirm
        focus: Optional focus area for news (echoed back in the response only)
        search_strategy: Override default recommendation strategy
            (echoed back in the response only)
        subscription_id: If given and not "all", restrict results to research
            runs whose research_meta JSON embeds this subscription id

    Returns:
        Dictionary with "news_items" (at most ``limit`` entries),
        "generated_at", "focus", "search_strategy", "total_items", "source"

    Raises:
        InvalidLimitException: if limit < 1
        DatabaseAccessException: if the research-history query fails
        NewsFeedGenerationException: for any other unexpected error
    """
    try:
        # Validate limit - allow any positive number
        if limit < 1:
            raise InvalidLimitException(limit)

        logger.info(
            f"get_news_feed called with user_id={user_id}, limit={limit}"
        )

        # News is always enabled for now - per-user settings will be handled later
        # if not get_db_setting("news.enabled", True):
        #     return {"error": "News system is disabled", "news_items": []}

        # Imported lazily to avoid import cycles at module load time.
        from ..database.session_context import get_user_db_session
        from ..database.models import ResearchHistory

        news_items = []
        remaining_limit = limit

        # Query research history from user's database for news items
        logger.info("Getting news items from research history")
        try:
            # Use the user_id provided to the function
            with get_user_db_session(user_id) as db_session:
                # Only completed research can be rendered as a news card.
                query = db_session.query(ResearchHistory).filter(
                    ResearchHistory.status == "completed"
                )

                # Filter by subscription if provided.
                # NOTE(review): pattern here has NO space after the colon,
                # while get_subscription_history searches with a space —
                # the two can match different serializations; verify which
                # format the writer actually produces.
                if subscription_id and subscription_id != "all":
                    # Use JSON containment for PostgreSQL or LIKE for SQLite
                    query = query.filter(
                        ResearchHistory.research_meta.like(
                            f'%"subscription_id":"{subscription_id}"%'
                        )
                    )

                # Order by creation date; over-fetch (2x) because some rows
                # will be filtered out below (non-news, no content, etc.).
                results = (
                    query.order_by(ResearchHistory.created_at.desc())
                    .limit(remaining_limit * 2)
                    .all()
                )

                # Convert ORM objects to dictionaries for compatibility
                # with the dict-based processing loop below.
                results = [
                    {
                        "id": r.id,
                        "uuid_id": r.id,  # In ResearchHistory, id is the UUID
                        "query": r.query,
                        "title": r.title
                        if hasattr(r, "title")
                        else None,  # Include title field if exists
                        "created_at": r.created_at if r.created_at else None,
                        "completed_at": r.completed_at
                        if r.completed_at
                        else None,
                        "duration_seconds": r.duration_seconds
                        if hasattr(r, "duration_seconds")
                        else None,
                        "report_path": r.report_path
                        if hasattr(r, "report_path")
                        else None,
                        "report_content": r.report_content
                        if hasattr(r, "report_content")
                        else None,  # Include database content
                        "research_meta": r.research_meta,
                        "status": r.status,
                    }
                    for r in results
                ]

                logger.info(f"Database returned {len(results)} research items")
                if results and len(results) > 0:
                    logger.info(f"First row keys: {list(results[0].keys())}")
                    # Log first few items' metadata
                    for i, row in enumerate(results[:3]):
                        logger.info(
                            f"Item {i}: query='{row['query'][:50]}...', has meta={bool(row.get('research_meta'))}"
                        )

                # Process results to find news items
                processed_count = 0
                error_count = 0

                for row in results:
                    try:
                        # Parse metadata; tolerate missing/invalid JSON by
                        # falling back to an empty dict.
                        metadata = {}
                        if row.get("research_meta"):
                            try:
                                # Handle both dict and string formats
                                if isinstance(row["research_meta"], dict):
                                    metadata = row["research_meta"]
                                else:
                                    metadata = json.loads(row["research_meta"])
                            except (json.JSONDecodeError, TypeError):
                                logger.exception("Error parsing metadata")
                                metadata = {}

                        # Check if this has news metadata (generated_headline
                        # or generated_topics) or if it's a news-related query
                        has_news_metadata = (
                            metadata.get("generated_headline") is not None
                            or metadata.get("generated_topics") is not None
                        )

                        # Heuristic classification: metadata flags first,
                        # then keyword matching on the original query text.
                        query_lower = row["query"].lower()
                        is_news_query = (
                            has_news_metadata
                            or metadata.get("is_news_search")
                            or metadata.get("search_type") == "news_analysis"
                            or "breaking news" in query_lower
                            or "news stories" in query_lower
                            or (
                                "today" in query_lower
                                and (
                                    "news" in query_lower
                                    or "breaking" in query_lower
                                )
                            )
                            or "latest news" in query_lower
                        )

                        # Log the decision for first few items
                        if processed_count < 3 or error_count < 3:
                            logger.info(
                                f"Item check: query='{row['query'][:30]}...', is_news_search={metadata.get('is_news_search')}, "
                                f"has_news_metadata={has_news_metadata}, is_news_query={is_news_query}"
                            )

                        # Only show items that have news metadata or are news queries
                        if is_news_query:
                            processed_count += 1
                            logger.info(
                                f"Processing research item #{processed_count}: {row['query'][:50]}..."
                            )

                            # Always use database content
                            findings = ""
                            summary = ""
                            report_content_db = row.get(
                                "report_content"
                            )  # Get database content

                            # Use database content
                            content = report_content_db
                            if content:
                                logger.debug(
                                    f"Using database content for research {row['id']}"
                                )

                                # Process database content
                                lines = content.split("\n") if content else []
                                # Use full content as findings
                                findings = content
                                # Extract summary from first non-empty,
                                # non-heading line.
                                for line in lines:
                                    if line.strip() and not line.startswith("#"):
                                        summary = line.strip()
                                        break
                            else:
                                logger.debug(
                                    f"No database content for research {row['id']}"
                                )

                            # Use stored headline/topics if available, otherwise generate
                            original_query = row["query"]

                            # Check for headline - first try database title, then metadata
                            headline = row.get("title") or metadata.get(
                                "generated_headline"
                            )

                            # For subscription results, generate headline from query if needed
                            if not headline and metadata.get("is_news_search"):
                                # Use subscription name or query as headline
                                subscription_name = metadata.get(
                                    "subscription_name"
                                )
                                if subscription_name:
                                    headline = f"News Update: {subscription_name}"
                                else:
                                    # Generate headline from query
                                    headline = f"News: {row['query'][:60]}..."

                            # Skip items without meaningful headlines or that are incomplete
                            if (
                                not headline
                                or headline == "[No headline available]"
                            ):
                                logger.debug(
                                    f"Skipping item without headline: {row['id']}"
                                )
                                continue

                            # Skip items that are still in progress or suspended
                            if row["status"] in ["in_progress", "suspended"]:
                                logger.debug(
                                    f"Skipping incomplete item: {row['id']} (status: {row['status']})"
                                )
                                continue

                            # Skip items without content (neither file nor database)
                            if not content:
                                logger.debug(
                                    f"Skipping item without content: {row['id']}"
                                )
                                continue

                            # Use ID properly, preferring uuid_id
                            research_id = row.get("uuid_id") or str(row["id"])

                            # Use stored category and topics - no defaults
                            category = metadata.get("category")
                            if not category:
                                category = "[Uncategorized]"

                            topics = metadata.get("generated_topics")
                            if not topics:
                                topics = ["[No topics]"]

                            # Extract top 3 links from the database content.
                            # Relies on the report format placing "URL: ..."
                            # on its own line, with the source title on the
                            # preceding line.
                            links = []
                            if content:
                                try:
                                    report_lines = content.split("\n")
                                    link_count = 0
                                    for i, line in enumerate(
                                        report_lines[:100]
                                    ):  # Check first 100 lines for links
                                        if "URL:" in line:
                                            url = line.split("URL:", 1)[1].strip()
                                            if url.startswith("http"):
                                                # Get the title from the previous line if available
                                                title = ""
                                                if i > 0:
                                                    title_line = report_lines[
                                                        i - 1
                                                    ].strip()
                                                    # Remove citation numbers like [12, 26, 19]
                                                    title = re.sub(
                                                        r"^\[[^\]]+\]\s*",
                                                        "",
                                                        title_line,
                                                    ).strip()

                                                if not title:
                                                    # Use domain as fallback
                                                    domain = url.split("//")[
                                                        -1
                                                    ].split("/")[0]
                                                    title = domain.replace(
                                                        "www.", ""
                                                    )

                                                links.append(
                                                    {
                                                        "url": url,
                                                        "title": title[:50] + "..."
                                                        if len(title) > 50
                                                        else title,
                                                    }
                                                )
                                                link_count += 1
                                                logger.debug(
                                                    f"Found link: {title} - {url}"
                                                )
                                                if link_count >= 3:
                                                    break
                                except Exception as e:
                                    logger.exception(
                                        f"Error extracting links from database content: {e}"
                                    )

                            # Create news item from research
                            news_item = {
                                "id": f"news-{research_id}",
                                "headline": headline,
                                "category": category,
                                "summary": summary
                                or f"Research analysis for: {headline[:100]}",
                                "findings": findings,
                                "impact_score": metadata.get(
                                    "impact_score", 0
                                ),  # 0 indicates missing
                                "time_ago": _format_time_ago(row["created_at"]),
                                "upvotes": metadata.get("upvotes", 0),
                                "downvotes": metadata.get("downvotes", 0),
                                "source_url": f"/results/{research_id}",
                                "topics": topics,  # Use generated topics
                                "links": links,  # Add extracted links
                                "research_id": research_id,
                                "created_at": row["created_at"],
                                "duration_seconds": row.get("duration_seconds", 0),
                                "original_query": original_query,  # Keep original query for reference
                                "is_news": metadata.get(
                                    "is_news_search", False
                                ),  # Flag for news searches
                                "news_date": metadata.get(
                                    "news_date"
                                ),  # If specific date for news
                                "news_source": metadata.get(
                                    "news_source"
                                ),  # If from specific source
                                "priority": metadata.get(
                                    "priority", "normal"
                                ),  # Priority level
                            }

                            news_items.append(news_item)
                            logger.info(f"Added news item: {headline[:50]}...")

                            if len(news_items) >= limit:
                                break

                    except Exception:
                        # Per-row failures are counted and skipped so one
                        # malformed record cannot break the whole feed.
                        error_count += 1
                        logger.exception(
                            f"Error processing research item with query: {row.get('query', 'UNKNOWN')[:100]}"
                        )
                        continue

                logger.info(
                    f"Processing summary: total_results={len(results)}, processed={processed_count}, "
                    f"errors={error_count}, added={len(news_items)}"
                )

                # Log subscription-specific items if we were filtering
                if subscription_id and subscription_id != "all":
                    sub_items = [
                        item for item in news_items if item.get("is_news", False)
                    ]
                    logger.info(
                        f"Subscription {subscription_id}: found {len(sub_items)} items"
                    )

        except Exception as db_error:
            logger.exception(f"Database error in research history: {db_error}")
            raise DatabaseAccessException(
                "research_history_query", str(db_error)
            )

        # If no news items found, return empty list
        if not news_items:
            logger.info("No news items found, returning empty list")
            news_items = []

        logger.info(f"Returning {len(news_items)} news items to client")

        # Determine the source: "news_items" when at least one card came
        # from a flagged news search, otherwise plain research history.
        source = (
            "news_items"
            if any(item.get("is_news", False) for item in news_items)
            else "research_history"
        )

        return {
            "news_items": news_items[:limit],
            "generated_at": datetime.now(timezone.utc).isoformat(),
            "focus": focus,
            "search_strategy": search_strategy or "default",
            "total_items": len(news_items),
            "source": source,
        }

    except NewsAPIException:
        # Re-raise our custom exceptions
        raise
    except Exception as e:
        logger.exception("Error getting news feed")
        raise NewsFeedGenerationException(str(e), user_id=user_id)

482 

483 

def debug_research_items(user_id: str):
    """Debug helper: summarize the user's research database.

    Returns a dict with the total item count, a per-status breakdown,
    and the ten most recently created items.

    Raises:
        DatabaseAccessException: if the database cannot be queried.
    """
    try:
        from ..database.session_context import get_user_db_session
        from ..database.models import ResearchHistory
        from sqlalchemy import func

        with get_user_db_session(user_id) as db_session:
            # Total number of research items of any status.
            item_total = db_session.query(
                func.count(ResearchHistory.id)
            ).scalar()

            # Per-status counts, serialized as a list of dicts.
            per_status_rows = (
                db_session.query(
                    ResearchHistory.status,
                    func.count(ResearchHistory.id).label("count"),
                )
                .group_by(ResearchHistory.status)
                .all()
            )
            per_status = [
                {"status": status_value, "count": n}
                for status_value, n in per_status_rows
            ]

            # Ten newest items, newest first.
            latest_rows = (
                db_session.query(ResearchHistory)
                .order_by(ResearchHistory.created_at.desc())
                .limit(10)
                .all()
            )
            latest = []
            for record in latest_rows:
                latest.append(
                    {
                        "id": record.id,
                        "query": record.query,
                        "status": record.status,
                        "created_at": record.created_at.isoformat()
                        if record.created_at
                        else None,
                    }
                )

            return {
                "total_items": item_total,
                "by_status": per_status,
                "recent_items": latest,
            }
    except Exception as e:
        logger.exception("Error in debug_research_items")
        raise DatabaseAccessException("debug_research_items", str(e))

540 

541 

def get_subscription_history(
    subscription_id: str, limit: int = 20
) -> Dict[str, Any]:
    """
    Get research history for a specific subscription.

    Args:
        subscription_id: The subscription UUID
        limit: Maximum number of history items to return

    Returns:
        Dict containing subscription info and its research history

    Raises:
        SubscriptionNotFoundException: if no subscription has this id
        DatabaseAccessException: on any other database failure
    """
    try:
        from ..database.session_context import get_user_db_session
        from ..database.models import ResearchHistory
        from ..database.models.news import NewsSubscription

        # Get subscription details using ORM from user's encrypted database
        with get_user_db_session() as session:
            subscription = (
                session.query(NewsSubscription)
                .filter_by(id=subscription_id)
                .first()
            )

            if not subscription:
                raise SubscriptionNotFoundException(subscription_id)

            # BUG FIX: this previously did subscription_dict.get("user_id"),
            # but subscription_dict (built below) never contains a "user_id"
            # key, so the lookup always fell back to "anonymous" and history
            # was read from the wrong user's database. Capture the owner from
            # the ORM object while it is still session-bound instead.
            # (getattr-guarded in case the model lacks a user_id column —
            # TODO confirm against the NewsSubscription schema.)
            sub_user_id = getattr(subscription, "user_id", None) or "anonymous"

            # Convert to dict for response
            subscription_dict = {
                "id": subscription.id,
                "query_or_topic": subscription.query_or_topic,
                "subscription_type": subscription.subscription_type,
                "refresh_interval_minutes": subscription.refresh_interval_minutes,
                "refresh_count": subscription.refresh_count or 0,
                "created_at": subscription.created_at.isoformat()
                if subscription.created_at
                else None,
                "next_refresh": subscription.next_refresh.isoformat()
                if subscription.next_refresh
                else None,
            }

        # Now get research history from the research database
        with get_user_db_session(sub_user_id) as db_session:
            # Get all research runs that were triggered by this subscription
            # Look for subscription_id in the research_meta JSON
            # Note: JSON format has space after colon
            like_pattern = f'%"subscription_id": "{subscription_id}"%'
            logger.info(
                f"Searching for research history with pattern: {like_pattern}"
            )

            history_items = (
                db_session.query(ResearchHistory)
                .filter(ResearchHistory.research_meta.like(like_pattern))
                .order_by(ResearchHistory.created_at.desc())
                .limit(limit)
                .all()
            )

            # Convert to dict format for compatibility
            history_items = [
                {
                    "id": h.id,
                    "uuid_id": h.uuid_id,
                    "query": h.query,
                    "status": h.status,
                    "created_at": h.created_at.isoformat()
                    if h.created_at
                    else None,
                    "completed_at": h.completed_at.isoformat()
                    if h.completed_at
                    else None,
                    "duration_seconds": h.duration_seconds,
                    "research_meta": h.research_meta,
                    "report_path": h.report_path,
                }
                for h in history_items
            ]

        # Process history items into the API response shape.
        processed_history = []
        for item in history_items:
            processed_item = {
                "research_id": item.get("uuid_id") or str(item.get("id")),
                "query": item["query"],
                "status": item["status"],
                "created_at": item["created_at"],
                "completed_at": item.get("completed_at"),
                "duration_seconds": item.get("duration_seconds", 0),
                "url": f"/progress/{item.get('uuid_id') or item.get('id')}",
            }

            # Parse metadata if available to get headline and topics
            if item.get("research_meta"):
                try:
                    meta = json.loads(item["research_meta"])
                    processed_item["triggered_by"] = meta.get(
                        "triggered_by", "subscription"
                    )
                    # Add headline and topics from metadata
                    processed_item["headline"] = meta.get(
                        "generated_headline", "[No headline]"
                    )
                    processed_item["topics"] = meta.get("generated_topics", [])
                except Exception:
                    processed_item["headline"] = "[No headline]"
                    processed_item["topics"] = []
            else:
                processed_item["headline"] = "[No headline]"
                processed_item["topics"] = []

            processed_history.append(processed_item)

        return {
            "subscription": subscription_dict,
            "history": processed_history,
            "total_runs": len(processed_history),
        }

    except NewsAPIException:
        # Re-raise our custom exceptions
        raise
    except Exception as e:
        logger.exception("Error getting subscription history")
        raise DatabaseAccessException("get_subscription_history", str(e))

673 

674 

675def _format_time_ago(timestamp: str) -> str: 

676 """Format timestamp as 'X hours ago' string.""" 

677 try: 

678 from dateutil import parser 

679 from loguru import logger 

680 

681 dt = parser.parse(timestamp) 

682 

683 # If dt is naive, assume it's in UTC 

684 if dt.tzinfo is None: 

685 dt = dt.replace(tzinfo=timezone.utc) 

686 

687 now = datetime.now(timezone.utc) 

688 diff = now - dt 

689 

690 if diff.days > 0: 

691 return f"{diff.days} day{'s' if diff.days > 1 else ''} ago" 

692 elif diff.seconds > 3600: 

693 hours = diff.seconds // 3600 

694 return f"{hours} hour{'s' if hours > 1 else ''} ago" 

695 elif diff.seconds > 60: 

696 minutes = diff.seconds // 60 

697 return f"{minutes} minute{'s' if minutes > 1 else ''} ago" 

698 else: 

699 return "Just now" 

700 except Exception: 

701 logger.exception(f"Error parsing timestamp '{timestamp}'") 

702 return "Recently" 

703 

704 

def get_subscription(subscription_id: str) -> Optional[Dict[str, Any]]:
    """
    Get a single subscription by ID.

    Args:
        subscription_id: Subscription identifier

    Returns:
        Dictionary with subscription data or None if not found
    """
    try:
        # Read straight from the user's encrypted database.
        from ..database.session_context import get_user_db_session
        from ..database.models.news import NewsSubscription

        with get_user_db_session() as db_session:
            sub = (
                db_session.query(NewsSubscription)
                .filter_by(id=subscription_id)
                .first()
            )

            if sub is None:
                raise SubscriptionNotFoundException(subscription_id)

            # Shape matches what the subscription templates expect.
            payload = {
                "id": sub.id,
                "name": sub.name or "",
                "query_or_topic": sub.query_or_topic,
                "subscription_type": sub.subscription_type,
                "refresh_interval_minutes": sub.refresh_interval_minutes,
                "is_active": sub.status == "active",
                "status": sub.status,
                "folder_id": sub.folder_id,
                "model_provider": sub.model_provider,
                "model": sub.model,
                "search_strategy": sub.search_strategy,
                "custom_endpoint": sub.custom_endpoint,
                "search_engine": sub.search_engine,
                "search_iterations": sub.search_iterations or 3,
                "questions_per_iteration": sub.questions_per_iteration
                or 5,
                "created_at": sub.created_at.isoformat()
                if sub.created_at
                else None,
                "updated_at": sub.updated_at.isoformat()
                if sub.updated_at
                else None,
            }
            return payload

    except NewsAPIException:
        # Our own exceptions pass through untouched.
        raise
    except Exception as e:
        logger.exception(f"Error getting subscription {subscription_id}")
        raise DatabaseAccessException("get_subscription", str(e))

762 

763 

def get_subscriptions(user_id: str) -> Dict[str, Any]:
    """
    Get all subscriptions for a user.

    Args:
        user_id: User identifier

    Returns:
        Dictionary with subscriptions list
    """
    try:
        # All reads go against the user's encrypted database.
        from ..database.session_context import get_user_db_session
        from ..database.models import ResearchHistory
        from ..database.models.news import NewsSubscription
        from sqlalchemy import func

        serialized = []

        with get_user_db_session(user_id) as db_session:
            for record in db_session.query(NewsSubscription).all():
                # research_meta is JSON text; count the runs whose metadata
                # embeds this subscription's id (note the space after the
                # colon in the serialized JSON).
                meta_pattern = f'%"subscription_id": "{record.id}"%'
                run_total = (
                    db_session.query(func.count(ResearchHistory.id))
                    .filter(ResearchHistory.research_meta.like(meta_pattern))
                    .scalar()
                    or 0
                )

                serialized.append(
                    {
                        "id": record.id,
                        "query": record.query_or_topic,
                        "type": record.subscription_type,
                        "refresh_minutes": record.refresh_interval_minutes,
                        "created_at": record.created_at.isoformat()
                        if record.created_at
                        else None,
                        "next_refresh": record.next_refresh.isoformat()
                        if record.next_refresh
                        else None,
                        "last_refreshed": record.last_refresh.isoformat()
                        if record.last_refresh
                        else None,
                        "is_active": record.status == "active",
                        # Actual run count from research_history, not the
                        # subscription's own counter.
                        "total_runs": run_total,
                        "name": record.name or "",
                        "folder_id": record.folder_id,
                    }
                )

        return {"subscriptions": serialized, "total": len(serialized)}

    except Exception as e:
        logger.exception("Error getting subscriptions")
        raise DatabaseAccessException("get_subscriptions", str(e))

824 

825 

def update_subscription(
    subscription_id: str, data: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Update an existing subscription.

    Args:
        subscription_id: Subscription identifier
        data: Dictionary with fields to update

    Returns:
        Dictionary with updated subscription data
    """
    try:
        from ..database.session_context import get_user_db_session
        from ..database.models.news import NewsSubscription
        from datetime import datetime, timedelta

        with get_user_db_session() as db_session:
            # Load the existing row; unknown ids are a caller error.
            subscription = (
                db_session.query(NewsSubscription)
                .filter_by(id=subscription_id)
                .first()
            )
            if subscription is None:
                raise SubscriptionNotFoundException(subscription_id)

            # Plain attributes: copied verbatim when present in the payload.
            for field in (
                "name",
                "query_or_topic",
                "subscription_type",
                "folder_id",
                "model_provider",
                "model",
                "search_strategy",
                "custom_endpoint",
                "search_engine",
                "search_iterations",
                "questions_per_iteration",
            ):
                if field in data:
                    setattr(subscription, field, data[field])

            # A changed refresh interval also reschedules the next run.
            if "refresh_interval_minutes" in data:
                previous_interval = subscription.refresh_interval_minutes
                subscription.refresh_interval_minutes = data[
                    "refresh_interval_minutes"
                ]
                if previous_interval != subscription.refresh_interval_minutes:
                    subscription.next_refresh = datetime.now(UTC) + timedelta(
                        minutes=subscription.refresh_interval_minutes
                    )

            # "is_active" maps onto status; an explicit "status" key wins
            # because it is applied afterwards.
            if "is_active" in data:
                subscription.status = (
                    "active" if data["is_active"] else "paused"
                )
            if "status" in data:
                subscription.status = data["status"]

            # Stamp and persist.
            subscription.updated_at = datetime.now(UTC)
            db_session.commit()

            # Notify scheduler about updated subscription
            _notify_scheduler_about_subscription_change("updated")

            # Convert to API format
            return {
                "status": "success",
                "subscription": {
                    "id": subscription.id,
                    "name": subscription.name or "",
                    "query_or_topic": subscription.query_or_topic,
                    "subscription_type": subscription.subscription_type,
                    "refresh_interval_minutes": subscription.refresh_interval_minutes,
                    "is_active": subscription.status == "active",
                    "status": subscription.status,
                    "folder_id": subscription.folder_id,
                    "model_provider": subscription.model_provider,
                    "model": subscription.model,
                    "search_strategy": subscription.search_strategy,
                    "custom_endpoint": subscription.custom_endpoint,
                    "search_engine": subscription.search_engine,
                    "search_iterations": subscription.search_iterations or 3,
                    "questions_per_iteration": subscription.questions_per_iteration
                    or 5,
                },
            }

    except NewsAPIException:
        # Re-raise our custom exceptions
        raise
    except Exception as e:
        logger.exception("Error updating subscription")
        raise SubscriptionUpdateException(subscription_id, str(e))

934 

935 

def create_subscription(
    user_id: str,
    query: str,
    subscription_type: str = "search",
    refresh_minutes: Optional[int] = None,
    source_research_id: Optional[str] = None,
    model_provider: Optional[str] = None,
    model: Optional[str] = None,
    search_strategy: Optional[str] = None,
    custom_endpoint: Optional[str] = None,
    name: Optional[str] = None,
    folder_id: Optional[str] = None,
    is_active: bool = True,
    search_engine: Optional[str] = None,
    search_iterations: Optional[int] = None,
    questions_per_iteration: Optional[int] = None,
) -> Dict[str, Any]:
    """
    Create a new subscription for user.

    Args:
        user_id: User identifier
        query: Search query or topic
        subscription_type: "search" or "topic"
        refresh_minutes: Refresh interval in minutes; when None, read from
            the "news.subscription.refresh_minutes" setting, defaulting
            to 240 (4 hours) if settings are unavailable
        source_research_id: Research run this subscription was created from
        model_provider: LLM provider override for this subscription
        model: Model name override
        search_strategy: Search strategy (defaults to "news_aggregation")
        custom_endpoint: Custom model endpoint, if any
        name: Human-readable subscription name
        folder_id: Folder to file the subscription under
        is_active: Whether the subscription starts active (else "paused")
        search_engine: Search engine override
        search_iterations: Number of search iterations per run
        questions_per_iteration: Questions generated per iteration

    Returns:
        Dictionary with subscription details (status, subscription_id,
        type, query, refresh_minutes)

    Raises:
        SubscriptionCreationException: if the subscription cannot be stored
    """
    try:
        from ..database.session_context import get_user_db_session
        from ..database.models.news import NewsSubscription
        from datetime import datetime, timedelta
        import uuid

        # Get default refresh interval from settings if not provided
        # NOTE: This API function accesses the settings DB for convenience when used
        # within the Flask application context. For programmatic API access outside
        # the web context, callers should provide refresh_minutes explicitly to avoid
        # dependency on the settings database being initialized.

        with get_user_db_session(user_id) as db_session:
            if refresh_minutes is None:
                try:
                    from ..utilities.db_utils import get_settings_manager

                    settings_manager = get_settings_manager(db_session, user_id)
                    refresh_minutes = settings_manager.get_setting(
                        "news.subscription.refresh_minutes", 240
                    )
                except (ImportError, AttributeError, TypeError):
                    # Fallback for when settings DB is not available (e.g. programmatic API usage)
                    logger.debug(
                        "Settings manager not available, using default refresh_minutes"
                    )
                    refresh_minutes = 240  # Default to 4 hours

            # Create new subscription
            subscription = NewsSubscription(
                id=str(uuid.uuid4()),
                name=name,
                query_or_topic=query,
                subscription_type=subscription_type,
                refresh_interval_minutes=refresh_minutes,
                status="active" if is_active else "paused",
                model_provider=model_provider,
                model=model,
                search_strategy=search_strategy or "news_aggregation",
                custom_endpoint=custom_endpoint,
                folder_id=folder_id,
                search_engine=search_engine,
                search_iterations=search_iterations,
                questions_per_iteration=questions_per_iteration,
                created_at=datetime.now(UTC),
                updated_at=datetime.now(UTC),
                last_refresh=None,
                next_refresh=datetime.now(UTC)
                + timedelta(minutes=refresh_minutes),
                source_id=source_research_id,
            )

            # Add to database
            db_session.add(subscription)
            db_session.commit()

            # Notify scheduler about new subscription
            _notify_scheduler_about_subscription_change("created", user_id)

            return {
                "status": "success",
                "subscription_id": subscription.id,
                "type": subscription_type,
                "query": query,
                "refresh_minutes": refresh_minutes,
            }

    except Exception as e:
        logger.exception("Error creating subscription")
        raise SubscriptionCreationException(
            str(e), {"query": query, "type": subscription_type}
        )

1036 

1037 

def delete_subscription(
    subscription_id: str, user_id: Optional[str] = None
) -> Dict[str, Any]:
    """
    Delete a subscription.

    Args:
        subscription_id: ID of subscription to delete
        user_id: Optional user identifier; selects the per-user database and
            is forwarded to the scheduler notification as a username fallback
            (consistent with create_subscription). When omitted, the session
            default is used as before.

    Returns:
        Dictionary with status

    Raises:
        SubscriptionNotFoundException: If no subscription with the given ID exists.
        SubscriptionDeletionException: If deletion fails for any other reason.
    """
    try:
        from ..database.session_context import get_user_db_session
        from ..database.models.news import NewsSubscription

        with get_user_db_session(user_id) as db_session:
            subscription = (
                db_session.query(NewsSubscription)
                .filter_by(id=subscription_id)
                .first()
            )
            if subscription:
                db_session.delete(subscription)
                db_session.commit()

                # Notify scheduler about deleted subscription
                _notify_scheduler_about_subscription_change("deleted", user_id)

                return {"status": "success", "deleted": subscription_id}
            else:
                raise SubscriptionNotFoundException(subscription_id)
    except NewsAPIException:
        # Re-raise our custom exceptions
        raise
    except Exception as e:
        logger.exception("Error deleting subscription")
        # Chain the original exception so the root cause is preserved.
        raise SubscriptionDeletionException(subscription_id, str(e)) from e

1074 

1075 

def get_votes_for_cards(card_ids: list, user_id: str) -> Dict[str, Any]:
    """
    Get vote counts and user's votes for multiple news cards.

    Args:
        card_ids: List of card IDs to get votes for
        user_id: User identifier (not used - per-user database)

    Returns:
        Dictionary with vote information for each card
    """
    from flask import session as flask_session, has_request_context
    from ..database.models.news import UserRating, RatingType
    from ..database.session_context import get_user_db_session

    try:
        # Resolve the username: from the Flask session when inside a request,
        # otherwise fall back to the explicit user_id (e.g. when called from tests).
        if has_request_context():
            username = flask_session.get("username")
            if not username:
                raise ValueError("No username in session")
        else:
            username = user_id if user_id else None
            if not username:
                raise ValueError("No username provided and no request context")

        with get_user_db_session(username) as db:

            def relevance_ratings(**filters):
                # Base query for RELEVANCE ratings, optionally narrowed further.
                return db.query(UserRating).filter_by(
                    rating_type=RatingType.RELEVANCE, **filters
                )

            results = {}
            for card_id in card_ids:
                # The user's own vote on this card, if any.
                own_vote = relevance_ratings(card_id=card_id).first()

                results[card_id] = {
                    "upvotes": relevance_ratings(
                        card_id=card_id, rating_value="up"
                    ).count(),
                    "downvotes": relevance_ratings(
                        card_id=card_id, rating_value="down"
                    ).count(),
                    "user_vote": own_vote.rating_value if own_vote else None,
                }

        return {"success": True, "votes": results}

    except Exception:
        logger.exception("Error getting votes for cards")
        raise

1150 

1151 

def submit_feedback(card_id: str, user_id: str, vote: str) -> Dict[str, Any]:
    """
    Submit feedback (vote) for a news card.

    Args:
        card_id: ID of the news card
        user_id: User identifier (not used - per-user database)
        vote: "up" or "down"

    Returns:
        Dictionary with updated vote counts
    """
    from flask import session as flask_session, has_request_context
    from sqlalchemy_utc import utcnow
    from ..database.models.news import UserRating, RatingType
    from ..database.session_context import get_user_db_session

    try:
        # Only up/down votes are accepted.
        if vote not in ("up", "down"):
            raise ValueError(f"Invalid vote type: {vote}")

        # Resolve the username: from the Flask session when inside a request,
        # otherwise fall back to the explicit user_id (e.g. when called from tests).
        if has_request_context():
            username = flask_session.get("username")
            if not username:
                raise ValueError("No username in session")
        else:
            username = user_id if user_id else None
            if not username:
                raise ValueError("No username provided and no request context")

        with get_user_db_session(username) as db:
            # News items are generated dynamically, so a matching NewsCard row
            # may not exist; we accept ratings without checking for one.
            rating = (
                db.query(UserRating)
                .filter_by(card_id=card_id, rating_type=RatingType.RELEVANCE)
                .first()
            )

            if rating is None:
                # First vote for this card in this user's database.
                db.add(
                    UserRating(
                        card_id=card_id,
                        rating_type=RatingType.RELEVANCE,
                        rating_value=vote,
                    )
                )
            else:
                # Change the existing vote and refresh its timestamp.
                rating.rating_value = vote
                rating.created_at = utcnow()

            db.commit()

            def count_votes(value):
                # Total RELEVANCE ratings of the given value for this card.
                return (
                    db.query(UserRating)
                    .filter_by(
                        card_id=card_id,
                        rating_type=RatingType.RELEVANCE,
                        rating_value=value,
                    )
                    .count()
                )

            upvotes = count_votes("up")
            downvotes = count_votes("down")

            logger.info(
                f"Feedback submitted for card {card_id}: {vote} (up: {upvotes}, down: {downvotes})"
            )

            return {
                "success": True,
                "card_id": card_id,
                "vote": vote,
                "upvotes": upvotes,
                "downvotes": downvotes,
            }

    except Exception:
        logger.exception(f"Error submitting feedback for card {card_id}")
        raise

1249 

1250 

def research_news_item(card_id: str, depth: str = "quick") -> Dict[str, Any]:
    """
    Perform deeper research on a news item.

    Args:
        card_id: ID of the news card to research
        depth: Research depth - "quick", "detailed", or "report"

    Returns:
        Dictionary with research results

    Raises:
        NotImplementedException: Always, until card storage is ported to
            the per-user database layout.
    """
    # TODO: Implement with per-user database for cards
    logger.warning("research_news_item not yet implemented with per-user databases")
    raise NotImplementedException("research_news_item")

1267 

1268 

def save_news_preferences(
    user_id: str, preferences: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Save user preferences for news.

    Args:
        user_id: User identifier
        preferences: Dictionary of preferences to save

    Returns:
        Dictionary with status and message

    Raises:
        NotImplementedException: Always, until preference storage is ported
            to the per-user database layout.
    """
    # TODO: Implement with per-user database for preferences
    logger.warning("save_news_preferences not yet implemented with per-user databases")
    raise NotImplementedException("save_news_preferences")

1287 

1288 

def get_news_categories() -> Dict[str, Any]:
    """
    Get available news categories with counts.

    Returns:
        Dictionary with categories and statistics

    Raises:
        NotImplementedException: Always, until category storage is ported
            to the per-user database layout.
    """
    # TODO: Implement with per-user database for categories
    logger.warning("get_news_categories not yet implemented with per-user databases")
    raise NotImplementedException("get_news_categories")