Coverage for src / local_deep_research / research_library / routes / rag_routes.py: 27%
1044 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
1"""
2RAG Management API Routes
4Provides endpoints for managing RAG indexing of library documents:
5- Configure embedding models
6- Index documents
7- Get RAG statistics
8- Bulk operations with progress tracking
9"""
11from flask import (
12 Blueprint,
13 jsonify,
14 request,
15 Response,
16 render_template,
17 session,
18 stream_with_context,
19)
20from loguru import logger
21import atexit
22import glob
23import json
24import uuid
25import time
26import threading
27import queue
28from concurrent.futures import ThreadPoolExecutor
29from datetime import datetime, UTC
30from pathlib import Path
31from typing import Optional
33from ...web.auth.decorators import login_required
34from ...utilities.db_utils import get_settings_manager
35from ..services.library_rag_service import LibraryRAGService
36from ...settings.manager import SettingsManager
37from ...security.path_validator import PathValidator
38from ...security import upload_rate_limit
39from ..utils import handle_api_error
40from ...database.models.library import (
41 Document,
42 Collection,
43 DocumentCollection,
44 RAGIndex,
45 SourceType,
46 EmbeddingProvider,
47)
48from ...database.models.queue import TaskMetadata
49from ...web.utils.rate_limiter import limiter
50from ...config.paths import get_library_directory
# Blueprint hosting both HTML pages and the JSON API, mounted at /library.
rag_bp = Blueprint("rag", __name__, url_prefix="/library")

# Global ThreadPoolExecutor for auto-indexing to prevent thread proliferation
# Lazily created by _get_auto_index_executor(); guarded by the lock below.
_auto_index_executor: ThreadPoolExecutor | None = None
# Serializes creation/shutdown of the shared executor across request threads.
_auto_index_executor_lock = threading.Lock()
def _get_auto_index_executor() -> ThreadPoolExecutor:
    """Return the shared auto-indexing executor, creating it on first use.

    Access is serialized through a module-level lock so concurrent request
    threads can never construct more than one pool.
    """
    global _auto_index_executor
    with _auto_index_executor_lock:
        if _auto_index_executor is None:
            # Bounded pool: four workers keeps background indexing from
            # spawning an unbounded number of threads.
            _auto_index_executor = ThreadPoolExecutor(
                max_workers=4, thread_name_prefix="auto_index_"
            )
        executor = _auto_index_executor
    return executor
def _shutdown_auto_index_executor() -> None:
    """Shutdown the auto-index executor gracefully.

    Takes the same lock used by ``_get_auto_index_executor`` so a concurrent
    getter cannot observe a half-shut-down pool or recreate one while this
    teardown is in flight (the original accessed the global unlocked).
    """
    global _auto_index_executor
    with _auto_index_executor_lock:
        executor = _auto_index_executor
        _auto_index_executor = None
    if executor is not None:
        # wait=True drains any queued indexing jobs before interpreter exit.
        executor.shutdown(wait=True)
# Ensure worker threads are drained cleanly at interpreter shutdown.
atexit.register(_shutdown_auto_index_executor)
def _get_default_rag_settings(settings) -> dict:
    """Read the user's current default embedding/chunking configuration.

    Returns a dict whose keys match the keyword arguments of
    ``LibraryRAGService`` (minus ``username``). The text-separators setting
    is stored as a JSON string and parsed defensively here.
    """
    text_separators = settings.get_setting(
        "local_search_text_separators", '["\n\n", "\n", ". ", " ", ""]'
    )
    if isinstance(text_separators, str):
        try:
            text_separators = json.loads(text_separators)
        except json.JSONDecodeError:
            logger.warning(
                f"Invalid JSON for local_search_text_separators setting: {text_separators!r}. "
                "Using default separators."
            )
            text_separators = ["\n\n", "\n", ". ", " ", ""]

    return {
        "embedding_model": settings.get_setting(
            "local_search_embedding_model", "all-MiniLM-L6-v2"
        ),
        "embedding_provider": settings.get_setting(
            "local_search_embedding_provider", "sentence_transformers"
        ),
        "chunk_size": int(settings.get_setting("local_search_chunk_size", 1000)),
        "chunk_overlap": int(
            settings.get_setting("local_search_chunk_overlap", 200)
        ),
        # Advanced configuration settings (Issue #1054)
        "splitter_type": settings.get_setting(
            "local_search_splitter_type", "recursive"
        ),
        "text_separators": text_separators,
        "distance_metric": settings.get_setting(
            "local_search_distance_metric", "cosine"
        ),
        # normalize_vectors must be a real boolean, not a stored string.
        "normalize_vectors": settings.get_bool_setting(
            "local_search_normalize_vectors", True
        ),
        "index_type": settings.get_setting("local_search_index_type", "flat"),
    }


def get_rag_service(
    collection_id: Optional[str] = None,
    use_defaults: bool = False,
) -> LibraryRAGService:
    """
    Get RAG service instance with appropriate settings.

    If collection_id is provided:
    - Uses collection's stored settings if they exist (unless use_defaults=True)
    - Uses current defaults for new collections (and stores them)

    If no collection_id:
    - Uses current default settings

    Args:
        collection_id: Optional collection whose stored embedding settings
            should take precedence over the user defaults.
        use_defaults: When True, ignore stored collection settings and use
            current defaults. Pass True on force-reindex so that the new
            default embedding model is picked up.

    Returns:
        A configured ``LibraryRAGService`` for the logged-in user.
    """
    from ...database.session_context import get_user_db_session

    settings = get_settings_manager()
    username = session["username"]
    defaults = _get_default_rag_settings(settings)

    # If collection_id provided, check for stored settings
    if collection_id:
        with get_user_db_session(username) as db_session:
            collection = (
                db_session.query(Collection).filter_by(id=collection_id).first()
            )

            if collection and collection.embedding_model and not use_defaults:
                # Use collection's stored settings
                logger.info(
                    f"Using stored settings for collection {collection_id}: "
                    f"{collection.embedding_model_type.value}/{collection.embedding_model}"
                )
                # Handle normalize_vectors - may be stored as string in some cases
                coll_normalize = collection.normalize_vectors
                if coll_normalize is None:
                    coll_normalize = defaults["normalize_vectors"]
                elif isinstance(coll_normalize, str):
                    coll_normalize = coll_normalize.lower() in (
                        "true",
                        "1",
                        "yes",
                    )
                else:
                    coll_normalize = bool(coll_normalize)

                return LibraryRAGService(
                    username=username,
                    embedding_model=collection.embedding_model,
                    embedding_provider=collection.embedding_model_type.value,
                    chunk_size=collection.chunk_size
                    or defaults["chunk_size"],
                    chunk_overlap=collection.chunk_overlap
                    or defaults["chunk_overlap"],
                    splitter_type=collection.splitter_type
                    or defaults["splitter_type"],
                    text_separators=collection.text_separators
                    or defaults["text_separators"],
                    distance_metric=collection.distance_metric
                    or defaults["distance_metric"],
                    normalize_vectors=coll_normalize,
                    index_type=collection.index_type
                    or defaults["index_type"],
                )
            elif collection:
                # New collection - use defaults and store them
                logger.info(
                    f"New collection {collection_id}, using and storing default settings"
                )

                # Note: Settings are not persisted here because we don't have
                # embedding_dimension yet; index_collection stores them when
                # the first document is indexed.
                return LibraryRAGService(username=username, **defaults)

    # No collection or fallback - use current defaults
    return LibraryRAGService(username=username, **defaults)
232# Config API Routes
@rag_bp.route("/api/config/supported-formats", methods=["GET"])
@login_required
def get_supported_formats():
    """Return list of supported file formats for upload.

    This endpoint provides the single source of truth for supported file
    extensions, pulling from the document_loaders registry. The UI can
    use this to dynamically update the file input accept attribute.
    """
    from ...document_loaders import get_supported_extensions

    # Sorted so the UI always sees the same ordering between requests.
    sorted_extensions = sorted(get_supported_extensions())

    payload = {
        "extensions": sorted_extensions,
        "accept_string": ",".join(sorted_extensions),
        "count": len(sorted_extensions),
    }
    return jsonify(payload)
259# Page Routes
@rag_bp.route("/embedding-settings")
@login_required
def embedding_settings_page():
    """Serve the Embedding Settings page."""
    template = "pages/embedding_settings.html"
    return render_template(template, active_page="embedding-settings")
@rag_bp.route("/document/<string:document_id>/chunks")
@login_required
def view_document_chunks(document_id):
    """Render every chunk of a document, grouped by owning collection.

    Returns a 404 tuple when the document does not exist for this user.
    """
    from ...database.session_context import get_user_db_session
    from ...database.models.library import DocumentChunk

    username = session.get("username")

    with get_user_db_session(username) as db_session:
        document = db_session.query(Document).filter_by(id=document_id).first()
        if not document:
            return "Document not found", 404

        # All chunks for this document, ordered so grouping below is stable.
        chunks = (
            db_session.query(DocumentChunk)
            .filter(DocumentChunk.source_id == document_id)
            .order_by(DocumentChunk.collection_name, DocumentChunk.chunk_index)
            .all()
        )

        grouped = {}
        for chunk in chunks:
            key = chunk.collection_name
            if key not in grouped:
                # Resolve the collection's display name once per group.
                collection_id = key.replace("collection_", "")
                collection = (
                    db_session.query(Collection)
                    .filter_by(id=collection_id)
                    .first()
                )
                grouped[key] = {
                    "name": collection.name if collection else key,
                    "id": collection_id,
                    "chunks": [],
                }

            model_type = chunk.embedding_model_type
            grouped[key]["chunks"].append(
                {
                    "id": chunk.id,
                    "index": chunk.chunk_index,
                    "text": chunk.chunk_text,
                    "word_count": chunk.word_count,
                    "start_char": chunk.start_char,
                    "end_char": chunk.end_char,
                    "embedding_model": chunk.embedding_model,
                    "embedding_model_type": model_type.value
                    if model_type
                    else None,
                    "embedding_dimension": chunk.embedding_dimension,
                    "created_at": chunk.created_at,
                }
            )

        return render_template(
            "pages/document_chunks.html",
            document=document,
            chunks_by_collection=grouped,
            total_chunks=len(chunks),
        )
@rag_bp.route("/collections")
@login_required
def collections_page():
    """Serve the Collections overview page."""
    context = {"active_page": "collections"}
    return render_template("pages/collections.html", **context)
@rag_bp.route("/collections/<string:collection_id>")
@login_required
def collection_details_page(collection_id):
    """Serve the details page for a single collection."""
    context = {
        "active_page": "collections",
        "collection_id": collection_id,
    }
    return render_template("pages/collection_details.html", **context)
@rag_bp.route("/collections/<string:collection_id>/upload")
@login_required
def collection_upload_page(collection_id):
    """Serve the upload page for a collection."""
    # Uploads may only keep PDFs in the database or not at all;
    # filesystem storage is not offered for user uploads.
    storage_mode = get_settings_manager().get_setting(
        "research_library.upload_pdf_storage", "none"
    )
    if storage_mode not in ("database", "none"):
        storage_mode = "none"

    return render_template(
        "pages/collection_upload.html",
        active_page="collections",
        collection_id=collection_id,
        collection_name=None,  # Could fetch from DB if needed
        upload_pdf_storage=storage_mode,
    )
