Compare commits
5 Commits
8744f458cc
...
devel
| Author | SHA1 | Date | |
|---|---|---|---|
| 4a86b2282f | |||
| 46370fe651 | |||
| a68b8994e3 | |||
| 8e46ee3547 | |||
| a21b5a2a04 |
+2
-5
@@ -1,8 +1,6 @@
|
||||
# Python
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
*.pyo
|
||||
*.pyd
|
||||
*.egg
|
||||
*.egg-info/
|
||||
dist/
|
||||
@@ -40,12 +38,11 @@ Thumbs.db
|
||||
.env.*
|
||||
|
||||
# sqlmem cache (incl. WAL sidecars from disk-backed mode)
|
||||
cache.db
|
||||
cache.db-wal
|
||||
cache.db-shm
|
||||
cache.db*
|
||||
|
||||
# Agents
|
||||
AGENTS.md
|
||||
CLAUDE.md
|
||||
DESIGN_DOCUMENT_MODULE.md
|
||||
.claude/
|
||||
handover.md
|
||||
@@ -6,6 +6,87 @@ All notable changes to this project will be documented in this file.
|
||||
|
||||
---
|
||||
|
||||
## [1.16.0] - 2026-06-11
|
||||
|
||||
### Added
|
||||
- **Declarative table specs — `CachingEngine(tables=[TableSpec(...)])`** — declare each cached table up front (columns, indexes, refresh strategy, datetime columns, preload) instead of letting the engine learn columns lazily from queries. New public types `TableSpec`, `TTL`, `Delta` (a friendly alias of `DeltaConfig`) and exception `UndeclaredError`.
|
||||
- **Background preload** — `preload=True` tables are loaded at startup (on the background thread by default, so startup isn't blocked; `blocking_startup_refresh=True` loads them synchronously). A copy already fresh in the persistent cache is skipped via the same double-checked locking added in 1.15.0, so a warm restart is instant.
|
||||
- **Fail-fast on undeclared access** — in declarative mode a query referencing a table that has no `TableSpec`, or a column outside a spec's declared `columns` (including `SELECT *` on a column-restricted table), raises `UndeclaredError` instead of silently triggering an expensive lazy load / column-expansion. Declare `columns=None` to cache the whole table and allow any column.
|
||||
- **Solves the lazy second-reload** — because columns are declared, a first query for a previously unseen column no longer forces a full re-fetch.
|
||||
- `executor.ensure_loaded(table, columns)` — preloads a table into the cache (reusing the full load path: delta/index augmentation, registry, watermark, double-checked locking) without materializing any rows.
|
||||
|
||||
### Fixed
|
||||
- **Race on the shared cache connection** — the metadata reads (`is_table_cached`, `is_table_full`, `seconds_since_refresh`, `get_table_columns`, `get_last_synced_at`, `max_value`, `count_rows`) touched the single shared SQLite connection without the connection lock, so a query thread reading while the background refresh/preload thread wrote could raise `sqlite3.InterfaceError`. These reads now take the lock. More likely to surface now that startup preload adds background-thread activity.
|
||||
|
||||
### Changed
|
||||
- `pyproject.toml` — bumped version to `1.16.0`.
|
||||
- **Fully backward compatible** — omit `tables=` and the legacy `delta=`/`ttl=`/`indexes=`/`datetime_columns=` kwargs behave exactly as before (lazy mode, no fail-fast). Passing both `tables=` and any of those kwargs raises `ValueError`; `tables=` is internally converted to the same config.
|
||||
|
||||
---
|
||||
|
||||
## [1.15.0] - 2026-06-11
|
||||
|
||||
### Fixed
|
||||
- **Cache stampede (thundering herd) on cold loads** — the decision to load a table was made *before* the load lock was taken, and `load_table` never re-checked after acquiring it. During a slow cold load of a large table (observed: 212M rows, ~2 h), a second query for the same table passed the pre-lock "not cached" check, queued on the load lock, and then ran a **redundant second full reload** instead of seeing the first had finished — doubling a multi-hour load. `load_table` now does **double-checked locking**: after acquiring the load lock it re-evaluates a caller-supplied predicate (table cached, all needed columns present, not TTL-expired) and skips the load when it is already satisfied. Invisible on small tables; on large ones it removes hours of redundant indexing under concurrent cold-start traffic.
|
||||
|
||||
### Changed
|
||||
- `pyproject.toml` — bumped version to `1.15.0`.
|
||||
- `CacheManager.load_table` gained an optional `recheck` callback (the double-check predicate); `QueryExecutor` supplies it for both column and `SELECT *` loads.
|
||||
|
||||
---
|
||||
|
||||
## [1.14.0] - 2026-06-10
|
||||
|
||||
Follow-up to 1.12.0 from running `datetime_columns` in production: the feature was only half-wired (writes were coerced, reads and query params were not).
|
||||
|
||||
### Fixed
|
||||
- **`WHERE` on an INTEGER-µs `datetime_columns` column silently returned 0 rows** — `execute_in_memory()` coerced query params with `to_sqlite()`, which leaves an ISO string a string. Comparing the stored `INTEGER` against a `TEXT` param is always false under SQLite affinity, so `WHERE CHANGE_DATE > '2026-05-01T…'` matched nothing. Params for a query that touches a `datetime_columns` table are now coerced to epoch µs (datetime objects and ISO-datetime strings alike), so the comparison matches the stored integer. Scoped to the query's tables, so non-datetime queries are unaffected.
|
||||
|
||||
### Added
|
||||
- **Read-time coercion — `datetime_columns` come back as `datetime`** — `execute()` now returns those columns as real `datetime` objects (UTC) instead of the raw INTEGER µs, restoring the transparent-proxy contract (you get the same type a direct source query would give). Opt out with `CachingEngine(..., return_datetime=False)` to get the raw integers.
|
||||
- **`Stats.db_size_bytes`** — on-disk size of the cache file (0 in memory mode), so `engine.stats` exposes cache growth for monitoring without an external file check.
|
||||
- **Public `datetime_to_epoch_us` helper** — `from sqlmem import datetime_to_epoch_us` exposes the same datetime→epoch-µs conversion used internally, so callers building `WHERE change_col > ?` params don't have to re-implement it.
|
||||
|
||||
### Changed
|
||||
- `pyproject.toml` — bumped version to `1.14.0`.
|
||||
- **`vacuum(incremental=True)` now warns instead of silently doing nothing** when the cache was not created with `auto_vacuum=INCREMENTAL` (the only mode in which incremental vacuum can reclaim pages); it logs how to fix it (`hard_reset()` with the pragma, or a full `vacuum(incremental=False)`) and returns.
|
||||
- `CacheManager.execute_in_memory()` gained an optional `tables` argument (the query's tables) used to scope datetime param/result coercion; `CacheManager`/`CachingEngine` gained a `return_datetime` flag.
|
||||
|
||||
---
|
||||
|
||||
## [1.12.0] - 2026-06-09
|
||||
|
||||
### ⚠️ Breaking
|
||||
- **`SCHEMA_VERSION` bumped `3` → `4`** — on upgrade the existing cache is wiped automatically (disk mode wipes the file in place, in-memory discards the backup) and reloaded from the source on next use. For a large cache (e.g. a multi-hundred-million-row table) the full reload can take a while; deploy in a maintenance window.
|
||||
- **`datetime_columns` change the public output contract for the chosen columns** — a column listed in `datetime_columns` is stored and returned as an **INTEGER (microseconds since the Unix epoch, UTC)**, not an ISO `TEXT` string. This is opt-in per column, so no table is affected unless you name its columns; consumers that read or filter such a column must adapt (compare against integer µs, or convert on read).
|
||||
|
||||
### Added
|
||||
- **`datetime_columns=` parameter on `CachingEngine` / `CacheManager`** — `datetime_columns={"VW_X": ["CHANGE_DATE"]}` stores the named datetime columns as INTEGER µs-since-epoch instead of ~28-byte ISO `TEXT`. Saves ~20 bytes per row and makes index comparisons on the column operate on native integers instead of string collation — worthwhile for a pure datetime column on a very large table (e.g. a delta change column that is also range-scanned).
|
||||
- `_coerce.to_sqlite_datetime()` converts datetimes (and ISO/`date` values) to exact integer microseconds via integer arithmetic (no float rounding); a naive datetime is treated as UTC, `None` passes through.
|
||||
- `load_table` declares those columns `INTEGER` and `upsert_rows` coerces them the same way, so full loads and delta upserts agree on the on-disk representation.
|
||||
- The delta high-watermark for such a column is the stored integer; `delta._bind_watermark(..., epoch_us=True)` reconstructs a real UTC `datetime` before binding, so the source still receives a typed timestamp (and the watermark fix from 1.8.0 keeps holding).
|
||||
|
||||
### Changed
|
||||
- `pyproject.toml` — bumped version to `1.12.0`.
|
||||
- `CacheManager.max_value` / `set_last_synced_at` now accept/return `int` watermarks alongside `str` (the INTEGER-µs watermark round-trips through the `last_synced_at` TEXT column as its digit string).
|
||||
|
||||
---
|
||||
|
||||
## [1.11.0] - 2026-06-09
|
||||
|
||||
### Added
|
||||
- **`pragmas=` parameter on `CachingEngine` / `CacheManager`** — pass a dict of SQLite PRAGMAs (e.g. `mmap_size`, `cache_size`, `temp_store`, `page_size`, `auto_vacuum`) applied to the cache connection at open time, so disk-backed caches can be tuned for the host's I/O profile without bypassing `CacheManager`. Unknown/inapplicable pragmas are silently ignored by SQLite (graceful degradation, no startup crash).
|
||||
- **`page_size`** is a layout pragma: it is applied only on a *fresh* file (set before WAL / the first table). On an existing cache with a different page size the request is ignored and a one-time warning is logged — the new value takes effect only after `hard_reset()` or a rebuild.
|
||||
- **`auto_vacuum`** is set before the database header is materialized (before switching to WAL) on a fresh file, so `INCREMENTAL`/`FULL` actually stick instead of silently reverting to `NONE`.
|
||||
- **`CachingEngine.hard_reset()` / `CacheManager.hard_reset()`** — close every connection, delete the on-disk cache file (and its `-wal`/`-shm` sidecars) and reopen from scratch with all current pragmas applied. Unlike `reset()` (which drops tables but keeps the open file), this lets `page_size`/`auto_vacuum` change, since those are baked into the file at creation. Disk mode only — falls back to `reset()` in memory mode. All tables reload on next use.
|
||||
- **`CachingEngine.vacuum(incremental=True, pages=10_000)` / `CacheManager.vacuum(...)`** — run maintenance VACUUM on the on-disk cache to reclaim free pages left by delta `INSERT OR REPLACE` churn. Incremental (default) reclaims up to `pages` pages without blocking readers or extra disk (requires `auto_vacuum=INCREMENTAL`); `incremental=False` runs a full VACUUM (rewrites the file, ~2× disk, blocks readers — maintenance window only). No-op in memory mode.
|
||||
|
||||
### Changed
|
||||
- `pyproject.toml` — bumped version to `1.11.0`.
|
||||
- `ColumnRegistry` gained `rebind()` so it follows the cache connection swap performed by `hard_reset()` (the registry previously captured the connection for the process lifetime).
|
||||
|
||||
---
|
||||
|
||||
## [1.10.0] - 2026-06-09
|
||||
|
||||
### Added
|
||||
|
||||
@@ -238,6 +238,41 @@ Each value is a list of index definitions: a string is a single-column index, a
|
||||
- Indexes are **recreated after every (re)load** — full loads, TTL reloads, and `invalidate()` + re-fetch all rebuild them — so they're always present, and they persist in `cache.db` across restarts.
|
||||
- Delta-tracked tables already get a unique index on their key columns; secondary indexes are independent and can be combined with `delta` or `ttl`.
|
||||
|
||||
## Declarative initialization (`tables=`)
|
||||
|
||||
Instead of the lazy "learn columns from queries" mode, you can **declare every table up front** with `tables=[TableSpec(...)]` — its columns, indexes, refresh strategy and which columns are datetimes — and have the engine preload them and reject anything undeclared:
|
||||
|
||||
```python
|
||||
from sqlmem import CachingEngine, TableSpec, Delta, TTL
|
||||
|
||||
engine = CachingEngine(
|
||||
base_engine,
|
||||
tables=[
|
||||
TableSpec(
|
||||
name="VW_P_PRATVALUES",
|
||||
columns=["PRODUCT_PRODUCTNR", "PRAT_NAME", "PRATVALUE", "CHANGE_DATE"],
|
||||
indexes=["PRODUCT_PRODUCTNR", "PRAT_NAME", "CHANGE_DATE"],
|
||||
refresh=Delta(change_column="CHANGE_DATE", key_columns=["PRATVALUE_ID"]),
|
||||
datetime_columns=["CHANGE_DATE"],
|
||||
preload=True,
|
||||
),
|
||||
TableSpec(
|
||||
name="VW_PRODUCTS_ASSIGNED_E",
|
||||
columns=["PRODUCT_PRODUCTNR", "ELEMENT_NAME", "ELEMENT_ID"],
|
||||
indexes=["PRODUCT_PRODUCTNR"],
|
||||
refresh=TTL(seconds=1800),
|
||||
preload=True,
|
||||
),
|
||||
],
|
||||
pragmas={"mmap_size": 32 * 1024**3, "page_size": 8192},
|
||||
)
|
||||
```
|
||||
|
||||
- **Preload** — `preload=True` tables are loaded at startup (on the background thread by default, so startup isn't blocked; pass `blocking_startup_refresh=True` to load them synchronously before serving). A copy already fresh in the persistent cache is **skipped**, so a warm restart is instant. During warm-up a table reports `TableState.LOADING` in [`stats`](#runtime-statistics) — handy for gating a `503` until it's `ready`.
|
||||
- **Fail-fast** — a query for a table without a `TableSpec`, or for a column outside a spec's declared `columns` (including `SELECT *` on a column-restricted table), raises `UndeclaredError` instead of silently kicking off an expensive lazy load. Use `columns=None` to cache the whole table and allow any column.
|
||||
- `refresh=` takes a `Delta(change_column=…, key_columns=…)` (same as `DeltaConfig`) or `TTL(seconds=…)`, or `None` for a static table.
|
||||
- **Backward compatible** — omit `tables=` and the legacy `delta=`/`ttl=`/`indexes=`/`datetime_columns=` kwargs work exactly as before (lazy mode, no fail-fast). Passing both raises `ValueError`.
|
||||
|
||||
## Persistence
|
||||
|
||||
By default the cache lives in an **in-memory SQLite** and is persisted to `cache.db` on disk:
|
||||
@@ -263,29 +298,80 @@ engine = CachingEngine(base_engine, in_memory=False)
|
||||
|
||||
The constructor argument wins over the env var; when `in_memory` is omitted it falls back to `SQLMEM_IN_MEMORY`.
|
||||
|
||||
#### Tuning the SQLite layer (`pragmas=`)
|
||||
|
||||
For a large disk-backed cache, pass SQLite PRAGMAs to tune the read path and on-disk layout without bypassing SQLmem:
|
||||
|
||||
```python
|
||||
engine = CachingEngine(
|
||||
base_engine,
|
||||
in_memory=False,
|
||||
pragmas={
|
||||
"mmap_size": 32 * 1024**3, # map the DB into the address space (32 GB)
|
||||
"cache_size": -262144, # 256 MB page cache (negative = KiB)
|
||||
"temp_store": 2, # ORDER BY / GROUP BY scratch in RAM
|
||||
"page_size": 8192, # larger pages → fewer reads on range scans
|
||||
"auto_vacuum": "INCREMENTAL",# reclaim free pages with vacuum() (see below)
|
||||
},
|
||||
)
|
||||
```
|
||||
|
||||
- Every entry is applied as `PRAGMA <key> = <value>` when the cache connection opens. **Unknown or inapplicable pragmas are silently ignored** by SQLite, so a bad value degrades gracefully instead of crashing startup.
|
||||
- **`page_size` and `auto_vacuum` are layout pragmas** — they only take effect on a *fresh* file (set before the first table). On an existing cache, `page_size` is ignored with a one-time warning; use [`hard_reset()`](#manual-cache-control) to rebuild the file with the new value.
|
||||
|
||||
#### INTEGER datetime columns (`datetime_columns=`)
|
||||
|
||||
A pure datetime column stored as an ISO `TEXT` string costs ~28 bytes per row and compares by string collation. For a large table you can store named datetime columns as **INTEGER microseconds since the Unix epoch** instead — 8 bytes, native integer comparison:
|
||||
|
||||
```python
|
||||
engine = CachingEngine(
|
||||
base_engine,
|
||||
delta={"VW_P_PRATVALUES": DeltaConfig("CHANGE_DATE", ["PRATVALUE_ID"])},
|
||||
datetime_columns={"VW_P_PRATVALUES": ["CHANGE_DATE"]},
|
||||
)
|
||||
```
|
||||
|
||||
- **Opt-in per column.** Only the columns you name change; everything else keeps the default lossless `TEXT` storage.
|
||||
- **Transparent in and out.** A `WHERE` on such a column accepts a `datetime` or an ISO string — the param is coerced to integer µs so the comparison matches — and `execute()` returns the column as a real `datetime` (UTC), the same type a direct source query would give. Pass `return_datetime=False` to get the raw integers instead.
|
||||
- The delta watermark is handled transparently: it is persisted as the integer and bound back to a real `datetime` for the source query, so incremental refresh keeps working.
|
||||
- ⚠️ This is a **breaking on-disk change** (`SCHEMA_VERSION` 4): an existing cache is wiped and reloaded on first start after enabling it — schedule a maintenance window for a large reload.
|
||||
|
||||
To build a `WHERE` param yourself (e.g. an HTTP `?since=` filter) without re-implementing the conversion, use the exported helper:
|
||||
|
||||
```python
|
||||
from sqlmem import datetime_to_epoch_us
|
||||
rows = engine.execute("SELECT * FROM events WHERE changed > ?", (datetime_to_epoch_us(since),))
|
||||
```
|
||||
|
||||
## Manual cache control
|
||||
|
||||
```python
|
||||
engine.invalidate("orders") # drop one table from cache; next query re-fetches it from DB
|
||||
engine.reset() # wipe the whole cache (RAM + cache.db) — full clean slate
|
||||
engine.hard_reset() # disk mode: delete the file and reopen with current pragmas/page_size
|
||||
engine.vacuum() # disk mode: incremental VACUUM (reclaim free pages from delta churn)
|
||||
engine.refresh() # pull deltas for all delta-tracked tables now
|
||||
engine.close() # flush to disk and shut down background thread
|
||||
```
|
||||
|
||||
Use `reset()` after a **structural change** in the source (columns added/removed, values cleared in bulk without bumping the change timestamp) so the cache rebuilds from scratch. `invalidate(table)` is the targeted version for a single table.
|
||||
|
||||
`hard_reset()` goes further than `reset()` in disk mode: it closes every connection, deletes `cache.db` (and its `-wal`/`-shm` sidecars) and reopens from scratch — the only way to change a baked-in `page_size`/`auto_vacuum`. In memory mode it falls back to `reset()`.
|
||||
|
||||
`vacuum()` reclaims free pages left behind by delta `INSERT OR REPLACE` churn. Incremental (the default) is cheap and non-blocking but needs `auto_vacuum=INCREMENTAL` (set it via `pragmas=` on a fresh cache); if the cache wasn't created that way, `vacuum(incremental=True)` logs a warning and does nothing rather than silently no-op'ing. `vacuum(incremental=False)` runs a full VACUUM that rewrites the file (~2× disk, blocks readers) — schedule it in a maintenance window. Both are no-ops in memory mode.
|
||||
|
||||
## Runtime statistics
|
||||
|
||||
```python
|
||||
stats = engine.stats # Stats snapshot
|
||||
print(stats.hits, stats.misses, stats.refetches, stats.errors)
|
||||
print(stats.hits, stats.misses, stats.refetches, stats.errors, stats.db_size_bytes)
|
||||
for name, t in stats.tables.items():
|
||||
print(name, t.rows, t.state, t.tracking, t.last_upsert, t.last_refresh)
|
||||
if t.consecutive_failures:
|
||||
print(f" {name} failing ×{t.consecutive_failures}: {t.last_error} ({t.last_error_at})")
|
||||
```
|
||||
|
||||
`Stats.errors` is the total number of load/refresh failures since start. Each `TableStats` also carries `last_error`, `last_error_at` and `consecutive_failures` (reset to 0 on the next success) — so a delta that fails *before* streaming (which otherwise leaves `state` looking `ready`) is still visible, and the table is marked `error`.
|
||||
`Stats.db_size_bytes` is the on-disk cache file size (0 in memory mode) — handy for monitoring cache growth. `Stats.errors` is the total number of load/refresh failures since start. Each `TableStats` also carries `last_error`, `last_error_at` and `consecutive_failures` (reset to 0 on the next success) — so a delta that fails *before* streaming (which otherwise leaves `state` looking `ready`) is still visible, and the table is marked `error`.
|
||||
|
||||
Two timestamps distinguish *data freshness* from *liveness*:
|
||||
|
||||
@@ -319,6 +405,7 @@ By default the cache is **in-memory SQLite**, so a cached table lives in RAM —
|
||||
- **Use [disk-backed mode](#disk-backed-cache-no-ram-copy)** (`in_memory=False`) when the working set simply doesn't fit in RAM — queries then run against `cache.db` on disk instead of a memory copy.
|
||||
- **Loads are streamed in batches** (`SQLMEM_FETCH_BATCH` rows at a time, default 10 000) into a staging table and swapped in atomically. A multi-million-row table never gets fully materialized in Python at once, so the load doesn't spike memory or crash the process, and readers keep seeing the previous copy until the swap completes.
|
||||
- Use **[delta refresh](#incremental-delta-refresh)** for large tables that have a change column — after the first load only changed rows are pulled, so restarts and refreshes don't re-read the whole table.
|
||||
- **Concurrent queries during a cold load are deduplicated** — while one query is loading a large table, others for the same table wait and then read the freshly loaded cache rather than kicking off their own redundant reload (double-checked locking), so a slow cold start isn't multiplied by concurrent traffic.
|
||||
- A **single query that returns a huge result set** (e.g. `SELECT *` over a multi-million-row cached table) still materializes that result as a list of dicts; bound it with a `WHERE`/`LIMIT` rather than selecting everything.
|
||||
|
||||
## Configuration
|
||||
@@ -346,6 +433,9 @@ engine = CachingEngine(
|
||||
refresh_interval=300, # SQLMEM_REFRESH_INTERVAL
|
||||
fetch_batch=10000, # SQLMEM_FETCH_BATCH
|
||||
dialect="tsql", # SQLMEM_SQL_DIALECT
|
||||
pragmas={"mmap_size": 32 * 1024**3, "page_size": 8192}, # disk-mode SQLite tuning
|
||||
datetime_columns={"orders": ["created_at"]}, # store these as INTEGER µs (opt-in)
|
||||
return_datetime=True, # return datetime_columns as datetime (vs raw µs int)
|
||||
blocking_startup_refresh=False, # block startup until caught up? (default: no)
|
||||
)
|
||||
```
|
||||
@@ -358,9 +448,10 @@ By default the **startup catch-up** (delta pulls and TTL reloads for tables rest
|
||||
|---|---|
|
||||
| `ReadOnlyError` | INSERT, UPDATE, or DELETE statement |
|
||||
| `UnsupportedQueryError` | non-SELECT statement, `SELECT` without `FROM`, or an unqualified column in a multi-table query |
|
||||
| `UndeclaredError` | in [declarative mode](#declarative-initialization-tables) (`tables=`): a query references a table or column that was not declared |
|
||||
|
||||
```python
|
||||
from sqlmem import ReadOnlyError, UnsupportedQueryError
|
||||
from sqlmem import ReadOnlyError, UnsupportedQueryError, UndeclaredError
|
||||
```
|
||||
|
||||
## Logging
|
||||
|
||||
+11
@@ -215,6 +215,17 @@ SQLMEM_DEBUG=true # DEBUG level — podrobný výpis každého dotazu, cache o
|
||||
- [x] **Quoting identifikátorů**: názvy tabulek/sloupců se kvótují (SQLite `"x"` pro cache, dialekt zdroje — T-SQL `[x]` — pro source), takže rezervovaná slova i mezery fungují.
|
||||
- [x] **Lazy source connection**: `execute()` neotevírá spojení ke zdroji při cache hitu (neobsazuje pool slot).
|
||||
- [x] **Idempotentní `add_sink`**: opakované volání pro stejný sink je no-op (žádné duplicitní logy).
|
||||
- [x] **Ladění SQLite vrstvy (`pragmas=`)**: `CachingEngine(..., pragmas={...})` aplikuje libovolné PRAGMA na cache spojení (`mmap_size`, `cache_size`, `temp_store`, `page_size`, `auto_vacuum`). `page_size` a `auto_vacuum` jsou layout-pragmata — platí jen na čerstvém souboru (page_size na existující cache se ignoruje s warningem). Neznámá pragmata SQLite tiše ignoruje.
|
||||
- [x] **`hard_reset()`**: smaže on-disk soubor (+ WAL/SHM) a otevře nový s aktuálními pragmaty — na rozdíl od `reset()` umožní změnit `page_size`/`auto_vacuum`. Jen disk mód (v memory módu fallback na `reset()`).
|
||||
- [x] **`vacuum(incremental=, pages=)`**: údržbový VACUUM cache souboru — inkrementální (uvolní volné stránky po delta `INSERT OR REPLACE`, vyžaduje `auto_vacuum=INCREMENTAL`) nebo plný (přepíše soubor, jen v maintenance okně). V memory módu no-op.
|
||||
- [x] **Nativní INTEGER ukládání datetime sloupců (`datetime_columns=`)**: `datetime_columns={"VW_X": ["CHANGE_DATE"]}` — vyjmenované datetime sloupce se ukládají jako INTEGER µs-od-epochy místo ~28 B ISO TEXT (úspora místa + nativní celočíselné porovnání indexu). Opt-in per sloupec. Breaking: `SCHEMA_VERSION` 3→4, cache se při upgrade smaže a načte znovu. Watermark se persistuje jako int a `_bind_watermark(epoch_us=True)` ho rekonstruuje zpět na `datetime` pro zdroj.
|
||||
- **Param coercion**: `WHERE col > ?` s ISO/`datetime` parametrem se zkoercuje na epoch µs (scoped na tabulky dotazu), takže porovnání INTEGER sloupce sedí (dřív vracelo 0 řádků).
|
||||
- **Read-time coercion**: čtení vrací `datetime` objekt místo raw int (transparentní proxy); opt-out `CachingEngine(..., return_datetime=False)`.
|
||||
- Veřejný helper `from sqlmem import datetime_to_epoch_us` pro konstrukci parametrů bez duplicitní logiky.
|
||||
- [x] **`vacuum(incremental=True)` varuje bez `auto_vacuum=INCREMENTAL`**: dřív tichý no-op; teď zaloguje warning (a jak to opravit) a vrátí se.
|
||||
- [x] **`Stats.db_size_bytes`**: velikost cache souboru na disku (0 v memory módu) ve `stats` pro monitoring.
|
||||
- [x] **Ochrana proti cache stampede**: `load_table` dělá double-checked locking — po získání `_load_lock` znovu ověří, zda tabulku mezitím nenahrál souběžný loader (cached + sloupce + ne-stale), a redundantní reload přeskočí. Bez toho druhý dotaz během studeného loadu velké tabulky spustil druhý plný reload (212M řádků = +2 h).
|
||||
- [x] **Deklarativní inicializace (`tables=[TableSpec(...)]`)**: předem se deklaruje každá tabulka (`TableSpec(name, columns, indexes, refresh=Delta(...)|TTL(...), datetime_columns, preload)`). `preload=True` se na pozadí (nebo blokujícně) přednahraje při startu; co je v persistentní cache čerstvé, se přeskočí (warm restart = instant). Nedeklarovaná tabulka/sloupec (i `SELECT *` na sloupcově omezené tabulce) → `UndeclaredError` (fail-fast) místo tichého líného loadu; `columns=None` = celá tabulka + libovolný sloupec. Plně zpětně kompatibilní: bez `tables=` fungují staré `delta=/ttl=/indexes=/datetime_columns=` jako dřív (kombinace obojího → `ValueError`).
|
||||
|
||||
## TODO — budoucí funkce
|
||||
|
||||
|
||||
+1
-1
@@ -1,6 +1,6 @@
|
||||
[project]
|
||||
name = "sqlmem"
|
||||
version = "1.10.0"
|
||||
version = "1.16.0"
|
||||
description = ""
|
||||
authors = [
|
||||
{name = "jan.doubravsky@gmail.com"}
|
||||
|
||||
@@ -3,10 +3,12 @@ from typing import Any
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from ._coerce import to_sqlite_datetime as datetime_to_epoch_us
|
||||
from .config import DEBUG
|
||||
from .delta import DeltaConfig
|
||||
from .engine import CachingEngine
|
||||
from .exceptions import ReadOnlyError, UnsupportedQueryError
|
||||
from .exceptions import ReadOnlyError, UndeclaredError, UnsupportedQueryError
|
||||
from .spec import TTL, Delta, TableSpec
|
||||
from .stats import Stats, TableStats
|
||||
|
||||
_DEFAULT_FORMAT = (
|
||||
@@ -58,9 +60,14 @@ def add_sink(sink: Any, *, level: str | None = None, **kwargs: Any) -> None:
|
||||
__all__ = [
|
||||
"CachingEngine",
|
||||
"DeltaConfig",
|
||||
"Delta",
|
||||
"TTL",
|
||||
"TableSpec",
|
||||
"ReadOnlyError",
|
||||
"UnsupportedQueryError",
|
||||
"UndeclaredError",
|
||||
"Stats",
|
||||
"TableStats",
|
||||
"add_sink",
|
||||
"datetime_to_epoch_us",
|
||||
]
|
||||
|
||||
+82
-3
@@ -10,11 +10,18 @@ left untouched.
|
||||
|
||||
import datetime
|
||||
import decimal
|
||||
import re
|
||||
import uuid
|
||||
from typing import Any
|
||||
|
||||
Params = tuple | list | dict | None
|
||||
|
||||
_EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
|
||||
|
||||
# A string that *starts* with an ISO date+time (``2026-05-01T00:00:00`` or
|
||||
# space-separated). Used to spot a datetime passed as a string in a query param.
|
||||
_ISO_DATETIME_RE = re.compile(r"^\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}")
|
||||
|
||||
|
||||
def to_sqlite(value: Any) -> Any:
|
||||
if isinstance(value, decimal.Decimal):
|
||||
@@ -28,13 +35,85 @@ def to_sqlite(value: Any) -> Any:
|
||||
return value
|
||||
|
||||
|
||||
def to_sqlite_datetime(value: Any) -> int | None:
|
||||
"""Store a datetime as INTEGER microseconds since the Unix epoch (UTC).
|
||||
|
||||
Used for columns the caller marks via ``datetime_columns``: 8 bytes as an
|
||||
INTEGER instead of a ~28-byte ISO ``TEXT`` string, and integer comparison on
|
||||
the change column instead of string collation. ``None`` passes through; a
|
||||
naive datetime is treated as UTC. A non-datetime value is parsed from its ISO
|
||||
string form (so ``date``/ISO-``str`` inputs work too); anything unparseable
|
||||
becomes ``None``.
|
||||
"""
|
||||
if value is None:
|
||||
return None
|
||||
if isinstance(value, datetime.datetime):
|
||||
if value.tzinfo is None:
|
||||
value = value.replace(tzinfo=datetime.timezone.utc)
|
||||
delta = value - _EPOCH # exact integer arithmetic (no float rounding)
|
||||
return delta.days * 86_400_000_000 + delta.seconds * 1_000_000 + delta.microseconds
|
||||
try:
|
||||
return to_sqlite_datetime(datetime.datetime.fromisoformat(str(value)))
|
||||
except (TypeError, ValueError):
|
||||
return None
|
||||
|
||||
|
||||
def from_sqlite_datetime(value: Any) -> Any:
|
||||
"""Inverse of :func:`to_sqlite_datetime`: INTEGER µs-since-epoch → UTC datetime.
|
||||
|
||||
Non-integers (a ``NULL`` value, or a column that isn't datetime-typed) pass
|
||||
through unchanged.
|
||||
"""
|
||||
if isinstance(value, bool) or not isinstance(value, int):
|
||||
return value
|
||||
return _EPOCH + datetime.timedelta(microseconds=value)
|
||||
|
||||
|
||||
def coerce_row(row: tuple) -> tuple:
|
||||
return tuple(to_sqlite(v) for v in row)
|
||||
|
||||
|
||||
def coerce_params(params: Params) -> tuple | dict | None:
|
||||
def _coerce_param(value: Any, dt_table: bool) -> Any:
|
||||
"""Coerce a single query parameter.
|
||||
|
||||
When the query touches a table that stores datetime columns as INTEGER µs
|
||||
(*dt_table*), a datetime object or an ISO-datetime string is converted to
|
||||
epoch µs so a ``WHERE`` comparison matches the stored INTEGER instead of
|
||||
comparing INTEGER against TEXT (which SQLite affinity makes always false).
|
||||
Otherwise the default stringifying coercion applies, unchanged.
|
||||
"""
|
||||
if dt_table and (
|
||||
isinstance(value, datetime.datetime)
|
||||
or (isinstance(value, str) and _ISO_DATETIME_RE.match(value))
|
||||
):
|
||||
result = to_sqlite_datetime(value)
|
||||
if result is not None:
|
||||
return result
|
||||
return to_sqlite(value)
|
||||
|
||||
|
||||
def coerce_params(params: Params, dt_table: bool = False) -> tuple | dict | None:
|
||||
if params is None:
|
||||
return None
|
||||
if isinstance(params, dict):
|
||||
return {key: to_sqlite(val) for key, val in params.items()}
|
||||
return tuple(to_sqlite(val) for val in params)
|
||||
return {key: _coerce_param(val, dt_table) for key, val in params.items()}
|
||||
return tuple(_coerce_param(val, dt_table) for val in params)
|
||||
|
||||
|
||||
def reverse_coerce_rows(
|
||||
rows: list[tuple], col_names: list[str], dt_cols: set[str]
|
||||
) -> list[tuple]:
|
||||
"""Turn INTEGER µs back into ``datetime`` for result columns in *dt_cols*.
|
||||
|
||||
A no-op when no result column is a datetime column, so non-datetime queries
|
||||
pay nothing.
|
||||
"""
|
||||
if not dt_cols:
|
||||
return rows
|
||||
dt_idx = {i for i, c in enumerate(col_names) if c in dt_cols}
|
||||
if not dt_idx:
|
||||
return rows
|
||||
return [
|
||||
tuple(from_sqlite_datetime(v) if i in dt_idx else v for i, v in enumerate(row))
|
||||
for row in rows
|
||||
]
|
||||
|
||||
+257
-35
@@ -2,6 +2,7 @@ import atexit
|
||||
import signal
|
||||
import sqlite3
|
||||
import threading
|
||||
from collections.abc import Callable
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
@@ -9,12 +10,18 @@ from pathlib import Path
|
||||
from loguru import logger
|
||||
|
||||
import sqlmem._meta as _meta
|
||||
from ._coerce import coerce_params, coerce_row
|
||||
from ._coerce import (
|
||||
coerce_params,
|
||||
coerce_row,
|
||||
reverse_coerce_rows,
|
||||
to_sqlite,
|
||||
to_sqlite_datetime,
|
||||
)
|
||||
from ._sql import quote, quote_list, quote_source
|
||||
from .config import FETCH_BATCH_SIZE, SQL_DIALECT
|
||||
from .stats import TableState
|
||||
|
||||
SCHEMA_VERSION = 3
|
||||
SCHEMA_VERSION = 4
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
@@ -40,12 +47,19 @@ class CacheManager:
|
||||
in_memory: bool = True,
|
||||
dialect: str = SQL_DIALECT,
|
||||
fetch_batch: int = FETCH_BATCH_SIZE,
|
||||
pragmas: dict[str, str | int] | None = None,
|
||||
datetime_columns: dict[str, list[str]] | None = None,
|
||||
return_datetime: bool = True,
|
||||
) -> None:
|
||||
self._db_path = db_path
|
||||
self._backup_interval = backup_interval
|
||||
self._in_memory = in_memory
|
||||
self._dialect = dialect # source-DB dialect, for identifier quoting
|
||||
self._fetch_batch = fetch_batch # rows fetched per source batch
|
||||
self._pragmas = dict(pragmas or {}) # extra read/layout PRAGMAs (disk mode)
|
||||
# table → columns stored as INTEGER µs-since-epoch instead of ISO TEXT
|
||||
self._datetime_columns = {t: list(c) for t, c in (datetime_columns or {}).items()}
|
||||
self._return_datetime = return_datetime # reverse-coerce reads back to datetime
|
||||
self._lock = threading.Lock() # serializes connection access
|
||||
self._load_lock = threading.Lock() # serializes full table loads
|
||||
self._states: dict[str, str] = {} # table → live processing state
|
||||
@@ -59,12 +73,12 @@ class CacheManager:
|
||||
|
||||
if in_memory:
|
||||
self._conn = sqlite3.connect(":memory:", check_same_thread=False)
|
||||
self._apply_pragmas(self._conn)
|
||||
else:
|
||||
# Disk-backed: query the on-disk file directly — no RAM copy, every
|
||||
# write persists immediately, and the cache can exceed available RAM.
|
||||
self._conn = sqlite3.connect(str(db_path), check_same_thread=False)
|
||||
self._conn.execute("PRAGMA journal_mode=WAL")
|
||||
self._conn.execute("PRAGMA synchronous=NORMAL")
|
||||
db_existed = db_path.exists() and db_path.stat().st_size > 0
|
||||
self._conn = self._open_disk_connection(db_existed)
|
||||
self._discard_if_schema_mismatch()
|
||||
|
||||
self._ensure_meta_tables()
|
||||
@@ -83,6 +97,54 @@ class CacheManager:
|
||||
def connection(self) -> sqlite3.Connection:
|
||||
return self._conn
|
||||
|
||||
def _open_disk_connection(self, db_existed: bool) -> sqlite3.Connection:
|
||||
"""Open the on-disk cache connection with WAL + the configured pragmas.
|
||||
|
||||
``page_size`` and ``auto_vacuum`` are layout pragmas that only take
|
||||
effect on a *fresh* file (before the first table exists), so they are
|
||||
applied conditionally on ``db_existed``; everything else is applied
|
||||
unconditionally. Used by both ``__init__`` and :meth:`hard_reset`.
|
||||
"""
|
||||
conn = sqlite3.connect(str(self._db_path), check_same_thread=False)
|
||||
# page_size must be set before WAL/the first table on a brand-new file;
|
||||
# on an existing file it is silently ignored until the next VACUUM.
|
||||
if "page_size" in self._pragmas:
|
||||
wanted = int(self._pragmas["page_size"])
|
||||
if db_existed:
|
||||
actual = conn.execute("PRAGMA page_size").fetchone()[0]
|
||||
if actual != wanted:
|
||||
logger.warning(
|
||||
f"page_size={wanted} requested but the cache file already "
|
||||
f"exists with page_size={actual}; the new value takes "
|
||||
"effect only after the cache is wiped (hard_reset()) or "
|
||||
"rebuilt from scratch."
|
||||
)
|
||||
else:
|
||||
conn.execute(f"PRAGMA page_size = {wanted}")
|
||||
# auto_vacuum must be set before the database header is materialized,
|
||||
# i.e. before switching to WAL (which writes the header) — otherwise the
|
||||
# value silently reverts to 0/NONE and only a full VACUUM could apply it.
|
||||
if not db_existed and "auto_vacuum" in self._pragmas:
|
||||
conn.execute(f"PRAGMA auto_vacuum = {self._pragmas['auto_vacuum']}")
|
||||
conn.execute("PRAGMA journal_mode=WAL")
|
||||
conn.execute("PRAGMA synchronous=NORMAL")
|
||||
self._apply_pragmas(conn, exclude={"page_size", "auto_vacuum"})
|
||||
return conn
|
||||
|
||||
def _apply_pragmas(
|
||||
self, conn: sqlite3.Connection, exclude: set[str] | None = None
|
||||
) -> None:
|
||||
"""Apply the user-supplied PRAGMAs to *conn*, skipping *exclude*.
|
||||
|
||||
SQLite silently ignores unknown or inapplicable pragmas, so a bad value
|
||||
degrades gracefully (e.g. mmap unsupported) rather than crashing startup.
|
||||
"""
|
||||
skip = exclude or set()
|
||||
for key, value in self._pragmas.items():
|
||||
if key in skip:
|
||||
continue
|
||||
conn.execute(f"PRAGMA {key} = {value}")
|
||||
|
||||
def _ensure_meta_tables(self) -> None:
|
||||
self._conn.executescript("""
|
||||
CREATE TABLE IF NOT EXISTS _sqlmem_meta (
|
||||
@@ -249,23 +311,26 @@ class CacheManager:
|
||||
return dict(self._last_run)
|
||||
|
||||
def is_table_cached(self, table: str) -> bool:
|
||||
row = self._conn.execute(
|
||||
"SELECT 1 FROM _sqlmem_tables WHERE table_name = ?", (table,)
|
||||
).fetchone()
|
||||
with self._lock: # the shared _conn must not be read while a writer uses it
|
||||
row = self._conn.execute(
|
||||
"SELECT 1 FROM _sqlmem_tables WHERE table_name = ?", (table,)
|
||||
).fetchone()
|
||||
return row is not None
|
||||
|
||||
def is_table_full(self, table: str) -> bool:
|
||||
"""True if the whole table (all columns) is cached — a SELECT * cache hit."""
|
||||
row = self._conn.execute(
|
||||
"SELECT is_full FROM _sqlmem_tables WHERE table_name = ?", (table,)
|
||||
).fetchone()
|
||||
with self._lock:
|
||||
row = self._conn.execute(
|
||||
"SELECT is_full FROM _sqlmem_tables WHERE table_name = ?", (table,)
|
||||
).fetchone()
|
||||
return bool(row and row[0])
|
||||
|
||||
def seconds_since_refresh(self, table: str) -> float | None:
|
||||
"""Age of a cached table in seconds, or None if it is not cached."""
|
||||
row = self._conn.execute(
|
||||
"SELECT last_refresh_at FROM _sqlmem_tables WHERE table_name = ?", (table,)
|
||||
).fetchone()
|
||||
with self._lock:
|
||||
row = self._conn.execute(
|
||||
"SELECT last_refresh_at FROM _sqlmem_tables WHERE table_name = ?", (table,)
|
||||
).fetchone()
|
||||
if not row or not row[0]:
|
||||
return None
|
||||
last = datetime.fromisoformat(row[0])
|
||||
@@ -337,12 +402,34 @@ class CacheManager:
|
||||
self._conn.commit()
|
||||
logger.debug(f"Index {idx.name!r} ready on {table} ({cols})")
|
||||
|
||||
def _row_coercer(self, table: str, columns: list[str]):
|
||||
"""Return a per-row coercer for *columns* in source order.
|
||||
|
||||
Columns registered in ``datetime_columns`` for *table* are coerced to
|
||||
INTEGER µs-since-epoch (``to_sqlite_datetime``); everything else keeps the
|
||||
default stringifying coercion (``to_sqlite``). With no datetime columns it
|
||||
is the plain :func:`coerce_row`, so the common path is unchanged.
|
||||
"""
|
||||
dt_cols = set(self._datetime_columns.get(table, ()))
|
||||
dt_idx = {i for i, c in enumerate(columns) if c in dt_cols}
|
||||
if not dt_idx:
|
||||
return coerce_row
|
||||
|
||||
def coerce(row: tuple) -> tuple:
|
||||
return tuple(
|
||||
to_sqlite_datetime(v) if i in dt_idx else to_sqlite(v)
|
||||
for i, v in enumerate(row)
|
||||
)
|
||||
|
||||
return coerce
|
||||
|
||||
def load_table(
|
||||
self,
|
||||
table: str,
|
||||
columns: list[str],
|
||||
source_conn: sqlite3.Connection,
|
||||
full: bool = False,
|
||||
recheck: Callable[[], bool] | None = None,
|
||||
) -> None:
|
||||
"""Stream the source table into the cache in batches.
|
||||
|
||||
@@ -351,15 +438,32 @@ class CacheManager:
|
||||
``fetchall`` of a huge table) and readers keep seeing the previous copy
|
||||
until the swap. Concurrent loads are serialized by ``_load_lock``; the
|
||||
connection lock is only held for the brief per-batch inserts and the swap.
|
||||
|
||||
*recheck* implements double-checked locking against a cache stampede: the
|
||||
decision to load is made by the caller *before* ``_load_lock`` is held, so
|
||||
on a slow cold load a second request for the same table can queue behind
|
||||
the lock and then redundantly reload it. If given, ``recheck()`` is
|
||||
re-evaluated *after* the lock is acquired; when it returns ``True`` the
|
||||
table is already loaded and fresh, so the load is skipped.
|
||||
"""
|
||||
src_cols = ", ".join(quote_source(c, self._dialect) for c in columns)
|
||||
col_defs = ", ".join(f"{quote(c)} TEXT" for c in columns)
|
||||
dt_cols = set(self._datetime_columns.get(table, ()))
|
||||
col_defs = ", ".join(
|
||||
f"{quote(c)} {'INTEGER' if c in dt_cols else 'TEXT'}" for c in columns
|
||||
)
|
||||
coerce = self._row_coercer(table, columns)
|
||||
placeholders = ", ".join("?" * len(columns))
|
||||
staging = f"{table}__sqlmem_load"
|
||||
q_staging = quote(staging)
|
||||
q_table = quote(table)
|
||||
|
||||
with self._load_lock:
|
||||
if recheck is not None and recheck():
|
||||
logger.info(
|
||||
f"Skipping load of {table!r}: a concurrent loader already "
|
||||
"satisfied it (double-checked lock)."
|
||||
)
|
||||
return
|
||||
self.set_state(table, TableState.LOADING)
|
||||
logger.info(f"Fetching {table!r} columns {columns} from source DB (batch={self._fetch_batch})")
|
||||
try:
|
||||
@@ -377,7 +481,7 @@ class CacheManager:
|
||||
batch = cursor.fetchmany(self._fetch_batch) # network outside _lock
|
||||
if not batch:
|
||||
break
|
||||
clean = [coerce_row(row) for row in batch]
|
||||
clean = [coerce(row) for row in batch]
|
||||
with self._lock:
|
||||
self._conn.executemany(insert_sql, clean)
|
||||
self._conn.commit()
|
||||
@@ -420,16 +524,38 @@ class CacheManager:
|
||||
self._read_conns.append(conn)
|
||||
return conn
|
||||
|
||||
def _query_datetime_cols(self, tables: list[str] | None) -> set[str]:
|
||||
"""Datetime columns (stored as INTEGER µs) belonging to *tables*.
|
||||
|
||||
Empty when no table is known/configured, so a query that touches no
|
||||
datetime column pays nothing and behaves exactly as before.
|
||||
"""
|
||||
if not self._datetime_columns or not tables:
|
||||
return set()
|
||||
cols: set[str] = set()
|
||||
for table in tables:
|
||||
cols.update(self._datetime_columns.get(table, ()))
|
||||
return cols
|
||||
|
||||
def execute_in_memory(
|
||||
self, sql: str, params: tuple | list | dict | None = None
|
||||
self,
|
||||
sql: str,
|
||||
params: tuple | list | dict | None = None,
|
||||
tables: list[str] | None = None,
|
||||
) -> tuple[list[str], list[tuple]]:
|
||||
"""Run a read query against the cache.
|
||||
|
||||
In-memory mode serializes with writers on the single connection. Disk mode
|
||||
reads from a per-thread WAL connection, so reads run concurrently with
|
||||
writers and each other (see :meth:`_read_conn`).
|
||||
|
||||
When *tables* names a table with ``datetime_columns``, ISO/datetime query
|
||||
params are coerced to epoch µs so a ``WHERE`` matches the stored INTEGER,
|
||||
and (unless ``return_datetime=False``) those columns are returned as real
|
||||
:class:`~datetime.datetime` objects rather than raw integers.
|
||||
"""
|
||||
bound = coerce_params(params)
|
||||
dt_cols = self._query_datetime_cols(tables)
|
||||
bound = coerce_params(params, dt_table=bool(dt_cols))
|
||||
if self._in_memory:
|
||||
with self._lock:
|
||||
cursor = (
|
||||
@@ -439,19 +565,22 @@ class CacheManager:
|
||||
)
|
||||
col_names = [desc[0] for desc in cursor.description]
|
||||
rows = cursor.fetchall()
|
||||
return col_names, rows
|
||||
else:
|
||||
conn = self._read_conn()
|
||||
cursor = conn.execute(sql) if bound is None else conn.execute(sql, bound)
|
||||
col_names = [desc[0] for desc in cursor.description]
|
||||
rows = cursor.fetchall()
|
||||
|
||||
conn = self._read_conn()
|
||||
cursor = conn.execute(sql) if bound is None else conn.execute(sql, bound)
|
||||
col_names = [desc[0] for desc in cursor.description]
|
||||
rows = cursor.fetchall()
|
||||
if self._return_datetime and dt_cols:
|
||||
rows = reverse_coerce_rows(rows, col_names, dt_cols)
|
||||
return col_names, rows
|
||||
|
||||
# --- delta refresh support ---------------------------------------------
|
||||
|
||||
def get_table_columns(self, table: str) -> list[str]:
|
||||
"""Authoritative ordered column list of a cached table (via PRAGMA)."""
|
||||
rows = self._conn.execute(f"PRAGMA table_info({quote(table)})").fetchall()
|
||||
with self._lock:
|
||||
rows = self._conn.execute(f"PRAGMA table_info({quote(table)})").fetchall()
|
||||
return [r[1] for r in rows]
|
||||
|
||||
def create_unique_index(self, table: str, key_columns: list[str]) -> None:
|
||||
@@ -465,12 +594,15 @@ class CacheManager:
|
||||
self._conn.commit()
|
||||
|
||||
def get_last_synced_at(self, table: str) -> str | None:
|
||||
row = self._conn.execute(
|
||||
"SELECT last_synced_at FROM _sqlmem_tables WHERE table_name = ?", (table,)
|
||||
).fetchone()
|
||||
with self._lock:
|
||||
row = self._conn.execute(
|
||||
"SELECT last_synced_at FROM _sqlmem_tables WHERE table_name = ?", (table,)
|
||||
).fetchone()
|
||||
# Stored in a TEXT column: an INTEGER-µs watermark (datetime_columns) comes
|
||||
# back as its digit string; delta._bind_watermark reconstructs the datetime.
|
||||
return row[0] if row else None
|
||||
|
||||
def set_last_synced_at(self, table: str, value: str | None) -> None:
|
||||
def set_last_synced_at(self, table: str, value: str | int | None) -> None:
|
||||
with self._lock:
|
||||
self._conn.execute(
|
||||
"UPDATE _sqlmem_tables SET last_synced_at = ? WHERE table_name = ?",
|
||||
@@ -478,18 +610,23 @@ class CacheManager:
|
||||
)
|
||||
self._conn.commit()
|
||||
|
||||
def max_value(self, table: str, column: str) -> str | None:
|
||||
"""Maximum value of *column* across cached rows (the delta watermark)."""
|
||||
row = self._conn.execute(
|
||||
f"SELECT MAX({quote(column)}) FROM {quote(table)}"
|
||||
).fetchone()
|
||||
def max_value(self, table: str, column: str) -> str | int | None:
|
||||
"""Maximum value of *column* across cached rows (the delta watermark).
|
||||
|
||||
Returns an ``int`` for a datetime column stored as INTEGER µs, else the
|
||||
ISO ``TEXT`` string."""
|
||||
with self._lock:
|
||||
row = self._conn.execute(
|
||||
f"SELECT MAX({quote(column)}) FROM {quote(table)}"
|
||||
).fetchone()
|
||||
return row[0] if row else None
|
||||
|
||||
def upsert_rows(self, table: str, columns: list[str], rows: list[tuple]) -> None:
|
||||
"""Insert-or-replace one batch of *rows* by the table's unique key."""
|
||||
col_list = quote_list(columns)
|
||||
placeholders = ", ".join("?" * len(columns))
|
||||
clean_rows = [coerce_row(row) for row in rows]
|
||||
coerce = self._row_coercer(table, columns)
|
||||
clean_rows = [coerce(row) for row in rows]
|
||||
with self._lock:
|
||||
self._conn.executemany(
|
||||
f"INSERT OR REPLACE INTO {quote(table)} ({col_list}) VALUES ({placeholders})",
|
||||
@@ -498,9 +635,19 @@ class CacheManager:
|
||||
self._conn.commit()
|
||||
|
||||
def count_rows(self, table: str) -> int:
|
||||
row = self._conn.execute(f"SELECT COUNT(*) FROM {quote(table)}").fetchone()
|
||||
with self._lock:
|
||||
row = self._conn.execute(f"SELECT COUNT(*) FROM {quote(table)}").fetchone()
|
||||
return int(row[0]) if row else 0
|
||||
|
||||
def db_size_bytes(self) -> int:
|
||||
"""On-disk size of the cache file in bytes (0 in memory mode / if absent)."""
|
||||
if self._in_memory:
|
||||
return 0
|
||||
try:
|
||||
return self._db_path.stat().st_size
|
||||
except OSError:
|
||||
return 0
|
||||
|
||||
def reset(self) -> None:
|
||||
"""Wipe the entire cache — every cached table plus the on-disk data
|
||||
(the file is deleted in memory mode, VACUUMed in place in disk mode)."""
|
||||
@@ -536,6 +683,81 @@ class CacheManager:
|
||||
except sqlite3.Error as e:
|
||||
logger.error(f"Failed to VACUUM cache file {self._db_path}: {e}")
|
||||
|
||||
def hard_reset(self) -> None:
|
||||
"""Delete the on-disk cache file and reopen it from scratch.
|
||||
|
||||
Unlike :meth:`reset` (which drops tables but keeps the open file, so the
|
||||
baked-in ``page_size``/``auto_vacuum`` cannot change), this closes every
|
||||
connection, removes the file plus its WAL/SHM sidecars, and reopens with
|
||||
all current pragmas applied — so layout pragmas take effect on the fresh
|
||||
file. Disk mode only; in memory mode it falls back to :meth:`reset`.
|
||||
|
||||
Any read in flight on another thread will see its connection closed from
|
||||
under it; treat this as a maintenance operation.
|
||||
"""
|
||||
if self._in_memory:
|
||||
self.reset()
|
||||
return
|
||||
|
||||
logger.info(f"Hard reset: closing connections and deleting {self._db_path}")
|
||||
with self._lock:
|
||||
for conn in self._read_conns:
|
||||
try:
|
||||
conn.close()
|
||||
except sqlite3.Error:
|
||||
pass
|
||||
self._read_conns.clear()
|
||||
self._read_local = threading.local() # force every thread to reopen
|
||||
self._conn.close()
|
||||
|
||||
for suffix in ("", "-wal", "-shm"):
|
||||
p = Path(str(self._db_path) + suffix)
|
||||
if p.exists():
|
||||
p.unlink()
|
||||
|
||||
# Reopen fresh — page_size/auto_vacuum apply to the new empty file.
|
||||
self._conn = self._open_disk_connection(db_existed=False)
|
||||
self._ensure_meta_tables()
|
||||
|
||||
self._states.clear()
|
||||
self._errors.clear()
|
||||
self._last_run.clear()
|
||||
self._error_total = 0
|
||||
logger.info(f"Hard reset complete — cache recreated at {self._db_path}.")
|
||||
|
||||
def vacuum(self, incremental: bool = True, pages: int = 10_000) -> None:
|
||||
"""Run maintenance VACUUM on the on-disk cache (no-op in memory mode).
|
||||
|
||||
``incremental=True`` (default) reclaims up to *pages* free pages without
|
||||
blocking readers or needing extra disk space — but requires the cache to
|
||||
have been created with ``auto_vacuum=INCREMENTAL`` (otherwise it is a
|
||||
no-op). ``incremental=False`` runs a full ``VACUUM``: it rewrites the
|
||||
whole file (needs ~2× disk space, blocks readers) — use only in a
|
||||
maintenance window.
|
||||
"""
|
||||
if self._in_memory:
|
||||
logger.debug("vacuum() called in memory mode — no-op.")
|
||||
return
|
||||
if incremental:
|
||||
av = self._conn.execute("PRAGMA auto_vacuum").fetchone()[0]
|
||||
if av != 2: # 0 = NONE, 1 = FULL, 2 = INCREMENTAL
|
||||
logger.warning(
|
||||
f"vacuum(incremental=True) called but auto_vacuum={av} (not "
|
||||
"INCREMENTAL) — no pages will be reclaimed. Rebuild the cache "
|
||||
"with pragmas={'auto_vacuum': 'INCREMENTAL'} via hard_reset(), "
|
||||
"or run vacuum(incremental=False) for a full VACUUM."
|
||||
)
|
||||
return
|
||||
with self._lock:
|
||||
self._conn.execute(f"PRAGMA incremental_vacuum({pages})")
|
||||
self._conn.commit()
|
||||
logger.info(f"Incremental vacuum: reclaimed up to {pages} pages.")
|
||||
else:
|
||||
logger.info("Full VACUUM started — this may take several minutes.")
|
||||
with self._lock:
|
||||
self._conn.execute("VACUUM")
|
||||
logger.info("Full VACUUM complete.")
|
||||
|
||||
def close(self) -> None:
|
||||
self._backup_to_disk()
|
||||
self._closed = True
|
||||
|
||||
+18
-5
@@ -1,5 +1,5 @@
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import Any
|
||||
|
||||
from loguru import logger
|
||||
@@ -8,8 +8,10 @@ from ._sql import quote_source
|
||||
from .cache import CacheManager
|
||||
from .stats import TableState
|
||||
|
||||
_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
|
||||
|
||||
def _bind_watermark(watermark: str) -> datetime | str:
|
||||
|
||||
def _bind_watermark(watermark: str | int, epoch_us: bool = False) -> datetime | str:
|
||||
"""Bind the delta watermark back to the source in its native type.
|
||||
|
||||
The cache stores the change column as an ISO ``TEXT`` string (see
|
||||
@@ -22,11 +24,21 @@ def _bind_watermark(watermark: str) -> datetime | str:
|
||||
driver send a typed timestamp, so the comparison happens natively with no
|
||||
string conversion. Non-datetime change columns (e.g. an integer rowversion)
|
||||
don't parse and are passed through unchanged.
|
||||
|
||||
When the change column is stored as INTEGER µs-since-epoch (``datetime_columns``)
|
||||
*epoch_us* is set: the watermark is a microsecond count (an ``int`` or its digit
|
||||
string, since it round-trips through a TEXT column) and is reconstructed into a
|
||||
UTC :class:`~datetime.datetime` so the source still receives a typed timestamp.
|
||||
"""
|
||||
if epoch_us:
|
||||
try:
|
||||
return _EPOCH + timedelta(microseconds=int(watermark))
|
||||
except (TypeError, ValueError):
|
||||
return watermark if isinstance(watermark, str) else str(watermark)
|
||||
try:
|
||||
return datetime.fromisoformat(watermark)
|
||||
return datetime.fromisoformat(watermark) # type: ignore[arg-type]
|
||||
except (TypeError, ValueError):
|
||||
return watermark
|
||||
return watermark if isinstance(watermark, str) else str(watermark)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
@@ -92,9 +104,10 @@ class DeltaRefresher:
|
||||
cursor = source_conn.execute(f"SELECT {col_list} FROM {q_table}")
|
||||
else:
|
||||
change_col = quote_source(cfg.change_column, dialect)
|
||||
epoch_us = cfg.change_column in self._cache._datetime_columns.get(table, ())
|
||||
cursor = source_conn.execute(
|
||||
f"SELECT {col_list} FROM {q_table} WHERE {change_col} >= ?",
|
||||
(_bind_watermark(watermark),),
|
||||
(_bind_watermark(watermark, epoch_us),),
|
||||
)
|
||||
|
||||
# Stream the delta in batches so a large catch-up never materializes at once.
|
||||
|
||||
+151
-17
@@ -18,12 +18,50 @@ from .config import (
|
||||
SQL_DIALECT,
|
||||
)
|
||||
from .delta import DeltaConfig, DeltaRefresher, ResolvedDelta
|
||||
from .exceptions import UndeclaredError
|
||||
from .executor import QueryExecutor
|
||||
from .parser import Params, parse
|
||||
from .parser import Params, ParsedQuery, parse
|
||||
from .registry import ColumnRegistry
|
||||
from .spec import TTL, TableSpec
|
||||
from .stats import Stats, StatsCollector, TableState, TableStats
|
||||
|
||||
|
||||
def _specs_to_config(
|
||||
tables: list[TableSpec],
|
||||
) -> tuple[
|
||||
dict[str, DeltaConfig],
|
||||
dict[str, int],
|
||||
dict[str, list[str | list[str]]],
|
||||
dict[str, list[str]],
|
||||
dict[str, list[str] | None],
|
||||
]:
|
||||
"""Convert declarative ``TableSpec``s into the engine's internal config dicts.
|
||||
|
||||
Returns ``(delta, ttl, indexes, datetime_columns, declared)`` — the first four
|
||||
mirror the legacy kwargs; ``declared`` maps each table to its allowed columns
|
||||
(``None`` = whole table / any column) for fail-fast query checking.
|
||||
"""
|
||||
delta: dict[str, DeltaConfig] = {}
|
||||
ttl: dict[str, int] = {}
|
||||
indexes: dict[str, list[str | list[str]]] = {}
|
||||
datetime_columns: dict[str, list[str]] = {}
|
||||
declared: dict[str, list[str] | None] = {}
|
||||
for spec in tables:
|
||||
if spec.name in declared:
|
||||
raise ValueError(f"Duplicate TableSpec for table {spec.name!r}.")
|
||||
declared[spec.name] = list(spec.columns) if spec.columns is not None else None
|
||||
if spec.indexes:
|
||||
indexes[spec.name] = list(spec.indexes)
|
||||
if spec.datetime_columns:
|
||||
datetime_columns[spec.name] = list(spec.datetime_columns)
|
||||
refresh = spec.refresh
|
||||
if isinstance(refresh, TTL):
|
||||
ttl[spec.name] = refresh.seconds
|
||||
elif isinstance(refresh, DeltaConfig):
|
||||
delta[spec.name] = refresh
|
||||
return delta, ttl, indexes, datetime_columns, declared
|
||||
|
||||
|
||||
class _LazySource:
|
||||
"""A source connection opened on first ``execute`` and shared across one query.
|
||||
|
||||
@@ -65,9 +103,28 @@ class CachingEngine:
|
||||
refresh_interval: int | None = None,
|
||||
fetch_batch: int | None = None,
|
||||
dialect: str | None = None,
|
||||
pragmas: dict[str, str | int] | None = None,
|
||||
datetime_columns: dict[str, list[str]] | None = None,
|
||||
return_datetime: bool = True,
|
||||
tables: list[TableSpec] | None = None,
|
||||
blocking_startup_refresh: bool = False,
|
||||
) -> None:
|
||||
self._source_engine = source_engine
|
||||
|
||||
# Declarative mode: a list of TableSpecs is converted to the same internal
|
||||
# config the legacy delta=/ttl=/indexes=/datetime_columns= kwargs produce,
|
||||
# plus a declared-columns allowlist (for fail-fast) and preload set.
|
||||
self._declared: dict[str, list[str] | None] | None = None
|
||||
self._preload_specs: list[TableSpec] = []
|
||||
if tables is not None:
|
||||
if any(x is not None for x in (delta, ttl, indexes, datetime_columns)):
|
||||
raise ValueError(
|
||||
"Pass either tables=[TableSpec(...)] or the legacy "
|
||||
"delta=/ttl=/indexes=/datetime_columns= kwargs, not both."
|
||||
)
|
||||
delta, ttl, indexes, datetime_columns, self._declared = _specs_to_config(tables)
|
||||
self._preload_specs = [s for s in tables if s.preload]
|
||||
|
||||
use_memory = IN_MEMORY if in_memory is None else in_memory
|
||||
self._dialect = dialect if dialect is not None else SQL_DIALECT
|
||||
self._refresh_interval = (
|
||||
@@ -79,6 +136,9 @@ class CachingEngine:
|
||||
in_memory=use_memory,
|
||||
dialect=self._dialect,
|
||||
fetch_batch=fetch_batch if fetch_batch is not None else FETCH_BATCH_SIZE,
|
||||
pragmas=pragmas,
|
||||
datetime_columns=datetime_columns,
|
||||
return_datetime=return_datetime,
|
||||
)
|
||||
self._registry = ColumnRegistry(self._cache.connection)
|
||||
self._stats = StatsCollector()
|
||||
@@ -95,12 +155,14 @@ class CachingEngine:
|
||||
"reload), not both."
|
||||
)
|
||||
|
||||
if self._delta or self._ttl:
|
||||
# The startup catch-up (deltas/TTL reloads for tables restored from
|
||||
# disk) can take a while on a cold start. By default it runs on the
|
||||
# background thread so it never blocks application startup; callers
|
||||
# who need the cache fully fresh before serving can opt back in.
|
||||
if self._delta or self._ttl or self._preload_specs:
|
||||
# Startup work (preload of declared tables + delta/TTL catch-up for
|
||||
# tables restored from disk) can take a while on a cold start. By
|
||||
# default it runs on the background thread so it never blocks
|
||||
# application startup; callers who need the cache fully warm before
|
||||
# serving can opt back in.
|
||||
if blocking_startup_refresh:
|
||||
self._preload()
|
||||
self._run_refresh()
|
||||
self._start_refresh_thread(initial_catch_up=not blocking_startup_refresh)
|
||||
|
||||
@@ -148,7 +210,11 @@ class CachingEngine:
|
||||
last_runs = self._cache.get_last_runs()
|
||||
with self._cache._lock:
|
||||
base = self._stats.snapshot(self._cache.connection, states)
|
||||
base = replace(base, errors=self._cache.error_total)
|
||||
base = replace(
|
||||
base,
|
||||
errors=self._cache.error_total,
|
||||
db_size_bytes=self._cache.db_size_bytes(),
|
||||
)
|
||||
return replace(
|
||||
base,
|
||||
tables={n: self._enrich(n, t, errors, last_runs) for n, t in base.tables.items()},
|
||||
@@ -189,22 +255,67 @@ class CachingEngine:
|
||||
)
|
||||
return replace(table_stats, tracking=tracking, state=state, last_refresh=last_refresh)
|
||||
|
||||
def _make_executor(self, source: Any) -> QueryExecutor:
|
||||
return QueryExecutor(
|
||||
self._cache,
|
||||
self._registry,
|
||||
source,
|
||||
self._stats,
|
||||
self._delta,
|
||||
self._ttl,
|
||||
self._index_columns,
|
||||
)
|
||||
|
||||
def _check_declared(self, parsed: ParsedQuery) -> None:
|
||||
"""In declarative mode, reject any table/column not declared up front."""
|
||||
if self._declared is None:
|
||||
return
|
||||
for table in parsed.tables:
|
||||
if table not in self._declared:
|
||||
raise UndeclaredError(
|
||||
f"Table {table!r} is not declared in tables=[TableSpec(...)]. "
|
||||
"Add a TableSpec for it (declarative mode is a strict allowlist)."
|
||||
)
|
||||
allowed = self._declared[table]
|
||||
if allowed is None:
|
||||
continue # whole table declared — any column is fine
|
||||
if table in parsed.wildcard_tables:
|
||||
raise UndeclaredError(
|
||||
f"SELECT * on {table!r} is not allowed: only columns {allowed} "
|
||||
"are declared. List the columns explicitly or declare "
|
||||
"columns=None for the whole table."
|
||||
)
|
||||
unknown = [c for c in parsed.columns_by_table.get(table, []) if c not in allowed]
|
||||
if unknown:
|
||||
raise UndeclaredError(
|
||||
f"Column(s) {unknown} of {table!r} are not declared "
|
||||
f"(declared: {allowed})."
|
||||
)
|
||||
|
||||
def execute(self, sql: str, params: Params = None) -> list[dict]:
|
||||
parsed = parse(sql, params, dialect=self._dialect)
|
||||
self._check_declared(parsed)
|
||||
# The source connection is opened lazily — a pure cache hit never touches
|
||||
# the source and never occupies a pool slot.
|
||||
source = _LazySource(self._source_engine)
|
||||
try:
|
||||
executor = QueryExecutor(
|
||||
self._cache,
|
||||
self._registry,
|
||||
source,
|
||||
self._stats,
|
||||
self._delta,
|
||||
self._ttl,
|
||||
self._index_columns,
|
||||
)
|
||||
return executor.execute(parsed)
|
||||
return self._make_executor(source).execute(parsed)
|
||||
finally:
|
||||
source.close()
|
||||
|
||||
def _preload(self) -> None:
|
||||
"""Load declared ``preload=True`` tables into the cache (skipping fresh copies)."""
|
||||
if not self._preload_specs:
|
||||
return
|
||||
source = _LazySource(self._source_engine)
|
||||
try:
|
||||
executor = self._make_executor(source)
|
||||
for spec in self._preload_specs:
|
||||
try:
|
||||
logger.info(f"Preloading {spec.name!r}…")
|
||||
executor.ensure_loaded(spec.name, spec.columns)
|
||||
except Exception as e:
|
||||
logger.error(f"Preload failed for {spec.name!r}: {e}")
|
||||
finally:
|
||||
source.close()
|
||||
|
||||
@@ -240,6 +351,7 @@ class CachingEngine:
|
||||
def _start_refresh_thread(self, initial_catch_up: bool = True) -> None:
|
||||
def loop() -> None:
|
||||
if initial_catch_up:
|
||||
self._preload() # off-main-thread declared-table preload
|
||||
self._run_refresh() # off-main-thread startup catch-up
|
||||
event = threading.Event()
|
||||
while not event.wait(self._refresh_interval):
|
||||
@@ -267,6 +379,28 @@ class CachingEngine:
|
||||
self._cache.reset()
|
||||
logger.info("Cache reset — all tables will be reloaded on next use.")
|
||||
|
||||
def hard_reset(self) -> None:
|
||||
"""Delete the on-disk cache file and reopen with current pragmas/page_size.
|
||||
|
||||
Disk mode only (falls back to :meth:`reset` in memory mode). Use when a
|
||||
layout pragma — ``page_size`` or ``auto_vacuum`` — must change, since
|
||||
those are baked into the file at creation and :meth:`reset` keeps it.
|
||||
All tables reload on next use.
|
||||
"""
|
||||
self._cache.hard_reset()
|
||||
# hard_reset swaps the cache connection — re-point the registry at it.
|
||||
self._registry.rebind(self._cache.connection)
|
||||
logger.info("Cache hard reset — file recreated; all tables reload on next use.")
|
||||
|
||||
def vacuum(self, incremental: bool = True, pages: int = 10_000) -> None:
|
||||
"""Run maintenance VACUUM on the on-disk cache (incremental by default).
|
||||
|
||||
Incremental reclaims free pages left by delta ``INSERT OR REPLACE`` churn
|
||||
cheaply (requires ``auto_vacuum=INCREMENTAL``); a full VACUUM rewrites the
|
||||
whole file and should run only in a maintenance window.
|
||||
"""
|
||||
self._cache.vacuum(incremental=incremental, pages=pages)
|
||||
|
||||
def close(self) -> None:
|
||||
self._cache.close()
|
||||
logger.info("CachingEngine closed.")
|
||||
|
||||
@@ -4,3 +4,13 @@ class ReadOnlyError(Exception):
|
||||
|
||||
class UnsupportedQueryError(Exception):
|
||||
"""Raised when a query uses unsupported features (JOIN, SELECT *)."""
|
||||
|
||||
|
||||
class UndeclaredError(Exception):
|
||||
"""Raised in declarative mode (``tables=[TableSpec(...)]``) when a query
|
||||
references a table or column that was not declared up front.
|
||||
|
||||
Fail-fast by design: an undeclared table/column would otherwise trigger a
|
||||
silent (potentially multi-hour) lazy load/column-expansion, so it is surfaced
|
||||
immediately instead.
|
||||
"""
|
||||
|
||||
+58
-6
@@ -1,3 +1,4 @@
|
||||
from collections.abc import Callable
|
||||
from typing import Any
|
||||
|
||||
from loguru import logger
|
||||
@@ -41,12 +42,40 @@ class QueryExecutor:
|
||||
self._ensure_table(table, parsed)
|
||||
return self._run_in_memory(parsed)
|
||||
|
||||
def ensure_loaded(self, table: str, columns: list[str] | None) -> None:
|
||||
"""Preload *table* into the cache without running a query.
|
||||
|
||||
``columns=None`` loads the whole table (``SELECT *`` semantics); otherwise
|
||||
only the listed columns. Reuses the same load path as a real query — delta
|
||||
key/change + index columns are augmented, the registry and watermark are
|
||||
updated, and double-checked locking skips a copy already fresh in the
|
||||
cache — but never materializes any rows (unlike :meth:`execute`).
|
||||
"""
|
||||
if columns is None:
|
||||
self._ensure_full(table)
|
||||
else:
|
||||
self._ensure_columns(table, columns)
|
||||
|
||||
def _ensure_table(self, table: str, parsed: ParsedQuery) -> None:
|
||||
if table in parsed.wildcard_tables:
|
||||
self._ensure_full(table)
|
||||
else:
|
||||
self._ensure_columns(table, parsed.columns_by_table[table])
|
||||
|
||||
def _full_satisfied(self, table: str) -> bool:
|
||||
"""True if *table* is cached in full and not TTL-expired (a SELECT * hit)."""
|
||||
return (
|
||||
self._cache.is_table_cached(table)
|
||||
and self._cache.is_table_full(table)
|
||||
and not self._ttl_expired(table)
|
||||
)
|
||||
|
||||
def _columns_satisfied(self, table: str, columns: list[str]) -> bool:
|
||||
"""True if *table* is cached with all *columns* present and not TTL-expired."""
|
||||
if not self._cache.is_table_cached(table) or self._ttl_expired(table):
|
||||
return False
|
||||
return set(columns).issubset(self._cache.get_table_columns(table))
|
||||
|
||||
def _ensure_full(self, table: str) -> None:
|
||||
"""Load every column of *table* (SELECT * / t.*), refetching unless already full."""
|
||||
cached = self._cache.is_table_cached(table)
|
||||
@@ -67,7 +96,7 @@ class QueryExecutor:
|
||||
self._stats.record_miss()
|
||||
|
||||
columns = self._cache.discover_columns(table, self._source_conn)
|
||||
self._load(table, columns, full=True)
|
||||
self._load(table, columns, full=True, satisfied=lambda cols: self._full_satisfied(table))
|
||||
|
||||
def _ensure_columns(self, table: str, columns: list[str]) -> None:
|
||||
"""Load *table* with at least *columns*, refetching on new columns or TTL expiry."""
|
||||
@@ -95,10 +124,27 @@ class QueryExecutor:
|
||||
all_columns = list(self._registry.get_columns(table)) + missing
|
||||
# Preserve a fully-cached table's status across a TTL reload.
|
||||
full = table_cached and self._cache.is_table_full(table)
|
||||
self._load(table, all_columns, full=full)
|
||||
self._load(
|
||||
table,
|
||||
all_columns,
|
||||
full=full,
|
||||
satisfied=lambda cols: self._columns_satisfied(table, cols),
|
||||
)
|
||||
|
||||
def _load(self, table: str, columns: list[str], full: bool) -> None:
|
||||
"""Fetch *table* into cache, adding delta key/timestamp and index columns."""
|
||||
def _load(
|
||||
self,
|
||||
table: str,
|
||||
columns: list[str],
|
||||
full: bool,
|
||||
satisfied: Callable[[list[str]], bool] | None = None,
|
||||
) -> None:
|
||||
"""Fetch *table* into cache, adding delta key/timestamp and index columns.
|
||||
|
||||
*satisfied* is the double-checked-locking predicate evaluated under the
|
||||
load lock (see :meth:`CacheManager.load_table`); it is given the final,
|
||||
augmented column list so a concurrent loader that already produced an
|
||||
equivalent (or wider) cache is detected and the redundant reload skipped.
|
||||
"""
|
||||
cfg = self._delta.get(table)
|
||||
extra = list(self._index_columns.get(table, []))
|
||||
if cfg:
|
||||
@@ -108,7 +154,11 @@ class QueryExecutor:
|
||||
if extra:
|
||||
columns = list(dict.fromkeys([*columns, *extra]))
|
||||
|
||||
self._cache.load_table(table, columns, self._source_conn, full=full)
|
||||
recheck: Callable[[], bool] | None = None
|
||||
if satisfied is not None:
|
||||
final_columns = columns
|
||||
recheck = lambda: satisfied(final_columns) # noqa: E731
|
||||
self._cache.load_table(table, columns, self._source_conn, full=full, recheck=recheck)
|
||||
self._registry.update(table, columns)
|
||||
|
||||
if cfg:
|
||||
@@ -118,5 +168,7 @@ class QueryExecutor:
|
||||
|
||||
def _run_in_memory(self, parsed: ParsedQuery) -> list[dict]:
|
||||
logger.debug(f"Executing in SQLite RAM: {parsed.sqlite_sql!r} params={parsed.params!r}")
|
||||
col_names, rows = self._cache.execute_in_memory(parsed.sqlite_sql, parsed.params)
|
||||
col_names, rows = self._cache.execute_in_memory(
|
||||
parsed.sqlite_sql, parsed.params, parsed.tables
|
||||
)
|
||||
return [dict(zip(col_names, row)) for row in rows]
|
||||
|
||||
@@ -12,6 +12,16 @@ class ColumnRegistry:
|
||||
self._lock = Lock()
|
||||
self._ensure_table()
|
||||
|
||||
def rebind(self, mem_conn: sqlite3.Connection) -> None:
|
||||
"""Point the registry at a new cache connection (after a hard reset).
|
||||
|
||||
``CacheManager.hard_reset`` closes and reopens the cache connection, so the
|
||||
connection object the registry captured at construction becomes invalid.
|
||||
"""
|
||||
with self._lock:
|
||||
self._conn = mem_conn
|
||||
self._ensure_table()
|
||||
|
||||
def _ensure_table(self) -> None:
|
||||
self._conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS _sqlmem_columns (
|
||||
|
||||
@@ -0,0 +1,49 @@
|
||||
"""Declarative table specs for ``CachingEngine(tables=[...])``.
|
||||
|
||||
Instead of the lazy "learn columns from queries" mode, an application can declare
|
||||
each table up front — its columns, indexes, refresh strategy and datetime columns —
|
||||
so the engine preloads them and rejects anything undeclared (fail-fast) rather than
|
||||
silently triggering an expensive lazy load. The legacy ``delta=/ttl=/indexes=``
|
||||
kwargs keep working; ``tables=`` is converted to the same internal config.
|
||||
"""
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
from .delta import DeltaConfig
|
||||
|
||||
# Friendly alias for the declarative API; ``Delta`` and ``DeltaConfig`` are the
|
||||
# same type (``change_column`` + ``key_columns``), so either may be used as a
|
||||
# ``TableSpec.refresh`` strategy.
|
||||
Delta = DeltaConfig
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class TTL:
|
||||
"""Time-based refresh strategy: full-reload the table when older than *seconds*."""
|
||||
|
||||
seconds: int
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class TableSpec:
|
||||
"""Declarative specification of one cached table.
|
||||
|
||||
*columns* lists the columns to cache; leave it ``None`` to cache the whole
|
||||
table (``SELECT *`` semantics) and allow any column. When columns are listed,
|
||||
a query asking for a column outside the list raises
|
||||
:class:`~sqlmem.exceptions.UndeclaredError`.
|
||||
|
||||
*refresh* is a :class:`Delta` (change-column incremental sync) or :class:`TTL`
|
||||
(time-based full reload), or ``None`` for a static table loaded once.
|
||||
|
||||
*preload=True* loads the table at startup (in the background by default) so the
|
||||
first query is a cache hit instead of paying a cold load; a copy already fresh
|
||||
in the persistent cache is skipped.
|
||||
"""
|
||||
|
||||
name: str
|
||||
columns: list[str] | None = None
|
||||
indexes: list[str | list[str]] = field(default_factory=list)
|
||||
refresh: DeltaConfig | TTL | None = None
|
||||
datetime_columns: list[str] = field(default_factory=list)
|
||||
preload: bool = False
|
||||
@@ -40,6 +40,7 @@ class Stats:
|
||||
refetches: int
|
||||
tables: dict[str, TableStats]
|
||||
errors: int = 0 # total load/refresh failures since start
|
||||
db_size_bytes: int = 0 # on-disk cache file size (0 in memory mode)
|
||||
|
||||
|
||||
class StatsCollector:
|
||||
|
||||
@@ -168,3 +168,209 @@ def test_disk_mode_reset_keeps_file(tmp_path, source_conn):
|
||||
assert db_path.exists()
|
||||
assert c.is_table_cached("users") is False
|
||||
c.close()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Pragmas / layout tuning (1.11.0)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_pragmas_applied_on_fresh_disk_cache(tmp_path):
|
||||
"""page_size, auto_vacuum and a generic pragma all take effect on a new file."""
|
||||
c = CacheManager(
|
||||
db_path=tmp_path / "cache.db",
|
||||
backup_interval=9999,
|
||||
in_memory=False,
|
||||
pragmas={"page_size": 8192, "auto_vacuum": "INCREMENTAL", "cache_size": -2000},
|
||||
)
|
||||
assert c.connection.execute("PRAGMA page_size").fetchone()[0] == 8192
|
||||
assert c.connection.execute("PRAGMA auto_vacuum").fetchone()[0] == 2 # INCREMENTAL
|
||||
assert c.connection.execute("PRAGMA cache_size").fetchone()[0] == -2000
|
||||
c.close()
|
||||
|
||||
|
||||
def test_page_size_ignored_on_existing_file_warns(tmp_path):
|
||||
"""A page_size that differs from the existing file is ignored, with a warning."""
|
||||
db_path = tmp_path / "cache.db"
|
||||
c1 = CacheManager(db_path=db_path, backup_interval=9999, in_memory=False)
|
||||
assert c1.connection.execute("PRAGMA page_size").fetchone()[0] == 4096 # default
|
||||
c1.close()
|
||||
|
||||
c2 = CacheManager(
|
||||
db_path=db_path,
|
||||
backup_interval=9999,
|
||||
in_memory=False,
|
||||
pragmas={"page_size": 16384},
|
||||
)
|
||||
# File keeps its original page size; the request is ignored (not an error).
|
||||
assert c2.connection.execute("PRAGMA page_size").fetchone()[0] == 4096
|
||||
c2.close()
|
||||
|
||||
|
||||
def test_unknown_pragma_does_not_crash(tmp_path):
|
||||
"""SQLite ignores unknown/inapplicable pragmas — startup must not fail."""
|
||||
c = CacheManager(
|
||||
db_path=tmp_path / "cache.db",
|
||||
backup_interval=9999,
|
||||
in_memory=False,
|
||||
pragmas={"this_is_not_a_pragma": 1, "mmap_size": 1024 * 1024},
|
||||
)
|
||||
assert c.connection.execute("PRAGMA mmap_size").fetchone()[0] == 1024 * 1024
|
||||
c.close()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# hard_reset / vacuum (1.11.0)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_hard_reset_recreates_file_and_clears_tables(tmp_path, source_conn):
|
||||
db_path = tmp_path / "cache.db"
|
||||
c = CacheManager(db_path=db_path, backup_interval=9999, in_memory=False)
|
||||
c.load_table("users", ["name"], source_conn)
|
||||
assert c.is_table_cached("users") is True
|
||||
|
||||
c.hard_reset()
|
||||
assert db_path.exists() # reopened fresh
|
||||
assert c.is_table_cached("users") is False
|
||||
# The connection is usable again after the swap.
|
||||
c.load_table("users", ["name"], source_conn)
|
||||
assert c.is_table_cached("users") is True
|
||||
c.close()
|
||||
|
||||
|
||||
def test_hard_reset_applies_new_page_size(tmp_path, source_conn):
|
||||
"""page_size can't change via reset() but does via hard_reset() (fresh file)."""
|
||||
db_path = tmp_path / "cache.db"
|
||||
# Existing file at the default 4096; request 8192 — ignored on open.
|
||||
CacheManager(db_path=db_path, backup_interval=9999, in_memory=False).close()
|
||||
c = CacheManager(
|
||||
db_path=db_path,
|
||||
backup_interval=9999,
|
||||
in_memory=False,
|
||||
pragmas={"page_size": 8192},
|
||||
)
|
||||
assert c.connection.execute("PRAGMA page_size").fetchone()[0] == 4096
|
||||
c.hard_reset() # deletes the file → recreated with the requested page size
|
||||
assert c.connection.execute("PRAGMA page_size").fetchone()[0] == 8192
|
||||
c.close()
|
||||
|
||||
|
||||
def test_hard_reset_in_memory_falls_back_to_reset(tmp_path, source_conn):
|
||||
c = CacheManager(db_path=tmp_path / "cache.db", backup_interval=9999)
|
||||
c.load_table("users", ["name"], source_conn)
|
||||
c.hard_reset() # memory mode → reset()
|
||||
assert c.is_table_cached("users") is False
|
||||
c.close()
|
||||
|
||||
|
||||
def test_full_vacuum_runs_on_disk(tmp_path, source_conn):
|
||||
db_path = tmp_path / "cache.db"
|
||||
c = CacheManager(db_path=db_path, backup_interval=9999, in_memory=False)
|
||||
c.load_table("users", ["name"], source_conn)
|
||||
c.vacuum(incremental=False) # must not raise
|
||||
assert c.is_table_cached("users") is True
|
||||
c.close()
|
||||
|
||||
|
||||
def test_incremental_vacuum_runs_with_auto_vacuum(tmp_path, source_conn):
|
||||
c = CacheManager(
|
||||
db_path=tmp_path / "cache.db",
|
||||
backup_interval=9999,
|
||||
in_memory=False,
|
||||
pragmas={"auto_vacuum": "INCREMENTAL"},
|
||||
)
|
||||
c.load_table("users", ["name"], source_conn)
|
||||
c.vacuum(incremental=True, pages=100) # must not raise
|
||||
assert c.is_table_cached("users") is True
|
||||
c.close()
|
||||
|
||||
|
||||
def test_vacuum_in_memory_is_noop(cache, source_conn):
|
||||
cache.load_table("users", ["name"], source_conn)
|
||||
cache.vacuum(incremental=False) # no-op, no error
|
||||
assert cache.is_table_cached("users") is True
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Double-checked locking against cache stampede (1.15.0)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class _ExplodingSource:
|
||||
def execute(self, *args):
|
||||
raise AssertionError("source must not be queried when recheck() is True")
|
||||
|
||||
|
||||
def test_load_table_recheck_true_skips_load(cache, source_conn):
|
||||
"""A recheck that reports the table already satisfied skips the reload."""
|
||||
cache.load_table("users", ["name"], source_conn)
|
||||
# Second load with recheck() → True must not touch the source at all.
|
||||
cache.load_table("users", ["name"], _ExplodingSource(), recheck=lambda: True)
|
||||
assert cache.is_table_cached("users") is True
|
||||
|
||||
|
||||
def test_concurrent_loads_dedup_via_double_checked_lock(tmp_path):
|
||||
"""A second loader queued behind a slow cold load must not reload the table."""
|
||||
import time
|
||||
|
||||
c = CacheManager(db_path=tmp_path / "c.db", backup_interval=9999)
|
||||
started = threading.Event()
|
||||
release = threading.Event()
|
||||
loads: list[str] = []
|
||||
|
||||
class _GatedCursor:
|
||||
def __init__(self, rows):
|
||||
self._rows = list(rows)
|
||||
self._done = False
|
||||
|
||||
def fetchmany(self, n):
|
||||
if self._done:
|
||||
return []
|
||||
self._done = True
|
||||
return self._rows
|
||||
|
||||
class _GatedSource:
|
||||
def execute(self, sql):
|
||||
loads.append(sql) # one entry per *actual* source load
|
||||
started.set()
|
||||
release.wait(5) # hold the load open (and _load_lock) until released
|
||||
return _GatedCursor([("alice",), ("bob",)])
|
||||
|
||||
def recheck() -> bool:
|
||||
return c.is_table_cached("users") and "name" in c.get_table_columns("users")
|
||||
|
||||
def load() -> None:
|
||||
c.load_table("users", ["name"], _GatedSource(), recheck=recheck)
|
||||
|
||||
a = threading.Thread(target=load)
|
||||
b = threading.Thread(target=load)
|
||||
a.start()
|
||||
assert started.wait(5), "first load never started" # A holds _load_lock, mid-fetch
|
||||
b.start()
|
||||
time.sleep(0.2) # give B time to queue on _load_lock
|
||||
release.set() # let A finish; B then re-checks and skips
|
||||
a.join(5)
|
||||
b.join(5)
|
||||
assert not a.is_alive() and not b.is_alive()
|
||||
|
||||
assert len(loads) == 1 # the redundant second load was skipped
|
||||
assert c.is_table_cached("users") is True
|
||||
_, rows = c.execute_in_memory("SELECT name FROM users ORDER BY name")
|
||||
assert [r[0] for r in rows] == ["alice", "bob"]
|
||||
c.close()
|
||||
|
||||
|
||||
def test_incremental_vacuum_warns_without_incremental_auto_vacuum(tmp_path, source_conn):
|
||||
"""Incremental vacuum on a DB that isn't auto_vacuum=INCREMENTAL warns and skips."""
|
||||
from loguru import logger
|
||||
|
||||
messages: list[str] = []
|
||||
sink_id = logger.add(messages.append, level="WARNING", filter="sqlmem")
|
||||
logger.enable("sqlmem")
|
||||
try:
|
||||
c = CacheManager(db_path=tmp_path / "c.db", backup_interval=9999, in_memory=False)
|
||||
c.load_table("users", ["name"], source_conn)
|
||||
c.vacuum(incremental=True) # auto_vacuum defaults to NONE → no-op + warning
|
||||
c.close()
|
||||
finally:
|
||||
logger.remove(sink_id)
|
||||
logger.disable("sqlmem")
|
||||
assert any("auto_vacuum" in m for m in messages)
|
||||
|
||||
+205
-1
@@ -4,9 +4,17 @@ import uuid
|
||||
|
||||
import pytest
|
||||
|
||||
from sqlmem._coerce import coerce_params, to_sqlite
|
||||
from sqlmem._coerce import (
|
||||
coerce_params,
|
||||
from_sqlite_datetime,
|
||||
reverse_coerce_rows,
|
||||
to_sqlite,
|
||||
to_sqlite_datetime,
|
||||
)
|
||||
from sqlmem.cache import CacheManager
|
||||
|
||||
_UTC = datetime.timezone.utc
|
||||
|
||||
|
||||
class _FakeCursor:
|
||||
def __init__(self, rows):
|
||||
@@ -91,6 +99,202 @@ def test_coerce_params_none():
|
||||
assert coerce_params(None) is None
|
||||
|
||||
|
||||
# --- to_sqlite_datetime (INTEGER µs storage, 1.12.0) ------------------------
|
||||
|
||||
|
||||
def test_datetime_to_epoch_micros():
|
||||
# 2026-06-01T10:00:00Z -> microseconds since epoch
|
||||
dt = datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=datetime.timezone.utc)
|
||||
expected = int(dt.timestamp() * 1_000_000)
|
||||
assert to_sqlite_datetime(dt) == expected
|
||||
|
||||
|
||||
def test_datetime_naive_treated_as_utc():
|
||||
naive = datetime.datetime(2026, 6, 1, 10, 0, 0)
|
||||
aware = naive.replace(tzinfo=datetime.timezone.utc)
|
||||
assert to_sqlite_datetime(naive) == to_sqlite_datetime(aware)
|
||||
|
||||
|
||||
def test_datetime_micros_are_exact():
|
||||
dt = datetime.datetime(2026, 6, 5, 14, 54, 24, 823000, tzinfo=datetime.timezone.utc)
|
||||
us = to_sqlite_datetime(dt)
|
||||
# round-trips back to the same instant with no rounding loss
|
||||
back = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc) + datetime.timedelta(
|
||||
microseconds=us
|
||||
)
|
||||
assert back == dt
|
||||
|
||||
|
||||
def test_datetime_none_passes_through():
|
||||
assert to_sqlite_datetime(None) is None
|
||||
|
||||
|
||||
def test_datetime_iso_string_parsed():
|
||||
assert to_sqlite_datetime("2026-06-01T10:00:00+00:00") == to_sqlite_datetime(
|
||||
datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=datetime.timezone.utc)
|
||||
)
|
||||
|
||||
|
||||
def test_datetime_unparseable_is_none():
|
||||
assert to_sqlite_datetime("not a date") is None
|
||||
|
||||
|
||||
# --- integration: datetime_columns are stored as INTEGER --------------------
|
||||
|
||||
|
||||
def test_datetime_column_stored_as_integer(tmp_path):
|
||||
c = CacheManager(
|
||||
db_path=tmp_path / "cache.db",
|
||||
backup_interval=9999,
|
||||
datetime_columns={"t": ["changed"]},
|
||||
)
|
||||
dt = datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=datetime.timezone.utc)
|
||||
c.load_table("t", ["id", "changed"], FakeSource([("1", dt)]))
|
||||
|
||||
# Column declared INTEGER, value stored as µs-since-epoch.
|
||||
coltype = c.connection.execute("PRAGMA table_info(t)").fetchall()
|
||||
types = {row[1]: row[2] for row in coltype}
|
||||
assert types["changed"] == "INTEGER"
|
||||
assert types["id"] == "TEXT"
|
||||
_, out = c.execute_in_memory("SELECT changed FROM t")
|
||||
assert out == [(to_sqlite_datetime(dt),)]
|
||||
c.close()
|
||||
|
||||
|
||||
def test_non_datetime_columns_unaffected_by_datetime_columns(tmp_path):
|
||||
c = CacheManager(
|
||||
db_path=tmp_path / "cache.db",
|
||||
backup_interval=9999,
|
||||
datetime_columns={"t": ["changed"]},
|
||||
)
|
||||
c.load_table("t", ["id", "price"], FakeSource([("1", decimal.Decimal("9.99"))]))
|
||||
_, out = c.execute_in_memory("SELECT id, price FROM t")
|
||||
assert out == [("1", "9.99")] # still TEXT/ISO coercion
|
||||
c.close()
|
||||
|
||||
|
||||
# --- param coercion for datetime_columns (A) --------------------------------
|
||||
|
||||
|
||||
def test_coerce_params_dt_table_iso_string_to_epoch():
|
||||
p = coerce_params(("2026-06-01T10:00:00",), dt_table=True)
|
||||
assert p == (to_sqlite_datetime("2026-06-01T10:00:00"),)
|
||||
|
||||
|
||||
def test_coerce_params_dt_table_datetime_to_epoch():
|
||||
dt = datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=_UTC)
|
||||
assert coerce_params((dt,), dt_table=True) == (to_sqlite_datetime(dt),)
|
||||
|
||||
|
||||
def test_coerce_params_dt_table_false_keeps_iso_string():
|
||||
# No datetime table in the query → behaviour unchanged (string stays a string).
|
||||
assert coerce_params(("2026-06-01T10:00:00",), dt_table=False) == (
|
||||
"2026-06-01T10:00:00",
|
||||
)
|
||||
|
||||
|
||||
def test_coerce_params_dt_table_leaves_non_datetime_values():
|
||||
assert coerce_params(("hello", 5, None), dt_table=True) == ("hello", 5, None)
|
||||
|
||||
|
||||
def test_where_on_datetime_column_matches_with_iso_param(tmp_path):
|
||||
"""The critical fix: a WHERE on an INTEGER-µs column with an ISO string param
|
||||
must match instead of comparing INTEGER against TEXT (always 0 rows)."""
|
||||
c = CacheManager(
|
||||
db_path=tmp_path / "cache.db",
|
||||
backup_interval=9999,
|
||||
datetime_columns={"t": ["changed"]},
|
||||
return_datetime=False,
|
||||
)
|
||||
rows = [
|
||||
("1", datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=_UTC)),
|
||||
("2", datetime.datetime(2026, 6, 3, 10, 0, 0, tzinfo=_UTC)),
|
||||
]
|
||||
c.load_table("t", ["id", "changed"], FakeSource(rows))
|
||||
_, out = c.execute_in_memory(
|
||||
"SELECT id FROM t WHERE changed > ?", ("2026-06-02T00:00:00",), ["t"]
|
||||
)
|
||||
assert [r[0] for r in out] == ["2"]
|
||||
c.close()
|
||||
|
||||
|
||||
def test_where_on_datetime_column_without_table_scope_is_unchanged(tmp_path):
|
||||
"""Without table scope the param isn't coerced — proves the fix is scoped."""
|
||||
c = CacheManager(
|
||||
db_path=tmp_path / "cache.db",
|
||||
backup_interval=9999,
|
||||
datetime_columns={"t": ["changed"]},
|
||||
return_datetime=False,
|
||||
)
|
||||
c.load_table(
|
||||
"t",
|
||||
["id", "changed"],
|
||||
FakeSource([("1", datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=_UTC))]),
|
||||
)
|
||||
# No `tables` arg → INTEGER vs TEXT comparison → no match (legacy behaviour).
|
||||
_, out = c.execute_in_memory("SELECT id FROM t WHERE changed > ?", ("2026-01-01T00:00:00",))
|
||||
assert out == []
|
||||
c.close()
|
||||
|
||||
|
||||
# --- reverse coercion: read back as datetime (B) ----------------------------
|
||||
|
||||
|
||||
def test_from_sqlite_datetime_roundtrip():
|
||||
dt = datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=_UTC)
|
||||
assert from_sqlite_datetime(to_sqlite_datetime(dt)) == dt
|
||||
|
||||
|
||||
def test_from_sqlite_datetime_passes_non_int():
|
||||
assert from_sqlite_datetime("x") == "x"
|
||||
assert from_sqlite_datetime(None) is None
|
||||
assert from_sqlite_datetime(True) is True # bool is not treated as µs
|
||||
|
||||
|
||||
def test_reverse_coerce_rows_only_named_columns():
|
||||
us = to_sqlite_datetime(datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=_UTC))
|
||||
out = reverse_coerce_rows([("1", us)], ["id", "changed"], {"changed"})
|
||||
assert out[0][0] == "1"
|
||||
assert out[0][1] == datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=_UTC)
|
||||
|
||||
|
||||
def test_read_returns_datetime_by_default(tmp_path):
|
||||
c = CacheManager(
|
||||
db_path=tmp_path / "cache.db",
|
||||
backup_interval=9999,
|
||||
datetime_columns={"t": ["changed"]},
|
||||
)
|
||||
dt = datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=_UTC)
|
||||
c.load_table("t", ["id", "changed"], FakeSource([("1", dt)]))
|
||||
_, out = c.execute_in_memory("SELECT id, changed FROM t", None, ["t"])
|
||||
assert out == [("1", dt)] # returned as a datetime, not the raw int
|
||||
c.close()
|
||||
|
||||
|
||||
def test_return_datetime_false_keeps_raw_int(tmp_path):
|
||||
c = CacheManager(
|
||||
db_path=tmp_path / "cache.db",
|
||||
backup_interval=9999,
|
||||
datetime_columns={"t": ["changed"]},
|
||||
return_datetime=False,
|
||||
)
|
||||
dt = datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=_UTC)
|
||||
c.load_table("t", ["id", "changed"], FakeSource([("1", dt)]))
|
||||
_, out = c.execute_in_memory("SELECT changed FROM t", None, ["t"])
|
||||
assert out == [(to_sqlite_datetime(dt),)] # raw INTEGER µs
|
||||
c.close()
|
||||
|
||||
|
||||
# --- public export (F) ------------------------------------------------------
|
||||
|
||||
|
||||
def test_datetime_to_epoch_us_is_public():
|
||||
from sqlmem import datetime_to_epoch_us
|
||||
|
||||
dt = datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=_UTC)
|
||||
assert datetime_to_epoch_us(dt) == to_sqlite_datetime(dt)
|
||||
|
||||
|
||||
# --- integration: values reach the cache through coercion -------------------
|
||||
|
||||
|
||||
|
||||
+53
-1
@@ -1,6 +1,6 @@
|
||||
import sqlite3
|
||||
import threading
|
||||
from datetime import datetime
|
||||
from datetime import datetime, timezone
|
||||
from types import SimpleNamespace
|
||||
|
||||
import pytest
|
||||
@@ -140,6 +140,18 @@ def test_bind_watermark_passes_through_non_datetime():
|
||||
assert _bind_watermark("12345") == "12345"
|
||||
|
||||
|
||||
# --- INTEGER µs watermark binding (datetime_columns, 1.12.0) ----------------
|
||||
|
||||
|
||||
def test_bind_watermark_epoch_us_reconstructs_datetime():
|
||||
dt = datetime(2026, 6, 5, 14, 54, 24, 823000, tzinfo=timezone.utc)
|
||||
us = int(dt.timestamp() * 1_000_000)
|
||||
# Whether the watermark is an int or its digit string (it round-trips through
|
||||
# the TEXT last_synced_at column), it binds back to the same UTC datetime.
|
||||
assert _bind_watermark(us, epoch_us=True) == dt
|
||||
assert _bind_watermark(str(us), epoch_us=True) == dt
|
||||
|
||||
|
||||
class _SpyCursor:
|
||||
def __init__(self, rows):
|
||||
self._rows = list(rows)
|
||||
@@ -174,6 +186,46 @@ def test_refresh_binds_watermark_as_datetime(env):
|
||||
assert params == (datetime(2026, 6, 5, 14, 54, 24, 823000),)
|
||||
|
||||
|
||||
class _RowSource:
|
||||
"""Returns fixed rows for any query (for loading datetime-typed source data)."""
|
||||
|
||||
def __init__(self, rows):
|
||||
self._rows = rows
|
||||
|
||||
def execute(self, sql, params=()):
|
||||
return _SpyCursor(self._rows)
|
||||
|
||||
|
||||
def test_datetime_column_watermark_stored_as_int_and_bound_back(tmp_path):
|
||||
"""A change column declared in datetime_columns is stored as INTEGER µs; the
|
||||
watermark is bound back to a real datetime for the source query."""
|
||||
cache = CacheManager(
|
||||
db_path=tmp_path / "c.db",
|
||||
backup_interval=9999,
|
||||
datetime_columns={"products": ["changed"]},
|
||||
)
|
||||
dt1 = datetime(2026, 6, 1, 10, 0, 0, tzinfo=timezone.utc)
|
||||
dt2 = datetime(2026, 6, 1, 10, 5, 0, tzinfo=timezone.utc)
|
||||
cache.load_table("products", ["id", "changed"], _RowSource([("1", dt1), ("2", dt2)]))
|
||||
cache.create_unique_index("products", ["id"])
|
||||
cache.set_last_synced_at("products", cache.max_value("products", "changed"))
|
||||
|
||||
# Watermark persisted as the max INTEGER µs (digit string out of the TEXT col).
|
||||
wm = cache.get_last_synced_at("products")
|
||||
assert wm == str(int(dt2.timestamp() * 1_000_000))
|
||||
|
||||
refresher = DeltaRefresher(
|
||||
cache, {"products": ResolvedDelta("changed", ["id"])}
|
||||
)
|
||||
spy = _SpySource(rows=[]) # no new rows — just capture the bound watermark
|
||||
refresher.refresh(spy)
|
||||
|
||||
assert spy.bound, "source query was never issued"
|
||||
_, params = spy.bound[-1]
|
||||
assert params == (dt2,) # bound back as datetime, not an int/string
|
||||
cache.close()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Refresh failures are recorded (4.3) so a stuck delta is visible in stats
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@@ -385,3 +385,107 @@ def test_two_engines_separate_cache_files(source_engine, tmp_path):
|
||||
assert b._cache.is_table_cached("products") is False # independent cache
|
||||
a.close()
|
||||
b.close()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Pragmas / hard_reset / vacuum (1.11.0)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_engine_passes_pragmas_to_cache(source_engine, tmp_path):
|
||||
ce = CachingEngine(
|
||||
source_engine,
|
||||
cache_db_path=tmp_path / "cache.db",
|
||||
in_memory=False,
|
||||
pragmas={"page_size": 8192, "auto_vacuum": "INCREMENTAL"},
|
||||
)
|
||||
assert ce._cache.connection.execute("PRAGMA page_size").fetchone()[0] == 8192
|
||||
assert ce._cache.connection.execute("PRAGMA auto_vacuum").fetchone()[0] == 2
|
||||
ce.close()
|
||||
|
||||
|
||||
def test_engine_hard_reset_reloads(source_engine, tmp_path):
|
||||
ce = CachingEngine(source_engine, cache_db_path=tmp_path / "cache.db", in_memory=False)
|
||||
ce.execute("SELECT id FROM products")
|
||||
assert ce._cache.is_table_cached("products") is True
|
||||
|
||||
ce.hard_reset()
|
||||
assert ce._cache.is_table_cached("products") is False
|
||||
rows = ce.execute("SELECT id, name FROM products") # reloads on next use
|
||||
assert len(rows) == 3
|
||||
ce.close()
|
||||
|
||||
|
||||
def test_engine_vacuum_runs(source_engine, tmp_path):
|
||||
ce = CachingEngine(source_engine, cache_db_path=tmp_path / "cache.db", in_memory=False)
|
||||
ce.execute("SELECT id FROM products")
|
||||
ce.vacuum(incremental=False) # must not raise
|
||||
assert ce._cache.is_table_cached("products") is True
|
||||
ce.close()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# datetime_columns end-to-end: param coercion (A) + read-back datetime (B)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.fixture
|
||||
def events_engine(tmp_path):
|
||||
src = tmp_path / "events.db"
|
||||
conn = sqlite3.connect(src)
|
||||
conn.execute("CREATE TABLE events (id TEXT, changed TEXT)")
|
||||
conn.executemany(
|
||||
"INSERT INTO events VALUES (?, ?)",
|
||||
[("1", "2026-06-01T10:00:00"), ("2", "2026-06-03T10:00:00")],
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
se = create_engine(f"sqlite:///{src}")
|
||||
yield se
|
||||
se.dispose()
|
||||
|
||||
|
||||
def test_datetime_column_where_and_readback(events_engine, tmp_path):
|
||||
from datetime import datetime, timezone
|
||||
|
||||
ce = CachingEngine(
|
||||
events_engine,
|
||||
cache_db_path=tmp_path / "cache.db",
|
||||
in_memory=False,
|
||||
datetime_columns={"events": ["changed"]},
|
||||
)
|
||||
# A: WHERE on the INTEGER-µs column with an ISO string param returns the right row.
|
||||
rows = ce.execute(
|
||||
"SELECT id, changed FROM events WHERE changed > ?", ("2026-06-02T00:00:00",)
|
||||
)
|
||||
assert [r["id"] for r in rows] == ["2"]
|
||||
# B: the column comes back as a datetime, not a raw integer.
|
||||
assert rows[0]["changed"] == datetime(2026, 6, 3, 10, 0, 0, tzinfo=timezone.utc)
|
||||
ce.close()
|
||||
|
||||
|
||||
def test_datetime_column_return_datetime_false(events_engine, tmp_path):
|
||||
ce = CachingEngine(
|
||||
events_engine,
|
||||
cache_db_path=tmp_path / "cache.db",
|
||||
in_memory=False,
|
||||
datetime_columns={"events": ["changed"]},
|
||||
return_datetime=False,
|
||||
)
|
||||
rows = ce.execute("SELECT id, changed FROM events")
|
||||
assert all(isinstance(r["changed"], int) for r in rows) # opt-out → raw µs
|
||||
ce.close()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# db_size_bytes in stats (D)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_stats_reports_db_size_in_disk_mode(source_engine, tmp_path):
|
||||
ce = CachingEngine(source_engine, cache_db_path=tmp_path / "cache.db", in_memory=False)
|
||||
ce.execute("SELECT id FROM products")
|
||||
assert ce.stats.db_size_bytes > 0
|
||||
ce.close()
|
||||
|
||||
|
||||
def test_stats_db_size_zero_in_memory(engine):
|
||||
engine.execute("SELECT id, name FROM products")
|
||||
assert engine.stats.db_size_bytes == 0
|
||||
|
||||
@@ -0,0 +1,221 @@
|
||||
import sqlite3
|
||||
import time
|
||||
from datetime import datetime, timezone
|
||||
|
||||
import pytest
|
||||
from sqlalchemy import create_engine
|
||||
|
||||
from sqlmem import (
|
||||
TTL,
|
||||
CachingEngine,
|
||||
Delta,
|
||||
DeltaConfig,
|
||||
TableSpec,
|
||||
UndeclaredError,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def spec_source(tmp_path):
|
||||
db = tmp_path / "src.db"
|
||||
conn = sqlite3.connect(db)
|
||||
conn.execute("CREATE TABLE products (id TEXT, name TEXT, price TEXT, changed TEXT)")
|
||||
conn.executemany(
|
||||
"INSERT INTO products VALUES (?, ?, ?, ?)",
|
||||
[
|
||||
("1", "Widget", "9.99", "2026-06-01T10:00:00"),
|
||||
("2", "Gadget", "19.99", "2026-06-02T10:00:00"),
|
||||
],
|
||||
)
|
||||
conn.execute("CREATE TABLE orders (order_id TEXT, qty TEXT)")
|
||||
conn.executemany("INSERT INTO orders VALUES (?, ?)", [("101", "2")])
|
||||
conn.commit()
|
||||
conn.close()
|
||||
se = create_engine(f"sqlite:///{db}")
|
||||
yield se
|
||||
se.dispose()
|
||||
|
||||
|
||||
# --- back-compat / validation -----------------------------------------------
|
||||
|
||||
|
||||
def test_tables_and_legacy_kwargs_are_mutually_exclusive(spec_source):
|
||||
with pytest.raises(ValueError):
|
||||
CachingEngine(
|
||||
spec_source,
|
||||
tables=[TableSpec("products", ["id"])],
|
||||
delta={"products": DeltaConfig("changed", ["id"])},
|
||||
)
|
||||
|
||||
|
||||
def test_duplicate_tablespec_raises(spec_source):
|
||||
with pytest.raises(ValueError):
|
||||
CachingEngine(
|
||||
spec_source,
|
||||
tables=[TableSpec("products", ["id"]), TableSpec("products", ["name"])],
|
||||
)
|
||||
|
||||
|
||||
def test_delta_alias_is_deltaconfig():
|
||||
assert Delta is DeltaConfig
|
||||
|
||||
|
||||
# --- preload ----------------------------------------------------------------
|
||||
|
||||
|
||||
def test_preload_blocking_caches_before_first_query(spec_source, tmp_path):
|
||||
ce = CachingEngine(
|
||||
spec_source,
|
||||
cache_db_path=tmp_path / "c.db",
|
||||
tables=[TableSpec("products", ["id", "name"], preload=True)],
|
||||
blocking_startup_refresh=True,
|
||||
)
|
||||
# Cached at construction time — no execute() needed.
|
||||
assert ce._cache.is_table_cached("products") is True
|
||||
rows = ce.execute("SELECT id, name FROM products")
|
||||
assert {r["id"] for r in rows} == {"1", "2"}
|
||||
ce.close()
|
||||
|
||||
|
||||
def test_preload_non_blocking_eventually_caches(spec_source, tmp_path):
|
||||
ce = CachingEngine(
|
||||
spec_source,
|
||||
cache_db_path=tmp_path / "c.db",
|
||||
tables=[TableSpec("products", ["id", "name"], preload=True)],
|
||||
)
|
||||
deadline = time.time() + 5
|
||||
while not ce._cache.is_table_cached("products") and time.time() < deadline:
|
||||
time.sleep(0.05)
|
||||
assert ce._cache.is_table_cached("products") is True
|
||||
ce.close()
|
||||
|
||||
|
||||
def test_non_preload_table_is_not_loaded_at_startup(spec_source, tmp_path):
|
||||
ce = CachingEngine(
|
||||
spec_source,
|
||||
cache_db_path=tmp_path / "c.db",
|
||||
tables=[TableSpec("products", ["id", "name"], preload=False)],
|
||||
blocking_startup_refresh=True,
|
||||
)
|
||||
assert ce._cache.is_table_cached("products") is False # loads lazily on first query
|
||||
ce.execute("SELECT id, name FROM products")
|
||||
assert ce._cache.is_table_cached("products") is True
|
||||
ce.close()
|
||||
|
||||
|
||||
# --- fail-fast on undeclared tables / columns -------------------------------
|
||||
|
||||
|
||||
def test_fail_fast_undeclared_table(spec_source):
|
||||
ce = CachingEngine(spec_source, tables=[TableSpec("products", ["id", "name"])])
|
||||
with pytest.raises(UndeclaredError):
|
||||
ce.execute("SELECT order_id FROM orders")
|
||||
ce.close()
|
||||
|
||||
|
||||
def test_fail_fast_undeclared_column(spec_source):
|
||||
ce = CachingEngine(spec_source, tables=[TableSpec("products", ["id", "name"])])
|
||||
with pytest.raises(UndeclaredError):
|
||||
ce.execute("SELECT price FROM products")
|
||||
ce.close()
|
||||
|
||||
|
||||
def test_declared_columns_query_succeeds(spec_source):
|
||||
ce = CachingEngine(spec_source, tables=[TableSpec("products", ["id", "name"])])
|
||||
rows = ce.execute("SELECT id, name FROM products")
|
||||
assert len(rows) == 2
|
||||
ce.close()
|
||||
|
||||
|
||||
def test_columns_none_allows_wildcard_and_any_column(spec_source):
|
||||
ce = CachingEngine(spec_source, tables=[TableSpec("products", columns=None)])
|
||||
assert len(ce.execute("SELECT * FROM products")) == 2
|
||||
assert len(ce.execute("SELECT price FROM products")) == 2 # any column allowed
|
||||
ce.close()
|
||||
|
||||
|
||||
def test_wildcard_on_column_restricted_table_fails(spec_source):
|
||||
ce = CachingEngine(spec_source, tables=[TableSpec("products", ["id", "name"])])
|
||||
with pytest.raises(UndeclaredError):
|
||||
ce.execute("SELECT * FROM products")
|
||||
ce.close()
|
||||
|
||||
|
||||
def test_lazy_mode_has_no_fail_fast(spec_source):
|
||||
"""Without tables=, undeclared columns/tables load lazily as before."""
|
||||
ce = CachingEngine(spec_source)
|
||||
rows = ce.execute("SELECT order_id FROM orders")
|
||||
assert len(rows) == 1
|
||||
ce.close()
|
||||
|
||||
|
||||
# --- refresh strategies via TableSpec ---------------------------------------
|
||||
|
||||
|
||||
def test_tablespec_delta_tracking(spec_source, tmp_path):
|
||||
ce = CachingEngine(
|
||||
spec_source,
|
||||
cache_db_path=tmp_path / "c.db",
|
||||
tables=[
|
||||
TableSpec(
|
||||
"products",
|
||||
["id", "name", "changed"],
|
||||
refresh=Delta(change_column="changed", key_columns=["id"]),
|
||||
preload=True,
|
||||
)
|
||||
],
|
||||
blocking_startup_refresh=True,
|
||||
)
|
||||
assert ce.stats.tables["products"].tracking == "delta"
|
||||
ce.close()
|
||||
|
||||
|
||||
def test_tablespec_ttl_tracking(spec_source, tmp_path):
|
||||
ce = CachingEngine(
|
||||
spec_source,
|
||||
cache_db_path=tmp_path / "c.db",
|
||||
tables=[TableSpec("products", ["id", "name"], refresh=TTL(seconds=1800), preload=True)],
|
||||
blocking_startup_refresh=True,
|
||||
)
|
||||
assert ce.stats.tables["products"].tracking == "ttl"
|
||||
ce.close()
|
||||
|
||||
|
||||
def test_tablespec_datetime_columns_roundtrip(spec_source, tmp_path):
|
||||
ce = CachingEngine(
|
||||
spec_source,
|
||||
cache_db_path=tmp_path / "c.db",
|
||||
tables=[
|
||||
TableSpec("products", ["id", "changed"], datetime_columns=["changed"], preload=True)
|
||||
],
|
||||
blocking_startup_refresh=True,
|
||||
)
|
||||
rows = ce.execute("SELECT id, changed FROM products WHERE changed > ?", ("2026-06-01T12:00:00",))
|
||||
assert [r["id"] for r in rows] == ["2"] # param coercion via datetime_columns
|
||||
assert rows[0]["changed"] == datetime(2026, 6, 2, 10, 0, 0, tzinfo=timezone.utc)
|
||||
ce.close()
|
||||
|
||||
|
||||
# --- warm restart: preload skips a copy already fresh on disk ---------------
|
||||
|
||||
|
||||
def test_warm_restart_preload_skips_reload(spec_source, tmp_path):
|
||||
path = tmp_path / "c.db"
|
||||
|
||||
def make() -> CachingEngine:
|
||||
return CachingEngine(
|
||||
spec_source,
|
||||
cache_db_path=path,
|
||||
in_memory=False,
|
||||
tables=[TableSpec("products", ["id", "name"], preload=True)],
|
||||
blocking_startup_refresh=True,
|
||||
)
|
||||
|
||||
ce1 = make()
|
||||
assert ce1.stats.misses >= 1 # cold preload had to load from source
|
||||
ce1.close()
|
||||
|
||||
ce2 = make()
|
||||
assert ce2._cache.is_table_cached("products") is True
|
||||
assert ce2.stats.misses == 0 # warm: preload was a cache hit, no redundant reload
|
||||
ce2.close()
|
||||
Reference in New Issue
Block a user