Coverage for src / local_deep_research / research_library / services / library_service.py: 87%

353 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2Library Management Service 

3 

4Handles querying and managing the downloaded document library: 

5- Search and filter documents 

6- Get statistics and analytics 

7- Manage collections and favorites 

8- Handle file operations 

9""" 

10 

11from pathlib import Path 

12from typing import Dict, List, Optional 

13from urllib.parse import urlparse 

14 

15from loguru import logger 

16from sqlalchemy import or_, func, case 

17from sqlalchemy.orm import aliased 

18 

19from ...constants import FILE_PATH_SENTINELS 

20from ...database.models.download_tracker import DownloadTracker 

21from ...database.models.library import ( 

22 Collection, 

23 Document, 

24 DocumentBlob, 

25 DocumentCollection, 

26 DocumentStatus, 

27) 

28from ...database.models.metrics import ResearchRating 

29from ...database.models.research import ResearchHistory, ResearchResource 

30from ...database.session_context import get_user_db_session 

31from ...security import PathValidator 

32from ...config.paths import get_library_directory 

33from ..utils import ( 

34 get_absolute_path_from_settings, 

35 get_url_hash, 

36 open_file_location, 

37) 

38 

39 

40class LibraryService: 

41 """Service for managing and querying the document library.""" 

42 

43 def __init__(self, username: str): 

44 """Initialize library service for a user.""" 

45 self.username = username 

46 

47 def _has_blob_in_db(self, session, document_id: str) -> bool: 

48 """Check if a PDF blob exists in the database for a document.""" 

49 return ( 

50 session.query(DocumentBlob.document_id) 

51 .filter_by(document_id=document_id) 

52 .first() 

53 is not None 

54 ) 

55 

56 def _get_safe_absolute_path(self, file_path: str) -> Optional[str]: 

57 """ 

58 Get the absolute path for a file, safely handling invalid paths. 

59 

60 Args: 

61 file_path: Relative file path from library root 

62 

63 Returns: 

64 Absolute path as string, or None if path is invalid/unsafe 

65 """ 

66 if not file_path or file_path in FILE_PATH_SENTINELS: 

67 return None 

68 abs_path = get_absolute_path_from_settings(file_path) 

69 return str(abs_path) if abs_path else None 

70 

71 def _is_arxiv_url(self, url: str) -> bool: 

72 """Check if URL is from arXiv domain.""" 

73 try: 

74 hostname = urlparse(url).hostname 

75 return bool( 

76 hostname 

77 and (hostname == "arxiv.org" or hostname.endswith(".arxiv.org")) 

78 ) 

79 except Exception: 

80 return False 

81 

82 def _is_pubmed_url(self, url: str) -> bool: 

83 """Check if URL is from PubMed or NCBI domains.""" 

84 try: 

85 parsed = urlparse(url) 

86 hostname = parsed.hostname 

87 if not hostname: 

88 return False 

89 

90 # Check for pubmed.ncbi.nlm.nih.gov 

91 if hostname == "pubmed.ncbi.nlm.nih.gov": 

92 return True 

93 

94 # Check for ncbi.nlm.nih.gov with PMC path 

95 if hostname == "ncbi.nlm.nih.gov" and "/pmc" in parsed.path: 

96 return True 

97 

98 # Check for pubmed in subdomain 

99 if "pubmed" in hostname: 

100 return True 

101 

102 return False 

103 except Exception: 

104 return False 

105 

106 def _apply_date_filter(self, query, model_class, date_filter: str): 

107 """Apply date range filter based on processed_at timestamp.""" 

108 from datetime import datetime, timedelta, timezone 

109 

110 now = datetime.now(timezone.utc) 

111 if date_filter == "today": 

112 cutoff = now.replace(hour=0, minute=0, second=0, microsecond=0) 

113 elif date_filter == "week": 

114 cutoff = now - timedelta(days=7) 

115 elif date_filter == "month": 

116 cutoff = now - timedelta(days=30) 

117 else: 

118 return query 

119 return query.filter(model_class.processed_at >= cutoff) 

120 

121 @staticmethod 

122 def _escape_like(value: str) -> str: 

123 """Escape SQL LIKE wildcards (%, _) and the escape char itself. 

124 

125 Without this, a value like ``my_journal`` would have ``_`` interpreted 

126 as a single-character wildcard, and ``%`` would match anything. 

127 Used together with ``escape="\\\\"`` on the .like()/.ilike() call. 

128 """ 

129 return ( 

130 value.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_") 

131 ) 

132 

133 def _apply_domain_filter(self, query, model_class, domain: str): 

134 """Apply domain filter to query for Document. 

135 

136 The dropdown is fully data-driven (populated from get_unique_domains), 

137 so the filter is a generic substring match against original_url. 

