Coverage for src/local_deep_research/research_library/routes/rag_routes.py: 85%
1002 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
"""
RAG Management API Routes

Provides endpoints for managing RAG indexing of library documents:
- Configure embedding models
- Index documents
- Get RAG statistics
- Bulk operations with progress tracking
"""
11import os
13from flask import (
14 Blueprint,
15 jsonify,
16 request,
17 Response,
18 render_template,
19 session,
20 stream_with_context,
21)
22from loguru import logger
23import atexit
24import glob
25import json
26import uuid
27import time
28import threading
29import queue
30from concurrent.futures import ThreadPoolExecutor
31from datetime import datetime, UTC
32from pathlib import Path
33from typing import Optional
35from ...constants import FILE_PATH_SENTINELS, FILE_PATH_TEXT_ONLY
36from ...security.decorators import require_json_body
37from ...web.auth.decorators import login_required
38from ...web.utils.request_helpers import parse_bool_arg
39from ...utilities.db_utils import get_settings_manager
40from ...utilities.resource_utils import safe_close
41from ..services.library_rag_service import LibraryRAGService
42from ...settings.manager import SettingsManager
43from ...security.file_upload_validator import FileUploadValidator
44from ...security.path_validator import PathValidator
45from ...security.rate_limiter import (
46 upload_rate_limit_ip,
47 upload_rate_limit_user,
48)
49from ..utils import ensure_in_collection, handle_api_error
50from ...database.models.library import (
51 Document,
52 Collection,
53 DocumentCollection,
54 RAGIndex,
55 SourceType,
56 EmbeddingProvider,
57)
58from ...database.models.queue import TaskMetadata
59from ...database.thread_local_session import thread_cleanup
60from ...security.rate_limiter import limiter
61from ...config.paths import get_library_directory
rag_bp = Blueprint(name="rag", import_name=__name__, url_prefix="/library")

# NOTE: Every route below reads session["username"] by subscript rather than
# via .get() on purpose. @login_required guarantees the key is present, and
# the subscript fails fast (KeyError) should that decorator ever be removed.

# One process-wide pool for auto-indexing work, created lazily so requests do
# not each spawn their own threads. Creation is guarded by the lock below.
_auto_index_executor: ThreadPoolExecutor | None = None
_auto_index_executor_lock = threading.Lock()
def _get_auto_index_executor() -> ThreadPoolExecutor:
    """Return the shared auto-indexing pool, building it on first use.

    The check-and-create runs while holding ``_auto_index_executor_lock``,
    so concurrent first callers cannot each construct a pool. The pool has
    at most four worker threads, named with the ``auto_index_`` prefix.
    """
    global _auto_index_executor
    with _auto_index_executor_lock:
        executor = _auto_index_executor
        if executor is None:
            executor = ThreadPoolExecutor(
                max_workers=4,
                thread_name_prefix="auto_index_",
            )
            _auto_index_executor = executor
        return executor
def _shutdown_auto_index_executor() -> None:
    """Shut down the shared auto-indexing pool, waiting for running jobs.

    The global is read and cleared while holding
    ``_auto_index_executor_lock`` (the same lock ``_get_auto_index_executor``
    uses), so the clear cannot interleave with a concurrent lazy creation.
    ``shutdown(wait=True)`` is then called *outside* the lock, so a job that
    is still running and asks for the executor cannot deadlock against this
    function while it waits for that job to finish.
    """
    global _auto_index_executor
    with _auto_index_executor_lock:
        executor = _auto_index_executor
        _auto_index_executor = None
    if executor is not None:
        executor.shutdown(wait=True)


# Drain the pool when the interpreter exits.
atexit.register(_shutdown_auto_index_executor)
def get_rag_service(
    collection_id: Optional[str] = None,
    use_defaults: bool = False,
) -> LibraryRAGService:
    """
    Build a LibraryRAGService for the currently logged-in user.

    This is a Flask-aware wrapper around
    ``rag_service_factory.get_rag_service``. It takes the username from the
    Flask session and, when the session carries a ``session_id``, the cached
    database password for that session. Code running outside a request
    should call the factory directly.

    Args:
        collection_id: Optional collection UUID whose stored settings should
            be loaded.
        use_defaults: If True, ignore the collection's stored settings and
            use the current defaults. Force-reindex passes True so that a
            newly selected default embedding model is picked up.
    """
    from ..services.rag_service_factory import (
        get_rag_service as _get_rag_service,
    )
    from ...database.session_passwords import session_password_store

    username = session["username"]
    session_id = session.get("session_id")
    db_password = (
        session_password_store.get_session_password(username, session_id)
        if session_id
        else None
    )
    return _get_rag_service(
        username,
        collection_id,
        use_defaults=use_defaults,
        db_password=db_password,
    )
# Config API Routes
@rag_bp.route("/api/config/supported-formats", methods=["GET"])
@login_required
def get_supported_formats():
    """Report the file extensions the upload UI should accept.

    The extensions come from the document_loaders registry, making this
    endpoint the single source of truth. The front end can drop
    ``accept_string`` straight into a file input's ``accept`` attribute.
    """
    from ...document_loaders import get_supported_extensions

    # Sorted so the UI always shows a stable order.
    ordered = sorted(get_supported_extensions())
    payload = {
        "extensions": ordered,
        "accept_string": ",".join(ordered),
        "count": len(ordered),
    }
    return jsonify(payload)
# Page Routes
@rag_bp.route("/embedding-settings")
@login_required
def embedding_settings_page():
    """Serve the Embedding Settings page."""
    template_name = "pages/embedding_settings.html"
    return render_template(template_name, active_page="embedding-settings")
@rag_bp.route("/document/<string:document_id>/chunks")
@login_required
def view_document_chunks(document_id):
    """Render every stored chunk of a document, grouped by collection.

    Chunks are grouped by ``DocumentChunk.collection_name``. The part of that
    name after a leading ``collection_`` prefix is looked up as a Collection
    id so the page can show the collection's display name. When no such
    Collection exists, the raw chunk collection name is shown instead.

    Returns:
        The rendered ``pages/document_chunks.html`` template, or the plain
        tuple ``("Document not found", 404)`` for an unknown document id.
    """
    from ...database.session_context import get_user_db_session
    from ...database.models.library import DocumentChunk

    username = session["username"]

    with get_user_db_session(username) as db_session:
        document = db_session.query(Document).filter_by(id=document_id).first()

        if not document:
            return "Document not found", 404

        # Ordered by collection then position, so each group is in order.
        chunks = (
            db_session.query(DocumentChunk)
            .filter(DocumentChunk.source_id == document_id)
            .order_by(DocumentChunk.collection_name, DocumentChunk.chunk_index)
            .all()
        )

        chunks_by_collection = {}
        for chunk in chunks:
            coll_name = chunk.collection_name
            group = chunks_by_collection.get(coll_name)
            if group is None:
                # Strip only the leading marker; str.replace would also
                # remove any later "collection_" occurrence in the name.
                collection_id = coll_name.removeprefix("collection_")
                collection = (
                    db_session.query(Collection)
                    .filter_by(id=collection_id)
                    .first()
                )
                group = {
                    "name": collection.name if collection else coll_name,
                    "id": collection_id,
                    "chunks": [],
                }
                chunks_by_collection[coll_name] = group

            group["chunks"].append(
                {
                    "id": chunk.id,
                    "index": chunk.chunk_index,
                    "text": chunk.chunk_text,
                    "word_count": chunk.word_count,
                    "start_char": chunk.start_char,
                    "end_char": chunk.end_char,
                    "embedding_model": chunk.embedding_model,
                    "embedding_model_type": chunk.embedding_model_type.value
                    if chunk.embedding_model_type
                    else None,
                    "embedding_dimension": chunk.embedding_dimension,
                    "created_at": chunk.created_at,
                }
            )

        # Rendered inside the session so the ORM ``document`` stays usable.
        return render_template(
            "pages/document_chunks.html",
            document=document,
            chunks_by_collection=chunks_by_collection,
            total_chunks=len(chunks),
        )
@rag_bp.route("/collections")
@login_required
def collections_page():
    """Serve the Collections page."""
    return render_template(
        "pages/collections.html",
        active_page="collections",
    )
@rag_bp.route("/collections/<string:collection_id>")
@login_required
def collection_details_page(collection_id):
    """Serve the Collection Details page for one collection.

    Only the id is handed to the template; nothing is fetched here.
    """
    context = {"active_page": "collections", "collection_id": collection_id}
    return render_template("pages/collection_details.html", **context)
@rag_bp.route("/collections/<string:collection_id>/upload")
@login_required
def collection_upload_page(collection_id):
    """Serve the upload page for a single collection.

    The ``research_library.upload_pdf_storage`` setting is forwarded to the
    template. Only "database" and "none" are permitted for uploads; any
    other value (such as a filesystem option) is coerced to "none".
    """
    storage_mode = get_settings_manager().get_setting(
        "research_library.upload_pdf_storage", "none"
    )
    allowed_modes = ("database", "none")
    if storage_mode not in allowed_modes:
        storage_mode = "none"

    return render_template(
        "pages/collection_upload.html",
        active_page="collections",
        collection_id=collection_id,
        collection_name=None,  # Not fetched from the DB here.
        upload_pdf_storage=storage_mode,
    )
@rag_bp.route("/collections/create")
@login_required
def collection_create_page():
    """Serve the Create Collection page."""
    template_name = "pages/collection_create.html"
    return render_template(template_name, active_page="collections")
# API Routes
@rag_bp.route("/api/rag/settings", methods=["GET"])
@login_required
def get_current_settings():
    """Return the current default RAG configuration from user settings.

    ``local_search_text_separators`` may be stored as a list or as a
    JSON-encoded string. Strings are decoded. An undecodable value logs a
    warning and falls back to the default separators.

    Returns:
        JSON ``{"success": True, "settings": {...}}``, or the response built
        by ``handle_api_error`` on failure.
    """
    default_separators = ["\n\n", "\n", ". ", " ", ""]

    try:
        settings = get_settings_manager()

        # The default is the list itself rather than a JSON string. A Python
        # literal such as '["\n\n", ...]' contains raw newline characters,
        # which json.loads rejects as invalid control characters, so a string
        # default would always hit the warning/fallback path below.
        text_separators = settings.get_setting(
            "local_search_text_separators", list(default_separators)
        )
        if isinstance(text_separators, str):
            try:
                text_separators = json.loads(text_separators)
            except json.JSONDecodeError:
                logger.warning(
                    f"Invalid JSON for local_search_text_separators setting: {text_separators!r}. "
                    "Using default separators."
                )
                text_separators = list(default_separators)

        normalize_vectors = settings.get_setting(
            "local_search_normalize_vectors", True
        )

        return jsonify(
            {
                "success": True,
                "settings": {
                    "embedding_provider": settings.get_setting(
                        "local_search_embedding_provider",
                        "sentence_transformers",
                    ),
                    "embedding_model": settings.get_setting(
                        "local_search_embedding_model", "all-MiniLM-L6-v2"
                    ),
                    "chunk_size": settings.get_setting(
                        "local_search_chunk_size", 1000
                    ),
                    "chunk_overlap": settings.get_setting(
                        "local_search_chunk_overlap", 200
                    ),
                    "splitter_type": settings.get_setting(
                        "local_search_splitter_type", "recursive"
                    ),
                    "text_separators": text_separators,
                    "distance_metric": settings.get_setting(
                        "local_search_distance_metric", "cosine"
                    ),
                    "normalize_vectors": normalize_vectors,
                    "index_type": settings.get_setting(
                        "local_search_index_type", "flat"
                    ),
                },
            }
        )
    except Exception as e:
        return handle_api_error("getting RAG settings", e)
