Coverage for src/local_deep_research/research_library/services/library_service.py: 30%

242 statements  

coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Library Management Service 

3 

4Handles querying and managing the downloaded document library: 

5- Search and filter documents 

6- Get statistics and analytics 

7- Manage collections and favorites 

8- Handle file operations 

9""" 

10 

11from pathlib import Path 

12from typing import Dict, List, Optional 

13from urllib.parse import urlparse 

14 

15from loguru import logger 

16from sqlalchemy import and_, or_, func, Integer, case 

17 

18from ...database.models.download_tracker import DownloadTracker 

19from ...database.models.library import ( 

20 Collection, 

21 Document, 

22 DocumentBlob, 

23 DocumentCollection, 

24 DocumentStatus, 

25) 

26from ...database.models.metrics import ResearchRating 

27from ...database.models.research import ResearchHistory, ResearchResource 

28from ...database.session_context import get_user_db_session 

29from ...security import PathValidator 

30from ...config.paths import get_library_directory 

31from ..utils import ( 

32 get_absolute_path_from_settings, 

33 get_url_hash, 

34 open_file_location, 

35) 

36 

37 

38class LibraryService: 

39 """Service for managing and querying the document library.""" 

40 

41 def __init__(self, username: str): 

42 """Initialize library service for a user.""" 

43 self.username = username 

44 

45 def _has_blob_in_db(self, session, document_id: str) -> bool: 

46 """Check if a PDF blob exists in the database for a document.""" 

47 return ( 

48 session.query(DocumentBlob.document_id) 

49 .filter_by(document_id=document_id) 

50 .first() 

51 is not None 

52 ) 

53 

54 def _is_arxiv_url(self, url: str) -> bool: 

55 """Check if URL is from arXiv domain.""" 

56 try: 

57 hostname = urlparse(url).hostname 

58 return bool( 

59 hostname 

60 and (hostname == "arxiv.org" or hostname.endswith(".arxiv.org")) 

61 ) 

62 except Exception: 

63 return False 

64 

65 def _is_pubmed_url(self, url: str) -> bool: 

66 """Check if URL is from PubMed or NCBI domains.""" 

67 try: 

68 parsed = urlparse(url) 

69 hostname = parsed.hostname 

70 if not hostname:    [70 ↛ 71: line 70 didn't jump to line 71 because the condition on line 70 was never true]

71 return False 

72 

73 # Check for pubmed.ncbi.nlm.nih.gov 

74 if hostname == "pubmed.ncbi.nlm.nih.gov": 

75 return True 

76 

77 # Check for ncbi.nlm.nih.gov with PMC path 

78 if hostname == "ncbi.nlm.nih.gov" and "/pmc" in parsed.path: 

79 return True 

80 

81 # Check for pubmed in subdomain 

82 if "pubmed" in hostname: 82 ↛ 83line 82 didn't jump to line 83 because the condition on line 82 was never true

83 return True 

84 

85 return False 

86 except Exception: 

87 return False 

88 

89 def _apply_domain_filter(self, query, model_class, domain: str): 

90 """Apply domain filter to query for Document.""" 

91 if domain == "arxiv.org": 

92 return query.filter(model_class.original_url.like("%arxiv.org%")) 

93 elif domain == "pubmed": 

94 return query.filter( 

95 or_( 

96 model_class.original_url.like("%pubmed%"), 

97 model_class.original_url.like("%ncbi.nlm.nih.gov%"), 

98 ) 

99 ) 

100 elif domain == "other": 

101 return query.filter( 

102 and_( 

103 ~model_class.original_url.like("%arxiv.org%"), 

104 ~model_class.original_url.like("%pubmed%"), 

105 ~model_class.original_url.like("%ncbi.nlm.nih.gov%"), 

106 ) 

107 ) 

108 else: 

109 return query.filter(model_class.original_url.like(f"%{domain}%")) 

110 

111 def _apply_search_filter(self, query, model_class, search_query: str): 

112 """Apply search filter to query for Document.""" 

113 search_pattern = f"%{search_query}%" 

114 return query.filter( 

115 or_( 

116 model_class.title.ilike(search_pattern), 

117 model_class.authors.ilike(search_pattern), 

118 model_class.doi.ilike(search_pattern), 

119 ResearchResource.title.ilike(search_pattern), 

120 ) 

121 ) 

122 

123 def get_library_stats(self) -> Dict: 

124 """Get overall library statistics.""" 

125 with get_user_db_session(self.username) as session: 

126 # Get document counts 

127 total_docs = session.query(Document).count() 

128 total_pdfs = ( 

129 session.query(Document).filter_by(file_type="pdf").count() 

130 ) 

131 

132 # Get size stats 

133 size_result = session.query( 

134 func.sum(Document.file_size), 

135 func.avg(Document.file_size), 

136 ).first() 

137 

138 total_size = size_result[0] or 0 

139 avg_size = size_result[1] or 0 

140 

141 # Get research stats 

142 research_count = session.query( 

143 func.count(func.distinct(Document.research_id)) 

144 ).scalar() 

145 

146 # Get domain stats - count unique domains from URLs 

147 # Extract domain from original_url using SQL functions 

148 from sqlalchemy import case, func as sql_func 

149 

150 # Count unique domains by extracting them from URLs 

151 domain_subquery = session.query( 

152 sql_func.distinct( 

153 case( 

154 ( 

155 Document.original_url.like("%arxiv.org%"), 

156 "arxiv.org", 

157 ), 

158 ( 

159 Document.original_url.like("%pubmed%"), 

160 "pubmed", 

161 ), 

162 ( 

163 Document.original_url.like("%ncbi.nlm.nih.gov%"), 

164 "pubmed", 

165 ), 

166 else_="other", 

167 ) 

168 ) 

169 ).subquery() 

170 

171 domain_count = ( 

172 session.query(sql_func.count()) 

173 .select_from(domain_subquery) 

174 .scalar() 

175 ) 

176 

177 # Get download tracker stats 

178 pending_downloads = ( 

179 session.query(DownloadTracker) 

180 .filter_by(is_downloaded=False) 

181 .count() 

182 ) 

183 

184 return { 

185 "total_documents": total_docs, 

186 "total_pdfs": total_pdfs, 

187 "total_size_bytes": total_size, 

188 "total_size_mb": total_size / (1024 * 1024) 

189 if total_size 

190 else 0, 

191 "average_size_mb": avg_size / (1024 * 1024) if avg_size else 0, 

192 "research_sessions": research_count, 

193 "unique_domains": domain_count, 

194 "pending_downloads": pending_downloads, 

195 "storage_path": self._get_storage_path(), 

196 } 

197 

198 def get_documents( 

199 self, 

200 research_id: Optional[str] = None, 

201 domain: Optional[str] = None, 

202 file_type: Optional[str] = None, 

203 favorites_only: bool = False, 

204 search_query: Optional[str] = None, 

205 collection_id: Optional[str] = None, 

206 limit: int = 100, 

207 offset: int = 0, 

208 ) -> List[Dict]: 

209 """ 

210 Get documents with filtering options. 

211 

212 Returns enriched document information with research details. 

213 """ 

214 with get_user_db_session(self.username) as session: 

215 # Get default Library collection ID if not specified 

216 from ...database.library_init import get_default_library_id 

217 

218 if not collection_id: 

219 collection_id = get_default_library_id(self.username) 

220 

221 logger.info( 

222 f"[LibraryService] Getting documents for collection_id: {collection_id}, research_id: {research_id}, domain: {domain}" 

223 ) 

224 

225 all_documents = [] 

226 

227 # Query documents - join with DocumentCollection to filter by collection 

228 # Use outer joins for ResearchResource and ResearchHistory to include user uploads 

229 query = ( 

230 session.query( 

231 Document, 

232 ResearchResource, 

233 ResearchHistory, 

234 DocumentCollection, 

235 ) 

236 .join( 

237 DocumentCollection, 

238 Document.id == DocumentCollection.document_id, 

239 ) 

240 .outerjoin( 

241 ResearchResource, 

242 Document.resource_id == ResearchResource.id, 

243 ) 

244 .outerjoin( 

245 ResearchHistory, 

246 Document.research_id == ResearchHistory.id, 

247 ) 

248 .filter(DocumentCollection.collection_id == collection_id) 

249 ) 

250 

251 # Apply filters 

252 if research_id: 

253 query = query.filter(Document.research_id == research_id) 

254 

255 if domain: 

256 query = self._apply_domain_filter(query, Document, domain) 

257 

258 if file_type: 

259 query = query.filter(Document.file_type == file_type) 

260 

261 if favorites_only: 

262 query = query.filter(Document.favorite.is_(True)) 

263 

264 if search_query: 

265 query = self._apply_search_filter(query, Document, search_query) 

266 

267 # Filter to only completed documents 

268 query = query.filter(Document.status == "completed") 

269 

270 # Apply safety limit to prevent memory issues 

271 query = query.limit(500000) 

272 

273 # Execute query 

274 results = query.all() 

275 logger.info( 

276 f"[LibraryService] Found {len(results)} documents in collection {collection_id}" 

277 ) 

278 

279 # Process results 

280 for doc, resource, research, doc_collection in results: 

281 # Determine availability flags - use Document.file_path directly 

282 file_absolute_path = None 

283 if ( 

284 doc.file_path 

285 and doc.file_path != "metadata_only" 

286 and doc.file_path != "text_only_not_stored" 

287 ): 

288 file_absolute_path = str( 

289 get_absolute_path_from_settings(doc.file_path) 

290 ) 

291 

292 # Check if PDF is available (filesystem OR database) 

293 has_pdf = bool(file_absolute_path) 

294 if not has_pdf and doc.storage_mode == "database": 

295 has_pdf = self._has_blob_in_db(session, doc.id) 

296 has_text_db = bool(doc.text_content) # Text now in Document 

297 

298 # Use DocumentCollection from query results 

299 has_rag_indexed = ( 

300 doc_collection.indexed if doc_collection else False 

301 ) 

302 rag_chunk_count = ( 

303 doc_collection.chunk_count if doc_collection else 0 

304 ) 

305 

306 all_documents.append( 

307 { 

308 "id": doc.id, 

309 "resource_id": doc.resource_id, 

310 "research_id": doc.research_id, 

311 # Document info 

312 "document_title": doc.title 

313 or (resource.title if resource else doc.filename), 

314 "authors": doc.authors, 

315 "published_date": doc.published_date, 

316 "doi": doc.doi, 

317 "arxiv_id": doc.arxiv_id, 

318 "pmid": doc.pmid, 

319 # File info 

320 "file_path": doc.file_path, 

321 "file_absolute_path": file_absolute_path, 

322 "file_name": Path(doc.file_path).name 

323 if doc.file_path and doc.file_path != "metadata_only" 

324 else "metadata_only", 

325 "file_size": doc.file_size, 

326 "file_type": doc.file_type, 

327 # URLs 

328 "original_url": doc.original_url, 

329 "domain": self._extract_domain(doc.original_url) 

330 if doc.original_url 

331 else "User Upload", 

332 # Status 

333 "download_status": doc.status or "completed", 

334 "downloaded_at": doc.processed_at.isoformat() 

335 if doc.processed_at 

336 else ( 

337 doc.uploaded_at.isoformat() 

338 if hasattr(doc, "uploaded_at") and doc.uploaded_at 

339 else None 

340 ), 

341 "favorite": doc.favorite 

342 if hasattr(doc, "favorite") 

343 else False, 

344 "tags": doc.tags if hasattr(doc, "tags") else [], 

345 # Research info (None for user uploads) 

346 "research_title": research.title or research.query[:80] 

347 if research 

348 else "User Upload", 

349 "research_query": research.query if research else None, 

350 "research_mode": research.mode if research else None, 

351 "research_date": research.created_at 

352 if research 

353 else None, 

354 # Classification flags 

355 "is_arxiv": self._is_arxiv_url(doc.original_url) 

356 if doc.original_url 

357 else False, 

358 "is_pubmed": self._is_pubmed_url(doc.original_url) 

359 if doc.original_url 

360 else False, 

361 "is_pdf": doc.file_type == "pdf", 

362 # Availability flags 

363 "has_pdf": has_pdf, 

364 "has_text_db": has_text_db, 

365 "has_rag_indexed": has_rag_indexed, 

366 "rag_chunk_count": rag_chunk_count, 

367 # Sort key 

368 "_sort_date": doc.processed_at 

369 or ( 

370 doc.uploaded_at 

371 if hasattr(doc, "uploaded_at") 

372 else None 

373 ), 

374 } 

375 ) 

376 

377 # Sort all documents by date (descending) 

378 all_documents.sort( 

379 key=lambda d: d["_sort_date"] if d["_sort_date"] else "", 

380 reverse=True, 

381 ) 

382 

383 # Apply pagination 

384 paginated_documents = all_documents[offset : offset + limit] 

385 

386 # Remove internal sort key 

387 for doc in paginated_documents: 

388 doc.pop("_sort_date", None) 

389 

390 return paginated_documents 

391 

392 def get_all_collections(self) -> List[Dict]: 

393 """Get all collections with document counts.""" 

394 with get_user_db_session(self.username) as session: 

395 # Query collections with document counts 

396 results = ( 

397 session.query( 

398 Collection, 

399 func.count(DocumentCollection.document_id).label( 

400 "document_count" 

401 ), 

402 ) 

403 .outerjoin( 

404 DocumentCollection, 

405 Collection.id == DocumentCollection.collection_id, 

406 ) 

407 .group_by(Collection.id) 

408 .order_by(Collection.is_default.desc(), Collection.name) 

409 .all() 

410 ) 

411 

412 logger.info(f"[LibraryService] Found {len(results)} collections") 

413 

414 collections = [] 

415 for collection, doc_count in results: 

416 logger.debug( 

417 f"[LibraryService] Collection: {collection.name} (ID: {collection.id}), documents: {doc_count}" 

418 ) 

419 collections.append( 

420 { 

421 "id": collection.id, 

422 "name": collection.name, 

423 "description": collection.description, 

424 "is_default": collection.is_default, 

425 "document_count": doc_count or 0, 

426 } 

427 ) 

428 

429 return collections 

430 

431 def get_research_list_with_stats(self) -> List[Dict]: 

432 """Get all research sessions with download statistics.""" 

433 with get_user_db_session(self.username) as session: 

434 # Query research sessions with resource counts 

435 results = ( 

436 session.query( 

437 ResearchHistory, 

438 func.count(ResearchResource.id).label("total_resources"), 

439 func.count( 

440 case( 

441 (Document.status == "completed", 1), 

442 else_=None, 

443 ) 

444 ).label("downloaded_count"), 

445 func.sum( 

446 func.cast( 

447 ResearchResource.url.like("%.pdf") 

448 | ResearchResource.url.like("%arxiv.org%") 

449 | ResearchResource.url.like( 

450 "%ncbi.nlm.nih.gov/pmc%" 

451 ), 

452 Integer, 

453 ) 

454 ).label("downloadable_count"), 

455 ) 

456 .outerjoin( 

457 ResearchResource, 

458 ResearchHistory.id == ResearchResource.research_id, 

459 ) 

460 .outerjoin( 

461 Document, 

462 ResearchResource.id == Document.resource_id, 

463 ) 

464 .group_by(ResearchHistory.id) 

465 .order_by(ResearchHistory.created_at.desc()) 

466 .all() 

467 ) 

468 

469 research_list = [] 

470 for ( 

471 research, 

472 total_resources, 

473 downloaded_count, 

474 downloadable_count, 

475 ) in results: 

476 # Get rating if exists 

477 rating = ( 

478 session.query(ResearchRating) 

479 .filter_by(research_id=research.id) 

480 .first() 

481 ) 

482 

483 # Get domain breakdown - simplified version 

484 # Extract domain from URLs using SQL case statements 

485 domains = ( 

486 session.query( 

487 case( 

488 ( 

489 ResearchResource.url.like("%arxiv.org%"), 

490 "arxiv.org", 

491 ), 

492 (ResearchResource.url.like("%pubmed%"), "pubmed"), 

493 ( 

494 ResearchResource.url.like("%ncbi.nlm.nih.gov%"), 

495 "pubmed", 

496 ), 

497 else_="other", 

498 ).label("domain"), 

499 func.count().label("count"), 

500 ) 

501 .filter(ResearchResource.research_id == research.id) 

502 .group_by("domain") 

503 .limit(5) 

504 .all() 

505 ) 

506 

507 research_list.append( 

508 { 

509 "id": research.id, 

510 "title": research.title, 

511 "query": research.query, 

512 "mode": research.mode, 

513 "status": research.status, 

514 "created_at": research.created_at, 

515 "duration_seconds": research.duration_seconds, 

516 "total_resources": total_resources or 0, 

517 "downloaded_count": downloaded_count or 0, 

518 "downloadable_count": downloadable_count or 0, 

519 "rating": rating.rating if rating else None, 

520 "top_domains": [(d, c) for d, c in domains if d], 

521 } 

522 ) 

523 

524 return research_list 

525 

526 def get_document_by_id(self, doc_id: str) -> Optional[Dict]: 

527 """ 

528 Get a specific document by its ID. 

529 

530 Returns document information with file path. 

531 """ 

532 with get_user_db_session(self.username) as session: 

533 # Find document - use outer joins to support both research downloads and user uploads 

534 result = ( 

535 session.query(Document, ResearchResource, ResearchHistory) 

536 .outerjoin( 

537 ResearchResource, 

538 Document.resource_id == ResearchResource.id, 

539 ) 

540 .outerjoin( 

541 ResearchHistory, 

542 Document.research_id == ResearchHistory.id, 

543 ) 

544 .filter(Document.id == doc_id) 

545 .first() 

546 ) 

547 

548 if result:    [548 ↛ 550: line 548 didn't jump to line 550 because the condition on line 548 was never true]

549 # Found document 

550 doc, resource, research = result 

551 

552 # Get RAG indexing status across all collections 

553 doc_collections = ( 

554 session.query(DocumentCollection, Collection) 

555 .join(Collection) 

556 .filter(DocumentCollection.document_id == doc_id) 

557 .all() 

558 ) 

559 

560 # Check if indexed in any collection 

561 has_rag_indexed = any( 

562 dc.indexed for dc, coll in doc_collections 

563 ) 

564 total_chunks = sum( 

565 dc.chunk_count for dc, coll in doc_collections if dc.indexed 

566 ) 

567 

568 # Build collections list 

569 collections_list = [ 

570 { 

571 "id": coll.id, 

572 "name": coll.name, 

573 "indexed": dc.indexed, 

574 "chunk_count": dc.chunk_count, 

575 } 

576 for dc, coll in doc_collections 

577 ] 

578 

579 # Calculate word count from text content 

580 word_count = ( 

581 len(doc.text_content.split()) if doc.text_content else 0 

582 ) 

583 

584 # Check if PDF is available (database OR filesystem) 

585 has_pdf = bool( 

586 doc.file_path 

587 and doc.file_path != "metadata_only" 

588 and doc.file_path != "text_only_not_stored" 

589 ) 

590 if not has_pdf and doc.storage_mode == "database": 

591 has_pdf = self._has_blob_in_db(session, doc.id) 

592 

593 return { 

594 "id": doc.id, 

595 "resource_id": doc.resource_id, 

596 "research_id": doc.research_id, 

597 "document_title": doc.title 

598 or (resource.title if resource else doc.filename), 

599 "original_url": doc.original_url 

600 or (resource.url if resource else None), 

601 "file_path": doc.file_path, 

602 "file_absolute_path": str( 

603 get_absolute_path_from_settings(doc.file_path) 

604 ) 

605 if doc.file_path 

606 and doc.file_path 

607 not in ("metadata_only", "text_only_not_stored") 

608 else None, 

609 "file_name": Path(doc.file_path).name 

610 if doc.file_path 

611 and doc.file_path 

612 not in ("metadata_only", "text_only_not_stored") 

613 else doc.filename, 

614 "file_size": doc.file_size, 

615 "file_type": doc.file_type, 

616 "mime_type": doc.mime_type, 

617 "domain": self._extract_domain(resource.url) 

618 if resource 

619 else "User Upload", 

620 "download_status": doc.status, 

621 "downloaded_at": doc.processed_at.isoformat() 

622 if doc.processed_at 

623 and hasattr(doc.processed_at, "isoformat") 

624 else str(doc.processed_at) 

625 if doc.processed_at 

626 else ( 

627 doc.uploaded_at.isoformat() 

628 if hasattr(doc, "uploaded_at") and doc.uploaded_at 

629 else None 

630 ), 

631 "favorite": doc.favorite 

632 if hasattr(doc, "favorite") 

633 else False, 

634 "tags": doc.tags if hasattr(doc, "tags") else [], 

635 "research_title": research.query[:100] 

636 if research 

637 else "User Upload", 

638 "research_created_at": research.created_at 

639 if research and isinstance(research.created_at, str) 

640 else research.created_at.isoformat() 

641 if research and research.created_at 

642 else None, 

643 # Document fields 

644 "is_pdf": doc.file_type == "pdf", 

645 "has_pdf": has_pdf, 

646 "has_text_db": bool(doc.text_content), 

647 "has_rag_indexed": has_rag_indexed, 

648 "rag_chunk_count": total_chunks, 

649 "word_count": word_count, 

650 "collections": collections_list, 

651 } 

652 

653 # Not found 

654 return None 

655 

656 def toggle_favorite(self, document_id: str) -> bool: 

657 """Toggle favorite status of a document.""" 

658 with get_user_db_session(self.username) as session: 

659 doc = session.query(Document).get(document_id) 

660 if doc: 

661 doc.favorite = not doc.favorite 

662 session.commit() 

663 return doc.favorite 

664 return False 

665 

666 def delete_document(self, document_id: str) -> bool: 

667 """Delete a document from library (file and database entry).""" 

668 with get_user_db_session(self.username) as session: 

669 doc = session.query(Document).get(document_id) 

670 if not doc:    [670 ↛ 674: line 670 didn't jump to line 674 because the condition on line 670 was always true]

671 return False 

672 

673 # Get file path from tracker (only if document has original_url) 

674 tracker = None 

675 if doc.original_url: 

676 tracker = ( 

677 session.query(DownloadTracker) 

678 .filter_by(url_hash=self._get_url_hash(doc.original_url)) 

679 .first() 

680 ) 

681 

682 # Delete physical file 

683 if tracker and tracker.file_path: 

684 try: 

685 file_path = get_absolute_path_from_settings( 

686 tracker.file_path 

687 ) 

688 if file_path.exists(): 

689 file_path.unlink() 

690 logger.info(f"Deleted file: {file_path}") 

691 except Exception: 

692 logger.exception("Failed to delete file") 

693 

694 # Update tracker 

695 if tracker: 

696 tracker.is_downloaded = False 

697 tracker.file_path = None 

698 

699 # Delete document and all related records 

700 from ..deletion.utils.cascade_helper import CascadeHelper 

701 

702 CascadeHelper.delete_document_completely(session, document_id) 

703 session.commit() 

704 

705 return True 

706 

707 def open_file_location(self, document_id: str) -> bool: 

708 """Open the folder containing the document.""" 

709 with get_user_db_session(self.username) as session: 

710 doc = session.query(Document).get(document_id) 

711 if not doc: 

712 return False 

713 

714 tracker = None 

715 if doc.original_url: 

716 tracker = ( 

717 session.query(DownloadTracker) 

718 .filter_by(url_hash=self._get_url_hash(doc.original_url)) 

719 .first() 

720 ) 

721 

722 if tracker and tracker.file_path: 

723 # Validate path is within library root to prevent traversal attacks 

724 library_root = get_absolute_path_from_settings("") 

725 try: 

726 validated_path = PathValidator.validate_safe_path( 

727 tracker.file_path, library_root, allow_absolute=False 

728 ) 

729 if validated_path and validated_path.exists(): 

730 return open_file_location(str(validated_path)) 

731 except ValueError as e: 

732 logger.warning(f"Path validation failed: {e}") 

733 return False 

734 

735 return False 

736 

737 def get_unique_domains(self) -> List[str]: 

738 """Get list of unique domains in library.""" 

739 from sqlalchemy import case 

740 

741 with get_user_db_session(self.username) as session: 

742 # Extract domains from URLs using SQL case statement 

743 domains = ( 

744 session.query( 

745 func.distinct( 

746 case( 

747 ( 

748 Document.original_url.like("%arxiv.org%"), 

749 "arxiv.org", 

750 ), 

751 ( 

752 Document.original_url.like("%pubmed%"), 

753 "pubmed", 

754 ), 

755 ( 

756 Document.original_url.like( 

757 "%ncbi.nlm.nih.gov%" 

758 ), 

759 "pubmed", 

760 ), 

761 else_="other", 

762 ) 

763 ) 

764 ) 

765 .filter(Document.original_url.isnot(None)) 

766 .all() 

767 ) 

768 

769 return [d[0] for d in domains if d[0]] 

770 

771 def _extract_domain(self, url: str) -> str: 

772 """Extract domain from URL.""" 

773 from urllib.parse import urlparse 

774 

775 try: 

776 return urlparse(url).netloc 

777 except Exception: 

778 return "" 

779 

780 def _get_url_hash(self, url: str) -> str: 

781 """Generate hash for URL.""" 

782 import re 

783 

784 # Normalize URL 

785 url = re.sub(r"^https?://", "", url) 

786 url = re.sub(r"^www\.", "", url) 

787 url = url.rstrip("/") 

788 

789 return get_url_hash(url) 

790 

791 def _get_storage_path(self) -> str: 

792 """Get library storage path from settings (respects LDR_DATA_DIR).""" 

793 from ...utilities.db_utils import get_settings_manager 

794 

795 settings = get_settings_manager() 

796 return str( 

797 Path( 

798 settings.get_setting( 

799 "research_library.storage_path", 

800 str(get_library_directory()), 

801 ) 

802 ).expanduser() 

803 ) 

804 

805 def sync_library_with_filesystem(self) -> Dict: 

806 """ 

807 Sync library database with filesystem. 

808 Check which PDF files exist and update database accordingly. 

809 

810 Returns: 

811 Statistics about the sync operation 

812 """ 

813 with get_user_db_session(self.username) as session: 

814 # Get all documents marked as completed 

815 documents = ( 

816 session.query(Document) 

817 .filter_by(status=DocumentStatus.COMPLETED) 

818 .all() 

819 ) 

820 

821 stats = { 

822 "total_documents": len(documents), 

823 "files_found": 0, 

824 "files_missing": 0, 

825 "trackers_updated": 0, 

826 "missing_files": [], 

827 } 

828 

829 # Sync documents with filesystem 

830 for doc in documents: 

831 # Get download tracker 

832 tracker = ( 

833 session.query(DownloadTracker) 

834 .filter_by(url_hash=self._get_url_hash(doc.original_url)) 

835 .first() 

836 ) 

837 

838 if tracker and tracker.file_path: 

839 # Check if file exists 

840 file_path = get_absolute_path_from_settings( 

841 tracker.file_path 

842 ) 

843 if file_path.exists(): 

844 stats["files_found"] += 1 

845 else: 

846 # File missing - delete the document entry so it can be re-downloaded 

847 stats["files_missing"] += 1 

848 stats["missing_files"].append( 

849 { 

850 "id": doc.id, 

851 "title": doc.title, 

852 "path": str(file_path), 

853 "url": doc.original_url, 

854 } 

855 ) 

856 

857 # Reset tracker 

858 tracker.is_downloaded = False 

859 tracker.file_path = None 

860 

861 # Delete the document entry so it can be re-queued 

862 from ..deletion.utils.cascade_helper import ( 

863 CascadeHelper, 

864 ) 

865 

866 CascadeHelper.delete_document_completely( 

867 session, doc.id 

868 ) 

869 stats["trackers_updated"] += 1 

870 else: 

871 # No tracker or path - delete the document entry 

872 stats["files_missing"] += 1 

873 from ..deletion.utils.cascade_helper import CascadeHelper 

874 

875 CascadeHelper.delete_document_completely(session, doc.id) 

876 

877 session.commit() 

878 logger.info( 

879 f"Library sync completed: {stats['files_found']} found, {stats['files_missing']} missing" 

880 ) 

881 

882 return stats 

883 

884 def mark_for_redownload(self, document_ids: List[str]) -> int: 

885 """ 

886 Mark specific documents for re-download. 

887 

888 Args: 

889 document_ids: List of document IDs to mark for re-download 

890 

891 Returns: 

892 Number of documents marked 

893 """ 

894 with get_user_db_session(self.username) as session: 

895 count = 0 

896 for doc_id in document_ids: 

897 doc = session.query(Document).get(doc_id) 

898 if doc: 

899 # Get tracker and reset it 

900 tracker = ( 

901 session.query(DownloadTracker) 

902 .filter_by( 

903 url_hash=self._get_url_hash(doc.original_url) 

904 ) 

905 .first() 

906 ) 

907 

908 if tracker: 

909 tracker.is_downloaded = False 

910 tracker.file_path = None 

911 

912 # Mark document as pending 

913 doc.status = DocumentStatus.PENDING 

914 count += 1 

915 

916 session.commit() 

917 logger.info(f"Marked {count} documents for re-download") 

918 return count
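
Note on the 30% figure: the URL-classification helpers (_is_arxiv_url, _is_pubmed_url, _extract_domain) and _get_url_hash are pure functions of their string input and never open a database session, so they are the cheapest part of this module to exercise. The sketch below shows direct calls to them. It assumes the package is importable as local_deep_research with its dependencies installed; the username "demo-user" is hypothetical (the constructor only stores the string), and the calls to the underscore-prefixed helpers are for illustration and testing only, not part of the public API.

from local_deep_research.research_library.services.library_service import LibraryService

# __init__ only records the username; no session is opened here
service = LibraryService("demo-user")

# Hostname-based classification (unlike _apply_domain_filter, which uses substring LIKE patterns)
assert service._is_arxiv_url("https://arxiv.org/abs/2101.00001") is True
assert service._is_arxiv_url("https://example.com/arxiv.org.pdf") is False  # hostname check, not a substring match
assert service._is_pubmed_url("https://pubmed.ncbi.nlm.nih.gov/12345/") is True
assert service._is_pubmed_url("https://ncbi.nlm.nih.gov/pmc/articles/PMC123/") is True  # PMC path on ncbi.nlm.nih.gov

# Domain extraction returns the netloc, or "" on parse failure
print(service._extract_domain("https://arxiv.org/abs/2101.00001"))  # arxiv.org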