Coverage for src / local_deep_research / web / services / research_sources_service.py: 88%

91 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Service for managing research sources/resources in the database. 

3 

4This service handles saving and retrieving sources from research 

5in a proper relational way using the ResearchResource table. 

6""" 

7 

8from typing import List, Dict, Any, Optional 

9from datetime import datetime, UTC 

10from loguru import logger 

11 

12from ...database.models import ResearchResource, ResearchHistory 

13from ...database.session_context import get_user_db_session 

14 

15 

16class ResearchSourcesService: 

17 """Service for managing research sources in the database.""" 

18 

19 @staticmethod 

20 def save_research_sources( 

21 research_id: str, 

22 sources: List[Dict[str, Any]], 

23 username: Optional[str] = None, 

24 ) -> int: 

25 """ 

26 Save sources from research to the ResearchResource table. 

27 

28 Args: 

29 research_id: The UUID of the research 

30 sources: List of source dictionaries with url, title, snippet, etc. 

31 username: Username for database access 

32 

33 Returns: 

34 Number of sources saved 

35 """ 

36 if not sources: 

37 logger.info(f"No sources to save for research {research_id}") 

38 return 0 

39 

40 saved_count = 0 

41 

42 try: 

43 with get_user_db_session(username) as db_session: 

44 # First check if resources already exist for this research 

45 existing = ( 

46 db_session.query(ResearchResource) 

47 .filter_by(research_id=research_id) 

48 .count() 

49 ) 

50 

51 if existing > 0: 

52 logger.info( 

53 f"Research {research_id} already has {existing} resources, skipping save" 

54 ) 

55 return existing 

56 

57 # Save each source as a ResearchResource 

58 for source in sources: 

59 try: 

60 # Extract fields from various possible formats 

61 url = source.get("url", "") or source.get("link", "") 

62 title = source.get("title", "") or source.get( 

63 "name", "" 

64 ) 

65 snippet = ( 

66 source.get("snippet", "") 

67 or source.get("content_preview", "") 

68 or source.get("description", "") 

69 ) 

70 source_type = source.get("source_type", "web") 

71 

72 # Skip if no URL 

73 if not url: 

74 continue 

75 

76 # Create resource record 

77 resource = ResearchResource( 

78 research_id=research_id, 

79 title=title or "Untitled", 

80 url=url, 

81 content_preview=snippet[:1000] 

82 if snippet 

83 else None, # Limit preview length 

84 source_type=source_type, 

85 resource_metadata={ 

86 "added_at": datetime.now(UTC).isoformat(), 

87 "original_data": source, # Keep original data for reference 

88 }, 

89 created_at=datetime.now(UTC).isoformat(), 

90 ) 

91 

92 db_session.add(resource) 

93 saved_count += 1 

94 

95 except Exception as e: 

96 logger.warning( 

97 f"Failed to save source {source.get('url', 'unknown')}: {e}" 

98 ) 

99 continue 

100 

101 # Commit all resources 

102 if saved_count > 0: 102 ↛ 112line 102 didn't jump to line 112

103 db_session.commit() 

104 logger.info( 

105 f"Saved {saved_count} sources for research {research_id}" 

106 ) 

107 

108 except Exception: 

109 logger.exception("Error saving research sources") 

110 raise 

111 

112 return saved_count 

113 

114 @staticmethod 

115 def get_research_sources( 

116 research_id: str, username: Optional[str] = None 

117 ) -> List[Dict[str, Any]]: 

118 """ 

119 Get all sources for a research from the database. 

120 

121 Args: 

122 research_id: The UUID of the research 

123 username: Username for database access 

124 

125 Returns: 

126 List of source dictionaries 

