Coverage for src/local_deep_research/research_library/routes/rag_routes.py: 15%
1017 statements
coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""
2RAG Management API Routes
4Provides endpoints for managing RAG indexing of library documents:
5- Configure embedding models
6- Index documents
7- Get RAG statistics
8- Bulk operations with progress tracking
9"""
11from flask import (
12 Blueprint,
13 jsonify,
14 request,
15 Response,
16 render_template,
17 session,
18 stream_with_context,
19)
20from loguru import logger
21import glob
22import json
23import uuid
24import time
25import threading
26import queue
27from datetime import datetime, UTC
28from pathlib import Path
29from typing import Optional
31from ...web.auth.decorators import login_required
32from ...utilities.db_utils import get_settings_manager
33from ..services.library_rag_service import LibraryRAGService
34from ...security.path_validator import PathValidator
35from ..utils import handle_api_error
36from ...database.models.library import (
37 Document,
38 Collection,
39 DocumentCollection,
40 RAGIndex,
41 SourceType,
42 EmbeddingProvider,
43)
44from ...database.models.queue import TaskMetadata
45from ...web.utils.rate_limiter import limiter
46from ...config.paths import get_library_directory
48rag_bp = Blueprint("rag", __name__, url_prefix="/library")
51def get_rag_service(collection_id: Optional[str] = None) -> LibraryRAGService:
52 """
53 Get RAG service instance with appropriate settings.
55 If collection_id is provided:
56 - Uses collection's stored settings if they exist
57 - Uses current defaults for new collections (and stores them)
59 If no collection_id:
60 - Uses current default settings
61 """
62 from ...database.session_context import get_user_db_session
64 settings = get_settings_manager()
65 username = session["username"]
67 # Get current default settings
68 default_embedding_model = settings.get_setting(
69 "local_search_embedding_model", "all-MiniLM-L6-v2"
70 )
71 default_embedding_provider = settings.get_setting(
72 "local_search_embedding_provider", "sentence_transformers"
73 )
74 default_chunk_size = int(
75 settings.get_setting("local_search_chunk_size", 1000)
76 )
77 default_chunk_overlap = int(
78 settings.get_setting("local_search_chunk_overlap", 200)
79 )
81 # Get new advanced configuration settings (Issue #1054)
82 import json
84 default_splitter_type = settings.get_setting(
85 "local_search_splitter_type", "recursive"
86 )
87 default_text_separators = settings.get_setting(
88 "local_search_text_separators", '["\n\n", "\n", ". ", " ", ""]'
89 )
90 # Parse JSON string to list
91 if isinstance(default_text_separators, str):
92 try:
93 default_text_separators = json.loads(default_text_separators)
94 except json.JSONDecodeError:
95 logger.warning(
96 f"Invalid JSON for local_search_text_separators setting: {default_text_separators!r}. "
97 "Using default separators."
98 )
99 default_text_separators = ["\n\n", "\n", ". ", " ", ""]
100 default_distance_metric = settings.get_setting(
101 "local_search_distance_metric", "cosine"
102 )
103 # Ensure normalize_vectors is a proper boolean (settings may return string)
104 raw_normalize = settings.get_setting("local_search_normalize_vectors", True)
105 if isinstance(raw_normalize, str):
106 default_normalize_vectors = raw_normalize.lower() in (
107 "true",
108 "1",
109 "yes",
110 )
111 else:
112 default_normalize_vectors = bool(raw_normalize)
113 default_index_type = settings.get_setting("local_search_index_type", "flat")
115 # If collection_id provided, check for stored settings
116 if collection_id:
117 with get_user_db_session(username) as db_session:
118 collection = (
119 db_session.query(Collection).filter_by(id=collection_id).first()
120 )
122 if collection and collection.embedding_model:
123 # Use collection's stored settings
124 logger.info(
125 f"Using stored settings for collection {collection_id}: "
126 f"{collection.embedding_model_type.value}/{collection.embedding_model}"
127 )
128 # Handle normalize_vectors - may be stored as string in some cases
129 coll_normalize = collection.normalize_vectors
130 if coll_normalize is not None:
131 if isinstance(coll_normalize, str):
132 coll_normalize = coll_normalize.lower() in (
133 "true",
134 "1",
135 "yes",
136 )
137 else:
138 coll_normalize = bool(coll_normalize)
139 else:
140 coll_normalize = default_normalize_vectors
142 return LibraryRAGService(
143 username=username,
144 embedding_model=collection.embedding_model,
145 embedding_provider=collection.embedding_model_type.value,
146 chunk_size=collection.chunk_size or default_chunk_size,
147 chunk_overlap=collection.chunk_overlap
148 or default_chunk_overlap,
149 splitter_type=collection.splitter_type
150 or default_splitter_type,
151 text_separators=collection.text_separators
152 or default_text_separators,
153 distance_metric=collection.distance_metric
154 or default_distance_metric,
155 normalize_vectors=coll_normalize,
156 index_type=collection.index_type or default_index_type,
157 )
158 elif collection:
159 # New collection - use defaults and store them
160 logger.info(
161 f"New collection {collection_id}, using and storing default settings"
162 )
164 # Create service with defaults
165 service = LibraryRAGService(
166 username=username,
167 embedding_model=default_embedding_model,
168 embedding_provider=default_embedding_provider,
169 chunk_size=default_chunk_size,
170 chunk_overlap=default_chunk_overlap,
171 splitter_type=default_splitter_type,
172 text_separators=default_text_separators,
173 distance_metric=default_distance_metric,
174 normalize_vectors=default_normalize_vectors,
175 index_type=default_index_type,
176 )
178 # Store settings on collection (will be done during indexing)
179 # Note: We don't store here because we don't have embedding_dimension yet
180 # It will be stored in index_collection when first document is indexed
182 return service
184 # No collection or fallback - use current defaults
185 return LibraryRAGService(
186 username=username,
187 embedding_model=default_embedding_model,
188 embedding_provider=default_embedding_provider,
189 chunk_size=default_chunk_size,
190 chunk_overlap=default_chunk_overlap,
191 splitter_type=default_splitter_type,
192 text_separators=default_text_separators,
193 distance_metric=default_distance_metric,
194 normalize_vectors=default_normalize_vectors,
195 index_type=default_index_type,
196 )
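The helper above prefers a collection's stored embedding settings and falls back to the current local_search_* defaults. A minimal sketch of how the routes below use it, assuming a logged-in Flask request context (it mirrors the pattern in get_rag_stats further down; the collection id is resolved from the request or the default library):

    collection_id = request.args.get("collection_id")
    if not collection_id:
        collection_id = get_default_library_id(session["username"])
    rag_service = get_rag_service(collection_id)  # stored collection settings win over defaults
    stats = rag_service.get_rag_stats(collection_id)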
199# Page Routes
202@rag_bp.route("/embedding-settings")
203@login_required
204def embedding_settings_page():
205 """Render the Embedding Settings page."""
206 return render_template(
207 "pages/embedding_settings.html", active_page="embedding-settings"
208 )
211@rag_bp.route("/document/<string:document_id>/chunks")
212@login_required
213def view_document_chunks(document_id):
214 """View all chunks for a document across all collections."""
215 from ...database.session_context import get_user_db_session
216 from ...database.models.library import DocumentChunk
218 username = session.get("username")
220 with get_user_db_session(username) as db_session:
221 # Get document info
222 document = db_session.query(Document).filter_by(id=document_id).first()
224 if not document:
225 return "Document not found", 404
227 # Get all chunks for this document
228 chunks = (
229 db_session.query(DocumentChunk)
230 .filter(DocumentChunk.source_id == document_id)
231 .order_by(DocumentChunk.collection_name, DocumentChunk.chunk_index)
232 .all()
233 )
235 # Group chunks by collection
236 chunks_by_collection = {}
237 for chunk in chunks:
238 coll_name = chunk.collection_name
239 if coll_name not in chunks_by_collection:
240 # Get collection display name
241 collection_id = coll_name.replace("collection_", "")
242 collection = (
243 db_session.query(Collection)
244 .filter_by(id=collection_id)
245 .first()
246 )
247 chunks_by_collection[coll_name] = {
248 "name": collection.name if collection else coll_name,
249 "id": collection_id,
250 "chunks": [],
251 }
253 chunks_by_collection[coll_name]["chunks"].append(
254 {
255 "id": chunk.id,
256 "index": chunk.chunk_index,
257 "text": chunk.chunk_text,
258 "word_count": chunk.word_count,
259 "start_char": chunk.start_char,
260 "end_char": chunk.end_char,
261 "embedding_model": chunk.embedding_model,
262 "embedding_model_type": chunk.embedding_model_type.value
263 if chunk.embedding_model_type
264 else None,
265 "embedding_dimension": chunk.embedding_dimension,
266 "created_at": chunk.created_at,
267 }
268 )
270 return render_template(
271 "pages/document_chunks.html",
272 document=document,
273 chunks_by_collection=chunks_by_collection,
274 total_chunks=len(chunks),
275 )
278@rag_bp.route("/collections")
279@login_required
280def collections_page():
281 """Render the Collections page."""
282 return render_template("pages/collections.html", active_page="collections")
285@rag_bp.route("/collections/<string:collection_id>")
286@login_required
287def collection_details_page(collection_id):
288 """Render the Collection Details page."""
289 return render_template(
290 "pages/collection_details.html",
291 active_page="collections",
292 collection_id=collection_id,
293 )
296@rag_bp.route("/collections/<string:collection_id>/upload")
297@login_required
298def collection_upload_page(collection_id):
299 """Render the Collection Upload page."""
300 # Get the upload PDF storage setting
301 settings = get_settings_manager()
302 upload_pdf_storage = settings.get_setting(
303 "research_library.upload_pdf_storage", "none"
304 )
305 # Only allow valid values for uploads (no filesystem)
306 if upload_pdf_storage not in ("database", "none"):
307 upload_pdf_storage = "none"
309 return render_template(
310 "pages/collection_upload.html",
311 active_page="collections",
312 collection_id=collection_id,
313 collection_name=None, # Could fetch from DB if needed
314 upload_pdf_storage=upload_pdf_storage,
315 )
318@rag_bp.route("/collections/create")
319@login_required
320def collection_create_page():
321 """Render the Create Collection page."""
322 return render_template(
323 "pages/collection_create.html", active_page="collections"
324 )
327# API Routes
330@rag_bp.route("/api/rag/settings", methods=["GET"])
331@login_required
332def get_current_settings():
333 """Get current RAG configuration from settings."""
334 import json as json_lib
336 try:
337 settings = get_settings_manager()
339 # Get text separators and parse if needed
340 text_separators = settings.get_setting(
341 "local_search_text_separators", '["\n\n", "\n", ". ", " ", ""]'
342 )
343 if isinstance(text_separators, str):
344 try:
345 text_separators = json_lib.loads(text_separators)
346 except json_lib.JSONDecodeError:
347 logger.warning(
348 f"Invalid JSON for local_search_text_separators setting: {text_separators!r}. "
349 "Using default separators."
350 )
351 text_separators = ["\n\n", "\n", ". ", " ", ""]
353 normalize_vectors = settings.get_setting(
354 "local_search_normalize_vectors", True
355 )
357 return jsonify(
358 {
359 "success": True,
360 "settings": {
361 "embedding_provider": settings.get_setting(
362 "local_search_embedding_provider",
363 "sentence_transformers",
364 ),
365 "embedding_model": settings.get_setting(
366 "local_search_embedding_model", "all-MiniLM-L6-v2"
367 ),
368 "chunk_size": settings.get_setting(
369 "local_search_chunk_size", 1000
370 ),
371 "chunk_overlap": settings.get_setting(
372 "local_search_chunk_overlap", 200
373 ),
374 "splitter_type": settings.get_setting(
375 "local_search_splitter_type", "recursive"
376 ),
377 "text_separators": text_separators,
378 "distance_metric": settings.get_setting(
379 "local_search_distance_metric", "cosine"
380 ),
381 "normalize_vectors": normalize_vectors,
382 "index_type": settings.get_setting(
383 "local_search_index_type", "flat"
384 ),
385 },
386 }
387 )
388 except Exception as e:
389 return handle_api_error("getting RAG settings", e)
392@rag_bp.route("/api/rag/test-embedding", methods=["POST"])
393@login_required
394def test_embedding():
395 """Test an embedding configuration by generating a test embedding."""
397 try:
398 data = request.json
399 provider = data.get("provider")
400 model = data.get("model")
401 test_text = data.get("test_text", "This is a test.")
403 if not provider or not model:
404 return jsonify(
405 {"success": False, "error": "Provider and model are required"}
406 ), 400
408 # Import embedding functions
409 from ...embeddings.embeddings_config import (
410 get_embedding_function,
411 )
413 logger.info(
414 f"Testing embedding with provider={provider}, model={model}"
415 )
417 # Get embedding function with the specified configuration
418 start_time = time.time()
419 embedding_func = get_embedding_function(
420 provider=provider,
421 model_name=model,
422 )
424 # Generate test embedding
425 embedding = embedding_func([test_text])[0]
426 response_time_ms = int((time.time() - start_time) * 1000)
428 # Get embedding dimension
429 dimension = len(embedding) if hasattr(embedding, "__len__") else None
431 return jsonify(
432 {
433 "success": True,
434 "dimension": dimension,
435 "response_time_ms": response_time_ms,
436 "provider": provider,
437 "model": model,
438 }
439 )
441 except Exception as e:
442 return handle_api_error("testing embedding", e)
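A minimal client-side sketch for exercising this endpoint; the host, port, and session cookie below are placeholders (not values from this module), and the blueprint's "/library" prefix is prepended to the route path:

    import requests

    resp = requests.post(
        "http://localhost:5000/library/api/rag/test-embedding",  # placeholder host/port
        json={
            "provider": "sentence_transformers",
            "model": "all-MiniLM-L6-v2",
            "test_text": "This is a test.",
        },
        cookies={"session": "<session cookie>"},  # placeholder; the route is @login_required
    )
    print(resp.json())  # e.g. {"success": true, "dimension": 384, "response_time_ms": ...}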
445@rag_bp.route("/api/rag/models", methods=["GET"])
446@login_required
447def get_available_models():
448 """Get list of available embedding providers and models."""
449 try:
450 from ...embeddings.embeddings_config import _get_provider_classes
452 # Get current settings for providers
453 settings = get_settings_manager()
454 settings_snapshot = (
455 settings.get_all_settings()
456 if hasattr(settings, "get_all_settings")
457 else {}
458 )
460 # Get provider classes
461 provider_classes = _get_provider_classes()
463 # Provider display names
464 provider_labels = {
465 "sentence_transformers": "Sentence Transformers (Local)",
466 "ollama": "Ollama (Local)",
467 "openai": "OpenAI API",
468 }
470 # Get provider options and models by looping through providers
471 provider_options = []
472 providers = {}
474 for provider_key, provider_class in provider_classes.items():
475 if provider_class.is_available(settings_snapshot):
476 # Add provider option
477 provider_options.append(
478 {
479 "value": provider_key,
480 "label": provider_labels.get(
481 provider_key, provider_key
482 ),
483 }
484 )
486 # Get models for this provider
487 models = provider_class.get_available_models(settings_snapshot)
488 providers[provider_key] = [
489 {
490 "value": m["value"],
491 "label": m["label"],
492 "provider": provider_key,
493 }
494 for m in models
495 ]
497 return jsonify(
498 {
499 "success": True,
500 "provider_options": provider_options,
501 "providers": providers,
502 }
503 )
504 except Exception as e:
505 return handle_api_error("getting available models", e)
508@rag_bp.route("/api/rag/info", methods=["GET"])
509@login_required
510def get_index_info():
511 """Get information about the current RAG index."""
512 from ...database.library_init import get_default_library_id
514 try:
515 # Get collection_id from request or use default Library collection
516 collection_id = request.args.get("collection_id")
517 if not collection_id:
518 collection_id = get_default_library_id(session["username"])
520 logger.info(
521 f"Getting RAG index info for collection_id: {collection_id}"
522 )
524 rag_service = get_rag_service(collection_id)
525 info = rag_service.get_current_index_info(collection_id)
527 if info is None:
528 logger.info(
529 f"No RAG index found for collection_id: {collection_id}"
530 )
531 return jsonify(
532 {"success": True, "info": None, "message": "No index found"}
533 )
535 logger.info(f"Found RAG index for collection_id: {collection_id}")
536 return jsonify({"success": True, "info": info})
537 except Exception as e:
538 return handle_api_error("getting index info", e)
541@rag_bp.route("/api/rag/stats", methods=["GET"])
542@login_required
543def get_rag_stats():
544 """Get RAG statistics for a collection."""
545 from ...database.library_init import get_default_library_id
547 try:
548 # Get collection_id from request or use default Library collection
549 collection_id = request.args.get("collection_id")
550 if not collection_id:
551 collection_id = get_default_library_id(session["username"])
553 rag_service = get_rag_service(collection_id)
554 stats = rag_service.get_rag_stats(collection_id)
556 return jsonify({"success": True, "stats": stats})
557 except Exception as e:
558 return handle_api_error("getting RAG stats", e)
561@rag_bp.route("/api/rag/index-document", methods=["POST"])
562@login_required
563def index_document():
564 """Index a single document in a collection."""
565 from ...database.library_init import get_default_library_id
567 try:
568 data = request.get_json()
569 text_doc_id = data.get("text_doc_id")
570 force_reindex = data.get("force_reindex", False)
571 collection_id = data.get("collection_id")
573 if not text_doc_id:
574 return jsonify(
575 {"success": False, "error": "text_doc_id is required"}
576 ), 400
578 # Get collection_id from request or use default Library collection
579 if not collection_id:
580 collection_id = get_default_library_id(session["username"])
582 rag_service = get_rag_service(collection_id)
583 result = rag_service.index_document(
584 text_doc_id, collection_id, force_reindex
585 )
587 if result["status"] == "error":
588 return jsonify(
589 {"success": False, "error": result.get("error")}
590 ), 400
592 return jsonify({"success": True, "result": result})
593 except Exception as e:
594 return handle_api_error(f"indexing document {text_doc_id}", e)
597@rag_bp.route("/api/rag/remove-document", methods=["POST"])
598@login_required
599def remove_document():
600 """Remove a document from RAG in a collection."""
601 from ...database.library_init import get_default_library_id
603 try:
604 data = request.get_json()
605 text_doc_id = data.get("text_doc_id")
606 collection_id = data.get("collection_id")
608 if not text_doc_id:
609 return jsonify(
610 {"success": False, "error": "text_doc_id is required"}
611 ), 400
613 # Get collection_id from request or use default Library collection
614 if not collection_id:
615 collection_id = get_default_library_id(session["username"])
617 rag_service = get_rag_service(collection_id)
618 result = rag_service.remove_document_from_rag(
619 text_doc_id, collection_id
620 )
622 if result["status"] == "error":
623 return jsonify(
624 {"success": False, "error": result.get("error")}
625 ), 400
627 return jsonify({"success": True, "result": result})
628 except Exception as e:
629 return handle_api_error(f"removing document {text_doc_id}", e)
632@rag_bp.route("/api/rag/index-research", methods=["POST"])
633@login_required
634def index_research():
635 """Index all documents from a research."""
636 try:
637 data = request.get_json()
638 research_id = data.get("research_id")
639 force_reindex = data.get("force_reindex", False)
641 if not research_id:
642 return jsonify(
643 {"success": False, "error": "research_id is required"}
644 ), 400
646 rag_service = get_rag_service()
647 results = rag_service.index_research_documents(
648 research_id, force_reindex
649 )
651 return jsonify({"success": True, "results": results})
652 except Exception as e:
653 return handle_api_error(f"indexing research {research_id}", e)
656@rag_bp.route("/api/rag/index-all", methods=["GET"])
657@login_required
658def index_all():
659 """Index all documents in a collection with Server-Sent Events progress."""
660 from ...database.session_context import get_user_db_session
661 from ...database.library_init import get_default_library_id
663 force_reindex = request.args.get("force_reindex", "false").lower() == "true"
664 username = session["username"]
666 # Get collection_id from request or use default Library collection
667 collection_id = request.args.get("collection_id")
668 if not collection_id:
669 collection_id = get_default_library_id(username)
671 logger.info(
672 f"Starting index-all for collection_id: {collection_id}, force_reindex: {force_reindex}"
673 )
675 # Create RAG service in request context before generator runs
676 rag_service = get_rag_service(collection_id)
678 def generate():
679 """Generator function for SSE progress updates."""
680 try:
681 # Send initial status
682 yield f"data: {json.dumps({'type': 'start', 'message': 'Starting bulk indexing...'})}\n\n"
684 # Get document IDs to index from DocumentCollection
685 with get_user_db_session(username) as db_session:
686 # Query Document joined with DocumentCollection for the collection
687 query = (
688 db_session.query(Document.id, Document.title)
689 .join(
690 DocumentCollection,
691 Document.id == DocumentCollection.document_id,
692 )
693 .filter(DocumentCollection.collection_id == collection_id)
694 )
696 if not force_reindex:
697 # Only index documents that haven't been indexed yet
698 query = query.filter(DocumentCollection.indexed.is_(False))
700 doc_info = [(doc_id, title) for doc_id, title in query.all()]
702 if not doc_info:
703 yield f"data: {json.dumps({'type': 'complete', 'results': {'successful': 0, 'skipped': 0, 'failed': 0, 'message': 'No documents to index'}})}\n\n"
704 return
706 results = {"successful": 0, "skipped": 0, "failed": 0, "errors": []}
707 total = len(doc_info)
709 # Process documents in batches to optimize performance
710 # Get batch size from settings
711 settings = get_settings_manager()
712 batch_size = int(
713 settings.get_setting("rag.indexing_batch_size", 15)
714 )
715 processed = 0
717 for i in range(0, len(doc_info), batch_size):
718 batch = doc_info[i : i + batch_size]
720 # Process batch with collection_id
721 batch_results = rag_service.index_documents_batch(
722 batch, collection_id, force_reindex
723 )
725 # Process results and send progress updates
726 for j, (doc_id, title) in enumerate(batch):
727 processed += 1
728 result = batch_results[doc_id]
730 # Send progress update
731 yield f"data: {json.dumps({'type': 'progress', 'current': processed, 'total': total, 'title': title, 'percent': int((processed / total) * 100)})}\n\n"
733 if result["status"] == "success":
734 results["successful"] += 1
735 elif result["status"] == "skipped":
736 results["skipped"] += 1
737 else:
738 results["failed"] += 1
739 results["errors"].append(
740 {
741 "doc_id": doc_id,
742 "title": title,
743 "error": result.get("error"),
744 }
745 )
747 # Send completion status
748 yield f"data: {json.dumps({'type': 'complete', 'results': results})}\n\n"
750 # Log final status for debugging
751 logger.info(
752 f"Bulk indexing complete: {results['successful']} successful, {results['skipped']} skipped, {results['failed']} failed"
753 )
755 except Exception:
756 logger.exception("Error in bulk indexing")
757 yield f"data: {json.dumps({'type': 'error', 'error': 'An internal error occurred during indexing'})}\n\n"
759 return Response(
760 stream_with_context(generate()), mimetype="text/event-stream"
761 )
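Because this route streams Server-Sent Events rather than returning a single JSON body, a consumer reads the response line by line and parses each "data:" payload. A hedged client sketch (URL, collection id, and cookie are placeholders):

    import json
    import requests

    with requests.get(
        "http://localhost:5000/library/api/rag/index-all",
        params={"collection_id": "<collection-uuid>", "force_reindex": "false"},
        cookies={"session": "<session cookie>"},  # placeholder; the route requires login
        stream=True,
    ) as resp:
        for raw in resp.iter_lines():
            if not raw or not raw.startswith(b"data: "):
                continue  # skip blank separator lines between SSE events
            event = json.loads(raw[len(b"data: "):])
            if event["type"] == "progress":
                print(f"{event['percent']}% {event['title']}")
            elif event["type"] in ("complete", "error"):
                print(event)
                break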
764@rag_bp.route("/api/rag/configure", methods=["POST"])
765@login_required
766def configure_rag():
767 """
768 Change RAG configuration (embedding model, chunk size, etc.).
769 This will create a new index with the new configuration.
770 """
771 import json as json_lib
773 try:
774 data = request.get_json()
775 embedding_model = data.get("embedding_model")
776 embedding_provider = data.get("embedding_provider")
777 chunk_size = data.get("chunk_size")
778 chunk_overlap = data.get("chunk_overlap")
779 collection_id = data.get("collection_id")
781 # Get new advanced settings (with defaults)
782 splitter_type = data.get("splitter_type", "recursive")
783 text_separators = data.get(
784 "text_separators", ["\n\n", "\n", ". ", " ", ""]
785 )
786 distance_metric = data.get("distance_metric", "cosine")
787 normalize_vectors = data.get("normalize_vectors", True)
788 index_type = data.get("index_type", "flat")
790 if not all(
791 [
792 embedding_model,
793 embedding_provider,
794 chunk_size,
795 chunk_overlap,
796 ]
797 ):
798 return jsonify(
799 {
800 "success": False,
801 "error": "All configuration parameters are required (embedding_model, embedding_provider, chunk_size, chunk_overlap)",
802 }
803 ), 400
805 # Save settings to database
806 settings = get_settings_manager()
807 settings.set_setting("local_search_embedding_model", embedding_model)
808 settings.set_setting(
809 "local_search_embedding_provider", embedding_provider
810 )
811 settings.set_setting("local_search_chunk_size", int(chunk_size))
812 settings.set_setting("local_search_chunk_overlap", int(chunk_overlap))
814 # Save new advanced settings
815 settings.set_setting("local_search_splitter_type", splitter_type)
816 # Convert list to JSON string for storage
817 if isinstance(text_separators, list):
818 text_separators_str = json_lib.dumps(text_separators)
819 else:
820 text_separators_str = text_separators
821 settings.set_setting(
822 "local_search_text_separators", text_separators_str
823 )
824 settings.set_setting("local_search_distance_metric", distance_metric)
825 settings.set_setting(
826 "local_search_normalize_vectors", bool(normalize_vectors)
827 )
828 settings.set_setting("local_search_index_type", index_type)
830 # If collection_id is provided, update that collection's configuration
831 if collection_id:
832 # Create new RAG service with new configuration
833 new_rag_service = LibraryRAGService(
834 username=session["username"],
835 embedding_model=embedding_model,
836 embedding_provider=embedding_provider,
837 chunk_size=int(chunk_size),
838 chunk_overlap=int(chunk_overlap),
839 splitter_type=splitter_type,
840 text_separators=text_separators
841 if isinstance(text_separators, list)
842 else json_lib.loads(text_separators),
843 distance_metric=distance_metric,
844 normalize_vectors=normalize_vectors,
845 index_type=index_type,
846 )
848 # Get or create new index with this configuration
849 rag_index = new_rag_service._get_or_create_rag_index(collection_id)
851 return jsonify(
852 {
853 "success": True,
854 "message": "Configuration updated for collection. You can now index documents with the new settings.",
855 "index_hash": rag_index.index_hash,
856 }
857 )
858 else:
859 # Just saving default settings without updating a specific collection
860 return jsonify(
861 {
862 "success": True,
863 "message": "Default embedding settings saved successfully. New collections will use these settings.",
864 }
865 )
867 except Exception as e:
868 return handle_api_error("configuring RAG", e)
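A sketch of the request body this endpoint expects: the four base parameters are required, the advanced ones default as shown in the route. URL and cookie are placeholders:

    import requests

    payload = {
        "embedding_model": "all-MiniLM-L6-v2",
        "embedding_provider": "sentence_transformers",
        "chunk_size": 1000,
        "chunk_overlap": 200,
        "splitter_type": "recursive",
        "text_separators": ["\n\n", "\n", ". ", " ", ""],
        "distance_metric": "cosine",
        "normalize_vectors": True,
        "index_type": "flat",
        # Omit "collection_id" to save these only as defaults for new collections.
    }
    resp = requests.post(
        "http://localhost:5000/library/api/rag/configure",  # placeholder host/port
        json=payload,
        cookies={"session": "<session cookie>"},  # placeholder; route requires login
    )
    print(resp.json())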
871@rag_bp.route("/api/rag/documents", methods=["GET"])
872@login_required
873def get_documents():
874 """Get library documents with their RAG status for the default Library collection (paginated)."""
875 from ...database.session_context import get_user_db_session
876 from ...database.library_init import get_default_library_id
878 try:
879 # Get pagination parameters
880 page = request.args.get("page", 1, type=int)
881 per_page = request.args.get("per_page", 50, type=int)
882 filter_type = request.args.get(
883 "filter", "all"
884 ) # all, indexed, unindexed
886 # Validate pagination parameters
887 page = max(1, page)
888 per_page = min(max(10, per_page), 100) # Limit between 10-100
890 # Close current thread's session to force fresh connection
891 from ...database.thread_local_session import cleanup_current_thread
893 cleanup_current_thread()
895 username = session["username"]
897 # Get collection_id from request or use default Library collection
898 collection_id = request.args.get("collection_id")
899 if not collection_id:
900 collection_id = get_default_library_id(username)
902 logger.info(
903 f"Getting documents for collection_id: {collection_id}, filter: {filter_type}, page: {page}"
904 )
906 with get_user_db_session(username) as db_session:
907 # Expire all cached objects to ensure we get fresh data from DB
908 db_session.expire_all()
910 # Import RagDocumentStatus model
911 from ...database.models.library import RagDocumentStatus
913 # Build base query - join Document with DocumentCollection for the collection
914 # LEFT JOIN with rag_document_status to check indexed status
915 query = (
916 db_session.query(
917 Document, DocumentCollection, RagDocumentStatus
918 )
919 .join(
920 DocumentCollection,
921 (DocumentCollection.document_id == Document.id)
922 & (DocumentCollection.collection_id == collection_id),
923 )
924 .outerjoin(
925 RagDocumentStatus,
926 (RagDocumentStatus.document_id == Document.id)
927 & (RagDocumentStatus.collection_id == collection_id),
928 )
929 )
931 logger.debug(f"Base query for collection {collection_id}: {query}")
933 # Apply filters based on rag_document_status existence
934 if filter_type == "indexed":
935 query = query.filter(RagDocumentStatus.document_id.isnot(None))
936 elif filter_type == "unindexed":
937 # Documents in collection but not indexed yet
938 query = query.filter(RagDocumentStatus.document_id.is_(None))
940 # Get total count before pagination
941 total_count = query.count()
942 logger.info(
943 f"Found {total_count} total documents for collection {collection_id} with filter {filter_type}"
944 )
946 # Apply pagination
947 results = (
948 query.order_by(Document.created_at.desc())
949 .limit(per_page)
950 .offset((page - 1) * per_page)
951 .all()
952 )
954 documents = [
955 {
956 "id": doc.id,
957 "title": doc.title,
958 "original_url": doc.original_url,
959 "rag_indexed": rag_status is not None,
960 "chunk_count": rag_status.chunk_count if rag_status else 0,
961 "created_at": doc.created_at.isoformat()
962 if doc.created_at
963 else None,
964 }
965 for doc, doc_collection, rag_status in results
966 ]
968 # Debug logging to help diagnose indexing status issues
969 indexed_count = sum(1 for d in documents if d["rag_indexed"])
971 # Additional debug: check rag_document_status for this collection
972 all_indexed_statuses = (
973 db_session.query(RagDocumentStatus)
974 .filter_by(collection_id=collection_id)
975 .all()
976 )
977 logger.info(
978 f"rag_document_status table shows: {len(all_indexed_statuses)} documents indexed for collection {collection_id}"
979 )
981 logger.info(
982 f"Returning {len(documents)} documents on page {page}: "
983 f"{indexed_count} indexed, {len(documents) - indexed_count} not indexed"
984 )
986 return jsonify(
987 {
988 "success": True,
989 "documents": documents,
990 "pagination": {
991 "page": page,
992 "per_page": per_page,
993 "total": total_count,
994 "pages": (total_count + per_page - 1) // per_page,
995 },
996 }
997 )
998 except Exception as e:
999 return handle_api_error("getting documents", e)
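The pagination contract above (page, per_page clamped to 10-100, filter of all/indexed/unindexed) can be exercised with a small client sketch; URL and cookie are placeholders:

    import requests

    resp = requests.get(
        "http://localhost:5000/library/api/rag/documents",  # placeholder host/port
        params={"page": 1, "per_page": 50, "filter": "unindexed"},  # collection_id is optional
        cookies={"session": "<session cookie>"},  # placeholder; route requires login
    )
    data = resp.json()
    print(data["pagination"], len(data["documents"]))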
1002@rag_bp.route("/api/rag/index-local", methods=["GET"])
1003@login_required
1004def index_local_library():
1005 """Index documents from a local folder with Server-Sent Events progress."""
1006 folder_path = request.args.get("path")
1007 file_patterns = request.args.get(
1008 "patterns", "*.pdf,*.txt,*.md,*.html"
1009 ).split(",")
1010 recursive = request.args.get("recursive", "true").lower() == "true"
1012 if not folder_path:
1013 return jsonify({"success": False, "error": "Path is required"}), 400
1015 # Validate and sanitize the path to prevent traversal attacks
1016 try:
1017 validated_path = PathValidator.validate_local_filesystem_path(
1018 folder_path
1019 )
1020 # Re-sanitize for static analyzer recognition (CodeQL)
1021 path = PathValidator.sanitize_for_filesystem_ops(validated_path)
1022 except ValueError as e:
1023 logger.warning(f"Path validation failed for '{folder_path}': {e}")
1024 return jsonify({"success": False, "error": "Invalid path"}), 400
1026 # Check path exists and is a directory
1027 if not path.exists():
1028 return jsonify({"success": False, "error": "Path does not exist"}), 400
1029 if not path.is_dir():
1030 return jsonify(
1031 {"success": False, "error": "Path is not a directory"}
1032 ), 400
1034 # Create RAG service in request context
1035 rag_service = get_rag_service()
1037 def generate():
1038 """Generator function for SSE progress updates."""
1039 try:
1040 # Send initial status
1041 yield f"data: {json.dumps({'type': 'start', 'message': f'Scanning folder: {path}'})}\n\n"
1043 # Find all matching files
1044 files_to_index = []
1045 for pattern in file_patterns:
1046 pattern = pattern.strip()
1047 if recursive:
1048 search_pattern = str(path / "**" / pattern)
1049 else:
1050 search_pattern = str(path / pattern)
1052 matching_files = glob.glob(search_pattern, recursive=recursive)
1053 files_to_index.extend(matching_files)
1055 # Remove duplicates and sort
1056 files_to_index = sorted(set(files_to_index))
1058 if not files_to_index:
1059 yield f"data: {json.dumps({'type': 'complete', 'results': {'successful': 0, 'skipped': 0, 'failed': 0, 'message': 'No matching files found'}})}\n\n"
1060 return
1062 results = {"successful": 0, "skipped": 0, "failed": 0, "errors": []}
1063 total = len(files_to_index)
1065 # Index each file
1066 for idx, file_path in enumerate(files_to_index, 1):
1067 file_name = Path(file_path).name
1069 # Send progress update
1070 yield f"data: {json.dumps({'type': 'progress', 'current': idx, 'total': total, 'filename': file_name, 'percent': int((idx / total) * 100)})}\n\n"
1072 try:
1073 # Index the file directly using RAG service
1074 result = rag_service.index_local_file(file_path)
1076 if result.get("status") == "success":
1077 results["successful"] += 1
1078 elif result.get("status") == "skipped":
1079 results["skipped"] += 1
1080 else:
1081 results["failed"] += 1
1082 results["errors"].append(
1083 {
1084 "file": file_name,
1085 "error": result.get("error", "Unknown error"),
1086 }
1087 )
1088 except Exception:
1089 results["failed"] += 1
1090 results["errors"].append(
1091 {"file": file_name, "error": "Failed to index file"}
1092 )
1093 logger.exception(f"Error indexing file {file_path}")
1095 # Send completion status
1096 yield f"data: {json.dumps({'type': 'complete', 'results': results})}\n\n"
1098 logger.info(
1099 f"Local library indexing complete for {path}: "
1100 f"{results['successful']} successful, "
1101 f"{results['skipped']} skipped, "
1102 f"{results['failed']} failed"
1103 )
1105 except Exception:
1106 logger.exception("Error in local library indexing")
1107 yield f"data: {json.dumps({'type': 'error', 'error': 'An internal error occurred during indexing'})}\n\n"
1109 return Response(
1110 stream_with_context(generate()), mimetype="text/event-stream"
1111 )
1114# Collection Management Routes
1117@rag_bp.route("/api/collections", methods=["GET"])
1118@login_required
1119def get_collections():
1120 """Get all document collections for the current user."""
1121 from ...database.session_context import get_user_db_session
1123 try:
1124 username = session["username"]
1125 with get_user_db_session(username) as db_session:
1126 # No need to filter by username - each user has their own database
1127 collections = db_session.query(Collection).all()
1129 result = []
1130 for coll in collections:
1131 collection_data = {
1132 "id": coll.id,
1133 "name": coll.name,
1134 "description": coll.description,
1135 "created_at": coll.created_at.isoformat()
1136 if coll.created_at
1137 else None,
1138 "collection_type": coll.collection_type,
1139 "is_default": coll.is_default
1140 if hasattr(coll, "is_default")
1141 else False,
1142 "document_count": len(coll.document_links)
1143 if hasattr(coll, "document_links")
1144 else 0,
1145 "folder_count": len(coll.linked_folders)
1146 if hasattr(coll, "linked_folders")
1147 else 0,
1148 }
1150 # Include embedding metadata if available
1151 if coll.embedding_model:
1152 collection_data["embedding"] = {
1153 "model": coll.embedding_model,
1154 "provider": coll.embedding_model_type.value
1155 if coll.embedding_model_type
1156 else None,
1157 "dimension": coll.embedding_dimension,
1158 "chunk_size": coll.chunk_size,
1159 "chunk_overlap": coll.chunk_overlap,
1160 }
1161 else:
1162 collection_data["embedding"] = None
1164 result.append(collection_data)
1166 return jsonify({"success": True, "collections": result})
1167 except Exception as e:
1168 return handle_api_error("getting collections", e)
1171@rag_bp.route("/api/collections", methods=["POST"])
1172@login_required
1173def create_collection():
1174 """Create a new document collection."""
1175 from ...database.session_context import get_user_db_session
1177 try:
1178 data = request.get_json()
1179 name = data.get("name", "").strip()
1180 description = data.get("description", "").strip()
1181 collection_type = data.get("type", "user_uploads")
1183 if not name:
1184 return jsonify({"success": False, "error": "Name is required"}), 400
1186 username = session["username"]
1187 with get_user_db_session(username) as db_session:
1188 # Check if collection with this name already exists in this user's database
1189 existing = db_session.query(Collection).filter_by(name=name).first()
1191 if existing:
1192 return jsonify(
1193 {
1194 "success": False,
1195 "error": f"Collection '{name}' already exists",
1196 }
1197 ), 400
1199 # Create new collection (no username needed - each user has their own DB)
1200 # Note: created_at uses default=utcnow() in the model, so we don't need to set it manually
1201 collection = Collection(
1202 id=str(uuid.uuid4()), # Generate UUID for collection
1203 name=name,
1204 description=description,
1205 collection_type=collection_type,
1206 )
1208 db_session.add(collection)
1209 db_session.commit()
1211 return jsonify(
1212 {
1213 "success": True,
1214 "collection": {
1215 "id": collection.id,
1216 "name": collection.name,
1217 "description": collection.description,
1218 "created_at": collection.created_at.isoformat(),
1219 "collection_type": collection.collection_type,
1220 },
1221 }
1222 )
1223 except Exception as e:
1224 return handle_api_error("creating collection", e)
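A minimal client sketch for creating a collection; the name, description, URL, and cookie values are placeholders:

    import requests

    resp = requests.post(
        "http://localhost:5000/library/api/collections",  # placeholder host/port
        json={"name": "My Papers", "description": "Uploaded PDFs", "type": "user_uploads"},
        cookies={"session": "<session cookie>"},  # placeholder; route requires login
    )
    print(resp.json())  # {"success": true, "collection": {...}} or a 400 if the name already exists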
1227@rag_bp.route("/api/collections/<string:collection_id>", methods=["PUT"])
1228@login_required
1229def update_collection(collection_id):
1230 """Update a collection's details."""
1231 from ...database.session_context import get_user_db_session
1233 try:
1234 data = request.get_json()
1235 name = data.get("name", "").strip()
1236 description = data.get("description", "").strip()
1238 username = session["username"]
1239 with get_user_db_session(username) as db_session:
1240 # No need to filter by username - each user has their own database
1241 collection = (
1242 db_session.query(Collection).filter_by(id=collection_id).first()
1243 )
1245 if not collection:
1246 return jsonify(
1247 {"success": False, "error": "Collection not found"}
1248 ), 404
1250 if name:
1251 # Check if new name conflicts with existing collection
1252 existing = (
1253 db_session.query(Collection)
1254 .filter(
1255 Collection.name == name,
1256 Collection.id != collection_id,
1257 )
1258 .first()
1259 )
1261 if existing:
1262 return jsonify(
1263 {
1264 "success": False,
1265 "error": f"Collection '{name}' already exists",
1266 }
1267 ), 400
1269 collection.name = name
1271 if description is not None: # Allow empty description
1272 collection.description = description
1274 db_session.commit()
1276 return jsonify(
1277 {
1278 "success": True,
1279 "collection": {
1280 "id": collection.id,
1281 "name": collection.name,
1282 "description": collection.description,
1283 "created_at": collection.created_at.isoformat()
1284 if collection.created_at
1285 else None,
1286 "collection_type": collection.collection_type,
1287 },
1288 }
1289 )
1290 except Exception as e:
1291 return handle_api_error("updating collection", e)
1294@rag_bp.route("/api/collections/<string:collection_id>", methods=["DELETE"])
1295@login_required
1296def delete_collection(collection_id):
1297 """Delete a collection and its orphaned documents."""
1298 from ..deletion.services.collection_deletion import (
1299 CollectionDeletionService,
1300 )
1302 try:
1303 username = session["username"]
1304 service = CollectionDeletionService(username)
1305 result = service.delete_collection(
1306 collection_id, delete_orphaned_documents=True
1307 )
1309 if result.get("deleted"):
1310 return jsonify(
1311 {
1312 "success": True,
1313 "message": "Collection deleted successfully",
1314 "deleted_chunks": result.get("chunks_deleted", 0),
1315 "orphaned_documents_deleted": result.get(
1316 "orphaned_documents_deleted", 0
1317 ),
1318 }
1319 )
1320 else:
1321 error = result.get("error", "Unknown error")
1322 status_code = 404 if "not found" in error.lower() else 400
1323 return jsonify({"success": False, "error": error}), status_code
1325 except Exception as e:
1326 return handle_api_error("deleting collection", e)
1329def extract_text_from_file(
1330 file_content: bytes, file_type: str, filename: str
1331) -> Optional[str]:
1332 """
1333 Extract text from uploaded file content.
1335 Args:
1336 file_content: Raw file content as bytes
1337 file_type: Type of file (pdf, text, markdown, html, etc.)
1338 filename: Original filename for logging
1340 Returns:
1341 Extracted text as string, or None if extraction failed
1342 """
1343 try:
1344 if file_type == "pdf":
1345 # Use the existing PDF text extraction functionality
1346 from ..downloaders.base import BaseDownloader
1348 text = BaseDownloader.extract_text_from_pdf(file_content)
1349 if text:
1350 logger.info(f"Successfully extracted text from PDF: {filename}")
1351 return text
1352 else:
1353 logger.warning(f"Could not extract text from PDF: {filename}")
1354 return None
1356 elif file_type in ["text", "txt", "markdown", "md"]:
1357 # Plain text files - just decode
1358 try:
1359 text = file_content.decode("utf-8")
1360 logger.info(f"Successfully read text file: {filename}")
1361 return text
1362 except UnicodeDecodeError:
1363 # Retry with errors='ignore' (same encoding, but skip invalid bytes)
1364 try:
1365 text = file_content.decode("utf-8", errors="ignore")
1366 logger.warning(
1367 f"Read text file with encoding errors ignored: {filename}"
1368 )
1369 return text
1370 except Exception:
1371 logger.exception(f"Failed to decode text file: {filename}")
1372 return None
1374 elif file_type in ["html", "htm"]:
1375 # HTML files - extract text content
1376 try:
1377 from bs4 import BeautifulSoup
1379 html_content = file_content.decode("utf-8", errors="ignore")
1380 soup = BeautifulSoup(html_content, "html.parser")
1382 # Remove script and style elements
1383 for script in soup(["script", "style"]):
1384 script.decompose()
1386 # Get text
1387 text = soup.get_text()
1389 # Clean up whitespace
1390 lines = (line.strip() for line in text.splitlines())
1391 chunks = (
1392 phrase.strip()
1393 for line in lines
1394 for phrase in line.split(" ")
1395 )
1396 text = "\n".join(chunk for chunk in chunks if chunk)
1398 logger.info(
1399 f"Successfully extracted text from HTML: {filename}"
1400 )
1401 return text
1402 except Exception:
1403 logger.exception(
1404 f"Failed to extract text from HTML: {filename}"
1405 )
1406 return None
1408 elif file_type in ["docx", "doc"]:
1409 # Word documents - would need python-docx library
1410 logger.warning(
1411 f"Word document extraction not yet implemented: {filename}"
1412 )
1413 return None
1415 else:
1416 logger.warning(
1417 f"Unsupported file type for text extraction: {file_type} ({filename})"
1418 )
1419 return None
1421 except Exception:
1422 logger.exception(f"Error extracting text from {filename}")
1423 return None
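extract_text_from_file() can also be exercised directly. A small usage sketch, assuming this module's import path under the project's src layout and a placeholder file name:

    from pathlib import Path
    from local_deep_research.research_library.routes.rag_routes import extract_text_from_file

    content = Path("notes.md").read_bytes()  # placeholder file
    text = extract_text_from_file(content, file_type="markdown", filename="notes.md")
    if text is None:
        print("extraction failed or file type unsupported")
    else:
        print(text[:200])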
1426@rag_bp.route(
1427 "/api/collections/<string:collection_id>/upload", methods=["POST"]
1428)
1429@login_required
1430def upload_to_collection(collection_id):
1431 """Upload files to a collection."""
1432 from ...database.session_context import get_user_db_session
1433 from werkzeug.utils import secure_filename
1434 from pathlib import Path
1435 import hashlib
1436 import uuid
1437 from ..services.pdf_storage_manager import PDFStorageManager
1439 try:
1440 if "files" not in request.files:
1441 return jsonify(
1442 {"success": False, "error": "No files provided"}
1443 ), 400
1445 files = request.files.getlist("files")
1446 if not files:
1447 return jsonify(
1448 {"success": False, "error": "No files selected"}
1449 ), 400
1451 username = session["username"]
1452 with get_user_db_session(username) as db_session:
1453 # Verify collection exists in this user's database
1454 collection = (
1455 db_session.query(Collection).filter_by(id=collection_id).first()
1456 )
1458 if not collection:
1459 return jsonify(
1460 {"success": False, "error": "Collection not found"}
1461 ), 404
1463 # Get PDF storage mode from form data, falling back to user's setting
1464 settings = get_settings_manager()
1465 default_pdf_storage = settings.get_setting(
1466 "research_library.upload_pdf_storage", "none"
1467 )
1468 pdf_storage = request.form.get("pdf_storage", default_pdf_storage)
1469 if pdf_storage not in ("database", "none"):
1470 # Security: user uploads can only use database (encrypted) or none (text-only)
1471 # Filesystem storage is not allowed for user uploads
1472 pdf_storage = "none"
1474 # Initialize PDF storage manager if storing PDFs in database
1475 pdf_storage_manager = None
1476 if pdf_storage == "database":
1477 library_root = settings.get_setting(
1478 "research_library.storage_path",
1479 str(get_library_directory()),
1480 )
1481 library_root = str(Path(library_root).expanduser())
1482 pdf_storage_manager = PDFStorageManager(
1483 library_root=Path(library_root), storage_mode="database"
1484 )
1485 logger.info("PDF storage mode: database (encrypted)")
1486 else:
1487 logger.info("PDF storage mode: none (text-only)")
1489 uploaded_files = []
1490 errors = []
1492 for file in files:
1493 if not file.filename:
1494 continue
1496 try:
1497 # Read file content
1498 file_content = file.read()
1499 file.seek(0) # Reset for potential re-reading
1501 # Calculate file hash for deduplication
1502 file_hash = hashlib.sha256(file_content).hexdigest()
1504 # Check if document already exists
1505 existing_doc = (
1506 db_session.query(Document)
1507 .filter_by(document_hash=file_hash)
1508 .first()
1509 )
1511 if existing_doc:
1512 # Document exists, check if we can upgrade to include PDF
1513 pdf_upgraded = False
1514 if (
1515 pdf_storage == "database"
1516 and pdf_storage_manager is not None
1517 ):
1518 pdf_upgraded = pdf_storage_manager.upgrade_to_pdf(
1519 document=existing_doc,
1520 pdf_content=file_content,
1521 session=db_session,
1522 )
1524 # Check if already in collection
1525 existing_link = (
1526 db_session.query(DocumentCollection)
1527 .filter_by(
1528 document_id=existing_doc.id,
1529 collection_id=collection_id,
1530 )
1531 .first()
1532 )
1534 if not existing_link:
1535 # Add to collection
1536 collection_link = DocumentCollection(
1537 document_id=existing_doc.id,
1538 collection_id=collection_id,
1539 indexed=False,
1540 chunk_count=0,
1541 )
1542 db_session.add(collection_link)
1543 status = "added_to_collection"
1544 if pdf_upgraded:
1545 status = "added_to_collection_pdf_upgraded"
1546 uploaded_files.append(
1547 {
1548 "filename": existing_doc.filename,
1549 "status": status,
1550 "id": existing_doc.id,
1551 "pdf_upgraded": pdf_upgraded,
1552 }
1553 )
1554 else:
1555 status = "already_in_collection"
1556 if pdf_upgraded:
1557 status = "pdf_upgraded"
1558 uploaded_files.append(
1559 {
1560 "filename": existing_doc.filename,
1561 "status": status,
1562 "id": existing_doc.id,
1563 "pdf_upgraded": pdf_upgraded,
1564 }
1565 )
1566 else:
1567 # Create new document
1568 filename = secure_filename(file.filename)
1570 # Determine file type
1571 file_extension = Path(filename).suffix.lower()
1572 file_type = {
1573 ".pdf": "pdf",
1574 ".txt": "text",
1575 ".md": "markdown",
1576 ".html": "html",
1577 ".htm": "html",
1578 ".docx": "docx",
1579 ".doc": "doc",
1580 }.get(file_extension, "unknown")
1582 # Extract text from the file
1583 extracted_text = extract_text_from_file(
1584 file_content, file_type, filename
1585 )
1587 # Clean the extracted text to remove surrogate characters
1588 if extracted_text:
1589 from ...text_processing import remove_surrogates
1591 extracted_text = remove_surrogates(extracted_text)
1593 if not extracted_text:
1594 errors.append(
1595 {
1596 "filename": filename,
1597 "error": f"Could not extract text from {file_type} file",
1598 }
1599 )
1600 logger.warning(
1601 f"Skipping file {filename} - no text could be extracted"
1602 )
1603 continue
1605 # Get or create the user_upload source type
1606 logger.info(
1607 f"Getting or creating user_upload source type for {filename}"
1608 )
1609 source_type = (
1610 db_session.query(SourceType)
1611 .filter_by(name="user_upload")
1612 .first()
1613 )
1614 if not source_type:
1615 logger.info("Creating new user_upload source type")
1616 source_type = SourceType(
1617 id=str(uuid.uuid4()),
1618 name="user_upload",
1619 display_name="User Upload",
1620 description="Documents uploaded by users",
1621 icon="fas fa-upload",
1622 )
1623 db_session.add(source_type)
1624 db_session.flush()
1625 logger.info(
1626 f"Created source type with ID: {source_type.id}"
1627 )
1628 else:
1629 logger.info(
1630 f"Found existing source type with ID: {source_type.id}"
1631 )
1633 # Create document with extracted text (no username needed - in user's own database)
1634 # Note: uploaded_at uses default=utcnow() in the model, so we don't need to set it manually
1635 doc_id = str(uuid.uuid4())
1636 logger.info(
1637 f"Creating document {doc_id} for {filename}"
1638 )
1640 # Determine storage mode and file_path
1641 store_pdf_in_db = (
1642 pdf_storage == "database"
1643 and file_type == "pdf"
1644 and pdf_storage_manager is not None
1645 )
1647 new_doc = Document(
1648 id=doc_id,
1649 source_type_id=source_type.id,
1650 filename=filename,
1651 document_hash=file_hash,
1652 file_size=len(file_content),
1653 file_type=file_type,
1654 text_content=extracted_text, # Always store extracted text
1655 file_path=None
1656 if store_pdf_in_db
1657 else "text_only_not_stored",
1658 storage_mode="database"
1659 if store_pdf_in_db
1660 else "none",
1661 )
1662 db_session.add(new_doc)
1663 db_session.flush() # Get the ID
1664 logger.info(
1665 f"Document {new_doc.id} created successfully"
1666 )
1668 # Store PDF in encrypted database if requested
1669 pdf_stored = False
1670 if store_pdf_in_db:
1671 try:
1672 pdf_storage_manager.save_pdf(
1673 pdf_content=file_content,
1674 document=new_doc,
1675 session=db_session,
1676 filename=filename,
1677 )
1678 pdf_stored = True
1679 logger.info(
1680 f"PDF stored in encrypted database for {filename}"
1681 )
1682 except Exception:
1683 logger.exception(
1684 f"Failed to store PDF in database for {filename}"
1685 )
1686 # Continue without PDF storage - text is still saved
1688 # Add to collection
1689 collection_link = DocumentCollection(
1690 document_id=new_doc.id,
1691 collection_id=collection_id,
1692 indexed=False,
1693 chunk_count=0,
1694 )
1695 db_session.add(collection_link)
1697 uploaded_files.append(
1698 {
1699 "filename": filename,
1700 "status": "uploaded",
1701 "id": new_doc.id,
1702 "text_length": len(extracted_text),
1703 "pdf_stored": pdf_stored,
1704 }
1705 )
1707 except Exception:
1708 errors.append(
1709 {
1710 "filename": file.filename,
1711 "error": "Failed to upload file",
1712 }
1713 )
1714 logger.exception(f"Error uploading file {file.filename}")
1716 db_session.commit()
1718 return jsonify(
1719 {
1720 "success": True,
1721 "uploaded": uploaded_files,
1722 "errors": errors,
1723 "summary": {
1724 "total": len(files),
1725 "successful": len(uploaded_files),
1726 "failed": len(errors),
1727 },
1728 }
1729 )
1731 except Exception as e:
1732 return handle_api_error("uploading files", e)
1735@rag_bp.route(
1736 "/api/collections/<string:collection_id>/documents", methods=["GET"]
1737)
1738@login_required
1739def get_collection_documents(collection_id):
1740 """Get all documents in a collection."""
1741 from ...database.session_context import get_user_db_session
1743 try:
1744 username = session["username"]
1745 with get_user_db_session(username) as db_session:
1746 # Verify collection exists in this user's database
1747 collection = (
1748 db_session.query(Collection).filter_by(id=collection_id).first()
1749 )
1751 if not collection:
1752 return jsonify(
1753 {"success": False, "error": "Collection not found"}
1754 ), 404
1756 # Get documents through junction table
1757 doc_links = (
1758 db_session.query(DocumentCollection, Document)
1759 .join(Document)
1760 .filter(DocumentCollection.collection_id == collection_id)
1761 .all()
1762 )
1764 documents = []
1765 for link, doc in doc_links:
1766 # Check if PDF file is stored
1767 has_pdf = bool(
1768 doc.file_path
1769 and doc.file_path != "metadata_only"
1770 and doc.file_path != "text_only_not_stored"
1771 )
1772 has_text_db = bool(doc.text_content)
1774 # Use title if available, otherwise filename
1775 display_title = doc.title or doc.filename or "Untitled"
1777 # Get source type name
1778 source_type_name = (
1779 doc.source_type.name if doc.source_type else "unknown"
1780 )
1782 # Check if document is in other collections
1783 other_collections_count = (
1784 db_session.query(DocumentCollection)
1785 .filter(
1786 DocumentCollection.document_id == doc.id,
1787 DocumentCollection.collection_id != collection_id,
1788 )
1789 .count()
1790 )
1792 documents.append(
1793 {
1794 "id": doc.id,
1795 "filename": display_title,
1796 "title": display_title,
1797 "file_type": doc.file_type,
1798 "file_size": doc.file_size,
1799 "uploaded_at": doc.created_at.isoformat()
1800 if doc.created_at
1801 else None,
1802 "indexed": link.indexed,
1803 "chunk_count": link.chunk_count,
1804 "last_indexed_at": link.last_indexed_at.isoformat()
1805 if link.last_indexed_at
1806 else None,
1807 "has_pdf": has_pdf,
1808 "has_text_db": has_text_db,
1809 "source_type": source_type_name,
1810 "in_other_collections": other_collections_count > 0,
1811 "other_collections_count": other_collections_count,
1812 }
1813 )
1815 # Get index file size if available
1816 index_file_size = None
1817 index_file_size_bytes = None
1818 collection_name = f"collection_{collection_id}"
1819 rag_index = (
1820 db_session.query(RAGIndex)
1821 .filter_by(collection_name=collection_name)
1822 .first()
1823 )
1824 if rag_index and rag_index.index_path:
1825 from pathlib import Path
1827 index_path = Path(rag_index.index_path)
1828 if index_path.exists():
1829 size_bytes = index_path.stat().st_size
1830 index_file_size_bytes = size_bytes
1831 # Format as human-readable
1832 if size_bytes < 1024:
1833 index_file_size = f"{size_bytes} B"
1834 elif size_bytes < 1024 * 1024:
1835 index_file_size = f"{size_bytes / 1024:.1f} KB"
1836 else:
1837 index_file_size = f"{size_bytes / (1024 * 1024):.1f} MB"
1839 return jsonify(
1840 {
1841 "success": True,
1842 "collection": {
1843 "id": collection.id,
1844 "name": collection.name,
1845 "description": collection.description,
1846 "embedding_model": collection.embedding_model,
1847 "embedding_model_type": collection.embedding_model_type.value
1848 if collection.embedding_model_type
1849 else None,
1850 "embedding_dimension": collection.embedding_dimension,
1851 "chunk_size": collection.chunk_size,
1852 "chunk_overlap": collection.chunk_overlap,
1853 # Advanced settings
1854 "splitter_type": collection.splitter_type,
1855 "distance_metric": collection.distance_metric,
1856 "index_type": collection.index_type,
1857 "normalize_vectors": collection.normalize_vectors,
1858 # Index file info
1859 "index_file_size": index_file_size,
1860 "index_file_size_bytes": index_file_size_bytes,
1861 },
1862 "documents": documents,
1863 }
1864 )
1866 except Exception as e:
1867 return handle_api_error("getting collection documents", e)
1870@rag_bp.route("/api/collections/<string:collection_id>/index", methods=["GET"])
1871@login_required
1872def index_collection(collection_id):
1873 """Index all documents in a collection with Server-Sent Events progress."""
1874 from ...database.session_context import get_user_db_session
1875 from ...database.session_passwords import session_password_store
1877 force_reindex = request.args.get("force_reindex", "false").lower() == "true"
1878 username = session["username"]
1879 session_id = session.get("session_id")
1881 logger.info(f"Starting index_collection, force_reindex={force_reindex}")
1883 # Get password for thread access to encrypted database
1884 db_password = None
1885 if session_id:
1886 db_password = session_password_store.get_session_password(
1887 username, session_id
1888 )
1890 # Create RAG service
1891 rag_service = get_rag_service(collection_id)
1892 # Set password for thread access
1893 rag_service.db_password = db_password
1894 logger.info(
1895 f"RAG service created: provider={rag_service.embedding_provider}"
1896 )
1898 def generate():
1899 """Generator for SSE progress updates."""
1900 logger.info("SSE generator started")
1901 try:
1902 with get_user_db_session(username, db_password) as db_session:
1903 # Verify collection exists in this user's database
1904 collection = (
1905 db_session.query(Collection)
1906 .filter_by(id=collection_id)
1907 .first()
1908 )
1910 if not collection:
1911 yield f"data: {json.dumps({'type': 'error', 'error': 'Collection not found'})}\n\n"
1912 return
1914 # Store embedding metadata if this is the first time indexing
1915 if collection.embedding_model is None:
1916 # Get embedding dimension from the embedding manager
1917 embedding_dim = None
1918 try:
1919 # Try to get dimension from the embedding manager's provider
1920 if hasattr(rag_service.embedding_manager, "provider"):
1921 provider = rag_service.embedding_manager.provider
1922 if hasattr(provider, "embedding_dimension"):
1923 embedding_dim = provider.embedding_dimension
1924 except Exception as e:
1925 logger.warning(
1926 f"Could not get embedding dimension: {e}"
1927 )
1929 # Store the embedding configuration used for this collection
1930 collection.embedding_model = rag_service.embedding_model
1931 collection.embedding_model_type = EmbeddingProvider(
1932 rag_service.embedding_provider
1933 )
1934 collection.embedding_dimension = embedding_dim
1935 collection.chunk_size = rag_service.chunk_size
1936 collection.chunk_overlap = rag_service.chunk_overlap
1937 # Store advanced settings
1938 collection.splitter_type = rag_service.splitter_type
1939 collection.text_separators = rag_service.text_separators
1940 collection.distance_metric = rag_service.distance_metric
1941 # Ensure normalize_vectors is a proper boolean for database
1942 collection.normalize_vectors = bool(
1943 rag_service.normalize_vectors
1944 )
1945 collection.index_type = rag_service.index_type
1946 db_session.commit()
1947 logger.info(
1948 f"Stored embedding metadata for collection: provider={rag_service.embedding_provider}"
1949 )
1951 # Get documents to index
1952 query = (
1953 db_session.query(DocumentCollection, Document)
1954 .join(Document)
1955 .filter(DocumentCollection.collection_id == collection_id)
1956 )
1958 if not force_reindex:
1959 # IMPORTANT: DO NOT CHANGE THIS LINE! DO NOT REMOVE THIS COMMENT!
1960 # We MUST use `== False` instead of `not DocumentCollection.indexed`
1961 # The Python `not` operator does NOT work correctly in SQLAlchemy filters.
1962 # Using `not` will cause the query to return NO results (zero documents).
1963 # SQLAlchemy requires explicit comparison: `== False` or `== True`
1964 # This has been fixed multiple times - DO NOT change it back to `not`!
1965 query = query.filter(DocumentCollection.indexed == False) # noqa: E712
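                    # (For reference, an equivalent form SQLAlchemy also accepts
                    # would be DocumentCollection.indexed.is_(False); plain `not`
                    # does not build a SQL expression.)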
1967 doc_links = query.all()
1969 if not doc_links:
1970 logger.info("No documents to index in collection")
1971 yield f"data: {json.dumps({'type': 'complete', 'results': {'successful': 0, 'skipped': 0, 'failed': 0, 'message': 'No documents to index'}})}\n\n"
1972 return
1974 total = len(doc_links)
1975 logger.info(f"Found {total} documents to index")
1976 results = {
1977 "successful": 0,
1978 "skipped": 0,
1979 "failed": 0,
1980 "errors": [],
1981 }
1983 yield f"data: {json.dumps({'type': 'start', 'message': f'Indexing {total} documents in collection: {collection.name}'})}\n\n"
1985 for idx, (link, doc) in enumerate(doc_links, 1):
1986 filename = doc.filename or doc.title or "Unknown"
1987 yield f"data: {json.dumps({'type': 'progress', 'current': idx, 'total': total, 'filename': filename, 'percent': int((idx / total) * 100)})}\n\n"
1989 try:
1990 logger.debug(
1991 f"Indexing document {idx}/{total}: {filename}"
1992 )
1994 # Run index_document in a separate thread to allow sending SSE heartbeats.
1995 # This keeps the HTTP connection alive during long indexing operations,
1996 # preventing timeouts from proxy servers (nginx) and browsers.
1997 # The main thread periodically yields heartbeat comments while waiting.
1998 result_queue = queue.Queue()
1999 error_queue = queue.Queue()
2001 def index_in_thread():
2002 try:
2003 r = rag_service.index_document(
2004 document_id=doc.id,
2005 collection_id=collection_id,
2006 force_reindex=force_reindex,
2007 )
2008 result_queue.put(r)
2009 except Exception as ex:
2010 error_queue.put(ex)
2012 thread = threading.Thread(target=index_in_thread)
2013 thread.start()
2015 # Send heartbeats while waiting for the thread to complete
2016 heartbeat_interval = 5 # seconds
2017 while thread.is_alive():
2018 thread.join(timeout=heartbeat_interval)
2019 if thread.is_alive():
2020 # Send SSE comment as heartbeat (keeps connection alive)
2021 yield f": heartbeat {idx}/{total}\n\n"
2023 # Check for errors from thread
2024 if not error_queue.empty():
2025 raise error_queue.get()
2027 result = result_queue.get()
2028 logger.info(
2029 f"Indexed document {idx}/{total}: {filename} - status={result.get('status')}"
2030 )
2032 if result.get("status") == "success":
2033 results["successful"] += 1
2034 # DocumentCollection status is already updated in index_document
2035 # No need to update link here
2036 elif result.get("status") == "skipped":
2037 results["skipped"] += 1
2038 else:
2039 results["failed"] += 1
2040 error_msg = result.get("error", "Unknown error")
2041 results["errors"].append(
2042 {
2043 "filename": filename,
2044 "error": error_msg,
2045 }
2046 )
2047 logger.warning(
2048 f"Failed to index {filename} ({idx}/{total}): {error_msg}"
2049 )
2050 except Exception as e:
2051 results["failed"] += 1
2052 error_msg = str(e) or "Failed to index document"
2053 results["errors"].append(
2054 {
2055 "filename": filename,
2056 "error": error_msg,
2057 }
2058 )
2059 logger.exception(
2060 f"Exception indexing document {filename} ({idx}/{total})"
2061 )
2062 # Send error update to client so they know indexing is continuing
2063 yield f"data: {json.dumps({'type': 'doc_error', 'filename': filename, 'error': error_msg})}\n\n"
2065                db_session.commit()
2066                # (commit() already flushes pending changes before writing the
2067                # transaction, so no separate flush() call is needed here)
2069 logger.info(
2070 f"Indexing complete: {results['successful']} successful, {results['failed']} failed, {results['skipped']} skipped"
2071 )
2072 yield f"data: {json.dumps({'type': 'complete', 'results': results})}\n\n"
2073 logger.info("SSE generator finished successfully")
2075 except Exception:
2076 logger.exception("Error in collection indexing")
2077 yield f"data: {json.dumps({'type': 'error', 'error': 'An internal error occurred during indexing'})}\n\n"
2079 response = Response(
2080 stream_with_context(generate()), mimetype="text/event-stream"
2081 )
2082 # Prevent buffering for proper SSE streaming
2083 response.headers["Cache-Control"] = "no-cache, no-transform"
2084 response.headers["Connection"] = "keep-alive"
2085 response.headers["X-Accel-Buffering"] = "no"
2086 return response
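# Illustrative sketch (not used by the application): consuming the SSE progress
# stream above from Python with `requests`. The host/port and authentication
# handling are assumptions; a browser client would normally use EventSource.
#
#   import json as _json
#   import requests
#
#   url = "http://localhost:5000/library/api/collections/<collection-id>/index"
#   with requests.get(url, stream=True) as resp:
#       for line in resp.iter_lines(decode_unicode=True):
#           if line and line.startswith("data: "):
#               event = _json.loads(line[len("data: "):])
#               print(event.get("type"), event)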
2089# =============================================================================
2090# Background Indexing Endpoints
2091# =============================================================================
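# Lifecycle: POST .../index/start creates a TaskMetadata row (status
# "processing") and spawns a daemon thread running _background_index_worker;
# the worker reports progress through _update_task_status(); GET
# .../index/status polls that row; POST .../index/cancel flips the status to
# "cancelled", which the worker observes via _is_task_cancelled() between
# documents and then stops.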
2094def _get_rag_service_for_thread(
2095 collection_id: str,
2096 username: str,
2097 db_password: str,
2098) -> LibraryRAGService:
2099 """
2100 Create RAG service for use in background threads (no Flask context).
2101 """
2102 from ...database.session_context import get_user_db_session
2103 from ...settings.manager import SettingsManager
2104 from ...web_search_engines.engines.search_engine_local import (
2105 LocalEmbeddingManager,
2106 )
2107 import json
2109 with get_user_db_session(username, db_password) as db_session:
2110 settings_manager = SettingsManager(db_session)
2112 # Get default settings
2113 default_embedding_model = settings_manager.get_setting(
2114 "local_search_embedding_model", "all-MiniLM-L6-v2"
2115 )
2116 default_embedding_provider = settings_manager.get_setting(
2117 "local_search_embedding_provider", "sentence_transformers"
2118 )
2119 default_chunk_size = int(
2120 settings_manager.get_setting("local_search_chunk_size", 1000)
2121 )
2122 default_chunk_overlap = int(
2123 settings_manager.get_setting("local_search_chunk_overlap", 200)
2124 )
2125 default_splitter_type = settings_manager.get_setting(
2126 "local_search_splitter_type", "recursive"
2127 )
2128 default_text_separators = settings_manager.get_setting(
2129 "local_search_text_separators", '["\n\n", "\n", ". ", " ", ""]'
2130 )
2131 if isinstance(default_text_separators, str):
2132 try:
2133 default_text_separators = json.loads(default_text_separators)
2134 except json.JSONDecodeError:
2135 logger.warning(
2136 f"Invalid JSON for local_search_text_separators setting: {default_text_separators!r}. "
2137 "Using default separators."
2138 )
2139 default_text_separators = ["\n\n", "\n", ". ", " ", ""]
2140 default_distance_metric = settings_manager.get_setting(
2141 "local_search_distance_metric", "cosine"
2142 )
2143 raw_normalize = settings_manager.get_setting(
2144 "local_search_normalize_vectors", True
2145 )
2146 if isinstance(raw_normalize, str):
2147 default_normalize_vectors = raw_normalize.lower() in (
2148 "true",
2149 "1",
2150 "yes",
2151 )
2152 else:
2153 default_normalize_vectors = bool(raw_normalize)
2154 default_index_type = settings_manager.get_setting(
2155 "local_search_index_type", "flat"
2156 )
2158 # Get settings snapshot for embedding manager
2159 settings_snapshot = settings_manager.get_settings_snapshot()
2160 settings_snapshot["_username"] = username
2162 # Check for collection's stored settings
2163 collection = (
2164 db_session.query(Collection).filter_by(id=collection_id).first()
2165 )
2167 if collection and collection.embedding_model:
2168 # Use collection's stored settings
2169 embedding_model = collection.embedding_model
2170 embedding_provider = collection.embedding_model_type.value
2171 chunk_size = collection.chunk_size or default_chunk_size
2172 chunk_overlap = collection.chunk_overlap or default_chunk_overlap
2173 splitter_type = collection.splitter_type or default_splitter_type
2174 text_separators = (
2175 collection.text_separators or default_text_separators
2176 )
2177 distance_metric = (
2178 collection.distance_metric or default_distance_metric
2179 )
2180 index_type = collection.index_type or default_index_type
2182 coll_normalize = collection.normalize_vectors
2183 if coll_normalize is not None:
2184 if isinstance(coll_normalize, str):
2185 coll_normalize = coll_normalize.lower() in (
2186 "true",
2187 "1",
2188 "yes",
2189 )
2190 else:
2191 coll_normalize = bool(coll_normalize)
2192 else:
2193 coll_normalize = default_normalize_vectors
2194 normalize_vectors = coll_normalize
2195 else:
2196 # Use default settings
2197 embedding_model = default_embedding_model
2198 embedding_provider = default_embedding_provider
2199 chunk_size = default_chunk_size
2200 chunk_overlap = default_chunk_overlap
2201 splitter_type = default_splitter_type
2202 text_separators = default_text_separators
2203 distance_metric = default_distance_metric
2204 normalize_vectors = default_normalize_vectors
2205 index_type = default_index_type
2207 # Update settings snapshot with embedding config
2208 settings_snapshot.update(
2209 {
2210 "embeddings.provider": embedding_provider,
2211 f"embeddings.{embedding_provider}.model": embedding_model,
2212 "local_search_chunk_size": chunk_size,
2213 "local_search_chunk_overlap": chunk_overlap,
2214 }
2215 )
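        # e.g. with provider "sentence_transformers" the keys set above are
        # "embeddings.provider" and "embeddings.sentence_transformers.model"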
2217 # Create embedding manager (to avoid database access in LibraryRAGService.__init__)
2218 embedding_manager = LocalEmbeddingManager(
2219 embedding_model=embedding_model,
2220 embedding_model_type=embedding_provider,
2221 chunk_size=chunk_size,
2222 chunk_overlap=chunk_overlap,
2223 settings_snapshot=settings_snapshot,
2224 )
2225 embedding_manager.db_password = db_password
2227 # Create RAG service with pre-built embedding manager and db_password
2228 rag_service = LibraryRAGService(
2229 username=username,
2230 embedding_model=embedding_model,
2231 embedding_provider=embedding_provider,
2232 chunk_size=chunk_size,
2233 chunk_overlap=chunk_overlap,
2234 splitter_type=splitter_type,
2235 text_separators=text_separators,
2236 distance_metric=distance_metric,
2237 normalize_vectors=normalize_vectors,
2238 index_type=index_type,
2239 embedding_manager=embedding_manager,
2240 db_password=db_password,
2241 )
2243 return rag_service
2246def _background_index_worker(
2247 task_id: str,
2248 collection_id: str,
2249 username: str,
2250 db_password: str,
2251 force_reindex: bool,
2252):
2253 """
2254 Background worker thread for indexing documents.
2255 Updates TaskMetadata with progress and checks for cancellation.
2256 """
2257 from ...database.session_context import get_user_db_session
2259 try:
2260 # Create RAG service (thread-safe, no Flask context needed)
2261 rag_service = _get_rag_service_for_thread(
2262 collection_id, username, db_password
2263 )
2265 with get_user_db_session(username, db_password) as db_session:
2266 # Get collection
2267 collection = (
2268 db_session.query(Collection).filter_by(id=collection_id).first()
2269 )
2271 if not collection:
2272 _update_task_status(
2273 username,
2274 db_password,
2275 task_id,
2276 status="failed",
2277 error_message="Collection not found",
2278 )
2279 return
2281 # Store embedding metadata if first time
2282 if collection.embedding_model is None:
2283 collection.embedding_model = rag_service.embedding_model
2284 collection.embedding_model_type = EmbeddingProvider(
2285 rag_service.embedding_provider
2286 )
2287 collection.chunk_size = rag_service.chunk_size
2288 collection.chunk_overlap = rag_service.chunk_overlap
2289 collection.splitter_type = rag_service.splitter_type
2290 collection.text_separators = rag_service.text_separators
2291 collection.distance_metric = rag_service.distance_metric
2292 collection.normalize_vectors = bool(
2293 rag_service.normalize_vectors
2294 )
2295 collection.index_type = rag_service.index_type
2296 db_session.commit()
2298 # Get documents to index
2299 query = (
2300 db_session.query(DocumentCollection, Document)
2301 .join(Document)
2302 .filter(DocumentCollection.collection_id == collection_id)
2303 )
2305 if not force_reindex:
2306 query = query.filter(DocumentCollection.indexed == False) # noqa: E712
2308 doc_links = query.all()
2310 if not doc_links:
2311 _update_task_status(
2312 username,
2313 db_password,
2314 task_id,
2315 status="completed",
2316 progress_message="No documents to index",
2317 )
2318 return
2320 total = len(doc_links)
2321 results = {"successful": 0, "skipped": 0, "failed": 0}
2323 # Update task with total count
2324 _update_task_status(
2325 username,
2326 db_password,
2327 task_id,
2328 progress_total=total,
2329 progress_message=f"Indexing {total} documents",
2330 )
2332 for idx, (link, doc) in enumerate(doc_links, 1):
2333 # Check if cancelled
2334 if _is_task_cancelled(username, db_password, task_id):
2335 _update_task_status(
2336 username,
2337 db_password,
2338 task_id,
2339 status="cancelled",
2340 progress_message=f"Cancelled after {idx - 1}/{total} documents",
2341 )
2342 logger.info(f"Indexing task {task_id} was cancelled")
2343 return
2345 filename = doc.filename or doc.title or "Unknown"
2347 # Update progress with filename
2348 _update_task_status(
2349 username,
2350 db_password,
2351 task_id,
2352 progress_current=idx,
2353 progress_message=f"Indexing {idx}/{total}: {filename}",
2354 )
2356 try:
2357 result = rag_service.index_document(
2358 document_id=doc.id,
2359 collection_id=collection_id,
2360 force_reindex=force_reindex,
2361 )
2363 if result.get("status") == "success":
2364 results["successful"] += 1
2365 elif result.get("status") == "skipped":
2366 results["skipped"] += 1
2367 else:
2368 results["failed"] += 1
2370 except Exception:
2371 results["failed"] += 1
2372 logger.exception(f"Error indexing document {idx}/{total}")
2374 db_session.commit()
2376 # Mark as completed
2377 _update_task_status(
2378 username,
2379 db_password,
2380 task_id,
2381 status="completed",
2382 progress_current=total,
2383 progress_message=f"Completed: {results['successful']} indexed, {results['failed']} failed, {results['skipped']} skipped",
2384 )
2385 logger.info(f"Background indexing task {task_id} completed: {results}")
2387 except Exception as e:
2388 logger.exception(f"Background indexing task {task_id} failed")
2389 _update_task_status(
2390 username,
2391 db_password,
2392 task_id,
2393 status="failed",
2394 error_message=str(e),
2395 )
2398def _update_task_status(
2399 username: str,
2400 db_password: str,
2401 task_id: str,
2402    status: Optional[str] = None,
2403    progress_current: Optional[int] = None,
2404    progress_total: Optional[int] = None,
2405    progress_message: Optional[str] = None,
2406    error_message: Optional[str] = None,
2407):
2408 """Update task metadata in the database."""
2409 from ...database.session_context import get_user_db_session
2411 try:
2412 with get_user_db_session(username, db_password) as db_session:
2413 task = (
2414 db_session.query(TaskMetadata)
2415 .filter_by(task_id=task_id)
2416 .first()
2417 )
2418 if task:
2419 if status is not None:
2420 task.status = status
2421 if status == "completed":
2422 task.completed_at = datetime.now(UTC)
2423 if progress_current is not None:
2424 task.progress_current = progress_current
2425 if progress_total is not None:
2426 task.progress_total = progress_total
2427 if progress_message is not None:
2428 task.progress_message = progress_message
2429 if error_message is not None:
2430 task.error_message = error_message
2431 db_session.commit()
2432 except Exception:
2433 logger.exception(f"Failed to update task status for {task_id}")
2436def _is_task_cancelled(username: str, db_password: str, task_id: str) -> bool:
2437 """Check if a task has been cancelled."""
2438 from ...database.session_context import get_user_db_session
2440 try:
2441 with get_user_db_session(username, db_password) as db_session:
2442 task = (
2443 db_session.query(TaskMetadata)
2444 .filter_by(task_id=task_id)
2445 .first()
2446 )
2447            return task is not None and task.status == "cancelled"
2448 except Exception:
2449 return False
2452@rag_bp.route(
2453 "/api/collections/<string:collection_id>/index/start", methods=["POST"]
2454)
2455@login_required
2456def start_background_index(collection_id):
2457 """Start background indexing for a collection."""
2458 from ...database.session_context import get_user_db_session
2459 from ...database.session_passwords import session_password_store
2461 username = session["username"]
2462 session_id = session.get("session_id")
2464 # Get password for thread access
2465 db_password = None
2466 if session_id:
2467 db_password = session_password_store.get_session_password(
2468 username, session_id
2469 )
2471 # Parse request body
2472    data = request.get_json(silent=True) or {}  # tolerate a missing/non-JSON body
2473 force_reindex = data.get("force_reindex", False)
2475 try:
2476 with get_user_db_session(username, db_password) as db_session:
2477 # Check if there's already an active indexing task for this collection
2478 existing_task = (
2479 db_session.query(TaskMetadata)
2480 .filter(
2481 TaskMetadata.task_type == "indexing",
2482 TaskMetadata.status == "processing",
2483 )
2484 .first()
2485 )
2487 if existing_task:
2488 # Check if it's for this collection
2489 metadata = existing_task.metadata_json or {}
2490 if metadata.get("collection_id") == collection_id:
2491 return jsonify(
2492 {
2493 "success": False,
2494 "error": "Indexing is already in progress for this collection",
2495 "task_id": existing_task.task_id,
2496 }
2497 ), 409
2499 # Create new task
2500 task_id = str(uuid.uuid4())
2501 task = TaskMetadata(
2502 task_id=task_id,
2503 status="processing",
2504 task_type="indexing",
2505 created_at=datetime.now(UTC),
2506 started_at=datetime.now(UTC),
2507 progress_current=0,
2508 progress_total=0,
2509 progress_message="Starting indexing...",
2510 metadata_json={
2511 "collection_id": collection_id,
2512 "force_reindex": force_reindex,
2513 },
2514 )
2515 db_session.add(task)
2516 db_session.commit()
2518 # Start background thread
2519 thread = threading.Thread(
2520 target=_background_index_worker,
2521 args=(task_id, collection_id, username, db_password, force_reindex),
2522 daemon=True,
2523 )
2524 thread.start()
2526 logger.info(
2527 f"Started background indexing task {task_id} for collection {collection_id}"
2528 )
2530 return jsonify(
2531 {
2532 "success": True,
2533 "task_id": task_id,
2534 "message": "Indexing started in background",
2535 }
2536 )
2538 except Exception:
2539 logger.exception("Failed to start background indexing")
2540 return jsonify(
2541 {
2542 "success": False,
2543 "error": "Failed to start indexing. Please try again.",
2544 }
2545 ), 500
2548@rag_bp.route(
2549 "/api/collections/<string:collection_id>/index/status", methods=["GET"]
2550)
2551@limiter.exempt
2552@login_required
2553def get_index_status(collection_id):
2554 """Get the current indexing status for a collection."""
2555 from ...database.session_context import get_user_db_session
2556 from ...database.session_passwords import session_password_store
2558 username = session["username"]
2559 session_id = session.get("session_id")
2561 db_password = None
2562 if session_id:
2563 db_password = session_password_store.get_session_password(
2564 username, session_id
2565 )
2567 try:
2568 with get_user_db_session(username, db_password) as db_session:
2569            # Find the most recent indexing task (ownership by this collection is checked below)
2570 task = (
2571 db_session.query(TaskMetadata)
2572 .filter(TaskMetadata.task_type == "indexing")
2573 .order_by(TaskMetadata.created_at.desc())
2574 .first()
2575 )
2577 if not task:
2578 return jsonify(
2579 {
2580 "status": "idle",
2581 "message": "No indexing task found",
2582 }
2583 )
2585 # Check if it's for this collection
2586 metadata = task.metadata_json or {}
2587 if metadata.get("collection_id") != collection_id:
2588 return jsonify(
2589 {
2590 "status": "idle",
2591 "message": "No indexing task for this collection",
2592 }
2593 )
2595 return jsonify(
2596 {
2597 "task_id": task.task_id,
2598 "status": task.status,
2599 "progress_current": task.progress_current or 0,
2600 "progress_total": task.progress_total or 0,
2601 "progress_message": task.progress_message,
2602 "error_message": task.error_message,
2603 "created_at": task.created_at.isoformat()
2604 if task.created_at
2605 else None,
2606 "completed_at": task.completed_at.isoformat()
2607 if task.completed_at
2608 else None,
2609 }
2610 )
2612 except Exception:
2613 logger.exception("Failed to get index status")
2614 return jsonify(
2615 {
2616 "status": "error",
2617 "error": "Failed to get indexing status. Please try again.",
2618 }
2619 ), 500
2622@rag_bp.route(
2623 "/api/collections/<string:collection_id>/index/cancel", methods=["POST"]
2624)
2625@login_required
2626def cancel_indexing(collection_id):
2627 """Cancel an active indexing task for a collection."""
2628 from ...database.session_context import get_user_db_session
2629 from ...database.session_passwords import session_password_store
2631 username = session["username"]
2632 session_id = session.get("session_id")
2634 db_password = None
2635 if session_id:
2636 db_password = session_password_store.get_session_password(
2637 username, session_id
2638 )
2640 try:
2641 with get_user_db_session(username, db_password) as db_session:
2642            # Find the active indexing task (ownership by this collection is checked below)
2643 task = (
2644 db_session.query(TaskMetadata)
2645 .filter(
2646 TaskMetadata.task_type == "indexing",
2647 TaskMetadata.status == "processing",
2648 )
2649 .first()
2650 )
2652 if not task:
2653 return jsonify(
2654 {
2655 "success": False,
2656 "error": "No active indexing task found",
2657 }
2658 ), 404
2660 # Check if it's for this collection
2661 metadata = task.metadata_json or {}
2662 if metadata.get("collection_id") != collection_id:
2663 return jsonify(
2664 {
2665 "success": False,
2666 "error": "No active indexing task for this collection",
2667 }
2668 ), 404
2670 # Mark as cancelled - the worker thread will check this
2671 task.status = "cancelled"
2672 task.progress_message = "Cancellation requested..."
2673 db_session.commit()
2675 logger.info(
2676 f"Cancelled indexing task {task.task_id} for collection {collection_id}"
2677 )
2679 return jsonify(
2680 {
2681 "success": True,
2682 "message": "Cancellation requested",
2683 "task_id": task.task_id,
2684 }
2685 )
2687 except Exception:
2688 logger.exception("Failed to cancel indexing")
2689 return jsonify(
2690 {
2691 "success": False,
2692 "error": "Failed to cancel indexing. Please try again.",
2693 }
2694 ), 500
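# Illustrative sketch of the background-indexing client flow (an assumption,
# not part of this module): the base URL, port and session/auth handling are
# placeholders.
#
#   import time
#   import requests
#
#   base = "http://localhost:5000/library/api/collections/<collection-id>"
#   requests.post(f"{base}/index/start", json={"force_reindex": False})
#   while True:
#       status = requests.get(f"{base}/index/status").json()
#       print(status.get("progress_message") or status.get("status"))
#       if status.get("status") in ("completed", "failed", "cancelled", "idle"):
#           break
#       time.sleep(2)
#   # To abort early: requests.post(f"{base}/index/cancel")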