Coverage for src / local_deep_research / news / api.py: 87%
404 statements
« prev ^ index » next — coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""
2Direct API functions for news system.
3These functions can be called directly by scheduler or wrapped by Flask endpoints.
4"""
6from typing import Dict, Any, Optional
7from datetime import datetime, timezone, UTC
8from loguru import logger
9import re
10import json
12from ..constants import ResearchStatus
13from ..llm.providers.base import normalize_provider
14from .recommender.topic_based import TopicBasedRecommender
15from .exceptions import (
16 InvalidLimitException,
17 SubscriptionNotFoundException,
18 SubscriptionCreationException,
19 SubscriptionUpdateException,
20 SubscriptionDeletionException,
21 DatabaseAccessException,
22 NewsFeedGenerationException,
23 NotImplementedException,
24 NewsAPIException,
25)
26# Removed welcome feed import - no placeholders
27# get_db_setting not available in merged codebase
# Process-wide recommender singleton (reused across calls)
_recommender = None


def get_recommender():
    """Return the shared TopicBasedRecommender, creating it on first use."""
    global _recommender
    if _recommender is not None:
        return _recommender
    _recommender = TopicBasedRecommender()
    return _recommender
def _notify_scheduler_about_subscription_change(
    action: str, user_id: Optional[str] = None
):
    """
    Notify the scheduler about subscription changes.

    Best-effort: every failure is logged and swallowed so subscription
    CRUD operations never fail just because the scheduler is unreachable.

    Args:
        action: The action performed (created, updated, deleted)
        user_id: Optional user_id to use as fallback for username
    """
    try:
        # Lazy imports: avoids a hard Flask dependency at module load and
        # a circular import with the scheduler module.
        from flask import session as flask_session
        from .subscription_manager.scheduler import get_news_scheduler

        scheduler = get_news_scheduler()
        if scheduler.is_running:
            # Get username, with optional fallback to user_id
            username = flask_session.get("username")
            if not username and user_id:
                username = user_id

            # Get password from session password store
            from ..database.session_passwords import session_password_store

            session_id = flask_session.get("session_id")
            password = None
            if session_id and username:
                password = session_password_store.get_session_password(
                    username, session_id
                )

            if password and username:
                # Update scheduler to reschedule subscriptions
                scheduler.update_user_info(username, password)
                logger.info(
                    f"Scheduler notified about {action} subscription for {username}"
                )
            else:
                # Without credentials the scheduler cannot re-open the
                # user's encrypted database, so we only warn.
                logger.warning(
                    f"Could not notify scheduler - no password available{' for ' + username if username else ''}"
                )
    except Exception:
        logger.exception(
            f"Could not notify scheduler about {action} subscription"
        )
def get_news_feed(
    user_id: str = "anonymous",
    limit: int = 20,
    use_cache: bool = True,
    focus: Optional[str] = None,
    search_strategy: Optional[str] = None,
    subscription_id: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Get personalized news feed by pulling from news_items table first, then research history.

    Completed research runs are scanned and any that look news-related
    (news metadata present, or a news-style query) are converted into
    feed cards.

    Args:
        user_id: User identifier
        limit: Maximum number of cards to return
        use_cache: Whether to use cached news (currently unused here)
        focus: Optional focus area for news (echoed back in the response)
        search_strategy: Override default recommendation strategy
        subscription_id: Optional subscription ID to filter items by;
            None or "all" disables filtering

    Returns:
        Dictionary with news items and metadata

    Raises:
        InvalidLimitException: If limit is less than 1.
        DatabaseAccessException: If the research-history query fails.
        NewsFeedGenerationException: For any other unexpected error.
    """
    # Validate limit - allow any positive number
    if limit < 1:
        raise InvalidLimitException(limit)

    try:
        logger.info(
            f"get_news_feed called with user_id={user_id}, limit={limit}"
        )

        # News is always enabled for now - per-user settings will be handled later
        # if not get_db_setting("news.enabled", True):
        #     return {"error": "News system is disabled", "news_items": []}

        # Import database functions (lazy to avoid import cycles at module load)
        from ..database.session_context import get_user_db_session
        from ..database.models import ResearchHistory

        news_items = []
        remaining_limit = limit

        # Query research history from user's database for news items
        logger.info("Getting news items from research history")
        try:
            # Use the user_id provided to the function
            with get_user_db_session(user_id) as db_session:
                # Build query using ORM
                query = db_session.query(ResearchHistory).filter(
                    ResearchHistory.status == ResearchStatus.COMPLETED
                )

                # Filter by subscription if provided
                if subscription_id and subscription_id != "all":
                    # Use JSON containment for PostgreSQL or LIKE for SQLite
                    # NOTE(review): no space after the colon here, but
                    # get_subscription_history searches '"subscription_id": '
                    # WITH a space — confirm which JSON serialization is
                    # actually stored in research_meta.
                    query = query.filter(
                        ResearchHistory.research_meta.like(
                            f'%"subscription_id":"{subscription_id}"%'
                        )
                    )

                # Order by creation date and limit.  Over-fetch (2x) since
                # non-news rows are filtered out below.
                results = (
                    query.order_by(ResearchHistory.created_at.desc())
                    .limit(remaining_limit * 2)
                    .all()
                )

                # Convert ORM objects to dictionaries for compatibility
                results = [
                    {
                        "id": r.id,
                        "uuid_id": r.id,  # In ResearchHistory, id is the UUID
                        "query": r.query,
                        "title": r.title
                        if hasattr(r, "title")
                        else None,  # Include title field if exists
                        "created_at": r.created_at if r.created_at else None,
                        "completed_at": r.completed_at
                        if r.completed_at
                        else None,
                        "duration_seconds": r.duration_seconds
                        if hasattr(r, "duration_seconds")
                        else None,
                        "report_path": r.report_path
                        if hasattr(r, "report_path")
                        else None,
                        "report_content": r.report_content
                        if hasattr(r, "report_content")
                        else None,  # Include database content
                        "research_meta": r.research_meta,
                        "status": r.status,
                    }
                    for r in results
                ]

            logger.info(f"Database returned {len(results)} research items")
            if results and len(results) > 0:
                logger.info(f"First row keys: {list(results[0].keys())}")
                # Log first few items' metadata
                for i, row in enumerate(results[:3]):
                    logger.info(
                        f"Item {i}: query='{row['query'][:50]}...', has meta={bool(row.get('research_meta'))}"
                    )

            # Process results to find news items
            processed_count = 0
            error_count = 0

            for row in results:
                try:
                    # Parse metadata
                    metadata = {}
                    if row.get("research_meta"):
                        try:
                            # Handle both dict and string formats
                            if isinstance(row["research_meta"], dict):
                                metadata = row["research_meta"]
                            else:
                                metadata = json.loads(row["research_meta"])
                        except (json.JSONDecodeError, TypeError):
                            logger.exception("Error parsing metadata")
                            metadata = {}

                    # Check if this has news metadata (generated_headline or generated_topics)
                    # or if it's a news-related query
                    has_news_metadata = (
                        metadata.get("generated_headline") is not None
                        or metadata.get("generated_topics") is not None
                    )

                    # Heuristic classification: metadata flags first, then
                    # news-style keywords in the query text.
                    query_lower = row["query"].lower()
                    is_news_query = (
                        has_news_metadata
                        or metadata.get("is_news_search")
                        or metadata.get("search_type") == "news_analysis"
                        or "breaking news" in query_lower
                        or "news stories" in query_lower
                        or (
                            "today" in query_lower
                            and (
                                "news" in query_lower
                                or "breaking" in query_lower
                            )
                        )
                        or "latest news" in query_lower
                    )

                    # Log the decision for first few items
                    if processed_count < 3 or error_count < 3:
                        logger.info(
                            f"Item check: query='{row['query'][:30]}...', is_news_search={metadata.get('is_news_search')}, "
                            f"has_news_metadata={has_news_metadata}, is_news_query={is_news_query}"
                        )

                    # Only show items that have news metadata or are news queries
                    if is_news_query:
                        processed_count += 1
                        logger.info(
                            f"Processing research item #{processed_count}: {row['query'][:50]}..."
                        )

                        # Always use database content
                        findings = ""
                        summary = ""
                        report_content_db = row.get(
                            "report_content"
                        )  # Get database content

                        # Use database content
                        content = report_content_db
                        if content:
                            logger.debug(
                                f"Using database content for research {row['id']}"
                            )

                            # Process database content
                            lines = content.split("\n") if content else []
                            # Use full content as findings
                            findings = content
                            # Extract summary from first non-empty line
                            # (markdown headings starting with '#' are skipped)
                            for line in lines:
                                if line.strip() and not line.startswith("#"):
                                    summary = line.strip()
                                    break
                        else:
                            logger.debug(
                                f"No database content for research {row['id']}"
                            )

                        # Use stored headline/topics if available, otherwise generate
                        original_query = row["query"]

                        # Check for headline - first try database title, then metadata
                        headline = row.get("title") or metadata.get(
                            "generated_headline"
                        )

                        # For subscription results, generate headline from query if needed
                        if not headline and metadata.get("is_news_search"):
                            # Use subscription name or query as headline
                            subscription_name = metadata.get(
                                "subscription_name"
                            )
                            if subscription_name:
                                headline = f"News Update: {subscription_name}"
                            else:
                                # Generate headline from query
                                headline = f"News: {row['query'][:60]}..."

                        # Skip items without meaningful headlines or that are incomplete
                        if (
                            not headline
                            or headline == "[No headline available]"
                        ):
                            logger.debug(
                                f"Skipping item without headline: {row['id']}"
                            )
                            continue

                        # Skip items that are still in progress or suspended
                        if row["status"] in (
                            ResearchStatus.IN_PROGRESS,
                            ResearchStatus.SUSPENDED,
                        ):
                            logger.debug(
                                f"Skipping incomplete item: {row['id']} (status: {row['status']})"
                            )
                            continue

                        # Skip items without content (neither file nor database)
                        if not content:
                            logger.debug(
                                f"Skipping item without content: {row['id']}"
                            )
                            continue

                        # Use ID properly, preferring uuid_id
                        research_id = row.get("uuid_id") or str(row["id"])

                        # Use stored category and topics - no defaults
                        category = metadata.get("category")
                        if not category:
                            category = "[Uncategorized]"

                        topics = metadata.get("generated_topics")
                        if not topics:
                            topics = ["[No topics]"]

                        # Extract top 3 links from the database content.
                        # Assumes the report lists sources as a title line
                        # followed by a "URL: <http...>" line — TODO confirm
                        # against the report generator's format.
                        links = []
                        if content:
                            try:
                                report_lines = content.split("\n")
                                link_count = 0
                                for i, line in enumerate(
                                    report_lines[:100]
                                ):  # Check first 100 lines for links
                                    if "URL:" in line:
                                        url = line.split("URL:", 1)[1].strip()
                                        if url.startswith("http"):
                                            # Get the title from the previous line if available
                                            title = ""
                                            if i > 0:
                                                title_line = report_lines[
                                                    i - 1
                                                ].strip()
                                                # Remove citation numbers like [12, 26, 19]
                                                title = re.sub(
                                                    r"^\[[^\]]+\]\s*",
                                                    "",
                                                    title_line,
                                                ).strip()

                                            if not title:
                                                # Use domain as fallback
                                                domain = url.split("//")[
                                                    -1
                                                ].split("/")[0]
                                                title = domain.replace(
                                                    "www.", ""
                                                )

                                            links.append(
                                                {
                                                    "url": url,
                                                    "title": title[:50] + "..."
                                                    if len(title) > 50
                                                    else title,
                                                }
                                            )
                                            link_count += 1
                                            logger.debug(
                                                f"Found link: {title} - {url}"
                                            )
                                            if link_count >= 3:
                                                break
                            except Exception:
                                logger.exception(
                                    "Error extracting links from database content"
                                )

                        # Create news item from research
                        news_item = {
                            "id": f"news-{research_id}",
                            "headline": headline,
                            "category": category,
                            "summary": summary
                            or f"Research analysis for: {headline[:100]}",
                            "findings": findings,
                            "impact_score": metadata.get(
                                "impact_score", 0
                            ),  # 0 indicates missing
                            "time_ago": _format_time_ago(row["created_at"]),
                            "upvotes": metadata.get("upvotes", 0),
                            "downvotes": metadata.get("downvotes", 0),
                            "source_url": f"/results/{research_id}",
                            "topics": topics,  # Use generated topics
                            "links": links,  # Add extracted links
                            "research_id": research_id,
                            "created_at": row["created_at"],
                            "duration_seconds": row.get("duration_seconds", 0),
                            "original_query": original_query,  # Keep original query for reference
                            "is_news": metadata.get(
                                "is_news_search", False
                            ),  # Flag for news searches
                            "news_date": metadata.get(
                                "news_date"
                            ),  # If specific date for news
                            "news_source": metadata.get(
                                "news_source"
                            ),  # If from specific source
                            "priority": metadata.get(
                                "priority", "normal"
                            ),  # Priority level
                        }

                        news_items.append(news_item)
                        logger.info(f"Added news item: {headline[:50]}...")

                        if len(news_items) >= limit:
                            break

                except Exception:
                    # One bad row must not break the whole feed.
                    error_count += 1
                    logger.exception(
                        f"Error processing research item with query: {row.get('query', 'UNKNOWN')[:100]}"
                    )
                    continue

            logger.info(
                f"Processing summary: total_results={len(results)}, processed={processed_count}, "
                f"errors={error_count}, added={len(news_items)}"
            )

            # Log subscription-specific items if we were filtering
            if subscription_id and subscription_id != "all":
                sub_items = [
                    item for item in news_items if item.get("is_news", False)
                ]
                logger.info(
                    f"Subscription {subscription_id}: found {len(sub_items)} items"
                )

        except Exception as db_error:
            logger.exception(f"Database error in research history: {db_error}")
            raise DatabaseAccessException(
                "research_history_query", str(db_error)
            )

        # If no news items found, return empty list
        if not news_items:
            logger.info("No news items found, returning empty list")
            news_items = []

        logger.info(f"Returning {len(news_items)} news items to client")

        # Determine the source
        source = (
            "news_items"
            if any(item.get("is_news", False) for item in news_items)
            else "research_history"
        )

        return {
            "news_items": news_items[:limit],
            "generated_at": datetime.now(timezone.utc).isoformat(),
            "focus": focus,
            "search_strategy": search_strategy or "default",
            "total_items": len(news_items),
            "source": source,
        }

    except NewsAPIException:
        # Re-raise our custom exceptions
        raise
    except Exception as e:
        logger.exception("Error getting news feed")
        raise NewsFeedGenerationException(str(e), user_id=user_id)
