Coverage for src/local_deep_research/research_library/routes/rag_routes.py: 85%

1002 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1""" 

2RAG Management API Routes 

3 

4Provides endpoints for managing RAG indexing of library documents: 

5- Configure embedding models 

6- Index documents 

7- Get RAG statistics 

8- Bulk operations with progress tracking 

9""" 

10 

11import os 

12 

13from flask import ( 

14 Blueprint, 

15 jsonify, 

16 request, 

17 Response, 

18 render_template, 

19 session, 

20 stream_with_context, 

21) 

22from loguru import logger 

23import atexit 

24import glob 

25import json 

26import uuid 

27import time 

28import threading 

29import queue 

30from concurrent.futures import ThreadPoolExecutor 

31from datetime import datetime, UTC 

32from pathlib import Path 

33from typing import Optional 

34 

35from ...constants import FILE_PATH_SENTINELS, FILE_PATH_TEXT_ONLY 

36from ...security.decorators import require_json_body 

37from ...web.auth.decorators import login_required 

38from ...web.utils.request_helpers import parse_bool_arg 

39from ...utilities.db_utils import get_settings_manager 

40from ...utilities.resource_utils import safe_close 

41from ..services.library_rag_service import LibraryRAGService 

42from ...settings.manager import SettingsManager 

43from ...security.file_upload_validator import FileUploadValidator 

44from ...security.path_validator import PathValidator 

45from ...security.rate_limiter import ( 

46 upload_rate_limit_ip, 

47 upload_rate_limit_user, 

48) 

49from ..utils import ensure_in_collection, handle_api_error 

50from ...database.models.library import ( 

51 Document, 

52 Collection, 

53 DocumentCollection, 

54 RAGIndex, 

55 SourceType, 

56 EmbeddingProvider, 

57) 

58from ...database.models.queue import TaskMetadata 

59from ...database.thread_local_session import thread_cleanup 

60from ...security.rate_limiter import limiter 

61from ...config.paths import get_library_directory 

62 

# Blueprint holding every RAG management page and API endpoint; all of its
# routes are mounted under the ``/library`` URL prefix.
rag_bp = Blueprint(
    "rag",
    __name__,
    url_prefix="/library",
)

# NOTE: Routes read session["username"] with item access (not .get()) on
# purpose. @login_required guarantees the key is present, and direct access
# fails fast if that decorator is ever dropped from a route.

68 

69# Global ThreadPoolExecutor for auto-indexing to prevent thread proliferation 

70_auto_index_executor: ThreadPoolExecutor | None = None 

71_auto_index_executor_lock = threading.Lock() 

72 

73 

74def _get_auto_index_executor() -> ThreadPoolExecutor: 

75 """Get or create the global auto-indexing executor (thread-safe).""" 

76 global _auto_index_executor 

77 with _auto_index_executor_lock: 

78 if _auto_index_executor is None: 

79 _auto_index_executor = ThreadPoolExecutor( 

80 max_workers=4, 

81 thread_name_prefix="auto_index_", 

82 ) 

83 return _auto_index_executor 

84 

85 

86def _shutdown_auto_index_executor() -> None: 

87 """Shutdown the auto-index executor gracefully.""" 

88 global _auto_index_executor 

89 if _auto_index_executor is not None: 

90 _auto_index_executor.shutdown(wait=True) 

91 _auto_index_executor = None 

92 

93 

94atexit.register(_shutdown_auto_index_executor) 

95 

96 

def get_rag_service(
    collection_id: Optional[str] = None,
    use_defaults: bool = False,
) -> LibraryRAGService:
    """
    Build a ``LibraryRAGService`` for the user of the current Flask session.

    Thin request-context wrapper around
    ``rag_service_factory.get_rag_service()``. It resolves the session
    username and, when the session carries a ``session_id``, the stored
    database password, then forwards everything to the factory. Code running
    outside a Flask request should call the factory directly.

    Args:
        collection_id: Optional collection UUID whose stored settings should
            be loaded.
        use_defaults: When True, ignore stored collection settings and use
            the current defaults. Force-reindex passes True so the new
            default embedding model is picked up.

    Returns:
        Whatever service instance the factory builds.
    """
    from ..services.rag_service_factory import (
        get_rag_service as build_rag_service,
    )
    from ...database.session_passwords import session_password_store

    username = session["username"]
    session_id = session.get("session_id")
    db_password = (
        session_password_store.get_session_password(username, session_id)
        if session_id
        else None
    )
    return build_rag_service(
        username,
        collection_id,
        use_defaults=use_defaults,
        db_password=db_password,
    )

131 

132 

133# Config API Routes 

134 

135 

@rag_bp.route("/api/config/supported-formats", methods=["GET"])
@login_required
def get_supported_formats():
    """Report the file extensions that uploads accept.

    The document_loaders registry is the single source of truth for
    supported extensions. The UI can use ``accept_string`` to populate a
    file input's ``accept`` attribute dynamically.

    Returns:
        JSON containing the sorted ``extensions`` list, the same extensions
        joined by commas as ``accept_string``, and their ``count``.
    """
    from ...document_loaders import get_supported_extensions

    # Sorted so the UI always shows a stable order.
    ordered = sorted(get_supported_extensions())
    payload = {
        "extensions": ordered,
        "accept_string": ",".join(ordered),
        "count": len(ordered),
    }
    return jsonify(payload)

158 

159 

160# Page Routes 

161 

162 

@rag_bp.route("/embedding-settings")
@login_required
def embedding_settings_page():
    """Serve the Embedding Settings page with its nav entry highlighted."""
    template_name = "pages/embedding_settings.html"
    return render_template(template_name, active_page="embedding-settings")

170 

171 

@rag_bp.route("/document/<string:document_id>/chunks")
@login_required
def view_document_chunks(document_id):
    """Render every stored chunk of a document, grouped by collection.

    Args:
        document_id: ID of the ``Document`` whose chunks are shown.

    Returns:
        The rendered ``pages/document_chunks.html`` template, or the plain
        ``("Document not found", 404)`` response when no document with that
        ID exists in the current user's database.
    """
    from ...database.session_context import get_user_db_session
    from ...database.models.library import DocumentChunk

    username = session["username"]

    with get_user_db_session(username) as db_session:
        document = db_session.query(Document).filter_by(id=document_id).first()
        if not document:
            return "Document not found", 404

        # Ordered by collection, then chunk position, so each group's list
        # is already in document order.
        chunks = (
            db_session.query(DocumentChunk)
            .filter(DocumentChunk.source_id == document_id)
            .order_by(DocumentChunk.collection_name, DocumentChunk.chunk_index)
            .all()
        )

        chunks_by_collection = {}
        for chunk in chunks:
            coll_name = chunk.collection_name
            group = chunks_by_collection.get(coll_name)
            if group is None:
                # Chunk collection names appear to be "collection_<id>" (the
                # lookup below relies on it). Strip only the leading prefix:
                # str.replace() would also remove any later occurrence of
                # "collection_" and produce a wrong ID.
                collection_id = coll_name.removeprefix("collection_")
                collection = (
                    db_session.query(Collection)
                    .filter_by(id=collection_id)
                    .first()
                )
                group = {
                    # Fall back to the raw name if no Collection row matches.
                    "name": collection.name if collection else coll_name,
                    "id": collection_id,
                    "chunks": [],
                }
                chunks_by_collection[coll_name] = group

            group["chunks"].append(
                {
                    "id": chunk.id,
                    "index": chunk.chunk_index,
                    "text": chunk.chunk_text,
                    "word_count": chunk.word_count,
                    "start_char": chunk.start_char,
                    "end_char": chunk.end_char,
                    "embedding_model": chunk.embedding_model,
                    "embedding_model_type": chunk.embedding_model_type.value
                    if chunk.embedding_model_type
                    else None,
                    "embedding_dimension": chunk.embedding_dimension,
                    "created_at": chunk.created_at,
                }
            )

        return render_template(
            "pages/document_chunks.html",
            document=document,
            chunks_by_collection=chunks_by_collection,
            total_chunks=len(chunks),
        )

237 

238 

@rag_bp.route("/collections")
@login_required
def collections_page():
    """Serve the Collections overview page."""
    return render_template(
        "pages/collections.html",
        active_page="collections",
    )

244 

245 

@rag_bp.route("/collections/<string:collection_id>")
@login_required
def collection_details_page(collection_id):
    """Serve the details page for a single collection.

    Args:
        collection_id: Collection identifier from the URL, passed to the
            template unchanged.
    """
    context = {
        "active_page": "collections",
        "collection_id": collection_id,
    }
    return render_template("pages/collection_details.html", **context)

255 

256 

@rag_bp.route("/collections/<string:collection_id>/upload")
@login_required
def collection_upload_page(collection_id):
    """Serve the upload page for a single collection.

    Reads ``research_library.upload_pdf_storage`` (default ``"none"``).
    Uploads may only keep PDFs in the database or not at all, so any other
    value (such as filesystem storage) is coerced to ``"none"``.
    """
    storage_mode = get_settings_manager().get_setting(
        "research_library.upload_pdf_storage", "none"
    )
    allowed_modes = ("database", "none")
    if storage_mode not in allowed_modes:
        storage_mode = "none"

    return render_template(
        "pages/collection_upload.html",
        active_page="collections",
        collection_id=collection_id,
        collection_name=None,  # Not looked up here; could come from the DB.
        upload_pdf_storage=storage_mode,
    )

277 

278 

@rag_bp.route("/collections/create")
@login_required
def collection_create_page():
    """Serve the Create Collection form."""
    template_name = "pages/collection_create.html"
    return render_template(template_name, active_page="collections")

286 

287 

288# API Routes 

289 

290 

@rag_bp.route("/api/rag/settings", methods=["GET"])
@login_required
def get_current_settings():
    """Return the current RAG configuration from the user's settings.

    ``local_search_text_separators`` is stored as a JSON string and is
    decoded to a list here. An undecodable or non-list value is logged and
    replaced by the default separators.

    Returns:
        ``{"success": True, "settings": {...}}`` on success, otherwise the
        response produced by ``handle_api_error``.
    """
    import json as json_lib

    default_separators = ["\n\n", "\n", ". ", " ", ""]

    try:
        settings = get_settings_manager()

        # The fallback is a list, not a JSON string. A Python literal such as
        # '["\n\n", ...]' contains real newline characters, which json.loads
        # (strict mode) rejects as invalid control characters. The old
        # string default therefore never parsed and always logged a spurious
        # "Invalid JSON" warning.
        text_separators = settings.get_setting(
            "local_search_text_separators", list(default_separators)
        )
        if isinstance(text_separators, str):
            try:
                text_separators = json_lib.loads(text_separators)
            except json_lib.JSONDecodeError:
                logger.warning(
                    f"Invalid JSON for local_search_text_separators setting: {text_separators!r}. "
                    "Using default separators."
                )
                text_separators = list(default_separators)
        if not isinstance(text_separators, list):
            logger.warning(
                f"local_search_text_separators setting is not a list: {text_separators!r}. "
                "Using default separators."
            )
            text_separators = list(default_separators)

        normalize_vectors = settings.get_setting(
            "local_search_normalize_vectors", True
        )

        return jsonify(
            {
                "success": True,
                "settings": {
                    "embedding_provider": settings.get_setting(
                        "local_search_embedding_provider",
                        "sentence_transformers",
                    ),
                    "embedding_model": settings.get_setting(
                        "local_search_embedding_model", "all-MiniLM-L6-v2"
                    ),
                    "chunk_size": settings.get_setting(
                        "local_search_chunk_size", 1000
                    ),
                    "chunk_overlap": settings.get_setting(
                        "local_search_chunk_overlap", 200
                    ),
                    "splitter_type": settings.get_setting(
                        "local_search_splitter_type", "recursive"
                    ),
                    "text_separators": text_separators,
                    "distance_metric": settings.get_setting(
                        "local_search_distance_metric", "cosine"
                    ),
                    "normalize_vectors": normalize_vectors,
                    "index_type": settings.get_setting(
                        "local_search_index_type", "flat"
                    ),
                },
            }
        )
    except Exception as e:
        return handle_api_error("getting RAG settings", e)

