Coverage for src / local_deep_research / database / initialize.py: 71%
80 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""
2Centralized database initialization module.
4This module provides a single entry point for database initialization.
5In the future, this will be replaced with Alembic migrations for better
6version control and schema evolution.
8TODO: Implement Alembic migrations for production use
9"""
11from typing import Optional
12from loguru import logger
13from sqlalchemy import Engine, inspect
14from sqlalchemy.orm import Session
16from ..database.models import Base
19def initialize_database(
20 engine: Engine,
21 db_session: Optional[Session] = None,
22) -> None:
23 """
24 Initialize database tables if they don't exist.
26 This is a temporary solution until Alembic migrations are implemented.
27 Currently creates all tables defined in the models if they don't exist.
29 Args:
30 engine: SQLAlchemy engine for the database
31 db_session: Optional database session for settings initialization
32 """
33 inspector = inspect(engine)
34 existing_tables = inspector.get_table_names()
36 logger.info(
37 f"Initializing database with {len(existing_tables)} existing tables"
38 )
39 logger.debug(
40 f"Base.metadata has {len(Base.metadata.tables)} tables defined"
41 )
43 # Create all tables (including news tables) - let SQLAlchemy handle dependencies
44 # checkfirst=True ensures existing tables are not recreated
45 logger.info("Creating database tables")
46 Base.metadata.create_all(engine, checkfirst=True)
48 # Run migrations for existing tables
49 _run_migrations(engine)
51 # Check what was created (need new inspector to avoid caching)
52 new_inspector = inspect(engine)
53 new_tables = new_inspector.get_table_names()
54 logger.info(f"After initialization: {len(new_tables)} tables exist")
56 # Initialize default settings if session provided
57 if db_session:
58 try:
59 _initialize_default_settings(db_session)
60 except Exception as e:
61 logger.warning(f"Could not initialize default settings: {e}")
63 logger.info("Database initialization complete")
66def _initialize_default_settings(db_session: Session) -> None:
67 """
68 Initialize default settings from the defaults file.
70 Args:
71 db_session: Database session to use for settings initialization
72 """
73 from ..web.services.settings_manager import SettingsManager
75 try:
76 settings_mgr = SettingsManager(db_session)
78 # Check if we need to update settings
79 if settings_mgr.db_version_matches_package():
80 logger.debug("Settings version matches package, skipping update")
81 return
83 logger.info("Loading default settings into database")
85 # Load settings from defaults file
86 # This will not overwrite existing settings but will add new ones
87 settings_mgr.load_from_defaults_file(overwrite=False, delete_extra=True)
89 # Update the saved version
90 settings_mgr.update_db_version()
92 logger.info("Default settings initialized successfully")
94 except Exception:
95 logger.exception("Error initializing default settings")
98def check_database_schema(engine: Engine) -> dict:
99 """
100 Check the current database schema and return information about tables.
102 Args:
103 engine: SQLAlchemy engine for the database
105 Returns:
106 Dictionary with schema information including tables and their columns
107 """
108 inspector = inspect(engine)
109 schema_info = {
110 "tables": {},
111 "missing_tables": [],
112 "has_news_tables": False,
113 }
115 # Check core tables
116 for table_name in Base.metadata.tables.keys():
117 if inspector.has_table(table_name):
118 columns = [col["name"] for col in inspector.get_columns(table_name)]
119 schema_info["tables"][table_name] = columns
120 else:
121 schema_info["missing_tables"].append(table_name)
123 # Check if news tables exist
124 news_tables = ["news_subscription", "news_card", "news_interest"]
125 for table_name in news_tables:
126 if table_name in schema_info["tables"]: 126 ↛ 127line 126 didn't jump to line 127 because the condition on line 126 was never true
127 schema_info["has_news_tables"] = True
128 break
130 return schema_info
133def _add_column_if_not_exists(
134 engine: Engine,
135 table_name: str,
136 column_name: str,
137 column_type: str,
138 default: str = None,
139) -> bool:
140 """
141 Add a column to a table if it doesn't already exist.
143 Uses SQLAlchemy's DDL capabilities for dialect-aware column addition.
145 Args:
146 engine: SQLAlchemy engine
147 table_name: Name of the table to modify
148 column_name: Name of the column to add
149 column_type: SQLAlchemy-compatible type string (e.g., 'INTEGER', 'TEXT')
150 default: Optional default value clause
152 Returns:
153 True if column was added, False if it already existed
154 """
155 from sqlalchemy.schema import CreateColumn, Column
156 from sqlalchemy import Integer, String
158 inspector = inspect(engine)
159 existing_columns = {
160 col["name"] for col in inspector.get_columns(table_name)
161 }
163 if column_name in existing_columns: 163 ↛ 167line 163 didn't jump to line 167 because the condition on line 163 was always true
164 return False
166 # Build column definition using SQLAlchemy types
167 type_map = {
168 "INTEGER": Integer(),
169 "TEXT": String(),
170 }
171 col_type = type_map.get(column_type.upper(), String())
172 column = Column(column_name, col_type)
174 # Use CreateColumn to get dialect-aware DDL
175 compiled = CreateColumn(column).compile(dialect=engine.dialect)
176 column_def = str(compiled).strip()
178 # Add default clause if specified
179 if default is not None:
180 column_def = f"{column_def} DEFAULT {default}"
182 try:
183 with engine.begin() as conn:
184 # Use DDL class for proper execution
185 from sqlalchemy import DDL
187 ddl = DDL(f"ALTER TABLE {table_name} ADD {column_def}")
188 conn.execute(ddl)
189 logger.info(f"Added column {column_name} to {table_name} table")
190 return True
191 except Exception as e:
192 logger.debug(f"Migration for {column_name} skipped: {e}")
193 return False
196def _run_migrations(engine: Engine) -> None:
197 """
198 Run database migrations to add missing columns to existing tables.
200 This is a simple migration system for adding new columns.
201 For more complex migrations, consider using Alembic.
202 """
203 inspector = inspect(engine)
205 # Migration: Add progress tracking columns to task_metadata
206 if inspector.has_table("task_metadata"): 206 ↛ exitline 206 didn't return from function '_run_migrations' because the condition on line 206 was always true
207 _add_column_if_not_exists(
208 engine, "task_metadata", "progress_current", "INTEGER", "0"
209 )
210 _add_column_if_not_exists(
211 engine, "task_metadata", "progress_total", "INTEGER", "0"
212 )
213 _add_column_if_not_exists(
214 engine, "task_metadata", "progress_message", "TEXT"
215 )
216 _add_column_if_not_exists(
217 engine, "task_metadata", "metadata_json", "TEXT"
218 )