Coverage for src / local_deep_research / news / utils / topic_generator.py: 97%
54 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
1"""
2Topic generation utilities for news items.
3Uses LLM to extract relevant topics/tags from news content.
4"""
6from loguru import logger
7from typing import List
9from ...utilities.json_utils import extract_json, get_llm_response_text
def generate_topics(
    query: str, findings: str = "", category: str = "", max_topics: int = 5
) -> List[str]:
    """
    Generate relevant topics/tags from news content.

    Args:
        query: The search query or research question
        findings: The research findings/content
        category: The news category (if available)
        max_topics: Maximum number of topics to generate

    Returns:
        List of topic strings
    """
    # LLM extraction is the only source of topics; there is
    # deliberately no heuristic fallback.
    generated = _generate_with_llm(query, findings, category, max_topics)

    # Surface a failure explicitly instead of inventing default topics,
    # then normalize/deduplicate whatever we have.
    return _validate_topics(
        generated or ["[Topic generation failed]"], max_topics
    )
def _generate_with_llm(
    query: str, findings: str, category: str, max_topics: int
) -> List[str]:
    """Generate topics using the configured LLM.

    Args:
        query: The search query or research question.
        findings: The research findings/content (may be empty).
        category: The news category (may be empty).
        max_topics: Maximum number of topics to return.

    Returns:
        Up to ``max_topics`` cleaned topic strings, or an empty list if
        the LLM call or response parsing fails.
    """
    try:
        from ...config.llm_config import get_llm

        logger.debug(
            f"Topic generation - findings length: {len(findings) if findings else 0}, category: {category}"
        )

        # Use the configured model for topic generation
        llm = get_llm(temperature=0.5)

        # Truncate long inputs to keep the prompt compact; slicing is a
        # no-op for short strings, so no length check is needed.
        query_preview = query[:500]
        findings_preview = findings[:1000] if findings else findings

        prompt = f"""Extract relevant topics/tags from this news content.

Query: {query_preview}
{f"Content: {findings_preview}" if findings_preview else ""}
{f"Category: {category}" if category else ""}

Generate {max_topics} specific, relevant topics that would help categorize and filter this news item.

Requirements:
- Each topic should be 1-3 words
- Topics should be specific and meaningful
- Include geographic regions if mentioned
- Include key entities (countries, organizations, people)
- Include event types (conflict, economy, disaster, etc.)
- Topics should be diverse and cover different aspects

Return ONLY a JSON array of topic strings, like: ["Topic 1", "Topic 2", "Topic 3"]"""

        response = llm.invoke(prompt)
        content = get_llm_response_text(response)

        # Try to parse the JSON response
        topics = extract_json(content, expected_type=list)

        if topics is not None:
            # Clean and validate each topic
            cleaned_topics = []
            for topic in topics:
                if isinstance(topic, str):
                    cleaned = topic.strip()
                    if cleaned and len(cleaned) <= 30:  # Max topic length
                        cleaned_topics.append(cleaned)

            logger.debug(f"Generated topics: {cleaned_topics}")
            return cleaned_topics[:max_topics]

        # Fall back to comma-separated plain text. Strip stray JSON
        # brackets as well as quotes so a malformed array response like
        # '["A", "B"' still yields clean topics (bug fix: brackets were
        # previously left attached to the first/last items).
        logger.debug(f"Failed to parse LLM topics as JSON: {content}")
        if "," in content:
            topics = [t.strip().strip("\"'[] ") for t in content.split(",")]
            return [t for t in topics if t and len(t) <= 30][:max_topics]

    except Exception as e:
        logger.debug(f"LLM topic generation failed: {e}")

    return []
106def _validate_topics(topics: List[str], max_topics: int) -> List[str]:
107 """Validate and clean topics."""
108 valid_topics = []
109 seen = set()
111 for topic in topics:
112 if not topic:
113 continue
115 # Clean the topic
116 cleaned = topic.strip()
118 # Skip if too short or too long
119 if len(cleaned) < 2 or len(cleaned) > 30:
120 continue
122 # Skip duplicates (case-insensitive)
123 normalized = cleaned.lower()
124 if normalized in seen:
125 continue
126 seen.add(normalized)
128 # Convert to lowercase as djpetti suggested
129 valid_topics.append(normalized)
131 if len(valid_topics) >= max_topics:
132 break
134 # Don't add default topics - show what actually happened
135 if not valid_topics:
136 valid_topics = ["[No valid topics]"]
138 return valid_topics