diff --git a/CHANGELOG.md b/CHANGELOG.md index d4c3d0b..7fda6e5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,24 @@ All notable changes to this project will be documented in this file. --- +## [1.16.0] - 2026-06-11 + +### Added +- **Declarative table specs — `CachingEngine(tables=[TableSpec(...)])`** — declare each cached table up front (columns, indexes, refresh strategy, datetime columns, preload) instead of letting the engine learn columns lazily from queries. New public types `TableSpec`, `TTL`, `Delta` (a friendly alias of `DeltaConfig`) and exception `UndeclaredError`. + - **Background preload** — `preload=True` tables are loaded at startup (on the background thread by default, so startup isn't blocked; `blocking_startup_refresh=True` loads them synchronously). A copy already fresh in the persistent cache is skipped via the same double-checked locking added in 1.15.0, so a warm restart is instant. + - **Fail-fast on undeclared access** — in declarative mode a query referencing a table that has no `TableSpec`, or a column outside a spec's declared `columns` (including `SELECT *` on a column-restricted table), raises `UndeclaredError` instead of silently triggering an expensive lazy load / column-expansion. Declare `columns=None` to cache the whole table and allow any column. + - **Solves the lazy second-reload** — because columns are declared, a first query for a previously unseen column no longer forces a full re-fetch. +- `executor.ensure_loaded(table, columns)` — preloads a table into the cache (reusing the full load path: delta/index augmentation, registry, watermark, double-checked locking) without materializing any rows. + +### Fixed +- **Race on the shared cache connection** — the metadata reads (`is_table_cached`, `is_table_full`, `seconds_since_refresh`, `get_table_columns`, `get_last_synced_at`, `max_value`, `count_rows`) touched the single shared SQLite connection without the connection lock, so a query thread reading while the background refresh/preload thread wrote could raise `sqlite3.InterfaceError`. These reads now take the lock. More likely to surface now that startup preload adds background-thread activity. + +### Changed +- `pyproject.toml` — bumped version to `1.16.0`. +- **Fully backward compatible** — omit `tables=` and the legacy `delta=`/`ttl=`/`indexes=`/`datetime_columns=` kwargs behave exactly as before (lazy mode, no fail-fast). Passing both `tables=` and any of those kwargs raises `ValueError`; `tables=` is internally converted to the same config. + +--- + ## [1.15.0] - 2026-06-11 ### Fixed diff --git a/README.md b/README.md index 0319fe9..bde50c5 100644 --- a/README.md +++ b/README.md @@ -238,6 +238,41 @@ Each value is a list of index definitions: a string is a single-column index, a - Indexes are **recreated after every (re)load** — full loads, TTL reloads, and `invalidate()` + re-fetch all rebuild them — so they're always present, and they persist in `cache.db` across restarts. - Delta-tracked tables already get a unique index on their key columns; secondary indexes are independent and can be combined with `delta` or `ttl`. +## Declarative initialization (`tables=`) + +Instead of the lazy "learn columns from queries" mode, you can **declare every table up front** with `tables=[TableSpec(...)]` — its columns, indexes, refresh strategy and which columns are datetimes — and have the engine preload them and reject anything undeclared: + +```python +from sqlmem import CachingEngine, TableSpec, Delta, TTL + +engine = CachingEngine( + base_engine, + tables=[ + TableSpec( + name="VW_P_PRATVALUES", + columns=["PRODUCT_PRODUCTNR", "PRAT_NAME", "PRATVALUE", "CHANGE_DATE"], + indexes=["PRODUCT_PRODUCTNR", "PRAT_NAME", "CHANGE_DATE"], + refresh=Delta(change_column="CHANGE_DATE", key_columns=["PRATVALUE_ID"]), + datetime_columns=["CHANGE_DATE"], + preload=True, + ), + TableSpec( + name="VW_PRODUCTS_ASSIGNED_E", + columns=["PRODUCT_PRODUCTNR", "ELEMENT_NAME", "ELEMENT_ID"], + indexes=["PRODUCT_PRODUCTNR"], + refresh=TTL(seconds=1800), + preload=True, + ), + ], + pragmas={"mmap_size": 32 * 1024**3, "page_size": 8192}, +) +``` + +- **Preload** — `preload=True` tables are loaded at startup (on the background thread by default, so startup isn't blocked; pass `blocking_startup_refresh=True` to load them synchronously before serving). A copy already fresh in the persistent cache is **skipped**, so a warm restart is instant. During warm-up a table reports `TableState.LOADING` in [`stats`](#runtime-statistics) — handy for gating a `503` until it's `ready`. +- **Fail-fast** — a query for a table without a `TableSpec`, or for a column outside a spec's declared `columns` (including `SELECT *` on a column-restricted table), raises `UndeclaredError` instead of silently kicking off an expensive lazy load. Use `columns=None` to cache the whole table and allow any column. +- `refresh=` takes a `Delta(change_column=…, key_columns=…)` (same as `DeltaConfig`) or `TTL(seconds=…)`, or `None` for a static table. +- **Backward compatible** — omit `tables=` and the legacy `delta=`/`ttl=`/`indexes=`/`datetime_columns=` kwargs work exactly as before (lazy mode, no fail-fast). Passing both raises `ValueError`. + ## Persistence By default the cache lives in an **in-memory SQLite** and is persisted to `cache.db` on disk: @@ -413,9 +448,10 @@ By default the **startup catch-up** (delta pulls and TTL reloads for tables rest |---|---| | `ReadOnlyError` | INSERT, UPDATE, or DELETE statement | | `UnsupportedQueryError` | non-SELECT statement, `SELECT` without `FROM`, or an unqualified column in a multi-table query | +| `UndeclaredError` | in [declarative mode](#declarative-initialization-tables) (`tables=`): a query references a table or column that was not declared | ```python -from sqlmem import ReadOnlyError, UnsupportedQueryError +from sqlmem import ReadOnlyError, UnsupportedQueryError, UndeclaredError ``` ## Logging diff --git a/project.md b/project.md index a7d5fc2..af66e53 100644 --- a/project.md +++ b/project.md @@ -225,10 +225,11 @@ SQLMEM_DEBUG=true # DEBUG level — podrobný výpis každého dotazu, cache o - [x] **`vacuum(incremental=True)` varuje bez `auto_vacuum=INCREMENTAL`**: dřív tichý no-op; teď zaloguje warning (a jak to opravit) a vrátí se. - [x] **`Stats.db_size_bytes`**: velikost cache souboru na disku (0 v memory módu) ve `stats` pro monitoring. - [x] **Ochrana proti cache stampede**: `load_table` dělá double-checked locking — po získání `_load_lock` znovu ověří, zda tabulku mezitím nenahrál souběžný loader (cached + sloupce + ne-stale), a redundantní reload přeskočí. Bez toho druhý dotaz během studeného loadu velké tabulky spustil druhý plný reload (212M řádků = +2 h). +- [x] **Deklarativní inicializace (`tables=[TableSpec(...)]`)**: předem se deklaruje každá tabulka (`TableSpec(name, columns, indexes, refresh=Delta(...)|TTL(...), datetime_columns, preload)`). `preload=True` se na pozadí (nebo blokujícně) přednahraje při startu; co je v persistentní cache čerstvé, se přeskočí (warm restart = instant). Nedeklarovaná tabulka/sloupec (i `SELECT *` na sloupcově omezené tabulce) → `UndeclaredError` (fail-fast) místo tichého líného loadu; `columns=None` = celá tabulka + libovolný sloupec. Plně zpětně kompatibilní: bez `tables=` fungují staré `delta=/ttl=/indexes=/datetime_columns=` jako dřív (kombinace obojího → `ValueError`). ## TODO — budoucí funkce -- [ ] **Deklarativní inicializace (`tables=[TableSpec(...)]`)** — předem deklarovat sloupce/indexy/strategii každé tabulky, background preload, fail-fast na nedeklarované sloupce. Řeší líznou expanzi sloupců (druhý reload) a dělá z `execute()` čistě read-only. Plán pro 1.16.0. +- _(zatím žádné otevřené položky)_ --- diff --git a/pyproject.toml b/pyproject.toml index 598a056..b22713f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "sqlmem" -version = "1.15.0" +version = "1.16.0" description = "" authors = [ {name = "jan.doubravsky@gmail.com"} diff --git a/src/sqlmem/__init__.py b/src/sqlmem/__init__.py index 54e8dd8..73538e7 100644 --- a/src/sqlmem/__init__.py +++ b/src/sqlmem/__init__.py @@ -7,7 +7,8 @@ from ._coerce import to_sqlite_datetime as datetime_to_epoch_us from .config import DEBUG from .delta import DeltaConfig from .engine import CachingEngine -from .exceptions import ReadOnlyError, UnsupportedQueryError +from .exceptions import ReadOnlyError, UndeclaredError, UnsupportedQueryError +from .spec import TTL, Delta, TableSpec from .stats import Stats, TableStats _DEFAULT_FORMAT = ( @@ -59,8 +60,12 @@ def add_sink(sink: Any, *, level: str | None = None, **kwargs: Any) -> None: __all__ = [ "CachingEngine", "DeltaConfig", + "Delta", + "TTL", + "TableSpec", "ReadOnlyError", "UnsupportedQueryError", + "UndeclaredError", "Stats", "TableStats", "add_sink", diff --git a/src/sqlmem/cache.py b/src/sqlmem/cache.py index d2b26c8..934389b 100644 --- a/src/sqlmem/cache.py +++ b/src/sqlmem/cache.py @@ -311,23 +311,26 @@ class CacheManager: return dict(self._last_run) def is_table_cached(self, table: str) -> bool: - row = self._conn.execute( - "SELECT 1 FROM _sqlmem_tables WHERE table_name = ?", (table,) - ).fetchone() + with self._lock: # the shared _conn must not be read while a writer uses it + row = self._conn.execute( + "SELECT 1 FROM _sqlmem_tables WHERE table_name = ?", (table,) + ).fetchone() return row is not None def is_table_full(self, table: str) -> bool: """True if the whole table (all columns) is cached — a SELECT * cache hit.""" - row = self._conn.execute( - "SELECT is_full FROM _sqlmem_tables WHERE table_name = ?", (table,) - ).fetchone() + with self._lock: + row = self._conn.execute( + "SELECT is_full FROM _sqlmem_tables WHERE table_name = ?", (table,) + ).fetchone() return bool(row and row[0]) def seconds_since_refresh(self, table: str) -> float | None: """Age of a cached table in seconds, or None if it is not cached.""" - row = self._conn.execute( - "SELECT last_refresh_at FROM _sqlmem_tables WHERE table_name = ?", (table,) - ).fetchone() + with self._lock: + row = self._conn.execute( + "SELECT last_refresh_at FROM _sqlmem_tables WHERE table_name = ?", (table,) + ).fetchone() if not row or not row[0]: return None last = datetime.fromisoformat(row[0]) @@ -576,7 +579,8 @@ class CacheManager: def get_table_columns(self, table: str) -> list[str]: """Authoritative ordered column list of a cached table (via PRAGMA).""" - rows = self._conn.execute(f"PRAGMA table_info({quote(table)})").fetchall() + with self._lock: + rows = self._conn.execute(f"PRAGMA table_info({quote(table)})").fetchall() return [r[1] for r in rows] def create_unique_index(self, table: str, key_columns: list[str]) -> None: @@ -590,9 +594,10 @@ class CacheManager: self._conn.commit() def get_last_synced_at(self, table: str) -> str | None: - row = self._conn.execute( - "SELECT last_synced_at FROM _sqlmem_tables WHERE table_name = ?", (table,) - ).fetchone() + with self._lock: + row = self._conn.execute( + "SELECT last_synced_at FROM _sqlmem_tables WHERE table_name = ?", (table,) + ).fetchone() # Stored in a TEXT column: an INTEGER-µs watermark (datetime_columns) comes # back as its digit string; delta._bind_watermark reconstructs the datetime. return row[0] if row else None @@ -610,9 +615,10 @@ class CacheManager: Returns an ``int`` for a datetime column stored as INTEGER µs, else the ISO ``TEXT`` string.""" - row = self._conn.execute( - f"SELECT MAX({quote(column)}) FROM {quote(table)}" - ).fetchone() + with self._lock: + row = self._conn.execute( + f"SELECT MAX({quote(column)}) FROM {quote(table)}" + ).fetchone() return row[0] if row else None def upsert_rows(self, table: str, columns: list[str], rows: list[tuple]) -> None: @@ -629,7 +635,8 @@ class CacheManager: self._conn.commit() def count_rows(self, table: str) -> int: - row = self._conn.execute(f"SELECT COUNT(*) FROM {quote(table)}").fetchone() + with self._lock: + row = self._conn.execute(f"SELECT COUNT(*) FROM {quote(table)}").fetchone() return int(row[0]) if row else 0 def db_size_bytes(self) -> int: diff --git a/src/sqlmem/engine.py b/src/sqlmem/engine.py index 5ded193..192b904 100644 --- a/src/sqlmem/engine.py +++ b/src/sqlmem/engine.py @@ -18,12 +18,50 @@ from .config import ( SQL_DIALECT, ) from .delta import DeltaConfig, DeltaRefresher, ResolvedDelta +from .exceptions import UndeclaredError from .executor import QueryExecutor -from .parser import Params, parse +from .parser import Params, ParsedQuery, parse from .registry import ColumnRegistry +from .spec import TTL, TableSpec from .stats import Stats, StatsCollector, TableState, TableStats +def _specs_to_config( + tables: list[TableSpec], +) -> tuple[ + dict[str, DeltaConfig], + dict[str, int], + dict[str, list[str | list[str]]], + dict[str, list[str]], + dict[str, list[str] | None], +]: + """Convert declarative ``TableSpec``s into the engine's internal config dicts. + + Returns ``(delta, ttl, indexes, datetime_columns, declared)`` — the first four + mirror the legacy kwargs; ``declared`` maps each table to its allowed columns + (``None`` = whole table / any column) for fail-fast query checking. + """ + delta: dict[str, DeltaConfig] = {} + ttl: dict[str, int] = {} + indexes: dict[str, list[str | list[str]]] = {} + datetime_columns: dict[str, list[str]] = {} + declared: dict[str, list[str] | None] = {} + for spec in tables: + if spec.name in declared: + raise ValueError(f"Duplicate TableSpec for table {spec.name!r}.") + declared[spec.name] = list(spec.columns) if spec.columns is not None else None + if spec.indexes: + indexes[spec.name] = list(spec.indexes) + if spec.datetime_columns: + datetime_columns[spec.name] = list(spec.datetime_columns) + refresh = spec.refresh + if isinstance(refresh, TTL): + ttl[spec.name] = refresh.seconds + elif isinstance(refresh, DeltaConfig): + delta[spec.name] = refresh + return delta, ttl, indexes, datetime_columns, declared + + class _LazySource: """A source connection opened on first ``execute`` and shared across one query. @@ -68,9 +106,25 @@ class CachingEngine: pragmas: dict[str, str | int] | None = None, datetime_columns: dict[str, list[str]] | None = None, return_datetime: bool = True, + tables: list[TableSpec] | None = None, blocking_startup_refresh: bool = False, ) -> None: self._source_engine = source_engine + + # Declarative mode: a list of TableSpecs is converted to the same internal + # config the legacy delta=/ttl=/indexes=/datetime_columns= kwargs produce, + # plus a declared-columns allowlist (for fail-fast) and preload set. + self._declared: dict[str, list[str] | None] | None = None + self._preload_specs: list[TableSpec] = [] + if tables is not None: + if any(x is not None for x in (delta, ttl, indexes, datetime_columns)): + raise ValueError( + "Pass either tables=[TableSpec(...)] or the legacy " + "delta=/ttl=/indexes=/datetime_columns= kwargs, not both." + ) + delta, ttl, indexes, datetime_columns, self._declared = _specs_to_config(tables) + self._preload_specs = [s for s in tables if s.preload] + use_memory = IN_MEMORY if in_memory is None else in_memory self._dialect = dialect if dialect is not None else SQL_DIALECT self._refresh_interval = ( @@ -101,12 +155,14 @@ class CachingEngine: "reload), not both." ) - if self._delta or self._ttl: - # The startup catch-up (deltas/TTL reloads for tables restored from - # disk) can take a while on a cold start. By default it runs on the - # background thread so it never blocks application startup; callers - # who need the cache fully fresh before serving can opt back in. + if self._delta or self._ttl or self._preload_specs: + # Startup work (preload of declared tables + delta/TTL catch-up for + # tables restored from disk) can take a while on a cold start. By + # default it runs on the background thread so it never blocks + # application startup; callers who need the cache fully warm before + # serving can opt back in. if blocking_startup_refresh: + self._preload() self._run_refresh() self._start_refresh_thread(initial_catch_up=not blocking_startup_refresh) @@ -199,22 +255,67 @@ class CachingEngine: ) return replace(table_stats, tracking=tracking, state=state, last_refresh=last_refresh) + def _make_executor(self, source: Any) -> QueryExecutor: + return QueryExecutor( + self._cache, + self._registry, + source, + self._stats, + self._delta, + self._ttl, + self._index_columns, + ) + + def _check_declared(self, parsed: ParsedQuery) -> None: + """In declarative mode, reject any table/column not declared up front.""" + if self._declared is None: + return + for table in parsed.tables: + if table not in self._declared: + raise UndeclaredError( + f"Table {table!r} is not declared in tables=[TableSpec(...)]. " + "Add a TableSpec for it (declarative mode is a strict allowlist)." + ) + allowed = self._declared[table] + if allowed is None: + continue # whole table declared — any column is fine + if table in parsed.wildcard_tables: + raise UndeclaredError( + f"SELECT * on {table!r} is not allowed: only columns {allowed} " + "are declared. List the columns explicitly or declare " + "columns=None for the whole table." + ) + unknown = [c for c in parsed.columns_by_table.get(table, []) if c not in allowed] + if unknown: + raise UndeclaredError( + f"Column(s) {unknown} of {table!r} are not declared " + f"(declared: {allowed})." + ) + def execute(self, sql: str, params: Params = None) -> list[dict]: parsed = parse(sql, params, dialect=self._dialect) + self._check_declared(parsed) # The source connection is opened lazily — a pure cache hit never touches # the source and never occupies a pool slot. source = _LazySource(self._source_engine) try: - executor = QueryExecutor( - self._cache, - self._registry, - source, - self._stats, - self._delta, - self._ttl, - self._index_columns, - ) - return executor.execute(parsed) + return self._make_executor(source).execute(parsed) + finally: + source.close() + + def _preload(self) -> None: + """Load declared ``preload=True`` tables into the cache (skipping fresh copies).""" + if not self._preload_specs: + return + source = _LazySource(self._source_engine) + try: + executor = self._make_executor(source) + for spec in self._preload_specs: + try: + logger.info(f"Preloading {spec.name!r}…") + executor.ensure_loaded(spec.name, spec.columns) + except Exception as e: + logger.error(f"Preload failed for {spec.name!r}: {e}") finally: source.close() @@ -250,6 +351,7 @@ class CachingEngine: def _start_refresh_thread(self, initial_catch_up: bool = True) -> None: def loop() -> None: if initial_catch_up: + self._preload() # off-main-thread declared-table preload self._run_refresh() # off-main-thread startup catch-up event = threading.Event() while not event.wait(self._refresh_interval): diff --git a/src/sqlmem/exceptions.py b/src/sqlmem/exceptions.py index 8a8bba4..d78f44f 100644 --- a/src/sqlmem/exceptions.py +++ b/src/sqlmem/exceptions.py @@ -4,3 +4,13 @@ class ReadOnlyError(Exception): class UnsupportedQueryError(Exception): """Raised when a query uses unsupported features (JOIN, SELECT *).""" + + +class UndeclaredError(Exception): + """Raised in declarative mode (``tables=[TableSpec(...)]``) when a query + references a table or column that was not declared up front. + + Fail-fast by design: an undeclared table/column would otherwise trigger a + silent (potentially multi-hour) lazy load/column-expansion, so it is surfaced + immediately instead. + """ diff --git a/src/sqlmem/executor.py b/src/sqlmem/executor.py index 872455c..4f6cd22 100644 --- a/src/sqlmem/executor.py +++ b/src/sqlmem/executor.py @@ -42,6 +42,20 @@ class QueryExecutor: self._ensure_table(table, parsed) return self._run_in_memory(parsed) + def ensure_loaded(self, table: str, columns: list[str] | None) -> None: + """Preload *table* into the cache without running a query. + + ``columns=None`` loads the whole table (``SELECT *`` semantics); otherwise + only the listed columns. Reuses the same load path as a real query — delta + key/change + index columns are augmented, the registry and watermark are + updated, and double-checked locking skips a copy already fresh in the + cache — but never materializes any rows (unlike :meth:`execute`). + """ + if columns is None: + self._ensure_full(table) + else: + self._ensure_columns(table, columns) + def _ensure_table(self, table: str, parsed: ParsedQuery) -> None: if table in parsed.wildcard_tables: self._ensure_full(table) diff --git a/src/sqlmem/spec.py b/src/sqlmem/spec.py new file mode 100644 index 0000000..d2d96e2 --- /dev/null +++ b/src/sqlmem/spec.py @@ -0,0 +1,49 @@ +"""Declarative table specs for ``CachingEngine(tables=[...])``. + +Instead of the lazy "learn columns from queries" mode, an application can declare +each table up front — its columns, indexes, refresh strategy and datetime columns — +so the engine preloads them and rejects anything undeclared (fail-fast) rather than +silently triggering an expensive lazy load. The legacy ``delta=/ttl=/indexes=`` +kwargs keep working; ``tables=`` is converted to the same internal config. +""" + +from dataclasses import dataclass, field + +from .delta import DeltaConfig + +# Friendly alias for the declarative API; ``Delta`` and ``DeltaConfig`` are the +# same type (``change_column`` + ``key_columns``), so either may be used as a +# ``TableSpec.refresh`` strategy. +Delta = DeltaConfig + + +@dataclass(frozen=True) +class TTL: + """Time-based refresh strategy: full-reload the table when older than *seconds*.""" + + seconds: int + + +@dataclass(frozen=True) +class TableSpec: + """Declarative specification of one cached table. + + *columns* lists the columns to cache; leave it ``None`` to cache the whole + table (``SELECT *`` semantics) and allow any column. When columns are listed, + a query asking for a column outside the list raises + :class:`~sqlmem.exceptions.UndeclaredError`. + + *refresh* is a :class:`Delta` (change-column incremental sync) or :class:`TTL` + (time-based full reload), or ``None`` for a static table loaded once. + + *preload=True* loads the table at startup (in the background by default) so the + first query is a cache hit instead of paying a cold load; a copy already fresh + in the persistent cache is skipped. + """ + + name: str + columns: list[str] | None = None + indexes: list[str | list[str]] = field(default_factory=list) + refresh: DeltaConfig | TTL | None = None + datetime_columns: list[str] = field(default_factory=list) + preload: bool = False diff --git a/tests/test_spec.py b/tests/test_spec.py new file mode 100644 index 0000000..694bb83 --- /dev/null +++ b/tests/test_spec.py @@ -0,0 +1,221 @@ +import sqlite3 +import time +from datetime import datetime, timezone + +import pytest +from sqlalchemy import create_engine + +from sqlmem import ( + TTL, + CachingEngine, + Delta, + DeltaConfig, + TableSpec, + UndeclaredError, +) + + +@pytest.fixture +def spec_source(tmp_path): + db = tmp_path / "src.db" + conn = sqlite3.connect(db) + conn.execute("CREATE TABLE products (id TEXT, name TEXT, price TEXT, changed TEXT)") + conn.executemany( + "INSERT INTO products VALUES (?, ?, ?, ?)", + [ + ("1", "Widget", "9.99", "2026-06-01T10:00:00"), + ("2", "Gadget", "19.99", "2026-06-02T10:00:00"), + ], + ) + conn.execute("CREATE TABLE orders (order_id TEXT, qty TEXT)") + conn.executemany("INSERT INTO orders VALUES (?, ?)", [("101", "2")]) + conn.commit() + conn.close() + se = create_engine(f"sqlite:///{db}") + yield se + se.dispose() + + +# --- back-compat / validation ----------------------------------------------- + + +def test_tables_and_legacy_kwargs_are_mutually_exclusive(spec_source): + with pytest.raises(ValueError): + CachingEngine( + spec_source, + tables=[TableSpec("products", ["id"])], + delta={"products": DeltaConfig("changed", ["id"])}, + ) + + +def test_duplicate_tablespec_raises(spec_source): + with pytest.raises(ValueError): + CachingEngine( + spec_source, + tables=[TableSpec("products", ["id"]), TableSpec("products", ["name"])], + ) + + +def test_delta_alias_is_deltaconfig(): + assert Delta is DeltaConfig + + +# --- preload ---------------------------------------------------------------- + + +def test_preload_blocking_caches_before_first_query(spec_source, tmp_path): + ce = CachingEngine( + spec_source, + cache_db_path=tmp_path / "c.db", + tables=[TableSpec("products", ["id", "name"], preload=True)], + blocking_startup_refresh=True, + ) + # Cached at construction time — no execute() needed. + assert ce._cache.is_table_cached("products") is True + rows = ce.execute("SELECT id, name FROM products") + assert {r["id"] for r in rows} == {"1", "2"} + ce.close() + + +def test_preload_non_blocking_eventually_caches(spec_source, tmp_path): + ce = CachingEngine( + spec_source, + cache_db_path=tmp_path / "c.db", + tables=[TableSpec("products", ["id", "name"], preload=True)], + ) + deadline = time.time() + 5 + while not ce._cache.is_table_cached("products") and time.time() < deadline: + time.sleep(0.05) + assert ce._cache.is_table_cached("products") is True + ce.close() + + +def test_non_preload_table_is_not_loaded_at_startup(spec_source, tmp_path): + ce = CachingEngine( + spec_source, + cache_db_path=tmp_path / "c.db", + tables=[TableSpec("products", ["id", "name"], preload=False)], + blocking_startup_refresh=True, + ) + assert ce._cache.is_table_cached("products") is False # loads lazily on first query + ce.execute("SELECT id, name FROM products") + assert ce._cache.is_table_cached("products") is True + ce.close() + + +# --- fail-fast on undeclared tables / columns ------------------------------- + + +def test_fail_fast_undeclared_table(spec_source): + ce = CachingEngine(spec_source, tables=[TableSpec("products", ["id", "name"])]) + with pytest.raises(UndeclaredError): + ce.execute("SELECT order_id FROM orders") + ce.close() + + +def test_fail_fast_undeclared_column(spec_source): + ce = CachingEngine(spec_source, tables=[TableSpec("products", ["id", "name"])]) + with pytest.raises(UndeclaredError): + ce.execute("SELECT price FROM products") + ce.close() + + +def test_declared_columns_query_succeeds(spec_source): + ce = CachingEngine(spec_source, tables=[TableSpec("products", ["id", "name"])]) + rows = ce.execute("SELECT id, name FROM products") + assert len(rows) == 2 + ce.close() + + +def test_columns_none_allows_wildcard_and_any_column(spec_source): + ce = CachingEngine(spec_source, tables=[TableSpec("products", columns=None)]) + assert len(ce.execute("SELECT * FROM products")) == 2 + assert len(ce.execute("SELECT price FROM products")) == 2 # any column allowed + ce.close() + + +def test_wildcard_on_column_restricted_table_fails(spec_source): + ce = CachingEngine(spec_source, tables=[TableSpec("products", ["id", "name"])]) + with pytest.raises(UndeclaredError): + ce.execute("SELECT * FROM products") + ce.close() + + +def test_lazy_mode_has_no_fail_fast(spec_source): + """Without tables=, undeclared columns/tables load lazily as before.""" + ce = CachingEngine(spec_source) + rows = ce.execute("SELECT order_id FROM orders") + assert len(rows) == 1 + ce.close() + + +# --- refresh strategies via TableSpec --------------------------------------- + + +def test_tablespec_delta_tracking(spec_source, tmp_path): + ce = CachingEngine( + spec_source, + cache_db_path=tmp_path / "c.db", + tables=[ + TableSpec( + "products", + ["id", "name", "changed"], + refresh=Delta(change_column="changed", key_columns=["id"]), + preload=True, + ) + ], + blocking_startup_refresh=True, + ) + assert ce.stats.tables["products"].tracking == "delta" + ce.close() + + +def test_tablespec_ttl_tracking(spec_source, tmp_path): + ce = CachingEngine( + spec_source, + cache_db_path=tmp_path / "c.db", + tables=[TableSpec("products", ["id", "name"], refresh=TTL(seconds=1800), preload=True)], + blocking_startup_refresh=True, + ) + assert ce.stats.tables["products"].tracking == "ttl" + ce.close() + + +def test_tablespec_datetime_columns_roundtrip(spec_source, tmp_path): + ce = CachingEngine( + spec_source, + cache_db_path=tmp_path / "c.db", + tables=[ + TableSpec("products", ["id", "changed"], datetime_columns=["changed"], preload=True) + ], + blocking_startup_refresh=True, + ) + rows = ce.execute("SELECT id, changed FROM products WHERE changed > ?", ("2026-06-01T12:00:00",)) + assert [r["id"] for r in rows] == ["2"] # param coercion via datetime_columns + assert rows[0]["changed"] == datetime(2026, 6, 2, 10, 0, 0, tzinfo=timezone.utc) + ce.close() + + +# --- warm restart: preload skips a copy already fresh on disk --------------- + + +def test_warm_restart_preload_skips_reload(spec_source, tmp_path): + path = tmp_path / "c.db" + + def make() -> CachingEngine: + return CachingEngine( + spec_source, + cache_db_path=path, + in_memory=False, + tables=[TableSpec("products", ["id", "name"], preload=True)], + blocking_startup_refresh=True, + ) + + ce1 = make() + assert ce1.stats.misses >= 1 # cold preload had to load from source + ce1.close() + + ce2 = make() + assert ce2._cache.is_table_cached("products") is True + assert ce2.stats.misses == 0 # warm: preload was a cache hit, no redundant reload + ce2.close()