Coverage for src / local_deep_research / security / data_sanitizer.py: 97%

48 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1"""Security module for sanitizing sensitive data from data structures. 

2 

3This module ensures that sensitive information like API keys, passwords, and tokens 

4are not accidentally leaked in logs, files, or API responses. 

5 

6Includes helpers for filtering research metadata in API responses to prevent 

7settings_snapshot (which contains all application settings including API keys) 

8from being sent to the frontend. 

9""" 

10 

11import json 

12from typing import Any, Set 

13 

14 

15class DataSanitizer: 

16 """Utility class for removing sensitive information from data structures.""" 

17 

18 # Default set of sensitive key names to redact 

19 DEFAULT_SENSITIVE_KEYS: Set[str] = { 

20 "api_key", 

21 "apikey", 

22 "password", 

23 "secret", 

24 "access_token", 

25 "refresh_token", 

26 "private_key", 

27 "auth_token", 

28 "session_token", 

29 "csrf_token", 

30 } 

31 

32 @staticmethod 

33 def sanitize(data: Any, sensitive_keys: Set[str] | None = None) -> Any: 

34 """ 

35 Recursively remove sensitive keys from data structures. 

36 

37 This method traverses dictionaries and lists, removing any keys that match 

38 the sensitive keys list (case-insensitive). This prevents accidental 

39 credential leakage in optimization results, logs, or API responses. 

40 

41 Args: 

42 data: The data structure to sanitize (dict, list, or primitive) 

43 sensitive_keys: Set of key names to remove (case-insensitive). 

44 If None, uses DEFAULT_SENSITIVE_KEYS. 

45 

46 Returns: 

47 Sanitized copy of the data with sensitive keys removed 

48 

49 Example: 

50 >>> sanitizer = DataSanitizer() 

51 >>> data = {"username": "user", "api_key": "secret123"} 

52 >>> sanitizer.sanitize(data) 

53 {"username": "user"} 

54 """ 

55 if sensitive_keys is None: 

56 sensitive_keys = DataSanitizer.DEFAULT_SENSITIVE_KEYS 

57 

58 # Convert to lowercase for case-insensitive comparison 

59 sensitive_keys_lower = {key.lower() for key in sensitive_keys} 

60 

61 if isinstance(data, dict): 

62 return { 

63 k: DataSanitizer.sanitize(v, sensitive_keys) 

64 for k, v in data.items() 

65 if k.lower() not in sensitive_keys_lower 

66 } 

67 elif isinstance(data, list): 

68 return [ 

69 DataSanitizer.sanitize(item, sensitive_keys) for item in data 

70 ] 

71 else: 

72 # Return primitives unchanged 

73 return data 

74 

75 @staticmethod 

76 def redact( 

77 data: Any, 

78 sensitive_keys: Set[str] | None = None, 

79 redaction_text: str = "[REDACTED]", 

80 ) -> Any: 

81 """ 

82 Recursively redact (replace with placeholder) sensitive values in data structures. 

83 

84 Unlike sanitize() which removes keys entirely, this method replaces their 

85 values with a redaction placeholder, preserving the structure. 

86 

87 Args: 

88 data: The data structure to redact (dict, list, or primitive) 

89 sensitive_keys: Set of key names to redact (case-insensitive). 

90 If None, uses DEFAULT_SENSITIVE_KEYS. 

91 redaction_text: Text to replace sensitive values with 

92 

93 Returns: 

94 Copy of the data with sensitive values redacted 

95 

96 Example: 

97 >>> sanitizer = DataSanitizer() 

98 >>> data = {"username": "user", "api_key": "secret123"} 

99 >>> sanitizer.redact(data) 

100 {"username": "user", "api_key": "[REDACTED]"} 

101 """ 

102 if sensitive_keys is None: 

103 sensitive_keys = DataSanitizer.DEFAULT_SENSITIVE_KEYS 

104 

105 # Convert to lowercase for case-insensitive comparison 

106 sensitive_keys_lower = {key.lower() for key in sensitive_keys} 

107 

108 if isinstance(data, dict): 

109 return { 

110 k: ( 

111 redaction_text 

112 if k.lower() in sensitive_keys_lower 

113 else DataSanitizer.redact(v, sensitive_keys, redaction_text) 

114 ) 

115 for k, v in data.items() 

116 } 

117 elif isinstance(data, list): 

118 return [ 

119 DataSanitizer.redact(item, sensitive_keys, redaction_text) 

120 for item in data 

121 ] 

122 else: 

123 # Return primitives unchanged 

124 return data 

125 

126 

127# Convenience functions for direct use 

def sanitize_data(data: Any, sensitive_keys: Set[str] | None = None) -> Any:
    """
    Strip sensitive keys out of a data structure.

    Module-level shortcut that delegates to DataSanitizer.sanitize().

    Args:
        data: The data structure to sanitize
        sensitive_keys: Optional set of sensitive key names

    Returns:
        Whatever DataSanitizer.sanitize() returns for these arguments
    """
    sanitized = DataSanitizer.sanitize(data, sensitive_keys=sensitive_keys)
    return sanitized

142 

143 

def redact_data(
    data: Any,
    sensitive_keys: Set[str] | None = None,
    redaction_text: str = "[REDACTED]",
) -> Any:
    """
    Replace sensitive values in a data structure with a placeholder.

    Module-level shortcut that delegates to DataSanitizer.redact().

    Args:
        data: The data structure to redact
        sensitive_keys: Optional set of sensitive key names
        redaction_text: Text to replace sensitive values with

    Returns:
        Whatever DataSanitizer.redact() returns for these arguments
    """
    redacted = DataSanitizer.redact(
        data,
        sensitive_keys=sensitive_keys,
        redaction_text=redaction_text,
    )
    return redacted

163 

164 

def filter_research_metadata(research_meta: Any) -> dict:
    """Reduce research_meta to the allowlisted fields for history list responses.

    Only an explicit allowlist is copied out, so settings_snapshot (and any
    other field) can never reach the caller through this function. The
    allowlist currently holds a single field: is_news_search.

    Args:
        research_meta: Raw research metadata (dict, JSON string, or None)

    Returns:
        dict of the form {"is_news_search": <bool>}. The flag is False when
        the input is falsy, is not (or does not decode to) a dict, or fails
        to parse as JSON.
    """
    is_news = False  # safe default for every failure path
    try:
        parsed = research_meta if research_meta else {}
        if isinstance(parsed, str):
            parsed = json.loads(parsed)
        if isinstance(parsed, dict):
            is_news = bool(parsed.get("is_news_search", False))
    except (json.JSONDecodeError, TypeError, AttributeError):
        is_news = False
    return {"is_news_search": is_news}

189 

190 

def strip_settings_snapshot(research_meta: Any) -> dict:
    """Drop the settings_snapshot entry from research_meta for API responses.

    Every other top-level field is passed through untouched; only the
    "settings_snapshot" key is left out of the returned dict.

    Args:
        research_meta: Raw research metadata (dict, JSON string, or None)

    Returns:
        A new dict without settings_snapshot. An empty dict is returned when
        the input is falsy, is not (or does not decode to) a dict, or fails
        to parse as JSON.
    """
    try:
        parsed = research_meta if research_meta else {}
        if isinstance(parsed, str):
            parsed = json.loads(parsed)
        if isinstance(parsed, dict):
            stripped = {}
            for name, value in parsed.items():
                if name != "settings_snapshot":
                    stripped[name] = value
            return stripped
        return {}
    except (json.JSONDecodeError, TypeError, AttributeError):
        return {}