def get_subscription_history(
    subscription_id: str, limit: int = 20
) -> Dict[str, Any]:
    """
    Get research history for a specific subscription.

    Args:
        subscription_id: The subscription UUID
        limit: Maximum number of history items to return

    Returns:
        Dict containing subscription info and its research history

    Raises:
        SubscriptionNotFoundException: If the subscription does not exist.
        DatabaseAccessException: For any other failure while querying.
    """
    try:
        # Lazy imports to avoid circular dependencies at module load
        from ..database.session_context import get_user_db_session
        from ..database.models import ResearchHistory
        from ..database.models.news import NewsSubscription

        # Get subscription details using ORM from user's encrypted database
        with get_user_db_session() as session:
            subscription = (
                session.query(NewsSubscription)
                .filter_by(id=subscription_id)
                .first()
            )

            if not subscription:
                raise SubscriptionNotFoundException(subscription_id)  # noqa: TRY301 — re-raised by except NewsAPIException

            # Convert to dict for response
            subscription_dict = {
                "id": subscription.id,
                "query_or_topic": subscription.query_or_topic,
                "subscription_type": subscription.subscription_type,
                "refresh_interval_minutes": subscription.refresh_interval_minutes,
                "refresh_count": subscription.refresh_count or 0,
                "created_at": subscription.created_at.isoformat()
                if subscription.created_at
                else None,
                "next_refresh": subscription.next_refresh.isoformat()
                if subscription.next_refresh
                else None,
            }

        # Now get research history from the research database
        # Get user_id from subscription
        # NOTE(review): subscription_dict is built above without a
        # "user_id" key, so this always falls back to "anonymous" —
        # confirm that is the intended behavior.
        sub_user_id = subscription_dict.get("user_id", "anonymous")

        with get_user_db_session(sub_user_id) as db_session:
            # Get all research runs that were triggered by this subscription
            # Look for subscription_id in the research_meta JSON
            # Note: JSON format has space after colon
            like_pattern = f'%"subscription_id": "{subscription_id}"%'
            logger.info(
                f"Searching for research history with pattern: {like_pattern}"
            )

            history_items = (
                db_session.query(ResearchHistory)
                .filter(ResearchHistory.research_meta.like(like_pattern))
                .order_by(ResearchHistory.created_at.desc())
                .limit(limit)
                .all()
            )

            # Convert to dict format for compatibility
            history_items = [
                {
                    "id": h.id,
                    "uuid_id": h.uuid_id,
                    "query": h.query,
                    "status": h.status,
                    "created_at": h.created_at.isoformat()
                    if h.created_at
                    else None,
                    "completed_at": h.completed_at.isoformat()
                    if h.completed_at
                    else None,
                    "duration_seconds": h.duration_seconds,
                    "research_meta": h.research_meta,
                    "report_path": h.report_path,
                }
                for h in history_items
            ]

        # Process history items into the API response shape
        processed_history = []
        for item in history_items:
            processed_item = {
                "research_id": item.get("uuid_id") or str(item.get("id")),
                "query": item["query"],
                "status": item["status"],
                "created_at": item["created_at"],
                "completed_at": item.get("completed_at"),
                "duration_seconds": item.get("duration_seconds", 0),
                "url": f"/progress/{item.get('uuid_id') or item.get('id')}",
            }

            # Parse metadata if available to get headline and topics
            if item.get("research_meta"):
                try:
                    meta = json.loads(item["research_meta"])
                    processed_item["triggered_by"] = meta.get(
                        "triggered_by", "subscription"
                    )
                    # Add headline and topics from metadata
                    processed_item["headline"] = meta.get(
                        "generated_headline", "[No headline]"
                    )
                    processed_item["topics"] = meta.get("generated_topics", [])
                except Exception:
                    # Malformed metadata — fall back to placeholders
                    processed_item["headline"] = "[No headline]"
                    processed_item["topics"] = []
            else:
                processed_item["headline"] = "[No headline]"
                processed_item["topics"] = []

            processed_history.append(processed_item)

        return {
            "subscription": subscription_dict,
            "history": processed_history,
            "total_runs": len(processed_history),
        }

    except NewsAPIException:
        # Re-raise our custom exceptions
        raise
    except Exception as e:
        logger.exception("Error getting subscription history")
        raise DatabaseAccessException("get_subscription_history", str(e))