@rag_bp.route("/collections/create")
@login_required
def collection_create_page():
    """Serve the Create Collection page."""
    context = {"active_page": "collections"}
    return render_template("pages/collection_create.html", **context)
387# API Routes
@rag_bp.route("/api/rag/settings", methods=["GET"])
@login_required
def get_current_settings():
    """Get current RAG configuration from settings.

    Returns a JSON payload with the default embedding provider/model and
    chunking/index options used for new collections. Uses the module-level
    ``json`` import (the previous local ``import json as json_lib``
    redundantly shadowed it).
    """
    try:
        settings = get_settings_manager()

        # Text separators are stored as a JSON string; parse defensively
        # and fall back to the standard recursive-splitter separators.
        text_separators = settings.get_setting(
            "local_search_text_separators", '["\n\n", "\n", ". ", " ", ""]'
        )
        if isinstance(text_separators, str):
            try:
                text_separators = json.loads(text_separators)
            except json.JSONDecodeError:
                logger.warning(
                    f"Invalid JSON for local_search_text_separators setting: {text_separators!r}. "
                    "Using default separators."
                )
                text_separators = ["\n\n", "\n", ". ", " ", ""]

        normalize_vectors = settings.get_setting(
            "local_search_normalize_vectors", True
        )

        return jsonify(
            {
                "success": True,
                "settings": {
                    "embedding_provider": settings.get_setting(
                        "local_search_embedding_provider",
                        "sentence_transformers",
                    ),
                    "embedding_model": settings.get_setting(
                        "local_search_embedding_model", "all-MiniLM-L6-v2"
                    ),
                    "chunk_size": settings.get_setting(
                        "local_search_chunk_size", 1000
                    ),
                    "chunk_overlap": settings.get_setting(
                        "local_search_chunk_overlap", 200
                    ),
                    "splitter_type": settings.get_setting(
                        "local_search_splitter_type", "recursive"
                    ),
                    "text_separators": text_separators,
                    "distance_metric": settings.get_setting(
                        "local_search_distance_metric", "cosine"
                    ),
                    "normalize_vectors": normalize_vectors,
                    "index_type": settings.get_setting(
                        "local_search_index_type", "flat"
                    ),
                },
            }
        )
    except Exception as e:
        return handle_api_error("getting RAG settings", e)
@rag_bp.route("/api/rag/test-embedding", methods=["POST"])
@login_required
def test_embedding():
    """Test an embedding configuration by generating a test embedding.

    Expects JSON: provider (required), model (required), test_text
    (optional). Returns the embedding dimension and round-trip time on
    success, or a user-friendly error message on failure.
    """
    # Pre-bind so the except handler below can safely interpolate it even
    # when the failure happens before the request body is parsed. The
    # original used request.json (raises on bad JSON) and then hit a
    # NameError on the unbound `model` inside the handler.
    model = None
    try:
        # silent=True: malformed or missing JSON yields None instead of an
        # exception, which we then report as a clean 400 below.
        data = request.get_json(silent=True) or {}
        provider = data.get("provider")
        model = data.get("model")
        test_text = data.get("test_text", "This is a test.")

        if not provider or not model:
            return jsonify(
                {"success": False, "error": "Provider and model are required"}
            ), 400

        # Import embedding functions
        from ...embeddings.embeddings_config import (
            get_embedding_function,
        )

        logger.info(
            f"Testing embedding with provider={provider}, model={model}"
        )

        # Get user's settings so provider URLs (e.g. Ollama) are resolved correctly
        settings = get_settings_manager()
        settings_snapshot = (
            settings.get_all_settings()
            if hasattr(settings, "get_all_settings")
            else {}
        )

        # Get embedding function with the specified configuration
        start_time = time.time()
        embedding_func = get_embedding_function(
            provider=provider,
            model_name=model,
            settings_snapshot=settings_snapshot,
        )

        # Generate test embedding
        embedding = embedding_func([test_text])[0]
        response_time_ms = int((time.time() - start_time) * 1000)

        # Get embedding dimension
        dimension = len(embedding) if hasattr(embedding, "__len__") else None

        return jsonify(
            {
                "success": True,
                "dimension": dimension,
                "response_time_ms": response_time_ms,
                "provider": provider,
                "model": model,
            }
        )

    except Exception as e:
        logger.exception("Error during testing embedding")
        error_str = str(e).lower()

        # Detect common signs that an LLM was selected instead of an embedding model
        llm_hints = [
            "does not support",
            "not an embedding",
            "generate embedding",
            "invalid model",
            "not found",
            "expected float",
            "could not convert",
            "list index out of range",
            "object is not subscriptable",
            "not iterable",
            "json",
            "chat",
            "completion",
        ]
        is_likely_llm = any(hint in error_str for hint in llm_hints)

        if is_likely_llm:
            user_message = (
                f"Embedding test failed for model '{model}'. "
                "This is most likely because an LLM (language model) was selected "
                "instead of an embedding model. Please choose a dedicated embedding "
                "model (e.g. nomic-embed-text, mxbai-embed-large, "
                "all-MiniLM-L6-v2)."
            )
        else:
            user_message = (
                f"Embedding test failed for model '{model}'. "
                "If you are unsure whether the selected model supports embeddings, "
                "try a dedicated embedding model instead (e.g. nomic-embed-text, "
                "mxbai-embed-large, all-MiniLM-L6-v2)."
            )

        return jsonify({"success": False, "error": user_message}), 500
@rag_bp.route("/api/rag/models", methods=["GET"])
@login_required
def get_available_models():
    """Get list of available embedding providers and models."""
    try:
        from ...embeddings.embeddings_config import _get_provider_classes

        # Current settings so providers can check their own configuration.
        settings = get_settings_manager()
        settings_snapshot = (
            settings.get_all_settings()
            if hasattr(settings, "get_all_settings")
            else {}
        )

        provider_classes = _get_provider_classes()

        # Human-readable labels for the provider dropdown.
        provider_labels = {
            "sentence_transformers": "Sentence Transformers (Local)",
            "ollama": "Ollama (Local)",
            "openai": "OpenAI API",
        }

        provider_options = []
        providers = {}

        for key, cls in provider_classes.items():
            available = cls.is_available(settings_snapshot)

            # Always show the provider in the dropdown so users can
            # configure its settings (e.g. fix a wrong Ollama URL).
            provider_options.append(
                {
                    "value": key,
                    "label": provider_labels.get(key, key),
                    "available": available,
                }
            )

            # Only fetch models when the provider is reachable.
            if not available:
                providers[key] = []
                continue

            entries = []
            for m in cls.get_available_models(settings_snapshot):
                entry = {
                    "value": m["value"],
                    "label": m["label"],
                    "provider": key,
                }
                if "is_embedding" in m:
                    entry["is_embedding"] = m["is_embedding"]
                entries.append(entry)
            providers[key] = entries

        return jsonify(
            {
                "success": True,
                "provider_options": provider_options,
                "providers": providers,
            }
        )
    except Exception as e:
        return handle_api_error("getting available models", e)
@rag_bp.route("/api/rag/info", methods=["GET"])
@login_required
def get_index_info():
    """Get information about the current RAG index."""
    from ...database.library_init import get_default_library_id

    try:
        # Fall back to the user's default Library collection when the
        # request does not name one.
        collection_id = request.args.get("collection_id") or (
            get_default_library_id(session["username"])
        )

        logger.info(
            f"Getting RAG index info for collection_id: {collection_id}"
        )

        info = get_rag_service(collection_id).get_current_index_info(
            collection_id
        )

        if info is None:
            logger.info(
                f"No RAG index found for collection_id: {collection_id}"
            )
            return jsonify(
                {"success": True, "info": None, "message": "No index found"}
            )

        logger.info(f"Found RAG index for collection_id: {collection_id}")
        return jsonify({"success": True, "info": info})
    except Exception as e:
        return handle_api_error("getting index info", e)
@rag_bp.route("/api/rag/stats", methods=["GET"])
@login_required
def get_rag_stats():
    """Get RAG statistics for a collection."""
    from ...database.library_init import get_default_library_id

    try:
        # Fall back to the user's default Library collection when the
        # request does not name one.
        collection_id = request.args.get("collection_id") or (
            get_default_library_id(session["username"])
        )

        stats = get_rag_service(collection_id).get_rag_stats(collection_id)
        return jsonify({"success": True, "stats": stats})
    except Exception as e:
        return handle_api_error("getting RAG stats", e)
@rag_bp.route("/api/rag/index-document", methods=["POST"])
@login_required
def index_document():
    """Index a single document in a collection.

    Expects JSON: text_doc_id (required), collection_id (optional, defaults
    to the user's Library collection), force_reindex (optional bool).
    """
    from ...database.library_init import get_default_library_id

    # Pre-bind so the except handler can interpolate it even when parsing
    # the request body is what failed (previously a NameError).
    text_doc_id = None
    try:
        data = request.get_json()
        text_doc_id = data.get("text_doc_id")
        force_reindex = data.get("force_reindex", False)
        collection_id = data.get("collection_id")

        if not text_doc_id:
            return jsonify(
                {"success": False, "error": "text_doc_id is required"}
            ), 400

        # Get collection_id from request or use default Library collection
        if not collection_id:
            collection_id = get_default_library_id(session["username"])

        rag_service = get_rag_service(collection_id)
        result = rag_service.index_document(
            text_doc_id, collection_id, force_reindex
        )

        if result["status"] == "error":
            return jsonify(
                {"success": False, "error": result.get("error")}
            ), 400

        return jsonify({"success": True, "result": result})
    except Exception as e:
        return handle_api_error(f"indexing document {text_doc_id}", e)
@rag_bp.route("/api/rag/remove-document", methods=["POST"])
@login_required
def remove_document():
    """Remove a document from RAG in a collection.

    Expects JSON: text_doc_id (required), collection_id (optional, defaults
    to the user's Library collection).
    """
    from ...database.library_init import get_default_library_id

    # Pre-bind so the except handler can interpolate it even when parsing
    # the request body is what failed (previously a NameError).
    text_doc_id = None
    try:
        data = request.get_json()
        text_doc_id = data.get("text_doc_id")
        collection_id = data.get("collection_id")

        if not text_doc_id:
            return jsonify(
                {"success": False, "error": "text_doc_id is required"}
            ), 400

        # Get collection_id from request or use default Library collection
        if not collection_id:
            collection_id = get_default_library_id(session["username"])

        rag_service = get_rag_service(collection_id)
        result = rag_service.remove_document_from_rag(
            text_doc_id, collection_id
        )

        if result["status"] == "error":
            return jsonify(
                {"success": False, "error": result.get("error")}
            ), 400

        return jsonify({"success": True, "result": result})
    except Exception as e:
        return handle_api_error(f"removing document {text_doc_id}", e)
@rag_bp.route("/api/rag/index-research", methods=["POST"])
@login_required
def index_research():
    """Index all documents from a research.

    Expects JSON: research_id (required), force_reindex (optional bool).
    """
    # Pre-bind so the except handler can interpolate it even when parsing
    # the request body is what failed (previously a NameError).
    research_id = None
    try:
        data = request.get_json()
        research_id = data.get("research_id")
        force_reindex = data.get("force_reindex", False)

        if not research_id:
            return jsonify(
                {"success": False, "error": "research_id is required"}
            ), 400

        rag_service = get_rag_service()
        results = rag_service.index_research_documents(
            research_id, force_reindex
        )

        return jsonify({"success": True, "results": results})
    except Exception as e:
        return handle_api_error(f"indexing research {research_id}", e)
