Coverage for src / local_deep_research / security / data_sanitizer.py: 100%

48 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1"""Security module for sanitizing sensitive data from data structures. 

2 

3This module ensures that sensitive information like API keys, passwords, and tokens 

4are not accidentally leaked in logs, files, or API responses. 

5 

6Includes helpers for filtering research metadata in API responses to prevent 

7settings_snapshot (which contains all application settings including API keys) 

8from being sent to the frontend. 

9""" 

10 

11import json 

12from typing import Any, Set 

13 

14 

15class DataSanitizer: 

16 """Utility class for removing sensitive information from data structures.""" 

17 

18 # Default set of sensitive key names to redact 

19 DEFAULT_SENSITIVE_KEYS: Set[str] = { 

20 "api_key", 

21 "apikey", 

22 "password", 

23 "secret", 

24 "access_token", 

25 "refresh_token", 

26 "private_key", 

27 "auth_token", 

28 "session_token", 

29 "csrf_token", 

30 } 

31 

32 @staticmethod 

33 def sanitize(data: Any, sensitive_keys: Set[str] | None = None) -> Any: 

34 """ 

35 Recursively remove sensitive keys from data structures. 

36 

37 This method traverses dictionaries and lists, removing any keys that match 

38 the sensitive keys list (case-insensitive). This prevents accidental 

39 credential leakage in optimization results, logs, or API responses. 

40 

41 Args: 

42 data: The data structure to sanitize (dict, list, or primitive) 

43 sensitive_keys: Set of key names to remove (case-insensitive). 

44 If None, uses DEFAULT_SENSITIVE_KEYS. 

45 

46 Returns: 

47 Sanitized copy of the data with sensitive keys removed 

48 

49 Example: 

50 >>> sanitizer = DataSanitizer() 

51 >>> data = {"username": "user", "api_key": "secret123"} 

52 >>> sanitizer.sanitize(data) 

53 {"username": "user"} 

54 """ 

55 if sensitive_keys is None: 

56 sensitive_keys = DataSanitizer.DEFAULT_SENSITIVE_KEYS 

57 

58 # Convert to lowercase for case-insensitive comparison 

59 sensitive_keys_lower = {key.lower() for key in sensitive_keys} 

60 

61 if isinstance(data, dict): 

62 return { 

63 k: DataSanitizer.sanitize(v, sensitive_keys) 

64 for k, v in data.items() 

65 if k.lower() not in sensitive_keys_lower 

66 } 

67 if isinstance(data, list): 

68 return [ 

69 DataSanitizer.sanitize(item, sensitive_keys) for item in data 

70 ] 

71 # Return primitives unchanged 

72 return data 

73 

74 @staticmethod 

75 def redact( 

76 data: Any, 

77 sensitive_keys: Set[str] | None = None, 

78 redaction_text: str = "[REDACTED]", 

79 ) -> Any: 

80 """ 

81 Recursively redact (replace with placeholder) sensitive values in data structures. 

82 

83 Unlike sanitize() which removes keys entirely, this method replaces their 

84 values with a redaction placeholder, preserving the structure. 

85 

86 Args: 

87 data: The data structure to redact (dict, list, or primitive) 

88 sensitive_keys: Set of key names to redact (case-insensitive). 

89 If None, uses DEFAULT_SENSITIVE_KEYS. 

90 redaction_text: Text to replace sensitive values with 

91 

92 Returns: 

93 Copy of the data with sensitive values redacted 

94 

95 Example: 

96 >>> sanitizer = DataSanitizer() 

97 >>> data = {"username": "user", "api_key": "secret123"} 

98 >>> sanitizer.redact(data) 

99 {"username": "user", "api_key": "[REDACTED]"} 

100 """ 

101 if sensitive_keys is None: 

102 sensitive_keys = DataSanitizer.DEFAULT_SENSITIVE_KEYS 

103 

104 # Convert to lowercase for case-insensitive comparison 