@rag_bp.route("/api/rag/test-embedding", methods=["POST"])
@login_required
@require_json_body(error_format="success")
def test_embedding():
    """Generate one test embedding with a caller-supplied provider and model.

    The JSON body needs ``provider`` and ``model``; ``test_text`` is
    optional. On success, returns the embedding dimension and elapsed time.
    On failure, returns HTTP 500 with a user-facing message. If the
    exception text matches common symptoms, the message suggests that a
    language model was selected instead of an embedding model.
    """
    # Bound before the ``try`` so the error handler can always interpolate
    # it. If reading the body fails (e.g. the decoded JSON is not an object,
    # so ``data.get`` raises), ``model`` would otherwise be unbound and the
    # handler itself would raise UnboundLocalError.
    model = None

    try:
        data = request.get_json()
        provider = data.get("provider")
        model = data.get("model")
        test_text = data.get("test_text", "This is a test.")

        if not provider or not model:
            return jsonify(
                {"success": False, "error": "Provider and model are required"}
            ), 400

        from ...embeddings.embeddings_config import (
            get_embedding_function,
        )

        logger.info(
            f"Testing embedding with provider={provider}, model={model}"
        )

        # Use the user's settings so provider URLs (e.g. Ollama) resolve.
        settings = get_settings_manager()
        settings_snapshot = (
            settings.get_all_settings()
            if hasattr(settings, "get_all_settings")
            else {}
        )

        # The timing covers both building the embedding function and the
        # single test call.
        start_time = time.time()
        embedding_func = get_embedding_function(
            provider=provider,
            model_name=model,
            settings_snapshot=settings_snapshot,
        )
        embedding = embedding_func([test_text])[0]
        response_time_ms = int((time.time() - start_time) * 1000)

        dimension = len(embedding) if hasattr(embedding, "__len__") else None

        return jsonify(
            {
                "success": True,
                "dimension": dimension,
                "response_time_ms": response_time_ms,
                "provider": provider,
                "model": model,
            }
        )

    except Exception as e:
        logger.exception("Error during testing embedding")
        error_str = str(e).lower()

        # Error fragments that commonly appear when a chat/completion model
        # is used where an embedding model is expected.
        llm_hints = [
            "does not support",
            "not an embedding",
            "generate embedding",
            "invalid model",
            "not found",
            "expected float",
            "could not convert",
            "list index out of range",
            "object is not subscriptable",
            "not iterable",
            "json",
            "chat",
            "completion",
        ]
        is_likely_llm = any(hint in error_str for hint in llm_hints)

        if is_likely_llm:
            user_message = (
                f"Embedding test failed for model '{model}'. "
                "This is most likely because an LLM (language model) was selected "
                "instead of an embedding model. Please choose a dedicated embedding "
                "model (e.g. nomic-embed-text, mxbai-embed-large, "
                "all-MiniLM-L6-v2)."
            )
        else:
            user_message = (
                f"Embedding test failed for model '{model}'. "
                "If you are unsure whether the selected model supports embeddings, "
                "try a dedicated embedding model instead (e.g. nomic-embed-text, "
                "mxbai-embed-large, all-MiniLM-L6-v2)."
            )

        return jsonify({"success": False, "error": user_message}), 500
@rag_bp.route("/api/rag/models", methods=["GET"])
@login_required
def get_available_models():
    """List embedding providers and, for reachable ones, their models.

    Every registered provider appears in ``provider_options`` with an
    ``available`` flag, so users can still pick it and fix its settings
    (e.g. a wrong Ollama URL). Models are only requested from providers
    whose ``is_available`` check passes; the rest get an empty list.
    """
    try:
        from ...embeddings.embeddings_config import _get_provider_classes

        settings = get_settings_manager()
        if hasattr(settings, "get_all_settings"):
            settings_snapshot = settings.get_all_settings()
        else:
            settings_snapshot = {}

        provider_classes = _get_provider_classes()

        provider_labels = {
            "sentence_transformers": "Sentence Transformers (Local)",
            "ollama": "Ollama (Local)",
            "openai": "OpenAI API",
        }

        def _describe(model_info, key):
            # Copy only the fields the UI needs; is_embedding is optional.
            entry = {
                "value": model_info["value"],
                "label": model_info["label"],
                "provider": key,
            }
            if "is_embedding" in model_info:
                entry["is_embedding"] = model_info["is_embedding"]
            return entry

        provider_options = []
        providers = {}

        for key, provider_cls in provider_classes.items():
            reachable = provider_cls.is_available(settings_snapshot)

            provider_options.append(
                {
                    "value": key,
                    "label": provider_labels.get(key, key),
                    "available": reachable,
                }
            )

            if not reachable:
                providers[key] = []
                continue

            providers[key] = [
                _describe(model_info, key)
                for model_info in provider_cls.get_available_models(
                    settings_snapshot
                )
            ]

        return jsonify(
            {
                "success": True,
                "provider_options": provider_options,
                "providers": providers,
            }
        )
    except Exception as e:
        return handle_api_error("getting available models", e)
@rag_bp.route("/api/rag/info", methods=["GET"])
@login_required
def get_index_info():
    """Describe the RAG index of a collection.

    Query args:
        collection_id: Optional; defaults to the user's Library collection.

    Returns ``info: None`` with a "No index found" message when the
    service reports no index.
    """
    from ...database.library_init import get_default_library_id

    try:
        collection_id = request.args.get(
            "collection_id"
        ) or get_default_library_id(session["username"])

        logger.info(
            f"Getting RAG index info for collection_id: {collection_id}"
        )

        with get_rag_service(collection_id) as rag_service:
            info = rag_service.get_current_index_info(collection_id)

        if info is not None:
            logger.info(f"Found RAG index for collection_id: {collection_id}")
            return jsonify({"success": True, "info": info})

        logger.info(f"No RAG index found for collection_id: {collection_id}")
        return jsonify(
            {"success": True, "info": None, "message": "No index found"}
        )
    except Exception as e:
        return handle_api_error("getting index info", e)
@rag_bp.route("/api/rag/stats", methods=["GET"])
@login_required
def get_rag_stats():
    """Return RAG statistics for a collection.

    Query args:
        collection_id: Optional; defaults to the user's Library collection.
    """
    from ...database.library_init import get_default_library_id

    try:
        collection_id = request.args.get(
            "collection_id"
        ) or get_default_library_id(session["username"])

        with get_rag_service(collection_id) as rag_service:
            stats = rag_service.get_rag_stats(collection_id)

        payload = {"success": True, "stats": stats}
        return jsonify(payload)
    except Exception as e:
        return handle_api_error("getting RAG stats", e)
@rag_bp.route("/api/rag/index-document", methods=["POST"])
@login_required
@require_json_body(error_format="success")
def index_document():
    """Index a single document in a collection.

    JSON body:
        text_doc_id: Required document id.
        force_reindex: Optional, defaults to False.
        collection_id: Optional; defaults to the user's Library collection.

    Returns 400 when ``text_doc_id`` is missing or the service reports
    ``status == "error"``.
    """
    from ...database.library_init import get_default_library_id

    # Pre-bound so the error handler's message can always reference it,
    # even when reading the request body raises first.
    text_doc_id = None

    try:
        data = request.get_json()
        text_doc_id = data.get("text_doc_id")
        force_reindex = data.get("force_reindex", False)
        collection_id = data.get("collection_id")

        if not text_doc_id:
            return jsonify(
                {"success": False, "error": "text_doc_id is required"}
            ), 400

        if not collection_id:
            collection_id = get_default_library_id(session["username"])

        with get_rag_service(collection_id) as rag_service:
            result = rag_service.index_document(
                text_doc_id, collection_id, force_reindex
            )

        if result["status"] == "error":
            return jsonify(
                {"success": False, "error": result.get("error")}
            ), 400

        return jsonify({"success": True, "result": result})
    except Exception as e:
        return handle_api_error(f"indexing document {text_doc_id}", e)
@rag_bp.route("/api/rag/remove-document", methods=["POST"])
@login_required
@require_json_body(error_format="success")
def remove_document():
    """Remove a document from the RAG index of a collection.

    JSON body:
        text_doc_id: Required document id.
        collection_id: Optional; defaults to the user's Library collection.

    Returns 400 when ``text_doc_id`` is missing or the service reports
    ``status == "error"``.
    """
    from ...database.library_init import get_default_library_id

    # Pre-bound so the error handler's message can always reference it,
    # even when reading the request body raises first.
    text_doc_id = None

    try:
        data = request.get_json()
        text_doc_id = data.get("text_doc_id")
        collection_id = data.get("collection_id")

        if not text_doc_id:
            return jsonify(
                {"success": False, "error": "text_doc_id is required"}
            ), 400

        if not collection_id:
            collection_id = get_default_library_id(session["username"])

        with get_rag_service(collection_id) as rag_service:
            result = rag_service.remove_document_from_rag(
                text_doc_id, collection_id
            )

        if result["status"] == "error":
            return jsonify(
                {"success": False, "error": result.get("error")}
            ), 400

        return jsonify({"success": True, "result": result})
    except Exception as e:
        return handle_api_error(f"removing document {text_doc_id}", e)
@rag_bp.route("/api/rag/index-research", methods=["POST"])
@login_required
@require_json_body(error_format="success")
def index_research():
    """Index every document belonging to one research run.

    JSON body:
        research_id: Required.
        force_reindex: Optional, defaults to False.
    """
    # Pre-bound so the error handler's message can always reference it,
    # even when reading the request body raises first.
    research_id = None

    try:
        data = request.get_json()
        research_id = data.get("research_id")
        force_reindex = data.get("force_reindex", False)

        if not research_id:
            return jsonify(
                {"success": False, "error": "research_id is required"}
            ), 400

        with get_rag_service() as rag_service:
            results = rag_service.index_research_documents(
                research_id, force_reindex
            )

        return jsonify({"success": True, "results": results})
    except Exception as e:
        return handle_api_error(f"indexing research {research_id}", e)
