Add disk-backed SQLite cache mode as an alternative to in-memory

This commit is contained in:
Jan Doubravský
2026-06-08 11:39:04 +02:00
parent 757a8f4eba
commit 209ae667ab
10 changed files with 280 additions and 67 deletions
+123 -59
View File
@@ -23,30 +23,46 @@ class _Index:
class CacheManager:
def __init__(self, db_path: Path, backup_interval: int) -> None:
def __init__(
self, db_path: Path, backup_interval: int, in_memory: bool = True
) -> None:
self._db_path = db_path
self._backup_interval = backup_interval
self._mem_conn = sqlite3.connect(":memory:", check_same_thread=False)
self._in_memory = in_memory
self._lock = threading.Lock() # serializes connection access
self._load_lock = threading.Lock() # serializes full table loads
self._states: dict[str, str] = {} # table → live processing state
self._index_defs: dict[str, list[_Index]] = {} # table → secondary indexes
self._closed = False
self._ensure_meta_tables()
self._load_from_disk()
self._drop_orphan_staging()
self._start_backup_thread()
if in_memory:
self._conn = sqlite3.connect(":memory:", check_same_thread=False)
else:
# Disk-backed: query the on-disk file directly — no RAM copy, every
# write persists immediately, and the cache can exceed available RAM.
self._conn = sqlite3.connect(str(db_path), check_same_thread=False)
self._conn.execute("PRAGMA journal_mode=WAL")
self._conn.execute("PRAGMA synchronous=NORMAL")
self._discard_if_schema_mismatch()
atexit.register(self._backup_to_disk)
signal.signal(signal.SIGTERM, self._on_sigterm)
self._ensure_meta_tables()
if in_memory:
self._load_from_disk()
self._drop_orphan_staging()
if in_memory:
self._start_backup_thread()
atexit.register(self._backup_to_disk)
signal.signal(signal.SIGTERM, self._on_sigterm)
else:
atexit.register(self.close)
@property
def connection(self) -> sqlite3.Connection:
return self._mem_conn
return self._conn
def _ensure_meta_tables(self) -> None:
self._mem_conn.executescript("""
self._conn.executescript("""
CREATE TABLE IF NOT EXISTS _sqlmem_meta (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
@@ -64,19 +80,52 @@ class CacheManager:
PRIMARY KEY (table_name, column_name)
);
""")
self._mem_conn.execute(
self._conn.execute(
"INSERT OR IGNORE INTO _sqlmem_meta (key, value) VALUES (?, ?)",
("app_version", _meta.__version__),
)
self._mem_conn.execute(
self._conn.execute(
"INSERT OR IGNORE INTO _sqlmem_meta (key, value) VALUES (?, ?)",
("schema_version", str(SCHEMA_VERSION)),
)
self._mem_conn.execute(
self._conn.execute(
"INSERT OR IGNORE INTO _sqlmem_meta (key, value) VALUES (?, ?)",
("created_at", _now()),
)
self._mem_conn.commit()
self._conn.commit()
def _discard_if_schema_mismatch(self) -> None:
"""Disk mode: wipe an existing cache file written by an incompatible schema.
In memory mode the equivalent check lives in :meth:`_load_from_disk`; here
we operate on the live on-disk connection, dropping every table so the
meta tables are recreated fresh by :meth:`_ensure_meta_tables`.
"""
meta_exists = self._conn.execute(
"SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = '_sqlmem_meta'"
).fetchone()
if not meta_exists:
return # fresh file — nothing to validate
row = self._conn.execute(
"SELECT value FROM _sqlmem_meta WHERE key = 'schema_version'"
).fetchone()
if row is not None and int(row[0]) == SCHEMA_VERSION:
return
logger.warning(
"Cache schema version mismatch — wiping on-disk cache, starting fresh."
)
names = [
r[0]
for r in self._conn.execute(
r"SELECT name FROM sqlite_master WHERE type = 'table' "
r"AND name NOT LIKE 'sqlite\_%' ESCAPE '\'"
).fetchall()
]
for name in names:
self._conn.execute(f"DROP TABLE IF EXISTS {name}")
self._conn.commit()
def _load_from_disk(self) -> None:
if not self._db_path.exists():
@@ -94,7 +143,7 @@ class CacheManager:
disk_conn.close()
return
disk_conn.backup(self._mem_conn)
disk_conn.backup(self._conn)
logger.info("Cache loaded from disk successfully.")
except Exception as e:
logger.error(f"Failed to load cache from disk: {e} — starting fresh.")
@@ -105,25 +154,30 @@ class CacheManager:
"""Drop staging tables left by a load that was interrupted (e.g. crash mid-load)."""
orphans = [
r[0]
for r in self._mem_conn.execute(
for r in self._conn.execute(
r"SELECT name FROM sqlite_master "
r"WHERE type = 'table' AND name LIKE '%\_\_sqlmem\_load' ESCAPE '\'"
).fetchall()
]
for name in orphans:
logger.warning(f"Dropping orphan staging table {name!r} from a previous interrupted load.")
self._mem_conn.execute(f"DROP TABLE IF EXISTS {name}")
self._conn.execute(f"DROP TABLE IF EXISTS {name}")
if orphans:
self._mem_conn.commit()
self._conn.commit()
def _backup_to_disk(self) -> None:
if self._closed:
return
if not self._in_memory:
# Disk-backed: every write already lands on disk; just flush the WAL.
with self._lock:
self._conn.commit()
return
logger.info(f"Backing up cache to {self._db_path}")
try:
with self._lock:
disk_conn = sqlite3.connect(self._db_path)
self._mem_conn.backup(disk_conn)
self._conn.backup(disk_conn)
disk_conn.close()
logger.info("Cache backup complete.")
except Exception as e:
@@ -145,7 +199,7 @@ class CacheManager:
def mark_table_refreshed(self, table: str, row_count: int, full: bool = False) -> None:
with self._lock:
self._mem_conn.execute(
self._conn.execute(
"""
INSERT INTO _sqlmem_tables (table_name, last_refresh_at, row_count, is_full)
VALUES (?, ?, ?, ?)
@@ -156,24 +210,24 @@ class CacheManager:
""",
(table, _now(), row_count, int(full)),
)
self._mem_conn.commit()
self._conn.commit()
def is_table_cached(self, table: str) -> bool:
row = self._mem_conn.execute(
row = self._conn.execute(
"SELECT 1 FROM _sqlmem_tables WHERE table_name = ?", (table,)
).fetchone()
return row is not None
def is_table_full(self, table: str) -> bool:
"""True if the whole table (all columns) is cached — a SELECT * cache hit."""
row = self._mem_conn.execute(
row = self._conn.execute(
"SELECT is_full FROM _sqlmem_tables WHERE table_name = ?", (table,)
).fetchone()
return bool(row and row[0])
def seconds_since_refresh(self, table: str) -> float | None:
"""Age of a cached table in seconds, or None if it is not cached."""
row = self._mem_conn.execute(
row = self._conn.execute(
"SELECT last_refresh_at FROM _sqlmem_tables WHERE table_name = ?", (table,)
).fetchone()
if not row or not row[0]:
@@ -216,10 +270,10 @@ class CacheManager:
continue
cols = ", ".join(idx.columns)
with self._lock:
self._mem_conn.execute(
self._conn.execute(
f"CREATE INDEX IF NOT EXISTS {idx.name} ON {table} ({cols})"
)
self._mem_conn.commit()
self._conn.commit()
logger.debug(f"Index {idx.name!r} ready on {table} ({cols})")
def load_table(
@@ -248,9 +302,9 @@ class CacheManager:
try:
cursor = source_conn.execute(f"SELECT {cols} FROM {table}")
with self._lock:
self._mem_conn.execute(f"DROP TABLE IF EXISTS {staging}")
self._mem_conn.execute(f"CREATE TABLE {staging} ({col_defs})")
self._mem_conn.commit()
self._conn.execute(f"DROP TABLE IF EXISTS {staging}")
self._conn.execute(f"CREATE TABLE {staging} ({col_defs})")
self._conn.commit()
total = 0
insert_sql = f"INSERT INTO {staging} VALUES ({placeholders})"
@@ -260,18 +314,18 @@ class CacheManager:
break
clean = [coerce_row(row) for row in batch]
with self._lock:
self._mem_conn.executemany(insert_sql, clean)
self._mem_conn.commit()
self._conn.executemany(insert_sql, clean)
self._conn.commit()
total += len(batch)
with self._lock: # atomic swap — readers see old or new, never partial
self._mem_conn.execute(f"DROP TABLE IF EXISTS {table}")
self._mem_conn.execute(f"ALTER TABLE {staging} RENAME TO {table}")
self._mem_conn.commit()
self._conn.execute(f"DROP TABLE IF EXISTS {table}")
self._conn.execute(f"ALTER TABLE {staging} RENAME TO {table}")
self._conn.commit()
except BaseException:
with self._lock:
self._mem_conn.execute(f"DROP TABLE IF EXISTS {staging}")
self._mem_conn.commit()
self._conn.execute(f"DROP TABLE IF EXISTS {staging}")
self._conn.commit()
self.set_state(table, TableState.ERROR)
raise
@@ -286,7 +340,7 @@ class CacheManager:
"""Run a read query against the in-memory cache, serialized with writers."""
bound = coerce_params(params)
with self._lock:
cursor = self._mem_conn.execute(sql) if bound is None else self._mem_conn.execute(sql, bound)
cursor = self._conn.execute(sql) if bound is None else self._conn.execute(sql, bound)
col_names = [desc[0] for desc in cursor.description]
rows = cursor.fetchall()
return col_names, rows
@@ -295,7 +349,7 @@ class CacheManager:
def get_table_columns(self, table: str) -> list[str]:
"""Authoritative ordered column list of a cached table (via PRAGMA)."""
rows = self._mem_conn.execute(f"PRAGMA table_info({table})").fetchall()
rows = self._conn.execute(f"PRAGMA table_info({table})").fetchall()
return [r[1] for r in rows]
def create_unique_index(self, table: str, key_columns: list[str]) -> None:
@@ -303,28 +357,28 @@ class CacheManager:
cols = ", ".join(key_columns)
index = f"idx_{table}_pk"
with self._lock:
self._mem_conn.execute(
self._conn.execute(
f"CREATE UNIQUE INDEX IF NOT EXISTS {index} ON {table} ({cols})"
)
self._mem_conn.commit()
self._conn.commit()
def get_last_synced_at(self, table: str) -> str | None:
row = self._mem_conn.execute(
row = self._conn.execute(
"SELECT last_synced_at FROM _sqlmem_tables WHERE table_name = ?", (table,)
).fetchone()
return row[0] if row else None
def set_last_synced_at(self, table: str, value: str | None) -> None:
with self._lock:
self._mem_conn.execute(
self._conn.execute(
"UPDATE _sqlmem_tables SET last_synced_at = ? WHERE table_name = ?",
(value, table),
)
self._mem_conn.commit()
self._conn.commit()
def max_value(self, table: str, column: str) -> str | None:
"""Maximum value of *column* across cached rows (the delta watermark)."""
row = self._mem_conn.execute(f"SELECT MAX({column}) FROM {table}").fetchone()
row = self._conn.execute(f"SELECT MAX({column}) FROM {table}").fetchone()
return row[0] if row else None
def upsert_rows(self, table: str, columns: list[str], rows: list[tuple]) -> None:
@@ -333,44 +387,54 @@ class CacheManager:
placeholders = ", ".join("?" * len(columns))
clean_rows = [coerce_row(row) for row in rows]
with self._lock:
self._mem_conn.executemany(
self._conn.executemany(
f"INSERT OR REPLACE INTO {table} ({col_list}) VALUES ({placeholders})",
clean_rows,
)
self._mem_conn.commit()
self._conn.commit()
def count_rows(self, table: str) -> int:
row = self._mem_conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()
row = self._conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()
return int(row[0]) if row else 0
def reset(self) -> None:
"""Wipe the entire cache — every cached table plus the on-disk file."""
"""Wipe the entire cache — every cached table plus the on-disk data
(the file is deleted in memory mode, VACUUMed in place in disk mode)."""
logger.info("Resetting cache — dropping all cached tables.")
with self._lock:
user_tables = [
r[0]
for r in self._mem_conn.execute(
for r in self._conn.execute(
"SELECT name FROM sqlite_master "
r"WHERE type = 'table' AND name NOT LIKE 'sqlite\_%' ESCAPE '\' "
r"AND name NOT LIKE '\_sqlmem\_%' ESCAPE '\'"
).fetchall()
]
for name in user_tables:
self._mem_conn.execute(f"DROP TABLE IF EXISTS {name}")
self._mem_conn.execute("DELETE FROM _sqlmem_tables")
self._mem_conn.execute("DELETE FROM _sqlmem_columns")
self._mem_conn.commit()
self._conn.execute(f"DROP TABLE IF EXISTS {name}")
self._conn.execute("DELETE FROM _sqlmem_tables")
self._conn.execute("DELETE FROM _sqlmem_columns")
self._conn.commit()
self._states.clear()
try:
if self._db_path.exists():
self._db_path.unlink()
except OSError as e:
logger.error(f"Failed to delete cache file {self._db_path}: {e}")
if self._in_memory:
try:
if self._db_path.exists():
self._db_path.unlink()
except OSError as e:
logger.error(f"Failed to delete cache file {self._db_path}: {e}")
else:
# The open connection *is* the file — drop tables persisted the wipe;
# VACUUM reclaims the freed pages on disk.
try:
with self._lock:
self._conn.execute("VACUUM")
except sqlite3.Error as e:
logger.error(f"Failed to VACUUM cache file {self._db_path}: {e}")
def close(self) -> None:
self._backup_to_disk()
self._closed = True
self._mem_conn.close()
self._conn.close()
def _now() -> str:
+3
View File
@@ -8,6 +8,9 @@ load_dotenv()
DEBUG = os.getenv("SQLMEM_DEBUG", "false").lower() == "true"
CACHE_DB_PATH = Path(os.getenv("SQLMEM_CACHE_DB", "cache.db"))
# Cache backend: in-memory SQLite (default) backed up to disk periodically, or
# query the on-disk SQLite file directly (no RAM copy, every write persists).
IN_MEMORY = os.getenv("SQLMEM_IN_MEMORY", "true").lower() == "true"
BACKUP_INTERVAL_SECONDS = int(os.getenv("SQLMEM_BACKUP_INTERVAL", "3600"))
# How often (seconds) the background thread pulls deltas for delta-tracked tables.
REFRESH_INTERVAL_SECONDS = int(os.getenv("SQLMEM_REFRESH_INTERVAL", "300"))
+11 -2
View File
@@ -8,7 +8,12 @@ from sqlalchemy import inspect
from sqlalchemy.engine import Engine
from .cache import CacheManager
from .config import BACKUP_INTERVAL_SECONDS, CACHE_DB_PATH, REFRESH_INTERVAL_SECONDS
from .config import (
BACKUP_INTERVAL_SECONDS,
CACHE_DB_PATH,
IN_MEMORY,
REFRESH_INTERVAL_SECONDS,
)
from .delta import DeltaConfig, DeltaRefresher, ResolvedDelta
from .executor import QueryExecutor
from .parser import Params, parse
@@ -25,9 +30,13 @@ class CachingEngine:
delta: dict[str, DeltaConfig] | None = None,
ttl: dict[str, int] | None = None,
indexes: dict[str, list[str | list[str]]] | None = None,
in_memory: bool | None = None,
) -> None:
self._source_engine = source_engine
self._cache = CacheManager(CACHE_DB_PATH, BACKUP_INTERVAL_SECONDS)
use_memory = IN_MEMORY if in_memory is None else in_memory
self._cache = CacheManager(
CACHE_DB_PATH, BACKUP_INTERVAL_SECONDS, in_memory=use_memory
)
self._registry = ColumnRegistry(self._cache.connection)
self._stats = StatsCollector()
self._refresh_interval = REFRESH_INTERVAL_SECONDS