Coverage for src / local_deep_research / research_library / services / library_service.py: 59%

245 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1""" 

2Library Management Service 

3 

4Handles querying and managing the downloaded document library: 

5- Search and filter documents 

6- Get statistics and analytics 

7- Manage collections and favorites 

8- Handle file operations 

9""" 

10 

11from pathlib import Path 

12from typing import Dict, List, Optional 

13from urllib.parse import urlparse 

14 

15from loguru import logger 

16from sqlalchemy import and_, or_, func, Integer, case 

17 

18from ...database.models.download_tracker import DownloadTracker 

19from ...database.models.library import ( 

20 Collection, 

21 Document, 

22 DocumentBlob, 

23 DocumentCollection, 

24 DocumentStatus, 

25) 

26from ...database.models.metrics import ResearchRating 

27from ...database.models.research import ResearchHistory, ResearchResource 

28from ...database.session_context import get_user_db_session 

29from ...security import PathValidator 

30from ...config.paths import get_library_directory 

31from ..utils import ( 

32 get_absolute_path_from_settings, 

33 get_url_hash, 

34 open_file_location, 

35) 

36 

37 

class LibraryService:
    """Service layer for querying and managing a user's document library."""

    def __init__(self, username: str):
        """Bind this service to *username*'s per-user database."""
        self.username = username

44 

45 def _has_blob_in_db(self, session, document_id: str) -> bool: 

46 """Check if a PDF blob exists in the database for a document.""" 

47 return ( 

48 session.query(DocumentBlob.document_id) 

49 .filter_by(document_id=document_id) 

50 .first() 

51 is not None 

52 ) 

53 

54 def _is_arxiv_url(self, url: str) -> bool: 

55 """Check if URL is from arXiv domain.""" 

56 try: 

57 hostname = urlparse(url).hostname 

58 return bool( 

59 hostname 

60 and (hostname == "arxiv.org" or hostname.endswith(".arxiv.org")) 

61 ) 

62 except Exception: 

63 return False 

64 

65 def _is_pubmed_url(self, url: str) -> bool: 

66 """Check if URL is from PubMed or NCBI domains.""" 

67 try: 

68 parsed = urlparse(url) 

69 hostname = parsed.hostname 

70 if not hostname: 70 ↛ 71line 70 didn't jump to line 71 because the condition on line 70 was never true

71 return False 

72 

73 # Check for pubmed.ncbi.nlm.nih.gov 

74 if hostname == "pubmed.ncbi.nlm.nih.gov": 

75 return True 

76 

77 # Check for ncbi.nlm.nih.gov with PMC path 

78 if hostname == "ncbi.nlm.nih.gov" and "/pmc" in parsed.path: 

79 return True 

80 

81 # Check for pubmed in subdomain 

82 if "pubmed" in hostname: 82 ↛ 83line 82 didn't jump to line 83 because the condition on line 82 was never true

83 return True 

84 

85 return False 

86 except Exception: 

87 return False 

88 

89 def _apply_domain_filter(self, query, model_class, domain: str): 

90 """Apply domain filter to query for Document.""" 

91 if domain == "arxiv.org": 91 ↛ 93line 91 didn't jump to line 93 because the condition on line 91 was always true

92 return query.filter(model_class.original_url.like("%arxiv.org%")) 

93 elif domain == "pubmed": 

94 return query.filter( 

95 or_( 

96 model_class.original_url.like("%pubmed%"), 

97 model_class.original_url.like("%ncbi.nlm.nih.gov%"), 

98 ) 

99 ) 

100 elif domain == "other": 

101 return query.filter( 

102 and_( 

103 ~model_class.original_url.like("%arxiv.org%"), 

104 ~model_class.original_url.like("%pubmed%"), 

105 ~model_class.original_url.like("%ncbi.nlm.nih.gov%"), 

106 ) 

107 ) 

108 else: 

109 return query.filter(model_class.original_url.like(f"%{domain}%")) 

