Coverage for src / local_deep_research / security / file_write_verifier.py: 36%
41 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""Security module for verified file write operations.
3This module ensures that file writes only occur when explicitly allowed by configuration,
4maintaining the encryption-at-rest security model.
5"""
7import json
8from pathlib import Path
9from typing import Any
11from loguru import logger
# Keys whose values must never be written to disk in clear text.
# Matching is exact (no substring match) against the lower-cased dict key.
SENSITIVE_KEYS = frozenset(
    {
        "password",
        "api_key",
        "apikey",
        "api-key",
        "secret",
        "secret_key",
        "secretkey",
        "token",
        "access_token",
        "refresh_token",
        "private_key",
        "privatekey",
        "credentials",
        "auth",
        "authorization",
    }
)


def _sanitize_sensitive_data(data: Any) -> Any:
    """Return a copy of ``data`` with the values of sensitive keys redacted.

    Dicts, lists and tuples are walked recursively. Tuples are included
    because ``json.dumps`` serializes them as arrays, so a dict nested in a
    tuple would otherwise reach the disk unsanitized.

    Args:
        data: Data to sanitize (dict, list, tuple, or any other value)

    Returns:
        A new dict/list/tuple mirroring ``data`` in which every value stored
        under a key listed in ``SENSITIVE_KEYS`` (case-insensitive, exact
        match) is replaced by ``"[REDACTED]"``. Tuples come back as plain
        tuples. Any other value is returned unchanged. The input itself is
        never mutated.
    """
    if isinstance(data, dict):
        return {
            key: (
                "[REDACTED]"
                # Only string keys can name a secret; other key types
                # are kept and their values are still walked.
                if isinstance(key, str) and key.lower() in SENSITIVE_KEYS
                else _sanitize_sensitive_data(value)
            )
            for key, value in data.items()
        }
    if isinstance(data, list):
        return [_sanitize_sensitive_data(item) for item in data]
    if isinstance(data, tuple):
        return tuple(_sanitize_sensitive_data(item) for item in data)
    return data
class FileWriteSecurityError(Exception):
    """Signal that the security settings forbid the requested file write."""
def write_file_verified(
    filepath: str | Path,
    content: str | bytes,
    setting_name: str,
    required_value: Any = True,
    context: str = "",
    mode: str = "w",
    encoding: str = "utf-8",
    settings_snapshot: dict | None = None,
) -> None:
    """Write content to a file only if security settings allow it.

    The setting ``setting_name`` is looked up via
    ``get_setting_from_snapshot`` and compared with ``required_value``.
    The file is opened only when the two are equal. If the lookup raises,
    the write is denied regardless of ``required_value``.

    Args:
        filepath: Path to the file to write
        content: Content to write; must be ``bytes`` when ``mode`` contains
            ``"b"`` and ``str`` otherwise
        setting_name: Configuration setting name to check
            (e.g., "api.allow_file_output")
        required_value: Required value for the setting (default: True)
        context: Description of what's being written (for error messages)
        mode: File open mode (default: "w")
        encoding: File encoding, ignored in binary mode (default: "utf-8")
        settings_snapshot: Optional settings snapshot for programmatic mode

    Raises:
        FileWriteSecurityError: If the setting cannot be read or its value
            doesn't match the required value

    Example:
        >>> write_file_verified(
        ...     "report.md",
        ...     markdown_content,
        ...     "api.allow_file_output",
        ...     context="API research report"
        ... )
    """
    from ..config.search_config import get_setting_from_snapshot

    try:
        actual_value = get_setting_from_snapshot(
            setting_name, settings_snapshot=settings_snapshot
        )
    except Exception as exc:
        # Default deny. A dedicated flag is used instead of a ``None``
        # placeholder so that ``required_value=None`` cannot accidentally
        # match a setting that could not be read.
        logger.debug(
            f"Could not read setting '{setting_name}' ({exc!r}); "
            "denying file write by default"
        )
        write_denied = True
    else:
        write_denied = actual_value != required_value

    if write_denied:
        error_msg = (
            f"File write not allowed: {context or 'file operation'}. "
            f"Set '{setting_name}={required_value}' in config to enable this feature."
        )
        logger.warning(error_msg)
        raise FileWriteSecurityError(error_msg)

    # Don't pass encoding for binary mode: open() rejects it there.
    # Note: This function writes non-sensitive data (PDFs, reports) after security check.
    # CodeQL false positive: content is PDF binary or markdown, not passwords.
    if "b" in mode:
        with open(filepath, mode) as f:  # nosec B603
            f.write(content)
    else:
        with open(filepath, mode, encoding=encoding) as f:  # nosec B603
            f.write(content)

    logger.debug(
        f"Verified file write: {filepath} (setting: {setting_name}={required_value})"
    )
def write_json_verified(
    filepath: str | Path,
    data: dict | list,
    setting_name: str,
    required_value: Any = True,
    context: str = "",
    settings_snapshot: dict = None,
    **json_kwargs,
) -> None:
    """Serialize ``data`` as JSON and write it via ``write_file_verified``.

    The data is passed through ``_sanitize_sensitive_data`` and serialized
    with ``json.dumps`` before ``write_file_verified`` is called, so a
    serialization error surfaces before the permission check runs. The
    file is always written in text mode with UTF-8 encoding.

    Args:
        filepath: Path to the file to write
        data: Dictionary or list to serialize as JSON
        setting_name: Configuration setting name to check
        required_value: Required value for the setting (default: True)
        context: Description of what's being written (for error messages)
        settings_snapshot: Optional settings snapshot for programmatic mode
        **json_kwargs: Extra keyword arguments forwarded to json.dumps()
            (e.g., ensure_ascii=False, sort_keys=True, default=custom_serializer);
            ``indent`` falls back to 2 when not supplied

    Raises:
        FileWriteSecurityError: If the security setting doesn't match required value

    Example:
        >>> write_json_verified(
        ...     "results.json",
        ...     {"accuracy": 0.95},
        ...     "benchmark.allow_file_output",
        ...     context="benchmark results",
        ...     sort_keys=True
        ... )
    """
    # Pretty-print by default; an explicit ``indent`` from the caller wins.
    json_kwargs.setdefault("indent", 2)

    # Redact sensitive keys first so they never reach the serialized text.
    serialized = json.dumps(_sanitize_sensitive_data(data), **json_kwargs)

    write_file_verified(
        filepath,
        serialized,
        setting_name,
        required_value,
        context,
        mode="w",
        encoding="utf-8",
        settings_snapshot=settings_snapshot,
    )