Coverage for src / local_deep_research / news / api.py: 19%
417 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""
2Direct API functions for news system.
3These functions can be called directly by scheduler or wrapped by Flask endpoints.
4"""
6from typing import Dict, Any, Optional
7from datetime import datetime, timezone, UTC
8from loguru import logger
9import re
10import json
12from .recommender.topic_based import TopicBasedRecommender
13from .exceptions import (
14 InvalidLimitException,
15 SubscriptionNotFoundException,
16 SubscriptionCreationException,
17 SubscriptionUpdateException,
18 SubscriptionDeletionException,
19 DatabaseAccessException,
20 NewsFeedGenerationException,
21 NotImplementedException,
22 NewsAPIException,
23)
24# Removed welcome feed import - no placeholders
25# get_db_setting not available in merged codebase
# Process-wide recommender cache so the instance is built at most once
# and reused by every caller.
_recommender = None


def get_recommender():
    """Return the shared recommender, creating it lazily on first use."""
    global _recommender
    recommender = _recommender
    if recommender is None:
        # First call in this process: construct and remember the singleton.
        recommender = TopicBasedRecommender()
        _recommender = recommender
    return recommender
def _notify_scheduler_about_subscription_change(
    action: str, user_id: Optional[str] = None
):
    """
    Notify the scheduler about subscription changes.

    Best-effort: every failure (missing Flask context, scheduler import
    error, password-store lookup error, ...) is logged and swallowed so
    that subscription CRUD operations never fail just because the
    scheduler could not be reached.

    Args:
        action: The action performed (created, updated, deleted)
        user_id: Optional user_id to use as fallback for username
    """
    try:
        # Imported lazily so this module stays importable outside the
        # Flask app / scheduler context.
        from flask import session as flask_session
        from .subscription_manager.scheduler import get_news_scheduler

        scheduler = get_news_scheduler()
        # Only a running scheduler needs to be told to reschedule.
        if scheduler.is_running:
            # Get username, with optional fallback to user_id
            username = flask_session.get("username")
            if not username and user_id:
                username = user_id

            # Get password from session password store
            from ..database.session_passwords import session_password_store

            session_id = flask_session.get("session_id")
            password = None
            if session_id and username:
                password = session_password_store.get_session_password(
                    username, session_id
                )

            if password:
                # Update scheduler to reschedule subscriptions
                scheduler.update_user_info(username, password)
                logger.info(
                    f"Scheduler notified about {action} subscription for {username}"
                )
            else:
                # Without a password the scheduler cannot open the user's
                # encrypted database, so rescheduling is skipped.
                logger.warning(
                    f"Could not notify scheduler - no password available{' for ' + username if username else ''}"
                )
    except Exception:
        logger.exception(
            f"Could not notify scheduler about {action} subscription"
        )
def get_news_feed(
    user_id: str = "anonymous",
    limit: int = 20,
    use_cache: bool = True,
    focus: Optional[str] = None,
    search_strategy: Optional[str] = None,
    subscription_id: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Get personalized news feed by pulling from news_items table first, then research history.

    Args:
        user_id: User identifier
        limit: Maximum number of cards to return (must be >= 1)
        use_cache: Whether to use cached news (currently unused; kept for
            interface compatibility)
        focus: Optional focus area for news (echoed back in the response)
        search_strategy: Override default recommendation strategy (echoed
            back in the response)
        subscription_id: Optional subscription filter; "all" or None
            disables filtering

    Returns:
        Dictionary with news items and metadata:
        news_items, generated_at, focus, search_strategy, total_items, source

    Raises:
        InvalidLimitException: If limit < 1.
        DatabaseAccessException: If the research-history query fails.
        NewsFeedGenerationException: For any other unexpected failure.
    """
    try:
        # Validate limit - allow any positive number
        if limit < 1:
            raise InvalidLimitException(limit)

        logger.info(
            f"get_news_feed called with user_id={user_id}, limit={limit}"
        )

        # News is always enabled for now - per-user settings will be handled later

        # Import database functions lazily to avoid import cycles.
        from ..database.session_context import get_user_db_session
        from ..database.models import ResearchHistory
        from sqlalchemy import or_

        news_items = []
        remaining_limit = limit

        # Query research history from user's database for news items
        logger.info("Getting news items from research history")
        try:
            # Use the user_id provided to the function
            with get_user_db_session(user_id) as db_session:
                # Build query using ORM
                query = db_session.query(ResearchHistory).filter(
                    ResearchHistory.status == "completed"
                )

                # Filter by subscription if provided.
                # FIX: match BOTH JSON spacings. json.dumps emits
                # '"subscription_id": "<id>"' (space after colon) - the other
                # functions in this module explicitly note and use that form -
                # while the old pattern here only matched the no-space variant
                # and therefore silently returned nothing for most stored
                # metadata.
                if subscription_id and subscription_id != "all":
                    query = query.filter(
                        or_(
                            ResearchHistory.research_meta.like(
                                f'%"subscription_id": "{subscription_id}"%'
                            ),
                            ResearchHistory.research_meta.like(
                                f'%"subscription_id":"{subscription_id}"%'
                            ),
                        )
                    )

                # Order by creation date; over-fetch 2x because some rows are
                # filtered out below (non-news queries, missing content, ...).
                results = (
                    query.order_by(ResearchHistory.created_at.desc())
                    .limit(remaining_limit * 2)
                    .all()
                )

                # Convert ORM objects to dictionaries for compatibility
                results = [
                    {
                        "id": r.id,
                        "uuid_id": r.id,  # In ResearchHistory, id is the UUID
                        "query": r.query,
                        "title": r.title if hasattr(r, "title") else None,
                        "created_at": r.created_at if r.created_at else None,
                        "completed_at": r.completed_at
                        if r.completed_at
                        else None,
                        "duration_seconds": r.duration_seconds
                        if hasattr(r, "duration_seconds")
                        else None,
                        "report_path": r.report_path
                        if hasattr(r, "report_path")
                        else None,
                        "report_content": r.report_content
                        if hasattr(r, "report_content")
                        else None,  # Include database content
                        "research_meta": r.research_meta,
                        "status": r.status,
                    }
                    for r in results
                ]

                logger.info(f"Database returned {len(results)} research items")
                if results and len(results) > 0:
                    logger.info(f"First row keys: {list(results[0].keys())}")
                    # Log first few items' metadata
                    for i, row in enumerate(results[:3]):
                        logger.info(
                            f"Item {i}: query='{row['query'][:50]}...', has meta={bool(row.get('research_meta'))}"
                        )

                # Process results to find news items
                processed_count = 0
                error_count = 0

                for row in results:
                    try:
                        # Parse metadata (stored either as dict or JSON string)
                        metadata = {}
                        if row.get("research_meta"):
                            try:
                                if isinstance(row["research_meta"], dict):
                                    metadata = row["research_meta"]
                                else:
                                    metadata = json.loads(row["research_meta"])
                            except (json.JSONDecodeError, TypeError):
                                logger.exception("Error parsing metadata")
                                metadata = {}

                        # An item counts as "news" if it carries generated
                        # news metadata, is flagged as a news search, or its
                        # query text looks news-like.
                        has_news_metadata = (
                            metadata.get("generated_headline") is not None
                            or metadata.get("generated_topics") is not None
                        )

                        query_lower = row["query"].lower()
                        is_news_query = (
                            has_news_metadata
                            or metadata.get("is_news_search")
                            or metadata.get("search_type") == "news_analysis"
                            or "breaking news" in query_lower
                            or "news stories" in query_lower
                            or (
                                "today" in query_lower
                                and (
                                    "news" in query_lower
                                    or "breaking" in query_lower
                                )
                            )
                            or "latest news" in query_lower
                        )

                        # Log the decision for first few items
                        if processed_count < 3 or error_count < 3:
                            logger.info(
                                f"Item check: query='{row['query'][:30]}...', is_news_search={metadata.get('is_news_search')}, "
                                f"has_news_metadata={has_news_metadata}, is_news_query={is_news_query}"
                            )

                        # Only show items that have news metadata or are news queries
                        if is_news_query:
                            processed_count += 1
                            logger.info(
                                f"Processing research item #{processed_count}: {row['query'][:50]}..."
                            )

                            # Always use database content
                            findings = ""
                            summary = ""
                            report_content_db = row.get("report_content")

                            content = report_content_db
                            if content:
                                logger.debug(
                                    f"Using database content for research {row['id']}"
                                )
                                lines = content.split("\n") if content else []
                                # Use full content as findings
                                findings = content
                                # Extract summary from first non-empty,
                                # non-heading line.
                                for line in lines:
                                    if line.strip() and not line.startswith("#"):
                                        summary = line.strip()
                                        break
                            else:
                                logger.debug(
                                    f"No database content for research {row['id']}"
                                )

                            # Use stored headline/topics if available, otherwise generate
                            original_query = row["query"]

                            # Check for headline - first try database title, then metadata
                            headline = row.get("title") or metadata.get(
                                "generated_headline"
                            )

                            # For subscription results, generate headline from query if needed
                            if not headline and metadata.get("is_news_search"):
                                subscription_name = metadata.get(
                                    "subscription_name"
                                )
                                if subscription_name:
                                    headline = f"News Update: {subscription_name}"
                                else:
                                    headline = f"News: {row['query'][:60]}..."

                            # Skip items without meaningful headlines
                            if (
                                not headline
                                or headline == "[No headline available]"
                            ):
                                logger.debug(
                                    f"Skipping item without headline: {row['id']}"
                                )
                                continue

                            # Skip items that are still in progress or suspended
                            if row["status"] in ["in_progress", "suspended"]:
                                logger.debug(
                                    f"Skipping incomplete item: {row['id']} (status: {row['status']})"
                                )
                                continue

                            # Skip items without content
                            if not content:
                                logger.debug(
                                    f"Skipping item without content: {row['id']}"
                                )
                                continue

                            # Use ID properly, preferring uuid_id
                            research_id = row.get("uuid_id") or str(row["id"])

                            # Use stored category and topics - no defaults
                            category = metadata.get("category")
                            if not category:
                                category = "[Uncategorized]"

                            topics = metadata.get("generated_topics")
                            if not topics:
                                topics = ["[No topics]"]

                            # Extract top 3 "URL: ..." links from the report
                            # content, using the preceding line as the title.
                            links = []
                            if content:
                                try:
                                    report_lines = content.split("\n")
                                    link_count = 0
                                    for i, line in enumerate(
                                        report_lines[:100]
                                    ):  # Check first 100 lines for links
                                        if "URL:" in line:
                                            url = line.split("URL:", 1)[1].strip()
                                            if url.startswith("http"):
                                                # Get the title from the previous line if available
                                                title = ""
                                                if i > 0:
                                                    title_line = report_lines[
                                                        i - 1
                                                    ].strip()
                                                    # Remove citation numbers like [12, 26, 19]
                                                    title = re.sub(
                                                        r"^\[[^\]]+\]\s*",
                                                        "",
                                                        title_line,
                                                    ).strip()

                                                if not title:
                                                    # Use domain as fallback
                                                    domain = url.split("//")[
                                                        -1
                                                    ].split("/")[0]
                                                    title = domain.replace(
                                                        "www.", ""
                                                    )

                                                links.append(
                                                    {
                                                        "url": url,
                                                        "title": title[:50] + "..."
                                                        if len(title) > 50
                                                        else title,
                                                    }
                                                )
                                                link_count += 1
                                                logger.debug(
                                                    f"Found link: {title} - {url}"
                                                )
                                                if link_count >= 3:
                                                    break
                                except Exception as e:
                                    logger.exception(
                                        f"Error extracting links from database content: {e}"
                                    )

                            # Create news item from research
                            news_item = {
                                "id": f"news-{research_id}",
                                "headline": headline,
                                "category": category,
                                "summary": summary
                                or f"Research analysis for: {headline[:100]}",
                                "findings": findings,
                                "impact_score": metadata.get(
                                    "impact_score", 0
                                ),  # 0 indicates missing
                                "time_ago": _format_time_ago(row["created_at"]),
                                "upvotes": metadata.get("upvotes", 0),
                                "downvotes": metadata.get("downvotes", 0),
                                "source_url": f"/results/{research_id}",
                                "topics": topics,  # Use generated topics
                                "links": links,  # Add extracted links
                                "research_id": research_id,
                                "created_at": row["created_at"],
                                "duration_seconds": row.get("duration_seconds", 0),
                                "original_query": original_query,  # Keep original query for reference
                                "is_news": metadata.get(
                                    "is_news_search", False
                                ),  # Flag for news searches
                                "news_date": metadata.get(
                                    "news_date"
                                ),  # If specific date for news
                                "news_source": metadata.get(
                                    "news_source"
                                ),  # If from specific source
                                "priority": metadata.get(
                                    "priority", "normal"
                                ),  # Priority level
                            }

                            news_items.append(news_item)
                            logger.info(f"Added news item: {headline[:50]}...")

                            if len(news_items) >= limit:
                                break

                    except Exception:
                        # Per-row isolation: one bad row must not kill the feed.
                        error_count += 1
                        logger.exception(
                            f"Error processing research item with query: {row.get('query', 'UNKNOWN')[:100]}"
                        )
                        continue

                logger.info(
                    f"Processing summary: total_results={len(results)}, processed={processed_count}, "
                    f"errors={error_count}, added={len(news_items)}"
                )

                # Log subscription-specific items if we were filtering
                if subscription_id and subscription_id != "all":
                    sub_items = [
                        item for item in news_items if item.get("is_news", False)
                    ]
                    logger.info(
                        f"Subscription {subscription_id}: found {len(sub_items)} items"
                    )

        except Exception as db_error:
            logger.exception(f"Database error in research history: {db_error}")
            raise DatabaseAccessException(
                "research_history_query", str(db_error)
            )

        # If no news items found, return empty list
        if not news_items:
            logger.info("No news items found, returning empty list")
            news_items = []

        logger.info(f"Returning {len(news_items)} news items to client")

        # Determine the source
        source = (
            "news_items"
            if any(item.get("is_news", False) for item in news_items)
            else "research_history"
        )

        return {
            "news_items": news_items[:limit],
            "generated_at": datetime.now(timezone.utc).isoformat(),
            "focus": focus,
            "search_strategy": search_strategy or "default",
            "total_items": len(news_items),
            "source": source,
        }

    except NewsAPIException:
        # Re-raise our custom exceptions
        raise
    except Exception as e:
        logger.exception("Error getting news feed")
        raise NewsFeedGenerationException(str(e), user_id=user_id)
