Coverage for src / local_deep_research / news / core / news_analyzer.py: 27%
161 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""
2News analyzer that produces modular output components.
3Breaks down news analysis into separate, reusable pieces.
4"""
6from typing import List, Dict, Any, Optional
7from datetime import datetime, timezone, UTC
8from loguru import logger
10from .utils import generate_card_id
11from ..utils.topic_generator import generate_topics
12from ...config.llm_config import get_llm
15class NewsAnalyzer:
16 """
17 Analyzes news search results to produce modular components.
19 Instead of one big analysis, produces:
20 - News items table
21 - Big picture summary
22 - Watch for (next 24-48h)
23 - Pattern recognition
24 - Extractable topics for subscriptions
25 """
27 def __init__(
28 self,
29 llm_client: Optional[Any] = None,
30 ):
31 """
32 Initialize the news analyzer.
34 Args:
35 llm_client: LLM client for analysis
36 """
37 self.llm_client = llm_client or get_llm()
39 def analyze_news(
40 self, search_results: List[Dict[str, Any]]
41 ) -> Dict[str, Any]:
42 """
43 Analyze news search results into modular components.
45 Args:
46 search_results: Raw search results
48 Returns:
49 Dictionary with modular analysis components
50 """
51 if not search_results: 51 ↛ 54line 51 didn't jump to line 54 because the condition on line 51 was always true
52 return self._empty_analysis()
54 try:
55 # Step 1: Extract news items table
56 logger.debug("Extracting news items")
57 news_items = self.extract_news_items(search_results)
59 # Step 2: Generate overview components (separate LLM calls for modularity)
60 logger.debug("Generating analysis components")
61 components = {
62 "items": news_items,
63 "item_count": len(news_items),
64 "search_result_count": len(search_results),
65 "timestamp": datetime.now(timezone.utc).isoformat(),
66 }
68 if news_items:
69 # Each component is generated independently
70 components["big_picture"] = self.generate_big_picture(
71 news_items
72 )
73 components["watch_for"] = self.generate_watch_for(news_items)
74 components["patterns"] = self.generate_patterns(news_items)
75 components["topics"] = self.extract_topics(news_items)
76 components["categories"] = self._count_categories(news_items)
77 components["impact_summary"] = self._summarize_impact(
78 news_items
79 )
81 logger.info(
82 f"News analysis complete: {len(news_items)} items, {len(components.get('topics', []))} topics"
83 )
84 return components
86 except Exception:
87 logger.exception("Error analyzing news")
88 return self._empty_analysis()
90 def extract_news_items(
91 self, search_results: List[Dict[str, Any]], max_items: int = 10
92 ) -> List[Dict[str, Any]]:
93 """
94 Extract structured news items from search results.
96 Args:
97 search_results: Raw search results
98 max_items: Maximum number of items to extract
100 Returns:
101 List of structured news items
102 """
103 if not self.llm_client:
104 logger.warning("No LLM client available for news extraction")
105 return []
107 # Prepare search results for LLM
108 snippets = self._prepare_snippets(
109 search_results # Use all results, let LLM handle token limits
110 )
112 prompt = f"""
113Extract up to {max_items} important news stories from these search results.
114Today's date: {datetime.now(UTC).strftime("%B %d, %Y")}
116{snippets}
118For each news story, extract:
1191. headline - 8 words max describing the story
1202. category - A descriptive category for this news (be specific, not limited to generic categories)
1213. summary - 3 clear sentences about what happened
1224. impact_score - 1-10 based on significance
1235. source_url - URL from the search results
1246. entities - people, places, organizations mentioned
1257. is_developing - true/false if story is still developing
1268. time_ago - when it happened (2 hours ago, yesterday, etc)
128Return as JSON array of news items.
129Focus on genuinely newsworthy stories.
130"""
132 try:
133 response = self.llm_client.invoke(prompt)
134 content = (
135 response.content
136 if hasattr(response, "content")
137 else str(response)
138 )
140 # Parse JSON response
141 import json
142 import re
144 # Extract JSON array
145 json_match = re.search(r"\[.*\]", content, re.DOTALL)
146 if json_match:
147 news_items = json.loads(json_match.group())
149 # Validate and clean items
150 valid_items = []
151 for item in news_items[:max_items]:
152 if self._validate_news_item(item):
153 # Generate ID
154 item["id"] = generate_card_id()
155 valid_items.append(item)
157 return valid_items
159 except Exception:
160 logger.exception("Error extracting news items")
162 return []
164 def generate_big_picture(self, news_items: List[Dict[str, Any]]) -> str:
165 """
166 Generate the big picture summary of how events connect.
168 Args:
169 news_items: Extracted news items
171 Returns:
172 Big picture summary (3-4 sentences)
173 """
174 if not self.llm_client or not news_items:
175 return ""
177 # Prepare news summaries
178 summaries = "\n".join(
179 [
180 f"- {item['headline']}: {item.get('summary', '')[:100]}..."
181 for item in news_items[:10]
182 ]
183 )
185 prompt = f"""
186Based on these news stories, write THE BIG PICTURE summary.
187Connect the dots between events. What's the larger narrative?
188Write 3-4 sentences maximum.
190News stories:
191{summaries}
193THE BIG PICTURE:"""
195 try:
196 response = self.llm_client.invoke(prompt)
197 content = (
198 response.content
199 if hasattr(response, "content")
200 else str(response)
201 )
202 return content.strip()
203 except Exception:
204 logger.exception("Error generating big picture")
205 return ""
207 def generate_watch_for(self, news_items: List[Dict[str, Any]]) -> List[str]:
208 """
209 Generate list of developments to watch for in next 24-48 hours.
211 Args:
212 news_items: Extracted news items
214 Returns:
215 List of bullet points
216 """
217 if not self.llm_client or not news_items:
218 return []
220 # Focus on developing stories
221 developing = [
222 item for item in news_items if item.get("is_developing", False)
223 ]
224 if not developing:
225 developing = news_items[:5]
227 summaries = "\n".join(
228 [
229 f"- {item['headline']}: {item.get('summary', '')[:100]}..."
230 for item in developing
231 ]
232 )
234 prompt = f"""
235Based on these developing news stories, what should we watch for in the next 24-48 hours?
236Write 3-5 specific, actionable items.
238Developing stories:
239{summaries}
241WATCH FOR:
242-"""
244 try:
245 response = self.llm_client.invoke(prompt)
246 content = (
247 response.content
248 if hasattr(response, "content")
249 else str(response)
250 )
252 # Parse bullet points
253 lines = content.strip().split("\n")
254 watch_items = []
255 for line in lines:
256 line = line.strip()
257 if line and line not in ["WATCH FOR:", "Watch for:"]:
258 # Remove bullet markers
259 line = line.lstrip("-•* ")
260 if line:
261 watch_items.append(line)
263 return watch_items[:5]
265 except Exception:
266 logger.exception("Error generating watch items")
267 return []
269 def generate_patterns(self, news_items: List[Dict[str, Any]]) -> str:
270 """
271 Identify emerging patterns from today's news.
273 Args:
274 news_items: Extracted news items
276 Returns:
277 Pattern recognition summary
278 """
279 if not self.llm_client or not news_items:
280 return ""
282 # Group by category
283 by_category = {}
284 for item in news_items:
285 cat = item.get("category", "Other")
286 if cat not in by_category:
287 by_category[cat] = []
288 by_category[cat].append(item["headline"])
290 category_summary = "\n".join(
291 [
292 f"{cat}: {len(items)} stories"
293 for cat, items in by_category.items()
294 ]
295 )
297 prompt = f"""
298Identify emerging patterns from today's news distribution:
300{category_summary}
302Top headlines:
303{chr(10).join([f"- {item['headline']}" for item in news_items[:10]])}
305PATTERN RECOGNITION (1-2 sentences):"""
307 try:
308 response = self.llm_client.invoke(prompt)
309 content = (
310 response.content
311 if hasattr(response, "content")
312 else str(response)
313 )
314 return content.strip()
315 except Exception:
316 logger.exception("Error generating patterns")
317 return ""
319 def extract_topics(
320 self, news_items: List[Dict[str, Any]]
321 ) -> List[Dict[str, Any]]:
322 """
323 Extract subscribable topics from news items.
325 Args:
326 news_items: Extracted news items
328 Returns:
329 List of topic dictionaries with metadata
330 """
331 topics = []
333 # Use topic generator to extract from each item
334 for item in news_items:
335 # Use topic generator with headline as query and summary as findings
336 headline = item.get("headline", "")
337 summary = item.get("summary", "")
338 category = item.get("category", "")
340 extracted = generate_topics(
341 query=headline,
342 findings=summary,
343 category=category,
344 max_topics=3,
345 )
347 for topic in extracted:
348 topics.append(
349 {
350 "name": topic,
351 "source_item_id": item.get("id"),
352 "source_headline": item.get("headline"),
353 "category": item.get("category"),
354 "impact_score": item.get("impact_score", 5),
355 }
356 )
358 # Deduplicate and sort by frequency
359 topic_counts = {}
360 topic_metadata = {}
362 for topic_info in topics:
363 name = topic_info["name"]
364 if name not in topic_counts:
365 topic_counts[name] = 0
366 topic_metadata[name] = topic_info
367 topic_counts[name] += 1
369 # Keep highest impact score
370 if (
371 topic_info["impact_score"]
372 > topic_metadata[name]["impact_score"]
373 ):
374 topic_metadata[name] = topic_info
376 # Create final topic list
377 final_topics = []
378 for topic, count in sorted(
379 topic_counts.items(), key=lambda x: x[1], reverse=True
380 ):
381 metadata = topic_metadata[topic]
382 metadata["frequency"] = count
383 metadata["query"] = f"{topic} latest developments news"
384 final_topics.append(metadata)
386 return final_topics[:10] # Top 10 topics
388 def _prepare_snippets(self, search_results: List[Dict[str, Any]]) -> str:
389 """Prepare search result snippets for LLM processing."""
390 snippets = []
391 for i, result in enumerate(search_results):
392 snippet = f"[{i + 1}] "
393 if result.get("title"): 393 ↛ 395line 393 didn't jump to line 395 because the condition on line 393 was always true
394 snippet += f"Title: {result['title']}\n"
395 if result.get("url"): 395 ↛ 397line 395 didn't jump to line 397 because the condition on line 395 was always true
396 snippet += f"URL: {result['url']}\n"
397 if result.get("snippet"):
398 snippet += f"Snippet: {result['snippet'][:200]}...\n"
399 elif result.get("content"): 399 ↛ 402line 399 didn't jump to line 402 because the condition on line 399 was always true
400 snippet += f"Content: {result['content'][:200]}...\n"
402 snippets.append(snippet)
404 return "\n".join(snippets)
406 def _validate_news_item(self, item: Dict[str, Any]) -> bool:
407 """Validate that a news item has required fields."""
408 required = ["headline", "summary"]
409 return all(field in item and item[field] for field in required)
411 def _count_categories(
412 self, news_items: List[Dict[str, Any]]
413 ) -> Dict[str, int]:
414 """Count items by category."""
415 counts = {}
416 for item in news_items:
417 cat = item.get("category", "Other")
418 counts[cat] = counts.get(cat, 0) + 1
419 return counts
421 def _summarize_impact(
422 self, news_items: List[Dict[str, Any]]
423 ) -> Dict[str, Any]:
424 """Summarize impact scores."""
425 if not news_items:
426 return {"average": 0, "high_impact_count": 0}
428 scores = [item.get("impact_score", 5) for item in news_items]
429 return {
430 "average": sum(scores) / len(scores),
431 "high_impact_count": len([s for s in scores if s >= 8]),
432 "max": max(scores),
433 "min": min(scores),
434 }
436 def _empty_analysis(self) -> Dict[str, Any]:
437 """Return empty analysis structure."""
438 return {
439 "items": [],
440 "item_count": 0,
441 "big_picture": "",
442 "watch_for": [],
443 "patterns": "",
444 "topics": [],
445 "categories": {},
446 "impact_summary": {"average": 0, "high_impact_count": 0},
447 "timestamp": datetime.now(timezone.utc).isoformat(),
448 }