Coverage for src/local_deep_research/news/api.py: 89%
372 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1"""
2Direct API functions for news system.
3These functions can be called directly by scheduler or wrapped by Flask endpoints.
4"""
6from typing import Dict, Any, Optional
7from datetime import datetime, timezone, UTC
8from loguru import logger
9import json
11from ..constants import ResearchStatus
12from ..llm.providers.base import normalize_provider
13from .exceptions import (
14 InvalidLimitException,
15 SubscriptionNotFoundException,
16 SubscriptionCreationException,
17 SubscriptionUpdateException,
18 SubscriptionDeletionException,
19 DatabaseAccessException,
20 NewsFeedGenerationException,
21 NotImplementedException,
22 NewsAPIException,
23)
24# Removed welcome feed import - no placeholders
25# get_db_setting not available in merged codebase
def _notify_scheduler_about_subscription_change(
    action: str, user_id: Optional[str] = None
):
    """Best-effort hook telling the background scheduler that subscriptions changed.

    When the scheduler is running, the current user's credentials are looked
    up and handed to ``scheduler.update_user_info`` so it can reschedule that
    user's subscriptions. Every failure is logged and swallowed: a scheduler
    hiccup must never break the subscription CRUD call that triggered it.

    Args:
        action: Verb used in log messages ("created", "updated", "deleted").
        user_id: Fallback username when the Flask session carries none.
    """
    try:
        from flask import session as flask_session
        from ..scheduler.background import get_background_job_scheduler

        scheduler = get_background_job_scheduler()
        if not scheduler.is_running:
            # Nothing to reschedule; a stopped scheduler picks things up on start.
            return

        # Prefer the logged-in username, falling back to the caller's user_id.
        username = flask_session.get("username") or user_id

        # The scheduler needs the password to open the user's encrypted DB.
        from ..database.session_passwords import session_password_store

        session_id = flask_session.get("session_id")
        password = (
            session_password_store.get_session_password(username, session_id)
            if session_id and username
            else None
        )

        if not (password and username):
            logger.warning(
                f"Could not notify scheduler - no password available{' for ' + username if username else ''}"
            )
            return

        # Reschedule this user's subscriptions with fresh credentials.
        scheduler.update_user_info(username, password)
        logger.info(
            f"Scheduler notified about {action} subscription for {username}"
        )
    except Exception:
        logger.exception(
            f"Could not notify scheduler about {action} subscription"
        )
def _parse_research_meta_dict(raw_meta: Any) -> Dict[str, Any]:
    """Return a ResearchHistory ``research_meta`` value as a dict.

    The value may arrive already deserialized (``dict``) or as the JSON text
    written by ``json.dumps``. Missing, undecodable, or non-object metadata
    all yield ``{}`` so callers can use ``.get`` unconditionally.
    """
    if not raw_meta:
        return {}
    if isinstance(raw_meta, dict):
        return raw_meta
    try:
        parsed = json.loads(raw_meta)
    except (json.JSONDecodeError, TypeError):
        logger.exception("Error parsing metadata")
        return {}
    # A JSON array/string/number would make every later ``.get`` raise and
    # drop the row as a processing error; treat it as "no metadata" instead.
    return parsed if isinstance(parsed, dict) else {}


def _has_news_metadata(metadata: Dict[str, Any]) -> bool:
    """True when the research run stored a generated headline or topics."""
    return (
        metadata.get("generated_headline") is not None
        or metadata.get("generated_topics") is not None
    )


def _is_news_research(metadata: Dict[str, Any], query: str) -> bool:
    """Decide whether a completed research run belongs in the news feed.

    A run qualifies through news metadata / news flags in ``metadata``, or
    through keyword heuristics on its free-text ``query``.
    """
    q = query.lower()
    return bool(
        _has_news_metadata(metadata)
        or metadata.get("is_news_search")
        or metadata.get("search_type") == "news_analysis"
        or "breaking news" in q
        or "news stories" in q
        or ("today" in q and ("news" in q or "breaking" in q))
        or "latest news" in q
    )


def _first_summary_line(content: str) -> str:
    """Return the first non-blank line not starting with ``#``, stripped (or "")."""
    for line in content.split("\n"):
        if line.strip() and not line.startswith("#"):
            return line.strip()
    return ""


def _escape_like_literal(value: str) -> str:
    """Escape ``\\``, ``%`` and ``_`` so ``value`` matches literally in LIKE (escape char ``\\``)."""
    return (
        value.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    )


