Coverage for src / local_deep_research / advanced_search_system / source_management / diversity_manager.py: 96%

238 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1""" 

2Source diversity management for improved evidence quality. 

3""" 

4 

5import re 

6from collections import defaultdict 

7from dataclasses import dataclass, field 

8from datetime import datetime, UTC 

9from typing import Any, Dict, List, Optional, Set, Tuple 

10 

11from langchain_core.language_models import BaseChatModel 

12 

13from ..constraints.base_constraint import Constraint, ConstraintType 

14 

15 

@dataclass
class SourceProfile:
    """Profile of a source for diversity tracking."""

    # Full URL of the source document (also the lookup key in the manager).
    url: str
    # Domain portion of the URL, e.g. "example.com".
    domain: str
    source_type: str  # 'academic', 'news', 'government', 'wiki', 'blog', etc.
    # Estimated credibility in [0.0, 1.0].
    credibility_score: float
    # Topic areas this source covers (capped at 5 by _extract_specialties).
    specialties: List[str] = field(default_factory=list)
    # (earliest, latest) datetimes for years mentioned by the source, if any.
    temporal_coverage: Optional[Tuple[datetime, datetime]] = None
    # Region the source focuses on (e.g. "United States"), if detected.
    geographic_focus: Optional[str] = None
    # How many times evidence has been drawn from this source.
    evidence_count: int = 0
    # When the source was last analyzed/accessed (UTC-aware).
    last_accessed: Optional[datetime] = None

29 

30 

@dataclass
class DiversityMetrics:
    """Metrics for source diversity assessment.

    All diversity scores are normalized to the range 0.0-1.0.
    """

    # Variety of source types relative to the configured minimum.
    type_diversity: float  # 0.0 to 1.0
    # Spread of the years covered across the sources.
    temporal_diversity: float
    # Spread of geographic regions across the sources.
    geographic_diversity: float
    # Count of sources per credibility bucket ("high"/"medium"/"low").
    credibility_distribution: Dict[str, int]
    # Number of sources covering each specialty topic.
    specialty_coverage: Dict[str, int]
    # Weighted blend of the individual diversity scores.
    overall_score: float

41 

42 

class SourceDiversityManager:
    """
    Manages source diversity to ensure comprehensive evidence collection.

    Key features:
    1. Tracks source types and characteristics
    2. Ensures diverse source selection
    3. Prioritizes high-credibility sources
    4. Manages geographic and temporal diversity
    """

    def __init__(self, model: BaseChatModel):
        """Initialize the source diversity manager."""
        # LLM used for content-based classification fallbacks.
        self.model = model
        # Per-URL profiles, keyed by the full source URL.
        self.source_profiles: Dict[str, SourceProfile] = {}
        # URLs grouped by detected source type.
        self.source_types: Dict[str, Set[str]] = defaultdict(set)
        # Baseline credibility weight for each recognized source type.
        self.type_priorities: Dict[str, float] = {
            "academic": 0.9,
            "government": 0.85,
            "news": 0.7,
            "wiki": 0.75,
            "blog": 0.5,
            "forum": 0.4,
            "social": 0.3,
        }
        # Aim for at least this many distinct source types.
        self.minimum_source_types: int = 3
        # Sources below this credibility score are considered low quality.
        self.credibility_threshold: float = 0.6

70 

71 def analyze_source( 

72 self, url: str, content: Optional[str] = None 

73 ) -> SourceProfile: 

74 """Analyze a source and create its profile.""" 

75 if url in self.source_profiles: 

76 profile = self.source_profiles[url] 

77 profile.evidence_count += 1 

78 profile.last_accessed = datetime.now(UTC) 

79 return profile 

80 

81 # Extract domain 

82 domain = self._extract_domain(url) 

83 

84 # Determine source type 

85 source_type = self._determine_source_type(url, domain, content) 

86 

87 # Calculate credibility 

88 credibility = self._calculate_credibility( 

89 url, domain, source_type, content 

90 ) 

91 

92 # Extract specialties 

93 specialties = self._extract_specialties(url, content) 

94 

95 # Determine temporal and geographic coverage 

96 temporal_coverage = self._extract_temporal_coverage(content) 

97 geographic_focus = self._extract_geographic_focus(url, content) 

98 