622def _format_time_ago(timestamp: str) -> str:
623 """Format timestamp as 'X hours ago' string."""
624 try:
625 from dateutil import parser
626 from loguru import logger
628 dt = parser.parse(timestamp)
630 # If dt is naive, assume it's in UTC
631 if dt.tzinfo is None:
632 dt = dt.replace(tzinfo=timezone.utc)
634 now = datetime.now(timezone.utc)
635 diff = now - dt
637 if diff.days > 0:
638 return f"{diff.days} day{'s' if diff.days > 1 else ''} ago"
639 if diff.seconds > 3600:
640 hours = diff.seconds // 3600
641 return f"{hours} hour{'s' if hours > 1 else ''} ago"
642 if diff.seconds > 60:
643 minutes = diff.seconds // 60
644 return f"{minutes} minute{'s' if minutes > 1 else ''} ago"
645 return "Just now"
646 except Exception:
647 logger.exception(f"Error parsing timestamp '{timestamp}'")
648 return "Recently"
def get_subscription(subscription_id: str) -> Optional[Dict[str, Any]]:
    """
    Get a single subscription by ID.

    Args:
        subscription_id: Subscription identifier

    Returns:
        Dictionary with subscription data or None if not found
    """
    try:
        # Read directly from the user's encrypted database
        from ..database.session_context import get_user_db_session
        from ..database.models.news import NewsSubscription

        with get_user_db_session() as db:
            record = (
                db.query(NewsSubscription)
                .filter_by(id=subscription_id)
                .first()
            )

            if record is None:
                raise SubscriptionNotFoundException(subscription_id)  # noqa: TRY301 — re-raised by except NewsAPIException

            # Shape the record the way the templates expect it
            created = (
                record.created_at.isoformat() if record.created_at else None
            )
            updated = (
                record.updated_at.isoformat() if record.updated_at else None
            )
            return {
                "id": record.id,
                "name": record.name or "",
                "query_or_topic": record.query_or_topic,
                "subscription_type": record.subscription_type,
                "refresh_interval_minutes": record.refresh_interval_minutes,
                "is_active": record.status == "active",
                "status": record.status,
                "folder_id": record.folder_id,
                "model_provider": record.model_provider,
                "model": record.model,
                "search_strategy": record.search_strategy,
                "custom_endpoint": record.custom_endpoint,
                "search_engine": record.search_engine,
                "search_iterations": record.search_iterations or 3,
                "questions_per_iteration": record.questions_per_iteration
                or 5,
                "created_at": created,
                "updated_at": updated,
            }

    except NewsAPIException:
        # Propagate our custom exceptions unchanged
        raise
    except Exception as e:
        logger.exception(f"Error getting subscription {subscription_id}")
        raise DatabaseAccessException("get_subscription", str(e))