110 

111 def _apply_search_filter(self, query, model_class, search_query: str): 

112 """Apply search filter to query for Document.""" 

113 search_pattern = f"%{search_query}%" 

114 return query.filter( 

115 or_( 

116 model_class.title.ilike(search_pattern), 

117 model_class.authors.ilike(search_pattern), 

118 model_class.doi.ilike(search_pattern), 

119 ResearchResource.title.ilike(search_pattern), 

120 ) 

121 ) 

122 

123 def get_library_stats(self) -> Dict: 

124 """Get overall library statistics.""" 

125 with get_user_db_session(self.username) as session: 

126 # Get document counts 

127 total_docs = session.query(Document).count() 

128 total_pdfs = ( 

129 session.query(Document).filter_by(file_type="pdf").count() 

130 ) 

131 

132 # Get size stats 

133 size_result = session.query( 

134 func.sum(Document.file_size), 

135 func.avg(Document.file_size), 

136 ).first() 

137 

138 total_size = size_result[0] or 0 

139 avg_size = size_result[1] or 0 

140 

141 # Get research stats 

142 research_count = session.query( 

143 func.count(func.distinct(Document.research_id)) 

144 ).scalar() 

145 

146 # Get domain stats - count unique domains from URLs 

147 # Extract domain from original_url using SQL functions 

148 from sqlalchemy import case, func as sql_func 

149 

150 # Count unique domains by extracting them from URLs 

151 domain_subquery = session.query( 

152 sql_func.distinct( 

153 case( 

154 ( 

155 Document.original_url.like("%arxiv.org%"), 

156 "arxiv.org", 

157 ), 

158 ( 

159 Document.original_url.like("%pubmed%"), 

160 "pubmed", 

161 ), 

162 ( 

163 Document.original_url.like("%ncbi.nlm.nih.gov%"), 

164 "pubmed", 

165 ), 

166 else_="other", 

167 ) 

168 ) 

169 ).subquery() 

170 

171 domain_count = ( 

172 session.query(sql_func.count()) 

173 .select_from(domain_subquery) 

174 .scalar() 

175 ) 

176 

177 # Get download tracker stats 

178 pending_downloads = ( 

179 session.query(DownloadTracker) 

180 .filter_by(is_downloaded=False) 

181 .count() 

182 ) 

183 

184 return { 

185 "total_documents": total_docs, 

186 "total_pdfs": total_pdfs, 

187 "total_size_bytes": total_size, 

188 "total_size_mb": total_size / (1024 * 1024) 

189 if total_size 

190 else 0, 

191 "average_size_mb": avg_size / (1024 * 1024) if avg_size else 0, 

192 "research_sessions": research_count, 

193 "unique_domains": domain_count, 

194 "pending_downloads": pending_downloads, 

195 "storage_path": self._get_storage_path(), 

196 } 

197 

198 def get_documents( 

199 self, 

200 research_id: Optional[str] = None, 

201 domain: Optional[str] = None, 

202 file_type: Optional[str] = None, 

203 favorites_only: bool = False, 

204 search_query: Optional[str] = None, 

205 collection_id: Optional[str] = None, 

206 limit: int = 100, 

207 offset: int = 0, 

208 ) -> List[Dict]: 

209 """ 

210 Get documents with filtering options. 

211 

212 Returns enriched document information with research details. 

213 """ 

214 with get_user_db_session(self.username) as session: 

215 # Get default Library collection ID if not specified 

216 from ...database.library_init import get_default_library_id 

217 

218 if not collection_id: 218 ↛ 221line 218 didn't jump to line 221 because the condition on line 218 was always true

219 collection_id = get_default_library_id(self.username) 

220 

221 logger.info( 

222 f"[LibraryService] Getting documents for collection_id: {collection_id}, research_id: {research_id}, domain: {domain}" 

223 ) 

224 

225 all_documents = [] 

226 

227 # Query documents - join with DocumentCollection to filter by collection 