351 

352 

@rag_bp.route("/api/rag/test-embedding", methods=["POST"])
@login_required
@require_json_body(error_format="success")
def test_embedding():
    """Test an embedding configuration by embedding a short sample text.

    JSON body:
        provider: Required embedding provider key.
        model: Required model name.
        test_text: Optional sample text (default ``"This is a test."``).

    Returns:
        On success, JSON containing:
            - ``dimension``: the embedding dimension, or ``None`` when the
              vector has no length;
            - ``response_time_ms``: time to build the embedding function
              plus the embedding call;
            - ``provider`` and ``model``, echoed back.
        A 400 when provider or model is missing. Any other failure is logged
        and answered with a 500. Its message suggests choosing a dedicated
        embedding model, worded more strongly when the error text suggests
        an LLM was selected.
    """
    # Bound before the try so the error handler can always format its
    # message, even if reading the request body fails before ``model`` is
    # assigned (previously that raised NameError inside the except clause).
    model = None
    try:
        data = request.get_json()
        provider = data.get("provider")
        model = data.get("model")
        test_text = data.get("test_text", "This is a test.")

        if not provider or not model:
            return jsonify(
                {"success": False, "error": "Provider and model are required"}
            ), 400

        from ...embeddings.embeddings_config import get_embedding_function

        logger.info(
            f"Testing embedding with provider={provider}, model={model}"
        )

        # The user's settings are needed so provider URLs (e.g. Ollama)
        # resolve correctly.
        settings = get_settings_manager()
        settings_snapshot = (
            settings.get_all_settings()
            if hasattr(settings, "get_all_settings")
            else {}
        )

        # Timing covers building the embedding function and the call itself.
        start_time = time.time()
        embedding_func = get_embedding_function(
            provider=provider,
            model_name=model,
            settings_snapshot=settings_snapshot,
        )
        embedding = embedding_func([test_text])[0]
        response_time_ms = int((time.time() - start_time) * 1000)

        dimension = len(embedding) if hasattr(embedding, "__len__") else None

        return jsonify(
            {
                "success": True,
                "dimension": dimension,
                "response_time_ms": response_time_ms,
                "provider": provider,
                "model": model,
            }
        )

    except Exception as e:
        logger.exception("Error during testing embedding")
        error_str = str(e).lower()

        # Error fragments that commonly appear when an LLM, rather than an
        # embedding model, was selected.
        llm_hints = [
            "does not support",
            "not an embedding",
            "generate embedding",
            "invalid model",
            "not found",
            "expected float",
            "could not convert",
            "list index out of range",
            "object is not subscriptable",
            "not iterable",
            "json",
            "chat",
            "completion",
        ]
        if any(hint in error_str for hint in llm_hints):
            user_message = (
                f"Embedding test failed for model '{model}'. "
                "This is most likely because an LLM (language model) was selected "
                "instead of an embedding model. Please choose a dedicated embedding "
                "model (e.g. nomic-embed-text, mxbai-embed-large, "
                "all-MiniLM-L6-v2)."
            )
        else:
            user_message = (
                f"Embedding test failed for model '{model}'. "
                "If you are unsure whether the selected model supports embeddings, "
                "try a dedicated embedding model instead (e.g. nomic-embed-text, "
                "mxbai-embed-large, all-MiniLM-L6-v2)."
            )

        return jsonify({"success": False, "error": user_message}), 500

451 

452 

@rag_bp.route("/api/rag/models", methods=["GET"])
@login_required
def get_available_models():
    """List embedding providers and, for reachable ones, their models.

    Every registered provider appears in ``provider_options`` with an
    ``available`` flag, so users can still open its settings (e.g. to fix a
    wrong Ollama URL). Models are only requested from providers whose
    ``is_available`` check passes; unavailable providers map to ``[]``.
    An ``is_embedding`` key is copied onto a model entry only when the
    provider supplied it.
    """
    try:
        from ...embeddings.embeddings_config import _get_provider_classes

        settings = get_settings_manager()
        if hasattr(settings, "get_all_settings"):
            settings_snapshot = settings.get_all_settings()
        else:
            settings_snapshot = {}

        provider_labels = {
            "sentence_transformers": "Sentence Transformers (Local)",
            "ollama": "Ollama (Local)",
            "openai": "OpenAI API",
        }

        provider_options = []
        providers = {}

        for provider_key, provider_class in _get_provider_classes().items():
            available = provider_class.is_available(settings_snapshot)

            # Always listed, even when unreachable, so it stays configurable.
            provider_options.append(
                {
                    "value": provider_key,
                    "label": provider_labels.get(provider_key, provider_key),
                    "available": available,
                }
            )

            model_entries = []
            if available:
                for m in provider_class.get_available_models(settings_snapshot):
                    entry = {
                        "value": m["value"],
                        "label": m["label"],
                        "provider": provider_key,
                    }
                    if "is_embedding" in m:
                        entry["is_embedding"] = m["is_embedding"]
                    model_entries.append(entry)
            providers[provider_key] = model_entries

        return jsonify(
            {
                "success": True,
                "provider_options": provider_options,
                "providers": providers,
            }
        )
    except Exception as e:
        return handle_api_error("getting available models", e)

523 

524 

@rag_bp.route("/api/rag/info", methods=["GET"])
@login_required
def get_index_info():
    """Describe the RAG index of a collection.

    Uses the ``collection_id`` query parameter, or the user's default
    Library collection when it is missing or empty.

    Returns:
        ``{"success": True, "info": ...}``. ``info`` is ``None``, together
        with the message ``"No index found"``, when the collection has no
        index. Errors go through ``handle_api_error``.
    """
    from ...database.library_init import get_default_library_id

    try:
        collection_id = request.args.get(
            "collection_id"
        ) or get_default_library_id(session["username"])

        logger.info(
            f"Getting RAG index info for collection_id: {collection_id}"
        )

        with get_rag_service(collection_id) as rag_service:
            info = rag_service.get_current_index_info(collection_id)

        if info is not None:
            logger.info(f"Found RAG index for collection_id: {collection_id}")
            return jsonify({"success": True, "info": info})

        logger.info(f"No RAG index found for collection_id: {collection_id}")
        return jsonify(
            {"success": True, "info": None, "message": "No index found"}
        )
    except Exception as e:
        return handle_api_error("getting index info", e)

556 

557 

@rag_bp.route("/api/rag/stats", methods=["GET"])
@login_required
def get_rag_stats():
    """Return RAG statistics for one collection.

    Uses the ``collection_id`` query parameter, or the user's default
    Library collection when it is missing or empty.
    """
    from ...database.library_init import get_default_library_id

    try:
        collection_id = request.args.get(
            "collection_id"
        ) or get_default_library_id(session["username"])

        with get_rag_service(collection_id) as rag_service:
            stats = rag_service.get_rag_stats(collection_id)

        return jsonify({"success": True, "stats": stats})
    except Exception as e:
        return handle_api_error("getting RAG stats", e)

576 

577 

@rag_bp.route("/api/rag/index-document", methods=["POST"])
@login_required
@require_json_body(error_format="success")
def index_document():
    """Index a single document into a collection's RAG index.

    JSON body:
        text_doc_id: Required ID of the document to index.
        force_reindex: Optional flag (default False), forwarded to the
            service.
        collection_id: Optional target collection; defaults to the user's
            Library collection.

    Returns:
        ``{"success": True, "result": ...}``. A 400 when ``text_doc_id`` is
        missing or the service reports ``status == "error"``. Any other
        failure goes through ``handle_api_error``.
    """
    from ...database.library_init import get_default_library_id

    # Bound before the try so the error handler can always reference it,
    # even if reading the request body fails first (previously that path
    # raised NameError inside the except clause).
    text_doc_id = None
    try:
        data = request.get_json()
        text_doc_id = data.get("text_doc_id")
        force_reindex = data.get("force_reindex", False)
        collection_id = data.get("collection_id")

        if not text_doc_id:
            return jsonify(
                {"success": False, "error": "text_doc_id is required"}
            ), 400

        if not collection_id:
            collection_id = get_default_library_id(session["username"])

        with get_rag_service(collection_id) as rag_service:
            result = rag_service.index_document(
                text_doc_id, collection_id, force_reindex
            )

        if result["status"] == "error":
            return jsonify(
                {"success": False, "error": result.get("error")}
            ), 400

        return jsonify({"success": True, "result": result})
    except Exception as e:
        return handle_api_error(f"indexing document {text_doc_id}", e)

613 

614 

@rag_bp.route("/api/rag/remove-document", methods=["POST"])
@login_required
@require_json_body(error_format="success")
def remove_document():
    """Remove a document from a collection's RAG index.

    JSON body:
        text_doc_id: Required ID of the document to remove.
        collection_id: Optional collection; defaults to the user's Library
            collection.

    Returns:
        ``{"success": True, "result": ...}``. A 400 when ``text_doc_id`` is
        missing or the service reports ``status == "error"``. Any other
        failure goes through ``handle_api_error``.
    """
    from ...database.library_init import get_default_library_id

    # Bound before the try so the error handler can always reference it,
    # even if reading the request body fails first (previously that path
    # raised NameError inside the except clause).
    text_doc_id = None
    try:
        data = request.get_json()
        text_doc_id = data.get("text_doc_id")
        collection_id = data.get("collection_id")

        if not text_doc_id:
            return jsonify(
                {"success": False, "error": "text_doc_id is required"}
            ), 400

        if not collection_id:
            collection_id = get_default_library_id(session["username"])

        with get_rag_service(collection_id) as rag_service:
            result = rag_service.remove_document_from_rag(
                text_doc_id, collection_id
            )

        if result["status"] == "error":
            return jsonify(
                {"success": False, "error": result.get("error")}
            ), 400

        return jsonify({"success": True, "result": result})
    except Exception as e:
        return handle_api_error(f"removing document {text_doc_id}", e)

649 

650 

@rag_bp.route("/api/rag/index-research", methods=["POST"])
@login_required
@require_json_body(error_format="success")
def index_research():
    """Index all documents that belong to one research run.

    JSON body:
        research_id: Required research identifier.
        force_reindex: Optional flag (default False), forwarded to the
            service.

    Returns:
        ``{"success": True, "results": ...}``, a 400 when ``research_id`` is
        missing, or the ``handle_api_error`` response on failure.
    """
    # Bound before the try so the error handler can always reference it,
    # even if reading the request body fails first (previously that path
    # raised NameError inside the except clause).
    research_id = None
    try:
        data = request.get_json()
        research_id = data.get("research_id")
        force_reindex = data.get("force_reindex", False)

        if not research_id:
            return jsonify(
                {"success": False, "error": "research_id is required"}
            ), 400

        with get_rag_service() as rag_service:
            results = rag_service.index_research_documents(
                research_id, force_reindex
            )

        return jsonify({"success": True, "results": results})
    except Exception as e:
        return handle_api_error(f"indexing research {research_id}", e)

674 

675 

