Coverage for src / local_deep_research / research_library / routes / rag_routes.py: 85%
987 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""
2RAG Management API Routes
4Provides endpoints for managing RAG indexing of library documents:
5- Configure embedding models
6- Index documents
7- Get RAG statistics
8- Bulk operations with progress tracking
9"""
11import os
13from flask import (
14 Blueprint,
15 jsonify,
16 request,
17 Response,
18 render_template,
19 session,
20 stream_with_context,
21)
22from loguru import logger
23import atexit
24import glob
25import json
26import uuid
27import time
28import threading
29import queue
30from concurrent.futures import ThreadPoolExecutor
31from datetime import datetime, UTC
32from pathlib import Path
33from typing import Optional
35from ...constants import FILE_PATH_SENTINELS, FILE_PATH_TEXT_ONLY
36from ...security.decorators import require_json_body
37from ...web.auth.decorators import login_required
38from ...utilities.db_utils import get_settings_manager
39from ..services.library_rag_service import LibraryRAGService
40from ...settings.manager import SettingsManager
41from ...security.path_validator import PathValidator
42from ...security.rate_limiter import (
43 upload_rate_limit_ip,
44 upload_rate_limit_user,
45)
46from ..utils import handle_api_error
47from ...database.models.library import (
48 Document,
49 Collection,
50 DocumentCollection,
51 RAGIndex,
52 SourceType,
53 EmbeddingProvider,
54)
55from ...database.models.queue import TaskMetadata
56from ...database.thread_local_session import thread_cleanup
57from ...security.rate_limiter import limiter
58from ...config.paths import get_library_directory
# Blueprint holding every RAG/library route; all URLs are mounted under /library.
rag_bp = Blueprint("rag", __name__, url_prefix="/library")

# NOTE: Route handlers deliberately index session["username"] instead of
# calling .get(). @login_required guarantees the key is present, and direct
# indexing fails fast if that decorator is ever removed from a route.

# One shared ThreadPoolExecutor serves auto-indexing so threads do not
# proliferate. It is created lazily; the lock guards creation.
_auto_index_executor_lock = threading.Lock()
_auto_index_executor: ThreadPoolExecutor | None = None
def _get_auto_index_executor() -> ThreadPoolExecutor:
    """Return the shared auto-indexing executor, building it on first use.

    The check-and-create step runs while holding
    ``_auto_index_executor_lock`` so only one pool is ever constructed.

    Returns:
        The module-wide ``ThreadPoolExecutor`` (4 workers, thread names
        prefixed with ``auto_index_``).
    """
    global _auto_index_executor
    with _auto_index_executor_lock:
        pool = _auto_index_executor
        if pool is None:
            pool = ThreadPoolExecutor(
                max_workers=4,
                thread_name_prefix="auto_index_",
            )
            _auto_index_executor = pool
        return pool
def _shutdown_auto_index_executor() -> None:
    """Shut down the shared auto-index executor, waiting for running tasks.

    The global reference is detached while holding
    ``_auto_index_executor_lock`` (the same lock the getter uses), so a
    concurrent ``_get_auto_index_executor()`` call never receives a pool that
    is being torn down. The blocking ``shutdown(wait=True)`` runs after the
    lock is released, so worker threads that need the lock are not blocked
    while this function waits for them.
    """
    global _auto_index_executor
    with _auto_index_executor_lock:
        pool = _auto_index_executor
        _auto_index_executor = None
    if pool is not None:
        pool.shutdown(wait=True)


# Make sure pending indexing work is drained when the interpreter exits.
atexit.register(_shutdown_auto_index_executor)
def get_rag_service(
    collection_id: Optional[str] = None,
    use_defaults: bool = False,
) -> LibraryRAGService:
    """
    Build a RAG service for the user of the current Flask session.

    This is a thin wrapper around rag_service_factory.get_rag_service() that
    supplies the session's username and, when a session id is present, the
    database password looked up in the session password store. Code running
    outside a Flask request should call the factory directly.

    Args:
        collection_id: Optional collection UUID to load stored settings from
        use_defaults: When True, ignore stored collection settings and use
            current defaults. Pass True on force-reindex so that the new
            default embedding model is picked up.

    Returns:
        Whatever the factory returns for these arguments.
    """
    from ..services.rag_service_factory import (
        get_rag_service as build_rag_service,
    )
    from ...database.session_passwords import session_password_store

    username = session["username"]
    session_id = session.get("session_id")

    # The password lookup only happens when the session carries an id.
    db_password = (
        session_password_store.get_session_password(username, session_id)
        if session_id
        else None
    )

    return build_rag_service(
        username,
        collection_id,
        use_defaults=use_defaults,
        db_password=db_password,
    )
130# Config API Routes
@rag_bp.route("/api/config/supported-formats", methods=["GET"])
@login_required
def get_supported_formats():
    """List the file extensions accepted for upload.

    The extensions come from the document_loaders registry, which makes this
    endpoint the single source of truth; the UI can use the response to build
    the ``accept`` attribute of its file input.

    Returns:
        JSON with ``extensions`` (sorted list), ``accept_string`` (the same
        values comma-joined) and ``count``.
    """
    from ...document_loaders import get_supported_extensions

    # Sorted so the response is stable between calls.
    ordered = sorted(get_supported_extensions())
    payload = {
        "extensions": ordered,
        "accept_string": ",".join(ordered),
        "count": len(ordered),
    }
    return jsonify(payload)
157# Page Routes
@rag_bp.route("/embedding-settings")
@login_required
def embedding_settings_page():
    """Serve the Embedding Settings page with its navigation entry active."""
    template_name = "pages/embedding_settings.html"
    return render_template(template_name, active_page="embedding-settings")
@rag_bp.route("/document/<string:document_id>/chunks")
@login_required
def view_document_chunks(document_id):
    """Render every stored chunk of one document, grouped by collection.

    Args:
        document_id: Primary key of the ``Document`` to inspect (URL segment).

    Returns:
        The rendered ``pages/document_chunks.html`` template, or the tuple
        ``("Document not found", 404)`` when no such document exists in the
        current user's database.
    """
    from ...database.session_context import get_user_db_session
    from ...database.models.library import DocumentChunk

    username = session["username"]

    with get_user_db_session(username) as db_session:
        document = db_session.query(Document).filter_by(id=document_id).first()
        if not document:
            return "Document not found", 404

        # Ordered by collection, then position, so each group is already in
        # reading order when it is built below.
        chunks = (
            db_session.query(DocumentChunk)
            .filter(DocumentChunk.source_id == document_id)
            .order_by(DocumentChunk.collection_name, DocumentChunk.chunk_index)
            .all()
        )

        chunks_by_collection = {}
        for chunk in chunks:
            coll_name = chunk.collection_name
            group = chunks_by_collection.get(coll_name)
            if group is None:
                # Only the leading "collection_" marker is stripped to obtain
                # the collection id; later occurrences are left untouched.
                # NOTE(review): assumes names are "collection_<id>" — confirm.
                collection_id = coll_name.removeprefix("collection_")
                collection = (
                    db_session.query(Collection)
                    .filter_by(id=collection_id)
                    .first()
                )
                group = {
                    # Fall back to the raw name when no Collection row matches.
                    "name": collection.name if collection else coll_name,
                    "id": collection_id,
                    "chunks": [],
                }
                chunks_by_collection[coll_name] = group

            group["chunks"].append(
                {
                    "id": chunk.id,
                    "index": chunk.chunk_index,
                    "text": chunk.chunk_text,
                    "word_count": chunk.word_count,
                    "start_char": chunk.start_char,
                    "end_char": chunk.end_char,
                    "embedding_model": chunk.embedding_model,
                    "embedding_model_type": chunk.embedding_model_type.value
                    if chunk.embedding_model_type
                    else None,
                    "embedding_dimension": chunk.embedding_dimension,
                    "created_at": chunk.created_at,
                }
            )

        # Rendered while the session is still open so attribute access on
        # ``document`` inside the template cannot hit a closed session.
        return render_template(
            "pages/document_chunks.html",
            document=document,
            chunks_by_collection=chunks_by_collection,
            total_chunks=len(chunks),
        )
@rag_bp.route("/collections")
@login_required
def collections_page():
    """Serve the Collections overview page."""
    template_name = "pages/collections.html"
    return render_template(template_name, active_page="collections")
@rag_bp.route("/collections/<string:collection_id>")
@login_required
def collection_details_page(collection_id):
    """Serve the details page for one collection.

    Args:
        collection_id: Collection identifier from the URL; it is passed to
            the template unchanged.
    """
    template_context = {
        "active_page": "collections",
        "collection_id": collection_id,
    }
    return render_template("pages/collection_details.html", **template_context)
@rag_bp.route("/collections/<string:collection_id>/upload")
@login_required
def collection_upload_page(collection_id):
    """Serve the upload page for one collection.

    Args:
        collection_id: Collection identifier from the URL; it is passed to
            the template unchanged.
    """
    allowed_storage_modes = ("database", "none")

    storage_mode = get_settings_manager().get_setting(
        "research_library.upload_pdf_storage", "none"
    )
    # Uploads support only these modes (no filesystem storage); any other
    # configured value is treated as "none".
    if storage_mode not in allowed_storage_modes:
        storage_mode = "none"

    return render_template(
        "pages/collection_upload.html",
        active_page="collections",
        collection_id=collection_id,
        collection_name=None,  # Could fetch from DB if needed
        upload_pdf_storage=storage_mode,
    )
@rag_bp.route("/collections/create")
@login_required
def collection_create_page():
    """Serve the Create Collection page."""
    template_name = "pages/collection_create.html"
    return render_template(template_name, active_page="collections")
285# API Routes
@rag_bp.route("/api/rag/settings", methods=["GET"])
@login_required
def get_current_settings():
    """Return the current RAG configuration read from the settings store.

    Returns:
        JSON ``{"success": True, "settings": {...}}`` with the embedding,
        chunking and index options. Any exception is turned into a response
        by ``handle_api_error``.
    """
    import json as json_lib

    fallback_separators = ["\n\n", "\n", ". ", " ", ""]

    try:
        settings = get_settings_manager()

        # The setting is stored as a JSON string. The default must be valid
        # JSON as well: a hand-written literal containing raw newline
        # characters is rejected by json.loads (control characters are not
        # allowed inside JSON strings), which made every use of the default
        # log a spurious "Invalid JSON" warning. Serialising the fallback
        # list yields a properly escaped default.
        text_separators = settings.get_setting(
            "local_search_text_separators",
            json_lib.dumps(fallback_separators),
        )
        if isinstance(text_separators, str):
            try:
                text_separators = json_lib.loads(text_separators)
            except json_lib.JSONDecodeError:
                logger.warning(
                    f"Invalid JSON for local_search_text_separators setting: {text_separators!r}. "
                    "Using default separators."
                )
                text_separators = fallback_separators

        normalize_vectors = settings.get_setting(
            "local_search_normalize_vectors", True
        )

        return jsonify(
            {
                "success": True,
                "settings": {
                    "embedding_provider": settings.get_setting(
                        "local_search_embedding_provider",
                        "sentence_transformers",
                    ),
                    "embedding_model": settings.get_setting(
                        "local_search_embedding_model", "all-MiniLM-L6-v2"
                    ),
                    "chunk_size": settings.get_setting(
                        "local_search_chunk_size", 1000
                    ),
                    "chunk_overlap": settings.get_setting(
                        "local_search_chunk_overlap", 200
                    ),
                    "splitter_type": settings.get_setting(
                        "local_search_splitter_type", "recursive"
                    ),
                    "text_separators": text_separators,
                    "distance_metric": settings.get_setting(
                        "local_search_distance_metric", "cosine"
                    ),
                    "normalize_vectors": normalize_vectors,
                    "index_type": settings.get_setting(
                        "local_search_index_type", "flat"
                    ),
                },
            }
        )
    except Exception as e:
        return handle_api_error("getting RAG settings", e)
@rag_bp.route("/api/rag/test-embedding", methods=["POST"])
@login_required
@require_json_body(error_format="success")
def test_embedding():
    """Try an embedding configuration by embedding one short test string.

    Request JSON:
        provider: Embedding provider key (required).
        model: Model name (required).
        test_text: Text to embed; defaults to "This is a test.".

    Returns:
        On success, JSON with the embedding ``dimension`` and the elapsed
        ``response_time_ms``. HTTP 400 when provider or model is missing.
        HTTP 500 with a user-facing hint when embedding generation fails.
    """
    # Bound before the try block so the except handler can always build its
    # message, even if the failure happens while reading the request body.
    model = None

    try:
        data = request.json
        provider = data.get("provider")
        model = data.get("model")
        test_text = data.get("test_text", "This is a test.")

        if not provider or not model:
            return jsonify(
                {"success": False, "error": "Provider and model are required"}
            ), 400

        # Import embedding functions
        from ...embeddings.embeddings_config import (
            get_embedding_function,
        )

        logger.info(
            f"Testing embedding with provider={provider}, model={model}"
        )

        # Get user's settings so provider URLs (e.g. Ollama) are resolved correctly
        settings = get_settings_manager()
        settings_snapshot = (
            settings.get_all_settings()
            if hasattr(settings, "get_all_settings")
            else {}
        )

        # perf_counter is monotonic, so the measured duration cannot be
        # distorted by system clock adjustments during the call.
        started = time.perf_counter()
        embedding_func = get_embedding_function(
            provider=provider,
            model_name=model,
            settings_snapshot=settings_snapshot,
        )

        # Generate test embedding
        embedding = embedding_func([test_text])[0]
        response_time_ms = int((time.perf_counter() - started) * 1000)

        # Results without a length are reported with dimension None.
        dimension = len(embedding) if hasattr(embedding, "__len__") else None

        return jsonify(
            {
                "success": True,
                "dimension": dimension,
                "response_time_ms": response_time_ms,
                "provider": provider,
                "model": model,
            }
        )

    except Exception as e:
        logger.exception("Error during testing embedding")
        error_str = str(e).lower()

        # Detect common signs that an LLM was selected instead of an embedding model
        llm_hints = [
            "does not support",
            "not an embedding",
            "generate embedding",
            "invalid model",
            "not found",
            "expected float",
            "could not convert",
            "list index out of range",
            "object is not subscriptable",
            "not iterable",
            "json",
            "chat",
            "completion",
        ]
        is_likely_llm = any(hint in error_str for hint in llm_hints)

        if is_likely_llm:
            user_message = (
                f"Embedding test failed for model '{model}'. "
                "This is most likely because an LLM (language model) was selected "
                "instead of an embedding model. Please choose a dedicated embedding "
                "model (e.g. nomic-embed-text, mxbai-embed-large, "
                "all-MiniLM-L6-v2)."
            )
        else:
            user_message = (
                f"Embedding test failed for model '{model}'. "
                "If you are unsure whether the selected model supports embeddings, "
                "try a dedicated embedding model instead (e.g. nomic-embed-text, "
                "mxbai-embed-large, all-MiniLM-L6-v2)."
            )

        return jsonify({"success": False, "error": user_message}), 500
@rag_bp.route("/api/rag/models", methods=["GET"])
@login_required
def get_available_models():
    """List embedding providers and, for reachable ones, their models.

    Returns:
        JSON with ``provider_options`` (one entry per provider, including an
        ``available`` flag) and ``providers`` (provider key -> list of model
        entries; empty for providers that are not available). Any exception
        is turned into a response by ``handle_api_error``.
    """
    try:
        from ...embeddings.embeddings_config import _get_provider_classes

        settings = get_settings_manager()
        if hasattr(settings, "get_all_settings"):
            settings_snapshot = settings.get_all_settings()
        else:
            settings_snapshot = {}

        provider_classes = _get_provider_classes()

        # Human-readable names; providers not listed here show their key.
        provider_labels = {
            "sentence_transformers": "Sentence Transformers (Local)",
            "ollama": "Ollama (Local)",
            "openai": "OpenAI API",
        }

        provider_options = []
        providers = {}

        for provider_key, provider_class in provider_classes.items():
            is_reachable = provider_class.is_available(settings_snapshot)

            # Every provider is listed, reachable or not, so users can still
            # open and fix its settings (e.g. a wrong Ollama URL).
            provider_options.append(
                {
                    "value": provider_key,
                    "label": provider_labels.get(provider_key, provider_key),
                    "available": is_reachable,
                }
            )

            # Models are only fetched from providers that are reachable.
            if not is_reachable:
                providers[provider_key] = []
                continue

            model_entries = []
            for m in provider_class.get_available_models(settings_snapshot):
                entry = {
                    "value": m["value"],
                    "label": m["label"],
                    "provider": provider_key,
                }
                # The flag is forwarded only when the provider supplied it.
                if "is_embedding" in m:
                    entry["is_embedding"] = m["is_embedding"]
                model_entries.append(entry)
            providers[provider_key] = model_entries

        return jsonify(
            {
                "success": True,
                "provider_options": provider_options,
                "providers": providers,
            }
        )
    except Exception as e:
        return handle_api_error("getting available models", e)
@rag_bp.route("/api/rag/info", methods=["GET"])
@login_required
def get_index_info():
    """Report details of the current RAG index of a collection.

    Query parameters:
        collection_id: Collection to inspect; when absent, the user's default
            Library collection is used.

    Returns:
        JSON ``{"success": True, "info": ...}``; ``info`` is ``None`` (with a
        "No index found" message) when the service reports no index.
    """
    from ...database.library_init import get_default_library_id

    try:
        collection_id = request.args.get(
            "collection_id"
        ) or get_default_library_id(session["username"])

        logger.info(
            f"Getting RAG index info for collection_id: {collection_id}"
        )

        info = get_rag_service(collection_id).get_current_index_info(
            collection_id
        )

        if info is not None:
            logger.info(f"Found RAG index for collection_id: {collection_id}")
            return jsonify({"success": True, "info": info})

        logger.info(f"No RAG index found for collection_id: {collection_id}")
        return jsonify(
            {"success": True, "info": None, "message": "No index found"}
        )
    except Exception as e:
        return handle_api_error("getting index info", e)
@rag_bp.route("/api/rag/stats", methods=["GET"])
@login_required
def get_rag_stats():
    """Return RAG statistics for a collection.

    Query parameters:
        collection_id: Collection to inspect; when absent, the user's default
            Library collection is used.
    """
    from ...database.library_init import get_default_library_id

    try:
        collection_id = request.args.get(
            "collection_id"
        ) or get_default_library_id(session["username"])

        service = get_rag_service(collection_id)
        payload = {
            "success": True,
            "stats": service.get_rag_stats(collection_id),
        }
        return jsonify(payload)
    except Exception as e:
        return handle_api_error("getting RAG stats", e)
@rag_bp.route("/api/rag/index-document", methods=["POST"])
@login_required
@require_json_body(error_format="success")
def index_document():
    """Index a single document in a collection.

    Request JSON:
        text_doc_id: Document to index (required).
        force_reindex: Passed through to the RAG service; defaults to False.
        collection_id: Target collection; when absent, the user's default
            Library collection is used.

    Returns:
        JSON with the indexing ``result``, or HTTP 400 when ``text_doc_id`` is
        missing or the service reports status "error".
    """
    from ...database.library_init import get_default_library_id

    # Bound before the try block: the except handler interpolates it, and it
    # would otherwise be undefined if reading the request body itself failed.
    text_doc_id = None

    try:
        data = request.get_json()
        text_doc_id = data.get("text_doc_id")
        force_reindex = data.get("force_reindex", False)
        collection_id = data.get("collection_id")

        if not text_doc_id:
            return jsonify(
                {"success": False, "error": "text_doc_id is required"}
            ), 400

        # Get collection_id from request or use default Library collection
        if not collection_id:
            collection_id = get_default_library_id(session["username"])

        rag_service = get_rag_service(collection_id)
        result = rag_service.index_document(
            text_doc_id, collection_id, force_reindex
        )

        if result["status"] == "error":
            return jsonify(
                {"success": False, "error": result.get("error")}
            ), 400

        return jsonify({"success": True, "result": result})
    except Exception as e:
        return handle_api_error(f"indexing document {text_doc_id}", e)
@rag_bp.route("/api/rag/remove-document", methods=["POST"])
@login_required
@require_json_body(error_format="success")
def remove_document():
    """Remove a document from RAG in a collection.

    Request JSON:
        text_doc_id: Document to remove (required).
        collection_id: Collection to remove it from; when absent, the user's
            default Library collection is used.

    Returns:
        JSON with the removal ``result``, or HTTP 400 when ``text_doc_id`` is
        missing or the service reports status "error".
    """
    from ...database.library_init import get_default_library_id

    # Bound before the try block: the except handler interpolates it, and it
    # would otherwise be undefined if reading the request body itself failed.
    text_doc_id = None

    try:
        data = request.get_json()
        text_doc_id = data.get("text_doc_id")
        collection_id = data.get("collection_id")

        if not text_doc_id:
            return jsonify(
                {"success": False, "error": "text_doc_id is required"}
            ), 400

        # Get collection_id from request or use default Library collection
        if not collection_id:
            collection_id = get_default_library_id(session["username"])

        rag_service = get_rag_service(collection_id)
        result = rag_service.remove_document_from_rag(
            text_doc_id, collection_id
        )

        if result["status"] == "error":
            return jsonify(
                {"success": False, "error": result.get("error")}
            ), 400

        return jsonify({"success": True, "result": result})
    except Exception as e:
        return handle_api_error(f"removing document {text_doc_id}", e)
@rag_bp.route("/api/rag/index-research", methods=["POST"])
@login_required
@require_json_body(error_format="success")
def index_research():
    """Index all documents from a research.

    Request JSON:
        research_id: Research whose documents should be indexed (required).
        force_reindex: Passed through to the RAG service; defaults to False.

    Returns:
        JSON with the service's ``results``, or HTTP 400 when ``research_id``
        is missing.
    """
    # Bound before the try block: the except handler interpolates it, and it
    # would otherwise be undefined if reading the request body itself failed.
    research_id = None

    try:
        data = request.get_json()
        research_id = data.get("research_id")
        force_reindex = data.get("force_reindex", False)

        if not research_id:
            return jsonify(
                {"success": False, "error": "research_id is required"}
            ), 400

        rag_service = get_rag_service()
        results = rag_service.index_research_documents(
            research_id, force_reindex
        )

        return jsonify({"success": True, "results": results})
    except Exception as e:
        return handle_api_error(f"indexing research {research_id}", e)
@rag_bp.route("/api/rag/index-all", methods=["GET"])
@login_required
def index_all():
    """Index all documents in a collection with Server-Sent Events progress.

    Query parameters:
        force_reindex: "true" to re-index every document of the collection;
            otherwise only documents not yet marked as indexed are processed.
        collection_id: Target collection; when absent, the user's default
            Library collection is used.

    Returns:
        A ``text/event-stream`` response emitting ``start``, ``progress`` and
        finally ``complete`` (or ``error``) events as JSON payloads.
    """
    from ...database.session_context import get_user_db_session
    from ...database.library_init import get_default_library_id

    force_reindex = request.args.get("force_reindex", "false").lower() == "true"
    username = session["username"]

    # Get collection_id from request or use default Library collection
    collection_id = request.args.get("collection_id")
    if not collection_id:
        collection_id = get_default_library_id(username)

    logger.info(
        f"Starting index-all for collection_id: {collection_id}, force_reindex: {force_reindex}"
    )

    # Create RAG service in request context before generator runs
    rag_service = get_rag_service(collection_id)

    def sse_event(payload):
        """Format one payload as a Server-Sent Events data frame."""
        return f"data: {json.dumps(payload)}\n\n"

    def generate():
        """Generator function for SSE progress updates."""
        try:
            yield sse_event(
                {"type": "start", "message": "Starting bulk indexing..."}
            )

            # Collect (id, title) pairs up front so the DB session is closed
            # before the long-running indexing work starts.
            with get_user_db_session(username) as db_session:
                query = (
                    db_session.query(Document.id, Document.title)
                    .join(
                        DocumentCollection,
                        Document.id == DocumentCollection.document_id,
                    )
                    .filter(DocumentCollection.collection_id == collection_id)
                )

                if not force_reindex:
                    # Only index documents that haven't been indexed yet
                    query = query.filter(DocumentCollection.indexed.is_(False))

                doc_info = [(doc_id, title) for doc_id, title in query.all()]

            if not doc_info:
                yield sse_event(
                    {
                        "type": "complete",
                        "results": {
                            "successful": 0,
                            "skipped": 0,
                            "failed": 0,
                            "message": "No documents to index",
                        },
                    }
                )
                return

            results = {"successful": 0, "skipped": 0, "failed": 0, "errors": []}
            total = len(doc_info)

            # Clamp to at least 1: a configured value of 0 would make range()
            # raise, and a negative one would silently index nothing.
            settings = get_settings_manager()
            batch_size = max(
                1, int(settings.get_setting("rag.indexing_batch_size", 15))
            )
            processed = 0

            for start in range(0, total, batch_size):
                batch = doc_info[start : start + batch_size]

                batch_results = rag_service.index_documents_batch(
                    batch, collection_id, force_reindex
                )

                # One progress event per document, then tally its outcome.
                for doc_id, title in batch:
                    processed += 1
                    result = batch_results[doc_id]

                    yield sse_event(
                        {
                            "type": "progress",
                            "current": processed,
                            "total": total,
                            "title": title,
                            "percent": int((processed / total) * 100),
                        }
                    )

                    if result["status"] == "success":
                        results["successful"] += 1
                    elif result["status"] == "skipped":
                        results["skipped"] += 1
                    else:
                        results["failed"] += 1
                        results["errors"].append(
                            {
                                "doc_id": doc_id,
                                "title": title,
                                "error": result.get("error"),
                            }
                        )

            yield sse_event({"type": "complete", "results": results})

            # Log final status for debugging
            logger.info(
                f"Bulk indexing complete: {results['successful']} successful, {results['skipped']} skipped, {results['failed']} failed"
            )

        except Exception:
            # Details go to the log only; the client gets a generic message.
            logger.exception("Error in bulk indexing")
            yield sse_event(
                {
                    "type": "error",
                    "error": "An internal error occurred during indexing",
                }
            )

    return Response(
        stream_with_context(generate()), mimetype="text/event-stream"
    )
@rag_bp.route("/api/rag/configure", methods=["POST"])
@login_required
@require_json_body(error_format="success")
def configure_rag():
    """
    Change RAG configuration (embedding model, chunk size, etc.).
    When a collection_id is supplied, an index for that collection is also
    fetched or created with the new configuration.

    Request JSON:
        embedding_model, embedding_provider, chunk_size, chunk_overlap:
            required. ``chunk_overlap`` may be 0.
        splitter_type, text_separators, distance_metric, normalize_vectors,
            index_type: optional advanced settings with defaults.
        collection_id: optional collection to configure.

    Returns:
        JSON success message (plus ``index_hash`` when a collection was
        configured), or HTTP 400 when a required parameter is missing.
    """
    import json as json_lib

    try:
        data = request.get_json()
        embedding_model = data.get("embedding_model")
        embedding_provider = data.get("embedding_provider")
        chunk_size = data.get("chunk_size")
        chunk_overlap = data.get("chunk_overlap")
        collection_id = data.get("collection_id")

        # Get new advanced settings (with defaults)
        splitter_type = data.get("splitter_type", "recursive")
        text_separators = data.get(
            "text_separators", ["\n\n", "\n", ". ", " ", ""]
        )
        distance_metric = data.get("distance_metric", "cosine")
        normalize_vectors = data.get("normalize_vectors", True)
        index_type = data.get("index_type", "flat")

        # chunk_overlap is checked for presence rather than truthiness: an
        # overlap of 0 is a valid configuration and must not be reported as
        # a missing parameter.
        overlap_missing = chunk_overlap is None or chunk_overlap == ""
        if overlap_missing or not all(
            [
                embedding_model,
                embedding_provider,
                chunk_size,
            ]
        ):
            return jsonify(
                {
                    "success": False,
                    "error": "All configuration parameters are required (embedding_model, embedding_provider, chunk_size, chunk_overlap)",
                }
            ), 400

        # Every conversion that can fail runs before the first write, so a
        # bad value cannot leave the settings partially updated.
        chunk_size = int(chunk_size)
        chunk_overlap = int(chunk_overlap)
        if isinstance(text_separators, list):
            separators_list = text_separators
            # Convert list to JSON string for storage
            text_separators_str = json_lib.dumps(text_separators)
        else:
            text_separators_str = text_separators
            # The parsed list is only needed when a collection is configured.
            separators_list = (
                json_lib.loads(text_separators) if collection_id else None
            )

        # Save settings to database
        settings = get_settings_manager()
        settings.set_setting("local_search_embedding_model", embedding_model)
        settings.set_setting(
            "local_search_embedding_provider", embedding_provider
        )
        settings.set_setting("local_search_chunk_size", chunk_size)
        settings.set_setting("local_search_chunk_overlap", chunk_overlap)

        # Save new advanced settings
        settings.set_setting("local_search_splitter_type", splitter_type)
        settings.set_setting(
            "local_search_text_separators", text_separators_str
        )
        settings.set_setting("local_search_distance_metric", distance_metric)
        settings.set_setting(
            "local_search_normalize_vectors", bool(normalize_vectors)
        )
        settings.set_setting("local_search_index_type", index_type)

        if not collection_id:
            # Just saving default settings without updating a specific collection
            return jsonify(
                {
                    "success": True,
                    "message": "Default embedding settings saved successfully. New collections will use these settings.",
                }
            )

        # Create new RAG service with new configuration
        with LibraryRAGService(
            username=session["username"],
            embedding_model=embedding_model,
            embedding_provider=embedding_provider,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            splitter_type=splitter_type,
            text_separators=separators_list,
            distance_metric=distance_metric,
            normalize_vectors=normalize_vectors,
            index_type=index_type,
        ) as new_rag_service:
            # Get or create new index with this configuration
            rag_index = new_rag_service._get_or_create_rag_index(
                collection_id
            )

            return jsonify(
                {
                    "success": True,
                    "message": "Configuration updated for collection. You can now index documents with the new settings.",
                    "index_hash": rag_index.index_hash,
                }
            )

    except Exception as e:
        return handle_api_error("configuring RAG", e)
@rag_bp.route("/api/rag/documents", methods=["GET"])
@login_required
def get_documents():
    """Get a collection's documents with their RAG status (paginated).

    Query parameters:
        page: 1-based page number (default 1; values below 1 become 1).
        per_page: Page size (default 50; clamped to the range 10-100).
        filter: "all" (default), "indexed" or "unindexed".
        collection_id: Collection to list; when absent, the user's default
            Library collection is used.

    Returns:
        JSON with ``documents`` and a ``pagination`` object. Any exception is
        turned into a response by ``handle_api_error``.
    """
    from ...database.session_context import get_user_db_session
    from ...database.library_init import get_default_library_id

    try:
        # Get pagination parameters
        page = request.args.get("page", 1, type=int)
        per_page = request.args.get("per_page", 50, type=int)
        filter_type = request.args.get(
            "filter", "all"
        )  # all, indexed, unindexed

        # Validate pagination parameters
        page = max(1, page)
        per_page = min(max(10, per_page), 100)  # Limit between 10-100

        # Close current thread's session to force fresh connection
        from ...database.thread_local_session import cleanup_current_thread

        cleanup_current_thread()

        username = session["username"]

        # Get collection_id from request or use default Library collection
        collection_id = request.args.get("collection_id")
        if not collection_id:
            collection_id = get_default_library_id(username)

        logger.info(
            f"Getting documents for collection_id: {collection_id}, filter: {filter_type}, page: {page}"
        )

        with get_user_db_session(username) as db_session:
            # Expire all cached objects to ensure we get fresh data from DB
            db_session.expire_all()

            # Import RagDocumentStatus model
            from ...database.models.library import RagDocumentStatus

            # Inner join restricts rows to documents in this collection; the
            # outer join attaches the RAG status row when one exists, so a
            # missing status row means "not indexed".
            query = (
                db_session.query(
                    Document, DocumentCollection, RagDocumentStatus
                )
                .join(
                    DocumentCollection,
                    (DocumentCollection.document_id == Document.id)
                    & (DocumentCollection.collection_id == collection_id),
                )
                .outerjoin(
                    RagDocumentStatus,
                    (RagDocumentStatus.document_id == Document.id)
                    & (RagDocumentStatus.collection_id == collection_id),
                )
            )

            logger.debug(f"Base query for collection {collection_id}: {query}")

            # Apply filters based on rag_document_status existence
            if filter_type == "indexed":
                query = query.filter(RagDocumentStatus.document_id.isnot(None))
            elif filter_type == "unindexed":
                # Documents in collection but not indexed yet
                query = query.filter(RagDocumentStatus.document_id.is_(None))

            # Get total count before pagination
            total_count = query.count()
            logger.info(
                f"Found {total_count} total documents for collection {collection_id} with filter {filter_type}"
            )

            # Apply pagination
            results = (
                query.order_by(Document.created_at.desc())
                .limit(per_page)
                .offset((page - 1) * per_page)
                .all()
            )

            # The DocumentCollection element of each row is not needed here.
            documents = [
                {
                    "id": doc.id,
                    "title": doc.title,
                    "original_url": doc.original_url,
                    "rag_indexed": rag_status is not None,
                    "chunk_count": rag_status.chunk_count if rag_status else 0,
                    "created_at": doc.created_at.isoformat()
                    if doc.created_at
                    else None,
                }
                for doc, _doc_collection, rag_status in results
            ]

            # Debug logging to help diagnose indexing status issues
            indexed_count = sum(1 for d in documents if d["rag_indexed"])

            # Counted in the database: only the number is logged, so loading
            # every status row into memory would be wasted work.
            indexed_status_count = (
                db_session.query(RagDocumentStatus)
                .filter_by(collection_id=collection_id)
                .count()
            )
            logger.info(
                f"rag_document_status table shows: {indexed_status_count} documents indexed for collection {collection_id}"
            )

            logger.info(
                f"Returning {len(documents)} documents on page {page}: "
                f"{indexed_count} indexed, {len(documents) - indexed_count} not indexed"
            )

            return jsonify(
                {
                    "success": True,
                    "documents": documents,
                    "pagination": {
                        "page": page,
                        "per_page": per_page,
                        "total": total_count,
                        # Ceiling division: a partial last page still counts.
                        "pages": (total_count + per_page - 1) // per_page,
                    },
                }
            )
    except Exception as e:
        return handle_api_error("getting documents", e)
def _find_local_files(base_dir, patterns, recursive):
    """Return the sorted, de-duplicated files under ``base_dir`` matching ``patterns``.

    Args:
        base_dir: Already-validated directory (``pathlib.Path``) to search.
        patterns: Iterable of glob patterns supplied by the client.
        recursive: When true, each pattern is searched in all subdirectories.

    Returns:
        Sorted list of matching file paths as strings.
    """
    # Escape glob metacharacters in the directory itself so that a folder
    # named e.g. "papers [2024]" is matched literally.
    escaped_base = glob.escape(str(base_dir))
    matches = set()

    for raw_pattern in patterns:
        pattern = raw_pattern.strip()
        if not pattern:
            # An empty pattern would match directories, not files.
            continue

        # Patterns come from the query string. An anchored pattern would
        # replace the validated base path when joined, and ".." segments
        # would climb out of it, so both are refused.
        pattern_path = Path(pattern)
        if pattern_path.anchor or ".." in pattern_path.parts:
            logger.warning(f"Ignoring unsafe file pattern '{pattern}'")
            continue

        if recursive:
            search_pattern = os.path.join(escaped_base, "**", pattern)
        else:
            search_pattern = os.path.join(escaped_base, pattern)

        # Only regular files can be indexed; skip directories that happen
        # to match the pattern.
        matches.update(
            match
            for match in glob.glob(search_pattern, recursive=recursive)
            if os.path.isfile(match)
        )

    return sorted(matches)


@rag_bp.route("/api/rag/index-local", methods=["GET"])
@login_required
def index_local_library():
    """Index documents from a local folder with Server-Sent Events progress.

    Query parameters:
        path: Folder to scan (required).
        patterns: Comma-separated glob patterns
            (default ``*.pdf,*.txt,*.md,*.html``).
        recursive: ``"true"`` (default) to descend into subdirectories.

    Returns:
        A JSON error with status 400 when the path is missing, invalid, does
        not exist or is not a directory; otherwise a ``text/event-stream``
        response emitting ``start``, ``progress`` and ``complete`` (or
        ``error``) events.
    """
    folder_path = request.args.get("path")
    file_patterns = request.args.get(
        "patterns", "*.pdf,*.txt,*.md,*.html"
    ).split(",")
    recursive = request.args.get("recursive", "true").lower() == "true"

    if not folder_path:
        return jsonify({"success": False, "error": "Path is required"}), 400

    # Validate and sanitize the path to prevent traversal attacks
    try:
        validated_path = PathValidator.validate_local_filesystem_path(
            folder_path
        )
        # Re-sanitize for static analyzer recognition (CodeQL)
        path = PathValidator.sanitize_for_filesystem_ops(validated_path)
    except ValueError:
        logger.warning(f"Path validation failed for '{folder_path}'")
        return jsonify({"success": False, "error": "Invalid path"}), 400

    # Check path exists and is a directory
    if not path.exists():
        return jsonify({"success": False, "error": "Path does not exist"}), 400
    if not path.is_dir():
        return jsonify(
            {"success": False, "error": "Path is not a directory"}
        ), 400

    # Create RAG service in request context
    rag_service = get_rag_service()

    def generate():
        """Generator function for SSE progress updates."""
        try:
            # Send initial status
            yield f"data: {json.dumps({'type': 'start', 'message': f'Scanning folder: {path}'})}\n\n"

            # Find all matching files (unsafe patterns are ignored)
            files_to_index = _find_local_files(path, file_patterns, recursive)

            if not files_to_index:
                yield f"data: {json.dumps({'type': 'complete', 'results': {'successful': 0, 'skipped': 0, 'failed': 0, 'message': 'No matching files found'}})}\n\n"
                return

            results = {"successful": 0, "skipped": 0, "failed": 0, "errors": []}
            total = len(files_to_index)

            # Index each file
            for idx, file_path in enumerate(files_to_index, 1):
                file_name = Path(file_path).name

                # Send progress update
                yield f"data: {json.dumps({'type': 'progress', 'current': idx, 'total': total, 'filename': file_name, 'percent': int((idx / total) * 100)})}\n\n"

                try:
                    # Index the file directly using RAG service
                    result = rag_service.index_local_file(file_path)

                    if result.get("status") == "success":
                        results["successful"] += 1
                    elif result.get("status") == "skipped":
                        results["skipped"] += 1
                    else:
                        results["failed"] += 1
                        results["errors"].append(
                            {
                                "file": file_name,
                                "error": result.get("error", "Unknown error"),
                            }
                        )
                except Exception:
                    # One bad file must not abort the whole run; the client
                    # only gets a generic message, details go to the log.
                    results["failed"] += 1
                    results["errors"].append(
                        {"file": file_name, "error": "Failed to index file"}
                    )
                    logger.exception(f"Error indexing file {file_path}")

            # Send completion status
            yield f"data: {json.dumps({'type': 'complete', 'results': results})}\n\n"

            logger.info(
                f"Local library indexing complete for {path}: "
                f"{results['successful']} successful, "
                f"{results['skipped']} skipped, "
                f"{results['failed']} failed"
            )

        except Exception:
            logger.exception("Error in local library indexing")
            yield f"data: {json.dumps({'type': 'error', 'error': 'An internal error occurred during indexing'})}\n\n"

    return Response(
        stream_with_context(generate()), mimetype="text/event-stream"
    )
1133# Collection Management Routes
@rag_bp.route("/api/collections", methods=["GET"])
@login_required
def get_collections():
    """Return every document collection stored in the current user's database."""
    from ...database.session_context import get_user_db_session

    def serialize(coll):
        """Convert one Collection row into its JSON-ready dictionary."""
        payload = {
            "id": coll.id,
            "name": coll.name,
            "description": coll.description,
            "created_at": coll.created_at.isoformat()
            if coll.created_at
            else None,
            "collection_type": coll.collection_type,
            # Optional attributes fall back to neutral values when absent.
            "is_default": getattr(coll, "is_default", False),
            "document_count": len(getattr(coll, "document_links", ())),
            "folder_count": len(getattr(coll, "linked_folders", ())),
        }

        # Embedding metadata is reported only when a model is recorded.
        if not coll.embedding_model:
            payload["embedding"] = None
            return payload

        provider_enum = coll.embedding_model_type
        payload["embedding"] = {
            "model": coll.embedding_model,
            "provider": provider_enum.value if provider_enum else None,
            "dimension": coll.embedding_dimension,
            "chunk_size": coll.chunk_size,
            "chunk_overlap": coll.chunk_overlap,
        }
        return payload

    try:
        current_user = session["username"]
        with get_user_db_session(current_user) as db_session:
            # Each user has a private database, so no per-user filter is needed.
            serialized = [
                serialize(row) for row in db_session.query(Collection).all()
            ]
            return jsonify({"success": True, "collections": serialized})
    except Exception as e:
        return handle_api_error("getting collections", e)