228 # Use outer joins for ResearchResource and ResearchHistory to include user uploads 

229 query = ( 

230 session.query( 

231 Document, 

232 ResearchResource, 

233 ResearchHistory, 

234 DocumentCollection, 

235 ) 

236 .join( 

237 DocumentCollection, 

238 Document.id == DocumentCollection.document_id, 

239 ) 

240 .outerjoin( 

241 ResearchResource, 

242 Document.resource_id == ResearchResource.id, 

243 ) 

244 .outerjoin( 

245 ResearchHistory, 

246 Document.research_id == ResearchHistory.id, 

247 ) 

248 .filter(DocumentCollection.collection_id == collection_id) 

249 ) 

250 

251 # Apply filters 

252 if research_id: 252 ↛ 253line 252 didn't jump to line 253 because the condition on line 252 was never true

253 query = query.filter(Document.research_id == research_id) 

254 

255 if domain: 255 ↛ 256line 255 didn't jump to line 256 because the condition on line 255 was never true

256 query = self._apply_domain_filter(query, Document, domain) 

257 

258 if file_type: 258 ↛ 259line 258 didn't jump to line 259 because the condition on line 258 was never true

259 query = query.filter(Document.file_type == file_type) 

260 

261 if favorites_only: 261 ↛ 262line 261 didn't jump to line 262 because the condition on line 261 was never true

262 query = query.filter(Document.favorite.is_(True)) 

263 

264 if search_query: 264 ↛ 265line 264 didn't jump to line 265 because the condition on line 264 was never true

265 query = self._apply_search_filter(query, Document, search_query) 

266 

267 # Filter to only completed documents 

268 query = query.filter(Document.status == "completed") 

269 

270 # Apply safety limit to prevent memory issues 

271 query = query.limit(500000) 

272 

273 # Execute query 

274 results = query.all() 

275 logger.info( 

276 f"[LibraryService] Found {len(results)} documents in collection {collection_id}" 

277 ) 

278 

279 # Process results 

280 for doc, resource, research, doc_collection in results: 280 ↛ 282line 280 didn't jump to line 282 because the loop on line 280 never started

281 # Determine availability flags - use Document.file_path directly 

282 file_absolute_path = None 

283 if ( 

284 doc.file_path 

285 and doc.file_path != "metadata_only" 

286 and doc.file_path != "text_only_not_stored" 

287 ): 

288 file_absolute_path = str( 

289 get_absolute_path_from_settings(doc.file_path) 

290 ) 

291 

292 # Check if PDF is available (filesystem OR database) 

293 has_pdf = bool(file_absolute_path) 

294 if not has_pdf and doc.storage_mode == "database": 

295 has_pdf = self._has_blob_in_db(session, doc.id) 

296 has_text_db = bool(doc.text_content) # Text now in Document 

297 

298 # Use DocumentCollection from query results 

299 has_rag_indexed = ( 

300 doc_collection.indexed if doc_collection else False 

301 ) 

302 rag_chunk_count = ( 

303 doc_collection.chunk_count if doc_collection else 0 

304 ) 

305 