@rag_bp.route("/api/rag/index-all", methods=["GET"])
@login_required
def index_all():
    """Index all documents in a collection with Server-Sent Events progress.

    Streams `data: {...}` events of type start/progress/complete/error so
    the UI can render a live progress bar. The RAG service is created here,
    inside the request context, because the generator below runs after the
    request context would otherwise be gone.
    """
    from ...database.session_context import get_user_db_session
    from ...database.library_init import get_default_library_id

    force_reindex = request.args.get("force_reindex", "false").lower() == "true"
    username = session["username"]

    # Get collection_id from request or use default Library collection
    collection_id = request.args.get("collection_id")
    if not collection_id:
        collection_id = get_default_library_id(username)

    logger.info(
        f"Starting index-all for collection_id: {collection_id}, force_reindex: {force_reindex}"
    )

    # Create RAG service in request context before generator runs
    rag_service = get_rag_service(collection_id)

    def generate():
        """Generator function for SSE progress updates."""
        try:
            # Send initial status
            yield f"data: {json.dumps({'type': 'start', 'message': 'Starting bulk indexing...'})}\n\n"

            # Get document IDs to index from DocumentCollection
            with get_user_db_session(username) as db_session:
                # Query Document joined with DocumentCollection for the collection
                query = (
                    db_session.query(Document.id, Document.title)
                    .join(
                        DocumentCollection,
                        Document.id == DocumentCollection.document_id,
                    )
                    .filter(DocumentCollection.collection_id == collection_id)
                )

                if not force_reindex:
                    # Only index documents that haven't been indexed yet
                    query = query.filter(DocumentCollection.indexed.is_(False))

                # Materialize rows as plain tuples so they outlive the session.
                doc_info = [(doc_id, title) for doc_id, title in query.all()]

            if not doc_info:
                yield f"data: {json.dumps({'type': 'complete', 'results': {'successful': 0, 'skipped': 0, 'failed': 0, 'message': 'No documents to index'}})}\n\n"
                return

            results = {"successful": 0, "skipped": 0, "failed": 0, "errors": []}
            total = len(doc_info)

            # Process documents in batches to optimize performance
            # Get batch size from settings
            settings = get_settings_manager()
            batch_size = int(
                settings.get_setting("rag.indexing_batch_size", 15)
            )
            processed = 0

            for i in range(0, len(doc_info), batch_size):
                batch = doc_info[i : i + batch_size]

                # Process batch with collection_id
                batch_results = rag_service.index_documents_batch(
                    batch, collection_id, force_reindex
                )

                # Process results and send progress updates
                for j, (doc_id, title) in enumerate(batch):
                    processed += 1
                    result = batch_results[doc_id]

                    # Send progress update
                    yield f"data: {json.dumps({'type': 'progress', 'current': processed, 'total': total, 'title': title, 'percent': int((processed / total) * 100)})}\n\n"

                    if result["status"] == "success":
                        results["successful"] += 1
                    elif result["status"] == "skipped":
                        results["skipped"] += 1
                    else:
                        results["failed"] += 1
                        results["errors"].append(
                            {
                                "doc_id": doc_id,
                                "title": title,
                                "error": result.get("error"),
                            }
                        )

            # Send completion status
            yield f"data: {json.dumps({'type': 'complete', 'results': results})}\n\n"

            # Log final status for debugging
            logger.info(
                f"Bulk indexing complete: {results['successful']} successful, {results['skipped']} skipped, {results['failed']} failed"
            )

        except Exception:
            logger.exception("Error in bulk indexing")
            # Generic message to the client; details stay in the server log.
            yield f"data: {json.dumps({'type': 'error', 'error': 'An internal error occurred during indexing'})}\n\n"

    # stream_with_context keeps the request context alive while streaming.
    return Response(
        stream_with_context(generate()), mimetype="text/event-stream"
    )
@rag_bp.route("/api/rag/configure", methods=["POST"])
@login_required
def configure_rag():
    """
    Change RAG configuration (embedding model, chunk size, etc.).
    This will create a new index with the new configuration.

    Saves the submitted values as the user's defaults; when collection_id
    is provided, additionally creates (or fetches) the index for that
    collection under the new configuration. Uses the module-level ``json``
    import (the previous local ``import json as json_lib`` redundantly
    shadowed it).
    """
    try:
        data = request.get_json()
        embedding_model = data.get("embedding_model")
        embedding_provider = data.get("embedding_provider")
        chunk_size = data.get("chunk_size")
        chunk_overlap = data.get("chunk_overlap")
        collection_id = data.get("collection_id")

        # Get new advanced settings (with defaults)
        splitter_type = data.get("splitter_type", "recursive")
        text_separators = data.get(
            "text_separators", ["\n\n", "\n", ". ", " ", ""]
        )
        distance_metric = data.get("distance_metric", "cosine")
        normalize_vectors = data.get("normalize_vectors", True)
        index_type = data.get("index_type", "flat")

        if not all(
            [
                embedding_model,
                embedding_provider,
                chunk_size,
                chunk_overlap,
            ]
        ):
            return jsonify(
                {
                    "success": False,
                    "error": "All configuration parameters are required (embedding_model, embedding_provider, chunk_size, chunk_overlap)",
                }
            ), 400

        # Save settings to database
        settings = get_settings_manager()
        settings.set_setting("local_search_embedding_model", embedding_model)
        settings.set_setting(
            "local_search_embedding_provider", embedding_provider
        )
        settings.set_setting("local_search_chunk_size", int(chunk_size))
        settings.set_setting("local_search_chunk_overlap", int(chunk_overlap))

        # Save new advanced settings
        settings.set_setting("local_search_splitter_type", splitter_type)
        # The setting store expects a JSON string, not a list.
        if isinstance(text_separators, list):
            text_separators_str = json.dumps(text_separators)
        else:
            text_separators_str = text_separators
        settings.set_setting(
            "local_search_text_separators", text_separators_str
        )
        settings.set_setting("local_search_distance_metric", distance_metric)
        settings.set_setting(
            "local_search_normalize_vectors", bool(normalize_vectors)
        )
        settings.set_setting("local_search_index_type", index_type)

        # If collection_id is provided, update that collection's configuration
        if collection_id:
            # Create new RAG service with new configuration
            with LibraryRAGService(
                username=session["username"],
                embedding_model=embedding_model,
                embedding_provider=embedding_provider,
                chunk_size=int(chunk_size),
                chunk_overlap=int(chunk_overlap),
                splitter_type=splitter_type,
                text_separators=text_separators
                if isinstance(text_separators, list)
                else json.loads(text_separators),
                distance_metric=distance_metric,
                normalize_vectors=normalize_vectors,
                index_type=index_type,
            ) as new_rag_service:
                # Get or create new index with this configuration
                rag_index = new_rag_service._get_or_create_rag_index(
                    collection_id
                )

                return jsonify(
                    {
                        "success": True,
                        "message": "Configuration updated for collection. You can now index documents with the new settings.",
                        "index_hash": rag_index.index_hash,
                    }
                )
        else:
            # Just saving default settings without updating a specific collection
            return jsonify(
                {
                    "success": True,
                    "message": "Default embedding settings saved successfully. New collections will use these settings.",
                }
            )

    except Exception as e:
        return handle_api_error("configuring RAG", e)
@rag_bp.route("/api/rag/documents", methods=["GET"])
@login_required
def get_documents():
    """Get library documents with their RAG status for the default Library collection (paginated).

    Query parameters:
        page: 1-based page number (default 1, clamped to >= 1).
        per_page: page size (default 50, clamped to the 10-100 range).
        filter: "all", "indexed", or "unindexed".
        collection_id: optional collection to query; when absent, falls back
            to the user's default Library collection.

    Returns:
        JSON payload with the documents for the requested page plus
        pagination metadata, or an error payload on failure.
    """
    from ...database.session_context import get_user_db_session
    from ...database.library_init import get_default_library_id

    try:
        # Get pagination parameters
        page = request.args.get("page", 1, type=int)
        per_page = request.args.get("per_page", 50, type=int)
        filter_type = request.args.get(
            "filter", "all"
        )  # all, indexed, unindexed

        # Validate pagination parameters
        page = max(1, page)
        per_page = min(max(10, per_page), 100)  # Limit between 10-100

        # Close current thread's session to force fresh connection
        from ...database.thread_local_session import cleanup_current_thread

        cleanup_current_thread()

        username = session["username"]

        # Get collection_id from request or use default Library collection
        collection_id = request.args.get("collection_id")
        if not collection_id:
            collection_id = get_default_library_id(username)

        logger.info(
            f"Getting documents for collection_id: {collection_id}, filter: {filter_type}, page: {page}"
        )

        with get_user_db_session(username) as db_session:
            # Expire all cached objects to ensure we get fresh data from DB
            db_session.expire_all()

            # Import RagDocumentStatus model
            from ...database.models.library import RagDocumentStatus

            # Build base query - join Document with DocumentCollection for the collection
            # LEFT JOIN with rag_document_status to check indexed status:
            # a matching RagDocumentStatus row means the document is indexed.
            query = (
                db_session.query(
                    Document, DocumentCollection, RagDocumentStatus
                )
                .join(
                    DocumentCollection,
                    (DocumentCollection.document_id == Document.id)
                    & (DocumentCollection.collection_id == collection_id),
                )
                .outerjoin(
                    RagDocumentStatus,
                    (RagDocumentStatus.document_id == Document.id)
                    & (RagDocumentStatus.collection_id == collection_id),
                )
            )

            logger.debug(f"Base query for collection {collection_id}: {query}")

            # Apply filters based on rag_document_status existence
            if filter_type == "indexed":
                query = query.filter(RagDocumentStatus.document_id.isnot(None))
            elif filter_type == "unindexed":
                # Documents in collection but not indexed yet
                query = query.filter(RagDocumentStatus.document_id.is_(None))

            # Get total count before pagination
            total_count = query.count()
            logger.info(
                f"Found {total_count} total documents for collection {collection_id} with filter {filter_type}"
            )

            # Apply pagination (newest documents first)
            results = (
                query.order_by(Document.created_at.desc())
                .limit(per_page)
                .offset((page - 1) * per_page)
                .all()
            )

            # Serialize each (Document, DocumentCollection, RagDocumentStatus)
            # row; rag_status is None for unindexed documents (outer join).
            documents = [
                {
                    "id": doc.id,
                    "title": doc.title,
                    "original_url": doc.original_url,
                    "rag_indexed": rag_status is not None,
                    "chunk_count": rag_status.chunk_count if rag_status else 0,
                    "created_at": doc.created_at.isoformat()
                    if doc.created_at
                    else None,
                }
                for doc, doc_collection, rag_status in results
            ]

            # Debug logging to help diagnose indexing status issues
            indexed_count = sum(1 for d in documents if d["rag_indexed"])

            # Additional debug: check rag_document_status for this collection
            all_indexed_statuses = (
                db_session.query(RagDocumentStatus)
                .filter_by(collection_id=collection_id)
                .all()
            )
            logger.info(
                f"rag_document_status table shows: {len(all_indexed_statuses)} documents indexed for collection {collection_id}"
            )

            logger.info(
                f"Returning {len(documents)} documents on page {page}: "
                f"{indexed_count} indexed, {len(documents) - indexed_count} not indexed"
            )

            return jsonify(
                {
                    "success": True,
                    "documents": documents,
                    "pagination": {
                        "page": page,
                        "per_page": per_page,
                        "total": total_count,
                        "pages": (total_count + per_page - 1) // per_page,
                    },
                }
            )
    except Exception as e:
        return handle_api_error("getting documents", e)
