Coverage for src / local_deep_research / news / api.py: 88%

403 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1""" 

2Direct API functions for news system. 

3These functions can be called directly by scheduler or wrapped by Flask endpoints. 

4""" 

5 

6from typing import Dict, Any, Optional 

7from datetime import datetime, timezone, UTC 

8from loguru import logger 

9import re 

10import json 

11 

12from ..constants import ResearchStatus 

13from .recommender.topic_based import TopicBasedRecommender 

14from .exceptions import ( 

15 InvalidLimitException, 

16 SubscriptionNotFoundException, 

17 SubscriptionCreationException, 

18 SubscriptionUpdateException, 

19 SubscriptionDeletionException, 

20 DatabaseAccessException, 

21 NewsFeedGenerationException, 

22 NotImplementedException, 

23 NewsAPIException, 

24) 

25# Removed welcome feed import - no placeholders 

26# get_db_setting not available in merged codebase 

27 

28 

29# Global recommender instance (can be reused) 

30_recommender = None 

31 

32 

def get_recommender():
    """Lazily construct and return the module-wide recommender singleton.

    The first call builds a ``TopicBasedRecommender``; every later call
    returns that same shared instance.
    """
    global _recommender
    if _recommender is None:
        # First use: create the shared instance for all subsequent callers.
        _recommender = TopicBasedRecommender()
    return _recommender

39 

40 

def _notify_scheduler_about_subscription_change(
    action: str, user_id: Optional[str] = None
) -> None:
    """
    Notify the scheduler about subscription changes.

    Best-effort: every failure is logged and swallowed so that subscription
    CRUD operations never fail just because the scheduler was unreachable.
    Requires a Flask request context (reads the ``flask.session``).

    Args:
        action: The action performed (created, updated, deleted)
        user_id: Optional user_id to use as fallback for username
    """
    try:
        from flask import session as flask_session
        from .subscription_manager.scheduler import get_news_scheduler

        scheduler = get_news_scheduler()
        if scheduler.is_running:
            # Get username, with optional fallback to user_id
            username = flask_session.get("username")
            if not username and user_id:
                username = user_id

            # Get password from session password store
            from ..database.session_passwords import session_password_store

            session_id = flask_session.get("session_id")
            password = None
            if session_id and username:
                password = session_password_store.get_session_password(
                    username, session_id
                )

            if password:
                # Update scheduler to reschedule subscriptions
                scheduler.update_user_info(username, password)
                logger.info(
                    f"Scheduler notified about {action} subscription for {username}"
                )
            else:
                # Without a password the scheduler cannot open the user's
                # encrypted database, so rescheduling is skipped.
                logger.warning(
                    f"Could not notify scheduler - no password available{' for ' + username if username else ''}"
                )
    except Exception:
        # Deliberate broad catch: notification is strictly best-effort.
        logger.exception(
            f"Could not notify scheduler about {action} subscription"
        )

86 

87 

def _extract_top_links(content: str, max_links: int = 3) -> list:
    """Extract up to *max_links* source links from a report body.

    Scans the first 100 lines for ``URL:`` markers. The preceding line (with
    any leading ``[12, 26]``-style citation marker stripped) is used as the
    link title; when no title is found, the URL's domain is used instead.
    Extraction is best-effort: any failure is logged and whatever links were
    collected so far are returned.
    """
    links = []
    try:
        report_lines = content.split("\n")
        # Only the first 100 lines are scanned; links beyond that are rare
        # and scanning the whole report would be wasted work.
        for i, line in enumerate(report_lines[:100]):
            if "URL:" not in line:
                continue
            url = line.split("URL:", 1)[1].strip()
            if not url.startswith("http"):
                continue

            # Get the title from the previous line if available.
            title = ""
            if i > 0:
                title_line = report_lines[i - 1].strip()
                # Remove citation numbers like [12, 26, 19]
                title = re.sub(r"^\[[^\]]+\]\s*", "", title_line).strip()
            if not title:
                # Use domain as fallback
                domain = url.split("//")[-1].split("/")[0]
                title = domain.replace("www.", "")

            links.append(
                {
                    "url": url,
                    "title": title[:50] + "..." if len(title) > 50 else title,
                }
            )
            logger.debug(f"Found link: {title} - {url}")
            if len(links) >= max_links:
                break
    except Exception:
        logger.exception("Error extracting links from database content")
    return links


