Coverage for src/local_deep_research/research_library/search/services/research_history_indexer.py: 99%

106 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2Research History Indexer Service 

3 

4Enables semantic search over research history by: 

5- Converting ResearchHistory reports into indexable Documents 

6- Linking documents to the Research History collection 

7- Triggering RAG indexing via LibraryRAGService 

8""" 

9 

10import hashlib 

11import uuid 

12from datetime import datetime, UTC 

13from typing import Any, Dict, Optional 

14 

15from loguru import logger 

16from sqlalchemy.exc import IntegrityError 

17 

18from ....constants import ResearchStatus 

19from ....database.library_init import ensure_research_history_collection 

20from ....database.models.library import ( 

21 Document, 

22 DocumentCollection, 

23 DocumentStatus, 

24 SourceType, 

25) 

26from ....database.models.research import ResearchHistory 

27from ....database.session_context import get_user_db_session 

28 

29 

class ResearchHistoryIndexer:
    """
    Service for indexing research history into a searchable collection.

    Converts research reports into Documents that can be indexed for
    semantic search using the existing RAG infrastructure.
    """

    # Source type names used in the database.
    # These must match the rows seeded by library initialization; if the
    # SourceType row is missing, document creation is refused (see
    # _create_document_from_report).
    SOURCE_TYPE_REPORT = "research_report"
    COLLECTION_TYPE = "research_history"

    def __init__(self, username: str, db_password: Optional[str] = None) -> None:
        """
        Initialize the indexer for a user.

        Args:
            username: Username for database access
            db_password: Optional database password for encrypted DB access
        """
        self.username = username
        self.db_password = db_password

    def get_or_create_collection(self) -> str:
        """
        Get or create the Research History collection for this user.

        Returns:
            UUID of the Research History collection
        """
        # Delegates to library_init so the collection is created with the
        # same defaults no matter which code path needs it first.
        return ensure_research_history_collection(
            self.username, self.db_password
        )

    def index_research(
        self,
        research_id: str,
        collection_id: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Convert a single research entry into a Document and add it to a
        collection. Idempotent — safe to call multiple times.

        Args:
            research_id: UUID of the research to index
            collection_id: Target collection UUID (defaults to Research History)

        Returns:
            Dict with status and document count
        """
        if collection_id is None:
            collection_id = self.get_or_create_collection()

        with get_user_db_session(self.username, self.db_password) as session:
            research = (
                session.query(ResearchHistory)
                .filter(ResearchHistory.id == research_id)
                .first()
            )

            # Guard clauses: only a completed research entry with a
            # non-empty report body can be converted into a document.
            if not research:
                return {"status": "error", "error": "Research not found"}

            if research.status != ResearchStatus.COMPLETED:
                return {
                    "status": "error",
                    "error": "Research is not yet completed",
                }

            if not research.report_content:
                return {
                    "status": "error",
                    "error": "Research has no report content",
                }

            try:
                report_doc = self._create_document_from_report(
                    research, collection_id, session
                )
                if report_doc is None:
                    # The helper only returns None when the required
                    # SourceType row is absent (library tables not seeded).
                    return {
                        "status": "error",
                        "error": "SourceType 'research_report' not found. "
                        "Run library initialization.",
                    }
                logger.info(
                    f"Created/found document for research: {research_id[:8]}"
                )
            except Exception:
                logger.exception("Error creating report document")
                return {
                    "status": "error",
                    "error": "Failed to create report document",
                }

            try:
                session.commit()
            except IntegrityError:
                # A concurrent caller inserted the same DocumentCollection
                # link first. The desired end state already exists, so roll
                # back and fall through to the success return below.
                session.rollback()
                logger.info(
                    f"DocumentCollection already exists for research "
                    f"{research_id[:8]} (concurrent insert)"
                )

            return {
                "status": "success",
                "research_id": research_id,
                "collection_id": collection_id,
                "documents_added": 1,
            }

    def convert_all_research(self, force: bool = False) -> Dict[str, Any]:
        """
        Convert all completed research entries into Documents (without RAG indexing).

        Unlike index_all_research / index_research, this method operates within a
        single DB session and calls the private helpers directly, avoiding the
        nested-session issues that arise on SQLite when index_research opens its
        own session inside a loop.

        Args:
            force: If True, process all entries even if already converted.
                If False (default), skip entries that already have a report
                Document.

        Returns:
            Dict with:
                - converted: Number of research entries successfully converted
                - skipped: Number of entries skipped (already converted)
                - failed: Number of entries that raised an exception
                - collection_id: UUID of the Research History collection

        Only report Documents are created; source documents are not indexed.
        """
        collection_id = self.get_or_create_collection()

        with get_user_db_session(self.username, self.db_password) as session:
            # Resolve the report SourceType — required to create report Documents
            report_type = (
                session.query(SourceType)
                .filter_by(name=self.SOURCE_TYPE_REPORT)
                .first()
            )
            if report_type is None:
                logger.warning(
                    f"SourceType '{self.SOURCE_TYPE_REPORT}' not found. "
                    "Run library initialization to seed source types before "
                    "converting research history."
                )
                return {
                    "converted": 0,
                    "skipped": 0,
                    "failed": 0,
                    "collection_id": collection_id,
                }

            # Build subquery of research IDs that already have a report Document
            already_converted_subquery = (
                session.query(Document.research_id)
                .filter(Document.source_type_id == report_type.id)
                .filter(Document.research_id.isnot(None))
                .distinct()
                .subquery()
            )

            # Count total eligible research entries (before filtering) so
            # `skipped` can be derived by subtraction rather than a second
            # filtered query.
            total_eligible = (
                session.query(ResearchHistory)
                .filter(ResearchHistory.status == ResearchStatus.COMPLETED)
                .filter(ResearchHistory.report_content.isnot(None))
                .filter(ResearchHistory.report_content != "")
                .count()
            )

            # Fetch candidates — optionally excluding already-converted entries
            query = (
                session.query(ResearchHistory)
                .filter(ResearchHistory.status == ResearchStatus.COMPLETED)
                .filter(ResearchHistory.report_content.isnot(None))
                .filter(ResearchHistory.report_content != "")
                .order_by(ResearchHistory.created_at.desc())
            )
            if not force:
                query = query.filter(
                    ResearchHistory.id.notin_(
                        already_converted_subquery.select()
                    )
                )

            research_entries = query.all()

            converted = 0
            # With force=False the entries filtered out above are exactly the
            # already-converted ones, so the difference is the skip count.
            skipped = total_eligible - len(research_entries) if not force else 0
            failed = 0

            for research in research_entries:
                try:
                    # Create (or reuse) report Document. report_type_id is
                    # passed in so the helper skips its per-entry SourceType
                    # lookup (avoids N+1 queries in this loop).
                    report_doc = self._create_document_from_report(
                        research,
                        collection_id,
                        session,
                        report_type_id=report_type.id,
                    )
                    if report_doc is None:
                        # SourceType missing inside helper (already warned)
                        failed += 1
                        continue

                    # Commit each entry individually so a rollback on failure
                    # only loses the failing entry, not the whole batch.
                    session.commit()
                    converted += 1

                except Exception:
                    logger.exception(f"Error converting research {research.id}")
                    session.rollback()
                    failed += 1

            logger.info(
                f"convert_all_research complete — converted={converted}, "
                f"skipped={skipped}, failed={failed}"
            )
            return {
                "converted": converted,
                "skipped": skipped,
                "failed": failed,
                "collection_id": collection_id,
            }

    def _create_document_from_report(
        self,
        research: ResearchHistory,
        collection_id: str,
        session,
        report_type_id: Optional[str] = None,
    ) -> Optional[Document]:
        """
        Create a Document from a research report.

        Args:
            research: ResearchHistory entry
            collection_id: Target collection UUID
            session: Database session (SQLAlchemy session — the caller owns
                commit/rollback; this method only adds and flushes)
            report_type_id: Pre-resolved SourceType ID (avoids N+1 queries
                when called in a loop from convert_all_research)

        Returns:
            Created Document or None if skipped
        """
        # Resolve report SourceType (cached ID avoids per-entry query)
        if report_type_id is None:
            report_type = (
                session.query(SourceType)
                .filter_by(name=self.SOURCE_TYPE_REPORT)
                .first()
            )
            if report_type is None:
                logger.warning(
                    f"SourceType '{self.SOURCE_TYPE_REPORT}' not found for research "
                    f"{research.id}. Cannot create document — run library initialization "
                    "to seed source types."
                )
                return None
            report_type_id = report_type.id
        # Fast path: a report Document was already created for this exact
        # research entry — just make sure it is linked to the collection.
        existing_doc = (
            session.query(Document)
            .filter(Document.research_id == research.id)
            .filter(Document.source_type_id == report_type_id)
            .first()
        )

        if existing_doc:
            # Ensure it's in the collection
            self._ensure_in_collection(existing_doc.id, collection_id, session)
            return existing_doc

        # Create document or reuse existing one with same content hash
        # (document_hash has a unique constraint, so identical content
        # must share a Document row — research_id points to the first creator)
        content = research.report_content
        doc_hash = hashlib.sha256(content.encode("utf-8")).hexdigest()

        document = (
            session.query(Document)
            .filter(Document.document_hash == doc_hash)
            .first()
        )

        if document is None:
            doc_id = str(uuid.uuid4())
            document = Document(
                id=doc_id,
                source_type_id=report_type_id,
                research_id=research.id,
                document_hash=doc_hash,
                file_size=len(content.encode("utf-8")),
                file_type="markdown",
                mime_type="text/markdown",
                # Fall back to a truncated query, then a placeholder, when
                # the research has no explicit title.
                title=research.title
                or (research.query[:100] if research.query else "Untitled"),
                text_content=content,
                status=DocumentStatus.COMPLETED,
                processed_at=datetime.now(UTC),
                character_count=len(content),
                word_count=len(content.split()),
            )
            session.add(document)
            # Flush (without committing) so document.id is persisted and
            # usable for the collection link created below.
            session.flush()

        self._ensure_in_collection(document.id, collection_id, session)
        return document

    def _ensure_in_collection(
        self, document_id: str, collection_id: str, session
    ) -> None:
        """Add document to collection if not already there."""
        existing = (
            session.query(DocumentCollection)
            .filter_by(document_id=document_id, collection_id=collection_id)
            .first()
        )
        if not existing:
            # New links start with indexed=False — presumably picked up by a
            # later RAG indexing pass (see module docstring); confirm against
            # LibraryRAGService.
            session.add(
                DocumentCollection(
                    document_id=document_id,
                    collection_id=collection_id,
                    indexed=False,
                )
            )

360 

361 

def auto_convert_research(
    username: str, research_id: str, db_password: str | None = None
):
    """Best-effort conversion of a finished research entry into a History document.

    Builds a ResearchHistoryIndexer for *username* and indexes *research_id*
    into the default Research History collection. Every exception — including
    failures while logging the outcome — is caught and logged, so callers in
    any context (background threads, request handlers) never see an error.
    """
    try:
        outcome = ResearchHistoryIndexer(
            username, db_password=db_password
        ).index_research(research_id)
        logger.info(
            f"Auto-converted research {research_id} for user {username}: "
            f"{outcome.get('status')}"
        )
    except Exception:
        logger.exception(
            f"Failed to auto-convert research {research_id} for user {username}"
        )