306 all_documents.append( 

307 { 

308 "id": doc.id, 

309 "resource_id": doc.resource_id, 

310 "research_id": doc.research_id, 

311 # Document info 

312 "document_title": doc.title 

313 or (resource.title if resource else doc.filename), 

314 "authors": doc.authors, 

315 "published_date": doc.published_date, 

316 "doi": doc.doi, 

317 "arxiv_id": doc.arxiv_id, 

318 "pmid": doc.pmid, 

319 # File info 

320 "file_path": doc.file_path, 

321 "file_absolute_path": file_absolute_path, 

322 "file_name": Path(doc.file_path).name 

323 if doc.file_path and doc.file_path != "metadata_only" 

324 else "metadata_only", 

325 "file_size": doc.file_size, 

326 "file_type": doc.file_type, 

327 # URLs 

328 "original_url": doc.original_url, 

329 "domain": self._extract_domain(doc.original_url) 

330 if doc.original_url 

331 else "User Upload", 

332 # Status 

333 "download_status": doc.status or "completed", 

334 "downloaded_at": doc.processed_at.isoformat() 

335 if doc.processed_at 

336 else ( 

337 doc.uploaded_at.isoformat() 

338 if hasattr(doc, "uploaded_at") and doc.uploaded_at 

339 else None 

340 ), 

341 "favorite": doc.favorite 

342 if hasattr(doc, "favorite") 

343 else False, 

344 "tags": doc.tags if hasattr(doc, "tags") else [], 

345 # Research info (None for user uploads) 

346 "research_title": research.title or research.query[:80] 

347 if research 

348 else "User Upload", 

349 "research_query": research.query if research else None, 

350 "research_mode": research.mode if research else None, 

351 "research_date": research.created_at 

352 if research 

353 else None, 

354 # Classification flags 

355 "is_arxiv": self._is_arxiv_url(doc.original_url) 

356 if doc.original_url 

357 else False, 

358 "is_pubmed": self._is_pubmed_url(doc.original_url) 

359 if doc.original_url 

360 else False, 

361 "is_pdf": doc.file_type == "pdf", 

362 # Availability flags 

363 "has_pdf": has_pdf, 

364 "has_text_db": has_text_db, 

365 "has_rag_indexed": has_rag_indexed, 

366 "rag_chunk_count": rag_chunk_count, 

367 # Sort key 

368 "_sort_date": doc.processed_at 

369 or ( 

370 doc.uploaded_at 

371 if hasattr(doc, "uploaded_at") 

372 else None 

373 ), 

374 } 

375 ) 

376 

377 # Sort all documents by date (descending) 

378 all_documents.sort( 

379 key=lambda d: d["_sort_date"] if d["_sort_date"] else "", 

380 reverse=True, 

381 ) 

382 

383 # Apply pagination 

384 paginated_documents = all_documents[offset : offset + limit] 

385 

386 # Remove internal sort key 

387 for doc in paginated_documents: 387 ↛ 388line 387 didn't jump to line 388 because the loop on line 387 never started

388 doc.pop("_sort_date", None) 

389 

390 return paginated_documents 

391 

392 def get_all_collections(self) -> List[Dict]: 

393 """Get all collections with document counts.""" 

394 with get_user_db_session(self.username) as session: 

395 # Query collections with document counts 

396 results = ( 

397 session.query( 

398 Collection, 

399 func.count(DocumentCollection.document_id).label( 

400 "document_count" 

401 ), 

402 ) 

403 .outerjoin( 

404 DocumentCollection, 

405 Collection.id == DocumentCollection.collection_id, 

406 ) 

407 .group_by(Collection.id) 

408 .order_by(Collection.is_default.desc(), Collection.name) 

409 .all() 

410 ) 

411 

412 logger.info(f"[LibraryService] Found {len(results)} collections") 

413 

414 collections = [] 

415 for collection, doc_count in results: 

416 logger.debug( 

417 f"[LibraryService] Collection: {collection.name} (ID: {collection.id}), documents: {doc_count}" 

418 ) 

419 collections.append( 

420 { 

421 "id": collection.id, 

422 "name": collection.name, 

423 "description": collection.description, 

424 "is_default": collection.is_default, 

425 "document_count": doc_count or 0, 

426 } 

427 ) 

428 

429 return collections 