@rag_bp.route("/api/rag/index-all", methods=["GET"])
@login_required
def index_all():
    """Index every document of a collection, streaming progress as SSE.

    Query parameters:
        collection_id: Target collection; defaults to the user's Library.
        force_reindex: When true, also re-index documents already marked as
            indexed. Otherwise only documents whose ``DocumentCollection``
            row has ``indexed`` False are processed.

    Each event is a ``data: <json>`` frame. The stream sends ``start``, then
    one ``progress`` per document, then ``complete`` with success/skip/failure
    counts. If indexing aborts, a single generic ``error`` event is sent.
    """
    from ...database.session_context import get_user_db_session
    from ...database.library_init import get_default_library_id

    force_reindex = parse_bool_arg("force_reindex")
    username = session["username"]

    collection_id = request.args.get("collection_id")
    if not collection_id:
        collection_id = get_default_library_id(username)

    logger.info(
        f"Starting index-all for collection_id: {collection_id}, force_reindex: {force_reindex}"
    )

    # Created while the request context is active, before the generator runs.
    # NOTE(review): if the response is never iterated, the generator's
    # ``finally`` below never runs and this service is not closed.
    rag_service = get_rag_service(collection_id)

    default_batch_size = 15

    def _sse(payload):
        """Format one Server-Sent Events ``data`` frame."""
        return f"data: {json.dumps(payload)}\n\n"

    def generate():
        """Yield SSE frames while indexing the collection in batches."""
        try:
            yield _sse(
                {"type": "start", "message": "Starting bulk indexing..."}
            )

            with get_user_db_session(username) as db_session:
                query = (
                    db_session.query(Document.id, Document.title)
                    .join(
                        DocumentCollection,
                        Document.id == DocumentCollection.document_id,
                    )
                    .filter(DocumentCollection.collection_id == collection_id)
                )
                if not force_reindex:
                    # Only documents not yet indexed in this collection.
                    query = query.filter(DocumentCollection.indexed.is_(False))
                doc_info = [(doc_id, title) for doc_id, title in query.all()]

            if not doc_info:
                yield _sse(
                    {
                        "type": "complete",
                        "results": {
                            "successful": 0,
                            "skipped": 0,
                            "failed": 0,
                            "message": "No documents to index",
                        },
                    }
                )
                return

            results = {"successful": 0, "skipped": 0, "failed": 0, "errors": []}
            total = len(doc_info)

            # A zero step makes range() raise and a negative step silently
            # skips all work, so clamp to >= 1. A non-numeric setting falls
            # back to the default instead of aborting the stream.
            try:
                batch_size = int(
                    get_settings_manager().get_setting(
                        "rag.indexing_batch_size", default_batch_size
                    )
                )
            except (TypeError, ValueError):
                batch_size = default_batch_size
            batch_size = max(1, batch_size)

            processed = 0
            for start in range(0, total, batch_size):
                batch = doc_info[start : start + batch_size]
                batch_results = rag_service.index_documents_batch(
                    batch, collection_id, force_reindex
                )

                for doc_id, title in batch:
                    processed += 1
                    # A document missing from the batch result counts as a
                    # failure rather than aborting the whole stream with a
                    # KeyError.
                    result = batch_results.get(doc_id) or {
                        "status": "error",
                        "error": "No indexing result returned",
                    }

                    yield _sse(
                        {
                            "type": "progress",
                            "current": processed,
                            "total": total,
                            "title": title,
                            "percent": int((processed / total) * 100),
                        }
                    )

                    status = result.get("status")
                    if status == "success":
                        results["successful"] += 1
                    elif status == "skipped":
                        results["skipped"] += 1
                    else:
                        results["failed"] += 1
                        results["errors"].append(
                            {
                                "doc_id": doc_id,
                                "title": title,
                                "error": result.get("error"),
                            }
                        )

            yield _sse({"type": "complete", "results": results})

            logger.info(
                f"Bulk indexing complete: {results['successful']} successful, {results['skipped']} skipped, {results['failed']} failed"
            )

        except Exception:
            logger.exception("Error in bulk indexing")
            yield _sse(
                {
                    "type": "error",
                    "error": "An internal error occurred during indexing",
                }
            )
        finally:
            # Runs when the stream completes or the client disconnects
            # (GeneratorExit). Closing in the outer route scope would tear the
            # service down before the streamed generator runs. ``safe_close``
            # swallows close-time errors so a broken backend cannot mask the
            # generator's own outcome.
            safe_close(rag_service, "rag_service (index-all SSE)")

    return Response(
        stream_with_context(generate()), mimetype="text/event-stream"
    )

790 

791 

@rag_bp.route("/api/rag/configure", methods=["POST"])
@login_required
@require_json_body(error_format="success")
def configure_rag():
    """
    Change the RAG configuration (embedding model, chunk size, etc.).

    The values are always saved as the user's defaults. When a
    ``collection_id`` is supplied, a RAG index matching the new
    configuration is also fetched or created for that collection, so its
    documents can then be indexed with the new settings.

    JSON body:
        embedding_model, embedding_provider: Required, non-empty.
        chunk_size: Required integer >= 1.
        chunk_overlap: Required integer >= 0 (0 means no overlap).
        splitter_type, text_separators, distance_metric, normalize_vectors,
            index_type: Optional advanced settings with defaults.
            ``text_separators`` may be a list or a JSON-encoded list.
        collection_id: Optional collection to reconfigure.

    Returns:
        ``{"success": True, ...}`` on success, a 400 for missing or invalid
        parameters (nothing is saved in that case), otherwise the
        ``handle_api_error`` response.
    """
    import json as json_lib

    try:
        data = request.get_json()
        embedding_model = data.get("embedding_model")
        embedding_provider = data.get("embedding_provider")
        chunk_size = data.get("chunk_size")
        chunk_overlap = data.get("chunk_overlap")
        collection_id = data.get("collection_id")

        splitter_type = data.get("splitter_type", "recursive")
        text_separators = data.get(
            "text_separators", ["\n\n", "\n", ". ", " ", ""]
        )
        distance_metric = data.get("distance_metric", "cosine")
        # Coerced once so the saved setting and the service see the same value.
        normalize_vectors = bool(data.get("normalize_vectors", True))
        index_type = data.get("index_type", "flat")

        # Numeric fields are checked for presence, not truthiness:
        # chunk_overlap == 0 is a legitimate "no overlap" configuration that
        # the old all([...]) check wrongly rejected.
        if (
            not embedding_model
            or not embedding_provider
            or chunk_size in (None, "")
            or chunk_overlap in (None, "")
        ):
            return jsonify(
                {
                    "success": False,
                    "error": "All configuration parameters are required (embedding_model, embedding_provider, chunk_size, chunk_overlap)",
                }
            ), 400

        # Validate everything before persisting, so a bad request cannot
        # leave settings half-saved.
        try:
            chunk_size = int(chunk_size)
            chunk_overlap = int(chunk_overlap)
        except (TypeError, ValueError):
            chunk_size = chunk_overlap = None
        if chunk_size is None or chunk_size < 1 or chunk_overlap < 0:
            return jsonify(
                {
                    "success": False,
                    "error": "chunk_size must be a positive integer and chunk_overlap a non-negative integer",
                }
            ), 400

        if isinstance(text_separators, str):
            try:
                text_separators = json_lib.loads(text_separators)
            except json_lib.JSONDecodeError:
                text_separators = None
        if not isinstance(text_separators, list):
            return jsonify(
                {
                    "success": False,
                    "error": "text_separators must be a list (or a JSON-encoded list)",
                }
            ), 400

        settings = get_settings_manager()
        settings.set_setting("local_search_embedding_model", embedding_model)
        settings.set_setting(
            "local_search_embedding_provider", embedding_provider
        )
        settings.set_setting("local_search_chunk_size", chunk_size)
        settings.set_setting("local_search_chunk_overlap", chunk_overlap)
        settings.set_setting("local_search_splitter_type", splitter_type)
        # Separators are stored as a JSON string.
        settings.set_setting(
            "local_search_text_separators", json_lib.dumps(text_separators)
        )
        settings.set_setting("local_search_distance_metric", distance_metric)
        settings.set_setting(
            "local_search_normalize_vectors", normalize_vectors
        )
        settings.set_setting("local_search_index_type", index_type)

        if not collection_id:
            # Only the defaults changed; no collection to reconfigure.
            return jsonify(
                {
                    "success": True,
                    "message": "Default embedding settings saved successfully. New collections will use these settings.",
                }
            )

        with LibraryRAGService(
            username=session["username"],
            embedding_model=embedding_model,
            embedding_provider=embedding_provider,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            splitter_type=splitter_type,
            text_separators=text_separators,
            distance_metric=distance_metric,
            normalize_vectors=normalize_vectors,
            index_type=index_type,
        ) as new_rag_service:
            # Get or create the index matching this configuration.
            rag_index = new_rag_service._get_or_create_rag_index(
                collection_id
            )

            return jsonify(
                {
                    "success": True,
                    "message": "Configuration updated for collection. You can now index documents with the new settings.",
                    "index_hash": rag_index.index_hash,
                }
            )

    except Exception as e:
        return handle_api_error("configuring RAG", e)

899 

900 

@rag_bp.route("/api/rag/documents", methods=["GET"])
@login_required
def get_documents():
    """Get library documents with their RAG status (paginated).

    Query parameters:
        page: 1-based page number (default 1; values below 1 become 1).
        per_page: page size (default 50; clamped to the range 10-100).
        filter: ``"all"`` (default), ``"indexed"`` or ``"unindexed"``.
            Any other value behaves like ``"all"``.
        collection_id: collection to list; when absent or empty the
            user's default Library collection is used.

    Returns:
        JSON ``{"success": True, "documents": [...], "pagination": {...}}``.
        Each document carries ``id``, ``title``, ``original_url``,
        ``rag_indexed``, ``chunk_count`` and ``created_at`` (ISO string or
        None). Any exception is delegated to ``handle_api_error``.
    """
    from ...database.session_context import get_user_db_session
    from ...database.library_init import get_default_library_id

    try:
        # Pagination / filter parameters
        page = request.args.get("page", 1, type=int)
        per_page = request.args.get("per_page", 50, type=int)
        filter_type = request.args.get("filter", "all")  # all, indexed, unindexed

        page = max(1, page)
        per_page = min(max(10, per_page), 100)  # Limit between 10-100

        # Close current thread's session to force a fresh connection
        from ...database.thread_local_session import cleanup_current_thread

        cleanup_current_thread()

        username = session["username"]

        # Use the requested collection, or fall back to the default Library
        collection_id = request.args.get("collection_id")
        if not collection_id:
            collection_id = get_default_library_id(username)

        logger.info(
            f"Getting documents for collection_id: {collection_id}, filter: {filter_type}, page: {page}"
        )

        with get_user_db_session(username) as db_session:
            # Expire all cached objects so status reflects the DB right now
            db_session.expire_all()

            from ...database.models.library import RagDocumentStatus

            # Documents in the collection, LEFT JOINed with their RAG status
            # row for this same collection (no row => not indexed yet).
            query = (
                db_session.query(
                    Document, DocumentCollection, RagDocumentStatus
                )
                .join(
                    DocumentCollection,
                    (DocumentCollection.document_id == Document.id)
                    & (DocumentCollection.collection_id == collection_id),
                )
                .outerjoin(
                    RagDocumentStatus,
                    (RagDocumentStatus.document_id == Document.id)
                    & (RagDocumentStatus.collection_id == collection_id),
                )
            )

            # Argument-style formatting defers rendering the SQL to loguru
            # instead of compiling it eagerly in an f-string.
            logger.debug(
                "Base query for collection {}: {}", collection_id, query
            )

            # Apply filters based on rag_document_status existence
            if filter_type == "indexed":
                query = query.filter(RagDocumentStatus.document_id.isnot(None))
            elif filter_type == "unindexed":
                # Documents in collection but not indexed yet
                query = query.filter(RagDocumentStatus.document_id.is_(None))

            # Total count before pagination
            total_count = query.count()
            logger.info(
                f"Found {total_count} total documents for collection {collection_id} with filter {filter_type}"
            )

            results = (
                query.order_by(Document.created_at.desc())
                .limit(per_page)
                .offset((page - 1) * per_page)
                .all()
            )

            documents = [
                {
                    "id": doc.id,
                    "title": doc.title,
                    "original_url": doc.original_url,
                    "rag_indexed": rag_status is not None,
                    "chunk_count": rag_status.chunk_count if rag_status else 0,
                    "created_at": doc.created_at.isoformat()
                    if doc.created_at
                    else None,
                }
                for doc, _link, rag_status in results
            ]

            # Debug logging to help diagnose indexing status issues
            indexed_count = sum(1 for d in documents if d["rag_indexed"])

            # Diagnostic: number of status rows for the whole collection.
            # COUNT in SQL rather than loading every row just to len() it.
            status_row_count = (
                db_session.query(RagDocumentStatus)
                .filter_by(collection_id=collection_id)
                .count()
            )
            logger.info(
                f"rag_document_status table shows: {status_row_count} documents indexed for collection {collection_id}"
            )

            logger.info(
                f"Returning {len(documents)} documents on page {page}: "
                f"{indexed_count} indexed, {len(documents) - indexed_count} not indexed"
            )

            return jsonify(
                {
                    "success": True,
                    "documents": documents,
                    "pagination": {
                        "page": page,
                        "per_page": per_page,
                        "total": total_count,
                        "pages": (total_count + per_page - 1) // per_page,
                    },
                }
            )
    except Exception as e:
        return handle_api_error("getting documents", e)