138 """ 

139 pattern = f"%{self._escape_like(domain)}%" 

140 return query.filter(model_class.original_url.like(pattern, escape="\\")) 

141 

142 def _apply_search_filter(self, query, model_class, search_query: str): 

143 """Apply search filter to query for Document.""" 

144 search_pattern = f"%{self._escape_like(search_query)}%" 

145 return query.filter( 

146 or_( 

147 model_class.title.ilike(search_pattern, escape="\\"), 

148 model_class.authors.ilike(search_pattern, escape="\\"), 

149 model_class.doi.ilike(search_pattern, escape="\\"), 

150 ResearchResource.title.ilike(search_pattern, escape="\\"), 

151 ) 

152 ) 

153 

    def get_library_stats(self) -> Dict:
        """Get overall library statistics.

        Returns:
            Dict with document/PDF counts, total and average sizes (bytes
            and MB), distinct research-session count, a coarse unique-domain
            count, pending download count, and the storage path.
        """
        with get_user_db_session(self.username) as session:
            # Get document counts
            total_docs = session.query(Document).count()
            total_pdfs = (
                session.query(Document).filter_by(file_type="pdf").count()
            )

            # Get size stats
            size_result = session.query(
                func.sum(Document.file_size),
                func.avg(Document.file_size),
            ).first()

            # SUM/AVG yield NULL on an empty table; coerce to 0.
            total_size = size_result[0] or 0
            avg_size = size_result[1] or 0

            # Get research stats
            research_count = session.query(
                func.count(func.distinct(Document.research_id))
            ).scalar()

            # Get domain stats - count unique domains from URLs
            # Extract domain from original_url using SQL functions
            from sqlalchemy import case, func as sql_func

            # Count unique domains by extracting them from URLs.
            # NOTE: URLs are bucketed into arxiv.org / pubmed / "other",
            # so the resulting count is coarse (at most 3 buckets), not
            # a true count of distinct hostnames.
            domain_subquery = session.query(
                sql_func.distinct(
                    case(
                        (
                            Document.original_url.like("%arxiv.org%"),
                            "arxiv.org",
                        ),
                        (
                            Document.original_url.like("%pubmed%"),
                            "pubmed",
                        ),
                        (
                            Document.original_url.like("%ncbi.nlm.nih.gov%"),
                            "pubmed",
                        ),
                        else_="other",
                    )
                )
            ).subquery()

            domain_count = (
                session.query(sql_func.count())
                .select_from(domain_subquery)
                .scalar()
            )

            # Get download tracker stats
            pending_downloads = (
                session.query(DownloadTracker)
                .filter_by(is_downloaded=False)
                .count()
            )

            return {
                "total_documents": total_docs,
                "total_pdfs": total_pdfs,
                "total_size_bytes": total_size,
                "total_size_mb": total_size / (1024 * 1024)
                if total_size
                else 0,
                "average_size_mb": avg_size / (1024 * 1024) if avg_size else 0,
                "research_sessions": research_count,
                "unique_domains": domain_count,
                "pending_downloads": pending_downloads,
                "storage_path": self._get_storage_path(),
            }

228 

    def count_documents(
        self,
        research_id: Optional[str] = None,
        domain: Optional[str] = None,
        collection_id: Optional[str] = None,
        date_filter: Optional[str] = None,
    ) -> int:
        """Count documents matching the given filters (for pagination).

        Args:
            research_id: Restrict to a single research session.
            domain: Substring match against Document.original_url.
            collection_id: Collection to count in; defaults to the user's
                default Library collection.
            date_filter: "today" / "week" / "month" window on processed_at.

        Returns:
            Number of completed documents matching all active filters.
        """
        with get_user_db_session(self.username) as session:
            from ...database.library_init import get_default_library_id

            # Fall back to the user's default Library collection.
            if not collection_id:
                collection_id = get_default_library_id(self.username)

            # Mirrors the filter set applied by get_documents() so counts
            # stay in sync with the listed pages.
            q = (
                session.query(func.count(Document.id))
                .join(
                    DocumentCollection,
                    Document.id == DocumentCollection.document_id,
                )
                .filter(DocumentCollection.collection_id == collection_id)
                .filter(Document.status == "completed")
            )

            if research_id:
                q = q.filter(Document.research_id == research_id)
            if domain:
                q = self._apply_domain_filter(q, Document, domain)
            if date_filter:
                q = self._apply_date_filter(q, Document, date_filter)

            return q.scalar() or 0

261 

    def get_documents(
        self,
        research_id: Optional[str] = None,
        domain: Optional[str] = None,
        file_type: Optional[str] = None,
        favorites_only: bool = False,
        search_query: Optional[str] = None,
        collection_id: Optional[str] = None,
        date_filter: Optional[str] = None,
        limit: int = 100,
        offset: int = 0,
    ) -> List[Dict]:
        """
        Get documents with filtering options.

        Returns enriched document information with research details.

        Args:
            research_id: Restrict to one research session.
            domain: Substring match against Document.original_url.
            file_type: Exact match on Document.file_type (e.g. "pdf").
            favorites_only: Only documents flagged as favorite.
            search_query: Case-insensitive match on title/authors/DOI and
                the linked resource title.
            collection_id: Collection to list; defaults to the user's
                default Library collection.
            date_filter: "today" / "week" / "month" window on processed_at.
            limit: Page size (applied in SQL).
            offset: Page start (applied in SQL).

        Returns:
            List of dicts, one per completed document, carrying file,
            research, classification, and availability information.
        """
        with get_user_db_session(self.username) as session:
            # Get default Library collection ID if not specified
            from ...database.library_init import get_default_library_id

            if not collection_id:
                collection_id = get_default_library_id(self.username)

            logger.info(
                f"[LibraryService] Getting documents for collection_id: {collection_id}, research_id: {research_id}, domain: {domain}"
            )

            all_documents = []

            # Step 1: subquery to get paginated document IDs.
            # Pagination and sorting happen here on Document alone,
            # avoiding non-determinism from outer-joining related tables.
            doc_subq = (
                session.query(Document.id)
                .join(
                    DocumentCollection,
                    Document.id == DocumentCollection.document_id,
                )
                .filter(DocumentCollection.collection_id == collection_id)
            )

            # Apply filters
            if research_id:
                doc_subq = doc_subq.filter(Document.research_id == research_id)

            if domain:
                doc_subq = self._apply_domain_filter(doc_subq, Document, domain)

            if date_filter:
                doc_subq = self._apply_date_filter(
                    doc_subq, Document, date_filter
                )

            if file_type:
                doc_subq = doc_subq.filter(Document.file_type == file_type)

            if favorites_only:
                doc_subq = doc_subq.filter(Document.favorite.is_(True))

            if search_query:
                # _apply_search_filter references ResearchResource.title,
                # so we must outerjoin it here. DISTINCT prevents fan-out
                # from the outerjoin duplicating Document IDs.
                doc_subq = doc_subq.outerjoin(
                    ResearchResource,
                    (Document.resource_id == ResearchResource.id)
                    | (ResearchResource.document_id == Document.id),
                ).distinct()
                doc_subq = self._apply_search_filter(
                    doc_subq, Document, search_query
                )

            # Filter to only completed documents
            doc_subq = doc_subq.filter(Document.status == "completed")

            # Sort at SQL level (SQLite-safe NULL handling):
            # rows with a processed_at timestamp come first (newest first),
            # then rows where it is NULL.
            doc_subq = doc_subq.order_by(
                case((Document.processed_at.isnot(None), 0), else_=1),
                Document.processed_at.desc(),
            )

            # Apply SQL-level pagination
            doc_subq = doc_subq.offset(offset).limit(limit)
            doc_id_subq = doc_subq.subquery()

            # Step 2: join the paginated document IDs with related tables.
            # Use two separate outer joins for ResearchResource to avoid
            # the OR-condition join that can fan out to multiple rows:
            # - ResourceByFK: matched via Document.resource_id (primary FK)
            # - ResourceByDoc: matched via ResearchResource.document_id
            # We prefer ResourceByFK; fall back to ResourceByDoc in Python.
            ResourceByFK = aliased(ResearchResource)
            ResourceByDoc = aliased(ResearchResource)

            query = (
                session.query(
                    Document,
                    ResourceByFK,
                    ResourceByDoc,
                    ResearchHistory,
                    DocumentCollection,
                )
                .join(doc_id_subq, Document.id == doc_id_subq.c.id)
                .join(
                    DocumentCollection,
                    Document.id == DocumentCollection.document_id,
                )
                .outerjoin(
                    ResourceByFK,
                    Document.resource_id == ResourceByFK.id,
                )
                .outerjoin(
                    ResourceByDoc,
                    ResourceByDoc.document_id == Document.id,
                )
                .outerjoin(
                    ResearchHistory,
                    Document.research_id == ResearchHistory.id,
                )
                .filter(DocumentCollection.collection_id == collection_id)
                # Re-apply sort so final results are ordered
                .order_by(
                    case((Document.processed_at.isnot(None), 0), else_=1),
                    Document.processed_at.desc(),
                )
            )

            # Execute query
            results = query.all()
            logger.info(
                f"[LibraryService] Found {len(results)} documents in collection {collection_id}"
            )

            # Batch-check blob existence to avoid N+1 queries
            doc_ids = [row[0].id for row in results]
            blob_ids = set()
            if doc_ids:
                blob_ids = {
                    r[0]
                    for r in session.query(DocumentBlob.document_id)
                    .filter(DocumentBlob.document_id.in_(doc_ids))
                    .all()
                }

            # Process results — deduplicate by doc.id since the ResourceByDoc
            # outer join can fan out when multiple ResearchResource rows
            # point to the same document via document_id.
            seen_doc_ids = set()
            for doc, res_by_fk, res_by_doc, research, doc_collection in results:
                if doc.id in seen_doc_ids:
                    continue
                seen_doc_ids.add(doc.id)
                # Prefer the resource matched via Document.resource_id FK;
                # fall back to the one matched via ResearchResource.document_id.
                resource = res_by_fk or res_by_doc
                # Determine availability flags - use Document.file_path directly
                file_absolute_path = None
                if doc.file_path and doc.file_path not in FILE_PATH_SENTINELS:
                    abs_path = get_absolute_path_from_settings(doc.file_path)
                    if abs_path:
                        file_absolute_path = str(abs_path)

                # Check if PDF is available (filesystem OR database)
                has_pdf = bool(file_absolute_path)
                if not has_pdf and doc.storage_mode == "database":
                    has_pdf = doc.id in blob_ids
                has_text_db = bool(doc.text_content)  # Text now in Document

                # Use DocumentCollection from query results
                has_rag_indexed = (
                    doc_collection.indexed if doc_collection else False
                )
                rag_chunk_count = (
                    doc_collection.chunk_count if doc_collection else 0
                )

                all_documents.append(
                    {
                        "id": doc.id,
                        "resource_id": doc.resource_id,
                        "research_id": doc.research_id,
                        # Document info
                        "document_title": doc.title
                        or (resource.title if resource else doc.filename),
                        "authors": doc.authors,
                        "published_date": doc.published_date,
                        "doi": doc.doi,
                        "arxiv_id": doc.arxiv_id,
                        "pmid": doc.pmid,
                        # File info
                        "file_path": doc.file_path,
                        "file_absolute_path": file_absolute_path,
                        "file_name": Path(doc.file_path).name
                        if doc.file_path
                        and doc.file_path not in FILE_PATH_SENTINELS
                        else doc.filename,
                        "file_size": doc.file_size,
                        "file_type": doc.file_type,
                        # URLs
                        "original_url": doc.original_url,
                        "domain": self._extract_domain(doc.original_url)
                        if doc.original_url
                        else "User Upload",
                        # Status
                        "download_status": doc.status or "completed",
                        "downloaded_at": doc.processed_at.isoformat()
                        if doc.processed_at
                        else (
                            doc.uploaded_at.isoformat()
                            if hasattr(doc, "uploaded_at") and doc.uploaded_at
                            else None
                        ),
                        "favorite": doc.favorite
                        if hasattr(doc, "favorite")
                        else False,
                        "tags": doc.tags if hasattr(doc, "tags") else [],
                        # Research info (None for user uploads)
                        "research_title": research.title or research.query[:80]
                        if research
                        else "User Upload",
                        "research_query": research.query if research else None,
                        "research_mode": research.mode if research else None,
                        "research_date": research.created_at
                        if research
                        else None,
                        # Classification flags
                        "is_arxiv": self._is_arxiv_url(doc.original_url)
                        if doc.original_url
                        else False,
                        "is_pubmed": self._is_pubmed_url(doc.original_url)
                        if doc.original_url
                        else False,
                        "is_pdf": doc.file_type == "pdf",
                        # Availability flags
                        "has_pdf": has_pdf,
                        "has_text_db": has_text_db,
                        "has_rag_indexed": has_rag_indexed,
                        "rag_chunk_count": rag_chunk_count,
                    }
                )

            # Sorting and pagination are now handled at SQL level
            return all_documents

506 

    def get_all_collections(self) -> List[Dict]:
        """Get all collections with document and indexed document counts.

        Returns:
            List of dicts (id, name, description, is_default,
            document_count, indexed_document_count), default collection
            first, then alphabetical by name.
        """
        with get_user_db_session(self.username) as session:
            # Query collections with document counts and indexed counts.
            # The conditional COUNT(CASE ...) counts only rows where
            # DocumentCollection.indexed is true.
            results = (
                session.query(
                    Collection,
                    func.count(DocumentCollection.document_id).label(
                        "document_count"
                    ),
                    func.count(
                        case(
                            (
                                DocumentCollection.indexed == True,  # noqa: E712
                                DocumentCollection.document_id,
                            ),
                            else_=None,
                        )
                    ).label("indexed_document_count"),
                )
                # Outer join so empty collections still appear with count 0.
                .outerjoin(
                    DocumentCollection,
                    Collection.id == DocumentCollection.collection_id,
                )
                .group_by(Collection.id)
                .order_by(Collection.is_default.desc(), Collection.name)
                .all()
            )

            logger.info(f"[LibraryService] Found {len(results)} collections")

            collections = []
            for collection, doc_count, indexed_count in results:
                logger.debug(
                    f"[LibraryService] Collection: {collection.name} (ID: {collection.id}), documents: {doc_count}, indexed: {indexed_count}"
                )
                collections.append(
                    {
                        "id": collection.id,
                        "name": collection.name,
                        "description": collection.description,
                        "is_default": collection.is_default,
                        "document_count": doc_count or 0,
                        "indexed_document_count": indexed_count or 0,
                    }
                )

            return collections

555 

556 def get_research_list_for_dropdown(self) -> List[Dict]: 

557 """Get minimal research session info for filter dropdowns. 

