Coverage for src / local_deep_research / news / core / news_analyzer.py: 99%

159 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1""" 

2News analyzer that produces modular output components. 

3Breaks down news analysis into separate, reusable pieces. 

4""" 

5 

6from typing import List, Dict, Any, Optional 

7from datetime import datetime, timezone, UTC 

8from loguru import logger 

9 

10from .utils import generate_card_id 

11from ..utils.topic_generator import generate_topics 

12from ...config.llm_config import get_llm 

13from ...utilities.json_utils import extract_json, get_llm_response_text 

14 

15 

class NewsAnalyzer:
    """
    Analyzes news search results to produce modular components.

    Instead of one big analysis, produces:
    - News items table
    - Big picture summary
    - Watch for (next 24-48h)
    - Pattern recognition
    - Extractable topics for subscriptions
    """

    # Score used when an item's impact_score is missing, null or not numeric.
    _DEFAULT_IMPACT_SCORE = 5

    def __init__(
        self,
        llm_client: Optional[Any] = None,
    ):
        """
        Initialize the news analyzer.

        Args:
            llm_client: LLM client for analysis. When falsy, the client
                returned by ``get_llm()`` is used instead.
        """
        self.llm_client = llm_client or get_llm()

    def analyze_news(
        self, search_results: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """
        Analyze news search results into modular components.

        The returned dictionary always has the same set of keys (those of
        ``_empty_analysis``), whether the input was empty, no items could be
        extracted, or an error occurred.

        Args:
            search_results: Raw search results

        Returns:
            Dictionary with modular analysis components
        """
        if not search_results:
            return self._empty_analysis()

        # Kept outside the try so the error path can still report it.
        result_count = 0
        try:
            result_count = len(search_results)

            # Step 1: Extract news items table
            logger.debug("Extracting news items")
            news_items = self.extract_news_items(search_results)

            # Step 2: Generate overview components (separate LLM calls for
            # modularity). Start from the empty structure so every key is
            # present even when no items were extracted.
            logger.debug("Generating analysis components")
            components = self._empty_analysis(result_count)
            components["items"] = news_items
            components["item_count"] = len(news_items)

            if news_items:
                # Each component is generated independently
                components["big_picture"] = self.generate_big_picture(
                    news_items
                )
                components["watch_for"] = self.generate_watch_for(news_items)
                components["patterns"] = self.generate_patterns(news_items)
                components["topics"] = self.extract_topics(news_items)
                components["categories"] = self._count_categories(news_items)
                components["impact_summary"] = self._summarize_impact(
                    news_items
                )

            logger.info(
                f"News analysis complete: {len(news_items)} items, {len(components.get('topics', []))} topics"
            )
            return components

        except Exception:
            # Best-effort API: log and fall back to an empty analysis.
            logger.exception("Error analyzing news")
            return self._empty_analysis(result_count)

    def extract_news_items(
        self, search_results: List[Dict[str, Any]], max_items: int = 10
    ) -> List[Dict[str, Any]]:
        """
        Extract structured news items from search results.

        Sends the prepared snippets to the LLM, parses a JSON array from the
        response, keeps at most ``max_items`` entries that pass
        ``_validate_news_item`` and assigns each an ``id``.

        Args:
            search_results: Raw search results
            max_items: Maximum number of items to extract

        Returns:
            List of structured news items; empty when there is no LLM
            client, no JSON list could be parsed, or an error occurred.
        """
        if not self.llm_client:
            logger.warning("No LLM client available for news extraction")
            return []

        # Prepare search results for LLM
        snippets = self._prepare_snippets(
            search_results  # Use all results, let LLM handle token limits
        )
        today = datetime.now(timezone.utc).strftime("%B %d, %Y")

        prompt = f"""
Extract up to {max_items} important news stories from these search results.
Today's date: {today}

{snippets}

For each news story, extract:
1. headline - 8 words max describing the story
2. category - A descriptive category for this news (be specific, not limited to generic categories)
3. summary - 3 clear sentences about what happened
4. impact_score - 1-10 based on significance
5. source_url - URL from the search results
6. entities - people, places, organizations mentioned
7. is_developing - true/false if story is still developing
8. time_ago - when it happened (2 hours ago, yesterday, etc)

Return as JSON array of news items.
Focus on genuinely newsworthy stories.
"""

        try:
            response = self.llm_client.invoke(prompt)
            content = get_llm_response_text(response)

            # Parse JSON response
            news_items = extract_json(content, expected_type=list)
            if news_items is not None:
                # Validate and clean items; malformed entries are skipped
                # individually instead of aborting the whole batch.
                valid_items = []
                for item in news_items[:max_items]:
                    if self._validate_news_item(item):
                        # Generate ID
                        item["id"] = generate_card_id()
                        valid_items.append(item)

                return valid_items

        except Exception:
            logger.exception("Error extracting news items")

        return []

    def generate_big_picture(self, news_items: List[Dict[str, Any]]) -> str:
        """
        Generate the big picture summary of how events connect.

        Only the first 10 items are included in the prompt.

        Args:
            news_items: Extracted news items

        Returns:
            Big picture summary (the prompt asks for 3-4 sentences), or an
            empty string when there is no LLM client, no items, or the LLM
            call fails.
        """
        if not self.llm_client or not news_items:
            return ""

        # Prepare news summaries
        summaries = self._bullet_summaries(news_items[:10])

        prompt = f"""
Based on these news stories, write THE BIG PICTURE summary.
Connect the dots between events. What's the larger narrative?
Write 3-4 sentences maximum.

News stories:
{summaries}

THE BIG PICTURE:"""

        try:
            response = self.llm_client.invoke(prompt)
            return self._response_text(response).strip()
        except Exception:
            logger.exception("Error generating big picture")
            return ""

    def generate_watch_for(self, news_items: List[Dict[str, Any]]) -> List[str]:
        """
        Generate list of developments to watch for in next 24-48 hours.

        Items flagged ``is_developing`` are used; if none are flagged, the
        first five items are used instead.

        Args:
            news_items: Extracted news items

        Returns:
            List of at most five bullet points (bullet markers removed);
            empty when there is no LLM client, no items, or the LLM call
            fails.
        """
        if not self.llm_client or not news_items:
            return []

        # Focus on developing stories
        developing = [
            item for item in news_items if item.get("is_developing", False)
        ]
        if not developing:
            developing = news_items[:5]

        summaries = self._bullet_summaries(developing)

        prompt = f"""
Based on these developing news stories, what should we watch for in the next 24-48 hours?
Write 3-5 specific, actionable items.

Developing stories:
{summaries}

WATCH FOR:
-"""

        try:
            response = self.llm_client.invoke(prompt)
            content = self._response_text(response)

            # Parse bullet points
            watch_items = []
            for raw_line in content.strip().split("\n"):
                line = raw_line.strip()
                # Skip blanks and an echoed section header.
                if not line or line in ("WATCH FOR:", "Watch for:"):
                    continue
                # Remove bullet markers
                line = line.lstrip("-•* ")
                if line:
                    watch_items.append(line)

            return watch_items[:5]

        except Exception:
            logger.exception("Error generating watch items")
            return []

    def generate_patterns(self, news_items: List[Dict[str, Any]]) -> str:
        """
        Identify emerging patterns from today's news.

        The prompt contains the per-category story counts and the first 10
        headlines.

        Args:
            news_items: Extracted news items

        Returns:
            Pattern recognition summary, or an empty string when there is no
            LLM client, no items, or the LLM call fails.
        """
        if not self.llm_client or not news_items:
            return ""

        # Group by category (only the counts are needed for the prompt)
        category_summary = "\n".join(
            f"{cat}: {count} stories"
            for cat, count in self._count_categories(news_items).items()
        )
        headlines = "\n".join(
            f"- {item['headline']}" for item in news_items[:10]
        )

        prompt = f"""
Identify emerging patterns from today's news distribution:

{category_summary}

Top headlines:
{headlines}

PATTERN RECOGNITION (1-2 sentences):"""

        try:
            response = self.llm_client.invoke(prompt)
            return self._response_text(response).strip()
        except Exception:
            logger.exception("Error generating patterns")
            return ""

    def extract_topics(
        self, news_items: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Extract subscribable topics from news items.

        Calls ``generate_topics`` once per item (up to 3 topics each), then
        merges topics with the same name: the entry from the highest-impact
        source item is kept and annotated with ``frequency`` (number of
        occurrences) and ``query`` (a follow-up search string).

        Args:
            news_items: Extracted news items

        Returns:
            Up to 10 topic dictionaries, most frequent first
        """
        topics = []

        # Use topic generator to extract from each item
        for item in news_items:
            # Use topic generator with headline as query and summary as findings
            extracted = generate_topics(
                query=item.get("headline", ""),
                findings=item.get("summary", ""),
                category=item.get("category", ""),
                max_topics=3,
            )
            # Normalised once so the comparison below cannot hit str/None.
            impact = self._coerce_impact_score(item.get("impact_score"))

            for topic in extracted:
                topics.append(
                    {
                        "name": topic,
                        "source_item_id": item.get("id"),
                        "source_headline": item.get("headline"),
                        "category": item.get("category"),
                        "impact_score": impact,
                    }
                )

        # Deduplicate and sort by frequency
        topic_counts = {}
        topic_metadata = {}

        for topic_info in topics:
            name = topic_info["name"]
            if name not in topic_counts:
                topic_counts[name] = 0
                topic_metadata[name] = topic_info
            topic_counts[name] += 1

            # Keep highest impact score
            if (
                topic_info["impact_score"]
                > topic_metadata[name]["impact_score"]
            ):
                topic_metadata[name] = topic_info

        # Create final topic list (sorted() is stable, so ties keep
        # first-seen order)
        final_topics = []
        for topic, count in sorted(
            topic_counts.items(), key=lambda x: x[1], reverse=True
        ):
            metadata = topic_metadata[topic]
            metadata["frequency"] = count
            metadata["query"] = f"{topic} latest developments news"
            final_topics.append(metadata)

        return final_topics[:10]  # Top 10 topics

    def _prepare_snippets(self, search_results: List[Dict[str, Any]]) -> str:
        """
        Prepare search result snippets for LLM processing.

        Each result becomes a numbered entry with its title, URL and the
        first 200 characters of its snippet (or of its content when there is
        no snippet); absent or empty fields are omitted.
        """
        entries = []
        for number, result in enumerate(search_results, start=1):
            parts = [f"[{number}] "]
            if result.get("title"):
                parts.append(f"Title: {result['title']}\n")
            if result.get("url"):
                parts.append(f"URL: {result['url']}\n")
            if result.get("snippet"):
                parts.append(f"Snippet: {result['snippet'][:200]}...\n")
            elif result.get("content"):
                parts.append(f"Content: {result['content'][:200]}...\n")

            entries.append("".join(parts))

        return "\n".join(entries)

    def _validate_news_item(self, item: Dict[str, Any]) -> bool:
        """
        Validate that a news item has required fields.

        Returns False for anything that is not a dict; the items come from
        parsed LLM output, so array elements may be strings, numbers or null.
        """
        if not isinstance(item, dict):
            return False
        required = ("headline", "summary")
        return all(item.get(field) for field in required)

    def _count_categories(
        self, news_items: List[Dict[str, Any]]
    ) -> Dict[str, int]:
        """Count items by category; items without one count as "Other"."""
        counts = {}
        for item in news_items:
            cat = item.get("category", "Other")
            counts[cat] = counts.get(cat, 0) + 1
        return counts

    def _summarize_impact(
        self, news_items: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """
        Summarize impact scores.

        Missing, null or non-numeric scores count as the default score.
        ``high_impact_count`` is the number of scores of 8 or more.
        """
        if not news_items:
            return {"average": 0, "high_impact_count": 0}

        scores = [
            self._coerce_impact_score(item.get("impact_score"))
            for item in news_items
        ]
        return {
            "average": sum(scores) / len(scores),
            "high_impact_count": sum(1 for score in scores if score >= 8),
            "max": max(scores),
            "min": min(scores),
        }

    def _empty_analysis(self, search_result_count: int = 0) -> Dict[str, Any]:
        """
        Return empty analysis structure.

        Args:
            search_result_count: Number of search results that were
                received, recorded under the same key ``analyze_news`` uses.
        """
        return {
            "items": [],
            "item_count": 0,
            "search_result_count": search_result_count,
            "big_picture": "",
            "watch_for": [],
            "patterns": "",
            "topics": [],
            "categories": {},
            "impact_summary": {"average": 0, "high_impact_count": 0},
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }

    @classmethod
    def _coerce_impact_score(cls, value: Any) -> Any:
        """Return ``value`` as a number, or the default score if it is not one."""
        if value is None:
            return cls._DEFAULT_IMPACT_SCORE
        if isinstance(value, (int, float)):
            return value
        try:
            # Accept numeric strings such as "8" or "7.5".
            return float(value)
        except (TypeError, ValueError):
            return cls._DEFAULT_IMPACT_SCORE

    @staticmethod
    def _bullet_summaries(items: List[Dict[str, Any]]) -> str:
        """Format items as "- headline: summary..." lines (summary cut to 100 chars)."""
        return "\n".join(
            f"- {item['headline']}: {(item.get('summary') or '')[:100]}..."
            for item in items
        )

    @staticmethod
    def _response_text(response: Any) -> Any:
        """Return ``response.content`` when present, else ``str(response)``."""
        if hasattr(response, "content"):
            return response.content
        return str(response)