Coverage for src/local_deep_research/research_library/search/routes/search_routes.py: 91%
168 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1"""
2Semantic Search Routes
4Provides endpoints for:
5- Research history collection management and indexing
6- Semantic search across any library collection
7"""
9from flask import (
10 Blueprint,
11 jsonify,
12 request,
13 session,
14)
16from ....database.models.library import Collection, Document
17from ....security.decorators import require_json_body
18from ....web.auth.decorators import login_required
19from ...utils import handle_api_error
# Blueprint holding every route defined in this module; all of them are
# served under the "/library" URL prefix.
search_bp = Blueprint(
    "search",
    __name__,
    url_prefix="/library",
)
23# =============================================================================
24# Research History Collection & Indexing
25# =============================================================================
@search_bp.route("/api/research-history/collection", methods=["GET"])
@login_required
def get_research_history_collection():
    """
    Report the Research History collection ID and its indexing progress.

    The collection ID is obtained from
    ResearchHistoryIndexer.get_or_create_collection(). Four counters are
    then computed in the user's database:

    - total_research: completed research entries whose report content is
      neither NULL nor an empty string
    - indexed_research: distinct research IDs that have at least one
      document in the collection
    - total_documents: DocumentCollection rows for the collection
    - indexed_documents: those rows whose ``indexed`` flag is true

    Collection counters are based on DocumentCollection membership (matching
    the collection page) rather than on source_type_id filtering.

    Returns:
        JSON with success, collection_id and the four counters. Any
        exception is turned into a response by handle_api_error.
    """
    from ....constants import ResearchStatus
    from ....database.models.library import DocumentCollection
    from ....database.models.research import ResearchHistory
    from ....database.session_context import get_user_db_session
    from ....database.session_passwords import session_password_store
    from ..services.research_history_indexer import ResearchHistoryIndexer

    username = session["username"]
    session_id = session.get("session_id")

    # The stored password is only looked up when the session has an ID.
    db_password = (
        session_password_store.get_session_password(  # gitleaks:allow
            username, session_id
        )
        if session_id
        else None
    )

    try:
        collection_id = ResearchHistoryIndexer(
            username, db_password
        ).get_or_create_collection()

        with get_user_db_session(username, db_password) as db_session:
            # Completed research that actually has report text.
            research_total = (
                db_session.query(ResearchHistory)
                .filter(ResearchHistory.status == ResearchStatus.COMPLETED)
                .filter(ResearchHistory.report_content.isnot(None))
                .filter(ResearchHistory.report_content != "")
                .count()
            )

            # Distinct research entries reachable through the
            # Document -> DocumentCollection join for this collection.
            research_in_collection = (
                db_session.query(Document.research_id)
                .join(
                    DocumentCollection,
                    DocumentCollection.document_id == Document.id,
                )
                .filter(DocumentCollection.collection_id == collection_id)
                .filter(Document.research_id.isnot(None))
                .distinct()
                .count()
            )

            # Membership rows: all of them, then only the indexed ones.
            documents_total = (
                db_session.query(DocumentCollection)
                .filter(DocumentCollection.collection_id == collection_id)
                .count()
            )
            documents_indexed = (
                db_session.query(DocumentCollection)
                .filter(DocumentCollection.collection_id == collection_id)
                .filter(DocumentCollection.indexed == True)  # noqa: E712
                .count()
            )

        payload = {
            "success": True,
            "collection_id": collection_id,
            "total_research": research_total,
            "indexed_research": research_in_collection,
            "total_documents": documents_total,
            "indexed_documents": documents_indexed,
        }
        return jsonify(payload)

    except Exception as e:
        return handle_api_error("getting research history collection", e)