558 

559 Returns only id, title, and query — no joins or aggregates. 

560 """ 

561 with get_user_db_session(self.username) as session: 

562 results = ( 

563 session.query( 

564 ResearchHistory.id, 

565 ResearchHistory.title, 

566 ResearchHistory.query, 

567 ) 

568 .order_by(ResearchHistory.created_at.desc()) 

569 .all() 

570 ) 

571 return [ 

572 {"id": r.id, "title": r.title, "query": r.query} 

573 for r in results 

574 ] 

575 

    def get_research_list_with_stats(
        self,
        limit: int = 0,
        offset: int = 0,
    ) -> List[Dict]:
        """Get research sessions with download statistics.

        Args:
            limit: Maximum number of results (0 = no limit, for backwards compat).
            offset: Number of rows to skip. Only applied when limit > 0.

        Returns:
            List of dicts, one per research session (newest first), each
            carrying resource counts, downloaded/downloadable counts, the
            session's rating (if any), and a coarse per-domain breakdown.
        """
        with get_user_db_session(self.username) as session:
            # Query research sessions with resource counts
            query = (
                session.query(
                    ResearchHistory,
                    func.count(func.distinct(ResearchResource.id)).label(
                        "total_resources"
                    ),
                    # Documents that finished processing count as downloaded.
                    func.count(
                        func.distinct(
                            case(
                                (Document.status == "completed", Document.id),
                                else_=None,
                            )
                        )
                    ).label("downloaded_count"),
                    # Heuristic: a resource is "downloadable" when its URL is
                    # a direct PDF, an arXiv link, or a PubMed Central link.
                    func.count(
                        func.distinct(
                            case(
                                (
                                    ResearchResource.url.like("%.pdf")
                                    | ResearchResource.url.like("%arxiv.org%")
                                    | ResearchResource.url.like(
                                        "%ncbi.nlm.nih.gov/pmc%"
                                    ),
                                    ResearchResource.id,
                                ),
                                else_=None,
                            )
                        )
                    ).label("downloadable_count"),
                )
                .outerjoin(
                    ResearchResource,
                    ResearchHistory.id == ResearchResource.research_id,
                )
                # Documents link to resources either by FK or back-reference.
                .outerjoin(
                    Document,
                    (ResearchResource.id == Document.resource_id)
                    | (ResearchResource.document_id == Document.id),
                )
                .group_by(ResearchHistory.id)
                .order_by(ResearchHistory.created_at.desc())
            )

            # Apply SQL-level pagination when limit is set
            if limit > 0:
                query = query.offset(offset).limit(limit)

            results = query.all()

            # Preload all ratings to avoid N+1 queries
            research_ids = [r[0].id for r in results]
            all_ratings = (
                session.query(ResearchRating)
                .filter(ResearchRating.research_id.in_(research_ids))
                .all()
                if research_ids
                else []
            )
            ratings_by_research = {r.research_id: r for r in all_ratings}

            # Batch domain queries to avoid N+1 (same pattern as ratings).
            # URLs are bucketed into arxiv.org / pubmed / "other".
            domain_case = case(
                (
                    ResearchResource.url.like("%arxiv.org%"),
                    "arxiv.org",
                ),
                (ResearchResource.url.like("%pubmed%"), "pubmed"),
                (
                    ResearchResource.url.like("%ncbi.nlm.nih.gov%"),
                    "pubmed",
                ),
                else_="other",
            )
            all_domains = (
                session.query(
                    ResearchResource.research_id,
                    domain_case.label("domain"),
                    func.count().label("count"),
                )
                .filter(ResearchResource.research_id.in_(research_ids))
                .group_by(ResearchResource.research_id, domain_case)
                .all()
                if research_ids
                else []
            )
            domains_by_research: Dict[str, list] = {}
            for rid, domain, count in all_domains:
                domains_by_research.setdefault(rid, []).append((domain, count))

            research_list = []
            for (
                research,
                total_resources,
                downloaded_count,
                downloadable_count,
            ) in results:
                # Get rating from preloaded dict
                rating = ratings_by_research.get(research.id)

                # Get domain breakdown from preloaded dict
                domains = domains_by_research.get(research.id, [])

                research_list.append(
                    {
                        "id": research.id,
                        "title": research.title,
                        "query": research.query,
                        "mode": research.mode,
                        "status": research.status,
                        "created_at": research.created_at,
                        "duration_seconds": research.duration_seconds,
                        "total_resources": total_resources or 0,
                        "downloaded_count": downloaded_count or 0,
                        "downloadable_count": downloadable_count or 0,
                        "rating": rating.rating if rating else None,
                        # Drop falsy domain labels before returning.
                        "top_domains": [(d, c) for d, c in domains if d],
                    }
                )

            return research_list

709 

    def get_download_manager_summary_stats(self) -> Dict:
        """Get aggregate download stats across ALL research sessions.

        This is a lightweight query that only returns totals — used for
        the download-manager header so stats remain accurate regardless
        of which page the user is viewing.
        """
        with get_user_db_session(self.username) as session:
            row = (
                session.query(
                    func.count(func.distinct(ResearchHistory.id)).label(
                        "total_researches"
                    ),
                    func.count(func.distinct(ResearchResource.id)).label(
                        "total_resources"
                    ),
                    # Completed documents count as already downloaded.
                    func.count(
                        func.distinct(
                            case(
                                (Document.status == "completed", Document.id),
                                else_=None,
                            )
                        )
                    ).label("downloaded_count"),
                    # Same downloadable-URL heuristic as
                    # get_research_list_with_stats: direct PDF, arXiv,
                    # or PubMed Central links.
                    func.count(
                        func.distinct(
                            case(
                                (
                                    ResearchResource.url.like("%.pdf")
                                    | ResearchResource.url.like("%arxiv.org%")
                                    | ResearchResource.url.like(
                                        "%ncbi.nlm.nih.gov/pmc%"
                                    ),
                                    ResearchResource.id,
                                ),
                                else_=None,
                            )
                        )
                    ).label("downloadable_count"),
                )
                .select_from(ResearchHistory)
                .outerjoin(
                    ResearchResource,
                    ResearchHistory.id == ResearchResource.research_id,
                )
                .outerjoin(
                    Document,
                    (ResearchResource.id == Document.resource_id)
                    | (ResearchResource.document_id == Document.id),
                )
                .one()
            )

            # Aggregates can come back NULL/None on empty tables; coerce to 0.
            total_researches = row.total_researches or 0
            total_resources = row.total_resources or 0
            downloaded = row.downloaded_count or 0
            downloadable = row.downloadable_count or 0

            return {
                "total_researches": total_researches,
                "total_resources": total_resources,
                "already_downloaded": downloaded,
                # Clamp at 0: a downloaded document need not match the
                # downloadable-URL heuristic, so the difference can go
                # negative.
                "available_to_download": max(downloadable - downloaded, 0),
            }

774 

    def get_pdf_previews_batch(
        self, research_ids: List, limit_per_research: int = 10
    ) -> Dict[str, Dict]:
        """Batch-fetch PDF documents and domain breakdowns for multiple research sessions.

        Returns a dict keyed by research_id with:
        - "pdf_sources": list of document dicts (capped at limit_per_research)
        - "domains": dict of domain -> {total, pdfs, downloaded}
        """
        if not research_ids:
            return {}

        with get_user_db_session(self.username) as session:
            # Single query for all sessions. The LIMIT is a shared budget
            # (limit_per_research * number of sessions), so one session may
            # receive fewer than limit_per_research rows.
            results = (
                session.query(Document, ResearchResource)
                .outerjoin(
                    ResearchResource,
                    (Document.resource_id == ResearchResource.id)
                    | (ResearchResource.document_id == Document.id),
                )
                .filter(
                    Document.research_id.in_(research_ids),
                    Document.file_type == "pdf",
                )
                .order_by(Document.processed_at.desc())
                .limit(limit_per_research * len(research_ids))
                .all()
            )

            previews: Dict[str, Dict] = {}
            # The OR-condition join can yield the same document several
            # times when multiple resources reference it; keep the first.
            seen_doc_ids: set = set()
            for doc, resource in results:
                if doc.id in seen_doc_ids:
                    continue
                seen_doc_ids.add(doc.id)

                rid = doc.research_id
                if rid not in previews:
                    previews[rid] = {"pdf_sources": [], "domains": {}}

                entry = previews[rid]

                # Domain breakdown (within the SQL LIMIT budget).
                # Prefer the resource URL; fall back to the document URL.
                domain = "unknown"
                if resource and resource.url:
                    try:
                        domain = urlparse(resource.url).netloc or "unknown"
                    except Exception:
                        logger.debug("Failed to parse resource URL for domain")
                elif doc.original_url:
                    try:
                        domain = urlparse(doc.original_url).netloc or "unknown"
                    except Exception:
                        logger.debug("Failed to parse document URL for domain")

                if domain not in entry["domains"]:
                    entry["domains"][domain] = {
                        "total": 0,
                        "pdfs": 0,
                        "downloaded": 0,
                    }
                entry["domains"][domain]["total"] += 1
                if doc.file_type == "pdf":
                    entry["domains"][domain]["pdfs"] += 1
                if doc.status == "completed":
                    entry["domains"][domain]["downloaded"] += 1

                # PDF sources preview (capped per research session)
                if len(entry["pdf_sources"]) < limit_per_research:
                    title = "Untitled"
                    if resource and resource.title:
                        title = resource.title
                    elif doc.filename:
                        title = doc.filename

                    entry["pdf_sources"].append(
                        {
                            "document_title": title,
                            "domain": domain,
                            "file_type": doc.file_type,
                            "download_status": doc.status or "unknown",
                        }
                    )

            return previews

860 

    def get_document_by_id(self, doc_id: str) -> Optional[Dict]:
        """
        Get a specific document by its ID.

        Returns document information with file path, availability flags,
        RAG-indexing status, and the collections the document belongs to,
        or None when no document matches ``doc_id``.
        """
        with get_user_db_session(self.username) as session:
            # Find document - use outer joins to support both research downloads and user uploads
            result = (
                session.query(Document, ResearchResource, ResearchHistory)
                .outerjoin(
                    ResearchResource,
                    (Document.resource_id == ResearchResource.id)
                    | (ResearchResource.document_id == Document.id),
                )
                .outerjoin(
                    ResearchHistory,
                    Document.research_id == ResearchHistory.id,
                )
                .filter(Document.id == doc_id)
                .first()
            )

            if result:
                # Found document
                doc, resource, research = result

                # Get RAG indexing status across all collections
                doc_collections = (
                    session.query(DocumentCollection, Collection)
                    .join(Collection)
                    .filter(DocumentCollection.document_id == doc_id)
                    .all()
                )

                # Check if indexed in any collection
                has_rag_indexed = any(
                    dc.indexed for dc, coll in doc_collections
                )
                # Total chunks across the collections where it is indexed.
                total_chunks = sum(
                    dc.chunk_count for dc, coll in doc_collections if dc.indexed
                )

                # Build collections list
                collections_list = [
                    {
                        "id": coll.id,
                        "name": coll.name,
                        "indexed": dc.indexed,
                        "chunk_count": dc.chunk_count,
                    }
                    for dc, coll in doc_collections
                ]

                # Calculate word count from text content
                word_count = (
                    len(doc.text_content.split()) if doc.text_content else 0
                )

                # Check if PDF is available (database OR filesystem)
                has_pdf = bool(
                    doc.file_path and doc.file_path not in FILE_PATH_SENTINELS
                )
                if not has_pdf and doc.storage_mode == "database":
                    has_pdf = self._has_blob_in_db(session, doc.id)

                return {
                    "id": doc.id,
                    "resource_id": doc.resource_id,
                    "research_id": doc.research_id,
                    "document_title": doc.title
                    or (resource.title if resource else doc.filename),
                    "original_url": doc.original_url
                    or (resource.url if resource else None),
                    "file_path": doc.file_path,
                    "file_absolute_path": self._get_safe_absolute_path(
                        doc.file_path
                    ),
                    "file_name": Path(doc.file_path).name
                    if doc.file_path
                    and doc.file_path not in FILE_PATH_SENTINELS
                    else doc.filename,
                    "file_size": doc.file_size,
                    "file_type": doc.file_type,
                    "mime_type": doc.mime_type,
                    "domain": self._extract_domain(resource.url)
                    if resource
                    else "User Upload",
                    "download_status": doc.status,
                    # processed_at may or may not expose isoformat()
                    # (datetime vs plain string); handle both, then fall
                    # back to the upload timestamp.
                    "downloaded_at": doc.processed_at.isoformat()
                    if doc.processed_at
                    and hasattr(doc.processed_at, "isoformat")
                    else str(doc.processed_at)
                    if doc.processed_at
                    else (
                        doc.uploaded_at.isoformat()
                        if hasattr(doc, "uploaded_at") and doc.uploaded_at
                        else None
                    ),
                    "favorite": doc.favorite
                    if hasattr(doc, "favorite")
                    else False,
                    "tags": doc.tags if hasattr(doc, "tags") else [],
                    "research_title": research.query[:100]
                    if research
                    else "User Upload",
                    # created_at may already be an ISO string; only call
                    # isoformat() on datetime values.
                    "research_created_at": research.created_at
                    if research and isinstance(research.created_at, str)
                    else research.created_at.isoformat()
                    if research and research.created_at
                    else None,
                    # Document fields
                    "is_pdf": doc.file_type == "pdf",
                    "has_pdf": has_pdf,
                    "has_text_db": bool(doc.text_content),
                    "has_rag_indexed": has_rag_indexed,
                    "rag_chunk_count": total_chunks,
                    "word_count": word_count,
                    "collections": collections_list,
                }

            # Not found
            return None

984 

985 def toggle_favorite(self, document_id: str) -> bool: 

986 """Toggle favorite status of a document.""" 

987 with get_user_db_session(self.username) as session: 

988 doc = session.query(Document).get(document_id) 

989 if doc: 

990 doc.favorite = not doc.favorite 

991 session.commit() 

992 return doc.favorite 

993 return False 

994 

    def delete_document(self, document_id: str) -> bool:
        """Delete a document from library (file and database entry).

        Args:
            document_id: Primary key of the document to delete.

        Returns:
            True when the document existed and was deleted, False otherwise.
        """
        with get_user_db_session(self.username) as session:
            doc = session.query(Document).get(document_id)
            if not doc:
                return False

            # Get file path from tracker (only if document has original_url;
            # user uploads have no source URL and therefore no tracker)
            tracker = None
            if doc.original_url:
                tracker = (
                    session.query(DownloadTracker)
                    .filter_by(url_hash=self._get_url_hash(doc.original_url))
                    .first()
                )

            # Delete physical file — best-effort: a failure here is logged
            # but does not abort the database deletion below.
            if tracker and tracker.file_path:
                try:
                    file_path = get_absolute_path_from_settings(
                        tracker.file_path
                    )
                    if file_path and file_path.is_file():
                        file_path.unlink()
                        logger.info(f"Deleted file: {file_path}")
                except Exception:
                    logger.exception("Failed to delete file")

            # Update tracker so the URL can be downloaded again later
            if tracker:
                tracker.is_downloaded = False
                tracker.file_path = None

            # Delete document and all related records.
            # Imported locally, presumably to avoid a circular import —
            # TODO confirm against the deletion package.
            from ..deletion.utils.cascade_helper import CascadeHelper

            CascadeHelper.delete_document_completely(session, document_id)
            session.commit()

            return True

1035 

    def open_file_location(self, document_id: str) -> bool:
        """Open the folder containing the document.

        Args:
            document_id: Primary key of the document to locate.

        Returns:
            True when the containing folder was opened; False when the
            document, tracker, or file is missing, the library root cannot
            be determined, or path validation fails.
        """
        with get_user_db_session(self.username) as session:
            doc = session.query(Document).get(document_id)
            if not doc:
                return False

            # User uploads have no original_url, hence no download tracker.
            tracker = None
            if doc.original_url:
                tracker = (
                    session.query(DownloadTracker)
                    .filter_by(url_hash=self._get_url_hash(doc.original_url))
                    .first()
                )

            if tracker and tracker.file_path:
                # Validate path is within library root to prevent traversal attacks
                library_root = get_absolute_path_from_settings("")
                if not library_root:
                    logger.warning("Could not determine library root")
                    return False
                try:
                    validated_path = PathValidator.validate_safe_path(
                        tracker.file_path, library_root, allow_absolute=False
                    )
                    if validated_path and validated_path.is_file():
                        # Calls the module-level open_file_location() utility
                        # imported at the top of the file — not this method,
                        # so there is no recursion despite the shared name.
                        return open_file_location(str(validated_path))
                except ValueError:
                    # Raised by the validator when the stored path escapes
                    # the library root; treat as "cannot open".
                    logger.warning("Path validation failed")
                    return False

            return False

1068 

1069 def get_unique_domains(self) -> List[str]: 

1070 """Get sorted list of unique netlocs from all document URLs.""" 

1071 with get_user_db_session(self.username) as session: 

1072 urls = ( 

1073 session.query(Document.original_url) 

1074 .filter(Document.original_url.isnot(None)) 

1075 .all() 

1076 ) 

1077 netlocs = {self._extract_domain(row[0]) for row in urls} 

1078 return sorted(n for n in netlocs if n) 

1079 

1080 def _extract_domain(self, url: str) -> str: 

1081 """Extract domain from URL.""" 

1082 from urllib.parse import urlparse 

1083 

1084 try: 

1085 return urlparse(url).netloc 

1086 except (ValueError, AttributeError): 

1087 return "" 

1088 

1089 def _get_url_hash(self, url: str) -> str: 

1090 """Generate hash for URL.""" 

1091 import re 

1092 

1093 # Normalize URL 

1094 url = re.sub(r"^https?://", "", url) 

1095 url = re.sub(r"^www\.", "", url) 

1096 url = url.rstrip("/") 

1097 

1098 return get_url_hash(url) 

1099 

1100 def _get_storage_path(self) -> str: 

1101 """Get library storage path from settings (respects LDR_DATA_DIR).""" 

1102 from ...utilities.db_utils import get_settings_manager 

1103 

1104 settings = get_settings_manager() 

1105 return str( 

1106 Path( 

1107 settings.get_setting( 

1108 "research_library.storage_path", 

1109 str(get_library_directory()), 

1110 ) 

1111 ) 

1112 .expanduser() 

1113 .resolve() 

1114 ) 

1115 

1116 def sync_library_with_filesystem(self) -> Dict: 

1117 """ 