@rag_bp.route("/api/rag/index-all", methods=["GET"])
@login_required
def index_all():
    """Index all documents in a collection, streaming progress over SSE.

    Query args:
        collection_id: Target collection; defaults to the user's Library.
        force_reindex: When true, every document in the collection is
            processed. Otherwise only rows whose ``DocumentCollection.indexed``
            is false are processed.

    The response is ``text/event-stream``. Each event is a JSON object whose
    ``type`` is ``start``, ``progress``, ``complete`` or ``error``.
    """
    from ...database.session_context import get_user_db_session
    from ...database.library_init import get_default_library_id

    force_reindex = parse_bool_arg("force_reindex")
    username = session["username"]

    collection_id = request.args.get("collection_id")
    if not collection_id:
        collection_id = get_default_library_id(username)

    logger.info(
        f"Starting index-all for collection_id: {collection_id}, force_reindex: {force_reindex}"
    )

    # Built while the request context (Flask session) is available; the
    # generator's ``finally`` releases it once streaming ends.
    rag_service = get_rag_service(collection_id)

    def generate():
        """Yield SSE ``data:`` frames describing indexing progress."""
        try:
            yield f"data: {json.dumps({'type': 'start', 'message': 'Starting bulk indexing...'})}\n\n"

            with get_user_db_session(username) as db_session:
                query = (
                    db_session.query(Document.id, Document.title)
                    .join(
                        DocumentCollection,
                        Document.id == DocumentCollection.document_id,
                    )
                    .filter(DocumentCollection.collection_id == collection_id)
                )

                if not force_reindex:
                    # Only documents not yet indexed in this collection.
                    query = query.filter(DocumentCollection.indexed.is_(False))

                # Plain tuples, so nothing depends on the DB session later.
                doc_info = [(doc_id, title) for doc_id, title in query.all()]

            if not doc_info:
                yield f"data: {json.dumps({'type': 'complete', 'results': {'successful': 0, 'skipped': 0, 'failed': 0, 'message': 'No documents to index'}})}\n\n"
                return

            results = {"successful": 0, "skipped": 0, "failed": 0, "errors": []}
            total = len(doc_info)

            settings = get_settings_manager()
            # Clamp to >= 1. A step of 0 makes range() raise ValueError, and
            # a negative step yields an empty range that would report
            # "complete" without indexing anything.
            batch_size = max(
                1, int(settings.get_setting("rag.indexing_batch_size", 15))
            )
            processed = 0

            for start in range(0, total, batch_size):
                batch = doc_info[start : start + batch_size]

                batch_results = rag_service.index_documents_batch(
                    batch, collection_id, force_reindex
                )

                for doc_id, title in batch:
                    processed += 1
                    # A document missing from the batch result counts as a
                    # failure instead of aborting the stream via KeyError.
                    result = batch_results.get(doc_id) or {
                        "status": "error",
                        "error": "No result returned for document",
                    }

                    yield f"data: {json.dumps({'type': 'progress', 'current': processed, 'total': total, 'title': title, 'percent': int((processed / total) * 100)})}\n\n"

                    if result["status"] == "success":
                        results["successful"] += 1
                    elif result["status"] == "skipped":
                        results["skipped"] += 1
                    else:
                        results["failed"] += 1
                        results["errors"].append(
                            {
                                "doc_id": doc_id,
                                "title": title,
                                "error": result.get("error"),
                            }
                        )

            yield f"data: {json.dumps({'type': 'complete', 'results': results})}\n\n"

            logger.info(
                f"Bulk indexing complete: {results['successful']} successful, {results['skipped']} skipped, {results['failed']} failed"
            )

        except Exception:
            logger.exception("Error in bulk indexing")
            yield f"data: {json.dumps({'type': 'error', 'error': 'An internal error occurred during indexing'})}\n\n"
        finally:
            # Runs when the stream completes or the client disconnects
            # (GeneratorExit). This is the safe place to release the RAG
            # service's embedding-manager clients; closing it in the route
            # scope would tear it down before the generator runs.
            # ``safe_close`` swallows close-time errors so they cannot mask
            # the generator's own outcome.
            safe_close(rag_service, "rag_service (index-all SSE)")

    return Response(
        stream_with_context(generate()), mimetype="text/event-stream"
    )
@rag_bp.route("/api/rag/configure", methods=["POST"])
@login_required
@require_json_body(error_format="success")
def configure_rag():
    """
    Change RAG configuration (embedding model, chunk size, etc.).

    The submitted values are saved as the user's default ``local_search_*``
    settings. When ``collection_id`` is supplied, a RAG index for that
    collection is also fetched or created with the new configuration, and
    its ``index_hash`` is returned.

    Required JSON fields: embedding_model, embedding_provider, chunk_size,
    chunk_overlap. Optional: collection_id, splitter_type, text_separators
    (a list or a JSON-encoded list), distance_metric, normalize_vectors,
    index_type.

    All values are validated before anything is persisted, so an invalid
    field returns 400 without leaving the settings partially updated.
    """
    default_separators = ["\n\n", "\n", ". ", " ", ""]

    try:
        data = request.get_json()
        embedding_model = data.get("embedding_model")
        embedding_provider = data.get("embedding_provider")
        chunk_size = data.get("chunk_size")
        chunk_overlap = data.get("chunk_overlap")
        collection_id = data.get("collection_id")

        splitter_type = data.get("splitter_type", "recursive")
        text_separators = data.get("text_separators", default_separators)
        distance_metric = data.get("distance_metric", "cosine")
        # Same boolean is stored and handed to the service below.
        normalize_vectors = bool(data.get("normalize_vectors", True))
        index_type = data.get("index_type", "flat")

        # chunk_overlap == 0 ("no overlap") is valid, so the numeric fields
        # are checked for presence rather than truthiness.
        if (
            not embedding_model
            or not embedding_provider
            or chunk_size is None
            or chunk_overlap is None
        ):
            return jsonify(
                {
                    "success": False,
                    "error": "All configuration parameters are required (embedding_model, embedding_provider, chunk_size, chunk_overlap)",
                }
            ), 400

        try:
            chunk_size = int(chunk_size)
            chunk_overlap = int(chunk_overlap)
        except (TypeError, ValueError):
            return jsonify(
                {
                    "success": False,
                    "error": "chunk_size and chunk_overlap must be integers",
                }
            ), 400
        if chunk_size <= 0 or chunk_overlap < 0:
            return jsonify(
                {
                    "success": False,
                    "error": "chunk_size must be positive and chunk_overlap must not be negative",
                }
            ), 400

        # Separators are persisted as a JSON string but passed to the
        # service as a list.
        if isinstance(text_separators, list):
            separators = text_separators
            text_separators_str = json.dumps(text_separators)
        else:
            text_separators_str = text_separators
            try:
                separators = json.loads(text_separators)
            except (TypeError, ValueError):
                separators = None
        if not isinstance(separators, list):
            return jsonify(
                {
                    "success": False,
                    "error": "text_separators must be a list or a JSON-encoded list",
                }
            ), 400

        settings = get_settings_manager()
        settings.set_setting("local_search_embedding_model", embedding_model)
        settings.set_setting(
            "local_search_embedding_provider", embedding_provider
        )
        settings.set_setting("local_search_chunk_size", chunk_size)
        settings.set_setting("local_search_chunk_overlap", chunk_overlap)
        settings.set_setting("local_search_splitter_type", splitter_type)
        settings.set_setting(
            "local_search_text_separators", text_separators_str
        )
        settings.set_setting("local_search_distance_metric", distance_metric)
        settings.set_setting(
            "local_search_normalize_vectors", normalize_vectors
        )
        settings.set_setting("local_search_index_type", index_type)

        if not collection_id:
            # Only the defaults were updated; no collection to touch.
            return jsonify(
                {
                    "success": True,
                    "message": "Default embedding settings saved successfully. New collections will use these settings.",
                }
            )

        with LibraryRAGService(
            username=session["username"],
            embedding_model=embedding_model,
            embedding_provider=embedding_provider,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            splitter_type=splitter_type,
            text_separators=separators,
            distance_metric=distance_metric,
            normalize_vectors=normalize_vectors,
            index_type=index_type,
        ) as new_rag_service:
            rag_index = new_rag_service._get_or_create_rag_index(
                collection_id
            )
            # Read while the service is still open, in case the index is
            # an ORM object tied to a session the service releases on exit.
            index_hash = rag_index.index_hash

        return jsonify(
            {
                "success": True,
                "message": "Configuration updated for collection. You can now index documents with the new settings.",
                "index_hash": index_hash,
            }
        )

    except Exception as e:
        return handle_api_error("configuring RAG", e)
@rag_bp.route("/api/rag/documents", methods=["GET"])
@login_required
def get_documents():
    """Page through a collection's documents with their RAG index status.

    Query args:
        page: 1-based page number (values below 1 become 1).
        per_page: Page size, clamped to the range 10-100.
        filter: "indexed", "unindexed", or anything else for all documents.
        collection_id: Optional; defaults to the user's Library collection.

    A document counts as indexed when a RagDocumentStatus row exists for it
    in this collection.
    """
    from ...database.session_context import get_user_db_session
    from ...database.library_init import get_default_library_id

    try:
        page = request.args.get("page", 1, type=int)
        per_page = request.args.get("per_page", 50, type=int)
        filter_type = request.args.get(
            "filter", "all"
        )  # all, indexed, unindexed

        page = max(1, page)
        per_page = min(max(10, per_page), 100)  # Limit between 10-100

        # Drop this thread's cached session so the query below starts fresh.
        from ...database.thread_local_session import cleanup_current_thread

        cleanup_current_thread()

        username = session["username"]

        collection_id = request.args.get("collection_id")
        if not collection_id:
            collection_id = get_default_library_id(username)

        logger.info(
            f"Getting documents for collection_id: {collection_id}, filter: {filter_type}, page: {page}"
        )

        with get_user_db_session(username) as db_session:
            # Expire cached objects so results reflect the current DB state.
            db_session.expire_all()

            from ...database.models.library import RagDocumentStatus

            # Documents in the collection, LEFT JOINed with their RAG status
            # for the same collection (NULL status == not indexed).
            query = (
                db_session.query(
                    Document, DocumentCollection, RagDocumentStatus
                )
                .join(
                    DocumentCollection,
                    (DocumentCollection.document_id == Document.id)
                    & (DocumentCollection.collection_id == collection_id),
                )
                .outerjoin(
                    RagDocumentStatus,
                    (RagDocumentStatus.document_id == Document.id)
                    & (RagDocumentStatus.collection_id == collection_id),
                )
            )

            logger.debug(f"Base query for collection {collection_id}: {query}")

            if filter_type == "indexed":
                query = query.filter(RagDocumentStatus.document_id.isnot(None))
            elif filter_type == "unindexed":
                query = query.filter(RagDocumentStatus.document_id.is_(None))

            total_count = query.count()
            logger.info(
                f"Found {total_count} total documents for collection {collection_id} with filter {filter_type}"
            )

            results = (
                query.order_by(Document.created_at.desc())
                .limit(per_page)
                .offset((page - 1) * per_page)
                .all()
            )

            documents = [
                {
                    "id": doc.id,
                    "title": doc.title,
                    "original_url": doc.original_url,
                    "rag_indexed": rag_status is not None,
                    "chunk_count": rag_status.chunk_count if rag_status else 0,
                    "created_at": doc.created_at.isoformat()
                    if doc.created_at
                    else None,
                }
                for doc, _doc_collection, rag_status in results
            ]

            indexed_count = sum(1 for d in documents if d["rag_indexed"])

            # Diagnostic only: count the status rows in SQL instead of
            # loading every RagDocumentStatus object just to take len().
            indexed_status_count = (
                db_session.query(RagDocumentStatus)
                .filter_by(collection_id=collection_id)
                .count()
            )
            logger.info(
                f"rag_document_status table shows: {indexed_status_count} documents indexed for collection {collection_id}"
            )

            logger.info(
                f"Returning {len(documents)} documents on page {page}: "
                f"{indexed_count} indexed, {len(documents) - indexed_count} not indexed"
            )

            return jsonify(
                {
                    "success": True,
                    "documents": documents,
                    "pagination": {
                        "page": page,
                        "per_page": per_page,
                        "total": total_count,
                        "pages": (total_count + per_page - 1) // per_page,
                    },
                }
            )
    except Exception as e:
        return handle_api_error("getting documents", e)
