Compare commits

..

2 Commits

Author SHA1 Message Date
Jan Doubravský 8e46ee3547 Store named datetime columns as INTEGER microseconds (datetime_columns) 2026-06-09 18:18:38 +02:00
Jan Doubravský a21b5a2a04 Add pragmas, hard_reset, and vacuum for tuning disk-backed caches 2026-06-09 17:58:41 +02:00
14 changed files with 611 additions and 25 deletions
+2 -5
View File
@@ -1,8 +1,6 @@
# Python
__pycache__/
*.py[cod]
*.pyo
*.pyd
*.egg
*.egg-info/
dist/
@@ -40,12 +38,11 @@ Thumbs.db
.env.*
# sqlmem cache (incl. WAL sidecars from disk-backed mode)
cache.db
cache.db-wal
cache.db-shm
cache.db*
# Agents
AGENTS.md
CLAUDE.md
DESIGN_DOCUMENT_MODULE.md
.claude/
handover.md
+33
View File
@@ -6,6 +6,39 @@ All notable changes to this project will be documented in this file.
---
## [1.12.0] - 2026-06-09
### ⚠️ Breaking
- **`SCHEMA_VERSION` bumped `3``4`** — on upgrade the existing cache is wiped automatically (disk mode wipes the file in place, in-memory discards the backup) and reloaded from the source on next use. For a large cache (e.g. a multi-hundred-million-row table) the full reload can take a while; deploy in a maintenance window.
- **`datetime_columns` change the public output contract for the chosen columns** — a column listed in `datetime_columns` is stored and returned as an **INTEGER (microseconds since the Unix epoch, UTC)**, not an ISO `TEXT` string. This is opt-in per column, so no table is affected unless you name its columns; consumers that read or filter such a column must adapt (compare against integer µs, or convert on read).
### Added
- **`datetime_columns=` parameter on `CachingEngine` / `CacheManager`** — `datetime_columns={"VW_X": ["CHANGE_DATE"]}` stores the named datetime columns as INTEGER µs-since-epoch instead of ~28-byte ISO `TEXT`. Saves ~20 bytes per row and makes index comparisons on the column operate on native integers instead of string collation — worthwhile for a pure datetime column on a very large table (e.g. a delta change column that is also range-scanned).
- `_coerce.to_sqlite_datetime()` converts datetimes (and ISO/`date` values) to exact integer microseconds via integer arithmetic (no float rounding); a naive datetime is treated as UTC, `None` passes through.
- `load_table` declares those columns `INTEGER` and `upsert_rows` coerces them the same way, so full loads and delta upserts agree on the on-disk representation.
- The delta high-watermark for such a column is the stored integer; `delta._bind_watermark(..., epoch_us=True)` reconstructs a real UTC `datetime` before binding, so the source still receives a typed timestamp (and the watermark fix from 1.8.0 keeps holding).
### Changed
- `pyproject.toml` — bumped version to `1.12.0`.
- `CacheManager.max_value` / `set_last_synced_at` now accept/return `int` watermarks alongside `str` (the INTEGER-µs watermark round-trips through the `last_synced_at` TEXT column as its digit string).
---
## [1.11.0] - 2026-06-09
### Added
- **`pragmas=` parameter on `CachingEngine` / `CacheManager`** — pass a dict of SQLite PRAGMAs (e.g. `mmap_size`, `cache_size`, `temp_store`, `page_size`, `auto_vacuum`) applied to the cache connection at open time, so disk-backed caches can be tuned for the host's I/O profile without bypassing `CacheManager`. Unknown/inapplicable pragmas are silently ignored by SQLite (graceful degradation, no startup crash).
- **`page_size`** is a layout pragma: it is applied only on a *fresh* file (set before WAL / the first table). On an existing cache with a different page size the request is ignored and a one-time warning is logged — the new value takes effect only after `hard_reset()` or a rebuild.
- **`auto_vacuum`** is set before the database header is materialized (before switching to WAL) on a fresh file, so `INCREMENTAL`/`FULL` actually stick instead of silently reverting to `NONE`.
- **`CachingEngine.hard_reset()` / `CacheManager.hard_reset()`** — close every connection, delete the on-disk cache file (and its `-wal`/`-shm` sidecars) and reopen from scratch with all current pragmas applied. Unlike `reset()` (which drops tables but keeps the open file), this lets `page_size`/`auto_vacuum` change, since those are baked into the file at creation. Disk mode only — falls back to `reset()` in memory mode. All tables reload on next use.
- **`CachingEngine.vacuum(incremental=True, pages=10_000)` / `CacheManager.vacuum(...)`** — run maintenance VACUUM on the on-disk cache to reclaim free pages left by delta `INSERT OR REPLACE` churn. Incremental (default) reclaims up to `pages` pages without blocking readers or extra disk (requires `auto_vacuum=INCREMENTAL`); `incremental=False` runs a full VACUUM (rewrites the file, ~2× disk, blocks readers — maintenance window only). No-op in memory mode.
### Changed
- `pyproject.toml` — bumped version to `1.11.0`.
- `ColumnRegistry` gained `rebind()` so it follows the cache connection swap performed by `hard_reset()` (the registry previously captured the connection for the process lifetime).
---
## [1.10.0] - 2026-06-09
### Added
+46
View File
@@ -263,17 +263,61 @@ engine = CachingEngine(base_engine, in_memory=False)
The constructor argument wins over the env var; when `in_memory` is omitted it falls back to `SQLMEM_IN_MEMORY`.
#### Tuning the SQLite layer (`pragmas=`)
For a large disk-backed cache, pass SQLite PRAGMAs to tune the read path and on-disk layout without bypassing SQLmem:
```python
engine = CachingEngine(
base_engine,
in_memory=False,
pragmas={
"mmap_size": 32 * 1024**3, # map the DB into the address space (32 GB)
"cache_size": -262144, # 256 MB page cache (negative = KiB)
"temp_store": 2, # ORDER BY / GROUP BY scratch in RAM
"page_size": 8192, # larger pages → fewer reads on range scans
"auto_vacuum": "INCREMENTAL",# reclaim free pages with vacuum() (see below)
},
)
```
- Every entry is applied as `PRAGMA <key> = <value>` when the cache connection opens. **Unknown or inapplicable pragmas are silently ignored** by SQLite, so a bad value degrades gracefully instead of crashing startup.
- **`page_size` and `auto_vacuum` are layout pragmas** — they only take effect on a *fresh* file (set before the first table). On an existing cache, `page_size` is ignored with a one-time warning; use [`hard_reset()`](#manual-cache-control) to rebuild the file with the new value.
#### INTEGER datetime columns (`datetime_columns=`)
A pure datetime column stored as an ISO `TEXT` string costs ~28 bytes per row and compares by string collation. For a large table you can store named datetime columns as **INTEGER microseconds since the Unix epoch** instead — 8 bytes, native integer comparison:
```python
engine = CachingEngine(
base_engine,
delta={"VW_P_PRATVALUES": DeltaConfig("CHANGE_DATE", ["PRATVALUE_ID"])},
datetime_columns={"VW_P_PRATVALUES": ["CHANGE_DATE"]},
)
```
- **Opt-in per column.** Only the columns you name change; everything else keeps the default lossless `TEXT` storage.
- ⚠️ **It changes the output contract for those columns**`execute()` returns them as `int` (µs since epoch), not ISO strings, and a `WHERE` on them must compare against integer µs. Don't list a column your callers read as a string.
- The delta watermark is handled transparently: it is persisted as the integer and bound back to a real `datetime` for the source query, so incremental refresh keeps working.
- ⚠️ This is a **breaking on-disk change** (`SCHEMA_VERSION` 4): an existing cache is wiped and reloaded on first start after enabling it — schedule a maintenance window for a large reload.
## Manual cache control
```python
engine.invalidate("orders") # drop one table from cache; next query re-fetches it from DB
engine.reset() # wipe the whole cache (RAM + cache.db) — full clean slate
engine.hard_reset() # disk mode: delete the file and reopen with current pragmas/page_size
engine.vacuum() # disk mode: incremental VACUUM (reclaim free pages from delta churn)
engine.refresh() # pull deltas for all delta-tracked tables now
engine.close() # flush to disk and shut down background thread
```
Use `reset()` after a **structural change** in the source (columns added/removed, values cleared in bulk without bumping the change timestamp) so the cache rebuilds from scratch. `invalidate(table)` is the targeted version for a single table.
`hard_reset()` goes further than `reset()` in disk mode: it closes every connection, deletes `cache.db` (and its `-wal`/`-shm` sidecars) and reopens from scratch — the only way to change a baked-in `page_size`/`auto_vacuum`. In memory mode it falls back to `reset()`.
`vacuum()` reclaims free pages left behind by delta `INSERT OR REPLACE` churn. Incremental (the default) is cheap and non-blocking but needs `auto_vacuum=INCREMENTAL`; `vacuum(incremental=False)` runs a full VACUUM that rewrites the file (~2× disk, blocks readers) — schedule it in a maintenance window. Both are no-ops in memory mode.
## Runtime statistics
```python
@@ -346,6 +390,8 @@ engine = CachingEngine(
refresh_interval=300, # SQLMEM_REFRESH_INTERVAL
fetch_batch=10000, # SQLMEM_FETCH_BATCH
dialect="tsql", # SQLMEM_SQL_DIALECT
pragmas={"mmap_size": 32 * 1024**3, "page_size": 8192}, # disk-mode SQLite tuning
datetime_columns={"orders": ["created_at"]}, # store these as INTEGER µs (opt-in)
blocking_startup_refresh=False, # block startup until caught up? (default: no)
)
```
+4
View File
@@ -215,6 +215,10 @@ SQLMEM_DEBUG=true # DEBUG level — podrobný výpis každého dotazu, cache o
- [x] **Quoting identifikátorů**: názvy tabulek/sloupců se kvótují (SQLite `"x"` pro cache, dialekt zdroje — T-SQL `[x]` — pro source), takže rezervovaná slova i mezery fungují.
- [x] **Lazy source connection**: `execute()` neotevírá spojení ke zdroji při cache hitu (neobsazuje pool slot).
- [x] **Idempotentní `add_sink`**: opakované volání pro stejný sink je no-op (žádné duplicitní logy).
- [x] **Ladění SQLite vrstvy (`pragmas=`)**: `CachingEngine(..., pragmas={...})` aplikuje libovolné PRAGMA na cache spojení (`mmap_size`, `cache_size`, `temp_store`, `page_size`, `auto_vacuum`). `page_size` a `auto_vacuum` jsou layout-pragmata — platí jen na čerstvém souboru (page_size na existující cache se ignoruje s warningem). Neznámá pragmata SQLite tiše ignoruje.
- [x] **`hard_reset()`**: smaže on-disk soubor (+ WAL/SHM) a otevře nový s aktuálními pragmaty — na rozdíl od `reset()` umožní změnit `page_size`/`auto_vacuum`. Jen disk mód (v memory módu fallback na `reset()`).
- [x] **`vacuum(incremental=, pages=)`**: údržbový VACUUM cache souboru — inkrementální (uvolní volné stránky po delta `INSERT OR REPLACE`, vyžaduje `auto_vacuum=INCREMENTAL`) nebo plný (přepíše soubor, jen v maintenance okně). V memory módu no-op.
- [x] **Nativní INTEGER ukládání datetime sloupců (`datetime_columns=`)**: `datetime_columns={"VW_X": ["CHANGE_DATE"]}` — vyjmenované datetime sloupce se ukládají jako INTEGER µs-od-epochy místo ~28 B ISO TEXT (úspora místa + nativní celočíselné porovnání indexu). Opt-in per sloupec → mění výstupní kontrakt jen u zvolených sloupců (vrací int, ne ISO string). Breaking: `SCHEMA_VERSION` 3→4, cache se při upgrade smaže a načte znovu. Watermark se persistuje jako int a `_bind_watermark(epoch_us=True)` ho rekonstruuje zpět na `datetime` pro zdroj.
## TODO — budoucí funkce
+1 -1
View File
@@ -1,6 +1,6 @@
[project]
name = "sqlmem"
version = "1.10.0"
version = "1.12.0"
description = ""
authors = [
{name = "jan.doubravsky@gmail.com"}
+25
View File
@@ -15,6 +15,8 @@ from typing import Any
Params = tuple | list | dict | None
_EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
def to_sqlite(value: Any) -> Any:
if isinstance(value, decimal.Decimal):
@@ -28,6 +30,29 @@ def to_sqlite(value: Any) -> Any:
return value
def to_sqlite_datetime(value: Any) -> int | None:
"""Store a datetime as INTEGER microseconds since the Unix epoch (UTC).
Used for columns the caller marks via ``datetime_columns``: 8 bytes as an
INTEGER instead of a ~28-byte ISO ``TEXT`` string, and integer comparison on
the change column instead of string collation. ``None`` passes through; a
naive datetime is treated as UTC. A non-datetime value is parsed from its ISO
string form (so ``date``/ISO-``str`` inputs work too); anything unparseable
becomes ``None``.
"""
if value is None:
return None
if isinstance(value, datetime.datetime):
if value.tzinfo is None:
value = value.replace(tzinfo=datetime.timezone.utc)
delta = value - _EPOCH # exact integer arithmetic (no float rounding)
return delta.days * 86_400_000_000 + delta.seconds * 1_000_000 + delta.microseconds
try:
return to_sqlite_datetime(datetime.datetime.fromisoformat(str(value)))
except (TypeError, ValueError):
return None
def coerce_row(row: tuple) -> tuple:
return tuple(to_sqlite(v) for v in row)
+161 -11
View File
@@ -9,12 +9,12 @@ from pathlib import Path
from loguru import logger
import sqlmem._meta as _meta
from ._coerce import coerce_params, coerce_row
from ._coerce import coerce_params, coerce_row, to_sqlite, to_sqlite_datetime
from ._sql import quote, quote_list, quote_source
from .config import FETCH_BATCH_SIZE, SQL_DIALECT
from .stats import TableState
SCHEMA_VERSION = 3
SCHEMA_VERSION = 4
@dataclass(frozen=True)
@@ -40,12 +40,17 @@ class CacheManager:
in_memory: bool = True,
dialect: str = SQL_DIALECT,
fetch_batch: int = FETCH_BATCH_SIZE,
pragmas: dict[str, str | int] | None = None,
datetime_columns: dict[str, list[str]] | None = None,
) -> None:
self._db_path = db_path
self._backup_interval = backup_interval
self._in_memory = in_memory
self._dialect = dialect # source-DB dialect, for identifier quoting
self._fetch_batch = fetch_batch # rows fetched per source batch
self._pragmas = dict(pragmas or {}) # extra read/layout PRAGMAs (disk mode)
# table → columns stored as INTEGER µs-since-epoch instead of ISO TEXT
self._datetime_columns = {t: list(c) for t, c in (datetime_columns or {}).items()}
self._lock = threading.Lock() # serializes connection access
self._load_lock = threading.Lock() # serializes full table loads
self._states: dict[str, str] = {} # table → live processing state
@@ -59,12 +64,12 @@ class CacheManager:
if in_memory:
self._conn = sqlite3.connect(":memory:", check_same_thread=False)
self._apply_pragmas(self._conn)
else:
# Disk-backed: query the on-disk file directly — no RAM copy, every
# write persists immediately, and the cache can exceed available RAM.
self._conn = sqlite3.connect(str(db_path), check_same_thread=False)
self._conn.execute("PRAGMA journal_mode=WAL")
self._conn.execute("PRAGMA synchronous=NORMAL")
db_existed = db_path.exists() and db_path.stat().st_size > 0
self._conn = self._open_disk_connection(db_existed)
self._discard_if_schema_mismatch()
self._ensure_meta_tables()
@@ -83,6 +88,54 @@ class CacheManager:
def connection(self) -> sqlite3.Connection:
return self._conn
def _open_disk_connection(self, db_existed: bool) -> sqlite3.Connection:
"""Open the on-disk cache connection with WAL + the configured pragmas.
``page_size`` and ``auto_vacuum`` are layout pragmas that only take
effect on a *fresh* file (before the first table exists), so they are
applied conditionally on ``db_existed``; everything else is applied
unconditionally. Used by both ``__init__`` and :meth:`hard_reset`.
"""
conn = sqlite3.connect(str(self._db_path), check_same_thread=False)
# page_size must be set before WAL/the first table on a brand-new file;
# on an existing file it is silently ignored until the next VACUUM.
if "page_size" in self._pragmas:
wanted = int(self._pragmas["page_size"])
if db_existed:
actual = conn.execute("PRAGMA page_size").fetchone()[0]
if actual != wanted:
logger.warning(
f"page_size={wanted} requested but the cache file already "
f"exists with page_size={actual}; the new value takes "
"effect only after the cache is wiped (hard_reset()) or "
"rebuilt from scratch."
)
else:
conn.execute(f"PRAGMA page_size = {wanted}")
# auto_vacuum must be set before the database header is materialized,
# i.e. before switching to WAL (which writes the header) — otherwise the
# value silently reverts to 0/NONE and only a full VACUUM could apply it.
if not db_existed and "auto_vacuum" in self._pragmas:
conn.execute(f"PRAGMA auto_vacuum = {self._pragmas['auto_vacuum']}")
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA synchronous=NORMAL")
self._apply_pragmas(conn, exclude={"page_size", "auto_vacuum"})
return conn
def _apply_pragmas(
self, conn: sqlite3.Connection, exclude: set[str] | None = None
) -> None:
"""Apply the user-supplied PRAGMAs to *conn*, skipping *exclude*.
SQLite silently ignores unknown or inapplicable pragmas, so a bad value
degrades gracefully (e.g. mmap unsupported) rather than crashing startup.
"""
skip = exclude or set()
for key, value in self._pragmas.items():
if key in skip:
continue
conn.execute(f"PRAGMA {key} = {value}")
def _ensure_meta_tables(self) -> None:
self._conn.executescript("""
CREATE TABLE IF NOT EXISTS _sqlmem_meta (
@@ -337,6 +390,27 @@ class CacheManager:
self._conn.commit()
logger.debug(f"Index {idx.name!r} ready on {table} ({cols})")
def _row_coercer(self, table: str, columns: list[str]):
"""Return a per-row coercer for *columns* in source order.
Columns registered in ``datetime_columns`` for *table* are coerced to
INTEGER µs-since-epoch (``to_sqlite_datetime``); everything else keeps the
default stringifying coercion (``to_sqlite``). With no datetime columns it
is the plain :func:`coerce_row`, so the common path is unchanged.
"""
dt_cols = set(self._datetime_columns.get(table, ()))
dt_idx = {i for i, c in enumerate(columns) if c in dt_cols}
if not dt_idx:
return coerce_row
def coerce(row: tuple) -> tuple:
return tuple(
to_sqlite_datetime(v) if i in dt_idx else to_sqlite(v)
for i, v in enumerate(row)
)
return coerce
def load_table(
self,
table: str,
@@ -353,7 +427,11 @@ class CacheManager:
connection lock is only held for the brief per-batch inserts and the swap.
"""
src_cols = ", ".join(quote_source(c, self._dialect) for c in columns)
col_defs = ", ".join(f"{quote(c)} TEXT" for c in columns)
dt_cols = set(self._datetime_columns.get(table, ()))
col_defs = ", ".join(
f"{quote(c)} {'INTEGER' if c in dt_cols else 'TEXT'}" for c in columns
)
coerce = self._row_coercer(table, columns)
placeholders = ", ".join("?" * len(columns))
staging = f"{table}__sqlmem_load"
q_staging = quote(staging)
@@ -377,7 +455,7 @@ class CacheManager:
batch = cursor.fetchmany(self._fetch_batch) # network outside _lock
if not batch:
break
clean = [coerce_row(row) for row in batch]
clean = [coerce(row) for row in batch]
with self._lock:
self._conn.executemany(insert_sql, clean)
self._conn.commit()
@@ -468,9 +546,11 @@ class CacheManager:
row = self._conn.execute(
"SELECT last_synced_at FROM _sqlmem_tables WHERE table_name = ?", (table,)
).fetchone()
# Stored in a TEXT column: an INTEGER-µs watermark (datetime_columns) comes
# back as its digit string; delta._bind_watermark reconstructs the datetime.
return row[0] if row else None
def set_last_synced_at(self, table: str, value: str | None) -> None:
def set_last_synced_at(self, table: str, value: str | int | None) -> None:
with self._lock:
self._conn.execute(
"UPDATE _sqlmem_tables SET last_synced_at = ? WHERE table_name = ?",
@@ -478,8 +558,11 @@ class CacheManager:
)
self._conn.commit()
def max_value(self, table: str, column: str) -> str | None:
"""Maximum value of *column* across cached rows (the delta watermark)."""
def max_value(self, table: str, column: str) -> str | int | None:
"""Maximum value of *column* across cached rows (the delta watermark).
Returns an ``int`` for a datetime column stored as INTEGER µs, else the
ISO ``TEXT`` string."""
row = self._conn.execute(
f"SELECT MAX({quote(column)}) FROM {quote(table)}"
).fetchone()
@@ -489,7 +572,8 @@ class CacheManager:
"""Insert-or-replace one batch of *rows* by the table's unique key."""
col_list = quote_list(columns)
placeholders = ", ".join("?" * len(columns))
clean_rows = [coerce_row(row) for row in rows]
coerce = self._row_coercer(table, columns)
clean_rows = [coerce(row) for row in rows]
with self._lock:
self._conn.executemany(
f"INSERT OR REPLACE INTO {quote(table)} ({col_list}) VALUES ({placeholders})",
@@ -536,6 +620,72 @@ class CacheManager:
except sqlite3.Error as e:
logger.error(f"Failed to VACUUM cache file {self._db_path}: {e}")
def hard_reset(self) -> None:
"""Delete the on-disk cache file and reopen it from scratch.
Unlike :meth:`reset` (which drops tables but keeps the open file, so the
baked-in ``page_size``/``auto_vacuum`` cannot change), this closes every
connection, removes the file plus its WAL/SHM sidecars, and reopens with
all current pragmas applied — so layout pragmas take effect on the fresh
file. Disk mode only; in memory mode it falls back to :meth:`reset`.
Any read in flight on another thread will see its connection closed from
under it; treat this as a maintenance operation.
"""
if self._in_memory:
self.reset()
return
logger.info(f"Hard reset: closing connections and deleting {self._db_path}")
with self._lock:
for conn in self._read_conns:
try:
conn.close()
except sqlite3.Error:
pass
self._read_conns.clear()
self._read_local = threading.local() # force every thread to reopen
self._conn.close()
for suffix in ("", "-wal", "-shm"):
p = Path(str(self._db_path) + suffix)
if p.exists():
p.unlink()
# Reopen fresh — page_size/auto_vacuum apply to the new empty file.
self._conn = self._open_disk_connection(db_existed=False)
self._ensure_meta_tables()
self._states.clear()
self._errors.clear()
self._last_run.clear()
self._error_total = 0
logger.info(f"Hard reset complete — cache recreated at {self._db_path}.")
def vacuum(self, incremental: bool = True, pages: int = 10_000) -> None:
"""Run maintenance VACUUM on the on-disk cache (no-op in memory mode).
``incremental=True`` (default) reclaims up to *pages* free pages without
blocking readers or needing extra disk space — but requires the cache to
have been created with ``auto_vacuum=INCREMENTAL`` (otherwise it is a
no-op). ``incremental=False`` runs a full ``VACUUM``: it rewrites the
whole file (needs ~2× disk space, blocks readers) — use only in a
maintenance window.
"""
if self._in_memory:
logger.debug("vacuum() called in memory mode — no-op.")
return
if incremental:
with self._lock:
self._conn.execute(f"PRAGMA incremental_vacuum({pages})")
self._conn.commit()
logger.info(f"Incremental vacuum: reclaimed up to {pages} pages.")
else:
logger.info("Full VACUUM started — this may take several minutes.")
with self._lock:
self._conn.execute("VACUUM")
logger.info("Full VACUUM complete.")
def close(self) -> None:
self._backup_to_disk()
self._closed = True
+18 -5
View File
@@ -1,5 +1,5 @@
from dataclasses import dataclass, field
from datetime import datetime
from datetime import datetime, timedelta, timezone
from typing import Any
from loguru import logger
@@ -8,8 +8,10 @@ from ._sql import quote_source
from .cache import CacheManager
from .stats import TableState
_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
def _bind_watermark(watermark: str) -> datetime | str:
def _bind_watermark(watermark: str | int, epoch_us: bool = False) -> datetime | str:
"""Bind the delta watermark back to the source in its native type.
The cache stores the change column as an ISO ``TEXT`` string (see
@@ -22,11 +24,21 @@ def _bind_watermark(watermark: str) -> datetime | str:
driver send a typed timestamp, so the comparison happens natively with no
string conversion. Non-datetime change columns (e.g. an integer rowversion)
don't parse and are passed through unchanged.
When the change column is stored as INTEGER µs-since-epoch (``datetime_columns``)
*epoch_us* is set: the watermark is a microsecond count (an ``int`` or its digit
string, since it round-trips through a TEXT column) and is reconstructed into a
UTC :class:`~datetime.datetime` so the source still receives a typed timestamp.
"""
if epoch_us:
try:
return datetime.fromisoformat(watermark)
return _EPOCH + timedelta(microseconds=int(watermark))
except (TypeError, ValueError):
return watermark
return watermark if isinstance(watermark, str) else str(watermark)
try:
return datetime.fromisoformat(watermark) # type: ignore[arg-type]
except (TypeError, ValueError):
return watermark if isinstance(watermark, str) else str(watermark)
@dataclass(frozen=True)
@@ -92,9 +104,10 @@ class DeltaRefresher:
cursor = source_conn.execute(f"SELECT {col_list} FROM {q_table}")
else:
change_col = quote_source(cfg.change_column, dialect)
epoch_us = cfg.change_column in self._cache._datetime_columns.get(table, ())
cursor = source_conn.execute(
f"SELECT {col_list} FROM {q_table} WHERE {change_col} >= ?",
(_bind_watermark(watermark),),
(_bind_watermark(watermark, epoch_us),),
)
# Stream the delta in batches so a large catch-up never materializes at once.
+26
View File
@@ -65,6 +65,8 @@ class CachingEngine:
refresh_interval: int | None = None,
fetch_batch: int | None = None,
dialect: str | None = None,
pragmas: dict[str, str | int] | None = None,
datetime_columns: dict[str, list[str]] | None = None,
blocking_startup_refresh: bool = False,
) -> None:
self._source_engine = source_engine
@@ -79,6 +81,8 @@ class CachingEngine:
in_memory=use_memory,
dialect=self._dialect,
fetch_batch=fetch_batch if fetch_batch is not None else FETCH_BATCH_SIZE,
pragmas=pragmas,
datetime_columns=datetime_columns,
)
self._registry = ColumnRegistry(self._cache.connection)
self._stats = StatsCollector()
@@ -267,6 +271,28 @@ class CachingEngine:
self._cache.reset()
logger.info("Cache reset — all tables will be reloaded on next use.")
def hard_reset(self) -> None:
"""Delete the on-disk cache file and reopen with current pragmas/page_size.
Disk mode only (falls back to :meth:`reset` in memory mode). Use when a
layout pragma — ``page_size`` or ``auto_vacuum`` — must change, since
those are baked into the file at creation and :meth:`reset` keeps it.
All tables reload on next use.
"""
self._cache.hard_reset()
# hard_reset swaps the cache connection — re-point the registry at it.
self._registry.rebind(self._cache.connection)
logger.info("Cache hard reset — file recreated; all tables reload on next use.")
def vacuum(self, incremental: bool = True, pages: int = 10_000) -> None:
"""Run maintenance VACUUM on the on-disk cache (incremental by default).
Incremental reclaims free pages left by delta ``INSERT OR REPLACE`` churn
cheaply (requires ``auto_vacuum=INCREMENTAL``); a full VACUUM rewrites the
whole file and should run only in a maintenance window.
"""
self._cache.vacuum(incremental=incremental, pages=pages)
def close(self) -> None:
self._cache.close()
logger.info("CachingEngine closed.")
+10
View File
@@ -12,6 +12,16 @@ class ColumnRegistry:
self._lock = Lock()
self._ensure_table()
def rebind(self, mem_conn: sqlite3.Connection) -> None:
"""Point the registry at a new cache connection (after a hard reset).
``CacheManager.hard_reset`` closes and reopens the cache connection, so the
connection object the registry captured at construction becomes invalid.
"""
with self._lock:
self._conn = mem_conn
self._ensure_table()
def _ensure_table(self) -> None:
self._conn.execute("""
CREATE TABLE IF NOT EXISTS _sqlmem_columns (
+120
View File
@@ -168,3 +168,123 @@ def test_disk_mode_reset_keeps_file(tmp_path, source_conn):
assert db_path.exists()
assert c.is_table_cached("users") is False
c.close()
# ---------------------------------------------------------------------------
# Pragmas / layout tuning (1.11.0)
# ---------------------------------------------------------------------------
def test_pragmas_applied_on_fresh_disk_cache(tmp_path):
"""page_size, auto_vacuum and a generic pragma all take effect on a new file."""
c = CacheManager(
db_path=tmp_path / "cache.db",
backup_interval=9999,
in_memory=False,
pragmas={"page_size": 8192, "auto_vacuum": "INCREMENTAL", "cache_size": -2000},
)
assert c.connection.execute("PRAGMA page_size").fetchone()[0] == 8192
assert c.connection.execute("PRAGMA auto_vacuum").fetchone()[0] == 2 # INCREMENTAL
assert c.connection.execute("PRAGMA cache_size").fetchone()[0] == -2000
c.close()
def test_page_size_ignored_on_existing_file_warns(tmp_path):
"""A page_size that differs from the existing file is ignored, with a warning."""
db_path = tmp_path / "cache.db"
c1 = CacheManager(db_path=db_path, backup_interval=9999, in_memory=False)
assert c1.connection.execute("PRAGMA page_size").fetchone()[0] == 4096 # default
c1.close()
c2 = CacheManager(
db_path=db_path,
backup_interval=9999,
in_memory=False,
pragmas={"page_size": 16384},
)
# File keeps its original page size; the request is ignored (not an error).
assert c2.connection.execute("PRAGMA page_size").fetchone()[0] == 4096
c2.close()
def test_unknown_pragma_does_not_crash(tmp_path):
"""SQLite ignores unknown/inapplicable pragmas — startup must not fail."""
c = CacheManager(
db_path=tmp_path / "cache.db",
backup_interval=9999,
in_memory=False,
pragmas={"this_is_not_a_pragma": 1, "mmap_size": 1024 * 1024},
)
assert c.connection.execute("PRAGMA mmap_size").fetchone()[0] == 1024 * 1024
c.close()
# ---------------------------------------------------------------------------
# hard_reset / vacuum (1.11.0)
# ---------------------------------------------------------------------------
def test_hard_reset_recreates_file_and_clears_tables(tmp_path, source_conn):
db_path = tmp_path / "cache.db"
c = CacheManager(db_path=db_path, backup_interval=9999, in_memory=False)
c.load_table("users", ["name"], source_conn)
assert c.is_table_cached("users") is True
c.hard_reset()
assert db_path.exists() # reopened fresh
assert c.is_table_cached("users") is False
# The connection is usable again after the swap.
c.load_table("users", ["name"], source_conn)
assert c.is_table_cached("users") is True
c.close()
def test_hard_reset_applies_new_page_size(tmp_path, source_conn):
"""page_size can't change via reset() but does via hard_reset() (fresh file)."""
db_path = tmp_path / "cache.db"
# Existing file at the default 4096; request 8192 — ignored on open.
CacheManager(db_path=db_path, backup_interval=9999, in_memory=False).close()
c = CacheManager(
db_path=db_path,
backup_interval=9999,
in_memory=False,
pragmas={"page_size": 8192},
)
assert c.connection.execute("PRAGMA page_size").fetchone()[0] == 4096
c.hard_reset() # deletes the file → recreated with the requested page size
assert c.connection.execute("PRAGMA page_size").fetchone()[0] == 8192
c.close()
def test_hard_reset_in_memory_falls_back_to_reset(tmp_path, source_conn):
c = CacheManager(db_path=tmp_path / "cache.db", backup_interval=9999)
c.load_table("users", ["name"], source_conn)
c.hard_reset() # memory mode → reset()
assert c.is_table_cached("users") is False
c.close()
def test_full_vacuum_runs_on_disk(tmp_path, source_conn):
db_path = tmp_path / "cache.db"
c = CacheManager(db_path=db_path, backup_interval=9999, in_memory=False)
c.load_table("users", ["name"], source_conn)
c.vacuum(incremental=False) # must not raise
assert c.is_table_cached("users") is True
c.close()
def test_incremental_vacuum_runs_with_auto_vacuum(tmp_path, source_conn):
c = CacheManager(
db_path=tmp_path / "cache.db",
backup_interval=9999,
in_memory=False,
pragmas={"auto_vacuum": "INCREMENTAL"},
)
c.load_table("users", ["name"], source_conn)
c.vacuum(incremental=True, pages=100) # must not raise
assert c.is_table_cached("users") is True
c.close()
def test_vacuum_in_memory_is_noop(cache, source_conn):
cache.load_table("users", ["name"], source_conn)
cache.vacuum(incremental=False) # no-op, no error
assert cache.is_table_cached("users") is True
+75 -1
View File
@@ -4,7 +4,7 @@ import uuid
import pytest
from sqlmem._coerce import coerce_params, to_sqlite
from sqlmem._coerce import coerce_params, to_sqlite, to_sqlite_datetime
from sqlmem.cache import CacheManager
@@ -91,6 +91,80 @@ def test_coerce_params_none():
assert coerce_params(None) is None
# --- to_sqlite_datetime (INTEGER µs storage, 1.12.0) ------------------------
def test_datetime_to_epoch_micros():
# 2026-06-01T10:00:00Z -> microseconds since epoch
dt = datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=datetime.timezone.utc)
expected = int(dt.timestamp() * 1_000_000)
assert to_sqlite_datetime(dt) == expected
def test_datetime_naive_treated_as_utc():
naive = datetime.datetime(2026, 6, 1, 10, 0, 0)
aware = naive.replace(tzinfo=datetime.timezone.utc)
assert to_sqlite_datetime(naive) == to_sqlite_datetime(aware)
def test_datetime_micros_are_exact():
dt = datetime.datetime(2026, 6, 5, 14, 54, 24, 823000, tzinfo=datetime.timezone.utc)
us = to_sqlite_datetime(dt)
# round-trips back to the same instant with no rounding loss
back = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc) + datetime.timedelta(
microseconds=us
)
assert back == dt
def test_datetime_none_passes_through():
assert to_sqlite_datetime(None) is None
def test_datetime_iso_string_parsed():
assert to_sqlite_datetime("2026-06-01T10:00:00+00:00") == to_sqlite_datetime(
datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=datetime.timezone.utc)
)
def test_datetime_unparseable_is_none():
assert to_sqlite_datetime("not a date") is None
# --- integration: datetime_columns are stored as INTEGER --------------------
def test_datetime_column_stored_as_integer(tmp_path):
c = CacheManager(
db_path=tmp_path / "cache.db",
backup_interval=9999,
datetime_columns={"t": ["changed"]},
)
dt = datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=datetime.timezone.utc)
c.load_table("t", ["id", "changed"], FakeSource([("1", dt)]))
# Column declared INTEGER, value stored as µs-since-epoch.
coltype = c.connection.execute("PRAGMA table_info(t)").fetchall()
types = {row[1]: row[2] for row in coltype}
assert types["changed"] == "INTEGER"
assert types["id"] == "TEXT"
_, out = c.execute_in_memory("SELECT changed FROM t")
assert out == [(to_sqlite_datetime(dt),)]
c.close()
def test_non_datetime_columns_unaffected_by_datetime_columns(tmp_path):
c = CacheManager(
db_path=tmp_path / "cache.db",
backup_interval=9999,
datetime_columns={"t": ["changed"]},
)
c.load_table("t", ["id", "price"], FakeSource([("1", decimal.Decimal("9.99"))]))
_, out = c.execute_in_memory("SELECT id, price FROM t")
assert out == [("1", "9.99")] # still TEXT/ISO coercion
c.close()
# --- integration: values reach the cache through coercion -------------------
+53 -1
View File
@@ -1,6 +1,6 @@
import sqlite3
import threading
from datetime import datetime
from datetime import datetime, timezone
from types import SimpleNamespace
import pytest
@@ -140,6 +140,18 @@ def test_bind_watermark_passes_through_non_datetime():
assert _bind_watermark("12345") == "12345"
# --- INTEGER µs watermark binding (datetime_columns, 1.12.0) ----------------
def test_bind_watermark_epoch_us_reconstructs_datetime():
dt = datetime(2026, 6, 5, 14, 54, 24, 823000, tzinfo=timezone.utc)
us = int(dt.timestamp() * 1_000_000)
# Whether the watermark is an int or its digit string (it round-trips through
# the TEXT last_synced_at column), it binds back to the same UTC datetime.
assert _bind_watermark(us, epoch_us=True) == dt
assert _bind_watermark(str(us), epoch_us=True) == dt
class _SpyCursor:
def __init__(self, rows):
self._rows = list(rows)
@@ -174,6 +186,46 @@ def test_refresh_binds_watermark_as_datetime(env):
assert params == (datetime(2026, 6, 5, 14, 54, 24, 823000),)
class _RowSource:
"""Returns fixed rows for any query (for loading datetime-typed source data)."""
def __init__(self, rows):
self._rows = rows
def execute(self, sql, params=()):
return _SpyCursor(self._rows)
def test_datetime_column_watermark_stored_as_int_and_bound_back(tmp_path):
"""A change column declared in datetime_columns is stored as INTEGER µs; the
watermark is bound back to a real datetime for the source query."""
cache = CacheManager(
db_path=tmp_path / "c.db",
backup_interval=9999,
datetime_columns={"products": ["changed"]},
)
dt1 = datetime(2026, 6, 1, 10, 0, 0, tzinfo=timezone.utc)
dt2 = datetime(2026, 6, 1, 10, 5, 0, tzinfo=timezone.utc)
cache.load_table("products", ["id", "changed"], _RowSource([("1", dt1), ("2", dt2)]))
cache.create_unique_index("products", ["id"])
cache.set_last_synced_at("products", cache.max_value("products", "changed"))
# Watermark persisted as the max INTEGER µs (digit string out of the TEXT col).
wm = cache.get_last_synced_at("products")
assert wm == str(int(dt2.timestamp() * 1_000_000))
refresher = DeltaRefresher(
cache, {"products": ResolvedDelta("changed", ["id"])}
)
spy = _SpySource(rows=[]) # no new rows — just capture the bound watermark
refresher.refresh(spy)
assert spy.bound, "source query was never issued"
_, params = spy.bound[-1]
assert params == (dt2,) # bound back as datetime, not an int/string
cache.close()
# ---------------------------------------------------------------------------
# Refresh failures are recorded (4.3) so a stuck delta is visible in stats
# ---------------------------------------------------------------------------
+36
View File
@@ -385,3 +385,39 @@ def test_two_engines_separate_cache_files(source_engine, tmp_path):
assert b._cache.is_table_cached("products") is False # independent cache
a.close()
b.close()
# ---------------------------------------------------------------------------
# Pragmas / hard_reset / vacuum (1.11.0)
# ---------------------------------------------------------------------------
def test_engine_passes_pragmas_to_cache(source_engine, tmp_path):
ce = CachingEngine(
source_engine,
cache_db_path=tmp_path / "cache.db",
in_memory=False,
pragmas={"page_size": 8192, "auto_vacuum": "INCREMENTAL"},
)
assert ce._cache.connection.execute("PRAGMA page_size").fetchone()[0] == 8192
assert ce._cache.connection.execute("PRAGMA auto_vacuum").fetchone()[0] == 2
ce.close()
def test_engine_hard_reset_reloads(source_engine, tmp_path):
ce = CachingEngine(source_engine, cache_db_path=tmp_path / "cache.db", in_memory=False)
ce.execute("SELECT id FROM products")
assert ce._cache.is_table_cached("products") is True
ce.hard_reset()
assert ce._cache.is_table_cached("products") is False
rows = ce.execute("SELECT id, name FROM products") # reloads on next use
assert len(rows) == 3
ce.close()
def test_engine_vacuum_runs(source_engine, tmp_path):
ce = CachingEngine(source_engine, cache_db_path=tmp_path / "cache.db", in_memory=False)
ce.execute("SELECT id FROM products")
ce.vacuum(incremental=False) # must not raise
assert ce._cache.is_table_cached("products") is True
ce.close()