def get_subscriptions(user_id: str) -> Dict[str, Any]:
    """
    Get all subscriptions for a user.

    Args:
        user_id: User identifier

    Returns:
        Dictionary with subscriptions list
    """
    try:
        # Read directly from the user's encrypted database
        from ..database.session_context import get_user_db_session
        from ..database.models import ResearchHistory
        from ..database.models.news import NewsSubscription
        from sqlalchemy import func

        results = []

        with get_user_db_session(user_id) as session:
            for record in session.query(NewsSubscription).all():
                # Count the research runs actually triggered by this
                # subscription via the id embedded in research_meta.
                pattern = f'%"subscription_id": "{record.id}"%'
                run_count = (
                    session.query(func.count(ResearchHistory.id))
                    .filter(ResearchHistory.research_meta.like(pattern))
                    .scalar()
                    or 0
                )

                results.append(
                    {
                        "id": record.id,
                        "query": record.query_or_topic,
                        "type": record.subscription_type,
                        "refresh_minutes": record.refresh_interval_minutes,
                        "created_at": record.created_at.isoformat()
                        if record.created_at
                        else None,
                        "next_refresh": record.next_refresh.isoformat()
                        if record.next_refresh
                        else None,
                        "last_refreshed": record.last_refresh.isoformat()
                        if record.last_refresh
                        else None,
                        "is_active": record.status == "active",
                        "total_runs": run_count,  # Actual count from research_history
                        "name": record.name or "",
                        "folder_id": record.folder_id,
                    }
                )

        return {"subscriptions": results, "total": len(results)}

    except Exception as e:
        logger.exception("Error getting subscriptions")
        raise DatabaseAccessException("get_subscriptions", str(e))
