Coverage for src / local_deep_research / database / initialize.py: 71%

80 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Centralized database initialization module. 

3 

4This module provides a single entry point for database initialization. 

5In the future, this will be replaced with Alembic migrations for better 

6version control and schema evolution. 

7 

8TODO: Implement Alembic migrations for production use 

9""" 

10 

11from typing import Optional 

12from loguru import logger 

13from sqlalchemy import Engine, inspect 

14from sqlalchemy.orm import Session 

15 

16from ..database.models import Base 

17 

18 

19def initialize_database( 

20 engine: Engine, 

21 db_session: Optional[Session] = None, 

22) -> None: 

23 """ 

24 Initialize database tables if they don't exist. 

25 

26 This is a temporary solution until Alembic migrations are implemented. 

27 Currently creates all tables defined in the models if they don't exist. 

28 

29 Args: 

30 engine: SQLAlchemy engine for the database 

31 db_session: Optional database session for settings initialization 

32 """ 

33 inspector = inspect(engine) 

34 existing_tables = inspector.get_table_names() 

35 

36 logger.info( 

37 f"Initializing database with {len(existing_tables)} existing tables" 

38 ) 

39 logger.debug( 

40 f"Base.metadata has {len(Base.metadata.tables)} tables defined" 

41 ) 

42 

43 # Create all tables (including news tables) - let SQLAlchemy handle dependencies 

44 # checkfirst=True ensures existing tables are not recreated 

45 logger.info("Creating database tables") 

46 Base.metadata.create_all(engine, checkfirst=True) 

47 

48 # Run migrations for existing tables 

49 _run_migrations(engine) 

50 

51 # Check what was created (need new inspector to avoid caching) 

52 new_inspector = inspect(engine) 

53 new_tables = new_inspector.get_table_names() 

54 logger.info(f"After initialization: {len(new_tables)} tables exist") 

55 

56 # Initialize default settings if session provided 

57 if db_session: 

58 try: 

59 _initialize_default_settings(db_session) 

60 except Exception as e: 

61 logger.warning(f"Could not initialize default settings: {e}") 

62 

63 logger.info("Database initialization complete") 

64 

65 

66def _initialize_default_settings(db_session: Session) -> None: 

67 """ 

68 Initialize default settings from the defaults file. 

69 

70 Args: 

71 db_session: Database session to use for settings initialization 

72 """ 

73 from ..web.services.settings_manager import SettingsManager 

74 

75 try: 

76 settings_mgr = SettingsManager(db_session) 

77 

78 # Check if we need to update settings 

79 if settings_mgr.db_version_matches_package(): 

80 logger.debug("Settings version matches package, skipping update") 

81 return 

82 

83 logger.info("Loading default settings into database") 

84 

85 # Load settings from defaults file 

86 # This will not overwrite existing settings but will add new ones 

87 settings_mgr.load_from_defaults_file(overwrite=False, delete_extra=True) 

88 

89 # Update the saved version 

90 settings_mgr.update_db_version() 

91 

92 logger.info("Default settings initialized successfully") 

93 

94 except Exception: 

95 logger.exception("Error initializing default settings") 

96 

97 

98def check_database_schema(engine: Engine) -> dict: 

99 """ 

100 Check the current database schema and return information about tables. 

101 

102 Args: 

103 engine: SQLAlchemy engine for the database 

104 

105 Returns: 

106 Dictionary with schema information including tables and their columns 

107 """ 

108 inspector = inspect(engine) 

109 schema_info = { 

110 "tables": {}, 

111 "missing_tables": [], 

112 "has_news_tables": False, 

113 } 

114 

115 # Check core tables 

116 for table_name in Base.metadata.tables.keys(): 

117 if inspector.has_table(table_name): 

118 columns = [col["name"] for col in inspector.get_columns(table_name)] 

119 schema_info["tables"][table_name] = columns 

120 else: 

121 schema_info["missing_tables"].append(table_name) 

122 

123 # Check if news tables exist 

124 news_tables = ["news_subscription", "news_card", "news_interest"] 

125 for table_name in news_tables: 

126 if table_name in schema_info["tables"]: 126 ↛ 127line 126 didn't jump to line 127 because the condition on line 126 was never true

127 schema_info["has_news_tables"] = True 

128 break 

129 

130 return schema_info 

131 

132 

133def _add_column_if_not_exists( 

134 engine: Engine, 

135 table_name: str, 

136 column_name: str, 

137 column_type: str, 

138 default: str = None, 

139) -> bool: 

140 """ 

141 Add a column to a table if it doesn't already exist. 

142 

143 Uses SQLAlchemy's DDL capabilities for dialect-aware column addition. 

144 

145 Args: 

146 engine: SQLAlchemy engine 

147 table_name: Name of the table to modify 

148 column_name: Name of the column to add 

149 column_type: SQLAlchemy-compatible type string (e.g., 'INTEGER', 'TEXT') 

150 default: Optional default value clause 

151 

152 Returns: 

153 True if column was added, False if it already existed 

154 """ 

155 from sqlalchemy.schema import CreateColumn, Column 

156 from sqlalchemy import Integer, String 

157 

158 inspector = inspect(engine) 

159 existing_columns = { 

160 col["name"] for col in inspector.get_columns(table_name) 

161 } 

162 

163 if column_name in existing_columns: 163 ↛ 167line 163 didn't jump to line 167 because the condition on line 163 was always true

164 return False 

165 

166 # Build column definition using SQLAlchemy types 

167 type_map = { 

168 "INTEGER": Integer(), 

169 "TEXT": String(), 

170 } 

171 col_type = type_map.get(column_type.upper(), String()) 

172 column = Column(column_name, col_type) 

173 

174 # Use CreateColumn to get dialect-aware DDL 

175 compiled = CreateColumn(column).compile(dialect=engine.dialect) 

176 column_def = str(compiled).strip() 

177 

178 # Add default clause if specified 

179 if default is not None: 

180 column_def = f"{column_def} DEFAULT {default}" 

181 

182 try: 

183 with engine.begin() as conn: 

184 # Use DDL class for proper execution 

185 from sqlalchemy import DDL 

186 

187 ddl = DDL(f"ALTER TABLE {table_name} ADD {column_def}") 

188 conn.execute(ddl) 

189 logger.info(f"Added column {column_name} to {table_name} table") 

190 return True 

191 except Exception as e: 

192 logger.debug(f"Migration for {column_name} skipped: {e}") 

193 return False 

194 

195 

196def _run_migrations(engine: Engine) -> None: 

197 """ 

198 Run database migrations to add missing columns to existing tables. 

199 

200 This is a simple migration system for adding new columns. 

201 For more complex migrations, consider using Alembic. 

202 """ 

203 inspector = inspect(engine) 

204 

205 # Migration: Add progress tracking columns to task_metadata 

206 if inspector.has_table("task_metadata"): 206 ↛ exitline 206 didn't return from function '_run_migrations' because the condition on line 206 was always true

207 _add_column_if_not_exists( 

208 engine, "task_metadata", "progress_current", "INTEGER", "0" 

209 ) 

210 _add_column_if_not_exists( 

211 engine, "task_metadata", "progress_total", "INTEGER", "0" 

212 ) 

213 _add_column_if_not_exists( 

214 engine, "task_metadata", "progress_message", "TEXT" 

215 ) 

216 _add_column_if_not_exists( 

217 engine, "task_metadata", "metadata_json", "TEXT" 

218 )