@rag_bp.route("/api/rag/index-local", methods=["GET"])
@login_required
def index_local_library():
    """Index documents from a local folder with Server-Sent Events progress.

    Query parameters:
        path: Folder to scan; validated with ``PathValidator`` before use.
        patterns: Comma-separated glob patterns, default
            ``"*.pdf,*.txt,*.md,*.html"``. Blank, anchored (absolute or
            drive-qualified) and ``..``-containing patterns are ignored.
        recursive: Whether to descend into sub-folders (default True).

    Returns:
        A ``text/event-stream`` response emitting ``start``, ``progress``,
        ``complete`` and ``error`` events, or a JSON 400 response when the
        path is missing, invalid, absent, or not a directory.
    """
    folder_path = request.args.get("path")
    file_patterns = request.args.get(
        "patterns", "*.pdf,*.txt,*.md,*.html"
    ).split(",")
    recursive = parse_bool_arg("recursive", default=True)

    if not folder_path:
        return jsonify({"success": False, "error": "Path is required"}), 400

    # Validate and sanitize the path to prevent traversal attacks
    try:
        validated_path = PathValidator.validate_local_filesystem_path(
            folder_path
        )
        # Re-sanitize for static analyzer recognition (CodeQL)
        path = PathValidator.sanitize_for_filesystem_ops(validated_path)
    except ValueError:
        logger.warning(f"Path validation failed for '{folder_path}'")
        return jsonify({"success": False, "error": "Invalid path"}), 400

    # Check path exists and is a directory
    if not path.exists():
        return jsonify({"success": False, "error": "Path does not exist"}), 400
    if not path.is_dir():
        return jsonify(
            {"success": False, "error": "Path is not a directory"}
        ), 400

    # Create RAG service in request context
    rag_service = get_rag_service()

    def _collect_files():
        """Expand ``file_patterns`` under ``path`` into sorted, unique files.

        The patterns come from the query string and are joined onto the
        validated folder, so they must not be able to leave it. Joining an
        anchored pattern (``/etc/*`` or ``C:foo``) onto a ``Path`` discards
        the folder entirely, and ``..`` components walk out of it. Both are
        skipped. Blank patterns (e.g. from a trailing comma) are skipped
        because they would glob the folder itself or everything beneath it.
        Matches that are not regular files (directories) are dropped.
        """
        matches = set()
        for raw_pattern in file_patterns:
            pattern = raw_pattern.strip()
            if not pattern:
                continue
            pattern_path = Path(pattern)
            if pattern_path.anchor or ".." in pattern_path.parts:
                logger.warning(
                    "Ignoring file pattern that escapes the folder: {!r}",
                    pattern,
                )
                continue
            if recursive:
                search_pattern = str(path / "**" / pattern)
            else:
                search_pattern = str(path / pattern)
            matches.update(
                match
                for match in glob.glob(search_pattern, recursive=recursive)
                if os.path.isfile(match)
            )
        return sorted(matches)

    def generate():
        """Generator function for SSE progress updates."""
        try:
            # Send initial status
            yield f"data: {json.dumps({'type': 'start', 'message': f'Scanning folder: {path}'})}\n\n"

            # Find all matching regular files inside the validated folder
            files_to_index = _collect_files()

            if not files_to_index:
                yield f"data: {json.dumps({'type': 'complete', 'results': {'successful': 0, 'skipped': 0, 'failed': 0, 'message': 'No matching files found'}})}\n\n"
                return

            results = {"successful": 0, "skipped": 0, "failed": 0, "errors": []}
            total = len(files_to_index)

            # Index each file
            for idx, file_path in enumerate(files_to_index, 1):
                file_name = Path(file_path).name

                # Send progress update
                yield f"data: {json.dumps({'type': 'progress', 'current': idx, 'total': total, 'filename': file_name, 'percent': int((idx / total) * 100)})}\n\n"

                try:
                    # Index the file directly using RAG service
                    result = rag_service.index_local_file(file_path)

                    if result.get("status") == "success":
                        results["successful"] += 1
                    elif result.get("status") == "skipped":
                        results["skipped"] += 1
                    else:
                        results["failed"] += 1
                        results["errors"].append(
                            {
                                "file": file_name,
                                "error": result.get("error", "Unknown error"),
                            }
                        )
                except Exception:
                    results["failed"] += 1
                    results["errors"].append(
                        {"file": file_name, "error": "Failed to index file"}
                    )
                    logger.exception(f"Error indexing file {file_path}")

            # Send completion status
            yield f"data: {json.dumps({'type': 'complete', 'results': results})}\n\n"

            logger.info(
                f"Local library indexing complete for {path}: "
                f"{results['successful']} successful, "
                f"{results['skipped']} skipped, "
                f"{results['failed']} failed"
            )

        except Exception:
            logger.exception("Error in local library indexing")
            yield f"data: {json.dumps({'type': 'error', 'error': 'An internal error occurred during indexing'})}\n\n"
        finally:
            # See parallel comment in index-all generator above — generator
            # ``finally`` is the safe close site for streamed RAG services.
            safe_close(rag_service, "rag_service (index-local SSE)")

    return Response(
        stream_with_context(generate()), mimetype="text/event-stream"
    )
1148# Collection Management Routes
@rag_bp.route("/api/collections", methods=["GET"])
@login_required
def get_collections():
    """List every document collection in the current user's database.

    Each user has a separate database, so no username filter is applied.
    Every entry carries basic metadata, document and linked-folder counts,
    and an ``embedding`` object, which is ``None`` when the collection has
    no embedding model recorded.
    """
    from ...database.session_context import get_user_db_session

    def _embedding_summary(coll):
        """Embedding configuration stored on ``coll``, or None if unset."""
        if not coll.embedding_model:
            return None
        provider = coll.embedding_model_type
        return {
            "model": coll.embedding_model,
            "provider": provider.value if provider else None,
            "dimension": coll.embedding_dimension,
            "chunk_size": coll.chunk_size,
            "chunk_overlap": coll.chunk_overlap,
        }

    def _to_payload(coll):
        """Serialize one Collection row for the JSON response."""
        created = coll.created_at
        payload = {
            "id": coll.id,
            "name": coll.name,
            "description": coll.description,
            "created_at": created.isoformat() if created else None,
            "collection_type": coll.collection_type,
            "is_default": getattr(coll, "is_default", False),
            "document_count": len(getattr(coll, "document_links", ())),
            "folder_count": len(getattr(coll, "linked_folders", ())),
        }
        payload["embedding"] = _embedding_summary(coll)
        return payload

    try:
        with get_user_db_session(session["username"]) as db_session:
            serialized = [
                _to_payload(coll)
                for coll in db_session.query(Collection).all()
            ]
        return jsonify({"success": True, "collections": serialized})
    except Exception as e:
        return handle_api_error("getting collections", e)
@rag_bp.route("/api/collections", methods=["POST"])
@login_required
@require_json_body(error_format="success")
def create_collection():
    """Create a new document collection.

    JSON body fields:
        name (str, required): Collection name; must be unique in the
            user's database.
        description (str, optional): Free-text description.
        type (optional): Collection type, defaults to ``"user_uploads"``.

    Returns:
        JSON ``{"success": True, "collection": {...}}`` on success; 400 when
        the name is missing or already used, or when ``name``/``description``
        is not a string; otherwise the ``handle_api_error`` response.
    """
    from ...database.session_context import get_user_db_session

    try:
        data = request.get_json()
        raw_name = data.get("name")
        raw_description = data.get("description")

        # A JSON null or non-string value used to reach ``.strip()`` and
        # surface as a generic server error; report it as a client error.
        if raw_name is not None and not isinstance(raw_name, str):
            return jsonify(
                {"success": False, "error": "Name must be a string"}
            ), 400
        if raw_description is not None and not isinstance(
            raw_description, str
        ):
            return jsonify(
                {"success": False, "error": "Description must be a string"}
            ), 400

        name = (raw_name or "").strip()
        description = (raw_description or "").strip()
        collection_type = data.get("type", "user_uploads")

        if not name:
            return jsonify({"success": False, "error": "Name is required"}), 400

        username = session["username"]
        with get_user_db_session(username) as db_session:
            # Check if collection with this name already exists in this user's database
            existing = db_session.query(Collection).filter_by(name=name).first()

            if existing:
                return jsonify(
                    {
                        "success": False,
                        "error": f"Collection '{name}' already exists",
                    }
                ), 400

            # Create new collection (no username needed - each user has their own DB)
            # Note: created_at uses default=utcnow() in the model, so we don't need to set it manually
            collection = Collection(
                id=str(uuid.uuid4()),  # Generate UUID for collection
                name=name,
                description=description,
                collection_type=collection_type,
            )

            db_session.add(collection)
            db_session.commit()

            return jsonify(
                {
                    "success": True,
                    "collection": {
                        "id": collection.id,
                        "name": collection.name,
                        "description": collection.description,
                        # Guarded the same way as update_collection does.
                        "created_at": collection.created_at.isoformat()
                        if collection.created_at
                        else None,
                        "collection_type": collection.collection_type,
                    },
                }
            )
    except Exception as e:
        return handle_api_error("creating collection", e)
@rag_bp.route("/api/collections/<string:collection_id>", methods=["PUT"])
@login_required
@require_json_body(error_format="success")
def update_collection(collection_id):
    """Update a collection's name and/or description.

    JSON body fields (both optional):
        name: New name. Missing, ``null`` or blank leaves the name
            unchanged; must not collide with another collection's name.
        description: New description. Missing or ``null`` leaves the stored
            description unchanged; an explicit string (even ``""``) replaces
            it after stripping whitespace.

    Returns:
        JSON ``{"success": True, "collection": {...}}``; 400 for a
        non-string field or a name clash; 404 when the collection does not
        exist; otherwise the ``handle_api_error`` response.
    """
    from ...database.session_context import get_user_db_session

    try:
        data = request.get_json()
        raw_name = data.get("name")
        raw_description = data.get("description")

        # Reject non-string values up front; previously ``.strip()`` raised
        # and the client got a generic server error instead of a 400.
        if raw_name is not None and not isinstance(raw_name, str):
            return jsonify(
                {"success": False, "error": "Name must be a string"}
            ), 400
        if raw_description is not None and not isinstance(
            raw_description, str
        ):
            return jsonify(
                {"success": False, "error": "Description must be a string"}
            ), 400

        name = (raw_name or "").strip()
        # Keep None for "not supplied" so a rename-only request does not
        # wipe the stored description. The old ``.get(..., "")`` default
        # made the ``is not None`` check below always true.
        description = (
            raw_description.strip() if raw_description is not None else None
        )

        username = session["username"]
        with get_user_db_session(username) as db_session:
            # No need to filter by username - each user has their own database
            collection = (
                db_session.query(Collection).filter_by(id=collection_id).first()
            )

            if not collection:
                return jsonify(
                    {"success": False, "error": "Collection not found"}
                ), 404

            if name:
                # Check if new name conflicts with existing collection
                existing = (
                    db_session.query(Collection)
                    .filter(
                        Collection.name == name,
                        Collection.id != collection_id,
                    )
                    .first()
                )

                if existing:
                    return jsonify(
                        {
                            "success": False,
                            "error": f"Collection '{name}' already exists",
                        }
                    ), 400

                collection.name = name

            if description is not None:  # Allow empty description
                collection.description = description

            db_session.commit()

            return jsonify(
                {
                    "success": True,
                    "collection": {
                        "id": collection.id,
                        "name": collection.name,
                        "description": collection.description,
                        "created_at": collection.created_at.isoformat()
                        if collection.created_at
                        else None,
                        "collection_type": collection.collection_type,
                    },
                }
            )
    except Exception as e:
        return handle_api_error("updating collection", e)