def get_news_feed(
    user_id: str = "anonymous",
    limit: int = 20,
    use_cache: bool = True,
    focus: Optional[str] = None,
    search_strategy: Optional[str] = None,
    subscription_id: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Get personalized news feed by pulling from news_items table first, then research history.

    Args:
        user_id: User identifier
        limit: Maximum number of cards to return
        use_cache: Whether to use cached news (currently unused; kept for
            interface compatibility)
        focus: Optional focus area for news
        search_strategy: Override default recommendation strategy
        subscription_id: When given (and not "all"), only return items
            produced by that subscription

    Returns:
        Dictionary with news items and metadata

    Raises:
        InvalidLimitException: If limit is less than 1.
        DatabaseAccessException: If the research history query fails.
        NewsFeedGenerationException: For any other unexpected failure.
    """
    try:
        # Validate limit - allow any positive number
        if limit < 1:
            raise InvalidLimitException(limit)

        logger.info(
            f"get_news_feed called with user_id={user_id}, limit={limit}"
        )

        # News is always enabled for now - per-user settings will be handled later

        # Import database functions (function-scoped to avoid import cycles)
        from ..database.session_context import get_user_db_session
        from ..database.models import ResearchHistory
        from sqlalchemy import or_

        news_items = []
        remaining_limit = limit

        # Query research history from user's database for news items
        logger.info("Getting news items from research history")
        try:
            with get_user_db_session(user_id) as db_session:
                # Build query using ORM
                query = db_session.query(ResearchHistory).filter(
                    ResearchHistory.status == ResearchStatus.COMPLETED
                )

                # Filter by subscription if provided.
                # BUG FIX: the metadata JSON is serialized with a space after
                # the colon ('"subscription_id": "..."') — see the spaced LIKE
                # patterns used elsewhere in this module — so the previous
                # no-space pattern never matched. Accept both spellings.
                if subscription_id and subscription_id != "all":
                    query = query.filter(
                        or_(
                            ResearchHistory.research_meta.like(
                                f'%"subscription_id": "{subscription_id}"%'
                            ),
                            ResearchHistory.research_meta.like(
                                f'%"subscription_id":"{subscription_id}"%'
                            ),
                        )
                    )

                # Order by creation date; over-fetch 2x because some rows will
                # be rejected as non-news during processing below.
                results = (
                    query.order_by(ResearchHistory.created_at.desc())
                    .limit(remaining_limit * 2)
                    .all()
                )

                # Convert ORM objects to dictionaries for compatibility
                results = [
                    {
                        "id": r.id,
                        "uuid_id": r.id,  # In ResearchHistory, id is the UUID
                        "query": r.query,
                        "title": r.title
                        if hasattr(r, "title")
                        else None,  # Include title field if exists
                        "created_at": r.created_at if r.created_at else None,
                        "completed_at": r.completed_at
                        if r.completed_at
                        else None,
                        "duration_seconds": r.duration_seconds
                        if hasattr(r, "duration_seconds")
                        else None,
                        "report_path": r.report_path
                        if hasattr(r, "report_path")
                        else None,
                        "report_content": r.report_content
                        if hasattr(r, "report_content")
                        else None,  # Include database content
                        "research_meta": r.research_meta,
                        "status": r.status,
                    }
                    for r in results
                ]

                logger.info(f"Database returned {len(results)} research items")
                if results and len(results) > 0:
                    logger.info(f"First row keys: {list(results[0].keys())}")
                    # Log first few items' metadata
                    for i, row in enumerate(results[:3]):
                        logger.info(
                            f"Item {i}: query='{row['query'][:50]}...', has meta={bool(row.get('research_meta'))}"
                        )

                # Process results to find news items
                processed_count = 0
                error_count = 0

                for row in results:
                    try:
                        # Parse metadata (tolerate dicts, JSON strings, junk)
                        metadata = {}
                        if row.get("research_meta"):
                            try:
                                # Handle both dict and string formats
                                if isinstance(row["research_meta"], dict):
                                    metadata = row["research_meta"]
                                else:
                                    metadata = json.loads(row["research_meta"])
                            except (json.JSONDecodeError, TypeError):
                                logger.exception("Error parsing metadata")
                                metadata = {}

                        # Check if this has news metadata (generated_headline
                        # or generated_topics) or if it's a news-related query
                        has_news_metadata = (
                            metadata.get("generated_headline") is not None
                            or metadata.get("generated_topics") is not None
                        )

                        query_lower = row["query"].lower()
                        is_news_query = (
                            has_news_metadata
                            or metadata.get("is_news_search")
                            or metadata.get("search_type") == "news_analysis"
                            or "breaking news" in query_lower
                            or "news stories" in query_lower
                            or (
                                "today" in query_lower
                                and (
                                    "news" in query_lower
                                    or "breaking" in query_lower
                                )
                            )
                            or "latest news" in query_lower
                        )

                        # Log the decision for first few items
                        if processed_count < 3 or error_count < 3:
                            logger.info(
                                f"Item check: query='{row['query'][:30]}...', is_news_search={metadata.get('is_news_search')}, "
                                f"has_news_metadata={has_news_metadata}, is_news_query={is_news_query}"
                            )

                        # Only show items that have news metadata or are news queries
                        if is_news_query:
                            processed_count += 1
                            logger.info(
                                f"Processing research item #{processed_count}: {row['query'][:50]}..."
                            )

                            # Always use database content
                            findings = ""
                            summary = ""
                            content = row.get("report_content")

                            if content:
                                logger.debug(
                                    f"Using database content for research {row['id']}"
                                )
                                # Use full content as findings
                                findings = content
                                # Extract summary from first non-empty,
                                # non-heading line
                                for line in content.split("\n"):
                                    if line.strip() and not line.startswith(
                                        "#"
                                    ):
                                        summary = line.strip()
                                        break
                            else:
                                logger.debug(
                                    f"No database content for research {row['id']}"
                                )

                            # Use stored headline/topics if available, otherwise generate
                            original_query = row["query"]

                            # Check for headline - first try database title, then metadata
                            headline = row.get("title") or metadata.get(
                                "generated_headline"
                            )

                            # For subscription results, generate headline from query if needed
                            if not headline and metadata.get("is_news_search"):
                                # Use subscription name or query as headline
                                subscription_name = metadata.get(
                                    "subscription_name"
                                )
                                if subscription_name:
                                    headline = (
                                        f"News Update: {subscription_name}"
                                    )
                                else:
                                    # Generate headline from query
                                    headline = f"News: {row['query'][:60]}..."

                            # Skip items without meaningful headlines
                            if (
                                not headline
                                or headline == "[No headline available]"
                            ):
                                logger.debug(
                                    f"Skipping item without headline: {row['id']}"
                                )
                                continue

                            # Skip items that are still in progress or suspended
                            if row["status"] in (
                                ResearchStatus.IN_PROGRESS,
                                ResearchStatus.SUSPENDED,
                            ):
                                logger.debug(
                                    f"Skipping incomplete item: {row['id']} (status: {row['status']})"
                                )
                                continue

                            # Skip items without content
                            if not content:
                                logger.debug(
                                    f"Skipping item without content: {row['id']}"
                                )
                                continue

                            # Use ID properly, preferring uuid_id
                            research_id = row.get("uuid_id") or str(row["id"])

                            # Use stored category and topics - no defaults
                            category = (
                                metadata.get("category") or "[Uncategorized]"
                            )
                            topics = metadata.get("generated_topics") or [
                                "[No topics]"
                            ]

                            # Extract top 3 links from the database content
                            links = (
                                _extract_top_links(content) if content else []
                            )

                            # Create news item from research
                            news_item = {
                                "id": f"news-{research_id}",
                                "headline": headline,
                                "category": category,
                                "summary": summary
                                or f"Research analysis for: {headline[:100]}",
                                "findings": findings,
                                "impact_score": metadata.get(
                                    "impact_score", 0
                                ),  # 0 indicates missing
                                "time_ago": _format_time_ago(
                                    row["created_at"]
                                ),
                                "upvotes": metadata.get("upvotes", 0),
                                "downvotes": metadata.get("downvotes", 0),
                                "source_url": f"/results/{research_id}",
                                "topics": topics,  # Use generated topics
                                "links": links,  # Add extracted links
                                "research_id": research_id,
                                "created_at": row["created_at"],
                                "duration_seconds": row.get(
                                    "duration_seconds", 0
                                ),
                                "original_query": original_query,  # Keep original query for reference
                                "is_news": metadata.get(
                                    "is_news_search", False
                                ),  # Flag for news searches
                                "news_date": metadata.get(
                                    "news_date"
                                ),  # If specific date for news
                                "news_source": metadata.get(
                                    "news_source"
                                ),  # If from specific source
                                "priority": metadata.get(
                                    "priority", "normal"
                                ),  # Priority level
                            }

                            news_items.append(news_item)
                            logger.info(
                                f"Added news item: {headline[:50]}..."
                            )

                            if len(news_items) >= limit:
                                break

                    except Exception:
                        # One bad row must not kill the whole feed.
                        error_count += 1
                        logger.exception(
                            f"Error processing research item with query: {row.get('query', 'UNKNOWN')[:100]}"
                        )
                        continue

                logger.info(
                    f"Processing summary: total_results={len(results)}, processed={processed_count}, "
                    f"errors={error_count}, added={len(news_items)}"
                )

                # Log subscription-specific items if we were filtering
                if subscription_id and subscription_id != "all":
                    sub_items = [
                        item
                        for item in news_items
                        if item.get("is_news", False)
                    ]
                    logger.info(
                        f"Subscription {subscription_id}: found {len(sub_items)} items"
                    )

        except Exception as db_error:
            logger.exception(
                f"Database error in research history: {db_error}"
            )
            raise DatabaseAccessException(
                "research_history_query", str(db_error)
            )

        # If no news items found, return empty list
        if not news_items:
            logger.info("No news items found, returning empty list")
            news_items = []

        logger.info(f"Returning {len(news_items)} news items to client")

        # Determine the source
        source = (
            "news_items"
            if any(item.get("is_news", False) for item in news_items)
            else "research_history"
        )

        return {
            "news_items": news_items[:limit],
            "generated_at": datetime.now(timezone.utc).isoformat(),
            "focus": focus,
            "search_strategy": search_strategy or "default",
            "total_items": len(news_items),
            "source": source,
        }

    except NewsAPIException:
        # Re-raise our custom exceptions
        raise
    except Exception as e:
        logger.exception("Error getting news feed")
        raise NewsFeedGenerationException(str(e), user_id=user_id)

486 

487 

def get_subscription_history(
    subscription_id: str, limit: int = 20
) -> Dict[str, Any]:
    """
    Get research history for a specific subscription.

    Args:
        subscription_id: The subscription UUID
        limit: Maximum number of history items to return

    Returns:
        Dict containing subscription info and its research history

    Raises:
        SubscriptionNotFoundException: If no subscription has this ID.
        DatabaseAccessException: For any other database failure.
    """
    try:
        from ..database.session_context import get_user_db_session
        from ..database.models import ResearchHistory
        from ..database.models.news import NewsSubscription

        # Get subscription details using ORM from user's encrypted database
        with get_user_db_session() as session:
            subscription = (
                session.query(NewsSubscription)
                .filter_by(id=subscription_id)
                .first()
            )

            if not subscription:
                raise SubscriptionNotFoundException(subscription_id)

            # Convert to dict for response (done inside the session so that
            # lazy ORM attributes are still loadable)
            subscription_dict = {
                "id": subscription.id,
                "query_or_topic": subscription.query_or_topic,
                "subscription_type": subscription.subscription_type,
                "refresh_interval_minutes": subscription.refresh_interval_minutes,
                "refresh_count": subscription.refresh_count or 0,
                "created_at": subscription.created_at.isoformat()
                if subscription.created_at
                else None,
                "next_refresh": subscription.next_refresh.isoformat()
                if subscription.next_refresh
                else None,
            }

        # Now get research history from the research database
        # Get user_id from subscription
        # NOTE(review): subscription_dict is built above and never contains a
        # "user_id" key, so this lookup always falls back to "anonymous" —
        # confirm whether the subscription's owner should be recorded and
        # used here instead.
        sub_user_id = subscription_dict.get("user_id", "anonymous")

        with get_user_db_session(sub_user_id) as db_session:
            # Get all research runs that were triggered by this subscription
            # Look for subscription_id in the research_meta JSON
            # Note: JSON format has space after colon
            like_pattern = f'%"subscription_id": "{subscription_id}"%'
            logger.info(
                f"Searching for research history with pattern: {like_pattern}"
            )

            history_items = (
                db_session.query(ResearchHistory)
                .filter(ResearchHistory.research_meta.like(like_pattern))
                .order_by(ResearchHistory.created_at.desc())
                .limit(limit)
                .all()
            )

            # Convert to dict format for compatibility
            history_items = [
                {
                    "id": h.id,
                    "uuid_id": h.uuid_id,
                    "query": h.query,
                    "status": h.status,
                    "created_at": h.created_at.isoformat()
                    if h.created_at
                    else None,
                    "completed_at": h.completed_at.isoformat()
                    if h.completed_at
                    else None,
                    "duration_seconds": h.duration_seconds,
                    "research_meta": h.research_meta,
                    "report_path": h.report_path,
                }
                for h in history_items
            ]

        # Process history items into the shape the frontend expects
        processed_history = []
        for item in history_items:
            processed_item = {
                "research_id": item.get("uuid_id") or str(item.get("id")),
                "query": item["query"],
                "status": item["status"],
                "created_at": item["created_at"],
                "completed_at": item.get("completed_at"),
                "duration_seconds": item.get("duration_seconds", 0),
                "url": f"/progress/{item.get('uuid_id') or item.get('id')}",
            }

            # Parse metadata if available to get headline and topics
            if item.get("research_meta"):
                try:
                    meta = json.loads(item["research_meta"])
                    processed_item["triggered_by"] = meta.get(
                        "triggered_by", "subscription"
                    )
                    # Add headline and topics from metadata
                    processed_item["headline"] = meta.get(
                        "generated_headline", "[No headline]"
                    )
                    processed_item["topics"] = meta.get("generated_topics", [])
                except Exception:
                    # Unparseable metadata: fall back to placeholders
                    processed_item["headline"] = "[No headline]"
                    processed_item["topics"] = []
            else:
                processed_item["headline"] = "[No headline]"
                processed_item["topics"] = []

            processed_history.append(processed_item)

        return {
            "subscription": subscription_dict,
            "history": processed_history,
            "total_runs": len(processed_history),
        }

    except NewsAPIException:
        # Re-raise our custom exceptions
        raise
    except Exception as e:
        logger.exception("Error getting subscription history")
        raise DatabaseAccessException("get_subscription_history", str(e))

619 

620 

621def _format_time_ago(timestamp: str) -> str: 

622 """Format timestamp as 'X hours ago' string.""" 

623 try: 

624 from dateutil import parser 

625 from loguru import logger 

626 

627 dt = parser.parse(timestamp) 

628 

629 # If dt is naive, assume it's in UTC 

630 if dt.tzinfo is None: 

631 dt = dt.replace(tzinfo=timezone.utc) 

632 

633 now = datetime.now(timezone.utc) 

634 diff = now - dt 

635 

636 if diff.days > 0: 

637 return f"{diff.days} day{'s' if diff.days > 1 else ''} ago" 

638 elif diff.seconds > 3600: 

639 hours = diff.seconds // 3600 

640 return f"{hours} hour{'s' if hours > 1 else ''} ago" 

641 elif diff.seconds > 60: 

642 minutes = diff.seconds // 60 

643 return f"{minutes} minute{'s' if minutes > 1 else ''} ago" 

644 else: 

645 return "Just now" 

646 except Exception: 

647 logger.exception(f"Error parsing timestamp '{timestamp}'") 

648 return "Recently" 

649 

650 

def get_subscription(subscription_id: str) -> Optional[Dict[str, Any]]:
    """
    Fetch a single subscription by its identifier.

    Args:
        subscription_id: Subscription identifier

    Returns:
        Dictionary with subscription data (shaped for the template layer)

    Raises:
        SubscriptionNotFoundException: If no subscription has this ID.
        DatabaseAccessException: For any other database failure.
    """
    try:
        # Read directly from the user's encrypted database
        from ..database.session_context import get_user_db_session
        from ..database.models.news import NewsSubscription

        def _iso(value):
            # datetime -> ISO string; None passes through unchanged
            return value.isoformat() if value else None

        with get_user_db_session() as session:
            record = (
                session.query(NewsSubscription)
                .filter_by(id=subscription_id)
                .first()
            )

            if not record:
                raise SubscriptionNotFoundException(subscription_id)

            # Shape matches what the subscription templates expect
            return {
                "id": record.id,
                "name": record.name or "",
                "query_or_topic": record.query_or_topic,
                "subscription_type": record.subscription_type,
                "refresh_interval_minutes": record.refresh_interval_minutes,
                "is_active": record.status == "active",
                "status": record.status,
                "folder_id": record.folder_id,
                "model_provider": record.model_provider,
                "model": record.model,
                "search_strategy": record.search_strategy,
                "custom_endpoint": record.custom_endpoint,
                "search_engine": record.search_engine,
                "search_iterations": record.search_iterations or 3,
                "questions_per_iteration": record.questions_per_iteration
                or 5,
                "created_at": _iso(record.created_at),
                "updated_at": _iso(record.updated_at),
            }

    except NewsAPIException:
        # Custom exceptions propagate untouched
        raise
    except Exception as e:
        logger.exception(f"Error getting subscription {subscription_id}")
        raise DatabaseAccessException("get_subscription", str(e))

708 

709 

def get_subscriptions(user_id: str) -> Dict[str, Any]:
    """
    List every subscription stored for a user.

    Args:
        user_id: User identifier

    Returns:
        Dictionary with a "subscriptions" list and a "total" count

    Raises:
        DatabaseAccessException: If the user's database cannot be queried.
    """
    try:
        # Read directly from the user's encrypted database
        from ..database.session_context import get_user_db_session
        from ..database.models import ResearchHistory
        from ..database.models.news import NewsSubscription
        from sqlalchemy import func

        serialized = []

        with get_user_db_session(user_id) as db_session:
            for record in db_session.query(NewsSubscription).all():
                # Count real research runs for this subscription by matching
                # its id inside the research_meta JSON blob (note the space
                # after the colon, matching json.dumps output).
                pattern = f'%"subscription_id": "{record.id}"%'
                run_total = (
                    db_session.query(func.count(ResearchHistory.id))
                    .filter(ResearchHistory.research_meta.like(pattern))
                    .scalar()
                    or 0
                )

                serialized.append(
                    {
                        "id": record.id,
                        "query": record.query_or_topic,
                        "type": record.subscription_type,
                        "refresh_minutes": record.refresh_interval_minutes,
                        "created_at": record.created_at.isoformat()
                        if record.created_at
                        else None,
                        "next_refresh": record.next_refresh.isoformat()
                        if record.next_refresh
                        else None,
                        "last_refreshed": record.last_refresh.isoformat()
                        if record.last_refresh
                        else None,
                        "is_active": record.status == "active",
                        # Actual count from research_history, not a cached one
                        "total_runs": run_total,
                        "name": record.name or "",
                        "folder_id": record.folder_id,
                    }
                )

        return {"subscriptions": serialized, "total": len(serialized)}

    except Exception as e:
        logger.exception("Error getting subscriptions")
        raise DatabaseAccessException("get_subscriptions", str(e))

770 

771 

def update_subscription(
    subscription_id: str, data: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Update an existing subscription.

    Args:
        subscription_id: Subscription identifier
        data: Dictionary with fields to update

    Returns:
        Dictionary with updated subscription data

    Raises:
        SubscriptionNotFoundException: If no subscription has this ID.
        SubscriptionUpdateException: For any other failure during the update.
    """
    try:
        from ..database.session_context import get_user_db_session
        from ..database.models.news import NewsSubscription
        from datetime import datetime, timedelta

        with get_user_db_session() as db_session:
            # Load the existing row
            record = (
                db_session.query(NewsSubscription)
                .filter_by(id=subscription_id)
                .first()
            )
            if not record:
                raise SubscriptionNotFoundException(subscription_id)

            # Columns that map one-to-one onto the payload keys
            for field in (
                "name",
                "query_or_topic",
                "subscription_type",
                "folder_id",
                "model_provider",
                "model",
                "search_strategy",
                "custom_endpoint",
                "search_engine",
                "search_iterations",
                "questions_per_iteration",
            ):
                if field in data:
                    setattr(record, field, data[field])

            # A changed interval also pushes next_refresh forward from now
            if "refresh_interval_minutes" in data:
                previous_interval = record.refresh_interval_minutes
                record.refresh_interval_minutes = data[
                    "refresh_interval_minutes"
                ]
                if previous_interval != record.refresh_interval_minutes:
                    record.next_refresh = datetime.now(UTC) + timedelta(
                        minutes=record.refresh_interval_minutes
                    )

            # The boolean toggle maps onto the status column; an explicit
            # "status" in the payload wins over "is_active"
            if "is_active" in data:
                record.status = "active" if data["is_active"] else "paused"
            if "status" in data:
                record.status = data["status"]

            # Stamp and persist
            record.updated_at = datetime.now(UTC)
            db_session.commit()

            # Let the scheduler pick up the new settings
            _notify_scheduler_about_subscription_change("updated")

            return {
                "status": "success",
                "subscription": {
                    "id": record.id,
                    "name": record.name or "",
                    "query_or_topic": record.query_or_topic,
                    "subscription_type": record.subscription_type,
                    "refresh_interval_minutes": record.refresh_interval_minutes,
                    "is_active": record.status == "active",
                    "status": record.status,
                    "folder_id": record.folder_id,
                    "model_provider": record.model_provider,
                    "model": record.model,
                    "search_strategy": record.search_strategy,
                    "custom_endpoint": record.custom_endpoint,
                    "search_engine": record.search_engine,
                    "search_iterations": record.search_iterations or 3,
                    "questions_per_iteration": record.questions_per_iteration
                    or 5,
                },
            }

    except NewsAPIException:
        # Custom exceptions propagate untouched
        raise
    except Exception as e:
        logger.exception("Error updating subscription")
        raise SubscriptionUpdateException(subscription_id, str(e))

880 

881 

def create_subscription(
    user_id: str,
    query: str,
    subscription_type: str = "search",
    refresh_minutes: Optional[int] = None,
    source_research_id: Optional[str] = None,
    model_provider: Optional[str] = None,
    model: Optional[str] = None,
    search_strategy: Optional[str] = None,
    custom_endpoint: Optional[str] = None,
    name: Optional[str] = None,
    folder_id: Optional[str] = None,
    is_active: bool = True,
    search_engine: Optional[str] = None,
    search_iterations: Optional[int] = None,
    questions_per_iteration: Optional[int] = None,
) -> Dict[str, Any]:
    """
    Create a new subscription for user.

    Args:
        user_id: User identifier
        query: Search query or topic
        subscription_type: "search" or "topic"
        refresh_minutes: Refresh interval in minutes; when None it is read
            from the user's settings ("news.subscription.refresh_minutes"),
            falling back to 240 (4 hours)
        source_research_id: Research run this subscription originated from
        model_provider: LLM provider to use for refreshes
        model: LLM model name
        search_strategy: Strategy override (defaults to "news_aggregation")
        custom_endpoint: Custom LLM endpoint, if any
        name: Human-readable subscription name
        folder_id: Folder to file the subscription under
        is_active: Whether the subscription starts "active" or "paused"
        search_engine: Search engine override
        search_iterations: Search iterations per refresh
        questions_per_iteration: Questions generated per iteration

    Returns:
        Dictionary with subscription details

    Raises:
        SubscriptionCreationException: If the subscription could not be saved.
    """
    try:
        from ..database.session_context import get_user_db_session
        from ..database.models.news import NewsSubscription
        from datetime import datetime, timedelta
        import uuid

        # Get default refresh interval from settings if not provided
        # NOTE: This API function accesses the settings DB for convenience when used
        # within the Flask application context. For programmatic API access outside
        # the web context, callers should provide refresh_minutes explicitly to avoid
        # dependency on the settings database being initialized.

        with get_user_db_session(user_id) as db_session:
            if refresh_minutes is None:
                try:
                    from ..utilities.db_utils import get_settings_manager

                    settings_manager = get_settings_manager(db_session, user_id)
                    refresh_minutes = settings_manager.get_setting(
                        "news.subscription.refresh_minutes", 240
                    )
                except (ImportError, AttributeError, TypeError):
                    # Fallback for when settings DB is not available (e.g., programmatic API usage)
                    logger.debug(
                        "Settings manager not available, using default refresh_minutes"
                    )
                    refresh_minutes = 240  # Default to 4 hours
            # Create new subscription
            subscription = NewsSubscription(
                id=str(uuid.uuid4()),
                name=name,
                query_or_topic=query,
                subscription_type=subscription_type,
                refresh_interval_minutes=refresh_minutes,
                status="active" if is_active else "paused",
                model_provider=model_provider,
                model=model,
                search_strategy=search_strategy or "news_aggregation",
                custom_endpoint=custom_endpoint,
                folder_id=folder_id,
                search_engine=search_engine,
                search_iterations=search_iterations,
                questions_per_iteration=questions_per_iteration,
                created_at=datetime.now(UTC),
                updated_at=datetime.now(UTC),
                last_refresh=None,
                # First refresh is one full interval from creation time
                next_refresh=datetime.now(UTC)
                + timedelta(minutes=refresh_minutes),
                source_id=source_research_id,
            )

            # Add to database
            db_session.add(subscription)
            db_session.commit()

            # Notify scheduler about new subscription
            _notify_scheduler_about_subscription_change("created", user_id)

            return {
                "status": "success",
                "subscription_id": subscription.id,
                "type": subscription_type,
                "query": query,
                "refresh_minutes": refresh_minutes,
            }

    except Exception as e:
        logger.exception("Error creating subscription")
        raise SubscriptionCreationException(
            str(e), {"query": query, "type": subscription_type}
        )

982 

983 

def delete_subscription(
    subscription_id: str, user_id: Optional[str] = None
) -> Dict[str, Any]:
    """
    Delete a subscription.

    Args:
        subscription_id: ID of subscription to delete
        user_id: Optional user identifier. When provided, the user's
            database is opened explicitly (consistent with
            create_subscription) and forwarded to the scheduler
            notification as a username fallback. Defaults to None,
            preserving the previous behavior for existing callers.

    Returns:
        Dictionary with status and the deleted subscription id

    Raises:
        SubscriptionNotFoundException: If no subscription with the given
            id exists.
        SubscriptionDeletionException: If deletion fails for any other
            reason.
    """
    try:
        from ..database.session_context import get_user_db_session
        from ..database.models.news import NewsSubscription

        # Open the caller's database when a user_id is given; otherwise
        # fall back to the no-argument form the original code used.
        session_ctx = (
            get_user_db_session(user_id) if user_id else get_user_db_session()
        )
        with session_ctx as db_session:
            subscription = (
                db_session.query(NewsSubscription)
                .filter_by(id=subscription_id)
                .first()
            )
            if subscription is None:
                raise SubscriptionNotFoundException(subscription_id)

            db_session.delete(subscription)
            db_session.commit()

            # Notify scheduler so it can drop the scheduled refresh job
            _notify_scheduler_about_subscription_change("deleted", user_id)

            return {"status": "success", "deleted": subscription_id}
    except NewsAPIException:
        # Re-raise our custom exceptions
        raise
    except Exception as e:
        logger.exception("Error deleting subscription")
        raise SubscriptionDeletionException(subscription_id, str(e))

1020 

1021 

def get_votes_for_cards(card_ids: list, user_id: str) -> Dict[str, Any]:
    """
    Fetch aggregated vote counts plus the current user's own vote for a
    batch of news cards.

    Args:
        card_ids: Card identifiers to look up
        user_id: Username fallback used when no Flask request context is
            active (e.g. tests or scheduler calls); ratings live in a
            per-user database

    Returns:
        ``{"success": True, "votes": {card_id: {"upvotes", "downvotes",
        "user_vote"}}}`` where ``user_vote`` is "up"/"down" or None.

    Raises:
        ValueError: If no username can be determined.
    """
    from flask import session as flask_session, has_request_context
    from ..database.models.news import UserRating, RatingType
    from ..database.session_context import get_user_db_session

    try:
        # Resolve the acting username: Flask session inside a request,
        # the explicit user_id argument otherwise.
        if has_request_context():
            username = flask_session.get("username")
            if not username:
                raise ValueError("No username in session")
        else:
            username = user_id if user_id else None
            if not username:
                raise ValueError("No username provided and no request context")

        with get_user_db_session(username) as db:
            votes: Dict[str, Dict[str, Any]] = {}

            for cid in card_ids:
                # Base query: all RELEVANCE ratings for this card; the
                # per-value counts refine it with one more filter each.
                relevance = db.query(UserRating).filter_by(
                    card_id=cid, rating_type=RatingType.RELEVANCE
                )
                own_vote = relevance.first()
                up_count = relevance.filter_by(rating_value="up").count()
                down_count = relevance.filter_by(rating_value="down").count()

                votes[cid] = {
                    "upvotes": up_count,
                    "downvotes": down_count,
                    "user_vote": own_vote.rating_value if own_vote else None,
                }

            return {"success": True, "votes": votes}

    except Exception:
        logger.exception("Error getting votes for cards")
        raise

1096 

1097 

def submit_feedback(card_id: str, user_id: str, vote: str) -> Dict[str, Any]:
    """
    Record an up/down vote for a news card and return refreshed tallies.

    Args:
        card_id: ID of the news card being voted on
        user_id: Username fallback used when no Flask request context is
            active (ratings live in a per-user database)
        vote: Either "up" or "down"

    Returns:
        Dictionary with the recorded vote and the updated up/down counts
        for the card.

    Raises:
        ValueError: On an invalid vote value or missing username.
    """
    from flask import session as flask_session, has_request_context
    from sqlalchemy_utc import utcnow
    from ..database.models.news import UserRating, RatingType
    from ..database.session_context import get_user_db_session

    try:
        # Reject anything other than the two supported vote values early.
        if vote not in ["up", "down"]:
            raise ValueError(f"Invalid vote type: {vote}")

        # Resolve the acting username: Flask session inside a request,
        # the explicit user_id argument otherwise.
        if has_request_context():
            username = flask_session.get("username")
            if not username:
                raise ValueError("No username in session")
        else:
            username = user_id if user_id else None
            if not username:
                raise ValueError("No username provided and no request context")

        with get_user_db_session(username) as db:
            # News items are generated dynamically and may never be stored
            # as NewsCard rows, so no card-existence check is performed.
            previous = (
                db.query(UserRating)
                .filter_by(card_id=card_id, rating_type=RatingType.RELEVANCE)
                .first()
            )

            if previous:
                # Overwrite the earlier vote in place.
                # NOTE(review): this refreshes created_at rather than a
                # separate updated_at column — confirm that is intended.
                previous.rating_value = vote
                previous.created_at = utcnow()
            else:
                db.add(
                    UserRating(
                        card_id=card_id,
                        rating_type=RatingType.RELEVANCE,
                        rating_value=vote,
                    )
                )

            db.commit()

            def _tally(value: str) -> int:
                # Count RELEVANCE ratings of the given value for this card.
                return (
                    db.query(UserRating)
                    .filter_by(
                        card_id=card_id,
                        rating_type=RatingType.RELEVANCE,
                        rating_value=value,
                    )
                    .count()
                )

            upvotes = _tally("up")
            downvotes = _tally("down")

            logger.info(
                f"Feedback submitted for card {card_id}: {vote} (up: {upvotes}, down: {downvotes})"
            )

            return {
                "success": True,
                "card_id": card_id,
                "vote": vote,
                "upvotes": upvotes,
                "downvotes": downvotes,
            }

    except Exception:
        logger.exception(f"Error submitting feedback for card {card_id}")
        raise

1195 

1196 

def research_news_item(card_id: str, depth: str = "quick") -> Dict[str, Any]:
    """
    Perform deeper research on a news item.

    Not yet implemented: per-user database support for news cards has not
    been wired up, so this always raises after logging a warning.

    Args:
        card_id: ID of the news card to research
        depth: Research depth - "quick", "detailed", or "report"

    Returns:
        Dictionary with research results (once implemented).

    Raises:
        NotImplementedException: Always, until per-user card storage exists.
    """
    # TODO: Implement with per-user database for cards
    logger.warning(
        "research_news_item not yet implemented with per-user databases"
    )
    raise NotImplementedException("research_news_item")

1213 

1214 

def save_news_preferences(
    user_id: str, preferences: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Save user preferences for news.

    Not yet implemented: per-user database support for preferences has not
    been wired up, so this always raises after logging a warning.

    Args:
        user_id: User identifier
        preferences: Dictionary of preferences to save

    Returns:
        Dictionary with status and message (once implemented).

    Raises:
        NotImplementedException: Always, until per-user preference storage
            exists.
    """
    # TODO: Implement with per-user database for preferences
    logger.warning(
        "save_news_preferences not yet implemented with per-user databases"
    )
    raise NotImplementedException("save_news_preferences")

1233 

1234 

def get_news_categories() -> Dict[str, Any]:
    """
    Get available news categories with counts.

    Not yet implemented: per-user database support for categories has not
    been wired up, so this always raises after logging a warning.

    Returns:
        Dictionary with categories and statistics (once implemented).

    Raises:
        NotImplementedException: Always, until per-user category storage
            exists.
    """
    # TODO: Implement with per-user database for categories
    logger.warning(
        "get_news_categories not yet implemented with per-user databases"
    )
    raise NotImplementedException("get_news_categories")