1030 

1031 

def _collect_local_files(root, patterns, recursive):
    """Return the sorted, de-duplicated files under *root* matching *patterns*.

    Args:
        root: Validated folder (``pathlib.Path``) to search.
        patterns: Glob patterns such as ``"*.pdf"``; surrounding whitespace
            is stripped from each.
        recursive: When true, each pattern is matched at any depth via
            ``root/**/pattern``.

    Blank patterns are skipped (``root/**/`` would otherwise match every
    entry in the tree). Absolute patterns and patterns containing ``..`` are
    skipped too: joining them onto *root* lets the glob escape the folder
    that passed path validation. Matches that are not regular files (e.g. a
    directory named ``notes.md``) are dropped.
    """
    matches = set()
    for raw_pattern in patterns:
        pattern = raw_pattern.strip()
        if not pattern:
            continue
        pattern_path = Path(pattern)
        # ``anchor`` is non-empty for "/x", "C:\\x", "C:x" and "\\x".
        if pattern_path.anchor or ".." in pattern_path.parts:
            logger.warning("Ignoring unsafe file pattern: {!r}", pattern)
            continue
        search_path = root / "**" / pattern if recursive else root / pattern
        for candidate in glob.glob(str(search_path), recursive=recursive):
            if os.path.isfile(candidate):
                matches.add(candidate)
    return sorted(matches)


@rag_bp.route("/api/rag/index-local", methods=["GET"])
@login_required
def index_local_library():
    """Index documents from a local folder with Server-Sent Events progress.

    Query parameters:
        path: folder to scan (required; validated by ``PathValidator``).
        patterns: comma-separated glob patterns relative to the folder
            (default ``"*.pdf,*.txt,*.md,*.html"``).
        recursive: search sub-folders too (default true).

    Returns:
        400 JSON for a missing/invalid/non-directory path; otherwise a
        ``text/event-stream`` of ``start``, ``progress``, ``complete`` or
        ``error`` events.
    """
    folder_path = request.args.get("path")
    file_patterns = request.args.get(
        "patterns", "*.pdf,*.txt,*.md,*.html"
    ).split(",")
    recursive = parse_bool_arg("recursive", default=True)

    if not folder_path:
        return jsonify({"success": False, "error": "Path is required"}), 400

    # Validate and sanitize the path to prevent traversal attacks
    try:
        validated_path = PathValidator.validate_local_filesystem_path(
            folder_path
        )
        # Re-sanitize for static analyzer recognition (CodeQL)
        path = PathValidator.sanitize_for_filesystem_ops(validated_path)
    except ValueError:
        logger.warning(f"Path validation failed for '{folder_path}'")
        return jsonify({"success": False, "error": "Invalid path"}), 400

    # Check path exists and is a directory
    if not path.exists():
        return jsonify({"success": False, "error": "Path does not exist"}), 400
    if not path.is_dir():
        return jsonify(
            {"success": False, "error": "Path is not a directory"}
        ), 400

    # Create RAG service in request context
    rag_service = get_rag_service()

    def generate():
        """Generator function for SSE progress updates."""
        try:
            # Send initial status
            yield f"data: {json.dumps({'type': 'start', 'message': f'Scanning folder: {path}'})}\n\n"

            # Patterns are user input: the helper keeps matches inside `path`
            files_to_index = _collect_local_files(
                path, file_patterns, recursive
            )

            if not files_to_index:
                yield f"data: {json.dumps({'type': 'complete', 'results': {'successful': 0, 'skipped': 0, 'failed': 0, 'message': 'No matching files found'}})}\n\n"
                return

            results = {"successful": 0, "skipped": 0, "failed": 0, "errors": []}
            total = len(files_to_index)

            # Index each file
            for idx, file_path in enumerate(files_to_index, 1):
                file_name = Path(file_path).name

                # Send progress update
                yield f"data: {json.dumps({'type': 'progress', 'current': idx, 'total': total, 'filename': file_name, 'percent': int((idx / total) * 100)})}\n\n"

                try:
                    # Index the file directly using RAG service
                    result = rag_service.index_local_file(file_path)

                    if result.get("status") == "success":
                        results["successful"] += 1
                    elif result.get("status") == "skipped":
                        results["skipped"] += 1
                    else:
                        results["failed"] += 1
                        results["errors"].append(
                            {
                                "file": file_name,
                                "error": result.get("error", "Unknown error"),
                            }
                        )
                except Exception:
                    results["failed"] += 1
                    results["errors"].append(
                        {"file": file_name, "error": "Failed to index file"}
                    )
                    logger.exception(f"Error indexing file {file_path}")

            # Send completion status
            yield f"data: {json.dumps({'type': 'complete', 'results': results})}\n\n"

            logger.info(
                f"Local library indexing complete for {path}: "
                f"{results['successful']} successful, "
                f"{results['skipped']} skipped, "
                f"{results['failed']} failed"
            )

        except Exception:
            logger.exception("Error in local library indexing")
            yield f"data: {json.dumps({'type': 'error', 'error': 'An internal error occurred during indexing'})}\n\n"
        finally:
            # See parallel comment in index-all generator above — generator
            # ``finally`` is the safe close site for streamed RAG services.
            safe_close(rag_service, "rag_service (index-local SSE)")

    return Response(
        stream_with_context(generate()), mimetype="text/event-stream"
    )

1146 

1147 

1148# Collection Management Routes 

1149 

1150 

@rag_bp.route("/api/collections", methods=["GET"])
@login_required
def get_collections():
    """List every collection stored in the current user's database.

    Each entry reports id, name, description, created_at (ISO string or
    None), collection_type, is_default, document_count, folder_count and an
    ``embedding`` object (None when no embedding model has been recorded).
    Errors are delegated to ``handle_api_error``.
    """
    from ...database.session_context import get_user_db_session

    def serialize(coll):
        # Per-collection payload; optional attributes fall back to defaults.
        created = coll.created_at
        entry = {
            "id": coll.id,
            "name": coll.name,
            "description": coll.description,
            "created_at": created.isoformat() if created else None,
            "collection_type": coll.collection_type,
            "is_default": getattr(coll, "is_default", False),
            "document_count": len(getattr(coll, "document_links", ())),
            "folder_count": len(getattr(coll, "linked_folders", ())),
        }

        if not coll.embedding_model:
            entry["embedding"] = None
            return entry

        model_type = coll.embedding_model_type
        entry["embedding"] = {
            "model": coll.embedding_model,
            "provider": model_type.value if model_type else None,
            "dimension": coll.embedding_dimension,
            "chunk_size": coll.chunk_size,
            "chunk_overlap": coll.chunk_overlap,
        }
        return entry

    try:
        current_user = session["username"]
        # Every user has a private database, so no per-user filter is needed
        with get_user_db_session(current_user) as db_session:
            payload = [
                serialize(coll)
                for coll in db_session.query(Collection).all()
            ]

        return jsonify({"success": True, "collections": payload})
    except Exception as e:
        return handle_api_error("getting collections", e)

1203 

1204 

@rag_bp.route("/api/collections", methods=["POST"])
@login_required
@require_json_body(error_format="success")
def create_collection():
    """Create a new document collection.

    JSON body fields:
        name: required, non-blank string (surrounding whitespace stripped).
        description: optional string (default empty).
        type: optional collection type (default ``"user_uploads"``).

    Non-string ``name``/``description`` values (e.g. JSON ``null``) are
    treated as missing, so they produce a 400 rather than a server error.

    Returns:
        JSON with the new collection; 400 when the name is missing or is
        already used by another collection.
    """
    from ...database.session_context import get_user_db_session

    try:
        data = request.get_json()
        raw_name = data.get("name")
        name = raw_name.strip() if isinstance(raw_name, str) else ""
        raw_description = data.get("description")
        description = (
            raw_description.strip() if isinstance(raw_description, str) else ""
        )
        collection_type = data.get("type", "user_uploads")

        if not name:
            return jsonify({"success": False, "error": "Name is required"}), 400

        username = session["username"]
        with get_user_db_session(username) as db_session:
            # Names are unique within this user's database
            existing = db_session.query(Collection).filter_by(name=name).first()

            if existing:
                return jsonify(
                    {
                        "success": False,
                        "error": f"Collection '{name}' already exists",
                    }
                ), 400

            # No username needed - each user has their own DB.
            # created_at uses default=utcnow() in the model.
            collection = Collection(
                id=str(uuid.uuid4()),
                name=name,
                description=description,
                collection_type=collection_type,
            )

            db_session.add(collection)
            db_session.commit()

            return jsonify(
                {
                    "success": True,
                    "collection": {
                        "id": collection.id,
                        "name": collection.name,
                        "description": collection.description,
                        # Guarded like update_collection does
                        "created_at": collection.created_at.isoformat()
                        if collection.created_at
                        else None,
                        "collection_type": collection.collection_type,
                    },
                }
            )
    except Exception as e:
        return handle_api_error("creating collection", e)

1260 

1261 

@rag_bp.route("/api/collections/<string:collection_id>", methods=["PUT"])
@login_required
@require_json_body(error_format="success")
def update_collection(collection_id):
    """Update a collection's name and/or description.

    JSON body fields (both optional):
        name: new name. Blank or non-string values leave the name unchanged.
            A name used by another collection is rejected with 400.
        description: new description. Applied only when given as a string,
            so ``""`` clears it while omitting the key keeps the current
            description (previously any update without ``description``
            silently wiped it).

    Returns:
        JSON with the updated collection, 404 if it does not exist, or 400
        on a name conflict.
    """
    from ...database.session_context import get_user_db_session

    try:
        data = request.get_json()
        raw_name = data.get("name")
        name = raw_name.strip() if isinstance(raw_name, str) else ""
        raw_description = data.get("description")
        # None means "not provided": the stored description is left alone
        description = (
            raw_description.strip()
            if isinstance(raw_description, str)
            else None
        )

        username = session["username"]
        with get_user_db_session(username) as db_session:
            # No need to filter by username - each user has their own database
            collection = (
                db_session.query(Collection).filter_by(id=collection_id).first()
            )

            if not collection:
                return jsonify(
                    {"success": False, "error": "Collection not found"}
                ), 404

            if name:
                # Check if new name conflicts with another collection
                existing = (
                    db_session.query(Collection)
                    .filter(
                        Collection.name == name,
                        Collection.id != collection_id,
                    )
                    .first()
                )

                if existing:
                    return jsonify(
                        {
                            "success": False,
                            "error": f"Collection '{name}' already exists",
                        }
                    ), 400

                collection.name = name

            if description is not None:  # Allow explicit empty description
                collection.description = description

            db_session.commit()

            return jsonify(
                {
                    "success": True,
                    "collection": {
                        "id": collection.id,
                        "name": collection.name,
                        "description": collection.description,
                        "created_at": collection.created_at.isoformat()
                        if collection.created_at
                        else None,
                        "collection_type": collection.collection_type,
                    },
                }
            )
    except Exception as e:
        return handle_api_error("updating collection", e)