@rag_bp.route(
    "/api/collections/<string:collection_id>/upload", methods=["POST"]
)
@login_required
@upload_rate_limit_user
@upload_rate_limit_ip
def upload_to_collection(collection_id):
    """Upload files to a collection.

    Expects a multipart request with one or more ``files`` parts and an
    optional ``pdf_storage`` form field. The field must be ``"database"`` or
    ``"none"``; any other value falls back to ``"none"``. For each file:

    * If its SHA-256 matches an existing ``Document``, that document is
      linked to the collection. In ``database`` mode, a PDF upload may also
      upgrade the stored document via ``upgrade_to_pdf``.
    * Otherwise text is extracted, and a new ``Document`` is created and
      linked to the collection.

    Per-file problems are collected in ``errors`` instead of failing the
    whole request. Documents this request newly linked to the collection are
    passed to ``trigger_auto_index`` when a session DB password is available.

    Returns:
        JSON with ``uploaded``, ``errors`` and a ``summary``; 400 when no
        files are sent or the file count is rejected; 404 when the
        collection does not exist; otherwise the ``handle_api_error`` response.
    """
    from ...database.session_context import get_user_db_session
    from ...security import sanitize_filename, UnsafeFilenameError
    import hashlib
    from ..services.pdf_storage_manager import PDFStorageManager

    # Upload statuses meaning "this request linked the document to the
    # collection", so it still needs indexing there. A document that was
    # newly linked AND had its PDF upgraded must be indexed too. Before this
    # fix, ``added_to_collection_pdf_upgraded`` was missing from the filter.
    newly_linked_statuses = (
        "uploaded",
        "added_to_collection",
        "added_to_collection_pdf_upgraded",
    )

    try:
        if "files" not in request.files:
            return jsonify(
                {"success": False, "error": "No files provided"}
            ), 400

        files = request.files.getlist("files")
        if not files:
            return jsonify(
                {"success": False, "error": "No files selected"}
            ), 400

        # Bound the per-request file count BEFORE doing any work. The
        # request-level MAX_CONTENT_LENGTH gate covers total bytes, but
        # not file *count*; a request with 10000 zero-byte files would
        # otherwise reach the loop below.
        is_valid, error_msg = FileUploadValidator.validate_file_count(
            len(files)
        )
        if not is_valid:
            return jsonify({"success": False, "error": error_msg}), 400

        username = session["username"]
        with get_user_db_session(username) as db_session:
            # Verify collection exists in this user's database
            collection = (
                db_session.query(Collection).filter_by(id=collection_id).first()
            )

            if not collection:
                return jsonify(
                    {"success": False, "error": "Collection not found"}
                ), 404

            # Get PDF storage mode from form data, falling back to user's setting
            settings = get_settings_manager()
            default_pdf_storage = settings.get_setting(
                "research_library.upload_pdf_storage", "none"
            )
            pdf_storage = request.form.get("pdf_storage", default_pdf_storage)
            if pdf_storage not in ("database", "none"):
                # Security: user uploads can only use database (encrypted) or none (text-only)
                # Filesystem storage is not allowed for user uploads
                pdf_storage = "none"

            # Initialize PDF storage manager if storing PDFs in database
            pdf_storage_manager = None
            if pdf_storage == "database":
                library_root = settings.get_setting(
                    "research_library.storage_path",
                    str(get_library_directory()),
                )
                library_root = str(
                    Path(os.path.expandvars(library_root))
                    .expanduser()
                    .resolve()
                )
                pdf_storage_manager = PDFStorageManager(
                    library_root=Path(library_root), storage_mode="database"
                )
                logger.info("PDF storage mode: database (encrypted)")
            else:
                logger.info("PDF storage mode: none (text-only)")

            uploaded_files = []
            errors = []

            for file in files:
                if not file.filename:
                    continue

                try:
                    filename = sanitize_filename(file.filename)
                except UnsafeFilenameError:
                    errors.append(
                        {
                            "filename": "rejected",
                            "error": "Invalid or unsafe filename",
                        }
                    )
                    continue

                try:
                    # Pre-flight size check on Content-Length BEFORE reading
                    # bytes into memory. Cheap rejection for oversized files;
                    # avoids loading 50MB+ into memory just to discard it.
                    is_valid, error_msg = (
                        FileUploadValidator.validate_file_size(
                            content_length=file.content_length,
                            file_content=None,
                        )
                    )
                    if not is_valid:
                        errors.append(
                            {"filename": filename, "error": error_msg}
                        )
                        continue

                    # Read file content
                    file_content = file.read()
                    file.seek(0)  # Reset for potential re-reading

                    # Post-read size check (Content-Length can be missing or
                    # spoofed; the actual byte count is authoritative).
                    is_valid, error_msg = (
                        FileUploadValidator.validate_file_size(
                            content_length=None,
                            file_content=file_content,
                        )
                    )
                    if not is_valid:
                        errors.append(
                            {"filename": filename, "error": error_msg}
                        )
                        continue

                    # Calculate file hash for deduplication
                    file_hash = hashlib.sha256(file_content).hexdigest()

                    # Check if document already exists
                    existing_doc = (
                        db_session.query(Document)
                        .filter_by(document_hash=file_hash)
                        .first()
                    )

                    if existing_doc:
                        # Document exists, check if we can upgrade to include PDF
                        pdf_upgraded = False
                        if (
                            pdf_storage == "database"
                            and pdf_storage_manager is not None
                        ):
                            # NOTE: Only the PDF magic-byte check is needed here.
                            # File count is validated up front by
                            # FileUploadValidator.validate_file_count, size by
                            # MAX_CONTENT_LENGTH plus the per-file checks above,
                            # and the filename by sanitize_filename().
                            # See PR #3145 review for details.
                            if file_content[:4] != b"%PDF":
                                logger.debug(
                                    "Skipping PDF upgrade for {}: not a PDF file",
                                    filename,
                                )
                            else:
                                pdf_upgraded = (
                                    pdf_storage_manager.upgrade_to_pdf(
                                        document=existing_doc,
                                        pdf_content=file_content,
                                        session=db_session,
                                    )
                                )

                        # Check if already in collection
                        existing_link = (
                            db_session.query(DocumentCollection)
                            .filter_by(
                                document_id=existing_doc.id,
                                collection_id=collection_id,
                            )
                            .first()
                        )

                        if not existing_link:
                            ensure_in_collection(
                                db_session, existing_doc.id, collection_id
                            )
                            status = "added_to_collection"
                            if pdf_upgraded:
                                status = "added_to_collection_pdf_upgraded"
                            uploaded_files.append(
                                {
                                    "filename": existing_doc.filename,
                                    "status": status,
                                    "id": existing_doc.id,
                                    "pdf_upgraded": pdf_upgraded,
                                }
                            )
                        else:
                            status = "already_in_collection"
                            if pdf_upgraded:
                                status = "pdf_upgraded"
                            uploaded_files.append(
                                {
                                    "filename": existing_doc.filename,
                                    "status": status,
                                    "id": existing_doc.id,
                                    "pdf_upgraded": pdf_upgraded,
                                }
                            )
                    else:
                        # Create new document
                        from ...document_loaders import (
                            extract_text_from_bytes,
                            is_extension_supported,
                        )

                        file_extension = Path(filename).suffix.lower()

                        # Validate extension is supported before extraction
                        if not is_extension_supported(file_extension):
                            errors.append(
                                {
                                    "filename": filename,
                                    "error": f"Unsupported format: {file_extension}",
                                }
                            )
                            continue

                        # Use file_type without leading dot for storage
                        file_type = (
                            file_extension[1:]
                            if file_extension.startswith(".")
                            else file_extension
                        )

                        # Extract text using document_loaders module
                        extracted_text = extract_text_from_bytes(
                            file_content, file_extension, filename
                        )

                        # Clean the extracted text to remove surrogate characters
                        if extracted_text:
                            from ...text_processing import remove_surrogates

                            extracted_text = remove_surrogates(extracted_text)

                        if not extracted_text:
                            errors.append(
                                {
                                    "filename": filename,
                                    "error": f"Could not extract text from {file_type} file",
                                }
                            )
                            logger.warning(
                                f"Skipping file {filename} - no text could be extracted"
                            )
                            continue

                        # Get or create the user_upload source type
                        logger.info(
                            f"Getting or creating user_upload source type for {filename}"
                        )
                        source_type = (
                            db_session.query(SourceType)
                            .filter_by(name="user_upload")
                            .first()
                        )
                        if not source_type:
                            logger.info("Creating new user_upload source type")
                            source_type = SourceType(
                                id=str(uuid.uuid4()),
                                name="user_upload",
                                display_name="User Upload",
                                description="Documents uploaded by users",
                                icon="fas fa-upload",
                            )
                            db_session.add(source_type)
                            db_session.flush()
                            logger.info(
                                f"Created source type with ID: {source_type.id}"
                            )
                        else:
                            logger.info(
                                f"Found existing source type with ID: {source_type.id}"
                            )

                        # Create document with extracted text (no username needed - in user's own database)
                        # Note: uploaded_at uses default=utcnow() in the model, so we don't need to set it manually
                        doc_id = str(uuid.uuid4())
                        logger.info(
                            f"Creating document {doc_id} for {filename}"
                        )

                        # Determine storage mode and file_path
                        store_pdf_in_db = (
                            pdf_storage == "database"
                            and file_type == "pdf"
                            and pdf_storage_manager is not None
                        )

                        new_doc = Document(
                            id=doc_id,
                            source_type_id=source_type.id,
                            filename=filename,
                            document_hash=file_hash,
                            file_size=len(file_content),
                            file_type=file_type,
                            text_content=extracted_text,  # Always store extracted text
                            file_path=None
                            if store_pdf_in_db
                            else FILE_PATH_TEXT_ONLY,
                            storage_mode="database"
                            if store_pdf_in_db
                            else "none",
                        )
                        db_session.add(new_doc)
                        db_session.flush()  # Get the ID
                        logger.info(
                            f"Document {new_doc.id} created successfully"
                        )

                        # Store PDF in encrypted database if requested
                        pdf_stored = False
                        if store_pdf_in_db:
                            try:
                                pdf_storage_manager.save_pdf(
                                    pdf_content=file_content,
                                    document=new_doc,
                                    session=db_session,
                                    filename=filename,
                                )
                                pdf_stored = True
                                logger.info(
                                    f"PDF stored in encrypted database for {filename}"
                                )
                            except Exception:
                                logger.exception(
                                    f"Failed to store PDF in database for {filename}"
                                )
                                # Continue without PDF storage - text is still saved

                        # Add to collection
                        ensure_in_collection(
                            db_session, new_doc.id, collection_id
                        )

                        uploaded_files.append(
                            {
                                "filename": filename,
                                "status": "uploaded",
                                "id": new_doc.id,
                                "text_length": len(extracted_text),
                                "pdf_stored": pdf_stored,
                            }
                        )

                except Exception:
                    errors.append(
                        {
                            "filename": filename,
                            "error": "Failed to upload file",
                        }
                    )
                    logger.exception(f"Error uploading file {filename}")

            db_session.commit()

        # Trigger auto-indexing for documents newly linked to the collection.
        # This runs after the request's DB session has been committed and
        # closed, so the background indexer sees the committed rows.
        document_ids = [
            f["id"]
            for f in uploaded_files
            if f.get("status") in newly_linked_statuses
        ]
        if document_ids:
            from ...database.session_passwords import session_password_store

            session_id = session.get("session_id")
            db_password = session_password_store.get_session_password(
                username, session_id
            )
            if db_password:
                trigger_auto_index(
                    document_ids, collection_id, username, db_password
                )

        return jsonify(
            {
                "success": True,
                "uploaded": uploaded_files,
                "errors": errors,
                "summary": {
                    "total": len(files),
                    "successful": len(uploaded_files),
                    "failed": len(errors),
                },
            }
        )

    except Exception as e:
        return handle_api_error("uploading files", e)