def get_news_feed(
    user_id: str = "anonymous",
    limit: int = 20,
    use_cache: bool = True,
    focus: Optional[str] = None,
    search_strategy: Optional[str] = None,
    subscription_id: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Get personalized news feed by pulling completed research runs from the
    user's research history and keeping the news-related ones.

    Args:
        user_id: User identifier (selects the per-user database)
        limit: Maximum number of cards to return (must be >= 1)
        use_cache: Accepted for API compatibility; currently unused
        focus: Optional focus area for news (echoed back in the response)
        search_strategy: Override default recommendation strategy (echoed back)
        subscription_id: Only include runs triggered by this subscription;
            ``None`` or ``"all"`` disables the filter

    Returns:
        Dictionary with news items and metadata. Each item's ``findings``
        field is the answer-only report content (post chat-mode-v2 refactor,
        #3665 Fix B); structured top-N source links are in the separate
        ``links`` array, not embedded in ``findings``.

    Raises:
        InvalidLimitException: if ``limit`` < 1
        DatabaseAccessException: if querying research history fails
        NewsFeedGenerationException: for any other unexpected failure
    """
    # Validate limit - allow any positive number
    if limit < 1:
        raise InvalidLimitException(limit)

    try:
        logger.info(
            f"get_news_feed called with user_id={user_id}, limit={limit}"
        )

        # News is always enabled for now - per-user settings will be handled later
        from ..database.session_context import get_user_db_session
        from ..database.models import ResearchHistory

        news_items = []

        logger.info("Getting news items from research history")
        try:
            with get_user_db_session(user_id) as db_session:
                query = db_session.query(ResearchHistory).filter(
                    ResearchHistory.status == ResearchStatus.COMPLETED
                )

                if subscription_id and subscription_id != "all":
                    # research_meta is serialized via json.dumps, which emits
                    # a space after the colon, so the LIKE pattern must include
                    # that space too — otherwise the filter silently matches
                    # zero rows. The id comes from the caller, so LIKE
                    # wildcards in it are escaped: an id of "%" must not match
                    # every subscription's runs.
                    escaped_id = _escape_like_literal(subscription_id)
                    query = query.filter(
                        ResearchHistory.research_meta.like(
                            f'%"subscription_id": "{escaped_id}"%',
                            escape="\\",
                        )
                    )

                # Over-fetch: some rows will be filtered out as non-news.
                rows = (
                    query.order_by(ResearchHistory.created_at.desc())
                    .limit(limit * 2)
                    .all()
                )

                # Convert ORM objects to dictionaries for compatibility.
                results = [
                    {
                        "id": r.id,
                        "uuid_id": r.id,  # In ResearchHistory, id is the UUID
                        "query": r.query,
                        "title": getattr(r, "title", None),
                        # created_at is NOT NULL (set to isoformat() on every
                        # insert), so it's always a usable timestamp string.
                        "created_at": r.created_at,
                        "completed_at": r.completed_at or None,
                        "duration_seconds": getattr(
                            r, "duration_seconds", None
                        ),
                        "report_path": getattr(r, "report_path", None),
                        "report_content": getattr(r, "report_content", None),
                        "research_meta": r.research_meta,
                        "status": r.status,
                    }
                    for r in rows
                ]

                # Sources live in the research_resources table; fetch the
                # top-N links for every row in ONE batched query (avoids an
                # N+1 in the loop below).
                from ..web.services.report_assembly_service import (
                    get_research_source_links_batch,
                )

                links_by_research_id = get_research_source_links_batch(
                    [row["id"] for row in results], db_session, limit=3
                )

                logger.info(f"Database returned {len(results)} research items")
                if results:
                    logger.info(f"First row keys: {list(results[0].keys())}")
                    for i, row in enumerate(results[:3]):
                        logger.info(
                            f"Item {i}: query='{(row['query'] or '')[:50]}...', has meta={bool(row.get('research_meta'))}"
                        )

                processed_count = 0
                error_count = 0

                for index, row in enumerate(results):
                    try:
                        metadata = _parse_research_meta_dict(
                            row.get("research_meta")
                        )
                        # query may be NULL; never let that crash the loop.
                        query_text = row["query"] or ""
                        has_news_metadata = _has_news_metadata(metadata)
                        is_news_query = _is_news_research(metadata, query_text)

                        # Log the decision for the first few rows only.
                        if index < 3:
                            logger.info(
                                f"Item check: query='{query_text[:30]}...', is_news_search={metadata.get('is_news_search')}, "
                                f"has_news_metadata={has_news_metadata}, is_news_query={is_news_query}"
                            )

                        # Only show items that have news metadata or are news queries
                        if not is_news_query:
                            continue

                        processed_count += 1
                        logger.info(
                            f"Processing research item #{processed_count}: {query_text[:50]}..."
                        )

                        # `findings` is the answer-only report_content after
                        # the chat-mode-v2 refactor (#3665 Fix B, intentional);
                        # source URLs come from the separate `links` array.
                        content = row.get("report_content")
                        if content:
                            logger.debug(
                                f"Using database content for research {row['id']}"
                            )
                            findings = content
                            summary = _first_summary_line(content)
                        else:
                            logger.debug(
                                f"No database content for research {row['id']}"
                            )
                            findings = ""
                            summary = ""

                        original_query = row["query"]

                        # Headline: database title, then stored metadata.
                        headline = row.get("title") or metadata.get(
                            "generated_headline"
                        )

                        # Subscription results: derive a headline if needed.
                        if not headline and metadata.get("is_news_search"):
                            subscription_name = metadata.get(
                                "subscription_name"
                            )
                            if subscription_name:
                                headline = f"News Update: {subscription_name}"
                            else:
                                headline = f"News: {query_text[:60]}..."

                        if not headline or headline == "[No headline available]":
                            logger.debug(
                                f"Skipping item without headline: {row['id']}"
                            )
                            continue

                        # Defensive: the query already filters on COMPLETED.
                        if row["status"] in (
                            ResearchStatus.IN_PROGRESS,
                            ResearchStatus.SUSPENDED,
                        ):
                            logger.debug(
                                f"Skipping incomplete item: {row['id']} (status: {row['status']})"
                            )
                            continue

                        if not content:
                            logger.debug(
                                f"Skipping item without content: {row['id']}"
                            )
                            continue

                        research_id = row.get("uuid_id") or str(row["id"])

                        # Use stored category and topics - placeholders otherwise
                        category = metadata.get("category") or "[Uncategorized]"
                        topics = metadata.get("generated_topics") or [
                            "[No topics]"
                        ]

                        news_items.append(
                            {
                                "id": f"news-{research_id}",
                                "headline": headline,
                                "category": category,
                                "summary": summary
                                or f"Research analysis for: {headline[:100]}",
                                "findings": findings,
                                # 0 indicates missing
                                "impact_score": metadata.get("impact_score", 0),
                                "time_ago": _format_time_ago(row["created_at"]),
                                "upvotes": metadata.get("upvotes", 0),
                                "downvotes": metadata.get("downvotes", 0),
                                "source_url": f"/results/{research_id}",
                                "topics": topics,
                                # Top-N links from the batch fetch above
                                "links": links_by_research_id.get(row["id"], []),
                                "research_id": research_id,
                                "created_at": row["created_at"],
                                "duration_seconds": row.get(
                                    "duration_seconds", 0
                                ),
                                "original_query": original_query,
                                "is_news": metadata.get("is_news_search", False),
                                "news_date": metadata.get("news_date"),
                                "news_source": metadata.get("news_source"),
                                "priority": metadata.get("priority", "normal"),
                            }
                        )
                        logger.info(f"Added news item: {headline[:50]}...")

                        if len(news_items) >= limit:
                            break

                    except Exception:
                        error_count += 1
                        # `or` (not a .get default): query may be present but None.
                        logger.exception(
                            f"Error processing research item with query: {(row.get('query') or 'UNKNOWN')[:100]}"
                        )
                        continue

                logger.info(
                    f"Processing summary: total_results={len(results)}, processed={processed_count}, "
                    f"errors={error_count}, added={len(news_items)}"
                )

                if subscription_id and subscription_id != "all":
                    sub_items = [
                        item for item in news_items if item.get("is_news", False)
                    ]
                    logger.info(
                        f"Subscription {subscription_id}: found {len(sub_items)} items"
                    )

        except Exception as db_error:
            logger.exception(f"Database error in research history: {db_error}")
            raise DatabaseAccessException(
                "research_history_query", str(db_error)
            ) from db_error

        if not news_items:
            logger.info("No news items found, returning empty list")

        logger.info(f"Returning {len(news_items)} news items to client")

        source = (
            "news_items"
            if any(item.get("is_news", False) for item in news_items)
            else "research_history"
        )

        return {
            "news_items": news_items[:limit],
            "generated_at": datetime.now(timezone.utc).isoformat(),
            "focus": focus,
            "search_strategy": search_strategy or "default",
            "total_items": len(news_items),
            "source": source,
        }

    except NewsAPIException:
        # Re-raise our custom exceptions
        raise
    except Exception as e:
        logger.exception("Error getting news feed")
        raise NewsFeedGenerationException(str(e), user_id=user_id) from e
def _history_timestamp_iso(value: Any) -> Optional[str]:
    """Render a timestamp column value as an ISO-8601 string; falsy -> None.

    ResearchHistory.created_at is stored as an ``isoformat()`` string, while
    NewsSubscription timestamps are datetimes, so both forms are accepted.
    Strings are returned unchanged (calling ``.isoformat()`` on them raised).
    """
    if not value:
        return None
    if isinstance(value, str):
        return value
    return value.isoformat()


def _history_meta_dict(raw_meta: Any) -> Optional[Dict[str, Any]]:
    """Decode ``research_meta`` into a dict; None when absent or not a JSON object.

    Accepts an already-deserialized dict as well as the JSON text written by
    ``json.dumps``.
    """
    if not raw_meta:
        return None
    if isinstance(raw_meta, dict):
        return raw_meta
    try:
        parsed = json.loads(raw_meta)
    except (ValueError, TypeError):
        return None
    return parsed if isinstance(parsed, dict) else None


def get_subscription_history(
    subscription_id: str, limit: int = 20
) -> Dict[str, Any]:
    """
    Get research history for a specific subscription.

    NewsSubscription has no user_id column: this codebase uses per-user
    encrypted databases, so the subscription and its research runs live in
    the same database — the one ``get_user_db_session()`` resolves for the
    current Flask session. Both lookups therefore share one session.

    Args:
        subscription_id: The subscription UUID
        limit: Maximum number of history items to return

    Returns:
        Dict containing subscription info, its research history (newest
        first), and ``total_runs`` (number of history items returned)

    Raises:
        SubscriptionNotFoundException: if the subscription does not exist
        DatabaseAccessException: on any other failure
    """
    try:
        from ..database.session_context import get_user_db_session
        from ..database.models import ResearchHistory
        from ..database.models.news import NewsSubscription

        with get_user_db_session() as session:
            subscription = (
                session.query(NewsSubscription)
                .filter_by(id=subscription_id)
                .first()
            )
            if not subscription:
                raise SubscriptionNotFoundException(subscription_id)  # noqa: TRY301 — re-raised by except NewsAPIException

            subscription_dict = {
                "id": subscription.id,
                "query_or_topic": subscription.query_or_topic,
                "subscription_type": subscription.subscription_type,
                "refresh_interval_minutes": subscription.refresh_interval_minutes,
                "refresh_count": subscription.refresh_count or 0,
                "created_at": _history_timestamp_iso(subscription.created_at),
                "next_refresh": _history_timestamp_iso(
                    subscription.next_refresh
                ),
            }

            # Research runs triggered by this subscription carry its id in
            # research_meta. json.dumps puts a space after the colon.
            like_pattern = f'%"subscription_id": "{subscription_id}"%'
            logger.info(
                f"Searching for research history with pattern: {like_pattern}"
            )

            rows = (
                session.query(ResearchHistory)
                .filter(ResearchHistory.research_meta.like(like_pattern))
                .order_by(ResearchHistory.created_at.desc())
                .limit(limit)
                .all()
            )

            processed_history = []
            for h in rows:
                # ResearchHistory.id is the UUID primary key.
                processed_item = {
                    "research_id": h.id,
                    "query": h.query,
                    "status": h.status,
                    "created_at": _history_timestamp_iso(h.created_at),
                    "completed_at": _history_timestamp_iso(h.completed_at),
                    "duration_seconds": h.duration_seconds,
                    "url": f"/progress/{h.id}",
                }

                meta = _history_meta_dict(h.research_meta)
                if meta is None:
                    processed_item["headline"] = "[No headline]"
                    processed_item["topics"] = []
                else:
                    processed_item["triggered_by"] = meta.get(
                        "triggered_by", "subscription"
                    )
                    processed_item["headline"] = meta.get(
                        "generated_headline", "[No headline]"
                    )
                    processed_item["topics"] = meta.get("generated_topics", [])

                processed_history.append(processed_item)

        return {
            "subscription": subscription_dict,
            "history": processed_history,
            "total_runs": len(processed_history),
        }

    except NewsAPIException:
        # Re-raise our custom exceptions
        raise
    except Exception as e:
        logger.exception("Error getting subscription history")
        raise DatabaseAccessException(
            "get_subscription_history", str(e)
        ) from e
600def _format_time_ago(timestamp: str) -> str:
601 """Format timestamp as 'X hours ago' string.
603 Raises on unparseable input instead of masking it with a neutral label.
604 ResearchHistory.created_at is a NOT NULL column written as
605 datetime.now(UTC).isoformat() on every insert path, so a value that won't
606 parse means the row is corrupt — not a routine edge case. The only caller
607 (the per-row loop in get_news_feed) already wraps each row in a
608 try/except that logs the failure and skips the row, so a bad timestamp
609 surfaces in the logs and drops that one card rather than rendering it with
610 a misleading "Recently".
611 """
612 from dateutil import parser
614 dt = parser.parse(timestamp)
616 # If dt is naive, assume it's in UTC
617 if dt.tzinfo is None:
618 dt = dt.replace(tzinfo=timezone.utc)
620 now = datetime.now(timezone.utc)
621 diff = now - dt
623 if diff.days > 0:
624 return f"{diff.days} day{'s' if diff.days > 1 else ''} ago"
625 if diff.seconds > 3600:
626 hours = diff.seconds // 3600
627 return f"{hours} hour{'s' if hours > 1 else ''} ago"
628 if diff.seconds > 60:
629 minutes = diff.seconds // 60
630 return f"{minutes} minute{'s' if minutes > 1 else ''} ago"
631 return "Just now"
def get_subscription(subscription_id: str) -> Optional[Dict[str, Any]]:
    """
    Load one subscription from the current user's encrypted database.

    Args:
        subscription_id: Primary key of the subscription row

    Returns:
        The subscription in the dict shape the templates expect. Numeric
        research settings fall back to 3 iterations / 5 questions when unset.

    Raises:
        SubscriptionNotFoundException: when no subscription has this id
        DatabaseAccessException: on any other failure
    """
    try:
        from ..database.session_context import get_user_db_session
        from ..database.models.news import NewsSubscription

        with get_user_db_session() as db_session:
            sub = (
                db_session.query(NewsSubscription)
                .filter_by(id=subscription_id)
                .first()
            )
            if not sub:
                raise SubscriptionNotFoundException(subscription_id)  # noqa: TRY301 — re-raised by except NewsAPIException

            created = sub.created_at
            updated = sub.updated_at
            iterations = sub.search_iterations or 3
            questions = sub.questions_per_iteration or 5

            return {
                "id": sub.id,
                "name": sub.name or "",
                "query_or_topic": sub.query_or_topic,
                "subscription_type": sub.subscription_type,
                "refresh_interval_minutes": sub.refresh_interval_minutes,
                "is_active": sub.status == "active",
                "status": sub.status,
                "folder_id": sub.folder_id,
                "model_provider": sub.model_provider,
                "model": sub.model,
                "search_strategy": sub.search_strategy,
                "custom_endpoint": sub.custom_endpoint,
                "search_engine": sub.search_engine,
                "search_iterations": iterations,
                "questions_per_iteration": questions,
                "created_at": created.isoformat() if created else None,
                "updated_at": updated.isoformat() if updated else None,
            }

    except NewsAPIException:
        # Domain errors (e.g. not found) pass through untouched.
        raise
    except Exception as e:
        logger.exception(f"Error getting subscription {subscription_id}")
        raise DatabaseAccessException("get_subscription", str(e))
def get_subscriptions(user_id: str) -> Dict[str, Any]:
    """
    List every subscription stored in a user's encrypted database.

    Args:
        user_id: User identifier (selects the per-user database)

    Returns:
        ``{"subscriptions": [...], "total": n}``. Each entry's ``total_runs``
        is counted from research_history rows whose research_meta mentions
        the subscription id.

    Raises:
        DatabaseAccessException: on any failure
    """
    try:
        from ..database.session_context import get_user_db_session
        from ..database.models import ResearchHistory
        from ..database.models.news import NewsSubscription
        from sqlalchemy import func

        def iso_or_none(value):
            return value.isoformat() if value else None

        with get_user_db_session(user_id) as db_session:

            def count_runs(sub_id):
                # research_meta is JSON text; json.dumps adds a space after ':'
                pattern = f'%"subscription_id": "{sub_id}"%'
                count = (
                    db_session.query(func.count(ResearchHistory.id))
                    .filter(ResearchHistory.research_meta.like(pattern))
                    .scalar()
                )
                return count or 0

            sub_list = [
                {
                    "id": sub.id,
                    "query": sub.query_or_topic,
                    "type": sub.subscription_type,
                    "refresh_minutes": sub.refresh_interval_minutes,
                    "created_at": iso_or_none(sub.created_at),
                    "next_refresh": iso_or_none(sub.next_refresh),
                    "last_refreshed": iso_or_none(sub.last_refresh),
                    "is_active": sub.status == "active",
                    "total_runs": count_runs(sub.id),
                    "name": sub.name or "",
                    "folder_id": sub.folder_id,
                }
                for sub in db_session.query(NewsSubscription).all()
            ]

        return {"subscriptions": sub_list, "total": len(sub_list)}

    except Exception as e:
        logger.exception("Error getting subscriptions")
        raise DatabaseAccessException("get_subscriptions", str(e))
def update_subscription(
    subscription_id: str, data: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Apply a partial update to an existing subscription.

    Only keys present in ``data`` are written. Changing
    ``refresh_interval_minutes`` to a different value reschedules
    ``next_refresh`` to now + the new interval. ``is_active`` maps to status
    "active"/"paused"; an explicit ``status`` key wins over ``is_active``.
    ``model_provider`` is normalized before storing. The scheduler is
    notified after the commit.

    Args:
        subscription_id: Subscription identifier
        data: Dictionary with fields to update

    Returns:
        ``{"status": "success", "subscription": {...}}`` with the stored values

    Raises:
        SubscriptionNotFoundException: when the id does not exist
        SubscriptionUpdateException: on any other failure
    """
    try:
        from ..database.session_context import get_user_db_session
        from ..database.models.news import NewsSubscription
        from datetime import datetime, timedelta

        with get_user_db_session() as db_session:
            sub = (
                db_session.query(NewsSubscription)
                .filter_by(id=subscription_id)
                .first()
            )
            if not sub:
                raise SubscriptionNotFoundException(subscription_id)  # noqa: TRY301 — re-raised by except NewsAPIException

            # Fields copied verbatim from the payload.
            for field in ("name", "query_or_topic", "subscription_type"):
                if field in data:
                    setattr(sub, field, data[field])

            if "refresh_interval_minutes" in data:
                previous_interval = sub.refresh_interval_minutes
                sub.refresh_interval_minutes = data["refresh_interval_minutes"]
                if sub.refresh_interval_minutes != previous_interval:
                    sub.next_refresh = datetime.now(UTC) + timedelta(
                        minutes=sub.refresh_interval_minutes
                    )

            # is_active first, so an explicit "status" key overrides it.
            if "is_active" in data:
                sub.status = "active" if data["is_active"] else "paused"
            if "status" in data:
                sub.status = data["status"]
            if "folder_id" in data:
                sub.folder_id = data["folder_id"]
            if "model_provider" in data:
                sub.model_provider = normalize_provider(data["model_provider"])

            for field in (
                "model",
                "search_strategy",
                "custom_endpoint",
                "search_engine",
                "search_iterations",
                "questions_per_iteration",
            ):
                if field in data:
                    setattr(sub, field, data[field])

            sub.updated_at = datetime.now(UTC)
            db_session.commit()

            _notify_scheduler_about_subscription_change("updated")

            serialized = {
                "id": sub.id,
                "name": sub.name or "",
                "query_or_topic": sub.query_or_topic,
                "subscription_type": sub.subscription_type,
                "refresh_interval_minutes": sub.refresh_interval_minutes,
                "is_active": sub.status == "active",
                "status": sub.status,
                "folder_id": sub.folder_id,
                "model_provider": sub.model_provider,
                "model": sub.model,
                "search_strategy": sub.search_strategy,
                "custom_endpoint": sub.custom_endpoint,
                "search_engine": sub.search_engine,
                "search_iterations": sub.search_iterations or 3,
                "questions_per_iteration": sub.questions_per_iteration or 5,
            }
            return {"status": "success", "subscription": serialized}

    except NewsAPIException:
        # Domain errors (e.g. not found) pass through untouched.
        raise
    except Exception as e:
        logger.exception("Error updating subscription")
        raise SubscriptionUpdateException(subscription_id, str(e))
def create_subscription(
    user_id: str,
    query: str,
    subscription_type: str = "search",
    refresh_minutes: Optional[int] = None,
    source_research_id: Optional[str] = None,
    model_provider: Optional[str] = None,
    model: Optional[str] = None,
    search_strategy: Optional[str] = None,
    custom_endpoint: Optional[str] = None,
    name: Optional[str] = None,
    folder_id: Optional[str] = None,
    is_active: bool = True,
    search_engine: Optional[str] = None,
    search_iterations: Optional[int] = None,
    questions_per_iteration: Optional[int] = None,
) -> Dict[str, Any]:
    """
    Create a new subscription for user.

    Args:
        user_id: User identifier (selects the per-user database)
        query: Search query or topic
        subscription_type: "search" or "topic"
        refresh_minutes: Refresh interval in minutes. When None, read from the
            "news.subscription.refresh_minutes" setting (default 240). Pass
            it explicitly for programmatic use outside the web app, where the
            settings DB may not be initialized.
        source_research_id: Research run the subscription was created from
        model_provider: LLM provider (normalized before storing)
        model: LLM model name
        search_strategy: Strategy name; defaults to "news_aggregation"
        custom_endpoint: Custom LLM endpoint
        name: Display name
        folder_id: Folder to file the subscription under
        is_active: Stored as status "active" (True) or "paused" (False)
        search_engine: Search engine override
        search_iterations: Iterations per run
        questions_per_iteration: Questions per iteration

    Returns:
        Dictionary with subscription details

    Raises:
        SubscriptionCreationException: on any failure
    """
    try:
        from ..database.session_context import get_user_db_session
        from ..database.models.news import NewsSubscription
        from datetime import datetime, timedelta
        import uuid

        with get_user_db_session(user_id) as db_session:
            if refresh_minutes is None:
                try:
                    from ..utilities.db_utils import get_settings_manager

                    settings_manager = get_settings_manager(db_session, user_id)
                    refresh_minutes = settings_manager.get_setting(
                        "news.subscription.refresh_minutes", 240
                    )
                except (ImportError, AttributeError, TypeError):
                    # Fallback for when settings DB is not available (e.g., programmatic API usage)
                    logger.debug(
                        "Settings manager not available, using default refresh_minutes"
                    )
                    refresh_minutes = 240  # Default to 4 hours

            # Take one timestamp for the whole row. Separate now() calls left
            # created_at != updated_at on a brand-new row and based
            # next_refresh on yet another instant.
            now = datetime.now(UTC)
            subscription = NewsSubscription(
                id=str(uuid.uuid4()),
                name=name,
                query_or_topic=query,
                subscription_type=subscription_type,
                refresh_interval_minutes=refresh_minutes,
                status="active" if is_active else "paused",
                model_provider=normalize_provider(model_provider),
                model=model,
                search_strategy=search_strategy or "news_aggregation",
                custom_endpoint=custom_endpoint,
                folder_id=folder_id,
                search_engine=search_engine,
                search_iterations=search_iterations,
                questions_per_iteration=questions_per_iteration,
                created_at=now,
                updated_at=now,
                last_refresh=None,
                next_refresh=now + timedelta(minutes=refresh_minutes),
                source_id=source_research_id,
            )

            db_session.add(subscription)
            db_session.commit()

            _notify_scheduler_about_subscription_change("created", user_id)

            return {
                "status": "success",
                "subscription_id": subscription.id,
                "type": subscription_type,
                "query": query,
                "refresh_minutes": refresh_minutes,
            }

    except Exception as e:
        logger.exception("Error creating subscription")
        raise SubscriptionCreationException(
            str(e), {"query": query, "type": subscription_type}
        ) from e
def delete_subscription(subscription_id: str) -> Dict[str, Any]:
    """
    Delete a news subscription by ID.

    The database session is opened with ``get_user_db_session()`` and no
    explicit username (presumably resolved from the active request/session
    — verify against ``session_context``). After a successful delete the
    background scheduler is notified.

    Args:
        subscription_id: ID of the subscription to delete.

    Returns:
        ``{"status": "success", "deleted": subscription_id}``.

    Raises:
        SubscriptionNotFoundException: If no subscription has that ID.
        NewsAPIException: Any other project exception is re-raised unchanged.
        SubscriptionDeletionException: For any other error; the original
            exception is chained as ``__cause__``.
    """
    try:
        from ..database.session_context import get_user_db_session
        from ..database.models.news import NewsSubscription

        with get_user_db_session() as db_session:
            subscription = (
                db_session.query(NewsSubscription)
                .filter_by(id=subscription_id)
                .first()
            )
            if subscription:
                db_session.delete(subscription)
                db_session.commit()

                # Notify scheduler about deleted subscription
                _notify_scheduler_about_subscription_change("deleted")

                return {"status": "success", "deleted": subscription_id}
            raise SubscriptionNotFoundException(subscription_id)  # noqa: TRY301 — re-raised by except NewsAPIException
    except NewsAPIException:
        # Re-raise our custom exceptions (e.g. not-found) untouched
        raise
    except Exception as e:
        logger.exception("Error deleting subscription")
        # Chain explicitly so tracebacks show the real root cause (B904)
        raise SubscriptionDeletionException(subscription_id, str(e)) from e
def get_votes_for_cards(card_ids: list, user_id: str) -> Dict[str, Any]:
    """
    Look up relevance votes for a batch of news cards.

    Args:
        card_ids: IDs of the cards whose votes should be returned.
        user_id: Username used to open the per-user database when no Flask
            request context is active (e.g. in tests). Inside a request it
            is ignored and the username is read from the Flask session.

    Returns:
        ``{"success": True, "votes": {card_id: entry}}`` where each entry has
        ``upvotes`` and ``downvotes`` (counts of relevance ratings with that
        value) and ``user_vote`` (the value of the first relevance rating
        found for the card, or ``None``).

    Raises:
        ValueError: If no username can be resolved.
        Exception: Database errors are logged and re-raised.
    """
    from flask import session as flask_session, has_request_context
    from ..database.models.news import UserRating, RatingType
    from ..database.session_context import get_user_db_session

    # Work out whose database to open before touching the DB at all.
    if has_request_context():
        username = flask_session.get("username")
        if not username:
            raise ValueError("No username in session")
    else:
        # Outside a request (e.g. in tests) fall back to the caller's id.
        username = user_id or None
        if not username:
            raise ValueError("No username provided and no request context")

    try:
        with get_user_db_session(username) as db:

            def relevance_ratings(card_id, **narrowing):
                # Relevance ratings for one card, optionally filtered further.
                return db.query(UserRating).filter_by(
                    card_id=card_id,
                    rating_type=RatingType.RELEVANCE,
                    **narrowing,
                )

            votes = {}
            for card_id in card_ids:
                own_rating = relevance_ratings(card_id).first()
                up_count = relevance_ratings(card_id, rating_value="up").count()
                down_count = relevance_ratings(
                    card_id, rating_value="down"
                ).count()
                votes[card_id] = {
                    "upvotes": up_count,
                    "downvotes": down_count,
                    "user_vote": own_rating.rating_value
                    if own_rating
                    else None,
                }

            return {"success": True, "votes": votes}

    except Exception:
        logger.exception("Error getting votes for cards")
        raise
def submit_feedback(card_id: str, user_id: str, vote: str) -> Dict[str, Any]:
    """
    Record a relevance vote ("up"/"down") on a news card.

    If a relevance rating for the card already exists it is overwritten
    (and its ``created_at`` refreshed); otherwise a new rating row is added.
    The card itself is not required to exist as a stored NewsCard row.

    Args:
        card_id: ID of the news card being rated.
        user_id: Username used to open the per-user database when no Flask
            request context is active (e.g. in tests). Inside a request it
            is ignored and the username is read from the Flask session.
        vote: Either ``"up"`` or ``"down"``.

    Returns:
        Dict with ``success``, ``card_id``, ``vote`` and the card's
        ``upvotes``/``downvotes`` counts re-queried after the commit.

    Raises:
        ValueError: If ``vote`` is invalid or no username can be resolved.
        Exception: Database errors are logged and re-raised.
    """
    from flask import session as flask_session, has_request_context
    from sqlalchemy_utc import utcnow
    from ..database.models.news import UserRating, RatingType
    from ..database.session_context import get_user_db_session

    if vote not in ("up", "down"):
        raise ValueError(f"Invalid vote type: {vote}")

    # Work out whose database to open before touching the DB at all.
    if has_request_context():
        username = flask_session.get("username")
        if not username:
            raise ValueError("No username in session")
    else:
        # Outside a request (e.g. in tests) fall back to the caller's id.
        username = user_id or None
        if not username:
            raise ValueError("No username provided and no request context")

    try:
        with get_user_db_session(username) as db:
            # News items are generated dynamically and may have no NewsCard
            # row, so the card's existence is deliberately not checked.
            current = (
                db.query(UserRating)
                .filter_by(card_id=card_id, rating_type=RatingType.RELEVANCE)
                .first()
            )

            if not current:
                db.add(
                    UserRating(
                        card_id=card_id,
                        rating_type=RatingType.RELEVANCE,
                        rating_value=vote,
                    )
                )
            else:
                # Overwrite the earlier vote in place.
                current.rating_value = vote
                current.created_at = utcnow()

            db.commit()

            def tally(value):
                # Number of relevance ratings on this card with this value.
                return (
                    db.query(UserRating)
                    .filter_by(
                        card_id=card_id,
                        rating_type=RatingType.RELEVANCE,
                        rating_value=value,
                    )
                    .count()
                )

            upvotes, downvotes = tally("up"), tally("down")

            logger.info(
                f"Feedback submitted for card {card_id}: {vote} (up: {upvotes}, down: {downvotes})"
            )

            return {
                "success": True,
                "card_id": card_id,
                "vote": vote,
                "upvotes": upvotes,
                "downvotes": downvotes,
            }

    except Exception:
        logger.exception(f"Error submitting feedback for card {card_id}")
        raise
def research_news_item(card_id: str, depth: str = "quick") -> Dict[str, Any]:
    """
    Run follow-up research on a single news card (not implemented yet).

    Args:
        card_id: ID of the news card to research. Currently unused.
        depth: Intended research depth - "quick", "detailed", or "report".
            Currently unused.

    Returns:
        Never returns normally; see Raises.

    Raises:
        NotImplementedException: Always, pending a per-user database
            implementation for cards.
    """
    # TODO: Implement with per-user database for cards
    feature = "research_news_item"
    logger.warning(f"{feature} not yet implemented with per-user databases")
    raise NotImplementedException(feature)
def save_news_preferences(
    user_id: str, preferences: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Persist a user's news preferences (not implemented yet).

    Args:
        user_id: User identifier. Currently unused.
        preferences: Preference values to save. Currently unused.

    Returns:
        Never returns normally; see Raises.

    Raises:
        NotImplementedException: Always, pending a per-user database
            implementation for preferences.
    """
    # TODO: Implement with per-user database for preferences
    stub_name = "save_news_preferences"
    logger.warning(f"{stub_name} not yet implemented with per-user databases")
    raise NotImplementedException(stub_name)
def get_news_categories() -> Dict[str, Any]:
    """
    List available news categories with counts (not implemented yet).

    Returns:
        Never returns normally; see Raises.

    Raises:
        NotImplementedException: Always, pending a per-user database
            implementation for categories.
    """
    # TODO: Implement with per-user database for categories
    stub_name = "get_news_categories"
    logger.warning(f"{stub_name} not yet implemented with per-user databases")
    raise NotImplementedException(stub_name)