127 """ 

128 sources = [] 

129 

130 try: 

131 with get_user_db_session(username) as db_session: 

132 resources = ( 

133 db_session.query(ResearchResource) 

134 .filter_by(research_id=research_id) 

135 .order_by(ResearchResource.id.asc()) 

136 .all() 

137 ) 

138 

139 for resource in resources: 

140 sources.append( 

141 { 

142 "id": resource.id, 

143 "url": resource.url, 

144 "title": resource.title, 

145 "snippet": resource.content_preview, 

146 "content_preview": resource.content_preview, 

147 "source_type": resource.source_type, 

148 "metadata": resource.resource_metadata or {}, 

149 "created_at": resource.created_at, 

150 } 

151 ) 

152 

153 logger.info( 

154 f"Retrieved {len(sources)} sources for research {research_id}" 

155 ) 

156 

157 except Exception: 

158 logger.exception("Error retrieving research sources") 

159 raise 

160 

161 return sources 

162 

163 @staticmethod 

164 def copy_sources_to_new_research( 

165 from_research_id: str, 

166 to_research_id: str, 

167 source_ids: Optional[List[int]] = None, 

168 username: Optional[str] = None, 

169 ) -> int: 

170 """ 

171 Copy sources from one research to another (useful for follow-ups). 

172 

173 Args: 

174 from_research_id: Source research ID 

175 to_research_id: Destination research ID 

176 source_ids: Optional list of specific source IDs to copy 

177 username: Username for database access 

178 

179 Returns: 

180 Number of sources copied 

181 """ 

182 copied_count = 0 

183 

184 try: 

185 with get_user_db_session(username) as db_session: 

186 # Get sources to copy 

187 query = db_session.query(ResearchResource).filter_by( 

188 research_id=from_research_id 

189 ) 

190 

191 if source_ids: 

192 query = query.filter(ResearchResource.id.in_(source_ids)) 

193 

194 sources_to_copy = query.all() 

195 

196 # Copy each source 

197 for source in sources_to_copy: 

198 new_resource = ResearchResource( 

199 research_id=to_research_id, 

200 title=source.title, 

201 url=source.url, 

202 content_preview=source.content_preview, 

203 source_type=source.source_type, 

204 resource_metadata={ 

205 **(source.resource_metadata or {}), 

206 "copied_from": from_research_id, 

207 "copied_at": datetime.now(UTC).isoformat(), 

208 }, 

209 created_at=datetime.now(UTC).isoformat(), 

210 ) 

211 

212 db_session.add(new_resource) 

213 copied_count += 1 

214 

215 if copied_count > 0: 

216 db_session.commit() 

217 logger.info( 

218 f"Copied {copied_count} sources from {from_research_id} to {to_research_id}" 

219 ) 

220 

221 except Exception: 

222 logger.exception("Error copying research sources") 

223 raise 

224 

225 return copied_count 

226 

227 @staticmethod 

228 def update_research_with_sources( 

229 research_id: str, 

230 all_links_of_system: List[Dict[str, Any]], 

231 username: Optional[str] = None, 

232 ) -> bool: 

233 """ 

234 Update a completed research with its sources. 

235 This should be called when research completes. 

236 

237 Args: 

238 research_id: The UUID of the research 

239 all_links_of_system: List of all sources found during research 

240 username: Username for database access 

241 

242 Returns: 

243 True if successful 

244 """ 

245 try: 

246 # Save sources to ResearchResource table 

247 saved_count = ResearchSourcesService.save_research_sources( 

248 research_id, all_links_of_system, username 

249 ) 

250 

251 # Also update the research metadata to include source count 

252 with get_user_db_session(username) as db_session: 

253 research = ( 

254 db_session.query(ResearchHistory) 

255 .filter_by(id=research_id) 

256 .first() 

257 ) 

258 

259 if research: 

260 if not research.research_meta: 260 ↛ 264line 260 didn't jump to line 264 because the condition on line 260 was always true

261 research.research_meta = {} 

262 

263 # Update metadata with source information 

264 research.research_meta["sources_count"] = saved_count 

265 research.research_meta["has_sources"] = saved_count > 0 

266 

267 db_session.commit() 

268 logger.info( 

269 f"Updated research {research_id} with {saved_count} sources" 

270 ) 

271 return True 

272 else: 

273 logger.warning( 

274 f"Research {research_id} not found for source update" 

275 ) 

276 return False 

277 

278 except Exception: 

279 logger.exception("Error updating research with sources") 

280 return False