Coverage for src/local_deep_research/research_library/routes/rag_routes.py: 15%

1017 statements  

coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2RAG Management API Routes 

3 

4Provides endpoints for managing RAG indexing of library documents: 

5- Configure embedding models 

6- Index documents 

7- Get RAG statistics 

8- Bulk operations with progress tracking 

9""" 

10 

11from flask import ( 

12 Blueprint, 

13 jsonify, 

14 request, 

15 Response, 

16 render_template, 

17 session, 

18 stream_with_context, 

19) 

20from loguru import logger 

21import glob 

22import json 

23import uuid 

24import time 

25import threading 

26import queue 

27from datetime import datetime, UTC 

28from pathlib import Path 

29from typing import Optional 

30 

31from ...web.auth.decorators import login_required 

32from ...utilities.db_utils import get_settings_manager 

33from ..services.library_rag_service import LibraryRAGService 

34from ...security.path_validator import PathValidator 

35from ..utils import handle_api_error 

36from ...database.models.library import ( 

37 Document, 

38 Collection, 

39 DocumentCollection, 

40 RAGIndex, 

41 SourceType, 

42 EmbeddingProvider, 

43) 

44from ...database.models.queue import TaskMetadata 

45from ...web.utils.rate_limiter import limiter 

46from ...config.paths import get_library_directory 

47 

48rag_bp = Blueprint("rag", __name__, url_prefix="/library") 

49 
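# ---------------------------------------------------------------------------
# Editor's registration sketch (not part of rag_routes.py): how this blueprint
# would typically be attached to a Flask app. The import path mirrors the file
# location shown in the report header and is an assumption, as is the app
# factory style; all routes below then mount under the /library prefix.
# ---------------------------------------------------------------------------
# from flask import Flask
# from local_deep_research.research_library.routes.rag_routes import rag_bp
#
# app = Flask(__name__)
# app.register_blueprint(rag_bp)  # e.g. GET /library/api/rag/settings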

50 

51def get_rag_service(collection_id: Optional[str] = None) -> LibraryRAGService: 

52 """ 

53 Get RAG service instance with appropriate settings. 

54 

55 If collection_id is provided: 

56 - Uses collection's stored settings if they exist 

57 - Uses current defaults for new collections (and stores them) 

58 

59 If no collection_id: 

60 - Uses current default settings 

61 """ 

62 from ...database.session_context import get_user_db_session 

63 

64 settings = get_settings_manager() 

65 username = session["username"] 

66 

67 # Get current default settings 

68 default_embedding_model = settings.get_setting( 

69 "local_search_embedding_model", "all-MiniLM-L6-v2" 

70 ) 

71 default_embedding_provider = settings.get_setting( 

72 "local_search_embedding_provider", "sentence_transformers" 

73 ) 

74 default_chunk_size = int( 

75 settings.get_setting("local_search_chunk_size", 1000) 

76 ) 

77 default_chunk_overlap = int( 

78 settings.get_setting("local_search_chunk_overlap", 200) 

79 ) 

80 

81 # Get new advanced configuration settings (Issue #1054) 

82 import json 

83 

84 default_splitter_type = settings.get_setting( 

85 "local_search_splitter_type", "recursive" 

86 ) 

87 default_text_separators = settings.get_setting( 

88 "local_search_text_separators", '["\n\n", "\n", ". ", " ", ""]' 

89 ) 

90 # Parse JSON string to list 

91 if isinstance(default_text_separators, str): 

92 try: 

93 default_text_separators = json.loads(default_text_separators) 

94 except json.JSONDecodeError: 

95 logger.warning( 

96 f"Invalid JSON for local_search_text_separators setting: {default_text_separators!r}. " 

97 "Using default separators." 

98 ) 

99 default_text_separators = ["\n\n", "\n", ". ", " ", ""] 

100 default_distance_metric = settings.get_setting( 

101 "local_search_distance_metric", "cosine" 

102 ) 

103 # Ensure normalize_vectors is a proper boolean (settings may return string) 

104 raw_normalize = settings.get_setting("local_search_normalize_vectors", True) 

105 if isinstance(raw_normalize, str): 

106 default_normalize_vectors = raw_normalize.lower() in ( 

107 "true", 

108 "1", 

109 "yes", 

110 ) 

111 else: 

112 default_normalize_vectors = bool(raw_normalize) 

113 default_index_type = settings.get_setting("local_search_index_type", "flat") 

114 

115 # If collection_id provided, check for stored settings 

116 if collection_id: 

117 with get_user_db_session(username) as db_session: 

118 collection = ( 

119 db_session.query(Collection).filter_by(id=collection_id).first() 

120 ) 

121 

122 if collection and collection.embedding_model: 

123 # Use collection's stored settings 

124 logger.info( 

125 f"Using stored settings for collection {collection_id}: " 

126 f"{collection.embedding_model_type.value}/{collection.embedding_model}" 

127 ) 

128 # Handle normalize_vectors - may be stored as string in some cases 

129 coll_normalize = collection.normalize_vectors 

130 if coll_normalize is not None: 

131 if isinstance(coll_normalize, str): 

132 coll_normalize = coll_normalize.lower() in ( 

133 "true", 

134 "1", 

135 "yes", 

136 ) 

137 else: 

138 coll_normalize = bool(coll_normalize) 

139 else: 

140 coll_normalize = default_normalize_vectors 

141 

142 return LibraryRAGService( 

143 username=username, 

144 embedding_model=collection.embedding_model, 

145 embedding_provider=collection.embedding_model_type.value, 

146 chunk_size=collection.chunk_size or default_chunk_size, 

147 chunk_overlap=collection.chunk_overlap 

148 or default_chunk_overlap, 

149 splitter_type=collection.splitter_type 

150 or default_splitter_type, 

151 text_separators=collection.text_separators 

152 or default_text_separators, 

153 distance_metric=collection.distance_metric 

154 or default_distance_metric, 

155 normalize_vectors=coll_normalize, 

156 index_type=collection.index_type or default_index_type, 

157 ) 

158 elif collection: 

159 # New collection - use defaults and store them 

160 logger.info( 

161 f"New collection {collection_id}, using and storing default settings" 

162 ) 

163 

164 # Create service with defaults 

165 service = LibraryRAGService( 

166 username=username, 

167 embedding_model=default_embedding_model, 

168 embedding_provider=default_embedding_provider, 

169 chunk_size=default_chunk_size, 

170 chunk_overlap=default_chunk_overlap, 

171 splitter_type=default_splitter_type, 

172 text_separators=default_text_separators, 

173 distance_metric=default_distance_metric, 

174 normalize_vectors=default_normalize_vectors, 

175 index_type=default_index_type, 

176 ) 

177 

178 # Store settings on collection (will be done during indexing) 

179 # Note: We don't store here because we don't have embedding_dimension yet 

180 # It will be stored in index_collection when first document is indexed 

181 

182 return service 

183 

184 # No collection or fallback - use current defaults 

185 return LibraryRAGService( 

186 username=username, 

187 embedding_model=default_embedding_model, 

188 embedding_provider=default_embedding_provider, 

189 chunk_size=default_chunk_size, 

190 chunk_overlap=default_chunk_overlap, 

191 splitter_type=default_splitter_type, 

192 text_separators=default_text_separators, 

193 distance_metric=default_distance_metric, 

194 normalize_vectors=default_normalize_vectors, 

195 index_type=default_index_type, 

196 ) 

197 
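# ---------------------------------------------------------------------------
# Editor's sketch (not part of rag_routes.py): the settings manager can return
# booleans as strings, so get_rag_service() coerces normalize_vectors as shown
# above. A minimal standalone illustration of that coercion rule (the helper
# name is illustrative only):
# ---------------------------------------------------------------------------
# def _coerce_bool(raw, default=True):
#     """Strings compare case-insensitively; None falls back to the default."""
#     if raw is None:
#         return default
#     if isinstance(raw, str):
#         return raw.lower() in ("true", "1", "yes")
#     return bool(raw)
#
# assert _coerce_bool("True") is True
# assert _coerce_bool("0") is False
# assert _coerce_bool(1) is True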

198 

199# Page Routes 

200 

201 

202@rag_bp.route("/embedding-settings") 

203@login_required 

204def embedding_settings_page(): 

205 """Render the Embedding Settings page.""" 

206 return render_template( 

207 "pages/embedding_settings.html", active_page="embedding-settings" 

208 ) 

209 

210 

211@rag_bp.route("/document/<string:document_id>/chunks") 

212@login_required 

213def view_document_chunks(document_id): 

214 """View all chunks for a document across all collections.""" 

215 from ...database.session_context import get_user_db_session 

216 from ...database.models.library import DocumentChunk 

217 

218 username = session.get("username") 

219 

220 with get_user_db_session(username) as db_session: 

221 # Get document info 

222 document = db_session.query(Document).filter_by(id=document_id).first() 

223 

224 if not document: 

225 return "Document not found", 404 

226 

227 # Get all chunks for this document 

228 chunks = ( 

229 db_session.query(DocumentChunk) 

230 .filter(DocumentChunk.source_id == document_id) 

231 .order_by(DocumentChunk.collection_name, DocumentChunk.chunk_index) 

232 .all() 

233 ) 

234 

235 # Group chunks by collection 

236 chunks_by_collection = {} 

237 for chunk in chunks: 

238 coll_name = chunk.collection_name 

239 if coll_name not in chunks_by_collection: 

240 # Get collection display name 

241 collection_id = coll_name.replace("collection_", "") 

242 collection = ( 

243 db_session.query(Collection) 

244 .filter_by(id=collection_id) 

245 .first() 

246 ) 

247 chunks_by_collection[coll_name] = { 

248 "name": collection.name if collection else coll_name, 

249 "id": collection_id, 

250 "chunks": [], 

251 } 

252 

253 chunks_by_collection[coll_name]["chunks"].append( 

254 { 

255 "id": chunk.id, 

256 "index": chunk.chunk_index, 

257 "text": chunk.chunk_text, 

258 "word_count": chunk.word_count, 

259 "start_char": chunk.start_char, 

260 "end_char": chunk.end_char, 

261 "embedding_model": chunk.embedding_model, 

262 "embedding_model_type": chunk.embedding_model_type.value 

263 if chunk.embedding_model_type 

264 else None, 

265 "embedding_dimension": chunk.embedding_dimension, 

266 "created_at": chunk.created_at, 

267 } 

268 ) 

269 

270 return render_template( 

271 "pages/document_chunks.html", 

272 document=document, 

273 chunks_by_collection=chunks_by_collection, 

274 total_chunks=len(chunks), 

275 ) 

276 

277 

278@rag_bp.route("/collections") 

279@login_required 

280def collections_page(): 

281 """Render the Collections page.""" 

282 return render_template("pages/collections.html", active_page="collections") 

283 

284 

285@rag_bp.route("/collections/<string:collection_id>") 

286@login_required 

287def collection_details_page(collection_id): 

288 """Render the Collection Details page.""" 

289 return render_template( 

290 "pages/collection_details.html", 

291 active_page="collections", 

292 collection_id=collection_id, 

293 ) 

294 

295 

296@rag_bp.route("/collections/<string:collection_id>/upload") 

297@login_required 

298def collection_upload_page(collection_id): 

299 """Render the Collection Upload page.""" 

300 # Get the upload PDF storage setting 

301 settings = get_settings_manager() 

302 upload_pdf_storage = settings.get_setting( 

303 "research_library.upload_pdf_storage", "none" 

304 ) 

305 # Only allow valid values for uploads (no filesystem) 

306 if upload_pdf_storage not in ("database", "none"): 

307 upload_pdf_storage = "none" 

308 

309 return render_template( 

310 "pages/collection_upload.html", 

311 active_page="collections", 

312 collection_id=collection_id, 

313 collection_name=None, # Could fetch from DB if needed 

314 upload_pdf_storage=upload_pdf_storage, 

315 ) 

316 

317 

318@rag_bp.route("/collections/create") 

319@login_required 

320def collection_create_page(): 

321 """Render the Create Collection page.""" 

322 return render_template( 

323 "pages/collection_create.html", active_page="collections" 

324 ) 

325 

326 

327# API Routes 

328 

329 

330@rag_bp.route("/api/rag/settings", methods=["GET"]) 

331@login_required 

332def get_current_settings(): 

333 """Get current RAG configuration from settings.""" 

334 import json as json_lib 

335 

336 try: 

337 settings = get_settings_manager() 

338 

339 # Get text separators and parse if needed 

340 text_separators = settings.get_setting( 

341 "local_search_text_separators", '["\n\n", "\n", ". ", " ", ""]' 

342 ) 

343 if isinstance(text_separators, str): 

344 try: 

345 text_separators = json_lib.loads(text_separators) 

346 except json_lib.JSONDecodeError: 

347 logger.warning( 

348 f"Invalid JSON for local_search_text_separators setting: {text_separators!r}. " 

349 "Using default separators." 

350 ) 

351 text_separators = ["\n\n", "\n", ". ", " ", ""] 

352 

353 normalize_vectors = settings.get_setting( 

354 "local_search_normalize_vectors", True 

355 ) 

356 

357 return jsonify( 

358 { 

359 "success": True, 

360 "settings": { 

361 "embedding_provider": settings.get_setting( 

362 "local_search_embedding_provider", 

363 "sentence_transformers", 

364 ), 

365 "embedding_model": settings.get_setting( 

366 "local_search_embedding_model", "all-MiniLM-L6-v2" 

367 ), 

368 "chunk_size": settings.get_setting( 

369 "local_search_chunk_size", 1000 

370 ), 

371 "chunk_overlap": settings.get_setting( 

372 "local_search_chunk_overlap", 200 

373 ), 

374 "splitter_type": settings.get_setting( 

375 "local_search_splitter_type", "recursive" 

376 ), 

377 "text_separators": text_separators, 

378 "distance_metric": settings.get_setting( 

379 "local_search_distance_metric", "cosine" 

380 ), 

381 "normalize_vectors": normalize_vectors, 

382 "index_type": settings.get_setting( 

383 "local_search_index_type", "flat" 

384 ), 

385 }, 

386 } 

387 ) 

388 except Exception as e: 

389 return handle_api_error("getting RAG settings", e) 

390 
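# ---------------------------------------------------------------------------
# Editor's client sketch (not part of rag_routes.py): reading the current
# default RAG settings. The base URL and the already-authenticated
# requests.Session are assumptions, not defined by this module.
# ---------------------------------------------------------------------------
# import requests
#
# session = requests.Session()  # assumed to carry a valid login cookie
# resp = session.get("http://localhost:5000/library/api/rag/settings")
# resp.raise_for_status()
# payload = resp.json()
# if payload["success"]:
#     print(payload["settings"]["embedding_model"], payload["settings"]["chunk_size"])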

391 

392@rag_bp.route("/api/rag/test-embedding", methods=["POST"]) 

393@login_required 

394def test_embedding(): 

395 """Test an embedding configuration by generating a test embedding.""" 

396 

397 try: 

398 data = request.json 

399 provider = data.get("provider") 

400 model = data.get("model") 

401 test_text = data.get("test_text", "This is a test.") 

402 

403 if not provider or not model: 

404 return jsonify( 

405 {"success": False, "error": "Provider and model are required"} 

406 ), 400 

407 

408 # Import embedding functions 

409 from ...embeddings.embeddings_config import ( 

410 get_embedding_function, 

411 ) 

412 

413 logger.info( 

414 f"Testing embedding with provider={provider}, model={model}" 

415 ) 

416 

417 # Get embedding function with the specified configuration 

418 start_time = time.time() 

419 embedding_func = get_embedding_function( 

420 provider=provider, 

421 model_name=model, 

422 ) 

423 

424 # Generate test embedding 

425 embedding = embedding_func([test_text])[0] 

426 response_time_ms = int((time.time() - start_time) * 1000) 

427 

428 # Get embedding dimension 

429 dimension = len(embedding) if hasattr(embedding, "__len__") else None 

430 

431 return jsonify( 

432 { 

433 "success": True, 

434 "dimension": dimension, 

435 "response_time_ms": response_time_ms, 

436 "provider": provider, 

437 "model": model, 

438 } 

439 ) 

440 

441 except Exception as e: 

442 return handle_api_error("testing embedding", e) 

443 
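# ---------------------------------------------------------------------------
# Editor's client sketch (not part of rag_routes.py): probing an embedding
# configuration before committing to it. Server URL and the authenticated
# session are assumptions; the model values mirror the defaults above.
# ---------------------------------------------------------------------------
# import requests
#
# session = requests.Session()  # assumed authenticated
# resp = session.post(
#     "http://localhost:5000/library/api/rag/test-embedding",
#     json={
#         "provider": "sentence_transformers",
#         "model": "all-MiniLM-L6-v2",
#         "test_text": "Hello embeddings",
#     },
# )
# result = resp.json()
# print(result.get("dimension"), result.get("response_time_ms"))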

444 

445@rag_bp.route("/api/rag/models", methods=["GET"]) 

446@login_required 

447def get_available_models(): 

448 """Get list of available embedding providers and models.""" 

449 try: 

450 from ...embeddings.embeddings_config import _get_provider_classes 

451 

452 # Get current settings for providers 

453 settings = get_settings_manager() 

454 settings_snapshot = ( 

455 settings.get_all_settings() 

456 if hasattr(settings, "get_all_settings") 

457 else {} 

458 ) 

459 

460 # Get provider classes 

461 provider_classes = _get_provider_classes() 

462 

463 # Provider display names 

464 provider_labels = { 

465 "sentence_transformers": "Sentence Transformers (Local)", 

466 "ollama": "Ollama (Local)", 

467 "openai": "OpenAI API", 

468 } 

469 

470 # Get provider options and models by looping through providers 

471 provider_options = [] 

472 providers = {} 

473 

474 for provider_key, provider_class in provider_classes.items(): 

475 if provider_class.is_available(settings_snapshot): 

476 # Add provider option 

477 provider_options.append( 

478 { 

479 "value": provider_key, 

480 "label": provider_labels.get( 

481 provider_key, provider_key 

482 ), 

483 } 

484 ) 

485 

486 # Get models for this provider 

487 models = provider_class.get_available_models(settings_snapshot) 

488 providers[provider_key] = [ 

489 { 

490 "value": m["value"], 

491 "label": m["label"], 

492 "provider": provider_key, 

493 } 

494 for m in models 

495 ] 

496 

497 return jsonify( 

498 { 

499 "success": True, 

500 "provider_options": provider_options, 

501 "providers": providers, 

502 } 

503 ) 

504 except Exception as e: 

505 return handle_api_error("getting available models", e) 

506 

507 

508@rag_bp.route("/api/rag/info", methods=["GET"]) 

509@login_required 

510def get_index_info(): 

511 """Get information about the current RAG index.""" 

512 from ...database.library_init import get_default_library_id 

513 

514 try: 

515 # Get collection_id from request or use default Library collection 

516 collection_id = request.args.get("collection_id") 

517 if not collection_id: 

518 collection_id = get_default_library_id(session["username"]) 

519 

520 logger.info( 

521 f"Getting RAG index info for collection_id: {collection_id}" 

522 ) 

523 

524 rag_service = get_rag_service(collection_id) 

525 info = rag_service.get_current_index_info(collection_id) 

526 

527 if info is None: 

528 logger.info( 

529 f"No RAG index found for collection_id: {collection_id}" 

530 ) 

531 return jsonify( 

532 {"success": True, "info": None, "message": "No index found"} 

533 ) 

534 

535 logger.info(f"Found RAG index for collection_id: {collection_id}") 

536 return jsonify({"success": True, "info": info}) 

537 except Exception as e: 

538 return handle_api_error("getting index info", e) 

539 

540 

541@rag_bp.route("/api/rag/stats", methods=["GET"]) 

542@login_required 

543def get_rag_stats(): 

544 """Get RAG statistics for a collection.""" 

545 from ...database.library_init import get_default_library_id 

546 

547 try: 

548 # Get collection_id from request or use default Library collection 

549 collection_id = request.args.get("collection_id") 

550 if not collection_id: 

551 collection_id = get_default_library_id(session["username"]) 

552 

553 rag_service = get_rag_service(collection_id) 

554 stats = rag_service.get_rag_stats(collection_id) 

555 

556 return jsonify({"success": True, "stats": stats}) 

557 except Exception as e: 

558 return handle_api_error("getting RAG stats", e) 

559 

560 

561@rag_bp.route("/api/rag/index-document", methods=["POST"]) 

562@login_required 

563def index_document(): 

564 """Index a single document in a collection.""" 

565 from ...database.library_init import get_default_library_id 

566 

567 try: 

568 data = request.get_json() 

569 text_doc_id = data.get("text_doc_id") 

570 force_reindex = data.get("force_reindex", False) 

571 collection_id = data.get("collection_id") 

572 

573 if not text_doc_id: 

574 return jsonify( 

575 {"success": False, "error": "text_doc_id is required"} 

576 ), 400 

577 

578 # Get collection_id from request or use default Library collection 

579 if not collection_id: 

580 collection_id = get_default_library_id(session["username"]) 

581 

582 rag_service = get_rag_service(collection_id) 

583 result = rag_service.index_document( 

584 text_doc_id, collection_id, force_reindex 

585 ) 

586 

587 if result["status"] == "error": 

588 return jsonify( 

589 {"success": False, "error": result.get("error")} 

590 ), 400 

591 

592 return jsonify({"success": True, "result": result}) 

593 except Exception as e: 

594 return handle_api_error(f"indexing document {text_doc_id}", e) 

595 
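# ---------------------------------------------------------------------------
# Editor's client sketch (not part of rag_routes.py): indexing one document.
# The UUID values are placeholders; URL and authenticated session are assumed.
# Omitting collection_id targets the default Library collection.
# ---------------------------------------------------------------------------
# import requests
#
# session = requests.Session()
# resp = session.post(
#     "http://localhost:5000/library/api/rag/index-document",
#     json={
#         "text_doc_id": "DOCUMENT-UUID",
#         "collection_id": "COLLECTION-UUID",
#         "force_reindex": False,
#     },
# )
# print(resp.status_code, resp.json())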

596 

597@rag_bp.route("/api/rag/remove-document", methods=["POST"]) 

598@login_required 

599def remove_document(): 

600 """Remove a document from RAG in a collection.""" 

601 from ...database.library_init import get_default_library_id 

602 

603 try: 

604 data = request.get_json() 

605 text_doc_id = data.get("text_doc_id") 

606 collection_id = data.get("collection_id") 

607 

608 if not text_doc_id: 

609 return jsonify( 

610 {"success": False, "error": "text_doc_id is required"} 

611 ), 400 

612 

613 # Get collection_id from request or use default Library collection 

614 if not collection_id: 

615 collection_id = get_default_library_id(session["username"]) 

616 

617 rag_service = get_rag_service(collection_id) 

618 result = rag_service.remove_document_from_rag( 

619 text_doc_id, collection_id 

620 ) 

621 

622 if result["status"] == "error": 

623 return jsonify( 

624 {"success": False, "error": result.get("error")} 

625 ), 400 

626 

627 return jsonify({"success": True, "result": result}) 

628 except Exception as e: 

629 return handle_api_error(f"removing document {text_doc_id}", e) 

630 

631 

632@rag_bp.route("/api/rag/index-research", methods=["POST"]) 

633@login_required 

634def index_research(): 

635 """Index all documents from a research.""" 

636 try: 

637 data = request.get_json() 

638 research_id = data.get("research_id") 

639 force_reindex = data.get("force_reindex", False) 

640 

641 if not research_id: 

642 return jsonify( 

643 {"success": False, "error": "research_id is required"} 

644 ), 400 

645 

646 rag_service = get_rag_service() 

647 results = rag_service.index_research_documents( 

648 research_id, force_reindex 

649 ) 

650 

651 return jsonify({"success": True, "results": results}) 

652 except Exception as e: 

653 return handle_api_error(f"indexing research {research_id}", e) 

654 

655 

656@rag_bp.route("/api/rag/index-all", methods=["GET"]) 

657@login_required 

658def index_all(): 

659 """Index all documents in a collection with Server-Sent Events progress.""" 

660 from ...database.session_context import get_user_db_session 

661 from ...database.library_init import get_default_library_id 

662 

663 force_reindex = request.args.get("force_reindex", "false").lower() == "true" 

664 username = session["username"] 

665 

666 # Get collection_id from request or use default Library collection 

667 collection_id = request.args.get("collection_id") 

668 if not collection_id: 

669 collection_id = get_default_library_id(username) 

670 

671 logger.info( 

672 f"Starting index-all for collection_id: {collection_id}, force_reindex: {force_reindex}" 

673 ) 

674 

675 # Create RAG service in request context before generator runs 

676 rag_service = get_rag_service(collection_id) 

677 

678 def generate(): 

679 """Generator function for SSE progress updates.""" 

680 try: 

681 # Send initial status 

682 yield f"data: {json.dumps({'type': 'start', 'message': 'Starting bulk indexing...'})}\n\n" 

683 

684 # Get document IDs to index from DocumentCollection 

685 with get_user_db_session(username) as db_session: 

686 # Query Document joined with DocumentCollection for the collection 

687 query = ( 

688 db_session.query(Document.id, Document.title) 

689 .join( 

690 DocumentCollection, 

691 Document.id == DocumentCollection.document_id, 

692 ) 

693 .filter(DocumentCollection.collection_id == collection_id) 

694 ) 

695 

696 if not force_reindex: 

697 # Only index documents that haven't been indexed yet 

698 query = query.filter(DocumentCollection.indexed.is_(False)) 

699 

700 doc_info = [(doc_id, title) for doc_id, title in query.all()] 

701 

702 if not doc_info: 

703 yield f"data: {json.dumps({'type': 'complete', 'results': {'successful': 0, 'skipped': 0, 'failed': 0, 'message': 'No documents to index'}})}\n\n" 

704 return 

705 

706 results = {"successful": 0, "skipped": 0, "failed": 0, "errors": []} 

707 total = len(doc_info) 

708 

709 # Process documents in batches to optimize performance 

710 # Get batch size from settings 

711 settings = get_settings_manager() 

712 batch_size = int( 

713 settings.get_setting("rag.indexing_batch_size", 15) 

714 ) 

715 processed = 0 

716 

717 for i in range(0, len(doc_info), batch_size): 

718 batch = doc_info[i : i + batch_size] 

719 

720 # Process batch with collection_id 

721 batch_results = rag_service.index_documents_batch( 

722 batch, collection_id, force_reindex 

723 ) 

724 

725 # Process results and send progress updates 

726 for j, (doc_id, title) in enumerate(batch): 

727 processed += 1 

728 result = batch_results[doc_id] 

729 

730 # Send progress update 

731 yield f"data: {json.dumps({'type': 'progress', 'current': processed, 'total': total, 'title': title, 'percent': int((processed / total) * 100)})}\n\n" 

732 

733 if result["status"] == "success": 

734 results["successful"] += 1 

735 elif result["status"] == "skipped": 

736 results["skipped"] += 1 

737 else: 

738 results["failed"] += 1 

739 results["errors"].append( 

740 { 

741 "doc_id": doc_id, 

742 "title": title, 

743 "error": result.get("error"), 

744 } 

745 ) 

746 

747 # Send completion status 

748 yield f"data: {json.dumps({'type': 'complete', 'results': results})}\n\n" 

749 

750 # Log final status for debugging 

751 logger.info( 

752 f"Bulk indexing complete: {results['successful']} successful, {results['skipped']} skipped, {results['failed']} failed" 

753 ) 

754 

755 except Exception: 

756 logger.exception("Error in bulk indexing") 

757 yield f"data: {json.dumps({'type': 'error', 'error': 'An internal error occurred during indexing'})}\n\n" 

758 

759 return Response( 

760 stream_with_context(generate()), mimetype="text/event-stream" 

761 ) 

762 
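# ---------------------------------------------------------------------------
# Editor's client sketch (not part of rag_routes.py): consuming the SSE
# progress stream emitted by /api/rag/index-all, using only requests.
# URL, collection id, and the authenticated session are assumptions.
# ---------------------------------------------------------------------------
# import json
# import requests
#
# session = requests.Session()
# with session.get(
#     "http://localhost:5000/library/api/rag/index-all",
#     params={"collection_id": "COLLECTION-UUID", "force_reindex": "false"},
#     stream=True,
# ) as resp:
#     for raw_line in resp.iter_lines(decode_unicode=True):
#         if not raw_line or not raw_line.startswith("data: "):
#             continue  # skip blank separators between events
#         event = json.loads(raw_line[len("data: "):])
#         if event["type"] == "progress":
#             print(f"{event['percent']}% - {event['title']}")
#         elif event["type"] in ("complete", "error"):
#             print(event)
#             break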

763 

764@rag_bp.route("/api/rag/configure", methods=["POST"]) 

765@login_required 

766def configure_rag(): 

767 """ 

768 Change RAG configuration (embedding model, chunk size, etc.). 

769 This will create a new index with the new configuration. 

770 """ 

771 import json as json_lib 

772 

773 try: 

774 data = request.get_json() 

775 embedding_model = data.get("embedding_model") 

776 embedding_provider = data.get("embedding_provider") 

777 chunk_size = data.get("chunk_size") 

778 chunk_overlap = data.get("chunk_overlap") 

779 collection_id = data.get("collection_id") 

780 

781 # Get new advanced settings (with defaults) 

782 splitter_type = data.get("splitter_type", "recursive") 

783 text_separators = data.get( 

784 "text_separators", ["\n\n", "\n", ". ", " ", ""] 

785 ) 

786 distance_metric = data.get("distance_metric", "cosine") 

787 normalize_vectors = data.get("normalize_vectors", True) 

788 index_type = data.get("index_type", "flat") 

789 

790 if not all( 

791 [ 

792 embedding_model, 

793 embedding_provider, 

794 chunk_size, 

795 chunk_overlap, 

796 ] 

797 ): 

798 return jsonify( 

799 { 

800 "success": False, 

801 "error": "All configuration parameters are required (embedding_model, embedding_provider, chunk_size, chunk_overlap)", 

802 } 

803 ), 400 

804 

805 # Save settings to database 

806 settings = get_settings_manager() 

807 settings.set_setting("local_search_embedding_model", embedding_model) 

808 settings.set_setting( 

809 "local_search_embedding_provider", embedding_provider 

810 ) 

811 settings.set_setting("local_search_chunk_size", int(chunk_size)) 

812 settings.set_setting("local_search_chunk_overlap", int(chunk_overlap)) 

813 

814 # Save new advanced settings 

815 settings.set_setting("local_search_splitter_type", splitter_type) 

816 # Convert list to JSON string for storage 

817 if isinstance(text_separators, list): 

818 text_separators_str = json_lib.dumps(text_separators) 

819 else: 

820 text_separators_str = text_separators 

821 settings.set_setting( 

822 "local_search_text_separators", text_separators_str 

823 ) 

824 settings.set_setting("local_search_distance_metric", distance_metric) 

825 settings.set_setting( 

826 "local_search_normalize_vectors", bool(normalize_vectors) 

827 ) 

828 settings.set_setting("local_search_index_type", index_type) 

829 

830 # If collection_id is provided, update that collection's configuration 

831 if collection_id: 

832 # Create new RAG service with new configuration 

833 new_rag_service = LibraryRAGService( 

834 username=session["username"], 

835 embedding_model=embedding_model, 

836 embedding_provider=embedding_provider, 

837 chunk_size=int(chunk_size), 

838 chunk_overlap=int(chunk_overlap), 

839 splitter_type=splitter_type, 

840 text_separators=text_separators 

841 if isinstance(text_separators, list) 

842 else json_lib.loads(text_separators), 

843 distance_metric=distance_metric, 

844 normalize_vectors=normalize_vectors, 

845 index_type=index_type, 

846 ) 

847 

848 # Get or create new index with this configuration 

849 rag_index = new_rag_service._get_or_create_rag_index(collection_id) 

850 

851 return jsonify( 

852 { 

853 "success": True, 

854 "message": "Configuration updated for collection. You can now index documents with the new settings.", 

855 "index_hash": rag_index.index_hash, 

856 } 

857 ) 

858 else: 

859 # Just saving default settings without updating a specific collection 

860 return jsonify( 

861 { 

862 "success": True, 

863 "message": "Default embedding settings saved successfully. New collections will use these settings.", 

864 } 

865 ) 

866 

867 except Exception as e: 

868 return handle_api_error("configuring RAG", e) 

869 
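# ---------------------------------------------------------------------------
# Editor's client sketch (not part of rag_routes.py): saving a new default RAG
# configuration. Values mirror the defaults used elsewhere in this module;
# URL and session are assumptions. Including "collection_id" would instead
# bind the configuration to that collection.
# ---------------------------------------------------------------------------
# import requests
#
# session = requests.Session()
# resp = session.post(
#     "http://localhost:5000/library/api/rag/configure",
#     json={
#         "embedding_provider": "sentence_transformers",
#         "embedding_model": "all-MiniLM-L6-v2",
#         "chunk_size": 1000,
#         "chunk_overlap": 200,
#         "splitter_type": "recursive",
#         "text_separators": ["\n\n", "\n", ". ", " ", ""],
#         "distance_metric": "cosine",
#         "normalize_vectors": True,
#         "index_type": "flat",
#     },
# )
# print(resp.json().get("message"))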

870 

871@rag_bp.route("/api/rag/documents", methods=["GET"]) 

872@login_required 

873def get_documents(): 

874 """Get library documents with their RAG status for the default Library collection (paginated).""" 

875 from ...database.session_context import get_user_db_session 

876 from ...database.library_init import get_default_library_id 

877 

878 try: 

879 # Get pagination parameters 

880 page = request.args.get("page", 1, type=int) 

881 per_page = request.args.get("per_page", 50, type=int) 

882 filter_type = request.args.get( 

883 "filter", "all" 

884 ) # all, indexed, unindexed 

885 

886 # Validate pagination parameters 

887 page = max(1, page) 

888 per_page = min(max(10, per_page), 100) # Limit between 10-100 

889 

890 # Close current thread's session to force fresh connection 

891 from ...database.thread_local_session import cleanup_current_thread 

892 

893 cleanup_current_thread() 

894 

895 username = session["username"] 

896 

897 # Get collection_id from request or use default Library collection 

898 collection_id = request.args.get("collection_id") 

899 if not collection_id: 

900 collection_id = get_default_library_id(username) 

901 

902 logger.info( 

903 f"Getting documents for collection_id: {collection_id}, filter: {filter_type}, page: {page}" 

904 ) 

905 

906 with get_user_db_session(username) as db_session: 

907 # Expire all cached objects to ensure we get fresh data from DB 

908 db_session.expire_all() 

909 

910 # Import RagDocumentStatus model 

911 from ...database.models.library import RagDocumentStatus 

912 

913 # Build base query - join Document with DocumentCollection for the collection 

914 # LEFT JOIN with rag_document_status to check indexed status 

915 query = ( 

916 db_session.query( 

917 Document, DocumentCollection, RagDocumentStatus 

918 ) 

919 .join( 

920 DocumentCollection, 

921 (DocumentCollection.document_id == Document.id) 

922 & (DocumentCollection.collection_id == collection_id), 

923 ) 

924 .outerjoin( 

925 RagDocumentStatus, 

926 (RagDocumentStatus.document_id == Document.id) 

927 & (RagDocumentStatus.collection_id == collection_id), 

928 ) 

929 ) 

930 

931 logger.debug(f"Base query for collection {collection_id}: {query}") 

932 

933 # Apply filters based on rag_document_status existence 

934 if filter_type == "indexed": 

935 query = query.filter(RagDocumentStatus.document_id.isnot(None)) 

936 elif filter_type == "unindexed": 

937 # Documents in collection but not indexed yet 

938 query = query.filter(RagDocumentStatus.document_id.is_(None)) 

939 

940 # Get total count before pagination 

941 total_count = query.count() 

942 logger.info( 

943 f"Found {total_count} total documents for collection {collection_id} with filter {filter_type}" 

944 ) 

945 

946 # Apply pagination 

947 results = ( 

948 query.order_by(Document.created_at.desc()) 

949 .limit(per_page) 

950 .offset((page - 1) * per_page) 

951 .all() 

952 ) 

953 

954 documents = [ 

955 { 

956 "id": doc.id, 

957 "title": doc.title, 

958 "original_url": doc.original_url, 

959 "rag_indexed": rag_status is not None, 

960 "chunk_count": rag_status.chunk_count if rag_status else 0, 

961 "created_at": doc.created_at.isoformat() 

962 if doc.created_at 

963 else None, 

964 } 

965 for doc, doc_collection, rag_status in results 

966 ] 

967 

968 # Debug logging to help diagnose indexing status issues 

969 indexed_count = sum(1 for d in documents if d["rag_indexed"]) 

970 

971 # Additional debug: check rag_document_status for this collection 

972 all_indexed_statuses = ( 

973 db_session.query(RagDocumentStatus) 

974 .filter_by(collection_id=collection_id) 

975 .all() 

976 ) 

977 logger.info( 

978 f"rag_document_status table shows: {len(all_indexed_statuses)} documents indexed for collection {collection_id}" 

979 ) 

980 

981 logger.info( 

982 f"Returning {len(documents)} documents on page {page}: " 

983 f"{indexed_count} indexed, {len(documents) - indexed_count} not indexed" 

984 ) 

985 

986 return jsonify( 

987 { 

988 "success": True, 

989 "documents": documents, 

990 "pagination": { 

991 "page": page, 

992 "per_page": per_page, 

993 "total": total_count, 

994 "pages": (total_count + per_page - 1) // per_page, 

995 }, 

996 } 

997 ) 

998 except Exception as e: 

999 return handle_api_error("getting documents", e) 

1000 
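# ---------------------------------------------------------------------------
# Editor's client sketch (not part of rag_routes.py): paging through documents
# and their index status. Note that per_page is clamped to 10-100 by the
# route; URL and authenticated session are assumptions.
# ---------------------------------------------------------------------------
# import requests
#
# session = requests.Session()
# page = 1
# while True:
#     data = session.get(
#         "http://localhost:5000/library/api/rag/documents",
#         params={"page": page, "per_page": 50, "filter": "unindexed"},
#     ).json()
#     for doc in data["documents"]:
#         print(doc["id"], doc["title"], doc["rag_indexed"])
#     if page >= data["pagination"]["pages"]:
#         break
#     page += 1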

1001 

1002@rag_bp.route("/api/rag/index-local", methods=["GET"]) 

1003@login_required 

1004def index_local_library(): 

1005 """Index documents from a local folder with Server-Sent Events progress.""" 

1006 folder_path = request.args.get("path") 

1007 file_patterns = request.args.get( 

1008 "patterns", "*.pdf,*.txt,*.md,*.html" 

1009 ).split(",") 

1010 recursive = request.args.get("recursive", "true").lower() == "true" 

1011 

1012 if not folder_path: 

1013 return jsonify({"success": False, "error": "Path is required"}), 400 

1014 

1015 # Validate and sanitize the path to prevent traversal attacks 

1016 try: 

1017 validated_path = PathValidator.validate_local_filesystem_path( 

1018 folder_path 

1019 ) 

1020 # Re-sanitize for static analyzer recognition (CodeQL) 

1021 path = PathValidator.sanitize_for_filesystem_ops(validated_path) 

1022 except ValueError as e: 

1023 logger.warning(f"Path validation failed for '{folder_path}': {e}") 

1024 return jsonify({"success": False, "error": "Invalid path"}), 400 

1025 

1026 # Check path exists and is a directory 

1027 if not path.exists(): 

1028 return jsonify({"success": False, "error": "Path does not exist"}), 400 

1029 if not path.is_dir(): 

1030 return jsonify( 

1031 {"success": False, "error": "Path is not a directory"} 

1032 ), 400 

1033 

1034 # Create RAG service in request context 

1035 rag_service = get_rag_service() 

1036 

1037 def generate(): 

1038 """Generator function for SSE progress updates.""" 

1039 try: 

1040 # Send initial status 

1041 yield f"data: {json.dumps({'type': 'start', 'message': f'Scanning folder: {path}'})}\n\n" 

1042 

1043 # Find all matching files 

1044 files_to_index = [] 

1045 for pattern in file_patterns: 

1046 pattern = pattern.strip() 

1047 if recursive: 

1048 search_pattern = str(path / "**" / pattern) 

1049 else: 

1050 search_pattern = str(path / pattern) 

1051 

1052 matching_files = glob.glob(search_pattern, recursive=recursive) 

1053 files_to_index.extend(matching_files) 

1054 

1055 # Remove duplicates and sort 

1056 files_to_index = sorted(set(files_to_index)) 

1057 

1058 if not files_to_index: 

1059 yield f"data: {json.dumps({'type': 'complete', 'results': {'successful': 0, 'skipped': 0, 'failed': 0, 'message': 'No matching files found'}})}\n\n" 

1060 return 

1061 

1062 results = {"successful": 0, "skipped": 0, "failed": 0, "errors": []} 

1063 total = len(files_to_index) 

1064 

1065 # Index each file 

1066 for idx, file_path in enumerate(files_to_index, 1): 

1067 file_name = Path(file_path).name 

1068 

1069 # Send progress update 

1070 yield f"data: {json.dumps({'type': 'progress', 'current': idx, 'total': total, 'filename': file_name, 'percent': int((idx / total) * 100)})}\n\n" 

1071 

1072 try: 

1073 # Index the file directly using RAG service 

1074 result = rag_service.index_local_file(file_path) 

1075 

1076 if result.get("status") == "success": 

1077 results["successful"] += 1 

1078 elif result.get("status") == "skipped": 

1079 results["skipped"] += 1 

1080 else: 

1081 results["failed"] += 1 

1082 results["errors"].append( 

1083 { 

1084 "file": file_name, 

1085 "error": result.get("error", "Unknown error"), 

1086 } 

1087 ) 

1088 except Exception: 

1089 results["failed"] += 1 

1090 results["errors"].append( 

1091 {"file": file_name, "error": "Failed to index file"} 

1092 ) 

1093 logger.exception(f"Error indexing file {file_path}") 

1094 

1095 # Send completion status 

1096 yield f"data: {json.dumps({'type': 'complete', 'results': results})}\n\n" 

1097 

1098 logger.info( 

1099 f"Local library indexing complete for {path}: " 

1100 f"{results['successful']} successful, " 

1101 f"{results['skipped']} skipped, " 

1102 f"{results['failed']} failed" 

1103 ) 

1104 

1105 except Exception: 

1106 logger.exception("Error in local library indexing") 

1107 yield f"data: {json.dumps({'type': 'error', 'error': 'An internal error occurred during indexing'})}\n\n" 

1108 

1109 return Response( 

1110 stream_with_context(generate()), mimetype="text/event-stream" 

1111 ) 

1112 

1113 

1114# Collection Management Routes 

1115 

1116 

1117@rag_bp.route("/api/collections", methods=["GET"]) 

1118@login_required 

1119def get_collections(): 

1120 """Get all document collections for the current user.""" 

1121 from ...database.session_context import get_user_db_session 

1122 

1123 try: 

1124 username = session["username"] 

1125 with get_user_db_session(username) as db_session: 

1126 # No need to filter by username - each user has their own database 

1127 collections = db_session.query(Collection).all() 

1128 

1129 result = [] 

1130 for coll in collections: 

1131 collection_data = { 

1132 "id": coll.id, 

1133 "name": coll.name, 

1134 "description": coll.description, 

1135 "created_at": coll.created_at.isoformat() 

1136 if coll.created_at 

1137 else None, 

1138 "collection_type": coll.collection_type, 

1139 "is_default": coll.is_default 

1140 if hasattr(coll, "is_default") 

1141 else False, 

1142 "document_count": len(coll.document_links) 

1143 if hasattr(coll, "document_links") 

1144 else 0, 

1145 "folder_count": len(coll.linked_folders) 

1146 if hasattr(coll, "linked_folders") 

1147 else 0, 

1148 } 

1149 

1150 # Include embedding metadata if available 

1151 if coll.embedding_model:    [coverage: 1151 ↛ 1152: line 1151 didn't jump to line 1152 because the condition on line 1151 was never true]

1152 collection_data["embedding"] = { 

1153 "model": coll.embedding_model, 

1154 "provider": coll.embedding_model_type.value 

1155 if coll.embedding_model_type 

1156 else None, 

1157 "dimension": coll.embedding_dimension, 

1158 "chunk_size": coll.chunk_size, 

1159 "chunk_overlap": coll.chunk_overlap, 

1160 } 

1161 else: 

1162 collection_data["embedding"] = None 

1163 

1164 result.append(collection_data) 

1165 

1166 return jsonify({"success": True, "collections": result}) 

1167 except Exception as e: 

1168 return handle_api_error("getting collections", e) 

1169 

1170 

1171@rag_bp.route("/api/collections", methods=["POST"]) 

1172@login_required 

1173def create_collection(): 

1174 """Create a new document collection.""" 

1175 from ...database.session_context import get_user_db_session 

1176 

1177 try: 

1178 data = request.get_json() 

1179 name = data.get("name", "").strip() 

1180 description = data.get("description", "").strip() 

1181 collection_type = data.get("type", "user_uploads") 

1182 

1183 if not name: 

1184 return jsonify({"success": False, "error": "Name is required"}), 400 

1185 

1186 username = session["username"] 

1187 with get_user_db_session(username) as db_session: 

1188 # Check if collection with this name already exists in this user's database 

1189 existing = db_session.query(Collection).filter_by(name=name).first() 

1190 

1191 if existing: 

1192 return jsonify( 

1193 { 

1194 "success": False, 

1195 "error": f"Collection '{name}' already exists", 

1196 } 

1197 ), 400 

1198 

1199 # Create new collection (no username needed - each user has their own DB) 

1200 # Note: created_at uses default=utcnow() in the model, so we don't need to set it manually 

1201 collection = Collection( 

1202 id=str(uuid.uuid4()), # Generate UUID for collection 

1203 name=name, 

1204 description=description, 

1205 collection_type=collection_type, 

1206 ) 

1207 

1208 db_session.add(collection) 

1209 db_session.commit() 

1210 

1211 return jsonify( 

1212 { 

1213 "success": True, 

1214 "collection": { 

1215 "id": collection.id, 

1216 "name": collection.name, 

1217 "description": collection.description, 

1218 "created_at": collection.created_at.isoformat(), 

1219 "collection_type": collection.collection_type, 

1220 }, 

1221 } 

1222 ) 

1223 except Exception as e: 

1224 return handle_api_error("creating collection", e) 

1225 
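# ---------------------------------------------------------------------------
# Editor's client sketch (not part of rag_routes.py): creating a collection
# and reading the listing back. Collection name/description are examples;
# URL and authenticated session are assumptions.
# ---------------------------------------------------------------------------
# import requests
#
# session = requests.Session()
# created = session.post(
#     "http://localhost:5000/library/api/collections",
#     json={"name": "Papers", "description": "Reference papers", "type": "user_uploads"},
# ).json()
# collection_id = created["collection"]["id"]
# listing = session.get("http://localhost:5000/library/api/collections").json()
# print(collection_id, [c["name"] for c in listing["collections"]])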

1226 

1227@rag_bp.route("/api/collections/<string:collection_id>", methods=["PUT"]) 

1228@login_required 

1229def update_collection(collection_id): 

1230 """Update a collection's details.""" 

1231 from ...database.session_context import get_user_db_session 

1232 

1233 try: 

1234 data = request.get_json() 

1235 name = data.get("name", "").strip() 

1236 description = data.get("description", "").strip() 

1237 

1238 username = session["username"] 

1239 with get_user_db_session(username) as db_session: 

1240 # No need to filter by username - each user has their own database 

1241 collection = ( 

1242 db_session.query(Collection).filter_by(id=collection_id).first() 

1243 ) 

1244 

1245 if not collection: 

1246 return jsonify( 

1247 {"success": False, "error": "Collection not found"} 

1248 ), 404 

1249 

1250 if name: 

1251 # Check if new name conflicts with existing collection 

1252 existing = ( 

1253 db_session.query(Collection) 

1254 .filter( 

1255 Collection.name == name, 

1256 Collection.id != collection_id, 

1257 ) 

1258 .first() 

1259 ) 

1260 

1261 if existing: 

1262 return jsonify( 

1263 { 

1264 "success": False, 

1265 "error": f"Collection '{name}' already exists", 

1266 } 

1267 ), 400 

1268 

1269 collection.name = name 

1270 

1271 if description is not None: # Allow empty description    [coverage: 1271 ↛ 1274: line 1271 didn't jump to line 1274 because the condition on line 1271 was always true]

1272 collection.description = description 

1273 

1274 db_session.commit() 

1275 

1276 return jsonify( 

1277 { 

1278 "success": True, 

1279 "collection": { 

1280 "id": collection.id, 

1281 "name": collection.name, 

1282 "description": collection.description, 

1283 "created_at": collection.created_at.isoformat() 

1284 if collection.created_at 

1285 else None, 

1286 "collection_type": collection.collection_type, 

1287 }, 

1288 } 

1289 ) 

1290 except Exception as e: 

1291 return handle_api_error("updating collection", e) 

1292 

1293 

1294@rag_bp.route("/api/collections/<string:collection_id>", methods=["DELETE"]) 

1295@login_required 

1296def delete_collection(collection_id): 

1297 """Delete a collection and its orphaned documents.""" 

1298 from ..deletion.services.collection_deletion import ( 

1299 CollectionDeletionService, 

1300 ) 

1301 

1302 try: 

1303 username = session["username"] 

1304 service = CollectionDeletionService(username) 

1305 result = service.delete_collection( 

1306 collection_id, delete_orphaned_documents=True 

1307 ) 

1308 

1309 if result.get("deleted"): 

1310 return jsonify( 

1311 { 

1312 "success": True, 

1313 "message": "Collection deleted successfully", 

1314 "deleted_chunks": result.get("chunks_deleted", 0), 

1315 "orphaned_documents_deleted": result.get( 

1316 "orphaned_documents_deleted", 0 

1317 ), 

1318 } 

1319 ) 

1320 else: 

1321 error = result.get("error", "Unknown error") 

1322 status_code = 404 if "not found" in error.lower() else 400 

1323 return jsonify({"success": False, "error": error}), status_code 

1324 

1325 except Exception as e: 

1326 return handle_api_error("deleting collection", e) 

1327 

1328 

1329def extract_text_from_file( 

1330 file_content: bytes, file_type: str, filename: str 

1331) -> Optional[str]: 

1332 """ 

1333 Extract text from uploaded file content. 

1334 

1335 Args: 

1336 file_content: Raw file content as bytes 

1337 file_type: Type of file (pdf, text, markdown, html, etc.) 

1338 filename: Original filename for logging 

1339 

1340 Returns: 

1341 Extracted text as string, or None if extraction failed 

1342 """ 

1343 try: 

1344 if file_type == "pdf": 

1345 # Use the existing PDF text extraction functionality 

1346 from ..downloaders.base import BaseDownloader 

1347 

1348 text = BaseDownloader.extract_text_from_pdf(file_content) 

1349 if text: 

1350 logger.info(f"Successfully extracted text from PDF: {filename}") 

1351 return text 

1352 else: 

1353 logger.warning(f"Could not extract text from PDF: {filename}") 

1354 return None 

1355 

1356 elif file_type in ["text", "txt", "markdown", "md"]: 

1357 # Plain text files - just decode 

1358 try: 

1359 text = file_content.decode("utf-8") 

1360 logger.info(f"Successfully read text file: {filename}") 

1361 return text 

1362 except UnicodeDecodeError: 

1363 # Retry with errors='ignore' (same encoding, but skip invalid bytes) 

1364 try: 

1365 text = file_content.decode("utf-8", errors="ignore") 

1366 logger.warning( 

1367 f"Read text file with encoding errors ignored: {filename}" 

1368 ) 

1369 return text 

1370 except Exception: 

1371 logger.exception(f"Failed to decode text file: {filename}") 

1372 return None 

1373 

1374 elif file_type in ["html", "htm"]: 

1375 # HTML files - extract text content 

1376 try: 

1377 from bs4 import BeautifulSoup 

1378 

1379 html_content = file_content.decode("utf-8", errors="ignore") 

1380 soup = BeautifulSoup(html_content, "html.parser") 

1381 

1382 # Remove script and style elements 

1383 for script in soup(["script", "style"]): 

1384 script.decompose() 

1385 

1386 # Get text 

1387 text = soup.get_text() 

1388 

1389 # Clean up whitespace 

1390 lines = (line.strip() for line in text.splitlines()) 

1391 chunks = ( 

1392 phrase.strip() 

1393 for line in lines 

1394 for phrase in line.split(" ") 

1395 ) 

1396 text = "\n".join(chunk for chunk in chunks if chunk) 

1397 

1398 logger.info( 

1399 f"Successfully extracted text from HTML: {filename}" 

1400 ) 

1401 return text 

1402 except Exception: 

1403 logger.exception( 

1404 f"Failed to extract text from HTML: {filename}" 

1405 ) 

1406 return None 

1407 

1408 elif file_type in ["docx", "doc"]: 

1409 # Word documents - would need python-docx library 

1410 logger.warning( 

1411 f"Word document extraction not yet implemented: {filename}" 

1412 ) 

1413 return None 

1414 

1415 else: 

1416 logger.warning( 

1417 f"Unsupported file type for text extraction: {file_type} ({filename})" 

1418 ) 

1419 return None 

1420 

1421 except Exception: 

1422 logger.exception(f"Error extracting text from {filename}") 

1423 return None 

1424 
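# ---------------------------------------------------------------------------
# Editor's usage sketch (not part of rag_routes.py): calling the extraction
# helper directly with in-memory bytes, assuming this module is importable.
# The markdown branch simply decodes UTF-8, so a tiny sample suffices.
# ---------------------------------------------------------------------------
# sample = "# Notes\n\nSome *markdown* content.".encode("utf-8")
# text = extract_text_from_file(sample, file_type="markdown", filename="notes.md")
# print(text[:40] if text else "extraction failed")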

1425 

1426@rag_bp.route( 

1427 "/api/collections/<string:collection_id>/upload", methods=["POST"] 

1428) 

1429@login_required 

1430def upload_to_collection(collection_id): 

1431 """Upload files to a collection.""" 

1432 from ...database.session_context import get_user_db_session 

1433 from werkzeug.utils import secure_filename 

1434 from pathlib import Path 

1435 import hashlib 

1436 import uuid 

1437 from ..services.pdf_storage_manager import PDFStorageManager 

1438 

1439 try: 

1440 if "files" not in request.files: 

1441 return jsonify( 

1442 {"success": False, "error": "No files provided"} 

1443 ), 400 

1444 

1445 files = request.files.getlist("files") 

1446 if not files: 

1447 return jsonify( 

1448 {"success": False, "error": "No files selected"} 

1449 ), 400 

1450 

1451 username = session["username"] 

1452 with get_user_db_session(username) as db_session: 

1453 # Verify collection exists in this user's database 

1454 collection = ( 

1455 db_session.query(Collection).filter_by(id=collection_id).first() 

1456 ) 

1457 

1458 if not collection: 

1459 return jsonify( 

1460 {"success": False, "error": "Collection not found"} 

1461 ), 404 

1462 

1463 # Get PDF storage mode from form data, falling back to user's setting 

1464 settings = get_settings_manager() 

1465 default_pdf_storage = settings.get_setting( 

1466 "research_library.upload_pdf_storage", "none" 

1467 ) 

1468 pdf_storage = request.form.get("pdf_storage", default_pdf_storage) 

1469 if pdf_storage not in ("database", "none"): 

1470 # Security: user uploads can only use database (encrypted) or none (text-only) 

1471 # Filesystem storage is not allowed for user uploads 

1472 pdf_storage = "none" 

1473 

1474 # Initialize PDF storage manager if storing PDFs in database 

1475 pdf_storage_manager = None 

1476 if pdf_storage == "database": 

1477 library_root = settings.get_setting( 

1478 "research_library.storage_path", 

1479 str(get_library_directory()), 

1480 ) 

1481 library_root = str(Path(library_root).expanduser()) 

1482 pdf_storage_manager = PDFStorageManager( 

1483 library_root=Path(library_root), storage_mode="database" 

1484 ) 

1485 logger.info("PDF storage mode: database (encrypted)") 

1486 else: 

1487 logger.info("PDF storage mode: none (text-only)") 

1488 

1489 uploaded_files = [] 

1490 errors = [] 

1491 

1492 for file in files: 

1493 if not file.filename: 

1494 continue 

1495 

1496 try: 

1497 # Read file content 

1498 file_content = file.read() 

1499 file.seek(0) # Reset for potential re-reading 

1500 

1501 # Calculate file hash for deduplication 

1502 file_hash = hashlib.sha256(file_content).hexdigest() 

1503 

1504 # Check if document already exists 

1505 existing_doc = ( 

1506 db_session.query(Document) 

1507 .filter_by(document_hash=file_hash) 

1508 .first() 

1509 ) 

1510 

1511 if existing_doc: 

1512 # Document exists, check if we can upgrade to include PDF 

1513 pdf_upgraded = False 

1514 if ( 

1515 pdf_storage == "database" 

1516 and pdf_storage_manager is not None 

1517 ): 

1518 pdf_upgraded = pdf_storage_manager.upgrade_to_pdf( 

1519 document=existing_doc, 

1520 pdf_content=file_content, 

1521 session=db_session, 

1522 ) 

1523 

1524 # Check if already in collection 

1525 existing_link = ( 

1526 db_session.query(DocumentCollection) 

1527 .filter_by( 

1528 document_id=existing_doc.id, 

1529 collection_id=collection_id, 

1530 ) 

1531 .first() 

1532 ) 

1533 

1534 if not existing_link: 

1535 # Add to collection 

1536 collection_link = DocumentCollection( 

1537 document_id=existing_doc.id, 

1538 collection_id=collection_id, 

1539 indexed=False, 

1540 chunk_count=0, 

1541 ) 

1542 db_session.add(collection_link) 

1543 status = "added_to_collection" 

1544 if pdf_upgraded: 

1545 status = "added_to_collection_pdf_upgraded" 

1546 uploaded_files.append( 

1547 { 

1548 "filename": existing_doc.filename, 

1549 "status": status, 

1550 "id": existing_doc.id, 

1551 "pdf_upgraded": pdf_upgraded, 

1552 } 

1553 ) 

1554 else: 

1555 status = "already_in_collection" 

1556 if pdf_upgraded: 

1557 status = "pdf_upgraded" 

1558 uploaded_files.append( 

1559 { 

1560 "filename": existing_doc.filename, 

1561 "status": status, 

1562 "id": existing_doc.id, 

1563 "pdf_upgraded": pdf_upgraded, 

1564 } 

1565 ) 

1566 else: 

1567 # Create new document 

1568 filename = secure_filename(file.filename) 

1569 

1570 # Determine file type 

1571 file_extension = Path(filename).suffix.lower() 

1572 file_type = { 

1573 ".pdf": "pdf", 

1574 ".txt": "text", 

1575 ".md": "markdown", 

1576 ".html": "html", 

1577 ".htm": "html", 

1578 ".docx": "docx", 

1579 ".doc": "doc", 

1580 }.get(file_extension, "unknown") 

1581 

1582 # Extract text from the file 

1583 extracted_text = extract_text_from_file( 

1584 file_content, file_type, filename 

1585 ) 

1586 

1587 # Clean the extracted text to remove surrogate characters 

1588 if extracted_text: 

1589 from ...text_processing import remove_surrogates 

1590 

1591 extracted_text = remove_surrogates(extracted_text) 

1592 

1593 if not extracted_text: 

1594 errors.append( 

1595 { 

1596 "filename": filename, 

1597 "error": f"Could not extract text from {file_type} file", 

1598 } 

1599 ) 

1600 logger.warning( 

1601 f"Skipping file {filename} - no text could be extracted" 

1602 ) 

1603 continue 

1604 

1605 # Get or create the user_upload source type 

1606 logger.info( 

1607 f"Getting or creating user_upload source type for {filename}" 

1608 ) 

1609 source_type = ( 

1610 db_session.query(SourceType) 

1611 .filter_by(name="user_upload") 

1612 .first() 

1613 ) 

1614 if not source_type: 

1615 logger.info("Creating new user_upload source type") 

1616 source_type = SourceType( 

1617 id=str(uuid.uuid4()), 

1618 name="user_upload", 

1619 display_name="User Upload", 

1620 description="Documents uploaded by users", 

1621 icon="fas fa-upload", 

1622 ) 

1623 db_session.add(source_type) 

1624 db_session.flush() 

1625 logger.info( 

1626 f"Created source type with ID: {source_type.id}" 

1627 ) 

1628 else: 

1629 logger.info( 

1630 f"Found existing source type with ID: {source_type.id}" 

1631 ) 

1632 

1633 # Create document with extracted text (no username needed - in user's own database) 

1634 # Note: uploaded_at uses default=utcnow() in the model, so we don't need to set it manually 

1635 doc_id = str(uuid.uuid4()) 

1636 logger.info( 

1637 f"Creating document {doc_id} for {filename}" 

1638 ) 

1639 

1640 # Determine storage mode and file_path 

1641 store_pdf_in_db = ( 

1642 pdf_storage == "database" 

1643 and file_type == "pdf" 

1644 and pdf_storage_manager is not None 

1645 ) 

1646 

1647 new_doc = Document( 

1648 id=doc_id, 

1649 source_type_id=source_type.id, 

1650 filename=filename, 

1651 document_hash=file_hash, 

1652 file_size=len(file_content), 

1653 file_type=file_type, 

1654 text_content=extracted_text, # Always store extracted text 

1655 file_path=None 

1656 if store_pdf_in_db 

1657 else "text_only_not_stored", 

1658 storage_mode="database" 

1659 if store_pdf_in_db 

1660 else "none", 

1661 ) 

1662 db_session.add(new_doc) 

1663 db_session.flush() # Get the ID 

1664 logger.info( 

1665 f"Document {new_doc.id} created successfully" 

1666 ) 

1667 

1668 # Store PDF in encrypted database if requested 

1669 pdf_stored = False 

1670 if store_pdf_in_db: 

1671 try: 

1672 pdf_storage_manager.save_pdf( 

1673 pdf_content=file_content, 

1674 document=new_doc, 

1675 session=db_session, 

1676 filename=filename, 

1677 ) 

1678 pdf_stored = True 

1679 logger.info( 

1680 f"PDF stored in encrypted database for {filename}" 

1681 ) 

1682 except Exception: 

1683 logger.exception( 

1684 f"Failed to store PDF in database for {filename}" 

1685 ) 

1686 # Continue without PDF storage - text is still saved 

1687 

1688 # Add to collection 

1689 collection_link = DocumentCollection( 

1690 document_id=new_doc.id, 

1691 collection_id=collection_id, 

1692 indexed=False, 

1693 chunk_count=0, 

1694 ) 

1695 db_session.add(collection_link) 

1696 

1697 uploaded_files.append( 

1698 { 

1699 "filename": filename, 

1700 "status": "uploaded", 

1701 "id": new_doc.id, 

1702 "text_length": len(extracted_text), 

1703 "pdf_stored": pdf_stored, 

1704 } 

1705 ) 

1706 

1707 except Exception: 

1708 errors.append( 

1709 { 

1710 "filename": file.filename, 

1711 "error": "Failed to upload file", 

1712 } 

1713 ) 

1714 logger.exception(f"Error uploading file {file.filename}") 

1715 

1716 db_session.commit() 

1717 

1718 return jsonify( 

1719 { 

1720 "success": True, 

1721 "uploaded": uploaded_files, 

1722 "errors": errors, 

1723 "summary": { 

1724 "total": len(files), 

1725 "successful": len(uploaded_files), 

1726 "failed": len(errors), 

1727 }, 

1728 } 

1729 ) 

1730 

1731 except Exception as e: 

1732 return handle_api_error("uploading files", e) 

1733 
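# ---------------------------------------------------------------------------
# Editor's client sketch (not part of rag_routes.py): uploading a file to a
# collection. The multipart field must be named "files" and pdf_storage may be
# "database" or "none"; URL, session, and the collection UUID are placeholders.
# ---------------------------------------------------------------------------
# import requests
#
# session = requests.Session()
# with open("paper.pdf", "rb") as fh:
#     resp = session.post(
#         "http://localhost:5000/library/api/collections/COLLECTION-UUID/upload",
#         files=[("files", ("paper.pdf", fh, "application/pdf"))],
#         data={"pdf_storage": "none"},
#     )
# summary = resp.json()["summary"]
# print(f"{summary['successful']} uploaded, {summary['failed']} failed")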

1734 

1735@rag_bp.route( 

1736 "/api/collections/<string:collection_id>/documents", methods=["GET"] 

1737) 

1738@login_required 

1739def get_collection_documents(collection_id): 

1740 """Get all documents in a collection.""" 

1741 from ...database.session_context import get_user_db_session 

1742 

1743 try: 

1744 username = session["username"] 

1745 with get_user_db_session(username) as db_session: 

1746 # Verify collection exists in this user's database 

1747 collection = ( 

1748 db_session.query(Collection).filter_by(id=collection_id).first() 

1749 ) 

1750 

1751 if not collection: 

1752 return jsonify( 

1753 {"success": False, "error": "Collection not found"} 

1754 ), 404 

1755 

1756 # Get documents through junction table 

1757 doc_links = ( 

1758 db_session.query(DocumentCollection, Document) 

1759 .join(Document) 

1760 .filter(DocumentCollection.collection_id == collection_id) 

1761 .all() 

1762 ) 

1763 

1764 documents = [] 

1765 for link, doc in doc_links: 

1766 # Check if PDF file is stored 

1767 has_pdf = bool( 

1768 doc.file_path 

1769 and doc.file_path != "metadata_only" 

1770 and doc.file_path != "text_only_not_stored" 

1771 ) 

1772 has_text_db = bool(doc.text_content) 

1773 

1774 # Use title if available, otherwise filename 

1775 display_title = doc.title or doc.filename or "Untitled" 

1776 

1777 # Get source type name 

1778 source_type_name = ( 

1779 doc.source_type.name if doc.source_type else "unknown" 

1780 ) 

1781 

1782 # Check if document is in other collections 

1783 other_collections_count = ( 

1784 db_session.query(DocumentCollection) 

1785 .filter( 

1786 DocumentCollection.document_id == doc.id, 

1787 DocumentCollection.collection_id != collection_id, 

1788 ) 

1789 .count() 

1790 ) 

1791 

1792 documents.append( 

1793 { 

1794 "id": doc.id, 

1795 "filename": display_title, 

1796 "title": display_title, 

1797 "file_type": doc.file_type, 

1798 "file_size": doc.file_size, 

1799 "uploaded_at": doc.created_at.isoformat() 

1800 if doc.created_at 

1801 else None, 

1802 "indexed": link.indexed, 

1803 "chunk_count": link.chunk_count, 

1804 "last_indexed_at": link.last_indexed_at.isoformat() 

1805 if link.last_indexed_at 

1806 else None, 

1807 "has_pdf": has_pdf, 

1808 "has_text_db": has_text_db, 

1809 "source_type": source_type_name, 

1810 "in_other_collections": other_collections_count > 0, 

1811 "other_collections_count": other_collections_count, 

1812 } 

1813 ) 

1814 

1815 # Get index file size if available 

1816 index_file_size = None 

1817 index_file_size_bytes = None 

1818 collection_name = f"collection_{collection_id}" 

1819 rag_index = ( 

1820 db_session.query(RAGIndex) 

1821 .filter_by(collection_name=collection_name) 

1822 .first() 

1823 ) 

1824 if rag_index and rag_index.index_path: 

1825 from pathlib import Path 

1826 

1827 index_path = Path(rag_index.index_path) 

1828 if index_path.exists(): 

1829 size_bytes = index_path.stat().st_size 

1830 index_file_size_bytes = size_bytes 

1831 # Format as human-readable 

1832 if size_bytes < 1024: 

1833 index_file_size = f"{size_bytes} B" 

1834 elif size_bytes < 1024 * 1024: 

1835 index_file_size = f"{size_bytes / 1024:.1f} KB" 

1836 else: 

1837 index_file_size = f"{size_bytes / (1024 * 1024):.1f} MB" 

1838 

1839 return jsonify( 

1840 { 

1841 "success": True, 

1842 "collection": { 

1843 "id": collection.id, 

1844 "name": collection.name, 

1845 "description": collection.description, 

1846 "embedding_model": collection.embedding_model, 

1847 "embedding_model_type": collection.embedding_model_type.value 

1848 if collection.embedding_model_type 

1849 else None, 

1850 "embedding_dimension": collection.embedding_dimension, 

1851 "chunk_size": collection.chunk_size, 

1852 "chunk_overlap": collection.chunk_overlap, 

1853 # Advanced settings 

1854 "splitter_type": collection.splitter_type, 

1855 "distance_metric": collection.distance_metric, 

1856 "index_type": collection.index_type, 

1857 "normalize_vectors": collection.normalize_vectors, 

1858 # Index file info 

1859 "index_file_size": index_file_size, 

1860 "index_file_size_bytes": index_file_size_bytes, 

1861 }, 

1862 "documents": documents, 

1863 } 

1864 ) 

1865 

1866 except Exception as e: 

1867 return handle_api_error("getting collection documents", e) 

1868 

1869 
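A minimal client-side sketch for the documents listing endpoint above, assuming a hypothetical base URL (the blueprint's /library prefix on a local Flask server) and an already-authenticated requests session to satisfy @login_required:

    import requests

    BASE_URL = "http://localhost:5000/library"   # hypothetical deployment URL
    COLLECTION_ID = "example-collection-id"      # hypothetical collection id

    session = requests.Session()  # assumed to already carry a logged-in session cookie

    resp = session.get(f"{BASE_URL}/api/collections/{COLLECTION_ID}/documents")
    resp.raise_for_status()
    data = resp.json()
    if data.get("success"):
        for doc in data["documents"]:
            state = "indexed" if doc["indexed"] else "not indexed"
            print(f"{doc['title']} [{doc['file_type']}] - {state}, {doc['chunk_count']} chunks")
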

1870@rag_bp.route("/api/collections/<string:collection_id>/index", methods=["GET"]) 

1871@login_required 

1872def index_collection(collection_id): 

1873 """Index all documents in a collection with Server-Sent Events progress.""" 

1874 from ...database.session_context import get_user_db_session 

1875 from ...database.session_passwords import session_password_store 

1876 

1877 force_reindex = request.args.get("force_reindex", "false").lower() == "true" 

1878 username = session["username"] 

1879 session_id = session.get("session_id") 

1880 

1881 logger.info(f"Starting index_collection, force_reindex={force_reindex}") 

1882 

1883 # Get password for thread access to encrypted database 

1884 db_password = None 

1885 if session_id: 

1886 db_password = session_password_store.get_session_password( 

1887 username, session_id 

1888 ) 

1889 

1890 # Create RAG service 

1891 rag_service = get_rag_service(collection_id) 

1892 # Set password for thread access 

1893 rag_service.db_password = db_password 

1894 logger.info( 

1895 f"RAG service created: provider={rag_service.embedding_provider}" 

1896 ) 

1897 

1898 def generate(): 

1899 """Generator for SSE progress updates.""" 

1900 logger.info("SSE generator started") 

1901 try: 

1902 with get_user_db_session(username, db_password) as db_session: 

1903 # Verify collection exists in this user's database 

1904 collection = ( 

1905 db_session.query(Collection) 

1906 .filter_by(id=collection_id) 

1907 .first() 

1908 ) 

1909 

1910 if not collection: 

1911 yield f"data: {json.dumps({'type': 'error', 'error': 'Collection not found'})}\n\n" 

1912 return 

1913 

1914 # Store embedding metadata if this is the first time indexing 

1915 if collection.embedding_model is None: 

1916 # Get embedding dimension from the embedding manager 

1917 embedding_dim = None 

1918 try: 

1919 # Try to get dimension from the embedding manager's provider 

1920 if hasattr(rag_service.embedding_manager, "provider"): 

1921 provider = rag_service.embedding_manager.provider 

1922 if hasattr(provider, "embedding_dimension"): 

1923 embedding_dim = provider.embedding_dimension 

1924 except Exception as e: 

1925 logger.warning( 

1926 f"Could not get embedding dimension: {e}" 

1927 ) 

1928 

1929 # Store the embedding configuration used for this collection 

1930 collection.embedding_model = rag_service.embedding_model 

1931 collection.embedding_model_type = EmbeddingProvider( 

1932 rag_service.embedding_provider 

1933 ) 

1934 collection.embedding_dimension = embedding_dim 

1935 collection.chunk_size = rag_service.chunk_size 

1936 collection.chunk_overlap = rag_service.chunk_overlap 

1937 # Store advanced settings 

1938 collection.splitter_type = rag_service.splitter_type 

1939 collection.text_separators = rag_service.text_separators 

1940 collection.distance_metric = rag_service.distance_metric 

1941 # Ensure normalize_vectors is a proper boolean for database 

1942 collection.normalize_vectors = bool( 

1943 rag_service.normalize_vectors 

1944 ) 

1945 collection.index_type = rag_service.index_type 

1946 db_session.commit() 

1947 logger.info( 

1948 f"Stored embedding metadata for collection: provider={rag_service.embedding_provider}" 

1949 ) 

1950 

1951 # Get documents to index 

1952 query = ( 

1953 db_session.query(DocumentCollection, Document) 

1954 .join(Document) 

1955 .filter(DocumentCollection.collection_id == collection_id) 

1956 ) 

1957 

1958 if not force_reindex: 

1959 # IMPORTANT: DO NOT CHANGE THIS LINE! DO NOT REMOVE THIS COMMENT! 

1960 # We MUST use `== False` instead of `not DocumentCollection.indexed` 

1961 # The Python `not` operator does NOT work correctly in SQLAlchemy filters. 

1962 # Using `not` will cause the query to return NO results (zero documents). 

1963 # SQLAlchemy requires explicit comparison: `== False` or `== True` 

1964 # This has been fixed multiple times - DO NOT change it back to `not`! 

1965 query = query.filter(DocumentCollection.indexed == False) # noqa: E712 

1966 

1967 doc_links = query.all() 

1968 

1969 if not doc_links: 

1970 logger.info("No documents to index in collection") 

1971 yield f"data: {json.dumps({'type': 'complete', 'results': {'successful': 0, 'skipped': 0, 'failed': 0, 'message': 'No documents to index'}})}\n\n" 

1972 return 

1973 

1974 total = len(doc_links) 

1975 logger.info(f"Found {total} documents to index") 

1976 results = { 

1977 "successful": 0, 

1978 "skipped": 0, 

1979 "failed": 0, 

1980 "errors": [], 

1981 } 

1982 

1983 yield f"data: {json.dumps({'type': 'start', 'message': f'Indexing {total} documents in collection: {collection.name}'})}\n\n" 

1984 

1985 for idx, (link, doc) in enumerate(doc_links, 1): 

1986 filename = doc.filename or doc.title or "Unknown" 

1987 yield f"data: {json.dumps({'type': 'progress', 'current': idx, 'total': total, 'filename': filename, 'percent': int((idx / total) * 100)})}\n\n" 

1988 

1989 try: 

1990 logger.debug( 

1991 f"Indexing document {idx}/{total}: {filename}" 

1992 ) 

1993 

1994 # Run index_document in a separate thread to allow sending SSE heartbeats. 

1995 # This keeps the HTTP connection alive during long indexing operations, 

1996 # preventing timeouts from proxy servers (nginx) and browsers. 

1997 # The main thread periodically yields heartbeat comments while waiting. 

1998 result_queue = queue.Queue() 

1999 error_queue = queue.Queue() 

2000 

2001 def index_in_thread(): 

2002 try: 

2003 r = rag_service.index_document( 

2004 document_id=doc.id, 

2005 collection_id=collection_id, 

2006 force_reindex=force_reindex, 

2007 ) 

2008 result_queue.put(r) 

2009 except Exception as ex: 

2010 error_queue.put(ex) 

2011 

2012 thread = threading.Thread(target=index_in_thread) 

2013 thread.start() 

2014 

2015 # Send heartbeats while waiting for the thread to complete 

2016 heartbeat_interval = 5 # seconds 

2017 while thread.is_alive(): 

2018 thread.join(timeout=heartbeat_interval) 

2019 if thread.is_alive(): 

2020 # Send SSE comment as heartbeat (keeps connection alive) 

2021 yield f": heartbeat {idx}/{total}\n\n" 

2022 

2023 # Check for errors from thread 

2024 if not error_queue.empty(): 

2025 raise error_queue.get() 

2026 

2027 result = result_queue.get() 

2028 logger.info( 

2029 f"Indexed document {idx}/{total}: {filename} - status={result.get('status')}" 

2030 ) 

2031 

2032 if result.get("status") == "success": 

2033 results["successful"] += 1 

2034 # DocumentCollection status is already updated in index_document 

2035 # No need to update link here 

2036 elif result.get("status") == "skipped": 

2037 results["skipped"] += 1 

2038 else: 

2039 results["failed"] += 1 

2040 error_msg = result.get("error", "Unknown error") 

2041 results["errors"].append( 

2042 { 

2043 "filename": filename, 

2044 "error": error_msg, 

2045 } 

2046 ) 

2047 logger.warning( 

2048 f"Failed to index {filename} ({idx}/{total}): {error_msg}" 

2049 ) 

2050 except Exception as e: 

2051 results["failed"] += 1 

2052 error_msg = str(e) or "Failed to index document" 

2053 results["errors"].append( 

2054 { 

2055 "filename": filename, 

2056 "error": error_msg, 

2057 } 

2058 ) 

2059 logger.exception( 

2060 f"Exception indexing document {filename} ({idx}/{total})" 

2061 ) 

2062 # Send error update to client so they know indexing is continuing 

2063 yield f"data: {json.dumps({'type': 'doc_error', 'filename': filename, 'error': error_msg})}\n\n" 

2064 

2065 db_session.commit() 

2066 # Note: commit() above already flushed and persisted; this flush is a redundant safeguard 

2067 db_session.flush() 

2068 

2069 logger.info( 

2070 f"Indexing complete: {results['successful']} successful, {results['failed']} failed, {results['skipped']} skipped" 

2071 ) 

2072 yield f"data: {json.dumps({'type': 'complete', 'results': results})}\n\n" 

2073 logger.info("SSE generator finished successfully") 

2074 

2075 except Exception: 

2076 logger.exception("Error in collection indexing") 

2077 yield f"data: {json.dumps({'type': 'error', 'error': 'An internal error occurred during indexing'})}\n\n" 

2078 

2079 response = Response( 

2080 stream_with_context(generate()), mimetype="text/event-stream" 

2081 ) 

2082 # Prevent buffering for proper SSE streaming 

2083 response.headers["Cache-Control"] = "no-cache, no-transform" 

2084 response.headers["Connection"] = "keep-alive" 

2085 response.headers["X-Accel-Buffering"] = "no" 

2086 return response 

2087 

2088 
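Because the indexing endpoint above streams Server-Sent Events (a start event, progress updates, ": heartbeat" comment lines, optional doc_error events, and a final complete or error event), a client has to read the response incrementally. A minimal sketch with the requests library, reusing the same hypothetical base URL and authenticated session as the sketch above:

    import json
    import requests

    BASE_URL = "http://localhost:5000/library"   # hypothetical deployment URL
    COLLECTION_ID = "example-collection-id"      # hypothetical collection id
    session = requests.Session()                 # assumed to be authenticated

    with session.get(
        f"{BASE_URL}/api/collections/{COLLECTION_ID}/index",
        params={"force_reindex": "false"},
        stream=True,
    ) as resp:
        resp.raise_for_status()
        for line in resp.iter_lines(decode_unicode=True):
            if not line or line.startswith(":"):
                continue  # skip blank separators and ": heartbeat" comments
            if line.startswith("data: "):
                event = json.loads(line[len("data: "):])
                if event["type"] == "progress":
                    print(f"{event['percent']}% - {event['filename']}")
                elif event["type"] == "doc_error":
                    print(f"failed: {event['filename']}: {event['error']}")
                elif event["type"] in ("complete", "error"):
                    print(event)
                    break
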

2089# ============================================================================= 

2090# Background Indexing Endpoints 

2091# ============================================================================= 

2092 

2093 

2094def _get_rag_service_for_thread( 

2095 collection_id: str, 

2096 username: str, 

2097 db_password: str, 

2098) -> LibraryRAGService: 

2099 """ 

2100 Create RAG service for use in background threads (no Flask context). 

2101 """ 

2102 from ...database.session_context import get_user_db_session 

2103 from ...settings.manager import SettingsManager 

2104 from ...web_search_engines.engines.search_engine_local import ( 

2105 LocalEmbeddingManager, 

2106 ) 

2107 import json 

2108 

2109 with get_user_db_session(username, db_password) as db_session: 

2110 settings_manager = SettingsManager(db_session) 

2111 

2112 # Get default settings 

2113 default_embedding_model = settings_manager.get_setting( 

2114 "local_search_embedding_model", "all-MiniLM-L6-v2" 

2115 ) 

2116 default_embedding_provider = settings_manager.get_setting( 

2117 "local_search_embedding_provider", "sentence_transformers" 

2118 ) 

2119 default_chunk_size = int( 

2120 settings_manager.get_setting("local_search_chunk_size", 1000) 

2121 ) 

2122 default_chunk_overlap = int( 

2123 settings_manager.get_setting("local_search_chunk_overlap", 200) 

2124 ) 

2125 default_splitter_type = settings_manager.get_setting( 

2126 "local_search_splitter_type", "recursive" 

2127 ) 

2128 default_text_separators = settings_manager.get_setting( 

2129 "local_search_text_separators", '["\n\n", "\n", ". ", " ", ""]' 

2130 ) 

2131 if isinstance(default_text_separators, str): 

2132 try: 

2133 default_text_separators = json.loads(default_text_separators) 

2134 except json.JSONDecodeError: 

2135 logger.warning( 

2136 f"Invalid JSON for local_search_text_separators setting: {default_text_separators!r}. " 

2137 "Using default separators." 

2138 ) 

2139 default_text_separators = ["\n\n", "\n", ". ", " ", ""] 

2140 default_distance_metric = settings_manager.get_setting( 

2141 "local_search_distance_metric", "cosine" 

2142 ) 

2143 raw_normalize = settings_manager.get_setting( 

2144 "local_search_normalize_vectors", True 

2145 ) 

2146 if isinstance(raw_normalize, str): 

2147 default_normalize_vectors = raw_normalize.lower() in ( 

2148 "true", 

2149 "1", 

2150 "yes", 

2151 ) 

2152 else: 

2153 default_normalize_vectors = bool(raw_normalize) 

2154 default_index_type = settings_manager.get_setting( 

2155 "local_search_index_type", "flat" 

2156 ) 

2157 

2158 # Get settings snapshot for embedding manager 

2159 settings_snapshot = settings_manager.get_settings_snapshot() 

2160 settings_snapshot["_username"] = username 

2161 

2162 # Check for collection's stored settings 

2163 collection = ( 

2164 db_session.query(Collection).filter_by(id=collection_id).first() 

2165 ) 

2166 

2167 if collection and collection.embedding_model: 

2168 # Use collection's stored settings 

2169 embedding_model = collection.embedding_model 

2170 embedding_provider = collection.embedding_model_type.value 

2171 chunk_size = collection.chunk_size or default_chunk_size 

2172 chunk_overlap = collection.chunk_overlap or default_chunk_overlap 

2173 splitter_type = collection.splitter_type or default_splitter_type 

2174 text_separators = ( 

2175 collection.text_separators or default_text_separators 

2176 ) 

2177 distance_metric = ( 

2178 collection.distance_metric or default_distance_metric 

2179 ) 

2180 index_type = collection.index_type or default_index_type 

2181 

2182 coll_normalize = collection.normalize_vectors 

2183 if coll_normalize is not None: 

2184 if isinstance(coll_normalize, str): 

2185 coll_normalize = coll_normalize.lower() in ( 

2186 "true", 

2187 "1", 

2188 "yes", 

2189 ) 

2190 else: 

2191 coll_normalize = bool(coll_normalize) 

2192 else: 

2193 coll_normalize = default_normalize_vectors 

2194 normalize_vectors = coll_normalize 

2195 else: 

2196 # Use default settings 

2197 embedding_model = default_embedding_model 

2198 embedding_provider = default_embedding_provider 

2199 chunk_size = default_chunk_size 

2200 chunk_overlap = default_chunk_overlap 

2201 splitter_type = default_splitter_type 

2202 text_separators = default_text_separators 

2203 distance_metric = default_distance_metric 

2204 normalize_vectors = default_normalize_vectors 

2205 index_type = default_index_type 

2206 

2207 # Update settings snapshot with embedding config 

2208 settings_snapshot.update( 

2209 { 

2210 "embeddings.provider": embedding_provider, 

2211 f"embeddings.{embedding_provider}.model": embedding_model, 

2212 "local_search_chunk_size": chunk_size, 

2213 "local_search_chunk_overlap": chunk_overlap, 

2214 } 

2215 ) 

2216 

2217 # Create embedding manager (to avoid database access in LibraryRAGService.__init__) 

2218 embedding_manager = LocalEmbeddingManager( 

2219 embedding_model=embedding_model, 

2220 embedding_model_type=embedding_provider, 

2221 chunk_size=chunk_size, 

2222 chunk_overlap=chunk_overlap, 

2223 settings_snapshot=settings_snapshot, 

2224 ) 

2225 embedding_manager.db_password = db_password 

2226 

2227 # Create RAG service with pre-built embedding manager and db_password 

2228 rag_service = LibraryRAGService( 

2229 username=username, 

2230 embedding_model=embedding_model, 

2231 embedding_provider=embedding_provider, 

2232 chunk_size=chunk_size, 

2233 chunk_overlap=chunk_overlap, 

2234 splitter_type=splitter_type, 

2235 text_separators=text_separators, 

2236 distance_metric=distance_metric, 

2237 normalize_vectors=normalize_vectors, 

2238 index_type=index_type, 

2239 embedding_manager=embedding_manager, 

2240 db_password=db_password, 

2241 ) 

2242 

2243 return rag_service 

2244 

2245 

2246def _background_index_worker( 

2247 task_id: str, 

2248 collection_id: str, 

2249 username: str, 

2250 db_password: str, 

2251 force_reindex: bool, 

2252): 

2253 """ 

2254 Background worker thread for indexing documents. 

2255 Updates TaskMetadata with progress and checks for cancellation. 

2256 """ 

2257 from ...database.session_context import get_user_db_session 

2258 

2259 try: 

2260 # Create RAG service (thread-safe, no Flask context needed) 

2261 rag_service = _get_rag_service_for_thread( 

2262 collection_id, username, db_password 

2263 ) 

2264 

2265 with get_user_db_session(username, db_password) as db_session: 

2266 # Get collection 

2267 collection = ( 

2268 db_session.query(Collection).filter_by(id=collection_id).first() 

2269 ) 

2270 

2271 if not collection: 

2272 _update_task_status( 

2273 username, 

2274 db_password, 

2275 task_id, 

2276 status="failed", 

2277 error_message="Collection not found", 

2278 ) 

2279 return 

2280 

2281 # Store embedding metadata if first time 

2282 if collection.embedding_model is None: 

2283 collection.embedding_model = rag_service.embedding_model 

2284 collection.embedding_model_type = EmbeddingProvider( 

2285 rag_service.embedding_provider 

2286 ) 

2287 collection.chunk_size = rag_service.chunk_size 

2288 collection.chunk_overlap = rag_service.chunk_overlap 

2289 collection.splitter_type = rag_service.splitter_type 

2290 collection.text_separators = rag_service.text_separators 

2291 collection.distance_metric = rag_service.distance_metric 

2292 collection.normalize_vectors = bool( 

2293 rag_service.normalize_vectors 

2294 ) 

2295 collection.index_type = rag_service.index_type 

2296 db_session.commit() 

2297 

2298 # Get documents to index 

2299 query = ( 

2300 db_session.query(DocumentCollection, Document) 

2301 .join(Document) 

2302 .filter(DocumentCollection.collection_id == collection_id) 

2303 ) 

2304 

2305 if not force_reindex: 

2306 query = query.filter(DocumentCollection.indexed == False) # noqa: E712 

2307 

2308 doc_links = query.all() 

2309 

2310 if not doc_links: 

2311 _update_task_status( 

2312 username, 

2313 db_password, 

2314 task_id, 

2315 status="completed", 

2316 progress_message="No documents to index", 

2317 ) 

2318 return 

2319 

2320 total = len(doc_links) 

2321 results = {"successful": 0, "skipped": 0, "failed": 0} 

2322 

2323 # Update task with total count 

2324 _update_task_status( 

2325 username, 

2326 db_password, 

2327 task_id, 

2328 progress_total=total, 

2329 progress_message=f"Indexing {total} documents", 

2330 ) 

2331 

2332 for idx, (link, doc) in enumerate(doc_links, 1): 

2333 # Check if cancelled 

2334 if _is_task_cancelled(username, db_password, task_id): 

2335 _update_task_status( 

2336 username, 

2337 db_password, 

2338 task_id, 

2339 status="cancelled", 

2340 progress_message=f"Cancelled after {idx - 1}/{total} documents", 

2341 ) 

2342 logger.info(f"Indexing task {task_id} was cancelled") 

2343 return 

2344 

2345 filename = doc.filename or doc.title or "Unknown" 

2346 

2347 # Update progress with filename 

2348 _update_task_status( 

2349 username, 

2350 db_password, 

2351 task_id, 

2352 progress_current=idx, 

2353 progress_message=f"Indexing {idx}/{total}: {filename}", 

2354 ) 

2355 

2356 try: 

2357 result = rag_service.index_document( 

2358 document_id=doc.id, 

2359 collection_id=collection_id, 

2360 force_reindex=force_reindex, 

2361 ) 

2362 

2363 if result.get("status") == "success": 

2364 results["successful"] += 1 

2365 elif result.get("status") == "skipped": 

2366 results["skipped"] += 1 

2367 else: 

2368 results["failed"] += 1 

2369 

2370 except Exception: 

2371 results["failed"] += 1 

2372 logger.exception(f"Error indexing document {idx}/{total}") 

2373 

2374 db_session.commit() 

2375 

2376 # Mark as completed 

2377 _update_task_status( 

2378 username, 

2379 db_password, 

2380 task_id, 

2381 status="completed", 

2382 progress_current=total, 

2383 progress_message=f"Completed: {results['successful']} indexed, {results['failed']} failed, {results['skipped']} skipped", 

2384 ) 

2385 logger.info(f"Background indexing task {task_id} completed: {results}") 

2386 

2387 except Exception as e: 

2388 logger.exception(f"Background indexing task {task_id} failed") 

2389 _update_task_status( 

2390 username, 

2391 db_password, 

2392 task_id, 

2393 status="failed", 

2394 error_message=str(e), 

2395 ) 

2396 

2397 
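The worker above relies on cooperative cancellation: before each document it re-reads the task row and stops if the status has been flipped to "cancelled". The same pattern in isolation, as a small sketch (the dictionary stands in for the TaskMetadata lookups; none of these names belong to this module):

    import threading
    import time

    cancel_flags = {}  # illustrative stand-in for TaskMetadata.status lookups

    def worker(task_id: str, items: list) -> None:
        for idx, item in enumerate(items, 1):
            # Check the shared flag before each unit of work, like _is_task_cancelled()
            if cancel_flags.get(task_id) == "cancelled":
                print(f"stopped after {idx - 1}/{len(items)} items")
                return
            time.sleep(0.1)  # stand-in for rag_service.index_document(...)
        print("all items processed")

    t = threading.Thread(target=worker, args=("task-1", list(range(50))), daemon=True)
    t.start()
    time.sleep(1)
    cancel_flags["task-1"] = "cancelled"  # roughly what the cancel endpoint does to the task row
    t.join()
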

2398def _update_task_status( 

2399 username: str, 

2400 db_password: str, 

2401 task_id: str, 

2402 status: Optional[str] = None, 

2403 progress_current: Optional[int] = None, 

2404 progress_total: Optional[int] = None, 

2405 progress_message: Optional[str] = None, 

2406 error_message: Optional[str] = None, 

2407): 

2408 """Update task metadata in the database.""" 

2409 from ...database.session_context import get_user_db_session 

2410 

2411 try: 

2412 with get_user_db_session(username, db_password) as db_session: 

2413 task = ( 

2414 db_session.query(TaskMetadata) 

2415 .filter_by(task_id=task_id) 

2416 .first() 

2417 ) 

2418 if task: 

2419 if status is not None: 

2420 task.status = status 

2421 if status == "completed": 

2422 task.completed_at = datetime.now(UTC) 

2423 if progress_current is not None: 

2424 task.progress_current = progress_current 

2425 if progress_total is not None: 

2426 task.progress_total = progress_total 

2427 if progress_message is not None: 

2428 task.progress_message = progress_message 

2429 if error_message is not None: 

2430 task.error_message = error_message 

2431 db_session.commit() 

2432 except Exception: 

2433 logger.exception(f"Failed to update task status for {task_id}") 

2434 

2435 

2436def _is_task_cancelled(username: str, db_password: str, task_id: str) -> bool: 

2437 """Check if a task has been cancelled.""" 

2438 from ...database.session_context import get_user_db_session 

2439 

2440 try: 

2441 with get_user_db_session(username, db_password) as db_session: 

2442 task = ( 

2443 db_session.query(TaskMetadata) 

2444 .filter_by(task_id=task_id) 

2445 .first() 

2446 ) 

2447 return bool(task and task.status == "cancelled") 

2448 except Exception: 

2449 return False 

2450 

2451 

2452@rag_bp.route( 

2453 "/api/collections/<string:collection_id>/index/start", methods=["POST"] 

2454) 

2455@login_required 

2456def start_background_index(collection_id): 

2457 """Start background indexing for a collection.""" 

2458 from ...database.session_context import get_user_db_session 

2459 from ...database.session_passwords import session_password_store 

2460 

2461 username = session["username"] 

2462 session_id = session.get("session_id") 

2463 

2464 # Get password for thread access 

2465 db_password = None 

2466 if session_id: 

2467 db_password = session_password_store.get_session_password( 

2468 username, session_id 

2469 ) 

2470 

2471 # Parse request body 

2472 data = request.get_json() or {} 

2473 force_reindex = data.get("force_reindex", False) 

2474 

2475 try: 

2476 with get_user_db_session(username, db_password) as db_session: 

2477 # Check if there's already an active indexing task for this collection 

2478 existing_task = ( 

2479 db_session.query(TaskMetadata) 

2480 .filter( 

2481 TaskMetadata.task_type == "indexing", 

2482 TaskMetadata.status == "processing", 

2483 ) 

2484 .first() 

2485 ) 

2486 

2487 if existing_task: 

2488 # Check if it's for this collection 

2489 metadata = existing_task.metadata_json or {} 

2490 if metadata.get("collection_id") == collection_id: 

2491 return jsonify( 

2492 { 

2493 "success": False, 

2494 "error": "Indexing is already in progress for this collection", 

2495 "task_id": existing_task.task_id, 

2496 } 

2497 ), 409 

2498 

2499 # Create new task 

2500 task_id = str(uuid.uuid4()) 

2501 task = TaskMetadata( 

2502 task_id=task_id, 

2503 status="processing", 

2504 task_type="indexing", 

2505 created_at=datetime.now(UTC), 

2506 started_at=datetime.now(UTC), 

2507 progress_current=0, 

2508 progress_total=0, 

2509 progress_message="Starting indexing...", 

2510 metadata_json={ 

2511 "collection_id": collection_id, 

2512 "force_reindex": force_reindex, 

2513 }, 

2514 ) 

2515 db_session.add(task) 

2516 db_session.commit() 

2517 

2518 # Start background thread 

2519 thread = threading.Thread( 

2520 target=_background_index_worker, 

2521 args=(task_id, collection_id, username, db_password, force_reindex), 

2522 daemon=True, 

2523 ) 

2524 thread.start() 

2525 

2526 logger.info( 

2527 f"Started background indexing task {task_id} for collection {collection_id}" 

2528 ) 

2529 

2530 return jsonify( 

2531 { 

2532 "success": True, 

2533 "task_id": task_id, 

2534 "message": "Indexing started in background", 

2535 } 

2536 ) 

2537 

2538 except Exception: 

2539 logger.exception("Failed to start background indexing") 

2540 return jsonify( 

2541 { 

2542 "success": False, 

2543 "error": "Failed to start indexing. Please try again.", 

2544 } 

2545 ), 500 

2546 

2547 
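A minimal sketch of starting background indexing via the endpoint above (same hypothetical base URL and authenticated session as earlier; a 409 response means a task is already running for this collection):

    import requests

    BASE_URL = "http://localhost:5000/library"   # hypothetical deployment URL
    COLLECTION_ID = "example-collection-id"      # hypothetical collection id
    session = requests.Session()                 # assumed to be authenticated

    resp = session.post(
        f"{BASE_URL}/api/collections/{COLLECTION_ID}/index/start",
        json={"force_reindex": False},
    )
    if resp.status_code == 409:
        print("Indexing already in progress, task:", resp.json().get("task_id"))
    else:
        resp.raise_for_status()
        print("Started task", resp.json()["task_id"])
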

2548@rag_bp.route( 

2549 "/api/collections/<string:collection_id>/index/status", methods=["GET"] 

2550) 

2551@limiter.exempt 

2552@login_required 

2553def get_index_status(collection_id): 

2554 """Get the current indexing status for a collection.""" 

2555 from ...database.session_context import get_user_db_session 

2556 from ...database.session_passwords import session_password_store 

2557 

2558 username = session["username"] 

2559 session_id = session.get("session_id") 

2560 

2561 db_password = None 

2562 if session_id: 

2563 db_password = session_password_store.get_session_password( 

2564 username, session_id 

2565 ) 

2566 

2567 try: 

2568 with get_user_db_session(username, db_password) as db_session: 

2569 # Find the most recent indexing task for this collection 

2570 task = ( 

2571 db_session.query(TaskMetadata) 

2572 .filter(TaskMetadata.task_type == "indexing") 

2573 .order_by(TaskMetadata.created_at.desc()) 

2574 .first() 

2575 ) 

2576 

2577 if not task: 

2578 return jsonify( 

2579 { 

2580 "status": "idle", 

2581 "message": "No indexing task found", 

2582 } 

2583 ) 

2584 

2585 # Check if it's for this collection 

2586 metadata = task.metadata_json or {} 

2587 if metadata.get("collection_id") != collection_id: 

2588 return jsonify( 

2589 { 

2590 "status": "idle", 

2591 "message": "No indexing task for this collection", 

2592 } 

2593 ) 

2594 

2595 return jsonify( 

2596 { 

2597 "task_id": task.task_id, 

2598 "status": task.status, 

2599 "progress_current": task.progress_current or 0, 

2600 "progress_total": task.progress_total or 0, 

2601 "progress_message": task.progress_message, 

2602 "error_message": task.error_message, 

2603 "created_at": task.created_at.isoformat() 

2604 if task.created_at 

2605 else None, 

2606 "completed_at": task.completed_at.isoformat() 

2607 if task.completed_at 

2608 else None, 

2609 } 

2610 ) 

2611 

2612 except Exception: 

2613 logger.exception("Failed to get index status") 

2614 return jsonify( 

2615 { 

2616 "status": "error", 

2617 "error": "Failed to get indexing status. Please try again.", 

2618 } 

2619 ), 500 

2620 

2621 
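The status endpoint above is meant to be polled (hence the @limiter.exempt decorator). A minimal polling loop under the same hypothetical assumptions:

    import time
    import requests

    BASE_URL = "http://localhost:5000/library"   # hypothetical deployment URL
    COLLECTION_ID = "example-collection-id"      # hypothetical collection id
    session = requests.Session()                 # assumed to be authenticated

    while True:
        status = session.get(
            f"{BASE_URL}/api/collections/{COLLECTION_ID}/index/status"
        ).json()
        print(status.get("progress_message") or status.get("status"))
        if status.get("status") in ("completed", "failed", "cancelled", "idle"):
            break
        time.sleep(2)
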

2622@rag_bp.route( 

2623 "/api/collections/<string:collection_id>/index/cancel", methods=["POST"] 

2624) 

2625@login_required 

2626def cancel_indexing(collection_id): 

2627 """Cancel an active indexing task for a collection.""" 

2628 from ...database.session_context import get_user_db_session 

2629 from ...database.session_passwords import session_password_store 

2630 

2631 username = session["username"] 

2632 session_id = session.get("session_id") 

2633 

2634 db_password = None 

2635 if session_id: 

2636 db_password = session_password_store.get_session_password( 

2637 username, session_id 

2638 ) 

2639 

2640 try: 

2641 with get_user_db_session(username, db_password) as db_session: 

2642 # Find active indexing task for this collection 

2643 task = ( 

2644 db_session.query(TaskMetadata) 

2645 .filter( 

2646 TaskMetadata.task_type == "indexing", 

2647 TaskMetadata.status == "processing", 

2648 ) 

2649 .first() 

2650 ) 

2651 

2652 if not task: 

2653 return jsonify( 

2654 { 

2655 "success": False, 

2656 "error": "No active indexing task found", 

2657 } 

2658 ), 404 

2659 

2660 # Check if it's for this collection 

2661 metadata = task.metadata_json or {} 

2662 if metadata.get("collection_id") != collection_id: 

2663 return jsonify( 

2664 { 

2665 "success": False, 

2666 "error": "No active indexing task for this collection", 

2667 } 

2668 ), 404 

2669 

2670 # Mark as cancelled - the worker thread will check this 

2671 task.status = "cancelled" 

2672 task.progress_message = "Cancellation requested..." 

2673 db_session.commit() 

2674 

2675 logger.info( 

2676 f"Cancelled indexing task {task.task_id} for collection {collection_id}" 

2677 ) 

2678 

2679 return jsonify( 

2680 { 

2681 "success": True, 

2682 "message": "Cancellation requested", 

2683 "task_id": task.task_id, 

2684 } 

2685 ) 

2686 

2687 except Exception: 

2688 logger.exception("Failed to cancel indexing") 

2689 return jsonify( 

2690 { 

2691 "success": False, 

2692 "error": "Failed to cancel indexing. Please try again.", 

2693 } 

2694 ), 500
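Cancellation via the endpoint above only flips the task row to "cancelled"; the background worker notices the change at its next per-document check, so the stop is cooperative rather than immediate. A minimal sketch, again under the same hypothetical assumptions:

    import requests

    BASE_URL = "http://localhost:5000/library"   # hypothetical deployment URL
    COLLECTION_ID = "example-collection-id"      # hypothetical collection id
    session = requests.Session()                 # assumed to be authenticated

    resp = session.post(f"{BASE_URL}/api/collections/{COLLECTION_ID}/index/cancel")
    if resp.ok:
        print(resp.json()["message"])            # "Cancellation requested"
    else:
        print("Cancel failed:", resp.json().get("error"))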