1328 

1329 

@rag_bp.route(
    "/api/collections/<string:collection_id>/upload", methods=["POST"]
)
@login_required
@upload_rate_limit_user
@upload_rate_limit_ip
def upload_to_collection(collection_id):
    """Upload files into a collection.

    Multipart form fields:
        files: one or more uploaded files (required).
        pdf_storage: ``"database"`` (encrypted PDF stored in the DB) or
            ``"none"`` (text only). Defaults to the
            ``research_library.upload_pdf_storage`` setting; any other value
            is downgraded to ``"none"``.

    Each file is size-checked and SHA-256 hashed for de-duplication. If a
    document with that hash exists, it is linked to the collection (and may
    gain its PDF). Otherwise its text is extracted into a new ``Document``.
    Per-file failures are collected in ``errors`` and do not abort the
    request. Documents newly linked to the collection are passed to
    ``trigger_auto_index`` when the session's DB password is available.

    Returns:
        JSON with ``uploaded``, ``errors`` and ``summary``; 400 when no files
        are sent or the file count is invalid; 404 for an unknown collection.
    """
    from ...database.session_context import get_user_db_session
    from ...security import sanitize_filename, UnsafeFilenameError
    import hashlib
    from ..services.pdf_storage_manager import PDFStorageManager

    # Outcomes meaning "this document was just linked to the collection" and
    # therefore needs indexing. The PDF-upgraded variant was previously
    # missing, so those documents were never auto-indexed.
    newly_linked_statuses = {
        "uploaded",
        "added_to_collection",
        "added_to_collection_pdf_upgraded",
    }

    try:
        if "files" not in request.files:
            return jsonify(
                {"success": False, "error": "No files provided"}
            ), 400

        files = request.files.getlist("files")
        if not files:
            return jsonify(
                {"success": False, "error": "No files selected"}
            ), 400

        # Bound the per-request file count BEFORE doing any work. The
        # request-level MAX_CONTENT_LENGTH gate covers total bytes, but
        # not file *count*; a request with 10000 zero-byte files would
        # otherwise reach the loop below.
        is_valid, error_msg = FileUploadValidator.validate_file_count(
            len(files)
        )
        if not is_valid:
            return jsonify({"success": False, "error": error_msg}), 400

        username = session["username"]
        with get_user_db_session(username) as db_session:
            # Verify collection exists in this user's database
            collection = (
                db_session.query(Collection).filter_by(id=collection_id).first()
            )

            if not collection:
                return jsonify(
                    {"success": False, "error": "Collection not found"}
                ), 404

            # PDF storage mode from form data, falling back to user's setting
            settings = get_settings_manager()
            default_pdf_storage = settings.get_setting(
                "research_library.upload_pdf_storage", "none"
            )
            pdf_storage = request.form.get("pdf_storage", default_pdf_storage)
            if pdf_storage not in ("database", "none"):
                # Security: user uploads can only use database (encrypted) or
                # none (text-only); filesystem storage is not allowed.
                pdf_storage = "none"

            # Initialize PDF storage manager if storing PDFs in database
            pdf_storage_manager = None
            if pdf_storage == "database":
                library_root = settings.get_setting(
                    "research_library.storage_path",
                    str(get_library_directory()),
                )
                library_root = str(
                    Path(os.path.expandvars(library_root))
                    .expanduser()
                    .resolve()
                )
                pdf_storage_manager = PDFStorageManager(
                    library_root=Path(library_root), storage_mode="database"
                )
                logger.info("PDF storage mode: database (encrypted)")
            else:
                logger.info("PDF storage mode: none (text-only)")

            uploaded_files = []
            errors = []

            for file in files:
                if not file.filename:
                    continue

                try:
                    filename = sanitize_filename(file.filename)
                except UnsafeFilenameError:
                    errors.append(
                        {
                            "filename": "rejected",
                            "error": "Invalid or unsafe filename",
                        }
                    )
                    continue

                try:
                    # Pre-flight size check on Content-Length BEFORE reading
                    # bytes into memory. Cheap rejection for oversized files;
                    # avoids loading 50MB+ into memory just to discard it.
                    is_valid, error_msg = (
                        FileUploadValidator.validate_file_size(
                            content_length=file.content_length,
                            file_content=None,
                        )
                    )
                    if not is_valid:
                        errors.append(
                            {"filename": filename, "error": error_msg}
                        )
                        continue

                    # Read file content
                    file_content = file.read()
                    file.seek(0)  # Reset for potential re-reading

                    # Post-read size check (Content-Length can be missing or
                    # spoofed; the actual byte count is authoritative).
                    is_valid, error_msg = (
                        FileUploadValidator.validate_file_size(
                            content_length=None,
                            file_content=file_content,
                        )
                    )
                    if not is_valid:
                        errors.append(
                            {"filename": filename, "error": error_msg}
                        )
                        continue

                    # Calculate file hash for deduplication
                    file_hash = hashlib.sha256(file_content).hexdigest()

                    # Check if document already exists
                    existing_doc = (
                        db_session.query(Document)
                        .filter_by(document_hash=file_hash)
                        .first()
                    )

                    if existing_doc:
                        # Document exists, check if we can upgrade to include PDF
                        pdf_upgraded = False
                        if (
                            pdf_storage == "database"
                            and pdf_storage_manager is not None
                        ):
                            # NOTE: File count is validated explicitly above
                            # (validate_file_count) and the filename was
                            # sanitized via sanitize_filename(); only the PDF
                            # magic-byte check is needed here.
                            # See PR #3145 review for details.
                            if file_content[:4] != b"%PDF":
                                logger.debug(
                                    "Skipping PDF upgrade for {}: not a PDF file",
                                    filename,
                                )
                            else:
                                pdf_upgraded = (
                                    pdf_storage_manager.upgrade_to_pdf(
                                        document=existing_doc,
                                        pdf_content=file_content,
                                        session=db_session,
                                    )
                                )

                        # Check if already in collection
                        existing_link = (
                            db_session.query(DocumentCollection)
                            .filter_by(
                                document_id=existing_doc.id,
                                collection_id=collection_id,
                            )
                            .first()
                        )

                        if not existing_link:
                            ensure_in_collection(
                                db_session, existing_doc.id, collection_id
                            )
                            status = "added_to_collection"
                            if pdf_upgraded:
                                status = "added_to_collection_pdf_upgraded"
                        else:
                            status = "already_in_collection"
                            if pdf_upgraded:
                                status = "pdf_upgraded"
                        uploaded_files.append(
                            {
                                "filename": existing_doc.filename,
                                "status": status,
                                "id": existing_doc.id,
                                "pdf_upgraded": pdf_upgraded,
                            }
                        )
                    else:
                        # Create new document
                        from ...document_loaders import (
                            extract_text_from_bytes,
                            is_extension_supported,
                        )

                        file_extension = Path(filename).suffix.lower()

                        # Validate extension is supported before extraction
                        if not is_extension_supported(file_extension):
                            errors.append(
                                {
                                    "filename": filename,
                                    "error": f"Unsupported format: {file_extension}",
                                }
                            )
                            continue

                        # Use file_type without leading dot for storage
                        file_type = (
                            file_extension[1:]
                            if file_extension.startswith(".")
                            else file_extension
                        )

                        # Extract text using document_loaders module
                        extracted_text = extract_text_from_bytes(
                            file_content, file_extension, filename
                        )

                        # Clean the extracted text to remove surrogate characters
                        if extracted_text:
                            from ...text_processing import remove_surrogates

                            extracted_text = remove_surrogates(extracted_text)

                        if not extracted_text:
                            errors.append(
                                {
                                    "filename": filename,
                                    "error": f"Could not extract text from {file_type} file",
                                }
                            )
                            logger.warning(
                                f"Skipping file {filename} - no text could be extracted"
                            )
                            continue

                        # Get or create the user_upload source type
                        logger.info(
                            f"Getting or creating user_upload source type for {filename}"
                        )
                        source_type = (
                            db_session.query(SourceType)
                            .filter_by(name="user_upload")
                            .first()
                        )
                        if not source_type:
                            logger.info("Creating new user_upload source type")
                            source_type = SourceType(
                                id=str(uuid.uuid4()),
                                name="user_upload",
                                display_name="User Upload",
                                description="Documents uploaded by users",
                                icon="fas fa-upload",
                            )
                            db_session.add(source_type)
                            db_session.flush()
                            logger.info(
                                f"Created source type with ID: {source_type.id}"
                            )
                        else:
                            logger.info(
                                f"Found existing source type with ID: {source_type.id}"
                            )

                        # No username needed - in user's own database.
                        # uploaded_at uses default=utcnow() in the model.
                        doc_id = str(uuid.uuid4())
                        logger.info(
                            f"Creating document {doc_id} for {filename}"
                        )

                        # Determine storage mode and file_path
                        store_pdf_in_db = (
                            pdf_storage == "database"
                            and file_type == "pdf"
                            and pdf_storage_manager is not None
                        )

                        new_doc = Document(
                            id=doc_id,
                            source_type_id=source_type.id,
                            filename=filename,
                            document_hash=file_hash,
                            file_size=len(file_content),
                            file_type=file_type,
                            text_content=extracted_text,  # Always store extracted text
                            file_path=None
                            if store_pdf_in_db
                            else FILE_PATH_TEXT_ONLY,
                            storage_mode="database"
                            if store_pdf_in_db
                            else "none",
                        )
                        db_session.add(new_doc)
                        db_session.flush()  # Get the ID
                        logger.info(
                            f"Document {new_doc.id} created successfully"
                        )

                        # Store PDF in encrypted database if requested
                        pdf_stored = False
                        if store_pdf_in_db:
                            try:
                                pdf_storage_manager.save_pdf(
                                    pdf_content=file_content,
                                    document=new_doc,
                                    session=db_session,
                                    filename=filename,
                                )
                                pdf_stored = True
                                logger.info(
                                    f"PDF stored in encrypted database for {filename}"
                                )
                            except Exception:
                                logger.exception(
                                    f"Failed to store PDF in database for {filename}"
                                )
                                # Continue without PDF storage - text is still saved

                        # Add to collection
                        ensure_in_collection(
                            db_session, new_doc.id, collection_id
                        )

                        uploaded_files.append(
                            {
                                "filename": filename,
                                "status": "uploaded",
                                "id": new_doc.id,
                                "text_length": len(extracted_text),
                                "pdf_stored": pdf_stored,
                            }
                        )

                except Exception:
                    errors.append(
                        {
                            "filename": filename,
                            "error": "Failed to upload file",
                        }
                    )
                    logger.exception(f"Error uploading file {filename}")

            db_session.commit()

            # Trigger auto-indexing for documents newly linked to the collection
            document_ids = [
                f["id"]
                for f in uploaded_files
                if f.get("status") in newly_linked_statuses
            ]
            if document_ids:
                from ...database.session_passwords import session_password_store

                session_id = session.get("session_id")
                db_password = session_password_store.get_session_password(
                    username, session_id
                )
                if db_password:
                    trigger_auto_index(
                        document_ids, collection_id, username, db_password
                    )

            return jsonify(
                {
                    "success": True,
                    "uploaded": uploaded_files,
                    "errors": errors,
                    "summary": {
                        "total": len(files),
                        "successful": len(uploaded_files),
                        "failed": len(errors),
                    },
                }
            )

    except Exception as e:
        return handle_api_error("uploading files", e)

1724 

1725 