def update_subscription(
    subscription_id: str, data: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Update an existing subscription.

    Only the keys present in ``data`` are applied; all other columns are
    left untouched.

    Args:
        subscription_id: Subscription identifier
        data: Dictionary with fields to update

    Returns:
        Dictionary with updated subscription data

    Raises:
        SubscriptionNotFoundException: If no subscription has this ID.
        SubscriptionUpdateException: For any other failure while updating.
    """
    try:
        from ..database.session_context import get_user_db_session
        from ..database.models.news import NewsSubscription
        # Local import shadows the module-level datetime on purpose to
        # also pull in timedelta; UTC still comes from the module scope.
        from datetime import datetime, timedelta

        with get_user_db_session() as db_session:
            # Get existing subscription
            subscription = (
                db_session.query(NewsSubscription)
                .filter_by(id=subscription_id)
                .first()
            )
            if not subscription:
                raise SubscriptionNotFoundException(subscription_id)  # noqa: TRY301 — re-raised by except NewsAPIException

            # Update fields
            if "name" in data:
                subscription.name = data["name"]
            if "query_or_topic" in data:
                subscription.query_or_topic = data["query_or_topic"]
            if "subscription_type" in data:
                subscription.subscription_type = data["subscription_type"]
            if "refresh_interval_minutes" in data:
                old_interval = subscription.refresh_interval_minutes
                subscription.refresh_interval_minutes = data[
                    "refresh_interval_minutes"
                ]
                # Recalculate next_refresh if interval changed
                if old_interval != subscription.refresh_interval_minutes:
                    subscription.next_refresh = datetime.now(UTC) + timedelta(
                        minutes=subscription.refresh_interval_minutes
                    )
            if "is_active" in data:
                # Boolean convenience flag mapped onto the status column;
                # an explicit "status" key below takes precedence.
                subscription.status = (
                    "active" if data["is_active"] else "paused"
                )
            if "status" in data:
                subscription.status = data["status"]
            if "folder_id" in data:
                subscription.folder_id = data["folder_id"]
            if "model_provider" in data:
                subscription.model_provider = normalize_provider(
                    data["model_provider"]
                )
            if "model" in data:
                subscription.model = data["model"]
            if "search_strategy" in data:
                subscription.search_strategy = data["search_strategy"]
            if "custom_endpoint" in data:
                subscription.custom_endpoint = data["custom_endpoint"]
            if "search_engine" in data:
                subscription.search_engine = data["search_engine"]
            if "search_iterations" in data:
                subscription.search_iterations = data["search_iterations"]
            if "questions_per_iteration" in data:
                subscription.questions_per_iteration = data[
                    "questions_per_iteration"
                ]

            # Update timestamp
            subscription.updated_at = datetime.now(UTC)

            # Commit changes
            db_session.commit()

            # Notify scheduler about updated subscription
            _notify_scheduler_about_subscription_change("updated")

            # Convert to API format
            return {
                "status": "success",
                "subscription": {
                    "id": subscription.id,
                    "name": subscription.name or "",
                    "query_or_topic": subscription.query_or_topic,
                    "subscription_type": subscription.subscription_type,
                    "refresh_interval_minutes": subscription.refresh_interval_minutes,
                    "is_active": subscription.status == "active",
                    "status": subscription.status,
                    "folder_id": subscription.folder_id,
                    "model_provider": subscription.model_provider,
                    "model": subscription.model,
                    "search_strategy": subscription.search_strategy,
                    "custom_endpoint": subscription.custom_endpoint,
                    "search_engine": subscription.search_engine,
                    "search_iterations": subscription.search_iterations or 3,
                    "questions_per_iteration": subscription.questions_per_iteration
                    or 5,
                },
            }

    except NewsAPIException:
        # Re-raise our custom exceptions
        raise
    except Exception as e:
        logger.exception("Error updating subscription")
        raise SubscriptionUpdateException(subscription_id, str(e))
def create_subscription(
    user_id: str,
    query: str,
    subscription_type: str = "search",
    refresh_minutes: Optional[int] = None,
    source_research_id: Optional[str] = None,
    model_provider: Optional[str] = None,
    model: Optional[str] = None,
    search_strategy: Optional[str] = None,
    custom_endpoint: Optional[str] = None,
    name: Optional[str] = None,
    folder_id: Optional[str] = None,
    is_active: bool = True,
    search_engine: Optional[str] = None,
    search_iterations: Optional[int] = None,
    questions_per_iteration: Optional[int] = None,
) -> Dict[str, Any]:
    """
    Create a new subscription for user.

    Args:
        user_id: User identifier
        query: Search query or topic
        subscription_type: "search" or "topic"
        refresh_minutes: Refresh interval in minutes; when None, the value
            is read from the user's settings (falling back to 240)
        source_research_id: Optional research run this subscription was
            created from (stored as source_id)
        model_provider: Optional LLM provider (normalized before saving)
        model: Optional model name
        search_strategy: Search strategy (defaults to "news_aggregation")
        custom_endpoint: Optional custom LLM endpoint URL
        name: Optional display name
        folder_id: Optional folder to file the subscription under
        is_active: Whether the subscription starts "active" or "paused"
        search_engine: Optional search engine override
        search_iterations: Optional number of search iterations
        questions_per_iteration: Optional questions per iteration

    Returns:
        Dictionary with subscription details

    Raises:
        SubscriptionCreationException: If the subscription cannot be saved.
    """
    try:
        from ..database.session_context import get_user_db_session
        from ..database.models.news import NewsSubscription
        # Local import shadows the module-level datetime on purpose to
        # also pull in timedelta; UTC still comes from the module scope.
        from datetime import datetime, timedelta
        import uuid

        # Get default refresh interval from settings if not provided
        # NOTE: This API function accesses the settings DB for convenience when used
        # within the Flask application context. For programmatic API access outside
        # the web context, callers should provide refresh_minutes explicitly to avoid
        # dependency on the settings database being initialized.

        with get_user_db_session(user_id) as db_session:
            if refresh_minutes is None:
                try:
                    from ..utilities.db_utils import get_settings_manager

                    settings_manager = get_settings_manager(db_session, user_id)
                    refresh_minutes = settings_manager.get_setting(
                        "news.subscription.refresh_minutes", 240
                    )
                except (ImportError, AttributeError, TypeError):
                    # Fallback for when settings DB is not available (e.g., programmatic API usage)
                    logger.debug(
                        "Settings manager not available, using default refresh_minutes"
                    )
                    refresh_minutes = 240  # Default to 4 hours
            # Create new subscription
            subscription = NewsSubscription(
                id=str(uuid.uuid4()),
                name=name,
                query_or_topic=query,
                subscription_type=subscription_type,
                refresh_interval_minutes=refresh_minutes,
                status="active" if is_active else "paused",
                model_provider=normalize_provider(model_provider),
                model=model,
                search_strategy=search_strategy or "news_aggregation",
                custom_endpoint=custom_endpoint,
                folder_id=folder_id,
                search_engine=search_engine,
                search_iterations=search_iterations,
                questions_per_iteration=questions_per_iteration,
                created_at=datetime.now(UTC),
                updated_at=datetime.now(UTC),
                last_refresh=None,
                # First refresh is one full interval from now
                next_refresh=datetime.now(UTC)
                + timedelta(minutes=refresh_minutes),
                source_id=source_research_id,
            )

            # Add to database
            db_session.add(subscription)
            db_session.commit()

            # Notify scheduler about new subscription
            _notify_scheduler_about_subscription_change("created", user_id)

            return {
                "status": "success",
                "subscription_id": subscription.id,
                "type": subscription_type,
                "query": query,
                "refresh_minutes": refresh_minutes,
            }

    except Exception as e:
        logger.exception("Error creating subscription")
        raise SubscriptionCreationException(
            str(e), {"query": query, "type": subscription_type}
        )
def delete_subscription(subscription_id: str) -> Dict[str, Any]:
    """
    Delete a subscription.

    Args:
        subscription_id: ID of subscription to delete

    Returns:
        Dictionary with status
    """
    try:
        from ..database.session_context import get_user_db_session
        from ..database.models.news import NewsSubscription

        with get_user_db_session() as db:
            target = (
                db.query(NewsSubscription)
                .filter_by(id=subscription_id)
                .first()
            )

            # Guard clause: nothing to delete means "not found"
            if target is None:
                raise SubscriptionNotFoundException(subscription_id)  # noqa: TRY301 — re-raised by except NewsAPIException

            db.delete(target)
            db.commit()

            # Notify scheduler about deleted subscription
            _notify_scheduler_about_subscription_change("deleted")

            return {"status": "success", "deleted": subscription_id}
    except NewsAPIException:
        # Propagate our custom exceptions unchanged
        raise
    except Exception as e:
        logger.exception("Error deleting subscription")
        raise SubscriptionDeletionException(subscription_id, str(e))
def get_votes_for_cards(card_ids: list, user_id: str) -> Dict[str, Any]:
    """
    Fetch vote tallies and the user's own vote for a batch of news cards.

    Args:
        card_ids: List of card IDs to get votes for
        user_id: User identifier (not used - per-user database)

    Returns:
        Dictionary with vote information for each card

    Raises:
        ValueError: when no username can be resolved.
    """
    from flask import session as flask_session, has_request_context
    from ..database.models.news import UserRating, RatingType
    from ..database.session_context import get_user_db_session

    # Resolve the username up front so the try block only covers DB work.
    if has_request_context():
        username = flask_session.get("username")
        if not username:
            raise ValueError("No username in session")
    else:
        # Outside a request context (e.g. tests) fall back to user_id.
        username = user_id or None
        if not username:
            raise ValueError("No username provided and no request context")

    try:
        with get_user_db_session(username) as db:

            def _tally(cid, value):
                # Count RELEVANCE ratings of one value ("up"/"down") for a card.
                return (
                    db.query(UserRating)
                    .filter_by(
                        card_id=cid,
                        rating_type=RatingType.RELEVANCE,
                        rating_value=value,
                    )
                    .count()
                )

            results = {}
            for cid in card_ids:
                # The single RELEVANCE row (if any) is this user's own vote,
                # since the database is per-user.
                own_vote = (
                    db.query(UserRating)
                    .filter_by(card_id=cid, rating_type=RatingType.RELEVANCE)
                    .first()
                )
                results[cid] = {
                    "upvotes": _tally(cid, "up"),
                    "downvotes": _tally(cid, "down"),
                    "user_vote": own_vote.rating_value if own_vote else None,
                }

            return {"success": True, "votes": results}
    except Exception:
        logger.exception("Error getting votes for cards")
        raise
def submit_feedback(card_id: str, user_id: str, vote: str) -> Dict[str, Any]:
    """
    Record an up/down vote for a news card and return the new tallies.

    Args:
        card_id: ID of the news card
        user_id: User identifier (not used - per-user database)
        vote: "up" or "down"

    Returns:
        Dictionary with updated vote counts

    Raises:
        ValueError: for an invalid vote value or unresolvable username.
    """
    from flask import session as flask_session, has_request_context
    from sqlalchemy_utc import utcnow
    from ..database.models.news import UserRating, RatingType
    from ..database.session_context import get_user_db_session

    # Reject anything other than the two allowed vote values.
    if vote not in ("up", "down"):
        raise ValueError(f"Invalid vote type: {vote}")

    # Resolve the username up front so the try block only covers DB work.
    if has_request_context():
        username = flask_session.get("username")
        if not username:
            raise ValueError("No username in session")
    else:
        # Outside a request context (e.g. tests) fall back to user_id.
        username = user_id or None
        if not username:
            raise ValueError("No username provided and no request context")

    try:
        with get_user_db_session(username) as db:
            # News items are generated dynamically and may not exist as
            # NewsCard rows, so no existence check is done here.

            def _tally(value):
                # Count RELEVANCE ratings of one value for this card.
                return (
                    db.query(UserRating)
                    .filter_by(
                        card_id=card_id,
                        rating_type=RatingType.RELEVANCE,
                        rating_value=value,
                    )
                    .count()
                )

            # Upsert: overwrite a prior vote, otherwise insert a new row.
            prior = (
                db.query(UserRating)
                .filter_by(card_id=card_id, rating_type=RatingType.RELEVANCE)
                .first()
            )
            if prior is None:
                db.add(
                    UserRating(
                        card_id=card_id,
                        rating_type=RatingType.RELEVANCE,
                        rating_value=vote,
                    )
                )
            else:
                prior.rating_value = vote
                prior.created_at = utcnow()

            db.commit()

            upvotes = _tally("up")
            downvotes = _tally("down")

            logger.info(
                f"Feedback submitted for card {card_id}: {vote} (up: {upvotes}, down: {downvotes})"
            )

            return {
                "success": True,
                "card_id": card_id,
                "vote": vote,
                "upvotes": upvotes,
                "downvotes": downvotes,
            }
    except Exception:
        logger.exception(f"Error submitting feedback for card {card_id}")
        raise
def research_news_item(card_id: str, depth: str = "quick") -> Dict[str, Any]:
    """
    Run a deeper follow-up research pass on one news item.

    Args:
        card_id: ID of the news card to research
        depth: Research depth - "quick", "detailed", or "report"

    Returns:
        Dictionary with research results

    Raises:
        NotImplementedException: always, until per-user card storage exists.
    """
    # TODO: Implement with per-user database for cards
    logger.warning(
        "research_news_item not yet implemented with per-user databases"
    )
    raise NotImplementedException("research_news_item")
def save_news_preferences(
    user_id: str, preferences: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Persist a user's news preferences.

    Args:
        user_id: User identifier
        preferences: Dictionary of preferences to save

    Returns:
        Dictionary with status and message

    Raises:
        NotImplementedException: always, until per-user preference storage exists.
    """
    # TODO: Implement with per-user database for preferences
    logger.warning(
        "save_news_preferences not yet implemented with per-user databases"
    )
    raise NotImplementedException("save_news_preferences")
def get_news_categories() -> Dict[str, Any]:
    """
    List the available news categories and their counts.

    Returns:
        Dictionary with categories and statistics

    Raises:
        NotImplementedException: always, until per-user category storage exists.
    """
    # TODO: Implement with per-user database for categories
    logger.warning(
        "get_news_categories not yet implemented with per-user databases"
    )
    raise NotImplementedException("get_news_categories")