@rag_bp.route("/api/collections", methods=["POST"])
@login_required
@require_json_body(error_format="success")
def create_collection():
    """Create a new document collection.

    JSON body:
        name: Collection name (required, non-empty after trimming).
        description: Optional free-text description.
        type: Optional collection type (default ``"user_uploads"``).

    Returns:
        The created collection as JSON, or a 400 error when the name is
        missing, not a string, or already used by another collection.
    """
    from ...database.session_context import get_user_db_session

    try:
        data = request.get_json()
        raw_name = data.get("name")
        raw_description = data.get("description")

        # JSON null / non-string values used to crash on .strip() and
        # surface as a server error; reject them as a client error instead.
        for value in (raw_name, raw_description):
            if value is not None and not isinstance(value, str):
                return jsonify(
                    {
                        "success": False,
                        "error": "Name and description must be strings",
                    }
                ), 400

        name = (raw_name or "").strip()
        description = (raw_description or "").strip()
        collection_type = data.get("type") or "user_uploads"

        if not name:
            return jsonify({"success": False, "error": "Name is required"}), 400

        username = session["username"]
        with get_user_db_session(username) as db_session:
            # Check if collection with this name already exists in this user's database
            existing = db_session.query(Collection).filter_by(name=name).first()

            if existing:
                return jsonify(
                    {
                        "success": False,
                        "error": f"Collection '{name}' already exists",
                    }
                ), 400

            # Create new collection (no username needed - each user has their own DB)
            # Note: created_at uses default=utcnow() in the model, so we don't need to set it manually
            collection = Collection(
                id=str(uuid.uuid4()),  # Generate UUID for collection
                name=name,
                description=description,
                collection_type=collection_type,
            )

            db_session.add(collection)
            db_session.commit()

            return jsonify(
                {
                    "success": True,
                    "collection": {
                        "id": collection.id,
                        "name": collection.name,
                        "description": collection.description,
                        # Guarded like the other collection endpoints.
                        "created_at": collection.created_at.isoformat()
                        if collection.created_at
                        else None,
                        "collection_type": collection.collection_type,
                    },
                }
            )
    except Exception as e:
        return handle_api_error("creating collection", e)