430 

    def get_research_list_with_stats(self) -> List[Dict]:
        """Get all research sessions with download statistics.

        Returns:
            One dict per research session (newest first) with session
            metadata, resource counts (total, downloaded, and a heuristic
            "downloadable" count), the user's rating if any, and up to
            five coarse domain buckets for the session's resources.
        """
        with get_user_db_session(self.username) as session:
            # Query research sessions with resource counts
            results = (
                session.query(
                    ResearchHistory,
                    func.count(ResearchResource.id).label("total_resources"),
                    # A resource counts as downloaded when a completed
                    # Document row is linked to it.
                    func.count(
                        case(
                            (Document.status == "completed", 1),
                            else_=None,
                        )
                    ).label("downloaded_count"),
                    # Heuristic: URLs that look like PDFs, arXiv pages, or
                    # PMC articles are considered downloadable.
                    func.sum(
                        func.cast(
                            ResearchResource.url.like("%.pdf")
                            | ResearchResource.url.like("%arxiv.org%")
                            | ResearchResource.url.like(
                                "%ncbi.nlm.nih.gov/pmc%"
                            ),
                            Integer,
                        )
                    ).label("downloadable_count"),
                )
                .outerjoin(
                    ResearchResource,
                    ResearchHistory.id == ResearchResource.research_id,
                )
                .outerjoin(
                    Document,
                    ResearchResource.id == Document.resource_id,
                )
                .group_by(ResearchHistory.id)
                .order_by(ResearchHistory.created_at.desc())
                .all()
            )

            # Preload all ratings to avoid N+1 queries
            research_ids = [r[0].id for r in results]
            all_ratings = (
                session.query(ResearchRating)
                .filter(ResearchRating.research_id.in_(research_ids))
                .all()
                if research_ids
                else []
            )
            ratings_by_research = {r.research_id: r for r in all_ratings}

            research_list = []
            for (
                research,
                total_resources,
                downloaded_count,
                downloadable_count,
            ) in results:
                # Get rating from preloaded dict
                rating = ratings_by_research.get(research.id)

                # Get domain breakdown - simplified version
                # Extract domain from URLs using SQL case statements
                # NOTE(review): this issues one query per research session
                # (N+1); fine for small histories, a batching candidate if
                # the list grows large.
                domains = (
                    session.query(
                        case(
                            (
                                ResearchResource.url.like("%arxiv.org%"),
                                "arxiv.org",
                            ),
                            (ResearchResource.url.like("%pubmed%"), "pubmed"),
                            (
                                ResearchResource.url.like("%ncbi.nlm.nih.gov%"),
                                "pubmed",
                            ),
                            else_="other",
                        ).label("domain"),
                        func.count().label("count"),
                    )
                    .filter(ResearchResource.research_id == research.id)
                    .group_by("domain")
                    .limit(5)
                    .all()
                )

                research_list.append(
                    {
                        "id": research.id,
                        "title": research.title,
                        "query": research.query,
                        "mode": research.mode,
                        "status": research.status,
                        "created_at": research.created_at,
                        "duration_seconds": research.duration_seconds,
                        "total_resources": total_resources or 0,
                        "downloaded_count": downloaded_count or 0,
                        "downloadable_count": downloadable_count or 0,
                        "rating": rating.rating if rating else None,
                        "top_domains": [(d, c) for d, c in domains if d],
                    }
                )

            return research_list

