Coverage for src/local_deep_research/web_search_engines/engines/search_engine_local_all.py: 13%

63 statements  

coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Search engine that searches across all local collections 

3""" 

4 

5from typing import Any, Dict, List, Optional, cast 

6 

7from langchain_core.language_models import BaseLLM 

8from loguru import logger 

9 

10from ..search_engine_base import BaseSearchEngine 

11from ..search_engine_factory import create_search_engine 

12from ..search_engines_config import local_search_engines 

13from .search_engine_local import LocalSearchEngine 

14 

15 

class LocalAllSearchEngine(BaseSearchEngine):
    """
    Search engine that searches across all local document collections.
    Acts as a meta search engine specifically for local collections.
    """

    def __init__(
        self,
        llm: Optional[BaseLLM] = None,
        max_results: int = 10,
        max_filtered_results: Optional[int] = None,
        settings_snapshot: Optional[Dict[str, Any]] = None,
        programmatic_mode: bool = False,
        **kwargs,
    ):
        """
        Initialize the local all-collections search engine.

        Args:
            llm: Language model for relevance filtering
            max_results: Maximum number of search results
            max_filtered_results: Maximum results after filtering
            settings_snapshot: Settings snapshot for thread context
            programmatic_mode: If True, disables database operations and metrics tracking
            **kwargs: Additional parameters passed to LocalSearchEngine instances
        """
        # Initialize the BaseSearchEngine with LLM, max_filtered_results, and max_results
        super().__init__(
            llm=llm,
            max_filtered_results=max_filtered_results,
            max_results=max_results,
            settings_snapshot=settings_snapshot,
            programmatic_mode=programmatic_mode,
        )

        # Find all local collection search engines
        self.local_engines = {}
        try:
            for collection_id in local_search_engines():
                # Create a search engine for this collection
                try:
                    engine = create_search_engine(
                        collection_id,
                        llm=llm,
                        max_filtered_results=max_filtered_results,
                        settings_snapshot=settings_snapshot,
                        programmatic_mode=programmatic_mode,
                    )
                    engine = cast(LocalSearchEngine, engine)

                    if engine:
                        self.local_engines[collection_id] = {
                            "engine": engine,
                            "name": engine.name,
                            "description": engine.description,
                        }
                except Exception:
                    logger.exception(
                        f"Error creating search engine for collection '{collection_id}'"
                    )
        except ImportError:
            logger.warning("No local collections configuration found")
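
    # Illustrative shape of self.local_engines after construction. The collection
    # IDs below are hypothetical; the real keys come from local_search_engines(),
    # and the "engine"/"name"/"description" fields mirror the assignment above.
    #
    #   {
    #       "project_docs": {
    #           "engine": <LocalSearchEngine>,
    #           "name": "Project Docs",
    #           "description": "Local project documentation",
    #       },
    #       ...
    #   }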

    def _get_previews(self, query: str) -> List[Dict[str, Any]]:
        """
        Get preview information for documents from all local collections.

        Args:
            query: The search query

        Returns:
            List of preview dictionaries
        """
        logger.info(
            f"Searching across all local collections for query: {query}"
        )

        all_previews = []

        # Get previews from each local search engine
        for collection_id, engine_info in self.local_engines.items():
            engine = engine_info["engine"]
            try:
                # Get previews from this engine
                previews = engine._get_previews(query)

                # Add collection info to each preview
                for preview in previews:
                    preview["collection_id"] = collection_id
                    preview["collection_name"] = engine_info["name"]
                    preview["collection_description"] = engine_info[
                        "description"
                    ]

                all_previews.extend(previews)
            except Exception:
                logger.exception(
                    f"Error searching collection '{collection_id}'"
                )

        if not all_previews:
            logger.info(f"No local documents found for query: {query}")
            return []

        # Sort by similarity score if available
        all_previews.sort(
            key=lambda x: float(x.get("similarity", 0)), reverse=True
        )

        # Limit to max_results
        return all_previews[: self.max_results]
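
    # Illustrative preview entry as returned by _get_previews above. Values are
    # hypothetical; only the three collection_* keys are added by this class,
    # "similarity" appears only if the per-collection engine supplies it, and
    # _get_full_content below assumes an "id" key is present.
    #
    #   {
    #       "id": "doc-42",
    #       "similarity": 0.83,
    #       "collection_id": "project_docs",
    #       "collection_name": "Project Docs",
    #       "collection_description": "Local project documentation",
    #       ...  # plus whatever fields the per-collection engine returned
    #   }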

    def _get_full_content(
        self, relevant_items: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Get full content for the relevant documents.
        Delegates to the appropriate collection's search engine.

        Args:
            relevant_items: List of relevant preview dictionaries

        Returns:
            List of result dictionaries with full content
        """
        # Group items by collection
        items_by_collection = {}
        for item in relevant_items:
            collection_id = item.get("collection_id")
            if collection_id and collection_id in self.local_engines:
                if collection_id not in items_by_collection:
                    items_by_collection[collection_id] = []
                items_by_collection[collection_id].append(item)

        # Process each collection's items with its own engine
        all_results = []
        for collection_id, items in items_by_collection.items():
            engine = self.local_engines[collection_id]["engine"]
            try:
                results = engine._get_full_content(items)
                all_results.extend(results)
            except Exception:
                logger.exception(
                    f"Error getting full content from collection '{collection_id}'"
                )
                # Fall back to returning the items without full content
                all_results.extend(items)

        # Add any items that weren't processed
        processed_ids = set(item["id"] for item in all_results)
        for item in relevant_items:
            if item["id"] not in processed_ids:
                all_results.append(item)

        return all_results
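
For orientation, a minimal usage sketch follows. It is hypothetical: it assumes the src/ layout makes the package importable as local_deep_research, that at least one local collection is registered via local_search_engines(), and it drives the two protected phase methods shown above directly for illustration; callers would normally go through the BaseSearchEngine entry point, which is not part of this file.

    from local_deep_research.web_search_engines.engines.search_engine_local_all import (
        LocalAllSearchEngine,
    )

    # llm is optional; per the docstring it is only used for relevance filtering.
    engine = LocalAllSearchEngine(llm=None, max_results=5)

    # Phase 1: lightweight previews aggregated from every configured collection,
    # sorted by "similarity" and capped at max_results.
    previews = engine._get_previews("vector database indexing")
    for p in previews:
        print(p["collection_name"], p.get("similarity", 0))

    # Phase 2: full content, fetched by each preview's own collection engine.
    results = engine._get_full_content(previews)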