Coverage for src / local_deep_research / security / data_sanitizer.py: 100%
48 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""Security module for sanitizing sensitive data from data structures.
3This module ensures that sensitive information like API keys, passwords, and tokens
4are not accidentally leaked in logs, files, or API responses.
6Includes helpers for filtering research metadata in API responses to prevent
7settings_snapshot (which contains all application settings including API keys)
8from being sent to the frontend.
9"""
11import json
12from typing import Any, Set
15class DataSanitizer:
16 """Utility class for removing sensitive information from data structures."""
18 # Default set of sensitive key names to redact
19 DEFAULT_SENSITIVE_KEYS: Set[str] = {
20 "api_key",
21 "apikey",
22 "password",
23 "secret",
24 "access_token",
25 "refresh_token",
26 "private_key",
27 "auth_token",
28 "session_token",
29 "csrf_token",
30 }
32 @staticmethod
33 def sanitize(data: Any, sensitive_keys: Set[str] | None = None) -> Any:
34 """
35 Recursively remove sensitive keys from data structures.
37 This method traverses dictionaries and lists, removing any keys that match
38 the sensitive keys list (case-insensitive). This prevents accidental
39 credential leakage in optimization results, logs, or API responses.
41 Args:
42 data: The data structure to sanitize (dict, list, or primitive)
43 sensitive_keys: Set of key names to remove (case-insensitive).
44 If None, uses DEFAULT_SENSITIVE_KEYS.
46 Returns:
47 Sanitized copy of the data with sensitive keys removed
49 Example:
50 >>> sanitizer = DataSanitizer()
51 >>> data = {"username": "user", "api_key": "secret123"}
52 >>> sanitizer.sanitize(data)
53 {"username": "user"}
54 """
55 if sensitive_keys is None:
56 sensitive_keys = DataSanitizer.DEFAULT_SENSITIVE_KEYS
58 # Convert to lowercase for case-insensitive comparison
59 sensitive_keys_lower = {key.lower() for key in sensitive_keys}
61 if isinstance(data, dict):
62 return {
63 k: DataSanitizer.sanitize(v, sensitive_keys)
64 for k, v in data.items()
65 if k.lower() not in sensitive_keys_lower
66 }
67 if isinstance(data, list):
68 return [
69 DataSanitizer.sanitize(item, sensitive_keys) for item in data
70 ]
71 # Return primitives unchanged
72 return data
74 @staticmethod
75 def redact(
76 data: Any,
77 sensitive_keys: Set[str] | None = None,
78 redaction_text: str = "[REDACTED]",
79 ) -> Any:
80 """
81 Recursively redact (replace with placeholder) sensitive values in data structures.
83 Unlike sanitize() which removes keys entirely, this method replaces their
84 values with a redaction placeholder, preserving the structure.
86 Args:
87 data: The data structure to redact (dict, list, or primitive)
88 sensitive_keys: Set of key names to redact (case-insensitive).
89 If None, uses DEFAULT_SENSITIVE_KEYS.
90 redaction_text: Text to replace sensitive values with
92 Returns:
93 Copy of the data with sensitive values redacted
95 Example:
96 >>> sanitizer = DataSanitizer()
97 >>> data = {"username": "user", "api_key": "secret123"}
98 >>> sanitizer.redact(data)
99 {"username": "user", "api_key": "[REDACTED]"}
100 """
101 if sensitive_keys is None:
102 sensitive_keys = DataSanitizer.DEFAULT_SENSITIVE_KEYS
104 # Convert to lowercase for case-insensitive comparison
105 sensitive_keys_lower = {key.lower() for key in sensitive_keys}
107 if isinstance(data, dict):
108 return {
109 k: (
110 redaction_text
111 if k.lower() in sensitive_keys_lower
112 else DataSanitizer.redact(v, sensitive_keys, redaction_text)
113 )
114 for k, v in data.items()
115 }
116 if isinstance(data, list):
117 return [
118 DataSanitizer.redact(item, sensitive_keys, redaction_text)
119 for item in data
120 ]
121 # Return primitives unchanged
122 return data
125# Convenience functions for direct use
def sanitize_data(data: Any, sensitive_keys: Set[str] | None = None) -> Any:
    """Return a copy of ``data`` with sensitive keys dropped.

    Module-level shortcut that forwards both arguments, unchanged, to
    ``DataSanitizer.sanitize()``.

    Args:
        data: The data structure to clean.
        sensitive_keys: Optional set of key names to drop; passed through
            as given (including ``None``).

    Returns:
        The value produced by ``DataSanitizer.sanitize()``.
    """
    cleaned = DataSanitizer.sanitize(data, sensitive_keys)
    return cleaned
def redact_data(
    data: Any,
    sensitive_keys: Set[str] | None = None,
    redaction_text: str = "[REDACTED]",
) -> Any:
    """Return a copy of ``data`` with sensitive values masked.

    Module-level shortcut that forwards all three arguments, unchanged, to
    ``DataSanitizer.redact()``.

    Args:
        data: The data structure to mask.
        sensitive_keys: Optional set of key names whose values are masked;
            passed through as given (including ``None``).
        redaction_text: Placeholder written in place of each masked value.

    Returns:
        The value produced by ``DataSanitizer.redact()``.
    """
    masked = DataSanitizer.redact(data, sensitive_keys, redaction_text)
    return masked
def filter_research_metadata(research_meta: Any) -> dict:
    """Reduce research metadata to the allowlisted fields for history lists.

    Only ``is_news_search`` is copied out, so nothing else in the metadata
    (notably ``settings_snapshot`` with its API keys, passwords and tokens)
    can reach the frontend through this function.

    Args:
        research_meta: Raw research metadata. A dict is used directly, a
            string is parsed as JSON, and any falsy value is treated as an
            empty dict.

    Returns:
        A new dict ``{"is_news_search": <bool>}``. The flag is the
        truthiness of the stored value, and is False when the field is
        missing, the input is not (or does not decode to) a dict, or
        parsing fails.
    """
    is_news = False
    try:
        parsed = research_meta if research_meta else {}
        if isinstance(parsed, str):
            parsed = json.loads(parsed)
        if isinstance(parsed, dict):
            is_news = bool(parsed.get("is_news_search", False))
    except (json.JSONDecodeError, TypeError, AttributeError):
        # Malformed metadata falls back to the safe default.
        is_news = False
    return {"is_news_search": is_news}
def strip_settings_snapshot(research_meta: Any) -> dict:
    """Return research metadata without its top-level ``settings_snapshot``.

    ``settings_snapshot`` holds all application settings, API keys included,
    so it must not be sent in API responses. Every other top-level field
    (phase, error_type, processed_query, mode, duration, etc.) is kept with
    its value untouched; nested dicts are not inspected.

    Args:
        research_meta: Raw research metadata. A dict is used directly, a
            string is parsed as JSON, and any falsy value is treated as an
            empty dict.

    Returns:
        A new dict holding every top-level entry except
        ``settings_snapshot``. An empty dict is returned when the input is
        not (or does not decode to) a dict, or when parsing fails.
    """
    try:
        parsed = research_meta if research_meta else {}
        if isinstance(parsed, str):
            parsed = json.loads(parsed)
        if not isinstance(parsed, dict):
            return {}
        kept = {}
        for field, value in parsed.items():
            if field != "settings_snapshot":
                kept[field] = value
        return kept
    except (json.JSONDecodeError, TypeError, AttributeError):
        # Malformed metadata yields an empty result instead of an error.
        return {}