99 profile = SourceProfile( 

100 url=url, 

101 domain=domain, 

102 source_type=source_type, 

103 credibility_score=credibility, 

104 specialties=specialties, 

105 temporal_coverage=temporal_coverage, 

106 geographic_focus=geographic_focus, 

107 evidence_count=1, 

108 last_accessed=datetime.now(UTC), 

109 ) 

110 

111 self.source_profiles[url] = profile 

112 self.source_types[source_type].add(url) 

113 

114 return profile 

115 

116 def _extract_domain(self, url: str) -> str: 

117 """Extract domain from URL.""" 

118 import re 

119 

120 pattern = r"https?://(?:www\.)?([^/]+)" 

121 match = re.match(pattern, url) 

122 if match: 

123 return match.group(1) 

124 return url 

125 

126 def _determine_source_type( 

127 self, url: str, domain: str, content: Optional[str] 

128 ) -> str: 

129 """Determine the type of source.""" 

130 # Check known patterns 

131 academic_domains = [ 

132 ".edu", 

133 ".ac.", 

134 "scholar", 

135 "pubmed", 

136 "arxiv", 

137 "jstor", 

138 ] 

139 government_domains = [".gov", ".mil"] 

140 news_domains = [ 

141 "news", 

142 "times", 

143 "post", 

144 "guardian", 

145 "bbc", 

146 "cnn", 

147 "reuters", 

148 ] 

149 wiki_domains = ["wikipedia", "wiki"] 

150 

151 lower_domain = domain.lower() 

152 lower_url = url.lower() 

153 

154 # Check patterns 

155 for pattern in academic_domains: 

156 if pattern in lower_domain or pattern in lower_url: 

157 return "academic" 

158 

159 for pattern in government_domains: 

160 if pattern in lower_domain: 

161 return "government" 

162 

163 for pattern in wiki_domains: 

164 if pattern in lower_domain: 

165 return "wiki" 

166 

167 for pattern in news_domains: 

168 if pattern in lower_domain: 

169 return "news" 

170 

171 # Use content analysis as fallback 

172 if content: 

173 return self._analyze_content_type(content) 

174 

175 return "general" 

176 

177 def _analyze_content_type(self, content: str) -> str: 

178 """Analyze content to determine source type.""" 

179 prompt = f""" 

180Analyze this content excerpt and determine the source type: 

181 

182{content[:500]} 

183 

184Choose from: academic, government, news, wiki, blog, forum, social, general 

185 

186Return only the source type. 

187""" 

188 

189 response = self.model.invoke(prompt) 

190 source_type = response.content.strip().lower() 

191 

192 if source_type in self.type_priorities: 

193 return source_type 

194 return "general" 

195 

196 def _calculate_credibility( 

197 self, url: str, domain: str, source_type: str, content: Optional[str] 

198 ) -> float: 

199 """Calculate credibility score for a source.""" 

200 # Base score from source type 

201 base_score = self.type_priorities.get(source_type, 0.5) 

202 

203 # Adjust based on domain characteristics 

204 if ".edu" in domain or ".gov" in domain: 

205 base_score = min(base_score + 0.1, 1.0) 

206 

207 # Check for HTTPS 

208 if url.startswith("https://"): 

209 base_score = min(base_score + 0.05, 1.0) 

210 

211 # Additional analysis if content provided 

212 if content: 

213 # Check for citations/references 

214 if re.search(r"\[\d+\]|\(\d{4}\)", content): 

215 base_score = min(base_score + 0.1, 1.0) 

216 

217 # Check for author information 

218 if re.search( 

219 r"[Aa]uthor:|[Bb]y\s+[A-Z][a-z]+\s+[A-Z][a-z]+", content 

220 ): 

221 base_score = min(base_score + 0.05, 1.0) 

222 

223 return base_score 

224 

225 def _extract_specialties( 

226 self, url: str, content: Optional[str] 

227 ) -> List[str]: 

228 """Extract topic specialties from source.""" 

229 specialties = [] 

230 

231 # URL-based extraction 

232 url_keywords = re.findall(r"/([a-z]+)/", url.lower()) 

233 specialties.extend([kw for kw in url_keywords if len(kw) > 3][:3]) 

234 

235 # Content-based extraction if available 

236 if content: 236 ↛ 237line 236 didn't jump to line 237 because the condition on line 236 was never true

237 prompt = f""" 

238Identify the main topic areas or specialties covered in this content: 

239 

240{content[:500]} 

241 

242Return up to 3 topic areas, one per line. 

243""" 

244 

245 response = self.model.invoke(prompt) 