@rag_bp.route("/api/rag/index-local", methods=["GET"])
@login_required
def index_local_library():
    """Index documents from a local folder with Server-Sent Events progress.

    Query parameters:
        path: required folder to scan (validated against traversal attacks).
        patterns: comma-separated glob patterns (default "*.pdf,*.txt,*.md,*.html").
        recursive: "true"/"false" - whether to recurse into subfolders.

    Returns:
        A text/event-stream Response emitting start/progress/complete/error
        events as JSON payloads.
    """
    folder_path = request.args.get("path")
    file_patterns = request.args.get(
        "patterns", "*.pdf,*.txt,*.md,*.html"
    ).split(",")
    recursive = request.args.get("recursive", "true").lower() == "true"

    if not folder_path:
        return jsonify({"success": False, "error": "Path is required"}), 400

    # Validate and sanitize the path to prevent traversal attacks
    try:
        validated_path = PathValidator.validate_local_filesystem_path(
            folder_path
        )
        # Re-sanitize for static analyzer recognition (CodeQL)
        path = PathValidator.sanitize_for_filesystem_ops(validated_path)
    except ValueError as e:
        logger.warning(f"Path validation failed for '{folder_path}': {e}")
        return jsonify({"success": False, "error": "Invalid path"}), 400

    # Check path exists and is a directory
    if not path.exists():
        return jsonify({"success": False, "error": "Path does not exist"}), 400
    if not path.is_dir():
        return jsonify(
            {"success": False, "error": "Path is not a directory"}
        ), 400

    # Create RAG service in request context
    rag_service = get_rag_service()

    def generate():
        """Generator function for SSE progress updates."""
        try:
            # Send initial status
            yield f"data: {json.dumps({'type': 'start', 'message': f'Scanning folder: {path}'})}\n\n"

            # Find all matching files
            # NOTE(review): only `path` is validated; user-supplied patterns
            # containing ".." or separators could glob outside the validated
            # root — confirm PathValidator/pattern handling covers this.
            files_to_index = []
            for pattern in file_patterns:
                pattern = pattern.strip()
                if recursive:
                    search_pattern = str(path / "**" / pattern)
                else:
                    search_pattern = str(path / pattern)

                matching_files = glob.glob(search_pattern, recursive=recursive)
                files_to_index.extend(matching_files)

            # Remove duplicates and sort (overlapping patterns can match twice)
            files_to_index = sorted(set(files_to_index))

            if not files_to_index:
                yield f"data: {json.dumps({'type': 'complete', 'results': {'successful': 0, 'skipped': 0, 'failed': 0, 'message': 'No matching files found'}})}\n\n"
                return

            results = {"successful": 0, "skipped": 0, "failed": 0, "errors": []}
            total = len(files_to_index)

            # Index each file
            for idx, file_path in enumerate(files_to_index, 1):
                file_name = Path(file_path).name

                # Send progress update
                yield f"data: {json.dumps({'type': 'progress', 'current': idx, 'total': total, 'filename': file_name, 'percent': int((idx / total) * 100)})}\n\n"

                try:
                    # Index the file directly using RAG service
                    result = rag_service.index_local_file(file_path)

                    if result.get("status") == "success":
                        results["successful"] += 1
                    elif result.get("status") == "skipped":
                        results["skipped"] += 1
                    else:
                        results["failed"] += 1
                        results["errors"].append(
                            {
                                "file": file_name,
                                "error": result.get("error", "Unknown error"),
                            }
                        )
                except Exception:
                    # A single bad file must not abort the whole batch.
                    results["failed"] += 1
                    results["errors"].append(
                        {"file": file_name, "error": "Failed to index file"}
                    )
                    logger.exception(f"Error indexing file {file_path}")

            # Send completion status
            yield f"data: {json.dumps({'type': 'complete', 'results': results})}\n\n"

            logger.info(
                f"Local library indexing complete for {path}: "
                f"{results['successful']} successful, "
                f"{results['skipped']} skipped, "
                f"{results['failed']} failed"
            )

        except Exception:
            logger.exception("Error in local library indexing")
            yield f"data: {json.dumps({'type': 'error', 'error': 'An internal error occurred during indexing'})}\n\n"

    return Response(
        stream_with_context(generate()), mimetype="text/event-stream"
    )
1230# Collection Management Routes
@rag_bp.route("/api/collections", methods=["GET"])
@login_required
def get_collections():
    """Get all document collections for the current user."""
    from ...database.session_context import get_user_db_session

    try:
        username = session["username"]
        with get_user_db_session(username) as db_session:
            # Each user has a dedicated database, so no username filter is
            # needed on the Collection query.
            result = []
            for collection in db_session.query(Collection).all():
                entry = {
                    "id": collection.id,
                    "name": collection.name,
                    "description": collection.description,
                    "created_at": (
                        collection.created_at.isoformat()
                        if collection.created_at
                        else None
                    ),
                    "collection_type": collection.collection_type,
                    "is_default": (
                        collection.is_default
                        if hasattr(collection, "is_default")
                        else False
                    ),
                    "document_count": (
                        len(collection.document_links)
                        if hasattr(collection, "document_links")
                        else 0
                    ),
                    "folder_count": (
                        len(collection.linked_folders)
                        if hasattr(collection, "linked_folders")
                        else 0
                    ),
                }

                # Attach embedding metadata when the collection has one;
                # otherwise the client receives an explicit null.
                if collection.embedding_model:
                    entry["embedding"] = {
                        "model": collection.embedding_model,
                        "provider": (
                            collection.embedding_model_type.value
                            if collection.embedding_model_type
                            else None
                        ),
                        "dimension": collection.embedding_dimension,
                        "chunk_size": collection.chunk_size,
                        "chunk_overlap": collection.chunk_overlap,
                    }
                else:
                    entry["embedding"] = None

                result.append(entry)

            return jsonify({"success": True, "collections": result})
    except Exception as e:
        return handle_api_error("getting collections", e)
@rag_bp.route("/api/collections", methods=["POST"])
@login_required
def create_collection():
    """Create a new document collection.

    Expects a JSON body with:
        name: required, non-empty collection name (unique per user).
        description: optional free-text description.
        type: optional collection type (defaults to "user_uploads").

    Returns:
        JSON describing the created collection, or a 400 error when the
        body is missing/invalid or the name already exists.
    """
    from ...database.session_context import get_user_db_session

    try:
        # get_json(silent=True) returns None instead of raising on a
        # missing or malformed JSON body; coalesce to {} so we answer with
        # a clean 400 ("Name is required") rather than a 500.
        data = request.get_json(silent=True) or {}
        # `or ""` guards against explicit JSON nulls before .strip().
        name = (data.get("name") or "").strip()
        description = (data.get("description") or "").strip()
        collection_type = data.get("type") or "user_uploads"

        if not name:
            return jsonify({"success": False, "error": "Name is required"}), 400

        username = session["username"]
        with get_user_db_session(username) as db_session:
            # Check if collection with this name already exists in this user's database
            existing = db_session.query(Collection).filter_by(name=name).first()

            if existing:
                return jsonify(
                    {
                        "success": False,
                        "error": f"Collection '{name}' already exists",
                    }
                ), 400

            # Create new collection (no username needed - each user has their own DB)
            # Note: created_at uses default=utcnow() in the model, so we don't need to set it manually
            collection = Collection(
                id=str(uuid.uuid4()),  # Generate UUID for collection
                name=name,
                description=description,
                collection_type=collection_type,
            )

            db_session.add(collection)
            db_session.commit()

            return jsonify(
                {
                    "success": True,
                    "collection": {
                        "id": collection.id,
                        "name": collection.name,
                        "description": collection.description,
                        "created_at": collection.created_at.isoformat(),
                        "collection_type": collection.collection_type,
                    },
                }
            )
    except Exception as e:
        return handle_api_error("creating collection", e)
@rag_bp.route("/api/collections/<string:collection_id>", methods=["PUT"])
@login_required
def update_collection(collection_id):
    """Update a collection's name and/or description.

    Expects a JSON body with optional fields:
        name: new collection name (must not collide with another collection).
        description: new description; an empty string clears it, while
            omitting the key leaves the current description unchanged.

    Returns:
        JSON describing the updated collection, 404 if the collection does
        not exist, or 400 on a name conflict.
    """
    from ...database.session_context import get_user_db_session

    try:
        # Tolerate a missing/malformed JSON body instead of crashing with a 500.
        data = request.get_json(silent=True) or {}
        name = (data.get("name") or "").strip()
        # BUGFIX: previously `description` defaulted to "" when the key was
        # omitted, so renaming a collection without resending the description
        # silently cleared it (the `is not None` guard below could never be
        # false). Now the description is only touched when the client sent it.
        description = data.get("description")
        if description is not None:
            description = description.strip()

        username = session["username"]
        with get_user_db_session(username) as db_session:
            # No need to filter by username - each user has their own database
            collection = (
                db_session.query(Collection).filter_by(id=collection_id).first()
            )

            if not collection:
                return jsonify(
                    {"success": False, "error": "Collection not found"}
                ), 404

            if name:
                # Check if new name conflicts with existing collection
                existing = (
                    db_session.query(Collection)
                    .filter(
                        Collection.name == name,
                        Collection.id != collection_id,
                    )
                    .first()
                )

                if existing:
                    return jsonify(
                        {
                            "success": False,
                            "error": f"Collection '{name}' already exists",
                        }
                    ), 400

                collection.name = name

            if description is not None:  # Allow empty description
                collection.description = description

            db_session.commit()

            return jsonify(
                {
                    "success": True,
                    "collection": {
                        "id": collection.id,
                        "name": collection.name,
                        "description": collection.description,
                        "created_at": collection.created_at.isoformat()
                        if collection.created_at
                        else None,
                        "collection_type": collection.collection_type,
                    },
                }
            )
    except Exception as e:
        return handle_api_error("updating collection", e)
@rag_bp.route("/api/collections/<string:collection_id>", methods=["DELETE"])
@login_required
def delete_collection(collection_id):
    """Delete a collection and its orphaned documents."""
    from ..deletion.services.collection_deletion import (
        CollectionDeletionService,
    )

    try:
        deletion_service = CollectionDeletionService(session["username"])
        outcome = deletion_service.delete_collection(
            collection_id, delete_orphaned_documents=True
        )

        # Guard clause: service reported failure — map "not found" to 404,
        # anything else to 400.
        if not outcome.get("deleted"):
            message = outcome.get("error", "Unknown error")
            status_code = 404 if "not found" in message.lower() else 400
            return jsonify({"success": False, "error": message}), status_code

        return jsonify(
            {
                "success": True,
                "message": "Collection deleted successfully",
                "deleted_chunks": outcome.get("chunks_deleted", 0),
                "orphaned_documents_deleted": outcome.get(
                    "orphaned_documents_deleted", 0
                ),
            }
        )
    except Exception as e:
        return handle_api_error("deleting collection", e)