def debug_research_items(user_id: str):
    """Inspect the user's research database for debugging.

    Reports the overall row count, a per-status breakdown, and the ten
    most recent research items.

    Raises:
        DatabaseAccessException: If any of the queries fail.
    """
    try:
        from ..database.session_context import get_user_db_session
        from ..database.models import ResearchHistory
        from sqlalchemy import func

        with get_user_db_session(user_id) as db_session:
            # Overall number of research rows.
            total_count = db_session.query(
                func.count(ResearchHistory.id)
            ).scalar()

            # Row counts grouped by status value.
            per_status_rows = (
                db_session.query(
                    ResearchHistory.status,
                    func.count(ResearchHistory.id).label("count"),
                )
                .group_by(ResearchHistory.status)
                .all()
            )
            by_status = [
                {"status": status_value, "count": n}
                for status_value, n in per_status_rows
            ]

            # Ten newest research rows, newest first.
            latest_rows = (
                db_session.query(ResearchHistory)
                .order_by(ResearchHistory.created_at.desc())
                .limit(10)
                .all()
            )
            recent_items = []
            for row in latest_rows:
                created = (
                    row.created_at.isoformat() if row.created_at else None
                )
                recent_items.append(
                    {
                        "id": row.id,
                        "query": row.query,
                        "status": row.status,
                        "created_at": created,
                    }
                )

            return {
                "total_items": total_count,
                "by_status": by_status,
                "recent_items": recent_items,
            }
    except Exception as e:
        logger.exception("Error in debug_research_items")
        raise DatabaseAccessException("debug_research_items", str(e))