@rag_bp.route("/api/collections/<string:collection_id>", methods=["PUT"])
@login_required
@require_json_body(error_format="success")
def update_collection(collection_id):
    """Update a collection's name and/or description.

    JSON body:
        name: Optional new name; ignored when empty after trimming.
        description: Optional new description. An empty string clears it;
            omitting the key (or sending null) leaves it unchanged.

    Returns:
        The updated collection as JSON, 404 when the collection does not
        exist, or 400 on a name conflict or non-string field.
    """
    from ...database.session_context import get_user_db_session

    try:
        data = request.get_json()
        raw_name = data.get("name")
        raw_description = data.get("description")

        # JSON null / non-string values used to crash on .strip().
        for value in (raw_name, raw_description):
            if value is not None and not isinstance(value, str):
                return jsonify(
                    {
                        "success": False,
                        "error": "Name and description must be strings",
                    }
                ), 400

        name = (raw_name or "").strip()
        # Keep None as "not provided" so a name-only update no longer wipes
        # the stored description.
        description = (
            raw_description.strip() if raw_description is not None else None
        )

        username = session["username"]
        with get_user_db_session(username) as db_session:
            # No need to filter by username - each user has their own database
            collection = (
                db_session.query(Collection).filter_by(id=collection_id).first()
            )

            if not collection:
                return jsonify(
                    {"success": False, "error": "Collection not found"}
                ), 404

            if name:
                # Check if new name conflicts with existing collection
                existing = (
                    db_session.query(Collection)
                    .filter(
                        Collection.name == name,
                        Collection.id != collection_id,
                    )
                    .first()
                )

                if existing:
                    return jsonify(
                        {
                            "success": False,
                            "error": f"Collection '{name}' already exists",
                        }
                    ), 400

                collection.name = name

            if description is not None:  # Allow empty description
                collection.description = description

            db_session.commit()

            return jsonify(
                {
                    "success": True,
                    "collection": {
                        "id": collection.id,
                        "name": collection.name,
                        "description": collection.description,
                        "created_at": collection.created_at.isoformat()
                        if collection.created_at
                        else None,
                        "collection_type": collection.collection_type,
                    },
                }
            )
    except Exception as e:
        return handle_api_error("updating collection", e)
