Coverage for src / local_deep_research / security / data_sanitizer.py: 97%
48 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
1"""Security module for sanitizing sensitive data from data structures.
3This module ensures that sensitive information like API keys, passwords, and tokens
4are not accidentally leaked in logs, files, or API responses.
6Includes helpers for filtering research metadata in API responses to prevent
7settings_snapshot (which contains all application settings including API keys)
8from being sent to the frontend.
9"""
11import json
12from typing import Any, Set
15class DataSanitizer:
16 """Utility class for removing sensitive information from data structures."""
18 # Default set of sensitive key names to redact
19 DEFAULT_SENSITIVE_KEYS: Set[str] = {
20 "api_key",
21 "apikey",
22 "password",
23 "secret",
24 "access_token",
25 "refresh_token",
26 "private_key",
27 "auth_token",
28 "session_token",
29 "csrf_token",
30 }
32 @staticmethod
33 def sanitize(data: Any, sensitive_keys: Set[str] | None = None) -> Any:
34 """
35 Recursively remove sensitive keys from data structures.
37 This method traverses dictionaries and lists, removing any keys that match
38 the sensitive keys list (case-insensitive). This prevents accidental
39 credential leakage in optimization results, logs, or API responses.
41 Args:
42 data: The data structure to sanitize (dict, list, or primitive)
43 sensitive_keys: Set of key names to remove (case-insensitive).
44 If None, uses DEFAULT_SENSITIVE_KEYS.
46 Returns:
47 Sanitized copy of the data with sensitive keys removed
49 Example:
50 >>> sanitizer = DataSanitizer()
51 >>> data = {"username": "user", "api_key": "secret123"}
52 >>> sanitizer.sanitize(data)
53 {"username": "user"}
54 """
55 if sensitive_keys is None:
56 sensitive_keys = DataSanitizer.DEFAULT_SENSITIVE_KEYS
58 # Convert to lowercase for case-insensitive comparison
59 sensitive_keys_lower = {key.lower() for key in sensitive_keys}
61 if isinstance(data, dict):
62 return {
63 k: DataSanitizer.sanitize(v, sensitive_keys)
64 for k, v in data.items()
65 if k.lower() not in sensitive_keys_lower
66 }
67 elif isinstance(data, list):
68 return [
69 DataSanitizer.sanitize(item, sensitive_keys) for item in data
70 ]
71 else:
72 # Return primitives unchanged
73 return data
75 @staticmethod
76 def redact(
77 data: Any,
78 sensitive_keys: Set[str] | None = None,
79 redaction_text: str = "[REDACTED]",
80 ) -> Any:
81 """
82 Recursively redact (replace with placeholder) sensitive values in data structures.
84 Unlike sanitize() which removes keys entirely, this method replaces their
85 values with a redaction placeholder, preserving the structure.
87 Args:
88 data: The data structure to redact (dict, list, or primitive)
89 sensitive_keys: Set of key names to redact (case-insensitive).
90 If None, uses DEFAULT_SENSITIVE_KEYS.
91 redaction_text: Text to replace sensitive values with
93 Returns:
94 Copy of the data with sensitive values redacted
96 Example:
97 >>> sanitizer = DataSanitizer()
98 >>> data = {"username": "user", "api_key": "secret123"}
99 >>> sanitizer.redact(data)
100 {"username": "user", "api_key": "[REDACTED]"}
101 """
102 if sensitive_keys is None:
103 sensitive_keys = DataSanitizer.DEFAULT_SENSITIVE_KEYS
105 # Convert to lowercase for case-insensitive comparison
106 sensitive_keys_lower = {key.lower() for key in sensitive_keys}
108 if isinstance(data, dict):
109 return {
110 k: (
111 redaction_text
112 if k.lower() in sensitive_keys_lower
113 else DataSanitizer.redact(v, sensitive_keys, redaction_text)
114 )
115 for k, v in data.items()
116 }
117 elif isinstance(data, list):
118 return [
119 DataSanitizer.redact(item, sensitive_keys, redaction_text)
120 for item in data
121 ]
122 else:
123 # Return primitives unchanged
124 return data
127# Convenience functions for direct use
def sanitize_data(data: Any, sensitive_keys: Set[str] | None = None) -> Any:
    """
    Drop sensitive keys from a data structure.

    Module-level shortcut that delegates to DataSanitizer.sanitize().

    Args:
        data: The data structure to sanitize
        sensitive_keys: Optional set of sensitive key names; passed through
            unchanged, so None selects the sanitizer's defaults

    Returns:
        Sanitized copy of the data
    """
    cleaned = DataSanitizer.sanitize(data, sensitive_keys)
    return cleaned
def redact_data(
    data: Any,
    sensitive_keys: Set[str] | None = None,
    redaction_text: str = "[REDACTED]",
) -> Any:
    """
    Replace sensitive values in a data structure with a placeholder.

    Module-level shortcut that delegates to DataSanitizer.redact().

    Args:
        data: The data structure to redact
        sensitive_keys: Optional set of sensitive key names; passed through
            unchanged, so None selects the sanitizer's defaults
        redaction_text: Text to replace sensitive values with

    Returns:
        Copy of the data with sensitive values redacted
    """
    redacted = DataSanitizer.redact(data, sensitive_keys, redaction_text)
    return redacted
def filter_research_metadata(research_meta: Any) -> dict:
    """Reduce research_meta to the safe fields used by history list responses.

    Works as an allowlist: only explicitly named fields are copied into the
    result, so settings_snapshot (which contains API keys, passwords, tokens)
    can never reach the frontend through this path. History list consumers
    only need is_news_search from metadata.

    Args:
        research_meta: Raw research metadata (dict, JSON string, or None)

    Returns:
        dict with only safe fields extracted (currently: is_news_search).
        Falsy input, unparseable JSON, and non-dict values all yield
        {"is_news_search": False}.
    """
    fallback = {"is_news_search": False}
    try:
        parsed = research_meta if research_meta else {}
        if isinstance(parsed, str):
            # Metadata may arrive as a serialized JSON string.
            parsed = json.loads(parsed)
        if isinstance(parsed, dict):
            flag = parsed.get("is_news_search", False)
            return {"is_news_search": bool(flag)}
        # Valid JSON that is not an object (list, number, ...) carries no fields.
        return fallback
    except (json.JSONDecodeError, TypeError, AttributeError):
        return fallback
def strip_settings_snapshot(research_meta: Any) -> dict:
    """Return research_meta without its settings_snapshot entry.

    settings_snapshot contains all application settings including API keys.
    Only that top-level key is dropped; every other metadata field that the
    frontend needs (phase, error_type, processed_query, mode, duration, etc.)
    is carried over as-is.

    Args:
        research_meta: Raw research metadata (dict, JSON string, or None)

    Returns:
        New dict with settings_snapshot removed. Falsy input, unparseable
        JSON, and non-dict values all yield an empty dict.
    """
    try:
        parsed = research_meta if research_meta else {}
        if isinstance(parsed, str):
            # Metadata may arrive as a serialized JSON string.
            parsed = json.loads(parsed)
        if isinstance(parsed, dict):
            kept = {}
            for name, value in parsed.items():
                if name != "settings_snapshot":
                    kept[name] = value
            return kept
        # Valid JSON that is not an object (list, number, ...) has no fields.
        return {}
    except (json.JSONDecodeError, TypeError, AttributeError):
        return {}