@rag_bp.route(
    "/api/collections/<string:collection_id>/upload", methods=["POST"]
)
@login_required
@upload_rate_limit
def upload_to_collection(collection_id):
    """Upload files to a collection.

    Accepts multipart form data ("files" field, multiple allowed). Each
    file is deduplicated by SHA-256 content hash: existing documents are
    linked into the collection (and optionally upgraded with PDF bytes),
    new ones have their text extracted and stored. Successfully uploaded
    documents trigger background auto-indexing when a session DB password
    is available.

    Returns:
        JSON with per-file statuses, per-file errors, and a summary.
    """
    from ...database.session_context import get_user_db_session
    from werkzeug.utils import secure_filename
    from pathlib import Path
    import hashlib
    import uuid
    from ..services.pdf_storage_manager import PDFStorageManager

    try:
        if "files" not in request.files:
            return jsonify(
                {"success": False, "error": "No files provided"}
            ), 400

        files = request.files.getlist("files")
        if not files:
            return jsonify(
                {"success": False, "error": "No files selected"}
            ), 400

        username = session["username"]
        with get_user_db_session(username) as db_session:
            # Verify collection exists in this user's database
            collection = (
                db_session.query(Collection).filter_by(id=collection_id).first()
            )

            if not collection:
                return jsonify(
                    {"success": False, "error": "Collection not found"}
                ), 404

            # Get PDF storage mode from form data, falling back to user's setting
            settings = get_settings_manager()
            default_pdf_storage = settings.get_setting(
                "research_library.upload_pdf_storage", "none"
            )
            pdf_storage = request.form.get("pdf_storage", default_pdf_storage)
            if pdf_storage not in ("database", "none"):
                # Security: user uploads can only use database (encrypted) or none (text-only)
                # Filesystem storage is not allowed for user uploads
                pdf_storage = "none"

            # Initialize PDF storage manager if storing PDFs in database
            pdf_storage_manager = None
            if pdf_storage == "database":
                library_root = settings.get_setting(
                    "research_library.storage_path",
                    str(get_library_directory()),
                )
                library_root = str(Path(library_root).expanduser())
                pdf_storage_manager = PDFStorageManager(
                    library_root=Path(library_root), storage_mode="database"
                )
                logger.info("PDF storage mode: database (encrypted)")
            else:
                logger.info("PDF storage mode: none (text-only)")

            uploaded_files = []
            errors = []

            for file in files:
                # Skip empty form parts (no filename supplied)
                if not file.filename:
                    continue

                try:
                    # Read file content
                    file_content = file.read()
                    file.seek(0)  # Reset for potential re-reading

                    # Calculate file hash for deduplication
                    file_hash = hashlib.sha256(file_content).hexdigest()

                    # Check if document already exists
                    existing_doc = (
                        db_session.query(Document)
                        .filter_by(document_hash=file_hash)
                        .first()
                    )

                    if existing_doc:
                        # Document exists, check if we can upgrade to include PDF
                        # (i.e. a previously text-only document gets its PDF bytes stored)
                        pdf_upgraded = False
                        if (
                            pdf_storage == "database"
                            and pdf_storage_manager is not None
                        ):
                            pdf_upgraded = pdf_storage_manager.upgrade_to_pdf(
                                document=existing_doc,
                                pdf_content=file_content,
                                session=db_session,
                            )

                        # Check if already in collection
                        existing_link = (
                            db_session.query(DocumentCollection)
                            .filter_by(
                                document_id=existing_doc.id,
                                collection_id=collection_id,
                            )
                            .first()
                        )

                        if not existing_link:
                            # Add to collection
                            collection_link = DocumentCollection(
                                document_id=existing_doc.id,
                                collection_id=collection_id,
                                indexed=False,
                                chunk_count=0,
                            )
                            db_session.add(collection_link)
                            status = "added_to_collection"
                            if pdf_upgraded:
                                status = "added_to_collection_pdf_upgraded"
                            uploaded_files.append(
                                {
                                    "filename": existing_doc.filename,
                                    "status": status,
                                    "id": existing_doc.id,
                                    "pdf_upgraded": pdf_upgraded,
                                }
                            )
                        else:
                            status = "already_in_collection"
                            if pdf_upgraded:
                                status = "pdf_upgraded"
                            uploaded_files.append(
                                {
                                    "filename": existing_doc.filename,
                                    "status": status,
                                    "id": existing_doc.id,
                                    "pdf_upgraded": pdf_upgraded,
                                }
                            )
                    else:
                        # Create new document
                        from ...document_loaders import (
                            extract_text_from_bytes,
                            is_extension_supported,
                        )

                        filename = secure_filename(file.filename)
                        file_extension = Path(filename).suffix.lower()

                        # Validate extension is supported before extraction
                        if not is_extension_supported(file_extension):
                            errors.append(
                                {
                                    "filename": filename,
                                    "error": f"Unsupported format: {file_extension}",
                                }
                            )
                            continue

                        # Use file_type without leading dot for storage
                        file_type = (
                            file_extension[1:]
                            if file_extension.startswith(".")
                            else file_extension
                        )

                        # Extract text using document_loaders module
                        extracted_text = extract_text_from_bytes(
                            file_content, file_extension, filename
                        )

                        # Clean the extracted text to remove surrogate characters
                        if extracted_text:
                            from ...text_processing import remove_surrogates

                            extracted_text = remove_surrogates(extracted_text)

                        if not extracted_text:
                            errors.append(
                                {
                                    "filename": filename,
                                    "error": f"Could not extract text from {file_type} file",
                                }
                            )
                            logger.warning(
                                f"Skipping file (unknown) - no text could be extracted"
                            )
                            continue

                        # Get or create the user_upload source type
                        logger.info(
                            f"Getting or creating user_upload source type for (unknown)"
                        )
                        source_type = (
                            db_session.query(SourceType)
                            .filter_by(name="user_upload")
                            .first()
                        )
                        if not source_type:
                            logger.info("Creating new user_upload source type")
                            source_type = SourceType(
                                id=str(uuid.uuid4()),
                                name="user_upload",
                                display_name="User Upload",
                                description="Documents uploaded by users",
                                icon="fas fa-upload",
                            )
                            db_session.add(source_type)
                            # flush() so source_type.id is usable below
                            db_session.flush()
                            logger.info(
                                f"Created source type with ID: {source_type.id}"
                            )
                        else:
                            logger.info(
                                f"Found existing source type with ID: {source_type.id}"
                            )

                        # Create document with extracted text (no username needed - in user's own database)
                        # Note: uploaded_at uses default=utcnow() in the model, so we don't need to set it manually
                        doc_id = str(uuid.uuid4())
                        logger.info(
                            f"Creating document {doc_id} for (unknown)"
                        )

                        # Determine storage mode and file_path
                        store_pdf_in_db = (
                            pdf_storage == "database"
                            and file_type == "pdf"
                            and pdf_storage_manager is not None
                        )

                        new_doc = Document(
                            id=doc_id,
                            source_type_id=source_type.id,
                            filename=filename,
                            document_hash=file_hash,
                            file_size=len(file_content),
                            file_type=file_type,
                            text_content=extracted_text,  # Always store extracted text
                            file_path=None
                            if store_pdf_in_db
                            else "text_only_not_stored",
                            storage_mode="database"
                            if store_pdf_in_db
                            else "none",
                        )
                        db_session.add(new_doc)
                        db_session.flush()  # Get the ID
                        logger.info(
                            f"Document {new_doc.id} created successfully"
                        )

                        # Store PDF in encrypted database if requested
                        pdf_stored = False
                        if store_pdf_in_db:
                            try:
                                pdf_storage_manager.save_pdf(
                                    pdf_content=file_content,
                                    document=new_doc,
                                    session=db_session,
                                    filename=filename,
                                )
                                pdf_stored = True
                                logger.info(
                                    f"PDF stored in encrypted database for (unknown)"
                                )
                            except Exception:
                                logger.exception(
                                    f"Failed to store PDF in database for (unknown)"
                                )
                                # Continue without PDF storage - text is still saved

                        # Add to collection
                        collection_link = DocumentCollection(
                            document_id=new_doc.id,
                            collection_id=collection_id,
                            indexed=False,
                            chunk_count=0,
                        )
                        db_session.add(collection_link)

                        uploaded_files.append(
                            {
                                "filename": filename,
                                "status": "uploaded",
                                "id": new_doc.id,
                                "text_length": len(extracted_text),
                                "pdf_stored": pdf_stored,
                            }
                        )

                except Exception:
                    # One failed file must not abort the remaining uploads.
                    errors.append(
                        {
                            "filename": file.filename,
                            "error": "Failed to upload file",
                        }
                    )
                    logger.exception(f"Error uploading file {file.filename}")

            db_session.commit()

            # Trigger auto-indexing for successfully uploaded documents
            document_ids = [
                f["id"]
                for f in uploaded_files
                if f.get("status") in ("uploaded", "added_to_collection")
            ]
            if document_ids:
                from ...database.session_passwords import session_password_store

                # Auto-indexing runs in a background thread, which needs the
                # session password to open the user's encrypted database.
                session_id = session.get("session_id")
                db_password = session_password_store.get_session_password(
                    username, session_id
                )
                if db_password:
                    trigger_auto_index(
                        document_ids, collection_id, username, db_password
                    )

            return jsonify(
                {
                    "success": True,
                    "uploaded": uploaded_files,
                    "errors": errors,
                    "summary": {
                        "total": len(files),
                        "successful": len(uploaded_files),
                        "failed": len(errors),
                    },
                }
            )

    except Exception as e:
        return handle_api_error("uploading files", e)
