Coverage for src / local_deep_research / advanced_search_system / source_management / diversity_manager.py: 96%
238 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
1"""
2Source diversity management for improved evidence quality.
3"""
5import re
6from collections import defaultdict
7from dataclasses import dataclass, field
8from datetime import datetime, UTC
9from typing import Any, Dict, List, Optional, Set, Tuple
11from langchain_core.language_models import BaseChatModel
13from ..constraints.base_constraint import Constraint, ConstraintType
@dataclass
class SourceProfile:
    """Profile of a source for diversity tracking."""

    # Full URL of the source; also the key used in the manager's registry.
    url: str
    # Host part of the URL (scheme and "www." stripped).
    domain: str
    source_type: str  # 'academic', 'news', 'government', 'wiki', 'blog', etc.
    # Heuristic credibility in [0.0, 1.0]; derived from type, TLD, HTTPS,
    # and content signals such as citations and author bylines.
    credibility_score: float
    # Up to ~5 topic keywords extracted from the URL path and/or content.
    specialties: List[str] = field(default_factory=list)
    # (earliest, latest) dates mentioned in the content, if any were found.
    temporal_coverage: Optional[Tuple[datetime, datetime]] = None
    # Region name such as "United States", when one could be inferred.
    geographic_focus: Optional[str] = None
    # How many times evidence has been drawn from this source.
    evidence_count: int = 0
    # Timestamp (UTC-aware) of the most recent analyze_source() call.
    last_accessed: Optional[datetime] = None
@dataclass
class DiversityMetrics:
    """Metrics for source diversity assessment."""

    type_diversity: float  # 0.0 to 1.0
    # 0.0-1.0; based on the span of years covered across sources.
    temporal_diversity: float
    # 0.0-1.0; based on the number of distinct regions represented.
    geographic_diversity: float
    # Counts of sources per credibility bucket: "high" / "medium" / "low".
    credibility_distribution: Dict[str, float]
    # Specialty keyword -> number of sources covering it.
    specialty_coverage: Dict[str, int]
    # Weighted combination of the individual diversity components.
    overall_score: float