1118 Sync library database with filesystem. 

1119 Check which PDF files exist and update database accordingly. 

1120 

1121 Returns: 

1122 Statistics about the sync operation 

1123 """ 

1124 with get_user_db_session(self.username) as session: 

1125 # Get all documents marked as completed 

1126 documents = ( 

1127 session.query(Document) 

1128 .filter_by(status=DocumentStatus.COMPLETED) 

1129 .all() 

1130 ) 

1131 

1132 stats = { 

1133 "total_documents": len(documents), 

1134 "files_found": 0, 

1135 "files_missing": 0, 

1136 "trackers_updated": 0, 

1137 "missing_files": [], 

1138 } 

1139 

1140 # Sync documents with filesystem 

1141 for doc in documents: 

1142 # Get download tracker 

1143 tracker = ( 

1144 session.query(DownloadTracker) 

1145 .filter_by(url_hash=self._get_url_hash(doc.original_url)) 

1146 .first() 

1147 ) 

1148 

1149 if tracker and tracker.file_path: 

1150 # Check if file exists 

1151 file_path = get_absolute_path_from_settings( 

1152 tracker.file_path 

1153 ) 

1154 if file_path and file_path.is_file(): 

1155 stats["files_found"] += 1 

1156 else: 

1157 # File missing or path invalid - mark for re-download 

1158 stats["files_missing"] += 1 

1159 stats["missing_files"].append( 

1160 { 

1161 "id": doc.id, 

1162 "title": doc.title, 

1163 "path": str(file_path) 

1164 if file_path 

1165 else "invalid", 

1166 "url": doc.original_url, 

1167 } 

1168 ) 

1169 

1170 # Reset tracker 

1171 tracker.is_downloaded = False 

1172 tracker.file_path = None 

1173 

1174 # Delete the document entry so it can be re-queued 

1175 from ..deletion.utils.cascade_helper import ( 

1176 CascadeHelper, 

1177 ) 

1178 

1179 CascadeHelper.delete_document_completely( 

1180 session, doc.id 

1181 ) 

1182 stats["trackers_updated"] += 1 

1183 else: 

1184 # No tracker or path - delete the document entry 

1185 stats["files_missing"] += 1 

1186 from ..deletion.utils.cascade_helper import CascadeHelper 

1187 

1188 CascadeHelper.delete_document_completely(session, doc.id) 

1189 

1190 session.commit() 

1191 logger.info( 

1192 f"Library sync completed: {stats['files_found']} found, {stats['files_missing']} missing" 

1193 ) 

1194 

1195 return stats 

1196 

1197 def mark_for_redownload(self, document_ids: List[str]) -> int: 

1198 """ 

1199 Mark specific documents for re-download. 

1200 

1201 Args: 

1202 document_ids: List of document IDs to mark for re-download 

1203 

1204 Returns: 

1205 Number of documents marked 

1206 """ 

1207 with get_user_db_session(self.username) as session: 

1208 count = 0 

1209 for doc_id in document_ids: 

1210 doc = session.query(Document).get(doc_id) 

1211 if doc: 

1212 # Get tracker and reset it 

1213 tracker = ( 

1214 session.query(DownloadTracker) 

1215 .filter_by( 

1216 url_hash=self._get_url_hash(doc.original_url) 

1217 ) 

1218 .first() 

1219 ) 

1220 

1221 if tracker: 1221 ↛ 1226line 1221 didn't jump to line 1226 because the condition on line 1221 was always true

1222 tracker.is_downloaded = False 

1223 tracker.file_path = None 

1224 

1225 # Mark document as pending 

1226 doc.status = DocumentStatus.PENDING 

1227 count += 1 

1228 

1229 session.commit() 

1230 logger.info(f"Marked {count} documents for re-download") 

1231 return count