Coverage for src/local_deep_research/web/services/research_sources_service.py: 83%

189 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1""" 

2Service for managing research sources/resources in the database. 

3 

4This service handles saving and retrieving sources from research 

5in a proper relational way using the ResearchResource table. 

6""" 

7 

8from typing import List, Dict, Any, Optional 

9from datetime import datetime, UTC 

10from loguru import logger 

11from sqlalchemy import or_ 

12from sqlalchemy.exc import IntegrityError 

13from sqlalchemy.orm import load_only 

14 

15from ...database.models import ( 

16 Journal, 

17 Paper, 

18 PaperAppearance, 

19 ResearchResource, 

20 ResearchHistory, 

21) 

22from ...database.session_context import get_user_db_session 

23from ...utilities.citation_normalizer import normalize_citation 

24 

25 

class ResearchSourcesService:
    """Service for managing research sources in the database.

    All public methods are static. Each one opens its own session via
    ``get_user_db_session(username)``.
    """

    @staticmethod
    def _new_resource(
        research_id: str,
        title: str,
        url: str,
        snippet: str,
        source_type: str,
        safe_source: Any,
    ) -> "ResearchResource":
        """Build (without adding to a session) the row for one source.

        Shared by the normal save path and the IntegrityError retry path
        so both construct the resource identically.

        Args:
            research_id: The UUID of the research the source belongs to.
            title: Source title; "Untitled" is stored when empty.
            url: Source URL.
            snippet: Preview text; only the first 1000 characters are kept.
            source_type: Source type label stored on the row.
            safe_source: JSON-safe copy of the raw source dict.

        Returns:
            A new, unattached ResearchResource instance.
        """
        now_iso = datetime.now(UTC).isoformat()
        return ResearchResource(
            research_id=research_id,
            title=title or "Untitled",
            url=url,
            # Limit preview length
            content_preview=snippet[:1000] if snippet else None,
            source_type=source_type,
            resource_metadata={
                "added_at": now_iso,
                "original_data": safe_source,
            },
            created_at=now_iso,
        )

    @staticmethod
    def save_research_sources(
        research_id: str,
        sources: List[Dict[str, Any]],
        username: Optional[str] = None,
    ) -> int:
        """
        Save sources from research to the ResearchResource table.

        Sources without a URL are skipped. Each remaining source is
        written inside its own SAVEPOINT so one failing source does not
        discard the others. Sources for which ``normalize_citation``
        returns data are additionally linked to a Paper row (created or
        reused) through a PaperAppearance row.

        Args:
            research_id: The UUID of the research
            sources: List of source dictionaries with url, title, snippet, etc.
            username: Username for database access

        Returns:
            Number of sources saved by this call. If the research already
            has resources, nothing is written and the existing row count
            is returned instead.

        Raises:
            Exception: Any error outside the per-source handling is
                logged and re-raised.
        """
        if not sources:
            logger.info(f"No sources to save for research {research_id}")
            return 0

        saved_count = 0
        # Failed-source counter. Per-source errors are swallowed so the
        # batch can continue; this counter is what lets the final log
        # line distinguish "all saved" from "some silently dropped".
        failed_count = 0

        try:
            with get_user_db_session(username) as db_session:
                # First check if resources already exist for this research
                existing = (
                    db_session.query(ResearchResource)
                    .filter_by(research_id=research_id)
                    .count()
                )

                if existing > 0:
                    logger.info(
                        f"Research {research_id} already has {existing} resources, skipping save"
                    )
                    return int(existing)

                # Per-batch memoization: container_title -> journal_id.
                # Avoids repeated Journal lookups when several sources
                # share the same venue.
                journal_id_cache: Dict[Optional[str], Optional[int]] = {}
                for source in sources:
                    sp = None
                    try:
                        # Extract fields from various possible formats
                        url = source.get("url", "") or source.get("link", "")
                        title = source.get("title", "") or source.get(
                            "name", ""
                        )
                        snippet = (
                            source.get("snippet", "")
                            or source.get("content_preview", "")
                            or source.get("description", "")
                        )
                        source_type = source.get("source_type", "web")

                        # Skip if no URL
                        if not url:
                            continue

                        # Start the savepoint for this source: any
                        # rollback below (including the IntegrityError
                        # retry path) only affects this source.
                        sp = db_session.begin_nested()

                        # Sanitize the raw source dict before embedding
                        # it in resource_metadata; engine dicts can hold
                        # values that are not JSON-serializable.
                        safe_source = _json_safe(source)
                        resource = ResearchSourcesService._new_resource(
                            research_id,
                            title,
                            url,
                            snippet,
                            source_type,
                            safe_source,
                        )
                        db_session.add(resource)
                        db_session.flush()  # Get resource.id for FK

                        # Create or reuse Paper for academic sources
                        citation_fields = normalize_citation(source)
                        if citation_fields:
                            source_engine = citation_fields.pop(
                                "source_engine", None
                            )
                            # Resolve the Journal row from the raw
                            # container_title, memoized per batch.
                            ct = citation_fields.get("container_title")
                            if ct in journal_id_cache:
                                journal_id = journal_id_cache[ct]
                            else:
                                journal_id = _resolve_journal_id(db_session, ct)
                                journal_id_cache[ct] = journal_id

                            # Split indexed columns from the metadata
                            # blob. doi/arxiv_id/pmid/journal_id/
                            # container_title/year are passed to Paper
                            # as columns; whatever remains in
                            # citation_fields becomes paper_metadata.
                            #
                            # container_title is popped so it is not
                            # duplicated in the blob. Prefer the
                            # filter's matched name when the source
                            # carries one, else the raw CSL value.
                            ct_raw = citation_fields.pop(
                                "container_title", None
                            )
                            ct_matched = (
                                source.get("journal_name_matched") or ct_raw
                            )
                            if ct_matched and len(ct_matched) > 500:
                                logger.debug(
                                    f"Truncating container_title to 500 "
                                    f"chars: {ct_matched[:80]}..."
                                )
                                ct_matched = ct_matched[:500]
                            # `year` intentionally stays in
                            # citation_fields (read with .get, not
                            # .pop) AND is copied to the column.
                            indexed = {
                                "doi": citation_fields.pop("doi", None),
                                "arxiv_id": citation_fields.pop(
                                    "arxiv_id", None
                                ),
                                "pmid": citation_fields.pop("pmid", None),
                                "journal_id": journal_id,
                                "container_title": ct_matched,
                                "year": citation_fields.get("year"),
                            }

                            # Dedup: find existing paper by
                            # DOI/arxiv/pmid. A concurrent writer can
                            # still insert between our SELECT and our
                            # INSERT, so IntegrityError is caught and
                            # the row re-queried.
                            paper = _find_existing_paper(db_session, indexed)
                            if paper is not None:
                                _merge_identifiers(
                                    paper, indexed, citation_fields
                                )
                            else:
                                paper = Paper(
                                    **indexed,
                                    paper_metadata=citation_fields or None,
                                )
                                db_session.add(paper)
                                try:
                                    db_session.flush()
                                except IntegrityError:
                                    # Roll back this SAVEPOINT only
                                    # (not the whole batch) and open a
                                    # fresh one. The resource flush was
                                    # undone with it, so the resource
                                    # must be re-created too.
                                    sp.rollback()
                                    sp = db_session.begin_nested()
                                    resource = (
                                        ResearchSourcesService._new_resource(
                                            research_id,
                                            title,
                                            url,
                                            snippet,
                                            source_type,
                                            safe_source,
                                        )
                                    )
                                    db_session.add(resource)
                                    db_session.flush()
                                    paper = _find_existing_paper(
                                        db_session, indexed
                                    )
                                    if paper is None:
                                        # Truly unexpected: the
                                        # conflicting row is gone.
                                        raise
                                    _merge_identifiers(
                                        paper, indexed, citation_fields
                                    )

                            # Link paper to this resource
                            appearance = PaperAppearance(
                                paper_id=paper.id,
                                resource_id=resource.id,
                                source_engine=source_engine,
                            )
                            db_session.add(appearance)

                        # Release the savepoint so this source's writes
                        # survive a failure in a later source.
                        sp.commit()
                        saved_count += 1

                    except Exception:
                        # Roll back just this source's savepoint;
                        # earlier sources stay in the outer transaction.
                        if sp is not None and sp.is_active:
                            sp.rollback()
                        failed_count += 1
                        # A malformed (non-dict) source must not raise
                        # again here, or it would abort the whole batch.
                        getter = getattr(source, "get", None)
                        source_label = (
                            getter("url", "unknown")
                            if callable(getter)
                            else "unknown"
                        )
                        logger.exception(
                            f"Failed to save source {source_label}"
                        )
                        continue

                # Commit all resources
                if saved_count > 0:
                    db_session.commit()
                    if failed_count > 0:
                        logger.warning(
                            f"Saved {saved_count} sources for research "
                            f"{research_id}; {failed_count} source(s) "
                            f"failed and were skipped (see earlier "
                            f"ERROR logs for per-source stack traces)"
                        )
                    else:
                        logger.info(
                            f"Saved {saved_count} sources for research {research_id}"
                        )
                elif failed_count > 0:
                    logger.warning(
                        f"No sources saved for research {research_id}: "
                        f"all {failed_count} sources in the batch failed "
                        f"(see earlier ERROR logs for per-source stack "
                        f"traces)"
                    )

        except Exception:
            logger.exception("Error saving research sources")
            raise

        return saved_count

    @staticmethod
    def get_research_sources(
        research_id: str, username: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """
        Get all sources for a research from the database.

        Args:
            research_id: The UUID of the research
            username: Username for database access

        Returns:
            List of source dictionaries ordered by resource id. The
            stored preview is exposed under both ``snippet`` and
            ``content_preview``.

        Raises:
            Exception: Any database error is logged and re-raised.
        """
        sources = []

        try:
            with get_user_db_session(username) as db_session:
                resources = (
                    db_session.query(ResearchResource)
                    .filter_by(research_id=research_id)
                    .order_by(ResearchResource.id.asc())
                    .all()
                )

                for resource in resources:
                    sources.append(
                        {
                            "id": resource.id,
                            "url": resource.url,
                            "title": resource.title,
                            "snippet": resource.content_preview,
                            "content_preview": resource.content_preview,
                            "source_type": resource.source_type,
                            "metadata": resource.resource_metadata or {},
                            "created_at": resource.created_at,
                        }
                    )

                logger.info(
                    f"Retrieved {len(sources)} sources for research {research_id}"
                )

        except Exception:
            logger.exception("Error retrieving research sources")
            raise

        return sources

    @staticmethod
    def update_research_with_sources(
        research_id: str,
        all_links_of_system: List[Dict[str, Any]],
        username: Optional[str] = None,
    ) -> bool:
        """
        Update a completed research with its sources.
        This should be called when research completes.

        Args:
            research_id: The UUID of the research
            all_links_of_system: List of all sources found during research
            username: Username for database access

        Returns:
            True if the research row was found and updated; False if it
            was not found or any error occurred (errors are logged, not
            raised).
        """
        try:
            # Save sources to ResearchResource table
            saved_count = ResearchSourcesService.save_research_sources(
                research_id, all_links_of_system, username
            )

            # Also update the research metadata to include source count
            with get_user_db_session(username) as db_session:
                research = (
                    db_session.query(ResearchHistory)
                    .filter_by(id=research_id)
                    .first()
                )

                if not research:
                    logger.warning(
                        f"Research {research_id} not found for source update"
                    )
                    return False

                # Build a NEW dict and reassign it. In-place mutation of
                # an already-loaded plain JSON column value is not
                # detected as a change, so the update would be dropped
                # whenever research_meta was already populated.
                meta = (
                    dict(research.research_meta)
                    if research.research_meta
                    else {}
                )
                meta["sources_count"] = saved_count
                meta["has_sources"] = saved_count > 0
                research.research_meta = meta

                db_session.commit()
                logger.info(
                    f"Updated research {research_id} with {saved_count} sources"
                )
                return True

        except Exception:
            logger.exception("Error updating research with sources")
            return False

415 

416 

417def _json_safe(value: Any, _depth: int = 0, _seen: Optional[set] = None) -> Any: 

418 """Recursively coerce a value into a JSON-serializable form. 

419 

420 Used before embedding arbitrary engine result dicts into JSON 

421 columns. Non-primitive values (datetime, date, set, tuple, 

422 custom objects) are converted to strings or dropped. This is a 

423 last-resort sanitizer — callers should still prefer structured 

424 whitelisting (e.g., Paper.paper_metadata only stores known CSL 

425 fields). 

426 

427 Depth limit and cycle detection prevent RecursionError on 

428 pathological input (circular dict/list references). 

429 """ 

430 # Depth limit as a belt-and-braces guard 

431 if _depth > 32: 431 ↛ 432line 431 didn't jump to line 432 because the condition on line 431 was never true

432 return str(value) 

433 

434 # JSON primitives pass through unchanged 

435 if value is None or isinstance(value, (bool, int, float, str)): 

436 return value 

437 

438 # Container cycle detection via id() tracking 

439 if isinstance(value, (dict, list, tuple, set, frozenset)): 

440 if _seen is None: 

441 _seen = set() 

442 if id(value) in _seen: 442 ↛ 443line 442 didn't jump to line 443 because the condition on line 442 was never true

443 return "<circular>" 

444 _seen = _seen | {id(value)} 

445 

446 if isinstance(value, dict): 

447 return { 

448 str(k): _json_safe(v, _depth + 1, _seen) 

449 for k, v in value.items() 

450 if isinstance(k, (str, int, float, bool)) 

451 } 

452 if isinstance(value, (list, tuple, set, frozenset)): 

453 return [_json_safe(v, _depth + 1, _seen) for v in value] 

454 # datetime/date and anything else: coerce to string 

455 return str(value) 

456 

457 

458def _resolve_journal_id( 

459 db_session, container_title: Optional[str] 

460) -> Optional[int]: 

461 """Look up a Journal record by name. Returns journal.id or None. 

462 

463 The journal reputation filter writes Journal rows using the 

464 cleaned journal name as returned by its regex cleanup (NFKC- 

465 normalized, whitespace-stripped, but NOT lowercased). We match 

466 against that by applying the same NFKC+strip normalization here 

467 and using a case-insensitive comparison so mismatched capitalization 

468 in the container_title doesn't break the lookup. 

469 """ 

470 if not container_title: 

471 return None 

472 import unicodedata 

473 

474 name_norm = unicodedata.normalize("NFKC", container_title).strip() 

475 # Query name_lower, not func.lower(name): expression-wrapping the 

476 # indexed column forces a full scan. 

477 row = ( 

478 db_session.query(Journal.id) 

479 .filter(Journal.name_lower == name_norm.lower()) 

480 .first() 

481 ) 

482 return row[0] if row else None 

483 

484 

485def _find_existing_paper(db_session, fields: dict) -> Optional["Paper"]: 

486 """Find an existing Paper by any of DOI, arXiv ID, or PMID. 

487 

488 Issues a single OR-query across all provided identifiers so that a 

489 caller with multiple IDs doesn't miss dedup because the first one 

490 (e.g. DOI) is absent from the stored row but a later one (e.g. 

491 arXiv) would have matched. The previous waterfall short-circuited 

492 on the first non-null input and never tried the remaining IDs. 

493 

494 Uses load_only to skip the ``paper_metadata`` JSON blob on the 

495 dedup lookup — we only need the identifier columns. The blob is 

496 lazy-loaded if the caller later touches ``paper.paper_metadata``. 

497 """ 

498 id_only = load_only( 

499 Paper.id, 

500 Paper.doi, 

501 Paper.arxiv_id, 

502 Paper.pmid, 

503 Paper.journal_id, 

504 ) 

505 

506 conditions = [] 

507 doi = fields.get("doi") 

508 arxiv_id = fields.get("arxiv_id") 

509 pmid = fields.get("pmid") 

510 if doi: 

511 conditions.append(Paper.doi == doi) 

512 if arxiv_id: 

513 conditions.append(Paper.arxiv_id == arxiv_id) 

514 if pmid: 514 ↛ 515line 514 didn't jump to line 515 because the condition on line 514 was never true

515 conditions.append(Paper.pmid == pmid) 

516 

517 if not conditions: 517 ↛ 518line 517 didn't jump to line 518 because the condition on line 517 was never true

518 return None 

519 

520 matches = ( 

521 db_session.query(Paper).options(id_only).filter(or_(*conditions)).all() 

522 ) 

523 

524 if not matches: 

525 return None 

526 if len(matches) == 1: 526 ↛ 532line 526 didn't jump to line 532 because the condition on line 526 was always true

527 return matches[0] 

528 

529 # Multiple distinct rows matched different identifiers of the same 

530 # incoming record. This indicates a prior mismerge; deterministic 

531 # tie-break on oldest (lowest) id so repeat runs don't oscillate. 

532 winner = min(matches, key=lambda p: p.id) 

533 logger.warning( 

534 f"Paper dedup conflict on {sorted(k for k in ('doi', 'arxiv_id', 'pmid') if fields.get(k))}: " 

535 f"{len(matches)} rows (ids {sorted(m.id for m in matches)}); " 

536 f"using id {winner.id}. Manual review recommended." 

537 ) 

538 return winner 

539 

540 

541def _merge_identifiers(paper: "Paper", indexed: dict, metadata: dict) -> None: 

542 """Enrich an existing Paper with identifiers from a new encounter. 

543 

544 E.g., an ArXiv paper later found via OpenAlex gains a DOI. 

545 

546 Args: 

547 paper: The existing Paper row to enrich. 

548 indexed: New values for the real columns (doi, arxiv_id, 

549 pmid, journal_id). Only applied if the column is 

550 currently empty. 

551 metadata: Additional bibliographic fields (pmcid, authors, 

552 csl_json, etc.) to merge into paper.paper_metadata. Only 

553 keys that aren't already present in the existing blob 

554 are added — first write wins, to preserve the original 

555 enrichment. 

556 """ 

557 # Indexed columns — first write wins. Avoids churning rows when 

558 # the same paper turns up across many research sessions with 

559 # slightly different scoring / cleaned names. 

560 if indexed.get("doi") and not paper.doi: 

561 paper.doi = indexed["doi"] 

562 if indexed.get("arxiv_id") and not paper.arxiv_id: 562 ↛ 563line 562 didn't jump to line 563 because the condition on line 562 was never true

563 paper.arxiv_id = indexed["arxiv_id"] 

564 if indexed.get("pmid") and not paper.pmid: 564 ↛ 565line 564 didn't jump to line 565 because the condition on line 564 was never true

565 paper.pmid = indexed["pmid"] 

566 if indexed.get("journal_id") and not paper.journal_id: 566 ↛ 567line 566 didn't jump to line 567 because the condition on line 566 was never true

567 paper.journal_id = indexed["journal_id"] 

568 if indexed.get("container_title") and not paper.container_title: 

569 paper.container_title = indexed["container_title"] 

570 if indexed.get("year") is not None and paper.year is None: 570 ↛ 571line 570 didn't jump to line 571 because the condition on line 570 was never true

571 paper.year = indexed["year"] 

572 

573 # Metadata blob — merge any missing keys. 

574 # IMPORTANT: we must build a NEW dict and reassign the attribute so 

575 # that SQLAlchemy's plain JSON column marks it dirty. In-place 

576 # mutation of the existing dict is not detected without 

577 # MutableDict.as_mutable() — which this column does not use, to 

578 # stay consistent with other JSON columns in the project. 

579 if metadata: 

580 existing = dict(paper.paper_metadata) if paper.paper_metadata else {} 

581 changed = False 

582 for key, value in metadata.items(): 

583 if value is not None and key not in existing: 583 ↛ 584line 583 didn't jump to line 584 because the condition on line 583 was never true

584 existing[key] = value 

585 changed = True 

586 if changed: 586 ↛ 587line 586 didn't jump to line 587 because the condition on line 586 was never true

587 paper.paper_metadata = existing or None