def get_subscription_history(
    subscription_id: str, limit: int = 20
) -> Dict[str, Any]:
    """
    Get research history for a specific subscription.

    Args:
        subscription_id: The subscription UUID
        limit: Maximum number of history items to return

    Returns:
        Dict containing subscription info and its research history
        (keys: subscription, history, total_runs)

    Raises:
        SubscriptionNotFoundException: If no subscription has this id.
        DatabaseAccessException: On any other database failure.
    """
    try:
        from ..database.session_context import get_user_db_session
        from ..database.models import ResearchHistory
        from ..database.models.news import NewsSubscription

        # Get subscription details using ORM from user's encrypted database
        with get_user_db_session() as session:
            subscription = (
                session.query(NewsSubscription)
                .filter_by(id=subscription_id)
                .first()
            )

            if not subscription:
                raise SubscriptionNotFoundException(subscription_id)

            # Convert to dict for response
            subscription_dict = {
                "id": subscription.id,
                "query_or_topic": subscription.query_or_topic,
                "subscription_type": subscription.subscription_type,
                "refresh_interval_minutes": subscription.refresh_interval_minutes,
                "refresh_count": subscription.refresh_count or 0,
                "created_at": subscription.created_at.isoformat()
                if subscription.created_at
                else None,
                "next_refresh": subscription.next_refresh.isoformat()
                if subscription.next_refresh
                else None,
            }

        # Now get research history from the research database
        # Get user_id from subscription
        # NOTE(review): subscription_dict is built above WITHOUT a "user_id"
        # key, so this lookup always falls back to "anonymous" - confirm
        # whether the subscription owner's id should be added to the dict.
        sub_user_id = subscription_dict.get("user_id", "anonymous")

        with get_user_db_session(sub_user_id) as db_session:
            # Get all research runs that were triggered by this subscription
            # Look for subscription_id in the research_meta JSON
            # Note: JSON format has space after colon
            like_pattern = f'%"subscription_id": "{subscription_id}"%'
            logger.info(
                f"Searching for research history with pattern: {like_pattern}"
            )

            history_items = (
                db_session.query(ResearchHistory)
                .filter(ResearchHistory.research_meta.like(like_pattern))
                .order_by(ResearchHistory.created_at.desc())
                .limit(limit)
                .all()
            )

            # Convert to dict format for compatibility
            history_items = [
                {
                    "id": h.id,
                    "uuid_id": h.uuid_id,
                    "query": h.query,
                    "status": h.status,
                    "created_at": h.created_at.isoformat()
                    if h.created_at
                    else None,
                    "completed_at": h.completed_at.isoformat()
                    if h.completed_at
                    else None,
                    "duration_seconds": h.duration_seconds,
                    "research_meta": h.research_meta,
                    "report_path": h.report_path,
                }
                for h in history_items
            ]

        # Process history items into the API response shape
        processed_history = []
        for item in history_items:
            processed_item = {
                "research_id": item.get("uuid_id") or str(item.get("id")),
                "query": item["query"],
                "status": item["status"],
                "created_at": item["created_at"],
                "completed_at": item.get("completed_at"),
                "duration_seconds": item.get("duration_seconds", 0),
                "url": f"/progress/{item.get('uuid_id') or item.get('id')}",
            }

            # Parse metadata if available to get headline and topics
            if item.get("research_meta"):
                try:
                    meta = json.loads(item["research_meta"])
                    processed_item["triggered_by"] = meta.get(
                        "triggered_by", "subscription"
                    )
                    # Add headline and topics from metadata
                    processed_item["headline"] = meta.get(
                        "generated_headline", "[No headline]"
                    )
                    processed_item["topics"] = meta.get("generated_topics", [])
                except Exception:
                    # Unparseable metadata: fall back to placeholders.
                    processed_item["headline"] = "[No headline]"
                    processed_item["topics"] = []
            else:
                processed_item["headline"] = "[No headline]"
                processed_item["topics"] = []

            processed_history.append(processed_item)

        return {
            "subscription": subscription_dict,
            "history": processed_history,
            "total_runs": len(processed_history),
        }

    except NewsAPIException:
        # Re-raise our custom exceptions
        raise
    except Exception as e:
        logger.exception("Error getting subscription history")
        raise DatabaseAccessException("get_subscription_history", str(e))
