Coverage for src / local_deep_research / research_library / search / routes / search_routes.py: 92%

173 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2Semantic Search Routes 

3 

4Provides endpoints for: 

5- Research history collection management and indexing 

6- Semantic search across any library collection 

7""" 

8 

9from flask import ( 

10 Blueprint, 

11 jsonify, 

12 request, 

13 session, 

14) 

15from loguru import logger 

16 

17from ....database.models.library import Collection, Document 

18from ....security.decorators import require_json_body 

19from ....web.auth.decorators import login_required 

20from ...utils import handle_api_error 

21 

# Blueprint for the semantic-search endpoints; every route below is
# mounted under the /library URL prefix.
search_bp = Blueprint("search", __name__, url_prefix="/library")

23 

24# ============================================================================= 

25# Research History Collection & Indexing 

26# ============================================================================= 

27 

28 

@search_bp.route("/api/research-history/collection", methods=["GET"])
@login_required
def get_research_history_collection():
    """
    Get the Research History collection info and indexing status.

    Returns collection ID and statistics about indexed vs total research.
    Counts are derived from DocumentCollection membership (matching the
    collection page) rather than source_type_id filtering.

    Returns:
        JSON with ``collection_id``, ``total_research``, ``indexed_research``,
        ``total_documents`` and ``indexed_documents``.
    """
    from ....constants import ResearchStatus
    from ....database.models.library import DocumentCollection
    from ....database.models.research import ResearchHistory
    from ....database.session_context import get_user_db_session
    from ....database.session_passwords import session_password_store
    from ..services.research_history_indexer import ResearchHistoryIndexer

    username = session["username"]
    session_id = session.get("session_id")

    # Resolve the per-session DB password (None when no session id exists).
    db_password = None
    if session_id:
        db_password = (
            session_password_store.get_session_password(  # gitleaks:allow
                username, session_id
            )
        )

    try:
        indexer = ResearchHistoryIndexer(username, db_password)
        collection_id = indexer.get_or_create_collection()

        # Auto-convert any unconverted research entries to Documents.
        # This is a lightweight DB-only operation (no FAISS) that ensures
        # the collection page and history page always show consistent counts.
        try:
            indexer.convert_all_research(force=False)
        except Exception:
            # Best-effort step: keep serving the counts, but record the
            # full traceback instead of a bare warning so failures are
            # diagnosable.
            logger.exception("Auto-conversion of research entries failed")

        with get_user_db_session(username, db_password) as db_session:
            # Total completed research with report content
            total_research = (
                db_session.query(ResearchHistory)
                .filter(ResearchHistory.status == ResearchStatus.COMPLETED)
                .filter(ResearchHistory.report_content.isnot(None))
                .filter(ResearchHistory.report_content != "")
                .count()
            )

            # Research entries represented in this collection
            # (via Document → DocumentCollection join, matching collection page)
            indexed_research = (
                db_session.query(Document.research_id)
                .join(
                    DocumentCollection,
                    DocumentCollection.document_id == Document.id,
                )
                .filter(DocumentCollection.collection_id == collection_id)
                .filter(Document.research_id.isnot(None))
                .distinct()
                .count()
            )

            # Document counts in collection
            total_documents = (
                db_session.query(DocumentCollection)
                .filter(DocumentCollection.collection_id == collection_id)
                .count()
            )
            # .is_(True) emits an explicit SQL "IS true" comparison and
            # avoids the "== True" lint suppression (noqa: E712).
            indexed_documents = (
                db_session.query(DocumentCollection)
                .filter(DocumentCollection.collection_id == collection_id)
                .filter(DocumentCollection.indexed.is_(True))
                .count()
            )

        return jsonify(
            {
                "success": True,
                "collection_id": collection_id,
                "total_research": total_research,
                "indexed_research": indexed_research,
                "total_documents": total_documents,
                "indexed_documents": indexed_documents,
            }
        )

    except Exception as e:
        return handle_api_error("getting research history collection", e)

119 

120 

@search_bp.route("/api/research-history/convert-all", methods=["POST"])
@login_required
def convert_all_research():
    """
    Convert all completed research entries into library Documents.

    Unlike the SSE index endpoint this is a synchronous JSON endpoint that
    creates Document rows (and DocumentCollection memberships) without
    triggering FAISS / RAG indexing. Call this before the SSE index endpoint
    to avoid nested-session problems on SQLite.

    Request JSON (optional):
        force: If true, re-convert even already-converted entries (default false)

    Returns:
        JSON with converted, skipped, failed counts and collection_id
    """
    from ....database.session_passwords import session_password_store
    from ..services.research_history_indexer import ResearchHistoryIndexer

    username = session["username"]
    session_id = session.get("session_id")

    # Fetch the per-session database password when a session id is present.
    db_password = (
        session_password_store.get_session_password(  # gitleaks:allow
            username, session_id
        )
        if session_id
        else None
    )

    payload = request.get_json() or {}
    force = payload.get("force", False)

    try:
        indexer = ResearchHistoryIndexer(username, db_password)
        outcome = indexer.convert_all_research(force=force)
    except Exception as e:
        return handle_api_error("converting all research", e)

    return jsonify({"success": True, **outcome})

162 

163 

@search_bp.route(
    "/api/research/<string:research_id>/add-to-collection", methods=["POST"]
)
@login_required
@require_json_body(error_format="success")
def add_research_to_collection(research_id):
    """
    Add a research entry to a specific collection.

    This allows users to organize research into custom collections
    in addition to the default Research History collection.

    Args:
        research_id: UUID of the research to add

    Request JSON:
        collection_id: UUID of the target collection (required)
    """
    from ....database.session_context import get_user_db_session
    from ....database.session_passwords import session_password_store
    from ..services.research_history_indexer import ResearchHistoryIndexer

    username = session["username"]
    session_id = session.get("session_id")

    # Per-session database password, if any session is active.
    db_password = (
        session_password_store.get_session_password(  # gitleaks:allow
            username, session_id
        )
        if session_id
        else None
    )

    payload = request.get_json()
    collection_id = payload.get("collection_id")

    if not collection_id:
        return jsonify(
            {
                "success": False,
                "error": "collection_id is required",
            }
        ), 400

    try:
        # Confirm the target collection exists before doing any indexing.
        with get_user_db_session(username, db_password) as db_session:
            collection = (
                db_session.query(Collection)
                .filter(Collection.id == collection_id)
                .first()
            )
            if collection is None:
                return jsonify(
                    {
                        "success": False,
                        "error": "Collection not found",
                    }
                ), 404

            collection_name = collection.name

        indexer = ResearchHistoryIndexer(username, db_password)
        result = indexer.index_research(
            research_id,
            collection_id=collection_id,
        )

        if result["status"] == "error":
            return jsonify(
                {
                    "success": False,
                    "error": result.get("error", "Operation failed."),
                }
            ), 400

        result["collection_name"] = collection_name
        return jsonify({"success": True, **result})

    except Exception as e:
        return handle_api_error("adding research to collection", e)

245 

246 

247# ============================================================================= 

248# Collection Search (generic — works for any collection type) 

249# ============================================================================= 

250 

251 

@search_bp.route(
    "/api/collections/<string:collection_id>/search", methods=["POST"]
)
@login_required
@require_json_body(error_format="success")
def search_collection(collection_id):
    """Search any collection using semantic similarity.

    Delegates to CollectionSearchEngine instead of reimplementing FAISS search.

    Args:
        collection_id: UUID of the collection to search.

    Request JSON:
        query: Search query string
        limit: Maximum number of results (default 10, clamped to 1-50)
    """
    from ....database.session_context import get_user_db_session
    from ....database.session_passwords import session_password_store
    from ....web_search_engines.engines.search_engine_collection import (
        CollectionSearchEngine,
    )

    username = session["username"]
    session_id = session.get("session_id")

    db_password = None
    if session_id:
        db_password = (
            session_password_store.get_session_password(  # gitleaks:allow
                username, session_id
            )
        )

    data = request.get_json()
    # Coerce defensively: a null or non-string "query" value previously
    # raised an AttributeError on .strip() outside the try block (HTTP 500);
    # treat it as an empty query so the client gets a proper 400 instead.
    raw_query = data.get("query", "")
    query = raw_query.strip() if isinstance(raw_query, str) else ""

    if len(query) > 10000:
        return jsonify(
            {"success": False, "error": "Query too long (max 10000 characters)"}
        ), 400

    # Clamp limit to [1, 50]; fall back to 10 on non-numeric input.
    try:
        limit = max(1, min(int(data.get("limit", 10)), 50))
    except (TypeError, ValueError):
        limit = 10

    if not query:
        return jsonify({"success": False, "error": "Query is required"}), 400

    try:
        # Verify collection exists and get its type
        with get_user_db_session(username, db_password) as db_session:
            collection = (
                db_session.query(Collection).filter_by(id=collection_id).first()
            )
            if not collection:
                return jsonify(
                    {"success": False, "error": "Collection not found"}
                ), 404
            collection_type = collection.collection_type
            collection_name = collection.name

        # Delegate to CollectionSearchEngine; over-fetch (2x) so `limit`
        # slots can still be filled after transformation/trimming.
        engine = CollectionSearchEngine(
            collection_id=collection_id,
            collection_name=collection_name,
            max_results=limit * 2,
            settings_snapshot={"_username": username},
        )
        raw_results = engine.search(query, limit=limit * 2)

        # Transform CollectionSearchEngine format -> API format
        results = []
        for r in raw_results:
            meta = r.get("metadata", {})
            results.append(
                {
                    "document_id": meta.get("document_id")
                    or meta.get("source_id"),
                    "title": r.get("title", "Untitled"),
                    "snippet": r.get("snippet", ""),
                    "similarity": round(r.get("relevance_score", 0) * 100, 1),
                    "url": meta.get("source"),
                }
            )
            if len(results) >= limit:
                break

        # For research_history collections, enrich with report/source type
        if collection_type == "research_history":
            _enrich_with_research_metadata(results, username, db_password)

        # Always enrich with document-level metadata (file type, domain)
        _enrich_with_document_metadata(results, username, db_password)

        return jsonify({"success": True, "results": results, "query": query})

    except Exception as e:
        return handle_api_error("searching collection", e)

349 

350 

def _enrich_with_research_metadata(results, username, db_password):
    """Add report/source type and research context to search results.

    Mutates each dict in ``results`` in place, setting ``type``,
    ``research_id``, ``research_title``, ``research_query`` and
    ``research_created_at``.
    """
    from ....database.models.library import SourceType
    from ....database.models.research import ResearchHistory
    from ....database.session_context import get_user_db_session

    doc_ids = [r["document_id"] for r in results if r.get("document_id")]
    if not doc_ids:
        return

    with get_user_db_session(username, db_password) as db_session:
        rows = (
            db_session.query(
                Document.id.label("document_id"),
                SourceType.name.label("source_type_name"),
                ResearchHistory.title.label("research_title"),
                ResearchHistory.query.label("research_query"),
                ResearchHistory.created_at.label("research_created_at"),
                Document.research_id,
            )
            .outerjoin(SourceType, Document.source_type_id == SourceType.id)
            .outerjoin(
                ResearchHistory,
                Document.research_id == ResearchHistory.id,
            )
            .filter(Document.id.in_(doc_ids))
            .all()
        )
        lookup = {row.document_id: row for row in rows}

        for result in results:
            row = lookup.get(result.get("document_id"))
            if row is None:
                # No matching Document row: fill neutral defaults.
                result["type"] = "source"
                result["research_id"] = None
                result["research_title"] = ""
                result["research_query"] = None
                result["research_created_at"] = None
                continue

            is_report = row.source_type_name == "research_report"
            result["type"] = "report" if is_report else "source"
            result["research_id"] = row.research_id
            # Fall back to a truncated query when the research has no title.
            fallback_title = (
                row.research_query[:100] if row.research_query else ""
            )
            result["research_title"] = row.research_title or fallback_title
            result["research_query"] = row.research_query
            created = row.research_created_at
            if isinstance(created, str):
                result["research_created_at"] = created
            else:
                result["research_created_at"] = (
                    created.isoformat() if created else None
                )

408 

def _enrich_with_document_metadata(results, username, db_password):
    """Add file type, domain, and creation date to search results.

    Mutates each dict in ``results`` in place, setting ``file_type``,
    ``domain`` and ``created_at`` (defaults are only applied when no
    matching Document row exists).
    """
    from urllib.parse import urlparse

    from ....database.session_context import get_user_db_session

    doc_ids = [r["document_id"] for r in results if r.get("document_id")]
    if not doc_ids:
        return

    with get_user_db_session(username, db_password) as db_session:
        rows = (
            db_session.query(
                Document.id.label("document_id"),
                Document.file_type,
                Document.original_url,
                Document.created_at,
            )
            .filter(Document.id.in_(doc_ids))
            .all()
        )
        lookup = {row.document_id: row for row in rows}

        for result in results:
            row = lookup.get(result.get("document_id"))
            if row is None:
                # No Document row: keep any values already present.
                result.setdefault("file_type", "unknown")
                result.setdefault("domain", None)
                result.setdefault("created_at", None)
                continue

            result["file_type"] = row.file_type
            created = row.created_at
            if isinstance(created, str):
                result["created_at"] = created
            else:
                result["created_at"] = created.isoformat() if created else None

            if not row.original_url:
                result["domain"] = None
            else:
                try:
                    result["domain"] = urlparse(row.original_url).netloc
                except (ValueError, AttributeError):
                    result["domain"] = "unknown"