@rag_bp.route(
    "/api/collections/<string:collection_id>/documents", methods=["GET"]
)
@login_required
def get_collection_documents(collection_id):
    """Get all documents in a collection plus the collection's RAG settings.

    Each document reports file metadata, per-collection indexing state
    (``indexed``, ``chunk_count``, ``last_indexed_at``), whether a stored
    file / DB text exists, its source type, and how many *other* collections
    also contain it. The collection payload includes embedding/index
    settings and, when the RAG index file can be stat'ed, its size.

    Returns:
        JSON with ``collection`` and ``documents``; 404 when the collection
        does not exist. Other errors are delegated to ``handle_api_error``.
    """
    from collections import Counter

    from ...database.session_context import get_user_db_session

    try:
        username = session["username"]
        with get_user_db_session(username) as db_session:
            # Verify collection exists in this user's database
            collection = (
                db_session.query(Collection).filter_by(id=collection_id).first()
            )

            if not collection:
                return jsonify(
                    {"success": False, "error": "Collection not found"}
                ), 404

            # Get documents through junction table
            doc_links = (
                db_session.query(DocumentCollection, Document)
                .join(Document)
                .filter(DocumentCollection.collection_id == collection_id)
                .all()
            )

            # Membership in other collections, fetched in ONE query for all
            # documents here instead of one COUNT query per document (N+1).
            other_collection_counts = Counter()
            if doc_links:
                member_ids = db_session.query(
                    DocumentCollection.document_id
                ).filter(DocumentCollection.collection_id == collection_id)
                other_links = db_session.query(
                    DocumentCollection.document_id
                ).filter(
                    DocumentCollection.document_id.in_(member_ids),
                    DocumentCollection.collection_id != collection_id,
                )
                other_collection_counts.update(
                    document_id for (document_id,) in other_links
                )

            documents = []
            for link, doc in doc_links:
                # A real path (not a sentinel) means the file itself is stored
                has_pdf = bool(
                    doc.file_path and doc.file_path not in FILE_PATH_SENTINELS
                )
                has_text_db = bool(doc.text_content)

                # Use title if available, otherwise filename
                display_title = doc.title or doc.filename or "Untitled"

                source_type_name = (
                    doc.source_type.name if doc.source_type else "unknown"
                )

                other_collections_count = other_collection_counts[doc.id]

                documents.append(
                    {
                        "id": doc.id,
                        "filename": display_title,
                        "title": display_title,
                        "file_type": doc.file_type,
                        "file_size": doc.file_size,
                        "uploaded_at": doc.created_at.isoformat()
                        if doc.created_at
                        else None,
                        "indexed": link.indexed,
                        "chunk_count": link.chunk_count,
                        "last_indexed_at": link.last_indexed_at.isoformat()
                        if link.last_indexed_at
                        else None,
                        "has_pdf": has_pdf,
                        "has_text_db": has_text_db,
                        "source_type": source_type_name,
                        "in_other_collections": other_collections_count > 0,
                        "other_collections_count": other_collections_count,
                    }
                )

            # Get index file size if available
            index_file_size = None
            index_file_size_bytes = None
            collection_name = f"collection_{collection_id}"
            rag_index = (
                db_session.query(RAGIndex)
                .filter_by(collection_name=collection_name)
                .first()
            )
            if rag_index and rag_index.index_path:
                # stat() directly: avoids the exists()/stat() race, and an
                # unreadable or missing index file just leaves size unknown.
                try:
                    size_bytes = Path(rag_index.index_path).stat().st_size
                except OSError:
                    size_bytes = None
                if size_bytes is not None:
                    index_file_size_bytes = size_bytes
                    # Format as human-readable
                    if size_bytes < 1024:
                        index_file_size = f"{size_bytes} B"
                    elif size_bytes < 1024 * 1024:
                        index_file_size = f"{size_bytes / 1024:.1f} KB"
                    else:
                        index_file_size = f"{size_bytes / (1024 * 1024):.1f} MB"

            return jsonify(
                {
                    "success": True,
                    "collection": {
                        "id": collection.id,
                        "name": collection.name,
                        "description": collection.description,
                        "embedding_model": collection.embedding_model,
                        "embedding_model_type": collection.embedding_model_type.value
                        if collection.embedding_model_type
                        else None,
                        "embedding_dimension": collection.embedding_dimension,
                        "chunk_size": collection.chunk_size,
                        "chunk_overlap": collection.chunk_overlap,
                        # Advanced settings
                        "splitter_type": collection.splitter_type,
                        "distance_metric": collection.distance_metric,
                        "index_type": collection.index_type,
                        "normalize_vectors": collection.normalize_vectors,
                        # Index file info
                        "index_file_size": index_file_size,
                        "index_file_size_bytes": index_file_size_bytes,
                        "collection_type": collection.collection_type,
                    },
                    "documents": documents,
                }
            )

    except Exception as e:
        return handle_api_error("getting collection documents", e)

1858 

1859 

1860@rag_bp.route("/api/collections/<string:collection_id>/index", methods=["GET"]) 

1861@login_required 

1862def index_collection(collection_id): 

1863 """Index all documents in a collection with Server-Sent Events progress.""" 

1864 from ...database.session_context import get_user_db_session 

1865 from ...database.session_passwords import session_password_store 

1866 

1867 force_reindex = parse_bool_arg("force_reindex") 

1868 username = session["username"] 

1869 session_id = session.get("session_id") 

1870 

1871 logger.info(f"Starting index_collection, force_reindex={force_reindex}") 

1872 

1873 # Get password for thread access to encrypted database 

1874 db_password = None 

1875 if session_id: 1875 ↛ 1881line 1875 didn't jump to line 1881 because the condition on line 1875 was always true

1876 db_password = session_password_store.get_session_password( 

1877 username, session_id 

1878 ) 

1879 

1880 # Create RAG service — on force reindex use current default model 

1881 rag_service = get_rag_service(collection_id, use_defaults=force_reindex) 

1882 logger.info( 

1883 f"RAG service created: provider={rag_service.embedding_provider}" 

1884 ) 

1885 

1886 def generate(): 

1887 """Generator for SSE progress updates.""" 

1888 logger.info("SSE generator started") 

1889 try: 

1890 with get_user_db_session(username, db_password) as db_session: 

1891 # Verify collection exists in this user's database 

1892 collection = ( 

1893 db_session.query(Collection) 

1894 .filter_by(id=collection_id) 

1895 .first() 

1896 ) 

1897 

1898 if not collection: 

1899 yield f"data: {json.dumps({'type': 'error', 'error': 'Collection not found'})}\n\n" 

1900 return 

1901 

1902 # Store embedding metadata on first index or force reindex 

1903 if collection.embedding_model is None or force_reindex: 

1904 # Get embedding dimension from the embedding manager 

1905 embedding_dim = None 

1906 try: 

1907 # Try to get dimension from the embedding manager's provider 

1908 if hasattr(rag_service.embedding_manager, "provider"): 1908 ↛ 1909line 1908 didn't jump to line 1909 because the condition on line 1908 was never true

1909 provider = rag_service.embedding_manager.provider 

1910 if hasattr(provider, "embedding_dimension"): 

1911 embedding_dim = provider.embedding_dimension 

1912 except Exception: 

1913 logger.warning("Could not get embedding dimension") 

1914 

1915 # Store the embedding configuration used for this collection 

1916 collection.embedding_model = rag_service.embedding_model 

1917 collection.embedding_model_type = EmbeddingProvider( 

1918 rag_service.embedding_provider 

1919 ) 

1920 collection.embedding_dimension = embedding_dim 

1921 collection.chunk_size = rag_service.chunk_size 

1922 collection.chunk_overlap = rag_service.chunk_overlap 

1923 # Store advanced settings 

1924 collection.splitter_type = rag_service.splitter_type 

1925 collection.text_separators = rag_service.text_separators 

1926 collection.distance_metric = rag_service.distance_metric 

1927 # Ensure normalize_vectors is a proper boolean for database 

1928 collection.normalize_vectors = bool( 

1929 rag_service.normalize_vectors 

1930 ) 

1931 collection.index_type = rag_service.index_type 

1932 db_session.commit() 

1933 logger.info( 

1934 f"Stored embedding metadata for collection: provider={rag_service.embedding_provider}" 

1935 ) 

1936 

1937 # Get documents to index 

1938 query = ( 

1939 db_session.query(DocumentCollection, Document) 

1940 .join(Document) 

1941 .filter(DocumentCollection.collection_id == collection_id) 

1942 ) 

1943 

1944 if not force_reindex: 

1945 # IMPORTANT: DO NOT CHANGE THIS LINE! DO NOT REMOVE THIS COMMENT! 

1946 # We MUST use `== False` instead of `not DocumentCollection.indexed` 

1947 # The Python `not` operator does NOT work correctly in SQLAlchemy filters. 

1948 # Using `not` will cause the query to return NO results (zero documents). 

1949 # SQLAlchemy requires explicit comparison: `== False` or `== True` 

1950 # This has been fixed multiple times - DO NOT change it back to `not`! 

1951 query = query.filter(DocumentCollection.indexed == False) # noqa: E712 

1952 

1953 doc_links = query.all() 

1954 

1955 if not doc_links: 

1956 logger.info("No documents to index in collection") 

1957 yield f"data: {json.dumps({'type': 'complete', 'results': {'successful': 0, 'skipped': 0, 'failed': 0, 'message': 'No documents to index'}})}\n\n" 

1958 return 

1959 

1960 total = len(doc_links) 

1961 logger.info(f"Found {total} documents to index") 

1962 results = { 

1963 "successful": 0, 

1964 "skipped": 0, 

1965 "failed": 0, 

1966 "errors": [], 

1967 } 

1968 

1969 yield f"data: {json.dumps({'type': 'start', 'message': f'Indexing {total} documents in collection: {collection.name}'})}\n\n" 

1970 

1971 for idx, (link, doc) in enumerate(doc_links, 1): 

1972 filename = doc.filename or doc.title or "Unknown" 

1973 yield f"data: {json.dumps({'type': 'progress', 'current': idx, 'total': total, 'filename': filename, 'percent': int((idx / total) * 100)})}\n\n" 

1974 

1975 try: 

1976 logger.debug( 

1977 f"Indexing document {idx}/{total}: {filename}" 

1978 ) 

1979 

1980 # Run index_document in a separate thread to allow sending SSE heartbeats. 

1981 # This keeps the HTTP connection alive during long indexing operations, 

1982 # preventing timeouts from proxy servers (nginx) and browsers. 

1983 # The main thread periodically yields heartbeat comments while waiting. 

1984 result_queue = queue.Queue() 

1985 error_queue = queue.Queue() 

1986 

1987 def index_in_thread(): 

1988 try: 

1989 r = rag_service.index_document( 

1990 document_id=doc.id, 

1991 collection_id=collection_id, 

1992 force_reindex=force_reindex, 

1993 ) 

1994 result_queue.put(r) 

1995 except Exception as ex: 

1996 error_queue.put(ex) 

1997 finally: 

1998 try: 

1999 from ...database.thread_local_session import ( 

2000 cleanup_current_thread, 

2001 ) 

2002 

2003 cleanup_current_thread() 

2004 except Exception: 

2005 logger.debug( 

2006 "best-effort thread-local DB session cleanup", 

2007 exc_info=True, 

2008 ) 

2009 

2010 thread = threading.Thread(target=index_in_thread) 

2011 thread.start() 

2012 

2013 # Send heartbeats while waiting for the thread to complete 

2014 heartbeat_interval = 5 # seconds 

2015 while thread.is_alive(): 2015 ↛ 2016line 2015 didn't jump to line 2016 because the condition on line 2015 was never true

2016 thread.join(timeout=heartbeat_interval) 

2017 if thread.is_alive(): 

2018 # Send SSE comment as heartbeat (keeps connection alive) 

2019 yield f": heartbeat {idx}/{total}\n\n" 

2020 

2021 # Check for errors from thread 

2022 if not error_queue.empty(): 

2023 raise error_queue.get() # noqa: TRY301 — re-raises thread exception for per-document error handling 

2024 

2025 result = result_queue.get() 