675def _format_time_ago(timestamp: str) -> str:
676 """Format timestamp as 'X hours ago' string."""
677 try:
678 from dateutil import parser
679 from loguru import logger
681 dt = parser.parse(timestamp)
683 # If dt is naive, assume it's in UTC
684 if dt.tzinfo is None:
685 dt = dt.replace(tzinfo=timezone.utc)
687 now = datetime.now(timezone.utc)
688 diff = now - dt
690 if diff.days > 0:
691 return f"{diff.days} day{'s' if diff.days > 1 else ''} ago"
692 elif diff.seconds > 3600:
693 hours = diff.seconds // 3600
694 return f"{hours} hour{'s' if hours > 1 else ''} ago"
695 elif diff.seconds > 60:
696 minutes = diff.seconds // 60
697 return f"{minutes} minute{'s' if minutes > 1 else ''} ago"
698 else:
699 return "Just now"
700 except Exception:
701 logger.exception(f"Error parsing timestamp '{timestamp}'")
702 return "Recently"
def get_subscription(subscription_id: str) -> Optional[Dict[str, Any]]:
    """
    Get a single subscription by ID.

    Args:
        subscription_id: Subscription identifier

    Returns:
        Dictionary with subscription data or None if not found.
        NOTE(review): despite the "or None" above, the not-found case
        actually raises SubscriptionNotFoundException - it never returns
        None. Confirm which contract callers expect.

    Raises:
        SubscriptionNotFoundException: If the id matches no subscription.
        DatabaseAccessException: On any other failure.
    """
    try:
        # Get subscription directly from user's encrypted database
        from ..database.session_context import get_user_db_session
        from ..database.models.news import NewsSubscription

        with get_user_db_session() as db_session:
            subscription = (
                db_session.query(NewsSubscription)
                .filter_by(id=subscription_id)
                .first()
            )

            if not subscription:
                raise SubscriptionNotFoundException(subscription_id)

            # Convert to API format matching the template expectations
            return {
                "id": subscription.id,
                "name": subscription.name or "",
                "query_or_topic": subscription.query_or_topic,
                "subscription_type": subscription.subscription_type,
                "refresh_interval_minutes": subscription.refresh_interval_minutes,
                "is_active": subscription.status == "active",
                "status": subscription.status,
                "folder_id": subscription.folder_id,
                "model_provider": subscription.model_provider,
                "model": subscription.model,
                "search_strategy": subscription.search_strategy,
                "custom_endpoint": subscription.custom_endpoint,
                "search_engine": subscription.search_engine,
                # Defaults mirror update_subscription's response format.
                "search_iterations": subscription.search_iterations or 3,
                "questions_per_iteration": subscription.questions_per_iteration
                or 5,
                "created_at": subscription.created_at.isoformat()
                if subscription.created_at
                else None,
                "updated_at": subscription.updated_at.isoformat()
                if subscription.updated_at
                else None,
            }

    except NewsAPIException:
        # Re-raise our custom exceptions
        raise
    except Exception as e:
        logger.exception(f"Error getting subscription {subscription_id}")
        raise DatabaseAccessException("get_subscription", str(e))
