Coverage for src / local_deep_research / security / file_write_verifier.py: 36%

41 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1"""Security module for verified file write operations. 

2 

3This module ensures that file writes only occur when explicitly allowed by configuration, 

4maintaining the encryption-at-rest security model. 

5""" 

6 

7import json 

8from pathlib import Path 

9from typing import Any 

10 

11from loguru import logger 

12 

# Keys whose values must never be written to disk in clear text.
# Lookups compare the lower-cased key against this set, so every entry
# listed here must itself be lower case.
SENSITIVE_KEYS: frozenset[str] = frozenset(
    {
        "password",
        "api_key",
        "apikey",
        "api-key",
        "secret",
        "secret_key",
        "secretkey",
        "token",
        "access_token",
        "refresh_token",
        "private_key",
        "privatekey",
        "credentials",
        "auth",
        "authorization",
    }
)

33 

34 

def _sanitize_sensitive_data(data: Any) -> Any:
    """Redact the values of sensitive keys before data is written to disk.

    Dicts, lists and tuples are walked recursively. A dict entry whose key is
    a string that, lower-cased, appears in ``SENSITIVE_KEYS`` has its value
    replaced by the literal ``"[REDACTED]"``; the value is not inspected
    further. Every other value is sanitized recursively.

    Args:
        data: Data to sanitize (dict, list, tuple, or primitive).

    Returns:
        A sanitized copy for dicts, lists and tuples (tuples, including
        named tuples, come back as plain tuples). Any other object is
        returned unchanged (the same object, not a copy).
    """
    if isinstance(data, dict):
        return {
            key: (
                "[REDACTED]"
                # Non-string keys can never match the string-only key set.
                if isinstance(key, str) and key.lower() in SENSITIVE_KEYS
                else _sanitize_sensitive_data(value)
            )
            for key, value in data.items()
        }
    if isinstance(data, list):
        return [_sanitize_sensitive_data(item) for item in data]
    if isinstance(data, tuple):
        # json.dumps serialises tuples as arrays, so dicts nested inside a
        # tuple must be sanitized too or their secrets would reach the file.
        return tuple(_sanitize_sensitive_data(item) for item in data)
    return data

57 

58 

class FileWriteSecurityError(Exception):
    """Raised when a file write operation is not allowed by security settings."""

63 

64 

def write_file_verified(
    filepath: str | Path,
    content: str | bytes,
    setting_name: str,
    required_value: Any = True,
    context: str = "",
    mode: str = "w",
    encoding: str = "utf-8",
    settings_snapshot: dict | None = None,
) -> None:
    """Write content to a file only if security settings allow it.

    The setting named ``setting_name`` is looked up via
    ``get_setting_from_snapshot``. The write proceeds only when the value
    found equals ``required_value``; if the lookup raises, the write is
    denied.

    Args:
        filepath: Path to the file to write
        content: Content to write to the file (``bytes`` when ``mode``
            contains ``"b"``, ``str`` otherwise)
        setting_name: Configuration setting name to check (e.g., "api.allow_file_output")
        required_value: Required value for the setting (default: True)
        context: Description of what's being written (for error messages)
        mode: File open mode (default: "w")
        encoding: File encoding (default: "utf-8"); ignored for binary modes
        settings_snapshot: Optional settings snapshot for programmatic mode

    Raises:
        FileWriteSecurityError: If the security setting doesn't match required value

    Example:
        >>> write_file_verified(
        ...     "report.md",
        ...     markdown_content,
        ...     "api.allow_file_output",
        ...     context="API research report"
        ... )
    """
    # NOTE(review): imported at call time rather than at module level;
    # presumably to avoid an import cycle - confirm before hoisting.
    from ..config.search_config import get_setting_from_snapshot

    try:
        actual_value = get_setting_from_snapshot(
            setting_name, settings_snapshot=settings_snapshot
        )
    except Exception as exc:
        # Setting doesn't exist (or could not be read) - default deny.
        # Record the reason so a denied write can be diagnosed.
        logger.debug(
            "Could not read setting '{}'; denying file write: {}",
            setting_name,
            exc,
        )
        actual_value = None

    if actual_value != required_value:
        error_msg = (
            f"File write not allowed: {context or 'file operation'}. "
            f"Set '{setting_name}={required_value}' in config to enable this feature."
        )
        logger.warning(error_msg)
        raise FileWriteSecurityError(error_msg)

    # Don't pass encoding for binary mode: open() rejects it there.
    # Note: This function writes non-sensitive data (PDFs, reports) after security check.
    # CodeQL false positive: content is PDF binary or markdown, not passwords.
    open_kwargs = {} if "b" in mode else {"encoding": encoding}
    with open(filepath, mode, **open_kwargs) as f:  # nosec B603
        f.write(content)

    logger.debug(
        f"Verified file write: {filepath} (setting: {setting_name}={required_value})"
    )

129 

130 

def write_json_verified(
    filepath: str | Path,
    data: dict | list,
    setting_name: str,
    required_value: Any = True,
    context: str = "",
    settings_snapshot: dict | None = None,
    **json_kwargs: Any,
) -> None:
    """Write JSON data to a file only if security settings allow it.

    ``data`` is first passed through ``_sanitize_sensitive_data``, then
    serialised with ``json.dumps`` and handed to ``write_file_verified``,
    which performs the security check and the actual write (text mode,
    UTF-8).

    Note that serialisation happens before the security check, so a
    ``json.dumps`` error is raised even when the write would be denied.
    Objects converted by a ``default=`` hook are produced during
    serialisation, after sanitisation has run, so their contents are not
    redacted.

    Args:
        filepath: Path to the file to write
        data: Dictionary or list to serialize as JSON
        setting_name: Configuration setting name to check
        required_value: Required value for the setting (default: True)
        context: Description of what's being written (for error messages)
        settings_snapshot: Optional settings snapshot for programmatic mode
        **json_kwargs: Additional keyword arguments to pass to json.dumps()
            (e.g., indent=2, ensure_ascii=False, sort_keys=True, default=custom_serializer)

    Raises:
        FileWriteSecurityError: If the security setting doesn't match required value

    Example:
        >>> write_json_verified(
        ...     "results.json",
        ...     {"accuracy": 0.95},
        ...     "benchmark.allow_file_output",
        ...     context="benchmark results",
        ...     indent=2,
        ...     sort_keys=True
        ... )
    """
    # Default to indent=2 if not specified for readability
    json_kwargs.setdefault("indent", 2)

    # Sanitize sensitive data before writing to disk
    sanitized_data = _sanitize_sensitive_data(data)
    content = json.dumps(sanitized_data, **json_kwargs)

    write_file_verified(
        filepath,
        content,
        setting_name,
        required_value=required_value,
        context=context,
        mode="w",
        encoding="utf-8",
        settings_snapshot=settings_snapshot,
    )