2026 logger.info( 

2027 f"Indexed document {idx}/{total}: {filename} - status={result.get('status')}" 

2028 ) 

2029 

2030 if result.get("status") == "success": 

2031 results["successful"] += 1 

2032 # DocumentCollection status is already updated in index_document 

2033 # No need to update link here 

2034 elif result.get("status") == "skipped": 2034 ↛ 2037line 2034 didn't jump to line 2037 because the condition on line 2034 was always true

2035 results["skipped"] += 1 

2036 else: 

2037 results["failed"] += 1 

2038 error_msg = result.get("error", "Unknown error") 

2039 results["errors"].append( 

2040 { 

2041 "filename": filename, 

2042 "error": error_msg, 

2043 } 

2044 ) 

2045 logger.warning( 

2046 f"Failed to index {filename} ({idx}/{total}): {error_msg}" 

2047 ) 

2048 except Exception as e: 

2049 results["failed"] += 1 

2050 error_msg = str(e) or "Failed to index document" 

2051 results["errors"].append( 

2052 { 

2053 "filename": filename, 

2054 "error": error_msg, 

2055 } 

2056 ) 

2057 logger.exception( 

2058 f"Exception indexing document {filename} ({idx}/{total})" 

2059 ) 

2060 # Send error update to client so they know indexing is continuing 

2061 yield f"data: {json.dumps({'type': 'doc_error', 'filename': filename, 'error': error_msg})}\n\n" 

2062 

2063 db_session.commit() 

2064 # Ensure all changes are written to disk 

2065 db_session.flush() 

2066 

2067 logger.info( 

2068 f"Indexing complete: {results['successful']} successful, {results['failed']} failed, {results['skipped']} skipped" 

2069 ) 

2070 yield f"data: {json.dumps({'type': 'complete', 'results': results})}\n\n" 

2071 logger.info("SSE generator finished successfully") 

2072 

2073 except Exception: 

2074 logger.exception("Error in collection indexing") 

2075 yield f"data: {json.dumps({'type': 'error', 'error': 'An internal error occurred during indexing'})}\n\n" 

2076 finally: 

2077 # See parallel comment in index-all generator above — generator 

2078 # ``finally`` is the safe close site for streamed RAG services. 

2079 safe_close(rag_service, "rag_service (index-collection SSE)") 

2080 

2081 response = Response( 

2082 stream_with_context(generate()), mimetype="text/event-stream" 

2083 ) 

2084 # Prevent buffering for proper SSE streaming 

2085 response.headers["Cache-Control"] = "no-cache, no-transform" 

2086 response.headers["Connection"] = "keep-alive" 

2087 response.headers["X-Accel-Buffering"] = "no" 

2088 return response 

2089 

2090 

2091# ============================================================================= 

2092# Background Indexing Endpoints 

2093# ============================================================================= 

2094 

2095 

def _get_rag_service_for_thread(
    collection_id: str,
    username: str,
    db_password: str,
    use_defaults: bool = False,
) -> LibraryRAGService:
    """
    Build a RAG service for a background thread that has no Flask context.

    Settings resolution is delegated to the shared ``rag_service_factory``.
    Afterwards the password is re-applied through the ``db_password``
    property so the service's sub-managers can use it as well.

    Args:
        collection_id: Collection the service is built for.
        username: Owner of the encrypted user database.
        db_password: Password for that database.
        use_defaults: Forwarded to the factory unchanged. Callers in this
            module pass ``force_reindex`` here.

    Returns:
        A configured ``LibraryRAGService``.
    """
    from ..services.rag_service_factory import (
        get_rag_service as _build_rag_service,
    )

    rag_service = _build_rag_service(
        username,
        collection_id,
        use_defaults=use_defaults,
        db_password=db_password,
    )
    # LibraryRAGService.__init__ only fills the private ``_db_password``
    # backing field. Assigning through the property setter also pushes the
    # password down to embedding_manager and integrity_manager, which need it
    # for thread-safe session access.
    rag_service.db_password = db_password
    return rag_service

2124 

2125 

def trigger_auto_index(
    document_ids: list[str],
    collection_id: str,
    username: str,
    db_password: str,
) -> None:
    """
    Queue background RAG indexing for documents when auto-indexing is on.

    Reads the ``research_library.auto_index_enabled`` setting (default True).
    If it is enabled, hands the documents to the shared auto-index thread
    pool. Never blocks on the indexing itself.

    Args:
        document_ids: IDs of the documents to index. An empty list is a no-op.
        collection_id: The collection to index into.
        username: The username for database access.
        db_password: The user's database password for thread-safe access.
    """
    from ...database.session_context import get_user_db_session

    # Guard clause: nothing to do for an empty batch.
    if not document_ids:
        logger.debug("No documents to auto-index")
        return

    # Read the setting. Any failure here means "don't auto-index".
    try:
        with get_user_db_session(username, db_password) as db_session:
            enabled = SettingsManager(db_session).get_bool_setting(
                "research_library.auto_index_enabled", True
            )
    except Exception:
        logger.exception(
            "Failed to check auto-index setting, skipping auto-index"
        )
        return

    if not enabled:
        logger.debug("Auto-indexing is disabled, skipping")
        return

    logger.info(
        f"Auto-indexing {len(document_ids)} documents in collection {collection_id}"
    )

    # A shared, bounded pool keeps concurrent uploads from spawning an
    # unbounded number of indexing threads.
    _get_auto_index_executor().submit(
        _auto_index_documents_worker,
        document_ids,
        collection_id,
        username,
        db_password,
    )

2180 

2181 

@thread_cleanup
def _auto_index_documents_worker(
    document_ids: list[str],
    collection_id: str,
    username: str,
    db_password: str,
) -> None:
    """
    Background worker to index documents automatically.

    This is a simpler worker than _background_index_worker. It does not track
    progress via TaskMetadata, since it is meant to be a lightweight
    auto-indexing operation.

    Each document is indexed independently. A failure on one document is
    logged, whether it raised or returned an error result, and the remaining
    documents are still processed.

    Args:
        document_ids: IDs of the documents to index.
        collection_id: The collection to index into.
        username: The username for database access.
        db_password: The user's database password for thread-safe access.
    """

    try:
        # Create RAG service (thread-safe, no Flask context needed)
        with _get_rag_service_for_thread(
            collection_id, username, db_password
        ) as rag_service:
            indexed_count = 0
            failed_count = 0
            for doc_id in document_ids:
                try:
                    result = rag_service.index_document(
                        doc_id, collection_id, force_reindex=False
                    )
                    status = result.get("status")
                    if status == "success":
                        indexed_count += 1
                        logger.debug(f"Auto-indexed document {doc_id}")
                    elif status == "skipped":
                        logger.debug(
                            f"Document {doc_id} already indexed, skipped"
                        )
                    else:
                        # index_document reported a failure without raising.
                        # Previously this case was silently dropped.
                        failed_count += 1
                        logger.warning(
                            f"Failed to auto-index document {doc_id}: "
                            f"{result.get('error', 'Unknown error')}"
                        )
                except Exception:
                    failed_count += 1
                    logger.exception(f"Failed to auto-index document {doc_id}")

            logger.info(
                f"Auto-indexing complete: {indexed_count}/{len(document_ids)} "
                f"documents indexed, {failed_count} failed"
            )

    except Exception:
        logger.exception("Auto-indexing worker failed")

2224 

2225 

@thread_cleanup
def _background_index_worker(
    task_id: str,
    collection_id: str,
    username: str,
    db_password: str,
    force_reindex: bool,
):
    """
    Background worker thread for indexing documents.

    Updates TaskMetadata with progress and checks for cancellation before
    each document.

    Steps:
      1. Record the collection's embedding configuration on first index or
         on force reindex.
      2. On force reindex, delete the collection's old chunks and RAG
         indices and mark every document link as unindexed.
      3. Index each pending document, counting successes, skips and
         failures.
      4. Mark the task completed, or failed on an unexpected error.

    Args:
        task_id: ID of the TaskMetadata row tracking this run.
        collection_id: Collection whose documents are indexed.
        username: Owner of the encrypted user database.
        db_password: Password used to open thread-local DB sessions.
        force_reindex: Rebuild the index from scratch instead of only
            indexing documents not yet marked as indexed.
    """
    from ...database.session_context import get_user_db_session

    try:
        # Create RAG service (thread-safe, no Flask context needed)
        with _get_rag_service_for_thread(
            collection_id, username, db_password, use_defaults=force_reindex
        ) as rag_service:
            with get_user_db_session(username, db_password) as db_session:
                # Get collection
                collection = (
                    db_session.query(Collection)
                    .filter_by(id=collection_id)
                    .first()
                )

                if not collection:
                    _update_task_status(
                        username,
                        db_password,
                        task_id,
                        status="failed",
                        error_message="Collection not found",
                    )
                    return

                # Store embedding metadata on first index or force reindex
                if collection.embedding_model is None or force_reindex:
                    collection.embedding_model = rag_service.embedding_model
                    collection.embedding_model_type = EmbeddingProvider(
                        rag_service.embedding_provider
                    )
                    collection.chunk_size = rag_service.chunk_size
                    collection.chunk_overlap = rag_service.chunk_overlap
                    collection.splitter_type = rag_service.splitter_type
                    collection.text_separators = rag_service.text_separators
                    collection.distance_metric = rag_service.distance_metric
                    collection.normalize_vectors = bool(
                        rag_service.normalize_vectors
                    )
                    collection.index_type = rag_service.index_type
                    db_session.commit()

                # Clean up old index data for a fresh rebuild.
                # This prevents mixed-model vectors if cancelled midway
                # and ensures accurate stats during partial reindex.
                if force_reindex:
                    from ..deletion.utils.cascade_helper import CascadeHelper

                    collection_name = f"collection_{collection_id}"

                    # Delete all old document chunks from DB
                    deleted_chunks = CascadeHelper.delete_collection_chunks(
                        db_session, collection_name
                    )
                    logger.info(
                        f"Cleared {deleted_chunks} old chunks for collection {collection_id}"
                    )

                    # Delete old FAISS index files (.faiss + .pkl) and RAGIndex records
                    # RagDocumentStatus rows cascade-delete via FK ondelete="CASCADE"
                    rag_result = (
                        CascadeHelper.delete_rag_indices_for_collection(
                            db_session, collection_name
                        )
                    )
                    logger.info(
                        f"Cleared old RAG indices for collection {collection_id}: {rag_result}"
                    )

                    # Mark all documents as unindexed
                    db_session.query(DocumentCollection).filter_by(
                        collection_id=collection_id
                    ).update(
                        {
                            DocumentCollection.indexed: False,
                            DocumentCollection.chunk_count: 0,
                        }
                    )
                    db_session.commit()
                    logger.info(
                        f"Reset indexing state for collection {collection_id}"
                    )

                # Get documents to index
                query = (
                    db_session.query(DocumentCollection, Document)
                    .join(Document)
                    .filter(DocumentCollection.collection_id == collection_id)
                )

                if not force_reindex:
                    # SQLAlchemy needs an explicit `== False` comparison here.
                    # Python's `not` cannot be translated into SQL.
                    query = query.filter(DocumentCollection.indexed == False)  # noqa: E712

                doc_links = query.all()

                if not doc_links:
                    _update_task_status(
                        username,
                        db_password,
                        task_id,
                        status="completed",
                        progress_message="No documents to index",
                    )
                    return

                total = len(doc_links)
                results = {"successful": 0, "skipped": 0, "failed": 0}

                # Update task with total count
                _update_task_status(
                    username,
                    db_password,
                    task_id,
                    progress_total=total,
                    progress_message=f"Indexing {total} documents",
                )

                for idx, (_link, doc) in enumerate(doc_links, 1):
                    # Cancellation is cooperative: cancel_indexing only flips
                    # the task status, and it is honoured between documents.
                    if _is_task_cancelled(username, db_password, task_id):
                        _update_task_status(
                            username,
                            db_password,
                            task_id,
                            status="cancelled",
                            progress_message=f"Cancelled after {idx - 1}/{total} documents",
                        )
                        logger.info(f"Indexing task {task_id} was cancelled")
                        return

                    filename = doc.filename or doc.title or "Unknown"

                    # Update progress with filename
                    _update_task_status(
                        username,
                        db_password,
                        task_id,
                        progress_current=idx,
                        progress_message=f"Indexing {idx}/{total}: {filename}",
                    )

                    try:
                        result = rag_service.index_document(
                            document_id=doc.id,
                            collection_id=collection_id,
                            force_reindex=force_reindex,
                        )

                        if result.get("status") == "success":
                            results["successful"] += 1
                        elif result.get("status") == "skipped":
                            results["skipped"] += 1
                        else:
                            results["failed"] += 1

                    except Exception:
                        results["failed"] += 1
                        logger.exception(
                            f"Error indexing document {idx}/{total}"
                        )

                db_session.commit()

                # Mark as completed
                _update_task_status(
                    username,
                    db_password,
                    task_id,
                    status="completed",
                    progress_current=total,
                    progress_message=f"Completed: {results['successful']} indexed, {results['failed']} failed, {results['skipped']} skipped",
                )
                logger.info(
                    f"Background indexing task {task_id} completed: {results}"
                )

    except Exception:
        logger.exception(f"Background indexing task {task_id} failed")
        # error_message is returned verbatim to the client by the index
        # status endpoint. Keep the raw exception text (paths, SQL, provider
        # details) in the server log only, matching the generic message used
        # by the streaming indexing route.
        _update_task_status(
            username,
            db_password,
            task_id,
            status="failed",
            error_message="Indexing failed due to an internal error. Check the server logs for details.",
        )