@rag_bp.route(
    "/api/collections/<string:collection_id>/documents", methods=["GET"]
)
@login_required
def get_collection_documents(collection_id):
    """Get all documents in a collection.

    Returns:
        JSON with the collection's metadata (including embedding settings
        and FAISS index file size) and one entry per linked document with
        its indexing status, storage flags, and cross-collection membership.
    """
    from ...database.session_context import get_user_db_session

    try:
        username = session["username"]
        with get_user_db_session(username) as db_session:
            # Verify collection exists in this user's database
            collection = (
                db_session.query(Collection).filter_by(id=collection_id).first()
            )

            if not collection:
                return jsonify(
                    {"success": False, "error": "Collection not found"}
                ), 404

            # Get documents through junction table
            doc_links = (
                db_session.query(DocumentCollection, Document)
                .join(Document)
                .filter(DocumentCollection.collection_id == collection_id)
                .all()
            )

            documents = []
            for link, doc in doc_links:
                # Check if PDF file is stored (sentinel paths mean "no file on disk")
                has_pdf = bool(
                    doc.file_path
                    and doc.file_path != "metadata_only"
                    and doc.file_path != "text_only_not_stored"
                )
                has_text_db = bool(doc.text_content)

                # Use title if available, otherwise filename
                display_title = doc.title or doc.filename or "Untitled"

                # Get source type name
                source_type_name = (
                    doc.source_type.name if doc.source_type else "unknown"
                )

                # Check if document is in other collections
                other_collections_count = (
                    db_session.query(DocumentCollection)
                    .filter(
                        DocumentCollection.document_id == doc.id,
                        DocumentCollection.collection_id != collection_id,
                    )
                    .count()
                )

                documents.append(
                    {
                        "id": doc.id,
                        "filename": display_title,
                        "title": display_title,
                        "file_type": doc.file_type,
                        "file_size": doc.file_size,
                        "uploaded_at": doc.created_at.isoformat()
                        if doc.created_at
                        else None,
                        "indexed": link.indexed,
                        "chunk_count": link.chunk_count,
                        "last_indexed_at": link.last_indexed_at.isoformat()
                        if link.last_indexed_at
                        else None,
                        "has_pdf": has_pdf,
                        "has_text_db": has_text_db,
                        "source_type": source_type_name,
                        "in_other_collections": other_collections_count > 0,
                        "other_collections_count": other_collections_count,
                    }
                )

            # Get index file size if available
            index_file_size = None
            index_file_size_bytes = None
            # RAGIndex rows are keyed by a synthetic "collection_<id>" name
            collection_name = f"collection_{collection_id}"
            rag_index = (
                db_session.query(RAGIndex)
                .filter_by(collection_name=collection_name)
                .first()
            )
            if rag_index and rag_index.index_path:
                from pathlib import Path

                index_path = Path(rag_index.index_path)
                if index_path.exists():
                    size_bytes = index_path.stat().st_size
                    index_file_size_bytes = size_bytes
                    # Format as human-readable
                    if size_bytes < 1024:
                        index_file_size = f"{size_bytes} B"
                    elif size_bytes < 1024 * 1024:
                        index_file_size = f"{size_bytes / 1024:.1f} KB"
                    else:
                        index_file_size = f"{size_bytes / (1024 * 1024):.1f} MB"

            return jsonify(
                {
                    "success": True,
                    "collection": {
                        "id": collection.id,
                        "name": collection.name,
                        "description": collection.description,
                        "embedding_model": collection.embedding_model,
                        "embedding_model_type": collection.embedding_model_type.value
                        if collection.embedding_model_type
                        else None,
                        "embedding_dimension": collection.embedding_dimension,
                        "chunk_size": collection.chunk_size,
                        "chunk_overlap": collection.chunk_overlap,
                        # Advanced settings
                        "splitter_type": collection.splitter_type,
                        "distance_metric": collection.distance_metric,
                        "index_type": collection.index_type,
                        "normalize_vectors": collection.normalize_vectors,
                        # Index file info
                        "index_file_size": index_file_size,
                        "index_file_size_bytes": index_file_size_bytes,
                    },
                    "documents": documents,
                }
            )

    except Exception as e:
        return handle_api_error("getting collection documents", e)
@rag_bp.route("/api/collections/<string:collection_id>/index", methods=["GET"])
@login_required
def index_collection(collection_id):
    """Index all documents in a collection with Server-Sent Events progress.

    Query parameters:
        force_reindex: "true" to re-index every document (also refreshes the
            collection's stored embedding configuration from the current
            defaults); otherwise only unindexed documents are processed.

    Returns:
        A text/event-stream Response emitting start/progress/doc_error/
        complete/error events, with periodic SSE comment heartbeats while
        each document is being indexed.
    """
    from ...database.session_context import get_user_db_session
    from ...database.session_passwords import session_password_store

    force_reindex = request.args.get("force_reindex", "false").lower() == "true"
    username = session["username"]
    session_id = session.get("session_id")

    logger.info(f"Starting index_collection, force_reindex={force_reindex}")

    # Get password for thread access to encrypted database
    db_password = None
    if session_id:
        db_password = session_password_store.get_session_password(
            username, session_id
        )

    # Create RAG service — on force reindex use current default model
    rag_service = get_rag_service(collection_id, use_defaults=force_reindex)
    # Set password for thread access
    rag_service.db_password = db_password
    logger.info(
        f"RAG service created: provider={rag_service.embedding_provider}"
    )

    def generate():
        """Generator for SSE progress updates."""
        logger.info("SSE generator started")
        try:
            with get_user_db_session(username, db_password) as db_session:
                # Verify collection exists in this user's database
                collection = (
                    db_session.query(Collection)
                    .filter_by(id=collection_id)
                    .first()
                )

                if not collection:
                    yield f"data: {json.dumps({'type': 'error', 'error': 'Collection not found'})}\n\n"
                    return

                # Store embedding metadata on first index or force reindex
                if collection.embedding_model is None or force_reindex:
                    # Get embedding dimension from the embedding manager
                    embedding_dim = None
                    try:
                        # Try to get dimension from the embedding manager's provider
                        if hasattr(rag_service.embedding_manager, "provider"):
                            provider = rag_service.embedding_manager.provider
                            if hasattr(provider, "embedding_dimension"):
                                embedding_dim = provider.embedding_dimension
                    except Exception as e:
                        # Dimension is informational only; indexing proceeds without it.
                        logger.warning(
                            f"Could not get embedding dimension: {e}"
                        )

                    # Store the embedding configuration used for this collection
                    collection.embedding_model = rag_service.embedding_model
                    collection.embedding_model_type = EmbeddingProvider(
                        rag_service.embedding_provider
                    )
                    collection.embedding_dimension = embedding_dim
                    collection.chunk_size = rag_service.chunk_size
                    collection.chunk_overlap = rag_service.chunk_overlap
                    # Store advanced settings
                    collection.splitter_type = rag_service.splitter_type
                    collection.text_separators = rag_service.text_separators
                    collection.distance_metric = rag_service.distance_metric
                    # Ensure normalize_vectors is a proper boolean for database
                    collection.normalize_vectors = bool(
                        rag_service.normalize_vectors
                    )
                    collection.index_type = rag_service.index_type
                    db_session.commit()
                    logger.info(
                        f"Stored embedding metadata for collection: provider={rag_service.embedding_provider}"
                    )

                # Get documents to index
                query = (
                    db_session.query(DocumentCollection, Document)
                    .join(Document)
                    .filter(DocumentCollection.collection_id == collection_id)
                )

                if not force_reindex:
                    # IMPORTANT: DO NOT CHANGE THIS LINE! DO NOT REMOVE THIS COMMENT!
                    # We MUST use `== False` instead of `not DocumentCollection.indexed`
                    # The Python `not` operator does NOT work correctly in SQLAlchemy filters.
                    # Using `not` will cause the query to return NO results (zero documents).
                    # SQLAlchemy requires explicit comparison: `== False` or `== True`
                    # This has been fixed multiple times - DO NOT change it back to `not`!
                    query = query.filter(DocumentCollection.indexed == False)  # noqa: E712

                doc_links = query.all()

                if not doc_links:
                    logger.info("No documents to index in collection")
                    yield f"data: {json.dumps({'type': 'complete', 'results': {'successful': 0, 'skipped': 0, 'failed': 0, 'message': 'No documents to index'}})}\n\n"
                    return

                total = len(doc_links)
                logger.info(f"Found {total} documents to index")
                results = {
                    "successful": 0,
                    "skipped": 0,
                    "failed": 0,
                    "errors": [],
                }

                yield f"data: {json.dumps({'type': 'start', 'message': f'Indexing {total} documents in collection: {collection.name}'})}\n\n"

                for idx, (link, doc) in enumerate(doc_links, 1):
                    filename = doc.filename or doc.title or "Unknown"
                    yield f"data: {json.dumps({'type': 'progress', 'current': idx, 'total': total, 'filename': filename, 'percent': int((idx / total) * 100)})}\n\n"

                    try:
                        logger.debug(
                            f"Indexing document {idx}/{total}: (unknown)"
                        )

                        # Run index_document in a separate thread to allow sending SSE heartbeats.
                        # This keeps the HTTP connection alive during long indexing operations,
                        # preventing timeouts from proxy servers (nginx) and browsers.
                        # The main thread periodically yields heartbeat comments while waiting.
                        result_queue = queue.Queue()
                        error_queue = queue.Queue()

                        # Closure captures `doc` per-iteration; safe because the
                        # thread is started and joined within this iteration.
                        def index_in_thread():
                            try:
                                r = rag_service.index_document(
                                    document_id=doc.id,
                                    collection_id=collection_id,
                                    force_reindex=force_reindex,
                                )
                                result_queue.put(r)
                            except Exception as ex:
                                error_queue.put(ex)

                        thread = threading.Thread(target=index_in_thread)
                        thread.start()

                        # Send heartbeats while waiting for the thread to complete
                        heartbeat_interval = 5  # seconds
                        while thread.is_alive():
                            thread.join(timeout=heartbeat_interval)
                            if thread.is_alive():
                                # Send SSE comment as heartbeat (keeps connection alive)
                                yield f": heartbeat {idx}/{total}\n\n"

                        # Check for errors from thread
                        if not error_queue.empty():
                            raise error_queue.get()

                        result = result_queue.get()
                        logger.info(
                            f"Indexed document {idx}/{total}: (unknown) - status={result.get('status')}"
                        )

                        if result.get("status") == "success":
                            results["successful"] += 1
                            # DocumentCollection status is already updated in index_document
                            # No need to update link here
                        elif result.get("status") == "skipped":
                            results["skipped"] += 1
                        else:
                            results["failed"] += 1
                            error_msg = result.get("error", "Unknown error")
                            results["errors"].append(
                                {
                                    "filename": filename,
                                    "error": error_msg,
                                }
                            )
                            logger.warning(
                                f"Failed to index (unknown) ({idx}/{total}): {error_msg}"
                            )
                    except Exception as e:
                        results["failed"] += 1
                        error_msg = str(e) or "Failed to index document"
                        results["errors"].append(
                            {
                                "filename": filename,
                                "error": error_msg,
                            }
                        )
                        logger.exception(
                            f"Exception indexing document (unknown) ({idx}/{total})"
                        )
                        # Send error update to client so they know indexing is continuing
                        yield f"data: {json.dumps({'type': 'doc_error', 'filename': filename, 'error': error_msg})}\n\n"

                db_session.commit()
                # Ensure all changes are written to disk
                db_session.flush()

                logger.info(
                    f"Indexing complete: {results['successful']} successful, {results['failed']} failed, {results['skipped']} skipped"
                )
                yield f"data: {json.dumps({'type': 'complete', 'results': results})}\n\n"
                logger.info("SSE generator finished successfully")

        except Exception:
            logger.exception("Error in collection indexing")
            yield f"data: {json.dumps({'type': 'error', 'error': 'An internal error occurred during indexing'})}\n\n"

    response = Response(
        stream_with_context(generate()), mimetype="text/event-stream"
    )
    # Prevent buffering for proper SSE streaming
    response.headers["Cache-Control"] = "no-cache, no-transform"
    response.headers["Connection"] = "keep-alive"
    response.headers["X-Accel-Buffering"] = "no"
    return response