532 

    def get_document_by_id(self, doc_id: str) -> Optional[Dict]:
        """
        Get a specific document by its ID.

        Returns:
            Dict with document metadata, file availability flags, per-
            collection RAG indexing status, and linked research info
            ("User Upload" placeholders when there is no research), or
            None when no document matches *doc_id*.
        """
        with get_user_db_session(self.username) as session:
            # Find document - use outer joins to support both research downloads and user uploads
            result = (
                session.query(Document, ResearchResource, ResearchHistory)
                .outerjoin(
                    ResearchResource,
                    Document.resource_id == ResearchResource.id,
                )
                .outerjoin(
                    ResearchHistory,
                    Document.research_id == ResearchHistory.id,
                )
                .filter(Document.id == doc_id)
                .first()
            )

            if result:
                # Found document
                doc, resource, research = result

                # Get RAG indexing status across all collections
                doc_collections = (
                    session.query(DocumentCollection, Collection)
                    .join(Collection)
                    .filter(DocumentCollection.document_id == doc_id)
                    .all()
                )

                # Check if indexed in any collection
                has_rag_indexed = any(
                    dc.indexed for dc, coll in doc_collections
                )
                # Total chunks only over collections where it is indexed.
                total_chunks = sum(
                    dc.chunk_count for dc, coll in doc_collections if dc.indexed
                )

                # Build collections list
                collections_list = [
                    {
                        "id": coll.id,
                        "name": coll.name,
                        "indexed": dc.indexed,
                        "chunk_count": dc.chunk_count,
                    }
                    for dc, coll in doc_collections
                ]

                # Calculate word count from text content
                word_count = (
                    len(doc.text_content.split()) if doc.text_content else 0
                )

                # Check if PDF is available (database OR filesystem).
                # "metadata_only" / "text_only_not_stored" are sentinel
                # values, not real paths.
                has_pdf = bool(
                    doc.file_path
                    and doc.file_path != "metadata_only"
                    and doc.file_path != "text_only_not_stored"
                )
                if not has_pdf and doc.storage_mode == "database":
                    has_pdf = self._has_blob_in_db(session, doc.id)

                return {
                    "id": doc.id,
                    "resource_id": doc.resource_id,
                    "research_id": doc.research_id,
                    "document_title": doc.title
                    or (resource.title if resource else doc.filename),
                    "original_url": doc.original_url
                    or (resource.url if resource else None),
                    "file_path": doc.file_path,
                    "file_absolute_path": str(
                        get_absolute_path_from_settings(doc.file_path)
                    )
                    if doc.file_path
                    and doc.file_path
                    not in ("metadata_only", "text_only_not_stored")
                    else None,
                    "file_name": Path(doc.file_path).name
                    if doc.file_path
                    and doc.file_path
                    not in ("metadata_only", "text_only_not_stored")
                    else doc.filename,
                    "file_size": doc.file_size,
                    "file_type": doc.file_type,
                    "mime_type": doc.mime_type,
                    "domain": self._extract_domain(resource.url)
                    if resource
                    else "User Upload",
                    "download_status": doc.status,
                    # processed_at may already be a string in older rows,
                    # hence the hasattr(isoformat) dance.
                    "downloaded_at": doc.processed_at.isoformat()
                    if doc.processed_at
                    and hasattr(doc.processed_at, "isoformat")
                    else str(doc.processed_at)
                    if doc.processed_at
                    else (
                        doc.uploaded_at.isoformat()
                        if hasattr(doc, "uploaded_at") and doc.uploaded_at
                        else None
                    ),
                    "favorite": doc.favorite
                    if hasattr(doc, "favorite")
                    else False,
                    "tags": doc.tags if hasattr(doc, "tags") else [],
                    "research_title": research.query[:100]
                    if research
                    else "User Upload",
                    # created_at may be stored as str or datetime; pass
                    # strings through and isoformat datetimes.
                    "research_created_at": research.created_at
                    if research and isinstance(research.created_at, str)
                    else research.created_at.isoformat()
                    if research and research.created_at
                    else None,
                    # Document fields
                    "is_pdf": doc.file_type == "pdf",
                    "has_pdf": has_pdf,
                    "has_text_db": bool(doc.text_content),
                    "has_rag_indexed": has_rag_indexed,
                    "rag_chunk_count": total_chunks,
                    "word_count": word_count,
                    "collections": collections_list,
                }

            # Not found
            return None

662 

663 def toggle_favorite(self, document_id: str) -> bool: 

664 """Toggle favorite status of a document.""" 

665 with get_user_db_session(self.username) as session: 

666 doc = session.query(Document).get(document_id) 

667 if doc: 

668 doc.favorite = not doc.favorite 

669 session.commit() 

670 return doc.favorite 

671 return False 

672 

673 def delete_document(self, document_id: str) -> bool: 

674 """Delete a document from library (file and database entry).""" 

675 with get_user_db_session(self.username) as session: 

676 doc = session.query(Document).get(document_id) 