@rag_bp.route(
    "/api/collections/<string:collection_id>/documents", methods=["GET"]
)
@login_required
def get_collection_documents(collection_id):
    """Return one collection's metadata together with all of its documents.

    Documents are found through the ``DocumentCollection`` link table. Each
    entry reports:

    * the per-collection indexing state stored on the link row;
    * whether a real file path and text content are stored;
    * how many *other* collections also contain the document.

    When a ``RAGIndex`` row named ``collection_<id>`` points at an existing
    file, that file's size is included both in bytes and human-readable.
    """
    from ...database.session_context import get_user_db_session

    def _iso_or_none(moment):
        """ISO-8601 string for ``moment``, or None when it is unset."""
        return moment.isoformat() if moment else None

    def _readable_size(num_bytes):
        """Format a byte count as ``B``, ``KB`` or ``MB`` (one decimal)."""
        kib = 1024
        mib = kib * kib
        if num_bytes < kib:
            return f"{num_bytes} B"
        if num_bytes < mib:
            return f"{num_bytes / kib:.1f} KB"
        return f"{num_bytes / mib:.1f} MB"

    try:
        username = session["username"]
        with get_user_db_session(username) as db_session:
            # The collection must exist in this user's own database
            collection = (
                db_session.query(Collection).filter_by(id=collection_id).first()
            )
            if not collection:
                return jsonify(
                    {"success": False, "error": "Collection not found"}
                ), 404

            link_rows = (
                db_session.query(DocumentCollection, Document)
                .join(Document)
                .filter(DocumentCollection.collection_id == collection_id)
                .all()
            )

            documents = []
            for link, doc in link_rows:
                stored_file = bool(
                    doc.file_path and doc.file_path not in FILE_PATH_SENTINELS
                )
                stored_text = bool(doc.text_content)
                label = doc.title or doc.filename or "Untitled"
                origin = doc.source_type.name if doc.source_type else "unknown"
                shared_count = (
                    db_session.query(DocumentCollection)
                    .filter(
                        DocumentCollection.document_id == doc.id,
                        DocumentCollection.collection_id != collection_id,
                    )
                    .count()
                )
                entry = {
                    "id": doc.id,
                    "filename": label,
                    "title": label,
                    "file_type": doc.file_type,
                    "file_size": doc.file_size,
                    "uploaded_at": _iso_or_none(doc.created_at),
                    "indexed": link.indexed,
                    "chunk_count": link.chunk_count,
                    "last_indexed_at": _iso_or_none(link.last_indexed_at),
                    "has_pdf": stored_file,
                    "has_text_db": stored_text,
                    "source_type": origin,
                    "in_other_collections": shared_count > 0,
                    "other_collections_count": shared_count,
                }
                documents.append(entry)

            # Size of the on-disk vector index, when one is recorded
            size_label = None
            size_bytes = None
            rag_index = (
                db_session.query(RAGIndex)
                .filter_by(collection_name=f"collection_{collection_id}")
                .first()
            )
            if rag_index and rag_index.index_path:
                index_file = Path(rag_index.index_path)
                if index_file.exists():
                    size_bytes = index_file.stat().st_size
                    size_label = _readable_size(size_bytes)

            model_type = collection.embedding_model_type
            collection_payload = {
                "id": collection.id,
                "name": collection.name,
                "description": collection.description,
                "embedding_model": collection.embedding_model,
                "embedding_model_type": model_type.value
                if model_type
                else None,
                "embedding_dimension": collection.embedding_dimension,
                "chunk_size": collection.chunk_size,
                "chunk_overlap": collection.chunk_overlap,
                # Advanced settings
                "splitter_type": collection.splitter_type,
                "distance_metric": collection.distance_metric,
                "index_type": collection.index_type,
                "normalize_vectors": collection.normalize_vectors,
                # Index file info
                "index_file_size": size_label,
                "index_file_size_bytes": size_bytes,
                "collection_type": collection.collection_type,
            }
            return jsonify(
                {
                    "success": True,
                    "collection": collection_payload,
                    "documents": documents,
                }
            )

    except Exception as e:
        return handle_api_error("getting collection documents", e)
@rag_bp.route("/api/collections/<string:collection_id>/index", methods=["GET"])
@login_required
def index_collection(collection_id):
    """Index all documents in a collection with Server-Sent Events progress.

    Query parameters:
        force_reindex: When true, the RAG service is built with
            ``use_defaults=True`` and the collection's stored embedding
            metadata is overwritten. Every linked document is indexed, not
            only those whose ``DocumentCollection.indexed`` is False.

    The response is a ``text/event-stream``. It emits ``start``,
    ``progress``, ``doc_error``, ``complete`` and ``error`` events as
    ``data:`` lines. While a single document is being indexed it also emits
    ``: heartbeat`` SSE comment lines. Per-document failures are counted in
    the results and do not stop the stream.
    """
    from ...database.session_context import get_user_db_session
    from ...database.session_passwords import session_password_store

    force_reindex = parse_bool_arg("force_reindex")
    username = session["username"]
    session_id = session.get("session_id")

    logger.info(f"Starting index_collection, force_reindex={force_reindex}")

    # Get password for thread access to encrypted database
    db_password = None
    if session_id:
        db_password = session_password_store.get_session_password(
            username, session_id
        )

    # Create RAG service — on force reindex use current default model.
    # Built here in the request context; closed in the generator's
    # ``finally`` below. NOTE(review): if the response body is never
    # iterated, that ``finally`` does not run — confirm this is acceptable.
    rag_service = get_rag_service(collection_id, use_defaults=force_reindex)
    logger.info(
        f"RAG service created: provider={rag_service.embedding_provider}"
    )

    def generate():
        """Generator for SSE progress updates."""
        logger.info("SSE generator started")
        try:
            with get_user_db_session(username, db_password) as db_session:
                # Verify collection exists in this user's database
                collection = (
                    db_session.query(Collection)
                    .filter_by(id=collection_id)
                    .first()
                )

                if not collection:
                    yield f"data: {json.dumps({'type': 'error', 'error': 'Collection not found'})}\n\n"
                    return

                # Store embedding metadata on first index or force reindex
                if collection.embedding_model is None or force_reindex:
                    # Get embedding dimension from the embedding manager
                    # (best effort: stays None if unavailable)
                    embedding_dim = None
                    try:
                        # Try to get dimension from the embedding manager's provider
                        if hasattr(rag_service.embedding_manager, "provider"):
                            provider = rag_service.embedding_manager.provider
                            if hasattr(provider, "embedding_dimension"):
                                embedding_dim = provider.embedding_dimension
                    except Exception:
                        logger.warning("Could not get embedding dimension")

                    # Store the embedding configuration used for this collection.
                    # EmbeddingProvider(...) raises if the provider string is not
                    # a member; that is caught by the outer handler below.
                    collection.embedding_model = rag_service.embedding_model
                    collection.embedding_model_type = EmbeddingProvider(
                        rag_service.embedding_provider
                    )
                    collection.embedding_dimension = embedding_dim
                    collection.chunk_size = rag_service.chunk_size
                    collection.chunk_overlap = rag_service.chunk_overlap
                    # Store advanced settings
                    collection.splitter_type = rag_service.splitter_type
                    collection.text_separators = rag_service.text_separators
                    collection.distance_metric = rag_service.distance_metric
                    # Ensure normalize_vectors is a proper boolean for database
                    collection.normalize_vectors = bool(
                        rag_service.normalize_vectors
                    )
                    collection.index_type = rag_service.index_type
                    db_session.commit()
                    logger.info(
                        f"Stored embedding metadata for collection: provider={rag_service.embedding_provider}"
                    )

                # Get documents to index
                query = (
                    db_session.query(DocumentCollection, Document)
                    .join(Document)
                    .filter(DocumentCollection.collection_id == collection_id)
                )

                if not force_reindex:
                    # IMPORTANT: DO NOT CHANGE THIS LINE! DO NOT REMOVE THIS COMMENT!
                    # We MUST use `== False` instead of `not DocumentCollection.indexed`
                    # The Python `not` operator does NOT work correctly in SQLAlchemy filters.
                    # Using `not` will cause the query to return NO results (zero documents).
                    # SQLAlchemy requires explicit comparison: `== False` or `== True`
                    # This has been fixed multiple times - DO NOT change it back to `not`!
                    query = query.filter(DocumentCollection.indexed == False)  # noqa: E712

                doc_links = query.all()

                if not doc_links:
                    logger.info("No documents to index in collection")
                    yield f"data: {json.dumps({'type': 'complete', 'results': {'successful': 0, 'skipped': 0, 'failed': 0, 'message': 'No documents to index'}})}\n\n"
                    return

                total = len(doc_links)
                logger.info(f"Found {total} documents to index")
                results = {
                    "successful": 0,
                    "skipped": 0,
                    "failed": 0,
                    "errors": [],
                }

                yield f"data: {json.dumps({'type': 'start', 'message': f'Indexing {total} documents in collection: {collection.name}'})}\n\n"

                for idx, (link, doc) in enumerate(doc_links, 1):
                    filename = doc.filename or doc.title or "Unknown"
                    yield f"data: {json.dumps({'type': 'progress', 'current': idx, 'total': total, 'filename': filename, 'percent': int((idx / total) * 100)})}\n\n"

                    try:
                        logger.debug(
                            f"Indexing document {idx}/{total}: {filename}"
                        )

                        # Run index_document in a separate thread to allow sending SSE heartbeats.
                        # This keeps the HTTP connection alive during long indexing operations,
                        # preventing timeouts from proxy servers (nginx) and browsers.
                        # The main thread periodically yields heartbeat comments while waiting.
                        # Exactly one of the two queues receives an item per document.
                        result_queue = queue.Queue()
                        error_queue = queue.Queue()

                        def index_in_thread():
                            # Runs in the worker thread. The closure reads
                            # ``doc`` from the loop, which is safe because the
                            # thread is joined before the loop advances.
                            try:
                                r = rag_service.index_document(
                                    document_id=doc.id,
                                    collection_id=collection_id,
                                    force_reindex=force_reindex,
                                )
                                result_queue.put(r)
                            except Exception as ex:
                                error_queue.put(ex)
                            finally:
                                # Release any thread-local DB session this
                                # worker thread opened.
                                try:
                                    from ...database.thread_local_session import (
                                        cleanup_current_thread,
                                    )

                                    cleanup_current_thread()
                                except Exception:
                                    logger.debug(
                                        "best-effort thread-local DB session cleanup",
                                        exc_info=True,
                                    )

                        thread = threading.Thread(target=index_in_thread)
                        thread.start()

                        # Send heartbeats while waiting for the thread to complete
                        heartbeat_interval = 5  # seconds
                        while thread.is_alive():
                            thread.join(timeout=heartbeat_interval)
                            if thread.is_alive():
                                # Send SSE comment as heartbeat (keeps connection alive)
                                yield f": heartbeat {idx}/{total}\n\n"

                        # Check for errors from thread
                        if not error_queue.empty():
                            raise error_queue.get()  # noqa: TRY301 — re-raises thread exception for per-document error handling

                        # NOTE(review): if the worker died with a
                        # BaseException (not caught above), neither queue has
                        # an item and this get() blocks indefinitely.
                        result = result_queue.get()
                        logger.info(
                            f"Indexed document {idx}/{total}: {filename} - status={result.get('status')}"
                        )

                        if result.get("status") == "success":
                            results["successful"] += 1
                            # DocumentCollection status is already updated in index_document
                            # No need to update link here
                        elif result.get("status") == "skipped":
                            results["skipped"] += 1
                        else:
                            results["failed"] += 1
                            error_msg = result.get("error", "Unknown error")
                            results["errors"].append(
                                {
                                    "filename": filename,
                                    "error": error_msg,
                                }
                            )
                            logger.warning(
                                f"Failed to index {filename} ({idx}/{total}): {error_msg}"
                            )
                    except Exception as e:
                        results["failed"] += 1
                        error_msg = str(e) or "Failed to index document"
                        results["errors"].append(
                            {
                                "filename": filename,
                                "error": error_msg,
                            }
                        )
                        logger.exception(
                            f"Exception indexing document {filename} ({idx}/{total})"
                        )
                        # Send error update to client so they know indexing is continuing
                        yield f"data: {json.dumps({'type': 'doc_error', 'filename': filename, 'error': error_msg})}\n\n"

                db_session.commit()
                # NOTE(review): flush() after commit() presumably has nothing
                # pending to write; commit() is what persists the changes.
                db_session.flush()

                logger.info(
                    f"Indexing complete: {results['successful']} successful, {results['failed']} failed, {results['skipped']} skipped"
                )
                yield f"data: {json.dumps({'type': 'complete', 'results': results})}\n\n"
                logger.info("SSE generator finished successfully")

        except Exception:
            logger.exception("Error in collection indexing")
            yield f"data: {json.dumps({'type': 'error', 'error': 'An internal error occurred during indexing'})}\n\n"
        finally:
            # See parallel comment in index-all generator above — generator
            # ``finally`` is the safe close site for streamed RAG services.
            safe_close(rag_service, "rag_service (index-collection SSE)")

    response = Response(
        stream_with_context(generate()), mimetype="text/event-stream"
    )
    # Prevent buffering for proper SSE streaming
    response.headers["Cache-Control"] = "no-cache, no-transform"
    response.headers["Connection"] = "keep-alive"
    response.headers["X-Accel-Buffering"] = "no"
    return response
