Coverage for src/local_deep_research/news/api.py: 89%

372 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1""" 

2Direct API functions for news system. 

3These functions can be called directly by scheduler or wrapped by Flask endpoints. 

4""" 

5 

6from typing import Dict, Any, Optional 

7from datetime import datetime, timezone, UTC 

8from loguru import logger 

9import json 

10 

11from ..constants import ResearchStatus 

12from ..llm.providers.base import normalize_provider 

13from .exceptions import ( 

14 InvalidLimitException, 

15 SubscriptionNotFoundException, 

16 SubscriptionCreationException, 

17 SubscriptionUpdateException, 

18 SubscriptionDeletionException, 

19 DatabaseAccessException, 

20 NewsFeedGenerationException, 

21 NotImplementedException, 

22 NewsAPIException, 

23) 

24# Removed welcome feed import - no placeholders 

25# get_db_setting not available in merged codebase 

26 

27 

def _notify_scheduler_about_subscription_change(
    action: str, user_id: Optional[str] = None
):
    """
    Best-effort: tell the background scheduler that a subscription changed.

    Nothing happens unless the scheduler reports it is running. The username
    comes from the Flask session (falling back to ``user_id``) and the
    password from the session password store; when both are available the
    scheduler is handed fresh user info, otherwise a warning is logged.
    Any exception is logged and swallowed so callers are never interrupted.

    Args:
        action: The action performed (created, updated, deleted); only used
            in log messages.
        user_id: Optional user_id to use as fallback for username
    """
    try:
        from flask import session as flask_session
        from ..scheduler.background import get_background_job_scheduler

        scheduler = get_background_job_scheduler()
        if not scheduler.is_running:
            return

        # Session username wins; user_id is only a fallback.
        username = flask_session.get("username")
        if not username and user_id:
            username = user_id

        from ..database.session_passwords import session_password_store

        # The password lookup needs both the session id and a username.
        session_id = flask_session.get("session_id")
        password = (
            session_password_store.get_session_password(username, session_id)
            if session_id and username
            else None
        )

        if not (password and username):
            logger.warning(
                f"Could not notify scheduler - no password available{' for ' + username if username else ''}"
            )
            return

        # Update scheduler to reschedule subscriptions
        scheduler.update_user_info(username, password)
        logger.info(
            f"Scheduler notified about {action} subscription for {username}"
        )
    except Exception:
        logger.exception(
            f"Could not notify scheduler about {action} subscription"
        )

73 

74 