@rag_bp.route(
    "/api/collections/<string:collection_id>/upload", methods=["POST"]
)
@login_required
@upload_rate_limit_user
@upload_rate_limit_ip
def upload_to_collection(collection_id):
    """Upload files to a collection.

    Expects multipart form data with one or more ``files`` parts and an
    optional ``pdf_storage`` field (``"database"`` or ``"none"``; any other
    value is coerced to ``"none"``).

    For each file the SHA-256 of its content is used for de-duplication:
    an already-known document is linked to the collection (and optionally
    upgraded with the PDF bytes), otherwise text is extracted and a new
    document is created. Documents newly linked to the collection are then
    handed to ``trigger_auto_index``.

    Returns:
        JSON with per-file results in ``uploaded``, per-file failures in
        ``errors`` and a ``summary``; 400 when no files were sent; 404 when
        the collection does not exist.
    """
    from ...database.session_context import get_user_db_session
    from ...security import sanitize_filename, UnsafeFilenameError
    from pathlib import Path
    import hashlib
    import uuid
    from ..services.pdf_storage_manager import PDFStorageManager

    # Statuses describing a document that was newly linked to this
    # collection and therefore still needs indexing. The PDF-upgraded
    # variant was previously left out, so such documents were never queued.
    statuses_needing_index = (
        "uploaded",
        "added_to_collection",
        "added_to_collection_pdf_upgraded",
    )

    try:
        if "files" not in request.files:
            return jsonify(
                {"success": False, "error": "No files provided"}
            ), 400

        files = request.files.getlist("files")
        if not files:
            return jsonify(
                {"success": False, "error": "No files selected"}
            ), 400

        username = session["username"]
        with get_user_db_session(username) as db_session:
            # Verify collection exists in this user's database
            collection = (
                db_session.query(Collection).filter_by(id=collection_id).first()
            )

            if not collection:
                return jsonify(
                    {"success": False, "error": "Collection not found"}
                ), 404

            # Get PDF storage mode from form data, falling back to user's setting
            settings = get_settings_manager()
            default_pdf_storage = settings.get_setting(
                "research_library.upload_pdf_storage", "none"
            )
            pdf_storage = request.form.get("pdf_storage", default_pdf_storage)
            if pdf_storage not in ("database", "none"):
                # Security: user uploads can only use database (encrypted) or none (text-only)
                # Filesystem storage is not allowed for user uploads
                pdf_storage = "none"

            # Initialize PDF storage manager if storing PDFs in database
            pdf_storage_manager = None
            if pdf_storage == "database":
                library_root = settings.get_setting(
                    "research_library.storage_path",
                    str(get_library_directory()),
                )
                library_root = str(
                    Path(os.path.expandvars(library_root))
                    .expanduser()
                    .resolve()
                )
                pdf_storage_manager = PDFStorageManager(
                    library_root=Path(library_root), storage_mode="database"
                )
                logger.info("PDF storage mode: database (encrypted)")
            else:
                logger.info("PDF storage mode: none (text-only)")

            uploaded_files = []
            errors = []

            for file in files:
                # Parts without a filename carry no file; skip them.
                if not file.filename:
                    continue

                try:
                    filename = sanitize_filename(file.filename)
                except UnsafeFilenameError:
                    # Do not echo the unsafe name back to the client.
                    errors.append(
                        {
                            "filename": "rejected",
                            "error": "Invalid or unsafe filename",
                        }
                    )
                    continue

                try:
                    # Read file content
                    file_content = file.read()
                    file.seek(0)  # Reset for potential re-reading

                    # Calculate file hash for deduplication
                    file_hash = hashlib.sha256(file_content).hexdigest()

                    # Check if document already exists
                    existing_doc = (
                        db_session.query(Document)
                        .filter_by(document_hash=file_hash)
                        .first()
                    )

                    if existing_doc:
                        # Document exists, check if we can upgrade to include PDF
                        pdf_upgraded = False
                        if (
                            pdf_storage == "database"
                            and pdf_storage_manager is not None
                        ):
                            # NOTE: Only the PDF magic-byte check is needed here.
                            # File count validation is already handled by Flask's MAX_CONTENT_LENGTH.
                            # Filename sanitization already happens via sanitize_filename() above.
                            # See PR #3145 review for details.
                            if file_content[:4] != b"%PDF":
                                logger.debug(
                                    "Skipping PDF upgrade for {}: not a PDF file",
                                    filename,
                                )
                            else:
                                pdf_upgraded = (
                                    pdf_storage_manager.upgrade_to_pdf(
                                        document=existing_doc,
                                        pdf_content=file_content,
                                        session=db_session,
                                    )
                                )

                        # Check if already in collection
                        existing_link = (
                            db_session.query(DocumentCollection)
                            .filter_by(
                                document_id=existing_doc.id,
                                collection_id=collection_id,
                            )
                            .first()
                        )

                        if not existing_link:
                            # Add to collection
                            collection_link = DocumentCollection(
                                document_id=existing_doc.id,
                                collection_id=collection_id,
                                indexed=False,
                                chunk_count=0,
                            )
                            db_session.add(collection_link)
                            status = "added_to_collection"
                            if pdf_upgraded:
                                status = "added_to_collection_pdf_upgraded"
                            uploaded_files.append(
                                {
                                    "filename": existing_doc.filename,
                                    "status": status,
                                    "id": existing_doc.id,
                                    "pdf_upgraded": pdf_upgraded,
                                }
                            )
                        else:
                            status = "already_in_collection"
                            if pdf_upgraded:
                                status = "pdf_upgraded"
                            uploaded_files.append(
                                {
                                    "filename": existing_doc.filename,
                                    "status": status,
                                    "id": existing_doc.id,
                                    "pdf_upgraded": pdf_upgraded,
                                }
                            )
                    else:
                        # Create new document
                        from ...document_loaders import (
                            extract_text_from_bytes,
                            is_extension_supported,
                        )

                        file_extension = Path(filename).suffix.lower()

                        # Validate extension is supported before extraction
                        if not is_extension_supported(file_extension):
                            errors.append(
                                {
                                    "filename": filename,
                                    "error": f"Unsupported format: {file_extension}",
                                }
                            )
                            continue

                        # Use file_type without leading dot for storage
                        file_type = (
                            file_extension[1:]
                            if file_extension.startswith(".")
                            else file_extension
                        )

                        # Extract text using document_loaders module
                        extracted_text = extract_text_from_bytes(
                            file_content, file_extension, filename
                        )

                        # Clean the extracted text to remove surrogate characters
                        if extracted_text:
                            from ...text_processing import remove_surrogates

                            extracted_text = remove_surrogates(extracted_text)

                        if not extracted_text:
                            errors.append(
                                {
                                    "filename": filename,
                                    "error": f"Could not extract text from {file_type} file",
                                }
                            )
                            logger.warning(
                                f"Skipping file {filename} - no text could be extracted"
                            )
                            continue

                        # Get or create the user_upload source type
                        logger.info(
                            f"Getting or creating user_upload source type for {filename}"
                        )
                        source_type = (
                            db_session.query(SourceType)
                            .filter_by(name="user_upload")
                            .first()
                        )
                        if not source_type:
                            logger.info("Creating new user_upload source type")
                            source_type = SourceType(
                                id=str(uuid.uuid4()),
                                name="user_upload",
                                display_name="User Upload",
                                description="Documents uploaded by users",
                                icon="fas fa-upload",
                            )
                            db_session.add(source_type)
                            db_session.flush()
                            logger.info(
                                f"Created source type with ID: {source_type.id}"
                            )
                        else:
                            logger.info(
                                f"Found existing source type with ID: {source_type.id}"
                            )

                        # Create document with extracted text (no username needed - in user's own database)
                        # Note: uploaded_at uses default=utcnow() in the model, so we don't need to set it manually
                        doc_id = str(uuid.uuid4())
                        logger.info(
                            f"Creating document {doc_id} for {filename}"
                        )

                        # Determine storage mode and file_path
                        store_pdf_in_db = (
                            pdf_storage == "database"
                            and file_type == "pdf"
                            and pdf_storage_manager is not None
                        )

                        new_doc = Document(
                            id=doc_id,
                            source_type_id=source_type.id,
                            filename=filename,
                            document_hash=file_hash,
                            file_size=len(file_content),
                            file_type=file_type,
                            text_content=extracted_text,  # Always store extracted text
                            file_path=None
                            if store_pdf_in_db
                            else FILE_PATH_TEXT_ONLY,
                            storage_mode="database"
                            if store_pdf_in_db
                            else "none",
                        )
                        db_session.add(new_doc)
                        db_session.flush()  # Get the ID
                        logger.info(
                            f"Document {new_doc.id} created successfully"
                        )

                        # Store PDF in encrypted database if requested
                        pdf_stored = False
                        if store_pdf_in_db:
                            try:
                                pdf_storage_manager.save_pdf(
                                    pdf_content=file_content,
                                    document=new_doc,
                                    session=db_session,
                                    filename=filename,
                                )
                                pdf_stored = True
                                logger.info(
                                    f"PDF stored in encrypted database for {filename}"
                                )
                            except Exception:
                                logger.exception(
                                    f"Failed to store PDF in database for {filename}"
                                )
                                # Continue without PDF storage - text is still saved

                        # Add to collection
                        collection_link = DocumentCollection(
                            document_id=new_doc.id,
                            collection_id=collection_id,
                            indexed=False,
                            chunk_count=0,
                        )
                        db_session.add(collection_link)

                        uploaded_files.append(
                            {
                                "filename": filename,
                                "status": "uploaded",
                                "id": new_doc.id,
                                "text_length": len(extracted_text),
                                "pdf_stored": pdf_stored,
                            }
                        )

                except Exception:
                    # A failure on one file is reported but must not abort
                    # the remaining uploads.
                    errors.append(
                        {
                            "filename": filename,
                            "error": "Failed to upload file",
                        }
                    )
                    logger.exception(f"Error uploading file {filename}")

            db_session.commit()

            # Trigger auto-indexing for every document newly linked to the
            # collection in this request.
            document_ids = [
                f["id"]
                for f in uploaded_files
                if f.get("status") in statuses_needing_index
            ]
            if document_ids:
                from ...database.session_passwords import session_password_store

                session_id = session.get("session_id")
                db_password = session_password_store.get_session_password(
                    username, session_id
                )
                # Without the password no background indexing is started.
                if db_password:
                    trigger_auto_index(
                        document_ids, collection_id, username, db_password
                    )

            return jsonify(
                {
                    "success": True,
                    "uploaded": uploaded_files,
                    "errors": errors,
                    "summary": {
                        "total": len(files),
                        "successful": len(uploaded_files),
                        "failed": len(errors),
                    },
                }
            )

    except Exception as e:
        return handle_api_error("uploading files", e)