677 if not doc: 677 ↛ 681line 677 didn't jump to line 681 because the condition on line 677 was always true

678 return False 

679 

680 # Get file path from tracker (only if document has original_url) 

681 tracker = None 

682 if doc.original_url: 

683 tracker = ( 

684 session.query(DownloadTracker) 

685 .filter_by(url_hash=self._get_url_hash(doc.original_url)) 

686 .first() 

687 ) 

688 

689 # Delete physical file 

690 if tracker and tracker.file_path: 

691 try: 

692 file_path = get_absolute_path_from_settings( 

693 tracker.file_path 

694 ) 

695 if file_path.exists(): 

696 file_path.unlink() 

697 logger.info(f"Deleted file: {file_path}") 

698 except Exception: 

699 logger.exception("Failed to delete file") 

700 

701 # Update tracker 

702 if tracker: 

703 tracker.is_downloaded = False 

704 tracker.file_path = None 

705 

706 # Delete document and all related records 

707 from ..deletion.utils.cascade_helper import CascadeHelper 

708 

709 CascadeHelper.delete_document_completely(session, document_id) 

710 session.commit() 

711 

712 return True 

713 

714 def open_file_location(self, document_id: str) -> bool: 

715 """Open the folder containing the document.""" 

716 with get_user_db_session(self.username) as session: 

717 doc = session.query(Document).get(document_id) 

718 if not doc: 718 ↛ 721line 718 didn't jump to line 721 because the condition on line 718 was always true

719 return False 

720 

721 tracker = None 

722 if doc.original_url: 

723 tracker = ( 

724 session.query(DownloadTracker) 

725 .filter_by(url_hash=self._get_url_hash(doc.original_url)) 

726 .first() 

727 ) 

728 

729 if tracker and tracker.file_path: 

730 # Validate path is within library root to prevent traversal attacks 

731 library_root = get_absolute_path_from_settings("") 

732 try: 

733 validated_path = PathValidator.validate_safe_path( 

734 tracker.file_path, library_root, allow_absolute=False 

735 ) 

736 if validated_path and validated_path.exists(): 

737 return open_file_location(str(validated_path)) 

738 except ValueError as e: 

739 logger.warning(f"Path validation failed: {e}") 

740 return False 

741 

742 return False 

743 

744 def get_unique_domains(self) -> List[str]: 

745 """Get list of unique domains in library.""" 

746 from sqlalchemy import case 

747 

748 with get_user_db_session(self.username) as session: 

749 # Extract domains from URLs using SQL case statement 

750 domains = ( 

751 session.query( 

752 func.distinct( 

753 case( 

754 ( 

755 Document.original_url.like("%arxiv.org%"), 

756 "arxiv.org", 

757 ), 

758 ( 

759 Document.original_url.like("%pubmed%"), 

760 "pubmed", 

761 ), 

762 ( 

763 Document.original_url.like( 

764 "%ncbi.nlm.nih.gov%" 

765 ), 

766 "pubmed", 

767 ), 

768 else_="other", 

769 ) 

770 ) 

771 ) 

772 .filter(Document.original_url.isnot(None)) 

773 .all() 

774 ) 

775 

776 return [d[0] for d in domains if d[0]] 

777 

778 def _extract_domain(self, url: str) -> str: 

779 """Extract domain from URL.""" 

780 from urllib.parse import urlparse 

781 

782 try: 

783 return urlparse(url).netloc 

784 except (ValueError, AttributeError): 

785 return "" 

786 

787 def _get_url_hash(self, url: str) -> str: 

788 """Generate hash for URL.""" 

789 import re 

790 

791 # Normalize URL 

792 url = re.sub(r"^https?://", "", url) 

793 url = re.sub(r"^www\.", "", url) 

794 url = url.rstrip("/") 

795 

796 return get_url_hash(url) 

797 

798 def _get_storage_path(self) -> str: 

799 """Get library storage path from settings (respects LDR_DATA_DIR).""" 

800 from ...utilities.db_utils import get_settings_manager 