def get_news_feed(
    user_id: str = "anonymous",
    limit: int = 20,
    use_cache: bool = True,
    focus: Optional[str] = None,
    search_strategy: Optional[str] = None,
    subscription_id: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Build a news feed from the user's completed research history.

    Args:
        user_id: User identifier; selects which user database is queried.
        limit: Maximum number of cards to return (must be >= 1).
        use_cache: Accepted for interface compatibility; not read here.
        focus: Optional focus area; only echoed back in the response.
        search_strategy: Echoed back in the response ("default" if unset).
        subscription_id: When set and not "all", only research whose
            ``research_meta`` references this subscription is considered.

    Returns:
        Dictionary with ``news_items``, ``generated_at``, ``focus``,
        ``search_strategy``, ``total_items`` and ``source``. Each item's
        ``findings`` field is the answer-only report content (post
        chat-mode-v2 refactor, #3665 Fix B); structured top-N source links
        are in the separate ``links`` array, not embedded in ``findings``.

    Raises:
        InvalidLimitException: If ``limit`` is smaller than 1.
        DatabaseAccessException: If querying or processing history fails.
        NewsFeedGenerationException: On any other unexpected failure.
    """
    # Validate limit - allow any positive number
    if limit < 1:
        raise InvalidLimitException(limit)

    try:
        logger.info(
            f"get_news_feed called with user_id={user_id}, limit={limit}"
        )

        # Import database functions
        from ..database.session_context import get_user_db_session
        from ..database.models import ResearchHistory

        news_items = []

        # Query research history from user's database for news items
        logger.info("Getting news items from research history")
        try:
            with get_user_db_session(user_id) as db_session:
                query = db_session.query(ResearchHistory).filter(
                    ResearchHistory.status == ResearchStatus.COMPLETED
                )

                # Filter by subscription if provided
                if subscription_id and subscription_id != "all":
                    # research_meta is serialized via json.dumps which emits
                    # a space after the colon, so the LIKE pattern must
                    # include that space too — otherwise the filter silently
                    # matches zero rows.
                    query = query.filter(
                        ResearchHistory.research_meta.like(
                            f'%"subscription_id": "{subscription_id}"%'
                        )
                    )

                # Fetch twice the requested amount; rows that are not news
                # are dropped by the filtering loop further down.
                rows = (
                    query.order_by(ResearchHistory.created_at.desc())
                    .limit(limit * 2)
                    .all()
                )

                # Convert ORM objects to plain dictionaries while the
                # session is still open.
                results = [
                    {
                        "id": r.id,
                        "uuid_id": r.id,  # In ResearchHistory, id is the UUID
                        "query": r.query,
                        "title": getattr(r, "title", None),
                        # created_at is NOT NULL (set to isoformat() on every
                        # insert), so it's always a usable timestamp string.
                        "created_at": r.created_at,
                        "completed_at": r.completed_at
                        if r.completed_at
                        else None,
                        "duration_seconds": getattr(
                            r, "duration_seconds", None
                        ),
                        "report_path": getattr(r, "report_path", None),
                        "report_content": getattr(r, "report_content", None),
                        "research_meta": r.research_meta,
                        "status": r.status,
                    }
                    for r in rows
                ]

                # Sources live in the research_resources table — fetch the
                # top-N for every row in ONE batched query (avoids N+1 in the
                # loop below).
                from ..web.services.report_assembly_service import (
                    get_research_source_links_batch,
                )

                links_by_research_id = get_research_source_links_batch(
                    [r["id"] for r in results], db_session, limit=3
                )

            logger.info(f"Database returned {len(results)} research items")
            if results:
                logger.info(f"First row keys: {list(results[0].keys())}")
                # Log first few items' metadata. The query is coerced to a
                # string so a row with a missing query cannot abort the feed
                # from inside a diagnostic log line.
                for i, row in enumerate(results[:3]):
                    preview = str(row["query"] or "")[:50]
                    logger.info(
                        f"Item {i}: query='{preview}...', has meta={bool(row.get('research_meta'))}"
                    )

            # Process results to find news items
            processed_count = 0
            error_count = 0

            for row in results:
                try:
                    # Parse metadata; it may be stored as a dict or as a
                    # JSON string.
                    metadata = {}
                    if row.get("research_meta"):
                        try:
                            if isinstance(row["research_meta"], dict):
                                metadata = row["research_meta"]
                            else:
                                metadata = json.loads(row["research_meta"])
                        except (json.JSONDecodeError, TypeError):
                            logger.exception("Error parsing metadata")
                            metadata = {}

                    # News metadata (generated_headline / generated_topics)
                    # or a news-looking query qualifies a row for the feed.
                    has_news_metadata = (
                        metadata.get("generated_headline") is not None
                        or metadata.get("generated_topics") is not None
                    )

                    query_lower = row["query"].lower()
                    is_news_query = (
                        has_news_metadata
                        or metadata.get("is_news_search")
                        or metadata.get("search_type") == "news_analysis"
                        or "breaking news" in query_lower
                        or "news stories" in query_lower
                        or (
                            "today" in query_lower
                            and (
                                "news" in query_lower
                                or "breaking" in query_lower
                            )
                        )
                        or "latest news" in query_lower
                    )

                    # Log the decision for first few items
                    if processed_count < 3 or error_count < 3:
                        logger.info(
                            f"Item check: query='{row['query'][:30]}...', is_news_search={metadata.get('is_news_search')}, "
                            f"has_news_metadata={has_news_metadata}, is_news_query={is_news_query}"
                        )

                    # Only show items that have news metadata or are news
                    # queries.
                    if not is_news_query:
                        continue

                    processed_count += 1
                    logger.info(
                        f"Processing research item #{processed_count}: {row['query'][:50]}..."
                    )

                    # `findings` is the answer-only report_content after the
                    # chat-mode-v2 refactor (#3665 Fix B, intentional);
                    # source URLs travel in the separate `links` array.
                    findings = ""
                    summary = ""
                    content = row.get("report_content")
                    if content:
                        logger.debug(
                            f"Using database content for research {row['id']}"
                        )
                        findings = content
                        # Summary is the first non-empty line that is not a
                        # markdown heading.
                        for line in content.split("\n"):
                            if line.strip() and not line.startswith("#"):
                                summary = line.strip()
                                break
                    else:
                        logger.debug(
                            f"No database content for research {row['id']}"
                        )

                    original_query = row["query"]

                    # Headline: database title first, then stored metadata.
                    headline = row.get("title") or metadata.get(
                        "generated_headline"
                    )

                    # For subscription results, derive a headline from the
                    # subscription name or the query when none is stored.
                    if not headline and metadata.get("is_news_search"):
                        subscription_name = metadata.get("subscription_name")
                        if subscription_name:
                            headline = f"News Update: {subscription_name}"
                        else:
                            headline = f"News: {row['query'][:60]}..."

                    # Skip items without meaningful headlines
                    if not headline or headline == "[No headline available]":
                        logger.debug(
                            f"Skipping item without headline: {row['id']}"
                        )
                        continue

                    # Skip items that are still in progress or suspended
                    if row["status"] in (
                        ResearchStatus.IN_PROGRESS,
                        ResearchStatus.SUSPENDED,
                    ):
                        logger.debug(
                            f"Skipping incomplete item: {row['id']} (status: {row['status']})"
                        )
                        continue

                    # Skip items without content
                    if not content:
                        logger.debug(
                            f"Skipping item without content: {row['id']}"
                        )
                        continue

                    # Use ID properly, preferring uuid_id
                    research_id = row.get("uuid_id") or str(row["id"])

                    # Placeholders make missing category/topics visible.
                    category = metadata.get("category") or "[Uncategorized]"
                    topics = metadata.get("generated_topics") or [
                        "[No topics]"
                    ]

                    # Top-N links come from the batch fetch above (no per-row
                    # DB query, no text parsing of report_content).
                    links = links_by_research_id.get(row["id"], [])

                    news_items.append(
                        {
                            "id": f"news-{research_id}",
                            "headline": headline,
                            "category": category,
                            "summary": summary
                            or f"Research analysis for: {headline[:100]}",
                            "findings": findings,
                            # 0 indicates missing
                            "impact_score": metadata.get("impact_score", 0),
                            "time_ago": _format_time_ago(row["created_at"]),
                            "upvotes": metadata.get("upvotes", 0),
                            "downvotes": metadata.get("downvotes", 0),
                            "source_url": f"/results/{research_id}",
                            "topics": topics,
                            "links": links,
                            "research_id": research_id,
                            "created_at": row["created_at"],
                            "duration_seconds": row.get(
                                "duration_seconds", 0
                            ),
                            # Keep original query for reference
                            "original_query": original_query,
                            # Flag for news searches
                            "is_news": metadata.get("is_news_search", False),
                            "news_date": metadata.get("news_date"),
                            "news_source": metadata.get("news_source"),
                            "priority": metadata.get("priority", "normal"),
                        }
                    )
                    logger.info(f"Added news item: {headline[:50]}...")

                    if len(news_items) >= limit:
                        break

                except Exception:
                    error_count += 1
                    # The "query" key always exists, so .get()'s default
                    # never applies; coerce explicitly so a None query
                    # cannot make this handler raise and abort the feed.
                    query_preview = str(row.get("query") or "UNKNOWN")[:100]
                    logger.exception(
                        f"Error processing research item with query: {query_preview}"
                    )
                    continue

            logger.info(
                f"Processing summary: total_results={len(results)}, processed={processed_count}, "
                f"errors={error_count}, added={len(news_items)}"
            )

            # Log subscription-specific items if we were filtering
            if subscription_id and subscription_id != "all":
                sub_items = [
                    item for item in news_items if item.get("is_news", False)
                ]
                logger.info(
                    f"Subscription {subscription_id}: found {len(sub_items)} items"
                )

        except Exception as db_error:
            logger.exception(f"Database error in research history: {db_error}")
            raise DatabaseAccessException(
                "research_history_query", str(db_error)
            ) from db_error

        if not news_items:
            logger.info("No news items found, returning empty list")

        logger.info(f"Returning {len(news_items)} news items to client")

        # Determine the source
        source = (
            "news_items"
            if any(item.get("is_news", False) for item in news_items)
            else "research_history"
        )

        return {
            "news_items": news_items[:limit],
            "generated_at": datetime.now(timezone.utc).isoformat(),
            "focus": focus,
            "search_strategy": search_strategy or "default",
            "total_items": len(news_items),
            "source": source,
        }

    except NewsAPIException:
        # Re-raise our custom exceptions
        raise
    except Exception as e:
        logger.exception("Error getting news feed")
        raise NewsFeedGenerationException(str(e), user_id=user_id) from e

455 

456 

def get_subscription_history(
    subscription_id: str, limit: int = 20
) -> Dict[str, Any]:
    """
    Get research history for a specific subscription.

    Args:
        subscription_id: The subscription UUID
        limit: Maximum number of history items to return

    Returns:
        Dict with ``subscription`` (summary of the subscription),
        ``history`` (one entry per research run, newest first) and
        ``total_runs`` (number of entries in ``history``).

    Raises:
        SubscriptionNotFoundException: If no subscription has this id.
        DatabaseAccessException: On any other failure.
    """

    def _as_iso(value):
        # ResearchHistory timestamps are handled as ISO strings elsewhere in
        # this module; accept datetime-like values too so either form works.
        if not value:
            return None
        return value.isoformat() if hasattr(value, "isoformat") else value

    try:
        from ..database.session_context import get_user_db_session
        from ..database.models import ResearchHistory
        from ..database.models.news import NewsSubscription

        # Get subscription details using ORM from user's encrypted database
        with get_user_db_session() as session:
            subscription = (
                session.query(NewsSubscription)
                .filter_by(id=subscription_id)
                .first()
            )

            if not subscription:
                raise SubscriptionNotFoundException(subscription_id)  # noqa: TRY301 — re-raised by except NewsAPIException

            # Convert to dict for response
            subscription_dict = {
                "id": subscription.id,
                "query_or_topic": subscription.query_or_topic,
                "subscription_type": subscription.subscription_type,
                "refresh_interval_minutes": subscription.refresh_interval_minutes,
                "refresh_count": subscription.refresh_count or 0,
                "created_at": subscription.created_at.isoformat()
                if subscription.created_at
                else None,
                "next_refresh": subscription.next_refresh.isoformat()
                if subscription.next_refresh
                else None,
            }

        # The NewsSubscription model has no user_id column — this codebase
        # uses per-user encrypted databases, so the research history is read
        # from the same (Flask-session) user's database that the
        # subscription was found in.
        with get_user_db_session() as db_session:
            # Research runs triggered by this subscription carry its id in
            # the research_meta JSON. Note: JSON format has space after colon.
            like_pattern = f'%"subscription_id": "{subscription_id}"%'
            logger.info(
                f"Searching for research history with pattern: {like_pattern}"
            )

            history_rows = (
                db_session.query(ResearchHistory)
                .filter(ResearchHistory.research_meta.like(like_pattern))
                .order_by(ResearchHistory.created_at.desc())
                .limit(limit)
                .all()
            )

            # Convert to plain dicts while the session is open.
            # ResearchHistory.id is the UUID PK; there is no separate uuid_id
            # column, so both keys are populated from h.id.
            history_items = [
                {
                    "id": h.id,
                    "uuid_id": h.id,
                    "query": h.query,
                    "status": h.status,
                    "created_at": _as_iso(h.created_at),
                    "completed_at": _as_iso(h.completed_at),
                    "duration_seconds": h.duration_seconds,
                    "research_meta": h.research_meta,
                    "report_path": h.report_path,
                }
                for h in history_rows
            ]

        # Process history items
        processed_history = []
        for item in history_items:
            processed_item = {
                "research_id": item.get("uuid_id") or str(item.get("id")),
                "query": item["query"],
                "status": item["status"],
                "created_at": item["created_at"],
                "completed_at": item.get("completed_at"),
                "duration_seconds": item.get("duration_seconds", 0),
                "url": f"/progress/{item.get('uuid_id') or item.get('id')}",
            }

            # Metadata may arrive as a dict or as a JSON string (the same
            # two shapes get_news_feed handles); anything else, or a parse
            # failure, falls back to the placeholder headline.
            raw_meta = item.get("research_meta")
            meta = None
            if isinstance(raw_meta, dict):
                meta = raw_meta
            elif raw_meta:
                try:
                    parsed = json.loads(raw_meta)
                except (ValueError, TypeError, RecursionError):
                    parsed = None
                if isinstance(parsed, dict):
                    meta = parsed

            if meta:
                processed_item["triggered_by"] = meta.get(
                    "triggered_by", "subscription"
                )
                # Add headline and topics from metadata
                processed_item["headline"] = meta.get(
                    "generated_headline", "[No headline]"
                )
                processed_item["topics"] = meta.get("generated_topics", [])
            else:
                processed_item["headline"] = "[No headline]"
                processed_item["topics"] = []

            processed_history.append(processed_item)

        return {
            "subscription": subscription_dict,
            "history": processed_history,
            "total_runs": len(processed_history),
        }

    except NewsAPIException:
        # Re-raise our custom exceptions
        raise
    except Exception as e:
        logger.exception("Error getting subscription history")
        raise DatabaseAccessException(
            "get_subscription_history", str(e)
        ) from e

598 

599 

def _format_time_ago(timestamp: str) -> str:
    """Format timestamp as an 'X hours ago' style string.

    Raises on unparseable input instead of masking it with a neutral label:
    ResearchHistory.created_at is written as an ISO timestamp on insert, so a
    value that won't parse means the row is corrupt — not a routine edge
    case. The per-row loop in get_news_feed wraps each row in a try/except
    that logs the failure and skips the row, so a bad timestamp drops that
    one card rather than rendering a misleading label.

    Args:
        timestamp: Timestamp string; naive values are assumed to be UTC.

    Returns:
        "Just now" for anything under a minute old (including timestamps
        slightly in the future, e.g. from clock skew), otherwise the age in
        whole minutes, hours or days.

    Raises:
        ValueError: If ``timestamp`` cannot be parsed (dateutil's parser
            error derives from ValueError).
    """
    from dateutil import parser

    dt = parser.parse(timestamp)

    # If dt is naive, assume it's in UTC
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)

    # Work on total elapsed seconds. timedelta normalises negative values to
    # days=-1 plus a large positive .seconds, so reading .days/.seconds
    # separately would report a future timestamp as "23 hours ago".
    elapsed = int((datetime.now(timezone.utc) - dt).total_seconds())

    if elapsed < 60:
        return "Just now"
    if elapsed < 3600:
        minutes = elapsed // 60
        return f"{minutes} minute{'s' if minutes > 1 else ''} ago"
    if elapsed < 86400:
        hours = elapsed // 3600
        return f"{hours} hour{'s' if hours > 1 else ''} ago"
    days = elapsed // 86400
    return f"{days} day{'s' if days > 1 else ''} ago"

632 

633 

def get_subscription(subscription_id: str) -> Optional[Dict[str, Any]]:
    """
    Look up one subscription and return it in API/template format.

    Args:
        subscription_id: Subscription identifier

    Returns:
        Dictionary with the subscription's fields.

    Raises:
        SubscriptionNotFoundException: If no subscription has this id.
        DatabaseAccessException: On any other failure.
    """
    try:
        # Subscriptions live in the user's encrypted database
        from ..database.session_context import get_user_db_session
        from ..database.models.news import NewsSubscription

        with get_user_db_session() as db_session:
            lookup = db_session.query(NewsSubscription).filter_by(
                id=subscription_id
            )
            subscription = lookup.first()

            if not subscription:
                raise SubscriptionNotFoundException(subscription_id)  # noqa: TRY301 — re-raised by except NewsAPIException

            created_at = subscription.created_at
            updated_at = subscription.updated_at

            # Shape matches what the templates expect
            api_view = {
                "id": subscription.id,
                "name": subscription.name or "",
                "query_or_topic": subscription.query_or_topic,
                "subscription_type": subscription.subscription_type,
                "refresh_interval_minutes": subscription.refresh_interval_minutes,
                "is_active": subscription.status == "active",
                "status": subscription.status,
                "folder_id": subscription.folder_id,
                "model_provider": subscription.model_provider,
                "model": subscription.model,
                "search_strategy": subscription.search_strategy,
                "custom_endpoint": subscription.custom_endpoint,
                "search_engine": subscription.search_engine,
                # Unset/zero tuning values fall back to 3 and 5
                "search_iterations": subscription.search_iterations or 3,
                "questions_per_iteration": (
                    subscription.questions_per_iteration or 5
                ),
                "created_at": created_at.isoformat() if created_at else None,
                "updated_at": updated_at.isoformat() if updated_at else None,
            }
            return api_view

    except NewsAPIException:
        # Domain exceptions pass through untouched
        raise
    except Exception as e:
        logger.exception(f"Error getting subscription {subscription_id}")
        raise DatabaseAccessException("get_subscription", str(e))

691 

692 

def get_subscriptions(user_id: str) -> Dict[str, Any]:
    """
    List every subscription stored in a user's database.

    Args:
        user_id: User identifier; selects which user database is opened.

    Returns:
        Dictionary with ``subscriptions`` (list of subscription dicts, each
        including a ``total_runs`` count taken from research history) and
        ``total`` (length of that list).

    Raises:
        DatabaseAccessException: On any failure.
    """
    try:
        # Subscriptions live in the user's encrypted database
        from ..database.session_context import get_user_db_session
        from ..database.models import ResearchHistory
        from ..database.models.news import NewsSubscription
        from sqlalchemy import func

        def iso_or_none(moment):
            return moment.isoformat() if moment else None

        sub_list = []

        with get_user_db_session(user_id) as db_session:
            for sub in db_session.query(NewsSubscription).all():
                # Runs are counted by matching the subscription id inside
                # the research_meta JSON text (one count query per
                # subscription).
                like_pattern = f'%"subscription_id": "{sub.id}"%'
                run_count = (
                    db_session.query(func.count(ResearchHistory.id))
                    .filter(ResearchHistory.research_meta.like(like_pattern))
                    .scalar()
                )

                sub_list.append(
                    {
                        "id": sub.id,
                        "query": sub.query_or_topic,
                        "type": sub.subscription_type,
                        "refresh_minutes": sub.refresh_interval_minutes,
                        "created_at": iso_or_none(sub.created_at),
                        "next_refresh": iso_or_none(sub.next_refresh),
                        "last_refreshed": iso_or_none(sub.last_refresh),
                        "is_active": sub.status == "active",
                        "total_runs": run_count or 0,
                        "name": sub.name or "",
                        "folder_id": sub.folder_id,
                    }
                )

        return {"subscriptions": sub_list, "total": len(sub_list)}

    except Exception as e:
        logger.exception("Error getting subscriptions")
        raise DatabaseAccessException("get_subscriptions", str(e))

753 

754 

def update_subscription(
    subscription_id: str, data: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Apply a partial update to an existing subscription.

    Only keys present in ``data`` are written. Changing
    ``refresh_interval_minutes`` also reschedules ``next_refresh``;
    ``is_active`` maps to the "active"/"paused" status and an explicit
    ``status`` key, applied afterwards, takes precedence. After the commit
    the scheduler is notified on a best-effort basis.

    Args:
        subscription_id: Subscription identifier
        data: Dictionary with fields to update

    Returns:
        ``{"status": "success", "subscription": {...}}`` with the updated
        subscription in API format.

    Raises:
        SubscriptionNotFoundException: If no subscription has this id.
        SubscriptionUpdateException: On any other failure.
    """
    try:
        from ..database.session_context import get_user_db_session
        from ..database.models.news import NewsSubscription
        from datetime import datetime, timedelta

        with get_user_db_session() as db_session:
            subscription = (
                db_session.query(NewsSubscription)
                .filter_by(id=subscription_id)
                .first()
            )
            if not subscription:
                raise SubscriptionNotFoundException(subscription_id)  # noqa: TRY301 — re-raised by except NewsAPIException

            def copy_if_present(*field_names):
                # Straight pass-through: data[key] -> subscription.<key>
                for field_name in field_names:
                    if field_name in data:
                        setattr(subscription, field_name, data[field_name])

            copy_if_present("name", "query_or_topic", "subscription_type")

            if "refresh_interval_minutes" in data:
                previous_interval = subscription.refresh_interval_minutes
                subscription.refresh_interval_minutes = data[
                    "refresh_interval_minutes"
                ]
                # A changed interval restarts the countdown from now
                if previous_interval != subscription.refresh_interval_minutes:
                    subscription.next_refresh = datetime.now(UTC) + timedelta(
                        minutes=subscription.refresh_interval_minutes
                    )

            if "is_active" in data:
                subscription.status = (
                    "active" if data["is_active"] else "paused"
                )

            # "status" is applied after "is_active", so it wins when both
            # keys are supplied.
            copy_if_present("status", "folder_id")

            if "model_provider" in data:
                subscription.model_provider = normalize_provider(
                    data["model_provider"]
                )

            copy_if_present(
                "model",
                "search_strategy",
                "custom_endpoint",
                "search_engine",
                "search_iterations",
                "questions_per_iteration",
            )

            # Stamp the modification time and persist
            subscription.updated_at = datetime.now(UTC)
            db_session.commit()

            # Let the scheduler pick up the new settings
            _notify_scheduler_about_subscription_change("updated")

            # API representation of the stored subscription
            updated = {
                "id": subscription.id,
                "name": subscription.name or "",
                "query_or_topic": subscription.query_or_topic,
                "subscription_type": subscription.subscription_type,
                "refresh_interval_minutes": subscription.refresh_interval_minutes,
                "is_active": subscription.status == "active",
                "status": subscription.status,
                "folder_id": subscription.folder_id,
                "model_provider": subscription.model_provider,
                "model": subscription.model,
                "search_strategy": subscription.search_strategy,
                "custom_endpoint": subscription.custom_endpoint,
                "search_engine": subscription.search_engine,
                "search_iterations": subscription.search_iterations or 3,
                "questions_per_iteration": (
                    subscription.questions_per_iteration or 5
                ),
            }
            return {"status": "success", "subscription": updated}

    except NewsAPIException:
        # Domain exceptions pass through untouched
        raise
    except Exception as e:
        logger.exception("Error updating subscription")
        raise SubscriptionUpdateException(subscription_id, str(e))

865 

866 

def create_subscription(
    user_id: str,
    query: str,
    subscription_type: str = "search",
    refresh_minutes: Optional[int] = None,
    source_research_id: Optional[str] = None,
    model_provider: Optional[str] = None,
    model: Optional[str] = None,
    search_strategy: Optional[str] = None,
    custom_endpoint: Optional[str] = None,
    name: Optional[str] = None,
    folder_id: Optional[str] = None,
    is_active: bool = True,
    search_engine: Optional[str] = None,
    search_iterations: Optional[int] = None,
    questions_per_iteration: Optional[int] = None,
) -> Dict[str, Any]:
    """
    Create a new subscription for user.

    Args:
        user_id: User identifier; used to open the user's database session
            and passed to the scheduler notification
        query: Search query or topic
        subscription_type: "search" or "topic"
        refresh_minutes: Refresh interval in minutes. When None, the value of
            the "news.subscription.refresh_minutes" setting is used, falling
            back to 240 if the settings manager cannot be used
        source_research_id: Stored as the subscription's ``source_id``
        model_provider: LLM provider; passed through ``normalize_provider``
            before being stored
        model: Stored on the subscription as given
        search_strategy: Stored on the subscription; "news_aggregation" is
            used when this is None or empty
        custom_endpoint: Stored on the subscription as given
        name: Stored on the subscription as given
        folder_id: Stored on the subscription as given
        is_active: When true the subscription status is "active",
            otherwise "paused"
        search_engine: Stored on the subscription as given
        search_iterations: Stored on the subscription as given
        questions_per_iteration: Stored on the subscription as given

    Returns:
        Dictionary with "status", "subscription_id", "type", "query" and
        "refresh_minutes"

    Raises:
        SubscriptionCreationException: If any step fails; the original
            exception is attached as the cause
    """
    try:
        from ..database.session_context import get_user_db_session
        from ..database.models.news import NewsSubscription
        from datetime import datetime, timedelta
        import uuid

        # Get default refresh interval from settings if not provided
        # NOTE: This API function accesses the settings DB for convenience when used
        # within the Flask application context. For programmatic API access outside
        # the web context, callers should provide refresh_minutes explicitly to avoid
        # dependency on the settings database being initialized.

        with get_user_db_session(user_id) as db_session:
            if refresh_minutes is None:
                try:
                    from ..utilities.db_utils import get_settings_manager

                    settings_manager = get_settings_manager(db_session, user_id)
                    refresh_minutes = settings_manager.get_setting(
                        "news.subscription.refresh_minutes", 240
                    )
                except (ImportError, AttributeError, TypeError):
                    # Fallback for when settings DB is not available (e.g., programmatic API usage)
                    logger.debug(
                        "Settings manager not available, using default refresh_minutes"
                    )
                    refresh_minutes = 240  # Default to 4 hours

            # Take a single timestamp so created_at == updated_at and
            # next_refresh is exactly one interval after creation.
            now = datetime.now(UTC)

            # Create new subscription
            subscription = NewsSubscription(
                id=str(uuid.uuid4()),
                name=name,
                query_or_topic=query,
                subscription_type=subscription_type,
                refresh_interval_minutes=refresh_minutes,
                status="active" if is_active else "paused",
                model_provider=normalize_provider(model_provider),
                model=model,
                search_strategy=search_strategy or "news_aggregation",
                custom_endpoint=custom_endpoint,
                folder_id=folder_id,
                search_engine=search_engine,
                search_iterations=search_iterations,
                questions_per_iteration=questions_per_iteration,
                created_at=now,
                updated_at=now,
                last_refresh=None,
                next_refresh=now + timedelta(minutes=refresh_minutes),
                source_id=source_research_id,
            )

            # Add to database
            db_session.add(subscription)
            db_session.commit()

            # Notify scheduler about new subscription
            _notify_scheduler_about_subscription_change("created", user_id)

            return {
                "status": "success",
                "subscription_id": subscription.id,
                "type": subscription_type,
                "query": query,
                "refresh_minutes": refresh_minutes,
            }

    except Exception as e:
        logger.exception("Error creating subscription")
        # Chain explicitly so the root cause is recorded as __cause__.
        raise SubscriptionCreationException(
            str(e), {"query": query, "type": subscription_type}
        ) from e

967 

968 

def delete_subscription(subscription_id: str) -> Dict[str, Any]:
    """
    Remove a subscription by ID and tell the scheduler about it.

    The database session is opened without an explicit username.

    Args:
        subscription_id: ID of subscription to delete

    Returns:
        ``{"status": "success", "deleted": subscription_id}`` on success

    Raises:
        SubscriptionNotFoundException: No subscription has this ID
        SubscriptionDeletionException: Any other failure while deleting
    """
    try:
        from ..database.session_context import get_user_db_session
        from ..database.models.news import NewsSubscription

        with get_user_db_session() as db_session:
            lookup = db_session.query(NewsSubscription).filter_by(
                id=subscription_id
            )
            target = lookup.first()

            # Guard clause: nothing to delete.
            if not target:
                raise SubscriptionNotFoundException(subscription_id)  # noqa: TRY301 — re-raised by except NewsAPIException

            db_session.delete(target)
            db_session.commit()

            # Notify scheduler about deleted subscription
            _notify_scheduler_about_subscription_change("deleted")

            return {"status": "success", "deleted": subscription_id}
    except NewsAPIException:
        # Our own exceptions pass through unchanged
        raise
    except Exception as e:
        logger.exception("Error deleting subscription")
        raise SubscriptionDeletionException(subscription_id, str(e))

1004 

1005 

def get_votes_for_cards(card_ids: list, user_id: str) -> Dict[str, Any]:
    """
    Collect relevance vote totals and the stored vote for several news cards.

    Args:
        card_ids: List of card IDs to get votes for
        user_id: Used as the username only when there is no Flask request
            context; inside a request the session's "username" is used

    Returns:
        ``{"success": True, "votes": {card_id: {"upvotes", "downvotes",
        "user_vote"}}}`` where "user_vote" is None if no rating is stored

    Raises:
        ValueError: No username could be resolved
    """
    from flask import session as flask_session, has_request_context
    from ..database.models.news import UserRating, RatingType
    from ..database.session_context import get_user_db_session

    # Resolve the username outside the try block so these ValueErrors are
    # not logged by the handler below.
    if has_request_context():
        username = flask_session.get("username")
        if not username:
            raise ValueError("No username in session")
    elif user_id:
        # No request context (e.g., in tests): use user_id directly
        username = user_id
    else:
        raise ValueError("No username provided and no request context")

    try:
        with get_user_db_session(username) as db:

            def relevance_ratings(card_id, **extra):
                # Relevance ratings for one card, optionally narrowed further.
                return db.query(UserRating).filter_by(
                    card_id=card_id,
                    rating_type=RatingType.RELEVANCE,
                    **extra,
                )

            results = {}
            for card_id in card_ids:
                # Stored vote first, then the two totals.
                user_vote = relevance_ratings(card_id).first()
                upvotes = relevance_ratings(card_id, rating_value="up").count()
                downvotes = relevance_ratings(
                    card_id, rating_value="down"
                ).count()

                results[card_id] = {
                    "upvotes": upvotes,
                    "downvotes": downvotes,
                    "user_vote": user_vote.rating_value if user_vote else None,
                }

            return {"success": True, "votes": results}

    except Exception:
        logger.exception("Error getting votes for cards")
        raise

1080 

1081 

def submit_feedback(card_id: str, user_id: str, vote: str) -> Dict[str, Any]:
    """
    Record an up/down relevance vote for a news card and return the totals.

    An existing relevance rating for the card is overwritten (value and
    ``created_at``); otherwise a new rating row is added.

    Args:
        card_id: ID of the news card
        user_id: Used as the username only when there is no Flask request
            context; inside a request the session's "username" is used
        vote: "up" or "down"

    Returns:
        Dictionary with "success", "card_id", "vote", "upvotes", "downvotes"

    Raises:
        ValueError: Invalid vote value, or no username could be resolved
    """
    from flask import session as flask_session, has_request_context
    from sqlalchemy_utc import utcnow
    from ..database.models.news import UserRating, RatingType
    from ..database.session_context import get_user_db_session

    # Reject anything other than the two supported vote values
    if vote not in ("up", "down"):
        raise ValueError(f"Invalid vote type: {vote}")

    # Resolve the username outside the try block so these ValueErrors are
    # not logged by the handler below.
    if has_request_context():
        username = flask_session.get("username")
        if not username:
            raise ValueError("No username in session")
    elif user_id:
        # No request context (e.g., in tests): use user_id directly
        username = user_id
    else:
        raise ValueError("No username provided and no request context")

    try:
        with get_user_db_session(username) as db:
            # We don't check if the card exists in the database since news items
            # are generated dynamically and may not be stored as NewsCard entries
            relevance_filter = {
                "card_id": card_id,
                "rating_type": RatingType.RELEVANCE,
            }

            existing_rating = (
                db.query(UserRating).filter_by(**relevance_filter).first()
            )

            if not existing_rating:
                # First vote on this card: insert a new rating
                db.add(UserRating(rating_value=vote, **relevance_filter))
            else:
                # Overwrite the previous vote
                existing_rating.rating_value = vote
                existing_rating.created_at = utcnow()

            db.commit()

            # Totals after the commit: "up" first, then "down"
            upvotes, downvotes = [
                db.query(UserRating)
                .filter_by(rating_value=direction, **relevance_filter)
                .count()
                for direction in ("up", "down")
            ]

            logger.info(
                f"Feedback submitted for card {card_id}: {vote} (up: {upvotes}, down: {downvotes})"
            )

            return {
                "success": True,
                "card_id": card_id,
                "vote": vote,
                "upvotes": upvotes,
                "downvotes": downvotes,
            }

    except Exception:
        logger.exception(f"Error submitting feedback for card {card_id}")
        raise

1179 

1180 

def research_news_item(card_id: str, depth: str = "quick") -> Dict[str, Any]:
    """
    Placeholder for running deeper research on a single news card.

    Args:
        card_id: ID of the news card to research
        depth: Research depth - "quick", "detailed", or "report"

    Raises:
        NotImplementedException: Always; a warning is logged first
    """
    # TODO: Implement with per-user database for cards
    message = "research_news_item not yet implemented with per-user databases"
    logger.warning(message)
    raise NotImplementedException("research_news_item")

1197 

1198 

def save_news_preferences(
    user_id: str, preferences: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Placeholder for persisting a user's news preferences.

    Args:
        user_id: User identifier
        preferences: Dictionary of preferences to save

    Raises:
        NotImplementedException: Always; a warning is logged first
    """
    # TODO: Implement with per-user database for preferences
    message = "save_news_preferences not yet implemented with per-user databases"
    logger.warning(message)
    raise NotImplementedException("save_news_preferences")

1217 

1218 

def get_news_categories() -> Dict[str, Any]:
    """
    Placeholder for listing the available news categories with counts.

    Raises:
        NotImplementedException: Always; a warning is logged first
    """
    # TODO: Implement with per-user database for categories
    message = "get_news_categories not yet implemented with per-user databases"
    logger.warning(message)
    raise NotImplementedException("get_news_categories")