105 sensitive_keys_lower = {key.lower() for key in sensitive_keys} 

106 

107 if isinstance(data, dict): 

108 return { 

109 k: ( 

110 redaction_text 

111 if k.lower() in sensitive_keys_lower 

112 else DataSanitizer.redact(v, sensitive_keys, redaction_text) 

113 ) 

114 for k, v in data.items() 

115 } 

116 if isinstance(data, list): 

117 return [ 

118 DataSanitizer.redact(item, sensitive_keys, redaction_text) 

119 for item in data 

120 ] 

121 # Return primitives unchanged 

122 return data 

123 

124 

125# Convenience functions for direct use 

def sanitize_data(data: Any, sensitive_keys: Set[str] | None = None) -> Any:
    """
    Drop sensitive keys from a (possibly nested) data structure.

    Module-level shortcut that delegates directly to DataSanitizer.sanitize().

    Args:
        data: The data structure to sanitize
        sensitive_keys: Optional set of sensitive key names; passed through
            unchanged to DataSanitizer.sanitize()

    Returns:
        Whatever DataSanitizer.sanitize() returns for these arguments
    """
    sanitized = DataSanitizer.sanitize(data, sensitive_keys)
    return sanitized

140 

141 

def redact_data(
    data: Any,
    sensitive_keys: Set[str] | None = None,
    redaction_text: str = "[REDACTED]",
) -> Any:
    """
    Replace sensitive values in a (possibly nested) data structure.

    Module-level shortcut that delegates directly to DataSanitizer.redact().

    Args:
        data: The data structure to redact
        sensitive_keys: Optional set of sensitive key names; passed through
            unchanged to DataSanitizer.redact()
        redaction_text: Placeholder text substituted for sensitive values

    Returns:
        Whatever DataSanitizer.redact() returns for these arguments
    """
    redacted = DataSanitizer.redact(data, sensitive_keys, redaction_text)
    return redacted

161 

162 

def filter_research_metadata(research_meta: Any) -> dict:
    """Reduce research_meta to the allowlisted fields for history list responses.

    Only ``is_news_search`` is copied out; every other field (notably
    settings_snapshot) is dropped rather than filtered, so nothing outside
    the allowlist can reach the caller.

    Args:
        research_meta: Raw research metadata (dict, JSON string, or None)

    Returns:
        A new dict ``{"is_news_search": <bool>}``. The flag is False when the
        input is falsy, is not (or does not decode to) a dict, or cannot be
        parsed.
    """
    is_news = False
    try:
        parsed = research_meta if research_meta else {}
        if isinstance(parsed, str):
            # String input is treated as a JSON document.
            parsed = json.loads(parsed)
        if isinstance(parsed, dict):
            is_news = bool(parsed.get("is_news_search", False))
    except (json.JSONDecodeError, TypeError, AttributeError):
        # Unparseable metadata falls back to the safe default.
        is_news = False
    return {"is_news_search": is_news}

187 

188 

def strip_settings_snapshot(research_meta: Any) -> dict:
    """Return research_meta without its top-level settings_snapshot entry.

    Every other top-level field is carried over as-is (values are not
    copied or inspected), so nested occurrences of the key are left alone.

    Args:
        research_meta: Raw research metadata (dict, JSON string, or None)

    Returns:
        A new dict holding all top-level items except ``settings_snapshot``.
        An empty dict is returned when the input is falsy, is not (or does
        not decode to) a dict, or cannot be parsed.
    """
    try:
        parsed = research_meta if research_meta else {}
        if isinstance(parsed, str):
            # String input is treated as a JSON document.
            parsed = json.loads(parsed)
        if isinstance(parsed, dict):
            stripped = {}
            for key, value in parsed.items():
                if key != "settings_snapshot":
                    stripped[key] = value
            return stripped
        return {}
    except (json.JSONDecodeError, TypeError, AttributeError):
        # Unparseable metadata yields no fields at all.
        return {}