5 Commits

20 changed files with 1623 additions and 79 deletions
+3 -6
View File
@@ -1,8 +1,6 @@
# Python # Python
__pycache__/ __pycache__/
*.py[cod] *.py[cod]
*.pyo
*.pyd
*.egg *.egg
*.egg-info/ *.egg-info/
dist/ dist/
@@ -40,12 +38,11 @@ Thumbs.db
.env.* .env.*
# sqlmem cache (incl. WAL sidecars from disk-backed mode) # sqlmem cache (incl. WAL sidecars from disk-backed mode)
cache.db cache.db*
cache.db-wal
cache.db-shm
# Agents # Agents
AGENTS.md AGENTS.md
CLAUDE.md CLAUDE.md
DESIGN_DOCUMENT_MODULE.md DESIGN_DOCUMENT_MODULE.md
.claude/ .claude/
handover.md
+81
View File
@@ -6,6 +6,87 @@ All notable changes to this project will be documented in this file.
--- ---
## [1.16.0] - 2026-06-11
### Added
- **Declarative table specs — `CachingEngine(tables=[TableSpec(...)])`** — declare each cached table up front (columns, indexes, refresh strategy, datetime columns, preload) instead of letting the engine learn columns lazily from queries. New public types `TableSpec`, `TTL`, `Delta` (a friendly alias of `DeltaConfig`) and exception `UndeclaredError`.
- **Background preload** — `preload=True` tables are loaded at startup (on the background thread by default, so startup isn't blocked; `blocking_startup_refresh=True` loads them synchronously). A copy already fresh in the persistent cache is skipped via the same double-checked locking added in 1.15.0, so a warm restart is instant.
- **Fail-fast on undeclared access** — in declarative mode a query referencing a table that has no `TableSpec`, or a column outside a spec's declared `columns` (including `SELECT *` on a column-restricted table), raises `UndeclaredError` instead of silently triggering an expensive lazy load / column-expansion. Declare `columns=None` to cache the whole table and allow any column.
- **Solves the lazy second-reload** — because columns are declared, a first query for a previously unseen column no longer forces a full re-fetch.
- `executor.ensure_loaded(table, columns)` — preloads a table into the cache (reusing the full load path: delta/index augmentation, registry, watermark, double-checked locking) without materializing any rows.
### Fixed
- **Race on the shared cache connection** — the metadata reads (`is_table_cached`, `is_table_full`, `seconds_since_refresh`, `get_table_columns`, `get_last_synced_at`, `max_value`, `count_rows`) touched the single shared SQLite connection without the connection lock, so a query thread reading while the background refresh/preload thread wrote could raise `sqlite3.InterfaceError`. These reads now take the lock. More likely to surface now that startup preload adds background-thread activity.
### Changed
- `pyproject.toml` — bumped version to `1.16.0`.
- **Fully backward compatible** — omit `tables=` and the legacy `delta=`/`ttl=`/`indexes=`/`datetime_columns=` kwargs behave exactly as before (lazy mode, no fail-fast). Passing both `tables=` and any of those kwargs raises `ValueError`; `tables=` is internally converted to the same config.
---
## [1.15.0] - 2026-06-11
### Fixed
- **Cache stampede (thundering herd) on cold loads** — the decision to load a table was made *before* the load lock was taken, and `load_table` never re-checked after acquiring it. During a slow cold load of a large table (observed: 212M rows, ~2 h), a second query for the same table passed the pre-lock "not cached" check, queued on the load lock, and then ran a **redundant second full reload** instead of seeing the first had finished — doubling a multi-hour load. `load_table` now does **double-checked locking**: after acquiring the load lock it re-evaluates a caller-supplied predicate (table cached, all needed columns present, not TTL-expired) and skips the load when it is already satisfied. Invisible on small tables; on large ones it removes hours of redundant indexing under concurrent cold-start traffic.
### Changed
- `pyproject.toml` — bumped version to `1.15.0`.
- `CacheManager.load_table` gained an optional `recheck` callback (the double-check predicate); `QueryExecutor` supplies it for both column and `SELECT *` loads.
---
## [1.14.0] - 2026-06-10
Follow-up to 1.12.0 from running `datetime_columns` in production: the feature was only half-wired (writes were coerced, reads and query params were not).
### Fixed
- **`WHERE` on an INTEGER-µs `datetime_columns` column silently returned 0 rows** — `execute_in_memory()` coerced query params with `to_sqlite()`, which leaves an ISO string a string. Comparing the stored `INTEGER` against a `TEXT` param is always false under SQLite affinity, so `WHERE CHANGE_DATE > '2026-05-01T…'` matched nothing. Params for a query that touches a `datetime_columns` table are now coerced to epoch µs (datetime objects and ISO-datetime strings alike), so the comparison matches the stored integer. Scoped to the query's tables, so non-datetime queries are unaffected.
### Added
- **Read-time coercion — `datetime_columns` come back as `datetime`** — `execute()` now returns those columns as real `datetime` objects (UTC) instead of the raw INTEGER µs, restoring the transparent-proxy contract (you get the same type a direct source query would give). Opt out with `CachingEngine(..., return_datetime=False)` to get the raw integers.
- **`Stats.db_size_bytes`** — on-disk size of the cache file (0 in memory mode), so `engine.stats` exposes cache growth for monitoring without an external file check.
- **Public `datetime_to_epoch_us` helper** — `from sqlmem import datetime_to_epoch_us` exposes the same datetime→epoch-µs conversion used internally, so callers building `WHERE change_col > ?` params don't have to re-implement it.
### Changed
- `pyproject.toml` — bumped version to `1.14.0`.
- **`vacuum(incremental=True)` now warns instead of silently doing nothing** when the cache was not created with `auto_vacuum=INCREMENTAL` (the only mode in which incremental vacuum can reclaim pages); it logs how to fix it (`hard_reset()` with the pragma, or a full `vacuum(incremental=False)`) and returns.
- `CacheManager.execute_in_memory()` gained an optional `tables` argument (the query's tables) used to scope datetime param/result coercion; `CacheManager`/`CachingEngine` gained a `return_datetime` flag.
---
## [1.12.0] - 2026-06-09
### ⚠️ Breaking
- **`SCHEMA_VERSION` bumped `3``4`** — on upgrade the existing cache is wiped automatically (disk mode wipes the file in place, in-memory discards the backup) and reloaded from the source on next use. For a large cache (e.g. a multi-hundred-million-row table) the full reload can take a while; deploy in a maintenance window.
- **`datetime_columns` change the public output contract for the chosen columns** — a column listed in `datetime_columns` is stored and returned as an **INTEGER (microseconds since the Unix epoch, UTC)**, not an ISO `TEXT` string. This is opt-in per column, so no table is affected unless you name its columns; consumers that read or filter such a column must adapt (compare against integer µs, or convert on read).
### Added
- **`datetime_columns=` parameter on `CachingEngine` / `CacheManager`** — `datetime_columns={"VW_X": ["CHANGE_DATE"]}` stores the named datetime columns as INTEGER µs-since-epoch instead of ~28-byte ISO `TEXT`. Saves ~20 bytes per row and makes index comparisons on the column operate on native integers instead of string collation — worthwhile for a pure datetime column on a very large table (e.g. a delta change column that is also range-scanned).
- `_coerce.to_sqlite_datetime()` converts datetimes (and ISO/`date` values) to exact integer microseconds via integer arithmetic (no float rounding); a naive datetime is treated as UTC, `None` passes through.
- `load_table` declares those columns `INTEGER` and `upsert_rows` coerces them the same way, so full loads and delta upserts agree on the on-disk representation.
- The delta high-watermark for such a column is the stored integer; `delta._bind_watermark(..., epoch_us=True)` reconstructs a real UTC `datetime` before binding, so the source still receives a typed timestamp (and the watermark fix from 1.8.0 keeps holding).
### Changed
- `pyproject.toml` — bumped version to `1.12.0`.
- `CacheManager.max_value` / `set_last_synced_at` now accept/return `int` watermarks alongside `str` (the INTEGER-µs watermark round-trips through the `last_synced_at` TEXT column as its digit string).
---
## [1.11.0] - 2026-06-09
### Added
- **`pragmas=` parameter on `CachingEngine` / `CacheManager`** — pass a dict of SQLite PRAGMAs (e.g. `mmap_size`, `cache_size`, `temp_store`, `page_size`, `auto_vacuum`) applied to the cache connection at open time, so disk-backed caches can be tuned for the host's I/O profile without bypassing `CacheManager`. Unknown/inapplicable pragmas are silently ignored by SQLite (graceful degradation, no startup crash).
- **`page_size`** is a layout pragma: it is applied only on a *fresh* file (set before WAL / the first table). On an existing cache with a different page size the request is ignored and a one-time warning is logged — the new value takes effect only after `hard_reset()` or a rebuild.
- **`auto_vacuum`** is set before the database header is materialized (before switching to WAL) on a fresh file, so `INCREMENTAL`/`FULL` actually stick instead of silently reverting to `NONE`.
- **`CachingEngine.hard_reset()` / `CacheManager.hard_reset()`** — close every connection, delete the on-disk cache file (and its `-wal`/`-shm` sidecars) and reopen from scratch with all current pragmas applied. Unlike `reset()` (which drops tables but keeps the open file), this lets `page_size`/`auto_vacuum` change, since those are baked into the file at creation. Disk mode only — falls back to `reset()` in memory mode. All tables reload on next use.
- **`CachingEngine.vacuum(incremental=True, pages=10_000)` / `CacheManager.vacuum(...)`** — run maintenance VACUUM on the on-disk cache to reclaim free pages left by delta `INSERT OR REPLACE` churn. Incremental (default) reclaims up to `pages` pages without blocking readers or extra disk (requires `auto_vacuum=INCREMENTAL`); `incremental=False` runs a full VACUUM (rewrites the file, ~2× disk, blocks readers — maintenance window only). No-op in memory mode.
### Changed
- `pyproject.toml` — bumped version to `1.11.0`.
- `ColumnRegistry` gained `rebind()` so it follows the cache connection swap performed by `hard_reset()` (the registry previously captured the connection for the process lifetime).
---
## [1.10.0] - 2026-06-09 ## [1.10.0] - 2026-06-09
### Added ### Added
+94 -3
View File
@@ -238,6 +238,41 @@ Each value is a list of index definitions: a string is a single-column index, a
- Indexes are **recreated after every (re)load** — full loads, TTL reloads, and `invalidate()` + re-fetch all rebuild them — so they're always present, and they persist in `cache.db` across restarts. - Indexes are **recreated after every (re)load** — full loads, TTL reloads, and `invalidate()` + re-fetch all rebuild them — so they're always present, and they persist in `cache.db` across restarts.
- Delta-tracked tables already get a unique index on their key columns; secondary indexes are independent and can be combined with `delta` or `ttl`. - Delta-tracked tables already get a unique index on their key columns; secondary indexes are independent and can be combined with `delta` or `ttl`.
## Declarative initialization (`tables=`)
Instead of the lazy "learn columns from queries" mode, you can **declare every table up front** with `tables=[TableSpec(...)]` — its columns, indexes, refresh strategy and which columns are datetimes — and have the engine preload them and reject anything undeclared:
```python
from sqlmem import CachingEngine, TableSpec, Delta, TTL
engine = CachingEngine(
base_engine,
tables=[
TableSpec(
name="VW_P_PRATVALUES",
columns=["PRODUCT_PRODUCTNR", "PRAT_NAME", "PRATVALUE", "CHANGE_DATE"],
indexes=["PRODUCT_PRODUCTNR", "PRAT_NAME", "CHANGE_DATE"],
refresh=Delta(change_column="CHANGE_DATE", key_columns=["PRATVALUE_ID"]),
datetime_columns=["CHANGE_DATE"],
preload=True,
),
TableSpec(
name="VW_PRODUCTS_ASSIGNED_E",
columns=["PRODUCT_PRODUCTNR", "ELEMENT_NAME", "ELEMENT_ID"],
indexes=["PRODUCT_PRODUCTNR"],
refresh=TTL(seconds=1800),
preload=True,
),
],
pragmas={"mmap_size": 32 * 1024**3, "page_size": 8192},
)
```
- **Preload** — `preload=True` tables are loaded at startup (on the background thread by default, so startup isn't blocked; pass `blocking_startup_refresh=True` to load them synchronously before serving). A copy already fresh in the persistent cache is **skipped**, so a warm restart is instant. During warm-up a table reports `TableState.LOADING` in [`stats`](#runtime-statistics) — handy for gating a `503` until it's `ready`.
- **Fail-fast** — a query for a table without a `TableSpec`, or for a column outside a spec's declared `columns` (including `SELECT *` on a column-restricted table), raises `UndeclaredError` instead of silently kicking off an expensive lazy load. Use `columns=None` to cache the whole table and allow any column.
- `refresh=` takes a `Delta(change_column=…, key_columns=…)` (same as `DeltaConfig`) or `TTL(seconds=…)`, or `None` for a static table.
- **Backward compatible** — omit `tables=` and the legacy `delta=`/`ttl=`/`indexes=`/`datetime_columns=` kwargs work exactly as before (lazy mode, no fail-fast). Passing both raises `ValueError`.
## Persistence ## Persistence
By default the cache lives in an **in-memory SQLite** and is persisted to `cache.db` on disk: By default the cache lives in an **in-memory SQLite** and is persisted to `cache.db` on disk:
@@ -263,29 +298,80 @@ engine = CachingEngine(base_engine, in_memory=False)
The constructor argument wins over the env var; when `in_memory` is omitted it falls back to `SQLMEM_IN_MEMORY`. The constructor argument wins over the env var; when `in_memory` is omitted it falls back to `SQLMEM_IN_MEMORY`.
#### Tuning the SQLite layer (`pragmas=`)
For a large disk-backed cache, pass SQLite PRAGMAs to tune the read path and on-disk layout without bypassing SQLmem:
```python
engine = CachingEngine(
base_engine,
in_memory=False,
pragmas={
"mmap_size": 32 * 1024**3, # map the DB into the address space (32 GB)
"cache_size": -262144, # 256 MB page cache (negative = KiB)
"temp_store": 2, # ORDER BY / GROUP BY scratch in RAM
"page_size": 8192, # larger pages → fewer reads on range scans
"auto_vacuum": "INCREMENTAL",# reclaim free pages with vacuum() (see below)
},
)
```
- Every entry is applied as `PRAGMA <key> = <value>` when the cache connection opens. **Unknown or inapplicable pragmas are silently ignored** by SQLite, so a bad value degrades gracefully instead of crashing startup.
- **`page_size` and `auto_vacuum` are layout pragmas** — they only take effect on a *fresh* file (set before the first table). On an existing cache, `page_size` is ignored with a one-time warning; use [`hard_reset()`](#manual-cache-control) to rebuild the file with the new value.
#### INTEGER datetime columns (`datetime_columns=`)
A pure datetime column stored as an ISO `TEXT` string costs ~28 bytes per row and compares by string collation. For a large table you can store named datetime columns as **INTEGER microseconds since the Unix epoch** instead — 8 bytes, native integer comparison:
```python
engine = CachingEngine(
base_engine,
delta={"VW_P_PRATVALUES": DeltaConfig("CHANGE_DATE", ["PRATVALUE_ID"])},
datetime_columns={"VW_P_PRATVALUES": ["CHANGE_DATE"]},
)
```
- **Opt-in per column.** Only the columns you name change; everything else keeps the default lossless `TEXT` storage.
- **Transparent in and out.** A `WHERE` on such a column accepts a `datetime` or an ISO string — the param is coerced to integer µs so the comparison matches — and `execute()` returns the column as a real `datetime` (UTC), the same type a direct source query would give. Pass `return_datetime=False` to get the raw integers instead.
- The delta watermark is handled transparently: it is persisted as the integer and bound back to a real `datetime` for the source query, so incremental refresh keeps working.
- ⚠️ This is a **breaking on-disk change** (`SCHEMA_VERSION` 4): an existing cache is wiped and reloaded on first start after enabling it — schedule a maintenance window for a large reload.
To build a `WHERE` param yourself (e.g. an HTTP `?since=` filter) without re-implementing the conversion, use the exported helper:
```python
from sqlmem import datetime_to_epoch_us
rows = engine.execute("SELECT * FROM events WHERE changed > ?", (datetime_to_epoch_us(since),))
```
## Manual cache control ## Manual cache control
```python ```python
engine.invalidate("orders") # drop one table from cache; next query re-fetches it from DB engine.invalidate("orders") # drop one table from cache; next query re-fetches it from DB
engine.reset() # wipe the whole cache (RAM + cache.db) — full clean slate engine.reset() # wipe the whole cache (RAM + cache.db) — full clean slate
engine.hard_reset() # disk mode: delete the file and reopen with current pragmas/page_size
engine.vacuum() # disk mode: incremental VACUUM (reclaim free pages from delta churn)
engine.refresh() # pull deltas for all delta-tracked tables now engine.refresh() # pull deltas for all delta-tracked tables now
engine.close() # flush to disk and shut down background thread engine.close() # flush to disk and shut down background thread
``` ```
Use `reset()` after a **structural change** in the source (columns added/removed, values cleared in bulk without bumping the change timestamp) so the cache rebuilds from scratch. `invalidate(table)` is the targeted version for a single table. Use `reset()` after a **structural change** in the source (columns added/removed, values cleared in bulk without bumping the change timestamp) so the cache rebuilds from scratch. `invalidate(table)` is the targeted version for a single table.
`hard_reset()` goes further than `reset()` in disk mode: it closes every connection, deletes `cache.db` (and its `-wal`/`-shm` sidecars) and reopens from scratch — the only way to change a baked-in `page_size`/`auto_vacuum`. In memory mode it falls back to `reset()`.
`vacuum()` reclaims free pages left behind by delta `INSERT OR REPLACE` churn. Incremental (the default) is cheap and non-blocking but needs `auto_vacuum=INCREMENTAL` (set it via `pragmas=` on a fresh cache); if the cache wasn't created that way, `vacuum(incremental=True)` logs a warning and does nothing rather than silently no-op'ing. `vacuum(incremental=False)` runs a full VACUUM that rewrites the file (~2× disk, blocks readers) — schedule it in a maintenance window. Both are no-ops in memory mode.
## Runtime statistics ## Runtime statistics
```python ```python
stats = engine.stats # Stats snapshot stats = engine.stats # Stats snapshot
print(stats.hits, stats.misses, stats.refetches, stats.errors) print(stats.hits, stats.misses, stats.refetches, stats.errors, stats.db_size_bytes)
for name, t in stats.tables.items(): for name, t in stats.tables.items():
print(name, t.rows, t.state, t.tracking, t.last_upsert, t.last_refresh) print(name, t.rows, t.state, t.tracking, t.last_upsert, t.last_refresh)
if t.consecutive_failures: if t.consecutive_failures:
print(f" {name} failing ×{t.consecutive_failures}: {t.last_error} ({t.last_error_at})") print(f" {name} failing ×{t.consecutive_failures}: {t.last_error} ({t.last_error_at})")
``` ```
`Stats.errors` is the total number of load/refresh failures since start. Each `TableStats` also carries `last_error`, `last_error_at` and `consecutive_failures` (reset to 0 on the next success) — so a delta that fails *before* streaming (which otherwise leaves `state` looking `ready`) is still visible, and the table is marked `error`. `Stats.db_size_bytes` is the on-disk cache file size (0 in memory mode) — handy for monitoring cache growth. `Stats.errors` is the total number of load/refresh failures since start. Each `TableStats` also carries `last_error`, `last_error_at` and `consecutive_failures` (reset to 0 on the next success) — so a delta that fails *before* streaming (which otherwise leaves `state` looking `ready`) is still visible, and the table is marked `error`.
Two timestamps distinguish *data freshness* from *liveness*: Two timestamps distinguish *data freshness* from *liveness*:
@@ -319,6 +405,7 @@ By default the cache is **in-memory SQLite**, so a cached table lives in RAM —
- **Use [disk-backed mode](#disk-backed-cache-no-ram-copy)** (`in_memory=False`) when the working set simply doesn't fit in RAM — queries then run against `cache.db` on disk instead of a memory copy. - **Use [disk-backed mode](#disk-backed-cache-no-ram-copy)** (`in_memory=False`) when the working set simply doesn't fit in RAM — queries then run against `cache.db` on disk instead of a memory copy.
- **Loads are streamed in batches** (`SQLMEM_FETCH_BATCH` rows at a time, default 10 000) into a staging table and swapped in atomically. A multi-million-row table never gets fully materialized in Python at once, so the load doesn't spike memory or crash the process, and readers keep seeing the previous copy until the swap completes. - **Loads are streamed in batches** (`SQLMEM_FETCH_BATCH` rows at a time, default 10 000) into a staging table and swapped in atomically. A multi-million-row table never gets fully materialized in Python at once, so the load doesn't spike memory or crash the process, and readers keep seeing the previous copy until the swap completes.
- Use **[delta refresh](#incremental-delta-refresh)** for large tables that have a change column — after the first load only changed rows are pulled, so restarts and refreshes don't re-read the whole table. - Use **[delta refresh](#incremental-delta-refresh)** for large tables that have a change column — after the first load only changed rows are pulled, so restarts and refreshes don't re-read the whole table.
- **Concurrent queries during a cold load are deduplicated** — while one query is loading a large table, others for the same table wait and then read the freshly loaded cache rather than kicking off their own redundant reload (double-checked locking), so a slow cold start isn't multiplied by concurrent traffic.
- A **single query that returns a huge result set** (e.g. `SELECT *` over a multi-million-row cached table) still materializes that result as a list of dicts; bound it with a `WHERE`/`LIMIT` rather than selecting everything. - A **single query that returns a huge result set** (e.g. `SELECT *` over a multi-million-row cached table) still materializes that result as a list of dicts; bound it with a `WHERE`/`LIMIT` rather than selecting everything.
## Configuration ## Configuration
@@ -346,6 +433,9 @@ engine = CachingEngine(
refresh_interval=300, # SQLMEM_REFRESH_INTERVAL refresh_interval=300, # SQLMEM_REFRESH_INTERVAL
fetch_batch=10000, # SQLMEM_FETCH_BATCH fetch_batch=10000, # SQLMEM_FETCH_BATCH
dialect="tsql", # SQLMEM_SQL_DIALECT dialect="tsql", # SQLMEM_SQL_DIALECT
pragmas={"mmap_size": 32 * 1024**3, "page_size": 8192}, # disk-mode SQLite tuning
datetime_columns={"orders": ["created_at"]}, # store these as INTEGER µs (opt-in)
return_datetime=True, # return datetime_columns as datetime (vs raw µs int)
blocking_startup_refresh=False, # block startup until caught up? (default: no) blocking_startup_refresh=False, # block startup until caught up? (default: no)
) )
``` ```
@@ -358,9 +448,10 @@ By default the **startup catch-up** (delta pulls and TTL reloads for tables rest
|---|---| |---|---|
| `ReadOnlyError` | INSERT, UPDATE, or DELETE statement | | `ReadOnlyError` | INSERT, UPDATE, or DELETE statement |
| `UnsupportedQueryError` | non-SELECT statement, `SELECT` without `FROM`, or an unqualified column in a multi-table query | | `UnsupportedQueryError` | non-SELECT statement, `SELECT` without `FROM`, or an unqualified column in a multi-table query |
| `UndeclaredError` | in [declarative mode](#declarative-initialization-tables) (`tables=`): a query references a table or column that was not declared |
```python ```python
from sqlmem import ReadOnlyError, UnsupportedQueryError from sqlmem import ReadOnlyError, UnsupportedQueryError, UndeclaredError
``` ```
## Logging ## Logging
+11
View File
@@ -215,6 +215,17 @@ SQLMEM_DEBUG=true # DEBUG level — podrobný výpis každého dotazu, cache o
- [x] **Quoting identifikátorů**: názvy tabulek/sloupců se kvótují (SQLite `"x"` pro cache, dialekt zdroje — T-SQL `[x]` — pro source), takže rezervovaná slova i mezery fungují. - [x] **Quoting identifikátorů**: názvy tabulek/sloupců se kvótují (SQLite `"x"` pro cache, dialekt zdroje — T-SQL `[x]` — pro source), takže rezervovaná slova i mezery fungují.
- [x] **Lazy source connection**: `execute()` neotevírá spojení ke zdroji při cache hitu (neobsazuje pool slot). - [x] **Lazy source connection**: `execute()` neotevírá spojení ke zdroji při cache hitu (neobsazuje pool slot).
- [x] **Idempotentní `add_sink`**: opakované volání pro stejný sink je no-op (žádné duplicitní logy). - [x] **Idempotentní `add_sink`**: opakované volání pro stejný sink je no-op (žádné duplicitní logy).
- [x] **Ladění SQLite vrstvy (`pragmas=`)**: `CachingEngine(..., pragmas={...})` aplikuje libovolné PRAGMA na cache spojení (`mmap_size`, `cache_size`, `temp_store`, `page_size`, `auto_vacuum`). `page_size` a `auto_vacuum` jsou layout-pragmata — platí jen na čerstvém souboru (page_size na existující cache se ignoruje s warningem). Neznámá pragmata SQLite tiše ignoruje.
- [x] **`hard_reset()`**: smaže on-disk soubor (+ WAL/SHM) a otevře nový s aktuálními pragmaty — na rozdíl od `reset()` umožní změnit `page_size`/`auto_vacuum`. Jen disk mód (v memory módu fallback na `reset()`).
- [x] **`vacuum(incremental=, pages=)`**: údržbový VACUUM cache souboru — inkrementální (uvolní volné stránky po delta `INSERT OR REPLACE`, vyžaduje `auto_vacuum=INCREMENTAL`) nebo plný (přepíše soubor, jen v maintenance okně). V memory módu no-op.
- [x] **Nativní INTEGER ukládání datetime sloupců (`datetime_columns=`)**: `datetime_columns={"VW_X": ["CHANGE_DATE"]}` — vyjmenované datetime sloupce se ukládají jako INTEGER µs-od-epochy místo ~28 B ISO TEXT (úspora místa + nativní celočíselné porovnání indexu). Opt-in per sloupec. Breaking: `SCHEMA_VERSION` 3→4, cache se při upgrade smaže a načte znovu. Watermark se persistuje jako int a `_bind_watermark(epoch_us=True)` ho rekonstruuje zpět na `datetime` pro zdroj.
- **Param coercion**: `WHERE col > ?` s ISO/`datetime` parametrem se zkoercuje na epoch µs (scoped na tabulky dotazu), takže porovnání INTEGER sloupce sedí (dřív vracelo 0 řádků).
- **Read-time coercion**: čtení vrací `datetime` objekt místo raw int (transparentní proxy); opt-out `CachingEngine(..., return_datetime=False)`.
- Veřejný helper `from sqlmem import datetime_to_epoch_us` pro konstrukci parametrů bez duplicitní logiky.
- [x] **`vacuum(incremental=True)` varuje bez `auto_vacuum=INCREMENTAL`**: dřív tichý no-op; teď zaloguje warning (a jak to opravit) a vrátí se.
- [x] **`Stats.db_size_bytes`**: velikost cache souboru na disku (0 v memory módu) ve `stats` pro monitoring.
- [x] **Ochrana proti cache stampede**: `load_table` dělá double-checked locking — po získání `_load_lock` znovu ověří, zda tabulku mezitím nenahrál souběžný loader (cached + sloupce + ne-stale), a redundantní reload přeskočí. Bez toho druhý dotaz během studeného loadu velké tabulky spustil druhý plný reload (212M řádků = +2 h).
- [x] **Deklarativní inicializace (`tables=[TableSpec(...)]`)**: předem se deklaruje každá tabulka (`TableSpec(name, columns, indexes, refresh=Delta(...)|TTL(...), datetime_columns, preload)`). `preload=True` se na pozadí (nebo blokujícně) přednahraje při startu; co je v persistentní cache čerstvé, se přeskočí (warm restart = instant). Nedeklarovaná tabulka/sloupec (i `SELECT *` na sloupcově omezené tabulce) → `UndeclaredError` (fail-fast) místo tichého líného loadu; `columns=None` = celá tabulka + libovolný sloupec. Plně zpětně kompatibilní: bez `tables=` fungují staré `delta=/ttl=/indexes=/datetime_columns=` jako dřív (kombinace obojího → `ValueError`).
## TODO — budoucí funkce ## TODO — budoucí funkce
+1 -1
View File
@@ -1,6 +1,6 @@
[project] [project]
name = "sqlmem" name = "sqlmem"
version = "1.10.0" version = "1.16.0"
description = "" description = ""
authors = [ authors = [
{name = "jan.doubravsky@gmail.com"} {name = "jan.doubravsky@gmail.com"}
+8 -1
View File
@@ -3,10 +3,12 @@ from typing import Any
from loguru import logger from loguru import logger
from ._coerce import to_sqlite_datetime as datetime_to_epoch_us
from .config import DEBUG from .config import DEBUG
from .delta import DeltaConfig from .delta import DeltaConfig
from .engine import CachingEngine from .engine import CachingEngine
from .exceptions import ReadOnlyError, UnsupportedQueryError from .exceptions import ReadOnlyError, UndeclaredError, UnsupportedQueryError
from .spec import TTL, Delta, TableSpec
from .stats import Stats, TableStats from .stats import Stats, TableStats
_DEFAULT_FORMAT = ( _DEFAULT_FORMAT = (
@@ -58,9 +60,14 @@ def add_sink(sink: Any, *, level: str | None = None, **kwargs: Any) -> None:
__all__ = [ __all__ = [
"CachingEngine", "CachingEngine",
"DeltaConfig", "DeltaConfig",
"Delta",
"TTL",
"TableSpec",
"ReadOnlyError", "ReadOnlyError",
"UnsupportedQueryError", "UnsupportedQueryError",
"UndeclaredError",
"Stats", "Stats",
"TableStats", "TableStats",
"add_sink", "add_sink",
"datetime_to_epoch_us",
] ]
+82 -3
View File
@@ -10,11 +10,18 @@ left untouched.
import datetime import datetime
import decimal import decimal
import re
import uuid import uuid
from typing import Any from typing import Any
Params = tuple | list | dict | None Params = tuple | list | dict | None
_EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
# A string that *starts* with an ISO date+time (``2026-05-01T00:00:00`` or
# space-separated). Used to spot a datetime passed as a string in a query param.
_ISO_DATETIME_RE = re.compile(r"^\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}")
def to_sqlite(value: Any) -> Any: def to_sqlite(value: Any) -> Any:
if isinstance(value, decimal.Decimal): if isinstance(value, decimal.Decimal):
@@ -28,13 +35,85 @@ def to_sqlite(value: Any) -> Any:
return value return value
def to_sqlite_datetime(value: Any) -> int | None:
"""Store a datetime as INTEGER microseconds since the Unix epoch (UTC).
Used for columns the caller marks via ``datetime_columns``: 8 bytes as an
INTEGER instead of a ~28-byte ISO ``TEXT`` string, and integer comparison on
the change column instead of string collation. ``None`` passes through; a
naive datetime is treated as UTC. A non-datetime value is parsed from its ISO
string form (so ``date``/ISO-``str`` inputs work too); anything unparseable
becomes ``None``.
"""
if value is None:
return None
if isinstance(value, datetime.datetime):
if value.tzinfo is None:
value = value.replace(tzinfo=datetime.timezone.utc)
delta = value - _EPOCH # exact integer arithmetic (no float rounding)
return delta.days * 86_400_000_000 + delta.seconds * 1_000_000 + delta.microseconds
try:
return to_sqlite_datetime(datetime.datetime.fromisoformat(str(value)))
except (TypeError, ValueError):
return None
def from_sqlite_datetime(value: Any) -> Any:
"""Inverse of :func:`to_sqlite_datetime`: INTEGER µs-since-epoch → UTC datetime.
Non-integers (a ``NULL`` value, or a column that isn't datetime-typed) pass
through unchanged.
"""
if isinstance(value, bool) or not isinstance(value, int):
return value
return _EPOCH + datetime.timedelta(microseconds=value)
def coerce_row(row: tuple) -> tuple: def coerce_row(row: tuple) -> tuple:
return tuple(to_sqlite(v) for v in row) return tuple(to_sqlite(v) for v in row)
def coerce_params(params: Params) -> tuple | dict | None: def _coerce_param(value: Any, dt_table: bool) -> Any:
"""Coerce a single query parameter.
When the query touches a table that stores datetime columns as INTEGER µs
(*dt_table*), a datetime object or an ISO-datetime string is converted to
epoch µs so a ``WHERE`` comparison matches the stored INTEGER instead of
comparing INTEGER against TEXT (which SQLite affinity makes always false).
Otherwise the default stringifying coercion applies, unchanged.
"""
if dt_table and (
isinstance(value, datetime.datetime)
or (isinstance(value, str) and _ISO_DATETIME_RE.match(value))
):
result = to_sqlite_datetime(value)
if result is not None:
return result
return to_sqlite(value)
def coerce_params(params: Params, dt_table: bool = False) -> tuple | dict | None:
if params is None: if params is None:
return None return None
if isinstance(params, dict): if isinstance(params, dict):
return {key: to_sqlite(val) for key, val in params.items()} return {key: _coerce_param(val, dt_table) for key, val in params.items()}
return tuple(to_sqlite(val) for val in params) return tuple(_coerce_param(val, dt_table) for val in params)
def reverse_coerce_rows(
rows: list[tuple], col_names: list[str], dt_cols: set[str]
) -> list[tuple]:
"""Turn INTEGER µs back into ``datetime`` for result columns in *dt_cols*.
A no-op when no result column is a datetime column, so non-datetime queries
pay nothing.
"""
if not dt_cols:
return rows
dt_idx = {i for i, c in enumerate(col_names) if c in dt_cols}
if not dt_idx:
return rows
return [
tuple(from_sqlite_datetime(v) if i in dt_idx else v for i, v in enumerate(row))
for row in rows
]
+257 -35
View File
@@ -2,6 +2,7 @@ import atexit
import signal import signal
import sqlite3 import sqlite3
import threading import threading
from collections.abc import Callable
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
@@ -9,12 +10,18 @@ from pathlib import Path
from loguru import logger from loguru import logger
import sqlmem._meta as _meta import sqlmem._meta as _meta
from ._coerce import coerce_params, coerce_row from ._coerce import (
coerce_params,
coerce_row,
reverse_coerce_rows,
to_sqlite,
to_sqlite_datetime,
)
from ._sql import quote, quote_list, quote_source from ._sql import quote, quote_list, quote_source
from .config import FETCH_BATCH_SIZE, SQL_DIALECT from .config import FETCH_BATCH_SIZE, SQL_DIALECT
from .stats import TableState from .stats import TableState
SCHEMA_VERSION = 3 SCHEMA_VERSION = 4
@dataclass(frozen=True) @dataclass(frozen=True)
@@ -40,12 +47,19 @@ class CacheManager:
in_memory: bool = True, in_memory: bool = True,
dialect: str = SQL_DIALECT, dialect: str = SQL_DIALECT,
fetch_batch: int = FETCH_BATCH_SIZE, fetch_batch: int = FETCH_BATCH_SIZE,
pragmas: dict[str, str | int] | None = None,
datetime_columns: dict[str, list[str]] | None = None,
return_datetime: bool = True,
) -> None: ) -> None:
self._db_path = db_path self._db_path = db_path
self._backup_interval = backup_interval self._backup_interval = backup_interval
self._in_memory = in_memory self._in_memory = in_memory
self._dialect = dialect # source-DB dialect, for identifier quoting self._dialect = dialect # source-DB dialect, for identifier quoting
self._fetch_batch = fetch_batch # rows fetched per source batch self._fetch_batch = fetch_batch # rows fetched per source batch
self._pragmas = dict(pragmas or {}) # extra read/layout PRAGMAs (disk mode)
# table → columns stored as INTEGER µs-since-epoch instead of ISO TEXT
self._datetime_columns = {t: list(c) for t, c in (datetime_columns or {}).items()}
self._return_datetime = return_datetime # reverse-coerce reads back to datetime
self._lock = threading.Lock() # serializes connection access self._lock = threading.Lock() # serializes connection access
self._load_lock = threading.Lock() # serializes full table loads self._load_lock = threading.Lock() # serializes full table loads
self._states: dict[str, str] = {} # table → live processing state self._states: dict[str, str] = {} # table → live processing state
@@ -59,12 +73,12 @@ class CacheManager:
if in_memory: if in_memory:
self._conn = sqlite3.connect(":memory:", check_same_thread=False) self._conn = sqlite3.connect(":memory:", check_same_thread=False)
self._apply_pragmas(self._conn)
else: else:
# Disk-backed: query the on-disk file directly — no RAM copy, every # Disk-backed: query the on-disk file directly — no RAM copy, every
# write persists immediately, and the cache can exceed available RAM. # write persists immediately, and the cache can exceed available RAM.
self._conn = sqlite3.connect(str(db_path), check_same_thread=False) db_existed = db_path.exists() and db_path.stat().st_size > 0
self._conn.execute("PRAGMA journal_mode=WAL") self._conn = self._open_disk_connection(db_existed)
self._conn.execute("PRAGMA synchronous=NORMAL")
self._discard_if_schema_mismatch() self._discard_if_schema_mismatch()
self._ensure_meta_tables() self._ensure_meta_tables()
@@ -83,6 +97,54 @@ class CacheManager:
def connection(self) -> sqlite3.Connection: def connection(self) -> sqlite3.Connection:
return self._conn return self._conn
def _open_disk_connection(self, db_existed: bool) -> sqlite3.Connection:
"""Open the on-disk cache connection with WAL + the configured pragmas.
``page_size`` and ``auto_vacuum`` are layout pragmas that only take
effect on a *fresh* file (before the first table exists), so they are
applied conditionally on ``db_existed``; everything else is applied
unconditionally. Used by both ``__init__`` and :meth:`hard_reset`.
"""
conn = sqlite3.connect(str(self._db_path), check_same_thread=False)
# page_size must be set before WAL/the first table on a brand-new file;
# on an existing file it is silently ignored until the next VACUUM.
if "page_size" in self._pragmas:
wanted = int(self._pragmas["page_size"])
if db_existed:
actual = conn.execute("PRAGMA page_size").fetchone()[0]
if actual != wanted:
logger.warning(
f"page_size={wanted} requested but the cache file already "
f"exists with page_size={actual}; the new value takes "
"effect only after the cache is wiped (hard_reset()) or "
"rebuilt from scratch."
)
else:
conn.execute(f"PRAGMA page_size = {wanted}")
# auto_vacuum must be set before the database header is materialized,
# i.e. before switching to WAL (which writes the header) — otherwise the
# value silently reverts to 0/NONE and only a full VACUUM could apply it.
if not db_existed and "auto_vacuum" in self._pragmas:
conn.execute(f"PRAGMA auto_vacuum = {self._pragmas['auto_vacuum']}")
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA synchronous=NORMAL")
self._apply_pragmas(conn, exclude={"page_size", "auto_vacuum"})
return conn
def _apply_pragmas(
self, conn: sqlite3.Connection, exclude: set[str] | None = None
) -> None:
"""Apply the user-supplied PRAGMAs to *conn*, skipping *exclude*.
SQLite silently ignores unknown or inapplicable pragmas, so a bad value
degrades gracefully (e.g. mmap unsupported) rather than crashing startup.
"""
skip = exclude or set()
for key, value in self._pragmas.items():
if key in skip:
continue
conn.execute(f"PRAGMA {key} = {value}")
def _ensure_meta_tables(self) -> None: def _ensure_meta_tables(self) -> None:
self._conn.executescript(""" self._conn.executescript("""
CREATE TABLE IF NOT EXISTS _sqlmem_meta ( CREATE TABLE IF NOT EXISTS _sqlmem_meta (
@@ -249,23 +311,26 @@ class CacheManager:
return dict(self._last_run) return dict(self._last_run)
def is_table_cached(self, table: str) -> bool: def is_table_cached(self, table: str) -> bool:
row = self._conn.execute( with self._lock: # the shared _conn must not be read while a writer uses it
"SELECT 1 FROM _sqlmem_tables WHERE table_name = ?", (table,) row = self._conn.execute(
).fetchone() "SELECT 1 FROM _sqlmem_tables WHERE table_name = ?", (table,)
).fetchone()
return row is not None return row is not None
def is_table_full(self, table: str) -> bool: def is_table_full(self, table: str) -> bool:
"""True if the whole table (all columns) is cached — a SELECT * cache hit.""" """True if the whole table (all columns) is cached — a SELECT * cache hit."""
row = self._conn.execute( with self._lock:
"SELECT is_full FROM _sqlmem_tables WHERE table_name = ?", (table,) row = self._conn.execute(
).fetchone() "SELECT is_full FROM _sqlmem_tables WHERE table_name = ?", (table,)
).fetchone()
return bool(row and row[0]) return bool(row and row[0])
def seconds_since_refresh(self, table: str) -> float | None: def seconds_since_refresh(self, table: str) -> float | None:
"""Age of a cached table in seconds, or None if it is not cached.""" """Age of a cached table in seconds, or None if it is not cached."""
row = self._conn.execute( with self._lock:
"SELECT last_refresh_at FROM _sqlmem_tables WHERE table_name = ?", (table,) row = self._conn.execute(
).fetchone() "SELECT last_refresh_at FROM _sqlmem_tables WHERE table_name = ?", (table,)
).fetchone()
if not row or not row[0]: if not row or not row[0]:
return None return None
last = datetime.fromisoformat(row[0]) last = datetime.fromisoformat(row[0])
@@ -337,12 +402,34 @@ class CacheManager:
self._conn.commit() self._conn.commit()
logger.debug(f"Index {idx.name!r} ready on {table} ({cols})") logger.debug(f"Index {idx.name!r} ready on {table} ({cols})")
def _row_coercer(self, table: str, columns: list[str]):
"""Return a per-row coercer for *columns* in source order.
Columns registered in ``datetime_columns`` for *table* are coerced to
INTEGER µs-since-epoch (``to_sqlite_datetime``); everything else keeps the
default stringifying coercion (``to_sqlite``). With no datetime columns it
is the plain :func:`coerce_row`, so the common path is unchanged.
"""
dt_cols = set(self._datetime_columns.get(table, ()))
dt_idx = {i for i, c in enumerate(columns) if c in dt_cols}
if not dt_idx:
return coerce_row
def coerce(row: tuple) -> tuple:
return tuple(
to_sqlite_datetime(v) if i in dt_idx else to_sqlite(v)
for i, v in enumerate(row)
)
return coerce
def load_table( def load_table(
self, self,
table: str, table: str,
columns: list[str], columns: list[str],
source_conn: sqlite3.Connection, source_conn: sqlite3.Connection,
full: bool = False, full: bool = False,
recheck: Callable[[], bool] | None = None,
) -> None: ) -> None:
"""Stream the source table into the cache in batches. """Stream the source table into the cache in batches.
@@ -351,15 +438,32 @@ class CacheManager:
``fetchall`` of a huge table) and readers keep seeing the previous copy ``fetchall`` of a huge table) and readers keep seeing the previous copy
until the swap. Concurrent loads are serialized by ``_load_lock``; the until the swap. Concurrent loads are serialized by ``_load_lock``; the
connection lock is only held for the brief per-batch inserts and the swap. connection lock is only held for the brief per-batch inserts and the swap.
*recheck* implements double-checked locking against a cache stampede: the
decision to load is made by the caller *before* ``_load_lock`` is held, so
on a slow cold load a second request for the same table can queue behind
the lock and then redundantly reload it. If given, ``recheck()`` is
re-evaluated *after* the lock is acquired; when it returns ``True`` the
table is already loaded and fresh, so the load is skipped.
""" """
src_cols = ", ".join(quote_source(c, self._dialect) for c in columns) src_cols = ", ".join(quote_source(c, self._dialect) for c in columns)
col_defs = ", ".join(f"{quote(c)} TEXT" for c in columns) dt_cols = set(self._datetime_columns.get(table, ()))
col_defs = ", ".join(
f"{quote(c)} {'INTEGER' if c in dt_cols else 'TEXT'}" for c in columns
)
coerce = self._row_coercer(table, columns)
placeholders = ", ".join("?" * len(columns)) placeholders = ", ".join("?" * len(columns))
staging = f"{table}__sqlmem_load" staging = f"{table}__sqlmem_load"
q_staging = quote(staging) q_staging = quote(staging)
q_table = quote(table) q_table = quote(table)
with self._load_lock: with self._load_lock:
if recheck is not None and recheck():
logger.info(
f"Skipping load of {table!r}: a concurrent loader already "
"satisfied it (double-checked lock)."
)
return
self.set_state(table, TableState.LOADING) self.set_state(table, TableState.LOADING)
logger.info(f"Fetching {table!r} columns {columns} from source DB (batch={self._fetch_batch})") logger.info(f"Fetching {table!r} columns {columns} from source DB (batch={self._fetch_batch})")
try: try:
@@ -377,7 +481,7 @@ class CacheManager:
batch = cursor.fetchmany(self._fetch_batch) # network outside _lock batch = cursor.fetchmany(self._fetch_batch) # network outside _lock
if not batch: if not batch:
break break
clean = [coerce_row(row) for row in batch] clean = [coerce(row) for row in batch]
with self._lock: with self._lock:
self._conn.executemany(insert_sql, clean) self._conn.executemany(insert_sql, clean)
self._conn.commit() self._conn.commit()
@@ -420,16 +524,38 @@ class CacheManager:
self._read_conns.append(conn) self._read_conns.append(conn)
return conn return conn
def _query_datetime_cols(self, tables: list[str] | None) -> set[str]:
"""Datetime columns (stored as INTEGER µs) belonging to *tables*.
Empty when no table is known/configured, so a query that touches no
datetime column pays nothing and behaves exactly as before.
"""
if not self._datetime_columns or not tables:
return set()
cols: set[str] = set()
for table in tables:
cols.update(self._datetime_columns.get(table, ()))
return cols
def execute_in_memory( def execute_in_memory(
self, sql: str, params: tuple | list | dict | None = None self,
sql: str,
params: tuple | list | dict | None = None,
tables: list[str] | None = None,
) -> tuple[list[str], list[tuple]]: ) -> tuple[list[str], list[tuple]]:
"""Run a read query against the cache. """Run a read query against the cache.
In-memory mode serializes with writers on the single connection. Disk mode In-memory mode serializes with writers on the single connection. Disk mode
reads from a per-thread WAL connection, so reads run concurrently with reads from a per-thread WAL connection, so reads run concurrently with
writers and each other (see :meth:`_read_conn`). writers and each other (see :meth:`_read_conn`).
When *tables* names a table with ``datetime_columns``, ISO/datetime query
params are coerced to epoch µs so a ``WHERE`` matches the stored INTEGER,
and (unless ``return_datetime=False``) those columns are returned as real
:class:`~datetime.datetime` objects rather than raw integers.
""" """
bound = coerce_params(params) dt_cols = self._query_datetime_cols(tables)
bound = coerce_params(params, dt_table=bool(dt_cols))
if self._in_memory: if self._in_memory:
with self._lock: with self._lock:
cursor = ( cursor = (
@@ -439,19 +565,22 @@ class CacheManager:
) )
col_names = [desc[0] for desc in cursor.description] col_names = [desc[0] for desc in cursor.description]
rows = cursor.fetchall() rows = cursor.fetchall()
return col_names, rows else:
conn = self._read_conn()
cursor = conn.execute(sql) if bound is None else conn.execute(sql, bound)
col_names = [desc[0] for desc in cursor.description]
rows = cursor.fetchall()
conn = self._read_conn() if self._return_datetime and dt_cols:
cursor = conn.execute(sql) if bound is None else conn.execute(sql, bound) rows = reverse_coerce_rows(rows, col_names, dt_cols)
col_names = [desc[0] for desc in cursor.description]
rows = cursor.fetchall()
return col_names, rows return col_names, rows
# --- delta refresh support --------------------------------------------- # --- delta refresh support ---------------------------------------------
def get_table_columns(self, table: str) -> list[str]: def get_table_columns(self, table: str) -> list[str]:
"""Authoritative ordered column list of a cached table (via PRAGMA).""" """Authoritative ordered column list of a cached table (via PRAGMA)."""
rows = self._conn.execute(f"PRAGMA table_info({quote(table)})").fetchall() with self._lock:
rows = self._conn.execute(f"PRAGMA table_info({quote(table)})").fetchall()
return [r[1] for r in rows] return [r[1] for r in rows]
def create_unique_index(self, table: str, key_columns: list[str]) -> None: def create_unique_index(self, table: str, key_columns: list[str]) -> None:
@@ -465,12 +594,15 @@ class CacheManager:
self._conn.commit() self._conn.commit()
def get_last_synced_at(self, table: str) -> str | None: def get_last_synced_at(self, table: str) -> str | None:
row = self._conn.execute( with self._lock:
"SELECT last_synced_at FROM _sqlmem_tables WHERE table_name = ?", (table,) row = self._conn.execute(
).fetchone() "SELECT last_synced_at FROM _sqlmem_tables WHERE table_name = ?", (table,)
).fetchone()
# Stored in a TEXT column: an INTEGER-µs watermark (datetime_columns) comes
# back as its digit string; delta._bind_watermark reconstructs the datetime.
return row[0] if row else None return row[0] if row else None
def set_last_synced_at(self, table: str, value: str | None) -> None: def set_last_synced_at(self, table: str, value: str | int | None) -> None:
with self._lock: with self._lock:
self._conn.execute( self._conn.execute(
"UPDATE _sqlmem_tables SET last_synced_at = ? WHERE table_name = ?", "UPDATE _sqlmem_tables SET last_synced_at = ? WHERE table_name = ?",
@@ -478,18 +610,23 @@ class CacheManager:
) )
self._conn.commit() self._conn.commit()
def max_value(self, table: str, column: str) -> str | None: def max_value(self, table: str, column: str) -> str | int | None:
"""Maximum value of *column* across cached rows (the delta watermark).""" """Maximum value of *column* across cached rows (the delta watermark).
row = self._conn.execute(
f"SELECT MAX({quote(column)}) FROM {quote(table)}" Returns an ``int`` for a datetime column stored as INTEGER µs, else the
).fetchone() ISO ``TEXT`` string."""
with self._lock:
row = self._conn.execute(
f"SELECT MAX({quote(column)}) FROM {quote(table)}"
).fetchone()
return row[0] if row else None return row[0] if row else None
def upsert_rows(self, table: str, columns: list[str], rows: list[tuple]) -> None: def upsert_rows(self, table: str, columns: list[str], rows: list[tuple]) -> None:
"""Insert-or-replace one batch of *rows* by the table's unique key.""" """Insert-or-replace one batch of *rows* by the table's unique key."""
col_list = quote_list(columns) col_list = quote_list(columns)
placeholders = ", ".join("?" * len(columns)) placeholders = ", ".join("?" * len(columns))
clean_rows = [coerce_row(row) for row in rows] coerce = self._row_coercer(table, columns)
clean_rows = [coerce(row) for row in rows]
with self._lock: with self._lock:
self._conn.executemany( self._conn.executemany(
f"INSERT OR REPLACE INTO {quote(table)} ({col_list}) VALUES ({placeholders})", f"INSERT OR REPLACE INTO {quote(table)} ({col_list}) VALUES ({placeholders})",
@@ -498,9 +635,19 @@ class CacheManager:
self._conn.commit() self._conn.commit()
def count_rows(self, table: str) -> int: def count_rows(self, table: str) -> int:
row = self._conn.execute(f"SELECT COUNT(*) FROM {quote(table)}").fetchone() with self._lock:
row = self._conn.execute(f"SELECT COUNT(*) FROM {quote(table)}").fetchone()
return int(row[0]) if row else 0 return int(row[0]) if row else 0
def db_size_bytes(self) -> int:
"""On-disk size of the cache file in bytes (0 in memory mode / if absent)."""
if self._in_memory:
return 0
try:
return self._db_path.stat().st_size
except OSError:
return 0
def reset(self) -> None: def reset(self) -> None:
"""Wipe the entire cache — every cached table plus the on-disk data """Wipe the entire cache — every cached table plus the on-disk data
(the file is deleted in memory mode, VACUUMed in place in disk mode).""" (the file is deleted in memory mode, VACUUMed in place in disk mode)."""
@@ -536,6 +683,81 @@ class CacheManager:
except sqlite3.Error as e: except sqlite3.Error as e:
logger.error(f"Failed to VACUUM cache file {self._db_path}: {e}") logger.error(f"Failed to VACUUM cache file {self._db_path}: {e}")
def hard_reset(self) -> None:
"""Delete the on-disk cache file and reopen it from scratch.
Unlike :meth:`reset` (which drops tables but keeps the open file, so the
baked-in ``page_size``/``auto_vacuum`` cannot change), this closes every
connection, removes the file plus its WAL/SHM sidecars, and reopens with
all current pragmas applied — so layout pragmas take effect on the fresh
file. Disk mode only; in memory mode it falls back to :meth:`reset`.
Any read in flight on another thread will see its connection closed from
under it; treat this as a maintenance operation.
"""
if self._in_memory:
self.reset()
return
logger.info(f"Hard reset: closing connections and deleting {self._db_path}")
with self._lock:
for conn in self._read_conns:
try:
conn.close()
except sqlite3.Error:
pass
self._read_conns.clear()
self._read_local = threading.local() # force every thread to reopen
self._conn.close()
for suffix in ("", "-wal", "-shm"):
p = Path(str(self._db_path) + suffix)
if p.exists():
p.unlink()
# Reopen fresh — page_size/auto_vacuum apply to the new empty file.
self._conn = self._open_disk_connection(db_existed=False)
self._ensure_meta_tables()
self._states.clear()
self._errors.clear()
self._last_run.clear()
self._error_total = 0
logger.info(f"Hard reset complete — cache recreated at {self._db_path}.")
def vacuum(self, incremental: bool = True, pages: int = 10_000) -> None:
"""Run maintenance VACUUM on the on-disk cache (no-op in memory mode).
``incremental=True`` (default) reclaims up to *pages* free pages without
blocking readers or needing extra disk space — but requires the cache to
have been created with ``auto_vacuum=INCREMENTAL`` (otherwise it is a
no-op). ``incremental=False`` runs a full ``VACUUM``: it rewrites the
whole file (needs ~2× disk space, blocks readers) — use only in a
maintenance window.
"""
if self._in_memory:
logger.debug("vacuum() called in memory mode — no-op.")
return
if incremental:
av = self._conn.execute("PRAGMA auto_vacuum").fetchone()[0]
if av != 2: # 0 = NONE, 1 = FULL, 2 = INCREMENTAL
logger.warning(
f"vacuum(incremental=True) called but auto_vacuum={av} (not "
"INCREMENTAL) — no pages will be reclaimed. Rebuild the cache "
"with pragmas={'auto_vacuum': 'INCREMENTAL'} via hard_reset(), "
"or run vacuum(incremental=False) for a full VACUUM."
)
return
with self._lock:
self._conn.execute(f"PRAGMA incremental_vacuum({pages})")
self._conn.commit()
logger.info(f"Incremental vacuum: reclaimed up to {pages} pages.")
else:
logger.info("Full VACUUM started — this may take several minutes.")
with self._lock:
self._conn.execute("VACUUM")
logger.info("Full VACUUM complete.")
def close(self) -> None: def close(self) -> None:
self._backup_to_disk() self._backup_to_disk()
self._closed = True self._closed = True
+18 -5
View File
@@ -1,5 +1,5 @@
from dataclasses import dataclass, field from dataclasses import dataclass, field
from datetime import datetime from datetime import datetime, timedelta, timezone
from typing import Any from typing import Any
from loguru import logger from loguru import logger
@@ -8,8 +8,10 @@ from ._sql import quote_source
from .cache import CacheManager from .cache import CacheManager
from .stats import TableState from .stats import TableState
_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
def _bind_watermark(watermark: str) -> datetime | str:
def _bind_watermark(watermark: str | int, epoch_us: bool = False) -> datetime | str:
"""Bind the delta watermark back to the source in its native type. """Bind the delta watermark back to the source in its native type.
The cache stores the change column as an ISO ``TEXT`` string (see The cache stores the change column as an ISO ``TEXT`` string (see
@@ -22,11 +24,21 @@ def _bind_watermark(watermark: str) -> datetime | str:
driver send a typed timestamp, so the comparison happens natively with no driver send a typed timestamp, so the comparison happens natively with no
string conversion. Non-datetime change columns (e.g. an integer rowversion) string conversion. Non-datetime change columns (e.g. an integer rowversion)
don't parse and are passed through unchanged. don't parse and are passed through unchanged.
When the change column is stored as INTEGER µs-since-epoch (``datetime_columns``)
*epoch_us* is set: the watermark is a microsecond count (an ``int`` or its digit
string, since it round-trips through a TEXT column) and is reconstructed into a
UTC :class:`~datetime.datetime` so the source still receives a typed timestamp.
""" """
if epoch_us:
try:
return _EPOCH + timedelta(microseconds=int(watermark))
except (TypeError, ValueError):
return watermark if isinstance(watermark, str) else str(watermark)
try: try:
return datetime.fromisoformat(watermark) return datetime.fromisoformat(watermark) # type: ignore[arg-type]
except (TypeError, ValueError): except (TypeError, ValueError):
return watermark return watermark if isinstance(watermark, str) else str(watermark)
@dataclass(frozen=True) @dataclass(frozen=True)
@@ -92,9 +104,10 @@ class DeltaRefresher:
cursor = source_conn.execute(f"SELECT {col_list} FROM {q_table}") cursor = source_conn.execute(f"SELECT {col_list} FROM {q_table}")
else: else:
change_col = quote_source(cfg.change_column, dialect) change_col = quote_source(cfg.change_column, dialect)
epoch_us = cfg.change_column in self._cache._datetime_columns.get(table, ())
cursor = source_conn.execute( cursor = source_conn.execute(
f"SELECT {col_list} FROM {q_table} WHERE {change_col} >= ?", f"SELECT {col_list} FROM {q_table} WHERE {change_col} >= ?",
(_bind_watermark(watermark),), (_bind_watermark(watermark, epoch_us),),
) )
# Stream the delta in batches so a large catch-up never materializes at once. # Stream the delta in batches so a large catch-up never materializes at once.
+151 -17
View File
@@ -18,12 +18,50 @@ from .config import (
SQL_DIALECT, SQL_DIALECT,
) )
from .delta import DeltaConfig, DeltaRefresher, ResolvedDelta from .delta import DeltaConfig, DeltaRefresher, ResolvedDelta
from .exceptions import UndeclaredError
from .executor import QueryExecutor from .executor import QueryExecutor
from .parser import Params, parse from .parser import Params, ParsedQuery, parse
from .registry import ColumnRegistry from .registry import ColumnRegistry
from .spec import TTL, TableSpec
from .stats import Stats, StatsCollector, TableState, TableStats from .stats import Stats, StatsCollector, TableState, TableStats
def _specs_to_config(
tables: list[TableSpec],
) -> tuple[
dict[str, DeltaConfig],
dict[str, int],
dict[str, list[str | list[str]]],
dict[str, list[str]],
dict[str, list[str] | None],
]:
"""Convert declarative ``TableSpec``s into the engine's internal config dicts.
Returns ``(delta, ttl, indexes, datetime_columns, declared)`` — the first four
mirror the legacy kwargs; ``declared`` maps each table to its allowed columns
(``None`` = whole table / any column) for fail-fast query checking.
"""
delta: dict[str, DeltaConfig] = {}
ttl: dict[str, int] = {}
indexes: dict[str, list[str | list[str]]] = {}
datetime_columns: dict[str, list[str]] = {}
declared: dict[str, list[str] | None] = {}
for spec in tables:
if spec.name in declared:
raise ValueError(f"Duplicate TableSpec for table {spec.name!r}.")
declared[spec.name] = list(spec.columns) if spec.columns is not None else None
if spec.indexes:
indexes[spec.name] = list(spec.indexes)
if spec.datetime_columns:
datetime_columns[spec.name] = list(spec.datetime_columns)
refresh = spec.refresh
if isinstance(refresh, TTL):
ttl[spec.name] = refresh.seconds
elif isinstance(refresh, DeltaConfig):
delta[spec.name] = refresh
return delta, ttl, indexes, datetime_columns, declared
class _LazySource: class _LazySource:
"""A source connection opened on first ``execute`` and shared across one query. """A source connection opened on first ``execute`` and shared across one query.
@@ -65,9 +103,28 @@ class CachingEngine:
refresh_interval: int | None = None, refresh_interval: int | None = None,
fetch_batch: int | None = None, fetch_batch: int | None = None,
dialect: str | None = None, dialect: str | None = None,
pragmas: dict[str, str | int] | None = None,
datetime_columns: dict[str, list[str]] | None = None,
return_datetime: bool = True,
tables: list[TableSpec] | None = None,
blocking_startup_refresh: bool = False, blocking_startup_refresh: bool = False,
) -> None: ) -> None:
self._source_engine = source_engine self._source_engine = source_engine
# Declarative mode: a list of TableSpecs is converted to the same internal
# config the legacy delta=/ttl=/indexes=/datetime_columns= kwargs produce,
# plus a declared-columns allowlist (for fail-fast) and preload set.
self._declared: dict[str, list[str] | None] | None = None
self._preload_specs: list[TableSpec] = []
if tables is not None:
if any(x is not None for x in (delta, ttl, indexes, datetime_columns)):
raise ValueError(
"Pass either tables=[TableSpec(...)] or the legacy "
"delta=/ttl=/indexes=/datetime_columns= kwargs, not both."
)
delta, ttl, indexes, datetime_columns, self._declared = _specs_to_config(tables)
self._preload_specs = [s for s in tables if s.preload]
use_memory = IN_MEMORY if in_memory is None else in_memory use_memory = IN_MEMORY if in_memory is None else in_memory
self._dialect = dialect if dialect is not None else SQL_DIALECT self._dialect = dialect if dialect is not None else SQL_DIALECT
self._refresh_interval = ( self._refresh_interval = (
@@ -79,6 +136,9 @@ class CachingEngine:
in_memory=use_memory, in_memory=use_memory,
dialect=self._dialect, dialect=self._dialect,
fetch_batch=fetch_batch if fetch_batch is not None else FETCH_BATCH_SIZE, fetch_batch=fetch_batch if fetch_batch is not None else FETCH_BATCH_SIZE,
pragmas=pragmas,
datetime_columns=datetime_columns,
return_datetime=return_datetime,
) )
self._registry = ColumnRegistry(self._cache.connection) self._registry = ColumnRegistry(self._cache.connection)
self._stats = StatsCollector() self._stats = StatsCollector()
@@ -95,12 +155,14 @@ class CachingEngine:
"reload), not both." "reload), not both."
) )
if self._delta or self._ttl: if self._delta or self._ttl or self._preload_specs:
# The startup catch-up (deltas/TTL reloads for tables restored from # Startup work (preload of declared tables + delta/TTL catch-up for
# disk) can take a while on a cold start. By default it runs on the # tables restored from disk) can take a while on a cold start. By
# background thread so it never blocks application startup; callers # default it runs on the background thread so it never blocks
# who need the cache fully fresh before serving can opt back in. # application startup; callers who need the cache fully warm before
# serving can opt back in.
if blocking_startup_refresh: if blocking_startup_refresh:
self._preload()
self._run_refresh() self._run_refresh()
self._start_refresh_thread(initial_catch_up=not blocking_startup_refresh) self._start_refresh_thread(initial_catch_up=not blocking_startup_refresh)
@@ -148,7 +210,11 @@ class CachingEngine:
last_runs = self._cache.get_last_runs() last_runs = self._cache.get_last_runs()
with self._cache._lock: with self._cache._lock:
base = self._stats.snapshot(self._cache.connection, states) base = self._stats.snapshot(self._cache.connection, states)
base = replace(base, errors=self._cache.error_total) base = replace(
base,
errors=self._cache.error_total,
db_size_bytes=self._cache.db_size_bytes(),
)
return replace( return replace(
base, base,
tables={n: self._enrich(n, t, errors, last_runs) for n, t in base.tables.items()}, tables={n: self._enrich(n, t, errors, last_runs) for n, t in base.tables.items()},
@@ -189,22 +255,67 @@ class CachingEngine:
) )
return replace(table_stats, tracking=tracking, state=state, last_refresh=last_refresh) return replace(table_stats, tracking=tracking, state=state, last_refresh=last_refresh)
def _make_executor(self, source: Any) -> QueryExecutor:
return QueryExecutor(
self._cache,
self._registry,
source,
self._stats,
self._delta,
self._ttl,
self._index_columns,
)
def _check_declared(self, parsed: ParsedQuery) -> None:
"""In declarative mode, reject any table/column not declared up front."""
if self._declared is None:
return
for table in parsed.tables:
if table not in self._declared:
raise UndeclaredError(
f"Table {table!r} is not declared in tables=[TableSpec(...)]. "
"Add a TableSpec for it (declarative mode is a strict allowlist)."
)
allowed = self._declared[table]
if allowed is None:
continue # whole table declared — any column is fine
if table in parsed.wildcard_tables:
raise UndeclaredError(
f"SELECT * on {table!r} is not allowed: only columns {allowed} "
"are declared. List the columns explicitly or declare "
"columns=None for the whole table."
)
unknown = [c for c in parsed.columns_by_table.get(table, []) if c not in allowed]
if unknown:
raise UndeclaredError(
f"Column(s) {unknown} of {table!r} are not declared "
f"(declared: {allowed})."
)
def execute(self, sql: str, params: Params = None) -> list[dict]: def execute(self, sql: str, params: Params = None) -> list[dict]:
parsed = parse(sql, params, dialect=self._dialect) parsed = parse(sql, params, dialect=self._dialect)
self._check_declared(parsed)
# The source connection is opened lazily — a pure cache hit never touches # The source connection is opened lazily — a pure cache hit never touches
# the source and never occupies a pool slot. # the source and never occupies a pool slot.
source = _LazySource(self._source_engine) source = _LazySource(self._source_engine)
try: try:
executor = QueryExecutor( return self._make_executor(source).execute(parsed)
self._cache, finally:
self._registry, source.close()
source,
self._stats, def _preload(self) -> None:
self._delta, """Load declared ``preload=True`` tables into the cache (skipping fresh copies)."""
self._ttl, if not self._preload_specs:
self._index_columns, return
) source = _LazySource(self._source_engine)
return executor.execute(parsed) try:
executor = self._make_executor(source)
for spec in self._preload_specs:
try:
logger.info(f"Preloading {spec.name!r}")
executor.ensure_loaded(spec.name, spec.columns)
except Exception as e:
logger.error(f"Preload failed for {spec.name!r}: {e}")
finally: finally:
source.close() source.close()
@@ -240,6 +351,7 @@ class CachingEngine:
def _start_refresh_thread(self, initial_catch_up: bool = True) -> None: def _start_refresh_thread(self, initial_catch_up: bool = True) -> None:
def loop() -> None: def loop() -> None:
if initial_catch_up: if initial_catch_up:
self._preload() # off-main-thread declared-table preload
self._run_refresh() # off-main-thread startup catch-up self._run_refresh() # off-main-thread startup catch-up
event = threading.Event() event = threading.Event()
while not event.wait(self._refresh_interval): while not event.wait(self._refresh_interval):
@@ -267,6 +379,28 @@ class CachingEngine:
self._cache.reset() self._cache.reset()
logger.info("Cache reset — all tables will be reloaded on next use.") logger.info("Cache reset — all tables will be reloaded on next use.")
def hard_reset(self) -> None:
"""Delete the on-disk cache file and reopen with current pragmas/page_size.
Disk mode only (falls back to :meth:`reset` in memory mode). Use when a
layout pragma — ``page_size`` or ``auto_vacuum`` — must change, since
those are baked into the file at creation and :meth:`reset` keeps it.
All tables reload on next use.
"""
self._cache.hard_reset()
# hard_reset swaps the cache connection — re-point the registry at it.
self._registry.rebind(self._cache.connection)
logger.info("Cache hard reset — file recreated; all tables reload on next use.")
def vacuum(self, incremental: bool = True, pages: int = 10_000) -> None:
"""Run maintenance VACUUM on the on-disk cache (incremental by default).
Incremental reclaims free pages left by delta ``INSERT OR REPLACE`` churn
cheaply (requires ``auto_vacuum=INCREMENTAL``); a full VACUUM rewrites the
whole file and should run only in a maintenance window.
"""
self._cache.vacuum(incremental=incremental, pages=pages)
def close(self) -> None: def close(self) -> None:
self._cache.close() self._cache.close()
logger.info("CachingEngine closed.") logger.info("CachingEngine closed.")
+10
View File
@@ -4,3 +4,13 @@ class ReadOnlyError(Exception):
class UnsupportedQueryError(Exception): class UnsupportedQueryError(Exception):
"""Raised when a query uses unsupported features (JOIN, SELECT *).""" """Raised when a query uses unsupported features (JOIN, SELECT *)."""
class UndeclaredError(Exception):
"""Raised in declarative mode (``tables=[TableSpec(...)]``) when a query
references a table or column that was not declared up front.
Fail-fast by design: an undeclared table/column would otherwise trigger a
silent (potentially multi-hour) lazy load/column-expansion, so it is surfaced
immediately instead.
"""
+58 -6
View File
@@ -1,3 +1,4 @@
from collections.abc import Callable
from typing import Any from typing import Any
from loguru import logger from loguru import logger
@@ -41,12 +42,40 @@ class QueryExecutor:
self._ensure_table(table, parsed) self._ensure_table(table, parsed)
return self._run_in_memory(parsed) return self._run_in_memory(parsed)
def ensure_loaded(self, table: str, columns: list[str] | None) -> None:
"""Preload *table* into the cache without running a query.
``columns=None`` loads the whole table (``SELECT *`` semantics); otherwise
only the listed columns. Reuses the same load path as a real query — delta
key/change + index columns are augmented, the registry and watermark are
updated, and double-checked locking skips a copy already fresh in the
cache — but never materializes any rows (unlike :meth:`execute`).
"""
if columns is None:
self._ensure_full(table)
else:
self._ensure_columns(table, columns)
def _ensure_table(self, table: str, parsed: ParsedQuery) -> None: def _ensure_table(self, table: str, parsed: ParsedQuery) -> None:
if table in parsed.wildcard_tables: if table in parsed.wildcard_tables:
self._ensure_full(table) self._ensure_full(table)
else: else:
self._ensure_columns(table, parsed.columns_by_table[table]) self._ensure_columns(table, parsed.columns_by_table[table])
def _full_satisfied(self, table: str) -> bool:
"""True if *table* is cached in full and not TTL-expired (a SELECT * hit)."""
return (
self._cache.is_table_cached(table)
and self._cache.is_table_full(table)
and not self._ttl_expired(table)
)
def _columns_satisfied(self, table: str, columns: list[str]) -> bool:
"""True if *table* is cached with all *columns* present and not TTL-expired."""
if not self._cache.is_table_cached(table) or self._ttl_expired(table):
return False
return set(columns).issubset(self._cache.get_table_columns(table))
def _ensure_full(self, table: str) -> None: def _ensure_full(self, table: str) -> None:
"""Load every column of *table* (SELECT * / t.*), refetching unless already full.""" """Load every column of *table* (SELECT * / t.*), refetching unless already full."""
cached = self._cache.is_table_cached(table) cached = self._cache.is_table_cached(table)
@@ -67,7 +96,7 @@ class QueryExecutor:
self._stats.record_miss() self._stats.record_miss()
columns = self._cache.discover_columns(table, self._source_conn) columns = self._cache.discover_columns(table, self._source_conn)
self._load(table, columns, full=True) self._load(table, columns, full=True, satisfied=lambda cols: self._full_satisfied(table))
def _ensure_columns(self, table: str, columns: list[str]) -> None: def _ensure_columns(self, table: str, columns: list[str]) -> None:
"""Load *table* with at least *columns*, refetching on new columns or TTL expiry.""" """Load *table* with at least *columns*, refetching on new columns or TTL expiry."""
@@ -95,10 +124,27 @@ class QueryExecutor:
all_columns = list(self._registry.get_columns(table)) + missing all_columns = list(self._registry.get_columns(table)) + missing
# Preserve a fully-cached table's status across a TTL reload. # Preserve a fully-cached table's status across a TTL reload.
full = table_cached and self._cache.is_table_full(table) full = table_cached and self._cache.is_table_full(table)
self._load(table, all_columns, full=full) self._load(
table,
all_columns,
full=full,
satisfied=lambda cols: self._columns_satisfied(table, cols),
)
def _load(self, table: str, columns: list[str], full: bool) -> None: def _load(
"""Fetch *table* into cache, adding delta key/timestamp and index columns.""" self,
table: str,
columns: list[str],
full: bool,
satisfied: Callable[[list[str]], bool] | None = None,
) -> None:
"""Fetch *table* into cache, adding delta key/timestamp and index columns.
*satisfied* is the double-checked-locking predicate evaluated under the
load lock (see :meth:`CacheManager.load_table`); it is given the final,
augmented column list so a concurrent loader that already produced an
equivalent (or wider) cache is detected and the redundant reload skipped.
"""
cfg = self._delta.get(table) cfg = self._delta.get(table)
extra = list(self._index_columns.get(table, [])) extra = list(self._index_columns.get(table, []))
if cfg: if cfg:
@@ -108,7 +154,11 @@ class QueryExecutor:
if extra: if extra:
columns = list(dict.fromkeys([*columns, *extra])) columns = list(dict.fromkeys([*columns, *extra]))
self._cache.load_table(table, columns, self._source_conn, full=full) recheck: Callable[[], bool] | None = None
if satisfied is not None:
final_columns = columns
recheck = lambda: satisfied(final_columns) # noqa: E731
self._cache.load_table(table, columns, self._source_conn, full=full, recheck=recheck)
self._registry.update(table, columns) self._registry.update(table, columns)
if cfg: if cfg:
@@ -118,5 +168,7 @@ class QueryExecutor:
def _run_in_memory(self, parsed: ParsedQuery) -> list[dict]: def _run_in_memory(self, parsed: ParsedQuery) -> list[dict]:
logger.debug(f"Executing in SQLite RAM: {parsed.sqlite_sql!r} params={parsed.params!r}") logger.debug(f"Executing in SQLite RAM: {parsed.sqlite_sql!r} params={parsed.params!r}")
col_names, rows = self._cache.execute_in_memory(parsed.sqlite_sql, parsed.params) col_names, rows = self._cache.execute_in_memory(
parsed.sqlite_sql, parsed.params, parsed.tables
)
return [dict(zip(col_names, row)) for row in rows] return [dict(zip(col_names, row)) for row in rows]
+10
View File
@@ -12,6 +12,16 @@ class ColumnRegistry:
self._lock = Lock() self._lock = Lock()
self._ensure_table() self._ensure_table()
def rebind(self, mem_conn: sqlite3.Connection) -> None:
"""Point the registry at a new cache connection (after a hard reset).
``CacheManager.hard_reset`` closes and reopens the cache connection, so the
connection object the registry captured at construction becomes invalid.
"""
with self._lock:
self._conn = mem_conn
self._ensure_table()
def _ensure_table(self) -> None: def _ensure_table(self) -> None:
self._conn.execute(""" self._conn.execute("""
CREATE TABLE IF NOT EXISTS _sqlmem_columns ( CREATE TABLE IF NOT EXISTS _sqlmem_columns (
+49
View File
@@ -0,0 +1,49 @@
"""Declarative table specs for ``CachingEngine(tables=[...])``.
Instead of the lazy "learn columns from queries" mode, an application can declare
each table up front — its columns, indexes, refresh strategy and datetime columns —
so the engine preloads them and rejects anything undeclared (fail-fast) rather than
silently triggering an expensive lazy load. The legacy ``delta=/ttl=/indexes=``
kwargs keep working; ``tables=`` is converted to the same internal config.
"""
from dataclasses import dataclass, field
from .delta import DeltaConfig
# Friendly alias for the declarative API; ``Delta`` and ``DeltaConfig`` are the
# same type (``change_column`` + ``key_columns``), so either may be used as a
# ``TableSpec.refresh`` strategy.
Delta = DeltaConfig
@dataclass(frozen=True)
class TTL:
"""Time-based refresh strategy: full-reload the table when older than *seconds*."""
seconds: int
@dataclass(frozen=True)
class TableSpec:
"""Declarative specification of one cached table.
*columns* lists the columns to cache; leave it ``None`` to cache the whole
table (``SELECT *`` semantics) and allow any column. When columns are listed,
a query asking for a column outside the list raises
:class:`~sqlmem.exceptions.UndeclaredError`.
*refresh* is a :class:`Delta` (change-column incremental sync) or :class:`TTL`
(time-based full reload), or ``None`` for a static table loaded once.
*preload=True* loads the table at startup (in the background by default) so the
first query is a cache hit instead of paying a cold load; a copy already fresh
in the persistent cache is skipped.
"""
name: str
columns: list[str] | None = None
indexes: list[str | list[str]] = field(default_factory=list)
refresh: DeltaConfig | TTL | None = None
datetime_columns: list[str] = field(default_factory=list)
preload: bool = False
+1
View File
@@ -40,6 +40,7 @@ class Stats:
refetches: int refetches: int
tables: dict[str, TableStats] tables: dict[str, TableStats]
errors: int = 0 # total load/refresh failures since start errors: int = 0 # total load/refresh failures since start
db_size_bytes: int = 0 # on-disk cache file size (0 in memory mode)
class StatsCollector: class StatsCollector:
+206
View File
@@ -168,3 +168,209 @@ def test_disk_mode_reset_keeps_file(tmp_path, source_conn):
assert db_path.exists() assert db_path.exists()
assert c.is_table_cached("users") is False assert c.is_table_cached("users") is False
c.close() c.close()
# ---------------------------------------------------------------------------
# Pragmas / layout tuning (1.11.0)
# ---------------------------------------------------------------------------
def test_pragmas_applied_on_fresh_disk_cache(tmp_path):
"""page_size, auto_vacuum and a generic pragma all take effect on a new file."""
c = CacheManager(
db_path=tmp_path / "cache.db",
backup_interval=9999,
in_memory=False,
pragmas={"page_size": 8192, "auto_vacuum": "INCREMENTAL", "cache_size": -2000},
)
assert c.connection.execute("PRAGMA page_size").fetchone()[0] == 8192
assert c.connection.execute("PRAGMA auto_vacuum").fetchone()[0] == 2 # INCREMENTAL
assert c.connection.execute("PRAGMA cache_size").fetchone()[0] == -2000
c.close()
def test_page_size_ignored_on_existing_file_warns(tmp_path):
"""A page_size that differs from the existing file is ignored, with a warning."""
db_path = tmp_path / "cache.db"
c1 = CacheManager(db_path=db_path, backup_interval=9999, in_memory=False)
assert c1.connection.execute("PRAGMA page_size").fetchone()[0] == 4096 # default
c1.close()
c2 = CacheManager(
db_path=db_path,
backup_interval=9999,
in_memory=False,
pragmas={"page_size": 16384},
)
# File keeps its original page size; the request is ignored (not an error).
assert c2.connection.execute("PRAGMA page_size").fetchone()[0] == 4096
c2.close()
def test_unknown_pragma_does_not_crash(tmp_path):
"""SQLite ignores unknown/inapplicable pragmas — startup must not fail."""
c = CacheManager(
db_path=tmp_path / "cache.db",
backup_interval=9999,
in_memory=False,
pragmas={"this_is_not_a_pragma": 1, "mmap_size": 1024 * 1024},
)
assert c.connection.execute("PRAGMA mmap_size").fetchone()[0] == 1024 * 1024
c.close()
# ---------------------------------------------------------------------------
# hard_reset / vacuum (1.11.0)
# ---------------------------------------------------------------------------
def test_hard_reset_recreates_file_and_clears_tables(tmp_path, source_conn):
db_path = tmp_path / "cache.db"
c = CacheManager(db_path=db_path, backup_interval=9999, in_memory=False)
c.load_table("users", ["name"], source_conn)
assert c.is_table_cached("users") is True
c.hard_reset()
assert db_path.exists() # reopened fresh
assert c.is_table_cached("users") is False
# The connection is usable again after the swap.
c.load_table("users", ["name"], source_conn)
assert c.is_table_cached("users") is True
c.close()
def test_hard_reset_applies_new_page_size(tmp_path, source_conn):
"""page_size can't change via reset() but does via hard_reset() (fresh file)."""
db_path = tmp_path / "cache.db"
# Existing file at the default 4096; request 8192 — ignored on open.
CacheManager(db_path=db_path, backup_interval=9999, in_memory=False).close()
c = CacheManager(
db_path=db_path,
backup_interval=9999,
in_memory=False,
pragmas={"page_size": 8192},
)
assert c.connection.execute("PRAGMA page_size").fetchone()[0] == 4096
c.hard_reset() # deletes the file → recreated with the requested page size
assert c.connection.execute("PRAGMA page_size").fetchone()[0] == 8192
c.close()
def test_hard_reset_in_memory_falls_back_to_reset(tmp_path, source_conn):
c = CacheManager(db_path=tmp_path / "cache.db", backup_interval=9999)
c.load_table("users", ["name"], source_conn)
c.hard_reset() # memory mode → reset()
assert c.is_table_cached("users") is False
c.close()
def test_full_vacuum_runs_on_disk(tmp_path, source_conn):
db_path = tmp_path / "cache.db"
c = CacheManager(db_path=db_path, backup_interval=9999, in_memory=False)
c.load_table("users", ["name"], source_conn)
c.vacuum(incremental=False) # must not raise
assert c.is_table_cached("users") is True
c.close()
def test_incremental_vacuum_runs_with_auto_vacuum(tmp_path, source_conn):
c = CacheManager(
db_path=tmp_path / "cache.db",
backup_interval=9999,
in_memory=False,
pragmas={"auto_vacuum": "INCREMENTAL"},
)
c.load_table("users", ["name"], source_conn)
c.vacuum(incremental=True, pages=100) # must not raise
assert c.is_table_cached("users") is True
c.close()
def test_vacuum_in_memory_is_noop(cache, source_conn):
cache.load_table("users", ["name"], source_conn)
cache.vacuum(incremental=False) # no-op, no error
assert cache.is_table_cached("users") is True
# ---------------------------------------------------------------------------
# Double-checked locking against cache stampede (1.15.0)
# ---------------------------------------------------------------------------
class _ExplodingSource:
def execute(self, *args):
raise AssertionError("source must not be queried when recheck() is True")
def test_load_table_recheck_true_skips_load(cache, source_conn):
"""A recheck that reports the table already satisfied skips the reload."""
cache.load_table("users", ["name"], source_conn)
# Second load with recheck() → True must not touch the source at all.
cache.load_table("users", ["name"], _ExplodingSource(), recheck=lambda: True)
assert cache.is_table_cached("users") is True
def test_concurrent_loads_dedup_via_double_checked_lock(tmp_path):
"""A second loader queued behind a slow cold load must not reload the table."""
import time
c = CacheManager(db_path=tmp_path / "c.db", backup_interval=9999)
started = threading.Event()
release = threading.Event()
loads: list[str] = []
class _GatedCursor:
def __init__(self, rows):
self._rows = list(rows)
self._done = False
def fetchmany(self, n):
if self._done:
return []
self._done = True
return self._rows
class _GatedSource:
def execute(self, sql):
loads.append(sql) # one entry per *actual* source load
started.set()
release.wait(5) # hold the load open (and _load_lock) until released
return _GatedCursor([("alice",), ("bob",)])
def recheck() -> bool:
return c.is_table_cached("users") and "name" in c.get_table_columns("users")
def load() -> None:
c.load_table("users", ["name"], _GatedSource(), recheck=recheck)
a = threading.Thread(target=load)
b = threading.Thread(target=load)
a.start()
assert started.wait(5), "first load never started" # A holds _load_lock, mid-fetch
b.start()
time.sleep(0.2) # give B time to queue on _load_lock
release.set() # let A finish; B then re-checks and skips
a.join(5)
b.join(5)
assert not a.is_alive() and not b.is_alive()
assert len(loads) == 1 # the redundant second load was skipped
assert c.is_table_cached("users") is True
_, rows = c.execute_in_memory("SELECT name FROM users ORDER BY name")
assert [r[0] for r in rows] == ["alice", "bob"]
c.close()
def test_incremental_vacuum_warns_without_incremental_auto_vacuum(tmp_path, source_conn):
"""Incremental vacuum on a DB that isn't auto_vacuum=INCREMENTAL warns and skips."""
from loguru import logger
messages: list[str] = []
sink_id = logger.add(messages.append, level="WARNING", filter="sqlmem")
logger.enable("sqlmem")
try:
c = CacheManager(db_path=tmp_path / "c.db", backup_interval=9999, in_memory=False)
c.load_table("users", ["name"], source_conn)
c.vacuum(incremental=True) # auto_vacuum defaults to NONE → no-op + warning
c.close()
finally:
logger.remove(sink_id)
logger.disable("sqlmem")
assert any("auto_vacuum" in m for m in messages)
+205 -1
View File
@@ -4,9 +4,17 @@ import uuid
import pytest import pytest
from sqlmem._coerce import coerce_params, to_sqlite from sqlmem._coerce import (
coerce_params,
from_sqlite_datetime,
reverse_coerce_rows,
to_sqlite,
to_sqlite_datetime,
)
from sqlmem.cache import CacheManager from sqlmem.cache import CacheManager
_UTC = datetime.timezone.utc
class _FakeCursor: class _FakeCursor:
def __init__(self, rows): def __init__(self, rows):
@@ -91,6 +99,202 @@ def test_coerce_params_none():
assert coerce_params(None) is None assert coerce_params(None) is None
# --- to_sqlite_datetime (INTEGER µs storage, 1.12.0) ------------------------
def test_datetime_to_epoch_micros():
# 2026-06-01T10:00:00Z -> microseconds since epoch
dt = datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=datetime.timezone.utc)
expected = int(dt.timestamp() * 1_000_000)
assert to_sqlite_datetime(dt) == expected
def test_datetime_naive_treated_as_utc():
naive = datetime.datetime(2026, 6, 1, 10, 0, 0)
aware = naive.replace(tzinfo=datetime.timezone.utc)
assert to_sqlite_datetime(naive) == to_sqlite_datetime(aware)
def test_datetime_micros_are_exact():
dt = datetime.datetime(2026, 6, 5, 14, 54, 24, 823000, tzinfo=datetime.timezone.utc)
us = to_sqlite_datetime(dt)
# round-trips back to the same instant with no rounding loss
back = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc) + datetime.timedelta(
microseconds=us
)
assert back == dt
def test_datetime_none_passes_through():
assert to_sqlite_datetime(None) is None
def test_datetime_iso_string_parsed():
assert to_sqlite_datetime("2026-06-01T10:00:00+00:00") == to_sqlite_datetime(
datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=datetime.timezone.utc)
)
def test_datetime_unparseable_is_none():
assert to_sqlite_datetime("not a date") is None
# --- integration: datetime_columns are stored as INTEGER --------------------
def test_datetime_column_stored_as_integer(tmp_path):
c = CacheManager(
db_path=tmp_path / "cache.db",
backup_interval=9999,
datetime_columns={"t": ["changed"]},
)
dt = datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=datetime.timezone.utc)
c.load_table("t", ["id", "changed"], FakeSource([("1", dt)]))
# Column declared INTEGER, value stored as µs-since-epoch.
coltype = c.connection.execute("PRAGMA table_info(t)").fetchall()
types = {row[1]: row[2] for row in coltype}
assert types["changed"] == "INTEGER"
assert types["id"] == "TEXT"
_, out = c.execute_in_memory("SELECT changed FROM t")
assert out == [(to_sqlite_datetime(dt),)]
c.close()
def test_non_datetime_columns_unaffected_by_datetime_columns(tmp_path):
c = CacheManager(
db_path=tmp_path / "cache.db",
backup_interval=9999,
datetime_columns={"t": ["changed"]},
)
c.load_table("t", ["id", "price"], FakeSource([("1", decimal.Decimal("9.99"))]))
_, out = c.execute_in_memory("SELECT id, price FROM t")
assert out == [("1", "9.99")] # still TEXT/ISO coercion
c.close()
# --- param coercion for datetime_columns (A) --------------------------------
def test_coerce_params_dt_table_iso_string_to_epoch():
p = coerce_params(("2026-06-01T10:00:00",), dt_table=True)
assert p == (to_sqlite_datetime("2026-06-01T10:00:00"),)
def test_coerce_params_dt_table_datetime_to_epoch():
dt = datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=_UTC)
assert coerce_params((dt,), dt_table=True) == (to_sqlite_datetime(dt),)
def test_coerce_params_dt_table_false_keeps_iso_string():
# No datetime table in the query → behaviour unchanged (string stays a string).
assert coerce_params(("2026-06-01T10:00:00",), dt_table=False) == (
"2026-06-01T10:00:00",
)
def test_coerce_params_dt_table_leaves_non_datetime_values():
assert coerce_params(("hello", 5, None), dt_table=True) == ("hello", 5, None)
def test_where_on_datetime_column_matches_with_iso_param(tmp_path):
"""The critical fix: a WHERE on an INTEGER-µs column with an ISO string param
must match instead of comparing INTEGER against TEXT (always 0 rows)."""
c = CacheManager(
db_path=tmp_path / "cache.db",
backup_interval=9999,
datetime_columns={"t": ["changed"]},
return_datetime=False,
)
rows = [
("1", datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=_UTC)),
("2", datetime.datetime(2026, 6, 3, 10, 0, 0, tzinfo=_UTC)),
]
c.load_table("t", ["id", "changed"], FakeSource(rows))
_, out = c.execute_in_memory(
"SELECT id FROM t WHERE changed > ?", ("2026-06-02T00:00:00",), ["t"]
)
assert [r[0] for r in out] == ["2"]
c.close()
def test_where_on_datetime_column_without_table_scope_is_unchanged(tmp_path):
"""Without table scope the param isn't coerced — proves the fix is scoped."""
c = CacheManager(
db_path=tmp_path / "cache.db",
backup_interval=9999,
datetime_columns={"t": ["changed"]},
return_datetime=False,
)
c.load_table(
"t",
["id", "changed"],
FakeSource([("1", datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=_UTC))]),
)
# No `tables` arg → INTEGER vs TEXT comparison → no match (legacy behaviour).
_, out = c.execute_in_memory("SELECT id FROM t WHERE changed > ?", ("2026-01-01T00:00:00",))
assert out == []
c.close()
# --- reverse coercion: read back as datetime (B) ----------------------------
def test_from_sqlite_datetime_roundtrip():
dt = datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=_UTC)
assert from_sqlite_datetime(to_sqlite_datetime(dt)) == dt
def test_from_sqlite_datetime_passes_non_int():
assert from_sqlite_datetime("x") == "x"
assert from_sqlite_datetime(None) is None
assert from_sqlite_datetime(True) is True # bool is not treated as µs
def test_reverse_coerce_rows_only_named_columns():
us = to_sqlite_datetime(datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=_UTC))
out = reverse_coerce_rows([("1", us)], ["id", "changed"], {"changed"})
assert out[0][0] == "1"
assert out[0][1] == datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=_UTC)
def test_read_returns_datetime_by_default(tmp_path):
c = CacheManager(
db_path=tmp_path / "cache.db",
backup_interval=9999,
datetime_columns={"t": ["changed"]},
)
dt = datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=_UTC)
c.load_table("t", ["id", "changed"], FakeSource([("1", dt)]))
_, out = c.execute_in_memory("SELECT id, changed FROM t", None, ["t"])
assert out == [("1", dt)] # returned as a datetime, not the raw int
c.close()
def test_return_datetime_false_keeps_raw_int(tmp_path):
c = CacheManager(
db_path=tmp_path / "cache.db",
backup_interval=9999,
datetime_columns={"t": ["changed"]},
return_datetime=False,
)
dt = datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=_UTC)
c.load_table("t", ["id", "changed"], FakeSource([("1", dt)]))
_, out = c.execute_in_memory("SELECT changed FROM t", None, ["t"])
assert out == [(to_sqlite_datetime(dt),)] # raw INTEGER µs
c.close()
# --- public export (F) ------------------------------------------------------
def test_datetime_to_epoch_us_is_public():
from sqlmem import datetime_to_epoch_us
dt = datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=_UTC)
assert datetime_to_epoch_us(dt) == to_sqlite_datetime(dt)
# --- integration: values reach the cache through coercion ------------------- # --- integration: values reach the cache through coercion -------------------
+53 -1
View File
@@ -1,6 +1,6 @@
import sqlite3 import sqlite3
import threading import threading
from datetime import datetime from datetime import datetime, timezone
from types import SimpleNamespace from types import SimpleNamespace
import pytest import pytest
@@ -140,6 +140,18 @@ def test_bind_watermark_passes_through_non_datetime():
assert _bind_watermark("12345") == "12345" assert _bind_watermark("12345") == "12345"
# --- INTEGER µs watermark binding (datetime_columns, 1.12.0) ----------------
def test_bind_watermark_epoch_us_reconstructs_datetime():
dt = datetime(2026, 6, 5, 14, 54, 24, 823000, tzinfo=timezone.utc)
us = int(dt.timestamp() * 1_000_000)
# Whether the watermark is an int or its digit string (it round-trips through
# the TEXT last_synced_at column), it binds back to the same UTC datetime.
assert _bind_watermark(us, epoch_us=True) == dt
assert _bind_watermark(str(us), epoch_us=True) == dt
class _SpyCursor: class _SpyCursor:
def __init__(self, rows): def __init__(self, rows):
self._rows = list(rows) self._rows = list(rows)
@@ -174,6 +186,46 @@ def test_refresh_binds_watermark_as_datetime(env):
assert params == (datetime(2026, 6, 5, 14, 54, 24, 823000),) assert params == (datetime(2026, 6, 5, 14, 54, 24, 823000),)
class _RowSource:
"""Returns fixed rows for any query (for loading datetime-typed source data)."""
def __init__(self, rows):
self._rows = rows
def execute(self, sql, params=()):
return _SpyCursor(self._rows)
def test_datetime_column_watermark_stored_as_int_and_bound_back(tmp_path):
"""A change column declared in datetime_columns is stored as INTEGER µs; the
watermark is bound back to a real datetime for the source query."""
cache = CacheManager(
db_path=tmp_path / "c.db",
backup_interval=9999,
datetime_columns={"products": ["changed"]},
)
dt1 = datetime(2026, 6, 1, 10, 0, 0, tzinfo=timezone.utc)
dt2 = datetime(2026, 6, 1, 10, 5, 0, tzinfo=timezone.utc)
cache.load_table("products", ["id", "changed"], _RowSource([("1", dt1), ("2", dt2)]))
cache.create_unique_index("products", ["id"])
cache.set_last_synced_at("products", cache.max_value("products", "changed"))
# Watermark persisted as the max INTEGER µs (digit string out of the TEXT col).
wm = cache.get_last_synced_at("products")
assert wm == str(int(dt2.timestamp() * 1_000_000))
refresher = DeltaRefresher(
cache, {"products": ResolvedDelta("changed", ["id"])}
)
spy = _SpySource(rows=[]) # no new rows — just capture the bound watermark
refresher.refresh(spy)
assert spy.bound, "source query was never issued"
_, params = spy.bound[-1]
assert params == (dt2,) # bound back as datetime, not an int/string
cache.close()
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Refresh failures are recorded (4.3) so a stuck delta is visible in stats # Refresh failures are recorded (4.3) so a stuck delta is visible in stats
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
+104
View File
@@ -385,3 +385,107 @@ def test_two_engines_separate_cache_files(source_engine, tmp_path):
assert b._cache.is_table_cached("products") is False # independent cache assert b._cache.is_table_cached("products") is False # independent cache
a.close() a.close()
b.close() b.close()
# ---------------------------------------------------------------------------
# Pragmas / hard_reset / vacuum (1.11.0)
# ---------------------------------------------------------------------------
def test_engine_passes_pragmas_to_cache(source_engine, tmp_path):
ce = CachingEngine(
source_engine,
cache_db_path=tmp_path / "cache.db",
in_memory=False,
pragmas={"page_size": 8192, "auto_vacuum": "INCREMENTAL"},
)
assert ce._cache.connection.execute("PRAGMA page_size").fetchone()[0] == 8192
assert ce._cache.connection.execute("PRAGMA auto_vacuum").fetchone()[0] == 2
ce.close()
def test_engine_hard_reset_reloads(source_engine, tmp_path):
ce = CachingEngine(source_engine, cache_db_path=tmp_path / "cache.db", in_memory=False)
ce.execute("SELECT id FROM products")
assert ce._cache.is_table_cached("products") is True
ce.hard_reset()
assert ce._cache.is_table_cached("products") is False
rows = ce.execute("SELECT id, name FROM products") # reloads on next use
assert len(rows) == 3
ce.close()
def test_engine_vacuum_runs(source_engine, tmp_path):
ce = CachingEngine(source_engine, cache_db_path=tmp_path / "cache.db", in_memory=False)
ce.execute("SELECT id FROM products")
ce.vacuum(incremental=False) # must not raise
assert ce._cache.is_table_cached("products") is True
ce.close()
# ---------------------------------------------------------------------------
# datetime_columns end-to-end: param coercion (A) + read-back datetime (B)
# ---------------------------------------------------------------------------
@pytest.fixture
def events_engine(tmp_path):
src = tmp_path / "events.db"
conn = sqlite3.connect(src)
conn.execute("CREATE TABLE events (id TEXT, changed TEXT)")
conn.executemany(
"INSERT INTO events VALUES (?, ?)",
[("1", "2026-06-01T10:00:00"), ("2", "2026-06-03T10:00:00")],
)
conn.commit()
conn.close()
se = create_engine(f"sqlite:///{src}")
yield se
se.dispose()
def test_datetime_column_where_and_readback(events_engine, tmp_path):
from datetime import datetime, timezone
ce = CachingEngine(
events_engine,
cache_db_path=tmp_path / "cache.db",
in_memory=False,
datetime_columns={"events": ["changed"]},
)
# A: WHERE on the INTEGER-µs column with an ISO string param returns the right row.
rows = ce.execute(
"SELECT id, changed FROM events WHERE changed > ?", ("2026-06-02T00:00:00",)
)
assert [r["id"] for r in rows] == ["2"]
# B: the column comes back as a datetime, not a raw integer.
assert rows[0]["changed"] == datetime(2026, 6, 3, 10, 0, 0, tzinfo=timezone.utc)
ce.close()
def test_datetime_column_return_datetime_false(events_engine, tmp_path):
ce = CachingEngine(
events_engine,
cache_db_path=tmp_path / "cache.db",
in_memory=False,
datetime_columns={"events": ["changed"]},
return_datetime=False,
)
rows = ce.execute("SELECT id, changed FROM events")
assert all(isinstance(r["changed"], int) for r in rows) # opt-out → raw µs
ce.close()
# ---------------------------------------------------------------------------
# db_size_bytes in stats (D)
# ---------------------------------------------------------------------------
def test_stats_reports_db_size_in_disk_mode(source_engine, tmp_path):
ce = CachingEngine(source_engine, cache_db_path=tmp_path / "cache.db", in_memory=False)
ce.execute("SELECT id FROM products")
assert ce.stats.db_size_bytes > 0
ce.close()
def test_stats_db_size_zero_in_memory(engine):
engine.execute("SELECT id, name FROM products")
assert engine.stats.db_size_bytes == 0
+221
View File
@@ -0,0 +1,221 @@
import sqlite3
import time
from datetime import datetime, timezone
import pytest
from sqlalchemy import create_engine
from sqlmem import (
TTL,
CachingEngine,
Delta,
DeltaConfig,
TableSpec,
UndeclaredError,
)
@pytest.fixture
def spec_source(tmp_path):
db = tmp_path / "src.db"
conn = sqlite3.connect(db)
conn.execute("CREATE TABLE products (id TEXT, name TEXT, price TEXT, changed TEXT)")
conn.executemany(
"INSERT INTO products VALUES (?, ?, ?, ?)",
[
("1", "Widget", "9.99", "2026-06-01T10:00:00"),
("2", "Gadget", "19.99", "2026-06-02T10:00:00"),
],
)
conn.execute("CREATE TABLE orders (order_id TEXT, qty TEXT)")
conn.executemany("INSERT INTO orders VALUES (?, ?)", [("101", "2")])
conn.commit()
conn.close()
se = create_engine(f"sqlite:///{db}")
yield se
se.dispose()
# --- back-compat / validation -----------------------------------------------
def test_tables_and_legacy_kwargs_are_mutually_exclusive(spec_source):
with pytest.raises(ValueError):
CachingEngine(
spec_source,
tables=[TableSpec("products", ["id"])],
delta={"products": DeltaConfig("changed", ["id"])},
)
def test_duplicate_tablespec_raises(spec_source):
with pytest.raises(ValueError):
CachingEngine(
spec_source,
tables=[TableSpec("products", ["id"]), TableSpec("products", ["name"])],
)
def test_delta_alias_is_deltaconfig():
assert Delta is DeltaConfig
# --- preload ----------------------------------------------------------------
def test_preload_blocking_caches_before_first_query(spec_source, tmp_path):
ce = CachingEngine(
spec_source,
cache_db_path=tmp_path / "c.db",
tables=[TableSpec("products", ["id", "name"], preload=True)],
blocking_startup_refresh=True,
)
# Cached at construction time — no execute() needed.
assert ce._cache.is_table_cached("products") is True
rows = ce.execute("SELECT id, name FROM products")
assert {r["id"] for r in rows} == {"1", "2"}
ce.close()
def test_preload_non_blocking_eventually_caches(spec_source, tmp_path):
ce = CachingEngine(
spec_source,
cache_db_path=tmp_path / "c.db",
tables=[TableSpec("products", ["id", "name"], preload=True)],
)
deadline = time.time() + 5
while not ce._cache.is_table_cached("products") and time.time() < deadline:
time.sleep(0.05)
assert ce._cache.is_table_cached("products") is True
ce.close()
def test_non_preload_table_is_not_loaded_at_startup(spec_source, tmp_path):
ce = CachingEngine(
spec_source,
cache_db_path=tmp_path / "c.db",
tables=[TableSpec("products", ["id", "name"], preload=False)],
blocking_startup_refresh=True,
)
assert ce._cache.is_table_cached("products") is False # loads lazily on first query
ce.execute("SELECT id, name FROM products")
assert ce._cache.is_table_cached("products") is True
ce.close()
# --- fail-fast on undeclared tables / columns -------------------------------
def test_fail_fast_undeclared_table(spec_source):
ce = CachingEngine(spec_source, tables=[TableSpec("products", ["id", "name"])])
with pytest.raises(UndeclaredError):
ce.execute("SELECT order_id FROM orders")
ce.close()
def test_fail_fast_undeclared_column(spec_source):
ce = CachingEngine(spec_source, tables=[TableSpec("products", ["id", "name"])])
with pytest.raises(UndeclaredError):
ce.execute("SELECT price FROM products")
ce.close()
def test_declared_columns_query_succeeds(spec_source):
ce = CachingEngine(spec_source, tables=[TableSpec("products", ["id", "name"])])
rows = ce.execute("SELECT id, name FROM products")
assert len(rows) == 2
ce.close()
def test_columns_none_allows_wildcard_and_any_column(spec_source):
ce = CachingEngine(spec_source, tables=[TableSpec("products", columns=None)])
assert len(ce.execute("SELECT * FROM products")) == 2
assert len(ce.execute("SELECT price FROM products")) == 2 # any column allowed
ce.close()
def test_wildcard_on_column_restricted_table_fails(spec_source):
ce = CachingEngine(spec_source, tables=[TableSpec("products", ["id", "name"])])
with pytest.raises(UndeclaredError):
ce.execute("SELECT * FROM products")
ce.close()
def test_lazy_mode_has_no_fail_fast(spec_source):
"""Without tables=, undeclared columns/tables load lazily as before."""
ce = CachingEngine(spec_source)
rows = ce.execute("SELECT order_id FROM orders")
assert len(rows) == 1
ce.close()
# --- refresh strategies via TableSpec ---------------------------------------
def test_tablespec_delta_tracking(spec_source, tmp_path):
ce = CachingEngine(
spec_source,
cache_db_path=tmp_path / "c.db",
tables=[
TableSpec(
"products",
["id", "name", "changed"],
refresh=Delta(change_column="changed", key_columns=["id"]),
preload=True,
)
],
blocking_startup_refresh=True,
)
assert ce.stats.tables["products"].tracking == "delta"
ce.close()
def test_tablespec_ttl_tracking(spec_source, tmp_path):
ce = CachingEngine(
spec_source,
cache_db_path=tmp_path / "c.db",
tables=[TableSpec("products", ["id", "name"], refresh=TTL(seconds=1800), preload=True)],
blocking_startup_refresh=True,
)
assert ce.stats.tables["products"].tracking == "ttl"
ce.close()
def test_tablespec_datetime_columns_roundtrip(spec_source, tmp_path):
ce = CachingEngine(
spec_source,
cache_db_path=tmp_path / "c.db",
tables=[
TableSpec("products", ["id", "changed"], datetime_columns=["changed"], preload=True)
],
blocking_startup_refresh=True,
)
rows = ce.execute("SELECT id, changed FROM products WHERE changed > ?", ("2026-06-01T12:00:00",))
assert [r["id"] for r in rows] == ["2"] # param coercion via datetime_columns
assert rows[0]["changed"] == datetime(2026, 6, 2, 10, 0, 0, tzinfo=timezone.utc)
ce.close()
# --- warm restart: preload skips a copy already fresh on disk ---------------
def test_warm_restart_preload_skips_reload(spec_source, tmp_path):
path = tmp_path / "c.db"
def make() -> CachingEngine:
return CachingEngine(
spec_source,
cache_db_path=path,
in_memory=False,
tables=[TableSpec("products", ["id", "name"], preload=True)],
blocking_startup_refresh=True,
)
ce1 = make()
assert ce1.stats.misses >= 1 # cold preload had to load from source
ce1.close()
ce2 = make()
assert ce2._cache.is_table_cached("products") is True
assert ce2.stats.misses == 0 # warm: preload was a cache hit, no redundant reload
ce2.close()