Coverage for src / local_deep_research / research_library / routes / rag_routes.py: 85%

987 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2RAG Management API Routes 

3 

4Provides endpoints for managing RAG indexing of library documents: 

5- Configure embedding models 

6- Index documents 

7- Get RAG statistics 

8- Bulk operations with progress tracking 

9""" 

10 

11import os 

12 

13from flask import ( 

14 Blueprint, 

15 jsonify, 

16 request, 

17 Response, 

18 render_template, 

19 session, 

20 stream_with_context, 

21) 

22from loguru import logger 

23import atexit 

24import glob 

25import json 

26import uuid 

27import time 

28import threading 

29import queue 

30from concurrent.futures import ThreadPoolExecutor 

31from datetime import datetime, UTC 

32from pathlib import Path 

33from typing import Optional 

34 

35from ...constants import FILE_PATH_SENTINELS, FILE_PATH_TEXT_ONLY 

36from ...security.decorators import require_json_body 

37from ...web.auth.decorators import login_required 

38from ...utilities.db_utils import get_settings_manager 

39from ..services.library_rag_service import LibraryRAGService 

40from ...settings.manager import SettingsManager 

41from ...security.path_validator import PathValidator 

42from ...security.rate_limiter import ( 

43 upload_rate_limit_ip, 

44 upload_rate_limit_user, 

45) 

46from ..utils import handle_api_error 

47from ...database.models.library import ( 

48 Document, 

49 Collection, 

50 DocumentCollection, 

51 RAGIndex, 

52 SourceType, 

53 EmbeddingProvider, 

54) 

55from ...database.models.queue import TaskMetadata 

56from ...database.thread_local_session import thread_cleanup 

57from ...security.rate_limiter import limiter 

58from ...config.paths import get_library_directory 

59 

rag_bp = Blueprint(name="rag", import_name=__name__, url_prefix="/library")

# NOTE: Routes use session["username"] (not .get()) intentionally.
# @login_required guarantees the key exists; direct access fails fast
# if the decorator is ever removed.

# One process-wide pool is shared by all auto-indexing work so requests do
# not each spawn their own threads. It starts unset and is created lazily
# by _get_auto_index_executor().
_auto_index_executor: ThreadPoolExecutor | None = None
# Serializes the lazy creation performed in _get_auto_index_executor().
_auto_index_executor_lock = threading.Lock()

69 

70 

def _get_auto_index_executor() -> ThreadPoolExecutor:
    """Return the shared auto-indexing executor, creating it on first use.

    The check-and-create runs under ``_auto_index_executor_lock`` so that
    concurrent first callers all receive the same pool (4 workers, thread
    names prefixed ``auto_index_``).
    """
    global _auto_index_executor
    with _auto_index_executor_lock:
        pool = _auto_index_executor
        if pool is None:
            pool = ThreadPoolExecutor(
                max_workers=4,
                thread_name_prefix="auto_index_",
            )
            _auto_index_executor = pool
    return pool

81 

82 

def _shutdown_auto_index_executor() -> None:
    """Shut down the shared auto-index executor, waiting for queued jobs.

    The global reference is detached under ``_auto_index_executor_lock``, the
    same lock ``_get_auto_index_executor`` uses. A concurrent getter is
    therefore never handed a pool that is in the middle of shutting down:
    submitting to such a pool raises RuntimeError.

    The potentially slow ``shutdown(wait=True)`` runs outside the lock so it
    does not block other lock users. Calling this more than once is harmless.
    """
    global _auto_index_executor
    with _auto_index_executor_lock:
        executor = _auto_index_executor
        _auto_index_executor = None
    if executor is not None:
        executor.shutdown(wait=True)


# Drain pending auto-index jobs when the interpreter exits.
atexit.register(_shutdown_auto_index_executor)

92 

93 

def get_rag_service(
    collection_id: Optional[str] = None,
    use_defaults: bool = False,
) -> LibraryRAGService:
    """
    Build a RAG service for the currently logged-in user.

    Thin wrapper around ``rag_service_factory.get_rag_service()``. It takes
    the username from the Flask session, plus the cached database password
    when the session carries a ``session_id``. Code running outside a Flask
    request should call the factory directly.

    Args:
        collection_id: Optional collection UUID whose stored settings should
            be loaded.
        use_defaults: If True, ignore the collection's stored settings and
            use the current defaults. Force-reindex passes True so that a
            newly chosen default embedding model takes effect.
    """
    from ..services.rag_service_factory import (
        get_rag_service as _get_rag_service,
    )
    from ...database.session_passwords import session_password_store

    username = session["username"]
    session_id = session.get("session_id")
    db_password = (
        session_password_store.get_session_password(username, session_id)
        if session_id
        else None
    )
    return _get_rag_service(
        username,
        collection_id,
        use_defaults=use_defaults,
        db_password=db_password,
    )

128 

129 

130# Config API Routes 

131 

132 

@rag_bp.route("/api/config/supported-formats", methods=["GET"])
@login_required
def get_supported_formats():
    """Report the file extensions accepted for upload.

    The list comes from the document_loaders registry, which makes this
    endpoint the single source of truth for supported formats. The UI can
    use ``accept_string`` to build the file input's ``accept`` attribute.
    """
    from ...document_loaders import get_supported_extensions

    # Sorted so the UI always shows the same order.
    ordered = sorted(get_supported_extensions())
    payload = {
        "extensions": ordered,
        "accept_string": ",".join(ordered),
        "count": len(ordered),
    }
    return jsonify(payload)

155 

156 

157# Page Routes 

158 

159 

@rag_bp.route("/embedding-settings")
@login_required
def embedding_settings_page():
    """Serve the Embedding Settings page."""
    template_name = "pages/embedding_settings.html"
    return render_template(template_name, active_page="embedding-settings")

167 

168 

@rag_bp.route("/document/<string:document_id>/chunks")
@login_required
def view_document_chunks(document_id):
    """Render every stored chunk of a document, grouped by collection.

    Chunks are selected by ``DocumentChunk.source_id == document_id`` and
    ordered by collection name, then chunk index. Each group is labelled with
    the matching ``Collection.name`` when that collection exists, otherwise
    with the raw ``collection_name`` stored on the chunk.

    Returns a plain-text 404 when the document does not exist.
    """
    from ...database.session_context import get_user_db_session
    from ...database.models.library import DocumentChunk

    username = session["username"]

    with get_user_db_session(username) as db_session:
        document = db_session.query(Document).filter_by(id=document_id).first()
        if not document:
            return "Document not found", 404

        chunks = (
            db_session.query(DocumentChunk)
            .filter(DocumentChunk.source_id == document_id)
            .order_by(DocumentChunk.collection_name, DocumentChunk.chunk_index)
            .all()
        )

        chunks_by_collection = {}
        for chunk in chunks:
            coll_name = chunk.collection_name
            group = chunks_by_collection.get(coll_name)
            if group is None:
                # Chunk collection names appear to be "collection_<id>".
                # Strip only the leading prefix; str.replace would also
                # rewrite any later occurrence of the text inside the id.
                collection_id = coll_name.removeprefix("collection_")
                collection = (
                    db_session.query(Collection)
                    .filter_by(id=collection_id)
                    .first()
                )
                group = {
                    "name": collection.name if collection else coll_name,
                    "id": collection_id,
                    "chunks": [],
                }
                chunks_by_collection[coll_name] = group

            group["chunks"].append(
                {
                    "id": chunk.id,
                    "index": chunk.chunk_index,
                    "text": chunk.chunk_text,
                    "word_count": chunk.word_count,
                    "start_char": chunk.start_char,
                    "end_char": chunk.end_char,
                    "embedding_model": chunk.embedding_model,
                    "embedding_model_type": chunk.embedding_model_type.value
                    if chunk.embedding_model_type
                    else None,
                    "embedding_dimension": chunk.embedding_dimension,
                    "created_at": chunk.created_at,
                }
            )

        return render_template(
            "pages/document_chunks.html",
            document=document,
            chunks_by_collection=chunks_by_collection,
            total_chunks=len(chunks),
        )

234 

235 

@rag_bp.route("/collections")
@login_required
def collections_page():
    """Serve the Collections page."""
    template_name = "pages/collections.html"
    return render_template(template_name, active_page="collections")

241 

242 

@rag_bp.route("/collections/<string:collection_id>")
@login_required
def collection_details_page(collection_id):
    """Serve the details page for one collection.

    The collection id from the URL is passed to the template unchanged.
    """
    context = {"active_page": "collections", "collection_id": collection_id}
    return render_template("pages/collection_details.html", **context)

252 

253 

@rag_bp.route("/collections/<string:collection_id>/upload")
@login_required
def collection_upload_page(collection_id):
    """Serve the document upload page for a collection.

    The ``research_library.upload_pdf_storage`` setting (default ``"none"``)
    is forwarded to the template. Any value other than ``"database"`` or
    ``"none"`` is coerced to ``"none"``, because filesystem storage is not
    offered for uploads.
    """
    stored_mode = get_settings_manager().get_setting(
        "research_library.upload_pdf_storage", "none"
    )
    upload_pdf_storage = (
        stored_mode if stored_mode in ("database", "none") else "none"
    )

    return render_template(
        "pages/collection_upload.html",
        active_page="collections",
        collection_id=collection_id,
        collection_name=None,  # not looked up from the DB here
        upload_pdf_storage=upload_pdf_storage,
    )

274 

275 

@rag_bp.route("/collections/create")
@login_required
def collection_create_page():
    """Serve the Create Collection page."""
    template_name = "pages/collection_create.html"
    return render_template(template_name, active_page="collections")

283 

284 

285# API Routes 

286 

287 

@rag_bp.route("/api/rag/settings", methods=["GET"])
@login_required
def get_current_settings():
    """Get current RAG configuration from settings.

    Each value falls back to a built-in default when it is unset. The
    ``local_search_text_separators`` setting is stored as a JSON string and
    is decoded here. An undecodable value is logged and replaced by the
    default separator list.
    """
    import json as json_lib

    default_separators = ["\n\n", "\n", ". ", " ", ""]

    try:
        settings = get_settings_manager()

        # The fallback must be *valid* JSON. A literal such as
        # '["\n\n", "\n", ...]' embeds raw newline characters inside the JSON
        # strings, which json.loads rejects ("Invalid control character").
        # Users without a stored value would then always hit the warning
        # path below.
        text_separators = settings.get_setting(
            "local_search_text_separators", json_lib.dumps(default_separators)
        )
        if isinstance(text_separators, str):
            try:
                text_separators = json_lib.loads(text_separators)
            except json_lib.JSONDecodeError:
                logger.warning(
                    f"Invalid JSON for local_search_text_separators setting: {text_separators!r}. "
                    "Using default separators."
                )
                text_separators = list(default_separators)

        normalize_vectors = settings.get_setting(
            "local_search_normalize_vectors", True
        )

        return jsonify(
            {
                "success": True,
                "settings": {
                    "embedding_provider": settings.get_setting(
                        "local_search_embedding_provider",
                        "sentence_transformers",
                    ),
                    "embedding_model": settings.get_setting(
                        "local_search_embedding_model", "all-MiniLM-L6-v2"
                    ),
                    "chunk_size": settings.get_setting(
                        "local_search_chunk_size", 1000
                    ),
                    "chunk_overlap": settings.get_setting(
                        "local_search_chunk_overlap", 200
                    ),
                    "splitter_type": settings.get_setting(
                        "local_search_splitter_type", "recursive"
                    ),
                    "text_separators": text_separators,
                    "distance_metric": settings.get_setting(
                        "local_search_distance_metric", "cosine"
                    ),
                    "normalize_vectors": normalize_vectors,
                    "index_type": settings.get_setting(
                        "local_search_index_type", "flat"
                    ),
                },
            }
        )
    except Exception as e:
        return handle_api_error("getting RAG settings", e)

348 

349 

@rag_bp.route("/api/rag/test-embedding", methods=["POST"])
@login_required
@require_json_body(error_format="success")
def test_embedding():
    """Test an embedding configuration by generating a test embedding.

    JSON body: ``provider`` and ``model`` (both required), plus an optional
    ``test_text`` (default "This is a test.").

    On success, returns the embedding dimension and the elapsed time in
    milliseconds for building the embedding function and embedding the text.

    Any failure is logged and answered with HTTP 500 and a user-facing hint.
    The hint is more specific when the error text suggests a chat/LLM model
    was chosen instead of an embedding model.
    """
    # Bound before the try: the error handler interpolates ``model`` into its
    # message, so the name must exist even if reading the body fails
    # (e.g. a JSON body that is not an object).
    model = None

    try:
        data = request.json
        provider = data.get("provider")
        model = data.get("model")
        test_text = data.get("test_text", "This is a test.")

        if not provider or not model:
            return jsonify(
                {"success": False, "error": "Provider and model are required"}
            ), 400

        # Import embedding functions
        from ...embeddings.embeddings_config import (
            get_embedding_function,
        )

        logger.info(
            f"Testing embedding with provider={provider}, model={model}"
        )

        # Get user's settings so provider URLs (e.g. Ollama) are resolved correctly
        settings = get_settings_manager()
        settings_snapshot = (
            settings.get_all_settings()
            if hasattr(settings, "get_all_settings")
            else {}
        )

        # The timing covers building the embedding function and embedding.
        start_time = time.time()
        embedding_func = get_embedding_function(
            provider=provider,
            model_name=model,
            settings_snapshot=settings_snapshot,
        )
        embedding = embedding_func([test_text])[0]
        response_time_ms = int((time.time() - start_time) * 1000)

        dimension = len(embedding) if hasattr(embedding, "__len__") else None

        return jsonify(
            {
                "success": True,
                "dimension": dimension,
                "response_time_ms": response_time_ms,
                "provider": provider,
                "model": model,
            }
        )

    except Exception as e:
        logger.exception("Error during testing embedding")
        error_str = str(e).lower()

        # Detect common signs that an LLM was selected instead of an embedding model
        llm_hints = [
            "does not support",
            "not an embedding",
            "generate embedding",
            "invalid model",
            "not found",
            "expected float",
            "could not convert",
            "list index out of range",
            "object is not subscriptable",
            "not iterable",
            "json",
            "chat",
            "completion",
        ]
        is_likely_llm = any(hint in error_str for hint in llm_hints)

        if is_likely_llm:
            user_message = (
                f"Embedding test failed for model '{model}'. "
                "This is most likely because an LLM (language model) was selected "
                "instead of an embedding model. Please choose a dedicated embedding "
                "model (e.g. nomic-embed-text, mxbai-embed-large, "
                "all-MiniLM-L6-v2)."
            )
        else:
            user_message = (
                f"Embedding test failed for model '{model}'. "
                "If you are unsure whether the selected model supports embeddings, "
                "try a dedicated embedding model instead (e.g. nomic-embed-text, "
                "mxbai-embed-large, all-MiniLM-L6-v2)."
            )

        return jsonify({"success": False, "error": user_message}), 500

448 

449 

@rag_bp.route("/api/rag/models", methods=["GET"])
@login_required
def get_available_models():
    """List embedding providers and, for reachable ones, their models.

    Every registered provider is listed in ``provider_options`` with an
    ``available`` flag, so users can still open its settings (e.g. to fix a
    wrong Ollama URL). Models are queried only for providers whose
    ``is_available`` check passes; unavailable providers map to an empty
    list in ``providers``.
    """
    try:
        from ...embeddings.embeddings_config import _get_provider_classes

        settings = get_settings_manager()
        if hasattr(settings, "get_all_settings"):
            settings_snapshot = settings.get_all_settings()
        else:
            settings_snapshot = {}

        display_names = {
            "sentence_transformers": "Sentence Transformers (Local)",
            "ollama": "Ollama (Local)",
            "openai": "OpenAI API",
        }

        provider_options = []
        providers = {}

        for key, provider_cls in _get_provider_classes().items():
            reachable = provider_cls.is_available(settings_snapshot)
            provider_options.append(
                {
                    "value": key,
                    "label": display_names.get(key, key),
                    "available": reachable,
                }
            )

            entries = []
            if reachable:
                for model_info in provider_cls.get_available_models(
                    settings_snapshot
                ):
                    entry = {
                        "value": model_info["value"],
                        "label": model_info["label"],
                        "provider": key,
                    }
                    if "is_embedding" in model_info:
                        entry["is_embedding"] = model_info["is_embedding"]
                    entries.append(entry)
            providers[key] = entries

        return jsonify(
            {
                "success": True,
                "provider_options": provider_options,
                "providers": providers,
            }
        )
    except Exception as e:
        return handle_api_error("getting available models", e)

520 

521 

@rag_bp.route("/api/rag/info", methods=["GET"])
@login_required
def get_index_info():
    """Describe the RAG index of a collection.

    The collection comes from the ``collection_id`` query parameter, or the
    user's default Library collection when that parameter is absent or empty.
    When no index exists, the response carries ``info: None`` and a message.
    """
    from ...database.library_init import get_default_library_id

    try:
        collection_id = request.args.get(
            "collection_id"
        ) or get_default_library_id(session["username"])

        logger.info(
            f"Getting RAG index info for collection_id: {collection_id}"
        )

        service = get_rag_service(collection_id)
        info = service.get_current_index_info(collection_id)

        if info is not None:
            logger.info(f"Found RAG index for collection_id: {collection_id}")
            return jsonify({"success": True, "info": info})

        logger.info(f"No RAG index found for collection_id: {collection_id}")
        return jsonify(
            {"success": True, "info": None, "message": "No index found"}
        )
    except Exception as e:
        return handle_api_error("getting index info", e)

553 

554 

@rag_bp.route("/api/rag/stats", methods=["GET"])
@login_required
def get_rag_stats():
    """Return RAG statistics for a collection as JSON.

    The collection comes from the ``collection_id`` query parameter, or the
    user's default Library collection when that parameter is absent or empty.
    """
    from ...database.library_init import get_default_library_id

    try:
        collection_id = request.args.get(
            "collection_id"
        ) or get_default_library_id(session["username"])
        stats = get_rag_service(collection_id).get_rag_stats(collection_id)
        return jsonify({"success": True, "stats": stats})
    except Exception as e:
        return handle_api_error("getting RAG stats", e)

573 

574 

@rag_bp.route("/api/rag/index-document", methods=["POST"])
@login_required
@require_json_body(error_format="success")
def index_document():
    """Index a single document in a collection.

    JSON body:
      - ``text_doc_id`` (required)
      - ``collection_id`` (defaults to the user's Library collection)
      - ``force_reindex`` (default False)

    Returns 400 when ``text_doc_id`` is missing or when the service reports
    an ``error`` status.
    """
    from ...database.library_init import get_default_library_id

    # Bound before the try so the error handler's message can reference it
    # even when reading the body fails; otherwise the handler itself would
    # raise UnboundLocalError.
    text_doc_id = None

    try:
        data = request.get_json()
        text_doc_id = data.get("text_doc_id")
        force_reindex = data.get("force_reindex", False)
        collection_id = data.get("collection_id")

        if not text_doc_id:
            return jsonify(
                {"success": False, "error": "text_doc_id is required"}
            ), 400

        # Get collection_id from request or use default Library collection
        if not collection_id:
            collection_id = get_default_library_id(session["username"])

        rag_service = get_rag_service(collection_id)
        result = rag_service.index_document(
            text_doc_id, collection_id, force_reindex
        )

        if result["status"] == "error":
            return jsonify(
                {"success": False, "error": result.get("error")}
            ), 400

        return jsonify({"success": True, "result": result})
    except Exception as e:
        return handle_api_error(f"indexing document {text_doc_id}", e)

610 

611 

@rag_bp.route("/api/rag/remove-document", methods=["POST"])
@login_required
@require_json_body(error_format="success")
def remove_document():
    """Remove a document from RAG in a collection.

    JSON body:
      - ``text_doc_id`` (required)
      - ``collection_id`` (defaults to the user's Library collection)

    Returns 400 when ``text_doc_id`` is missing or when the service reports
    an ``error`` status.
    """
    from ...database.library_init import get_default_library_id

    # Bound before the try so the error handler's message can reference it
    # even when reading the body fails; otherwise the handler itself would
    # raise UnboundLocalError.
    text_doc_id = None

    try:
        data = request.get_json()
        text_doc_id = data.get("text_doc_id")
        collection_id = data.get("collection_id")

        if not text_doc_id:
            return jsonify(
                {"success": False, "error": "text_doc_id is required"}
            ), 400

        # Get collection_id from request or use default Library collection
        if not collection_id:
            collection_id = get_default_library_id(session["username"])

        rag_service = get_rag_service(collection_id)
        result = rag_service.remove_document_from_rag(
            text_doc_id, collection_id
        )

        if result["status"] == "error":
            return jsonify(
                {"success": False, "error": result.get("error")}
            ), 400

        return jsonify({"success": True, "result": result})
    except Exception as e:
        return handle_api_error(f"removing document {text_doc_id}", e)

646 

647 

@rag_bp.route("/api/rag/index-research", methods=["POST"])
@login_required
@require_json_body(error_format="success")
def index_research():
    """Index all documents from a research.

    JSON body: ``research_id`` (required) and ``force_reindex`` (default
    False). The RAG service is built without a collection id.
    """
    # Bound before the try so the error handler's message can reference it
    # even when reading the body fails; otherwise the handler itself would
    # raise UnboundLocalError.
    research_id = None

    try:
        data = request.get_json()
        research_id = data.get("research_id")
        force_reindex = data.get("force_reindex", False)

        if not research_id:
            return jsonify(
                {"success": False, "error": "research_id is required"}
            ), 400

        rag_service = get_rag_service()
        results = rag_service.index_research_documents(
            research_id, force_reindex
        )

        return jsonify({"success": True, "results": results})
    except Exception as e:
        return handle_api_error(f"indexing research {research_id}", e)

671 

672 

@rag_bp.route("/api/rag/index-all", methods=["GET"])
@login_required
def index_all():
    """Index all documents in a collection with Server-Sent Events progress.

    Query params:
      - ``collection_id``: defaults to the user's Library collection.
      - ``force_reindex``: "true" also re-indexes documents whose
        ``DocumentCollection.indexed`` flag is already set.

    The stream emits a ``start`` event, one ``progress`` event per document,
    and then a ``complete`` event with success/skip/failure counts. An
    unexpected exception emits a single generic ``error`` event instead.
    """
    from ...database.session_context import get_user_db_session
    from ...database.library_init import get_default_library_id

    force_reindex = request.args.get("force_reindex", "false").lower() == "true"
    username = session["username"]

    # Get collection_id from request or use default Library collection
    collection_id = request.args.get("collection_id")
    if not collection_id:
        collection_id = get_default_library_id(username)

    logger.info(
        f"Starting index-all for collection_id: {collection_id}, force_reindex: {force_reindex}"
    )

    # Create RAG service in request context before generator runs
    rag_service = get_rag_service(collection_id)

    def _sse(payload):
        """Serialize *payload* as one Server-Sent Events ``data:`` frame."""
        return f"data: {json.dumps(payload)}\n\n"

    def generate():
        """Generator function for SSE progress updates."""
        try:
            yield _sse(
                {"type": "start", "message": "Starting bulk indexing..."}
            )

            # Collect (id, title) pairs for the collection's documents.
            with get_user_db_session(username) as db_session:
                query = (
                    db_session.query(Document.id, Document.title)
                    .join(
                        DocumentCollection,
                        Document.id == DocumentCollection.document_id,
                    )
                    .filter(DocumentCollection.collection_id == collection_id)
                )

                if not force_reindex:
                    # Only index documents that haven't been indexed yet
                    query = query.filter(DocumentCollection.indexed.is_(False))

                doc_info = [(doc_id, title) for doc_id, title in query.all()]

            if not doc_info:
                yield _sse(
                    {
                        "type": "complete",
                        "results": {
                            "successful": 0,
                            "skipped": 0,
                            "failed": 0,
                            "message": "No documents to index",
                        },
                    }
                )
                return

            results = {"successful": 0, "skipped": 0, "failed": 0, "errors": []}
            total = len(doc_info)

            settings = get_settings_manager()
            # Clamp to >= 1. A configured size of 0 would make range() raise
            # ValueError (zero step); a negative size would yield no batches,
            # so nothing would be indexed.
            batch_size = max(
                1, int(settings.get_setting("rag.indexing_batch_size", 15))
            )
            processed = 0

            for start in range(0, total, batch_size):
                batch = doc_info[start : start + batch_size]
                batch_results = rag_service.index_documents_batch(
                    batch, collection_id, force_reindex
                )

                for doc_id, title in batch:
                    processed += 1
                    # A document missing from the batch results is recorded
                    # as a failure instead of aborting the whole stream with
                    # a KeyError.
                    result = batch_results.get(doc_id)
                    if result is None:
                        result = {
                            "status": "error",
                            "error": "No indexing result returned",
                        }

                    yield _sse(
                        {
                            "type": "progress",
                            "current": processed,
                            "total": total,
                            "title": title,
                            "percent": int((processed / total) * 100),
                        }
                    )

                    status = result["status"]
                    if status == "success":
                        results["successful"] += 1
                    elif status == "skipped":
                        results["skipped"] += 1
                    else:
                        results["failed"] += 1
                        results["errors"].append(
                            {
                                "doc_id": doc_id,
                                "title": title,
                                "error": result.get("error"),
                            }
                        )

            yield _sse({"type": "complete", "results": results})

            logger.info(
                f"Bulk indexing complete: {results['successful']} successful, {results['skipped']} skipped, {results['failed']} failed"
            )

        except Exception:
            logger.exception("Error in bulk indexing")
            yield _sse(
                {
                    "type": "error",
                    "error": "An internal error occurred during indexing",
                }
            )

    return Response(
        stream_with_context(generate()), mimetype="text/event-stream"
    )

779 

780 

@rag_bp.route("/api/rag/configure", methods=["POST"])
@login_required
@require_json_body(error_format="success")
def configure_rag():
    """
    Change RAG configuration (embedding model, chunk size, etc.).

    The settings are always saved as the user's defaults. When a
    ``collection_id`` is supplied, a RAG index for that collection is also
    fetched or created with the new configuration, and its ``index_hash`` is
    returned.

    Required JSON fields: ``embedding_model``, ``embedding_provider``,
    ``chunk_size`` (positive integer) and ``chunk_overlap`` (non-negative
    integer; 0 is allowed).

    Optional fields: ``splitter_type``, ``text_separators`` (a list or a
    JSON-encoded list), ``distance_metric``, ``normalize_vectors`` and
    ``index_type``.

    All input is validated before anything is saved. Invalid input yields
    HTTP 400.
    """
    default_separators = ["\n\n", "\n", ". ", " ", ""]

    try:
        data = request.get_json()
        embedding_model = data.get("embedding_model")
        embedding_provider = data.get("embedding_provider")
        chunk_size = data.get("chunk_size")
        chunk_overlap = data.get("chunk_overlap")
        collection_id = data.get("collection_id")

        # Optional advanced settings (with defaults)
        splitter_type = data.get("splitter_type", "recursive")
        text_separators = data.get("text_separators", default_separators)
        distance_metric = data.get("distance_metric", "cosine")
        # Coerced once so the saved setting and the service agree.
        normalize_vectors = bool(data.get("normalize_vectors", True))
        index_type = data.get("index_type", "flat")

        # chunk_size / chunk_overlap are compared with None rather than tested
        # for truthiness: a chunk_overlap of 0 ("no overlap") is valid and
        # must not be reported as missing.
        if (
            not embedding_model
            or not embedding_provider
            or chunk_size is None
            or chunk_overlap is None
        ):
            return jsonify(
                {
                    "success": False,
                    "error": "All configuration parameters are required (embedding_model, embedding_provider, chunk_size, chunk_overlap)",
                }
            ), 400

        try:
            chunk_size = int(chunk_size)
            chunk_overlap = int(chunk_overlap)
        except (TypeError, ValueError):
            return jsonify(
                {
                    "success": False,
                    "error": "chunk_size and chunk_overlap must be integers",
                }
            ), 400
        if chunk_size <= 0 or chunk_overlap < 0:
            return jsonify(
                {
                    "success": False,
                    "error": "chunk_size must be positive and chunk_overlap must not be negative",
                }
            ), 400

        # text_separators may arrive as a list or as a JSON-encoded list.
        separators = text_separators
        if isinstance(separators, str):
            try:
                separators = json.loads(separators)
            except json.JSONDecodeError:
                separators = None
        if not isinstance(separators, list):
            return jsonify(
                {
                    "success": False,
                    "error": "text_separators must be a list or a JSON-encoded list",
                }
            ), 400

        # Save settings to database
        settings = get_settings_manager()
        settings.set_setting("local_search_embedding_model", embedding_model)
        settings.set_setting(
            "local_search_embedding_provider", embedding_provider
        )
        settings.set_setting("local_search_chunk_size", chunk_size)
        settings.set_setting("local_search_chunk_overlap", chunk_overlap)
        settings.set_setting("local_search_splitter_type", splitter_type)
        # Stored as a JSON string; get_current_settings() decodes it.
        settings.set_setting(
            "local_search_text_separators", json.dumps(separators)
        )
        settings.set_setting("local_search_distance_metric", distance_metric)
        settings.set_setting(
            "local_search_normalize_vectors", normalize_vectors
        )
        settings.set_setting("local_search_index_type", index_type)

        if not collection_id:
            # Just saving default settings without updating a specific collection
            return jsonify(
                {
                    "success": True,
                    "message": "Default embedding settings saved successfully. New collections will use these settings.",
                }
            )

        with LibraryRAGService(
            username=session["username"],
            embedding_model=embedding_model,
            embedding_provider=embedding_provider,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            splitter_type=splitter_type,
            text_separators=separators,
            distance_metric=distance_metric,
            normalize_vectors=normalize_vectors,
            index_type=index_type,
        ) as new_rag_service:
            # Get or create new index with this configuration
            rag_index = new_rag_service._get_or_create_rag_index(
                collection_id
            )
            # Read the hash before the service context exits, in case exiting
            # closes the session this ORM object is bound to (TODO confirm).
            index_hash = rag_index.index_hash

        return jsonify(
            {
                "success": True,
                "message": "Configuration updated for collection. You can now index documents with the new settings.",
                "index_hash": index_hash,
            }
        )

    except Exception as e:
        return handle_api_error("configuring RAG", e)

888 

889 

@rag_bp.route("/api/rag/documents", methods=["GET"])
@login_required
def get_documents():
    """Get library documents with their RAG status for a collection (paginated).

    Query params:
      - ``page``: values below 1 become 1.
      - ``per_page``: clamped to 10-100.
      - ``filter``: ``all``, ``indexed`` or ``unindexed``; unknown values
        behave like ``all``.
      - ``collection_id``: defaults to the user's Library collection.

    A document counts as indexed when a ``RagDocumentStatus`` row exists for
    it in the collection.
    """
    from ...database.session_context import get_user_db_session
    from ...database.library_init import get_default_library_id

    try:
        # Get pagination parameters
        page = request.args.get("page", 1, type=int)
        per_page = request.args.get("per_page", 50, type=int)
        filter_type = request.args.get(
            "filter", "all"
        )  # all, indexed, unindexed

        # Validate pagination parameters
        page = max(1, page)
        per_page = min(max(10, per_page), 100)  # Limit between 10-100

        # Close current thread's session to force fresh connection
        from ...database.thread_local_session import cleanup_current_thread

        cleanup_current_thread()

        username = session["username"]

        # Get collection_id from request or use default Library collection
        collection_id = request.args.get("collection_id")
        if not collection_id:
            collection_id = get_default_library_id(username)

        logger.info(
            f"Getting documents for collection_id: {collection_id}, filter: {filter_type}, page: {page}"
        )

        with get_user_db_session(username) as db_session:
            # Expire all cached objects to ensure we get fresh data from DB
            db_session.expire_all()

            from ...database.models.library import RagDocumentStatus

            # Documents in the collection, LEFT JOINed with their
            # rag_document_status row (if any) for indexed status.
            query = (
                db_session.query(
                    Document, DocumentCollection, RagDocumentStatus
                )
                .join(
                    DocumentCollection,
                    (DocumentCollection.document_id == Document.id)
                    & (DocumentCollection.collection_id == collection_id),
                )
                .outerjoin(
                    RagDocumentStatus,
                    (RagDocumentStatus.document_id == Document.id)
                    & (RagDocumentStatus.collection_id == collection_id),
                )
            )

            logger.debug(f"Base query for collection {collection_id}: {query}")

            # Apply filters based on rag_document_status existence
            if filter_type == "indexed":
                query = query.filter(RagDocumentStatus.document_id.isnot(None))
            elif filter_type == "unindexed":
                # Documents in collection but not indexed yet
                query = query.filter(RagDocumentStatus.document_id.is_(None))

            # Get total count before pagination
            total_count = query.count()
            logger.info(
                f"Found {total_count} total documents for collection {collection_id} with filter {filter_type}"
            )

            results = (
                query.order_by(Document.created_at.desc())
                .limit(per_page)
                .offset((page - 1) * per_page)
                .all()
            )

            documents = [
                {
                    "id": doc.id,
                    "title": doc.title,
                    "original_url": doc.original_url,
                    "rag_indexed": rag_status is not None,
                    "chunk_count": rag_status.chunk_count if rag_status else 0,
                    "created_at": doc.created_at.isoformat()
                    if doc.created_at
                    else None,
                }
                for doc, _, rag_status in results
            ]

            # Debug logging to help diagnose indexing status issues
            indexed_count = sum(1 for d in documents if d["rag_indexed"])

            # Only the number is logged, so let the database count the rows
            # instead of loading every status object into memory.
            status_row_count = (
                db_session.query(RagDocumentStatus)
                .filter_by(collection_id=collection_id)
                .count()
            )
            logger.info(
                f"rag_document_status table shows: {status_row_count} documents indexed for collection {collection_id}"
            )

            logger.info(
                f"Returning {len(documents)} documents on page {page}: "
                f"{indexed_count} indexed, {len(documents) - indexed_count} not indexed"
            )

            return jsonify(
                {
                    "success": True,
                    "documents": documents,
                    "pagination": {
                        "page": page,
                        "per_page": per_page,
                        "total": total_count,
                        "pages": (total_count + per_page - 1) // per_page,
                    },
                }
            )
    except Exception as e:
        return handle_api_error("getting documents", e)

1019 

1020 

def _collect_local_files(root, patterns, recursive):
    """Return the sorted, de-duplicated regular files under ``root`` matching ``patterns``.

    Each pattern (e.g. ``*.pdf``) is evaluated as a glob relative to ``root``;
    when ``recursive`` is true it is prefixed with ``**`` so subdirectories are
    searched as well.

    Only ``root`` itself has been validated by ``PathValidator`` -- the patterns
    come straight from the query string. Without filtering, a pattern such as
    ``../*`` or ``**/../../x``, or an absolute pattern like ``/etc/*`` (which
    ``pathlib`` joins by *replacing* ``root``), would reach files outside the
    validated folder. Every match is therefore lexically normalized and kept
    only if it still lies inside ``root``; the normalized path is what gets
    returned, so the checked path is the one that is later opened.

    Blank patterns (e.g. produced by a trailing comma) are ignored because
    ``root/**`` would match every file and directory in the tree, and
    directories are dropped because only files can be indexed.

    Args:
        root: The validated folder to search.
        patterns: Iterable of glob pattern strings (surrounding whitespace is
            stripped).
        recursive: Whether to also search subdirectories.

    Returns:
        Sorted list of unique, normalized file paths as strings.
    """
    root = Path(root)
    root_normalized = Path(os.path.normpath(root))
    found = set()
    for raw_pattern in patterns:
        pattern = raw_pattern.strip()
        if not pattern:
            continue
        if recursive:
            search_pattern = str(root / "**" / pattern)
        else:
            search_pattern = str(root / pattern)

        for match in glob.glob(search_pattern, recursive=recursive):
            normalized = os.path.normpath(match)
            if not Path(normalized).is_relative_to(root_normalized):
                logger.warning(
                    f"Ignoring '{match}': pattern '{pattern}' escapes the indexed folder"
                )
                continue
            if os.path.isfile(normalized):
                found.add(normalized)
    return sorted(found)


@rag_bp.route("/api/rag/index-local", methods=["GET"])
@login_required
def index_local_library():
    """Index documents from a local folder with Server-Sent Events progress.

    Query parameters:
        path: Folder to index (required). It is validated with
            ``PathValidator`` and must be an existing directory.
        patterns: Comma-separated glob patterns
            (default ``"*.pdf,*.txt,*.md,*.html"``). Matches outside ``path``
            are ignored.
        recursive: ``"true"`` (default) to include subdirectories.

    Returns:
        A 400 JSON error for a missing, invalid, non-existent or non-directory
        path. Otherwise a ``text/event-stream`` response that emits a
        ``start`` event, one ``progress`` event per file, and finally a
        ``complete`` event with success/skip/failure counts. If the scan
        itself fails, it emits an ``error`` event instead.
    """
    folder_path = request.args.get("path")
    file_patterns = request.args.get(
        "patterns", "*.pdf,*.txt,*.md,*.html"
    ).split(",")
    recursive = request.args.get("recursive", "true").lower() == "true"

    if not folder_path:
        return jsonify({"success": False, "error": "Path is required"}), 400

    # Validate and sanitize the path to prevent traversal attacks
    try:
        validated_path = PathValidator.validate_local_filesystem_path(
            folder_path
        )
        # Re-sanitize for static analyzer recognition (CodeQL)
        path = PathValidator.sanitize_for_filesystem_ops(validated_path)
    except ValueError:
        logger.warning(f"Path validation failed for '{folder_path}'")
        return jsonify({"success": False, "error": "Invalid path"}), 400

    # Check path exists and is a directory
    if not path.exists():
        return jsonify({"success": False, "error": "Path does not exist"}), 400
    if not path.is_dir():
        return jsonify(
            {"success": False, "error": "Path is not a directory"}
        ), 400

    # Create RAG service in request context
    rag_service = get_rag_service()

    def generate():
        """Generator function for SSE progress updates."""
        try:
            # Send initial status
            yield f"data: {json.dumps({'type': 'start', 'message': f'Scanning folder: {path}'})}\n\n"

            # Find all matching files, confined to the validated folder
            files_to_index = _collect_local_files(
                path, file_patterns, recursive
            )

            if not files_to_index:
                yield f"data: {json.dumps({'type': 'complete', 'results': {'successful': 0, 'skipped': 0, 'failed': 0, 'errors': [], 'message': 'No matching files found'}})}\n\n"
                return

            results = {"successful": 0, "skipped": 0, "failed": 0, "errors": []}
            total = len(files_to_index)

            # Index each file
            for idx, file_path in enumerate(files_to_index, 1):
                file_name = Path(file_path).name

                # Send progress update
                yield f"data: {json.dumps({'type': 'progress', 'current': idx, 'total': total, 'filename': file_name, 'percent': int((idx / total) * 100)})}\n\n"

                try:
                    # Index the file directly using RAG service
                    result = rag_service.index_local_file(file_path)

                    if result.get("status") == "success":
                        results["successful"] += 1
                    elif result.get("status") == "skipped":
                        results["skipped"] += 1
                    else:
                        results["failed"] += 1
                        results["errors"].append(
                            {
                                "file": file_name,
                                "error": result.get("error", "Unknown error"),
                            }
                        )
                except Exception:
                    results["failed"] += 1
                    results["errors"].append(
                        {"file": file_name, "error": "Failed to index file"}
                    )
                    logger.exception(f"Error indexing file {file_path}")

            # Send completion status
            yield f"data: {json.dumps({'type': 'complete', 'results': results})}\n\n"

            logger.info(
                f"Local library indexing complete for {path}: "
                f"{results['successful']} successful, "
                f"{results['skipped']} skipped, "
                f"{results['failed']} failed"
            )

        except Exception:
            logger.exception("Error in local library indexing")
            yield f"data: {json.dumps({'type': 'error', 'error': 'An internal error occurred during indexing'})}\n\n"

    return Response(
        stream_with_context(generate()), mimetype="text/event-stream"
    )

1131 

1132 

1133# Collection Management Routes 

1134 

1135 

@rag_bp.route("/api/collections", methods=["GET"])
@login_required
def get_collections():
    """List every collection stored in the current user's database.

    Returns:
        JSON ``{"success": True, "collections": [...]}``. Each entry holds:
        - the collection's id, name, description, ISO creation time (or
          ``None``) and type;
        - its default flag (``False`` when the attribute is absent);
        - its document and linked-folder counts (``0`` when the relationship
          is absent);
        - an ``embedding`` object, or ``None`` when no embedding model has
          been recorded.

        Failures are delegated to ``handle_api_error``.
    """
    from ...database.session_context import get_user_db_session

    def embedding_summary(coll):
        # Collections without a recorded embedding model report None.
        if not coll.embedding_model:
            return None
        model_type = coll.embedding_model_type
        return {
            "model": coll.embedding_model,
            "provider": model_type.value if model_type else None,
            "dimension": coll.embedding_dimension,
            "chunk_size": coll.chunk_size,
            "chunk_overlap": coll.chunk_overlap,
        }

    def serialize(coll):
        created = coll.created_at
        return {
            "id": coll.id,
            "name": coll.name,
            "description": coll.description,
            "created_at": created.isoformat() if created else None,
            "collection_type": coll.collection_type,
            "is_default": getattr(coll, "is_default", False),
            "document_count": len(getattr(coll, "document_links", ())),
            "folder_count": len(getattr(coll, "linked_folders", ())),
            "embedding": embedding_summary(coll),
        }

    try:
        username = session["username"]
        with get_user_db_session(username) as db_session:
            # Each user has a dedicated database, so no per-user filter applies.
            payload = [
                serialize(coll) for coll in db_session.query(Collection).all()
            ]

        return jsonify({"success": True, "collections": payload})
    except Exception as e:
        return handle_api_error("getting collections", e)

1188 

1189 

@rag_bp.route("/api/collections", methods=["POST"])
@login_required
@require_json_body(error_format="success")
def create_collection():
    """Create a new document collection in the current user's database.

    Expects a JSON object with:
        name: Collection name (required; surrounding whitespace stripped).
        description: Optional description (``null`` is treated as empty).
        type: Optional collection type, default ``"user_uploads"``.

    Returns:
        JSON describing the created collection. Returns 400 in these cases:
        - the body is not a JSON object;
        - name or description is not a string;
        - the name is missing or blank;
        - a collection with that name already exists.

        Unexpected failures go through ``handle_api_error``.
    """
    from ...database.session_context import get_user_db_session

    try:
        data = request.get_json()
        if not isinstance(data, dict):
            return jsonify(
                {
                    "success": False,
                    "error": "Request body must be a JSON object",
                }
            ), 400

        # ``or ""`` maps an explicit JSON null to an empty string instead of
        # crashing on ``None.strip()`` (which used to surface as a 500).
        raw_name = data.get("name") or ""
        raw_description = data.get("description") or ""
        if not isinstance(raw_name, str) or not isinstance(
            raw_description, str
        ):
            return jsonify(
                {
                    "success": False,
                    "error": "Name and description must be strings",
                }
            ), 400
        name = raw_name.strip()
        description = raw_description.strip()
        collection_type = data.get("type", "user_uploads")

        if not name:
            return jsonify({"success": False, "error": "Name is required"}), 400

        username = session["username"]
        with get_user_db_session(username) as db_session:
            # Check if collection with this name already exists in this user's database
            existing = db_session.query(Collection).filter_by(name=name).first()

            if existing:
                return jsonify(
                    {
                        "success": False,
                        "error": f"Collection '{name}' already exists",
                    }
                ), 400

            # Create new collection (no username needed - each user has their own DB)
            # Note: created_at uses default=utcnow() in the model, so we don't need to set it manually
            collection = Collection(
                id=str(uuid.uuid4()),  # Generate UUID for collection
                name=name,
                description=description,
                collection_type=collection_type,
            )

            db_session.add(collection)
            db_session.commit()

            return jsonify(
                {
                    "success": True,
                    "collection": {
                        "id": collection.id,
                        "name": collection.name,
                        "description": collection.description,
                        # Guarded like update_collection: never crash on a
                        # missing timestamp.
                        "created_at": collection.created_at.isoformat()
                        if collection.created_at
                        else None,
                        "collection_type": collection.collection_type,
                    },
                }
            )
    except Exception as e:
        return handle_api_error("creating collection", e)

1245 

1246 

@rag_bp.route("/api/collections/<string:collection_id>", methods=["PUT"])
@login_required
@require_json_body(error_format="success")
def update_collection(collection_id):
    """Update a collection's name and/or description.

    Fields are applied only when supplied:
        name: New name. A missing or blank name leaves the current name
            unchanged. It must not clash with another collection.
        description: New description. Omitting it (or sending ``null``)
            leaves the stored description unchanged. An explicit ``""``
            clears it.

    Returns:
        JSON with the updated collection, or one of these errors:
        - 400 for a non-object body, non-string fields, or a name clash;
        - 404 if the collection does not exist.

        Unexpected failures go through ``handle_api_error``.
    """
    from ...database.session_context import get_user_db_session

    try:
        data = request.get_json()
        if not isinstance(data, dict):
            return jsonify(
                {
                    "success": False,
                    "error": "Request body must be a JSON object",
                }
            ), 400

        raw_name = data.get("name") or ""
        # Keep None as the "not supplied" marker. Defaulting to "" here made
        # every PUT without a description wipe the stored one.
        raw_description = data.get("description")
        if not isinstance(raw_name, str) or (
            raw_description is not None
            and not isinstance(raw_description, str)
        ):
            return jsonify(
                {
                    "success": False,
                    "error": "Name and description must be strings",
                }
            ), 400
        name = raw_name.strip()
        description = (
            raw_description.strip() if raw_description is not None else None
        )

        username = session["username"]
        with get_user_db_session(username) as db_session:
            # No need to filter by username - each user has their own database
            collection = (
                db_session.query(Collection).filter_by(id=collection_id).first()
            )

            if not collection:
                return jsonify(
                    {"success": False, "error": "Collection not found"}
                ), 404

            if name:
                # Check if new name conflicts with existing collection
                existing = (
                    db_session.query(Collection)
                    .filter(
                        Collection.name == name,
                        Collection.id != collection_id,
                    )
                    .first()
                )

                if existing:
                    return jsonify(
                        {
                            "success": False,
                            "error": f"Collection '{name}' already exists",
                        }
                    ), 400

                collection.name = name

            if description is not None:  # Allow empty description
                collection.description = description

            db_session.commit()

            return jsonify(
                {
                    "success": True,
                    "collection": {
                        "id": collection.id,
                        "name": collection.name,
                        "description": collection.description,
                        "created_at": collection.created_at.isoformat()
                        if collection.created_at
                        else None,
                        "collection_type": collection.collection_type,
                    },
                }
            )
    except Exception as e:
        return handle_api_error("updating collection", e)

1313 

1314 

@rag_bp.route(
    "/api/collections/<string:collection_id>/upload", methods=["POST"]
)
@login_required
@upload_rate_limit_user
@upload_rate_limit_ip
def upload_to_collection(collection_id):
    """Upload files to a collection.

    Reads the multipart ``files`` field. Each file's filename is sanitized and
    its content is SHA-256 hashed for deduplication.

    If a document with the same hash already exists:
    - it is linked to this collection unless it already is;
    - when PDF storage is ``"database"`` and the bytes start with ``%PDF``,
      ``upgrade_to_pdf`` is attempted on it.

    Otherwise:
    - text is extracted (unsupported extensions and empty extractions are
      reported as errors);
    - a new ``Document`` is created and linked;
    - the PDF is stored in the encrypted database when requested.

    The optional ``pdf_storage`` form field (``"database"`` or ``"none"``)
    overrides the user's ``research_library.upload_pdf_storage`` setting;
    any other value falls back to ``"none"``. Documents newly linked to the
    collection are passed to ``trigger_auto_index`` when the session's
    database password is available.

    Returns:
        JSON with per-file ``uploaded`` entries, ``errors`` and a ``summary``.
        Returns 400 when no files were sent and 404 for an unknown collection.
        Unexpected failures go through ``handle_api_error``.
    """
    from ...database.session_context import get_user_db_session
    from ...security import sanitize_filename, UnsafeFilenameError
    import hashlib
    from ..services.pdf_storage_manager import PDFStorageManager

    # Upload statuses meaning "this document was just linked to the
    # collection" and therefore still needs indexing there. The PDF-upgraded
    # variant used to be missing, so those documents were never auto-indexed.
    auto_index_statuses = (
        "uploaded",
        "added_to_collection",
        "added_to_collection_pdf_upgraded",
    )

    try:
        if "files" not in request.files:
            return jsonify(
                {"success": False, "error": "No files provided"}
            ), 400

        files = request.files.getlist("files")
        if not files:
            return jsonify(
                {"success": False, "error": "No files selected"}
            ), 400

        username = session["username"]
        with get_user_db_session(username) as db_session:
            # Verify collection exists in this user's database
            collection = (
                db_session.query(Collection).filter_by(id=collection_id).first()
            )

            if not collection:
                return jsonify(
                    {"success": False, "error": "Collection not found"}
                ), 404

            # Get PDF storage mode from form data, falling back to user's setting
            settings = get_settings_manager()
            default_pdf_storage = settings.get_setting(
                "research_library.upload_pdf_storage", "none"
            )
            pdf_storage = request.form.get("pdf_storage", default_pdf_storage)
            if pdf_storage not in ("database", "none"):
                # Security: user uploads can only use database (encrypted) or none (text-only)
                # Filesystem storage is not allowed for user uploads
                pdf_storage = "none"

            # Initialize PDF storage manager if storing PDFs in database
            pdf_storage_manager = None
            if pdf_storage == "database":
                library_root = settings.get_setting(
                    "research_library.storage_path",
                    str(get_library_directory()),
                )
                library_root = str(
                    Path(os.path.expandvars(library_root))
                    .expanduser()
                    .resolve()
                )
                pdf_storage_manager = PDFStorageManager(
                    library_root=Path(library_root), storage_mode="database"
                )
                logger.info("PDF storage mode: database (encrypted)")
            else:
                logger.info("PDF storage mode: none (text-only)")

            uploaded_files = []
            errors = []

            for file in files:
                if not file.filename:
                    continue

                try:
                    filename = sanitize_filename(file.filename)
                except UnsafeFilenameError:
                    errors.append(
                        {
                            "filename": "rejected",
                            "error": "Invalid or unsafe filename",
                        }
                    )
                    continue

                try:
                    # Read file content
                    file_content = file.read()
                    file.seek(0)  # Reset for potential re-reading

                    # Calculate file hash for deduplication
                    file_hash = hashlib.sha256(file_content).hexdigest()

                    # Check if document already exists
                    existing_doc = (
                        db_session.query(Document)
                        .filter_by(document_hash=file_hash)
                        .first()
                    )

                    if existing_doc:
                        # Document exists, check if we can upgrade to include PDF
                        pdf_upgraded = False
                        if (
                            pdf_storage == "database"
                            and pdf_storage_manager is not None
                        ):
                            # NOTE: Only the PDF magic-byte check is needed here.
                            # File count validation is already handled by Flask's MAX_CONTENT_LENGTH.
                            # Filename sanitization already happens via sanitize_filename() above.
                            # See PR #3145 review for details.
                            if file_content[:4] != b"%PDF":
                                logger.debug(
                                    "Skipping PDF upgrade for {}: not a PDF file",
                                    filename,
                                )
                            else:
                                pdf_upgraded = (
                                    pdf_storage_manager.upgrade_to_pdf(
                                        document=existing_doc,
                                        pdf_content=file_content,
                                        session=db_session,
                                    )
                                )

                        # Check if already in collection
                        existing_link = (
                            db_session.query(DocumentCollection)
                            .filter_by(
                                document_id=existing_doc.id,
                                collection_id=collection_id,
                            )
                            .first()
                        )

                        if not existing_link:
                            # Add to collection
                            collection_link = DocumentCollection(
                                document_id=existing_doc.id,
                                collection_id=collection_id,
                                indexed=False,
                                chunk_count=0,
                            )
                            db_session.add(collection_link)
                            status = "added_to_collection"
                            if pdf_upgraded:
                                status = "added_to_collection_pdf_upgraded"
                            uploaded_files.append(
                                {
                                    "filename": existing_doc.filename,
                                    "status": status,
                                    "id": existing_doc.id,
                                    "pdf_upgraded": pdf_upgraded,
                                }
                            )
                        else:
                            status = "already_in_collection"
                            if pdf_upgraded:
                                status = "pdf_upgraded"
                            uploaded_files.append(
                                {
                                    "filename": existing_doc.filename,
                                    "status": status,
                                    "id": existing_doc.id,
                                    "pdf_upgraded": pdf_upgraded,
                                }
                            )
                    else:
                        # Create new document
                        from ...document_loaders import (
                            extract_text_from_bytes,
                            is_extension_supported,
                        )

                        file_extension = Path(filename).suffix.lower()

                        # Validate extension is supported before extraction
                        if not is_extension_supported(file_extension):
                            errors.append(
                                {
                                    "filename": filename,
                                    "error": f"Unsupported format: {file_extension}",
                                }
                            )
                            continue

                        # Use file_type without leading dot for storage
                        file_type = (
                            file_extension[1:]
                            if file_extension.startswith(".")
                            else file_extension
                        )

                        # Extract text using document_loaders module
                        extracted_text = extract_text_from_bytes(
                            file_content, file_extension, filename
                        )

                        # Clean the extracted text to remove surrogate characters
                        if extracted_text:
                            from ...text_processing import remove_surrogates

                            extracted_text = remove_surrogates(extracted_text)

                        if not extracted_text:
                            errors.append(
                                {
                                    "filename": filename,
                                    "error": f"Could not extract text from {file_type} file",
                                }
                            )
                            logger.warning(
                                f"Skipping file {filename} - no text could be extracted"
                            )
                            continue

                        # Get or create the user_upload source type
                        logger.info(
                            f"Getting or creating user_upload source type for {filename}"
                        )
                        source_type = (
                            db_session.query(SourceType)
                            .filter_by(name="user_upload")
                            .first()
                        )
                        if not source_type:
                            logger.info("Creating new user_upload source type")
                            source_type = SourceType(
                                id=str(uuid.uuid4()),
                                name="user_upload",
                                display_name="User Upload",
                                description="Documents uploaded by users",
                                icon="fas fa-upload",
                            )
                            db_session.add(source_type)
                            db_session.flush()
                            logger.info(
                                f"Created source type with ID: {source_type.id}"
                            )
                        else:
                            logger.info(
                                f"Found existing source type with ID: {source_type.id}"
                            )

                        # Create document with extracted text (no username needed - in user's own database)
                        # Note: uploaded_at uses default=utcnow() in the model, so we don't need to set it manually
                        doc_id = str(uuid.uuid4())
                        logger.info(
                            f"Creating document {doc_id} for {filename}"
                        )

                        # Determine storage mode and file_path
                        store_pdf_in_db = (
                            pdf_storage == "database"
                            and file_type == "pdf"
                            and pdf_storage_manager is not None
                        )

                        new_doc = Document(
                            id=doc_id,
                            source_type_id=source_type.id,
                            filename=filename,
                            document_hash=file_hash,
                            file_size=len(file_content),
                            file_type=file_type,
                            text_content=extracted_text,  # Always store extracted text
                            file_path=None
                            if store_pdf_in_db
                            else FILE_PATH_TEXT_ONLY,
                            storage_mode="database"
                            if store_pdf_in_db
                            else "none",
                        )
                        db_session.add(new_doc)
                        db_session.flush()  # Get the ID
                        logger.info(
                            f"Document {new_doc.id} created successfully"
                        )

                        # Store PDF in encrypted database if requested
                        pdf_stored = False
                        if store_pdf_in_db:
                            try:
                                pdf_storage_manager.save_pdf(
                                    pdf_content=file_content,
                                    document=new_doc,
                                    session=db_session,
                                    filename=filename,
                                )
                                pdf_stored = True
                                logger.info(
                                    f"PDF stored in encrypted database for {filename}"
                                )
                            except Exception:
                                logger.exception(
                                    f"Failed to store PDF in database for {filename}"
                                )
                                # Continue without PDF storage - text is still saved

                        # Add to collection
                        collection_link = DocumentCollection(
                            document_id=new_doc.id,
                            collection_id=collection_id,
                            indexed=False,
                            chunk_count=0,
                        )
                        db_session.add(collection_link)

                        uploaded_files.append(
                            {
                                "filename": filename,
                                "status": "uploaded",
                                "id": new_doc.id,
                                "text_length": len(extracted_text),
                                "pdf_stored": pdf_stored,
                            }
                        )

                except Exception:
                    errors.append(
                        {
                            "filename": filename,
                            "error": "Failed to upload file",
                        }
                    )
                    logger.exception(f"Error uploading file {filename}")

            db_session.commit()

            # Trigger auto-indexing for documents newly linked to this collection
            document_ids = [
                f["id"]
                for f in uploaded_files
                if f.get("status") in auto_index_statuses
            ]
            if document_ids:
                from ...database.session_passwords import session_password_store

                session_id = session.get("session_id")
                db_password = session_password_store.get_session_password(
                    username, session_id
                )
                if db_password:
                    trigger_auto_index(
                        document_ids, collection_id, username, db_password
                    )

            return jsonify(
                {
                    "success": True,
                    "uploaded": uploaded_files,
                    "errors": errors,
                    "summary": {
                        "total": len(files),
                        "successful": len(uploaded_files),
                        "failed": len(errors),
                    },
                }
            )

    except Exception as e:
        return handle_api_error("uploading files", e)

1679 

1680 

@rag_bp.route(
    "/api/collections/<string:collection_id>/documents", methods=["GET"]
)
@login_required
def get_collection_documents(collection_id):
    """Get all documents in a collection, plus the collection's settings.

    Each document entry reports the following:
    - display title, file metadata and upload time;
    - per-collection indexing state (``indexed``, ``chunk_count``,
      ``last_indexed_at``);
    - whether a stored file path or extracted text exists;
    - its source type;
    - how many *other* collections also contain it.

    When a ``RAGIndex`` row named ``collection_<id>`` points to an existing
    path, the index's on-disk size is included both in bytes and as a
    human-readable string.

    Returns:
        JSON with ``collection`` and ``documents``, or 404 if the collection
        does not exist. Unexpected failures go through ``handle_api_error``.
    """
    from collections import Counter

    from ...database.session_context import get_user_db_session

    try:
        username = session["username"]
        with get_user_db_session(username) as db_session:
            # Verify collection exists in this user's database
            collection = (
                db_session.query(Collection).filter_by(id=collection_id).first()
            )

            if not collection:
                return jsonify(
                    {"success": False, "error": "Collection not found"}
                ), 404

            # Get documents through junction table
            doc_links = (
                db_session.query(DocumentCollection, Document)
                .join(Document)
                .filter(DocumentCollection.collection_id == collection_id)
                .all()
            )

            # One query (instead of one COUNT per document) for how many
            # links each document in this collection has to *other*
            # collections. A subquery is used rather than a literal ID list
            # so large collections do not hit SQL parameter limits.
            docs_in_collection = db_session.query(
                DocumentCollection.document_id
            ).filter(DocumentCollection.collection_id == collection_id)
            other_collection_counts = Counter(
                document_id
                for (document_id,) in db_session.query(
                    DocumentCollection.document_id
                )
                .filter(
                    DocumentCollection.document_id.in_(docs_in_collection),
                    DocumentCollection.collection_id != collection_id,
                )
                .all()
            )

            documents = []
            for link, doc in doc_links:
                # Check if PDF file is stored
                has_pdf = bool(
                    doc.file_path and doc.file_path not in FILE_PATH_SENTINELS
                )
                has_text_db = bool(doc.text_content)

                # Use title if available, otherwise filename
                display_title = doc.title or doc.filename or "Untitled"

                # Get source type name
                source_type_name = (
                    doc.source_type.name if doc.source_type else "unknown"
                )

                # Counter yields 0 for documents found only in this collection
                other_collections_count = other_collection_counts[doc.id]

                documents.append(
                    {
                        "id": doc.id,
                        "filename": display_title,
                        "title": display_title,
                        "file_type": doc.file_type,
                        "file_size": doc.file_size,
                        "uploaded_at": doc.created_at.isoformat()
                        if doc.created_at
                        else None,
                        "indexed": link.indexed,
                        "chunk_count": link.chunk_count,
                        "last_indexed_at": link.last_indexed_at.isoformat()
                        if link.last_indexed_at
                        else None,
                        "has_pdf": has_pdf,
                        "has_text_db": has_text_db,
                        "source_type": source_type_name,
                        "in_other_collections": other_collections_count > 0,
                        "other_collections_count": other_collections_count,
                    }
                )

            # Get index file size if available
            index_file_size = None
            index_file_size_bytes = None
            collection_name = f"collection_{collection_id}"
            rag_index = (
                db_session.query(RAGIndex)
                .filter_by(collection_name=collection_name)
                .first()
            )
            if rag_index and rag_index.index_path:
                index_path = Path(rag_index.index_path)
                if index_path.exists():
                    size_bytes = index_path.stat().st_size
                    index_file_size_bytes = size_bytes
                    # Format as human-readable
                    if size_bytes < 1024:
                        index_file_size = f"{size_bytes} B"
                    elif size_bytes < 1024 * 1024:
                        index_file_size = f"{size_bytes / 1024:.1f} KB"
                    else:
                        index_file_size = f"{size_bytes / (1024 * 1024):.1f} MB"

            return jsonify(
                {
                    "success": True,
                    "collection": {
                        "id": collection.id,
                        "name": collection.name,
                        "description": collection.description,
                        "embedding_model": collection.embedding_model,
                        "embedding_model_type": collection.embedding_model_type.value
                        if collection.embedding_model_type
                        else None,
                        "embedding_dimension": collection.embedding_dimension,
                        "chunk_size": collection.chunk_size,
                        "chunk_overlap": collection.chunk_overlap,
                        # Advanced settings
                        "splitter_type": collection.splitter_type,
                        "distance_metric": collection.distance_metric,
                        "index_type": collection.index_type,
                        "normalize_vectors": collection.normalize_vectors,
                        # Index file info
                        "index_file_size": index_file_size,
                        "index_file_size_bytes": index_file_size_bytes,
                        "collection_type": collection.collection_type,
                    },
                    "documents": documents,
                }
            )

    except Exception as e:
        return handle_api_error("getting collection documents", e)

1813 

1814 

1815@rag_bp.route("/api/collections/<string:collection_id>/index", methods=["GET"]) 

1816@login_required 

1817def index_collection(collection_id): 

1818 """Index all documents in a collection with Server-Sent Events progress.""" 

1819 from ...database.session_context import get_user_db_session 

1820 from ...database.session_passwords import session_password_store 

1821 

1822 force_reindex = request.args.get("force_reindex", "false").lower() == "true" 

1823 username = session["username"] 

1824 session_id = session.get("session_id") 

1825 

1826 logger.info(f"Starting index_collection, force_reindex={force_reindex}") 

1827 

1828 # Get password for thread access to encrypted database 

1829 db_password = None 

1830 if session_id: 1830 ↛ 1836line 1830 didn't jump to line 1836 because the condition on line 1830 was always true

1831 db_password = session_password_store.get_session_password( 

1832 username, session_id 

1833 ) 

1834 

1835 # Create RAG service — on force reindex use current default model 

1836 rag_service = get_rag_service(collection_id, use_defaults=force_reindex) 

1837 logger.info( 

1838 f"RAG service created: provider={rag_service.embedding_provider}" 

1839 ) 

1840 

1841 def generate(): 

1842 """Generator for SSE progress updates.""" 

1843 logger.info("SSE generator started") 

1844 try: 

1845 with get_user_db_session(username, db_password) as db_session: 

1846 # Verify collection exists in this user's database 

1847 collection = ( 

1848 db_session.query(Collection) 

1849 .filter_by(id=collection_id) 

1850 .first() 

1851 ) 

1852 

1853 if not collection: 

1854 yield f"data: {json.dumps({'type': 'error', 'error': 'Collection not found'})}\n\n" 

1855 return 

1856 

1857 # Store embedding metadata on first index or force reindex 

1858 if collection.embedding_model is None or force_reindex: 

1859 # Get embedding dimension from the embedding manager 

1860 embedding_dim = None 

1861 try: 

1862 # Try to get dimension from the embedding manager's provider 

1863 if hasattr(rag_service.embedding_manager, "provider"): 1863 ↛ 1864line 1863 didn't jump to line 1864 because the condition on line 1863 was never true

1864 provider = rag_service.embedding_manager.provider 

1865 if hasattr(provider, "embedding_dimension"): 

1866 embedding_dim = provider.embedding_dimension 

1867 except Exception: 

1868 logger.warning("Could not get embedding dimension") 

1869 

1870 # Store the embedding configuration used for this collection 

1871 collection.embedding_model = rag_service.embedding_model 

1872 collection.embedding_model_type = EmbeddingProvider( 

1873 rag_service.embedding_provider 

1874 ) 

1875 collection.embedding_dimension = embedding_dim 

1876 collection.chunk_size = rag_service.chunk_size 

1877 collection.chunk_overlap = rag_service.chunk_overlap 

1878 # Store advanced settings 

1879 collection.splitter_type = rag_service.splitter_type 

1880 collection.text_separators = rag_service.text_separators 

1881 collection.distance_metric = rag_service.distance_metric 

1882 # Ensure normalize_vectors is a proper boolean for database 

1883 collection.normalize_vectors = bool( 

1884 rag_service.normalize_vectors 

1885 ) 

1886 collection.index_type = rag_service.index_type 

1887 db_session.commit() 

1888 logger.info( 

1889 f"Stored embedding metadata for collection: provider={rag_service.embedding_provider}" 

1890 ) 

1891 

1892 # Get documents to index 

1893 query = ( 

1894 db_session.query(DocumentCollection, Document) 

1895 .join(Document) 

1896 .filter(DocumentCollection.collection_id == collection_id) 

1897 ) 

1898 

1899 if not force_reindex: 1899 ↛ 1908line 1899 didn't jump to line 1908 because the condition on line 1899 was always true

1900 # IMPORTANT: DO NOT CHANGE THIS LINE! DO NOT REMOVE THIS COMMENT! 

1901 # We MUST use `== False` instead of `not DocumentCollection.indexed` 

1902 # The Python `not` operator does NOT work correctly in SQLAlchemy filters. 

1903 # Using `not` will cause the query to return NO results (zero documents). 

1904 # SQLAlchemy requires explicit comparison: `== False` or `== True` 

1905 # This has been fixed multiple times - DO NOT change it back to `not`! 

1906 query = query.filter(DocumentCollection.indexed == False) # noqa: E712 

1907 

1908 doc_links = query.all() 

1909 

1910 if not doc_links: 

1911 logger.info("No documents to index in collection") 

1912 yield f"data: {json.dumps({'type': 'complete', 'results': {'successful': 0, 'skipped': 0, 'failed': 0, 'message': 'No documents to index'}})}\n\n" 

1913 return 

1914 

1915 total = len(doc_links) 

1916 logger.info(f"Found {total} documents to index") 

1917 results = { 

1918 "successful": 0, 

1919 "skipped": 0, 

1920 "failed": 0, 

1921 "errors": [], 

1922 } 

1923 

1924 yield f"data: {json.dumps({'type': 'start', 'message': f'Indexing {total} documents in collection: {collection.name}'})}\n\n" 

1925 

1926 for idx, (link, doc) in enumerate(doc_links, 1): 

1927 filename = doc.filename or doc.title or "Unknown" 

1928 yield f"data: {json.dumps({'type': 'progress', 'current': idx, 'total': total, 'filename': filename, 'percent': int((idx / total) * 100)})}\n\n" 

1929 

1930 try: 

1931 logger.debug( 

1932 f"Indexing document {idx}/{total}: {filename}" 

1933 ) 

1934 

1935 # Run index_document in a separate thread to allow sending SSE heartbeats. 

1936 # This keeps the HTTP connection alive during long indexing operations, 

1937 # preventing timeouts from proxy servers (nginx) and browsers. 

1938 # The main thread periodically yields heartbeat comments while waiting. 

1939 result_queue = queue.Queue() 

1940 error_queue = queue.Queue() 

1941 

1942 def index_in_thread(): 

1943 try: 

1944 r = rag_service.index_document( 

1945 document_id=doc.id, 

1946 collection_id=collection_id, 

1947 force_reindex=force_reindex, 

1948 ) 

1949 result_queue.put(r) 

1950 except Exception as ex: 

1951 error_queue.put(ex) 

1952 finally: 

1953 try: 

1954 from ...database.thread_local_session import ( 

1955 cleanup_current_thread, 

1956 ) 

1957 

1958 cleanup_current_thread() 

1959 except Exception: 

1960 logger.debug( 

1961 "best-effort thread-local DB session cleanup", 

1962 exc_info=True, 

1963 ) 

1964 

1965 thread = threading.Thread(target=index_in_thread) 

1966 thread.start() 

1967 

1968 # Send heartbeats while waiting for the thread to complete 

1969 heartbeat_interval = 5 # seconds 

1970 while thread.is_alive(): 1970 ↛ 1971line 1970 didn't jump to line 1971 because the condition on line 1970 was never true

1971 thread.join(timeout=heartbeat_interval) 

1972 if thread.is_alive(): 

1973 # Send SSE comment as heartbeat (keeps connection alive) 

1974 yield f": heartbeat {idx}/{total}\n\n" 

1975 

1976 # Check for errors from thread 

1977 if not error_queue.empty(): 

1978 raise error_queue.get() # noqa: TRY301 — re-raises thread exception for per-document error handling 

1979 

1980 result = result_queue.get() 

1981 logger.info( 

1982 f"Indexed document {idx}/{total}: {filename} - status={result.get('status')}" 

1983 ) 

1984 

1985 if result.get("status") == "success": 

1986 results["successful"] += 1 

1987 # DocumentCollection status is already updated in index_document 

1988 # No need to update link here 

1989 elif result.get("status") == "skipped": 1989 ↛ 1992line 1989 didn't jump to line 1992 because the condition on line 1989 was always true

1990 results["skipped"] += 1 

1991 else: 

1992 results["failed"] += 1 

1993 error_msg = result.get("error", "Unknown error") 

1994 results["errors"].append( 

1995 { 

1996 "filename": filename, 

1997 "error": error_msg, 

1998 } 

1999 ) 

2000 logger.warning( 

2001 f"Failed to index {filename} ({idx}/{total}): {error_msg}" 

2002 ) 

2003 except Exception as e: 

2004 results["failed"] += 1 

2005 error_msg = str(e) or "Failed to index document" 

2006 results["errors"].append( 

2007 { 

2008 "filename": filename, 

2009 "error": error_msg, 

2010 } 

2011 ) 

2012 logger.exception( 

2013 f"Exception indexing document {filename} ({idx}/{total})" 

2014 ) 

2015 # Send error update to client so they know indexing is continuing 

2016 yield f"data: {json.dumps({'type': 'doc_error', 'filename': filename, 'error': error_msg})}\n\n" 

2017 

2018 db_session.commit() 

2019 # Ensure all changes are written to disk 

2020 db_session.flush() 

2021 

2022 logger.info( 

2023 f"Indexing complete: {results['successful']} successful, {results['failed']} failed, {results['skipped']} skipped" 

2024 ) 

2025 yield f"data: {json.dumps({'type': 'complete', 'results': results})}\n\n" 

2026 logger.info("SSE generator finished successfully") 

2027 

2028 except Exception: 

2029 logger.exception("Error in collection indexing") 

2030 yield f"data: {json.dumps({'type': 'error', 'error': 'An internal error occurred during indexing'})}\n\n" 

2031 

2032 response = Response( 

2033 stream_with_context(generate()), mimetype="text/event-stream" 

2034 ) 

2035 # Prevent buffering for proper SSE streaming 

2036 response.headers["Cache-Control"] = "no-cache, no-transform" 

2037 response.headers["Connection"] = "keep-alive" 

2038 response.headers["X-Accel-Buffering"] = "no" 

2039 return response 

2040 

2041 

2042# ============================================================================= 

2043# Background Indexing Endpoints 

2044# ============================================================================= 

2045 

2046 

def _get_rag_service_for_thread(
    collection_id: str,
    username: str,
    db_password: str,
    use_defaults: bool = False,
) -> LibraryRAGService:
    """
    Build a LibraryRAGService for use outside a Flask request context.

    Settings are resolved by ``rag_service_factory.get_rag_service``. After
    construction, ``db_password`` is assigned again through the service's
    property setter so that it also reaches the service's sub-managers.
    """
    from ..services.rag_service_factory import (
        get_rag_service as _factory_get_rag_service,
    )

    rag_service = _factory_get_rag_service(
        username,
        collection_id,
        use_defaults=use_defaults,
        db_password=db_password,
    )
    # LibraryRAGService.__init__ keeps the password only in its backing field
    # (_db_password). Going through the property setter pushes it down to
    # embedding_manager and integrity_manager, which need it for thread-safe
    # session access.
    rag_service.db_password = db_password
    return rag_service

2075 

2076 

def trigger_auto_index(
    document_ids: list[str],
    collection_id: str,
    username: str,
    db_password: str,
) -> None:
    """
    Queue automatic RAG indexing of documents when auto-indexing is enabled.

    Reads ``research_library.auto_index_enabled`` (default True) from the
    user's settings. When enabled, the documents are handed to the shared
    auto-index executor and this function returns without waiting.

    Args:
        document_ids: IDs of the documents to index; an empty list is a no-op.
        collection_id: Collection the documents are indexed into.
        username: Owner of the user database.
        db_password: The user's database password, forwarded to the worker
            so it can open its own sessions.
    """
    from ...database.session_context import get_user_db_session

    if not document_ids:
        logger.debug("No documents to auto-index")
        return

    # If the setting cannot be read, do not index at all.
    try:
        with get_user_db_session(username, db_password) as db_session:
            enabled = SettingsManager(db_session).get_bool_setting(
                "research_library.auto_index_enabled", True
            )
    except Exception:
        logger.exception(
            "Failed to check auto-index setting, skipping auto-index"
        )
        return

    if not enabled:
        logger.debug("Auto-indexing is disabled, skipping")
        return

    logger.info(
        f"Auto-indexing {len(document_ids)} documents in collection {collection_id}"
    )

    # A bounded pool keeps repeated uploads from spawning unbounded threads.
    _get_auto_index_executor().submit(
        _auto_index_documents_worker,
        document_ids,
        collection_id,
        username,
        db_password,
    )

2131 

2132 

@thread_cleanup
def _auto_index_documents_worker(
    document_ids: list[str],
    collection_id: str,
    username: str,
    db_password: str,
) -> None:
    """
    Index the given documents into a collection from a background thread.

    This is a lighter-weight counterpart to ``_background_index_worker``: it
    does not record progress in ``TaskMetadata``. Each document is indexed
    with ``force_reindex=False``. A failure on one document is logged and the
    loop moves on to the next. A failure to build the RAG service is logged
    and ends the run.

    Args:
        document_ids: IDs of the documents to index.
        collection_id: Collection to index the documents into.
        username: Owner of the user database.
        db_password: Password used for thread-local database access.
    """
    try:
        # Create RAG service (thread-safe, no Flask context needed)
        with _get_rag_service_for_thread(
            collection_id, username, db_password
        ) as rag_service:
            indexed_count = 0
            for doc_id in document_ids:
                try:
                    result = rag_service.index_document(
                        doc_id, collection_id, force_reindex=False
                    )
                    status = result.get("status")
                    if status == "success":
                        indexed_count += 1
                        logger.debug(f"Auto-indexed document {doc_id}")
                    elif status == "skipped":
                        logger.debug(
                            f"Document {doc_id} already indexed, skipped"
                        )
                    else:
                        # Any other status is a failure reported by the
                        # service rather than raised; surface it instead of
                        # dropping it silently.
                        error_msg = result.get("error", "Unknown error")
                        logger.warning(
                            f"Failed to auto-index document {doc_id}: {error_msg}"
                        )
                except Exception:
                    logger.exception(f"Failed to auto-index document {doc_id}")

            logger.info(
                f"Auto-indexing complete: {indexed_count}/{len(document_ids)} documents indexed"
            )

    except Exception:
        logger.exception("Auto-indexing worker failed")

2175 

2176 

@thread_cleanup
def _background_index_worker(
    task_id: str,
    collection_id: str,
    username: str,
    db_password: str,
    force_reindex: bool,
):
    """
    Index a collection's documents in a background thread.

    Runs without a Flask request context. Progress is written to the
    ``TaskMetadata`` row identified by ``task_id`` through
    ``_update_task_status``. Before each document, ``_is_task_cancelled`` is
    checked so that a cancellation request stops the loop.

    Args:
        task_id: ID of the TaskMetadata row tracking this run.
        collection_id: Collection whose documents are indexed.
        username: Owner of the user database.
        db_password: Password used to open thread-local database sessions.
        force_reindex: When True, the service is built with default settings,
            the collection's embedding metadata is overwritten, old chunks
            and RAG indices are deleted, every link is reset to unindexed,
            and all documents are re-indexed. When False, only documents
            whose collection link is not yet marked indexed are processed.
    """
    from ...database.session_context import get_user_db_session

    try:
        # Create RAG service (thread-safe, no Flask context needed)
        with _get_rag_service_for_thread(
            collection_id, username, db_password, use_defaults=force_reindex
        ) as rag_service:
            with get_user_db_session(username, db_password) as db_session:
                collection = (
                    db_session.query(Collection)
                    .filter_by(id=collection_id)
                    .first()
                )

                if not collection:
                    _update_task_status(
                        username,
                        db_password,
                        task_id,
                        status="failed",
                        error_message="Collection not found",
                    )
                    return

                # Store embedding metadata on first index or force reindex
                if collection.embedding_model is None or force_reindex:
                    collection.embedding_model = rag_service.embedding_model
                    collection.embedding_model_type = EmbeddingProvider(
                        rag_service.embedding_provider
                    )
                    collection.chunk_size = rag_service.chunk_size
                    collection.chunk_overlap = rag_service.chunk_overlap
                    collection.splitter_type = rag_service.splitter_type
                    collection.text_separators = rag_service.text_separators
                    collection.distance_metric = rag_service.distance_metric
                    # Ensure normalize_vectors is a proper boolean for database
                    collection.normalize_vectors = bool(
                        rag_service.normalize_vectors
                    )
                    collection.index_type = rag_service.index_type
                    db_session.commit()

                # Clean up old index data for a fresh rebuild.
                # This prevents mixed-model vectors if cancelled midway
                # and ensures accurate stats during partial reindex.
                if force_reindex:
                    from ..deletion.utils.cascade_helper import CascadeHelper

                    collection_name = f"collection_{collection_id}"

                    # Delete all old document chunks from DB
                    deleted_chunks = CascadeHelper.delete_collection_chunks(
                        db_session, collection_name
                    )
                    logger.info(
                        f"Cleared {deleted_chunks} old chunks for collection {collection_id}"
                    )

                    # Delete old FAISS index files (.faiss + .pkl) and RAGIndex records
                    # RagDocumentStatus rows cascade-delete via FK ondelete="CASCADE"
                    rag_result = (
                        CascadeHelper.delete_rag_indices_for_collection(
                            db_session, collection_name
                        )
                    )
                    logger.info(
                        f"Cleared old RAG indices for collection {collection_id}: {rag_result}"
                    )

                    # Mark all documents as unindexed
                    db_session.query(DocumentCollection).filter_by(
                        collection_id=collection_id
                    ).update(
                        {
                            DocumentCollection.indexed: False,
                            DocumentCollection.chunk_count: 0,
                        }
                    )
                    db_session.commit()
                    logger.info(
                        f"Reset indexing state for collection {collection_id}"
                    )

                # Get documents to index
                query = (
                    db_session.query(DocumentCollection, Document)
                    .join(Document)
                    .filter(DocumentCollection.collection_id == collection_id)
                )

                if not force_reindex:
                    # Must remain an explicit `== False`: Python's `not` does
                    # not build a SQL expression and breaks this filter.
                    query = query.filter(DocumentCollection.indexed == False)  # noqa: E712

                doc_links = query.all()

                if not doc_links:
                    _update_task_status(
                        username,
                        db_password,
                        task_id,
                        status="completed",
                        progress_message="No documents to index",
                    )
                    return

                total = len(doc_links)
                results = {"successful": 0, "skipped": 0, "failed": 0}

                # Update task with total count
                _update_task_status(
                    username,
                    db_password,
                    task_id,
                    progress_total=total,
                    progress_message=f"Indexing {total} documents",
                )

                for idx, (_link, doc) in enumerate(doc_links, 1):
                    # Check if cancelled
                    if _is_task_cancelled(username, db_password, task_id):
                        _update_task_status(
                            username,
                            db_password,
                            task_id,
                            status="cancelled",
                            progress_message=f"Cancelled after {idx - 1}/{total} documents",
                        )
                        logger.info(f"Indexing task {task_id} was cancelled")
                        return

                    filename = doc.filename or doc.title or "Unknown"

                    # Update progress with filename
                    _update_task_status(
                        username,
                        db_password,
                        task_id,
                        progress_current=idx,
                        progress_message=f"Indexing {idx}/{total}: {filename}",
                    )

                    try:
                        result = rag_service.index_document(
                            document_id=doc.id,
                            collection_id=collection_id,
                            force_reindex=force_reindex,
                        )

                        status = result.get("status")
                        if status == "success":
                            results["successful"] += 1
                        elif status == "skipped":
                            results["skipped"] += 1
                        else:
                            results["failed"] += 1
                            # Log the service-reported reason, as the SSE
                            # indexing path does, so failures are diagnosable.
                            error_msg = result.get("error", "Unknown error")
                            logger.warning(
                                f"Failed to index {filename} ({idx}/{total}): {error_msg}"
                            )

                    except Exception:
                        results["failed"] += 1
                        logger.exception(
                            f"Error indexing document {filename} ({idx}/{total})"
                        )

                db_session.commit()

                # Mark as completed
                _update_task_status(
                    username,
                    db_password,
                    task_id,
                    status="completed",
                    progress_current=total,
                    progress_message=f"Completed: {results['successful']} indexed, {results['failed']} failed, {results['skipped']} skipped",
                )
                logger.info(
                    f"Background indexing task {task_id} completed: {results}"
                )

    except Exception as e:
        logger.exception(f"Background indexing task {task_id} failed")
        _update_task_status(
            username,
            db_password,
            task_id,
            status="failed",
            # Some exceptions stringify to ""; keep a non-empty message.
            error_message=str(e) or "Indexing failed",
        )

2374 

2375 

def _update_task_status(
    username: str,
    db_password: str,
    task_id: str,
    status: Optional[str] = None,
    progress_current: Optional[int] = None,
    progress_total: Optional[int] = None,
    progress_message: Optional[str] = None,
    error_message: Optional[str] = None,
):
    """
    Apply a partial update to the TaskMetadata row for ``task_id``.

    Only arguments that are not None are written. Passing
    ``status="completed"`` also stamps ``completed_at`` with the current UTC
    time. If no row matches, nothing is written. Errors are logged and
    swallowed, so a failed progress update does not propagate to the caller.
    """
    from ...database.session_context import get_user_db_session

    requested = {
        "status": status,
        "progress_current": progress_current,
        "progress_total": progress_total,
        "progress_message": progress_message,
        "error_message": error_message,
    }
    changes = {
        field: value for field, value in requested.items() if value is not None
    }

    try:
        with get_user_db_session(username, db_password) as db_session:
            task = (
                db_session.query(TaskMetadata)
                .filter_by(task_id=task_id)
                .first()
            )
            if not task:
                return
            for field, value in changes.items():
                setattr(task, field, value)
            if status == "completed":
                task.completed_at = datetime.now(UTC)
            db_session.commit()
    except Exception:
        logger.exception(f"Failed to update task status for {task_id}")

2412 

2413 

def _is_task_cancelled(username: str, db_password: str, task_id: str) -> bool:
    """
    Return True if the TaskMetadata row for ``task_id`` has status "cancelled".

    Returns False when the row does not exist, or when the lookup itself
    fails. In the failure case a warning is logged, and the caller keeps
    working rather than aborting on a transient DB error.
    """
    from ...database.session_context import get_user_db_session

    try:
        with get_user_db_session(username, db_password) as db_session:
            task = (
                db_session.query(TaskMetadata)
                .filter_by(task_id=task_id)
                .first()
            )
            # `task and ...` would return None for a missing row; honour the
            # declared bool return type explicitly.
            return task is not None and task.status == "cancelled"
    except Exception:
        logger.warning(
            "Could not check cancellation status for task {}", task_id
        )
        return False

2431 

2432 

@rag_bp.route(
    "/api/collections/<string:collection_id>/index/start", methods=["POST"]
)
@login_required
def start_background_index(collection_id):
    """
    Start background indexing for a collection.

    Request body (optional JSON object):
        force_reindex: when truthy, rebuild the collection's index from
            scratch (see ``_background_index_worker``).

    Creates a ``TaskMetadata`` row with status "processing" and starts a
    daemon thread running ``_background_index_worker``.

    Returns:
        200 with the new ``task_id``; 409 with the existing ``task_id`` if a
        processing indexing task already exists for this collection; 500 on
        unexpected errors.
    """
    from ...database.session_context import get_user_db_session
    from ...database.session_passwords import session_password_store

    username = session["username"]
    session_id = session.get("session_id")

    # Get password for thread access
    db_password = None
    if session_id:
        db_password = session_password_store.get_session_password(
            username, session_id
        )

    # The body is optional. silent=True keeps Flask from raising an HTTP
    # error when no JSON (or a non-JSON content type) is sent, and the
    # isinstance check guards against a JSON array/scalar body.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    force_reindex = data.get("force_reindex", False)

    try:
        with get_user_db_session(username, db_password) as db_session:
            # Several collections may be indexing at once, so look at every
            # processing indexing task, not just the first row returned;
            # otherwise another collection's task can mask this one's and
            # allow a duplicate run.
            active_tasks = (
                db_session.query(TaskMetadata)
                .filter(
                    TaskMetadata.task_type == "indexing",
                    TaskMetadata.status == "processing",
                )
                .all()
            )
            existing_task = next(
                (
                    candidate
                    for candidate in active_tasks
                    if (candidate.metadata_json or {}).get("collection_id")
                    == collection_id
                ),
                None,
            )

            if existing_task is not None:
                return jsonify(
                    {
                        "success": False,
                        "error": "Indexing is already in progress for this collection",
                        "task_id": existing_task.task_id,
                    }
                ), 409

            # Create new task
            task_id = str(uuid.uuid4())
            task = TaskMetadata(
                task_id=task_id,
                status="processing",
                task_type="indexing",
                created_at=datetime.now(UTC),
                started_at=datetime.now(UTC),
                progress_current=0,
                progress_total=0,
                progress_message="Starting indexing...",
                metadata_json={
                    "collection_id": collection_id,
                    "force_reindex": force_reindex,
                },
            )
            db_session.add(task)
            db_session.commit()

            # Start background thread
            thread = threading.Thread(
                target=_background_index_worker,
                args=(task_id, collection_id, username, db_password, force_reindex),
                daemon=True,
            )
            thread.start()

            logger.info(
                f"Started background indexing task {task_id} for collection {collection_id}"
            )

            return jsonify(
                {
                    "success": True,
                    "task_id": task_id,
                    "message": "Indexing started in background",
                }
            )

    except Exception:
        logger.exception("Failed to start background indexing")
        return jsonify(
            {
                "success": False,
                "error": "Failed to start indexing. Please try again.",
            }
        ), 500

2527 

2528 

@rag_bp.route(
    "/api/collections/<string:collection_id>/index/status", methods=["GET"]
)
@limiter.exempt
@login_required
def get_index_status(collection_id):
    """
    Get the most recent indexing task status for a collection.

    Returns:
        JSON with the task's id, status, progress counters, messages and
        timestamps; or ``{"status": "idle", ...}`` when the collection has no
        indexing task; or 500 on unexpected errors.
    """
    from ...database.session_context import get_user_db_session
    from ...database.session_passwords import session_password_store

    username = session["username"]
    session_id = session.get("session_id")

    db_password = None
    if session_id:
        db_password = session_password_store.get_session_password(
            username, session_id
        )

    try:
        with get_user_db_session(username, db_password) as db_session:
            # Walk indexing tasks newest-first and take the first one that
            # belongs to this collection. Checking only the newest task
            # overall would report "idle" whenever another collection was
            # indexed more recently.
            tasks = (
                db_session.query(TaskMetadata)
                .filter(TaskMetadata.task_type == "indexing")
                .order_by(TaskMetadata.created_at.desc())
            )

            found_any = False
            task = None
            for candidate in tasks:
                found_any = True
                metadata = candidate.metadata_json or {}
                if metadata.get("collection_id") == collection_id:
                    task = candidate
                    break

            if task is None:
                return jsonify(
                    {
                        "status": "idle",
                        "message": "No indexing task for this collection"
                        if found_any
                        else "No indexing task found",
                    }
                )

            return jsonify(
                {
                    "task_id": task.task_id,
                    "status": task.status,
                    "progress_current": task.progress_current or 0,
                    "progress_total": task.progress_total or 0,
                    "progress_message": task.progress_message,
                    "error_message": task.error_message,
                    "created_at": task.created_at.isoformat()
                    if task.created_at
                    else None,
                    "completed_at": task.completed_at.isoformat()
                    if task.completed_at
                    else None,
                }
            )

    except Exception:
        logger.exception("Failed to get index status")
        return jsonify(
            {
                "status": "error",
                "error": "Failed to get indexing status. Please try again.",
            }
        ), 500

2601 

2602 

@rag_bp.route(
    "/api/collections/<string:collection_id>/index/cancel", methods=["POST"]
)
@login_required
def cancel_indexing(collection_id):
    """
    Request cancellation of the active indexing task for a collection.

    Sets the collection's "processing" indexing task to "cancelled". The
    background worker checks this status before each document and stops.

    Returns:
        200 with the task id; 404 when no processing indexing task exists for
        this collection; 500 on unexpected errors.
    """
    from ...database.session_context import get_user_db_session
    from ...database.session_passwords import session_password_store

    username = session["username"]
    session_id = session.get("session_id")

    db_password = None
    if session_id:
        db_password = session_password_store.get_session_password(
            username, session_id
        )

    try:
        with get_user_db_session(username, db_password) as db_session:
            active_tasks = (
                db_session.query(TaskMetadata)
                .filter(
                    TaskMetadata.task_type == "indexing",
                    TaskMetadata.status == "processing",
                )
                .all()
            )

            if not active_tasks:
                return jsonify(
                    {
                        "success": False,
                        "error": "No active indexing task found",
                    }
                ), 404

            # Several collections may be indexing at once; pick the task that
            # belongs to this collection rather than whichever row came first.
            task = next(
                (
                    candidate
                    for candidate in active_tasks
                    if (candidate.metadata_json or {}).get("collection_id")
                    == collection_id
                ),
                None,
            )

            if task is None:
                return jsonify(
                    {
                        "success": False,
                        "error": "No active indexing task for this collection",
                    }
                ), 404

            # Mark as cancelled - the worker thread will check this
            task.status = "cancelled"
            task.progress_message = "Cancellation requested..."
            db_session.commit()

            logger.info(
                f"Cancelled indexing task {task.task_id} for collection {collection_id}"
            )

            return jsonify(
                {
                    "success": True,
                    "message": "Cancellation requested",
                    "task_id": task.task_id,
                }
            )

    except Exception:
        logger.exception("Failed to cancel indexing")
        return jsonify(
            {
                "success": False,
                "error": "Failed to cancel indexing. Please try again.",
            }
        ), 500

2676 

2677 

2678# Research History Semantic Search Routes have been moved to 

2679# research_library.search.routes.search_routes