Coverage for src / local_deep_research / security / file_write_verifier.py: 100%

41 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1"""Security module for verified file write operations. 

2 

3This module ensures that file writes only occur when explicitly allowed by configuration, 

4maintaining the encryption-at-rest security model. 

5""" 

6 

7import json 

8from pathlib import Path 

9from typing import Any 

10 

11from loguru import logger 

12 

# Dictionary keys whose values must never reach disk in clear text.
# Entries are lower-case because lookups are made with the lower-cased key.
SENSITIVE_KEYS = frozenset(
    (
        # Passwords and generic secrets
        "password",
        "secret",
        "secret_key",
        "secretkey",
        "credentials",
        # API keys
        "api_key",
        "apikey",
        "api-key",
        # Tokens
        "token",
        "access_token",
        "refresh_token",
        # Private keys
        "private_key",
        "privatekey",
        # Authentication headers / blobs
        "auth",
        "authorization",
    )
)

33 

34 

35def _sanitize_sensitive_data(data: Any) -> Any: 

36 """Remove sensitive keys from data before writing to disk. 

37 

38 Args: 

39 data: Data to sanitize (dict, list, or primitive) 

40 

41 Returns: 

42 Sanitized copy of the data with sensitive keys redacted 

43 """ 

44 if isinstance(data, dict): 

45 result = {} 

46 for key, value in data.items(): 

47 key_lower = key.lower() if isinstance(key, str) else key 

48 if key_lower in SENSITIVE_KEYS: 

49 result[key] = "[REDACTED]" 

50 else: 

51 result[key] = _sanitize_sensitive_data(value) 

52 return result 

53 if isinstance(data, list): 

54 return [_sanitize_sensitive_data(item) for item in data] 

55 return data 

56 

57 

class FileWriteSecurityError(Exception):
    """Signal that the security settings do not permit a file write."""

62 

63 

def write_file_verified(
    filepath: str | Path,
    content: str | bytes,
    setting_name: str,
    required_value: Any = True,
    context: str = "",
    mode: str = "w",
    encoding: str = "utf-8",
    settings_snapshot: dict | None = None,
) -> None:
    """Write content to a file only if security settings allow it.

    The setting ``setting_name`` is looked up and compared with
    ``required_value``. The file is opened only when the two are equal; a
    setting that cannot be read is treated as "not allowed".

    Args:
        filepath: Path to the file to write
        content: Content to write to the file (``bytes`` when ``mode``
            contains ``"b"``, ``str`` otherwise)
        setting_name: Configuration setting name to check (e.g., "api.allow_file_output")
        required_value: Required value for the setting (default: True)
        context: Description of what's being written (for error messages)
        mode: File open mode (default: "w")
        encoding: File encoding (default: "utf-8"); ignored in binary mode
        settings_snapshot: Optional settings snapshot for programmatic mode

    Raises:
        FileWriteSecurityError: If the security setting doesn't match required value

    Errors raised while opening or writing the file are not caught here.

    Example:
        >>> write_file_verified(
        ...     "report.md",
        ...     markdown_content,
        ...     "api.allow_file_output",
        ...     context="API research report"
        ... )
    """
    from ..config.search_config import get_setting_from_snapshot

    try:
        actual_value = get_setting_from_snapshot(
            setting_name, settings_snapshot=settings_snapshot
        )
    except Exception as exc:
        # Default deny: an unreadable setting is treated as "not allowed".
        # Record why, so a failing lookup can be told apart from a setting
        # that is simply disabled. Only the exception type is logged.
        logger.debug(
            f"Could not read setting '{setting_name}' "
            f"({type(exc).__name__}); denying file write by default"
        )
        actual_value = None

    if actual_value != required_value:
        error_msg = (
            f"File write not allowed: {context or 'file operation'}. "
            f"Set '{setting_name}={required_value}' in config to enable this feature."
        )
        logger.warning(error_msg)
        raise FileWriteSecurityError(error_msg)

    # Don't pass encoding for binary mode
    # Note: This function writes non-sensitive data (PDFs, reports) after security check.
    # CodeQL false positive: content is PDF binary or markdown, not passwords.
    if "b" in mode:
        with open(filepath, mode) as f:  # nosec B603
            f.write(content)
    else:
        with open(filepath, mode, encoding=encoding) as f:  # nosec B603
            f.write(content)

    logger.debug(
        f"Verified file write: {filepath} (setting: {setting_name}={required_value})"
    )

128 

129 

def write_json_verified(
    filepath: str | Path,
    data: dict | list,
    setting_name: str,
    required_value: Any = True,
    context: str = "",
    settings_snapshot: dict | None = None,
    **json_kwargs,
) -> None:
    """Write JSON data to a file only if security settings allow it.

    ``data`` is sanitized with ``_sanitize_sensitive_data`` and serialized
    with ``json.dumps``; the resulting text is handed to
    ``write_file_verified``, which performs the security check and the write.

    Args:
        filepath: Path to the file to write
        data: Dictionary or list to serialize as JSON
        setting_name: Configuration setting name to check
        required_value: Required value for the setting (default: True)
        context: Description of what's being written (for error messages)
        settings_snapshot: Optional settings snapshot for programmatic mode
        **json_kwargs: Additional keyword arguments to pass to json.dumps()
            (e.g., indent=2, ensure_ascii=False, sort_keys=True, default=custom_serializer).
            ``indent`` defaults to 2. A ``default`` callable is wrapped so
            that whatever it returns is sanitized as well; a custom ``cls``
            encoder is passed through unchanged.

    Raises:
        FileWriteSecurityError: If the security setting doesn't match required value

    Example:
        >>> write_json_verified(
        ...     "results.json",
        ...     {"accuracy": 0.95},
        ...     "benchmark.allow_file_output",
        ...     context="benchmark results",
        ...     indent=2,
        ...     sort_keys=True
        ... )
    """
    # Default to indent=2 if not specified for readability
    json_kwargs.setdefault("indent", 2)

    # Objects converted by a caller-supplied ``default`` hook are only seen
    # during encoding, after the up-front sanitization pass. Wrap the hook so
    # sensitive keys in its output are redacted too.
    user_default = json_kwargs.get("default")
    if user_default is not None:

        def _sanitizing_default(obj: Any) -> Any:
            return _sanitize_sensitive_data(user_default(obj))

        json_kwargs["default"] = _sanitizing_default

    # Sanitize sensitive data before writing to disk
    sanitized_data = _sanitize_sensitive_data(data)
    content = json.dumps(sanitized_data, **json_kwargs)
    write_file_verified(
        filepath,
        content,
        setting_name,
        required_value,
        context,
        mode="w",
        encoding="utf-8",
        settings_snapshot=settings_snapshot,
    )

180 )