@search_bp.route("/api/research-history/convert-all", methods=["POST"])
@login_required
def convert_all_research():
    """
    Convert all completed research entries into library Documents.

    Unlike the SSE index endpoint this is a synchronous JSON endpoint that
    creates Document rows (and DocumentCollection memberships) without
    triggering FAISS / RAG indexing. Call this before the SSE index endpoint
    to avoid nested-session problems on SQLite.

    Request JSON (optional):
        force: If true, re-convert even already-converted entries (default false)

    A missing body, a body that is not valid JSON, or a JSON value that is
    not an object is treated the same as an empty object, so ``force``
    falls back to its default.

    Returns:
        JSON with converted, skipped, failed counts and collection_id. Any
        exception raised by the indexer is turned into a response by
        handle_api_error.
    """
    from ....database.session_passwords import session_password_store
    from ..services.research_history_indexer import ResearchHistoryIndexer

    username = session["username"]
    session_id = session.get("session_id")

    db_password = None
    if session_id:
        db_password = (
            session_password_store.get_session_password(  # gitleaks:allow
                username, session_id
            )
        )

    # The body is optional. silent=True makes Flask return None instead of
    # aborting the request when the content type is not JSON or the payload
    # cannot be parsed.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # Also covers valid JSON that is not an object (list, string, ...),
        # which has no .get().
        data = {}
    force = data.get("force", False)

    try:
        indexer = ResearchHistoryIndexer(username, db_password)
        result = indexer.convert_all_research(force=force)
        return jsonify({"success": True, **result})

    except Exception as e:
        return handle_api_error("converting all research", e)
@search_bp.route(
    "/api/research/<string:research_id>/add-to-collection", methods=["POST"]
)
@login_required
@require_json_body(error_format="success")
def add_research_to_collection(research_id):
    """
    Place a research entry into a collection chosen by the caller.

    This lets users organize research into custom collections in addition
    to the default Research History collection.

    Args:
        research_id: UUID of the research to add

    Request JSON:
        collection_id: UUID of the target collection (required)

    Returns:
        - 400 with an error message when collection_id is missing or the
          indexer reports status "error"
        - 404 when no collection with that ID exists
        - otherwise JSON with success plus the indexer result and the
          collection's name
        Any exception is turned into a response by handle_api_error.
    """
    from ....database.session_context import get_user_db_session
    from ....database.session_passwords import session_password_store
    from ..services.research_history_indexer import ResearchHistoryIndexer

    username = session["username"]
    session_id = session.get("session_id")

    # The stored password is only looked up when the session has an ID.
    db_password = (
        session_password_store.get_session_password(  # gitleaks:allow
            username, session_id
        )
        if session_id
        else None
    )

    target_id = request.get_json().get("collection_id")
    if not target_id:
        missing = {"success": False, "error": "collection_id is required"}
        return jsonify(missing), 400

    try:
        # Confirm the target collection exists before indexing into it.
        with get_user_db_session(username, db_password) as db_session:
            target = (
                db_session.query(Collection)
                .filter(Collection.id == target_id)
                .first()
            )
            if not target:
                not_found = {"success": False, "error": "Collection not found"}
                return jsonify(not_found), 404

            # Keep the name in a plain local; it is needed for the response
            # after this session block has ended.
            target_name = target.name

        outcome = ResearchHistoryIndexer(username, db_password).index_research(
            research_id,
            collection_id=target_id,
        )

        if outcome["status"] == "error":
            failure = {
                "success": False,
                "error": outcome.get("error", "Operation failed."),
            }
            return jsonify(failure), 400

        outcome["collection_name"] = target_name
        return jsonify({"success": True, **outcome})

    except Exception as e:
        return handle_api_error("adding research to collection", e)