246 topics = [ 

247 line.strip() 

248 for line in response.content.strip().split("\n") 

249 if line.strip() 

250 ] 

251 specialties.extend(topics[:3]) 

252 

253 return list(set(specialties))[:5] 

254 

255 def _extract_temporal_coverage( 

256 self, content: Optional[str] 

257 ) -> Optional[Tuple[datetime, datetime]]: 

258 """Extract temporal coverage from content.""" 

259 if not content: 

260 return None 

261 

262 # Look for year patterns 

263 years = re.findall(r"\b(19\d{2}|20\d{2})\b", content) 

264 

265 if years: 

266 years = [int(year) for year in years] 

267 min_year = min(years) 

268 max_year = max(years) 

269 

270 try: 

271 return (datetime(min_year, 1, 1), datetime(max_year, 12, 31)) 

272 except ValueError: 

273 return None 

274 

275 return None 

276 

277 def _extract_geographic_focus( 

278 self, url: str, content: Optional[str] 

279 ) -> Optional[str]: 

280 """Extract geographic focus from source.""" 

281 # Check URL for geographic indicators 

282 geo_patterns = { 

283 "us": "United States", 

284 "uk": "United Kingdom", 

285 "ca": "Canada", 

286 "au": "Australia", 

287 "eu": "Europe", 

288 } 

289 

290 for pattern, location in geo_patterns.items(): 

291 if f".{pattern}" in url or f"/{pattern}/" in url: 

292 return location 

293 

294 # Content-based extraction 

295 if content: 

296 # Look for country/region mentions 

297 locations = re.findall( 

298 r"\b(?:United States|UK|Canada|Australia|Europe|Asia|Africa|Americas)\b", 

299 content[:1000], 

300 re.IGNORECASE, 

301 ) 

302 

303 if locations: 

304 # Return most frequent 

305 from collections import Counter 

306 

307 location_counts = Counter(locations) 

308 return location_counts.most_common(1)[0][0] 

309 

310 return None 

311 

312 def calculate_diversity_metrics( 

313 self, sources: List[str] 

314 ) -> DiversityMetrics: 

315 """Calculate diversity metrics for a set of sources.""" 

316 if not sources: 

317 return DiversityMetrics( 

318 type_diversity=0.0, 

319 temporal_diversity=0.0, 

320 geographic_diversity=0.0, 

321 credibility_distribution={}, 

322 specialty_coverage={}, 

323 overall_score=0.0, 

324 ) 

325 

326 # Get profiles 

327 profiles = [ 

328 self.source_profiles.get(url) or self.analyze_source(url) 

329 for url in sources 

330 ] 

331 

332 # Type diversity 

333 source_types = [p.source_type for p in profiles] 

334 unique_types = len(set(source_types)) 

335 type_diversity = min(unique_types / self.minimum_source_types, 1.0) 

336 

337 # Temporal diversity 

338 temporal_ranges = [ 

339 p.temporal_coverage for p in profiles if p.temporal_coverage 

340 ] 

341 temporal_diversity = self._calculate_temporal_diversity(temporal_ranges) 

342 

343 # Geographic diversity 

344 geo_focuses = [ 

345 p.geographic_focus for p in profiles if p.geographic_focus 

346 ] 

347 unique_geos = len(set(geo_focuses)) 

348 geographic_diversity = min(unique_geos / 3, 1.0) if geo_focuses else 0.0 

349 

350 # Credibility distribution 

351 credibility_distribution = {} 

352 for p in profiles: 

353 level = ( 

354 "high" 

355 if p.credibility_score >= 0.8 

356 else "medium" 

357 if p.credibility_score >= 0.6 

358 else "low" 

359 ) 

360 credibility_distribution[level] = ( 

361 credibility_distribution.get(level, 0) + 1 

362 ) 

363 

364 # Specialty coverage 

365 specialty_coverage = {} 

366 for p in profiles: 

367 for specialty in p.specialties: 367 ↛ 368line 367 didn't jump to line 368 because the loop on line 367 never started

368 specialty_coverage[specialty] = ( 

369 specialty_coverage.get(specialty, 0) + 1 

370 ) 

371 

372 # Overall score 

373 overall_score = ( 

374 type_diversity * 0.3 

375 + temporal_diversity * 0.2 

376 + geographic_diversity * 0.2 

377 + (credibility_distribution.get("high", 0) / len(profiles)) * 0.3 

378 ) 

379 