2423 

2424 

def _update_task_status(
    username: str,
    db_password: str,
    task_id: str,
    status: Optional[str] = None,
    progress_current: Optional[int] = None,
    progress_total: Optional[int] = None,
    progress_message: Optional[str] = None,
    error_message: Optional[str] = None,
) -> None:
    """
    Update selected fields of a TaskMetadata row in the user's database.

    Only arguments that are not ``None`` are written, so callers can update
    just the fields they care about. Setting ``status="completed"`` also
    stamps ``completed_at`` with the current UTC time.

    This is best-effort. A missing task row is ignored, and any exception is
    logged rather than raised, so a status write never aborts the caller's
    indexing loop.

    Args:
        username: Owner of the encrypted user database.
        db_password: Password used to open the session.
        task_id: ID of the task to update.
        status: New status value, if any.
        progress_current: Number of items processed so far, if any.
        progress_total: Total number of items, if any.
        progress_message: Human-readable progress text, if any.
        error_message: Error text to record, if any.
    """
    from ...database.session_context import get_user_db_session

    try:
        with get_user_db_session(username, db_password) as db_session:
            task = (
                db_session.query(TaskMetadata)
                .filter_by(task_id=task_id)
                .first()
            )
            if task:
                if status is not None:
                    task.status = status
                    if status == "completed":
                        task.completed_at = datetime.now(UTC)
                if progress_current is not None:
                    task.progress_current = progress_current
                if progress_total is not None:
                    task.progress_total = progress_total
                if progress_message is not None:
                    task.progress_message = progress_message
                if error_message is not None:
                    task.error_message = error_message
                db_session.commit()
    except Exception:
        logger.exception(f"Failed to update task status for {task_id}")

2461 

2462 

def _is_task_cancelled(username: str, db_password: str, task_id: str) -> bool:
    """
    Return True if the task's status is ``"cancelled"``.

    A missing task row, or any error while reading it, is treated as
    "not cancelled". In the error case a warning is logged, so indexing
    keeps going rather than stopping on a transient DB hiccup.

    Args:
        username: Owner of the encrypted user database.
        db_password: Password used to open the session.
        task_id: ID of the task to check.

    Returns:
        A real ``bool``. The previous ``task and ...`` expression leaked
        ``None`` when the row was missing.
    """
    from ...database.session_context import get_user_db_session

    try:
        with get_user_db_session(username, db_password) as db_session:
            task = (
                db_session.query(TaskMetadata)
                .filter_by(task_id=task_id)
                .first()
            )
            return task is not None and task.status == "cancelled"
    except Exception:
        logger.warning(
            "Could not check cancellation status for task {}", task_id
        )
        return False

2480 

2481 

@rag_bp.route(
    "/api/collections/<string:collection_id>/index/start", methods=["POST"]
)
@login_required
def start_background_index(collection_id):
    """
    Start background indexing for a collection.

    Request body (optional JSON object):
        force_reindex: When truthy, rebuild the collection's index from
            scratch.

    Returns:
        200 with ``task_id`` when a worker thread was started.
        409 with the existing ``task_id`` when this collection is already
        being indexed.
        500 with a generic message on any other failure.
    """
    from ...database.session_context import get_user_db_session
    from ...database.session_passwords import session_password_store

    username = session["username"]
    session_id = session.get("session_id")

    # Get password for thread access
    db_password = None
    if session_id:
        db_password = session_password_store.get_session_password(
            username, session_id
        )

    # Parse request body. A missing or invalid body, or a JSON value that is
    # not an object, means "use defaults" instead of a 415/500.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    force_reindex = bool(data.get("force_reindex", False))

    try:
        with get_user_db_session(username, db_password) as db_session:
            # Several collections may be indexing at the same time, so
            # examine every in-flight indexing task. Checking only the first
            # row let a second run for this collection slip through whenever
            # another collection was indexing.
            active_tasks = (
                db_session.query(TaskMetadata)
                .filter(
                    TaskMetadata.task_type == "indexing",
                    TaskMetadata.status == "processing",
                )
                .all()
            )
            for existing_task in active_tasks:
                metadata = existing_task.metadata_json or {}
                if metadata.get("collection_id") == collection_id:
                    return jsonify(
                        {
                            "success": False,
                            "error": "Indexing is already in progress for this collection",
                            "task_id": existing_task.task_id,
                        }
                    ), 409

            # Create new task
            task_id = str(uuid.uuid4())
            now = datetime.now(UTC)
            task = TaskMetadata(
                task_id=task_id,
                status="processing",
                task_type="indexing",
                created_at=now,
                started_at=now,
                progress_current=0,
                progress_total=0,
                progress_message="Starting indexing...",
                metadata_json={
                    "collection_id": collection_id,
                    "force_reindex": force_reindex,
                },
            )
            db_session.add(task)
            db_session.commit()

        # Start background thread
        thread = threading.Thread(
            target=_background_index_worker,
            args=(task_id, collection_id, username, db_password, force_reindex),
            daemon=True,
        )
        try:
            thread.start()
        except RuntimeError:
            # The task row is already committed as "processing". Left as is,
            # it would make every later start for this collection return 409.
            _update_task_status(
                username,
                db_password,
                task_id,
                status="failed",
                error_message="Failed to start indexing thread",
            )
            raise

        logger.info(
            f"Started background indexing task {task_id} for collection {collection_id}"
        )

        return jsonify(
            {
                "success": True,
                "task_id": task_id,
                "message": "Indexing started in background",
            }
        )

    except Exception:
        logger.exception("Failed to start background indexing")
        return jsonify(
            {
                "success": False,
                "error": "Failed to start indexing. Please try again.",
            }
        ), 500

2576 

2577 

@rag_bp.route(
    "/api/collections/<string:collection_id>/index/status", methods=["GET"]
)
@limiter.exempt
@login_required
def get_index_status(collection_id):
    """
    Get the current indexing status for a collection.

    Reports the most recent indexing task whose metadata belongs to
    ``collection_id``, even if tasks for other collections were created
    later. Returns ``status: "idle"`` when no such task exists.
    """
    from ...database.session_context import get_user_db_session
    from ...database.session_passwords import session_password_store

    username = session["username"]
    session_id = session.get("session_id")

    db_password = None
    if session_id:
        db_password = session_password_store.get_session_password(
            username, session_id
        )

    try:
        with get_user_db_session(username, db_password) as db_session:
            # Walk indexing tasks newest-first and stop at the first one for
            # this collection. Looking only at the single newest task made a
            # collection that was still indexing report "idle" as soon as
            # another collection started indexing.
            indexing_tasks = (
                db_session.query(TaskMetadata)
                .filter(TaskMetadata.task_type == "indexing")
                .order_by(TaskMetadata.created_at.desc())
            )
            saw_any_task = False
            task = None
            for candidate in indexing_tasks:
                saw_any_task = True
                metadata = candidate.metadata_json or {}
                if metadata.get("collection_id") == collection_id:
                    task = candidate
                    break

            if task is None:
                return jsonify(
                    {
                        "status": "idle",
                        "message": "No indexing task for this collection"
                        if saw_any_task
                        else "No indexing task found",
                    }
                )

            return jsonify(
                {
                    "task_id": task.task_id,
                    "status": task.status,
                    "progress_current": task.progress_current or 0,
                    "progress_total": task.progress_total or 0,
                    "progress_message": task.progress_message,
                    "error_message": task.error_message,
                    "created_at": task.created_at.isoformat()
                    if task.created_at
                    else None,
                    "completed_at": task.completed_at.isoformat()
                    if task.completed_at
                    else None,
                }
            )

    except Exception:
        logger.exception("Failed to get index status")
        return jsonify(
            {
                "status": "error",
                "error": "Failed to get indexing status. Please try again.",
            }
        ), 500

2650 

2651 

@rag_bp.route(
    "/api/collections/<string:collection_id>/index/cancel", methods=["POST"]
)
@login_required
def cancel_indexing(collection_id):
    """
    Cancel an active indexing task for a collection.

    Only the task's status is flipped to ``"cancelled"``. The background
    worker checks this status before each document and stops there.

    Returns:
        200 with the cancelled ``task_id``.
        404 when no processing indexing task exists for this collection.
        500 with a generic message on failure.
    """
    from ...database.session_context import get_user_db_session
    from ...database.session_passwords import session_password_store

    username = session["username"]
    session_id = session.get("session_id")

    db_password = None
    if session_id:
        db_password = session_password_store.get_session_password(
            username, session_id
        )

    try:
        with get_user_db_session(username, db_password) as db_session:
            active_tasks = (
                db_session.query(TaskMetadata)
                .filter(
                    TaskMetadata.task_type == "indexing",
                    TaskMetadata.status == "processing",
                )
                .all()
            )

            if not active_tasks:
                return jsonify(
                    {
                        "success": False,
                        "error": "No active indexing task found",
                    }
                ), 404

            # Several collections may be indexing at once. Pick this
            # collection's task instead of whichever row came back first;
            # otherwise an active task could become impossible to cancel.
            task = next(
                (
                    candidate
                    for candidate in active_tasks
                    if (candidate.metadata_json or {}).get("collection_id")
                    == collection_id
                ),
                None,
            )

            if task is None:
                return jsonify(
                    {
                        "success": False,
                        "error": "No active indexing task for this collection",
                    }
                ), 404

            # Capture the ID before commit, so it is not re-read from an
            # expired instance.
            cancelled_task_id = task.task_id

            # Mark as cancelled - the worker thread will check this
            task.status = "cancelled"
            task.progress_message = "Cancellation requested..."
            db_session.commit()

            logger.info(
                f"Cancelled indexing task {cancelled_task_id} for collection {collection_id}"
            )

            return jsonify(
                {
                    "success": True,
                    "message": "Cancellation requested",
                    "task_id": cancelled_task_id,
                }
            )

    except Exception:
        logger.exception("Failed to cancel indexing")
        return jsonify(
            {
                "success": False,
                "error": "Failed to cancel indexing. Please try again.",
            }
        ), 500

2725 

2726 

2727# Research History Semantic Search Routes have been moved to 

2728# research_library.search.routes.search_routes