Coverage for src/local_deep_research/research_library/search/services/research_history_indexer.py: 99%
106 statements
« prev ^ index » next — coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""
2Research History Indexer Service
4Enables semantic search over research history by:
5- Converting ResearchHistory reports into indexable Documents
6- Linking documents to the Research History collection
7- Triggering RAG indexing via LibraryRAGService
8"""
10import hashlib
11import uuid
12from datetime import datetime, UTC
13from typing import Any, Dict, Optional
15from loguru import logger
16from sqlalchemy.exc import IntegrityError
18from ....constants import ResearchStatus
19from ....database.library_init import ensure_research_history_collection
20from ....database.models.library import (
21 Document,
22 DocumentCollection,
23 DocumentStatus,
24 SourceType,
25)
26from ....database.models.research import ResearchHistory
27from ....database.session_context import get_user_db_session
class ResearchHistoryIndexer:
    """
    Service for indexing research history into a searchable collection.

    Converts research reports into Documents that can be indexed for
    semantic search using the existing RAG infrastructure.
    """

    # Source type names used in the database.
    # SOURCE_TYPE_REPORT must match a seeded SourceType row; seeding is done
    # by library initialization (see the warnings below when it is missing).
    SOURCE_TYPE_REPORT = "research_report"
    COLLECTION_TYPE = "research_history"

    def __init__(self, username: str, db_password: Optional[str] = None):
        """
        Initialize the indexer for a user.

        Args:
            username: Username for database access
            db_password: Optional database password for encrypted DB access
        """
        self.username = username
        self.db_password = db_password

    def get_or_create_collection(self) -> str:
        """
        Get or create the Research History collection for this user.

        Returns:
            UUID of the Research History collection
        """
        # Delegates creation/lookup to the shared library-init helper so the
        # collection is provisioned consistently across the codebase.
        return ensure_research_history_collection(
            self.username, self.db_password
        )

    def index_research(
        self,
        research_id: str,
        collection_id: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Convert a single research entry into a Document and add it to a
        collection. Idempotent — safe to call multiple times.

        Args:
            research_id: UUID of the research to index
            collection_id: Target collection UUID (defaults to Research History)

        Returns:
            Dict with status and document count
        """
        if collection_id is None:
            collection_id = self.get_or_create_collection()

        with get_user_db_session(self.username, self.db_password) as session:
            research = (
                session.query(ResearchHistory)
                .filter(ResearchHistory.id == research_id)
                .first()
            )

            if not research:
                return {"status": "error", "error": "Research not found"}

            # Only completed research with actual report text is indexable.
            if research.status != ResearchStatus.COMPLETED:
                return {
                    "status": "error",
                    "error": "Research is not yet completed",
                }

            if not research.report_content:
                return {
                    "status": "error",
                    "error": "Research has no report content",
                }

            try:
                report_doc = self._create_document_from_report(
                    research, collection_id, session
                )
                if report_doc is None:
                    # SourceType row missing — the DB was never seeded.
                    return {
                        "status": "error",
                        "error": "SourceType 'research_report' not found. "
                        "Run library initialization.",
                    }
                logger.info(
                    f"Created/found document for research: {research_id[:8]}"
                )
            except Exception:
                logger.exception("Error creating report document")
                return {
                    "status": "error",
                    "error": "Failed to create report document",
                }

            try:
                session.commit()
            except IntegrityError:
                # A concurrent caller inserted the same DocumentCollection
                # link first. The document is already linked, so this still
                # counts as success — just roll back our duplicate insert.
                session.rollback()
                logger.info(
                    f"DocumentCollection already exists for research "
                    f"{research_id[:8]} (concurrent insert)"
                )

        return {
            "status": "success",
            "research_id": research_id,
            "collection_id": collection_id,
            "documents_added": 1,
        }

    def convert_all_research(self, force: bool = False) -> Dict[str, Any]:
        """
        Convert all completed research entries into Documents (without RAG indexing).

        Unlike index_all_research / index_research, this method operates within a
        single DB session and calls the private helpers directly, avoiding the
        nested-session issues that arise on SQLite when index_research opens its
        own session inside a loop.

        Args:
            force: If True, process all entries even if already converted.
                If False (default), skip entries that already have a report
                Document.

        Returns:
            Dict with:
                - converted: Number of research entries successfully converted
                - skipped: Number of entries skipped (already converted)
                - failed: Number of entries that raised an exception
                - collection_id: UUID of the Research History collection

        Only report Documents are created; source documents are not indexed.
        """
        collection_id = self.get_or_create_collection()

        with get_user_db_session(self.username, self.db_password) as session:
            # Resolve the report SourceType — required to create report Documents
            report_type = (
                session.query(SourceType)
                .filter_by(name=self.SOURCE_TYPE_REPORT)
                .first()
            )
            if report_type is None:
                logger.warning(
                    f"SourceType '{self.SOURCE_TYPE_REPORT}' not found. "
                    "Run library initialization to seed source types before "
                    "converting research history."
                )
                return {
                    "converted": 0,
                    "skipped": 0,
                    "failed": 0,
                    "collection_id": collection_id,
                }

            # Build subquery of research IDs that already have a report Document.
            # The isnot(None) filter matters: a NULL inside a NOT IN subquery
            # would make the whole predicate yield no rows.
            already_converted_subquery = (
                session.query(Document.research_id)
                .filter(Document.source_type_id == report_type.id)
                .filter(Document.research_id.isnot(None))
                .distinct()
                .subquery()
            )

            # Count total eligible research entries (before filtering)
            total_eligible = (
                session.query(ResearchHistory)
                .filter(ResearchHistory.status == ResearchStatus.COMPLETED)
                .filter(ResearchHistory.report_content.isnot(None))
                .filter(ResearchHistory.report_content != "")
                .count()
            )

            # Fetch candidates — optionally excluding already-converted entries
            query = (
                session.query(ResearchHistory)
                .filter(ResearchHistory.status == ResearchStatus.COMPLETED)
                .filter(ResearchHistory.report_content.isnot(None))
                .filter(ResearchHistory.report_content != "")
                .order_by(ResearchHistory.created_at.desc())
            )
            if not force:
                query = query.filter(
                    ResearchHistory.id.notin_(
                        already_converted_subquery.select()
                    )
                )

            research_entries = query.all()

            converted = 0
            # "skipped" = eligible entries that were filtered out because a
            # report Document already exists; with force=True nothing is skipped.
            skipped = total_eligible - len(research_entries) if not force else 0
            failed = 0

            for research in research_entries:
                try:
                    # Create (or reuse) report Document. Passing the
                    # pre-resolved report_type_id avoids one SourceType
                    # query per entry.
                    report_doc = self._create_document_from_report(
                        research,
                        collection_id,
                        session,
                        report_type_id=report_type.id,
                    )
                    if report_doc is None:
                        # SourceType missing inside helper (already warned)
                        failed += 1
                        continue

                    # Commit each entry individually so a rollback on failure
                    # only loses the failing entry, not the whole batch.
                    session.commit()
                    converted += 1

                except Exception:
                    logger.exception(f"Error converting research {research.id}")
                    session.rollback()
                    failed += 1

            logger.info(
                f"convert_all_research complete — converted={converted}, "
                f"skipped={skipped}, failed={failed}"
            )
            return {
                "converted": converted,
                "skipped": skipped,
                "failed": failed,
                "collection_id": collection_id,
            }

    def _create_document_from_report(
        self,
        research: ResearchHistory,
        collection_id: str,
        session,
        report_type_id: Optional[str] = None,
    ) -> Optional[Document]:
        """
        Create a Document from a research report.

        Args:
            research: ResearchHistory entry
            collection_id: Target collection UUID
            session: Database session
            report_type_id: Pre-resolved SourceType ID (avoids N+1 queries
                when called in a loop from convert_all_research)

        Returns:
            Created Document or None if skipped
        """
        # Resolve report SourceType (cached ID avoids per-entry query)
        if report_type_id is None:
            report_type = (
                session.query(SourceType)
                .filter_by(name=self.SOURCE_TYPE_REPORT)
                .first()
            )
            if report_type is None:
                logger.warning(
                    f"SourceType '{self.SOURCE_TYPE_REPORT}' not found for research "
                    f"{research.id}. Cannot create document — run library initialization "
                    "to seed source types."
                )
                return None
            report_type_id = report_type.id

        # Idempotency check: reuse the Document already created for this
        # research, but still ensure it is linked into the target collection.
        existing_doc = (
            session.query(Document)
            .filter(Document.research_id == research.id)
            .filter(Document.source_type_id == report_type_id)
            .first()
        )

        if existing_doc:
            # Ensure it's in the collection
            self._ensure_in_collection(existing_doc.id, collection_id, session)
            return existing_doc

        # Create document or reuse existing one with same content hash
        # (document_hash has a unique constraint, so identical content
        # must share a Document row — research_id points to the first creator)
        content = research.report_content
        doc_hash = hashlib.sha256(content.encode("utf-8")).hexdigest()

        document = (
            session.query(Document)
            .filter(Document.document_hash == doc_hash)
            .first()
        )

        if document is None:
            doc_id = str(uuid.uuid4())
            document = Document(
                id=doc_id,
                source_type_id=report_type_id,
                research_id=research.id,
                document_hash=doc_hash,
                # Byte size of the UTF-8 report; character/word counts below
                # are derived from the same text.
                file_size=len(content.encode("utf-8")),
                file_type="markdown",
                mime_type="text/markdown",
                # Fall back to a truncated query string when no title exists.
                title=research.title
                or (research.query[:100] if research.query else "Untitled"),
                text_content=content,
                status=DocumentStatus.COMPLETED,
                processed_at=datetime.now(UTC),
                character_count=len(content),
                word_count=len(content.split()),
            )
            session.add(document)
            # Flush (not commit) so document.id is usable for the link row;
            # the caller decides when to commit.
            session.flush()

        self._ensure_in_collection(document.id, collection_id, session)
        return document

    def _ensure_in_collection(
        self, document_id: str, collection_id: str, session
    ) -> None:
        """Add document to collection if not already there."""
        existing = (
            session.query(DocumentCollection)
            .filter_by(document_id=document_id, collection_id=collection_id)
            .first()
        )
        if not existing:
            session.add(
                DocumentCollection(
                    document_id=document_id,
                    collection_id=collection_id,
                    # Not RAG-indexed yet; indexing is triggered separately.
                    indexed=False,
                )
            )