380 return DiversityMetrics( 

381 type_diversity=type_diversity, 

382 temporal_diversity=temporal_diversity, 

383 geographic_diversity=geographic_diversity, 

384 credibility_distribution=credibility_distribution, 

385 specialty_coverage=specialty_coverage, 

386 overall_score=overall_score, 

387 ) 

388 

389 def _calculate_temporal_diversity( 

390 self, ranges: List[Tuple[datetime, datetime]] 

391 ) -> float: 

392 """Calculate temporal diversity from date ranges.""" 

393 if not ranges: 

394 return 0.0 

395 

396 # Calculate span coverage 

397 all_years = set() 

398 for start, end in ranges: 

399 for year in range(start.year, end.year + 1): 

400 all_years.add(year) 

401 

402 # Diversity based on year span 

403 if len(all_years) > 1: 

404 year_span = max(all_years) - min(all_years) 

405 # Normalize to 0-1 (20 years = max diversity) 

406 return min(year_span / 20, 1.0) 

407 

408 return 0.0 

409 

410 def recommend_additional_sources( 

411 self, current_sources: List[str], constraints: List[Constraint] 

412 ) -> List[Dict[str, Any]]: 

413 """Recommend additional sources to improve diversity.""" 

414 current_metrics = self.calculate_diversity_metrics(current_sources) 

415 recommendations = [] 

416 

417 # Identify gaps 

418 gaps = self._identify_diversity_gaps(current_metrics, constraints) 

419 

420 for gap_type, gap_details in gaps.items(): 

421 if gap_type == "source_type": 

422 # Recommend sources of missing types 

423 for missing_type in gap_details: 

424 rec = { 

425 "type": "source_type", 

426 "target": missing_type, 

427 "query_modifier": self._get_source_type_modifier( 

428 missing_type 

429 ), 

430 "reason": f"Add {missing_type} sources for better perspective", 

431 } 

432 recommendations.append(rec) 

433 

434 elif gap_type == "temporal": 

435 # Recommend sources for missing time periods 

436 rec = { 

437 "type": "temporal", 

438 "target": gap_details, 

439 "query_modifier": f'"{gap_details}" historical archive', 

440 "reason": f"Add sources covering {gap_details}", 

441 } 

442 recommendations.append(rec) 

443 

444 elif gap_type == "geographic": 

445 # Recommend sources from missing regions 

446 for region in gap_details: 

447 rec = { 

448 "type": "geographic", 

449 "target": region, 

450 "query_modifier": f"site:{self._get_region_domain(region)}", 

451 "reason": f"Add sources from {region}", 

452 } 

453 recommendations.append(rec) 

454 

455 elif gap_type == "credibility": 455 ↛ 420line 455 didn't jump to line 420 because the condition on line 455 was always true

456 # Recommend higher credibility sources 

457 rec = { 

458 "type": "credibility", 

459 "target": "high_credibility", 

460 "query_modifier": "site:.edu OR site:.gov OR peer-reviewed", 

461 "reason": "Add more authoritative sources", 

462 } 

463 recommendations.append(rec) 

464 

465 return recommendations[:5] # Limit recommendations 

466 

467 def _identify_diversity_gaps( 

468 self, metrics: DiversityMetrics, constraints: List[Constraint] 

469 ) -> Dict[str, Any]: 

470 """Identify gaps in source diversity.""" 

471 gaps = {} 

472 

473 # Source type gaps 

474 if metrics.type_diversity < 0.7: 

475 current_types = set( 

476 p.source_type for p in self.source_profiles.values() 

477 ) 

478 desired_types = {"academic", "government", "news", "wiki"} 

479 missing_types = desired_types - current_types 

480 if missing_types: 480 ↛ 484line 480 didn't jump to line 484 because the condition on line 480 was always true

481 gaps["source_type"] = list(missing_types) 

482 

483 # Temporal gaps (based on constraints) 

484 temporal_constraints = [ 

485 c for c in constraints if c.type == ConstraintType.TEMPORAL 

486 ] 

487 if temporal_constraints and metrics.temporal_diversity < 0.5: 

488 # Extract years from constraints 

489 years_needed = [] 

490 for c in temporal_constraints: 

491 year_match = re.search(r"\b(19\d{2}|20\d{2})\b", c.value) 

492 if year_match: 492 ↛ 490line 492 didn't jump to line 490 because the condition on line 492 was always true

493 years_needed.append(year_match.group(1)) 

494 