801 

802 settings = get_settings_manager() 

803 return str( 

804 Path( 

805 settings.get_setting( 

806 "research_library.storage_path", 

807 str(get_library_directory()), 

808 ) 

809 ).expanduser() 

810 ) 

811 

812 def sync_library_with_filesystem(self) -> Dict: 

813 """ 

814 Sync library database with filesystem. 

815 Check which PDF files exist and update database accordingly. 

816 

817 Returns: 

818 Statistics about the sync operation 

819 """ 

820 with get_user_db_session(self.username) as session: 

821 # Get all documents marked as completed 

822 documents = ( 

823 session.query(Document) 

824 .filter_by(status=DocumentStatus.COMPLETED) 

825 .all() 

826 ) 

827 

828 stats = { 

829 "total_documents": len(documents), 

830 "files_found": 0, 

831 "files_missing": 0, 

832 "trackers_updated": 0, 

833 "missing_files": [], 

834 } 

835 

836 # Sync documents with filesystem 

837 for doc in documents: 837 ↛ 839line 837 didn't jump to line 839 because the loop on line 837 never started

838 # Get download tracker 

839 tracker = ( 

840 session.query(DownloadTracker) 

841 .filter_by(url_hash=self._get_url_hash(doc.original_url)) 

842 .first() 

843 ) 

844 

845 if tracker and tracker.file_path: 

846 # Check if file exists 

847 file_path = get_absolute_path_from_settings( 

848 tracker.file_path 

849 ) 

850 if file_path.exists(): 

851 stats["files_found"] += 1 

852 else: 

853 # File missing - delete the document entry so it can be re-downloaded 

854 stats["files_missing"] += 1 

855 stats["missing_files"].append( 

856 { 

857 "id": doc.id, 

858 "title": doc.title, 

859 "path": str(file_path), 

860 "url": doc.original_url, 

861 } 

862 ) 

863 

864 # Reset tracker 

865 tracker.is_downloaded = False 

866 tracker.file_path = None 

867 

868 # Delete the document entry so it can be re-queued 

869 from ..deletion.utils.cascade_helper import ( 

870 CascadeHelper, 

871 ) 

872 

873 CascadeHelper.delete_document_completely( 

874 session, doc.id 

875 ) 

876 stats["trackers_updated"] += 1 

877 else: 

878 # No tracker or path - delete the document entry 

879 stats["files_missing"] += 1 

880 from ..deletion.utils.cascade_helper import CascadeHelper 

881 

882 CascadeHelper.delete_document_completely(session, doc.id) 

883 

884 session.commit() 

885 logger.info( 

886 f"Library sync completed: {stats['files_found']} found, {stats['files_missing']} missing" 

887 ) 

888 

889 return stats 

890 

891 def mark_for_redownload(self, document_ids: List[str]) -> int: 

892 """ 

893 Mark specific documents for re-download. 

894 

895 Args: 

896 document_ids: List of document IDs to mark for re-download 

897 

898 Returns: 

899 Number of documents marked 

900 """ 

901 with get_user_db_session(self.username) as session: 

902 count = 0 

903 for doc_id in document_ids: 

904 doc = session.query(Document).get(doc_id) 

905 if doc: 905 ↛ 903line 905 didn't jump to line 903 because the condition on line 905 was always true

906 # Get tracker and reset it 

907 tracker = ( 

908 session.query(DownloadTracker) 

909 .filter_by( 

910 url_hash=self._get_url_hash(doc.original_url) 

911 ) 

912 .first() 

913 ) 

914 

915 if tracker: 915 ↛ 920line 915 didn't jump to line 920 because the condition on line 915 was always true

916 tracker.is_downloaded = False 

917 tracker.file_path = None 

918 

919 # Mark document as pending 

920 doc.status = DocumentStatus.PENDING 

921 count += 1 

922 

923 session.commit() 

924 logger.info(f"Marked {count} documents for re-download") 

925 return count