3 Commits

16 changed files with 1018 additions and 60 deletions
+48
View File
@@ -6,6 +6,54 @@ All notable changes to this project will be documented in this file.
---
## [1.16.0] - 2026-06-11
### Added
- **Declarative table specs — `CachingEngine(tables=[TableSpec(...)])`** — declare each cached table up front (columns, indexes, refresh strategy, datetime columns, preload) instead of letting the engine learn columns lazily from queries. New public types `TableSpec`, `TTL`, `Delta` (a friendly alias of `DeltaConfig`) and exception `UndeclaredError`.
- **Background preload** — `preload=True` tables are loaded at startup (on the background thread by default, so startup isn't blocked; `blocking_startup_refresh=True` loads them synchronously). A copy already fresh in the persistent cache is skipped via the same double-checked locking added in 1.15.0, so a warm restart is instant.
- **Fail-fast on undeclared access** — in declarative mode a query referencing a table that has no `TableSpec`, or a column outside a spec's declared `columns` (including `SELECT *` on a column-restricted table), raises `UndeclaredError` instead of silently triggering an expensive lazy load / column-expansion. Declare `columns=None` to cache the whole table and allow any column.
- **Solves the lazy second-reload** — because columns are declared, a first query for a previously unseen column no longer forces a full re-fetch.
- `executor.ensure_loaded(table, columns)` — preloads a table into the cache (reusing the full load path: delta/index augmentation, registry, watermark, double-checked locking) without materializing any rows.
### Fixed
- **Race on the shared cache connection** — the metadata reads (`is_table_cached`, `is_table_full`, `seconds_since_refresh`, `get_table_columns`, `get_last_synced_at`, `max_value`, `count_rows`) touched the single shared SQLite connection without the connection lock, so a query thread reading while the background refresh/preload thread wrote could raise `sqlite3.InterfaceError`. These reads now take the lock. More likely to surface now that startup preload adds background-thread activity.
### Changed
- `pyproject.toml` — bumped version to `1.16.0`.
- **Fully backward compatible** — omit `tables=` and the legacy `delta=`/`ttl=`/`indexes=`/`datetime_columns=` kwargs behave exactly as before (lazy mode, no fail-fast). Passing both `tables=` and any of those kwargs raises `ValueError`; `tables=` is internally converted to the same config.
---
## [1.15.0] - 2026-06-11
### Fixed
- **Cache stampede (thundering herd) on cold loads** — the decision to load a table was made *before* the load lock was taken, and `load_table` never re-checked after acquiring it. During a slow cold load of a large table (observed: 212M rows, ~2 h), a second query for the same table passed the pre-lock "not cached" check, queued on the load lock, and then ran a **redundant second full reload** instead of seeing the first had finished — doubling a multi-hour load. `load_table` now does **double-checked locking**: after acquiring the load lock it re-evaluates a caller-supplied predicate (table cached, all needed columns present, not TTL-expired) and skips the load when it is already satisfied. Invisible on small tables; on large ones it removes hours of redundant indexing under concurrent cold-start traffic.
### Changed
- `pyproject.toml` — bumped version to `1.15.0`.
- `CacheManager.load_table` gained an optional `recheck` callback (the double-check predicate); `QueryExecutor` supplies it for both column and `SELECT *` loads.
---
## [1.14.0] - 2026-06-10
Follow-up to 1.12.0 from running `datetime_columns` in production: the feature was only half-wired (writes were coerced, reads and query params were not).
### Fixed
- **`WHERE` on an INTEGER-µs `datetime_columns` column silently returned 0 rows** — `execute_in_memory()` coerced query params with `to_sqlite()`, which leaves an ISO string a string. Comparing the stored `INTEGER` against a `TEXT` param is always false under SQLite affinity, so `WHERE CHANGE_DATE > '2026-05-01T…'` matched nothing. Params for a query that touches a `datetime_columns` table are now coerced to epoch µs (datetime objects and ISO-datetime strings alike), so the comparison matches the stored integer. Scoped to the query's tables, so non-datetime queries are unaffected.
### Added
- **Read-time coercion — `datetime_columns` come back as `datetime`** — `execute()` now returns those columns as real `datetime` objects (UTC) instead of the raw INTEGER µs, restoring the transparent-proxy contract (you get the same type a direct source query would give). Opt out with `CachingEngine(..., return_datetime=False)` to get the raw integers.
- **`Stats.db_size_bytes`** — on-disk size of the cache file (0 in memory mode), so `engine.stats` exposes cache growth for monitoring without an external file check.
- **Public `datetime_to_epoch_us` helper** — `from sqlmem import datetime_to_epoch_us` exposes the same datetime→epoch-µs conversion used internally, so callers building `WHERE change_col > ?` params don't have to re-implement it.
### Changed
- `pyproject.toml` — bumped version to `1.14.0`.
- **`vacuum(incremental=True)` now warns instead of silently doing nothing** when the cache was not created with `auto_vacuum=INCREMENTAL` (the only mode in which incremental vacuum can reclaim pages); it logs how to fix it (`hard_reset()` with the pragma, or a full `vacuum(incremental=False)`) and returns.
- `CacheManager.execute_in_memory()` gained an optional `tables` argument (the query's tables) used to scope datetime param/result coercion; `CacheManager`/`CachingEngine` gained a `return_datetime` flag.
---
## [1.12.0] - 2026-06-09
### ⚠️ Breaking
+50 -5
View File
@@ -238,6 +238,41 @@ Each value is a list of index definitions: a string is a single-column index, a
- Indexes are **recreated after every (re)load** — full loads, TTL reloads, and `invalidate()` + re-fetch all rebuild them — so they're always present, and they persist in `cache.db` across restarts.
- Delta-tracked tables already get a unique index on their key columns; secondary indexes are independent and can be combined with `delta` or `ttl`.
## Declarative initialization (`tables=`)
Instead of the lazy "learn columns from queries" mode, you can **declare every table up front** with `tables=[TableSpec(...)]` — its columns, indexes, refresh strategy and which columns are datetimes — and have the engine preload them and reject anything undeclared:
```python
from sqlmem import CachingEngine, TableSpec, Delta, TTL
engine = CachingEngine(
base_engine,
tables=[
TableSpec(
name="VW_P_PRATVALUES",
columns=["PRODUCT_PRODUCTNR", "PRAT_NAME", "PRATVALUE", "CHANGE_DATE"],
indexes=["PRODUCT_PRODUCTNR", "PRAT_NAME", "CHANGE_DATE"],
refresh=Delta(change_column="CHANGE_DATE", key_columns=["PRATVALUE_ID"]),
datetime_columns=["CHANGE_DATE"],
preload=True,
),
TableSpec(
name="VW_PRODUCTS_ASSIGNED_E",
columns=["PRODUCT_PRODUCTNR", "ELEMENT_NAME", "ELEMENT_ID"],
indexes=["PRODUCT_PRODUCTNR"],
refresh=TTL(seconds=1800),
preload=True,
),
],
pragmas={"mmap_size": 32 * 1024**3, "page_size": 8192},
)
```
- **Preload** — `preload=True` tables are loaded at startup (on the background thread by default, so startup isn't blocked; pass `blocking_startup_refresh=True` to load them synchronously before serving). A copy already fresh in the persistent cache is **skipped**, so a warm restart is instant. During warm-up a table reports `TableState.LOADING` in [`stats`](#runtime-statistics) — handy for gating a `503` until it's `ready`.
- **Fail-fast** — a query for a table without a `TableSpec`, or for a column outside a spec's declared `columns` (including `SELECT *` on a column-restricted table), raises `UndeclaredError` instead of silently kicking off an expensive lazy load. Use `columns=None` to cache the whole table and allow any column.
- `refresh=` takes a `Delta(change_column=…, key_columns=…)` (same as `DeltaConfig`) or `TTL(seconds=…)`, or `None` for a static table.
- **Backward compatible** — omit `tables=` and the legacy `delta=`/`ttl=`/`indexes=`/`datetime_columns=` kwargs work exactly as before (lazy mode, no fail-fast). Passing both raises `ValueError`.
## Persistence
By default the cache lives in an **in-memory SQLite** and is persisted to `cache.db` on disk:
@@ -297,10 +332,17 @@ engine = CachingEngine(
```
- **Opt-in per column.** Only the columns you name change; everything else keeps the default lossless `TEXT` storage.
- ⚠️ **It changes the output contract for those columns**`execute()` returns them as `int` (µs since epoch), not ISO strings, and a `WHERE` on them must compare against integer µs. Don't list a column your callers read as a string.
- **Transparent in and out.** A `WHERE` on such a column accepts a `datetime` or an ISO string — the param is coerced to integer µs so the comparison matches — and `execute()` returns the column as a real `datetime` (UTC), the same type a direct source query would give. Pass `return_datetime=False` to get the raw integers instead.
- The delta watermark is handled transparently: it is persisted as the integer and bound back to a real `datetime` for the source query, so incremental refresh keeps working.
- ⚠️ This is a **breaking on-disk change** (`SCHEMA_VERSION` 4): an existing cache is wiped and reloaded on first start after enabling it — schedule a maintenance window for a large reload.
To build a `WHERE` param yourself (e.g. an HTTP `?since=` filter) without re-implementing the conversion, use the exported helper:
```python
from sqlmem import datetime_to_epoch_us
rows = engine.execute("SELECT * FROM events WHERE changed > ?", (datetime_to_epoch_us(since),))
```
## Manual cache control
```python
@@ -316,20 +358,20 @@ Use `reset()` after a **structural change** in the source (columns added/removed
`hard_reset()` goes further than `reset()` in disk mode: it closes every connection, deletes `cache.db` (and its `-wal`/`-shm` sidecars) and reopens from scratch — the only way to change a baked-in `page_size`/`auto_vacuum`. In memory mode it falls back to `reset()`.
`vacuum()` reclaims free pages left behind by delta `INSERT OR REPLACE` churn. Incremental (the default) is cheap and non-blocking but needs `auto_vacuum=INCREMENTAL`; `vacuum(incremental=False)` runs a full VACUUM that rewrites the file (~2× disk, blocks readers) — schedule it in a maintenance window. Both are no-ops in memory mode.
`vacuum()` reclaims free pages left behind by delta `INSERT OR REPLACE` churn. Incremental (the default) is cheap and non-blocking but needs `auto_vacuum=INCREMENTAL` (set it via `pragmas=` on a fresh cache); if the cache wasn't created that way, `vacuum(incremental=True)` logs a warning and does nothing rather than silently no-op'ing. `vacuum(incremental=False)` runs a full VACUUM that rewrites the file (~2× disk, blocks readers) — schedule it in a maintenance window. Both are no-ops in memory mode.
## Runtime statistics
```python
stats = engine.stats # Stats snapshot
print(stats.hits, stats.misses, stats.refetches, stats.errors)
print(stats.hits, stats.misses, stats.refetches, stats.errors, stats.db_size_bytes)
for name, t in stats.tables.items():
print(name, t.rows, t.state, t.tracking, t.last_upsert, t.last_refresh)
if t.consecutive_failures:
print(f" {name} failing ×{t.consecutive_failures}: {t.last_error} ({t.last_error_at})")
```
`Stats.errors` is the total number of load/refresh failures since start. Each `TableStats` also carries `last_error`, `last_error_at` and `consecutive_failures` (reset to 0 on the next success) — so a delta that fails *before* streaming (which otherwise leaves `state` looking `ready`) is still visible, and the table is marked `error`.
`Stats.db_size_bytes` is the on-disk cache file size (0 in memory mode) — handy for monitoring cache growth. `Stats.errors` is the total number of load/refresh failures since start. Each `TableStats` also carries `last_error`, `last_error_at` and `consecutive_failures` (reset to 0 on the next success) — so a delta that fails *before* streaming (which otherwise leaves `state` looking `ready`) is still visible, and the table is marked `error`.
Two timestamps distinguish *data freshness* from *liveness*:
@@ -363,6 +405,7 @@ By default the cache is **in-memory SQLite**, so a cached table lives in RAM —
- **Use [disk-backed mode](#disk-backed-cache-no-ram-copy)** (`in_memory=False`) when the working set simply doesn't fit in RAM — queries then run against `cache.db` on disk instead of a memory copy.
- **Loads are streamed in batches** (`SQLMEM_FETCH_BATCH` rows at a time, default 10 000) into a staging table and swapped in atomically. A multi-million-row table never gets fully materialized in Python at once, so the load doesn't spike memory or crash the process, and readers keep seeing the previous copy until the swap completes.
- Use **[delta refresh](#incremental-delta-refresh)** for large tables that have a change column — after the first load only changed rows are pulled, so restarts and refreshes don't re-read the whole table.
- **Concurrent queries during a cold load are deduplicated** — while one query is loading a large table, others for the same table wait and then read the freshly loaded cache rather than kicking off their own redundant reload (double-checked locking), so a slow cold start isn't multiplied by concurrent traffic.
- A **single query that returns a huge result set** (e.g. `SELECT *` over a multi-million-row cached table) still materializes that result as a list of dicts; bound it with a `WHERE`/`LIMIT` rather than selecting everything.
## Configuration
@@ -392,6 +435,7 @@ engine = CachingEngine(
dialect="tsql", # SQLMEM_SQL_DIALECT
pragmas={"mmap_size": 32 * 1024**3, "page_size": 8192}, # disk-mode SQLite tuning
datetime_columns={"orders": ["created_at"]}, # store these as INTEGER µs (opt-in)
return_datetime=True, # return datetime_columns as datetime (vs raw µs int)
blocking_startup_refresh=False, # block startup until caught up? (default: no)
)
```
@@ -404,9 +448,10 @@ By default the **startup catch-up** (delta pulls and TTL reloads for tables rest
|---|---|
| `ReadOnlyError` | INSERT, UPDATE, or DELETE statement |
| `UnsupportedQueryError` | non-SELECT statement, `SELECT` without `FROM`, or an unqualified column in a multi-table query |
| `UndeclaredError` | in [declarative mode](#declarative-initialization-tables) (`tables=`): a query references a table or column that was not declared |
```python
from sqlmem import ReadOnlyError, UnsupportedQueryError
from sqlmem import ReadOnlyError, UnsupportedQueryError, UndeclaredError
```
## Logging
+8 -1
View File
@@ -218,7 +218,14 @@ SQLMEM_DEBUG=true # DEBUG level — podrobný výpis každého dotazu, cache o
- [x] **Ladění SQLite vrstvy (`pragmas=`)**: `CachingEngine(..., pragmas={...})` aplikuje libovolné PRAGMA na cache spojení (`mmap_size`, `cache_size`, `temp_store`, `page_size`, `auto_vacuum`). `page_size` a `auto_vacuum` jsou layout-pragmata — platí jen na čerstvém souboru (page_size na existující cache se ignoruje s warningem). Neznámá pragmata SQLite tiše ignoruje.
- [x] **`hard_reset()`**: smaže on-disk soubor (+ WAL/SHM) a otevře nový s aktuálními pragmaty — na rozdíl od `reset()` umožní změnit `page_size`/`auto_vacuum`. Jen disk mód (v memory módu fallback na `reset()`).
- [x] **`vacuum(incremental=, pages=)`**: údržbový VACUUM cache souboru — inkrementální (uvolní volné stránky po delta `INSERT OR REPLACE`, vyžaduje `auto_vacuum=INCREMENTAL`) nebo plný (přepíše soubor, jen v maintenance okně). V memory módu no-op.
- [x] **Nativní INTEGER ukládání datetime sloupců (`datetime_columns=`)**: `datetime_columns={"VW_X": ["CHANGE_DATE"]}` — vyjmenované datetime sloupce se ukládají jako INTEGER µs-od-epochy místo ~28 B ISO TEXT (úspora místa + nativní celočíselné porovnání indexu). Opt-in per sloupec → mění výstupní kontrakt jen u zvolených sloupců (vrací int, ne ISO string). Breaking: `SCHEMA_VERSION` 3→4, cache se při upgrade smaže a načte znovu. Watermark se persistuje jako int a `_bind_watermark(epoch_us=True)` ho rekonstruuje zpět na `datetime` pro zdroj.
- [x] **Nativní INTEGER ukládání datetime sloupců (`datetime_columns=`)**: `datetime_columns={"VW_X": ["CHANGE_DATE"]}` — vyjmenované datetime sloupce se ukládají jako INTEGER µs-od-epochy místo ~28 B ISO TEXT (úspora místa + nativní celočíselné porovnání indexu). Opt-in per sloupec. Breaking: `SCHEMA_VERSION` 3→4, cache se při upgrade smaže a načte znovu. Watermark se persistuje jako int a `_bind_watermark(epoch_us=True)` ho rekonstruuje zpět na `datetime` pro zdroj.
- **Param coercion**: `WHERE col > ?` s ISO/`datetime` parametrem se zkoercuje na epoch µs (scoped na tabulky dotazu), takže porovnání INTEGER sloupce sedí (dřív vracelo 0 řádků).
- **Read-time coercion**: čtení vrací `datetime` objekt místo raw int (transparentní proxy); opt-out `CachingEngine(..., return_datetime=False)`.
- Veřejný helper `from sqlmem import datetime_to_epoch_us` pro konstrukci parametrů bez duplicitní logiky.
- [x] **`vacuum(incremental=True)` varuje bez `auto_vacuum=INCREMENTAL`**: dřív tichý no-op; teď zaloguje warning (a jak to opravit) a vrátí se.
- [x] **`Stats.db_size_bytes`**: velikost cache souboru na disku (0 v memory módu) ve `stats` pro monitoring.
- [x] **Ochrana proti cache stampede**: `load_table` dělá double-checked locking — po získání `_load_lock` znovu ověří, zda tabulku mezitím nenahrál souběžný loader (cached + sloupce + ne-stale), a redundantní reload přeskočí. Bez toho druhý dotaz během studeného loadu velké tabulky spustil druhý plný reload (212M řádků = +2 h).
- [x] **Deklarativní inicializace (`tables=[TableSpec(...)]`)**: předem se deklaruje každá tabulka (`TableSpec(name, columns, indexes, refresh=Delta(...)|TTL(...), datetime_columns, preload)`). `preload=True` se na pozadí (nebo blokujícně) přednahraje při startu; co je v persistentní cache čerstvé, se přeskočí (warm restart = instant). Nedeklarovaná tabulka/sloupec (i `SELECT *` na sloupcově omezené tabulce) → `UndeclaredError` (fail-fast) místo tichého líného loadu; `columns=None` = celá tabulka + libovolný sloupec. Plně zpětně kompatibilní: bez `tables=` fungují staré `delta=/ttl=/indexes=/datetime_columns=` jako dřív (kombinace obojího → `ValueError`).
## TODO — budoucí funkce
+1 -1
View File
@@ -1,6 +1,6 @@
[project]
name = "sqlmem"
version = "1.12.0"
version = "1.16.0"
description = ""
authors = [
{name = "jan.doubravsky@gmail.com"}
+8 -1
View File
@@ -3,10 +3,12 @@ from typing import Any
from loguru import logger
from ._coerce import to_sqlite_datetime as datetime_to_epoch_us
from .config import DEBUG
from .delta import DeltaConfig
from .engine import CachingEngine
from .exceptions import ReadOnlyError, UnsupportedQueryError
from .exceptions import ReadOnlyError, UndeclaredError, UnsupportedQueryError
from .spec import TTL, Delta, TableSpec
from .stats import Stats, TableStats
_DEFAULT_FORMAT = (
@@ -58,9 +60,14 @@ def add_sink(sink: Any, *, level: str | None = None, **kwargs: Any) -> None:
__all__ = [
"CachingEngine",
"DeltaConfig",
"Delta",
"TTL",
"TableSpec",
"ReadOnlyError",
"UnsupportedQueryError",
"UndeclaredError",
"Stats",
"TableStats",
"add_sink",
"datetime_to_epoch_us",
]
+57 -3
View File
@@ -10,6 +10,7 @@ left untouched.
import datetime
import decimal
import re
import uuid
from typing import Any
@@ -17,6 +18,10 @@ Params = tuple | list | dict | None
_EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
# A string that *starts* with an ISO date+time (``2026-05-01T00:00:00`` or
# space-separated). Used to spot a datetime passed as a string in a query param.
_ISO_DATETIME_RE = re.compile(r"^\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}")
def to_sqlite(value: Any) -> Any:
if isinstance(value, decimal.Decimal):
@@ -53,13 +58,62 @@ def to_sqlite_datetime(value: Any) -> int | None:
return None
def from_sqlite_datetime(value: Any) -> Any:
"""Inverse of :func:`to_sqlite_datetime`: INTEGER µs-since-epoch → UTC datetime.
Non-integers (a ``NULL`` value, or a column that isn't datetime-typed) pass
through unchanged.
"""
if isinstance(value, bool) or not isinstance(value, int):
return value
return _EPOCH + datetime.timedelta(microseconds=value)
def coerce_row(row: tuple) -> tuple:
return tuple(to_sqlite(v) for v in row)
def coerce_params(params: Params) -> tuple | dict | None:
def _coerce_param(value: Any, dt_table: bool) -> Any:
"""Coerce a single query parameter.
When the query touches a table that stores datetime columns as INTEGER µs
(*dt_table*), a datetime object or an ISO-datetime string is converted to
epoch µs so a ``WHERE`` comparison matches the stored INTEGER instead of
comparing INTEGER against TEXT (which SQLite affinity makes always false).
Otherwise the default stringifying coercion applies, unchanged.
"""
if dt_table and (
isinstance(value, datetime.datetime)
or (isinstance(value, str) and _ISO_DATETIME_RE.match(value))
):
result = to_sqlite_datetime(value)
if result is not None:
return result
return to_sqlite(value)
def coerce_params(params: Params, dt_table: bool = False) -> tuple | dict | None:
if params is None:
return None
if isinstance(params, dict):
return {key: to_sqlite(val) for key, val in params.items()}
return tuple(to_sqlite(val) for val in params)
return {key: _coerce_param(val, dt_table) for key, val in params.items()}
return tuple(_coerce_param(val, dt_table) for val in params)
def reverse_coerce_rows(
rows: list[tuple], col_names: list[str], dt_cols: set[str]
) -> list[tuple]:
"""Turn INTEGER µs back into ``datetime`` for result columns in *dt_cols*.
A no-op when no result column is a datetime column, so non-datetime queries
pay nothing.
"""
if not dt_cols:
return rows
dt_idx = {i for i, c in enumerate(col_names) if c in dt_cols}
if not dt_idx:
return rows
return [
tuple(from_sqlite_datetime(v) if i in dt_idx else v for i, v in enumerate(row))
for row in rows
]
+97 -25
View File
@@ -2,6 +2,7 @@ import atexit
import signal
import sqlite3
import threading
from collections.abc import Callable
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
@@ -9,7 +10,13 @@ from pathlib import Path
from loguru import logger
import sqlmem._meta as _meta
from ._coerce import coerce_params, coerce_row, to_sqlite, to_sqlite_datetime
from ._coerce import (
coerce_params,
coerce_row,
reverse_coerce_rows,
to_sqlite,
to_sqlite_datetime,
)
from ._sql import quote, quote_list, quote_source
from .config import FETCH_BATCH_SIZE, SQL_DIALECT
from .stats import TableState
@@ -42,6 +49,7 @@ class CacheManager:
fetch_batch: int = FETCH_BATCH_SIZE,
pragmas: dict[str, str | int] | None = None,
datetime_columns: dict[str, list[str]] | None = None,
return_datetime: bool = True,
) -> None:
self._db_path = db_path
self._backup_interval = backup_interval
@@ -51,6 +59,7 @@ class CacheManager:
self._pragmas = dict(pragmas or {}) # extra read/layout PRAGMAs (disk mode)
# table → columns stored as INTEGER µs-since-epoch instead of ISO TEXT
self._datetime_columns = {t: list(c) for t, c in (datetime_columns or {}).items()}
self._return_datetime = return_datetime # reverse-coerce reads back to datetime
self._lock = threading.Lock() # serializes connection access
self._load_lock = threading.Lock() # serializes full table loads
self._states: dict[str, str] = {} # table → live processing state
@@ -302,23 +311,26 @@ class CacheManager:
return dict(self._last_run)
def is_table_cached(self, table: str) -> bool:
row = self._conn.execute(
"SELECT 1 FROM _sqlmem_tables WHERE table_name = ?", (table,)
).fetchone()
with self._lock: # the shared _conn must not be read while a writer uses it
row = self._conn.execute(
"SELECT 1 FROM _sqlmem_tables WHERE table_name = ?", (table,)
).fetchone()
return row is not None
def is_table_full(self, table: str) -> bool:
"""True if the whole table (all columns) is cached — a SELECT * cache hit."""
row = self._conn.execute(
"SELECT is_full FROM _sqlmem_tables WHERE table_name = ?", (table,)
).fetchone()
with self._lock:
row = self._conn.execute(
"SELECT is_full FROM _sqlmem_tables WHERE table_name = ?", (table,)
).fetchone()
return bool(row and row[0])
def seconds_since_refresh(self, table: str) -> float | None:
"""Age of a cached table in seconds, or None if it is not cached."""
row = self._conn.execute(
"SELECT last_refresh_at FROM _sqlmem_tables WHERE table_name = ?", (table,)
).fetchone()
with self._lock:
row = self._conn.execute(
"SELECT last_refresh_at FROM _sqlmem_tables WHERE table_name = ?", (table,)
).fetchone()
if not row or not row[0]:
return None
last = datetime.fromisoformat(row[0])
@@ -417,6 +429,7 @@ class CacheManager:
columns: list[str],
source_conn: sqlite3.Connection,
full: bool = False,
recheck: Callable[[], bool] | None = None,
) -> None:
"""Stream the source table into the cache in batches.
@@ -425,6 +438,13 @@ class CacheManager:
``fetchall`` of a huge table) and readers keep seeing the previous copy
until the swap. Concurrent loads are serialized by ``_load_lock``; the
connection lock is only held for the brief per-batch inserts and the swap.
*recheck* implements double-checked locking against a cache stampede: the
decision to load is made by the caller *before* ``_load_lock`` is held, so
on a slow cold load a second request for the same table can queue behind
the lock and then redundantly reload it. If given, ``recheck()`` is
re-evaluated *after* the lock is acquired; when it returns ``True`` the
table is already loaded and fresh, so the load is skipped.
"""
src_cols = ", ".join(quote_source(c, self._dialect) for c in columns)
dt_cols = set(self._datetime_columns.get(table, ()))
@@ -438,6 +458,12 @@ class CacheManager:
q_table = quote(table)
with self._load_lock:
if recheck is not None and recheck():
logger.info(
f"Skipping load of {table!r}: a concurrent loader already "
"satisfied it (double-checked lock)."
)
return
self.set_state(table, TableState.LOADING)
logger.info(f"Fetching {table!r} columns {columns} from source DB (batch={self._fetch_batch})")
try:
@@ -498,16 +524,38 @@ class CacheManager:
self._read_conns.append(conn)
return conn
def _query_datetime_cols(self, tables: list[str] | None) -> set[str]:
"""Datetime columns (stored as INTEGER µs) belonging to *tables*.
Empty when no table is known/configured, so a query that touches no
datetime column pays nothing and behaves exactly as before.
"""
if not self._datetime_columns or not tables:
return set()
cols: set[str] = set()
for table in tables:
cols.update(self._datetime_columns.get(table, ()))
return cols
def execute_in_memory(
self, sql: str, params: tuple | list | dict | None = None
self,
sql: str,
params: tuple | list | dict | None = None,
tables: list[str] | None = None,
) -> tuple[list[str], list[tuple]]:
"""Run a read query against the cache.
In-memory mode serializes with writers on the single connection. Disk mode
reads from a per-thread WAL connection, so reads run concurrently with
writers and each other (see :meth:`_read_conn`).
When *tables* names a table with ``datetime_columns``, ISO/datetime query
params are coerced to epoch µs so a ``WHERE`` matches the stored INTEGER,
and (unless ``return_datetime=False``) those columns are returned as real
:class:`~datetime.datetime` objects rather than raw integers.
"""
bound = coerce_params(params)
dt_cols = self._query_datetime_cols(tables)
bound = coerce_params(params, dt_table=bool(dt_cols))
if self._in_memory:
with self._lock:
cursor = (
@@ -517,19 +565,22 @@ class CacheManager:
)
col_names = [desc[0] for desc in cursor.description]
rows = cursor.fetchall()
return col_names, rows
else:
conn = self._read_conn()
cursor = conn.execute(sql) if bound is None else conn.execute(sql, bound)
col_names = [desc[0] for desc in cursor.description]
rows = cursor.fetchall()
conn = self._read_conn()
cursor = conn.execute(sql) if bound is None else conn.execute(sql, bound)
col_names = [desc[0] for desc in cursor.description]
rows = cursor.fetchall()
if self._return_datetime and dt_cols:
rows = reverse_coerce_rows(rows, col_names, dt_cols)
return col_names, rows
# --- delta refresh support ---------------------------------------------
def get_table_columns(self, table: str) -> list[str]:
"""Authoritative ordered column list of a cached table (via PRAGMA)."""
rows = self._conn.execute(f"PRAGMA table_info({quote(table)})").fetchall()
with self._lock:
rows = self._conn.execute(f"PRAGMA table_info({quote(table)})").fetchall()
return [r[1] for r in rows]
def create_unique_index(self, table: str, key_columns: list[str]) -> None:
@@ -543,9 +594,10 @@ class CacheManager:
self._conn.commit()
def get_last_synced_at(self, table: str) -> str | None:
row = self._conn.execute(
"SELECT last_synced_at FROM _sqlmem_tables WHERE table_name = ?", (table,)
).fetchone()
with self._lock:
row = self._conn.execute(
"SELECT last_synced_at FROM _sqlmem_tables WHERE table_name = ?", (table,)
).fetchone()
# Stored in a TEXT column: an INTEGER-µs watermark (datetime_columns) comes
# back as its digit string; delta._bind_watermark reconstructs the datetime.
return row[0] if row else None
@@ -563,9 +615,10 @@ class CacheManager:
Returns an ``int`` for a datetime column stored as INTEGER µs, else the
ISO ``TEXT`` string."""
row = self._conn.execute(
f"SELECT MAX({quote(column)}) FROM {quote(table)}"
).fetchone()
with self._lock:
row = self._conn.execute(
f"SELECT MAX({quote(column)}) FROM {quote(table)}"
).fetchone()
return row[0] if row else None
def upsert_rows(self, table: str, columns: list[str], rows: list[tuple]) -> None:
@@ -582,9 +635,19 @@ class CacheManager:
self._conn.commit()
def count_rows(self, table: str) -> int:
row = self._conn.execute(f"SELECT COUNT(*) FROM {quote(table)}").fetchone()
with self._lock:
row = self._conn.execute(f"SELECT COUNT(*) FROM {quote(table)}").fetchone()
return int(row[0]) if row else 0
def db_size_bytes(self) -> int:
"""On-disk size of the cache file in bytes (0 in memory mode / if absent)."""
if self._in_memory:
return 0
try:
return self._db_path.stat().st_size
except OSError:
return 0
def reset(self) -> None:
"""Wipe the entire cache — every cached table plus the on-disk data
(the file is deleted in memory mode, VACUUMed in place in disk mode)."""
@@ -676,6 +739,15 @@ class CacheManager:
logger.debug("vacuum() called in memory mode — no-op.")
return
if incremental:
av = self._conn.execute("PRAGMA auto_vacuum").fetchone()[0]
if av != 2: # 0 = NONE, 1 = FULL, 2 = INCREMENTAL
logger.warning(
f"vacuum(incremental=True) called but auto_vacuum={av} (not "
"INCREMENTAL) — no pages will be reclaimed. Rebuild the cache "
"with pragmas={'auto_vacuum': 'INCREMENTAL'} via hard_reset(), "
"or run vacuum(incremental=False) for a full VACUUM."
)
return
with self._lock:
self._conn.execute(f"PRAGMA incremental_vacuum({pages})")
self._conn.commit()
+125 -17
View File
@@ -18,12 +18,50 @@ from .config import (
SQL_DIALECT,
)
from .delta import DeltaConfig, DeltaRefresher, ResolvedDelta
from .exceptions import UndeclaredError
from .executor import QueryExecutor
from .parser import Params, parse
from .parser import Params, ParsedQuery, parse
from .registry import ColumnRegistry
from .spec import TTL, TableSpec
from .stats import Stats, StatsCollector, TableState, TableStats
def _specs_to_config(
tables: list[TableSpec],
) -> tuple[
dict[str, DeltaConfig],
dict[str, int],
dict[str, list[str | list[str]]],
dict[str, list[str]],
dict[str, list[str] | None],
]:
"""Convert declarative ``TableSpec``s into the engine's internal config dicts.
Returns ``(delta, ttl, indexes, datetime_columns, declared)`` — the first four
mirror the legacy kwargs; ``declared`` maps each table to its allowed columns
(``None`` = whole table / any column) for fail-fast query checking.
"""
delta: dict[str, DeltaConfig] = {}
ttl: dict[str, int] = {}
indexes: dict[str, list[str | list[str]]] = {}
datetime_columns: dict[str, list[str]] = {}
declared: dict[str, list[str] | None] = {}
for spec in tables:
if spec.name in declared:
raise ValueError(f"Duplicate TableSpec for table {spec.name!r}.")
declared[spec.name] = list(spec.columns) if spec.columns is not None else None
if spec.indexes:
indexes[spec.name] = list(spec.indexes)
if spec.datetime_columns:
datetime_columns[spec.name] = list(spec.datetime_columns)
refresh = spec.refresh
if isinstance(refresh, TTL):
ttl[spec.name] = refresh.seconds
elif isinstance(refresh, DeltaConfig):
delta[spec.name] = refresh
return delta, ttl, indexes, datetime_columns, declared
class _LazySource:
"""A source connection opened on first ``execute`` and shared across one query.
@@ -67,9 +105,26 @@ class CachingEngine:
dialect: str | None = None,
pragmas: dict[str, str | int] | None = None,
datetime_columns: dict[str, list[str]] | None = None,
return_datetime: bool = True,
tables: list[TableSpec] | None = None,
blocking_startup_refresh: bool = False,
) -> None:
self._source_engine = source_engine
# Declarative mode: a list of TableSpecs is converted to the same internal
# config the legacy delta=/ttl=/indexes=/datetime_columns= kwargs produce,
# plus a declared-columns allowlist (for fail-fast) and preload set.
self._declared: dict[str, list[str] | None] | None = None
self._preload_specs: list[TableSpec] = []
if tables is not None:
if any(x is not None for x in (delta, ttl, indexes, datetime_columns)):
raise ValueError(
"Pass either tables=[TableSpec(...)] or the legacy "
"delta=/ttl=/indexes=/datetime_columns= kwargs, not both."
)
delta, ttl, indexes, datetime_columns, self._declared = _specs_to_config(tables)
self._preload_specs = [s for s in tables if s.preload]
use_memory = IN_MEMORY if in_memory is None else in_memory
self._dialect = dialect if dialect is not None else SQL_DIALECT
self._refresh_interval = (
@@ -83,6 +138,7 @@ class CachingEngine:
fetch_batch=fetch_batch if fetch_batch is not None else FETCH_BATCH_SIZE,
pragmas=pragmas,
datetime_columns=datetime_columns,
return_datetime=return_datetime,
)
self._registry = ColumnRegistry(self._cache.connection)
self._stats = StatsCollector()
@@ -99,12 +155,14 @@ class CachingEngine:
"reload), not both."
)
if self._delta or self._ttl:
# The startup catch-up (deltas/TTL reloads for tables restored from
# disk) can take a while on a cold start. By default it runs on the
# background thread so it never blocks application startup; callers
# who need the cache fully fresh before serving can opt back in.
if self._delta or self._ttl or self._preload_specs:
# Startup work (preload of declared tables + delta/TTL catch-up for
# tables restored from disk) can take a while on a cold start. By
# default it runs on the background thread so it never blocks
# application startup; callers who need the cache fully warm before
# serving can opt back in.
if blocking_startup_refresh:
self._preload()
self._run_refresh()
self._start_refresh_thread(initial_catch_up=not blocking_startup_refresh)
@@ -152,7 +210,11 @@ class CachingEngine:
last_runs = self._cache.get_last_runs()
with self._cache._lock:
base = self._stats.snapshot(self._cache.connection, states)
base = replace(base, errors=self._cache.error_total)
base = replace(
base,
errors=self._cache.error_total,
db_size_bytes=self._cache.db_size_bytes(),
)
return replace(
base,
tables={n: self._enrich(n, t, errors, last_runs) for n, t in base.tables.items()},
@@ -193,22 +255,67 @@ class CachingEngine:
)
return replace(table_stats, tracking=tracking, state=state, last_refresh=last_refresh)
def _make_executor(self, source: Any) -> QueryExecutor:
return QueryExecutor(
self._cache,
self._registry,
source,
self._stats,
self._delta,
self._ttl,
self._index_columns,
)
def _check_declared(self, parsed: ParsedQuery) -> None:
"""In declarative mode, reject any table/column not declared up front."""
if self._declared is None:
return
for table in parsed.tables:
if table not in self._declared:
raise UndeclaredError(
f"Table {table!r} is not declared in tables=[TableSpec(...)]. "
"Add a TableSpec for it (declarative mode is a strict allowlist)."
)
allowed = self._declared[table]
if allowed is None:
continue # whole table declared — any column is fine
if table in parsed.wildcard_tables:
raise UndeclaredError(
f"SELECT * on {table!r} is not allowed: only columns {allowed} "
"are declared. List the columns explicitly or declare "
"columns=None for the whole table."
)
unknown = [c for c in parsed.columns_by_table.get(table, []) if c not in allowed]
if unknown:
raise UndeclaredError(
f"Column(s) {unknown} of {table!r} are not declared "
f"(declared: {allowed})."
)
def execute(self, sql: str, params: Params = None) -> list[dict]:
parsed = parse(sql, params, dialect=self._dialect)
self._check_declared(parsed)
# The source connection is opened lazily — a pure cache hit never touches
# the source and never occupies a pool slot.
source = _LazySource(self._source_engine)
try:
executor = QueryExecutor(
self._cache,
self._registry,
source,
self._stats,
self._delta,
self._ttl,
self._index_columns,
)
return executor.execute(parsed)
return self._make_executor(source).execute(parsed)
finally:
source.close()
def _preload(self) -> None:
"""Load declared ``preload=True`` tables into the cache (skipping fresh copies)."""
if not self._preload_specs:
return
source = _LazySource(self._source_engine)
try:
executor = self._make_executor(source)
for spec in self._preload_specs:
try:
logger.info(f"Preloading {spec.name!r}")
executor.ensure_loaded(spec.name, spec.columns)
except Exception as e:
logger.error(f"Preload failed for {spec.name!r}: {e}")
finally:
source.close()
@@ -244,6 +351,7 @@ class CachingEngine:
def _start_refresh_thread(self, initial_catch_up: bool = True) -> None:
def loop() -> None:
if initial_catch_up:
self._preload() # off-main-thread declared-table preload
self._run_refresh() # off-main-thread startup catch-up
event = threading.Event()
while not event.wait(self._refresh_interval):
+10
View File
@@ -4,3 +4,13 @@ class ReadOnlyError(Exception):
class UnsupportedQueryError(Exception):
"""Raised when a query uses unsupported features (JOIN, SELECT *)."""
class UndeclaredError(Exception):
"""Raised in declarative mode (``tables=[TableSpec(...)]``) when a query
references a table or column that was not declared up front.
Fail-fast by design: an undeclared table/column would otherwise trigger a
silent (potentially multi-hour) lazy load/column-expansion, so it is surfaced
immediately instead.
"""
+58 -6
View File
@@ -1,3 +1,4 @@
from collections.abc import Callable
from typing import Any
from loguru import logger
@@ -41,12 +42,40 @@ class QueryExecutor:
self._ensure_table(table, parsed)
return self._run_in_memory(parsed)
def ensure_loaded(self, table: str, columns: list[str] | None) -> None:
"""Preload *table* into the cache without running a query.
``columns=None`` loads the whole table (``SELECT *`` semantics); otherwise
only the listed columns. Reuses the same load path as a real query — delta
key/change + index columns are augmented, the registry and watermark are
updated, and double-checked locking skips a copy already fresh in the
cache — but never materializes any rows (unlike :meth:`execute`).
"""
if columns is None:
self._ensure_full(table)
else:
self._ensure_columns(table, columns)
def _ensure_table(self, table: str, parsed: ParsedQuery) -> None:
if table in parsed.wildcard_tables:
self._ensure_full(table)
else:
self._ensure_columns(table, parsed.columns_by_table[table])
def _full_satisfied(self, table: str) -> bool:
"""True if *table* is cached in full and not TTL-expired (a SELECT * hit)."""
return (
self._cache.is_table_cached(table)
and self._cache.is_table_full(table)
and not self._ttl_expired(table)
)
def _columns_satisfied(self, table: str, columns: list[str]) -> bool:
"""True if *table* is cached with all *columns* present and not TTL-expired."""
if not self._cache.is_table_cached(table) or self._ttl_expired(table):
return False
return set(columns).issubset(self._cache.get_table_columns(table))
def _ensure_full(self, table: str) -> None:
"""Load every column of *table* (SELECT * / t.*), refetching unless already full."""
cached = self._cache.is_table_cached(table)
@@ -67,7 +96,7 @@ class QueryExecutor:
self._stats.record_miss()
columns = self._cache.discover_columns(table, self._source_conn)
self._load(table, columns, full=True)
self._load(table, columns, full=True, satisfied=lambda cols: self._full_satisfied(table))
def _ensure_columns(self, table: str, columns: list[str]) -> None:
"""Load *table* with at least *columns*, refetching on new columns or TTL expiry."""
@@ -95,10 +124,27 @@ class QueryExecutor:
all_columns = list(self._registry.get_columns(table)) + missing
# Preserve a fully-cached table's status across a TTL reload.
full = table_cached and self._cache.is_table_full(table)
self._load(table, all_columns, full=full)
self._load(
table,
all_columns,
full=full,
satisfied=lambda cols: self._columns_satisfied(table, cols),
)
def _load(self, table: str, columns: list[str], full: bool) -> None:
"""Fetch *table* into cache, adding delta key/timestamp and index columns."""
def _load(
self,
table: str,
columns: list[str],
full: bool,
satisfied: Callable[[list[str]], bool] | None = None,
) -> None:
"""Fetch *table* into cache, adding delta key/timestamp and index columns.
*satisfied* is the double-checked-locking predicate evaluated under the
load lock (see :meth:`CacheManager.load_table`); it is given the final,
augmented column list so a concurrent loader that already produced an
equivalent (or wider) cache is detected and the redundant reload skipped.
"""
cfg = self._delta.get(table)
extra = list(self._index_columns.get(table, []))
if cfg:
@@ -108,7 +154,11 @@ class QueryExecutor:
if extra:
columns = list(dict.fromkeys([*columns, *extra]))
self._cache.load_table(table, columns, self._source_conn, full=full)
recheck: Callable[[], bool] | None = None
if satisfied is not None:
final_columns = columns
recheck = lambda: satisfied(final_columns) # noqa: E731
self._cache.load_table(table, columns, self._source_conn, full=full, recheck=recheck)
self._registry.update(table, columns)
if cfg:
@@ -118,5 +168,7 @@ class QueryExecutor:
def _run_in_memory(self, parsed: ParsedQuery) -> list[dict]:
logger.debug(f"Executing in SQLite RAM: {parsed.sqlite_sql!r} params={parsed.params!r}")
col_names, rows = self._cache.execute_in_memory(parsed.sqlite_sql, parsed.params)
col_names, rows = self._cache.execute_in_memory(
parsed.sqlite_sql, parsed.params, parsed.tables
)
return [dict(zip(col_names, row)) for row in rows]
+49
View File
@@ -0,0 +1,49 @@
"""Declarative table specs for ``CachingEngine(tables=[...])``.
Instead of the lazy "learn columns from queries" mode, an application can declare
each table up front — its columns, indexes, refresh strategy and datetime columns —
so the engine preloads them and rejects anything undeclared (fail-fast) rather than
silently triggering an expensive lazy load. The legacy ``delta=/ttl=/indexes=``
kwargs keep working; ``tables=`` is converted to the same internal config.
"""
from dataclasses import dataclass, field
from .delta import DeltaConfig
# Friendly alias for the declarative API; ``Delta`` and ``DeltaConfig`` are the
# same type (``change_column`` + ``key_columns``), so either may be used as a
# ``TableSpec.refresh`` strategy.
Delta = DeltaConfig
@dataclass(frozen=True)
class TTL:
"""Time-based refresh strategy: full-reload the table when older than *seconds*."""
seconds: int
@dataclass(frozen=True)
class TableSpec:
"""Declarative specification of one cached table.
*columns* lists the columns to cache; leave it ``None`` to cache the whole
table (``SELECT *`` semantics) and allow any column. When columns are listed,
a query asking for a column outside the list raises
:class:`~sqlmem.exceptions.UndeclaredError`.
*refresh* is a :class:`Delta` (change-column incremental sync) or :class:`TTL`
(time-based full reload), or ``None`` for a static table loaded once.
*preload=True* loads the table at startup (in the background by default) so the
first query is a cache hit instead of paying a cold load; a copy already fresh
in the persistent cache is skipped.
"""
name: str
columns: list[str] | None = None
indexes: list[str | list[str]] = field(default_factory=list)
refresh: DeltaConfig | TTL | None = None
datetime_columns: list[str] = field(default_factory=list)
preload: bool = False
+1
View File
@@ -40,6 +40,7 @@ class Stats:
refetches: int
tables: dict[str, TableStats]
errors: int = 0 # total load/refresh failures since start
db_size_bytes: int = 0 # on-disk cache file size (0 in memory mode)
class StatsCollector:
+86
View File
@@ -288,3 +288,89 @@ def test_vacuum_in_memory_is_noop(cache, source_conn):
cache.load_table("users", ["name"], source_conn)
cache.vacuum(incremental=False) # no-op, no error
assert cache.is_table_cached("users") is True
# ---------------------------------------------------------------------------
# Double-checked locking against cache stampede (1.15.0)
# ---------------------------------------------------------------------------
class _ExplodingSource:
def execute(self, *args):
raise AssertionError("source must not be queried when recheck() is True")
def test_load_table_recheck_true_skips_load(cache, source_conn):
"""A recheck that reports the table already satisfied skips the reload."""
cache.load_table("users", ["name"], source_conn)
# Second load with recheck() → True must not touch the source at all.
cache.load_table("users", ["name"], _ExplodingSource(), recheck=lambda: True)
assert cache.is_table_cached("users") is True
def test_concurrent_loads_dedup_via_double_checked_lock(tmp_path):
"""A second loader queued behind a slow cold load must not reload the table."""
import time
c = CacheManager(db_path=tmp_path / "c.db", backup_interval=9999)
started = threading.Event()
release = threading.Event()
loads: list[str] = []
class _GatedCursor:
def __init__(self, rows):
self._rows = list(rows)
self._done = False
def fetchmany(self, n):
if self._done:
return []
self._done = True
return self._rows
class _GatedSource:
def execute(self, sql):
loads.append(sql) # one entry per *actual* source load
started.set()
release.wait(5) # hold the load open (and _load_lock) until released
return _GatedCursor([("alice",), ("bob",)])
def recheck() -> bool:
return c.is_table_cached("users") and "name" in c.get_table_columns("users")
def load() -> None:
c.load_table("users", ["name"], _GatedSource(), recheck=recheck)
a = threading.Thread(target=load)
b = threading.Thread(target=load)
a.start()
assert started.wait(5), "first load never started" # A holds _load_lock, mid-fetch
b.start()
time.sleep(0.2) # give B time to queue on _load_lock
release.set() # let A finish; B then re-checks and skips
a.join(5)
b.join(5)
assert not a.is_alive() and not b.is_alive()
assert len(loads) == 1 # the redundant second load was skipped
assert c.is_table_cached("users") is True
_, rows = c.execute_in_memory("SELECT name FROM users ORDER BY name")
assert [r[0] for r in rows] == ["alice", "bob"]
c.close()
def test_incremental_vacuum_warns_without_incremental_auto_vacuum(tmp_path, source_conn):
"""Incremental vacuum on a DB that isn't auto_vacuum=INCREMENTAL warns and skips."""
from loguru import logger
messages: list[str] = []
sink_id = logger.add(messages.append, level="WARNING", filter="sqlmem")
logger.enable("sqlmem")
try:
c = CacheManager(db_path=tmp_path / "c.db", backup_interval=9999, in_memory=False)
c.load_table("users", ["name"], source_conn)
c.vacuum(incremental=True) # auto_vacuum defaults to NONE → no-op + warning
c.close()
finally:
logger.remove(sink_id)
logger.disable("sqlmem")
assert any("auto_vacuum" in m for m in messages)
+131 -1
View File
@@ -4,9 +4,17 @@ import uuid
import pytest
from sqlmem._coerce import coerce_params, to_sqlite, to_sqlite_datetime
from sqlmem._coerce import (
coerce_params,
from_sqlite_datetime,
reverse_coerce_rows,
to_sqlite,
to_sqlite_datetime,
)
from sqlmem.cache import CacheManager
_UTC = datetime.timezone.utc
class _FakeCursor:
def __init__(self, rows):
@@ -165,6 +173,128 @@ def test_non_datetime_columns_unaffected_by_datetime_columns(tmp_path):
c.close()
# --- param coercion for datetime_columns (A) --------------------------------
def test_coerce_params_dt_table_iso_string_to_epoch():
p = coerce_params(("2026-06-01T10:00:00",), dt_table=True)
assert p == (to_sqlite_datetime("2026-06-01T10:00:00"),)
def test_coerce_params_dt_table_datetime_to_epoch():
dt = datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=_UTC)
assert coerce_params((dt,), dt_table=True) == (to_sqlite_datetime(dt),)
def test_coerce_params_dt_table_false_keeps_iso_string():
# No datetime table in the query → behaviour unchanged (string stays a string).
assert coerce_params(("2026-06-01T10:00:00",), dt_table=False) == (
"2026-06-01T10:00:00",
)
def test_coerce_params_dt_table_leaves_non_datetime_values():
assert coerce_params(("hello", 5, None), dt_table=True) == ("hello", 5, None)
def test_where_on_datetime_column_matches_with_iso_param(tmp_path):
"""The critical fix: a WHERE on an INTEGER-µs column with an ISO string param
must match instead of comparing INTEGER against TEXT (always 0 rows)."""
c = CacheManager(
db_path=tmp_path / "cache.db",
backup_interval=9999,
datetime_columns={"t": ["changed"]},
return_datetime=False,
)
rows = [
("1", datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=_UTC)),
("2", datetime.datetime(2026, 6, 3, 10, 0, 0, tzinfo=_UTC)),
]
c.load_table("t", ["id", "changed"], FakeSource(rows))
_, out = c.execute_in_memory(
"SELECT id FROM t WHERE changed > ?", ("2026-06-02T00:00:00",), ["t"]
)
assert [r[0] for r in out] == ["2"]
c.close()
def test_where_on_datetime_column_without_table_scope_is_unchanged(tmp_path):
"""Without table scope the param isn't coerced — proves the fix is scoped."""
c = CacheManager(
db_path=tmp_path / "cache.db",
backup_interval=9999,
datetime_columns={"t": ["changed"]},
return_datetime=False,
)
c.load_table(
"t",
["id", "changed"],
FakeSource([("1", datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=_UTC))]),
)
# No `tables` arg → INTEGER vs TEXT comparison → no match (legacy behaviour).
_, out = c.execute_in_memory("SELECT id FROM t WHERE changed > ?", ("2026-01-01T00:00:00",))
assert out == []
c.close()
# --- reverse coercion: read back as datetime (B) ----------------------------
def test_from_sqlite_datetime_roundtrip():
dt = datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=_UTC)
assert from_sqlite_datetime(to_sqlite_datetime(dt)) == dt
def test_from_sqlite_datetime_passes_non_int():
assert from_sqlite_datetime("x") == "x"
assert from_sqlite_datetime(None) is None
assert from_sqlite_datetime(True) is True # bool is not treated as µs
def test_reverse_coerce_rows_only_named_columns():
us = to_sqlite_datetime(datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=_UTC))
out = reverse_coerce_rows([("1", us)], ["id", "changed"], {"changed"})
assert out[0][0] == "1"
assert out[0][1] == datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=_UTC)
def test_read_returns_datetime_by_default(tmp_path):
c = CacheManager(
db_path=tmp_path / "cache.db",
backup_interval=9999,
datetime_columns={"t": ["changed"]},
)
dt = datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=_UTC)
c.load_table("t", ["id", "changed"], FakeSource([("1", dt)]))
_, out = c.execute_in_memory("SELECT id, changed FROM t", None, ["t"])
assert out == [("1", dt)] # returned as a datetime, not the raw int
c.close()
def test_return_datetime_false_keeps_raw_int(tmp_path):
c = CacheManager(
db_path=tmp_path / "cache.db",
backup_interval=9999,
datetime_columns={"t": ["changed"]},
return_datetime=False,
)
dt = datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=_UTC)
c.load_table("t", ["id", "changed"], FakeSource([("1", dt)]))
_, out = c.execute_in_memory("SELECT changed FROM t", None, ["t"])
assert out == [(to_sqlite_datetime(dt),)] # raw INTEGER µs
c.close()
# --- public export (F) ------------------------------------------------------
def test_datetime_to_epoch_us_is_public():
from sqlmem import datetime_to_epoch_us
dt = datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=_UTC)
assert datetime_to_epoch_us(dt) == to_sqlite_datetime(dt)
# --- integration: values reach the cache through coercion -------------------
+68
View File
@@ -421,3 +421,71 @@ def test_engine_vacuum_runs(source_engine, tmp_path):
ce.vacuum(incremental=False) # must not raise
assert ce._cache.is_table_cached("products") is True
ce.close()
# ---------------------------------------------------------------------------
# datetime_columns end-to-end: param coercion (A) + read-back datetime (B)
# ---------------------------------------------------------------------------
@pytest.fixture
def events_engine(tmp_path):
src = tmp_path / "events.db"
conn = sqlite3.connect(src)
conn.execute("CREATE TABLE events (id TEXT, changed TEXT)")
conn.executemany(
"INSERT INTO events VALUES (?, ?)",
[("1", "2026-06-01T10:00:00"), ("2", "2026-06-03T10:00:00")],
)
conn.commit()
conn.close()
se = create_engine(f"sqlite:///{src}")
yield se
se.dispose()
def test_datetime_column_where_and_readback(events_engine, tmp_path):
from datetime import datetime, timezone
ce = CachingEngine(
events_engine,
cache_db_path=tmp_path / "cache.db",
in_memory=False,
datetime_columns={"events": ["changed"]},
)
# A: WHERE on the INTEGER-µs column with an ISO string param returns the right row.
rows = ce.execute(
"SELECT id, changed FROM events WHERE changed > ?", ("2026-06-02T00:00:00",)
)
assert [r["id"] for r in rows] == ["2"]
# B: the column comes back as a datetime, not a raw integer.
assert rows[0]["changed"] == datetime(2026, 6, 3, 10, 0, 0, tzinfo=timezone.utc)
ce.close()
def test_datetime_column_return_datetime_false(events_engine, tmp_path):
ce = CachingEngine(
events_engine,
cache_db_path=tmp_path / "cache.db",
in_memory=False,
datetime_columns={"events": ["changed"]},
return_datetime=False,
)
rows = ce.execute("SELECT id, changed FROM events")
assert all(isinstance(r["changed"], int) for r in rows) # opt-out → raw µs
ce.close()
# ---------------------------------------------------------------------------
# db_size_bytes in stats (D)
# ---------------------------------------------------------------------------
def test_stats_reports_db_size_in_disk_mode(source_engine, tmp_path):
ce = CachingEngine(source_engine, cache_db_path=tmp_path / "cache.db", in_memory=False)
ce.execute("SELECT id FROM products")
assert ce.stats.db_size_bytes > 0
ce.close()
def test_stats_db_size_zero_in_memory(engine):
engine.execute("SELECT id, name FROM products")
assert engine.stats.db_size_bytes == 0
+221
View File
@@ -0,0 +1,221 @@
import sqlite3
import time
from datetime import datetime, timezone
import pytest
from sqlalchemy import create_engine
from sqlmem import (
TTL,
CachingEngine,
Delta,
DeltaConfig,
TableSpec,
UndeclaredError,
)
@pytest.fixture
def spec_source(tmp_path):
db = tmp_path / "src.db"
conn = sqlite3.connect(db)
conn.execute("CREATE TABLE products (id TEXT, name TEXT, price TEXT, changed TEXT)")
conn.executemany(
"INSERT INTO products VALUES (?, ?, ?, ?)",
[
("1", "Widget", "9.99", "2026-06-01T10:00:00"),
("2", "Gadget", "19.99", "2026-06-02T10:00:00"),
],
)
conn.execute("CREATE TABLE orders (order_id TEXT, qty TEXT)")
conn.executemany("INSERT INTO orders VALUES (?, ?)", [("101", "2")])
conn.commit()
conn.close()
se = create_engine(f"sqlite:///{db}")
yield se
se.dispose()
# --- back-compat / validation -----------------------------------------------
def test_tables_and_legacy_kwargs_are_mutually_exclusive(spec_source):
with pytest.raises(ValueError):
CachingEngine(
spec_source,
tables=[TableSpec("products", ["id"])],
delta={"products": DeltaConfig("changed", ["id"])},
)
def test_duplicate_tablespec_raises(spec_source):
with pytest.raises(ValueError):
CachingEngine(
spec_source,
tables=[TableSpec("products", ["id"]), TableSpec("products", ["name"])],
)
def test_delta_alias_is_deltaconfig():
assert Delta is DeltaConfig
# --- preload ----------------------------------------------------------------
def test_preload_blocking_caches_before_first_query(spec_source, tmp_path):
ce = CachingEngine(
spec_source,
cache_db_path=tmp_path / "c.db",
tables=[TableSpec("products", ["id", "name"], preload=True)],
blocking_startup_refresh=True,
)
# Cached at construction time — no execute() needed.
assert ce._cache.is_table_cached("products") is True
rows = ce.execute("SELECT id, name FROM products")
assert {r["id"] for r in rows} == {"1", "2"}
ce.close()
def test_preload_non_blocking_eventually_caches(spec_source, tmp_path):
ce = CachingEngine(
spec_source,
cache_db_path=tmp_path / "c.db",
tables=[TableSpec("products", ["id", "name"], preload=True)],
)
deadline = time.time() + 5
while not ce._cache.is_table_cached("products") and time.time() < deadline:
time.sleep(0.05)
assert ce._cache.is_table_cached("products") is True
ce.close()
def test_non_preload_table_is_not_loaded_at_startup(spec_source, tmp_path):
ce = CachingEngine(
spec_source,
cache_db_path=tmp_path / "c.db",
tables=[TableSpec("products", ["id", "name"], preload=False)],
blocking_startup_refresh=True,
)
assert ce._cache.is_table_cached("products") is False # loads lazily on first query
ce.execute("SELECT id, name FROM products")
assert ce._cache.is_table_cached("products") is True
ce.close()
# --- fail-fast on undeclared tables / columns -------------------------------
def test_fail_fast_undeclared_table(spec_source):
ce = CachingEngine(spec_source, tables=[TableSpec("products", ["id", "name"])])
with pytest.raises(UndeclaredError):
ce.execute("SELECT order_id FROM orders")
ce.close()
def test_fail_fast_undeclared_column(spec_source):
ce = CachingEngine(spec_source, tables=[TableSpec("products", ["id", "name"])])
with pytest.raises(UndeclaredError):
ce.execute("SELECT price FROM products")
ce.close()
def test_declared_columns_query_succeeds(spec_source):
ce = CachingEngine(spec_source, tables=[TableSpec("products", ["id", "name"])])
rows = ce.execute("SELECT id, name FROM products")
assert len(rows) == 2
ce.close()
def test_columns_none_allows_wildcard_and_any_column(spec_source):
ce = CachingEngine(spec_source, tables=[TableSpec("products", columns=None)])
assert len(ce.execute("SELECT * FROM products")) == 2
assert len(ce.execute("SELECT price FROM products")) == 2 # any column allowed
ce.close()
def test_wildcard_on_column_restricted_table_fails(spec_source):
ce = CachingEngine(spec_source, tables=[TableSpec("products", ["id", "name"])])
with pytest.raises(UndeclaredError):
ce.execute("SELECT * FROM products")
ce.close()
def test_lazy_mode_has_no_fail_fast(spec_source):
"""Without tables=, undeclared columns/tables load lazily as before."""
ce = CachingEngine(spec_source)
rows = ce.execute("SELECT order_id FROM orders")
assert len(rows) == 1
ce.close()
# --- refresh strategies via TableSpec ---------------------------------------
def test_tablespec_delta_tracking(spec_source, tmp_path):
ce = CachingEngine(
spec_source,
cache_db_path=tmp_path / "c.db",
tables=[
TableSpec(
"products",
["id", "name", "changed"],
refresh=Delta(change_column="changed", key_columns=["id"]),
preload=True,
)
],
blocking_startup_refresh=True,
)
assert ce.stats.tables["products"].tracking == "delta"
ce.close()
def test_tablespec_ttl_tracking(spec_source, tmp_path):
ce = CachingEngine(
spec_source,
cache_db_path=tmp_path / "c.db",
tables=[TableSpec("products", ["id", "name"], refresh=TTL(seconds=1800), preload=True)],
blocking_startup_refresh=True,
)
assert ce.stats.tables["products"].tracking == "ttl"
ce.close()
def test_tablespec_datetime_columns_roundtrip(spec_source, tmp_path):
ce = CachingEngine(
spec_source,
cache_db_path=tmp_path / "c.db",
tables=[
TableSpec("products", ["id", "changed"], datetime_columns=["changed"], preload=True)
],
blocking_startup_refresh=True,
)
rows = ce.execute("SELECT id, changed FROM products WHERE changed > ?", ("2026-06-01T12:00:00",))
assert [r["id"] for r in rows] == ["2"] # param coercion via datetime_columns
assert rows[0]["changed"] == datetime(2026, 6, 2, 10, 0, 0, tzinfo=timezone.utc)
ce.close()
# --- warm restart: preload skips a copy already fresh on disk ---------------
def test_warm_restart_preload_skips_reload(spec_source, tmp_path):
path = tmp_path / "c.db"
def make() -> CachingEngine:
return CachingEngine(
spec_source,
cache_db_path=path,
in_memory=False,
tables=[TableSpec("products", ["id", "name"], preload=True)],
blocking_startup_refresh=True,
)
ce1 = make()
assert ce1.stats.misses >= 1 # cold preload had to load from source
ce1.close()
ce2 = make()
assert ce2._cache.is_table_cached("products") is True
assert ce2.stats.misses == 0 # warm: preload was a cache hit, no redundant reload
ce2.close()