Coverage for src/local_deep_research/research_library/routes/rag_routes.py: 27%

1044 statements  

coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1""" 

2RAG Management API Routes 

3 

4Provides endpoints for managing RAG indexing of library documents: 

5- Configure embedding models 

6- Index documents 

7- Get RAG statistics 

8- Bulk operations with progress tracking 

9""" 

10 

11from flask import ( 

12 Blueprint, 

13 jsonify, 

14 request, 

15 Response, 

16 render_template, 

17 session, 

18 stream_with_context, 

19) 

20from loguru import logger 

21import atexit 

22import glob 

23import json 

24import uuid 

25import time 

26import threading 

27import queue 

28from concurrent.futures import ThreadPoolExecutor 

29from datetime import datetime, UTC 

30from pathlib import Path 

31from typing import Optional 

32 

33from ...web.auth.decorators import login_required 

34from ...utilities.db_utils import get_settings_manager 

35from ..services.library_rag_service import LibraryRAGService 

36from ...settings.manager import SettingsManager 

37from ...security.path_validator import PathValidator 

38from ...security import upload_rate_limit 

39from ..utils import handle_api_error 

40from ...database.models.library import ( 

41 Document, 

42 Collection, 

43 DocumentCollection, 

44 RAGIndex, 

45 SourceType, 

46 EmbeddingProvider, 

47) 

48from ...database.models.queue import TaskMetadata 

49from ...web.utils.rate_limiter import limiter 

50from ...config.paths import get_library_directory 

51 

52rag_bp = Blueprint("rag", __name__, url_prefix="/library") 

53 

54# Global ThreadPoolExecutor for auto-indexing to prevent thread proliferation 

55_auto_index_executor: ThreadPoolExecutor | None = None 

56_auto_index_executor_lock = threading.Lock() 

57 

58 

59def _get_auto_index_executor() -> ThreadPoolExecutor: 

60 """Get or create the global auto-indexing executor (thread-safe).""" 

61 global _auto_index_executor 

62 with _auto_index_executor_lock: 

63 if _auto_index_executor is None: 

64 _auto_index_executor = ThreadPoolExecutor( 

65 max_workers=4, 

66 thread_name_prefix="auto_index_", 

67 ) 

68 return _auto_index_executor 

69 

70 

71def _shutdown_auto_index_executor() -> None: 

72 """Shutdown the auto-index executor gracefully.""" 

73 global _auto_index_executor 

74 if _auto_index_executor is not None: 

75 _auto_index_executor.shutdown(wait=True) 

76 _auto_index_executor = None 

77 

78 

79atexit.register(_shutdown_auto_index_executor) 
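# A minimal sketch of handing work to the executor above; the worker function
# name here is hypothetical (the real submission path is trigger_auto_index,
# used by the upload route further down):
#
#     def _example_submit(document_ids, collection_id):
#         executor = _get_auto_index_executor()
#         executor.submit(_index_documents_in_background, document_ids, collection_id)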

80 

81 

82def get_rag_service( 

83 collection_id: Optional[str] = None, 

84 use_defaults: bool = False, 

85) -> LibraryRAGService: 

86 """ 

87 Get RAG service instance with appropriate settings. 

88 

89 If collection_id is provided: 

90 - Uses collection's stored settings if they exist (unless use_defaults=True) 

91 - Uses current defaults for new collections (and stores them) 

92 

93 If no collection_id: 

94 - Uses current default settings 

95 

96 Args: 

97 use_defaults: When True, ignore stored collection settings and use 

98 current defaults. Pass True on force-reindex so that the new 

99 default embedding model is picked up. 

100 """ 

101 from ...database.session_context import get_user_db_session 

102 

103 settings = get_settings_manager() 

104 username = session["username"] 

105 

106 # Get current default settings 

107 default_embedding_model = settings.get_setting( 

108 "local_search_embedding_model", "all-MiniLM-L6-v2" 

109 ) 

110 default_embedding_provider = settings.get_setting( 

111 "local_search_embedding_provider", "sentence_transformers" 

112 ) 

113 default_chunk_size = int( 

114 settings.get_setting("local_search_chunk_size", 1000) 

115 ) 

116 default_chunk_overlap = int( 

117 settings.get_setting("local_search_chunk_overlap", 200) 

118 ) 

119 

120 # Get new advanced configuration settings (Issue #1054) 

121 import json 

122 

123 default_splitter_type = settings.get_setting( 

124 "local_search_splitter_type", "recursive" 

125 ) 

126 default_text_separators = settings.get_setting( 

127 "local_search_text_separators", '["\n\n", "\n", ". ", " ", ""]' 

128 ) 

129 # Parse JSON string to list 

130 if isinstance(default_text_separators, str): 130 ↛ 139 (line 130 didn't jump to line 139 because the condition on line 130 was always true)

131 try: 

132 default_text_separators = json.loads(default_text_separators) 

133 except json.JSONDecodeError: 

134 logger.warning( 

135 f"Invalid JSON for local_search_text_separators setting: {default_text_separators!r}. " 

136 "Using default separators." 

137 ) 

138 default_text_separators = ["\n\n", "\n", ". ", " ", ""] 

139 default_distance_metric = settings.get_setting( 

140 "local_search_distance_metric", "cosine" 

141 ) 

142 # Get normalize_vectors as a proper boolean 

143 default_normalize_vectors = settings.get_bool_setting( 

144 "local_search_normalize_vectors", True 

145 ) 

146 default_index_type = settings.get_setting("local_search_index_type", "flat") 

147 

148 # If collection_id provided, check for stored settings 

149 if collection_id: 

150 with get_user_db_session(username) as db_session: 

151 collection = ( 

152 db_session.query(Collection).filter_by(id=collection_id).first() 

153 ) 

154 

155 if collection and collection.embedding_model and not use_defaults: 

156 # Use collection's stored settings 

157 logger.info( 

158 f"Using stored settings for collection {collection_id}: " 

159 f"{collection.embedding_model_type.value}/{collection.embedding_model}" 

160 ) 

161 # Handle normalize_vectors - may be stored as string in some cases 

162 coll_normalize = collection.normalize_vectors 

163 if coll_normalize is not None: 163 ↛ 173 (line 163 didn't jump to line 173 because the condition on line 163 was always true)

164 if isinstance(coll_normalize, str): 

165 coll_normalize = coll_normalize.lower() in ( 

166 "true", 

167 "1", 

168 "yes", 

169 ) 

170 else: 

171 coll_normalize = bool(coll_normalize) 

172 else: 

173 coll_normalize = default_normalize_vectors 

174 

175 return LibraryRAGService( 

176 username=username, 

177 embedding_model=collection.embedding_model, 

178 embedding_provider=collection.embedding_model_type.value, 

179 chunk_size=collection.chunk_size or default_chunk_size, 

180 chunk_overlap=collection.chunk_overlap 

181 or default_chunk_overlap, 

182 splitter_type=collection.splitter_type 

183 or default_splitter_type, 

184 text_separators=collection.text_separators 

185 or default_text_separators, 

186 distance_metric=collection.distance_metric 

187 or default_distance_metric, 

188 normalize_vectors=coll_normalize, 

189 index_type=collection.index_type or default_index_type, 

190 ) 

191 elif collection: 191 ↛ 218 (line 191 didn't jump to line 218)

192 # New collection - use defaults and store them 

193 logger.info( 

194 f"New collection {collection_id}, using and storing default settings" 

195 ) 

196 

197 # Create service with defaults 

198 service = LibraryRAGService( 

199 username=username, 

200 embedding_model=default_embedding_model, 

201 embedding_provider=default_embedding_provider, 

202 chunk_size=default_chunk_size, 

203 chunk_overlap=default_chunk_overlap, 

204 splitter_type=default_splitter_type, 

205 text_separators=default_text_separators, 

206 distance_metric=default_distance_metric, 

207 normalize_vectors=default_normalize_vectors, 

208 index_type=default_index_type, 

209 ) 

210 

211 # Store settings on collection (will be done during indexing) 

212 # Note: We don't store here because we don't have embedding_dimension yet 

213 # It will be stored in index_collection when first document is indexed 

214 

215 return service 

216 

217 # No collection or fallback - use current defaults 

218 return LibraryRAGService( 

219 username=username, 

220 embedding_model=default_embedding_model, 

221 embedding_provider=default_embedding_provider, 

222 chunk_size=default_chunk_size, 

223 chunk_overlap=default_chunk_overlap, 

224 splitter_type=default_splitter_type, 

225 text_separators=default_text_separators, 

226 distance_metric=default_distance_metric, 

227 normalize_vectors=default_normalize_vectors, 

228 index_type=default_index_type, 

229 ) 
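# A minimal usage sketch of the fallback behavior above (collection id is
# illustrative): stored collection settings win on ordinary calls, while
# use_defaults=True, as on force-reindex, picks up the current defaults:
#
#     service = get_rag_service("coll-123")                     # stored settings
#     service = get_rag_service("coll-123", use_defaults=True)  # current defaults
#     service = get_rag_service()                               # no collection: defaults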

230 

231 

232# Config API Routes 

233 

234 

235@rag_bp.route("/api/config/supported-formats", methods=["GET"]) 

236@login_required 

237def get_supported_formats(): 

238 """Return list of supported file formats for upload. 

239 

240 This endpoint provides the single source of truth for supported file 

241 extensions, pulling from the document_loaders registry. The UI can 

242 use this to dynamically update the file input accept attribute. 

243 """ 

244 from ...document_loaders import get_supported_extensions 

245 

246 extensions = get_supported_extensions() 

247 # Sort extensions for consistent display 

248 extensions = sorted(extensions) 

249 

250 return jsonify( 

251 { 

252 "extensions": extensions, 

253 "accept_string": ",".join(extensions), 

254 "count": len(extensions), 

255 } 

256 ) 

257 

258 

259# Page Routes 

260 

261 

262@rag_bp.route("/embedding-settings") 

263@login_required 

264def embedding_settings_page(): 

265 """Render the Embedding Settings page.""" 

266 return render_template( 

267 "pages/embedding_settings.html", active_page="embedding-settings" 

268 ) 

269 

270 

271@rag_bp.route("/document/<string:document_id>/chunks") 

272@login_required 

273def view_document_chunks(document_id): 

274 """View all chunks for a document across all collections.""" 

275 from ...database.session_context import get_user_db_session 

276 from ...database.models.library import DocumentChunk 

277 

278 username = session.get("username") 

279 

280 with get_user_db_session(username) as db_session: 

281 # Get document info 

282 document = db_session.query(Document).filter_by(id=document_id).first() 

283 

284 if not document: 

285 return "Document not found", 404 

286 

287 # Get all chunks for this document 

288 chunks = ( 

289 db_session.query(DocumentChunk) 

290 .filter(DocumentChunk.source_id == document_id) 

291 .order_by(DocumentChunk.collection_name, DocumentChunk.chunk_index) 

292 .all() 

293 ) 

294 

295 # Group chunks by collection 

296 chunks_by_collection = {} 

297 for chunk in chunks: 

298 coll_name = chunk.collection_name 

299 if coll_name not in chunks_by_collection: 

300 # Get collection display name 

301 collection_id = coll_name.replace("collection_", "") 

302 collection = ( 

303 db_session.query(Collection) 

304 .filter_by(id=collection_id) 

305 .first() 

306 ) 

307 chunks_by_collection[coll_name] = { 

308 "name": collection.name if collection else coll_name, 

309 "id": collection_id, 

310 "chunks": [], 

311 } 

312 

313 chunks_by_collection[coll_name]["chunks"].append( 

314 { 

315 "id": chunk.id, 

316 "index": chunk.chunk_index, 

317 "text": chunk.chunk_text, 

318 "word_count": chunk.word_count, 

319 "start_char": chunk.start_char, 

320 "end_char": chunk.end_char, 

321 "embedding_model": chunk.embedding_model, 

322 "embedding_model_type": chunk.embedding_model_type.value 

323 if chunk.embedding_model_type 

324 else None, 

325 "embedding_dimension": chunk.embedding_dimension, 

326 "created_at": chunk.created_at, 

327 } 

328 ) 

329 

330 return render_template( 

331 "pages/document_chunks.html", 

332 document=document, 

333 chunks_by_collection=chunks_by_collection, 

334 total_chunks=len(chunks), 

335 ) 

336 

337 

338@rag_bp.route("/collections") 

339@login_required 

340def collections_page(): 

341 """Render the Collections page.""" 

342 return render_template("pages/collections.html", active_page="collections") 

343 

344 

345@rag_bp.route("/collections/<string:collection_id>") 

346@login_required 

347def collection_details_page(collection_id): 

348 """Render the Collection Details page.""" 

349 return render_template( 

350 "pages/collection_details.html", 

351 active_page="collections", 

352 collection_id=collection_id, 

353 ) 

354 

355 

356@rag_bp.route("/collections/<string:collection_id>/upload") 

357@login_required 

358def collection_upload_page(collection_id): 

359 """Render the Collection Upload page.""" 

360 # Get the upload PDF storage setting 

361 settings = get_settings_manager() 

362 upload_pdf_storage = settings.get_setting( 

363 "research_library.upload_pdf_storage", "none" 

364 ) 

365 # Only allow valid values for uploads (no filesystem) 

366 if upload_pdf_storage not in ("database", "none"): 

367 upload_pdf_storage = "none" 

368 

369 return render_template( 

370 "pages/collection_upload.html", 

371 active_page="collections", 

372 collection_id=collection_id, 

373 collection_name=None, # Could fetch from DB if needed 

374 upload_pdf_storage=upload_pdf_storage, 

375 ) 

376 

377 

378@rag_bp.route("/collections/create") 

379@login_required 

380def collection_create_page(): 

381 """Render the Create Collection page.""" 

382 return render_template( 

383 "pages/collection_create.html", active_page="collections" 

384 ) 

385 

386 

387# API Routes 

388 

389 

390@rag_bp.route("/api/rag/settings", methods=["GET"]) 

391@login_required 

392def get_current_settings(): 

393 """Get current RAG configuration from settings.""" 

394 import json as json_lib 

395 

396 try: 

397 settings = get_settings_manager() 

398 

399 # Get text separators and parse if needed 

400 text_separators = settings.get_setting( 

401 "local_search_text_separators", '["\n\n", "\n", ". ", " ", ""]' 

402 ) 

403 if isinstance(text_separators, str): 

404 try: 

405 text_separators = json_lib.loads(text_separators) 

406 except json_lib.JSONDecodeError: 

407 logger.warning( 

408 f"Invalid JSON for local_search_text_separators setting: {text_separators!r}. " 

409 "Using default separators." 

410 ) 

411 text_separators = ["\n\n", "\n", ". ", " ", ""] 

412 

413 normalize_vectors = settings.get_setting( 

414 "local_search_normalize_vectors", True 

415 ) 

416 

417 return jsonify( 

418 { 

419 "success": True, 

420 "settings": { 

421 "embedding_provider": settings.get_setting( 

422 "local_search_embedding_provider", 

423 "sentence_transformers", 

424 ), 

425 "embedding_model": settings.get_setting( 

426 "local_search_embedding_model", "all-MiniLM-L6-v2" 

427 ), 

428 "chunk_size": settings.get_setting( 

429 "local_search_chunk_size", 1000 

430 ), 

431 "chunk_overlap": settings.get_setting( 

432 "local_search_chunk_overlap", 200 

433 ), 

434 "splitter_type": settings.get_setting( 

435 "local_search_splitter_type", "recursive" 

436 ), 

437 "text_separators": text_separators, 

438 "distance_metric": settings.get_setting( 

439 "local_search_distance_metric", "cosine" 

440 ), 

441 "normalize_vectors": normalize_vectors, 

442 "index_type": settings.get_setting( 

443 "local_search_index_type", "flat" 

444 ), 

445 }, 

446 } 

447 ) 

448 except Exception as e: 

449 return handle_api_error("getting RAG settings", e) 

450 

451 

452@rag_bp.route("/api/rag/test-embedding", methods=["POST"]) 

453@login_required 

454def test_embedding(): 

455 """Test an embedding configuration by generating a test embedding.""" 

456 provider = model = None  # bound early so the except handler can't hit NameError

457 try: 

458 data = request.json 

459 provider = data.get("provider") 

460 model = data.get("model") 

461 test_text = data.get("test_text", "This is a test.") 

462 

463 if not provider or not model: 

464 return jsonify( 

465 {"success": False, "error": "Provider and model are required"} 

466 ), 400 

467 

468 # Import embedding functions 

469 from ...embeddings.embeddings_config import ( 

470 get_embedding_function, 

471 ) 

472 

473 logger.info( 

474 f"Testing embedding with provider={provider}, model={model}" 

475 ) 

476 

477 # Get user's settings so provider URLs (e.g. Ollama) are resolved correctly 

478 settings = get_settings_manager() 

479 settings_snapshot = ( 

480 settings.get_all_settings() 

481 if hasattr(settings, "get_all_settings") 

482 else {} 

483 ) 

484 

485 # Get embedding function with the specified configuration 

486 start_time = time.time() 

487 embedding_func = get_embedding_function( 

488 provider=provider, 

489 model_name=model, 

490 settings_snapshot=settings_snapshot, 

491 ) 

492 

493 # Generate test embedding 

494 embedding = embedding_func([test_text])[0] 

495 response_time_ms = int((time.time() - start_time) * 1000) 

496 

497 # Get embedding dimension 

498 dimension = len(embedding) if hasattr(embedding, "__len__") else None 

499 

500 return jsonify( 

501 { 

502 "success": True, 

503 "dimension": dimension, 

504 "response_time_ms": response_time_ms, 

505 "provider": provider, 

506 "model": model, 

507 } 

508 ) 

509 

510 except Exception as e: 

511 logger.exception("Error while testing embedding")

512 error_str = str(e).lower() 

513 

514 # Detect common signs that an LLM was selected instead of an embedding model 

515 llm_hints = [ 

516 "does not support", 

517 "not an embedding", 

518 "generate embedding", 

519 "invalid model", 

520 "not found", 

521 "expected float", 

522 "could not convert", 

523 "list index out of range", 

524 "object is not subscriptable", 

525 "not iterable", 

526 "json", 

527 "chat", 

528 "completion", 

529 ] 

530 is_likely_llm = any(hint in error_str for hint in llm_hints) 

531 

532 if is_likely_llm: 

533 user_message = ( 

534 f"Embedding test failed for model '{model}'. " 

535 "This is most likely because an LLM (language model) was selected " 

536 "instead of an embedding model. Please choose a dedicated embedding " 

537 "model (e.g. nomic-embed-text, mxbai-embed-large, " 

538 "all-MiniLM-L6-v2)." 

539 ) 

540 else: 

541 user_message = ( 

542 f"Embedding test failed for model '{model}'. " 

543 "If you are unsure whether the selected model supports embeddings, " 

544 "try a dedicated embedding model instead (e.g. nomic-embed-text, " 

545 "mxbai-embed-large, all-MiniLM-L6-v2)." 

546 ) 

547 

548 return jsonify({"success": False, "error": user_message}), 500 
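# Example call against the endpoint above, assuming an authenticated
# requests.Session named sess (host, port, and model are illustrative):
#
#     resp = sess.post(
#         "http://localhost:5000/library/api/rag/test-embedding",
#         json={"provider": "ollama", "model": "nomic-embed-text"},
#     )
#     resp.json()  # e.g. {"success": True, "dimension": ..., "response_time_ms": ...}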

549 

550 

551@rag_bp.route("/api/rag/models", methods=["GET"]) 

552@login_required 

553def get_available_models(): 

554 """Get list of available embedding providers and models.""" 

555 try: 

556 from ...embeddings.embeddings_config import _get_provider_classes 

557 

558 # Get current settings for providers 

559 settings = get_settings_manager() 

560 settings_snapshot = ( 

561 settings.get_all_settings() 

562 if hasattr(settings, "get_all_settings") 

563 else {} 

564 ) 

565 

566 # Get provider classes 

567 provider_classes = _get_provider_classes() 

568 

569 # Provider display names 

570 provider_labels = { 

571 "sentence_transformers": "Sentence Transformers (Local)", 

572 "ollama": "Ollama (Local)", 

573 "openai": "OpenAI API", 

574 } 

575 

576 # Get provider options and models by looping through providers 

577 provider_options = [] 

578 providers = {} 

579 

580 for provider_key, provider_class in provider_classes.items(): 

581 available = provider_class.is_available(settings_snapshot) 

582 

583 # Always show the provider in the dropdown so users can 

584 # configure its settings (e.g. fix a wrong Ollama URL). 

585 provider_options.append( 

586 { 

587 "value": provider_key, 

588 "label": provider_labels.get(provider_key, provider_key), 

589 "available": available, 

590 } 

591 ) 

592 

593 # Only fetch models when the provider is reachable. 

594 if available: 

595 models = provider_class.get_available_models(settings_snapshot) 

596 providers[provider_key] = [ 

597 { 

598 "value": m["value"], 

599 "label": m["label"], 

600 "provider": provider_key, 

601 **( 

602 {"is_embedding": m["is_embedding"]} 

603 if "is_embedding" in m 

604 else {} 

605 ), 

606 } 

607 for m in models 

608 ] 

609 else: 

610 providers[provider_key] = [] 

611 

612 return jsonify( 

613 { 

614 "success": True, 

615 "provider_options": provider_options, 

616 "providers": providers, 

617 } 

618 ) 

619 except Exception as e: 

620 return handle_api_error("getting available models", e) 

621 

622 

623@rag_bp.route("/api/rag/info", methods=["GET"]) 

624@login_required 

625def get_index_info(): 

626 """Get information about the current RAG index.""" 

627 from ...database.library_init import get_default_library_id 

628 

629 try: 

630 # Get collection_id from request or use default Library collection 

631 collection_id = request.args.get("collection_id") 

632 if not collection_id: 

633 collection_id = get_default_library_id(session["username"]) 

634 

635 logger.info( 

636 f"Getting RAG index info for collection_id: {collection_id}" 

637 ) 

638 

639 rag_service = get_rag_service(collection_id) 

640 info = rag_service.get_current_index_info(collection_id) 

641 

642 if info is None: 

643 logger.info( 

644 f"No RAG index found for collection_id: {collection_id}" 

645 ) 

646 return jsonify( 

647 {"success": True, "info": None, "message": "No index found"} 

648 ) 

649 

650 logger.info(f"Found RAG index for collection_id: {collection_id}") 

651 return jsonify({"success": True, "info": info}) 

652 except Exception as e: 

653 return handle_api_error("getting index info", e) 

654 

655 

656@rag_bp.route("/api/rag/stats", methods=["GET"]) 

657@login_required 

658def get_rag_stats(): 

659 """Get RAG statistics for a collection.""" 

660 from ...database.library_init import get_default_library_id 

661 

662 try: 

663 # Get collection_id from request or use default Library collection 

664 collection_id = request.args.get("collection_id") 

665 if not collection_id: 

666 collection_id = get_default_library_id(session["username"]) 

667 

668 rag_service = get_rag_service(collection_id) 

669 stats = rag_service.get_rag_stats(collection_id) 

670 

671 return jsonify({"success": True, "stats": stats}) 

672 except Exception as e: 

673 return handle_api_error("getting RAG stats", e) 

674 

675 

676@rag_bp.route("/api/rag/index-document", methods=["POST"]) 

677@login_required 

678def index_document(): 

679 """Index a single document in a collection.""" 

680 from ...database.library_init import get_default_library_id 

681 text_doc_id = None  # bound early so the except handler can't hit NameError

682 try: 

683 data = request.get_json() 

684 text_doc_id = data.get("text_doc_id") 

685 force_reindex = data.get("force_reindex", False) 

686 collection_id = data.get("collection_id") 

687 

688 if not text_doc_id: 

689 return jsonify( 

690 {"success": False, "error": "text_doc_id is required"} 

691 ), 400 

692 

693 # Get collection_id from request or use default Library collection 

694 if not collection_id: 

695 collection_id = get_default_library_id(session["username"]) 

696 

697 rag_service = get_rag_service(collection_id) 

698 result = rag_service.index_document( 

699 text_doc_id, collection_id, force_reindex 

700 ) 

701 

702 if result["status"] == "error": 

703 return jsonify( 

704 {"success": False, "error": result.get("error")} 

705 ), 400 

706 

707 return jsonify({"success": True, "result": result}) 

708 except Exception as e: 

709 return handle_api_error(f"indexing document {text_doc_id}", e) 

710 

711 

712@rag_bp.route("/api/rag/remove-document", methods=["POST"]) 

713@login_required 

714def remove_document(): 

715 """Remove a document from RAG in a collection.""" 

716 from ...database.library_init import get_default_library_id 

717 text_doc_id = None  # bound early so the except handler can't hit NameError

718 try: 

719 data = request.get_json() 

720 text_doc_id = data.get("text_doc_id") 

721 collection_id = data.get("collection_id") 

722 

723 if not text_doc_id: 

724 return jsonify( 

725 {"success": False, "error": "text_doc_id is required"} 

726 ), 400 

727 

728 # Get collection_id from request or use default Library collection 

729 if not collection_id: 

730 collection_id = get_default_library_id(session["username"]) 

731 

732 rag_service = get_rag_service(collection_id) 

733 result = rag_service.remove_document_from_rag( 

734 text_doc_id, collection_id 

735 ) 

736 

737 if result["status"] == "error": 

738 return jsonify( 

739 {"success": False, "error": result.get("error")} 

740 ), 400 

741 

742 return jsonify({"success": True, "result": result}) 

743 except Exception as e: 

744 return handle_api_error(f"removing document {text_doc_id}", e) 

745 

746 

747@rag_bp.route("/api/rag/index-research", methods=["POST"]) 

748@login_required 

749def index_research(): 

750 """Index all documents from a research.""" 

751 try: 

752 data = request.get_json() 

753 research_id = data.get("research_id") 

754 force_reindex = data.get("force_reindex", False) 

755 

756 if not research_id: 

757 return jsonify( 

758 {"success": False, "error": "research_id is required"} 

759 ), 400 

760 

761 rag_service = get_rag_service() 

762 results = rag_service.index_research_documents( 

763 research_id, force_reindex 

764 ) 

765 

766 return jsonify({"success": True, "results": results}) 

767 except Exception as e: 

768 return handle_api_error("indexing research", e)  # research_id may be unbound here

769 

770 

771@rag_bp.route("/api/rag/index-all", methods=["GET"]) 

772@login_required 

773def index_all(): 

774 """Index all documents in a collection with Server-Sent Events progress.""" 

775 from ...database.session_context import get_user_db_session 

776 from ...database.library_init import get_default_library_id 

777 

778 force_reindex = request.args.get("force_reindex", "false").lower() == "true" 

779 username = session["username"] 

780 

781 # Get collection_id from request or use default Library collection 

782 collection_id = request.args.get("collection_id") 

783 if not collection_id: 

784 collection_id = get_default_library_id(username) 

785 

786 logger.info( 

787 f"Starting index-all for collection_id: {collection_id}, force_reindex: {force_reindex}" 

788 ) 

789 

790 # Create RAG service in request context before generator runs 

791 rag_service = get_rag_service(collection_id) 

792 

793 def generate(): 

794 """Generator function for SSE progress updates.""" 

795 try: 

796 # Send initial status 

797 yield f"data: {json.dumps({'type': 'start', 'message': 'Starting bulk indexing...'})}\n\n" 

798 

799 # Get document IDs to index from DocumentCollection 

800 with get_user_db_session(username) as db_session: 

801 # Query Document joined with DocumentCollection for the collection 

802 query = ( 

803 db_session.query(Document.id, Document.title) 

804 .join( 

805 DocumentCollection, 

806 Document.id == DocumentCollection.document_id, 

807 ) 

808 .filter(DocumentCollection.collection_id == collection_id) 

809 ) 

810 

811 if not force_reindex: 

812 # Only index documents that haven't been indexed yet 

813 query = query.filter(DocumentCollection.indexed.is_(False)) 

814 

815 doc_info = [(doc_id, title) for doc_id, title in query.all()] 

816 

817 if not doc_info: 

818 yield f"data: {json.dumps({'type': 'complete', 'results': {'successful': 0, 'skipped': 0, 'failed': 0, 'message': 'No documents to index'}})}\n\n" 

819 return 

820 

821 results = {"successful": 0, "skipped": 0, "failed": 0, "errors": []} 

822 total = len(doc_info) 

823 

824 # Process documents in batches to optimize performance 

825 # Get batch size from settings 

826 settings = get_settings_manager() 

827 batch_size = int( 

828 settings.get_setting("rag.indexing_batch_size", 15) 

829 ) 

830 processed = 0 

831 

832 for i in range(0, len(doc_info), batch_size): 

833 batch = doc_info[i : i + batch_size] 

834 

835 # Process batch with collection_id 

836 batch_results = rag_service.index_documents_batch( 

837 batch, collection_id, force_reindex 

838 ) 

839 

840 # Process results and send progress updates 

841 for doc_id, title in batch:

842 processed += 1 

843 result = batch_results[doc_id] 

844 

845 # Send progress update 

846 yield f"data: {json.dumps({'type': 'progress', 'current': processed, 'total': total, 'title': title, 'percent': int((processed / total) * 100)})}\n\n" 

847 

848 if result["status"] == "success": 

849 results["successful"] += 1 

850 elif result["status"] == "skipped": 

851 results["skipped"] += 1 

852 else: 

853 results["failed"] += 1 

854 results["errors"].append( 

855 { 

856 "doc_id": doc_id, 

857 "title": title, 

858 "error": result.get("error"), 

859 } 

860 ) 

861 

862 # Send completion status 

863 yield f"data: {json.dumps({'type': 'complete', 'results': results})}\n\n" 

864 

865 # Log final status for debugging 

866 logger.info( 

867 f"Bulk indexing complete: {results['successful']} successful, {results['skipped']} skipped, {results['failed']} failed" 

868 ) 

869 

870 except Exception: 

871 logger.exception("Error in bulk indexing") 

872 yield f"data: {json.dumps({'type': 'error', 'error': 'An internal error occurred during indexing'})}\n\n" 

873 

874 return Response( 

875 stream_with_context(generate()), mimetype="text/event-stream" 

876 ) 
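# A sketch of consuming the SSE stream above, which uses the standard
# 'data: <json>\n\n' framing produced by generate() (sess and the collection
# id are illustrative):
#
#     with sess.get(
#         "http://localhost:5000/library/api/rag/index-all",
#         params={"collection_id": "coll-123"},
#         stream=True,
#     ) as resp:
#         for line in resp.iter_lines():
#             if line.startswith(b"data: "):
#                 event = json.loads(line[len(b"data: "):])
#                 # event["type"] is one of: start, progress, complete, error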

877 

878 

879@rag_bp.route("/api/rag/configure", methods=["POST"]) 

880@login_required 

881def configure_rag(): 

882 """ 

883 Change RAG configuration (embedding model, chunk size, etc.). 

884 This will create a new index with the new configuration. 

885 """ 

886 import json as json_lib 

887 

888 try: 

889 data = request.get_json() 

890 embedding_model = data.get("embedding_model") 

891 embedding_provider = data.get("embedding_provider") 

892 chunk_size = data.get("chunk_size") 

893 chunk_overlap = data.get("chunk_overlap") 

894 collection_id = data.get("collection_id") 

895 

896 # Get new advanced settings (with defaults) 

897 splitter_type = data.get("splitter_type", "recursive") 

898 text_separators = data.get( 

899 "text_separators", ["\n\n", "\n", ". ", " ", ""] 

900 ) 

901 distance_metric = data.get("distance_metric", "cosine") 

902 normalize_vectors = data.get("normalize_vectors", True) 

903 index_type = data.get("index_type", "flat") 

904 

905 if not all( 

906 [ 

907 embedding_model, 

908 embedding_provider, 

909 chunk_size, 

910 chunk_overlap, 

911 ] 

912 ): 

913 return jsonify( 

914 { 

915 "success": False, 

916 "error": "All configuration parameters are required (embedding_model, embedding_provider, chunk_size, chunk_overlap)", 

917 } 

918 ), 400 

919 

920 # Save settings to database 

921 settings = get_settings_manager() 

922 settings.set_setting("local_search_embedding_model", embedding_model) 

923 settings.set_setting( 

924 "local_search_embedding_provider", embedding_provider 

925 ) 

926 settings.set_setting("local_search_chunk_size", int(chunk_size)) 

927 settings.set_setting("local_search_chunk_overlap", int(chunk_overlap)) 

928 

929 # Save new advanced settings 

930 settings.set_setting("local_search_splitter_type", splitter_type) 

931 # Convert list to JSON string for storage 

932 if isinstance(text_separators, list): 

933 text_separators_str = json_lib.dumps(text_separators) 

934 else: 

935 text_separators_str = text_separators 

936 settings.set_setting( 

937 "local_search_text_separators", text_separators_str 

938 ) 

939 settings.set_setting("local_search_distance_metric", distance_metric) 

940 settings.set_setting( 

941 "local_search_normalize_vectors", bool(normalize_vectors) 

942 ) 

943 settings.set_setting("local_search_index_type", index_type) 

944 

945 # If collection_id is provided, update that collection's configuration 

946 if collection_id: 

947 # Create new RAG service with new configuration 

948 with LibraryRAGService( 

949 username=session["username"], 

950 embedding_model=embedding_model, 

951 embedding_provider=embedding_provider, 

952 chunk_size=int(chunk_size), 

953 chunk_overlap=int(chunk_overlap), 

954 splitter_type=splitter_type, 

955 text_separators=text_separators 

956 if isinstance(text_separators, list) 

957 else json_lib.loads(text_separators), 

958 distance_metric=distance_metric, 

959 normalize_vectors=normalize_vectors, 

960 index_type=index_type, 

961 ) as new_rag_service: 

962 # Get or create new index with this configuration 

963 rag_index = new_rag_service._get_or_create_rag_index( 

964 collection_id 

965 ) 

966 

967 return jsonify( 

968 { 

969 "success": True, 

970 "message": "Configuration updated for collection. You can now index documents with the new settings.", 

971 "index_hash": rag_index.index_hash, 

972 } 

973 ) 

974 else: 

975 # Just saving default settings without updating a specific collection 

976 return jsonify( 

977 { 

978 "success": True, 

979 "message": "Default embedding settings saved successfully. New collections will use these settings.", 

980 } 

981 ) 

982 

983 except Exception as e: 

984 return handle_api_error("configuring RAG", e) 
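# Example request body for the endpoint above (values illustrative). The four
# fields checked by all(...) are required; the advanced fields fall back to
# the defaults shown in the code:
#
#     payload = {
#         "embedding_model": "all-MiniLM-L6-v2",
#         "embedding_provider": "sentence_transformers",
#         "chunk_size": 1000,
#         "chunk_overlap": 200,
#     }  # omit collection_id to save these as the global defaults only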

985 

986 

987@rag_bp.route("/api/rag/documents", methods=["GET"]) 

988@login_required 

989def get_documents(): 

990 """Get library documents with their RAG status for the default Library collection (paginated).""" 

991 from ...database.session_context import get_user_db_session 

992 from ...database.library_init import get_default_library_id 

993 

994 try: 

995 # Get pagination parameters 

996 page = request.args.get("page", 1, type=int) 

997 per_page = request.args.get("per_page", 50, type=int) 

998 filter_type = request.args.get( 

999 "filter", "all" 

1000 ) # all, indexed, unindexed 

1001 

1002 # Validate pagination parameters 

1003 page = max(1, page) 

1004 per_page = min(max(10, per_page), 100) # Limit between 10-100 

1005 

1006 # Close current thread's session to force fresh connection 

1007 from ...database.thread_local_session import cleanup_current_thread 

1008 

1009 cleanup_current_thread() 

1010 

1011 username = session["username"] 

1012 

1013 # Get collection_id from request or use default Library collection 

1014 collection_id = request.args.get("collection_id") 

1015 if not collection_id: 

1016 collection_id = get_default_library_id(username) 

1017 

1018 logger.info( 

1019 f"Getting documents for collection_id: {collection_id}, filter: {filter_type}, page: {page}" 

1020 ) 

1021 

1022 with get_user_db_session(username) as db_session: 

1023 # Expire all cached objects to ensure we get fresh data from DB 

1024 db_session.expire_all() 

1025 

1026 # Import RagDocumentStatus model 

1027 from ...database.models.library import RagDocumentStatus 

1028 

1029 # Build base query - join Document with DocumentCollection for the collection 

1030 # LEFT JOIN with rag_document_status to check indexed status 

1031 query = ( 

1032 db_session.query( 

1033 Document, DocumentCollection, RagDocumentStatus 

1034 ) 

1035 .join( 

1036 DocumentCollection, 

1037 (DocumentCollection.document_id == Document.id) 

1038 & (DocumentCollection.collection_id == collection_id), 

1039 ) 

1040 .outerjoin( 

1041 RagDocumentStatus, 

1042 (RagDocumentStatus.document_id == Document.id) 

1043 & (RagDocumentStatus.collection_id == collection_id), 

1044 ) 

1045 ) 

1046 

1047 logger.debug(f"Base query for collection {collection_id}: {query}") 

1048 

1049 # Apply filters based on rag_document_status existence 

1050 if filter_type == "indexed": 

1051 query = query.filter(RagDocumentStatus.document_id.isnot(None)) 

1052 elif filter_type == "unindexed": 

1053 # Documents in collection but not indexed yet 

1054 query = query.filter(RagDocumentStatus.document_id.is_(None)) 

1055 

1056 # Get total count before pagination 

1057 total_count = query.count() 

1058 logger.info( 

1059 f"Found {total_count} total documents for collection {collection_id} with filter {filter_type}" 

1060 ) 

1061 

1062 # Apply pagination 

1063 results = ( 

1064 query.order_by(Document.created_at.desc()) 

1065 .limit(per_page) 

1066 .offset((page - 1) * per_page) 

1067 .all() 

1068 ) 

1069 

1070 documents = [ 

1071 { 

1072 "id": doc.id, 

1073 "title": doc.title, 

1074 "original_url": doc.original_url, 

1075 "rag_indexed": rag_status is not None, 

1076 "chunk_count": rag_status.chunk_count if rag_status else 0, 

1077 "created_at": doc.created_at.isoformat() 

1078 if doc.created_at 

1079 else None, 

1080 } 

1081 for doc, doc_collection, rag_status in results 

1082 ] 

1083 

1084 # Debug logging to help diagnose indexing status issues 

1085 indexed_count = sum(1 for d in documents if d["rag_indexed"]) 

1086 

1087 # Additional debug: check rag_document_status for this collection 

1088 all_indexed_statuses = ( 

1089 db_session.query(RagDocumentStatus) 

1090 .filter_by(collection_id=collection_id) 

1091 .all() 

1092 ) 

1093 logger.info( 

1094 f"rag_document_status table shows: {len(all_indexed_statuses)} documents indexed for collection {collection_id}" 

1095 ) 

1096 

1097 logger.info( 

1098 f"Returning {len(documents)} documents on page {page}: " 

1099 f"{indexed_count} indexed, {len(documents) - indexed_count} not indexed" 

1100 ) 

1101 

1102 return jsonify( 

1103 { 

1104 "success": True, 

1105 "documents": documents, 

1106 "pagination": { 

1107 "page": page, 

1108 "per_page": per_page, 

1109 "total": total_count, 

1110 "pages": (total_count + per_page - 1) // per_page, 

1111 }, 

1112 } 

1113 ) 

1114 except Exception as e: 

1115 return handle_api_error("getting documents", e) 
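# Worked example of the pagination math above: total=101 and per_page=50 give
# pages = (101 + 50 - 1) // 50 = 3, i.e. ceiling division without floats.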

1116 

1117 

1118@rag_bp.route("/api/rag/index-local", methods=["GET"]) 

1119@login_required 

1120def index_local_library(): 

1121 """Index documents from a local folder with Server-Sent Events progress.""" 

1122 folder_path = request.args.get("path") 

1123 file_patterns = request.args.get( 

1124 "patterns", "*.pdf,*.txt,*.md,*.html" 

1125 ).split(",") 

1126 recursive = request.args.get("recursive", "true").lower() == "true" 

1127 

1128 if not folder_path: 

1129 return jsonify({"success": False, "error": "Path is required"}), 400 

1130 

1131 # Validate and sanitize the path to prevent traversal attacks 

1132 try: 

1133 validated_path = PathValidator.validate_local_filesystem_path( 

1134 folder_path 

1135 ) 

1136 # Re-sanitize for static analyzer recognition (CodeQL) 

1137 path = PathValidator.sanitize_for_filesystem_ops(validated_path) 

1138 except ValueError as e: 

1139 logger.warning(f"Path validation failed for '{folder_path}': {e}") 

1140 return jsonify({"success": False, "error": "Invalid path"}), 400 

1141 

1142 # Check path exists and is a directory 

1143 if not path.exists(): 

1144 return jsonify({"success": False, "error": "Path does not exist"}), 400 

1145 if not path.is_dir(): 

1146 return jsonify( 

1147 {"success": False, "error": "Path is not a directory"} 

1148 ), 400 

1149 

1150 # Create RAG service in request context 

1151 rag_service = get_rag_service() 

1152 

1153 def generate(): 

1154 """Generator function for SSE progress updates.""" 

1155 try: 

1156 # Send initial status 

1157 yield f"data: {json.dumps({'type': 'start', 'message': f'Scanning folder: {path}'})}\n\n" 

1158 

1159 # Find all matching files 

1160 files_to_index = [] 

1161 for pattern in file_patterns: 

1162 pattern = pattern.strip() 

1163 if recursive: 

1164 search_pattern = str(path / "**" / pattern) 

1165 else: 

1166 search_pattern = str(path / pattern) 

1167 

1168 matching_files = glob.glob(search_pattern, recursive=recursive) 

1169 files_to_index.extend(matching_files) 

1170 

1171 # Remove duplicates and sort 

1172 files_to_index = sorted(set(files_to_index)) 

1173 

1174 if not files_to_index: 

1175 yield f"data: {json.dumps({'type': 'complete', 'results': {'successful': 0, 'skipped': 0, 'failed': 0, 'message': 'No matching files found'}})}\n\n" 

1176 return 

1177 

1178 results = {"successful": 0, "skipped": 0, "failed": 0, "errors": []} 

1179 total = len(files_to_index) 

1180 

1181 # Index each file 

1182 for idx, file_path in enumerate(files_to_index, 1): 

1183 file_name = Path(file_path).name 

1184 

1185 # Send progress update 

1186 yield f"data: {json.dumps({'type': 'progress', 'current': idx, 'total': total, 'filename': file_name, 'percent': int((idx / total) * 100)})}\n\n" 

1187 

1188 try: 

1189 # Index the file directly using RAG service 

1190 result = rag_service.index_local_file(file_path) 

1191 

1192 if result.get("status") == "success": 

1193 results["successful"] += 1 

1194 elif result.get("status") == "skipped": 

1195 results["skipped"] += 1 

1196 else: 

1197 results["failed"] += 1 

1198 results["errors"].append( 

1199 { 

1200 "file": file_name, 

1201 "error": result.get("error", "Unknown error"), 

1202 } 

1203 ) 

1204 except Exception: 

1205 results["failed"] += 1 

1206 results["errors"].append( 

1207 {"file": file_name, "error": "Failed to index file"} 

1208 ) 

1209 logger.exception(f"Error indexing file {file_path}") 

1210 

1211 # Send completion status 

1212 yield f"data: {json.dumps({'type': 'complete', 'results': results})}\n\n" 

1213 

1214 logger.info( 

1215 f"Local library indexing complete for {path}: " 

1216 f"{results['successful']} successful, " 

1217 f"{results['skipped']} skipped, " 

1218 f"{results['failed']} failed" 

1219 ) 

1220 

1221 except Exception: 

1222 logger.exception("Error in local library indexing") 

1223 yield f"data: {json.dumps({'type': 'error', 'error': 'An internal error occurred during indexing'})}\n\n" 

1224 

1225 return Response( 

1226 stream_with_context(generate()), mimetype="text/event-stream" 

1227 ) 

1228 

1229 

1230# Collection Management Routes 

1231 

1232 

1233@rag_bp.route("/api/collections", methods=["GET"]) 

1234@login_required 

1235def get_collections(): 

1236 """Get all document collections for the current user.""" 

1237 from ...database.session_context import get_user_db_session 

1238 

1239 try: 

1240 username = session["username"] 

1241 with get_user_db_session(username) as db_session: 

1242 # No need to filter by username - each user has their own database 

1243 collections = db_session.query(Collection).all() 

1244 

1245 result = [] 

1246 for coll in collections: 

1247 collection_data = { 

1248 "id": coll.id, 

1249 "name": coll.name, 

1250 "description": coll.description, 

1251 "created_at": coll.created_at.isoformat() 

1252 if coll.created_at 

1253 else None, 

1254 "collection_type": coll.collection_type, 

1255 "is_default": coll.is_default 

1256 if hasattr(coll, "is_default") 

1257 else False, 

1258 "document_count": len(coll.document_links) 

1259 if hasattr(coll, "document_links") 

1260 else 0, 

1261 "folder_count": len(coll.linked_folders) 

1262 if hasattr(coll, "linked_folders") 

1263 else 0, 

1264 } 

1265 

1266 # Include embedding metadata if available 

1267 if coll.embedding_model: 1267 ↛ 1268 (line 1267 didn't jump to line 1268 because the condition on line 1267 was never true)

1268 collection_data["embedding"] = { 

1269 "model": coll.embedding_model, 

1270 "provider": coll.embedding_model_type.value 

1271 if coll.embedding_model_type 

1272 else None, 

1273 "dimension": coll.embedding_dimension, 

1274 "chunk_size": coll.chunk_size, 

1275 "chunk_overlap": coll.chunk_overlap, 

1276 } 

1277 else: 

1278 collection_data["embedding"] = None 

1279 

1280 result.append(collection_data) 

1281 

1282 return jsonify({"success": True, "collections": result}) 

1283 except Exception as e: 

1284 return handle_api_error("getting collections", e) 

1285 

1286 

1287@rag_bp.route("/api/collections", methods=["POST"]) 

1288@login_required 

1289def create_collection(): 

1290 """Create a new document collection.""" 

1291 from ...database.session_context import get_user_db_session 

1292 

1293 try: 

1294 data = request.get_json() 

1295 name = data.get("name", "").strip() 

1296 description = data.get("description", "").strip() 

1297 collection_type = data.get("type", "user_uploads") 

1298 

1299 if not name: 

1300 return jsonify({"success": False, "error": "Name is required"}), 400 

1301 

1302 username = session["username"] 

1303 with get_user_db_session(username) as db_session: 

1304 # Check if collection with this name already exists in this user's database 

1305 existing = db_session.query(Collection).filter_by(name=name).first() 

1306 

1307 if existing: 

1308 return jsonify( 

1309 { 

1310 "success": False, 

1311 "error": f"Collection '{name}' already exists", 

1312 } 

1313 ), 400 

1314 

1315 # Create new collection (no username needed - each user has their own DB) 

1316 # Note: created_at uses default=utcnow() in the model, so we don't need to set it manually 

1317 collection = Collection( 

1318 id=str(uuid.uuid4()), # Generate UUID for collection 

1319 name=name, 

1320 description=description, 

1321 collection_type=collection_type, 

1322 ) 

1323 

1324 db_session.add(collection) 

1325 db_session.commit() 

1326 

1327 return jsonify( 

1328 { 

1329 "success": True, 

1330 "collection": { 

1331 "id": collection.id, 

1332 "name": collection.name, 

1333 "description": collection.description, 

1334 "created_at": collection.created_at.isoformat(), 

1335 "collection_type": collection.collection_type, 

1336 }, 

1337 } 

1338 ) 

1339 except Exception as e: 

1340 return handle_api_error("creating collection", e) 

1341 

1342 

1343@rag_bp.route("/api/collections/<string:collection_id>", methods=["PUT"]) 

1344@login_required 

1345def update_collection(collection_id): 

1346 """Update a collection's details.""" 

1347 from ...database.session_context import get_user_db_session 

1348 

1349 try: 

1350 data = request.get_json() 

1351 name = data.get("name", "").strip() 

1352 description = data.get("description", "").strip() 

1353 

1354 username = session["username"] 

1355 with get_user_db_session(username) as db_session: 

1356 # No need to filter by username - each user has their own database 

1357 collection = ( 

1358 db_session.query(Collection).filter_by(id=collection_id).first() 

1359 ) 

1360 

1361 if not collection: 

1362 return jsonify( 

1363 {"success": False, "error": "Collection not found"} 

1364 ), 404 

1365 

1366 if name: 

1367 # Check if new name conflicts with existing collection 

1368 existing = ( 

1369 db_session.query(Collection) 

1370 .filter( 

1371 Collection.name == name, 

1372 Collection.id != collection_id, 

1373 ) 

1374 .first() 

1375 ) 

1376 

1377 if existing: 

1378 return jsonify( 

1379 { 

1380 "success": False, 

1381 "error": f"Collection '{name}' already exists", 

1382 } 

1383 ), 400 

1384 

1385 collection.name = name 

1386 

1387 if description is not None: # Allow empty description 1387 ↛ 1390 (line 1387 didn't jump to line 1390 because the condition on line 1387 was always true)

1388 collection.description = description 

1389 

1390 db_session.commit() 

1391 

1392 return jsonify( 

1393 { 

1394 "success": True, 

1395 "collection": { 

1396 "id": collection.id, 

1397 "name": collection.name, 

1398 "description": collection.description, 

1399 "created_at": collection.created_at.isoformat() 

1400 if collection.created_at 

1401 else None, 

1402 "collection_type": collection.collection_type, 

1403 }, 

1404 } 

1405 ) 

1406 except Exception as e: 

1407 return handle_api_error("updating collection", e) 

1408 

1409 

1410@rag_bp.route("/api/collections/<string:collection_id>", methods=["DELETE"]) 

1411@login_required 

1412def delete_collection(collection_id): 

1413 """Delete a collection and its orphaned documents.""" 

1414 from ..deletion.services.collection_deletion import ( 

1415 CollectionDeletionService, 

1416 ) 

1417 

1418 try: 

1419 username = session["username"] 

1420 service = CollectionDeletionService(username) 

1421 result = service.delete_collection( 

1422 collection_id, delete_orphaned_documents=True 

1423 ) 

1424 

1425 if result.get("deleted"): 

1426 return jsonify( 

1427 { 

1428 "success": True, 

1429 "message": "Collection deleted successfully", 

1430 "deleted_chunks": result.get("chunks_deleted", 0), 

1431 "orphaned_documents_deleted": result.get( 

1432 "orphaned_documents_deleted", 0 

1433 ), 

1434 } 

1435 ) 

1436 else: 

1437 error = result.get("error", "Unknown error") 

1438 status_code = 404 if "not found" in error.lower() else 400 

1439 return jsonify({"success": False, "error": error}), status_code 

1440 

1441 except Exception as e: 

1442 return handle_api_error("deleting collection", e) 

1443 

1444 

1445@rag_bp.route( 

1446 "/api/collections/<string:collection_id>/upload", methods=["POST"] 

1447) 

1448@login_required 

1449@upload_rate_limit 

1450def upload_to_collection(collection_id): 

1451 """Upload files to a collection.""" 

1452 from ...database.session_context import get_user_db_session 

1453 from werkzeug.utils import secure_filename 

1454 from pathlib import Path 

1455 import hashlib 

1456 import uuid 

1457 from ..services.pdf_storage_manager import PDFStorageManager 

1458 

1459 try: 

1460 if "files" not in request.files: 

1461 return jsonify( 

1462 {"success": False, "error": "No files provided"} 

1463 ), 400 

1464 

1465 files = request.files.getlist("files") 

1466 if not files: 

1467 return jsonify( 

1468 {"success": False, "error": "No files selected"} 

1469 ), 400 

1470 

1471 username = session["username"] 

1472 with get_user_db_session(username) as db_session: 

1473 # Verify collection exists in this user's database 

1474 collection = ( 

1475 db_session.query(Collection).filter_by(id=collection_id).first() 

1476 ) 

1477 

1478 if not collection: 

1479 return jsonify( 

1480 {"success": False, "error": "Collection not found"} 

1481 ), 404 

1482 

1483 # Get PDF storage mode from form data, falling back to user's setting 

1484 settings = get_settings_manager() 

1485 default_pdf_storage = settings.get_setting( 

1486 "research_library.upload_pdf_storage", "none" 

1487 ) 

1488 pdf_storage = request.form.get("pdf_storage", default_pdf_storage) 

1489 if pdf_storage not in ("database", "none"): 

1490 # Security: user uploads can only use database (encrypted) or none (text-only) 

1491 # Filesystem storage is not allowed for user uploads 

1492 pdf_storage = "none" 

1493 

1494 # Initialize PDF storage manager if storing PDFs in database 

1495 pdf_storage_manager = None 

1496 if pdf_storage == "database": 

1497 library_root = settings.get_setting( 

1498 "research_library.storage_path", 

1499 str(get_library_directory()), 

1500 ) 

1501 library_root = str(Path(library_root).expanduser()) 

1502 pdf_storage_manager = PDFStorageManager( 

1503 library_root=Path(library_root), storage_mode="database" 

1504 ) 

1505 logger.info("PDF storage mode: database (encrypted)") 

1506 else: 

1507 logger.info("PDF storage mode: none (text-only)") 

1508 

1509 uploaded_files = [] 

1510 errors = [] 

1511 

1512 for file in files: 

1513 if not file.filename: 

1514 continue 

1515 

1516 try: 

1517 # Read file content 

1518 file_content = file.read() 

1519 file.seek(0) # Reset for potential re-reading 

1520 

1521 # Calculate file hash for deduplication 

1522 file_hash = hashlib.sha256(file_content).hexdigest() 

1523 

1524 # Check if document already exists 

1525 existing_doc = ( 

1526 db_session.query(Document) 

1527 .filter_by(document_hash=file_hash) 

1528 .first() 

1529 ) 

1530 

1531 if existing_doc: 

1532 # Document exists, check if we can upgrade to include PDF 

1533 pdf_upgraded = False 

1534 if ( 

1535 pdf_storage == "database" 

1536 and pdf_storage_manager is not None 

1537 ): 

1538 pdf_upgraded = pdf_storage_manager.upgrade_to_pdf( 

1539 document=existing_doc, 

1540 pdf_content=file_content, 

1541 session=db_session, 

1542 ) 

1543 

1544 # Check if already in collection 

1545 existing_link = ( 

1546 db_session.query(DocumentCollection) 

1547 .filter_by( 

1548 document_id=existing_doc.id, 

1549 collection_id=collection_id, 

1550 ) 

1551 .first() 

1552 ) 

1553 

1554 if not existing_link: 

1555 # Add to collection 

1556 collection_link = DocumentCollection( 

1557 document_id=existing_doc.id, 

1558 collection_id=collection_id, 

1559 indexed=False, 

1560 chunk_count=0, 

1561 ) 

1562 db_session.add(collection_link) 

1563 status = "added_to_collection" 

1564 if pdf_upgraded: 

1565 status = "added_to_collection_pdf_upgraded" 

1566 uploaded_files.append( 

1567 { 

1568 "filename": existing_doc.filename, 

1569 "status": status, 

1570 "id": existing_doc.id, 

1571 "pdf_upgraded": pdf_upgraded, 

1572 } 

1573 ) 

1574 else: 

1575 status = "already_in_collection" 

1576 if pdf_upgraded: 

1577 status = "pdf_upgraded" 

1578 uploaded_files.append( 

1579 { 

1580 "filename": existing_doc.filename, 

1581 "status": status, 

1582 "id": existing_doc.id, 

1583 "pdf_upgraded": pdf_upgraded, 

1584 } 

1585 ) 

1586 else: 

1587 # Create new document 

1588 from ...document_loaders import ( 

1589 extract_text_from_bytes, 

1590 is_extension_supported, 

1591 ) 

1592 

1593 filename = secure_filename(file.filename) 

1594 file_extension = Path(filename).suffix.lower() 

1595 

1596 # Validate extension is supported before extraction 

1597 if not is_extension_supported(file_extension): 

1598 errors.append( 

1599 { 

1600 "filename": filename, 

1601 "error": f"Unsupported format: {file_extension}", 

1602 } 

1603 ) 

1604 continue 

1605 

1606 # Use file_type without leading dot for storage 

1607 file_type = ( 

1608 file_extension[1:] 

1609 if file_extension.startswith(".") 

1610 else file_extension 

1611 ) 

1612 

1613 # Extract text using document_loaders module 

1614 extracted_text = extract_text_from_bytes( 

1615 file_content, file_extension, filename 

1616 ) 

1617 

1618 # Clean the extracted text to remove surrogate characters 

1619 if extracted_text: 

1620 from ...text_processing import remove_surrogates 

1621 

1622 extracted_text = remove_surrogates(extracted_text) 

1623 

1624 if not extracted_text: 

1625 errors.append( 

1626 { 

1627 "filename": filename, 

1628 "error": f"Could not extract text from {file_type} file", 

1629 } 

1630 ) 

1631 logger.warning( 

1632 f"Skipping file {filename} - no text could be extracted" 

1633 ) 

1634 continue 

1635 

1636 # Get or create the user_upload source type 

1637 logger.info( 

1638 f"Getting or creating user_upload source type for {filename}" 

1639 ) 

1640 source_type = ( 

1641 db_session.query(SourceType) 

1642 .filter_by(name="user_upload") 

1643 .first() 

1644 ) 

1645 if not source_type: 

1646 logger.info("Creating new user_upload source type") 

1647 source_type = SourceType( 

1648 id=str(uuid.uuid4()), 

1649 name="user_upload", 

1650 display_name="User Upload", 

1651 description="Documents uploaded by users", 

1652 icon="fas fa-upload", 

1653 ) 

1654 db_session.add(source_type) 

1655 db_session.flush() 

1656 logger.info( 

1657 f"Created source type with ID: {source_type.id}" 

1658 ) 

1659 else: 

1660 logger.info( 

1661 f"Found existing source type with ID: {source_type.id}" 

1662 ) 

1663 

1664 # Create document with extracted text (no username needed - in user's own database) 

1665 # Note: uploaded_at uses default=utcnow() in the model, so we don't need to set it manually 

1666 doc_id = str(uuid.uuid4()) 

1667 logger.info( 

1668 f"Creating document {doc_id} for {filename}" 

1669 ) 

1670 

1671 # Determine storage mode and file_path 

1672 store_pdf_in_db = ( 

1673 pdf_storage == "database" 

1674 and file_type == "pdf" 

1675 and pdf_storage_manager is not None 

1676 ) 

1677 

1678 new_doc = Document( 

1679 id=doc_id, 

1680 source_type_id=source_type.id, 

1681 filename=filename, 

1682 document_hash=file_hash, 

1683 file_size=len(file_content), 

1684 file_type=file_type, 

1685 text_content=extracted_text, # Always store extracted text 

1686 file_path=None 

1687 if store_pdf_in_db 

1688 else "text_only_not_stored", 

1689 storage_mode="database" 

1690 if store_pdf_in_db 

1691 else "none", 

1692 ) 

1693 db_session.add(new_doc) 

1694 db_session.flush() # Get the ID 

1695 logger.info( 

1696 f"Document {new_doc.id} created successfully" 

1697 ) 

1698 

1699 # Store PDF in encrypted database if requested 

1700 pdf_stored = False 

1701 if store_pdf_in_db: 

1702 try: 

1703 pdf_storage_manager.save_pdf( 

1704 pdf_content=file_content, 

1705 document=new_doc, 

1706 session=db_session, 

1707 filename=filename, 

1708 ) 

1709 pdf_stored = True 

1710 logger.info( 

1711 f"PDF stored in encrypted database for {filename}" 

1712 ) 

1713 except Exception: 

1714 logger.exception( 

1715 f"Failed to store PDF in database for {filename}" 

1716 ) 

1717 # Continue without PDF storage - text is still saved 

1718 

1719 # Add to collection 

1720 collection_link = DocumentCollection( 

1721 document_id=new_doc.id, 

1722 collection_id=collection_id, 

1723 indexed=False, 

1724 chunk_count=0, 

1725 ) 

1726 db_session.add(collection_link) 

1727 

1728 uploaded_files.append( 

1729 { 

1730 "filename": filename, 

1731 "status": "uploaded", 

1732 "id": new_doc.id, 

1733 "text_length": len(extracted_text), 

1734 "pdf_stored": pdf_stored, 

1735 } 

1736 ) 

1737 

1738 except Exception: 

1739 errors.append( 

1740 { 

1741 "filename": file.filename, 

1742 "error": "Failed to upload file", 

1743 } 

1744 ) 

1745 logger.exception(f"Error uploading file {file.filename}") 

1746 

1747 db_session.commit() 

1748 

1749 # Trigger auto-indexing for successfully uploaded documents 

1750 document_ids = [ 

1751 f["id"] 

1752 for f in uploaded_files 

1753 if f.get("status") in ("uploaded", "added_to_collection") 

1754 ] 

1755 if document_ids: 

1756 from ...database.session_passwords import session_password_store 

1757 

1758 session_id = session.get("session_id") 

1759 db_password = session_password_store.get_session_password( 

1760 username, session_id 

1761 ) 

1762 if db_password: 

1763 trigger_auto_index( 

1764 document_ids, collection_id, username, db_password 

1765 ) 

1766 

1767 return jsonify( 

1768 { 

1769 "success": True, 

1770 "uploaded": uploaded_files, 

1771 "errors": errors, 

1772 "summary": { 

1773 "total": len(files), 

1774 "successful": len(uploaded_files), 

1775 "failed": len(errors), 

1776 }, 

1777 } 

1778 ) 

1779 

1780 except Exception as e: 

1781 return handle_api_error("uploading files", e) 

1782 

1783 
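# --- Illustrative sketch (added to this listing, not part of the module) ----
# The upload handler above deduplicates files by `document_hash` before
# creating a Document row. A minimal sketch of that idea, assuming a SHA-256
# content digest (the actual hashing helper is defined earlier in this file
# and does not appear in this excerpt):


def _sketch_content_hash(file_content: bytes) -> str:
    """Stable hex digest used to detect duplicate uploads (illustrative)."""
    import hashlib

    return hashlib.sha256(file_content).hexdigest()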

1784@rag_bp.route( 

1785 "/api/collections/<string:collection_id>/documents", methods=["GET"] 

1786) 

1787@login_required 

1788def get_collection_documents(collection_id): 

1789 """Get all documents in a collection.""" 

1790 from ...database.session_context import get_user_db_session 

1791 

1792 try: 

1793 username = session["username"] 

1794 with get_user_db_session(username) as db_session: 

1795 # Verify collection exists in this user's database 

1796 collection = ( 

1797 db_session.query(Collection).filter_by(id=collection_id).first() 

1798 ) 

1799 

1800 if not collection: 

1801 return jsonify( 

1802 {"success": False, "error": "Collection not found"} 

1803 ), 404 

1804 

1805 # Get documents through junction table 

1806 doc_links = ( 

1807 db_session.query(DocumentCollection, Document) 

1808 .join(Document) 

1809 .filter(DocumentCollection.collection_id == collection_id) 

1810 .all() 

1811 ) 

1812 

1813 documents = [] 

1814 for link, doc in doc_links: 

1815 # Check if PDF file is stored 

1816 has_pdf = bool( 

1817 doc.file_path 

1818 and doc.file_path != "metadata_only" 

1819 and doc.file_path != "text_only_not_stored" 

1820 ) 

1821 has_text_db = bool(doc.text_content) 

1822 

1823 # Use title if available, otherwise filename 

1824 display_title = doc.title or doc.filename or "Untitled" 

1825 

1826 # Get source type name 

1827 source_type_name = ( 

1828 doc.source_type.name if doc.source_type else "unknown" 

1829 ) 

1830 

1831 # Check if document is in other collections 

1832 other_collections_count = ( 

1833 db_session.query(DocumentCollection) 

1834 .filter( 

1835 DocumentCollection.document_id == doc.id, 

1836 DocumentCollection.collection_id != collection_id, 

1837 ) 

1838 .count() 

1839 ) 

1840 

1841 documents.append( 

1842 { 

1843 "id": doc.id, 

1844 "filename": display_title, 

1845 "title": display_title, 

1846 "file_type": doc.file_type, 

1847 "file_size": doc.file_size, 

1848 "uploaded_at": doc.created_at.isoformat() 

1849 if doc.created_at 

1850 else None, 

1851 "indexed": link.indexed, 

1852 "chunk_count": link.chunk_count, 

1853 "last_indexed_at": link.last_indexed_at.isoformat() 

1854 if link.last_indexed_at 

1855 else None, 

1856 "has_pdf": has_pdf, 

1857 "has_text_db": has_text_db, 

1858 "source_type": source_type_name, 

1859 "in_other_collections": other_collections_count > 0, 

1860 "other_collections_count": other_collections_count, 

1861 } 

1862 ) 

1863 

1864 # Get index file size if available 

1865 index_file_size = None 

1866 index_file_size_bytes = None 

1867 collection_name = f"collection_{collection_id}" 

1868 rag_index = ( 

1869 db_session.query(RAGIndex) 

1870 .filter_by(collection_name=collection_name) 

1871 .first() 

1872 ) 

1873 if rag_index and rag_index.index_path: 


1876 index_path = Path(rag_index.index_path) 

1877 if index_path.exists(): 

1878 size_bytes = index_path.stat().st_size 

1879 index_file_size_bytes = size_bytes 

1880 # Format as human-readable 

1881 if size_bytes < 1024: 

1882 index_file_size = f"{size_bytes} B" 

1883 elif size_bytes < 1024 * 1024: 

1884 index_file_size = f"{size_bytes / 1024:.1f} KB" 

1885 else: 

1886 index_file_size = f"{size_bytes / (1024 * 1024):.1f} MB" 

1887 

1888 return jsonify( 

1889 { 

1890 "success": True, 

1891 "collection": { 

1892 "id": collection.id, 

1893 "name": collection.name, 

1894 "description": collection.description, 

1895 "embedding_model": collection.embedding_model, 

1896 "embedding_model_type": collection.embedding_model_type.value 

1897 if collection.embedding_model_type 

1898 else None, 

1899 "embedding_dimension": collection.embedding_dimension, 

1900 "chunk_size": collection.chunk_size, 

1901 "chunk_overlap": collection.chunk_overlap, 

1902 # Advanced settings 

1903 "splitter_type": collection.splitter_type, 

1904 "distance_metric": collection.distance_metric, 

1905 "index_type": collection.index_type, 

1906 "normalize_vectors": collection.normalize_vectors, 

1907 # Index file info 

1908 "index_file_size": index_file_size, 

1909 "index_file_size_bytes": index_file_size_bytes, 

1910 }, 

1911 "documents": documents, 

1912 } 

1913 ) 

1914 

1915 except Exception as e: 

1916 return handle_api_error("getting collection documents", e) 

1917 

1918 
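# --- Illustrative sketch -----------------------------------------------------
# get_collection_documents() formats the FAISS index size inline (B/KB/MB).
# The same branching as a small reusable helper; the name is hypothetical:


def _sketch_format_size(size_bytes: int) -> str:
    """Human-readable size string mirroring the inline formatting above."""
    if size_bytes < 1024:
        return f"{size_bytes} B"
    if size_bytes < 1024 * 1024:
        return f"{size_bytes / 1024:.1f} KB"
    return f"{size_bytes / (1024 * 1024):.1f} MB"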

1919@rag_bp.route("/api/collections/<string:collection_id>/index", methods=["GET"]) 

1920@login_required 

1921def index_collection(collection_id): 

1922 """Index all documents in a collection with Server-Sent Events progress.""" 

1923 from ...database.session_context import get_user_db_session 

1924 from ...database.session_passwords import session_password_store 

1925 

1926 force_reindex = request.args.get("force_reindex", "false").lower() == "true" 

1927 username = session["username"] 

1928 session_id = session.get("session_id") 

1929 

1930 logger.info(f"Starting index_collection, force_reindex={force_reindex}") 

1931 

1932 # Get password for thread access to encrypted database 

1933 db_password = None 

1934 if session_id: 

1935 db_password = session_password_store.get_session_password( 

1936 username, session_id 

1937 ) 

1938 

1939 # Create RAG service — on force reindex use current default model 

1940 rag_service = get_rag_service(collection_id, use_defaults=force_reindex) 

1941 # Set password for thread access 

1942 rag_service.db_password = db_password 

1943 logger.info( 

1944 f"RAG service created: provider={rag_service.embedding_provider}" 

1945 ) 

1946 

1947 def generate(): 

1948 """Generator for SSE progress updates.""" 

1949 logger.info("SSE generator started") 

1950 try: 

1951 with get_user_db_session(username, db_password) as db_session: 

1952 # Verify collection exists in this user's database 

1953 collection = ( 

1954 db_session.query(Collection) 

1955 .filter_by(id=collection_id) 

1956 .first() 

1957 ) 

1958 

1959 if not collection: 

1960 yield f"data: {json.dumps({'type': 'error', 'error': 'Collection not found'})}\n\n" 

1961 return 

1962 

1963 # Store embedding metadata on first index or force reindex 

1964 if collection.embedding_model is None or force_reindex: 

1965 # Get embedding dimension from the embedding manager 

1966 embedding_dim = None 

1967 try: 

1968 # Try to get dimension from the embedding manager's provider 

1969 if hasattr(rag_service.embedding_manager, "provider"): 

1970 provider = rag_service.embedding_manager.provider 

1971 if hasattr(provider, "embedding_dimension"): 

1972 embedding_dim = provider.embedding_dimension 

1973 except Exception as e: 

1974 logger.warning( 

1975 f"Could not get embedding dimension: {e}" 

1976 ) 

1977 

1978 # Store the embedding configuration used for this collection 

1979 collection.embedding_model = rag_service.embedding_model 

1980 collection.embedding_model_type = EmbeddingProvider( 

1981 rag_service.embedding_provider 

1982 ) 

1983 collection.embedding_dimension = embedding_dim 

1984 collection.chunk_size = rag_service.chunk_size 

1985 collection.chunk_overlap = rag_service.chunk_overlap 

1986 # Store advanced settings 

1987 collection.splitter_type = rag_service.splitter_type 

1988 collection.text_separators = rag_service.text_separators 

1989 collection.distance_metric = rag_service.distance_metric 

1990 # Ensure normalize_vectors is a proper boolean for database 

1991 collection.normalize_vectors = bool( 

1992 rag_service.normalize_vectors 

1993 ) 

1994 collection.index_type = rag_service.index_type 

1995 db_session.commit() 

1996 logger.info( 

1997 f"Stored embedding metadata for collection: provider={rag_service.embedding_provider}" 

1998 ) 

1999 

2000 # Get documents to index 

2001 query = ( 

2002 db_session.query(DocumentCollection, Document) 

2003 .join(Document) 

2004 .filter(DocumentCollection.collection_id == collection_id) 

2005 ) 

2006 

2007 if not force_reindex: 

2008 # IMPORTANT: DO NOT CHANGE THIS LINE! DO NOT REMOVE THIS COMMENT! 

2009 # We MUST use `== False` instead of `not DocumentCollection.indexed` 

2010 # The Python `not` operator does NOT work correctly in SQLAlchemy filters. 

2011 # Using `not` will cause the query to return NO results (zero documents). 

2012 # SQLAlchemy requires explicit comparison: `== False` or `== True` 

2013 # This has been fixed multiple times - DO NOT change it back to `not`! 

2014 query = query.filter(DocumentCollection.indexed == False) # noqa: E712 
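# Illustrative contrast (comments only), expanding on the warning above:
#   query.filter(DocumentCollection.indexed == False)    # correct; emits SQL `indexed = false`
#   query.filter(DocumentCollection.indexed.is_(False))  # equivalent alternative in most dialects
#   query.filter(not DocumentCollection.indexed)         # broken: Python `not` cannot build a SQL expression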

2015 

2016 doc_links = query.all() 

2017 

2018 if not doc_links: 

2019 logger.info("No documents to index in collection") 

2020 yield f"data: {json.dumps({'type': 'complete', 'results': {'successful': 0, 'skipped': 0, 'failed': 0, 'message': 'No documents to index'}})}\n\n" 

2021 return 

2022 

2023 total = len(doc_links) 

2024 logger.info(f"Found {total} documents to index") 

2025 results = { 

2026 "successful": 0, 

2027 "skipped": 0, 

2028 "failed": 0, 

2029 "errors": [], 

2030 } 

2031 

2032 yield f"data: {json.dumps({'type': 'start', 'message': f'Indexing {total} documents in collection: {collection.name}'})}\n\n" 

2033 

2034 for idx, (link, doc) in enumerate(doc_links, 1): 

2035 filename = doc.filename or doc.title or "Unknown" 

2036 yield f"data: {json.dumps({'type': 'progress', 'current': idx, 'total': total, 'filename': filename, 'percent': int((idx / total) * 100)})}\n\n" 

2037 

2038 try: 

2039 logger.debug( 

2040 f"Indexing document {idx}/{total}: {filename}" 

2041 ) 

2042 

2043 # Run index_document in a separate thread to allow sending SSE heartbeats. 

2044 # This keeps the HTTP connection alive during long indexing operations, 

2045 # preventing timeouts from proxy servers (nginx) and browsers. 

2046 # The main thread periodically yields heartbeat comments while waiting. 

2047 result_queue = queue.Queue() 

2048 error_queue = queue.Queue() 

2049 

2050 def index_in_thread(): 

2051 try: 

2052 r = rag_service.index_document( 

2053 document_id=doc.id, 

2054 collection_id=collection_id, 

2055 force_reindex=force_reindex, 

2056 ) 

2057 result_queue.put(r) 

2058 except Exception as ex: 

2059 error_queue.put(ex) 

2060 

2061 thread = threading.Thread(target=index_in_thread) 

2062 thread.start() 

2063 

2064 # Send heartbeats while waiting for the thread to complete 

2065 heartbeat_interval = 5 # seconds 

2066 while thread.is_alive(): 

2067 thread.join(timeout=heartbeat_interval) 

2068 if thread.is_alive(): 

2069 # Send SSE comment as heartbeat (keeps connection alive) 

2070 yield f": heartbeat {idx}/{total}\n\n" 

2071 

2072 # Check for errors from thread 

2073 if not error_queue.empty(): 

2074 raise error_queue.get() 

2075 

2076 result = result_queue.get() 

2077 logger.info( 

2078 f"Indexed document {idx}/{total}: {filename} - status={result.get('status')}" 

2079 ) 

2080 

2081 if result.get("status") == "success": 

2082 results["successful"] += 1 

2083 # DocumentCollection status is already updated in index_document 

2084 # No need to update link here 

2085 elif result.get("status") == "skipped": 

2086 results["skipped"] += 1 

2087 else: 

2088 results["failed"] += 1 

2089 error_msg = result.get("error", "Unknown error") 

2090 results["errors"].append( 

2091 { 

2092 "filename": filename, 

2093 "error": error_msg, 

2094 } 

2095 ) 

2096 logger.warning( 

2097 f"Failed to index {filename} ({idx}/{total}): {error_msg}" 

2098 ) 

2099 except Exception as e: 

2100 results["failed"] += 1 

2101 error_msg = str(e) or "Failed to index document" 

2102 results["errors"].append( 

2103 { 

2104 "filename": filename, 

2105 "error": error_msg, 

2106 } 

2107 ) 

2108 logger.exception( 

2109 f"Exception indexing document {filename} ({idx}/{total})" 

2110 ) 

2111 # Send error update to client so they know indexing is continuing 

2112 yield f"data: {json.dumps({'type': 'doc_error', 'filename': filename, 'error': error_msg})}\n\n" 

2113 

2114 db_session.commit() 

2115 # commit() already flushes pending changes and persists them;

2116 # a separate flush after commit would be a no-op, so none is issued

2117 

2118 logger.info( 

2119 f"Indexing complete: {results['successful']} successful, {results['failed']} failed, {results['skipped']} skipped" 

2120 ) 

2121 yield f"data: {json.dumps({'type': 'complete', 'results': results})}\n\n" 

2122 logger.info("SSE generator finished successfully") 

2123 

2124 except Exception: 

2125 logger.exception("Error in collection indexing") 

2126 yield f"data: {json.dumps({'type': 'error', 'error': 'An internal error occurred during indexing'})}\n\n" 

2127 

2128 response = Response( 

2129 stream_with_context(generate()), mimetype="text/event-stream" 

2130 ) 

2131 # Prevent buffering for proper SSE streaming 

2132 response.headers["Cache-Control"] = "no-cache, no-transform" 

2133 response.headers["Connection"] = "keep-alive" 

2134 response.headers["X-Accel-Buffering"] = "no" 

2135 return response 

2136 
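# --- Illustrative sketch -----------------------------------------------------
# index_collection() runs each index_document() call on a worker thread and
# yields `: heartbeat` SSE comments while waiting, so proxies and browsers do
# not drop the idle connection. The pattern in isolation (names hypothetical):


def _sketch_run_with_heartbeats(work, interval: float = 5.0):
    """Yield SSE heartbeats until `work()` finishes, then yield its result."""
    out: queue.Queue = queue.Queue()

    def _runner():
        try:
            out.put(("ok", work()))
        except Exception as exc:  # surface worker errors to the generator
            out.put(("err", exc))

    t = threading.Thread(target=_runner)
    t.start()
    while t.is_alive():
        t.join(timeout=interval)
        if t.is_alive():
            yield ": heartbeat\n\n"  # SSE comment lines are ignored by clients
    status, value = out.get()
    if status == "err":
        raise value
    yield f"data: {value}\n\n"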

2137 

2138# ============================================================================= 

2139# Background Indexing Endpoints 

2140# ============================================================================= 

2141 

2142 

2143def _get_rag_service_for_thread( 

2144 collection_id: str, 

2145 username: str, 

2146 db_password: str, 

2147 use_defaults: bool = False, 

2148) -> LibraryRAGService: 

2149 """ 

2150 Create RAG service for use in background threads (no Flask context). 

2151 """ 

2152 from ...database.session_context import get_user_db_session 

2153 from ...web_search_engines.engines.search_engine_local import ( 

2154 LocalEmbeddingManager, 

2155 ) 


2158 with get_user_db_session(username, db_password) as db_session: 

2159 settings_manager = SettingsManager(db_session) 

2160 

2161 # Get default settings 

2162 default_embedding_model = settings_manager.get_setting( 

2163 "local_search_embedding_model", "all-MiniLM-L6-v2" 

2164 ) 

2165 default_embedding_provider = settings_manager.get_setting( 

2166 "local_search_embedding_provider", "sentence_transformers" 

2167 ) 

2168 default_chunk_size = int( 

2169 settings_manager.get_setting("local_search_chunk_size", 1000) 

2170 ) 

2171 default_chunk_overlap = int( 

2172 settings_manager.get_setting("local_search_chunk_overlap", 200) 

2173 ) 

2174 default_splitter_type = settings_manager.get_setting( 

2175 "local_search_splitter_type", "recursive" 

2176 ) 

2177 default_text_separators = settings_manager.get_setting( 

2178 "local_search_text_separators", '["\n\n", "\n", ". ", " ", ""]' 

2179 ) 

2180 if isinstance(default_text_separators, str): 

2181 try: 

2182 default_text_separators = json.loads(default_text_separators) 

2183 except json.JSONDecodeError: 

2184 logger.warning( 

2185 f"Invalid JSON for local_search_text_separators setting: {default_text_separators!r}. " 

2186 "Using default separators." 

2187 ) 

2188 default_text_separators = ["\n\n", "\n", ". ", " ", ""] 

2189 default_distance_metric = settings_manager.get_setting( 

2190 "local_search_distance_metric", "cosine" 

2191 ) 

2192 default_normalize_vectors = settings_manager.get_bool_setting( 

2193 "local_search_normalize_vectors", True 

2194 ) 

2195 default_index_type = settings_manager.get_setting( 

2196 "local_search_index_type", "flat" 

2197 ) 

2198 

2199 # Get settings snapshot for embedding manager 

2200 settings_snapshot = settings_manager.get_settings_snapshot() 

2201 settings_snapshot["_username"] = username 

2202 

2203 # Check for collection's stored settings 

2204 collection = ( 

2205 db_session.query(Collection).filter_by(id=collection_id).first() 

2206 ) 

2207 

2208 if collection and collection.embedding_model and not use_defaults: 

2209 # Use collection's stored settings 

2210 embedding_model = collection.embedding_model 

2211 embedding_provider = collection.embedding_model_type.value 

2212 chunk_size = collection.chunk_size or default_chunk_size 

2213 chunk_overlap = collection.chunk_overlap or default_chunk_overlap 

2214 splitter_type = collection.splitter_type or default_splitter_type 

2215 text_separators = ( 

2216 collection.text_separators or default_text_separators 

2217 ) 

2218 distance_metric = ( 

2219 collection.distance_metric or default_distance_metric 

2220 ) 

2221 index_type = collection.index_type or default_index_type 

2222 

2223 coll_normalize = collection.normalize_vectors 

2224 if coll_normalize is not None: 

2225 if isinstance(coll_normalize, str): 

2226 coll_normalize = coll_normalize.lower() in ( 

2227 "true", 

2228 "1", 

2229 "yes", 

2230 ) 

2231 else: 

2232 coll_normalize = bool(coll_normalize) 

2233 else: 

2234 coll_normalize = default_normalize_vectors 

2235 normalize_vectors = coll_normalize 

2236 else: 

2237 # Use default settings 

2238 embedding_model = default_embedding_model 

2239 embedding_provider = default_embedding_provider 

2240 chunk_size = default_chunk_size 

2241 chunk_overlap = default_chunk_overlap 

2242 splitter_type = default_splitter_type 

2243 text_separators = default_text_separators 

2244 distance_metric = default_distance_metric 

2245 normalize_vectors = default_normalize_vectors 

2246 index_type = default_index_type 

2247 

2248 # Update settings snapshot with embedding config 

2249 settings_snapshot.update( 

2250 { 

2251 "embeddings.provider": embedding_provider, 

2252 f"embeddings.{embedding_provider}.model": embedding_model, 

2253 "local_search_chunk_size": chunk_size, 

2254 "local_search_chunk_overlap": chunk_overlap, 

2255 } 

2256 ) 

2257 

2258 # Create embedding manager (to avoid database access in LibraryRAGService.__init__) 

2259 embedding_manager = LocalEmbeddingManager( 

2260 embedding_model=embedding_model, 

2261 embedding_model_type=embedding_provider, 

2262 chunk_size=chunk_size, 

2263 chunk_overlap=chunk_overlap, 

2264 settings_snapshot=settings_snapshot, 

2265 ) 

2266 embedding_manager.db_password = db_password 

2267 

2268 # Create RAG service with pre-built embedding manager and db_password 

2269 rag_service = LibraryRAGService( 

2270 username=username, 

2271 embedding_model=embedding_model, 

2272 embedding_provider=embedding_provider, 

2273 chunk_size=chunk_size, 

2274 chunk_overlap=chunk_overlap, 

2275 splitter_type=splitter_type, 

2276 text_separators=text_separators, 

2277 distance_metric=distance_metric, 

2278 normalize_vectors=normalize_vectors, 

2279 index_type=index_type, 

2280 embedding_manager=embedding_manager, 

2281 db_password=db_password, 

2282 ) 

2283 

2284 return rag_service 

2285 
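# --- Illustrative sketch -----------------------------------------------------
# _get_rag_service_for_thread() parses `local_search_text_separators` as JSON
# and falls back to hard-coded separators on bad input. The same defensive
# pattern as a standalone helper (illustrative only):


def _sketch_parse_separators(raw) -> list:
    """Parse a JSON-encoded separator list, falling back on invalid input."""
    fallback = ["\n\n", "\n", ". ", " ", ""]
    if isinstance(raw, list):
        return raw
    if not isinstance(raw, str):
        return fallback
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        return fallback
    return parsed if isinstance(parsed, list) else fallback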

2286 

2287def trigger_auto_index( 

2288 document_ids: list[str], 

2289 collection_id: str, 

2290 username: str, 

2291 db_password: str, 

2292) -> None: 

2293 """ 

2294 Trigger automatic RAG indexing for documents if auto-indexing is enabled. 

2295 

2296 This function checks the auto_index_enabled setting and spawns a background 

2297 thread to index the specified documents. It does not block the caller. 

2298 

2299 Args: 

2300 document_ids: List of document IDs to index 

2301 collection_id: The collection to index into 

2302 username: The username for database access 

2303 db_password: The user's database password for thread-safe access 

2304 """ 

2305 from ...database.session_context import get_user_db_session 

2306 

2307 if not document_ids: 

2308 logger.debug("No documents to auto-index") 

2309 return 

2310 

2311 # Check if auto-indexing is enabled 

2312 try: 

2313 with get_user_db_session(username, db_password) as db_session: 

2314 settings = SettingsManager(db_session) 

2315 auto_index_enabled = settings.get_bool_setting( 

2316 "research_library.auto_index_enabled", True 

2317 ) 

2318 

2319 if not auto_index_enabled: 

2320 logger.debug("Auto-indexing is disabled, skipping") 

2321 return 

2322 except Exception: 

2323 logger.exception( 

2324 "Failed to check auto-index setting, skipping auto-index" 

2325 ) 

2326 return 

2327 

2328 logger.info( 

2329 f"Auto-indexing {len(document_ids)} documents in collection {collection_id}" 

2330 ) 

2331 

2332 # Submit to thread pool (bounded concurrency, prevents thread proliferation) 

2333 executor = _get_auto_index_executor() 

2334 executor.submit( 

2335 _auto_index_documents_worker, 

2336 document_ids, 

2337 collection_id, 

2338 username, 

2339 db_password, 

2340 ) 

2341 
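# --- Illustrative sketch -----------------------------------------------------
# trigger_auto_index() submits work to the bounded executor and returns
# immediately. The worker above catches its own exceptions, but for callables
# that might raise, a done-callback on the Future logs failures without
# blocking the caller (names here are hypothetical):


def _sketch_submit_logged(executor, fn, *args):
    """Submit fn to the pool; log any exception once the future completes."""
    future = executor.submit(fn, *args)

    def _log_outcome(fut):
        exc = fut.exception()  # non-blocking inside a done-callback
        if exc is not None:
            logger.error(f"Background task failed: {exc!r}")

    future.add_done_callback(_log_outcome)
    return future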

2342 

2343def _auto_index_documents_worker( 

2344 document_ids: list[str], 

2345 collection_id: str, 

2346 username: str, 

2347 db_password: str, 

2348) -> None: 

2349 """ 

2350 Background worker to index documents automatically. 

2351 

2352 This is a simpler worker than _background_index_worker - it doesn't track 

2353 progress via TaskMetadata since it's meant to be a lightweight auto-indexing 

2354 operation. 

2355 """ 

2356 

2357 try: 

2358 # Create RAG service (thread-safe, no Flask context needed) 

2359 with _get_rag_service_for_thread( 

2360 collection_id, username, db_password 

2361 ) as rag_service: 

2362 indexed_count = 0 

2363 for doc_id in document_ids: 

2364 try: 

2365 result = rag_service.index_document( 

2366 doc_id, collection_id, force_reindex=False 

2367 ) 

2368 if result.get("status") == "success": 

2369 indexed_count += 1 

2370 logger.debug(f"Auto-indexed document {doc_id}") 

2371 elif result.get("status") == "skipped": 

2372 logger.debug( 

2373 f"Document {doc_id} already indexed, skipped" 

2374 ) 

2375 except Exception: 

2376 logger.exception(f"Failed to auto-index document {doc_id}") 

2377 

2378 logger.info( 

2379 f"Auto-indexing complete: {indexed_count}/{len(document_ids)} documents indexed" 

2380 ) 

2381 

2382 except Exception: 

2383 logger.exception("Auto-indexing worker failed") 

2384 

2385 

2386def _background_index_worker( 

2387 task_id: str, 

2388 collection_id: str, 

2389 username: str, 

2390 db_password: str, 

2391 force_reindex: bool, 

2392): 

2393 """ 

2394 Background worker thread for indexing documents. 

2395 Updates TaskMetadata with progress and checks for cancellation. 

2396 """ 

2397 from ...database.session_context import get_user_db_session 

2398 

2399 try: 

2400 # Create RAG service (thread-safe, no Flask context needed) 

2401 with _get_rag_service_for_thread( 

2402 collection_id, username, db_password, use_defaults=force_reindex 

2403 ) as rag_service: 

2404 with get_user_db_session(username, db_password) as db_session: 

2405 # Get collection 

2406 collection = ( 

2407 db_session.query(Collection) 

2408 .filter_by(id=collection_id) 

2409 .first() 

2410 ) 

2411 

2412 if not collection: 

2413 _update_task_status( 

2414 username, 

2415 db_password, 

2416 task_id, 

2417 status="failed", 

2418 error_message="Collection not found", 

2419 ) 

2420 return 

2421 

2422 # Store embedding metadata on first index or force reindex 

2423 if collection.embedding_model is None or force_reindex: 

2424 collection.embedding_model = rag_service.embedding_model 

2425 collection.embedding_model_type = EmbeddingProvider( 

2426 rag_service.embedding_provider 

2427 ) 

2428 collection.chunk_size = rag_service.chunk_size 

2429 collection.chunk_overlap = rag_service.chunk_overlap 

2430 collection.splitter_type = rag_service.splitter_type 

2431 collection.text_separators = rag_service.text_separators 

2432 collection.distance_metric = rag_service.distance_metric 

2433 collection.normalize_vectors = bool( 

2434 rag_service.normalize_vectors 

2435 ) 

2436 collection.index_type = rag_service.index_type 

2437 db_session.commit() 

2438 

2439 # Get documents to index 

2440 query = ( 

2441 db_session.query(DocumentCollection, Document) 

2442 .join(Document) 

2443 .filter(DocumentCollection.collection_id == collection_id) 

2444 ) 

2445 

2446 if not force_reindex: 

2447 query = query.filter(DocumentCollection.indexed == False) # noqa: E712 

2448 

2449 doc_links = query.all() 

2450 

2451 if not doc_links: 

2452 _update_task_status( 

2453 username, 

2454 db_password, 

2455 task_id, 

2456 status="completed", 

2457 progress_message="No documents to index", 

2458 ) 

2459 return 

2460 

2461 total = len(doc_links) 

2462 results = {"successful": 0, "skipped": 0, "failed": 0} 

2463 

2464 # Update task with total count 

2465 _update_task_status( 

2466 username, 

2467 db_password, 

2468 task_id, 

2469 progress_total=total, 

2470 progress_message=f"Indexing {total} documents", 

2471 ) 

2472 

2473 for idx, (link, doc) in enumerate(doc_links, 1): 

2474 # Check if cancelled 

2475 if _is_task_cancelled(username, db_password, task_id): 

2476 _update_task_status( 

2477 username, 

2478 db_password, 

2479 task_id, 

2480 status="cancelled", 

2481 progress_message=f"Cancelled after {idx - 1}/{total} documents", 

2482 ) 

2483 logger.info(f"Indexing task {task_id} was cancelled") 

2484 return 

2485 

2486 filename = doc.filename or doc.title or "Unknown" 

2487 

2488 # Update progress with filename 

2489 _update_task_status( 

2490 username, 

2491 db_password, 

2492 task_id, 

2493 progress_current=idx, 

2494 progress_message=f"Indexing {idx}/{total}: {filename}", 

2495 ) 

2496 

2497 try: 

2498 result = rag_service.index_document( 

2499 document_id=doc.id, 

2500 collection_id=collection_id, 

2501 force_reindex=force_reindex, 

2502 ) 

2503 

2504 if result.get("status") == "success": 

2505 results["successful"] += 1 

2506 elif result.get("status") == "skipped": 

2507 results["skipped"] += 1 

2508 else: 

2509 results["failed"] += 1 

2510 

2511 except Exception: 

2512 results["failed"] += 1 

2513 logger.exception( 

2514 f"Error indexing document {idx}/{total}" 

2515 ) 

2516 

2517 db_session.commit() 

2518 

2519 # Mark as completed 

2520 _update_task_status( 

2521 username, 

2522 db_password, 

2523 task_id, 

2524 status="completed", 

2525 progress_current=total, 

2526 progress_message=f"Completed: {results['successful']} indexed, {results['failed']} failed, {results['skipped']} skipped", 

2527 ) 

2528 logger.info( 

2529 f"Background indexing task {task_id} completed: {results}" 

2530 ) 

2531 

2532 except Exception as e: 

2533 logger.exception(f"Background indexing task {task_id} failed") 

2534 _update_task_status( 

2535 username, 

2536 db_password, 

2537 task_id, 

2538 status="failed", 

2539 error_message=str(e), 

2540 ) 

2541 
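# --- Illustrative sketch -----------------------------------------------------
# Cancellation is cooperative: cancel_indexing() only flips the task row to
# "cancelled", and _background_index_worker() polls _is_task_cancelled()
# between documents. The loop shape, reduced to its essentials:


def _sketch_cancellable_loop(items, is_cancelled, process):
    """Process items one at a time, stopping cleanly when cancelled."""
    done = 0
    for item in items:
        if is_cancelled():  # checked between items, never mid-item
            return {"processed": done, "cancelled": True}
        process(item)
        done += 1
    return {"processed": done, "cancelled": False}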

2542 

2543def _update_task_status( 

2544 username: str, 

2545 db_password: str, 

2546 task_id: str, 

2547 status: Optional[str] = None, 

2548 progress_current: Optional[int] = None, 

2549 progress_total: Optional[int] = None, 

2550 progress_message: Optional[str] = None, 

2551 error_message: Optional[str] = None, 

2552): 

2553 """Update task metadata in the database.""" 

2554 from ...database.session_context import get_user_db_session 

2555 

2556 try: 

2557 with get_user_db_session(username, db_password) as db_session: 

2558 task = ( 

2559 db_session.query(TaskMetadata) 

2560 .filter_by(task_id=task_id) 

2561 .first() 

2562 ) 

2563 if task: 

2564 if status is not None: 

2565 task.status = status 

2566 if status == "completed": 

2567 task.completed_at = datetime.now(UTC) 

2568 if progress_current is not None: 

2569 task.progress_current = progress_current 

2570 if progress_total is not None: 

2571 task.progress_total = progress_total 

2572 if progress_message is not None: 

2573 task.progress_message = progress_message 

2574 if error_message is not None: 

2575 task.error_message = error_message 

2576 db_session.commit() 

2577 except Exception: 

2578 logger.exception(f"Failed to update task status for {task_id}") 

2579 

2580 

2581def _is_task_cancelled(username: str, db_password: str, task_id: str) -> bool: 

2582 """Check if a task has been cancelled.""" 

2583 from ...database.session_context import get_user_db_session 

2584 

2585 try: 

2586 with get_user_db_session(username, db_password) as db_session: 

2587 task = ( 

2588 db_session.query(TaskMetadata) 

2589 .filter_by(task_id=task_id) 

2590 .first() 

2591 ) 

2592 return task is not None and task.status == "cancelled" 

2593 except Exception: 

2594 return False 

2595 

2596 

2597@rag_bp.route( 

2598 "/api/collections/<string:collection_id>/index/start", methods=["POST"] 

2599) 

2600@login_required 

2601def start_background_index(collection_id): 

2602 """Start background indexing for a collection.""" 

2603 from ...database.session_context import get_user_db_session 

2604 from ...database.session_passwords import session_password_store 

2605 

2606 username = session["username"] 

2607 session_id = session.get("session_id") 

2608 

2609 # Get password for thread access 

2610 db_password = None 

2611 if session_id: 

2612 db_password = session_password_store.get_session_password( 

2613 username, session_id 

2614 ) 

2615 

2616 # Parse request body 

2617 data = request.get_json(silent=True) or {} 

2618 force_reindex = data.get("force_reindex", False) 

2619 

2620 try: 

2621 with get_user_db_session(username, db_password) as db_session: 

2622 # Check if there's already an active indexing task for this collection 

2623 existing_task = ( 

2624 db_session.query(TaskMetadata) 

2625 .filter( 

2626 TaskMetadata.task_type == "indexing", 

2627 TaskMetadata.status == "processing", 

2628 ) 

2629 .first() 

2630 ) 

2631 

2632 if existing_task: 

2633 # Check if it's for this collection 

2634 metadata = existing_task.metadata_json or {} 

2635 if metadata.get("collection_id") == collection_id: 

2636 return jsonify( 

2637 { 

2638 "success": False, 

2639 "error": "Indexing is already in progress for this collection", 

2640 "task_id": existing_task.task_id, 

2641 } 

2642 ), 409 

2643 

2644 # Create new task 

2645 task_id = str(uuid.uuid4()) 

2646 task = TaskMetadata( 

2647 task_id=task_id, 

2648 status="processing", 

2649 task_type="indexing", 

2650 created_at=datetime.now(UTC), 

2651 started_at=datetime.now(UTC), 

2652 progress_current=0, 

2653 progress_total=0, 

2654 progress_message="Starting indexing...", 

2655 metadata_json={ 

2656 "collection_id": collection_id, 

2657 "force_reindex": force_reindex, 

2658 }, 

2659 ) 

2660 db_session.add(task) 

2661 db_session.commit() 

2662 

2663 # Start background thread 

2664 thread = threading.Thread( 

2665 target=_background_index_worker, 

2666 args=(task_id, collection_id, username, db_password, force_reindex), 

2667 daemon=True, 

2668 ) 

2669 thread.start() 

2670 

2671 logger.info( 

2672 f"Started background indexing task {task_id} for collection {collection_id}" 

2673 ) 

2674 

2675 return jsonify( 

2676 { 

2677 "success": True, 

2678 "task_id": task_id, 

2679 "message": "Indexing started in background", 

2680 } 

2681 ) 

2682 

2683 except Exception: 

2684 logger.exception("Failed to start background indexing") 

2685 return jsonify( 

2686 { 

2687 "success": False, 

2688 "error": "Failed to start indexing. Please try again.", 

2689 } 

2690 ), 500 

2691 

2692 

2693@rag_bp.route( 

2694 "/api/collections/<string:collection_id>/index/status", methods=["GET"] 

2695) 

2696@limiter.exempt 

2697@login_required 

2698def get_index_status(collection_id): 

2699 """Get the current indexing status for a collection.""" 

2700 from ...database.session_context import get_user_db_session 

2701 from ...database.session_passwords import session_password_store 

2702 

2703 username = session["username"] 

2704 session_id = session.get("session_id") 

2705 

2706 db_password = None 

2707 if session_id: 

2708 db_password = session_password_store.get_session_password( 

2709 username, session_id 

2710 ) 

2711 

2712 try: 

2713 with get_user_db_session(username, db_password) as db_session: 

2714 # Find the most recent indexing task for this collection 

2715 task = ( 

2716 db_session.query(TaskMetadata) 

2717 .filter(TaskMetadata.task_type == "indexing") 

2718 .order_by(TaskMetadata.created_at.desc()) 

2719 .first() 

2720 ) 

2721 

2722 if not task: 

2723 return jsonify( 

2724 { 

2725 "status": "idle", 

2726 "message": "No indexing task found", 

2727 } 

2728 ) 

2729 

2730 # Check if it's for this collection 

2731 metadata = task.metadata_json or {} 

2732 if metadata.get("collection_id") != collection_id: 

2733 return jsonify( 

2734 { 

2735 "status": "idle", 

2736 "message": "No indexing task for this collection", 

2737 } 

2738 ) 

2739 

2740 return jsonify( 

2741 { 

2742 "task_id": task.task_id, 

2743 "status": task.status, 

2744 "progress_current": task.progress_current or 0, 

2745 "progress_total": task.progress_total or 0, 

2746 "progress_message": task.progress_message, 

2747 "error_message": task.error_message, 

2748 "created_at": task.created_at.isoformat() 

2749 if task.created_at 

2750 else None, 

2751 "completed_at": task.completed_at.isoformat() 

2752 if task.completed_at 

2753 else None, 

2754 } 

2755 ) 

2756 

2757 except Exception: 

2758 logger.exception("Failed to get index status") 

2759 return jsonify( 

2760 { 

2761 "status": "error", 

2762 "error": "Failed to get indexing status. Please try again.", 

2763 } 

2764 ), 500 

2765 
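# --- Illustrative sketch -----------------------------------------------------
# Clients are expected to poll get_index_status() until the task leaves the
# "processing" state. A minimal polling loop; `http` is assumed to be an
# authenticated requests.Session and `base_url` the server root (both are
# assumptions, not part of this module):


def _sketch_poll_index_status(base_url, collection_id, http, interval=2.0):
    """Poll the status endpoint until indexing completes, fails, or cancels."""
    while True:
        resp = http.get(
            f"{base_url}/library/api/collections/{collection_id}/index/status"
        )
        data = resp.json()
        if data.get("status") != "processing":
            return data  # completed / failed / cancelled / idle / error
        time.sleep(interval)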

2766 

2767@rag_bp.route( 

2768 "/api/collections/<string:collection_id>/index/cancel", methods=["POST"] 

2769) 

2770@login_required 

2771def cancel_indexing(collection_id): 

2772 """Cancel an active indexing task for a collection.""" 

2773 from ...database.session_context import get_user_db_session 

2774 from ...database.session_passwords import session_password_store 

2775 

2776 username = session["username"] 

2777 session_id = session.get("session_id") 

2778 

2779 db_password = None 

2780 if session_id: 

2781 db_password = session_password_store.get_session_password( 

2782 username, session_id 

2783 ) 

2784 

2785 try: 

2786 with get_user_db_session(username, db_password) as db_session: 

2787 # Find active indexing task for this collection 

2788 task = ( 

2789 db_session.query(TaskMetadata) 

2790 .filter( 

2791 TaskMetadata.task_type == "indexing", 

2792 TaskMetadata.status == "processing", 

2793 ) 

2794 .first() 

2795 ) 

2796 

2797 if not task: 

2798 return jsonify( 

2799 { 

2800 "success": False, 

2801 "error": "No active indexing task found", 

2802 } 

2803 ), 404 

2804 

2805 # Check if it's for this collection 

2806 metadata = task.metadata_json or {} 

2807 if metadata.get("collection_id") != collection_id: 

2808 return jsonify( 

2809 { 

2810 "success": False, 

2811 "error": "No active indexing task for this collection", 

2812 } 

2813 ), 404 

2814 

2815 # Mark as cancelled - the worker thread will check this 

2816 task.status = "cancelled" 

2817 task.progress_message = "Cancellation requested..." 

2818 db_session.commit() 

2819 

2820 logger.info( 

2821 f"Cancelled indexing task {task.task_id} for collection {collection_id}" 

2822 ) 

2823 

2824 return jsonify( 

2825 { 

2826 "success": True, 

2827 "message": "Cancellation requested", 

2828 "task_id": task.task_id, 

2829 } 

2830 ) 

2831 

2832 except Exception: 

2833 logger.exception("Failed to cancel indexing") 

2834 return jsonify( 

2835 { 

2836 "success": False, 

2837 "error": "Failed to cancel indexing. Please try again.", 

2838 } 

2839 ), 500