@rag_bp.route(
    "/api/collections/<string:collection_id>/documents", methods=["GET"]
)
@login_required
def get_collection_documents(collection_id):
    """Return a collection's settings together with all of its documents."""
    from ...database.session_context import get_user_db_session

    def human_readable(num_bytes):
        """Format a byte count as B, KB or MB with one decimal place."""
        kib = 1024
        mib = kib * kib
        if num_bytes < kib:
            return f"{num_bytes} B"
        if num_bytes < mib:
            return f"{num_bytes / kib:.1f} KB"
        return f"{num_bytes / mib:.1f} MB"

    try:
        current_user = session["username"]
        with get_user_db_session(current_user) as db_session:
            # The collection must exist in this user's database.
            collection = (
                db_session.query(Collection).filter_by(id=collection_id).first()
            )
            if not collection:
                return jsonify(
                    {"success": False, "error": "Collection not found"}
                ), 404

            # Walk the junction table to reach the documents.
            linked_rows = (
                db_session.query(DocumentCollection, Document)
                .join(Document)
                .filter(DocumentCollection.collection_id == collection_id)
                .all()
            )

            documents = []
            for link, doc in linked_rows:
                # A real file path (not a sentinel) means a PDF is stored.
                stored_path = doc.file_path
                pdf_available = bool(
                    stored_path and stored_path not in FILE_PATH_SENTINELS
                )
                text_available = bool(doc.text_content)

                # Prefer the title, then the filename, then a placeholder.
                label = doc.title or doc.filename or "Untitled"

                source_name = "unknown"
                if doc.source_type:
                    source_name = doc.source_type.name

                # How many other collections also contain this document.
                shared_with = (
                    db_session.query(DocumentCollection)
                    .filter(
                        DocumentCollection.document_id == doc.id,
                        DocumentCollection.collection_id != collection_id,
                    )
                    .count()
                )

                uploaded_at = None
                if doc.created_at:
                    uploaded_at = doc.created_at.isoformat()

                last_indexed_at = None
                if link.last_indexed_at:
                    last_indexed_at = link.last_indexed_at.isoformat()

                documents.append(
                    {
                        "id": doc.id,
                        "filename": label,
                        "title": label,
                        "file_type": doc.file_type,
                        "file_size": doc.file_size,
                        "uploaded_at": uploaded_at,
                        "indexed": link.indexed,
                        "chunk_count": link.chunk_count,
                        "last_indexed_at": last_indexed_at,
                        "has_pdf": pdf_available,
                        "has_text_db": text_available,
                        "source_type": source_name,
                        "in_other_collections": shared_with > 0,
                        "other_collections_count": shared_with,
                    }
                )

            # Report the on-disk size of the index file when one is recorded.
            index_file_size = None
            index_file_size_bytes = None
            rag_index = (
                db_session.query(RAGIndex)
                .filter_by(collection_name=f"collection_{collection_id}")
                .first()
            )
            if rag_index and rag_index.index_path:
                from pathlib import Path

                index_file = Path(rag_index.index_path)
                if index_file.exists():
                    index_file_size_bytes = index_file.stat().st_size
                    index_file_size = human_readable(index_file_size_bytes)

            provider_enum = collection.embedding_model_type
            collection_payload = {
                "id": collection.id,
                "name": collection.name,
                "description": collection.description,
                "embedding_model": collection.embedding_model,
                "embedding_model_type": provider_enum.value
                if provider_enum
                else None,
                "embedding_dimension": collection.embedding_dimension,
                "chunk_size": collection.chunk_size,
                "chunk_overlap": collection.chunk_overlap,
                # Advanced settings
                "splitter_type": collection.splitter_type,
                "distance_metric": collection.distance_metric,
                "index_type": collection.index_type,
                "normalize_vectors": collection.normalize_vectors,
                # Index file info
                "index_file_size": index_file_size,
                "index_file_size_bytes": index_file_size_bytes,
                "collection_type": collection.collection_type,
            }

            return jsonify(
                {
                    "success": True,
                    "collection": collection_payload,
                    "documents": documents,
                }
            )

    except Exception as e:
        return handle_api_error("getting collection documents", e)
