Coverage for src/local_deep_research/research_library/search/routes/search_routes.py: 91%

168 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1""" 

2Semantic Search Routes 

3 

4Provides endpoints for: 

5- Research history collection management and indexing 

6- Semantic search across any library collection 

7""" 

8 

9from flask import ( 

10 Blueprint, 

11 jsonify, 

12 request, 

13 session, 

14) 

15 

16from ....database.models.library import Collection, Document 

17from ....security.decorators import require_json_body 

18from ....web.auth.decorators import login_required 

19from ...utils import handle_api_error 

20 

21search_bp = Blueprint("search", __name__, url_prefix="/library") 

22 

23# ============================================================================= 

24# Research History Collection & Indexing 

25# ============================================================================= 

26 

27 

@search_bp.route("/api/research-history/collection", methods=["GET"])
@login_required
def get_research_history_collection():
    """
    Report the Research History collection ID together with indexing counts.

    The collection is obtained (or created) through ResearchHistoryIndexer.
    Counts come from DocumentCollection membership rows for that collection,
    not from source_type_id filtering.

    Returns:
        JSON with success, collection_id, total_research, indexed_research,
        total_documents and indexed_documents; failures inside the lookup are
        delegated to handle_api_error.
    """
    from ....constants import ResearchStatus
    from ....database.models.library import DocumentCollection
    from ....database.models.research import ResearchHistory
    from ....database.session_context import get_user_db_session
    from ....database.session_passwords import session_password_store
    from ..services.research_history_indexer import ResearchHistoryIndexer

    username = session["username"]
    session_id = session.get("session_id")

    # The database password is only looked up when a session id is present.
    db_password = (
        session_password_store.get_session_password(  # gitleaks:allow
            username, session_id
        )
        if session_id
        else None
    )

    try:
        collection_id = ResearchHistoryIndexer(
            username, db_password
        ).get_or_create_collection()

        with get_user_db_session(username, db_password) as db_session:
            # Completed research entries that have non-empty report content.
            total_research = (
                db_session.query(ResearchHistory)
                .filter(
                    ResearchHistory.status == ResearchStatus.COMPLETED,
                    ResearchHistory.report_content.isnot(None),
                    ResearchHistory.report_content != "",
                )
                .count()
            )

            # Distinct research IDs whose documents are members of this
            # collection (Document -> DocumentCollection join).
            indexed_research = (
                db_session.query(Document.research_id)
                .join(
                    DocumentCollection,
                    DocumentCollection.document_id == Document.id,
                )
                .filter(
                    DocumentCollection.collection_id == collection_id,
                    Document.research_id.isnot(None),
                )
                .distinct()
                .count()
            )

            # Membership rows of this collection: all of them, then only the
            # ones flagged as indexed.
            memberships = db_session.query(DocumentCollection).filter(
                DocumentCollection.collection_id == collection_id
            )
            total_documents = memberships.count()
            indexed_documents = memberships.filter(
                DocumentCollection.indexed == True  # noqa: E712
            ).count()

        payload = {
            "success": True,
            "collection_id": collection_id,
            "total_research": total_research,
            "indexed_research": indexed_research,
            "total_documents": total_documents,
            "indexed_documents": indexed_documents,
        }
        return jsonify(payload)

    except Exception as e:
        return handle_api_error("getting research history collection", e)

110 

111 

@search_bp.route("/api/research-history/convert-all", methods=["POST"])
@login_required
def convert_all_research():
    """
    Convert all completed research entries into library Documents.

    Unlike the SSE index endpoint this is a synchronous JSON endpoint that
    creates Document rows (and DocumentCollection memberships) without
    triggering FAISS / RAG indexing. Call this before the SSE index endpoint
    to avoid nested-session problems on SQLite.

    Request JSON (optional):
        force: If true, re-convert even already-converted entries (default false)

    A missing, non-JSON or non-object body is treated as an empty request,
    so the endpoint can be called without any payload.

    Returns:
        JSON with converted, skipped, failed counts and collection_id
    """
    from ....database.session_passwords import session_password_store
    from ..services.research_history_indexer import ResearchHistoryIndexer

    username = session["username"]
    session_id = session.get("session_id")

    db_password = None
    if session_id:
        db_password = (
            session_password_store.get_session_password(  # gitleaks:allow
                username, session_id
            )
        )

    # The body is documented as optional: silent=True makes get_json return
    # None instead of aborting when the body is absent or is not valid JSON.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # Covers None as well as JSON arrays/scalars, which have no ``.get``.
        data = {}
    force = data.get("force", False)

    try:
        indexer = ResearchHistoryIndexer(username, db_password)
        result = indexer.convert_all_research(force=force)
        return jsonify({"success": True, **result})

    except Exception as e:
        return handle_api_error("converting all research", e)

