From 85bb84a1a6ef9676b9c8a02bd1b2796076113c38 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Doubravsk=C3=BD?= Date: Fri, 5 Jun 2026 12:12:57 +0200 Subject: [PATCH] Add per-table TTL refresh for tables without a change column --- CHANGELOG.md | 11 ++-- README.md | 29 ++++++++- project.md | 3 +- pyproject.toml | 2 +- src/sqlmem/cache.py | 10 +++ src/sqlmem/engine.py | 33 +++++++++- src/sqlmem/executor.py | 34 ++++++++-- tests/test_ttl.py | 137 +++++++++++++++++++++++++++++++++++++++++ 8 files changed, 240 insertions(+), 19 deletions(-) create mode 100644 tests/test_ttl.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 1b8ab3b..fa5e33e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,7 @@ All notable changes to this project will be documented in this file. --- -## [1.3.1] - 2026-06-05 +## [1.4.0] - 2026-06-05 ### Fixed - **`decimal.Decimal` (and `datetime`) binding error** — `NUMERIC`/`DECIMAL`/`MONEY` columns from SQL Server (pyodbc) arrive as `decimal.Decimal`, which `sqlite3` cannot bind, crashing the cache load with `type 'decimal.Decimal' is not supported`. Values are now coerced to sqlite-bindable types (`Decimal`→`str`, `datetime`/`date`/`time`→ISO, `uuid.UUID`→`str`, `bytearray`→`bytes`) at the cache boundary — on full load, on delta upsert, and for WHERE parameters. Coercion is local (no global `sqlite3.register_adapter`), so the host application's `sqlite3` behaviour is untouched. Cache columns are `TEXT`, so the conversion is lossless and exact (no rounding). @@ -16,14 +16,15 @@ All notable changes to this project will be documented in this file. - Data-driven high-watermark = `max(change_column)` cached, persisted in `cache.db`; `>=` overlap + idempotent upsert so no row is missed and boundary rows are harmlessly re-read. - Catch-up on startup (since last shutdown) and a background thread refreshing every `SQLMEM_REFRESH_INTERVAL` seconds (default 300); `engine.refresh()` triggers a pull on demand. - Primary key is auto-discovered from the source DB (`inspect(engine).get_pk_constraint`) when `key_columns` is omitted; required explicitly for views (raises `ValueError`). +- **Per-table TTL (time-based refresh)** — `CachingEngine(engine, ttl={"VW_X": 300})` for tables with no change column that can't be delta-synced. The cached copy is guaranteed never older than the TTL: a query touching an expired table triggers a full reload before it is answered (read-time guarantee), and the background thread proactively reloads expired tables. TTL age uses the persisted `last_refresh_at`, so the bound holds across restarts. A table in both `delta` and `ttl` raises `ValueError`. - `DeltaConfig` exported from the public API. - `engine.reset()` — wipes the whole cache (RAM + `cache.db`) for a clean rebuild after structural source changes. -- `SQLMEM_REFRESH_INTERVAL` env var (default `300`). +- `SQLMEM_REFRESH_INTERVAL` env var (default `300`) — background refresh tick for delta pulls and proactive TTL reloads. ### Changed -- `pyproject.toml` — bumped version to `1.3.1` -- `cache.py` — schema version bumped to `3`; `_sqlmem_tables` gained a `last_synced_at` watermark column. New methods: `execute_in_memory` (lock-serialized read), `get_table_columns`, `create_unique_index`, `get/set_last_synced_at`, `max_value`, `upsert_rows`, `reset`. Existing on-disk caches are discarded and rebuilt on load. -- `executor.py` — loading a delta-tracked table augments the column set with its key and change columns, creates the unique key index, and records the initial watermark; in-memory reads now go through the cache lock. +- `pyproject.toml` — bumped version to `1.4.0` +- `cache.py` — schema version bumped to `3`; `_sqlmem_tables` gained a `last_synced_at` watermark column. New methods: `execute_in_memory` (lock-serialized read), `get_table_columns`, `create_unique_index`, `get/set_last_synced_at`, `max_value`, `upsert_rows`, `seconds_since_refresh`, `reset`. Existing on-disk caches are discarded and rebuilt on load. +- `executor.py` — delta-tracked tables augment their column set with key/change columns (unique key index + initial watermark); TTL-tracked tables full-reload at read time when expired; in-memory reads go through the cache lock. --- diff --git a/README.md b/README.md index e696784..c057aee 100644 --- a/README.md +++ b/README.md @@ -185,7 +185,7 @@ sequenceDiagram - **First use** of a delta table → full load; the watermark is set to the table's current `max(change_column)`. - **On startup** → for each delta table restored from disk, a single catch-up query pulls everything changed **since the last shutdown** and upserts it, bringing the cache back in sync without a full reload. - **While running** → a background thread repeats the delta pull every `SQLMEM_REFRESH_INTERVAL` seconds (default 5 minutes), so the cache trails the source DB by at most that interval. -- Tables **without** a `DeltaConfig` keep the current behaviour: full load on miss, never auto-refreshed. +- Tables **without** a `DeltaConfig` keep the default behaviour: full load on miss, never auto-refreshed — unless they are given a [TTL](#time-based-refresh-tables-without-a-change-column). ### Requirements and limits of delta sync @@ -195,6 +195,29 @@ sequenceDiagram - **Structural changes are not covered by delta sync** — adding/removing attributes, or clearing values *without* bumping `change_column`, won't be picked up. For those, force a clean reload with [`engine.reset()`](#manual-cache-control) (or `invalidate()` for a single table). - Hard `DELETE`s of whole rows are not detected by a change-timestamp; this workload doesn't delete rows, but if yours does, use a soft-delete flag column or `reset()`. +## Time-based refresh (tables without a change column) + +Some tables can't be delta-synced because they have no change timestamp. For those you can set a **TTL** (max age in seconds): SQLmem keeps serving from cache and guarantees the cached copy is **never older than the TTL** by doing a full reload when it expires. + +```python +engine = CachingEngine( + base_engine, + ttl={ + "VW_LOOKUP_CODES": 300, # full-reload if the cache is older than 5 minutes + "VW_SETTINGS": 3600, + }, +) +``` + +- **Read-time guarantee** — when a query touches a TTL table whose cache is older than its TTL, the table is fully reloaded *before* the query is answered, so a stale copy is never returned. +- **Proactive** — the background thread also full-reloads expired TTL tables every `SQLMEM_REFRESH_INTERVAL` seconds, keeping them warm so reads usually don't pay the reload latency. +- TTL age is measured from `last_refresh_at`, which is persisted in `cache.db`, so the guarantee holds across restarts (an expired table is reloaded on first use after start). +- A table may be in **either** `delta` **or** `ttl`, not both (delta already keeps it fresh) — supplying both raises `ValueError`. + +```python +engine.refresh() # also reloads any expired TTL tables on demand +``` + ## Persistence The in-memory cache is persisted to `cache.db` on disk: @@ -235,7 +258,7 @@ Set via environment variables or a `.env` file: | `SQLMEM_CACHE_DB` | `cache.db` | Path to the on-disk persistence file | | `SQLMEM_BACKUP_INTERVAL` | `3600` | Disk backup interval in seconds | | `SQLMEM_SQL_DIALECT` | `tsql` | sqlglot dialect used to parse incoming SQL (e.g. `tsql`, `postgres`, `mysql`) | -| `SQLMEM_REFRESH_INTERVAL` | `300` | delta-refresh interval in seconds for delta-tracked tables | +| `SQLMEM_REFRESH_INTERVAL` | `300` | background refresh tick (seconds) — delta pulls and proactive TTL reloads | ## Exceptions @@ -276,7 +299,7 @@ Set `SQLMEM_DEBUG=true` in `.env` to make the default level DEBUG when no explic - [x] **Incremental (delta) refresh** via per-table change-timestamp + key columns (see above) — the key feature for large tables. - [x] **Primary-key auto-discovery** from the source DB (`inspect(engine).get_pk_constraint`) so `key_columns` is only needed for views. - [x] **`engine.reset()`** — wipe RAM + `cache.db` for a clean rebuild after structural changes. -- [ ] Per-table TTL (time-to-live) expiry. +- [x] **Per-table TTL** (time-to-live) — bounded-staleness full refresh for tables without a change column. ## Dependencies diff --git a/project.md b/project.md index 6a98149..1a0b53d 100644 --- a/project.md +++ b/project.md @@ -193,10 +193,11 @@ SQLMEM_DEBUG=true # DEBUG level — podrobný výpis každého dotazu, cache o - [x] **Třídílné názvy tabulek**: `[catalog].[schema].[table]` se cachuje pod base name, in-memory dotaz prefix stripuje. - [x] **Inkrementální (delta) refresh**: per-tabulku `DeltaConfig(change_column, key_columns)` — sync jen změněných řádků přes datový watermark `max(change_column)` (`>=` + idempotentní upsert podle klíče), catch-up na startu + background thread (`SQLMEM_REFRESH_INTERVAL`, default 300 s). PK se auto-zjistí ze zdrojové DB, pro views nutno zadat ručně. - [x] **`engine.reset()`**: smaže celou cache (RAM + `cache.db`) pro čistý rebuild po strukturální změně. +- [x] **TTL na úrovni tabulky**: `ttl={"VW_X": 300}` — pro tabulky bez timestamp sloupce. Garantuje, že cache není starší než interval (full reload při čtení po expiraci + proaktivně na pozadí). ## TODO — budoucí funkce -- **TTL na úrovni tabulky**: automatické vypršení cache po nastaveném čase. +- _(zatím žádné otevřené položky)_ --- diff --git a/pyproject.toml b/pyproject.toml index b4b690d..f44f894 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "sqlmem" -version = "1.3.1" +version = "1.4.0" description = "" authors = [ {name = "jan.doubravsky@gmail.com"} diff --git a/src/sqlmem/cache.py b/src/sqlmem/cache.py index 2ab3e9d..e00e907 100644 --- a/src/sqlmem/cache.py +++ b/src/sqlmem/cache.py @@ -143,6 +143,16 @@ class CacheManager: ).fetchone() return bool(row and row[0]) + def seconds_since_refresh(self, table: str) -> float | None: + """Age of a cached table in seconds, or None if it is not cached.""" + row = self._mem_conn.execute( + "SELECT last_refresh_at FROM _sqlmem_tables WHERE table_name = ?", (table,) + ).fetchone() + if not row or not row[0]: + return None + last = datetime.fromisoformat(row[0]) + return (datetime.now(timezone.utc) - last).total_seconds() + def discover_columns(self, table: str, source_conn: sqlite3.Connection) -> list[str]: """Return all column names of *table* from the source DB without fetching rows.""" logger.debug(f"Discovering columns of {table!r} from source DB") diff --git a/src/sqlmem/engine.py b/src/sqlmem/engine.py index b08fb7b..981182a 100644 --- a/src/sqlmem/engine.py +++ b/src/sqlmem/engine.py @@ -22,6 +22,7 @@ class CachingEngine: self, source_engine: Engine, delta: dict[str, DeltaConfig] | None = None, + ttl: dict[str, int] | None = None, ) -> None: self._source_engine = source_engine self._cache = CacheManager(CACHE_DB_PATH, BACKUP_INTERVAL_SECONDS) @@ -29,9 +30,18 @@ class CachingEngine: self._stats = StatsCollector() self._refresh_interval = REFRESH_INTERVAL_SECONDS self._delta = self._resolve_delta(delta or {}) + self._ttl = dict(ttl or {}) self._refresher = DeltaRefresher(self._cache, self._delta) - if self._delta: + overlap = set(self._delta) & set(self._ttl) + if overlap: + raise ValueError( + f"Tables {sorted(overlap)} are in both delta and ttl — a table is " + "either delta-refreshed (has a change column) or TTL-refreshed (full " + "reload), not both." + ) + + if self._delta or self._ttl: self._run_refresh() # catch up tables restored from disk self._start_refresh_thread() @@ -66,7 +76,7 @@ class CachingEngine: with self._source_engine.connect() as sa_conn: raw_conn = cast(sqlite3.Connection, sa_conn.connection.dbapi_connection) executor = QueryExecutor( - self._cache, self._registry, raw_conn, self._stats, self._delta + self._cache, self._registry, raw_conn, self._stats, self._delta, self._ttl ) return executor.execute(parsed) @@ -79,8 +89,25 @@ class CachingEngine: with self._source_engine.connect() as sa_conn: raw_conn = cast(sqlite3.Connection, sa_conn.connection.dbapi_connection) self._refresher.refresh(raw_conn) + self._refresh_ttl(raw_conn) except Exception as e: - logger.error(f"Delta refresh cycle failed: {e}") + logger.error(f"Refresh cycle failed: {e}") + + def _refresh_ttl(self, source_conn: sqlite3.Connection) -> None: + """Proactively full-reload TTL-tracked tables whose cache has expired.""" + for table, ttl in self._ttl.items(): + if not self._cache.is_table_cached(table): + continue + age = self._cache.seconds_since_refresh(table) + if age is None or age <= ttl: + continue + try: + columns = self._cache.get_table_columns(table) + full = self._cache.is_table_full(table) + self._cache.load_table(table, columns, source_conn, full=full) + logger.info(f"TTL refresh {table!r}: reloaded (age {age:.0f}s > {ttl}s)") + except Exception as e: + logger.error(f"TTL refresh failed for {table!r}: {e}") def _start_refresh_thread(self) -> None: def loop() -> None: diff --git a/src/sqlmem/executor.py b/src/sqlmem/executor.py index 629a1fa..2c3b46f 100644 --- a/src/sqlmem/executor.py +++ b/src/sqlmem/executor.py @@ -17,12 +17,22 @@ class QueryExecutor: source_conn: sqlite3.Connection, stats: StatsCollector, delta: dict[str, ResolvedDelta] | None = None, + ttl: dict[str, int] | None = None, ) -> None: self._cache = cache self._registry = registry self._source_conn = source_conn self._stats = stats self._delta = delta or {} + self._ttl = ttl or {} + + def _ttl_expired(self, table: str) -> bool: + """True if *table* has a TTL and its cached copy is older than that TTL.""" + ttl = self._ttl.get(table) + if ttl is None: + return False + age = self._cache.seconds_since_refresh(table) + return age is not None and age > ttl def execute(self, parsed: ParsedQuery) -> list[dict]: for table in parsed.tables: @@ -37,12 +47,18 @@ class QueryExecutor: def _ensure_full(self, table: str) -> None: """Load every column of *table* (SELECT * / t.*), refetching unless already full.""" - if self._cache.is_table_cached(table) and self._cache.is_table_full(table): + cached = self._cache.is_table_cached(table) + stale = cached and self._ttl_expired(table) + + if cached and self._cache.is_table_full(table) and not stale: logger.debug(f"Cache hit (full): {table!r}") self._stats.record_hit() return - if self._cache.is_table_cached(table): + if cached and stale: + logger.info(f"Cache expired (ttl) — reloading {table!r} in full.") + self._stats.record_refetch() + elif cached: logger.warning(f"Re-fetching {table!r} in full — SELECT * requested.") self._stats.record_refetch() else: @@ -52,16 +68,20 @@ class QueryExecutor: self._load(table, columns, full=True) def _ensure_columns(self, table: str, columns: list[str]) -> None: - """Load *table* with at least *columns*, refetching only when columns are missing.""" + """Load *table* with at least *columns*, refetching on new columns or TTL expiry.""" missing = self._registry.needs_refetch(table, columns) table_cached = self._cache.is_table_cached(table) + stale = table_cached and self._ttl_expired(table) - if not missing and table_cached: + if table_cached and not missing and not stale: logger.debug(f"Cache hit: {table!r} columns={columns}") self._stats.record_hit() return - if table_cached and missing: + if stale: + logger.info(f"Cache expired (ttl) — reloading {table!r}.") + self._stats.record_refetch() + elif table_cached and missing: logger.warning( f"Re-fetching {table!r} — new columns requested: {missing}. " f"Expanding cache from {self._registry.get_columns(table)} + {missing}" @@ -71,7 +91,9 @@ class QueryExecutor: self._stats.record_miss() all_columns = list(self._registry.get_columns(table)) + missing - self._load(table, all_columns, full=False) + # Preserve a fully-cached table's status across a TTL reload. + full = table_cached and self._cache.is_table_full(table) + self._load(table, all_columns, full=full) def _load(self, table: str, columns: list[str], full: bool) -> None: """Fetch *table* into cache, adding delta key/timestamp columns when tracked.""" diff --git a/tests/test_ttl.py b/tests/test_ttl.py new file mode 100644 index 0000000..5d4b1bd --- /dev/null +++ b/tests/test_ttl.py @@ -0,0 +1,137 @@ +import sqlite3 + +import pytest +from sqlalchemy import create_engine + +import sqlmem.engine as eng_mod +from sqlmem import CachingEngine, DeltaConfig +from sqlmem.cache import CacheManager +from sqlmem.executor import QueryExecutor +from sqlmem.parser import parse +from sqlmem.registry import ColumnRegistry +from sqlmem.stats import StatsCollector + + +@pytest.fixture +def source_conn(): + conn = sqlite3.connect(":memory:") + conn.executescript( + """ + CREATE TABLE products (id TEXT, name TEXT, price TEXT); + INSERT INTO products VALUES ('1', 'Widget', '9.99'), ('2', 'Gadget', '19.99'); + """ + ) + conn.commit() + yield conn + conn.close() + + +def make_executor(tmp_path, source_conn, ttl): + cache = CacheManager(db_path=tmp_path / "cache.db", backup_interval=9999) + registry = ColumnRegistry(cache.connection) + stats = StatsCollector() + executor = QueryExecutor(cache, registry, source_conn, stats, None, ttl) + return executor + + +def run(executor, sql, params=None): + return executor.execute(parse(sql, params)) + + +# --- lazy (read-time) guarantee -------------------------------------------- + + +def test_ttl_zero_reloads_every_access(tmp_path, source_conn): + executor = make_executor(tmp_path, source_conn, ttl={"products": 0}) + run(executor, "SELECT id, price FROM products") # miss → load + source_conn.execute("UPDATE products SET price = '1.11' WHERE id = '1'") + source_conn.commit() + + rows = {r["id"]: r for r in run(executor, "SELECT id, price FROM products")} + assert rows["1"]["price"] == "1.11" # stale → reloaded, sees new value + assert executor._stats.refetches == 1 + assert executor._stats.misses == 1 + + +def test_ttl_fresh_is_cache_hit(tmp_path, source_conn): + executor = make_executor(tmp_path, source_conn, ttl={"products": 9999}) + run(executor, "SELECT id, price FROM products") + source_conn.execute("UPDATE products SET price = '1.11' WHERE id = '1'") + source_conn.commit() + + rows = {r["id"]: r for r in run(executor, "SELECT id, price FROM products")} + assert rows["1"]["price"] == "9.99" # still fresh → old cached value served + assert executor._stats.hits == 1 + assert executor._stats.refetches == 0 + + +def test_ttl_preserves_full_status(tmp_path, source_conn): + executor = make_executor(tmp_path, source_conn, ttl={"products": 0}) + run(executor, "SELECT * FROM products") # full load + run(executor, "SELECT * FROM products") # stale → full reload + assert executor._cache.is_table_full("products") is True + + +def test_untracked_table_never_expires(tmp_path, source_conn): + executor = make_executor(tmp_path, source_conn, ttl={"other": 0}) + run(executor, "SELECT id, name FROM products") + source_conn.execute("UPDATE products SET name = 'X' WHERE id = '1'") + source_conn.commit() + rows = {r["id"]: r for r in run(executor, "SELECT id, name FROM products")} + assert rows["1"]["name"] == "Widget" # no TTL on this table → cache hit + assert executor._stats.hits == 1 + + +# --- engine-level: background refresh + config validation ------------------- + + +@pytest.fixture +def source_db(tmp_path): + db_path = tmp_path / "source.db" + conn = sqlite3.connect(db_path) + conn.executescript( + """ + CREATE TABLE products (id TEXT PRIMARY KEY, name TEXT, changed TEXT); + INSERT INTO products VALUES ('1', 'Widget', '2026-06-01 10:00:00'); + """ + ) + conn.commit() + conn.close() + return db_path + + +@pytest.fixture +def source_engine(source_db): + engine = create_engine(f"sqlite:///{source_db}") + yield engine + engine.dispose() + + +@pytest.fixture +def patched_cache(tmp_path, monkeypatch): + monkeypatch.setattr(eng_mod, "CACHE_DB_PATH", tmp_path / "cache.db") + monkeypatch.setattr(eng_mod, "BACKUP_INTERVAL_SECONDS", 9999) + + +def test_background_ttl_refresh(source_engine, source_db, patched_cache): + engine = CachingEngine(source_engine, ttl={"products": 0}) + engine.execute("SELECT id, name FROM products") + + conn = sqlite3.connect(source_db) + conn.execute("UPDATE products SET name = 'Widget2' WHERE id = '1'") + conn.commit() + conn.close() + + engine.refresh() # background-style full reload of the expired table + rows = engine.execute("SELECT id, name FROM products") + assert rows[0]["name"] == "Widget2" + engine.close() + + +def test_delta_and_ttl_overlap_raises(source_engine, patched_cache): + with pytest.raises(ValueError): + CachingEngine( + source_engine, + delta={"products": DeltaConfig(change_column="changed", key_columns=["id"])}, + ttl={"products": 300}, + )