From 209ae667ab0853b0012c212ff0b489bd8c37a004 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Doubravsk=C3=BD?= Date: Mon, 8 Jun 2026 11:39:04 +0200 Subject: [PATCH] Add disk-backed SQLite cache mode as an alternative to in-memory --- .gitignore | 7 +- CHANGELOG.md | 15 ++++ README.md | 22 +++++- project.md | 14 +++- pyproject.toml | 2 +- src/sqlmem/cache.py | 182 +++++++++++++++++++++++++++++-------------- src/sqlmem/config.py | 3 + src/sqlmem/engine.py | 13 +++- tests/test_cache.py | 47 +++++++++++ tests/test_engine.py | 42 ++++++++++ 10 files changed, 280 insertions(+), 67 deletions(-) diff --git a/.gitignore b/.gitignore index 787df82..821cd3f 100644 --- a/.gitignore +++ b/.gitignore @@ -39,8 +39,13 @@ Thumbs.db .env .env.* +# sqlmem cache (incl. WAL sidecars from disk-backed mode) +cache.db +cache.db-wal +cache.db-shm + # Agents AGENTS.md CLAUDE.md DESIGN_DOCUMENT_MODULE.md -.claude \ No newline at end of file +.claude/ \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index eb25d0e..3af6aaa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,21 @@ All notable changes to this project will be documented in this file. --- +## [1.7.0] - 2026-06-08 + +### Added +- **Disk-backed cache mode** — `CachingEngine(engine, in_memory=False)` (or env `SQLMEM_IN_MEMORY=false`) queries the on-disk `cache.db` directly instead of loading it into an in-memory SQLite. Every write persists immediately (no hourly backup thread, no load-on-startup copy, no `atexit`/`SIGTERM` flush needed), and the cache may exceed available RAM. The disk connection uses WAL + `synchronous=NORMAL` for write throughput. In-memory mode (backed up to disk periodically) remains the default. `in_memory` defaults to the `SQLMEM_IN_MEMORY` config when omitted. + - On open, a disk cache with a mismatched `schema_version` is wiped in place and rebuilt. + - `engine.reset()` in disk mode drops the cached tables and `VACUUM`s the file (it does not unlink the open file). +- `SQLMEM_IN_MEMORY` env var (default `true`). + +### Changed +- `pyproject.toml` — bumped version to `1.7.0` +- `cache.py` — `CacheManager` gained an `in_memory` flag; the cache connection (`_mem_conn` → `_conn`) is opened either on `:memory:` or directly on the on-disk file. Disk mode skips the load-on-startup copy, backup thread, and shutdown flush, and `reset()` VACUUMs in place instead of unlinking the open file. +- `.gitignore` — ignore `cache.db` and its WAL sidecars (`cache.db-wal`, `cache.db-shm`). + +--- + ## [1.6.0] - 2026-06-05 ### Added diff --git a/README.md b/README.md index b625885..4a827fa 100644 --- a/README.md +++ b/README.md @@ -240,7 +240,7 @@ Each value is a list of index definitions: a string is a single-column index, a ## Persistence -The in-memory cache is persisted to `cache.db` on disk: +By default the cache lives in an **in-memory SQLite** and is persisted to `cache.db` on disk: - **On startup**: if `cache.db` exists, it is loaded into memory. - **Periodically**: a background thread writes a snapshot to disk every `SQLMEM_BACKUP_INTERVAL` seconds. @@ -248,6 +248,20 @@ The in-memory cache is persisted to `cache.db` on disk: The schema version is checked on load — if it does not match, the stale file is discarded and the cache is rebuilt from the database. +### Disk-backed cache (no RAM copy) + +Set `in_memory=False` (or `SQLMEM_IN_MEMORY=false`) to query the on-disk `cache.db` **directly** instead of mirroring it in RAM: + +```python +engine = CachingEngine(base_engine, in_memory=False) +``` + +- The cache can **exceed available memory** — nothing is held in RAM beyond SQLite's page cache. +- Every write **persists immediately** (WAL + `synchronous=NORMAL`), so there is no hourly backup thread, no load-into-memory step on startup, and no shutdown flush to lose. +- On open, a cache file with a mismatched schema version is wiped in place and rebuilt; `engine.reset()` drops the cached tables and `VACUUM`s the file (it does not delete the open file). + +The constructor argument wins over the env var; when `in_memory` is omitted it falls back to `SQLMEM_IN_MEMORY`. + ## Manual cache control ```python @@ -286,8 +300,9 @@ Each `TableStats` reports a live processing **state** and how the table is kept ## Memory and very large tables -The cache is **in-memory SQLite**, so a cached table lives in RAM — it must fit in available memory. To keep huge tables manageable: +By default the cache is **in-memory SQLite**, so a cached table lives in RAM — it must fit in available memory. To keep huge tables manageable: +- **Use [disk-backed mode](#disk-backed-cache-no-ram-copy)** (`in_memory=False`) when the working set simply doesn't fit in RAM — queries then run against `cache.db` on disk instead of a memory copy. - **Loads are streamed in batches** (`SQLMEM_FETCH_BATCH` rows at a time, default 10 000) into a staging table and swapped in atomically. A multi-million-row table never gets fully materialized in Python at once, so the load doesn't spike memory or crash the process, and readers keep seeing the previous copy until the swap completes. - Use **[delta refresh](#incremental-delta-refresh)** for large tables that have a change column — after the first load only changed rows are pulled, so restarts and refreshes don't re-read the whole table. - A **single query that returns a huge result set** (e.g. `SELECT *` over a multi-million-row cached table) still materializes that result as a list of dicts; bound it with a `WHERE`/`LIMIT` rather than selecting everything. @@ -300,7 +315,8 @@ Set via environment variables or a `.env` file: |---|---|---| | `SQLMEM_DEBUG` | `false` | `true` enables DEBUG-level logging | | `SQLMEM_CACHE_DB` | `cache.db` | Path to the on-disk persistence file | -| `SQLMEM_BACKUP_INTERVAL` | `3600` | Disk backup interval in seconds | +| `SQLMEM_IN_MEMORY` | `true` | `false` queries `cache.db` on disk directly (no RAM copy); overridden by the `in_memory` constructor arg | +| `SQLMEM_BACKUP_INTERVAL` | `3600` | Disk backup interval in seconds (in-memory mode only) | | `SQLMEM_SQL_DIALECT` | `tsql` | sqlglot dialect used to parse incoming SQL (e.g. `tsql`, `postgres`, `mysql`) | | `SQLMEM_REFRESH_INTERVAL` | `300` | background refresh tick (seconds) — delta pulls and proactive TTL reloads | | `SQLMEM_FETCH_BATCH` | `10000` | rows fetched per batch when loading a table — caps peak memory for huge tables | diff --git a/project.md b/project.md index 9eca4ce..89e7b15 100644 --- a/project.md +++ b/project.md @@ -47,12 +47,23 @@ with engine.connect() as conn: ## Cache backend +Dva režimy (volí se `CachingEngine(engine, in_memory=...)` nebo env `SQLMEM_IN_MEMORY`): + +**In-memory (výchozí, `in_memory=True`)** + - **SQLite in-memory** jako primární úložiště — veškeré dotazy běží v RAM. - **Persistence na disk** (`cache.db`) ve třech situacích: - **Při startu**: pokud soubor existuje, načte se do paměti (`ATTACH` + kopie). - **Periodicky každou hodinu**: snapshot in-memory SQLite se zapíše na disk (backup API). - **Při vypnutí**: finální zápis na disk před ukončením (signal handler + context manager). -- Celé tabulky se při cache miss načtou z databáze a drží v paměti. + +**Disk-backed (`in_memory=False`)** + +- Dotazy běží přímo nad on-disk souborem `cache.db` — **žádná kopie v RAM**, cache může přesáhnout dostupnou paměť. +- Každý zápis se rovnou ukládá na disk (WAL + `synchronous=NORMAL`); odpadá hodinový backup thread i načítání do paměti při startu. +- Při otevření se cache s nesedícím `schema_version` zahodí a postaví znovu; `engine.reset()` smaže tabulky a provede `VACUUM` (soubor neodlinkuje). + +Celé tabulky se při cache miss načtou z databáze (v obou režimech). --- @@ -195,6 +206,7 @@ SQLMEM_DEBUG=true # DEBUG level — podrobný výpis každého dotazu, cache o - [x] **`engine.reset()`**: smaže celou cache (RAM + `cache.db`) pro čistý rebuild po strukturální změně. - [x] **Sekundární indexy**: `indexes={"VW_X": ["col", ["a","b"]]}` — indexy na in-memory cache pro zrychlení `WHERE`/`JOIN`; index-sloupce se auto-dotáhnou, indexy se obnoví po každém (re)loadu. - [x] **TTL na úrovni tabulky**: `ttl={"VW_X": 300}` — pro tabulky bez timestamp sloupce. Garantuje, že cache není starší než interval (full reload při čtení po expiraci + proaktivně na pozadí). +- [x] **Disk-backed cache**: `in_memory=False` (nebo `SQLMEM_IN_MEMORY=false`) — dotazy běží přímo nad on-disk `cache.db` (WAL), bez kopie v RAM; cache může přesáhnout paměť, zápisy se rovnou persistují. ## TODO — budoucí funkce diff --git a/pyproject.toml b/pyproject.toml index 9ef6049..4504770 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "sqlmem" -version = "1.6.0" +version = "1.7.0" description = "" authors = [ {name = "jan.doubravsky@gmail.com"} diff --git a/src/sqlmem/cache.py b/src/sqlmem/cache.py index 1fa329b..e59d498 100644 --- a/src/sqlmem/cache.py +++ b/src/sqlmem/cache.py @@ -23,30 +23,46 @@ class _Index: class CacheManager: - def __init__(self, db_path: Path, backup_interval: int) -> None: + def __init__( + self, db_path: Path, backup_interval: int, in_memory: bool = True + ) -> None: self._db_path = db_path self._backup_interval = backup_interval - self._mem_conn = sqlite3.connect(":memory:", check_same_thread=False) + self._in_memory = in_memory self._lock = threading.Lock() # serializes connection access self._load_lock = threading.Lock() # serializes full table loads self._states: dict[str, str] = {} # table → live processing state self._index_defs: dict[str, list[_Index]] = {} # table → secondary indexes self._closed = False - self._ensure_meta_tables() - self._load_from_disk() - self._drop_orphan_staging() - self._start_backup_thread() + if in_memory: + self._conn = sqlite3.connect(":memory:", check_same_thread=False) + else: + # Disk-backed: query the on-disk file directly — no RAM copy, every + # write persists immediately, and the cache can exceed available RAM. + self._conn = sqlite3.connect(str(db_path), check_same_thread=False) + self._conn.execute("PRAGMA journal_mode=WAL") + self._conn.execute("PRAGMA synchronous=NORMAL") + self._discard_if_schema_mismatch() - atexit.register(self._backup_to_disk) - signal.signal(signal.SIGTERM, self._on_sigterm) + self._ensure_meta_tables() + if in_memory: + self._load_from_disk() + self._drop_orphan_staging() + + if in_memory: + self._start_backup_thread() + atexit.register(self._backup_to_disk) + signal.signal(signal.SIGTERM, self._on_sigterm) + else: + atexit.register(self.close) @property def connection(self) -> sqlite3.Connection: - return self._mem_conn + return self._conn def _ensure_meta_tables(self) -> None: - self._mem_conn.executescript(""" + self._conn.executescript(""" CREATE TABLE IF NOT EXISTS _sqlmem_meta ( key TEXT PRIMARY KEY, value TEXT NOT NULL @@ -64,19 +80,52 @@ class CacheManager: PRIMARY KEY (table_name, column_name) ); """) - self._mem_conn.execute( + self._conn.execute( "INSERT OR IGNORE INTO _sqlmem_meta (key, value) VALUES (?, ?)", ("app_version", _meta.__version__), ) - self._mem_conn.execute( + self._conn.execute( "INSERT OR IGNORE INTO _sqlmem_meta (key, value) VALUES (?, ?)", ("schema_version", str(SCHEMA_VERSION)), ) - self._mem_conn.execute( + self._conn.execute( "INSERT OR IGNORE INTO _sqlmem_meta (key, value) VALUES (?, ?)", ("created_at", _now()), ) - self._mem_conn.commit() + self._conn.commit() + + def _discard_if_schema_mismatch(self) -> None: + """Disk mode: wipe an existing cache file written by an incompatible schema. + + In memory mode the equivalent check lives in :meth:`_load_from_disk`; here + we operate on the live on-disk connection, dropping every table so the + meta tables are recreated fresh by :meth:`_ensure_meta_tables`. + """ + meta_exists = self._conn.execute( + "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = '_sqlmem_meta'" + ).fetchone() + if not meta_exists: + return # fresh file — nothing to validate + + row = self._conn.execute( + "SELECT value FROM _sqlmem_meta WHERE key = 'schema_version'" + ).fetchone() + if row is not None and int(row[0]) == SCHEMA_VERSION: + return + + logger.warning( + "Cache schema version mismatch — wiping on-disk cache, starting fresh." + ) + names = [ + r[0] + for r in self._conn.execute( + r"SELECT name FROM sqlite_master WHERE type = 'table' " + r"AND name NOT LIKE 'sqlite\_%' ESCAPE '\'" + ).fetchall() + ] + for name in names: + self._conn.execute(f"DROP TABLE IF EXISTS {name}") + self._conn.commit() def _load_from_disk(self) -> None: if not self._db_path.exists(): @@ -94,7 +143,7 @@ class CacheManager: disk_conn.close() return - disk_conn.backup(self._mem_conn) + disk_conn.backup(self._conn) logger.info("Cache loaded from disk successfully.") except Exception as e: logger.error(f"Failed to load cache from disk: {e} — starting fresh.") @@ -105,25 +154,30 @@ class CacheManager: """Drop staging tables left by a load that was interrupted (e.g. crash mid-load).""" orphans = [ r[0] - for r in self._mem_conn.execute( + for r in self._conn.execute( r"SELECT name FROM sqlite_master " r"WHERE type = 'table' AND name LIKE '%\_\_sqlmem\_load' ESCAPE '\'" ).fetchall() ] for name in orphans: logger.warning(f"Dropping orphan staging table {name!r} from a previous interrupted load.") - self._mem_conn.execute(f"DROP TABLE IF EXISTS {name}") + self._conn.execute(f"DROP TABLE IF EXISTS {name}") if orphans: - self._mem_conn.commit() + self._conn.commit() def _backup_to_disk(self) -> None: if self._closed: return + if not self._in_memory: + # Disk-backed: every write already lands on disk; just flush the WAL. + with self._lock: + self._conn.commit() + return logger.info(f"Backing up cache to {self._db_path}") try: with self._lock: disk_conn = sqlite3.connect(self._db_path) - self._mem_conn.backup(disk_conn) + self._conn.backup(disk_conn) disk_conn.close() logger.info("Cache backup complete.") except Exception as e: @@ -145,7 +199,7 @@ class CacheManager: def mark_table_refreshed(self, table: str, row_count: int, full: bool = False) -> None: with self._lock: - self._mem_conn.execute( + self._conn.execute( """ INSERT INTO _sqlmem_tables (table_name, last_refresh_at, row_count, is_full) VALUES (?, ?, ?, ?) @@ -156,24 +210,24 @@ class CacheManager: """, (table, _now(), row_count, int(full)), ) - self._mem_conn.commit() + self._conn.commit() def is_table_cached(self, table: str) -> bool: - row = self._mem_conn.execute( + row = self._conn.execute( "SELECT 1 FROM _sqlmem_tables WHERE table_name = ?", (table,) ).fetchone() return row is not None def is_table_full(self, table: str) -> bool: """True if the whole table (all columns) is cached — a SELECT * cache hit.""" - row = self._mem_conn.execute( + row = self._conn.execute( "SELECT is_full FROM _sqlmem_tables WHERE table_name = ?", (table,) ).fetchone() return bool(row and row[0]) def seconds_since_refresh(self, table: str) -> float | None: """Age of a cached table in seconds, or None if it is not cached.""" - row = self._mem_conn.execute( + row = self._conn.execute( "SELECT last_refresh_at FROM _sqlmem_tables WHERE table_name = ?", (table,) ).fetchone() if not row or not row[0]: @@ -216,10 +270,10 @@ class CacheManager: continue cols = ", ".join(idx.columns) with self._lock: - self._mem_conn.execute( + self._conn.execute( f"CREATE INDEX IF NOT EXISTS {idx.name} ON {table} ({cols})" ) - self._mem_conn.commit() + self._conn.commit() logger.debug(f"Index {idx.name!r} ready on {table} ({cols})") def load_table( @@ -248,9 +302,9 @@ class CacheManager: try: cursor = source_conn.execute(f"SELECT {cols} FROM {table}") with self._lock: - self._mem_conn.execute(f"DROP TABLE IF EXISTS {staging}") - self._mem_conn.execute(f"CREATE TABLE {staging} ({col_defs})") - self._mem_conn.commit() + self._conn.execute(f"DROP TABLE IF EXISTS {staging}") + self._conn.execute(f"CREATE TABLE {staging} ({col_defs})") + self._conn.commit() total = 0 insert_sql = f"INSERT INTO {staging} VALUES ({placeholders})" @@ -260,18 +314,18 @@ class CacheManager: break clean = [coerce_row(row) for row in batch] with self._lock: - self._mem_conn.executemany(insert_sql, clean) - self._mem_conn.commit() + self._conn.executemany(insert_sql, clean) + self._conn.commit() total += len(batch) with self._lock: # atomic swap — readers see old or new, never partial - self._mem_conn.execute(f"DROP TABLE IF EXISTS {table}") - self._mem_conn.execute(f"ALTER TABLE {staging} RENAME TO {table}") - self._mem_conn.commit() + self._conn.execute(f"DROP TABLE IF EXISTS {table}") + self._conn.execute(f"ALTER TABLE {staging} RENAME TO {table}") + self._conn.commit() except BaseException: with self._lock: - self._mem_conn.execute(f"DROP TABLE IF EXISTS {staging}") - self._mem_conn.commit() + self._conn.execute(f"DROP TABLE IF EXISTS {staging}") + self._conn.commit() self.set_state(table, TableState.ERROR) raise @@ -286,7 +340,7 @@ class CacheManager: """Run a read query against the in-memory cache, serialized with writers.""" bound = coerce_params(params) with self._lock: - cursor = self._mem_conn.execute(sql) if bound is None else self._mem_conn.execute(sql, bound) + cursor = self._conn.execute(sql) if bound is None else self._conn.execute(sql, bound) col_names = [desc[0] for desc in cursor.description] rows = cursor.fetchall() return col_names, rows @@ -295,7 +349,7 @@ class CacheManager: def get_table_columns(self, table: str) -> list[str]: """Authoritative ordered column list of a cached table (via PRAGMA).""" - rows = self._mem_conn.execute(f"PRAGMA table_info({table})").fetchall() + rows = self._conn.execute(f"PRAGMA table_info({table})").fetchall() return [r[1] for r in rows] def create_unique_index(self, table: str, key_columns: list[str]) -> None: @@ -303,28 +357,28 @@ class CacheManager: cols = ", ".join(key_columns) index = f"idx_{table}_pk" with self._lock: - self._mem_conn.execute( + self._conn.execute( f"CREATE UNIQUE INDEX IF NOT EXISTS {index} ON {table} ({cols})" ) - self._mem_conn.commit() + self._conn.commit() def get_last_synced_at(self, table: str) -> str | None: - row = self._mem_conn.execute( + row = self._conn.execute( "SELECT last_synced_at FROM _sqlmem_tables WHERE table_name = ?", (table,) ).fetchone() return row[0] if row else None def set_last_synced_at(self, table: str, value: str | None) -> None: with self._lock: - self._mem_conn.execute( + self._conn.execute( "UPDATE _sqlmem_tables SET last_synced_at = ? WHERE table_name = ?", (value, table), ) - self._mem_conn.commit() + self._conn.commit() def max_value(self, table: str, column: str) -> str | None: """Maximum value of *column* across cached rows (the delta watermark).""" - row = self._mem_conn.execute(f"SELECT MAX({column}) FROM {table}").fetchone() + row = self._conn.execute(f"SELECT MAX({column}) FROM {table}").fetchone() return row[0] if row else None def upsert_rows(self, table: str, columns: list[str], rows: list[tuple]) -> None: @@ -333,44 +387,54 @@ class CacheManager: placeholders = ", ".join("?" * len(columns)) clean_rows = [coerce_row(row) for row in rows] with self._lock: - self._mem_conn.executemany( + self._conn.executemany( f"INSERT OR REPLACE INTO {table} ({col_list}) VALUES ({placeholders})", clean_rows, ) - self._mem_conn.commit() + self._conn.commit() def count_rows(self, table: str) -> int: - row = self._mem_conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone() + row = self._conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone() return int(row[0]) if row else 0 def reset(self) -> None: - """Wipe the entire cache — every cached table plus the on-disk file.""" + """Wipe the entire cache — every cached table plus the on-disk data + (the file is deleted in memory mode, VACUUMed in place in disk mode).""" logger.info("Resetting cache — dropping all cached tables.") with self._lock: user_tables = [ r[0] - for r in self._mem_conn.execute( + for r in self._conn.execute( "SELECT name FROM sqlite_master " r"WHERE type = 'table' AND name NOT LIKE 'sqlite\_%' ESCAPE '\' " r"AND name NOT LIKE '\_sqlmem\_%' ESCAPE '\'" ).fetchall() ] for name in user_tables: - self._mem_conn.execute(f"DROP TABLE IF EXISTS {name}") - self._mem_conn.execute("DELETE FROM _sqlmem_tables") - self._mem_conn.execute("DELETE FROM _sqlmem_columns") - self._mem_conn.commit() + self._conn.execute(f"DROP TABLE IF EXISTS {name}") + self._conn.execute("DELETE FROM _sqlmem_tables") + self._conn.execute("DELETE FROM _sqlmem_columns") + self._conn.commit() self._states.clear() - try: - if self._db_path.exists(): - self._db_path.unlink() - except OSError as e: - logger.error(f"Failed to delete cache file {self._db_path}: {e}") + if self._in_memory: + try: + if self._db_path.exists(): + self._db_path.unlink() + except OSError as e: + logger.error(f"Failed to delete cache file {self._db_path}: {e}") + else: + # The open connection *is* the file — drop tables persisted the wipe; + # VACUUM reclaims the freed pages on disk. + try: + with self._lock: + self._conn.execute("VACUUM") + except sqlite3.Error as e: + logger.error(f"Failed to VACUUM cache file {self._db_path}: {e}") def close(self) -> None: self._backup_to_disk() self._closed = True - self._mem_conn.close() + self._conn.close() def _now() -> str: diff --git a/src/sqlmem/config.py b/src/sqlmem/config.py index 32d181b..97f58d5 100644 --- a/src/sqlmem/config.py +++ b/src/sqlmem/config.py @@ -8,6 +8,9 @@ load_dotenv() DEBUG = os.getenv("SQLMEM_DEBUG", "false").lower() == "true" CACHE_DB_PATH = Path(os.getenv("SQLMEM_CACHE_DB", "cache.db")) +# Cache backend: in-memory SQLite (default) backed up to disk periodically, or +# query the on-disk SQLite file directly (no RAM copy, every write persists). +IN_MEMORY = os.getenv("SQLMEM_IN_MEMORY", "true").lower() == "true" BACKUP_INTERVAL_SECONDS = int(os.getenv("SQLMEM_BACKUP_INTERVAL", "3600")) # How often (seconds) the background thread pulls deltas for delta-tracked tables. REFRESH_INTERVAL_SECONDS = int(os.getenv("SQLMEM_REFRESH_INTERVAL", "300")) diff --git a/src/sqlmem/engine.py b/src/sqlmem/engine.py index a5fabbf..06899c9 100644 --- a/src/sqlmem/engine.py +++ b/src/sqlmem/engine.py @@ -8,7 +8,12 @@ from sqlalchemy import inspect from sqlalchemy.engine import Engine from .cache import CacheManager -from .config import BACKUP_INTERVAL_SECONDS, CACHE_DB_PATH, REFRESH_INTERVAL_SECONDS +from .config import ( + BACKUP_INTERVAL_SECONDS, + CACHE_DB_PATH, + IN_MEMORY, + REFRESH_INTERVAL_SECONDS, +) from .delta import DeltaConfig, DeltaRefresher, ResolvedDelta from .executor import QueryExecutor from .parser import Params, parse @@ -25,9 +30,13 @@ class CachingEngine: delta: dict[str, DeltaConfig] | None = None, ttl: dict[str, int] | None = None, indexes: dict[str, list[str | list[str]]] | None = None, + in_memory: bool | None = None, ) -> None: self._source_engine = source_engine - self._cache = CacheManager(CACHE_DB_PATH, BACKUP_INTERVAL_SECONDS) + use_memory = IN_MEMORY if in_memory is None else in_memory + self._cache = CacheManager( + CACHE_DB_PATH, BACKUP_INTERVAL_SECONDS, in_memory=use_memory + ) self._registry = ColumnRegistry(self._cache.connection) self._stats = StatsCollector() self._refresh_interval = REFRESH_INTERVAL_SECONDS diff --git a/tests/test_cache.py b/tests/test_cache.py index 63dbe7d..321f462 100644 --- a/tests/test_cache.py +++ b/tests/test_cache.py @@ -58,3 +58,50 @@ def test_backup_and_reload(tmp_path, source_conn): c2 = CacheManager(db_path=db_path, backup_interval=9999) assert c2.is_table_cached("users") is True c2.close() + + +# --------------------------------------------------------------------------- +# Disk-backed mode (in_memory=False) +# --------------------------------------------------------------------------- + +def test_disk_mode_persists_without_backup(tmp_path, source_conn): + """Disk mode writes straight to the file — no explicit backup/close needed.""" + db_path = tmp_path / "cache.db" + c = CacheManager(db_path=db_path, backup_interval=9999, in_memory=False) + c.load_table("users", ["name"], source_conn) + # Data is already on disk; a brand-new disk-mode manager sees it. + c2 = CacheManager(db_path=db_path, backup_interval=9999, in_memory=False) + assert c2.is_table_cached("users") is True + c2.close() + c.close() + + +def test_disk_mode_file_created_immediately(tmp_path, source_conn): + db_path = tmp_path / "cache.db" + c = CacheManager(db_path=db_path, backup_interval=9999, in_memory=False) + c.load_table("users", ["name"], source_conn) + assert db_path.exists() + c.close() + + +def test_disk_mode_reload_in_new_instance(tmp_path, source_conn): + db_path = tmp_path / "cache.db" + c1 = CacheManager(db_path=db_path, backup_interval=9999, in_memory=False) + c1.load_table("users", ["name", "email"], source_conn) + c1.close() + + c2 = CacheManager(db_path=db_path, backup_interval=9999, in_memory=False) + rows = c2.connection.execute("SELECT name FROM users").fetchall() + assert {r[0] for r in rows} == {"alice", "bob"} + c2.close() + + +def test_disk_mode_reset_keeps_file(tmp_path, source_conn): + db_path = tmp_path / "cache.db" + c = CacheManager(db_path=db_path, backup_interval=9999, in_memory=False) + c.load_table("users", ["name"], source_conn) + c.reset() + # File stays (the connection is still open) but the table is gone. + assert db_path.exists() + assert c.is_table_cached("users") is False + c.close() diff --git a/tests/test_engine.py b/tests/test_engine.py index 301e94e..ccee4cd 100644 --- a/tests/test_engine.py +++ b/tests/test_engine.py @@ -289,3 +289,45 @@ def test_invalidate_then_refetch_works(engine): def test_invalidate_unknown_table_is_noop(engine): engine.invalidate("nonexistent_table") # must not raise + + +# --------------------------------------------------------------------------- +# Disk-backed cache (in_memory=False) +# --------------------------------------------------------------------------- + +def test_disk_mode_query_works(source_engine, cache_path, monkeypatch): + monkeypatch.setattr(eng_mod, "CACHE_DB_PATH", cache_path) + monkeypatch.setattr(eng_mod, "BACKUP_INTERVAL_SECONDS", 9999) + + ce = CachingEngine(source_engine, in_memory=False) + rows = ce.execute("SELECT id, name FROM products") + assert {r["name"] for r in rows} == {"Widget", "Gadget", "Doohickey"} + assert ce._cache._in_memory is False + ce.close() + + +def test_disk_mode_persists_across_instances(source_engine, cache_path, monkeypatch): + monkeypatch.setattr(eng_mod, "CACHE_DB_PATH", cache_path) + monkeypatch.setattr(eng_mod, "BACKUP_INTERVAL_SECONDS", 9999) + + ce1 = CachingEngine(source_engine, in_memory=False) + ce1.execute("SELECT id, name FROM products") + ce1.close() + + # Second instance opens the same on-disk cache and finds the table already there. + ce2 = CachingEngine(source_engine, in_memory=False) + assert ce2._cache.is_table_cached("products") is True + rows = ce2.execute("SELECT id, name FROM products") + assert {r["name"] for r in rows} == {"Widget", "Gadget", "Doohickey"} + ce2.close() + + +def test_in_memory_override_respects_config(source_engine, cache_path, monkeypatch): + """in_memory=None falls back to the IN_MEMORY config default.""" + monkeypatch.setattr(eng_mod, "CACHE_DB_PATH", cache_path) + monkeypatch.setattr(eng_mod, "BACKUP_INTERVAL_SECONDS", 9999) + monkeypatch.setattr(eng_mod, "IN_MEMORY", False) + + ce = CachingEngine(source_engine) # no explicit in_memory + assert ce._cache._in_memory is False + ce.close()