238# =============================================================================
239# Collection Search (generic — works for any collection type)
240# =============================================================================
@search_bp.route(
    "/api/collections/<string:collection_id>/search", methods=["POST"]
)
@login_required
@require_json_body(error_format="success")
def search_collection(collection_id):
    """Search any collection using semantic similarity.

    Delegates to CollectionSearchEngine instead of reimplementing FAISS search.

    Args:
        collection_id: ID of the collection to search (taken from the URL)

    Request JSON:
        query: Search query string (required; at most 10000 characters
            after surrounding whitespace is stripped). A missing, null or
            non-string value is rejected as a missing query.
        limit: Maximum number of results (default 10, clamped to 1-50).
            A value that cannot be converted to an integer falls back
            to 10.

    Returns:
        - 400 when the query is too long or missing
        - 404 when the collection does not exist
        - otherwise JSON with success, the result list and the query
        Any exception during the search is turned into a response by
        handle_api_error.
    """
    from ....database.session_context import get_user_db_session
    from ....database.session_passwords import session_password_store
    from ....web_search_engines.engines.search_engine_collection import (
        CollectionSearchEngine,
    )

    username = session["username"]
    session_id = session.get("session_id")

    db_password = None
    if session_id:
        db_password = (
            session_password_store.get_session_password(  # gitleaks:allow
                username, session_id
            )
        )

    data = request.get_json()

    # Client input: "query" may be absent, null or of the wrong type.
    # Anything that is not a string is handled like an empty query instead
    # of crashing on .strip().
    raw_query = data.get("query", "")
    query = raw_query.strip() if isinstance(raw_query, str) else ""

    if len(query) > 10000:
        return jsonify(
            {"success": False, "error": "Query too long (max 10000 characters)"}
        ), 400

    try:
        limit = max(1, min(int(data.get("limit", 10)), 50))
    except (TypeError, ValueError, OverflowError):
        # OverflowError: int() of an infinite float, which the JSON parser
        # produces for literals such as 1e999 or Infinity.
        limit = 10

    if not query:
        return jsonify({"success": False, "error": "Query is required"}), 400

    try:
        # Verify collection exists and get its type
        with get_user_db_session(username, db_password) as db_session:
            collection = (
                db_session.query(Collection).filter_by(id=collection_id).first()
            )
            if not collection:
                return jsonify(
                    {"success": False, "error": "Collection not found"}
                ), 404
            # Copy what is needed into locals; the ORM object is not used
            # after this session block.
            collection_type = collection.collection_type
            collection_name = collection.name

        # Delegate to CollectionSearchEngine. The engine is asked for twice
        # the requested limit; the loop below stops once `limit` results
        # have been collected.
        engine = CollectionSearchEngine(
            collection_id=collection_id,
            collection_name=collection_name,
            max_results=limit * 2,
            settings_snapshot={"_username": username},
        )
        raw_results = engine.search(query, limit=limit * 2)

        # Transform CollectionSearchEngine format -> API format
        results = []
        for r in raw_results:
            meta = r.get("metadata", {})
            results.append(
                {
                    "document_id": meta.get("document_id")
                    or meta.get("source_id"),
                    "title": r.get("title", "Untitled"),
                    "snippet": r.get("snippet", ""),
                    # relevance_score is scaled by 100 and rounded to one
                    # decimal place for the API.
                    "similarity": round(r.get("relevance_score", 0) * 100, 1),
                    "url": meta.get("source"),
                }
            )
            if len(results) >= limit:
                break

        # For research_history collections, enrich with report/source type
        if collection_type == "research_history":
            _enrich_with_research_metadata(results, username, db_password)

        # Always enrich with document-level metadata (file type, domain)
        _enrich_with_document_metadata(results, username, db_password)

        return jsonify({"success": True, "results": results, "query": query})

    except Exception as e:
        return handle_api_error("searching collection", e)