495 if years_needed: 495 ↛ 499line 495 didn't jump to line 499 because the condition on line 495 was always true

496 gaps["temporal"] = f"{min(years_needed)}-{max(years_needed)}" 

497 

498 # Geographic gaps 

499 location_constraints = [ 

500 c for c in constraints if c.type == ConstraintType.LOCATION 

501 ] 

502 if location_constraints and metrics.geographic_diversity < 0.5: 

503 locations_needed = [c.value for c in location_constraints] 

504 gaps["geographic"] = locations_needed 

505 

506 # Credibility gaps 

507 high_cred_ratio = metrics.credibility_distribution.get("high", 0) / max( 

508 sum(metrics.credibility_distribution.values()), 1 

509 ) 

510 if high_cred_ratio < 0.3: 

511 gaps["credibility"] = True 

512 

513 return gaps 

514 

515 def _get_source_type_modifier(self, source_type: str) -> str: 

516 """Get search modifier for specific source type.""" 

517 modifiers = { 

518 "academic": "site:.edu OR site:scholar.google.com OR site:pubmed.gov", 

519 "government": "site:.gov OR site:.mil", 

520 "news": 'news OR "press release" OR journalism', 

521 "wiki": "site:wikipedia.org OR wiki", 

522 "blog": 'blog OR "posted by" OR comments', 

523 } 

524 return modifiers.get(source_type, "") 

525 

526 def _get_region_domain(self, region: str) -> str: 

527 """Get domain suffix for a region.""" 

528 region_domains = { 

529 "United States": ".us OR .com", 

530 "United Kingdom": ".uk", 

531 "Canada": ".ca", 

532 "Australia": ".au", 

533 "Europe": ".eu OR .de OR .fr", 

534 } 

535 return region_domains.get(region, ".com") 

536 

537 def select_diverse_sources( 

538 self, available_sources: List[str], target_count: int 

539 ) -> List[str]: 

540 """Select a diverse subset of sources.""" 

541 if len(available_sources) <= target_count: 

542 return available_sources 

543 

544 # Score each source based on diversity contribution 

545 source_scores = [] 

546 

547 for source in available_sources: 

548 profile = self.source_profiles.get(source) or self.analyze_source( 

549 source 

550 ) 

551 

552 # Calculate diversity score 

553 score = ( 

554 self.type_priorities.get(profile.source_type, 0.5) * 0.4 

555 + profile.credibility_score * 0.3 

556 + (1.0 if profile.specialties else 0.5) * 0.15 

557 + (1.0 if profile.temporal_coverage else 0.5) * 0.15 

558 ) 

559 

560 source_scores.append((source, score, profile)) 

561 

562 # Sort by score 

563 source_scores.sort(key=lambda x: x[1], reverse=True) 

564 

565 # Select diverse sources 

566 selected = [] 

567 selected_types = set() 

568 selected_geos = set() 

569 

570 for source, score, profile in source_scores: 570 ↛ 586line 570 didn't jump to line 586 because the loop on line 570 didn't complete

571 # Prioritize diversity 

572 is_diverse = profile.source_type not in selected_types or ( 

573 profile.geographic_focus 

574 and profile.geographic_focus not in selected_geos 

575 ) 

576 

577 if is_diverse or len(selected) < target_count // 2: 

578 selected.append(source) 

579 selected_types.add(profile.source_type) 

580 if profile.geographic_focus: 

581 selected_geos.add(profile.geographic_focus) 

582 

583 if len(selected) >= target_count: 

584 break 

585 

586 return selected 

587 

588 def track_source_effectiveness( 

589 self, source: str, evidence_quality: float, constraint_satisfied: bool 

590 ): 

591 """Track how effective a source is for evidence gathering.""" 

592 profile = self.source_profiles.get(source) 

593 if not profile: 

594 return 

595 

596 # Update profile based on effectiveness 

597 if constraint_satisfied: 

598 # Boost credibility slightly 

599 profile.credibility_score = min( 

600 profile.credibility_score * 1.05, 1.0 

601 ) 

602 

603 # Track in metadata 

604 if "effectiveness" not in profile.__dict__: 604 ↛ 607line 604 didn't jump to line 607 because the condition on line 604 was always true

605 profile.effectiveness = [] 

606 

607 profile.effectiveness.append( 

608 { 

609 "timestamp": datetime.now(UTC), 

610 "evidence_quality": evidence_quality, 

611 "constraint_satisfied": constraint_satisfied, 

612 } 

613 )