2138# =============================================================================
2139# Background Indexing Endpoints
2140# =============================================================================
def _get_rag_service_for_thread(
    collection_id: str,
    username: str,
    db_password: str,
    use_defaults: bool = False,
) -> LibraryRAGService:
    """
    Create RAG service for use in background threads (no Flask context).

    Resolves embedding/chunking configuration with the following precedence:
    the collection's stored settings (when the collection exists, has an
    ``embedding_model``, and ``use_defaults`` is False), otherwise the user's
    ``local_search_*`` settings. A ``LocalEmbeddingManager`` is pre-built here
    so ``LibraryRAGService.__init__`` does not need database access.

    Args:
        collection_id: Collection whose stored settings take precedence.
        username: Owner of the per-user database.
        db_password: Password for opening that database from a worker thread.
        use_defaults: When True, ignore the collection's stored settings and
            use the user's defaults (used for force-reindex).

    Returns:
        A configured ``LibraryRAGService``.

    NOTE(review): callers use the returned service via ``with ... as ...``;
    LibraryRAGService presumably implements the context-manager protocol —
    confirm in the service module.
    """
    from ...database.session_context import get_user_db_session
    from ...web_search_engines.engines.search_engine_local import (
        LocalEmbeddingManager,
    )
    import json  # shadows the module-level import; kept for thread-local clarity

    with get_user_db_session(username, db_password) as db_session:
        settings_manager = SettingsManager(db_session)

        # Get default settings (user-level fallbacks for every knob)
        default_embedding_model = settings_manager.get_setting(
            "local_search_embedding_model", "all-MiniLM-L6-v2"
        )
        default_embedding_provider = settings_manager.get_setting(
            "local_search_embedding_provider", "sentence_transformers"
        )
        default_chunk_size = int(
            settings_manager.get_setting("local_search_chunk_size", 1000)
        )
        default_chunk_overlap = int(
            settings_manager.get_setting("local_search_chunk_overlap", 200)
        )
        default_splitter_type = settings_manager.get_setting(
            "local_search_splitter_type", "recursive"
        )
        default_text_separators = settings_manager.get_setting(
            "local_search_text_separators", '["\n\n", "\n", ". ", " ", ""]'
        )
        # The separators setting is stored as a JSON string; parse it and
        # fall back to sane defaults on malformed JSON.
        if isinstance(default_text_separators, str):
            try:
                default_text_separators = json.loads(default_text_separators)
            except json.JSONDecodeError:
                logger.warning(
                    f"Invalid JSON for local_search_text_separators setting: {default_text_separators!r}. "
                    "Using default separators."
                )
                default_text_separators = ["\n\n", "\n", ". ", " ", ""]
        default_distance_metric = settings_manager.get_setting(
            "local_search_distance_metric", "cosine"
        )
        default_normalize_vectors = settings_manager.get_bool_setting(
            "local_search_normalize_vectors", True
        )
        default_index_type = settings_manager.get_setting(
            "local_search_index_type", "flat"
        )

        # Get settings snapshot for embedding manager
        settings_snapshot = settings_manager.get_settings_snapshot()
        settings_snapshot["_username"] = username

        # Check for collection's stored settings
        collection = (
            db_session.query(Collection).filter_by(id=collection_id).first()
        )

        if collection and collection.embedding_model and not use_defaults:
            # Use collection's stored settings; fall back to the user
            # defaults for any individual field the collection left unset.
            embedding_model = collection.embedding_model
            embedding_provider = collection.embedding_model_type.value
            chunk_size = collection.chunk_size or default_chunk_size
            chunk_overlap = collection.chunk_overlap or default_chunk_overlap
            splitter_type = collection.splitter_type or default_splitter_type
            text_separators = (
                collection.text_separators or default_text_separators
            )
            distance_metric = (
                collection.distance_metric or default_distance_metric
            )
            index_type = collection.index_type or default_index_type

            # normalize_vectors may be stored as a string (legacy rows) or a
            # truthy value; coerce to a real bool, preserving an explicit
            # False (hence the ``is not None`` check rather than truthiness).
            coll_normalize = collection.normalize_vectors
            if coll_normalize is not None:
                if isinstance(coll_normalize, str):
                    coll_normalize = coll_normalize.lower() in (
                        "true",
                        "1",
                        "yes",
                    )
                else:
                    coll_normalize = bool(coll_normalize)
            else:
                coll_normalize = default_normalize_vectors
            normalize_vectors = coll_normalize
        else:
            # Use default settings
            embedding_model = default_embedding_model
            embedding_provider = default_embedding_provider
            chunk_size = default_chunk_size
            chunk_overlap = default_chunk_overlap
            splitter_type = default_splitter_type
            text_separators = default_text_separators
            distance_metric = default_distance_metric
            normalize_vectors = default_normalize_vectors
            index_type = default_index_type

        # Update settings snapshot with embedding config
        settings_snapshot.update(
            {
                "embeddings.provider": embedding_provider,
                f"embeddings.{embedding_provider}.model": embedding_model,
                "local_search_chunk_size": chunk_size,
                "local_search_chunk_overlap": chunk_overlap,
            }
        )

        # Create embedding manager (to avoid database access in LibraryRAGService.__init__)
        embedding_manager = LocalEmbeddingManager(
            embedding_model=embedding_model,
            embedding_model_type=embedding_provider,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            settings_snapshot=settings_snapshot,
        )
        embedding_manager.db_password = db_password

        # Create RAG service with pre-built embedding manager and db_password
        rag_service = LibraryRAGService(
            username=username,
            embedding_model=embedding_model,
            embedding_provider=embedding_provider,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            splitter_type=splitter_type,
            text_separators=text_separators,
            distance_metric=distance_metric,
            normalize_vectors=normalize_vectors,
            index_type=index_type,
            embedding_manager=embedding_manager,
            db_password=db_password,
        )

        return rag_service
def trigger_auto_index(
    document_ids: list[str],
    collection_id: str,
    username: str,
    db_password: str,
) -> None:
    """
    Kick off automatic RAG indexing for documents when the feature is on.

    Consults the ``research_library.auto_index_enabled`` setting and, if
    enabled, submits the documents to the shared background executor.
    Returns immediately; the caller is never blocked by indexing work.

    Args:
        document_ids: List of document IDs to index
        collection_id: The collection to index into
        username: The username for database access
        db_password: The user's database password for thread-safe access
    """
    from ...database.session_context import get_user_db_session

    if not document_ids:
        logger.debug("No documents to auto-index")
        return

    # Read the user's setting; any failure means we quietly skip indexing
    # rather than let a settings error break the caller's request.
    try:
        with get_user_db_session(username, db_password) as db_session:
            manager = SettingsManager(db_session)
            enabled = manager.get_bool_setting(
                "research_library.auto_index_enabled", True
            )
        if not enabled:
            logger.debug("Auto-indexing is disabled, skipping")
            return
    except Exception:
        logger.exception(
            "Failed to check auto-index setting, skipping auto-index"
        )
        return

    logger.info(
        f"Auto-indexing {len(document_ids)} documents in collection {collection_id}"
    )

    # The shared, bounded thread pool keeps concurrent auto-index jobs in
    # check instead of spawning one thread per upload.
    _get_auto_index_executor().submit(
        _auto_index_documents_worker,
        document_ids,
        collection_id,
        username,
        db_password,
    )
def _auto_index_documents_worker(
    document_ids: list[str],
    collection_id: str,
    username: str,
    db_password: str,
) -> None:
    """
    Background job that indexes a batch of documents into a collection.

    Lightweight by design: unlike ``_background_index_worker`` there is no
    TaskMetadata progress tracking or cancellation support. Per-document
    failures are logged and skipped so one bad document never aborts the run.
    """
    try:
        # NOTE(review): the service is used as a context manager here —
        # LibraryRAGService presumably implements __enter__/__exit__; confirm.
        with _get_rag_service_for_thread(
            collection_id, username, db_password
        ) as rag_service:
            success_total = 0
            for doc_id in document_ids:
                try:
                    outcome = rag_service.index_document(
                        doc_id, collection_id, force_reindex=False
                    )
                    status = outcome.get("status")
                    if status == "success":
                        success_total += 1
                        logger.debug(f"Auto-indexed document {doc_id}")
                    elif status == "skipped":
                        logger.debug(
                            f"Document {doc_id} already indexed, skipped"
                        )
                except Exception:
                    logger.exception(f"Failed to auto-index document {doc_id}")

            logger.info(
                f"Auto-indexing complete: {success_total}/{len(document_ids)} documents indexed"
            )

    except Exception:
        logger.exception("Auto-indexing worker failed")
def _background_index_worker(
    task_id: str,
    collection_id: str,
    username: str,
    db_password: str,
    force_reindex: bool,
):
    """
    Background worker thread for indexing documents.

    Updates TaskMetadata with progress and checks for cancellation before
    each document. Terminal states written to the task: ``completed``,
    ``cancelled``, or ``failed`` (with an error message).

    Args:
        task_id: TaskMetadata row to report progress/status to.
        collection_id: Collection whose documents are indexed.
        username: Owner of the per-user database.
        db_password: Password for opening that database from this thread.
        force_reindex: When True, re-index every document (and refresh the
            collection's stored embedding settings from user defaults).
    """
    from ...database.session_context import get_user_db_session

    try:
        # Create RAG service (thread-safe, no Flask context needed).
        # use_defaults=force_reindex: a forced reindex rebuilds with the
        # user's current default settings instead of the collection's stored ones.
        with _get_rag_service_for_thread(
            collection_id, username, db_password, use_defaults=force_reindex
        ) as rag_service:
            with get_user_db_session(username, db_password) as db_session:
                # Get collection
                collection = (
                    db_session.query(Collection)
                    .filter_by(id=collection_id)
                    .first()
                )

                if not collection:
                    _update_task_status(
                        username,
                        db_password,
                        task_id,
                        status="failed",
                        error_message="Collection not found",
                    )
                    return

                # Store embedding metadata on first index or force reindex,
                # so later runs can reproduce the same index configuration.
                if collection.embedding_model is None or force_reindex:
                    collection.embedding_model = rag_service.embedding_model
                    collection.embedding_model_type = EmbeddingProvider(
                        rag_service.embedding_provider
                    )
                    collection.chunk_size = rag_service.chunk_size
                    collection.chunk_overlap = rag_service.chunk_overlap
                    collection.splitter_type = rag_service.splitter_type
                    collection.text_separators = rag_service.text_separators
                    collection.distance_metric = rag_service.distance_metric
                    collection.normalize_vectors = bool(
                        rag_service.normalize_vectors
                    )
                    collection.index_type = rag_service.index_type
                    db_session.commit()

                # Get documents to index
                query = (
                    db_session.query(DocumentCollection, Document)
                    .join(Document)
                    .filter(DocumentCollection.collection_id == collection_id)
                )

                # Without force_reindex, only pick up documents not yet indexed.
                if not force_reindex:
                    query = query.filter(DocumentCollection.indexed == False)  # noqa: E712

                doc_links = query.all()

                if not doc_links:
                    _update_task_status(
                        username,
                        db_password,
                        task_id,
                        status="completed",
                        progress_message="No documents to index",
                    )
                    return

                total = len(doc_links)
                results = {"successful": 0, "skipped": 0, "failed": 0}

                # Update task with total count
                _update_task_status(
                    username,
                    db_password,
                    task_id,
                    progress_total=total,
                    progress_message=f"Indexing {total} documents",
                )

                for idx, (link, doc) in enumerate(doc_links, 1):
                    # Check if cancelled (polled before each document so the
                    # cancel endpoint takes effect within one document's time)
                    if _is_task_cancelled(username, db_password, task_id):
                        _update_task_status(
                            username,
                            db_password,
                            task_id,
                            status="cancelled",
                            progress_message=f"Cancelled after {idx - 1}/{total} documents",
                        )
                        logger.info(f"Indexing task {task_id} was cancelled")
                        return

                    filename = doc.filename or doc.title or "Unknown"

                    # Update progress with filename.
                    # NOTE(review): the "(unknown)" literal below looks like a
                    # redaction artifact in this view; original presumably
                    # interpolated {filename} — confirm against VCS.
                    _update_task_status(
                        username,
                        db_password,
                        task_id,
                        progress_current=idx,
                        progress_message=f"Indexing {idx}/{total}: (unknown)",
                    )

                    try:
                        result = rag_service.index_document(
                            document_id=doc.id,
                            collection_id=collection_id,
                            force_reindex=force_reindex,
                        )

                        if result.get("status") == "success":
                            results["successful"] += 1
                        elif result.get("status") == "skipped":
                            results["skipped"] += 1
                        else:
                            results["failed"] += 1

                    except Exception:
                        # Per-document failures are counted but never abort the run
                        results["failed"] += 1
                        logger.exception(
                            f"Error indexing document {idx}/{total}"
                        )

                db_session.commit()

                # Mark as completed
                _update_task_status(
                    username,
                    db_password,
                    task_id,
                    status="completed",
                    progress_current=total,
                    progress_message=f"Completed: {results['successful']} indexed, {results['failed']} failed, {results['skipped']} skipped",
                )
                logger.info(
                    f"Background indexing task {task_id} completed: {results}"
                )

    except Exception as e:
        logger.exception(f"Background indexing task {task_id} failed")
        _update_task_status(
            username,
            db_password,
            task_id,
            status="failed",
            error_message=str(e),
        )