def get_subscriptions(user_id: str) -> Dict[str, Any]:
    """
    Get all subscriptions for a user.

    Args:
        user_id: User identifier

    Returns:
        Dictionary with subscriptions list (keys: subscriptions, total)

    Raises:
        DatabaseAccessException: If the queries fail.
    """
    try:
        # Get subscriptions directly from user's encrypted database
        from ..database.session_context import get_user_db_session
        from ..database.models import ResearchHistory
        from ..database.models.news import NewsSubscription
        from sqlalchemy import func

        sub_list = []

        with get_user_db_session(user_id) as db_session:
            # Query all subscriptions for this user
            subscriptions = db_session.query(NewsSubscription).all()

            for sub in subscriptions:
                # Count actual research runs for this subscription.
                # One COUNT query per subscription (N+1); acceptable for the
                # small subscription lists this endpoint serves.
                like_pattern = f'%"subscription_id": "{sub.id}"%'
                total_runs = (
                    db_session.query(func.count(ResearchHistory.id))
                    .filter(ResearchHistory.research_meta.like(like_pattern))
                    .scalar()
                    or 0
                )

                # Convert ORM object to API format
                sub_dict = {
                    "id": sub.id,
                    "query": sub.query_or_topic,
                    "type": sub.subscription_type,
                    "refresh_minutes": sub.refresh_interval_minutes,
                    "created_at": sub.created_at.isoformat()
                    if sub.created_at
                    else None,
                    "next_refresh": sub.next_refresh.isoformat()
                    if sub.next_refresh
                    else None,
                    "last_refreshed": sub.last_refresh.isoformat()
                    if sub.last_refresh
                    else None,
                    "is_active": sub.status == "active",
                    "total_runs": total_runs,  # Use actual count from research_history
                    "name": sub.name or "",
                    "folder_id": sub.folder_id,
                }
                sub_list.append(sub_dict)

        return {"subscriptions": sub_list, "total": len(sub_list)}

    except Exception as e:
        logger.exception("Error getting subscriptions")
        raise DatabaseAccessException("get_subscriptions", str(e))
