Coverage for src / local_deep_research / news / api.py: 88%
403 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
1"""
2Direct API functions for news system.
3These functions can be called directly by scheduler or wrapped by Flask endpoints.
4"""
6from typing import Dict, Any, Optional
7from datetime import datetime, timezone, UTC
8from loguru import logger
9import re
10import json
12from ..constants import ResearchStatus
13from .recommender.topic_based import TopicBasedRecommender
14from .exceptions import (
15 InvalidLimitException,
16 SubscriptionNotFoundException,
17 SubscriptionCreationException,
18 SubscriptionUpdateException,
19 SubscriptionDeletionException,
20 DatabaseAccessException,
21 NewsFeedGenerationException,
22 NotImplementedException,
23 NewsAPIException,
24)
25# Removed welcome feed import - no placeholders
26# get_db_setting not available in merged codebase
# Module-wide recommender instance, created lazily and reused across calls.
_recommender = None


def get_recommender():
    """Return the shared TopicBasedRecommender, constructing it on first use."""
    global _recommender
    recommender = _recommender
    if recommender is None:
        recommender = TopicBasedRecommender()
        _recommender = recommender
    return recommender
def _notify_scheduler_about_subscription_change(
    action: str, user_id: str = None
):
    """
    Notify the news scheduler about a subscription change (best-effort).

    Any failure here is logged and swallowed, so subscription CRUD never
    fails just because the scheduler could not be reached.

    Args:
        action: The action performed (created, updated, deleted)
        user_id: Optional user_id to use as fallback for username when no
            Flask session username is available
    """
    try:
        # Deferred imports: Flask and the scheduler are only needed here.
        from flask import session as flask_session
        from .subscription_manager.scheduler import get_news_scheduler

        scheduler = get_news_scheduler()
        if scheduler.is_running:
            # Get username, with optional fallback to user_id
            username = flask_session.get("username")
            if not username and user_id:
                username = user_id

            # Get password from session password store
            from ..database.session_passwords import session_password_store

            session_id = flask_session.get("session_id")
            password = None
            if session_id and username:
                password = session_password_store.get_session_password(
                    username, session_id
                )

            if password:
                # Update scheduler to reschedule subscriptions
                scheduler.update_user_info(username, password)
                logger.info(
                    f"Scheduler notified about {action} subscription for {username}"
                )
            else:
                # Without a password the scheduler cannot open the user's
                # encrypted DB, so we only warn.
                logger.warning(
                    f"Could not notify scheduler - no password available{' for ' + username if username else ''}"
                )
    except Exception:
        logger.exception(
            f"Could not notify scheduler about {action} subscription"
        )