def auto_convert_research(
    username: str, research_id: str, db_password: str | None = None
) -> Optional[Dict[str, Any]]:
    """Auto-convert a completed research entry to a document in the History collection.

    Safe to call from any context — exceptions are caught and logged rather
    than propagated.

    Args:
        username: Username for database access.
        research_id: UUID of the research to convert.
        db_password: Optional database password for encrypted DB access.

    Returns:
        The result dict from ``ResearchHistoryIndexer.index_research`` (always
        contains a ``status`` key), or ``None`` if an unexpected exception was
        caught. Previously this function returned nothing, so callers could
        not observe the outcome; returning the dict is backward-compatible.
    """
    try:
        indexer = ResearchHistoryIndexer(username, db_password=db_password)
        result = indexer.index_research(research_id)
        if result.get("status") == "success":
            logger.info(
                f"Auto-converted research {research_id} for user {username}: "
                f"{result.get('status')}"
            )
        else:
            # Expected failures (not found, incomplete, no report content)
            # should be visible as warnings, not mislabeled as conversions.
            logger.warning(
                f"Auto-convert of research {research_id} for user {username} "
                f"did not succeed: {result.get('error', 'unknown error')}"
            )
        return result
    except Exception:
        logger.exception(
            f"Failed to auto-convert research {research_id} for user {username}"
        )
        return None