def _enrich_with_research_metadata(results, username, db_password):
    """Attach report/source type and originating-research fields in place.

    Every dict in ``results`` receives the keys ``type``, ``research_id``,
    ``research_title``, ``research_query`` and ``research_created_at``.
    Results whose ``document_id`` has no matching Document row get
    ``type`` "source" and empty/None placeholders. Nothing is done when no
    result carries a ``document_id``.

    Args:
        results: List of result dicts; mutated in place.
        username: User whose database is queried.
        db_password: Password passed through to get_user_db_session.
    """
    from ....database.models.library import SourceType
    from ....database.models.research import ResearchHistory
    from ....database.session_context import get_user_db_session

    doc_ids = [item["document_id"] for item in results if item.get("document_id")]
    if not doc_ids:
        return

    with get_user_db_session(username, db_password) as db_session:
        # Outer joins keep documents that have no source type or no
        # research entry; their joined columns come back as None.
        rows = (
            db_session.query(
                Document.id.label("document_id"),
                SourceType.name.label("source_type_name"),
                ResearchHistory.title.label("research_title"),
                ResearchHistory.query.label("research_query"),
                ResearchHistory.created_at.label("research_created_at"),
                Document.research_id,
            )
            .outerjoin(SourceType, Document.source_type_id == SourceType.id)
            .outerjoin(
                ResearchHistory,
                Document.research_id == ResearchHistory.id,
            )
            .filter(Document.id.in_(doc_ids))
            .all()
        )
        by_document = {row.document_id: row for row in rows}

    for item in results:
        row = by_document.get(item.get("document_id"))

        if not row:
            item["type"] = "source"
            item["research_id"] = None
            item["research_title"] = ""
            item["research_query"] = None
            item["research_created_at"] = None
            continue

        if row.source_type_name == "research_report":
            item["type"] = "report"
        else:
            item["type"] = "source"

        item["research_id"] = row.research_id

        # Fall back to the first 100 characters of the query when the
        # research has no title.
        if row.research_title:
            item["research_title"] = row.research_title
        elif row.research_query:
            item["research_title"] = row.research_query[:100]
        else:
            item["research_title"] = ""

        item["research_query"] = row.research_query

        # The timestamp may already be a string; otherwise it is
        # serialized with isoformat() when present.
        stamp = row.research_created_at
        if isinstance(stamp, str):
            item["research_created_at"] = stamp
        elif stamp:
            item["research_created_at"] = stamp.isoformat()
        else:
            item["research_created_at"] = None
def _enrich_with_document_metadata(results, username, db_password):
    """Attach file type, URL domain and creation date to results in place.

    For a result whose ``document_id`` matches a Document row, the keys
    ``file_type``, ``created_at`` and ``domain`` are overwritten from that
    row. For any other result the same keys are only filled with defaults
    ("unknown", None, None) if they are not already present. Nothing is
    done when no result carries a ``document_id``.

    Args:
        results: List of result dicts; mutated in place.
        username: User whose database is queried.
        db_password: Password passed through to get_user_db_session.
    """
    from urllib.parse import urlparse

    from ....database.session_context import get_user_db_session

    doc_ids = [item["document_id"] for item in results if item.get("document_id")]
    if not doc_ids:
        return

    with get_user_db_session(username, db_password) as db_session:
        rows = (
            db_session.query(
                Document.id.label("document_id"),
                Document.file_type,
                Document.original_url,
                Document.created_at,
            )
            .filter(Document.id.in_(doc_ids))
            .all()
        )
        by_document = {row.document_id: row for row in rows}

    for item in results:
        row = by_document.get(item.get("document_id"))

        if not row:
            # No matching document: keep whatever is already there.
            item.setdefault("file_type", "unknown")
            item.setdefault("domain", None)
            item.setdefault("created_at", None)
            continue

        item["file_type"] = row.file_type

        # The timestamp may already be a string; otherwise it is
        # serialized with isoformat() when present.
        stamp = row.created_at
        if isinstance(stamp, str):
            item["created_at"] = stamp
        elif stamp:
            item["created_at"] = stamp.isoformat()
        else:
            item["created_at"] = None

        if not row.original_url:
            item["domain"] = None
            continue

        try:
            item["domain"] = urlparse(row.original_url).netloc
        except (ValueError, AttributeError):
            item["domain"] = "unknown"