Coverage for src / local_deep_research / news / core / news_analyzer.py: 99%

165 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2News analyzer that produces modular output components. 

3Breaks down news analysis into separate, reusable pieces. 

4""" 

5 

6from typing import List, Dict, Any, Optional, cast 

7from datetime import datetime, timezone, UTC 

8from loguru import logger 

9 

10from .utils import generate_card_id 

11from ..utils.topic_generator import generate_topics 

12from ...config.llm_config import get_llm 

13from ...utilities.json_utils import extract_json, get_llm_response_text 

14 

15 

class NewsAnalyzer:
    """
    Analyzes news search results to produce modular components.

    Instead of one big analysis, produces:
    - News items table
    - Big picture summary
    - Watch for (next 24-48h)
    - Pattern recognition
    - Extractable topics for subscriptions
    """

    def __init__(
        self,
        llm_client: Optional[Any] = None,
    ):
        """
        Initialize the news analyzer.

        Args:
            llm_client: LLM client for analysis. When omitted, a client is
                obtained from ``get_llm()`` and this instance becomes
                responsible for closing it (see ``close``).
        """
        self._owns_llm = llm_client is None
        self.llm_client = llm_client or get_llm()

    def close(self) -> None:
        """Close the LLM client if this instance created it."""
        from ...utilities.resource_utils import safe_close

        if self._owns_llm:
            safe_close(self.llm_client, "news analyzer LLM")

    def analyze_news(
        self, search_results: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """
        Analyze news search results into modular components.

        Args:
            search_results: Raw search results

        Returns:
            Dictionary with modular analysis components. When no news items
            could be extracted only ``items``, ``item_count``,
            ``search_result_count`` and ``timestamp`` are present. On empty
            input or on any error the structure from ``_empty_analysis`` is
            returned instead.
        """
        if not search_results:
            return self._empty_analysis()

        try:
            # Step 1: Extract news items table
            logger.debug("Extracting news items")
            news_items = self.extract_news_items(search_results)

            # Step 2: Generate overview components (separate LLM calls for modularity)
            logger.debug("Generating analysis components")
            components = {
                "items": news_items,
                "item_count": len(news_items),
                "search_result_count": len(search_results),
                "timestamp": datetime.now(timezone.utc).isoformat(),
            }

            if news_items:
                # Each component is generated independently
                components["big_picture"] = self.generate_big_picture(
                    news_items
                )
                components["watch_for"] = self.generate_watch_for(news_items)
                components["patterns"] = self.generate_patterns(news_items)
                components["topics"] = self.extract_topics(news_items)
                components["categories"] = self._count_categories(news_items)
                components["impact_summary"] = self._summarize_impact(
                    news_items
                )

            topics_list = cast(List[Any], components.get("topics", []))
            logger.info(
                f"News analysis complete: {len(news_items)} items, {len(topics_list)} topics"
            )
            return components

        except Exception:
            logger.exception("Error analyzing news")
            return self._empty_analysis()

    def extract_news_items(
        self, search_results: List[Dict[str, Any]], max_items: int = 10
    ) -> List[Dict[str, Any]]:
        """
        Extract structured news items from search results.

        Args:
            search_results: Raw search results
            max_items: Maximum number of items to extract

        Returns:
            List of structured news items, each tagged with a generated
            ``id``. Empty when no LLM client is set, the response cannot be
            parsed as a JSON list, or the LLM call fails.
        """
        if not self.llm_client:
            logger.warning("No LLM client available for news extraction")
            return []

        # Prepare search results for LLM
        snippets = self._prepare_snippets(
            search_results  # Use all results, let LLM handle token limits
        )
        today = datetime.now(timezone.utc).strftime("%B %d, %Y")

        prompt = f"""
Extract up to {max_items} important news stories from these search results.
Today's date: {today}

{snippets}

For each news story, extract:
1. headline - 8 words max describing the story
2. category - A descriptive category for this news (be specific, not limited to generic categories)
3. summary - 3 clear sentences about what happened
4. impact_score - 1-10 based on significance
5. source_url - URL from the search results
6. entities - people, places, organizations mentioned
7. is_developing - true/false if story is still developing
8. time_ago - when it happened (2 hours ago, yesterday, etc)