def update_subscription(
    subscription_id: str, data: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Update an existing subscription.

    Only keys present in ``data`` are applied; everything else is left
    unchanged. Changing the refresh interval also reschedules the next
    refresh relative to now.

    Args:
        subscription_id: Subscription identifier
        data: Dictionary with fields to update (name, query_or_topic,
            subscription_type, refresh_interval_minutes, is_active, status,
            folder_id, model_provider, model, search_strategy,
            custom_endpoint, search_engine, search_iterations,
            questions_per_iteration)

    Returns:
        Dictionary with updated subscription data

    Raises:
        SubscriptionNotFoundException: If the id matches no subscription.
        SubscriptionUpdateException: On any other failure.
    """
    try:
        from ..database.session_context import get_user_db_session
        from ..database.models.news import NewsSubscription
        # Local re-import shadows the module-level datetime name; UTC still
        # resolves to the module-level `from datetime import ... UTC`.
        from datetime import datetime, timedelta

        with get_user_db_session() as db_session:
            # Get existing subscription
            subscription = (
                db_session.query(NewsSubscription)
                .filter_by(id=subscription_id)
                .first()
            )
            if not subscription:
                raise SubscriptionNotFoundException(subscription_id)

            # Update fields
            if "name" in data:
                subscription.name = data["name"]
            if "query_or_topic" in data:
                subscription.query_or_topic = data["query_or_topic"]
            if "subscription_type" in data:
                subscription.subscription_type = data["subscription_type"]
            if "refresh_interval_minutes" in data:
                old_interval = subscription.refresh_interval_minutes
                subscription.refresh_interval_minutes = data[
                    "refresh_interval_minutes"
                ]
                # Recalculate next_refresh if interval changed
                if old_interval != subscription.refresh_interval_minutes:
                    subscription.next_refresh = datetime.now(UTC) + timedelta(
                        minutes=subscription.refresh_interval_minutes
                    )
            if "is_active" in data:
                subscription.status = (
                    "active" if data["is_active"] else "paused"
                )
            # NOTE: "status" applied after "is_active", so an explicit status
            # wins when both keys are present.
            if "status" in data:
                subscription.status = data["status"]
            if "folder_id" in data:
                subscription.folder_id = data["folder_id"]
            if "model_provider" in data:
                subscription.model_provider = data["model_provider"]
            if "model" in data:
                subscription.model = data["model"]
            if "search_strategy" in data:
                subscription.search_strategy = data["search_strategy"]
            if "custom_endpoint" in data:
                subscription.custom_endpoint = data["custom_endpoint"]
            if "search_engine" in data:
                subscription.search_engine = data["search_engine"]
            if "search_iterations" in data:
                subscription.search_iterations = data["search_iterations"]
            if "questions_per_iteration" in data:
                subscription.questions_per_iteration = data[
                    "questions_per_iteration"
                ]

            # Update timestamp
            subscription.updated_at = datetime.now(UTC)

            # Commit changes
            db_session.commit()

            # Notify scheduler about updated subscription
            _notify_scheduler_about_subscription_change("updated")

            # Convert to API format
            return {
                "status": "success",
                "subscription": {
                    "id": subscription.id,
                    "name": subscription.name or "",
                    "query_or_topic": subscription.query_or_topic,
                    "subscription_type": subscription.subscription_type,
                    "refresh_interval_minutes": subscription.refresh_interval_minutes,
                    "is_active": subscription.status == "active",
                    "status": subscription.status,
                    "folder_id": subscription.folder_id,
                    "model_provider": subscription.model_provider,
                    "model": subscription.model,
                    "search_strategy": subscription.search_strategy,
                    "custom_endpoint": subscription.custom_endpoint,
                    "search_engine": subscription.search_engine,
                    "search_iterations": subscription.search_iterations or 3,
                    "questions_per_iteration": subscription.questions_per_iteration
                    or 5,
                },
            }

    except NewsAPIException:
        # Re-raise our custom exceptions
        raise
    except Exception as e:
        logger.exception("Error updating subscription")
        raise SubscriptionUpdateException(subscription_id, str(e))
def create_subscription(
    user_id: str,
    query: str,
    subscription_type: str = "search",
    refresh_minutes: Optional[int] = None,
    source_research_id: Optional[str] = None,
    model_provider: Optional[str] = None,
    model: Optional[str] = None,
    search_strategy: Optional[str] = None,
    custom_endpoint: Optional[str] = None,
    name: Optional[str] = None,
    folder_id: Optional[str] = None,
    is_active: bool = True,
    search_engine: Optional[str] = None,
    search_iterations: Optional[int] = None,
    questions_per_iteration: Optional[int] = None,
) -> Dict[str, Any]:
    """
    Create a new subscription for user.

    Args:
        user_id: User identifier
        query: Search query or topic
        subscription_type: "search" or "topic"
        refresh_minutes: Refresh interval in minutes; when None, falls back
            to the "news.subscription.refresh_minutes" setting, then 240
        source_research_id: Research run this subscription originated from
        model_provider: LLM provider override for subscription refreshes
        model: LLM model override
        search_strategy: Strategy name (defaults to "news_aggregation")
        custom_endpoint: Custom model endpoint URL
        name: Display name for the subscription
        folder_id: Folder to place the subscription in
        is_active: Whether the subscription starts active or paused
        search_engine: Search engine override
        search_iterations: Iterations per refresh
        questions_per_iteration: Questions per iteration

    Returns:
        Dictionary with subscription details
        (keys: status, subscription_id, type, query, refresh_minutes)

    Raises:
        SubscriptionCreationException: If the subscription cannot be stored.
    """
    try:
        from ..database.session_context import get_user_db_session
        from ..database.models.news import NewsSubscription
        from datetime import datetime, timedelta
        import uuid

        # Get default refresh interval from settings if not provided
        # NOTE: This API function accesses the settings DB for convenience when used
        # within the Flask application context. For programmatic API access outside
        # the web context, callers should provide refresh_minutes explicitly to avoid
        # dependency on the settings database being initialized.

        with get_user_db_session(user_id) as db_session:
            if refresh_minutes is None:
                try:
                    from ..utilities.db_utils import get_settings_manager

                    settings_manager = get_settings_manager(db_session, user_id)
                    refresh_minutes = settings_manager.get_setting(
                        "news.subscription.refresh_minutes", 240
                    )
                except (ImportError, AttributeError, TypeError):
                    # Fallback for when settings DB is not available (e.g., programmatic API usage)
                    logger.debug(
                        "Settings manager not available, using default refresh_minutes"
                    )
                    refresh_minutes = 240  # Default to 4 hours
            # Create new subscription
            subscription = NewsSubscription(
                id=str(uuid.uuid4()),
                name=name,
                query_or_topic=query,
                subscription_type=subscription_type,
                refresh_interval_minutes=refresh_minutes,
                status="active" if is_active else "paused",
                model_provider=model_provider,
                model=model,
                search_strategy=search_strategy or "news_aggregation",
                custom_endpoint=custom_endpoint,
                folder_id=folder_id,
                search_engine=search_engine,
                search_iterations=search_iterations,
                questions_per_iteration=questions_per_iteration,
                created_at=datetime.now(UTC),
                updated_at=datetime.now(UTC),
                last_refresh=None,
                # First refresh is one full interval from now.
                next_refresh=datetime.now(UTC)
                + timedelta(minutes=refresh_minutes),
                source_id=source_research_id,
            )

            # Add to database
            db_session.add(subscription)
            db_session.commit()

            # Notify scheduler about new subscription
            _notify_scheduler_about_subscription_change("created", user_id)

            return {
                "status": "success",
                "subscription_id": subscription.id,
                "type": subscription_type,
                "query": query,
                "refresh_minutes": refresh_minutes,
            }

    except Exception as e:
        logger.exception("Error creating subscription")
        raise SubscriptionCreationException(
            str(e), {"query": query, "type": subscription_type}
        )
def delete_subscription(subscription_id: str) -> Dict[str, Any]:
    """
    Delete a subscription.

    Args:
        subscription_id: ID of subscription to delete

    Returns:
        Dictionary with status

    Raises:
        SubscriptionNotFoundException: If the id matches no subscription.
        SubscriptionDeletionException: On any other failure.
    """
    try:
        from ..database.session_context import get_user_db_session
        from ..database.models.news import NewsSubscription

        with get_user_db_session() as db_session:
            target = (
                db_session.query(NewsSubscription)
                .filter_by(id=subscription_id)
                .first()
            )

            # Guard clause: nothing to delete.
            if target is None:
                raise SubscriptionNotFoundException(subscription_id)

            db_session.delete(target)
            db_session.commit()

            # Notify scheduler about deleted subscription
            _notify_scheduler_about_subscription_change("deleted")

            return {"status": "success", "deleted": subscription_id}
    except NewsAPIException:
        # Re-raise our custom exceptions
        raise
    except Exception as e:
        logger.exception("Error deleting subscription")
        raise SubscriptionDeletionException(subscription_id, str(e))
def get_votes_for_cards(card_ids: list, user_id: str) -> Dict[str, Any]:
    """
    Get vote counts and user's votes for multiple news cards.

    Args:
        card_ids: List of card IDs to get votes for
        user_id: User identifier (not used - per-user database)

    Returns:
        Dictionary with vote information for each card

    Raises:
        ValueError: if no username can be resolved from the request
            session or the ``user_id`` fallback.
    """
    from flask import session as flask_session, has_request_context
    from ..database.models.news import UserRating, RatingType
    from ..database.session_context import get_user_db_session

    try:
        # Check if we're in a request context
        if not has_request_context():
            # If called outside of request context (e.g., in tests), use user_id directly
            username = user_id if user_id else None
            if not username:
                raise ValueError("No username provided and no request context")
        else:
            # Get username from session
            username = flask_session.get("username")
            if not username:
                raise ValueError("No username in session")

        # Get database session
        with get_user_db_session(username) as db:
            # Fetch every relevance rating for the requested cards in ONE
            # query instead of three queries per card (N+1 pattern), then
            # aggregate in Python. This is a per-user database, so a card
            # normally has at most one rating row (the user's own vote).
            ratings = (
                db.query(UserRating)
                .filter(
                    UserRating.card_id.in_(card_ids),
                    UserRating.rating_type == RatingType.RELEVANCE,
                )
                .all()
            )

            results = {
                card_id: {"upvotes": 0, "downvotes": 0, "user_vote": None}
                for card_id in card_ids
            }
            for rating in ratings:
                entry = results[rating.card_id]
                if rating.rating_value == "up":
                    entry["upvotes"] += 1
                elif rating.rating_value == "down":
                    entry["downvotes"] += 1
                # First rating seen for the card stands in for the user's
                # vote, matching the previous per-card .first() lookup.
                if entry["user_vote"] is None:
                    entry["user_vote"] = rating.rating_value

            return {"success": True, "votes": results}

    except Exception:
        logger.exception("Error getting votes for cards")
        raise
def submit_feedback(card_id: str, user_id: str, vote: str) -> Dict[str, Any]:
    """
    Submit feedback (vote) for a news card.

    Args:
        card_id: ID of the news card
        user_id: User identifier (not used - per-user database)
        vote: "up" or "down"

    Returns:
        Dictionary with updated vote counts
    """
    from flask import session as flask_session, has_request_context
    from sqlalchemy_utc import utcnow
    from ..database.models.news import UserRating, RatingType
    from ..database.session_context import get_user_db_session

    try:
        # Only the two vote directions are accepted
        if vote not in ("up", "down"):
            raise ValueError(f"Invalid vote type: {vote}")

        # Resolve the acting username
        if has_request_context():
            username = flask_session.get("username")
            if not username:
                raise ValueError("No username in session")
        else:
            # Outside a request (e.g. tests) fall back to the explicit user_id
            username = user_id if user_id else None
            if not username:
                raise ValueError("No username provided and no request context")

        with get_user_db_session(username) as db:
            # News items are generated dynamically and may not exist as
            # NewsCard rows, so we deliberately skip any card-existence check.

            # One relevance rating per card in this per-user database
            rating = (
                db.query(UserRating)
                .filter_by(card_id=card_id, rating_type=RatingType.RELEVANCE)
                .first()
            )

            if rating is None:
                # First vote on this card
                db.add(
                    UserRating(
                        card_id=card_id,
                        rating_type=RatingType.RELEVANCE,
                        rating_value=vote,
                    )
                )
            else:
                # Re-vote: overwrite direction and refresh the timestamp
                rating.rating_value = vote
                rating.created_at = utcnow()

            db.commit()

            def _count(direction: str) -> int:
                # Tally of votes in one direction for this card
                return (
                    db.query(UserRating)
                    .filter_by(
                        card_id=card_id,
                        rating_type=RatingType.RELEVANCE,
                        rating_value=direction,
                    )
                    .count()
                )

            upvotes = _count("up")
            downvotes = _count("down")

            logger.info(
                f"Feedback submitted for card {card_id}: {vote} (up: {upvotes}, down: {downvotes})"
            )

            return {
                "success": True,
                "card_id": card_id,
                "vote": vote,
                "upvotes": upvotes,
                "downvotes": downvotes,
            }

    except Exception:
        logger.exception(f"Error submitting feedback for card {card_id}")
        raise
def research_news_item(card_id: str, depth: str = "quick") -> Dict[str, Any]:
    """
    Run a deeper research pass on a single news item.

    Args:
        card_id: ID of the news card to research
        depth: Research depth - "quick", "detailed", or "report"

    Returns:
        Dictionary with research results

    Raises:
        NotImplementedException: always, until per-user card storage lands.
    """
    # TODO: Implement with per-user database for cards
    logger.warning(
        "research_news_item not yet implemented with per-user databases"
    )
    raise NotImplementedException("research_news_item")
def save_news_preferences(
    user_id: str, preferences: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Persist a user's news preferences.

    Args:
        user_id: User identifier
        preferences: Dictionary of preferences to save

    Returns:
        Dictionary with status and message

    Raises:
        NotImplementedException: always, until per-user preference storage lands.
    """
    # TODO: Implement with per-user database for preferences
    logger.warning(
        "save_news_preferences not yet implemented with per-user databases"
    )
    raise NotImplementedException("save_news_preferences")
def get_news_categories() -> Dict[str, Any]:
    """
    List the available news categories along with their counts.

    Returns:
        Dictionary with categories and statistics

    Raises:
        NotImplementedException: always, until per-user category storage lands.
    """
    # TODO: Implement with per-user database for categories
    logger.warning(
        "get_news_categories not yet implemented with per-user databases"
    )
    raise NotImplementedException("get_news_categories")