def get_news_feed(
    user_id: str = "anonymous",
    limit: int = 20,
    use_cache: bool = True,
    focus: Optional[str] = None,
    search_strategy: Optional[str] = None,
    subscription_id: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Get personalized news feed by pulling news-flavored items from the
    user's research history.

    Args:
        user_id: User identifier
        limit: Maximum number of cards to return (must be >= 1)
        use_cache: Whether to use cached news (currently unused)
        focus: Optional focus area for news (echoed back in the response)
        search_strategy: Override default recommendation strategy
        subscription_id: Optional subscription to filter by; "all" (or None)
            disables the filter

    Returns:
        Dictionary with news items and metadata.

    Raises:
        InvalidLimitException: If limit < 1.
        DatabaseAccessException: On database query failures.
        NewsFeedGenerationException: On any other unexpected error.
    """
    try:
        # Validate limit - allow any positive number
        if limit < 1:
            raise InvalidLimitException(limit)

        logger.info(
            f"get_news_feed called with user_id={user_id}, limit={limit}"
        )

        # News is always enabled for now - per-user settings will be handled later

        # Deferred imports to avoid import cycles
        from ..database.session_context import get_user_db_session
        from ..database.models import ResearchHistory

        news_items = []
        remaining_limit = limit

        # Query research history from user's database for news items
        logger.info("Getting news items from research history")
        try:
            with get_user_db_session(user_id) as db_session:
                query = db_session.query(ResearchHistory).filter(
                    ResearchHistory.status == ResearchStatus.COMPLETED
                )

                # Filter by subscription if provided
                if subscription_id and subscription_id != "all":
                    from sqlalchemy import or_

                    # Match both JSON serialization styles inside the
                    # research_meta blob: json.dumps writes a space after the
                    # colon (the format the rest of this module searches
                    # for), while compact serializers do not.
                    query = query.filter(
                        or_(
                            ResearchHistory.research_meta.like(
                                f'%"subscription_id": "{subscription_id}"%'
                            ),
                            ResearchHistory.research_meta.like(
                                f'%"subscription_id":"{subscription_id}"%'
                            ),
                        )
                    )

                # Order by creation date; fetch extra rows because some will
                # be filtered out below as non-news or incomplete.
                results = (
                    query.order_by(ResearchHistory.created_at.desc())
                    .limit(remaining_limit * 2)
                    .all()
                )

                # Convert ORM objects to dictionaries for compatibility
                results = [
                    {
                        "id": r.id,
                        "uuid_id": r.id,  # In ResearchHistory, id is the UUID
                        "query": r.query,
                        "title": r.title
                        if hasattr(r, "title")
                        else None,  # Include title field if exists
                        "created_at": r.created_at if r.created_at else None,
                        "completed_at": r.completed_at
                        if r.completed_at
                        else None,
                        "duration_seconds": r.duration_seconds
                        if hasattr(r, "duration_seconds")
                        else None,
                        "report_path": r.report_path
                        if hasattr(r, "report_path")
                        else None,
                        "report_content": r.report_content
                        if hasattr(r, "report_content")
                        else None,  # Include database content
                        "research_meta": r.research_meta,
                        "status": r.status,
                    }
                    for r in results
                ]

                logger.info(f"Database returned {len(results)} research items")
                if results:
                    logger.info(f"First row keys: {list(results[0].keys())}")
                    # Log first few items' metadata
                    for i, row in enumerate(results[:3]):
                        logger.info(
                            f"Item {i}: query='{row['query'][:50]}...', has meta={bool(row.get('research_meta'))}"
                        )

                # Process results to find news items
                processed_count = 0
                error_count = 0

                for row in results:
                    try:
                        # Parse metadata (stored either as a dict or a JSON string)
                        metadata = {}
                        if row.get("research_meta"):
                            try:
                                if isinstance(row["research_meta"], dict):
                                    metadata = row["research_meta"]
                                else:
                                    metadata = json.loads(row["research_meta"])
                            except (json.JSONDecodeError, TypeError):
                                logger.exception("Error parsing metadata")
                                metadata = {}

                        # An item counts as news if it carries generated news
                        # metadata, is flagged as a news search, or its query
                        # text looks like a news query.
                        has_news_metadata = (
                            metadata.get("generated_headline") is not None
                            or metadata.get("generated_topics") is not None
                        )

                        query_lower = row["query"].lower()
                        is_news_query = (
                            has_news_metadata
                            or metadata.get("is_news_search")
                            or metadata.get("search_type") == "news_analysis"
                            or "breaking news" in query_lower
                            or "news stories" in query_lower
                            or (
                                "today" in query_lower
                                and (
                                    "news" in query_lower
                                    or "breaking" in query_lower
                                )
                            )
                            or "latest news" in query_lower
                        )

                        # Log the decision for first few items
                        if processed_count < 3 or error_count < 3:
                            logger.info(
                                f"Item check: query='{row['query'][:30]}...', is_news_search={metadata.get('is_news_search')}, "
                                f"has_news_metadata={has_news_metadata}, is_news_query={is_news_query}"
                            )

                        # Only show items that have news metadata or are news queries
                        if is_news_query:
                            processed_count += 1
                            logger.info(
                                f"Processing research item #{processed_count}: {row['query'][:50]}..."
                            )

                            # Always use database content
                            findings = ""
                            summary = ""
                            content = row.get("report_content")
                            if content:
                                logger.debug(
                                    f"Using database content for research {row['id']}"
                                )
                                # Use full content as findings
                                findings = content
                                # Summary = first non-empty, non-heading line
                                for line in content.split("\n"):
                                    if line.strip() and not line.startswith(
                                        "#"
                                    ):
                                        summary = line.strip()
                                        break
                            else:
                                logger.debug(
                                    f"No database content for research {row['id']}"
                                )

                            # Use stored headline/topics if available, otherwise generate
                            original_query = row["query"]

                            # Check for headline - first try database title, then metadata
                            headline = row.get("title") or metadata.get(
                                "generated_headline"
                            )

                            # For subscription results, generate headline from query if needed
                            if not headline and metadata.get("is_news_search"):
                                subscription_name = metadata.get(
                                    "subscription_name"
                                )
                                if subscription_name:
                                    headline = (
                                        f"News Update: {subscription_name}"
                                    )
                                else:
                                    # Generate headline from query
                                    headline = f"News: {row['query'][:60]}..."

                            # Skip items without meaningful headlines
                            if (
                                not headline
                                or headline == "[No headline available]"
                            ):
                                logger.debug(
                                    f"Skipping item without headline: {row['id']}"
                                )
                                continue

                            # Skip items that are still in progress or suspended
                            if row["status"] in (
                                ResearchStatus.IN_PROGRESS,
                                ResearchStatus.SUSPENDED,
                            ):
                                logger.debug(
                                    f"Skipping incomplete item: {row['id']} (status: {row['status']})"
                                )
                                continue

                            # Skip items without content
                            if not content:
                                logger.debug(
                                    f"Skipping item without content: {row['id']}"
                                )
                                continue

                            # Use ID properly, preferring uuid_id
                            research_id = row.get("uuid_id") or str(row["id"])

                            # Use stored category and topics - explicit
                            # placeholders instead of silent defaults
                            category = (
                                metadata.get("category") or "[Uncategorized]"
                            )
                            topics = metadata.get("generated_topics") or [
                                "[No topics]"
                            ]

                            # Extract up to 3 source links from the report
                            # content (content is guaranteed non-empty here).
                            links = []
                            try:
                                report_lines = content.split("\n")
                                link_count = 0
                                for i, line in enumerate(
                                    report_lines[:100]
                                ):  # Check first 100 lines for links
                                    if "URL:" in line:
                                        url = line.split("URL:", 1)[1].strip()
                                        if url.startswith("http"):
                                            # Get the title from the previous line if available
                                            title = ""
                                            if i > 0:
                                                title_line = report_lines[
                                                    i - 1
                                                ].strip()
                                                # Remove citation numbers like [12, 26, 19]
                                                title = re.sub(
                                                    r"^\[[^\]]+\]\s*",
                                                    "",
                                                    title_line,
                                                ).strip()
                                            if not title:
                                                # Use domain as fallback
                                                domain = url.split("//")[
                                                    -1
                                                ].split("/")[0]
                                                title = domain.replace(
                                                    "www.", ""
                                                )

                                            links.append(
                                                {
                                                    "url": url,
                                                    "title": title[:50] + "..."
                                                    if len(title) > 50
                                                    else title,
                                                }
                                            )
                                            link_count += 1
                                            logger.debug(
                                                f"Found link: {title} - {url}"
                                            )
                                            if link_count >= 3:
                                                break
                            except Exception:
                                logger.exception(
                                    "Error extracting links from database content"
                                )

                            # Create news item from research
                            news_item = {
                                "id": f"news-{research_id}",
                                "headline": headline,
                                "category": category,
                                "summary": summary
                                or f"Research analysis for: {headline[:100]}",
                                "findings": findings,
                                "impact_score": metadata.get(
                                    "impact_score", 0
                                ),  # 0 indicates missing
                                "time_ago": _format_time_ago(
                                    row["created_at"]
                                ),
                                "upvotes": metadata.get("upvotes", 0),
                                "downvotes": metadata.get("downvotes", 0),
                                "source_url": f"/results/{research_id}",
                                "topics": topics,  # Use generated topics
                                "links": links,  # Add extracted links
                                "research_id": research_id,
                                "created_at": row["created_at"],
                                "duration_seconds": row.get(
                                    "duration_seconds", 0
                                ),
                                "original_query": original_query,  # Keep original query for reference
                                "is_news": metadata.get(
                                    "is_news_search", False
                                ),  # Flag for news searches
                                "news_date": metadata.get(
                                    "news_date"
                                ),  # If specific date for news
                                "news_source": metadata.get(
                                    "news_source"
                                ),  # If from specific source
                                "priority": metadata.get(
                                    "priority", "normal"
                                ),  # Priority level
                            }

                            news_items.append(news_item)
                            logger.info(
                                f"Added news item: {headline[:50]}..."
                            )

                            if len(news_items) >= limit:
                                break

                    except Exception:
                        error_count += 1
                        logger.exception(
                            f"Error processing research item with query: {row.get('query', 'UNKNOWN')[:100]}"
                        )
                        continue

                logger.info(
                    f"Processing summary: total_results={len(results)}, processed={processed_count}, "
                    f"errors={error_count}, added={len(news_items)}"
                )

                # Log subscription-specific items if we were filtering
                if subscription_id and subscription_id != "all":
                    sub_items = [
                        item
                        for item in news_items
                        if item.get("is_news", False)
                    ]
                    logger.info(
                        f"Subscription {subscription_id}: found {len(sub_items)} items"
                    )

        except Exception as db_error:
            logger.exception(f"Database error in research history: {db_error}")
            raise DatabaseAccessException(
                "research_history_query", str(db_error)
            )

        if not news_items:
            logger.info("No news items found, returning empty list")

        logger.info(f"Returning {len(news_items)} news items to client")

        # Label the response source: "news_items" when at least one item came
        # from a flagged news search, otherwise plain research history.
        source = (
            "news_items"
            if any(item.get("is_news", False) for item in news_items)
            else "research_history"
        )

        return {
            "news_items": news_items[:limit],
            "generated_at": datetime.now(timezone.utc).isoformat(),
            "focus": focus,
            "search_strategy": search_strategy or "default",
            "total_items": len(news_items),
            "source": source,
        }

    except NewsAPIException:
        # Re-raise our custom exceptions
        raise
    except Exception as e:
        logger.exception("Error getting news feed")
        raise NewsFeedGenerationException(str(e), user_id=user_id)
def get_subscription_history(
    subscription_id: str, limit: int = 20
) -> Dict[str, Any]:
    """
    Get research history for a specific subscription.

    Args:
        subscription_id: The subscription UUID
        limit: Maximum number of history items to return

    Returns:
        Dict containing subscription info and its research history

    Raises:
        SubscriptionNotFoundException: If no subscription has this ID.
        DatabaseAccessException: On any other database failure.
    """
    try:
        from ..database.session_context import get_user_db_session
        from ..database.models import ResearchHistory
        from ..database.models.news import NewsSubscription

        # Get subscription details using ORM from user's encrypted database
        with get_user_db_session() as session:
            subscription = (
                session.query(NewsSubscription)
                .filter_by(id=subscription_id)
                .first()
            )

            if not subscription:
                raise SubscriptionNotFoundException(subscription_id)

            # Convert to dict for response (datetimes become ISO strings)
            subscription_dict = {
                "id": subscription.id,
                "query_or_topic": subscription.query_or_topic,
                "subscription_type": subscription.subscription_type,
                "refresh_interval_minutes": subscription.refresh_interval_minutes,
                "refresh_count": subscription.refresh_count or 0,
                "created_at": subscription.created_at.isoformat()
                if subscription.created_at
                else None,
                "next_refresh": subscription.next_refresh.isoformat()
                if subscription.next_refresh
                else None,
            }

        # Now get research history from the research database
        # Get user_id from subscription
        # NOTE(review): subscription_dict is built above without a "user_id"
        # key, so this always falls back to "anonymous" — confirm whether the
        # subscription owner's id should be used here instead.
        sub_user_id = subscription_dict.get("user_id", "anonymous")

        with get_user_db_session(sub_user_id) as db_session:
            # Get all research runs that were triggered by this subscription
            # Look for subscription_id in the research_meta JSON
            # Note: JSON format has space after colon
            like_pattern = f'%"subscription_id": "{subscription_id}"%'
            logger.info(
                f"Searching for research history with pattern: {like_pattern}"
            )

            history_items = (
                db_session.query(ResearchHistory)
                .filter(ResearchHistory.research_meta.like(like_pattern))
                .order_by(ResearchHistory.created_at.desc())
                .limit(limit)
                .all()
            )

            # Convert to dict format for compatibility
            # NOTE(review): get_news_feed treats ResearchHistory.id as the
            # UUID — confirm a separate ``uuid_id`` attribute exists on the
            # model, otherwise this raises and is reported as a DB error.
            history_items = [
                {
                    "id": h.id,
                    "uuid_id": h.uuid_id,
                    "query": h.query,
                    "status": h.status,
                    "created_at": h.created_at.isoformat()
                    if h.created_at
                    else None,
                    "completed_at": h.completed_at.isoformat()
                    if h.completed_at
                    else None,
                    "duration_seconds": h.duration_seconds,
                    "research_meta": h.research_meta,
                    "report_path": h.report_path,
                }
                for h in history_items
            ]

        # Process history items into the API response shape
        processed_history = []
        for item in history_items:
            processed_item = {
                "research_id": item.get("uuid_id") or str(item.get("id")),
                "query": item["query"],
                "status": item["status"],
                "created_at": item["created_at"],
                "completed_at": item.get("completed_at"),
                "duration_seconds": item.get("duration_seconds", 0),
                "url": f"/progress/{item.get('uuid_id') or item.get('id')}",
            }

            # Parse metadata if available to get headline and topics
            if item.get("research_meta"):
                try:
                    meta = json.loads(item["research_meta"])
                    processed_item["triggered_by"] = meta.get(
                        "triggered_by", "subscription"
                    )
                    # Add headline and topics from metadata
                    processed_item["headline"] = meta.get(
                        "generated_headline", "[No headline]"
                    )
                    processed_item["topics"] = meta.get("generated_topics", [])
                except Exception:
                    # Unparseable metadata: fall back to placeholders
                    processed_item["headline"] = "[No headline]"
                    processed_item["topics"] = []
            else:
                processed_item["headline"] = "[No headline]"
                processed_item["topics"] = []

            processed_history.append(processed_item)

        return {
            "subscription": subscription_dict,
            "history": processed_history,
            "total_runs": len(processed_history),
        }

    except NewsAPIException:
        # Re-raise our custom exceptions
        raise
    except Exception as e:
        logger.exception("Error getting subscription history")
        raise DatabaseAccessException("get_subscription_history", str(e))
621def _format_time_ago(timestamp: str) -> str:
622 """Format timestamp as 'X hours ago' string."""
623 try:
624 from dateutil import parser
625 from loguru import logger
627 dt = parser.parse(timestamp)
629 # If dt is naive, assume it's in UTC
630 if dt.tzinfo is None:
631 dt = dt.replace(tzinfo=timezone.utc)
633 now = datetime.now(timezone.utc)
634 diff = now - dt
636 if diff.days > 0:
637 return f"{diff.days} day{'s' if diff.days > 1 else ''} ago"
638 elif diff.seconds > 3600:
639 hours = diff.seconds // 3600
640 return f"{hours} hour{'s' if hours > 1 else ''} ago"
641 elif diff.seconds > 60:
642 minutes = diff.seconds // 60
643 return f"{minutes} minute{'s' if minutes > 1 else ''} ago"
644 else:
645 return "Just now"
646 except Exception:
647 logger.exception(f"Error parsing timestamp '{timestamp}'")
648 return "Recently"
def get_subscription(subscription_id: str) -> Optional[Dict[str, Any]]:
    """
    Get a single subscription by ID.

    Args:
        subscription_id: Subscription identifier

    Returns:
        Dictionary with subscription data in the shape the template expects.

    Raises:
        SubscriptionNotFoundException: If the subscription does not exist.
        DatabaseAccessException: On any other failure.
    """
    try:
        # Read straight from the user's encrypted database.
        from ..database.session_context import get_user_db_session
        from ..database.models.news import NewsSubscription

        def _iso(value):
            # Serialize datetimes for the API; None stays None.
            return value.isoformat() if value else None

        with get_user_db_session() as db_session:
            record = (
                db_session.query(NewsSubscription)
                .filter_by(id=subscription_id)
                .first()
            )

            if record is None:
                raise SubscriptionNotFoundException(subscription_id)

            return {
                "id": record.id,
                "name": record.name or "",
                "query_or_topic": record.query_or_topic,
                "subscription_type": record.subscription_type,
                "refresh_interval_minutes": record.refresh_interval_minutes,
                "is_active": record.status == "active",
                "status": record.status,
                "folder_id": record.folder_id,
                "model_provider": record.model_provider,
                "model": record.model,
                "search_strategy": record.search_strategy,
                "custom_endpoint": record.custom_endpoint,
                "search_engine": record.search_engine,
                "search_iterations": record.search_iterations or 3,
                "questions_per_iteration": record.questions_per_iteration
                or 5,
                "created_at": _iso(record.created_at),
                "updated_at": _iso(record.updated_at),
            }

    except NewsAPIException:
        # Re-raise our custom exceptions
        raise
    except Exception as e:
        logger.exception(f"Error getting subscription {subscription_id}")
        raise DatabaseAccessException("get_subscription", str(e))
def get_subscriptions(user_id: str) -> Dict[str, Any]:
    """
    Get all subscriptions for a user.

    Args:
        user_id: User identifier

    Returns:
        Dictionary with a "subscriptions" list and a "total" count.

    Raises:
        DatabaseAccessException: On any database failure.
    """
    try:
        # Read straight from the user's encrypted database.
        from ..database.session_context import get_user_db_session
        from ..database.models import ResearchHistory
        from ..database.models.news import NewsSubscription
        from sqlalchemy import func

        def _iso(value):
            # Serialize datetimes for the API; None stays None.
            return value.isoformat() if value else None

        sub_list = []

        with get_user_db_session(user_id) as db_session:
            for sub in db_session.query(NewsSubscription).all():
                # Count the research runs actually recorded for this
                # subscription by matching its id in the research_meta JSON.
                like_pattern = f'%"subscription_id": "{sub.id}"%'
                run_total = (
                    db_session.query(func.count(ResearchHistory.id))
                    .filter(
                        ResearchHistory.research_meta.like(like_pattern)
                    )
                    .scalar()
                    or 0
                )

                sub_list.append(
                    {
                        "id": sub.id,
                        "query": sub.query_or_topic,
                        "type": sub.subscription_type,
                        "refresh_minutes": sub.refresh_interval_minutes,
                        "created_at": _iso(sub.created_at),
                        "next_refresh": _iso(sub.next_refresh),
                        "last_refreshed": _iso(sub.last_refresh),
                        "is_active": sub.status == "active",
                        "total_runs": run_total,  # actual count from research_history
                        "name": sub.name or "",
                        "folder_id": sub.folder_id,
                    }
                )

        return {"subscriptions": sub_list, "total": len(sub_list)}

    except Exception as e:
        logger.exception("Error getting subscriptions")
        raise DatabaseAccessException("get_subscriptions", str(e))
def update_subscription(
    subscription_id: str, data: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Update an existing subscription.

    Only keys present in ``data`` are applied. Statement order matters:
    ``status`` (if present) is assigned after ``is_active`` and therefore
    takes precedence over it.

    Args:
        subscription_id: Subscription identifier
        data: Dictionary with fields to update

    Returns:
        Dictionary with updated subscription data

    Raises:
        SubscriptionNotFoundException: If the subscription does not exist.
        SubscriptionUpdateException: On any other failure.
    """
    try:
        from ..database.session_context import get_user_db_session
        from ..database.models.news import NewsSubscription
        # NOTE: this local import shadows the module-level ``datetime`` name
        # within this function.
        from datetime import datetime, timedelta

        with get_user_db_session() as db_session:
            # Get existing subscription
            subscription = (
                db_session.query(NewsSubscription)
                .filter_by(id=subscription_id)
                .first()
            )
            if not subscription:
                raise SubscriptionNotFoundException(subscription_id)

            # Update fields
            if "name" in data:
                subscription.name = data["name"]
            if "query_or_topic" in data:
                subscription.query_or_topic = data["query_or_topic"]
            if "subscription_type" in data:
                subscription.subscription_type = data["subscription_type"]
            if "refresh_interval_minutes" in data:
                old_interval = subscription.refresh_interval_minutes
                subscription.refresh_interval_minutes = data[
                    "refresh_interval_minutes"
                ]
                # Recalculate next_refresh if interval changed
                if old_interval != subscription.refresh_interval_minutes:
                    subscription.next_refresh = datetime.now(UTC) + timedelta(
                        minutes=subscription.refresh_interval_minutes
                    )
            if "is_active" in data:
                subscription.status = (
                    "active" if data["is_active"] else "paused"
                )
            if "status" in data:
                subscription.status = data["status"]
            if "folder_id" in data:
                subscription.folder_id = data["folder_id"]
            if "model_provider" in data:
                subscription.model_provider = data["model_provider"]
            if "model" in data:
                subscription.model = data["model"]
            if "search_strategy" in data:
                subscription.search_strategy = data["search_strategy"]
            if "custom_endpoint" in data:
                subscription.custom_endpoint = data["custom_endpoint"]
            if "search_engine" in data:
                subscription.search_engine = data["search_engine"]
            if "search_iterations" in data:
                subscription.search_iterations = data["search_iterations"]
            if "questions_per_iteration" in data:
                subscription.questions_per_iteration = data[
                    "questions_per_iteration"
                ]

            # Update timestamp
            subscription.updated_at = datetime.now(UTC)

            # Commit changes
            db_session.commit()

            # Notify scheduler about updated subscription
            _notify_scheduler_about_subscription_change("updated")

            # Convert to API format
            return {
                "status": "success",
                "subscription": {
                    "id": subscription.id,
                    "name": subscription.name or "",
                    "query_or_topic": subscription.query_or_topic,
                    "subscription_type": subscription.subscription_type,
                    "refresh_interval_minutes": subscription.refresh_interval_minutes,
                    "is_active": subscription.status == "active",
                    "status": subscription.status,
                    "folder_id": subscription.folder_id,
                    "model_provider": subscription.model_provider,
                    "model": subscription.model,
                    "search_strategy": subscription.search_strategy,
                    "custom_endpoint": subscription.custom_endpoint,
                    "search_engine": subscription.search_engine,
                    "search_iterations": subscription.search_iterations or 3,
                    "questions_per_iteration": subscription.questions_per_iteration
                    or 5,
                },
            }

    except NewsAPIException:
        # Re-raise our custom exceptions
        raise
    except Exception as e:
        logger.exception("Error updating subscription")
        raise SubscriptionUpdateException(subscription_id, str(e))
def create_subscription(
    user_id: str,
    query: str,
    subscription_type: str = "search",
    refresh_minutes: int = None,
    source_research_id: Optional[str] = None,
    model_provider: Optional[str] = None,
    model: Optional[str] = None,
    search_strategy: Optional[str] = None,
    custom_endpoint: Optional[str] = None,
    name: Optional[str] = None,
    folder_id: Optional[str] = None,
    is_active: bool = True,
    search_engine: Optional[str] = None,
    search_iterations: Optional[int] = None,
    questions_per_iteration: Optional[int] = None,
) -> Dict[str, Any]:
    """
    Create a new subscription for user.

    Args:
        user_id: User identifier
        query: Search query or topic
        subscription_type: "search" or "topic"
        refresh_minutes: Refresh interval in minutes; when None it is read
            from the user's settings ("news.subscription.refresh_minutes"),
            falling back to 240 when settings are unavailable
        source_research_id: Optional research run that spawned this subscription
        model_provider: Optional LLM provider override
        model: Optional model override
        search_strategy: Search strategy (defaults to "news_aggregation")
        custom_endpoint: Optional custom LLM endpoint
        name: Optional display name
        folder_id: Optional folder to file the subscription under
        is_active: Whether the subscription starts "active" or "paused"
        search_engine: Optional search engine override
        search_iterations: Optional iteration-count override
        questions_per_iteration: Optional questions-per-iteration override

    Returns:
        Dictionary with subscription details

    Raises:
        SubscriptionCreationException: On any failure.
    """
    try:
        from ..database.session_context import get_user_db_session
        from ..database.models.news import NewsSubscription
        from datetime import datetime, timedelta
        import uuid

        # Get default refresh interval from settings if not provided
        # NOTE: This API function accesses the settings DB for convenience when used
        # within the Flask application context. For programmatic API access outside
        # the web context, callers should provide refresh_minutes explicitly to avoid
        # dependency on the settings database being initialized.

        with get_user_db_session(user_id) as db_session:
            if refresh_minutes is None:
                try:
                    from ..utilities.db_utils import get_settings_manager

                    settings_manager = get_settings_manager(db_session, user_id)
                    refresh_minutes = settings_manager.get_setting(
                        "news.subscription.refresh_minutes", 240
                    )
                except (ImportError, AttributeError, TypeError):
                    # Fallback for when settings DB is not available (e.g., programmatic API usage)
                    logger.debug(
                        "Settings manager not available, using default refresh_minutes"
                    )
                    refresh_minutes = 240  # Default to 4 hours
            # Create new subscription
            subscription = NewsSubscription(
                id=str(uuid.uuid4()),
                name=name,
                query_or_topic=query,
                subscription_type=subscription_type,
                refresh_interval_minutes=refresh_minutes,
                status="active" if is_active else "paused",
                model_provider=model_provider,
                model=model,
                search_strategy=search_strategy or "news_aggregation",
                custom_endpoint=custom_endpoint,
                folder_id=folder_id,
                search_engine=search_engine,
                search_iterations=search_iterations,
                questions_per_iteration=questions_per_iteration,
                created_at=datetime.now(UTC),
                updated_at=datetime.now(UTC),
                last_refresh=None,
                # First refresh is one full interval from now
                next_refresh=datetime.now(UTC)
                + timedelta(minutes=refresh_minutes),
                source_id=source_research_id,
            )

            # Add to database
            db_session.add(subscription)
            db_session.commit()

            # Notify scheduler about new subscription
            _notify_scheduler_about_subscription_change("created", user_id)

            return {
                "status": "success",
                "subscription_id": subscription.id,
                "type": subscription_type,
                "query": query,
                "refresh_minutes": refresh_minutes,
            }

    except Exception as e:
        logger.exception("Error creating subscription")
        raise SubscriptionCreationException(
            str(e), {"query": query, "type": subscription_type}
        )
def delete_subscription(subscription_id: str) -> Dict[str, Any]:
    """
    Delete a subscription.

    Args:
        subscription_id: ID of subscription to delete

    Returns:
        Dictionary with status and the deleted subscription ID.

    Raises:
        SubscriptionNotFoundException: If the subscription does not exist.
        SubscriptionDeletionException: On any other failure.
    """
    try:
        from ..database.session_context import get_user_db_session
        from ..database.models.news import NewsSubscription

        with get_user_db_session() as db_session:
            target = (
                db_session.query(NewsSubscription)
                .filter_by(id=subscription_id)
                .first()
            )
            if target is None:
                # Nothing to delete - surface as a not-found error.
                raise SubscriptionNotFoundException(subscription_id)

            db_session.delete(target)
            db_session.commit()

            # Let the scheduler drop any jobs for the removed subscription.
            _notify_scheduler_about_subscription_change("deleted")

            return {"status": "success", "deleted": subscription_id}
    except NewsAPIException:
        # Re-raise our custom exceptions
        raise
    except Exception as e:
        logger.exception("Error deleting subscription")
        raise SubscriptionDeletionException(subscription_id, str(e))
def get_votes_for_cards(card_ids: list, user_id: str) -> Dict[str, Any]:
    """
    Fetch aggregate vote counts and the stored vote for a batch of news cards.

    Args:
        card_ids: List of card IDs to get votes for
        user_id: Fallback username when no Flask request context exists
            (votes live in a per-user database, so they are already scoped)

    Returns:
        Dictionary with a ``success`` flag and a per-card ``votes`` mapping
        containing ``upvotes``, ``downvotes`` and ``user_vote``
        ("up"/"down"/None).

    Raises:
        ValueError: if no username can be resolved.
    """
    from flask import session as flask_session, has_request_context
    from ..database.models.news import UserRating, RatingType
    from ..database.session_context import get_user_db_session

    try:
        if has_request_context():
            # Normal web path: username comes from the Flask session
            username = flask_session.get("username")
            if not username:
                raise ValueError("No username in session")
        else:
            # Outside a request (e.g. tests, scheduler) fall back to user_id
            username = user_id or None
            if not username:
                raise ValueError("No username provided and no request context")

        with get_user_db_session(username) as db:

            def _tally(card: str, direction: str) -> int:
                # Count votes in one direction for a single card
                return (
                    db.query(UserRating)
                    .filter_by(
                        card_id=card,
                        rating_type=RatingType.RELEVANCE,
                        rating_value=direction,
                    )
                    .count()
                )

            results = {}
            for card_id in card_ids:
                # The user's stored vote for this card, if any
                stored = (
                    db.query(UserRating)
                    .filter_by(
                        card_id=card_id, rating_type=RatingType.RELEVANCE
                    )
                    .first()
                )
                results[card_id] = {
                    "upvotes": _tally(card_id, "up"),
                    "downvotes": _tally(card_id, "down"),
                    "user_vote": stored.rating_value if stored else None,
                }

            return {"success": True, "votes": results}

    except Exception:
        logger.exception("Error getting votes for cards")
        raise
def submit_feedback(card_id: str, user_id: str, vote: str) -> Dict[str, Any]:
    """
    Record an up/down vote for a news card and return the new tallies.

    Args:
        card_id: ID of the news card
        user_id: Fallback username when no Flask request context exists
        vote: "up" or "down"

    Returns:
        Dictionary with a success flag, the echoed card/vote, and the
        updated upvote/downvote counts.

    Raises:
        ValueError: for an invalid vote value or missing username.
    """
    from flask import session as flask_session, has_request_context
    from sqlalchemy_utc import utcnow
    from ..database.models.news import UserRating, RatingType
    from ..database.session_context import get_user_db_session

    try:
        if vote not in ("up", "down"):
            raise ValueError(f"Invalid vote type: {vote}")

        if has_request_context():
            # Normal web path: username comes from the Flask session
            username = flask_session.get("username")
            if not username:
                raise ValueError("No username in session")
        else:
            # Outside a request (e.g. tests, scheduler) fall back to user_id
            username = user_id or None
            if not username:
                raise ValueError("No username provided and no request context")

        with get_user_db_session(username) as db:
            # News items are generated dynamically and may not exist as
            # NewsCard rows, so no existence check is performed here.
            rating = (
                db.query(UserRating)
                .filter_by(card_id=card_id, rating_type=RatingType.RELEVANCE)
                .first()
            )

            if rating is None:
                db.add(
                    UserRating(
                        card_id=card_id,
                        rating_type=RatingType.RELEVANCE,
                        rating_value=vote,
                    )
                )
            else:
                # Replace the previous vote; created_at is refreshed so it
                # effectively acts as a "last voted" timestamp.
                rating.rating_value = vote
                rating.created_at = utcnow()

            db.commit()

            def _tally(direction: str) -> int:
                # Count votes in one direction for this card
                return (
                    db.query(UserRating)
                    .filter_by(
                        card_id=card_id,
                        rating_type=RatingType.RELEVANCE,
                        rating_value=direction,
                    )
                    .count()
                )

            upvotes = _tally("up")
            downvotes = _tally("down")

            logger.info(
                f"Feedback submitted for card {card_id}: {vote} (up: {upvotes}, down: {downvotes})"
            )

            return {
                "success": True,
                "card_id": card_id,
                "vote": vote,
                "upvotes": upvotes,
                "downvotes": downvotes,
            }

    except Exception:
        logger.exception(f"Error submitting feedback for card {card_id}")
        raise
def research_news_item(card_id: str, depth: str = "quick") -> Dict[str, Any]:
    """
    Run a deeper research pass on a single news item.

    Args:
        card_id: ID of the news card to research
        depth: Research depth - "quick", "detailed", or "report"

    Returns:
        Dictionary with research results

    Raises:
        NotImplementedException: always, until per-user card storage exists.
    """
    # TODO: Implement with per-user database for cards
    logger.warning(
        "research_news_item not yet implemented with per-user databases"
    )
    raise NotImplementedException("research_news_item")
def save_news_preferences(
    user_id: str, preferences: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Persist a user's news preferences.

    Args:
        user_id: User identifier
        preferences: Dictionary of preferences to save

    Returns:
        Dictionary with status and message

    Raises:
        NotImplementedException: always, until per-user preference
            storage exists.
    """
    # TODO: Implement with per-user database for preferences
    logger.warning(
        "save_news_preferences not yet implemented with per-user databases"
    )
    raise NotImplementedException("save_news_preferences")
def get_news_categories() -> Dict[str, Any]:
    """
    List the available news categories with their counts.

    Returns:
        Dictionary with categories and statistics

    Raises:
        NotImplementedException: always, until per-user category
            storage exists.
    """
    # TODO: Implement with per-user database for categories
    logger.warning(
        "get_news_categories not yet implemented with per-user databases"
    )
    raise NotImplementedException("get_news_categories")