Coverage for src/local_deep_research/research_library/search/services/research_history_indexer.py: 99%

103 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1""" 

2Research History Indexer Service 

3 

4Enables semantic search over research history by: 

5- Converting ResearchHistory reports into indexable Documents 

6- Linking documents to the Research History collection 

7- Triggering RAG indexing via LibraryRAGService 

8""" 

9 

10import hashlib 

11import uuid 

12from datetime import datetime, UTC 

13from typing import Any, Dict, Optional 

14 

15from loguru import logger 

16from sqlalchemy.exc import IntegrityError 

17 

18from ....constants import ResearchStatus 

19from ....database.library_init import ensure_research_history_collection 

20from ....database.models.library import ( 

21 Document, 

22 DocumentStatus, 

23 SourceType, 

24) 

25from ....database.models.research import ResearchHistory 

26from ....database.session_context import get_user_db_session 

27from ...utils import ensure_in_collection 

28 

29 

class ResearchHistoryIndexer:
    """
    Service for indexing research history into a searchable collection.

    Converts research reports into Documents that can be indexed for
    semantic search using the existing RAG infrastructure.
    """

    # Source type names used in the database
    SOURCE_TYPE_REPORT = "research_report"
    COLLECTION_TYPE = "research_history"

    def __init__(self, username: str, db_password: Optional[str] = None):
        """
        Initialize the indexer for a user.

        Args:
            username: Username for database access
            db_password: Optional database password for encrypted DB access
        """
        self.username = username
        self.db_password = db_password

    def get_or_create_collection(self) -> str:
        """
        Get or create the Research History collection for this user.

        Delegates to ``ensure_research_history_collection`` with the stored
        credentials.

        Returns:
            UUID of the Research History collection
        """
        return ensure_research_history_collection(
            self.username, self.db_password
        )

    def index_research(
        self,
        research_id: str,
        collection_id: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Convert a single research entry into a Document and add it to a
        collection. Idempotent — safe to call multiple times.

        Args:
            research_id: UUID of the research to index
            collection_id: Target collection UUID (defaults to Research History)

        Returns:
            On success: ``{"status": "success", "research_id", "collection_id",
            "documents_added": 1}``. On failure: ``{"status": "error",
            "error": <message>}`` — returned when the research is missing, not
            completed, has no report content, the report SourceType is not
            seeded, or document creation raised.
        """
        if collection_id is None:
            collection_id = self.get_or_create_collection()

        with get_user_db_session(self.username, self.db_password) as session:
            research = (
                session.query(ResearchHistory)
                .filter(ResearchHistory.id == research_id)
                .first()
            )

            if not research:
                return {"status": "error", "error": "Research not found"}

            if research.status != ResearchStatus.COMPLETED:
                return {
                    "status": "error",
                    "error": "Research is not yet completed",
                }

            if not research.report_content:
                return {
                    "status": "error",
                    "error": "Research has no report content",
                }

            try:
                report_doc = self._create_document_from_report(
                    research, collection_id, session
                )
                if report_doc is None:
                    return {
                        "status": "error",
                        "error": "SourceType 'research_report' not found. "
                        "Run library initialization.",
                    }
                logger.info(
                    f"Created/found document for research: {research_id[:8]}"
                )
            except Exception:
                logger.exception("Error creating report document")
                # Discard the failed unit of work, mirroring what
                # convert_all_research does on a per-entry failure. A failed
                # flush otherwise leaves the session unusable until rolled
                # back. The rollback is guarded so this method keeps
                # returning an error dict instead of raising.
                # NOTE(review): whether get_user_db_session already rolls
                # back on exit is not visible here — confirm.
                try:
                    session.rollback()
                except Exception:
                    logger.exception(
                        "Rollback failed after report document error"
                    )
                return {
                    "status": "error",
                    "error": "Failed to create report document",
                }

            try:
                session.commit()
            except IntegrityError:
                # Treated as a benign race: another writer inserted the same
                # row first, so the desired end state already exists.
                session.rollback()
                logger.info(
                    f"DocumentCollection already exists for research "
                    f"{research_id[:8]} (concurrent insert)"
                )

        return {
            "status": "success",
            "research_id": research_id,
            "collection_id": collection_id,
            "documents_added": 1,
        }

    def convert_all_research(self, force: bool = False) -> Dict[str, Any]:
        """
        Convert all completed research entries into Documents (without RAG indexing).

        Single-session implementation that calls private helpers directly to
        avoid the nested-session issues that arise on SQLite when
        index_research opens its own session inside a loop.

        Args:
            force: If True, process all entries even if already converted.
                If False (default), skip entries that already have a report
                Document.

        Returns:
            Dict with:
            - converted: Number of research entries successfully converted
            - skipped: Number of entries skipped (already converted)
            - failed: Number of entries that raised an exception
            - collection_id: UUID of the Research History collection

        Note: the "already converted" filter checks ``Document.research_id``.
        When two research entries produce identical ``report_content``,
        ``_create_document_from_report`` reuses the existing Document (its
        ``research_id`` stays as the first creator's), so the duplicate
        research keeps appearing in the candidate set on every call. Calling
        this from a hot path (request handler, polling loop) will repeatedly
        re-attempt those entries. Call only from explicit user actions
        (e.g. the manual ``/convert-all`` endpoint or ``auto_convert_research``
        on research completion).

        Only report Documents are created; source documents are not indexed.
        """
        collection_id = self.get_or_create_collection()

        with get_user_db_session(self.username, self.db_password) as session:
            # Resolve the report SourceType — required to create report Documents
            report_type = (
                session.query(SourceType)
                .filter_by(name=self.SOURCE_TYPE_REPORT)
                .first()
            )
            if report_type is None:
                logger.warning(
                    f"SourceType '{self.SOURCE_TYPE_REPORT}' not found. "
                    "Run library initialization to seed source types before "
                    "converting research history."
                )
                return {
                    "converted": 0,
                    "skipped": 0,
                    "failed": 0,
                    "collection_id": collection_id,
                }

            # Build subquery of research IDs that already have a report Document
            already_converted_subquery = (
                session.query(Document.research_id)
                .filter(Document.source_type_id == report_type.id)
                .filter(Document.research_id.isnot(None))
                .distinct()
                .subquery()
            )

            # Count total eligible research entries (before filtering)
            total_eligible = (
                session.query(ResearchHistory)
                .filter(ResearchHistory.status == ResearchStatus.COMPLETED)
                .filter(ResearchHistory.report_content.isnot(None))
                .filter(ResearchHistory.report_content != "")
                .count()
            )

            # Fetch candidates — optionally excluding already-converted entries
            query = (
                session.query(ResearchHistory)
                .filter(ResearchHistory.status == ResearchStatus.COMPLETED)
                .filter(ResearchHistory.report_content.isnot(None))
                .filter(ResearchHistory.report_content != "")
                .order_by(ResearchHistory.created_at.desc())
            )
            if not force:
                query = query.filter(
                    ResearchHistory.id.notin_(
                        already_converted_subquery.select()
                    )
                )

            research_entries = query.all()

            # Snapshot the ids while the rows are freshly loaded. With
            # SQLAlchemy's default expire_on_commit, each commit()/rollback()
            # below expires the instances, and reading ``research.id`` inside
            # the error handler of a failed transaction could raise again and
            # abort the whole batch.
            entry_ids = [entry.id for entry in research_entries]

            converted = 0
            skipped = total_eligible - len(research_entries) if not force else 0
            failed = 0

            for entry_id, research in zip(entry_ids, research_entries):
                try:
                    # Create (or reuse) report Document
                    report_doc = self._create_document_from_report(
                        research,
                        collection_id,
                        session,
                        report_type_id=report_type.id,
                    )
                    if report_doc is None:
                        # SourceType missing inside helper (already warned)
                        failed += 1
                        continue

                    # Commit each entry individually so a rollback on failure
                    # only loses the failing entry, not the whole batch.
                    session.commit()
                    converted += 1

                except Exception:
                    logger.exception(f"Error converting research {entry_id}")
                    session.rollback()
                    failed += 1

        logger.info(
            f"convert_all_research complete — converted={converted}, "
            f"skipped={skipped}, failed={failed}"
        )
        return {
            "converted": converted,
            "skipped": skipped,
            "failed": failed,
            "collection_id": collection_id,
        }

    def _create_document_from_report(
        self,
        research: ResearchHistory,
        collection_id: str,
        session,
        report_type_id: Optional[str] = None,
    ) -> Optional[Document]:
        """
        Create (or reuse) the report Document for a research entry and make
        sure it is linked to the target collection.

        Lookup order: a Document already tied to this research and the report
        source type; otherwise any Document with the same content hash;
        otherwise a new Document is added and flushed. This method never
        commits — the caller owns the transaction.

        Args:
            research: ResearchHistory entry
            collection_id: Target collection UUID
            session: Database session
            report_type_id: Pre-resolved SourceType ID (avoids N+1 queries
                when called in a loop from convert_all_research)

        Returns:
            The existing, reused, or newly created Document, or None when the
            report SourceType cannot be resolved.
        """
        # Resolve report SourceType (cached ID avoids per-entry query)
        if report_type_id is None:
            report_type = (
                session.query(SourceType)
                .filter_by(name=self.SOURCE_TYPE_REPORT)
                .first()
            )
            if report_type is None:
                logger.warning(
                    f"SourceType '{self.SOURCE_TYPE_REPORT}' not found for research "
                    f"{research.id}. Cannot create document — run library initialization "
                    "to seed source types."
                )
                return None
            report_type_id = report_type.id

        existing_doc = (
            session.query(Document)
            .filter(Document.research_id == research.id)
            .filter(Document.source_type_id == report_type_id)
            .first()
        )

        if existing_doc:
            # Ensure it's in the collection
            ensure_in_collection(session, existing_doc.id, collection_id)
            return existing_doc

        # Create document or reuse existing one with same content hash
        # (document_hash has a unique constraint, so identical content
        # must share a Document row — research_id points to the first creator)
        content = research.report_content
        # Encode once; the same bytes feed both the hash and the size.
        content_bytes = content.encode("utf-8")
        doc_hash = hashlib.sha256(content_bytes).hexdigest()

        document = (
            session.query(Document)
            .filter(Document.document_hash == doc_hash)
            .first()
        )

        if document is None:
            doc_id = str(uuid.uuid4())
            document = Document(
                id=doc_id,
                source_type_id=report_type_id,
                research_id=research.id,
                document_hash=doc_hash,
                file_size=len(content_bytes),
                file_type="markdown",
                mime_type="text/markdown",
                # Fall back to a truncated query, then a fixed placeholder.
                title=research.title
                or (research.query[:100] if research.query else "Untitled"),
                text_content=content,
                status=DocumentStatus.COMPLETED,
                processed_at=datetime.now(UTC),
                character_count=len(content),
                word_count=len(content.split()),
            )
            session.add(document)
            # Flush so insert errors surface here, before the link is made.
            session.flush()

        ensure_in_collection(session, document.id, collection_id)
        return document

351 

352 

def auto_convert_research(
    username: str, research_id: str, db_password: str | None = None
):
    """Best-effort conversion of one research entry into a History document.

    Builds a ``ResearchHistoryIndexer`` for the user, runs ``index_research``
    on the given research and logs the status it reports. Any exception is
    logged and swallowed, so this never raises; it returns None.

    Args:
        username: Username whose database holds the research entry
        research_id: UUID of the research to convert
        db_password: Optional database password for encrypted DB access
    """
    try:
        outcome = ResearchHistoryIndexer(
            username, db_password=db_password
        ).index_research(research_id)
        status = outcome.get("status")
        logger.info(
            f"Auto-converted research {research_id} for user {username}: "
            f"{status}"
        )
    except Exception:
        logger.exception(
            f"Failed to auto-convert research {research_id} for user {username}"
        )