43class SourceDiversityManager:
44 """
45 Manages source diversity to ensure comprehensive evidence collection.
47 Key features:
48 1. Tracks source types and characteristics
49 2. Ensures diverse source selection
50 3. Prioritizes high-credibility sources
51 4. Manages geographic and temporal diversity
52 """
54 def __init__(self, model: BaseChatModel):
55 """Initialize the source diversity manager."""
56 self.model = model
57 self.source_profiles: Dict[str, SourceProfile] = {}
58 self.source_types: Dict[str, Set[str]] = defaultdict(set)
59 self.type_priorities: Dict[str, float] = {
60 "academic": 0.9,
61 "government": 0.85,
62 "news": 0.7,
63 "wiki": 0.75,
64 "blog": 0.5,
65 "forum": 0.4,
66 "social": 0.3,
67 }
68 self.minimum_source_types: int = 3
69 self.credibility_threshold: float = 0.6
71 def analyze_source(
72 self, url: str, content: Optional[str] = None
73 ) -> SourceProfile:
74 """Analyze a source and create its profile."""
75 if url in self.source_profiles:
76 profile = self.source_profiles[url]
77 profile.evidence_count += 1
78 profile.last_accessed = datetime.now(UTC)
79 return profile
81 # Extract domain
82 domain = self._extract_domain(url)
84 # Determine source type
85 source_type = self._determine_source_type(url, domain, content)
87 # Calculate credibility
88 credibility = self._calculate_credibility(
89 url, domain, source_type, content
90 )
92 # Extract specialties
93 specialties = self._extract_specialties(url, content)
95 # Determine temporal and geographic coverage
96 temporal_coverage = self._extract_temporal_coverage(content)
97 geographic_focus = self._extract_geographic_focus(url, content)
99 profile = SourceProfile(
100 url=url,
101 domain=domain,
102 source_type=source_type,
103 credibility_score=credibility,
104 specialties=specialties,
105 temporal_coverage=temporal_coverage,
106 geographic_focus=geographic_focus,
107 evidence_count=1,
108 last_accessed=datetime.now(UTC),
109 )
111 self.source_profiles[url] = profile
112 self.source_types[source_type].add(url)
114 return profile
116 def _extract_domain(self, url: str) -> str:
117 """Extract domain from URL."""
118 import re
120 pattern = r"https?://(?:www\.)?([^/]+)"
121 match = re.match(pattern, url)
122 if match:
123 return match.group(1)
124 return url
126 def _determine_source_type(
127 self, url: str, domain: str, content: Optional[str]
128 ) -> str:
129 """Determine the type of source."""
130 # Check known patterns
131 academic_domains = [
132 ".edu",
133 ".ac.",
134 "scholar",
135 "pubmed",
136 "arxiv",
137 "jstor",
138 ]
139 government_domains = [".gov", ".mil"]
140 news_domains = [
141 "news",
142 "times",
143 "post",
144 "guardian",
145 "bbc",
146 "cnn",
147 "reuters",
148 ]
149 wiki_domains = ["wikipedia", "wiki"]
151 lower_domain = domain.lower()
152 lower_url = url.lower()
154 # Check patterns
155 for pattern in academic_domains:
156 if pattern in lower_domain or pattern in lower_url:
157 return "academic"
159 for pattern in government_domains:
160 if pattern in lower_domain:
161 return "government"
163 for pattern in wiki_domains:
164 if pattern in lower_domain:
165 return "wiki"
167 for pattern in news_domains:
168 if pattern in lower_domain:
169 return "news"
171 # Use content analysis as fallback
172 if content:
173 return self._analyze_content_type(content)
175 return "general"
177 def _analyze_content_type(self, content: str) -> str:
178 """Analyze content to determine source type."""
179 prompt = f"""
180Analyze this content excerpt and determine the source type:
182{content[:500]}
184Choose from: academic, government, news, wiki, blog, forum, social, general
186Return only the source type.
187"""
189 response = self.model.invoke(prompt)
190 source_type = response.content.strip().lower()
192 if source_type in self.type_priorities:
193 return source_type
194 return "general"
196 def _calculate_credibility(
197 self, url: str, domain: str, source_type: str, content: Optional[str]
198 ) -> float:
199 """Calculate credibility score for a source."""
200 # Base score from source type
201 base_score = self.type_priorities.get(source_type, 0.5)
203 # Adjust based on domain characteristics
204 if ".edu" in domain or ".gov" in domain:
205 base_score = min(base_score + 0.1, 1.0)
207 # Check for HTTPS
208 if url.startswith("https://"):
209 base_score = min(base_score + 0.05, 1.0)
211 # Additional analysis if content provided
212 if content:
213 # Check for citations/references
214 if re.search(r"\[\d+\]|\(\d{4}\)", content):
215 base_score = min(base_score + 0.1, 1.0)
217 # Check for author information
218 if re.search(
219 r"[Aa]uthor:|[Bb]y\s+[A-Z][a-z]+\s+[A-Z][a-z]+", content
220 ):
221 base_score = min(base_score + 0.05, 1.0)
223 return base_score
225 def _extract_specialties(
226 self, url: str, content: Optional[str]
227 ) -> List[str]:
228 """Extract topic specialties from source."""
229 specialties = []
231 # URL-based extraction
232 url_keywords = re.findall(r"/([a-z]+)/", url.lower())
233 specialties.extend([kw for kw in url_keywords if len(kw) > 3][:3])
235 # Content-based extraction if available
236 if content: 236 ↛ 237line 236 didn't jump to line 237 because the condition on line 236 was never true
237 prompt = f"""
238Identify the main topic areas or specialties covered in this content:
240{content[:500]}
242Return up to 3 topic areas, one per line.
243"""
245 response = self.model.invoke(prompt)
246 topics = [
247 line.strip()
248 for line in response.content.strip().split("\n")
249 if line.strip()
250 ]
251 specialties.extend(topics[:3])
253 return list(set(specialties))[:5]
255 def _extract_temporal_coverage(
256 self, content: Optional[str]
257 ) -> Optional[Tuple[datetime, datetime]]:
258 """Extract temporal coverage from content."""
259 if not content:
260 return None
262 # Look for year patterns
263 years = re.findall(r"\b(19\d{2}|20\d{2})\b", content)
265 if years:
266 years = [int(year) for year in years]
267 min_year = min(years)
268 max_year = max(years)
270 try:
271 return (datetime(min_year, 1, 1), datetime(max_year, 12, 31))
272 except ValueError:
273 return None
275 return None
277 def _extract_geographic_focus(
278 self, url: str, content: Optional[str]
279 ) -> Optional[str]:
280 """Extract geographic focus from source."""
281 # Check URL for geographic indicators
282 geo_patterns = {
283 "us": "United States",
284 "uk": "United Kingdom",
285 "ca": "Canada",
286 "au": "Australia",
287 "eu": "Europe",
288 }
290 for pattern, location in geo_patterns.items():
291 if f".{pattern}" in url or f"/{pattern}/" in url:
292 return location
294 # Content-based extraction
295 if content:
296 # Look for country/region mentions
297 locations = re.findall(
298 r"\b(?:United States|UK|Canada|Australia|Europe|Asia|Africa|Americas)\b",
299 content[:1000],
300 re.IGNORECASE,
301 )
303 if locations:
304 # Return most frequent
305 from collections import Counter
307 location_counts = Counter(locations)
308 return location_counts.most_common(1)[0][0]
310 return None
312 def calculate_diversity_metrics(
313 self, sources: List[str]
314 ) -> DiversityMetrics:
315 """Calculate diversity metrics for a set of sources."""
316 if not sources:
317 return DiversityMetrics(
318 type_diversity=0.0,
319 temporal_diversity=0.0,
320 geographic_diversity=0.0,
321 credibility_distribution={},
322 specialty_coverage={},
323 overall_score=0.0,
324 )
326 # Get profiles
327 profiles = [
328 self.source_profiles.get(url) or self.analyze_source(url)
329 for url in sources
330 ]
332 # Type diversity
333 source_types = [p.source_type for p in profiles]
334 unique_types = len(set(source_types))
335 type_diversity = min(unique_types / self.minimum_source_types, 1.0)
337 # Temporal diversity
338 temporal_ranges = [
339 p.temporal_coverage for p in profiles if p.temporal_coverage
340 ]
341 temporal_diversity = self._calculate_temporal_diversity(temporal_ranges)
343 # Geographic diversity
344 geo_focuses = [
345 p.geographic_focus for p in profiles if p.geographic_focus
346 ]
347 unique_geos = len(set(geo_focuses))
348 geographic_diversity = min(unique_geos / 3, 1.0) if geo_focuses else 0.0
350 # Credibility distribution
351 credibility_distribution = {}
352 for p in profiles:
353 level = (
354 "high"
355 if p.credibility_score >= 0.8
356 else "medium"
357 if p.credibility_score >= 0.6
358 else "low"
359 )
360 credibility_distribution[level] = (
361 credibility_distribution.get(level, 0) + 1
362 )
364 # Specialty coverage
365 specialty_coverage = {}
366 for p in profiles:
367 for specialty in p.specialties: 367 ↛ 368line 367 didn't jump to line 368 because the loop on line 367 never started
368 specialty_coverage[specialty] = (
369 specialty_coverage.get(specialty, 0) + 1
370 )
372 # Overall score
373 overall_score = (
374 type_diversity * 0.3
375 + temporal_diversity * 0.2
376 + geographic_diversity * 0.2
377 + (credibility_distribution.get("high", 0) / len(profiles)) * 0.3
378 )
380 return DiversityMetrics(
381 type_diversity=type_diversity,
382 temporal_diversity=temporal_diversity,
383 geographic_diversity=geographic_diversity,
384 credibility_distribution=credibility_distribution,
385 specialty_coverage=specialty_coverage,
386 overall_score=overall_score,
387 )
389 def _calculate_temporal_diversity(
390 self, ranges: List[Tuple[datetime, datetime]]
391 ) -> float:
392 """Calculate temporal diversity from date ranges."""
393 if not ranges:
394 return 0.0
396 # Calculate span coverage
397 all_years = set()
398 for start, end in ranges:
399 for year in range(start.year, end.year + 1):
400 all_years.add(year)
402 # Diversity based on year span
403 if len(all_years) > 1:
404 year_span = max(all_years) - min(all_years)
405 # Normalize to 0-1 (20 years = max diversity)
406 return min(year_span / 20, 1.0)
408 return 0.0
410 def recommend_additional_sources(
411 self, current_sources: List[str], constraints: List[Constraint]
412 ) -> List[Dict[str, Any]]:
413 """Recommend additional sources to improve diversity."""
414 current_metrics = self.calculate_diversity_metrics(current_sources)
415 recommendations = []
417 # Identify gaps
418 gaps = self._identify_diversity_gaps(current_metrics, constraints)
420 for gap_type, gap_details in gaps.items():
421 if gap_type == "source_type":
422 # Recommend sources of missing types
423 for missing_type in gap_details:
424 rec = {
425 "type": "source_type",
426 "target": missing_type,
427 "query_modifier": self._get_source_type_modifier(
428 missing_type
429 ),
430 "reason": f"Add {missing_type} sources for better perspective",
431 }
432 recommendations.append(rec)
434 elif gap_type == "temporal":
435 # Recommend sources for missing time periods
436 rec = {
437 "type": "temporal",
438 "target": gap_details,
439 "query_modifier": f'"{gap_details}" historical archive',
440 "reason": f"Add sources covering {gap_details}",
441 }
442 recommendations.append(rec)
444 elif gap_type == "geographic":
445 # Recommend sources from missing regions
446 for region in gap_details:
447 rec = {
448 "type": "geographic",
449 "target": region,
450 "query_modifier": f"site:{self._get_region_domain(region)}",
451 "reason": f"Add sources from {region}",
452 }
453 recommendations.append(rec)
455 elif gap_type == "credibility": 455 ↛ 420line 455 didn't jump to line 420 because the condition on line 455 was always true
456 # Recommend higher credibility sources
457 rec = {
458 "type": "credibility",
459 "target": "high_credibility",
460 "query_modifier": "site:.edu OR site:.gov OR peer-reviewed",
461 "reason": "Add more authoritative sources",
462 }
463 recommendations.append(rec)
465 return recommendations[:5] # Limit recommendations
467 def _identify_diversity_gaps(
468 self, metrics: DiversityMetrics, constraints: List[Constraint]
469 ) -> Dict[str, Any]:
470 """Identify gaps in source diversity."""
471 gaps = {}
473 # Source type gaps
474 if metrics.type_diversity < 0.7:
475 current_types = set(
476 p.source_type for p in self.source_profiles.values()
477 )
478 desired_types = {"academic", "government", "news", "wiki"}
479 missing_types = desired_types - current_types
480 if missing_types: 480 ↛ 484line 480 didn't jump to line 484 because the condition on line 480 was always true
481 gaps["source_type"] = list(missing_types)
483 # Temporal gaps (based on constraints)
484 temporal_constraints = [
485 c for c in constraints if c.type == ConstraintType.TEMPORAL
486 ]
487 if temporal_constraints and metrics.temporal_diversity < 0.5:
488 # Extract years from constraints
489 years_needed = []
490 for c in temporal_constraints:
491 year_match = re.search(r"\b(19\d{2}|20\d{2})\b", c.value)
492 if year_match: 492 ↛ 490line 492 didn't jump to line 490 because the condition on line 492 was always true
493 years_needed.append(year_match.group(1))
495 if years_needed: 495 ↛ 499line 495 didn't jump to line 499 because the condition on line 495 was always true
496 gaps["temporal"] = f"{min(years_needed)}-{max(years_needed)}"
498 # Geographic gaps
499 location_constraints = [
500 c for c in constraints if c.type == ConstraintType.LOCATION
501 ]
502 if location_constraints and metrics.geographic_diversity < 0.5:
503 locations_needed = [c.value for c in location_constraints]
504 gaps["geographic"] = locations_needed
506 # Credibility gaps
507 high_cred_ratio = metrics.credibility_distribution.get("high", 0) / max(
508 sum(metrics.credibility_distribution.values()), 1
509 )
510 if high_cred_ratio < 0.3:
511 gaps["credibility"] = True
513 return gaps
515 def _get_source_type_modifier(self, source_type: str) -> str:
516 """Get search modifier for specific source type."""
517 modifiers = {
518 "academic": "site:.edu OR site:scholar.google.com OR site:pubmed.gov",
519 "government": "site:.gov OR site:.mil",
520 "news": 'news OR "press release" OR journalism',
521 "wiki": "site:wikipedia.org OR wiki",
522 "blog": 'blog OR "posted by" OR comments',
523 }
524 return modifiers.get(source_type, "")
526 def _get_region_domain(self, region: str) -> str:
527 """Get domain suffix for a region."""
528 region_domains = {
529 "United States": ".us OR .com",
530 "United Kingdom": ".uk",
531 "Canada": ".ca",
532 "Australia": ".au",
533 "Europe": ".eu OR .de OR .fr",
534 }
535 return region_domains.get(region, ".com")
537 def select_diverse_sources(
538 self, available_sources: List[str], target_count: int
539 ) -> List[str]:
540 """Select a diverse subset of sources."""
541 if len(available_sources) <= target_count:
542 return available_sources
544 # Score each source based on diversity contribution
545 source_scores = []
547 for source in available_sources:
548 profile = self.source_profiles.get(source) or self.analyze_source(
549 source
550 )
552 # Calculate diversity score
553 score = (
554 self.type_priorities.get(profile.source_type, 0.5) * 0.4
555 + profile.credibility_score * 0.3
556 + (1.0 if profile.specialties else 0.5) * 0.15
557 + (1.0 if profile.temporal_coverage else 0.5) * 0.15
558 )
560 source_scores.append((source, score, profile))
562 # Sort by score
563 source_scores.sort(key=lambda x: x[1], reverse=True)
565 # Select diverse sources
566 selected = []
567 selected_types = set()
568 selected_geos = set()
570 for source, score, profile in source_scores: 570 ↛ 586line 570 didn't jump to line 586 because the loop on line 570 didn't complete
571 # Prioritize diversity
572 is_diverse = profile.source_type not in selected_types or (
573 profile.geographic_focus
574 and profile.geographic_focus not in selected_geos
575 )
577 if is_diverse or len(selected) < target_count // 2:
578 selected.append(source)
579 selected_types.add(profile.source_type)
580 if profile.geographic_focus:
581 selected_geos.add(profile.geographic_focus)
583 if len(selected) >= target_count:
584 break
586 return selected
588 def track_source_effectiveness(
589 self, source: str, evidence_quality: float, constraint_satisfied: bool
590 ):
591 """Track how effective a source is for evidence gathering."""
592 profile = self.source_profiles.get(source)
593 if not profile:
594 return
596 # Update profile based on effectiveness
597 if constraint_satisfied:
598 # Boost credibility slightly
599 profile.credibility_score = min(
600 profile.credibility_score * 1.05, 1.0
601 )
603 # Track in metadata
604 if "effectiveness" not in profile.__dict__: 604 ↛ 607line 604 didn't jump to line 607 because the condition on line 604 was always true
605 profile.effectiveness = []
607 profile.effectiveness.append(
608 {
609 "timestamp": datetime.now(UTC),
610 "evidence_quality": evidence_quality,
611 "constraint_satisfied": constraint_satisfied,
612 }
613 )