Skip to main content

kipuka/db/
mod.rs

1//! Database access layer.
2//!
3//! Provides connection pool initialization, migration execution, and
4//! query helpers for the three supported backends: SQLite, PostgreSQL,
5//! and MariaDB.
6//!
7//! The shared connection pool type [`Db`] is a runtime-dispatch `AnyPool`
8//! that routes queries to the configured backend.  All write transactions
9//! on SQLite should go through [`begin_write`] rather than `pool.begin()`
10//! to avoid `SQLITE_BUSY_SNAPSHOT` in WAL mode.
11
12pub mod schema;
13
14use crate::config::DbConfig;
15use crate::error::KipukaError;
16
/// Type alias for the shared connection pool (runtime-dispatch Any backend).
///
/// This is `sqlx::AnyPool`.  [`init_pool`] and [`init_ro_pool`] return it,
/// and [`begin_write`] and [`run_migrations`] take it by reference.
pub type Db = sqlx::AnyPool;
19
/// The database engine a connection URL points at.
///
/// Backend-specific behavior in this module keys off this value.  Notably,
/// `begin_write` needs `BEGIN IMMEDIATE` on SQLite to prevent
/// `SQLITE_BUSY_SNAPSHOT` (error 517) in WAL mode, whereas PostgreSQL and
/// MariaDB use standard MVCC and need only `BEGIN`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DbKind {
    Sqlite,
    Postgres,
    MariaDb,
}

impl DbKind {
    /// Classify a connection URL by the text it starts with.
    ///
    /// * begins with `postgres` → [`DbKind::Postgres`]
    /// * begins with `mariadb` or `mysql` → [`DbKind::MariaDb`]
    /// * anything else (including the empty string) → [`DbKind::Sqlite`]
    ///
    /// The comparison is case-sensitive.
    pub fn from_url(url: &str) -> Self {
        // Checked in order; the first matching prefix wins.
        const PREFIXES: [(&str, DbKind); 3] = [
            ("postgres", DbKind::Postgres),
            ("mariadb", DbKind::MariaDb),
            ("mysql", DbKind::MariaDb),
        ];

        PREFIXES
            .iter()
            .find(|(prefix, _)| url.starts_with(*prefix))
            .map_or(DbKind::Sqlite, |&(_, backend)| backend)
    }
}
44
/// Global PostgreSQL flag for parameter rewriting.
///
/// sqlx 0.8's `AnyPool` does not reliably rewrite `?` parameter placeholders
/// to `$N` for PostgreSQL.  Call [`pg_sql`] on every SQL string that contains
/// `?` before passing it to sqlx.
///
/// The flag is a `OnceLock`: the first value stored wins for the lifetime of
/// the process, and while it is unset [`pg_sql`] treats the backend as
/// non-PostgreSQL.
static IS_POSTGRES: std::sync::OnceLock<bool> = std::sync::OnceLock::new();