@rag_bp.route("/api/collections/<string:collection_id>/index", methods=["GET"])
@login_required
def index_collection(collection_id):
    """Index all documents in a collection with Server-Sent Events progress.

    Query parameters:
        force_reindex: ``"true"`` to re-index every document using the
            current default embedding settings; otherwise only documents
            not yet marked as indexed are processed.

    Returns:
        A ``text/event-stream`` response emitting ``start``, ``progress``,
        ``doc_error`` and ``complete`` (or ``error``) events, interleaved
        with SSE comment heartbeats while a document is being indexed.
    """
    from ...database.session_context import get_user_db_session
    from ...database.session_passwords import session_password_store

    force_reindex = request.args.get("force_reindex", "false").lower() == "true"
    username = session["username"]
    session_id = session.get("session_id")

    logger.info(f"Starting index_collection, force_reindex={force_reindex}")

    # Get password for thread access to encrypted database
    db_password = None
    if session_id:
        db_password = session_password_store.get_session_password(
            username, session_id
        )

    # Create RAG service — on force reindex use current default model
    rag_service = get_rag_service(collection_id, use_defaults=force_reindex)
    logger.info(
        f"RAG service created: provider={rag_service.embedding_provider}"
    )

    def generate():
        """Generator for SSE progress updates."""
        logger.info("SSE generator started")
        try:
            with get_user_db_session(username, db_password) as db_session:
                # Verify collection exists in this user's database
                collection = (
                    db_session.query(Collection)
                    .filter_by(id=collection_id)
                    .first()
                )

                if not collection:
                    yield f"data: {json.dumps({'type': 'error', 'error': 'Collection not found'})}\n\n"
                    return

                # Store embedding metadata on first index or force reindex
                if collection.embedding_model is None or force_reindex:
                    # Get embedding dimension from the embedding manager
                    embedding_dim = None
                    try:
                        # Try to get dimension from the embedding manager's provider
                        if hasattr(rag_service.embedding_manager, "provider"):
                            provider = rag_service.embedding_manager.provider
                            if hasattr(provider, "embedding_dimension"):
                                embedding_dim = provider.embedding_dimension
                    except Exception:
                        logger.warning("Could not get embedding dimension")

                    # Store the embedding configuration used for this collection
                    collection.embedding_model = rag_service.embedding_model
                    collection.embedding_model_type = EmbeddingProvider(
                        rag_service.embedding_provider
                    )
                    collection.embedding_dimension = embedding_dim
                    collection.chunk_size = rag_service.chunk_size
                    collection.chunk_overlap = rag_service.chunk_overlap
                    # Store advanced settings
                    collection.splitter_type = rag_service.splitter_type
                    collection.text_separators = rag_service.text_separators
                    collection.distance_metric = rag_service.distance_metric
                    # Ensure normalize_vectors is a proper boolean for database
                    collection.normalize_vectors = bool(
                        rag_service.normalize_vectors
                    )
                    collection.index_type = rag_service.index_type
                    db_session.commit()
                    logger.info(
                        f"Stored embedding metadata for collection: provider={rag_service.embedding_provider}"
                    )

                # Get documents to index
                query = (
                    db_session.query(DocumentCollection, Document)
                    .join(Document)
                    .filter(DocumentCollection.collection_id == collection_id)
                )

                if not force_reindex:
                    # IMPORTANT: DO NOT CHANGE THIS LINE! DO NOT REMOVE THIS COMMENT!
                    # We MUST use `== False` instead of `not DocumentCollection.indexed`
                    # The Python `not` operator does NOT work correctly in SQLAlchemy filters.
                    # Using `not` will cause the query to return NO results (zero documents).
                    # SQLAlchemy requires explicit comparison: `== False` or `== True`
                    # This has been fixed multiple times - DO NOT change it back to `not`!
                    query = query.filter(DocumentCollection.indexed == False)  # noqa: E712

                doc_links = query.all()

                if not doc_links:
                    logger.info("No documents to index in collection")
                    yield f"data: {json.dumps({'type': 'complete', 'results': {'successful': 0, 'skipped': 0, 'failed': 0, 'message': 'No documents to index'}})}\n\n"
                    return

                total = len(doc_links)
                logger.info(f"Found {total} documents to index")
                results = {
                    "successful": 0,
                    "skipped": 0,
                    "failed": 0,
                    "errors": [],
                }

                yield f"data: {json.dumps({'type': 'start', 'message': f'Indexing {total} documents in collection: {collection.name}'})}\n\n"

                for idx, (_link, doc) in enumerate(doc_links, 1):
                    filename = doc.filename or doc.title or "Unknown"
                    yield f"data: {json.dumps({'type': 'progress', 'current': idx, 'total': total, 'filename': filename, 'percent': int((idx / total) * 100)})}\n\n"

                    try:
                        logger.debug(
                            f"Indexing document {idx}/{total}: {filename}"
                        )

                        # Run index_document in a separate thread to allow sending SSE heartbeats.
                        # This keeps the HTTP connection alive during long indexing operations,
                        # preventing timeouts from proxy servers (nginx) and browsers.
                        # The main thread periodically yields heartbeat comments while waiting.
                        result_queue = queue.Queue()
                        error_queue = queue.Queue()

                        # The document id is bound as a default argument so it
                        # is read here, on the thread that owns db_session,
                        # and the worker never touches the ORM instance.
                        def index_in_thread(document_id=doc.id):
                            try:
                                r = rag_service.index_document(
                                    document_id=document_id,
                                    collection_id=collection_id,
                                    force_reindex=force_reindex,
                                )
                                result_queue.put(r)
                            except Exception as ex:
                                error_queue.put(ex)
                            finally:
                                try:
                                    from ...database.thread_local_session import (
                                        cleanup_current_thread,
                                    )

                                    cleanup_current_thread()
                                except Exception:
                                    logger.debug(
                                        "best-effort thread-local DB session cleanup",
                                        exc_info=True,
                                    )

                        thread = threading.Thread(target=index_in_thread)
                        thread.start()

                        # Send heartbeats while waiting for the thread to complete
                        heartbeat_interval = 5  # seconds
                        while thread.is_alive():
                            thread.join(timeout=heartbeat_interval)
                            if thread.is_alive():
                                # Send SSE comment as heartbeat (keeps connection alive)
                                yield f": heartbeat {idx}/{total}\n\n"

                        # Check for errors from thread
                        if not error_queue.empty():
                            raise error_queue.get()  # noqa: TRY301 — re-raises thread exception for per-document error handling

                        # The worker has finished, so a result is either
                        # already queued or will never arrive. A blocking
                        # get() here could hang the stream forever.
                        try:
                            result = result_queue.get_nowait()
                        except queue.Empty:
                            raise RuntimeError(  # noqa: TRY301
                                "Indexing thread exited without producing a result"
                            ) from None

                        logger.info(
                            f"Indexed document {idx}/{total}: {filename} - status={result.get('status')}"
                        )

                        if result.get("status") == "success":
                            results["successful"] += 1
                            # DocumentCollection status is already updated in index_document
                            # No need to update link here
                        elif result.get("status") == "skipped":
                            results["skipped"] += 1
                        else:
                            results["failed"] += 1
                            error_msg = result.get("error", "Unknown error")
                            results["errors"].append(
                                {
                                    "filename": filename,
                                    "error": error_msg,
                                }
                            )
                            logger.warning(
                                f"Failed to index {filename} ({idx}/{total}): {error_msg}"
                            )
                    except Exception as e:
                        results["failed"] += 1
                        error_msg = str(e) or "Failed to index document"
                        results["errors"].append(
                            {
                                "filename": filename,
                                "error": error_msg,
                            }
                        )
                        logger.exception(
                            f"Exception indexing document {filename} ({idx}/{total})"
                        )
                        # Send error update to client so they know indexing is continuing
                        yield f"data: {json.dumps({'type': 'doc_error', 'filename': filename, 'error': error_msg})}\n\n"

                # commit() flushes pending changes itself, so no separate
                # flush is needed afterwards.
                db_session.commit()

                logger.info(
                    f"Indexing complete: {results['successful']} successful, {results['failed']} failed, {results['skipped']} skipped"
                )
                yield f"data: {json.dumps({'type': 'complete', 'results': results})}\n\n"
                logger.info("SSE generator finished successfully")

        except Exception:
            logger.exception("Error in collection indexing")
            yield f"data: {json.dumps({'type': 'error', 'error': 'An internal error occurred during indexing'})}\n\n"

    response = Response(
        stream_with_context(generate()), mimetype="text/event-stream"
    )
    # Prevent buffering for proper SSE streaming
    response.headers["Cache-Control"] = "no-cache, no-transform"
    response.headers["Connection"] = "keep-alive"
    response.headers["X-Accel-Buffering"] = "no"
    return response