2091# =============================================================================
2092# Background Indexing Endpoints
2093# =============================================================================
2096def _get_rag_service_for_thread(
2097 collection_id: str,
2098 username: str,
2099 db_password: str,
2100 use_defaults: bool = False,
2101) -> LibraryRAGService:
2102 """
2103 Create RAG service for use in background threads (no Flask context).
2105 Delegates settings resolution to the shared rag_service_factory, then
2106 propagates db_password to the embedding manager for thread-safe DB access.
2107 """
2108 from ..services.rag_service_factory import (
2109 get_rag_service as _get_rag_service,
2110 )
2112 service = _get_rag_service(
2113 username,
2114 collection_id,
2115 use_defaults=use_defaults,
2116 db_password=db_password,
2117 )
2118 # The factory passes db_password to LibraryRAGService, but __init__ stores
2119 # it in the backing field (_db_password) without propagating to sub-managers.
2120 # Re-assign via the property setter to propagate to embedding_manager and
2121 # integrity_manager, which need it for thread-safe session access.
2122 service.db_password = db_password
2123 return service
2126def trigger_auto_index(
2127 document_ids: list[str],
2128 collection_id: str,
2129 username: str,
2130 db_password: str,
2131) -> None:
2132 """
2133 Trigger automatic RAG indexing for documents if auto-indexing is enabled.
2135 This function checks the auto_index_enabled setting and spawns a background
2136 thread to index the specified documents. It does not block the caller.
2138 Args:
2139 document_ids: List of document IDs to index
2140 collection_id: The collection to index into
2141 username: The username for database access
2142 db_password: The user's database password for thread-safe access
2143 """
2144 from ...database.session_context import get_user_db_session
2146 if not document_ids:
2147 logger.debug("No documents to auto-index")
2148 return
2150 # Check if auto-indexing is enabled
2151 try:
2152 with get_user_db_session(username, db_password) as db_session:
2153 settings = SettingsManager(db_session)
2154 auto_index_enabled = settings.get_bool_setting(
2155 "research_library.auto_index_enabled", True
2156 )
2158 if not auto_index_enabled:
2159 logger.debug("Auto-indexing is disabled, skipping")
2160 return
2161 except Exception:
2162 logger.exception(
2163 "Failed to check auto-index setting, skipping auto-index"
2164 )
2165 return
2167 logger.info(
2168 f"Auto-indexing {len(document_ids)} documents in collection {collection_id}"
2169 )
2171 # Submit to thread pool (bounded concurrency, prevents thread proliferation)
2172 executor = _get_auto_index_executor()
2173 executor.submit(
2174 _auto_index_documents_worker,
2175 document_ids,
2176 collection_id,
2177 username,
2178 db_password,
2179 )
2182@thread_cleanup
2183def _auto_index_documents_worker(
2184 document_ids: list[str],
2185 collection_id: str,
2186 username: str,
2187 db_password: str,
2188) -> None:
2189 """
2190 Background worker to index documents automatically.
2192 This is a simpler worker than _background_index_worker - it doesn't track
2193 progress via TaskMetadata since it's meant to be a lightweight auto-indexing
2194 operation.
2195 """
2197 try:
2198 # Create RAG service (thread-safe, no Flask context needed)
2199 with _get_rag_service_for_thread(
2200 collection_id, username, db_password
2201 ) as rag_service:
2202 indexed_count = 0
2203 for doc_id in document_ids:
2204 try:
2205 result = rag_service.index_document(
2206 doc_id, collection_id, force_reindex=False
2207 )
2208 if result.get("status") == "success":
2209 indexed_count += 1
2210 logger.debug(f"Auto-indexed document {doc_id}")
2211 elif result.get("status") == "skipped": 2211 ↛ 2203line 2211 didn't jump to line 2203 because the condition on line 2211 was always true
2212 logger.debug(
2213 f"Document {doc_id} already indexed, skipped"
2214 )
2215 except Exception:
2216 logger.exception(f"Failed to auto-index document {doc_id}")
2218 logger.info(
2219 f"Auto-indexing complete: {indexed_count}/{len(document_ids)} documents indexed"
2220 )
2222 except Exception:
2223 logger.exception("Auto-indexing worker failed")
2226@thread_cleanup
2227def _background_index_worker(
2228 task_id: str,
2229 collection_id: str,
2230 username: str,
2231 db_password: str,
2232 force_reindex: bool,
2233):
2234 """
2235 Background worker thread for indexing documents.
2236 Updates TaskMetadata with progress and checks for cancellation.
2237 """
2238 from ...database.session_context import get_user_db_session
2240 try:
2241 # Create RAG service (thread-safe, no Flask context needed)
2242 with _get_rag_service_for_thread(
2243 collection_id, username, db_password, use_defaults=force_reindex
2244 ) as rag_service:
2245 with get_user_db_session(username, db_password) as db_session:
2246 # Get collection
2247 collection = (
2248 db_session.query(Collection)
2249 .filter_by(id=collection_id)
2250 .first()
2251 )
2253 if not collection:
2254 _update_task_status(
2255 username,
2256 db_password,
2257 task_id,
2258 status="failed",
2259 error_message="Collection not found",
2260 )
2261 return
2263 # Store embedding metadata on first index or force reindex
2264 if collection.embedding_model is None or force_reindex:
2265 collection.embedding_model = rag_service.embedding_model
2266 collection.embedding_model_type = EmbeddingProvider(
2267 rag_service.embedding_provider
2268 )
2269 collection.chunk_size = rag_service.chunk_size
2270 collection.chunk_overlap = rag_service.chunk_overlap
2271 collection.splitter_type = rag_service.splitter_type
2272 collection.text_separators = rag_service.text_separators
2273 collection.distance_metric = rag_service.distance_metric
2274 collection.normalize_vectors = bool(
2275 rag_service.normalize_vectors
2276 )
2277 collection.index_type = rag_service.index_type
2278 db_session.commit()
2280 # Clean up old index data for a fresh rebuild.
2281 # This prevents mixed-model vectors if cancelled midway
2282 # and ensures accurate stats during partial reindex.
2283 if force_reindex:
2284 from ..deletion.utils.cascade_helper import CascadeHelper
2286 collection_name = f"collection_{collection_id}"
2288 # Delete all old document chunks from DB
2289 deleted_chunks = CascadeHelper.delete_collection_chunks(
2290 db_session, collection_name
2291 )
2292 logger.info(
2293 f"Cleared {deleted_chunks} old chunks for collection {collection_id}"
2294 )
2296 # Delete old FAISS index files (.faiss + .pkl) and RAGIndex records
2297 # RagDocumentStatus rows cascade-delete via FK ondelete="CASCADE"
2298 rag_result = (
2299 CascadeHelper.delete_rag_indices_for_collection(
2300 db_session, collection_name
2301 )
2302 )
2303 logger.info(
2304 f"Cleared old RAG indices for collection {collection_id}: {rag_result}"
2305 )
2307 # Mark all documents as unindexed
2308 db_session.query(DocumentCollection).filter_by(
2309 collection_id=collection_id
2310 ).update(
2311 {
2312 DocumentCollection.indexed: False,
2313 DocumentCollection.chunk_count: 0,
2314 }
2315 )
2316 db_session.commit()
2317 logger.info(
2318 f"Reset indexing state for collection {collection_id}"
2319 )
2321 # Get documents to index
2322 query = (
2323 db_session.query(DocumentCollection, Document)
2324 .join(Document)
2325 .filter(DocumentCollection.collection_id == collection_id)
2326 )
2328 if not force_reindex:
2329 query = query.filter(DocumentCollection.indexed == False) # noqa: E712
2331 doc_links = query.all()
2333 if not doc_links:
2334 _update_task_status(
2335 username,
2336 db_password,
2337 task_id,
2338 status="completed",
2339 progress_message="No documents to index",
2340 )
2341 return
2343 total = len(doc_links)
2344 results = {"successful": 0, "skipped": 0, "failed": 0}
2346 # Update task with total count
2347 _update_task_status(
2348 username,
2349 db_password,
2350 task_id,
2351 progress_total=total,
2352 progress_message=f"Indexing {total} documents",
2353 )
2355 for idx, (link, doc) in enumerate(doc_links, 1):
2356 # Check if cancelled
2357 if _is_task_cancelled(username, db_password, task_id):
2358 _update_task_status(
2359 username,
2360 db_password,
2361 task_id,
2362 status="cancelled",
2363 progress_message=f"Cancelled after {idx - 1}/{total} documents",
2364 )
2365 logger.info(f"Indexing task {task_id} was cancelled")
2366 return
2368 filename = doc.filename or doc.title or "Unknown"
2370 # Update progress with filename
2371 _update_task_status(
2372 username,
2373 db_password,
2374 task_id,
2375 progress_current=idx,
2376 progress_message=f"Indexing {idx}/{total}: {filename}",
2377 )
2379 try:
2380 result = rag_service.index_document(
2381 document_id=doc.id,
2382 collection_id=collection_id,
2383 force_reindex=force_reindex,
2384 )
2386 if result.get("status") == "success":
2387 results["successful"] += 1
2388 elif result.get("status") == "skipped":
2389 results["skipped"] += 1
2390 else:
2391 results["failed"] += 1
2393 except Exception:
2394 results["failed"] += 1
2395 logger.exception(
2396 f"Error indexing document {idx}/{total}"
2397 )
2399 db_session.commit()
2401 # Mark as completed
2402 _update_task_status(
2403 username,
2404 db_password,
2405 task_id,
2406 status="completed",
2407 progress_current=total,
2408 progress_message=f"Completed: {results['successful']} indexed, {results['failed']} failed, {results['skipped']} skipped",
2409 )
2410 logger.info(
2411 f"Background indexing task {task_id} completed: {results}"
2412 )
2414 except Exception as e:
2415 logger.exception(f"Background indexing task {task_id} failed")
2416 _update_task_status(
2417 username,
2418 db_password,
2419 task_id,
2420 status="failed",
2421 error_message=str(e),
2422 )
2425def _update_task_status(
2426 username: str,
2427 db_password: str,
2428 task_id: str,
2429 status: str = None,
2430 progress_current: int = None,
2431 progress_total: int = None,
2432 progress_message: str = None,
2433 error_message: str = None,
2434):
2435 """Update task metadata in the database."""
2436 from ...database.session_context import get_user_db_session
2438 try:
2439 with get_user_db_session(username, db_password) as db_session:
2440 task = (
2441 db_session.query(TaskMetadata)
2442 .filter_by(task_id=task_id)
2443 .first()
2444 )
2445 if task:
2446 if status is not None:
2447 task.status = status
2448 if status == "completed":
2449 task.completed_at = datetime.now(UTC)
2450 if progress_current is not None:
2451 task.progress_current = progress_current
2452 if progress_total is not None:
2453 task.progress_total = progress_total
2454 if progress_message is not None:
2455 task.progress_message = progress_message
2456 if error_message is not None:
2457 task.error_message = error_message
2458 db_session.commit()
2459 except Exception:
2460 logger.exception(f"Failed to update task status for {task_id}")
2463def _is_task_cancelled(username: str, db_password: str, task_id: str) -> bool:
2464 """Check if a task has been cancelled."""
2465 from ...database.session_context import get_user_db_session
2467 try:
2468 with get_user_db_session(username, db_password) as db_session:
2469 task = (
2470 db_session.query(TaskMetadata)
2471 .filter_by(task_id=task_id)
2472 .first()
2473 )
2474 return task and task.status == "cancelled"
2475 except Exception:
2476 logger.warning(
2477 "Could not check cancellation status for task {}", task_id
2478 )
2479 return False
2482@rag_bp.route(
2483 "/api/collections/<string:collection_id>/index/start", methods=["POST"]
2484)
2485@login_required
2486def start_background_index(collection_id):
2487 """Start background indexing for a collection."""
2488 from ...database.session_context import get_user_db_session
2489 from ...database.session_passwords import session_password_store
2491 username = session["username"]
2492 session_id = session.get("session_id")
2494 # Get password for thread access
2495 db_password = None
2496 if session_id: 2496 ↛ 2502line 2496 didn't jump to line 2502 because the condition on line 2496 was always true
2497 db_password = session_password_store.get_session_password(
2498 username, session_id
2499 )
2501 # Parse request body
2502 data = request.get_json() or {}
2503 force_reindex = data.get("force_reindex", False)
2505 try:
2506 with get_user_db_session(username, db_password) as db_session:
2507 # Check if there's already an active indexing task for this collection
2508 existing_task = (
2509 db_session.query(TaskMetadata)
2510 .filter(
2511 TaskMetadata.task_type == "indexing",
2512 TaskMetadata.status == "processing",
2513 )
2514 .first()
2515 )
2517 if existing_task:
2518 # Check if it's for this collection
2519 metadata = existing_task.metadata_json or {}
2520 if metadata.get("collection_id") == collection_id:
2521 return jsonify(
2522 {
2523 "success": False,
2524 "error": "Indexing is already in progress for this collection",
2525 "task_id": existing_task.task_id,
2526 }
2527 ), 409
2529 # Create new task
2530 task_id = str(uuid.uuid4())
2531 task = TaskMetadata(
2532 task_id=task_id,
2533 status="processing",
2534 task_type="indexing",
2535 created_at=datetime.now(UTC),
2536 started_at=datetime.now(UTC),
2537 progress_current=0,
2538 progress_total=0,
2539 progress_message="Starting indexing...",
2540 metadata_json={
2541 "collection_id": collection_id,
2542 "force_reindex": force_reindex,
2543 },
2544 )
2545 db_session.add(task)
2546 db_session.commit()
2548 # Start background thread
2549 thread = threading.Thread(
2550 target=_background_index_worker,
2551 args=(task_id, collection_id, username, db_password, force_reindex),
2552 daemon=True,
2553 )
2554 thread.start()
2556 logger.info(
2557 f"Started background indexing task {task_id} for collection {collection_id}"
2558 )
2560 return jsonify(
2561 {
2562 "success": True,
2563 "task_id": task_id,
2564 "message": "Indexing started in background",
2565 }
2566 )
2568 except Exception:
2569 logger.exception("Failed to start background indexing")
2570 return jsonify(
2571 {
2572 "success": False,
2573 "error": "Failed to start indexing. Please try again.",
2574 }
2575 ), 500
2578@rag_bp.route(
2579 "/api/collections/<string:collection_id>/index/status", methods=["GET"]
2580)
2581@limiter.exempt
2582@login_required
2583def get_index_status(collection_id):
2584 """Get the current indexing status for a collection."""
2585 from ...database.session_context import get_user_db_session
2586 from ...database.session_passwords import session_password_store
2588 username = session["username"]
2589 session_id = session.get("session_id")
2591 db_password = None
2592 if session_id: 2592 ↛ 2597line 2592 didn't jump to line 2597 because the condition on line 2592 was always true
2593 db_password = session_password_store.get_session_password(
2594 username, session_id
2595 )
2597 try:
2598 with get_user_db_session(username, db_password) as db_session:
2599 # Find the most recent indexing task for this collection
2600 task = (
2601 db_session.query(TaskMetadata)
2602 .filter(TaskMetadata.task_type == "indexing")
2603 .order_by(TaskMetadata.created_at.desc())
2604 .first()
2605 )
2607 if not task:
2608 return jsonify(
2609 {
2610 "status": "idle",
2611 "message": "No indexing task found",
2612 }
2613 )
2615 # Check if it's for this collection
2616 metadata = task.metadata_json or {}
2617 if metadata.get("collection_id") != collection_id:
2618 return jsonify(
2619 {
2620 "status": "idle",
2621 "message": "No indexing task for this collection",
2622 }
2623 )
2625 return jsonify(
2626 {
2627 "task_id": task.task_id,
2628 "status": task.status,
2629 "progress_current": task.progress_current or 0,
2630 "progress_total": task.progress_total or 0,
2631 "progress_message": task.progress_message,
2632 "error_message": task.error_message,
2633 "created_at": task.created_at.isoformat()
2634 if task.created_at
2635 else None,
2636 "completed_at": task.completed_at.isoformat()
2637 if task.completed_at
2638 else None,
2639 }
2640 )
2642 except Exception:
2643 logger.exception("Failed to get index status")
2644 return jsonify(
2645 {
2646 "status": "error",
2647 "error": "Failed to get indexing status. Please try again.",
2648 }
2649 ), 500
2652@rag_bp.route(
2653 "/api/collections/<string:collection_id>/index/cancel", methods=["POST"]
2654)
2655@login_required
2656def cancel_indexing(collection_id):
2657 """Cancel an active indexing task for a collection."""
2658 from ...database.session_context import get_user_db_session
2659 from ...database.session_passwords import session_password_store
2661 username = session["username"]
2662 session_id = session.get("session_id")
2664 db_password = None
2665 if session_id: 2665 ↛ 2670line 2665 didn't jump to line 2670 because the condition on line 2665 was always true
2666 db_password = session_password_store.get_session_password(
2667 username, session_id
2668 )
2670 try:
2671 with get_user_db_session(username, db_password) as db_session:
2672 # Find active indexing task for this collection
2673 task = (
2674 db_session.query(TaskMetadata)
2675 .filter(
2676 TaskMetadata.task_type == "indexing",
2677 TaskMetadata.status == "processing",
2678 )
2679 .first()
2680 )
2682 if not task:
2683 return jsonify(
2684 {
2685 "success": False,
2686 "error": "No active indexing task found",
2687 }
2688 ), 404
2690 # Check if it's for this collection
2691 metadata = task.metadata_json or {}
2692 if metadata.get("collection_id") != collection_id:
2693 return jsonify(
2694 {
2695 "success": False,
2696 "error": "No active indexing task for this collection",
2697 }
2698 ), 404
2700 # Mark as cancelled - the worker thread will check this
2701 task.status = "cancelled"
2702 task.progress_message = "Cancellation requested..."
2703 db_session.commit()
2705 logger.info(
2706 f"Cancelled indexing task {task.task_id} for collection {collection_id}"
2707 )
2709 return jsonify(
2710 {
2711 "success": True,
2712 "message": "Cancellation requested",
2713 "task_id": task.task_id,
2714 }
2715 )
2717 except Exception:
2718 logger.exception("Failed to cancel indexing")
2719 return jsonify(
2720 {
2721 "success": False,
2722 "error": "Failed to cancel indexing. Please try again.",
2723 }
2724 ), 500
2727# Research History Semantic Search Routes have been moved to
2728# research_library.search.routes.search_routes