Coverage for src/local_deep_research/web/services/research_sources_service.py: 83%
189 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1"""
2Service for managing research sources/resources in the database.
4This service handles saving and retrieving sources from research
5in a proper relational way using the ResearchResource table.
6"""
8from typing import List, Dict, Any, Optional
9from datetime import datetime, UTC
10from loguru import logger
11from sqlalchemy import or_
12from sqlalchemy.exc import IntegrityError
13from sqlalchemy.orm import load_only
15from ...database.models import (
16 Journal,
17 Paper,
18 PaperAppearance,
19 ResearchResource,
20 ResearchHistory,
21)
22from ...database.session_context import get_user_db_session
23from ...utilities.citation_normalizer import normalize_citation
class ResearchSourcesService:
    """Service for managing research sources in the database."""

    @staticmethod
    def _build_resource(
        research_id: str,
        title: str,
        url: str,
        snippet: str,
        source_type: Any,
        safe_source: Any,
    ) -> ResearchResource:
        """Construct (without adding to a session) one ResearchResource row.

        Shared by the normal save path and the IntegrityError retry path in
        ``save_research_sources`` so both always build identical rows.

        Args:
            research_id: The UUID of the research the resource belongs to.
            title: Source title; falls back to "Untitled" when empty.
            url: Source URL (callers guarantee it is non-empty).
            snippet: Preview text; truncated to 1000 chars, or None if empty.
            source_type: Engine-provided source type (e.g. "web").
            safe_source: JSON-safe copy of the raw source dict, embedded
                under ``resource_metadata["original_data"]``.

        Returns:
            A new, transient ResearchResource instance.
        """
        # One timestamp so "added_at" and created_at always agree.
        now_iso = datetime.now(UTC).isoformat()
        return ResearchResource(
            research_id=research_id,
            title=title or "Untitled",
            url=url,
            # Limit preview length
            content_preview=snippet[:1000] if snippet else None,
            source_type=source_type,
            resource_metadata={
                "added_at": now_iso,
                "original_data": safe_source,
            },
            created_at=now_iso,
        )

    @staticmethod
    def _source_label(source: Any) -> Any:
        """Best-effort URL of a source for log messages.

        Must never raise: it is called from inside an exception handler,
        where a non-dict source would otherwise raise AttributeError and
        abort the whole batch instead of just skipping this source.
        """
        if isinstance(source, dict):
            return source.get("url", "unknown")
        return "unknown"

    @staticmethod
    def save_research_sources(
        research_id: str,
        sources: List[Dict[str, Any]],
        username: Optional[str] = None,
    ) -> int:
        """
        Save sources from research to the ResearchResource table.

        If the research already has resources, nothing is written and the
        existing count is returned. Each source is written inside its own
        SAVEPOINT, so a failing source is rolled back and counted without
        discarding sources saved earlier in the batch. Academic sources
        (anything ``normalize_citation`` recognizes) are also deduplicated
        into Paper rows and linked via PaperAppearance.

        Args:
            research_id: The UUID of the research
            sources: List of source dictionaries with url, title, snippet, etc.
            username: Username for database access

        Returns:
            Number of sources saved (or the number already present).

        Raises:
            Exception: Any error outside the per-source handling (session
                acquisition, the existence check, the final commit) is
                logged and re-raised.
        """
        if not sources:
            logger.info(f"No sources to save for research {research_id}")
            return 0

        saved_count = 0
        # Failed-source counter. The per-source try/except below catches
        # broad exceptions to keep one bad source from killing the batch,
        # but without this counter the caller had no way to distinguish
        # "all N saved" from "some silently dropped". Emitted in the
        # final log line so admins can spot save-failure trends.
        failed_count = 0

        try:
            with get_user_db_session(username) as db_session:
                # First check if resources already exist for this research
                existing = (
                    db_session.query(ResearchResource)
                    .filter_by(research_id=research_id)
                    .count()
                )
                if existing > 0:
                    logger.info(
                        f"Research {research_id} already has {existing} resources, skipping save"
                    )
                    return int(existing)

                # Per-batch memoization: container_title -> journal_id
                # avoids redundant Journal lookups when multiple sources
                # share the same venue (common in topic-focused searches).
                journal_id_cache: Dict[Optional[str], Optional[int]] = {}

                for source in sources:
                    sp = None
                    try:
                        # Extract fields from various possible formats
                        url = source.get("url", "") or source.get("link", "")
                        title = source.get("title", "") or source.get(
                            "name", ""
                        )
                        snippet = (
                            source.get("snippet", "")
                            or source.get("content_preview", "")
                            or source.get("description", "")
                        )
                        source_type = source.get("source_type", "web")

                        # Skip if no URL
                        if not url:
                            continue

                        # Start savepoint for this source — any rollback
                        # inside this block (including the IntegrityError
                        # retry path below) only affects this source.
                        sp = db_session.begin_nested()

                        # Raw engine dicts can contain non-JSON-serializable
                        # values (nested objects, numpy types, etc.) which
                        # would crash json.dumps() at flush time.
                        safe_source = _json_safe(source)
                        resource = ResearchSourcesService._build_resource(
                            research_id,
                            title,
                            url,
                            snippet,
                            source_type,
                            safe_source,
                        )
                        db_session.add(resource)
                        db_session.flush()  # Get resource.id for FK

                        # Create or reuse Paper for academic sources
                        citation_fields = normalize_citation(source)
                        if citation_fields:
                            source_engine = citation_fields.pop(
                                "source_engine", None
                            )
                            # container_title is popped so it isn't
                            # duplicated in paper_metadata JSON; the Paper
                            # column is the sole source of truth and the raw
                            # value is already inside citation_fields
                            # ["csl_json"] from normalize_citation.
                            ct_raw = citation_fields.pop(
                                "container_title", None
                            )

                            # Link to an existing Journal record by the raw
                            # CSL container_title, memoized per batch.
                            if ct_raw in journal_id_cache:
                                journal_id = journal_id_cache[ct_raw]
                            else:
                                journal_id = _resolve_journal_id(
                                    db_session, ct_raw
                                )
                                journal_id_cache[ct_raw] = journal_id

                            # container_title column: prefer the filter's
                            # cleaned matched name (what actually keyed the
                            # successful score); fall back to the raw CSL
                            # value if the filter didn't run.
                            ct_matched = (
                                source.get("journal_name_matched") or ct_raw
                            )
                            if ct_matched and len(ct_matched) > 500:
                                logger.debug(
                                    f"Truncating container_title to 500 "
                                    f"chars: {ct_matched[:80]}..."
                                )
                                ct_matched = ct_matched[:500]

                            # Indexed columns vs. metadata blob. Quality is
                            # NOT stored per-Paper; the dashboard resolves it
                            # live so re-scored journals propagate. `year`
                            # stays in citation_fields (CSL-JSON source of
                            # truth) AND is copied to the indexed column.
                            indexed = {
                                "doi": citation_fields.pop("doi", None),
                                "arxiv_id": citation_fields.pop(
                                    "arxiv_id", None
                                ),
                                "pmid": citation_fields.pop("pmid", None),
                                "journal_id": journal_id,
                                "container_title": ct_matched,
                                "year": citation_fields.get("year"),
                            }

                            # Dedup by DOI/arxiv/pmid. UNIQUE constraints
                            # guard against concurrent writers; if our SELECT
                            # misses and another writer's INSERT wins, catch
                            # IntegrityError and re-query.
                            paper = _find_existing_paper(db_session, indexed)
                            if paper is not None:
                                _merge_identifiers(
                                    paper, indexed, citation_fields
                                )
                            else:
                                paper = Paper(
                                    **indexed,
                                    paper_metadata=citation_fields or None,
                                )
                                db_session.add(paper)
                                try:
                                    db_session.flush()
                                except IntegrityError:
                                    # Roll back this SAVEPOINT only (not the
                                    # whole batch), open a fresh one, and
                                    # re-create the resource whose flush was
                                    # undone by the rollback.
                                    sp.rollback()
                                    sp = db_session.begin_nested()
                                    resource = (
                                        ResearchSourcesService._build_resource(
                                            research_id,
                                            title,
                                            url,
                                            snippet,
                                            source_type,
                                            safe_source,
                                        )
                                    )
                                    db_session.add(resource)
                                    db_session.flush()
                                    paper = _find_existing_paper(
                                        db_session, indexed
                                    )
                                    if paper is None:
                                        # Truly unexpected — concurrent
                                        # writer's row is gone.
                                        raise
                                    _merge_identifiers(
                                        paper, indexed, citation_fields
                                    )

                            # Link paper to this resource
                            db_session.add(
                                PaperAppearance(
                                    paper_id=paper.id,
                                    resource_id=resource.id,
                                    source_engine=source_engine,
                                )
                            )

                        # Commit the savepoint so this source's writes
                        # persist even if a later source fails.
                        sp.commit()
                        saved_count += 1

                    except Exception:
                        # Roll back just this source's savepoint; earlier
                        # sources in the batch stay at the outer
                        # transaction level.
                        if sp is not None and sp.is_active:
                            sp.rollback()
                        failed_count += 1
                        logger.exception(
                            f"Failed to save source "
                            f"{ResearchSourcesService._source_label(source)}"
                        )

                # Commit all resources
                if saved_count > 0:
                    db_session.commit()
                    if failed_count > 0:
                        logger.warning(
                            f"Saved {saved_count} sources for research "
                            f"{research_id} — {failed_count} source(s) "
                            f"failed and were skipped (see earlier "
                            f"ERROR logs for per-source stack traces)"
                        )
                    else:
                        logger.info(
                            f"Saved {saved_count} sources for research {research_id}"
                        )
                elif failed_count > 0:
                    logger.warning(
                        f"No sources saved for research {research_id} — "
                        f"all {failed_count} sources in the batch failed "
                        f"(see earlier ERROR logs for per-source stack "
                        f"traces)"
                    )

        except Exception:
            logger.exception("Error saving research sources")
            raise

        return saved_count

    @staticmethod
    def get_research_sources(
        research_id: str, username: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """
        Get all sources for a research from the database.

        Args:
            research_id: The UUID of the research
            username: Username for database access

        Returns:
            List of source dictionaries ordered by resource id (insertion
            order). ``snippet`` and ``content_preview`` both carry the
            stored preview text for compatibility with either key.

        Raises:
            Exception: Database errors are logged and re-raised.
        """
        sources = []

        try:
            with get_user_db_session(username) as db_session:
                resources = (
                    db_session.query(ResearchResource)
                    .filter_by(research_id=research_id)
                    .order_by(ResearchResource.id.asc())
                    .all()
                )

                for resource in resources:
                    sources.append(
                        {
                            "id": resource.id,
                            "url": resource.url,
                            "title": resource.title,
                            "snippet": resource.content_preview,
                            "content_preview": resource.content_preview,
                            "source_type": resource.source_type,
                            "metadata": resource.resource_metadata or {},
                            "created_at": resource.created_at,
                        }
                    )

                logger.info(
                    f"Retrieved {len(sources)} sources for research {research_id}"
                )

        except Exception:
            logger.exception("Error retrieving research sources")
            raise

        return sources

    @staticmethod
    def update_research_with_sources(
        research_id: str,
        all_links_of_system: List[Dict[str, Any]],
        username: Optional[str] = None,
    ) -> bool:
        """
        Update a completed research with its sources.
        This should be called when research completes.

        Saves the sources, then records ``sources_count`` and
        ``has_sources`` in the research's ``research_meta``.

        Args:
            research_id: The UUID of the research
            all_links_of_system: List of all sources found during research
            username: Username for database access

        Returns:
            True if successful; False if the research row was not found or
            any error occurred (errors are logged, never raised).
        """
        try:
            # Save sources to ResearchResource table
            saved_count = ResearchSourcesService.save_research_sources(
                research_id, all_links_of_system, username
            )

            # Also update the research metadata to include source count
            with get_user_db_session(username) as db_session:
                research = (
                    db_session.query(ResearchHistory)
                    .filter_by(id=research_id)
                    .first()
                )

                if research:
                    # Copy-and-reassign rather than mutating in place: a
                    # plain (non-MutableDict) JSON column does not track
                    # in-place dict mutation, so updates to an existing,
                    # non-empty research_meta would be silently lost.
                    meta = dict(research.research_meta or {})
                    meta["sources_count"] = saved_count
                    meta["has_sources"] = saved_count > 0
                    research.research_meta = meta

                    db_session.commit()
                    logger.info(
                        f"Updated research {research_id} with {saved_count} sources"
                    )
                    return True
                logger.warning(
                    f"Research {research_id} not found for source update"
                )
                return False

        except Exception:
            logger.exception("Error updating research with sources")
            return False
417def _json_safe(value: Any, _depth: int = 0, _seen: Optional[set] = None) -> Any:
418 """Recursively coerce a value into a JSON-serializable form.
420 Used before embedding arbitrary engine result dicts into JSON
421 columns. Non-primitive values (datetime, date, set, tuple,
422 custom objects) are converted to strings or dropped. This is a
423 last-resort sanitizer — callers should still prefer structured
424 whitelisting (e.g., Paper.paper_metadata only stores known CSL
425 fields).
427 Depth limit and cycle detection prevent RecursionError on
428 pathological input (circular dict/list references).
429 """
430 # Depth limit as a belt-and-braces guard
431 if _depth > 32: 431 ↛ 432line 431 didn't jump to line 432 because the condition on line 431 was never true
432 return str(value)
434 # JSON primitives pass through unchanged
435 if value is None or isinstance(value, (bool, int, float, str)):
436 return value
438 # Container cycle detection via id() tracking
439 if isinstance(value, (dict, list, tuple, set, frozenset)):
440 if _seen is None:
441 _seen = set()
442 if id(value) in _seen: 442 ↛ 443line 442 didn't jump to line 443 because the condition on line 442 was never true
443 return "<circular>"
444 _seen = _seen | {id(value)}
446 if isinstance(value, dict):
447 return {
448 str(k): _json_safe(v, _depth + 1, _seen)
449 for k, v in value.items()
450 if isinstance(k, (str, int, float, bool))
451 }
452 if isinstance(value, (list, tuple, set, frozenset)):
453 return [_json_safe(v, _depth + 1, _seen) for v in value]
454 # datetime/date and anything else: coerce to string
455 return str(value)
458def _resolve_journal_id(
459 db_session, container_title: Optional[str]
460) -> Optional[int]:
461 """Look up a Journal record by name. Returns journal.id or None.
463 The journal reputation filter writes Journal rows using the
464 cleaned journal name as returned by its regex cleanup (NFKC-
465 normalized, whitespace-stripped, but NOT lowercased). We match
466 against that by applying the same NFKC+strip normalization here
467 and using a case-insensitive comparison so mismatched capitalization
468 in the container_title doesn't break the lookup.
469 """
470 if not container_title:
471 return None
472 import unicodedata
474 name_norm = unicodedata.normalize("NFKC", container_title).strip()
475 # Query name_lower, not func.lower(name): expression-wrapping the
476 # indexed column forces a full scan.
477 row = (
478 db_session.query(Journal.id)
479 .filter(Journal.name_lower == name_norm.lower())
480 .first()
481 )
482 return row[0] if row else None
485def _find_existing_paper(db_session, fields: dict) -> Optional["Paper"]:
486 """Find an existing Paper by any of DOI, arXiv ID, or PMID.
488 Issues a single OR-query across all provided identifiers so that a
489 caller with multiple IDs doesn't miss dedup because the first one
490 (e.g. DOI) is absent from the stored row but a later one (e.g.
491 arXiv) would have matched. The previous waterfall short-circuited
492 on the first non-null input and never tried the remaining IDs.
494 Uses load_only to skip the ``paper_metadata`` JSON blob on the
495 dedup lookup — we only need the identifier columns. The blob is
496 lazy-loaded if the caller later touches ``paper.paper_metadata``.
497 """
498 id_only = load_only(
499 Paper.id,
500 Paper.doi,
501 Paper.arxiv_id,
502 Paper.pmid,
503 Paper.journal_id,
504 )
506 conditions = []
507 doi = fields.get("doi")
508 arxiv_id = fields.get("arxiv_id")
509 pmid = fields.get("pmid")
510 if doi:
511 conditions.append(Paper.doi == doi)
512 if arxiv_id:
513 conditions.append(Paper.arxiv_id == arxiv_id)
514 if pmid: 514 ↛ 515line 514 didn't jump to line 515 because the condition on line 514 was never true
515 conditions.append(Paper.pmid == pmid)
517 if not conditions: 517 ↛ 518line 517 didn't jump to line 518 because the condition on line 517 was never true
518 return None
520 matches = (
521 db_session.query(Paper).options(id_only).filter(or_(*conditions)).all()
522 )
524 if not matches:
525 return None
526 if len(matches) == 1: 526 ↛ 532line 526 didn't jump to line 532 because the condition on line 526 was always true
527 return matches[0]
529 # Multiple distinct rows matched different identifiers of the same
530 # incoming record. This indicates a prior mismerge; deterministic
531 # tie-break on oldest (lowest) id so repeat runs don't oscillate.
532 winner = min(matches, key=lambda p: p.id)
533 logger.warning(
534 f"Paper dedup conflict on {sorted(k for k in ('doi', 'arxiv_id', 'pmid') if fields.get(k))}: "
535 f"{len(matches)} rows (ids {sorted(m.id for m in matches)}); "
536 f"using id {winner.id}. Manual review recommended."
537 )
538 return winner
541def _merge_identifiers(paper: "Paper", indexed: dict, metadata: dict) -> None:
542 """Enrich an existing Paper with identifiers from a new encounter.
544 E.g., an ArXiv paper later found via OpenAlex gains a DOI.
546 Args:
547 paper: The existing Paper row to enrich.
548 indexed: New values for the real columns (doi, arxiv_id,
549 pmid, journal_id). Only applied if the column is
550 currently empty.
551 metadata: Additional bibliographic fields (pmcid, authors,
552 csl_json, etc.) to merge into paper.paper_metadata. Only
553 keys that aren't already present in the existing blob
554 are added — first write wins, to preserve the original
555 enrichment.
556 """
557 # Indexed columns — first write wins. Avoids churning rows when
558 # the same paper turns up across many research sessions with
559 # slightly different scoring / cleaned names.
560 if indexed.get("doi") and not paper.doi:
561 paper.doi = indexed["doi"]
562 if indexed.get("arxiv_id") and not paper.arxiv_id: 562 ↛ 563line 562 didn't jump to line 563 because the condition on line 562 was never true
563 paper.arxiv_id = indexed["arxiv_id"]
564 if indexed.get("pmid") and not paper.pmid: 564 ↛ 565line 564 didn't jump to line 565 because the condition on line 564 was never true
565 paper.pmid = indexed["pmid"]
566 if indexed.get("journal_id") and not paper.journal_id: 566 ↛ 567line 566 didn't jump to line 567 because the condition on line 566 was never true
567 paper.journal_id = indexed["journal_id"]
568 if indexed.get("container_title") and not paper.container_title:
569 paper.container_title = indexed["container_title"]
570 if indexed.get("year") is not None and paper.year is None: 570 ↛ 571line 570 didn't jump to line 571 because the condition on line 570 was never true
571 paper.year = indexed["year"]
573 # Metadata blob — merge any missing keys.
574 # IMPORTANT: we must build a NEW dict and reassign the attribute so
575 # that SQLAlchemy's plain JSON column marks it dirty. In-place
576 # mutation of the existing dict is not detected without
577 # MutableDict.as_mutable() — which this column does not use, to
578 # stay consistent with other JSON columns in the project.
579 if metadata:
580 existing = dict(paper.paper_metadata) if paper.paper_metadata else {}
581 changed = False
582 for key, value in metadata.items():
583 if value is not None and key not in existing: 583 ↛ 584line 583 didn't jump to line 584 because the condition on line 583 was never true
584 existing[key] = value
585 changed = True
586 if changed: 586 ↛ 587line 586 didn't jump to line 587 because the condition on line 586 was never true
587 paper.paper_metadata = existing or None