2042# =============================================================================
2043# Background Indexing Endpoints
2044# =============================================================================
def _get_rag_service_for_thread(
    collection_id: str,
    username: str,
    db_password: str,
    use_defaults: bool = False,
) -> LibraryRAGService:
    """
    Build a RAG service that background threads can use without a Flask context.

    The shared rag_service_factory resolves the settings; afterwards the
    database password is assigned on the service once more so that it is
    propagated to the service's sub-managers.
    """
    from ..services.rag_service_factory import (
        get_rag_service as build_rag_service,
    )

    thread_service = build_rag_service(
        username,
        collection_id,
        use_defaults=use_defaults,
        db_password=db_password,
    )

    # LibraryRAGService.__init__ keeps the password in its backing field
    # (_db_password) only. Assigning through the property setter pushes it on
    # to embedding_manager and integrity_manager, which need it for
    # thread-safe session access.
    thread_service.db_password = db_password

    return thread_service
def trigger_auto_index(
    document_ids: list[str],
    collection_id: str,
    username: str,
    db_password: str,
) -> None:
    """
    Queue background RAG indexing for documents when auto-indexing is on.

    Reads the "research_library.auto_index_enabled" setting (default True)
    and, if enabled, submits the documents to the auto-index executor.
    Returns without waiting for indexing to finish. If the setting cannot be
    read, the failure is logged and nothing is queued.

    Args:
        document_ids: List of document IDs to index
        collection_id: The collection to index into
        username: The username for database access
        db_password: The user's database password for thread-safe access
    """
    from ...database.session_context import get_user_db_session

    # Nothing to do for an empty batch.
    if not document_ids:
        logger.debug("No documents to auto-index")
        return

    # Look up the user's auto-index preference; any failure means "skip".
    try:
        with get_user_db_session(username, db_password) as db_session:
            is_enabled = SettingsManager(db_session).get_bool_setting(
                "research_library.auto_index_enabled", True
            )
    except Exception:
        logger.exception(
            "Failed to check auto-index setting, skipping auto-index"
        )
        return

    if not is_enabled:
        logger.debug("Auto-indexing is disabled, skipping")
        return

    logger.info(
        f"Auto-indexing {len(document_ids)} documents in collection {collection_id}"
    )

    # Hand off to the shared pool (bounded concurrency, prevents thread
    # proliferation) instead of spawning a thread per call.
    _get_auto_index_executor().submit(
        _auto_index_documents_worker,
        document_ids,
        collection_id,
        username,
        db_password,
    )
@thread_cleanup
def _auto_index_documents_worker(
    document_ids: list[str],
    collection_id: str,
    username: str,
    db_password: str,
) -> None:
    """
    Background worker to index documents automatically.

    This is a simpler worker than _background_index_worker - it doesn't track
    progress via TaskMetadata since it's meant to be a lightweight
    auto-indexing operation. Documents are processed one by one; a failure on
    one document is logged and does not stop the rest. No exception escapes
    this function.

    Args:
        document_ids: List of document IDs to index
        collection_id: The collection to index into
        username: The username for database access
        db_password: The user's database password for thread-safe access
    """
    try:
        # Create RAG service (thread-safe, no Flask context needed)
        with _get_rag_service_for_thread(
            collection_id, username, db_password
        ) as rag_service:
            indexed_count = 0
            for doc_id in document_ids:
                try:
                    result = rag_service.index_document(
                        doc_id, collection_id, force_reindex=False
                    )
                    status = result.get("status")
                    if status == "success":
                        indexed_count += 1
                        logger.debug(f"Auto-indexed document {doc_id}")
                    elif status == "skipped":
                        logger.debug(
                            f"Document {doc_id} already indexed, skipped"
                        )
                    else:
                        # A failure reported through the return value (as
                        # opposed to a raised exception) used to vanish
                        # without a trace; surface it in the log.
                        logger.warning(
                            f"Auto-indexing document {doc_id} did not succeed (status: {status})"
                        )
                except Exception:
                    logger.exception(f"Failed to auto-index document {doc_id}")

            logger.info(
                f"Auto-indexing complete: {indexed_count}/{len(document_ids)} documents indexed"
            )

    except Exception:
        logger.exception("Auto-indexing worker failed")
@thread_cleanup
def _background_index_worker(
    task_id: str,
    collection_id: str,
    username: str,
    db_password: str,
    force_reindex: bool,
):
    """
    Index a collection's documents in a background thread.

    Progress, completion, cancellation and failure are all recorded on the
    TaskMetadata row identified by task_id. Cancellation is polled before
    each document. Any exception is logged and stored as the task's error
    message rather than propagated.

    Args:
        task_id: Identifier of the TaskMetadata row to update
        collection_id: The collection to index
        username: The username for database access
        db_password: The user's database password for thread-safe access
        force_reindex: When true, wipe existing index data for the collection
            and index every document again
    """
    from ...database.session_context import get_user_db_session

    def report(**fields):
        """Write the given status/progress fields to this task's row."""
        _update_task_status(username, db_password, task_id, **fields)

    try:
        # The RAG service is created first (thread-safe, no Flask context
        # needed), then the DB session; both are released on exit.
        with (
            _get_rag_service_for_thread(
                collection_id, username, db_password, use_defaults=force_reindex
            ) as rag_service,
            get_user_db_session(username, db_password) as db_session,
        ):
            collection = (
                db_session.query(Collection).filter_by(id=collection_id).first()
            )
            if not collection:
                report(status="failed", error_message="Collection not found")
                return

            # Record the embedding configuration on first index or when a
            # full rebuild was requested.
            if collection.embedding_model is None or force_reindex:
                collection.embedding_model = rag_service.embedding_model
                collection.embedding_model_type = EmbeddingProvider(
                    rag_service.embedding_provider
                )
                for attr in (
                    "chunk_size",
                    "chunk_overlap",
                    "splitter_type",
                    "text_separators",
                    "distance_metric",
                ):
                    setattr(collection, attr, getattr(rag_service, attr))
                collection.normalize_vectors = bool(
                    rag_service.normalize_vectors
                )
                collection.index_type = rag_service.index_type
                db_session.commit()

            # A forced rebuild starts from a clean slate. This prevents
            # mixed-model vectors if the run is cancelled midway and keeps
            # stats accurate during a partial reindex.
            if force_reindex:
                from ..deletion.utils.cascade_helper import CascadeHelper

                index_name = f"collection_{collection_id}"

                # Remove all old document chunks from the DB.
                deleted_chunks = CascadeHelper.delete_collection_chunks(
                    db_session, index_name
                )
                logger.info(
                    f"Cleared {deleted_chunks} old chunks for collection {collection_id}"
                )

                # Remove old FAISS index files (.faiss + .pkl) and RAGIndex
                # records. RagDocumentStatus rows cascade-delete via FK
                # ondelete="CASCADE".
                rag_result = CascadeHelper.delete_rag_indices_for_collection(
                    db_session, index_name
                )
                logger.info(
                    f"Cleared old RAG indices for collection {collection_id}: {rag_result}"
                )

                # Flag every document in the collection as not indexed.
                db_session.query(DocumentCollection).filter_by(
                    collection_id=collection_id
                ).update(
                    {
                        DocumentCollection.indexed: False,
                        DocumentCollection.chunk_count: 0,
                    }
                )
                db_session.commit()
                logger.info(
                    f"Reset indexing state for collection {collection_id}"
                )

            # Select the documents to process: everything on a forced
            # rebuild, otherwise only the ones not yet indexed.
            pending = (
                db_session.query(DocumentCollection, Document)
                .join(Document)
                .filter(DocumentCollection.collection_id == collection_id)
            )
            if not force_reindex:
                pending = pending.filter(DocumentCollection.indexed == False)  # noqa: E712
            doc_links = pending.all()

            if not doc_links:
                report(
                    status="completed",
                    progress_message="No documents to index",
                )
                return

            total = len(doc_links)
            results = {"successful": 0, "skipped": 0, "failed": 0}
            # Maps a result status to its counter; anything else is a failure.
            counter_for_status = {"success": "successful", "skipped": "skipped"}

            report(
                progress_total=total,
                progress_message=f"Indexing {total} documents",
            )

            for idx, (_link, doc) in enumerate(doc_links, 1):
                # Stop as soon as a cancellation has been requested.
                if _is_task_cancelled(username, db_password, task_id):
                    report(
                        status="cancelled",
                        progress_message=f"Cancelled after {idx - 1}/{total} documents",
                    )
                    logger.info(f"Indexing task {task_id} was cancelled")
                    return

                filename = doc.filename or doc.title or "Unknown"
                report(
                    progress_current=idx,
                    progress_message=f"Indexing {idx}/{total}: {filename}",
                )

                try:
                    result = rag_service.index_document(
                        document_id=doc.id,
                        collection_id=collection_id,
                        force_reindex=force_reindex,
                    )
                    counter = counter_for_status.get(
                        result.get("status"), "failed"
                    )
                    results[counter] += 1
                except Exception:
                    results["failed"] += 1
                    logger.exception(f"Error indexing document {idx}/{total}")

            db_session.commit()

            report(
                status="completed",
                progress_current=total,
                progress_message=f"Completed: {results['successful']} indexed, {results['failed']} failed, {results['skipped']} skipped",
            )
            logger.info(
                f"Background indexing task {task_id} completed: {results}"
            )

    except Exception as e:
        logger.exception(f"Background indexing task {task_id} failed")
        report(status="failed", error_message=str(e))
