Coverage for src / local_deep_research / research_library / services / library_service.py: 87%
353 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""
2Library Management Service
4Handles querying and managing the downloaded document library:
5- Search and filter documents
6- Get statistics and analytics
7- Manage collections and favorites
8- Handle file operations
9"""
11from pathlib import Path
12from typing import Dict, List, Optional
13from urllib.parse import urlparse
15from loguru import logger
16from sqlalchemy import or_, func, case
17from sqlalchemy.orm import aliased
19from ...constants import FILE_PATH_SENTINELS
20from ...database.models.download_tracker import DownloadTracker
21from ...database.models.library import (
22 Collection,
23 Document,
24 DocumentBlob,
25 DocumentCollection,
26 DocumentStatus,
27)
28from ...database.models.metrics import ResearchRating
29from ...database.models.research import ResearchHistory, ResearchResource
30from ...database.session_context import get_user_db_session
31from ...security import PathValidator
32from ...config.paths import get_library_directory
33from ..utils import (
34 get_absolute_path_from_settings,
35 get_url_hash,
36 open_file_location,
37)
40class LibraryService:
41 """Service for managing and querying the document library."""
43 def __init__(self, username: str):
44 """Initialize library service for a user."""
45 self.username = username
47 def _has_blob_in_db(self, session, document_id: str) -> bool:
48 """Check if a PDF blob exists in the database for a document."""
49 return (
50 session.query(DocumentBlob.document_id)
51 .filter_by(document_id=document_id)
52 .first()
53 is not None
54 )
56 def _get_safe_absolute_path(self, file_path: str) -> Optional[str]:
57 """
58 Get the absolute path for a file, safely handling invalid paths.
60 Args:
61 file_path: Relative file path from library root
63 Returns:
64 Absolute path as string, or None if path is invalid/unsafe
65 """
66 if not file_path or file_path in FILE_PATH_SENTINELS:
67 return None
68 abs_path = get_absolute_path_from_settings(file_path)
69 return str(abs_path) if abs_path else None
71 def _is_arxiv_url(self, url: str) -> bool:
72 """Check if URL is from arXiv domain."""
73 try:
74 hostname = urlparse(url).hostname
75 return bool(
76 hostname
77 and (hostname == "arxiv.org" or hostname.endswith(".arxiv.org"))
78 )
79 except Exception:
80 return False
82 def _is_pubmed_url(self, url: str) -> bool:
83 """Check if URL is from PubMed or NCBI domains."""
84 try:
85 parsed = urlparse(url)
86 hostname = parsed.hostname
87 if not hostname:
88 return False
90 # Check for pubmed.ncbi.nlm.nih.gov
91 if hostname == "pubmed.ncbi.nlm.nih.gov":
92 return True
94 # Check for ncbi.nlm.nih.gov with PMC path
95 if hostname == "ncbi.nlm.nih.gov" and "/pmc" in parsed.path:
96 return True
98 # Check for pubmed in subdomain
99 if "pubmed" in hostname:
100 return True
102 return False
103 except Exception:
104 return False
106 def _apply_date_filter(self, query, model_class, date_filter: str):
107 """Apply date range filter based on processed_at timestamp."""
108 from datetime import datetime, timedelta, timezone
110 now = datetime.now(timezone.utc)
111 if date_filter == "today":
112 cutoff = now.replace(hour=0, minute=0, second=0, microsecond=0)
113 elif date_filter == "week":
114 cutoff = now - timedelta(days=7)
115 elif date_filter == "month":
116 cutoff = now - timedelta(days=30)
117 else:
118 return query
119 return query.filter(model_class.processed_at >= cutoff)
121 @staticmethod
122 def _escape_like(value: str) -> str:
123 """Escape SQL LIKE wildcards (%, _) and the escape char itself.
125 Without this, a value like ``my_journal`` would have ``_`` interpreted
126 as a single-character wildcard, and ``%`` would match anything.
127 Used together with ``escape="\\\\"`` on the .like()/.ilike() call.
128 """
129 return (
130 value.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
131 )
133 def _apply_domain_filter(self, query, model_class, domain: str):
134 """Apply domain filter to query for Document.
136 The dropdown is fully data-driven (populated from get_unique_domains),
137 so the filter is a generic substring match against original_url.
138 """
139 pattern = f"%{self._escape_like(domain)}%"
140 return query.filter(model_class.original_url.like(pattern, escape="\\"))
142 def _apply_search_filter(self, query, model_class, search_query: str):
143 """Apply search filter to query for Document."""
144 search_pattern = f"%{self._escape_like(search_query)}%"
145 return query.filter(
146 or_(
147 model_class.title.ilike(search_pattern, escape="\\"),
148 model_class.authors.ilike(search_pattern, escape="\\"),
149 model_class.doi.ilike(search_pattern, escape="\\"),
150 ResearchResource.title.ilike(search_pattern, escape="\\"),
151 )
152 )
154 def get_library_stats(self) -> Dict:
155 """Get overall library statistics."""
156 with get_user_db_session(self.username) as session:
157 # Get document counts
158 total_docs = session.query(Document).count()
159 total_pdfs = (
160 session.query(Document).filter_by(file_type="pdf").count()
161 )
163 # Get size stats
164 size_result = session.query(
165 func.sum(Document.file_size),
166 func.avg(Document.file_size),
167 ).first()
169 total_size = size_result[0] or 0
170 avg_size = size_result[1] or 0
172 # Get research stats
173 research_count = session.query(
174 func.count(func.distinct(Document.research_id))
175 ).scalar()
177 # Get domain stats - count unique domains from URLs
178 # Extract domain from original_url using SQL functions
179 from sqlalchemy import case, func as sql_func
181 # Count unique domains by extracting them from URLs
182 domain_subquery = session.query(
183 sql_func.distinct(
184 case(
185 (
186 Document.original_url.like("%arxiv.org%"),
187 "arxiv.org",
188 ),
189 (
190 Document.original_url.like("%pubmed%"),
191 "pubmed",
192 ),
193 (
194 Document.original_url.like("%ncbi.nlm.nih.gov%"),
195 "pubmed",
196 ),
197 else_="other",
198 )
199 )
200 ).subquery()
202 domain_count = (
203 session.query(sql_func.count())
204 .select_from(domain_subquery)
205 .scalar()
206 )
208 # Get download tracker stats
209 pending_downloads = (
210 session.query(DownloadTracker)
211 .filter_by(is_downloaded=False)
212 .count()
213 )
215 return {
216 "total_documents": total_docs,
217 "total_pdfs": total_pdfs,
218 "total_size_bytes": total_size,
219 "total_size_mb": total_size / (1024 * 1024)
220 if total_size
221 else 0,
222 "average_size_mb": avg_size / (1024 * 1024) if avg_size else 0,
223 "research_sessions": research_count,
224 "unique_domains": domain_count,
225 "pending_downloads": pending_downloads,
226 "storage_path": self._get_storage_path(),
227 }
229 def count_documents(
230 self,
231 research_id: Optional[str] = None,
232 domain: Optional[str] = None,
233 collection_id: Optional[str] = None,
234 date_filter: Optional[str] = None,
235 ) -> int:
236 """Count documents matching the given filters (for pagination)."""
237 with get_user_db_session(self.username) as session:
238 from ...database.library_init import get_default_library_id
240 if not collection_id:
241 collection_id = get_default_library_id(self.username)
243 q = (
244 session.query(func.count(Document.id))
245 .join(
246 DocumentCollection,
247 Document.id == DocumentCollection.document_id,
248 )
249 .filter(DocumentCollection.collection_id == collection_id)
250 .filter(Document.status == "completed")
251 )
253 if research_id:
254 q = q.filter(Document.research_id == research_id)
255 if domain:
256 q = self._apply_domain_filter(q, Document, domain)
257 if date_filter:
258 q = self._apply_date_filter(q, Document, date_filter)
260 return q.scalar() or 0
    def get_documents(
        self,
        research_id: Optional[str] = None,
        domain: Optional[str] = None,
        file_type: Optional[str] = None,
        favorites_only: bool = False,
        search_query: Optional[str] = None,
        collection_id: Optional[str] = None,
        date_filter: Optional[str] = None,
        limit: int = 100,
        offset: int = 0,
    ) -> List[Dict]:
        """
        Get documents with filtering options.

        Returns enriched document information with research details.

        Args:
            research_id: Restrict to documents from one research session.
            domain: Substring filter against Document.original_url.
            file_type: Exact match on Document.file_type (e.g. "pdf").
            favorites_only: Only return favorited documents.
            search_query: Case-insensitive search over title/authors/DOI
                and the linked ResearchResource title.
            collection_id: Collection to list; defaults to the user's
                default Library collection.
            date_filter: "today" / "week" / "month" window on processed_at.
            limit: SQL-level page size.
            offset: SQL-level page offset.

        Returns:
            List of dicts, one per completed document, with file,
            research, classification, and availability metadata.
        """
        with get_user_db_session(self.username) as session:
            # Get default Library collection ID if not specified
            from ...database.library_init import get_default_library_id

            if not collection_id:
                collection_id = get_default_library_id(self.username)

            logger.info(
                f"[LibraryService] Getting documents for collection_id: {collection_id}, research_id: {research_id}, domain: {domain}"
            )

            all_documents = []

            # Step 1: subquery to get paginated document IDs.
            # Pagination and sorting happen here on Document alone,
            # avoiding non-determinism from outer-joining related tables.
            doc_subq = (
                session.query(Document.id)
                .join(
                    DocumentCollection,
                    Document.id == DocumentCollection.document_id,
                )
                .filter(DocumentCollection.collection_id == collection_id)
            )

            # Apply filters
            if research_id:
                doc_subq = doc_subq.filter(Document.research_id == research_id)

            if domain:
                doc_subq = self._apply_domain_filter(doc_subq, Document, domain)

            if date_filter:
                doc_subq = self._apply_date_filter(
                    doc_subq, Document, date_filter
                )

            if file_type:
                doc_subq = doc_subq.filter(Document.file_type == file_type)

            if favorites_only:
                doc_subq = doc_subq.filter(Document.favorite.is_(True))

            if search_query:
                # _apply_search_filter references ResearchResource.title,
                # so we must outerjoin it here. DISTINCT prevents fan-out
                # from the outerjoin duplicating Document IDs.
                doc_subq = doc_subq.outerjoin(
                    ResearchResource,
                    (Document.resource_id == ResearchResource.id)
                    | (ResearchResource.document_id == Document.id),
                ).distinct()
                doc_subq = self._apply_search_filter(
                    doc_subq, Document, search_query
                )

            # Filter to only completed documents
            doc_subq = doc_subq.filter(Document.status == "completed")

            # Sort at SQL level (SQLite-safe NULL handling): rows with a
            # processed_at timestamp come first, newest first.
            doc_subq = doc_subq.order_by(
                case((Document.processed_at.isnot(None), 0), else_=1),
                Document.processed_at.desc(),
            )

            # Apply SQL-level pagination
            doc_subq = doc_subq.offset(offset).limit(limit)
            doc_id_subq = doc_subq.subquery()

            # Step 2: join the paginated document IDs with related tables.
            # Use two separate outer joins for ResearchResource to avoid
            # the OR-condition join that can fan out to multiple rows:
            # - ResourceByFK: matched via Document.resource_id (primary FK)
            # - ResourceByDoc: matched via ResearchResource.document_id
            # We prefer ResourceByFK; fall back to ResourceByDoc in Python.
            ResourceByFK = aliased(ResearchResource)
            ResourceByDoc = aliased(ResearchResource)

            query = (
                session.query(
                    Document,
                    ResourceByFK,
                    ResourceByDoc,
                    ResearchHistory,
                    DocumentCollection,
                )
                .join(doc_id_subq, Document.id == doc_id_subq.c.id)
                .join(
                    DocumentCollection,
                    Document.id == DocumentCollection.document_id,
                )
                .outerjoin(
                    ResourceByFK,
                    Document.resource_id == ResourceByFK.id,
                )
                .outerjoin(
                    ResourceByDoc,
                    ResourceByDoc.document_id == Document.id,
                )
                .outerjoin(
                    ResearchHistory,
                    Document.research_id == ResearchHistory.id,
                )
                .filter(DocumentCollection.collection_id == collection_id)
                # Re-apply sort so final results are ordered
                .order_by(
                    case((Document.processed_at.isnot(None), 0), else_=1),
                    Document.processed_at.desc(),
                )
            )

            # Execute query
            results = query.all()
            logger.info(
                f"[LibraryService] Found {len(results)} documents in collection {collection_id}"
            )

            # Batch-check blob existence to avoid N+1 queries
            doc_ids = [row[0].id for row in results]
            blob_ids = set()
            if doc_ids:
                blob_ids = {
                    r[0]
                    for r in session.query(DocumentBlob.document_id)
                    .filter(DocumentBlob.document_id.in_(doc_ids))
                    .all()
                }

            # Process results — deduplicate by doc.id since the ResourceByDoc
            # outer join can fan out when multiple ResearchResource rows
            # point to the same document via document_id.
            seen_doc_ids = set()
            for doc, res_by_fk, res_by_doc, research, doc_collection in results:
                if doc.id in seen_doc_ids:
                    continue
                seen_doc_ids.add(doc.id)
                # Prefer the resource matched via Document.resource_id FK;
                # fall back to the one matched via ResearchResource.document_id.
                resource = res_by_fk or res_by_doc
                # Determine availability flags - use Document.file_path directly
                file_absolute_path = None
                if doc.file_path and doc.file_path not in FILE_PATH_SENTINELS:
                    abs_path = get_absolute_path_from_settings(doc.file_path)
                    if abs_path:
                        file_absolute_path = str(abs_path)

                # Check if PDF is available (filesystem OR database)
                has_pdf = bool(file_absolute_path)
                if not has_pdf and doc.storage_mode == "database":
                    has_pdf = doc.id in blob_ids
                has_text_db = bool(doc.text_content)  # Text now in Document

                # Use DocumentCollection from query results
                has_rag_indexed = (
                    doc_collection.indexed if doc_collection else False
                )
                rag_chunk_count = (
                    doc_collection.chunk_count if doc_collection else 0
                )

                all_documents.append(
                    {
                        "id": doc.id,
                        "resource_id": doc.resource_id,
                        "research_id": doc.research_id,
                        # Document info
                        "document_title": doc.title
                        or (resource.title if resource else doc.filename),
                        "authors": doc.authors,
                        "published_date": doc.published_date,
                        "doi": doc.doi,
                        "arxiv_id": doc.arxiv_id,
                        "pmid": doc.pmid,
                        # File info
                        "file_path": doc.file_path,
                        "file_absolute_path": file_absolute_path,
                        "file_name": Path(doc.file_path).name
                        if doc.file_path
                        and doc.file_path not in FILE_PATH_SENTINELS
                        else doc.filename,
                        "file_size": doc.file_size,
                        "file_type": doc.file_type,
                        # URLs
                        "original_url": doc.original_url,
                        "domain": self._extract_domain(doc.original_url)
                        if doc.original_url
                        else "User Upload",
                        # Status
                        "download_status": doc.status or "completed",
                        # Fall back to upload time for user-uploaded files.
                        "downloaded_at": doc.processed_at.isoformat()
                        if doc.processed_at
                        else (
                            doc.uploaded_at.isoformat()
                            if hasattr(doc, "uploaded_at") and doc.uploaded_at
                            else None
                        ),
                        "favorite": doc.favorite
                        if hasattr(doc, "favorite")
                        else False,
                        "tags": doc.tags if hasattr(doc, "tags") else [],
                        # Research info (None for user uploads)
                        "research_title": research.title or research.query[:80]
                        if research
                        else "User Upload",
                        "research_query": research.query if research else None,
                        "research_mode": research.mode if research else None,
                        "research_date": research.created_at
                        if research
                        else None,
                        # Classification flags
                        "is_arxiv": self._is_arxiv_url(doc.original_url)
                        if doc.original_url
                        else False,
                        "is_pubmed": self._is_pubmed_url(doc.original_url)
                        if doc.original_url
                        else False,
                        "is_pdf": doc.file_type == "pdf",
                        # Availability flags
                        "has_pdf": has_pdf,
                        "has_text_db": has_text_db,
                        "has_rag_indexed": has_rag_indexed,
                        "rag_chunk_count": rag_chunk_count,
                    }
                )

            # Sorting and pagination are now handled at SQL level
            return all_documents
507 def get_all_collections(self) -> List[Dict]:
508 """Get all collections with document and indexed document counts."""
509 with get_user_db_session(self.username) as session:
510 # Query collections with document counts and indexed counts
511 results = (
512 session.query(
513 Collection,
514 func.count(DocumentCollection.document_id).label(
515 "document_count"
516 ),
517 func.count(
518 case(
519 (
520 DocumentCollection.indexed == True, # noqa: E712
521 DocumentCollection.document_id,
522 ),
523 else_=None,
524 )
525 ).label("indexed_document_count"),
526 )
527 .outerjoin(
528 DocumentCollection,
529 Collection.id == DocumentCollection.collection_id,
530 )
531 .group_by(Collection.id)
532 .order_by(Collection.is_default.desc(), Collection.name)
533 .all()
534 )
536 logger.info(f"[LibraryService] Found {len(results)} collections")
538 collections = []
539 for collection, doc_count, indexed_count in results:
540 logger.debug(
541 f"[LibraryService] Collection: {collection.name} (ID: {collection.id}), documents: {doc_count}, indexed: {indexed_count}"
542 )
543 collections.append(
544 {
545 "id": collection.id,
546 "name": collection.name,
547 "description": collection.description,
548 "is_default": collection.is_default,
549 "document_count": doc_count or 0,
550 "indexed_document_count": indexed_count or 0,
551 }
552 )
554 return collections
556 def get_research_list_for_dropdown(self) -> List[Dict]:
557 """Get minimal research session info for filter dropdowns.
559 Returns only id, title, and query — no joins or aggregates.
560 """
561 with get_user_db_session(self.username) as session:
562 results = (
563 session.query(
564 ResearchHistory.id,
565 ResearchHistory.title,
566 ResearchHistory.query,
567 )
568 .order_by(ResearchHistory.created_at.desc())
569 .all()
570 )
571 return [
572 {"id": r.id, "title": r.title, "query": r.query}
573 for r in results
574 ]
    def get_research_list_with_stats(
        self,
        limit: int = 0,
        offset: int = 0,
    ) -> List[Dict]:
        """Get research sessions with download statistics.

        Args:
            limit: Maximum number of results (0 = no limit, for backwards compat).
            offset: Number of rows to skip. Only applied when limit > 0.

        Returns:
            One dict per research session with resource totals,
            downloaded/downloadable counts, the user's rating, and a
            coarse per-domain breakdown of its resources.
        """
        with get_user_db_session(self.username) as session:
            # Query research sessions with resource counts
            query = (
                session.query(
                    ResearchHistory,
                    func.count(func.distinct(ResearchResource.id)).label(
                        "total_resources"
                    ),
                    # Documents that finished downloading
                    func.count(
                        func.distinct(
                            case(
                                (Document.status == "completed", Document.id),
                                else_=None,
                            )
                        )
                    ).label("downloaded_count"),
                    # Resources whose URL looks directly downloadable
                    # (PDF link, arXiv, or PMC full text)
                    func.count(
                        func.distinct(
                            case(
                                (
                                    ResearchResource.url.like("%.pdf")
                                    | ResearchResource.url.like("%arxiv.org%")
                                    | ResearchResource.url.like(
                                        "%ncbi.nlm.nih.gov/pmc%"
                                    ),
                                    ResearchResource.id,
                                ),
                                else_=None,
                            )
                        )
                    ).label("downloadable_count"),
                )
                .outerjoin(
                    ResearchResource,
                    ResearchHistory.id == ResearchResource.research_id,
                )
                .outerjoin(
                    Document,
                    (ResearchResource.id == Document.resource_id)
                    | (ResearchResource.document_id == Document.id),
                )
                .group_by(ResearchHistory.id)
                .order_by(ResearchHistory.created_at.desc())
            )

            # Apply SQL-level pagination when limit is set
            if limit > 0:
                query = query.offset(offset).limit(limit)

            results = query.all()

            # Preload all ratings to avoid N+1 queries
            research_ids = [r[0].id for r in results]
            all_ratings = (
                session.query(ResearchRating)
                .filter(ResearchRating.research_id.in_(research_ids))
                .all()
                if research_ids
                else []
            )
            ratings_by_research = {r.research_id: r for r in all_ratings}

            # Batch domain queries to avoid N+1 (same pattern as ratings).
            # Domains are bucketed coarsely: arxiv.org / pubmed / other.
            domain_case = case(
                (
                    ResearchResource.url.like("%arxiv.org%"),
                    "arxiv.org",
                ),
                (ResearchResource.url.like("%pubmed%"), "pubmed"),
                (
                    ResearchResource.url.like("%ncbi.nlm.nih.gov%"),
                    "pubmed",
                ),
                else_="other",
            )
            all_domains = (
                session.query(
                    ResearchResource.research_id,
                    domain_case.label("domain"),
                    func.count().label("count"),
                )
                .filter(ResearchResource.research_id.in_(research_ids))
                .group_by(ResearchResource.research_id, domain_case)
                .all()
                if research_ids
                else []
            )
            domains_by_research: Dict[str, list] = {}
            for rid, domain, count in all_domains:
                domains_by_research.setdefault(rid, []).append((domain, count))

            research_list = []
            for (
                research,
                total_resources,
                downloaded_count,
                downloadable_count,
            ) in results:
                # Get rating from preloaded dict
                rating = ratings_by_research.get(research.id)

                # Get domain breakdown from preloaded dict
                domains = domains_by_research.get(research.id, [])

                research_list.append(
                    {
                        "id": research.id,
                        "title": research.title,
                        "query": research.query,
                        "mode": research.mode,
                        "status": research.status,
                        "created_at": research.created_at,
                        "duration_seconds": research.duration_seconds,
                        "total_resources": total_resources or 0,
                        "downloaded_count": downloaded_count or 0,
                        "downloadable_count": downloadable_count or 0,
                        "rating": rating.rating if rating else None,
                        # Drop empty/None domain buckets
                        "top_domains": [(d, c) for d, c in domains if d],
                    }
                )

            return research_list
    def get_download_manager_summary_stats(self) -> Dict:
        """Get aggregate download stats across ALL research sessions.

        This is a lightweight query that only returns totals — used for
        the download-manager header so stats remain accurate regardless
        of which page the user is viewing.

        Returns:
            Dict with total_researches, total_resources,
            already_downloaded, and available_to_download (clamped to
            be non-negative).
        """
        with get_user_db_session(self.username) as session:
            # Single aggregate row over all sessions/resources/documents.
            row = (
                session.query(
                    func.count(func.distinct(ResearchHistory.id)).label(
                        "total_researches"
                    ),
                    func.count(func.distinct(ResearchResource.id)).label(
                        "total_resources"
                    ),
                    # Documents that finished downloading
                    func.count(
                        func.distinct(
                            case(
                                (Document.status == "completed", Document.id),
                                else_=None,
                            )
                        )
                    ).label("downloaded_count"),
                    # Resources whose URL looks directly downloadable
                    # (PDF link, arXiv, or PMC full text)
                    func.count(
                        func.distinct(
                            case(
                                (
                                    ResearchResource.url.like("%.pdf")
                                    | ResearchResource.url.like("%arxiv.org%")
                                    | ResearchResource.url.like(
                                        "%ncbi.nlm.nih.gov/pmc%"
                                    ),
                                    ResearchResource.id,
                                ),
                                else_=None,
                            )
                        )
                    ).label("downloadable_count"),
                )
                .select_from(ResearchHistory)
                .outerjoin(
                    ResearchResource,
                    ResearchHistory.id == ResearchResource.research_id,
                )
                .outerjoin(
                    Document,
                    (ResearchResource.id == Document.resource_id)
                    | (ResearchResource.document_id == Document.id),
                )
                .one()
            )

            total_researches = row.total_researches or 0
            total_resources = row.total_resources or 0
            downloaded = row.downloaded_count or 0
            downloadable = row.downloadable_count or 0

            return {
                "total_researches": total_researches,
                "total_resources": total_resources,
                "already_downloaded": downloaded,
                # Clamp at zero: a document can be downloaded even when
                # its URL didn't match the "downloadable" heuristics.
                "available_to_download": max(downloadable - downloaded, 0),
            }
    def get_pdf_previews_batch(
        self, research_ids: List, limit_per_research: int = 10
    ) -> Dict[str, Dict]:
        """Batch-fetch PDF documents and domain breakdowns for multiple research sessions.

        Args:
            research_ids: Research session IDs to preview.
            limit_per_research: Cap on pdf_sources entries per session;
                also sizes the overall SQL LIMIT budget.

        Returns a dict keyed by research_id with:
        - "pdf_sources": list of document dicts (capped at limit_per_research)
        - "domains": dict of domain -> {total, pdfs, downloaded}
        """
        if not research_ids:
            return {}

        with get_user_db_session(self.username) as session:
            # One query for all sessions; the OR join matches a resource
            # either by Document.resource_id or its back-reference.
            results = (
                session.query(Document, ResearchResource)
                .outerjoin(
                    ResearchResource,
                    (Document.resource_id == ResearchResource.id)
                    | (ResearchResource.document_id == Document.id),
                )
                .filter(
                    Document.research_id.in_(research_ids),
                    Document.file_type == "pdf",
                )
                .order_by(Document.processed_at.desc())
                .limit(limit_per_research * len(research_ids))
                .all()
            )

            previews: Dict[str, Dict] = {}
            seen_doc_ids: set = set()
            for doc, resource in results:
                # The OR join can fan out; keep only the first row per doc.
                if doc.id in seen_doc_ids:
                    continue
                seen_doc_ids.add(doc.id)

                rid = doc.research_id
                if rid not in previews:
                    previews[rid] = {"pdf_sources": [], "domains": {}}

                entry = previews[rid]

                # Domain breakdown (within the SQL LIMIT budget); prefer
                # the resource URL, fall back to the document URL.
                domain = "unknown"
                if resource and resource.url:
                    try:
                        domain = urlparse(resource.url).netloc or "unknown"
                    except Exception:
                        logger.debug("Failed to parse resource URL for domain")
                elif doc.original_url:
                    try:
                        domain = urlparse(doc.original_url).netloc or "unknown"
                    except Exception:
                        logger.debug("Failed to parse document URL for domain")

                if domain not in entry["domains"]:
                    entry["domains"][domain] = {
                        "total": 0,
                        "pdfs": 0,
                        "downloaded": 0,
                    }
                entry["domains"][domain]["total"] += 1
                if doc.file_type == "pdf":
                    entry["domains"][domain]["pdfs"] += 1
                if doc.status == "completed":
                    entry["domains"][domain]["downloaded"] += 1

                # PDF sources preview (capped)
                if len(entry["pdf_sources"]) < limit_per_research:
                    title = "Untitled"
                    if resource and resource.title:
                        title = resource.title
                    elif doc.filename:
                        title = doc.filename

                    entry["pdf_sources"].append(
                        {
                            "document_title": title,
                            "domain": domain,
                            "file_type": doc.file_type,
                            "download_status": doc.status or "unknown",
                        }
                    )

            return previews
    def get_document_by_id(self, doc_id: str) -> Optional[Dict]:
        """
        Get a specific document by its ID.

        Returns document information with file path.

        Args:
            doc_id: Document primary key.

        Returns:
            Dict of document/research/collection metadata, or None when
            no document with that ID exists.
        """
        with get_user_db_session(self.username) as session:
            # Find document - use outer joins to support both research downloads and user uploads
            result = (
                session.query(Document, ResearchResource, ResearchHistory)
                .outerjoin(
                    ResearchResource,
                    (Document.resource_id == ResearchResource.id)
                    | (ResearchResource.document_id == Document.id),
                )
                .outerjoin(
                    ResearchHistory,
                    Document.research_id == ResearchHistory.id,
                )
                .filter(Document.id == doc_id)
                .first()
            )

            if result:
                # Found document
                doc, resource, research = result

                # Get RAG indexing status across all collections
                doc_collections = (
                    session.query(DocumentCollection, Collection)
                    .join(Collection)
                    .filter(DocumentCollection.document_id == doc_id)
                    .all()
                )

                # Check if indexed in any collection
                has_rag_indexed = any(
                    dc.indexed for dc, coll in doc_collections
                )
                total_chunks = sum(
                    dc.chunk_count for dc, coll in doc_collections if dc.indexed
                )

                # Build collections list
                collections_list = [
                    {
                        "id": coll.id,
                        "name": coll.name,
                        "indexed": dc.indexed,
                        "chunk_count": dc.chunk_count,
                    }
                    for dc, coll in doc_collections
                ]

                # Calculate word count from text content
                word_count = (
                    len(doc.text_content.split()) if doc.text_content else 0
                )

                # Check if PDF is available (database OR filesystem)
                has_pdf = bool(
                    doc.file_path and doc.file_path not in FILE_PATH_SENTINELS
                )
                if not has_pdf and doc.storage_mode == "database":
                    has_pdf = self._has_blob_in_db(session, doc.id)

                return {
                    "id": doc.id,
                    "resource_id": doc.resource_id,
                    "research_id": doc.research_id,
                    "document_title": doc.title
                    or (resource.title if resource else doc.filename),
                    "original_url": doc.original_url
                    or (resource.url if resource else None),
                    "file_path": doc.file_path,
                    "file_absolute_path": self._get_safe_absolute_path(
                        doc.file_path
                    ),
                    "file_name": Path(doc.file_path).name
                    if doc.file_path
                    and doc.file_path not in FILE_PATH_SENTINELS
                    else doc.filename,
                    "file_size": doc.file_size,
                    "file_type": doc.file_type,
                    "mime_type": doc.mime_type,
                    "domain": self._extract_domain(resource.url)
                    if resource
                    else "User Upload",
                    "download_status": doc.status,
                    # processed_at may be a datetime or a plain string
                    # depending on how the row was written; handle both,
                    # then fall back to upload time for user uploads.
                    "downloaded_at": doc.processed_at.isoformat()
                    if doc.processed_at
                    and hasattr(doc.processed_at, "isoformat")
                    else str(doc.processed_at)
                    if doc.processed_at
                    else (
                        doc.uploaded_at.isoformat()
                        if hasattr(doc, "uploaded_at") and doc.uploaded_at
                        else None
                    ),
                    "favorite": doc.favorite
                    if hasattr(doc, "favorite")
                    else False,
                    "tags": doc.tags if hasattr(doc, "tags") else [],
                    "research_title": research.query[:100]
                    if research
                    else "User Upload",
                    # created_at may already be an ISO string
                    "research_created_at": research.created_at
                    if research and isinstance(research.created_at, str)
                    else research.created_at.isoformat()
                    if research and research.created_at
                    else None,
                    # Document fields
                    "is_pdf": doc.file_type == "pdf",
                    "has_pdf": has_pdf,
                    "has_text_db": bool(doc.text_content),
                    "has_rag_indexed": has_rag_indexed,
                    "rag_chunk_count": total_chunks,
                    "word_count": word_count,
                    "collections": collections_list,
                }

            # Not found
            return None
985 def toggle_favorite(self, document_id: str) -> bool:
986 """Toggle favorite status of a document."""
987 with get_user_db_session(self.username) as session:
988 doc = session.query(Document).get(document_id)
989 if doc:
990 doc.favorite = not doc.favorite
991 session.commit()
992 return doc.favorite
993 return False
995 def delete_document(self, document_id: str) -> bool:
996 """Delete a document from library (file and database entry)."""
997 with get_user_db_session(self.username) as session:
998 doc = session.query(Document).get(document_id)
999 if not doc:
1000 return False
1002 # Get file path from tracker (only if document has original_url)
1003 tracker = None
1004 if doc.original_url:
1005 tracker = (
1006 session.query(DownloadTracker)
1007 .filter_by(url_hash=self._get_url_hash(doc.original_url))
1008 .first()
1009 )
1011 # Delete physical file
1012 if tracker and tracker.file_path:
1013 try:
1014 file_path = get_absolute_path_from_settings(
1015 tracker.file_path
1016 )
1017 if file_path and file_path.is_file():
1018 file_path.unlink()
1019 logger.info(f"Deleted file: {file_path}")
1020 except Exception:
1021 logger.exception("Failed to delete file")
1023 # Update tracker
1024 if tracker:
1025 tracker.is_downloaded = False
1026 tracker.file_path = None
1028 # Delete document and all related records
1029 from ..deletion.utils.cascade_helper import CascadeHelper
1031 CascadeHelper.delete_document_completely(session, document_id)
1032 session.commit()
1034 return True
    def open_file_location(self, document_id: str) -> bool:
        """Open the folder containing the document.

        Args:
            document_id: Document primary key.

        Returns:
            True when the tracked file exists inside the library root
            and the OS file browser was launched; False otherwise.
        """
        with get_user_db_session(self.username) as session:
            doc = session.query(Document).get(document_id)
            if not doc:
                return False

            # Only URL-sourced documents have a download tracker row.
            tracker = None
            if doc.original_url:
                tracker = (
                    session.query(DownloadTracker)
                    .filter_by(url_hash=self._get_url_hash(doc.original_url))
                    .first()
                )

            if tracker and tracker.file_path:
                # Validate path is within library root to prevent traversal attacks
                library_root = get_absolute_path_from_settings("")
                if not library_root:
                    logger.warning("Could not determine library root")
                    return False
                try:
                    validated_path = PathValidator.validate_safe_path(
                        tracker.file_path, library_root, allow_absolute=False
                    )
                    if validated_path and validated_path.is_file():
                        # Calls the module-level helper imported from
                        # ..utils — not this method (name shadowing).
                        return open_file_location(str(validated_path))
                except ValueError:
                    logger.warning("Path validation failed")
                    return False

            return False
1069 def get_unique_domains(self) -> List[str]:
1070 """Get sorted list of unique netlocs from all document URLs."""
1071 with get_user_db_session(self.username) as session:
1072 urls = (
1073 session.query(Document.original_url)
1074 .filter(Document.original_url.isnot(None))
1075 .all()
1076 )
1077 netlocs = {self._extract_domain(row[0]) for row in urls}
1078 return sorted(n for n in netlocs if n)
1080 def _extract_domain(self, url: str) -> str:
1081 """Extract domain from URL."""
1082 from urllib.parse import urlparse
1084 try:
1085 return urlparse(url).netloc
1086 except (ValueError, AttributeError):
1087 return ""
1089 def _get_url_hash(self, url: str) -> str:
1090 """Generate hash for URL."""
1091 import re
1093 # Normalize URL
1094 url = re.sub(r"^https?://", "", url)
1095 url = re.sub(r"^www\.", "", url)
1096 url = url.rstrip("/")
1098 return get_url_hash(url)
1100 def _get_storage_path(self) -> str:
1101 """Get library storage path from settings (respects LDR_DATA_DIR)."""
1102 from ...utilities.db_utils import get_settings_manager
1104 settings = get_settings_manager()
1105 return str(
1106 Path(
1107 settings.get_setting(
1108 "research_library.storage_path",
1109 str(get_library_directory()),
1110 )
1111 )
1112 .expanduser()
1113 .resolve()
1114 )
1116 def sync_library_with_filesystem(self) -> Dict:
1117 """
1118 Sync library database with filesystem.
1119 Check which PDF files exist and update database accordingly.
1121 Returns:
1122 Statistics about the sync operation
1123 """
1124 with get_user_db_session(self.username) as session:
1125 # Get all documents marked as completed
1126 documents = (
1127 session.query(Document)
1128 .filter_by(status=DocumentStatus.COMPLETED)
1129 .all()
1130 )
1132 stats = {
1133 "total_documents": len(documents),
1134 "files_found": 0,
1135 "files_missing": 0,
1136 "trackers_updated": 0,
1137 "missing_files": [],
1138 }
1140 # Sync documents with filesystem
1141 for doc in documents:
1142 # Get download tracker
1143 tracker = (
1144 session.query(DownloadTracker)
1145 .filter_by(url_hash=self._get_url_hash(doc.original_url))
1146 .first()
1147 )
1149 if tracker and tracker.file_path:
1150 # Check if file exists
1151 file_path = get_absolute_path_from_settings(
1152 tracker.file_path
1153 )
1154 if file_path and file_path.is_file():
1155 stats["files_found"] += 1
1156 else:
1157 # File missing or path invalid - mark for re-download
1158 stats["files_missing"] += 1
1159 stats["missing_files"].append(
1160 {
1161 "id": doc.id,
1162 "title": doc.title,
1163 "path": str(file_path)
1164 if file_path
1165 else "invalid",
1166 "url": doc.original_url,
1167 }
1168 )
1170 # Reset tracker
1171 tracker.is_downloaded = False
1172 tracker.file_path = None
1174 # Delete the document entry so it can be re-queued
1175 from ..deletion.utils.cascade_helper import (
1176 CascadeHelper,
1177 )
1179 CascadeHelper.delete_document_completely(
1180 session, doc.id
1181 )
1182 stats["trackers_updated"] += 1
1183 else:
1184 # No tracker or path - delete the document entry
1185 stats["files_missing"] += 1
1186 from ..deletion.utils.cascade_helper import CascadeHelper
1188 CascadeHelper.delete_document_completely(session, doc.id)
1190 session.commit()
1191 logger.info(
1192 f"Library sync completed: {stats['files_found']} found, {stats['files_missing']} missing"
1193 )
1195 return stats
1197 def mark_for_redownload(self, document_ids: List[str]) -> int:
1198 """
1199 Mark specific documents for re-download.
1201 Args:
1202 document_ids: List of document IDs to mark for re-download
1204 Returns:
1205 Number of documents marked
1206 """
1207 with get_user_db_session(self.username) as session:
1208 count = 0
1209 for doc_id in document_ids:
1210 doc = session.query(Document).get(doc_id)
1211 if doc:
1212 # Get tracker and reset it
1213 tracker = (
1214 session.query(DownloadTracker)
1215 .filter_by(
1216 url_hash=self._get_url_hash(doc.original_url)
1217 )
1218 .first()
1219 )
1221 if tracker: 1221 ↛ 1226line 1221 didn't jump to line 1226 because the condition on line 1221 was always true
1222 tracker.is_downloaded = False
1223 tracker.file_path = None
1225 # Mark document as pending
1226 doc.status = DocumentStatus.PENDING
1227 count += 1
1229 session.commit()
1230 logger.info(f"Marked {count} documents for re-download")
1231 return count