/// Rewrite `?` → `$1`, `$2`, … for PostgreSQL; return unchanged for others.
///
/// The rewritten string is cached permanently (via `Box::leak`), so each
/// unique query string is rewritten and leaked at most once.  The cache key
/// is the `(address, length)` pair of the input: the address alone is not
/// enough, because two `&'static str` values can start at the same address
/// (for example a literal and a prefix slice of it) while being different
/// strings.
///
/// Every `?` character is treated as a placeholder, including any that
/// appear inside SQL string literals or comments; the function does not
/// parse SQL.
// NOTE(review): `dead_code` is allowed presumably because some build
// configurations have no caller — confirm before removing.
#[allow(dead_code)]
pub(crate) fn pg_sql(s: &'static str) -> &'static str {
    use std::collections::HashMap;
    use std::fmt::Write as _;
    use std::sync::{Mutex, OnceLock, PoisonError};

    if !IS_POSTGRES.get().copied().unwrap_or(false) {
        return s;
    }

    static CACHE: OnceLock<Mutex<HashMap<(usize, usize), &'static str>>> = OnceLock::new();
    let cache = CACHE.get_or_init(|| Mutex::new(HashMap::new()));
    let key = (s.as_ptr() as usize, s.len());

    // A poisoned lock only means another thread panicked while holding it;
    // the map itself is always left consistent, so keep using it.
    let mut guard = cache.lock().unwrap_or_else(PoisonError::into_inner);

    // Look up and insert under a single lock so that concurrent first calls
    // for the same query cannot both leak a rewritten copy.
    let rewritten = *guard.entry(key).or_insert_with(|| {
        let mut out = String::with_capacity(s.len() + 16);
        let mut param_num = 0u32;
        for ch in s.chars() {
            if ch == '?' {
                param_num += 1;
                // Formatting into a `String` cannot fail.
                let _ = write!(out, "${param_num}");
            } else {
                out.push(ch);
            }
        }
        let leaked: &'static str = Box::leak(out.into_boxed_str());
        leaked
    });
    rewritten
}
89
90/// Initialize the primary (read-write) database connection pool.
91pub async fn init_pool(config: &DbConfig) -> Result<(Db, DbKind), KipukaError> {
92    let url = config.resolve_url().map_err(KipukaError::Config)?;
93
94    let kind = DbKind::from_url(&url);
95    let _ = IS_POSTGRES.set(kind == DbKind::Postgres);
96
97    let pool_opts = sqlx::any::AnyPoolOptions::new()
98        .acquire_timeout(std::time::Duration::from_secs(config.connect_timeout_secs))
99        .max_lifetime(std::time::Duration::from_secs(config.max_lifetime_secs));
100
101    let pool_opts = if let Some(max) = config.max_connections {
102        pool_opts.max_connections(max)
103    } else {
104        match kind {
105            DbKind::Sqlite => pool_opts.max_connections(1),
106            _ => pool_opts.max_connections(10),
107        }
108    };
109
110    let pool_opts = if let Some(min) = config.min_connections {
111        pool_opts.min_connections(min)
112    } else {
113        pool_opts
114    };
115
116    let pool = pool_opts
117        .connect(&url)
118        .await
119        .map_err(|e| KipukaError::Db(format!("failed to connect to database: {e}")))?;
120
121    // Enable WAL mode for SQLite
122    if kind == DbKind::Sqlite && config.sqlite_wal {
123        sqlx::query("PRAGMA journal_mode=WAL")
124            .execute(&pool)
125            .await
126            .map_err(|e| KipukaError::Db(format!("failed to enable WAL mode: {e}")))?;
127    }
128
129    Ok((pool, kind))
130}
131
132/// Initialize a read-only connection pool for GET handlers.
133///
134/// For SQLite file-backed databases, this opens a `?mode=ro` pool that
135/// never acquires the write lock, enabling concurrent reads during writes
136/// (WAL concurrency benefit).  For `:memory:` and non-SQLite backends,
137/// returns a clone of the primary pool.
138pub async fn init_ro_pool(config: &DbConfig, kind: DbKind) -> Result<Db, KipukaError> {
139    let url = config.resolve_url().map_err(KipukaError::Config)?;
140
141    // Only SQLite file-backed databases benefit from a separate RO pool
142    if kind != DbKind::Sqlite || url.contains(":memory:") {
143        // For non-SQLite or in-memory: caller should clone the primary pool
144        let pool = sqlx::any::AnyPoolOptions::new()
145            .max_connections(1)
146            .connect(&url)
147            .await
148            .map_err(|e| KipukaError::Db(format!("failed to connect RO pool: {e}")))?;
149        return Ok(pool);
150    }
151
152    // Build a read-only URL for SQLite
153    let ro_url = if url.contains('?') {
154        format!("{url}&mode=ro")
155    } else {
156        format!("{url}?mode=ro")
157    };
158
159    let pool = sqlx::any::AnyPoolOptions::new()
160        .max_connections(4)
161        .connect(&ro_url)
162        .await
163        .map_err(|e| KipukaError::Db(format!("failed to connect RO pool: {e}")))?;
164
165    Ok(pool)
166}
167
168/// Begin a write transaction.
169///
170/// SQLite uses `BEGIN IMMEDIATE` to avoid `SQLITE_BUSY_SNAPSHOT` under WAL
171/// mode; other backends use standard `BEGIN`.
172pub async fn begin_write(
173    pool: &Db,
174    kind: DbKind,
175) -> Result<sqlx::Transaction<'_, sqlx::Any>, KipukaError> {
176    if kind == DbKind::Sqlite {
177        // SQLite: BEGIN IMMEDIATE prevents SQLITE_BUSY_SNAPSHOT
178        sqlx::query("BEGIN IMMEDIATE")
179            .execute(pool)
180            .await
181            .map_err(|e| KipukaError::Db(format!("BEGIN IMMEDIATE failed: {e}")))?;
182    }
183    let tx = pool
184        .begin()
185        .await
186        .map_err(|e| KipukaError::Db(format!("begin transaction failed: {e}")))?;
187    Ok(tx)
188}
189
190/// Run pending database migrations.
191pub async fn run_migrations(pool: &Db, kind: DbKind) -> Result<(), KipukaError> {
192    crate::db::schema::run_migrations(pool, kind).await
193}