Return as JSON array of news items.
Focus on genuinely newsworthy stories.
"""

        try:
            response = self.llm_client.invoke(prompt)
            content = get_llm_response_text(response)

            # Parse JSON response
            news_items = extract_json(content, expected_type=list)
            if news_items is not None:
                # Validate and clean items; malformed entries are skipped
                # individually rather than aborting the whole extraction.
                valid_items = []
                for item in news_items[:max_items]:
                    if self._validate_news_item(item):
                        # Generate ID
                        item["id"] = generate_card_id()
                        valid_items.append(item)

                return valid_items

        except Exception:
            logger.exception("Error extracting news items")

        return []

    def generate_big_picture(self, news_items: List[Dict[str, Any]]) -> str:
        """
        Generate the big picture summary of how events connect.

        Args:
            news_items: Extracted news items

        Returns:
            Big picture summary (3-4 sentences requested from the LLM), or an
            empty string when there is no client, no items, or the call fails.
        """
        if not self.llm_client or not news_items:
            return ""

        # Prepare news summaries (at most the first 10 items)
        summaries = self._headline_summaries(news_items[:10])

        prompt = f"""
Based on these news stories, write THE BIG PICTURE summary.
Connect the dots between events. What's the larger narrative?
Write 3-4 sentences maximum.

News stories:
{summaries}

THE BIG PICTURE:"""

        try:
            response = self.llm_client.invoke(prompt)
            return self._response_text(response).strip()
        except Exception:
            logger.exception("Error generating big picture")
            return ""

    def generate_watch_for(self, news_items: List[Dict[str, Any]]) -> List[str]:
        """
        Generate list of developments to watch for in next 24-48 hours.

        Args:
            news_items: Extracted news items

        Returns:
            List of bullet points (at most 5), with leading bullet markers
            removed. Empty when there is no client, no items, or the call
            fails.
        """
        if not self.llm_client or not news_items:
            return []

        # Focus on developing stories; fall back to the first five items
        developing = [
            item for item in news_items if item.get("is_developing", False)
        ]
        if not developing:
            developing = news_items[:5]

        summaries = self._headline_summaries(developing)

        prompt = f"""
Based on these developing news stories, what should we watch for in the next 24-48 hours?
Write 3-5 specific, actionable items.

Developing stories:
{summaries}

WATCH FOR:
-"""

        try:
            response = self.llm_client.invoke(prompt)
            content = self._response_text(response)

            # Parse bullet points
            watch_items = []
            for line in content.strip().split("\n"):
                line = line.strip()
                # Skip blanks and an echoed section header
                if not line or line in ["WATCH FOR:", "Watch for:"]:
                    continue
                # Remove bullet markers; a bare marker leaves nothing behind
                line = line.lstrip("-•* ")
                if line:
                    watch_items.append(line)

            return watch_items[:5]

        except Exception:
            logger.exception("Error generating watch items")
            return []

    def generate_patterns(self, news_items: List[Dict[str, Any]]) -> str:
        """
        Identify emerging patterns from today's news.

        Args:
            news_items: Extracted news items

        Returns:
            Pattern recognition summary, or an empty string when there is no
            client, no items, or the call fails.
        """
        if not self.llm_client or not news_items:
            return ""

        # Group by category
        by_category: Dict[str, List[Any]] = {}
        for item in news_items:
            cat = item.get("category", "Other")
            by_category.setdefault(cat, []).append(item.get("headline", ""))

        category_summary = "\n".join(
            f"{cat}: {len(items)} stories"
            for cat, items in by_category.items()
        )
        headlines = "\n".join(
            f"- {item.get('headline', '')}" for item in news_items[:10]
        )

        prompt = f"""
Identify emerging patterns from today's news distribution:

{category_summary}

Top headlines:
{headlines}

