Coverage for src / local_deep_research / security / file_write_verifier.py: 100%
41 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""Security module for verified file write operations.
3This module ensures that file writes only occur when explicitly allowed by configuration,
4maintaining the encryption-at-rest security model.
5"""
7import json
8from pathlib import Path
9from typing import Any
11from loguru import logger
# Dictionary key names whose values must never reach disk in clear text.
# Every entry is lower-case: the sanitizer lower-cases string keys before
# testing membership, which makes the match case-insensitive.
SENSITIVE_KEYS = frozenset(
    (
        # Passwords and generic secrets
        "password",
        "secret",
        "secret_key",
        "secretkey",
        "credentials",
        # API keys
        "api_key",
        "apikey",
        "api-key",
        # Tokens
        "token",
        "access_token",
        "refresh_token",
        # Private keys
        "private_key",
        "privatekey",
        # Authentication headers / blobs
        "auth",
        "authorization",
    )
)
def _sanitize_sensitive_data(data: Any) -> Any:
    """Remove sensitive keys from data before writing to disk.

    Dictionaries, lists and tuples are walked recursively. For every
    dictionary entry whose key (compared case-insensitively when it is a
    string) is in ``SENSITIVE_KEYS``, the value is replaced with the literal
    string ``"[REDACTED]"``. The input is never mutated; containers are
    rebuilt, while all other values are returned as-is.

    Args:
        data: Data to sanitize (dict, list, tuple, or primitive)

    Returns:
        Sanitized copy of the data with sensitive keys redacted
    """
    if isinstance(data, dict):
        result = {}
        for key, value in data.items():
            # Only strings can be lower-cased; other key types are compared
            # against the sensitive set unchanged.
            key_lower = key.lower() if isinstance(key, str) else key
            if key_lower in SENSITIVE_KEYS:
                result[key] = "[REDACTED]"
            else:
                result[key] = _sanitize_sensitive_data(value)
        return result
    if isinstance(data, list):
        return [_sanitize_sensitive_data(item) for item in data]
    if isinstance(data, tuple):
        # json.dumps serializes tuples as arrays, so dicts nested inside a
        # tuple would otherwise be written to disk without redaction.
        return tuple(_sanitize_sensitive_data(item) for item in data)
    return data
class FileWriteSecurityError(Exception):
    """Raised when a file write operation is not allowed by security settings."""
def write_file_verified(
    filepath: str | Path,
    content: str | bytes,
    setting_name: str,
    required_value: Any = True,
    context: str = "",
    mode: str = "w",
    encoding: str = "utf-8",
    settings_snapshot: dict | None = None,
) -> None:
    """Write content to a file only if security settings allow it.

    The setting named ``setting_name`` is looked up via
    ``get_setting_from_snapshot``. The write happens only when the value
    found equals ``required_value``; if the lookup raises, the setting is
    treated as unset and the write is denied.

    Args:
        filepath: Path to the file to write
        content: Content to write to the file (``bytes`` when ``mode``
            contains ``"b"``, ``str`` otherwise)
        setting_name: Configuration setting name to check (e.g., "api.allow_file_output")
        required_value: Required value for the setting (default: True)
        context: Description of what's being written (for error messages)
        mode: File open mode (default: "w")
        encoding: File encoding (default: "utf-8"); ignored in binary mode
        settings_snapshot: Optional settings snapshot for programmatic mode

    Raises:
        FileWriteSecurityError: If the security setting doesn't match required value

    Example:
        >>> write_file_verified(
        ...     "report.md",
        ...     markdown_content,
        ...     "api.allow_file_output",
        ...     context="API research report"
        ... )
    """
    from ..config.search_config import get_setting_from_snapshot

    try:
        actual_value = get_setting_from_snapshot(
            setting_name, settings_snapshot=settings_snapshot
        )
    except Exception as exc:
        # Setting doesn't exist (or could not be read) - default deny, but
        # leave a trace so a denied write can be diagnosed.
        logger.debug(
            f"Could not read setting '{setting_name}'; treating it as unset: {exc}"
        )
        actual_value = None

    if actual_value != required_value:
        error_msg = (
            f"File write not allowed: {context or 'file operation'}. "
            f"Set '{setting_name}={required_value}' in config to enable this feature."
        )
        logger.warning(error_msg)
        raise FileWriteSecurityError(error_msg)

    # Don't pass encoding for binary mode: open() rejects it there.
    # Note: This function writes non-sensitive data (PDFs, reports) after security check.
    # CodeQL false positive: content is PDF binary or markdown, not passwords.
    open_kwargs = {} if "b" in mode else {"encoding": encoding}
    with open(filepath, mode, **open_kwargs) as f:  # nosec B603
        f.write(content)

    logger.debug(
        f"Verified file write: {filepath} (setting: {setting_name}={required_value})"
    )
def write_json_verified(
    filepath: str | Path,
    data: dict | list,
    setting_name: str,
    required_value: Any = True,
    context: str = "",
    settings_snapshot: dict = None,
    **json_kwargs,
) -> None:
    """Serialize data as JSON and write it only if security settings allow it.

    The data is first passed through ``_sanitize_sensitive_data``, then
    encoded with ``json.dumps`` and handed to ``write_file_verified``, which
    performs the setting check and the actual write (text mode, UTF-8).

    Args:
        filepath: Path to the file to write
        data: Dictionary or list to serialize as JSON
        setting_name: Configuration setting name to check
        required_value: Required value for the setting (default: True)
        context: Description of what's being written (for error messages)
        settings_snapshot: Optional settings snapshot for programmatic mode
        **json_kwargs: Additional keyword arguments to pass to json.dumps()
            (e.g., indent=2, ensure_ascii=False, sort_keys=True, default=custom_serializer)

    Raises:
        FileWriteSecurityError: If the security setting doesn't match required value

    Example:
        >>> write_json_verified(
        ...     "results.json",
        ...     {"accuracy": 0.95},
        ...     "benchmark.allow_file_output",
        ...     context="benchmark results",
        ...     indent=2,
        ...     sort_keys=True
        ... )
    """
    # indent=2 is only a readability default; a caller-supplied indent wins.
    dump_options = {"indent": 2, **json_kwargs}

    # Redact sensitive values before anything is serialized for disk.
    content = json.dumps(_sanitize_sensitive_data(data), **dump_options)

    write_file_verified(
        filepath,
        content,
        setting_name,
        required_value,
        context,
        mode="w",
        encoding="utf-8",
        settings_snapshot=settings_snapshot,
    )