153 

154 

@search_bp.route(
    "/api/research/<string:research_id>/add-to-collection", methods=["POST"]
)
@login_required
@require_json_body(error_format="success")
def add_research_to_collection(research_id):
    """
    Place a research entry into a collection chosen by the caller.

    Lets users file research under custom collections in addition to the
    default Research History collection.

    Args:
        research_id: UUID of the research to add

    Request JSON:
        collection_id: UUID of the target collection (required)

    Returns:
        JSON with success plus the indexer result and collection_name.
        Responds 400 when collection_id is missing or the indexer reports an
        error status, and 404 when the collection does not exist.
    """
    from ....database.session_context import get_user_db_session
    from ....database.session_passwords import session_password_store
    from ..services.research_history_indexer import ResearchHistoryIndexer

    username = session["username"]
    session_id = session.get("session_id")

    # The database password is only looked up when a session id is present.
    db_password = (
        session_password_store.get_session_password(  # gitleaks:allow
            username, session_id
        )
        if session_id
        else None
    )

    payload = request.get_json()
    collection_id = payload.get("collection_id")

    if not collection_id:
        missing_id = {
            "success": False,
            "error": "collection_id is required",
        }
        return jsonify(missing_id), 400

    try:
        # Confirm the target collection exists before doing any indexing work.
        with get_user_db_session(username, db_password) as db_session:
            target = (
                db_session.query(Collection)
                .filter(Collection.id == collection_id)
                .first()
            )
            if not target:
                not_found = {
                    "success": False,
                    "error": "Collection not found",
                }
                return jsonify(not_found), 404

            # Copy the name out while the session is still open.
            collection_name = target.name

        outcome = ResearchHistoryIndexer(username, db_password).index_research(
            research_id,
            collection_id=collection_id,
        )

        if outcome["status"] == "error":
            failure = {
                "success": False,
                "error": outcome.get("error", "Operation failed."),
            }
            return jsonify(failure), 400

        outcome["collection_name"] = collection_name
        return jsonify({"success": True, **outcome})

    except Exception as e:
        return handle_api_error("adding research to collection", e)

236 

237 

238# ============================================================================= 

239# Collection Search (generic — works for any collection type) 

240# ============================================================================= 

241 

242 

@search_bp.route(
    "/api/collections/<string:collection_id>/search", methods=["POST"]
)
@login_required
@require_json_body(error_format="success")
def search_collection(collection_id):
    """Search any collection using semantic similarity.

    Delegates to CollectionSearchEngine instead of reimplementing FAISS search.

    Args:
        collection_id: ID of the collection to search (from the URL).

    Request JSON:
        query: Search query string (required, at most 10000 characters)
        limit: Maximum number of results (default 10, clamped to 1..50)

    Returns:
        JSON with success, results and query. Responds 400 when the query is
        missing, not a string, empty, or too long, and 404 when the collection
        does not exist. Other failures are delegated to handle_api_error.
    """
    from ....database.session_context import get_user_db_session
    from ....database.session_passwords import session_password_store
    from ....web_search_engines.engines.search_engine_collection import (
        CollectionSearchEngine,
    )

    username = session["username"]
    session_id = session.get("session_id")

    db_password = None
    if session_id:
        db_password = (
            session_password_store.get_session_password(  # gitleaks:allow
                username, session_id
            )
        )

    data = request.get_json()

    # A null or non-string "query" is treated as missing so the caller gets a
    # 400 below rather than an AttributeError from ``.strip()``.
    raw_query = data.get("query")
    query = raw_query.strip() if isinstance(raw_query, str) else ""

    if len(query) > 10000:
        return jsonify(
            {"success": False, "error": "Query too long (max 10000 characters)"}
        ), 400

    try:
        limit = max(1, min(int(data.get("limit", 10)), 50))
    except (TypeError, ValueError, OverflowError):
        # OverflowError: int() of a float infinity, which the JSON decoder
        # accepts as the literal ``Infinity``.
        limit = 10

    if not query:
        return jsonify({"success": False, "error": "Query is required"}), 400

    try:
        # Verify collection exists and get its type
        with get_user_db_session(username, db_password) as db_session:
            collection = (
                db_session.query(Collection).filter_by(id=collection_id).first()
            )
            if not collection:
                return jsonify(
                    {"success": False, "error": "Collection not found"}
                ), 404
            # Copy the needed attributes out while the session is still open.
            collection_type = collection.collection_type
            collection_name = collection.name

        # Delegate to CollectionSearchEngine. Twice the requested number of
        # hits is fetched and the list is cut back to ``limit`` below.
        # NOTE(review): the reason for over-fetching is not visible here - confirm.
        engine = CollectionSearchEngine(
            collection_id=collection_id,
            collection_name=collection_name,
            max_results=limit * 2,
            settings_snapshot={"_username": username},
        )
        raw_results = engine.search(query, limit=limit * 2)

        # Transform CollectionSearchEngine format -> API format
        results = []
        for r in raw_results:
            # ``or`` fallbacks keep one hit with a null metadata/score from
            # failing the whole search.
            meta = r.get("metadata") or {}
            score = r.get("relevance_score") or 0
            results.append(
                {
                    "document_id": meta.get("document_id")
                    or meta.get("source_id"),
                    "title": r.get("title", "Untitled"),
                    "snippet": r.get("snippet", ""),
                    "similarity": round(score * 100, 1),
                    "url": meta.get("source"),
                }
            )
            if len(results) >= limit:
                break

        # For research_history collections, enrich with report/source type
        if collection_type == "research_history":
            _enrich_with_research_metadata(results, username, db_password)

        # Always enrich with document-level metadata (file type, domain)
        _enrich_with_document_metadata(results, username, db_password)

        return jsonify({"success": True, "results": results, "query": query})

    except Exception as e:
        return handle_api_error("searching collection", e)