PATTERN RECOGNITION (1-2 sentences):"""

        try:
            response = self.llm_client.invoke(prompt)
            return self._response_text(response).strip()
        except Exception:
            logger.exception("Error generating patterns")
            return ""

    def extract_topics(
        self, news_items: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Extract subscribable topics from news items.

        Args:
            news_items: Extracted news items

        Returns:
            List of topic dictionaries with metadata (at most 10), ordered by
            how many items produced the topic. Each entry carries the
            metadata of its highest-impact source item plus ``frequency``
            and a ready-made ``query`` string.
        """
        topics = []

        # Use topic generator to extract from each item
        for item in news_items:
            # Use topic generator with headline as query and summary as findings
            extracted = generate_topics(
                query=item.get("headline", ""),
                findings=item.get("summary", ""),
                category=item.get("category", ""),
                max_topics=3,
            )

            for topic in extracted:
                topics.append(
                    {
                        "name": topic,
                        "source_item_id": item.get("id"),
                        "source_headline": item.get("headline"),
                        "category": item.get("category"),
                        "impact_score": item.get("impact_score", 5),
                    }
                )

        # Deduplicate and sort by frequency
        topic_counts: Dict[Any, int] = {}
        topic_metadata: Dict[Any, Dict[str, Any]] = {}

        for topic_info in topics:
            name = topic_info["name"]
            if name not in topic_counts:
                topic_counts[name] = 0
                topic_metadata[name] = topic_info
            topic_counts[name] += 1

            # Keep highest impact score. Compare coerced values so that a
            # string or missing score from the LLM cannot raise here.
            if self._impact_score(topic_info) > self._impact_score(
                topic_metadata[name]
            ):
                topic_metadata[name] = topic_info

        # Create final topic list
        final_topics = []
        for topic, count in sorted(
            topic_counts.items(), key=lambda x: x[1], reverse=True
        ):
            metadata = topic_metadata[topic]
            metadata["frequency"] = count
            metadata["query"] = f"{topic} latest developments news"
            final_topics.append(metadata)

        return final_topics[:10]  # Top 10 topics

    def _prepare_snippets(self, search_results: List[Dict[str, Any]]) -> str:
        """Prepare search result snippets for LLM processing.

        Each result becomes a numbered entry with its title, URL and the
        first 200 characters of ``snippet`` (or of ``content`` when there is
        no snippet); fields that are missing or empty are omitted.
        """
        snippets = []
        for i, result in enumerate(search_results):
            snippet = f"[{i + 1}] "
            if result.get("title"):
                snippet += f"Title: {result['title']}\n"
            if result.get("url"):
                snippet += f"URL: {result['url']}\n"
            if result.get("snippet"):
                snippet += f"Snippet: {result['snippet'][:200]}...\n"
            elif result.get("content"):
                snippet += f"Content: {result['content'][:200]}...\n"

            snippets.append(snippet)

        return "\n".join(snippets)

    def _validate_news_item(self, item: Dict[str, Any]) -> bool:
        """Validate that a news item has required fields.

        Returns False for anything that is not a dict, so a malformed
        element in the LLM output is skipped instead of raising.
        """
        if not isinstance(item, dict):
            return False
        required = ["headline", "summary"]
        return all(field in item and item[field] for field in required)

    def _count_categories(
        self, news_items: List[Dict[str, Any]]
    ) -> Dict[str, int]:
        """Count items by category; items without one count as "Other"."""
        counts: Dict[str, int] = {}
        for item in news_items:
            cat = item.get("category", "Other")
            counts[cat] = counts.get(cat, 0) + 1
        return counts

    def _summarize_impact(
        self, news_items: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """Summarize impact scores.

        Returns the average, the number of scores >= 8, and the max/min.
        Missing scores count as 5; non-numeric scores are coerced with
        ``float`` and fall back to 5 when that fails. For an empty list only
        ``average`` and ``high_impact_count`` are returned.
        """
        if not news_items:
            return {"average": 0, "high_impact_count": 0}

        scores = [self._impact_score(item) for item in news_items]
        return {
            "average": sum(scores) / len(scores),
            "high_impact_count": len([s for s in scores if s >= 8]),
            "max": max(scores),
            "min": min(scores),
        }

    def _empty_analysis(self) -> Dict[str, Any]:
        """Return empty analysis structure."""
        return {
            "items": [],
            "item_count": 0,
            "big_picture": "",
            "watch_for": [],
            "patterns": "",
            "topics": [],
            "categories": {},
            "impact_summary": {"average": 0, "high_impact_count": 0},
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }

    @staticmethod
    def _response_text(response: Any) -> Any:
        """Return ``response.content`` when present, else ``str(response)``."""
        return response.content if hasattr(response, "content") else str(response)

    @staticmethod
    def _headline_summaries(items: List[Dict[str, Any]]) -> str:
        """Render items as ``- headline: summary[:100]...`` lines.

        A missing headline or a missing/``None`` summary renders as an empty
        string rather than raising.
        """
        return "\n".join(
            f"- {item.get('headline', '')}: {str(item.get('summary') or '')[:100]}..."
            for item in items
        )

    @staticmethod
    def _impact_score(item: Dict[str, Any], default: float = 5) -> Any:
        """Return the item's ``impact_score`` as a number.

        Numeric values are returned unchanged; other values are passed
        through ``float`` and replaced by ``default`` when that fails.
        """
        raw = item.get("impact_score", default)
        if isinstance(raw, (int, float)):
            return raw
        try:
            return float(raw)
        except (TypeError, ValueError):
            return default

448 }