Coverage for src/local_deep_research/research_library/search/services/research_history_indexer.py: 99%
103 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1"""
2Research History Indexer Service
4Enables semantic search over research history by:
5- Converting ResearchHistory reports into indexable Documents
6- Linking documents to the Research History collection
7- Triggering RAG indexing via LibraryRAGService
8"""
10import hashlib
11import uuid
12from datetime import datetime, UTC
13from typing import Any, Dict, Optional
15from loguru import logger
16from sqlalchemy.exc import IntegrityError
18from ....constants import ResearchStatus
19from ....database.library_init import ensure_research_history_collection
20from ....database.models.library import (
21 Document,
22 DocumentStatus,
23 SourceType,
24)
25from ....database.models.research import ResearchHistory
26from ....database.session_context import get_user_db_session
27from ...utils import ensure_in_collection
30class ResearchHistoryIndexer:
31 """
32 Service for indexing research history into a searchable collection.
34 Converts research reports into Documents that can be indexed for
35 semantic search using the existing RAG infrastructure.
36 """
38 # Source type names used in the database
39 SOURCE_TYPE_REPORT = "research_report"
40 COLLECTION_TYPE = "research_history"
42 def __init__(self, username: str, db_password: Optional[str] = None):
43 """
44 Initialize the indexer for a user.
46 Args:
47 username: Username for database access
48 db_password: Optional database password for encrypted DB access
49 """
50 self.username = username
51 self.db_password = db_password
53 def get_or_create_collection(self) -> str:
54 """
55 Get or create the Research History collection for this user.
57 Returns:
58 UUID of the Research History collection
59 """
60 return ensure_research_history_collection(
61 self.username, self.db_password
62 )
64 def index_research(
65 self,
66 research_id: str,
67 collection_id: Optional[str] = None,
68 ) -> Dict[str, Any]:
69 """
70 Convert a single research entry into a Document and add it to a
71 collection. Idempotent — safe to call multiple times.
73 Args:
74 research_id: UUID of the research to index
75 collection_id: Target collection UUID (defaults to Research History)
77 Returns:
78 Dict with status and document count
79 """
80 if collection_id is None: 80 ↛ 83line 80 didn't jump to line 83 because the condition on line 80 was always true
81 collection_id = self.get_or_create_collection()
83 with get_user_db_session(self.username, self.db_password) as session:
84 research = (
85 session.query(ResearchHistory)
86 .filter(ResearchHistory.id == research_id)
87 .first()
88 )
90 if not research:
91 return {"status": "error", "error": "Research not found"}
93 if research.status != ResearchStatus.COMPLETED:
94 return {
95 "status": "error",
96 "error": "Research is not yet completed",
97 }
99 if not research.report_content:
100 return {
101 "status": "error",
102 "error": "Research has no report content",
103 }
105 try:
106 report_doc = self._create_document_from_report(
107 research, collection_id, session
108 )
109 if report_doc is None:
110 return {
111 "status": "error",
112 "error": "SourceType 'research_report' not found. "
113 "Run library initialization.",
114 }
115 logger.info(
116 f"Created/found document for research: {research_id[:8]}"
117 )
118 except Exception:
119 logger.exception("Error creating report document")
120 return {
121 "status": "error",
122 "error": "Failed to create report document",
123 }
125 try:
126 session.commit()
127 except IntegrityError:
128 session.rollback()
129 logger.info(
130 f"DocumentCollection already exists for research "
131 f"{research_id[:8]} (concurrent insert)"
132 )
134 return {
135 "status": "success",
136 "research_id": research_id,
137 "collection_id": collection_id,
138 "documents_added": 1,
139 }
141 def convert_all_research(self, force: bool = False) -> Dict[str, Any]:
142 """
143 Convert all completed research entries into Documents (without RAG indexing).
145 Single-session implementation that calls private helpers directly to
146 avoid the nested-session issues that arise on SQLite when
147 index_research opens its own session inside a loop.
149 Args:
150 force: If True, process all entries even if already converted.
151 If False (default), skip entries that already have a report
152 Document.
154 Returns:
155 Dict with:
156 - converted: Number of research entries successfully converted
157 - skipped: Number of entries skipped (already converted)
158 - failed: Number of entries that raised an exception
159 - collection_id: UUID of the Research History collection
161 Note: the "already converted" filter checks ``Document.research_id``.
162 When two research entries produce identical ``report_content``,
163 ``_create_document_from_report`` reuses the existing Document (its
164 ``research_id`` stays as the first creator's), so the duplicate
165 research keeps appearing in the candidate set on every call. Calling
166 this from a hot path (request handler, polling loop) will repeatedly
167 re-attempt those entries. Call only from explicit user actions
168 (e.g. the manual ``/convert-all`` endpoint or ``auto_convert_research``
169 on research completion).
171 Only report Documents are created; source documents are not indexed.
172 """
173 collection_id = self.get_or_create_collection()
175 with get_user_db_session(self.username, self.db_password) as session:
176 # Resolve the report SourceType — required to create report Documents
177 report_type = (
178 session.query(SourceType)
179 .filter_by(name=self.SOURCE_TYPE_REPORT)
180 .first()
181 )
182 if report_type is None:
183 logger.warning(
184 f"SourceType '{self.SOURCE_TYPE_REPORT}' not found. "
185 "Run library initialization to seed source types before "
186 "converting research history."
187 )
188 return {
189 "converted": 0,
190 "skipped": 0,
191 "failed": 0,
192 "collection_id": collection_id,
193 }
195 # Build subquery of research IDs that already have a report Document
196 already_converted_subquery = (
197 session.query(Document.research_id)
198 .filter(Document.source_type_id == report_type.id)
199 .filter(Document.research_id.isnot(None))
200 .distinct()
201 .subquery()
202 )
204 # Count total eligible research entries (before filtering)
205 total_eligible = (
206 session.query(ResearchHistory)
207 .filter(ResearchHistory.status == ResearchStatus.COMPLETED)
208 .filter(ResearchHistory.report_content.isnot(None))
209 .filter(ResearchHistory.report_content != "")
210 .count()
211 )
213 # Fetch candidates — optionally excluding already-converted entries
214 query = (
215 session.query(ResearchHistory)
216 .filter(ResearchHistory.status == ResearchStatus.COMPLETED)
217 .filter(ResearchHistory.report_content.isnot(None))
218 .filter(ResearchHistory.report_content != "")
219 .order_by(ResearchHistory.created_at.desc())
220 )
221 if not force:
222 query = query.filter(
223 ResearchHistory.id.notin_(
224 already_converted_subquery.select()
225 )
226 )
228 research_entries = query.all()
230 converted = 0
231 skipped = total_eligible - len(research_entries) if not force else 0
232 failed = 0
234 for research in research_entries:
235 try:
236 # Create (or reuse) report Document
237 report_doc = self._create_document_from_report(
238 research,
239 collection_id,
240 session,
241 report_type_id=report_type.id,
242 )
243 if report_doc is None:
244 # SourceType missing inside helper (already warned)
245 failed += 1
246 continue
248 # Commit each entry individually so a rollback on failure
249 # only loses the failing entry, not the whole batch.
250 session.commit()
251 converted += 1
253 except Exception:
254 logger.exception(f"Error converting research {research.id}")
255 session.rollback()
256 failed += 1
258 logger.info(
259 f"convert_all_research complete — converted={converted}, "
260 f"skipped={skipped}, failed={failed}"
261 )
262 return {
263 "converted": converted,
264 "skipped": skipped,
265 "failed": failed,
266 "collection_id": collection_id,
267 }
269 def _create_document_from_report(
270 self,
271 research: ResearchHistory,
272 collection_id: str,
273 session,
274 report_type_id: Optional[str] = None,
275 ) -> Optional[Document]:
276 """
277 Create a Document from a research report.
279 Args:
280 research: ResearchHistory entry
281 collection_id: Target collection UUID
282 session: Database session
283 report_type_id: Pre-resolved SourceType ID (avoids N+1 queries
284 when called in a loop from convert_all_research)
286 Returns:
287 Created Document or None if skipped
288 """
289 # Resolve report SourceType (cached ID avoids per-entry query)
290 if report_type_id is None:
291 report_type = (
292 session.query(SourceType)
293 .filter_by(name=self.SOURCE_TYPE_REPORT)
294 .first()
295 )
296 if report_type is None:
297 logger.warning(
298 f"SourceType '{self.SOURCE_TYPE_REPORT}' not found for research "
299 f"{research.id}. Cannot create document — run library initialization "
300 "to seed source types."
301 )
302 return None
303 report_type_id = report_type.id
304 existing_doc = (
305 session.query(Document)
306 .filter(Document.research_id == research.id)
307 .filter(Document.source_type_id == report_type_id)
308 .first()
309 )
311 if existing_doc:
312 # Ensure it's in the collection
313 ensure_in_collection(session, existing_doc.id, collection_id)
314 return existing_doc
316 # Create document or reuse existing one with same content hash
317 # (document_hash has a unique constraint, so identical content
318 # must share a Document row — research_id points to the first creator)
319 content = research.report_content
320 doc_hash = hashlib.sha256(content.encode("utf-8")).hexdigest()
322 document = (
323 session.query(Document)
324 .filter(Document.document_hash == doc_hash)
325 .first()
326 )
328 if document is None:
329 doc_id = str(uuid.uuid4())
330 document = Document(
331 id=doc_id,
332 source_type_id=report_type_id,
333 research_id=research.id,
334 document_hash=doc_hash,
335 file_size=len(content.encode("utf-8")),
336 file_type="markdown",
337 mime_type="text/markdown",
338 title=research.title
339 or (research.query[:100] if research.query else "Untitled"),
340 text_content=content,
341 status=DocumentStatus.COMPLETED,
342 processed_at=datetime.now(UTC),
343 character_count=len(content),
344 word_count=len(content.split()),
345 )
346 session.add(document)
347 session.flush()
349 ensure_in_collection(session, document.id, collection_id)
350 return document
def auto_convert_research(
    username: str, research_id: str, db_password: str | None = None
):
    """Auto-convert a completed research entry to a document in the History collection.

    Safe to call from any context — exceptions are caught and logged.

    Args:
        username: Username for database access
        research_id: UUID of the research to convert
        db_password: Optional database password for encrypted DB access

    Returns:
        None. The outcome is only reported through the log.
    """
    try:
        indexer = ResearchHistoryIndexer(username, db_password=db_password)
        result = indexer.index_research(research_id)
        status = result.get("status")
        if status == "success":
            logger.info(
                f"Auto-converted research {research_id} for user {username}: "
                f"{status}"
            )
        else:
            # index_research reports expected failures through the result
            # dict rather than raising; surface the reason instead of
            # logging the entry as converted.
            logger.warning(
                f"Auto-convert did not index research {research_id} for user "
                f"{username}: {result.get('error', status)}"
            )
    except Exception:
        logger.exception(
            f"Failed to auto-convert research {research_id} for user {username}"
        )