340 

341 

342def _enrich_with_research_metadata(results, username, db_password): 

343 """Add report/source type and research context to search results.""" 

344 from ....database.models.library import SourceType 

345 from ....database.models.research import ResearchHistory 

346 from ....database.session_context import get_user_db_session 

347 

348 doc_ids = [r["document_id"] for r in results if r.get("document_id")] 

349 if not doc_ids: 349 ↛ 350line 349 didn't jump to line 350 because the condition on line 349 was never true

350 return 

351 

352 with get_user_db_session(username, db_password) as db_session: 

353 rows = ( 

354 db_session.query( 

355 Document.id.label("document_id"), 

356 SourceType.name.label("source_type_name"), 

357 ResearchHistory.title.label("research_title"), 

358 ResearchHistory.query.label("research_query"), 

359 ResearchHistory.created_at.label("research_created_at"), 

360 Document.research_id, 

361 ) 

362 .outerjoin(SourceType, Document.source_type_id == SourceType.id) 

363 .outerjoin( 

364 ResearchHistory, 

365 Document.research_id == ResearchHistory.id, 

366 ) 

367 .filter(Document.id.in_(doc_ids)) 

368 .all() 

369 ) 

370 lookup = {row.document_id: row for row in rows} 

371 

372 for result in results: 

373 row = lookup.get(result.get("document_id")) 

374 if row: 

375 result["type"] = ( 

376 "report" 

377 if row.source_type_name == "research_report" 

378 else "source" 

379 ) 

380 result["research_id"] = row.research_id 

381 result["research_title"] = row.research_title or ( 

382 row.research_query[:100] if row.research_query else "" 

383 ) 

384 result["research_query"] = row.research_query 

385 result["research_created_at"] = ( 

386 row.research_created_at 

387 if isinstance(row.research_created_at, str) 

388 else row.research_created_at.isoformat() 

389 if row.research_created_at 

390 else None 

391 ) 

392 else: 

393 result["type"] = "source" 

394 result["research_id"] = None 

395 result["research_title"] = "" 

396 result["research_query"] = None 

397 result["research_created_at"] = None 

398 

399 

400def _enrich_with_document_metadata(results, username, db_password): 

401 """Add file type, domain, and creation date to search results.""" 

402 from urllib.parse import urlparse 

403 

404 from ....database.session_context import get_user_db_session 

405 

406 doc_ids = [r["document_id"] for r in results if r.get("document_id")] 

407 if not doc_ids: 

408 return 

409 

410 with get_user_db_session(username, db_password) as db_session: 

411 rows = ( 

412 db_session.query( 

413 Document.id.label("document_id"), 

414 Document.file_type, 

415 Document.original_url, 

416 Document.created_at, 

417 ) 

418 .filter(Document.id.in_(doc_ids)) 

419 .all() 

420 ) 

421 lookup = {row.document_id: row for row in rows} 

422 

423 for result in results: 

424 row = lookup.get(result.get("document_id")) 

425 if row: 

426 result["file_type"] = row.file_type 

427 result["created_at"] = ( 

428 row.created_at 

429 if isinstance(row.created_at, str) 

430 else row.created_at.isoformat() 

431 if row.created_at 

432 else None 

433 ) 

434 if row.original_url: 

435 try: 

436 result["domain"] = urlparse(row.original_url).netloc 

437 except (ValueError, AttributeError): 

438 result["domain"] = "unknown" 

439 else: 

440 result["domain"] = None 

441 else: 

442 result.setdefault("file_type", "unknown") 

443 result.setdefault("domain", None) 

444 result.setdefault("created_at", None)