def _update_task_status(
    username: str,
    db_password: str,
    task_id: str,
    status: Optional[str] = None,
    progress_current: Optional[int] = None,
    progress_total: Optional[int] = None,
    progress_message: Optional[str] = None,
    error_message: Optional[str] = None,
) -> None:
    """Update task metadata in the database.

    Best-effort: database errors are logged and swallowed so a status update
    can never crash the background worker calling it. Only the fields passed
    as non-None are modified.

    Args:
        username: Owner of the per-user database.
        db_password: Password for opening that database from a worker thread.
        task_id: Identifier of the ``TaskMetadata`` row to update.
        status: New task status; ``"completed"`` also stamps ``completed_at``.
        progress_current: Number of items processed so far.
        progress_total: Total number of items to process.
        progress_message: Human-readable progress description.
        error_message: Error detail to persist for failed tasks.
    """
    from ...database.session_context import get_user_db_session

    try:
        with get_user_db_session(username, db_password) as db_session:
            task = (
                db_session.query(TaskMetadata)
                .filter_by(task_id=task_id)
                .first()
            )
            if task:
                if status is not None:
                    task.status = status
                    if status == "completed":
                        # Only successful completion stamps completed_at;
                        # failed/cancelled tasks leave it unset.
                        task.completed_at = datetime.now(UTC)
                if progress_current is not None:
                    task.progress_current = progress_current
                if progress_total is not None:
                    task.progress_total = progress_total
                if progress_message is not None:
                    task.progress_message = progress_message
                if error_message is not None:
                    task.error_message = error_message
                db_session.commit()
    except Exception:
        logger.exception(f"Failed to update task status for {task_id}")
def _is_task_cancelled(username: str, db_password: str, task_id: str) -> bool:
    """Check if a task has been cancelled.

    Args:
        username: Owner of the per-user database.
        db_password: Password for opening that database from a worker thread.
        task_id: Identifier of the ``TaskMetadata`` row to check.

    Returns:
        True only when the task row exists and its status is ``"cancelled"``.
        Any database error is treated as "not cancelled" so the indexing
        worker keeps running rather than aborting on a transient failure.
    """
    from ...database.session_context import get_user_db_session

    try:
        with get_user_db_session(username, db_password) as db_session:
            task = (
                db_session.query(TaskMetadata)
                .filter_by(task_id=task_id)
                .first()
            )
            # ``task is not None and ...`` guarantees a real bool; the old
            # ``task and ...`` returned None when the row was missing,
            # violating the -> bool contract.
            return task is not None and task.status == "cancelled"
    except Exception:
        return False
@rag_bp.route(
    "/api/collections/<string:collection_id>/index/start", methods=["POST"]
)
@login_required
def start_background_index(collection_id):
    """Start background indexing for a collection.

    Creates a ``TaskMetadata`` row (type ``indexing``) and launches a daemon
    thread running ``_background_index_worker``. Returns 409 if an indexing
    task for this collection is already processing.

    Request JSON (optional): ``{"force_reindex": bool}``.
    """
    from ...database.session_context import get_user_db_session
    from ...database.session_passwords import session_password_store

    username = session["username"]
    session_id = session.get("session_id")

    # Get password so the worker thread can open the per-user database
    db_password = None
    if session_id:
        db_password = session_password_store.get_session_password(
            username, session_id
        )

    # Parse request body
    data = request.get_json() or {}
    force_reindex = data.get("force_reindex", False)

    try:
        with get_user_db_session(username, db_password) as db_session:
            # Reject if an indexing task for THIS collection is already
            # running. Check every processing task, not just the first row:
            # a concurrent task for a different collection must not mask
            # (or falsely trigger) the duplicate check for this one.
            active_tasks = (
                db_session.query(TaskMetadata)
                .filter(
                    TaskMetadata.task_type == "indexing",
                    TaskMetadata.status == "processing",
                )
                .all()
            )
            for existing_task in active_tasks:
                metadata = existing_task.metadata_json or {}
                if metadata.get("collection_id") == collection_id:
                    return jsonify(
                        {
                            "success": False,
                            "error": "Indexing is already in progress for this collection",
                            "task_id": existing_task.task_id,
                        }
                    ), 409

            # Create new task
            task_id = str(uuid.uuid4())
            task = TaskMetadata(
                task_id=task_id,
                status="processing",
                task_type="indexing",
                created_at=datetime.now(UTC),
                started_at=datetime.now(UTC),
                progress_current=0,
                progress_total=0,
                progress_message="Starting indexing...",
                metadata_json={
                    "collection_id": collection_id,
                    "force_reindex": force_reindex,
                },
            )
            db_session.add(task)
            db_session.commit()

        # Start background thread (daemon: must not block process shutdown)
        thread = threading.Thread(
            target=_background_index_worker,
            args=(task_id, collection_id, username, db_password, force_reindex),
            daemon=True,
        )
        thread.start()

        logger.info(
            f"Started background indexing task {task_id} for collection {collection_id}"
        )

        return jsonify(
            {
                "success": True,
                "task_id": task_id,
                "message": "Indexing started in background",
            }
        )

    except Exception:
        logger.exception("Failed to start background indexing")
        return jsonify(
            {
                "success": False,
                "error": "Failed to start indexing. Please try again.",
            }
        ), 500
@rag_bp.route(
    "/api/collections/<string:collection_id>/index/status", methods=["GET"]
)
@limiter.exempt
@login_required
def get_index_status(collection_id):
    """Get the current indexing status for a collection.

    Returns the most recent indexing task *belonging to this collection*,
    or ``{"status": "idle"}`` when there is none. Exempt from rate limiting
    because the UI polls this endpoint.
    """
    from ...database.session_context import get_user_db_session
    from ...database.session_passwords import session_password_store

    username = session["username"]
    session_id = session.get("session_id")

    db_password = None
    if session_id:
        db_password = session_password_store.get_session_password(
            username, session_id
        )

    try:
        with get_user_db_session(username, db_password) as db_session:
            # Fetch indexing tasks newest-first and pick the most recent one
            # for THIS collection. Looking only at the single newest task
            # would let a newer task for another collection hide this
            # collection's status.
            tasks = (
                db_session.query(TaskMetadata)
                .filter(TaskMetadata.task_type == "indexing")
                .order_by(TaskMetadata.created_at.desc())
                .all()
            )

            if not tasks:
                return jsonify(
                    {
                        "status": "idle",
                        "message": "No indexing task found",
                    }
                )

            task = next(
                (
                    t
                    for t in tasks
                    if (t.metadata_json or {}).get("collection_id")
                    == collection_id
                ),
                None,
            )
            if task is None:
                return jsonify(
                    {
                        "status": "idle",
                        "message": "No indexing task for this collection",
                    }
                )

            return jsonify(
                {
                    "task_id": task.task_id,
                    "status": task.status,
                    "progress_current": task.progress_current or 0,
                    "progress_total": task.progress_total or 0,
                    "progress_message": task.progress_message,
                    "error_message": task.error_message,
                    "created_at": task.created_at.isoformat()
                    if task.created_at
                    else None,
                    "completed_at": task.completed_at.isoformat()
                    if task.completed_at
                    else None,
                }
            )

    except Exception:
        logger.exception("Failed to get index status")
        return jsonify(
            {
                "status": "error",
                "error": "Failed to get indexing status. Please try again.",
            }
        ), 500
@rag_bp.route(
    "/api/collections/<string:collection_id>/index/cancel", methods=["POST"]
)
@login_required
def cancel_indexing(collection_id):
    """Cancel an active indexing task for a collection.

    Marks the matching ``TaskMetadata`` row as ``cancelled``; the worker
    thread polls that status between documents and stops cooperatively.
    """
    from ...database.session_context import get_user_db_session
    from ...database.session_passwords import session_password_store

    username = session["username"]
    session_id = session.get("session_id")

    db_password = None
    if session_id:
        db_password = session_password_store.get_session_password(
            username, session_id
        )

    try:
        with get_user_db_session(username, db_password) as db_session:
            # Collect ALL processing indexing tasks and select the one for
            # this collection. Taking only the first row would return 404
            # whenever an unrelated collection's task happened to sort first.
            active_tasks = (
                db_session.query(TaskMetadata)
                .filter(
                    TaskMetadata.task_type == "indexing",
                    TaskMetadata.status == "processing",
                )
                .all()
            )

            if not active_tasks:
                return jsonify(
                    {
                        "success": False,
                        "error": "No active indexing task found",
                    }
                ), 404

            task = next(
                (
                    t
                    for t in active_tasks
                    if (t.metadata_json or {}).get("collection_id")
                    == collection_id
                ),
                None,
            )
            if task is None:
                return jsonify(
                    {
                        "success": False,
                        "error": "No active indexing task for this collection",
                    }
                ), 404

            # Mark as cancelled - the worker thread will check this
            task.status = "cancelled"
            task.progress_message = "Cancellation requested..."
            db_session.commit()

            logger.info(
                f"Cancelled indexing task {task.task_id} for collection {collection_id}"
            )

            return jsonify(
                {
                    "success": True,
                    "message": "Cancellation requested",
                    "task_id": task.task_id,
                }
            )

    except Exception:
        logger.exception("Failed to cancel indexing")
        return jsonify(
            {
                "success": False,
                "error": "Failed to cancel indexing. Please try again.",
            }
        ), 500