def _update_task_status(
    username: str,
    db_password: str,
    task_id: str,
    status: Optional[str] = None,
    progress_current: Optional[int] = None,
    progress_total: Optional[int] = None,
    progress_message: Optional[str] = None,
    error_message: Optional[str] = None,
):
    """
    Update task metadata in the database.

    Only the fields passed as non-None are written; the rest keep their
    stored value. Setting status to "completed" also stamps completed_at
    with the current UTC time. If no row matches task_id nothing is changed.
    Database errors are logged and swallowed so that a failed progress
    update never aborts the caller.

    Args:
        username: The username for database access
        db_password: The user's database password for thread-safe access
        task_id: Identifier of the TaskMetadata row to update
        status: New task status, if it should change
        progress_current: Number of items processed so far
        progress_total: Total number of items to process
        progress_message: Human-readable progress text
        error_message: Error text to store on the task
    """
    from ...database.session_context import get_user_db_session

    # Plain column updates, applied in a fixed order; None means "leave as is".
    updates = {
        "progress_current": progress_current,
        "progress_total": progress_total,
        "progress_message": progress_message,
        "error_message": error_message,
    }

    try:
        with get_user_db_session(username, db_password) as db_session:
            task = (
                db_session.query(TaskMetadata)
                .filter_by(task_id=task_id)
                .first()
            )
            if task:
                if status is not None:
                    task.status = status
                    if status == "completed":
                        task.completed_at = datetime.now(UTC)
                for column, value in updates.items():
                    if value is not None:
                        setattr(task, column, value)
                db_session.commit()
    except Exception:
        logger.exception(f"Failed to update task status for {task_id}")
def _is_task_cancelled(username: str, db_password: str, task_id: str) -> bool:
    """
    Check if a task has been cancelled.

    Args:
        username: The username for database access
        db_password: The user's database password for thread-safe access
        task_id: Identifier of the TaskMetadata row to inspect

    Returns:
        True only when the task row exists and its status is "cancelled".
        False when the row is missing or the status cannot be read (the
        failure is logged as a warning).
    """
    from ...database.session_context import get_user_db_session

    try:
        with get_user_db_session(username, db_password) as db_session:
            task = (
                db_session.query(TaskMetadata)
                .filter_by(task_id=task_id)
                .first()
            )
            # Explicit None check so a missing row yields False, not None,
            # keeping the declared bool return type honest.
            return task is not None and task.status == "cancelled"
    except Exception:
        logger.warning(
            "Could not check cancellation status for task {}", task_id
        )
        return False
@rag_bp.route(
    "/api/collections/<string:collection_id>/index/start", methods=["POST"]
)
@login_required
def start_background_index(collection_id):
    """
    Start background indexing for a collection.

    Reads an optional "force_reindex" flag from the JSON body, refuses with
    409 if a "processing" indexing task already exists for this collection,
    otherwise creates a TaskMetadata row and launches
    _background_index_worker in a daemon thread.

    Args:
        collection_id: Collection to index (from the URL)

    Returns:
        JSON with "success" and "task_id" on start; 409 with the running
        task's id when indexing is already in progress; 500 on error.
    """
    from ...database.session_context import get_user_db_session
    from ...database.session_passwords import session_password_store

    username = session["username"]
    session_id = session.get("session_id")

    # Get password for thread access
    db_password = None
    if session_id:
        db_password = session_password_store.get_session_password(
            username, session_id
        )

    # Parse request body. A JSON body that is not an object (e.g. a list)
    # carries no options, so fall back to defaults instead of crashing on
    # .get() below.
    data = request.get_json() or {}
    if not isinstance(data, dict):
        data = {}
    force_reindex = data.get("force_reindex", False)

    try:
        with get_user_db_session(username, db_password) as db_session:
            # Look at every active indexing task, not just the first row:
            # tasks for other collections may be running at the same time,
            # and inspecting a single arbitrary row could miss the one that
            # belongs to this collection.
            active_tasks = (
                db_session.query(TaskMetadata)
                .filter(
                    TaskMetadata.task_type == "indexing",
                    TaskMetadata.status == "processing",
                )
                .all()
            )
            for existing_task in active_tasks:
                metadata = existing_task.metadata_json or {}
                if metadata.get("collection_id") == collection_id:
                    return jsonify(
                        {
                            "success": False,
                            "error": "Indexing is already in progress for this collection",
                            "task_id": existing_task.task_id,
                        }
                    ), 409

            # Create new task
            task_id = str(uuid.uuid4())
            now = datetime.now(UTC)
            task = TaskMetadata(
                task_id=task_id,
                status="processing",
                task_type="indexing",
                created_at=now,
                started_at=now,
                progress_current=0,
                progress_total=0,
                progress_message="Starting indexing...",
                metadata_json={
                    "collection_id": collection_id,
                    "force_reindex": force_reindex,
                },
            )
            db_session.add(task)
            db_session.commit()

            # Start background thread
            thread = threading.Thread(
                target=_background_index_worker,
                args=(task_id, collection_id, username, db_password, force_reindex),
                daemon=True,
            )
            thread.start()

            logger.info(
                f"Started background indexing task {task_id} for collection {collection_id}"
            )

            return jsonify(
                {
                    "success": True,
                    "task_id": task_id,
                    "message": "Indexing started in background",
                }
            )

    except Exception:
        logger.exception("Failed to start background indexing")
        return jsonify(
            {
                "success": False,
                "error": "Failed to start indexing. Please try again.",
            }
        ), 500
@rag_bp.route(
    "/api/collections/<string:collection_id>/index/status", methods=["GET"]
)
@limiter.exempt
@login_required
def get_index_status(collection_id):
    """
    Get the current indexing status for a collection.

    Reports the most recently created indexing task whose metadata names
    this collection.

    Args:
        collection_id: Collection to report on (from the URL)

    Returns:
        JSON with the task's id, status, progress fields, error message and
        timestamps; {"status": "idle", ...} when no indexing task exists for
        the collection; 500 with {"status": "error", ...} on failure.
    """
    from ...database.session_context import get_user_db_session
    from ...database.session_passwords import session_password_store

    username = session["username"]
    session_id = session.get("session_id")

    db_password = None
    if session_id:
        db_password = session_password_store.get_session_password(
            username, session_id
        )

    try:
        with get_user_db_session(username, db_password) as db_session:
            # Newest first. The collection id lives inside metadata_json, so
            # the per-collection match is done in Python below.
            tasks = (
                db_session.query(TaskMetadata)
                .filter(TaskMetadata.task_type == "indexing")
                .order_by(TaskMetadata.created_at.desc())
                .all()
            )

            if not tasks:
                return jsonify(
                    {
                        "status": "idle",
                        "message": "No indexing task found",
                    }
                )

            # Pick the newest task for THIS collection. Checking only the
            # globally newest task would report "idle" whenever another
            # collection happened to be indexed more recently.
            task = next(
                (
                    candidate
                    for candidate in tasks
                    if (candidate.metadata_json or {}).get("collection_id")
                    == collection_id
                ),
                None,
            )
            if task is None:
                return jsonify(
                    {
                        "status": "idle",
                        "message": "No indexing task for this collection",
                    }
                )

            return jsonify(
                {
                    "task_id": task.task_id,
                    "status": task.status,
                    "progress_current": task.progress_current or 0,
                    "progress_total": task.progress_total or 0,
                    "progress_message": task.progress_message,
                    "error_message": task.error_message,
                    "created_at": task.created_at.isoformat()
                    if task.created_at
                    else None,
                    "completed_at": task.completed_at.isoformat()
                    if task.completed_at
                    else None,
                }
            )

    except Exception:
        logger.exception("Failed to get index status")
        return jsonify(
            {
                "status": "error",
                "error": "Failed to get indexing status. Please try again.",
            }
        ), 500
@rag_bp.route(
    "/api/collections/<string:collection_id>/index/cancel", methods=["POST"]
)
@login_required
def cancel_indexing(collection_id):
    """
    Cancel an active indexing task for a collection.

    Marks the collection's "processing" indexing task as "cancelled". The
    endpoint only records the request; the worker thread checks the status
    and stops on its own.

    Args:
        collection_id: Collection whose indexing should stop (from the URL)

    Returns:
        JSON with "success" and the task id when cancellation was recorded;
        404 when no active indexing task exists for the collection; 500 on
        error.
    """
    from ...database.session_context import get_user_db_session
    from ...database.session_passwords import session_password_store

    username = session["username"]
    session_id = session.get("session_id")

    db_password = None
    if session_id:
        db_password = session_password_store.get_session_password(
            username, session_id
        )

    try:
        with get_user_db_session(username, db_password) as db_session:
            # Fetch every active indexing task: several collections may be
            # indexing at once, and looking at a single arbitrary row could
            # make this collection's task impossible to cancel.
            active_tasks = (
                db_session.query(TaskMetadata)
                .filter(
                    TaskMetadata.task_type == "indexing",
                    TaskMetadata.status == "processing",
                )
                .all()
            )

            if not active_tasks:
                return jsonify(
                    {
                        "success": False,
                        "error": "No active indexing task found",
                    }
                ), 404

            # Narrow down to the task that belongs to this collection.
            task = next(
                (
                    candidate
                    for candidate in active_tasks
                    if (candidate.metadata_json or {}).get("collection_id")
                    == collection_id
                ),
                None,
            )
            if task is None:
                return jsonify(
                    {
                        "success": False,
                        "error": "No active indexing task for this collection",
                    }
                ), 404

            # Mark as cancelled - the worker thread will check this
            task.status = "cancelled"
            task.progress_message = "Cancellation requested..."
            db_session.commit()

            logger.info(
                f"Cancelled indexing task {task.task_id} for collection {collection_id}"
            )

            return jsonify(
                {
                    "success": True,
                    "message": "Cancellation requested",
                    "task_id": task.task_id,
                }
            )

    except Exception:
        logger.exception("Failed to cancel indexing")
        return jsonify(
            {
                "success": False,
                "error": "Failed to cancel indexing. Please try again.",
            }
        ), 500
2678# Research History Semantic Search Routes have been moved to
2679# research_library.search.routes.search_routes