Compare commits

2 Commits

Author SHA1 Message Date
Jan Doubravský 8e46ee3547 Store named datetime columns as INTEGER microseconds (datetime_columns) 2026-06-09 18:18:38 +02:00
Jan Doubravský a21b5a2a04 Add pragmas, hard_reset, and vacuum for tuning disk-backed caches 2026-06-09 17:58:41 +02:00
14 changed files with 611 additions and 25 deletions
+2 -5
@@ -1,8 +1,6 @@
# Python # Python
__pycache__/ __pycache__/
*.py[cod] *.py[cod]
*.pyo
*.pyd
*.egg *.egg
*.egg-info/ *.egg-info/
dist/ dist/
@@ -40,12 +38,11 @@ Thumbs.db
.env.* .env.*
# sqlmem cache (incl. WAL sidecars from disk-backed mode) # sqlmem cache (incl. WAL sidecars from disk-backed mode)
cache.db cache.db*
cache.db-wal
cache.db-shm
# Agents # Agents
AGENTS.md AGENTS.md
CLAUDE.md CLAUDE.md
DESIGN_DOCUMENT_MODULE.md DESIGN_DOCUMENT_MODULE.md
.claude/ .claude/
handover.md
+33
@@ -6,6 +6,39 @@ All notable changes to this project will be documented in this file.
--- ---
## [1.12.0] - 2026-06-09
### ⚠️ Breaking
- **`SCHEMA_VERSION` bumped `3` → `4`**: on upgrade the existing cache is wiped automatically (disk mode wipes the file in place; in-memory mode discards the backup) and reloaded from the source on next use. For a large cache (e.g. a multi-hundred-million-row table) the full reload can take a while; deploy in a maintenance window.
- **`datetime_columns` changes the public output contract for the chosen columns**: a column listed in `datetime_columns` is stored and returned as an **INTEGER (microseconds since the Unix epoch, UTC)**, not an ISO `TEXT` string. This is opt-in per column, so no table is affected unless you name its columns; consumers that read or filter such a column must adapt (compare against integer µs, or convert on read).
### Added
- **`datetime_columns=` parameter on `CachingEngine` / `CacheManager`** — `datetime_columns={"VW_X": ["CHANGE_DATE"]}` stores the named datetime columns as INTEGER µs-since-epoch instead of ~28-byte ISO `TEXT`. Saves ~20 bytes per row and makes index comparisons on the column operate on native integers instead of string collation — worthwhile for a pure datetime column on a very large table (e.g. a delta change column that is also range-scanned).
- `_coerce.to_sqlite_datetime()` converts datetimes (and ISO/`date` values) to exact integer microseconds via integer arithmetic (no float rounding); a naive datetime is treated as UTC, `None` passes through.
- `load_table` declares those columns `INTEGER` and `upsert_rows` coerces them the same way, so full loads and delta upserts agree on the on-disk representation.
- The delta high-watermark for such a column is the stored integer; `delta._bind_watermark(..., epoch_us=True)` reconstructs a real UTC `datetime` before binding, so the source still receives a typed timestamp (and the watermark fix from 1.8.0 keeps holding).
### Changed
- `pyproject.toml` — bumped version to `1.12.0`.
- `CacheManager.max_value` / `set_last_synced_at` now accept/return `int` watermarks alongside `str` (the INTEGER-µs watermark round-trips through the `last_synced_at` TEXT column as its digit string).
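
The round trip through a `TEXT` column can be illustrated with the standard `sqlite3` module alone. This is only a sketch, not SQLmem code: the `meta` table is a hypothetical stand-in for the real metadata table, and the watermark value is an arbitrary example (2000-01-01T00:00:00Z in microseconds).

```python
import sqlite3
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

conn = sqlite3.connect(":memory:")
# Hypothetical stand-in for the metadata table; only the TEXT column matters here.
conn.execute("CREATE TABLE meta (table_name TEXT PRIMARY KEY, last_synced_at TEXT)")
conn.execute("INSERT INTO meta VALUES (?, ?)", ("orders", 946_684_800_000_000))

stored, storage_class = conn.execute(
    "SELECT last_synced_at, typeof(last_synced_at) FROM meta"
).fetchone()
# TEXT affinity converts the bound integer to its decimal text form, so the
# watermark comes back as a digit string and has to be parsed before use.
watermark = EPOCH + timedelta(microseconds=int(stored))
conn.close()
```

The `int(...)` call is what lets the same code path accept either the integer or its digit string.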
---
## [1.11.0] - 2026-06-09
### Added
- **`pragmas=` parameter on `CachingEngine` / `CacheManager`** — pass a dict of SQLite PRAGMAs (e.g. `mmap_size`, `cache_size`, `temp_store`, `page_size`, `auto_vacuum`) applied to the cache connection at open time, so disk-backed caches can be tuned for the host's I/O profile without bypassing `CacheManager`. Unknown/inapplicable pragmas are silently ignored by SQLite (graceful degradation, no startup crash).
- **`page_size`** is a layout pragma: it is applied only on a *fresh* file (set before WAL / the first table). On an existing cache with a different page size the request is ignored and a one-time warning is logged — the new value takes effect only after `hard_reset()` or a rebuild.
- **`auto_vacuum`** is set before the database header is materialized (before switching to WAL) on a fresh file, so `INCREMENTAL`/`FULL` actually stick instead of silently reverting to `NONE`.
- **`CachingEngine.hard_reset()` / `CacheManager.hard_reset()`** — close every connection, delete the on-disk cache file (and its `-wal`/`-shm` sidecars) and reopen from scratch with all current pragmas applied. Unlike `reset()` (which drops tables but keeps the open file), this lets `page_size`/`auto_vacuum` change, since those are baked into the file at creation. Disk mode only — falls back to `reset()` in memory mode. All tables reload on next use.
- **`CachingEngine.vacuum(incremental=True, pages=10_000)` / `CacheManager.vacuum(...)`** — run maintenance VACUUM on the on-disk cache to reclaim free pages left by delta `INSERT OR REPLACE` churn. Incremental (default) reclaims up to `pages` pages without blocking readers or extra disk (requires `auto_vacuum=INCREMENTAL`); `incremental=False` runs a full VACUUM (rewrites the file, ~2× disk, blocks readers — maintenance window only). No-op in memory mode.
### Changed
- `pyproject.toml` — bumped version to `1.11.0`.
- `ColumnRegistry` gained `rebind()` so it follows the cache connection swap performed by `hard_reset()` (the registry previously captured the connection for the process lifetime).
---
## [1.10.0] - 2026-06-09 ## [1.10.0] - 2026-06-09
### Added ### Added
+46
@@ -263,17 +263,61 @@ engine = CachingEngine(base_engine, in_memory=False)
The constructor argument wins over the env var; when `in_memory` is omitted it falls back to `SQLMEM_IN_MEMORY`. The constructor argument wins over the env var; when `in_memory` is omitted it falls back to `SQLMEM_IN_MEMORY`.
#### Tuning the SQLite layer (`pragmas=`)
For a large disk-backed cache, pass SQLite PRAGMAs to tune the read path and on-disk layout without bypassing SQLmem:
```python
engine = CachingEngine(
    base_engine,
    in_memory=False,
    pragmas={
        "mmap_size": 32 * 1024**3,     # map the DB into the address space (32 GiB)
        "cache_size": -262144,         # 256 MiB page cache (negative = KiB)
        "temp_store": 2,               # ORDER BY / GROUP BY scratch in RAM
        "page_size": 8192,             # larger pages → fewer reads on range scans
        "auto_vacuum": "INCREMENTAL",  # reclaim free pages with vacuum() (see below)
    },
)
```
- Every entry is applied as `PRAGMA <key> = <value>` when the cache connection opens. **Unknown or inapplicable pragmas are silently ignored** by SQLite, so a bad value degrades gracefully instead of crashing startup.
- **`page_size` and `auto_vacuum` are layout pragmas** — they only take effect on a *fresh* file (set before the first table). On an existing cache, `page_size` is ignored with a one-time warning; use [`hard_reset()`](#manual-cache-control) to rebuild the file with the new value.
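
The ordering constraint on layout pragmas can be checked with the standard `sqlite3` module alone. The sketch below is not SQLmem code: `open_tuned` is a hypothetical helper that mirrors the order described above (layout pragmas first, then WAL, then everything else), assuming a brand-new file.

```python
import sqlite3
import tempfile
from pathlib import Path

LAYOUT_PRAGMAS = ("page_size", "auto_vacuum")


def open_tuned(path: Path, pragmas: dict) -> sqlite3.Connection:
    """Hypothetical helper: open a brand-new SQLite file, layout pragmas first."""
    conn = sqlite3.connect(str(path))
    # Layout pragmas must precede WAL and the first table on a fresh file.
    for key in LAYOUT_PRAGMAS:
        if key in pragmas:
            conn.execute(f"PRAGMA {key} = {pragmas[key]}")
    conn.execute("PRAGMA journal_mode=WAL")
    # The remaining pragmas are per-connection settings and can follow.
    for key, value in pragmas.items():
        if key not in LAYOUT_PRAGMAS:
            conn.execute(f"PRAGMA {key} = {value}")
    return conn


with tempfile.TemporaryDirectory() as tmp:
    conn = open_tuned(
        Path(tmp) / "cache.db",
        {"page_size": 8192, "auto_vacuum": "INCREMENTAL", "cache_size": -2000},
    )
    conn.execute("CREATE TABLE t (x)")
    page_size = conn.execute("PRAGMA page_size").fetchone()[0]
    auto_vacuum = conn.execute("PRAGMA auto_vacuum").fetchone()[0]  # 2 = INCREMENTAL
    cache_size = conn.execute("PRAGMA cache_size").fetchone()[0]
    conn.close()

print(page_size, auto_vacuum, cache_size)
```

Reading the values back after opening is a cheap way to confirm that a requested layout actually took effect on a given host.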
#### INTEGER datetime columns (`datetime_columns=`)
A pure datetime column stored as an ISO `TEXT` string costs ~28 bytes per row and compares by string collation. For a large table you can store named datetime columns as **INTEGER microseconds since the Unix epoch** instead — 8 bytes, native integer comparison:
```python
engine = CachingEngine(
    base_engine,
    delta={"VW_P_PRATVALUES": DeltaConfig("CHANGE_DATE", ["PRATVALUE_ID"])},
    datetime_columns={"VW_P_PRATVALUES": ["CHANGE_DATE"]},
)
```
- **Opt-in per column.** Only the columns you name change; everything else keeps the default lossless `TEXT` storage.
- ⚠️ **It changes the output contract for those columns**: `execute()` returns them as `int` (µs since epoch), not ISO strings, and a `WHERE` on them must compare against integer µs. Don't list a column your callers read as a string.
- The delta watermark is handled transparently: it is persisted as the integer and bound back to a real `datetime` for the source query, so incremental refresh keeps working.
- ⚠️ This is a **breaking on-disk change** (`SCHEMA_VERSION` 4): an existing cache is wiped and reloaded on first start after enabling it — schedule a maintenance window for a large reload.
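
For callers that need to adapt, converting between a `datetime` and integer microseconds takes a few lines of standard-library Python. The helpers below are a hypothetical sketch (`to_epoch_us` and `from_epoch_us` are illustrative names, not part of SQLmem); they assume, as described above, that a naive datetime means UTC.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_us(value: datetime) -> int:
    """Hypothetical helper: datetime -> integer µs since the Unix epoch (UTC)."""
    if value.tzinfo is None:
        value = value.replace(tzinfo=timezone.utc)  # naive is treated as UTC
    delta = value - EPOCH
    # Pure integer arithmetic, so no float rounding can creep in.
    return delta.days * 86_400_000_000 + delta.seconds * 1_000_000 + delta.microseconds


def from_epoch_us(us: int) -> datetime:
    """Hypothetical helper: integer µs since the epoch -> aware UTC datetime."""
    return EPOCH + timedelta(microseconds=us)


cutoff = to_epoch_us(datetime(2000, 1, 1, tzinfo=timezone.utc))
stamp = datetime(2026, 6, 9, 16, 18, 38, 123456, tzinfo=timezone.utc)
round_trip = from_epoch_us(to_epoch_us(stamp))
```

A filter on such a column would then bind `cutoff` as an integer parameter (for example `WHERE CHANGE_DATE >= ?`), and `from_epoch_us` covers the convert-on-read case.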
## Manual cache control ## Manual cache control
```python ```python
engine.invalidate("orders") # drop one table from cache; next query re-fetches it from DB engine.invalidate("orders") # drop one table from cache; next query re-fetches it from DB
engine.reset() # wipe the whole cache (RAM + cache.db) — full clean slate engine.reset() # wipe the whole cache (RAM + cache.db) — full clean slate
engine.hard_reset() # disk mode: delete the file and reopen with current pragmas/page_size
engine.vacuum() # disk mode: incremental VACUUM (reclaim free pages from delta churn)
engine.refresh() # pull deltas for all delta-tracked tables now engine.refresh() # pull deltas for all delta-tracked tables now
engine.close() # flush to disk and shut down background thread engine.close() # flush to disk and shut down background thread
``` ```
Use `reset()` after a **structural change** in the source (columns added/removed, values cleared in bulk without bumping the change timestamp) so the cache rebuilds from scratch. `invalidate(table)` is the targeted version for a single table. Use `reset()` after a **structural change** in the source (columns added/removed, values cleared in bulk without bumping the change timestamp) so the cache rebuilds from scratch. `invalidate(table)` is the targeted version for a single table.
`hard_reset()` goes further than `reset()` in disk mode: it closes every connection, deletes `cache.db` (and its `-wal`/`-shm` sidecars) and reopens from scratch — the only way to change a baked-in `page_size`/`auto_vacuum`. In memory mode it falls back to `reset()`.
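
Why a file has to be deleted before `page_size` can change is easy to reproduce with plain `sqlite3`. This is an illustrative sketch rather than the library's implementation; `open_with_page_size` and `current_page_size` are hypothetical helpers, and the file uses the default rollback journal for simplicity.

```python
import sqlite3
import tempfile
from pathlib import Path


def open_with_page_size(path: Path, page_size: int) -> sqlite3.Connection:
    """Hypothetical helper: request a page size, then make sure the file exists."""
    conn = sqlite3.connect(str(path))
    conn.execute(f"PRAGMA page_size = {page_size}")
    conn.execute("CREATE TABLE IF NOT EXISTS t (x)")
    return conn


def current_page_size(conn: sqlite3.Connection) -> int:
    return conn.execute("PRAGMA page_size").fetchone()[0]


with tempfile.TemporaryDirectory() as tmp:
    db = Path(tmp) / "cache.db"

    conn = open_with_page_size(db, 4096)
    created_with = current_page_size(conn)
    conn.close()

    # Reopening the existing file with a different request changes nothing.
    conn = open_with_page_size(db, 8192)
    after_reopen = current_page_size(conn)
    conn.close()

    # Deleting the file (and any sidecars) lets the new value apply.
    for suffix in ("", "-wal", "-shm"):
        Path(str(db) + suffix).unlink(missing_ok=True)
    conn = open_with_page_size(db, 8192)
    after_recreate = current_page_size(conn)
    conn.close()

print(created_with, after_reopen, after_recreate)
```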
`vacuum()` reclaims free pages left behind by delta `INSERT OR REPLACE` churn. Incremental (the default) is cheap and non-blocking but needs `auto_vacuum=INCREMENTAL`; `vacuum(incremental=False)` runs a full VACUUM that rewrites the file (~2× disk, blocks readers) — schedule it in a maintenance window. Both are no-ops in memory mode.
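
What incremental vacuum does at the SQLite level can be observed through `PRAGMA freelist_count`. The sketch below uses only the standard `sqlite3` module and a throwaway table, so it shows the underlying mechanism under those assumptions, not SQLmem's own `vacuum()`; `free_pages` is a hypothetical helper.

```python
import sqlite3
import tempfile
from pathlib import Path


def free_pages(conn: sqlite3.Connection) -> int:
    """Hypothetical helper: number of unused pages currently on the freelist."""
    return conn.execute("PRAGMA freelist_count").fetchone()[0]


with tempfile.TemporaryDirectory() as tmp:
    conn = sqlite3.connect(str(Path(tmp) / "cache.db"))
    conn.execute("PRAGMA auto_vacuum = INCREMENTAL")  # before the first table
    conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, payload BLOB)")
    conn.executemany(
        "INSERT INTO t (payload) VALUES (?)", [(b"x" * 4000,) for _ in range(200)]
    )
    conn.commit()
    conn.execute("DELETE FROM t")  # freed pages stay on the freelist
    conn.commit()

    before = free_pages(conn)
    # fetchall() steps the pragma to completion so the requested pages are freed.
    conn.execute("PRAGMA incremental_vacuum(50)").fetchall()
    after = free_pages(conn)
    conn.close()

print(before, after)
```

Without `auto_vacuum=INCREMENTAL` on the file, the same pragma leaves the freelist untouched, which matches the requirement stated above.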
## Runtime statistics ## Runtime statistics
```python ```python
@@ -346,6 +390,8 @@ engine = CachingEngine(
refresh_interval=300, # SQLMEM_REFRESH_INTERVAL refresh_interval=300, # SQLMEM_REFRESH_INTERVAL
fetch_batch=10000, # SQLMEM_FETCH_BATCH fetch_batch=10000, # SQLMEM_FETCH_BATCH
dialect="tsql", # SQLMEM_SQL_DIALECT dialect="tsql", # SQLMEM_SQL_DIALECT
pragmas={"mmap_size": 32 * 1024**3, "page_size": 8192}, # disk-mode SQLite tuning
datetime_columns={"orders": ["created_at"]}, # store these as INTEGER µs (opt-in)
blocking_startup_refresh=False, # block startup until caught up? (default: no) blocking_startup_refresh=False, # block startup until caught up? (default: no)
) )
``` ```
+4
@@ -215,6 +215,10 @@ SQLMEM_DEBUG=true # DEBUG level — podrobný výpis každého dotazu, cache o
- [x] **Identifier quoting**: table/column names are quoted (SQLite `"x"` for the cache, the source dialect, T-SQL `[x]`, for the source), so reserved words and spaces work. - [x] **Identifier quoting**: table/column names are quoted (SQLite `"x"` for the cache, the source dialect, T-SQL `[x]`, for the source), so reserved words and spaces work.
- [x] **Lazy source connection**: `execute()` does not open a source connection on a cache hit (it does not occupy a pool slot). - [x] **Lazy source connection**: `execute()` does not open a source connection on a cache hit (it does not occupy a pool slot).
- [x] **Idempotent `add_sink`**: a repeated call for the same sink is a no-op (no duplicate logs). - [x] **Idempotent `add_sink`**: a repeated call for the same sink is a no-op (no duplicate logs).
- [x] **SQLite layer tuning (`pragmas=`)**: `CachingEngine(..., pragmas={...})` applies arbitrary PRAGMAs to the cache connection (`mmap_size`, `cache_size`, `temp_store`, `page_size`, `auto_vacuum`). `page_size` and `auto_vacuum` are layout pragmas: they apply only on a fresh file (`page_size` on an existing cache is ignored with a warning). SQLite silently ignores unknown pragmas.
- [x] **`hard_reset()`**: deletes the on-disk file (+ WAL/SHM) and opens a new one with the current pragmas; unlike `reset()`, it allows `page_size`/`auto_vacuum` to change. Disk mode only (falls back to `reset()` in memory mode).
- [x] **`vacuum(incremental=, pages=)`**: maintenance VACUUM of the cache file, either incremental (frees the free pages left by delta `INSERT OR REPLACE`; requires `auto_vacuum=INCREMENTAL`) or full (rewrites the file; maintenance window only). No-op in memory mode.
- [x] **Native INTEGER storage of datetime columns (`datetime_columns=`)**: `datetime_columns={"VW_X": ["CHANGE_DATE"]}` stores the listed datetime columns as INTEGER µs-since-epoch instead of ~28 B ISO TEXT (space savings + native integer index comparison). Opt-in per column, so the output contract changes only for the chosen columns (they return int, not an ISO string). Breaking: `SCHEMA_VERSION` 3→4, the cache is wiped and reloaded on upgrade. The watermark is persisted as an int and `_bind_watermark(epoch_us=True)` reconstructs it back into a `datetime` for the source.
## TODO: future features ## TODO: future features
+1 -1
@@ -1,6 +1,6 @@
[project] [project]
name = "sqlmem" name = "sqlmem"
version = "1.10.0" version = "1.12.0"
description = "" description = ""
authors = [ authors = [
{name = "jan.doubravsky@gmail.com"} {name = "jan.doubravsky@gmail.com"}
+25
@@ -15,6 +15,8 @@ from typing import Any
Params = tuple | list | dict | None Params = tuple | list | dict | None
_EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
def to_sqlite(value: Any) -> Any: def to_sqlite(value: Any) -> Any:
if isinstance(value, decimal.Decimal): if isinstance(value, decimal.Decimal):
@@ -28,6 +30,29 @@ def to_sqlite(value: Any) -> Any:
return value return value
def to_sqlite_datetime(value: Any) -> int | None:
"""Store a datetime as INTEGER microseconds since the Unix epoch (UTC).
Used for columns the caller marks via ``datetime_columns``: 8 bytes as an
INTEGER instead of a ~28-byte ISO ``TEXT`` string, and integer comparison on
the change column instead of string collation. ``None`` passes through; a
naive datetime is treated as UTC. A non-datetime value is parsed from its ISO
string form (so ``date``/ISO-``str`` inputs work too); anything unparseable
becomes ``None``.
"""
if value is None:
return None
if isinstance(value, datetime.datetime):
if value.tzinfo is None:
value = value.replace(tzinfo=datetime.timezone.utc)
delta = value - _EPOCH # exact integer arithmetic (no float rounding)
return delta.days * 86_400_000_000 + delta.seconds * 1_000_000 + delta.microseconds
try:
return to_sqlite_datetime(datetime.datetime.fromisoformat(str(value)))
except (TypeError, ValueError):
return None
def coerce_row(row: tuple) -> tuple: def coerce_row(row: tuple) -> tuple:
return tuple(to_sqlite(v) for v in row) return tuple(to_sqlite(v) for v in row)
+161 -11
@@ -9,12 +9,12 @@ from pathlib import Path
from loguru import logger from loguru import logger
import sqlmem._meta as _meta import sqlmem._meta as _meta
from ._coerce import coerce_params, coerce_row from ._coerce import coerce_params, coerce_row, to_sqlite, to_sqlite_datetime
from ._sql import quote, quote_list, quote_source from ._sql import quote, quote_list, quote_source
from .config import FETCH_BATCH_SIZE, SQL_DIALECT from .config import FETCH_BATCH_SIZE, SQL_DIALECT
from .stats import TableState from .stats import TableState
SCHEMA_VERSION = 3 SCHEMA_VERSION = 4
@dataclass(frozen=True) @dataclass(frozen=True)
@@ -40,12 +40,17 @@ class CacheManager:
in_memory: bool = True, in_memory: bool = True,
dialect: str = SQL_DIALECT, dialect: str = SQL_DIALECT,
fetch_batch: int = FETCH_BATCH_SIZE, fetch_batch: int = FETCH_BATCH_SIZE,
pragmas: dict[str, str | int] | None = None,
datetime_columns: dict[str, list[str]] | None = None,
) -> None: ) -> None:
self._db_path = db_path self._db_path = db_path
self._backup_interval = backup_interval self._backup_interval = backup_interval
self._in_memory = in_memory self._in_memory = in_memory
self._dialect = dialect # source-DB dialect, for identifier quoting self._dialect = dialect # source-DB dialect, for identifier quoting
self._fetch_batch = fetch_batch # rows fetched per source batch self._fetch_batch = fetch_batch # rows fetched per source batch
self._pragmas = dict(pragmas or {}) # extra read/layout PRAGMAs (disk mode)
# table → columns stored as INTEGER µs-since-epoch instead of ISO TEXT
self._datetime_columns = {t: list(c) for t, c in (datetime_columns or {}).items()}
self._lock = threading.Lock() # serializes connection access self._lock = threading.Lock() # serializes connection access
self._load_lock = threading.Lock() # serializes full table loads self._load_lock = threading.Lock() # serializes full table loads
self._states: dict[str, str] = {} # table → live processing state self._states: dict[str, str] = {} # table → live processing state
@@ -59,12 +64,12 @@ class CacheManager:
if in_memory: if in_memory:
self._conn = sqlite3.connect(":memory:", check_same_thread=False) self._conn = sqlite3.connect(":memory:", check_same_thread=False)
self._apply_pragmas(self._conn)
else: else:
# Disk-backed: query the on-disk file directly — no RAM copy, every # Disk-backed: query the on-disk file directly — no RAM copy, every
# write persists immediately, and the cache can exceed available RAM. # write persists immediately, and the cache can exceed available RAM.
self._conn = sqlite3.connect(str(db_path), check_same_thread=False) db_existed = db_path.exists() and db_path.stat().st_size > 0
self._conn.execute("PRAGMA journal_mode=WAL") self._conn = self._open_disk_connection(db_existed)
self._conn.execute("PRAGMA synchronous=NORMAL")
self._discard_if_schema_mismatch() self._discard_if_schema_mismatch()
self._ensure_meta_tables() self._ensure_meta_tables()
@@ -83,6 +88,54 @@ class CacheManager:
def connection(self) -> sqlite3.Connection: def connection(self) -> sqlite3.Connection:
return self._conn return self._conn
def _open_disk_connection(self, db_existed: bool) -> sqlite3.Connection:
"""Open the on-disk cache connection with WAL + the configured pragmas.
``page_size`` and ``auto_vacuum`` are layout pragmas that only take
effect on a *fresh* file (before the first table exists), so they are
applied conditionally on ``db_existed``; everything else is applied
unconditionally. Used by both ``__init__`` and :meth:`hard_reset`.
"""
conn = sqlite3.connect(str(self._db_path), check_same_thread=False)
# page_size must be set before WAL/the first table on a brand-new file;
# on an existing file it is silently ignored until the next VACUUM.
if "page_size" in self._pragmas:
wanted = int(self._pragmas["page_size"])
if db_existed:
actual = conn.execute("PRAGMA page_size").fetchone()[0]
if actual != wanted:
logger.warning(
f"page_size={wanted} requested but the cache file already "
f"exists with page_size={actual}; the new value takes "
"effect only after the cache is wiped (hard_reset()) or "
"rebuilt from scratch."
)
else:
conn.execute(f"PRAGMA page_size = {wanted}")
# auto_vacuum must be set before the database header is materialized,
# i.e. before switching to WAL (which writes the header) — otherwise the
# value silently reverts to 0/NONE and only a full VACUUM could apply it.
if not db_existed and "auto_vacuum" in self._pragmas:
conn.execute(f"PRAGMA auto_vacuum = {self._pragmas['auto_vacuum']}")
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA synchronous=NORMAL")
self._apply_pragmas(conn, exclude={"page_size", "auto_vacuum"})
return conn
def _apply_pragmas(
self, conn: sqlite3.Connection, exclude: set[str] | None = None
) -> None:
"""Apply the user-supplied PRAGMAs to *conn*, skipping *exclude*.
SQLite silently ignores unknown or inapplicable pragmas, so a bad value
degrades gracefully (e.g. mmap unsupported) rather than crashing startup.
"""
skip = exclude or set()
for key, value in self._pragmas.items():
if key in skip:
continue
conn.execute(f"PRAGMA {key} = {value}")
def _ensure_meta_tables(self) -> None: def _ensure_meta_tables(self) -> None:
self._conn.executescript(""" self._conn.executescript("""
CREATE TABLE IF NOT EXISTS _sqlmem_meta ( CREATE TABLE IF NOT EXISTS _sqlmem_meta (
@@ -337,6 +390,27 @@ class CacheManager:
self._conn.commit() self._conn.commit()
logger.debug(f"Index {idx.name!r} ready on {table} ({cols})") logger.debug(f"Index {idx.name!r} ready on {table} ({cols})")
def _row_coercer(self, table: str, columns: list[str]):
"""Return a per-row coercer for *columns* in source order.
Columns registered in ``datetime_columns`` for *table* are coerced to
INTEGER µs-since-epoch (``to_sqlite_datetime``); everything else keeps the
default stringifying coercion (``to_sqlite``). With no datetime columns it
is the plain :func:`coerce_row`, so the common path is unchanged.
"""
dt_cols = set(self._datetime_columns.get(table, ()))
dt_idx = {i for i, c in enumerate(columns) if c in dt_cols}
if not dt_idx:
return coerce_row
def coerce(row: tuple) -> tuple:
return tuple(
to_sqlite_datetime(v) if i in dt_idx else to_sqlite(v)
for i, v in enumerate(row)
)
return coerce
def load_table( def load_table(
self, self,
table: str, table: str,
@@ -353,7 +427,11 @@ class CacheManager:
connection lock is only held for the brief per-batch inserts and the swap. connection lock is only held for the brief per-batch inserts and the swap.
""" """
src_cols = ", ".join(quote_source(c, self._dialect) for c in columns) src_cols = ", ".join(quote_source(c, self._dialect) for c in columns)
col_defs = ", ".join(f"{quote(c)} TEXT" for c in columns) dt_cols = set(self._datetime_columns.get(table, ()))
col_defs = ", ".join(
f"{quote(c)} {'INTEGER' if c in dt_cols else 'TEXT'}" for c in columns
)
coerce = self._row_coercer(table, columns)
placeholders = ", ".join("?" * len(columns)) placeholders = ", ".join("?" * len(columns))
staging = f"{table}__sqlmem_load" staging = f"{table}__sqlmem_load"
q_staging = quote(staging) q_staging = quote(staging)
@@ -377,7 +455,7 @@ class CacheManager:
batch = cursor.fetchmany(self._fetch_batch) # network outside _lock batch = cursor.fetchmany(self._fetch_batch) # network outside _lock
if not batch: if not batch:
break break
clean = [coerce_row(row) for row in batch] clean = [coerce(row) for row in batch]
with self._lock: with self._lock:
self._conn.executemany(insert_sql, clean) self._conn.executemany(insert_sql, clean)
self._conn.commit() self._conn.commit()
@@ -468,9 +546,11 @@ class CacheManager:
row = self._conn.execute( row = self._conn.execute(
"SELECT last_synced_at FROM _sqlmem_tables WHERE table_name = ?", (table,) "SELECT last_synced_at FROM _sqlmem_tables WHERE table_name = ?", (table,)
).fetchone() ).fetchone()
# Stored in a TEXT column: an INTEGER-µs watermark (datetime_columns) comes
# back as its digit string; delta._bind_watermark reconstructs the datetime.
return row[0] if row else None return row[0] if row else None
def set_last_synced_at(self, table: str, value: str | None) -> None: def set_last_synced_at(self, table: str, value: str | int | None) -> None:
with self._lock: with self._lock:
self._conn.execute( self._conn.execute(
"UPDATE _sqlmem_tables SET last_synced_at = ? WHERE table_name = ?", "UPDATE _sqlmem_tables SET last_synced_at = ? WHERE table_name = ?",
@@ -478,8 +558,11 @@ class CacheManager:
) )
self._conn.commit() self._conn.commit()
def max_value(self, table: str, column: str) -> str | None: def max_value(self, table: str, column: str) -> str | int | None:
"""Maximum value of *column* across cached rows (the delta watermark).""" """Maximum value of *column* across cached rows (the delta watermark).
Returns an ``int`` for a datetime column stored as INTEGER µs, else the
ISO ``TEXT`` string."""
row = self._conn.execute( row = self._conn.execute(
f"SELECT MAX({quote(column)}) FROM {quote(table)}" f"SELECT MAX({quote(column)}) FROM {quote(table)}"
).fetchone() ).fetchone()
@@ -489,7 +572,8 @@ class CacheManager:
"""Insert-or-replace one batch of *rows* by the table's unique key.""" """Insert-or-replace one batch of *rows* by the table's unique key."""
col_list = quote_list(columns) col_list = quote_list(columns)
placeholders = ", ".join("?" * len(columns)) placeholders = ", ".join("?" * len(columns))
clean_rows = [coerce_row(row) for row in rows] coerce = self._row_coercer(table, columns)
clean_rows = [coerce(row) for row in rows]
with self._lock: with self._lock:
self._conn.executemany( self._conn.executemany(
f"INSERT OR REPLACE INTO {quote(table)} ({col_list}) VALUES ({placeholders})", f"INSERT OR REPLACE INTO {quote(table)} ({col_list}) VALUES ({placeholders})",
@@ -536,6 +620,72 @@ class CacheManager:
except sqlite3.Error as e: except sqlite3.Error as e:
logger.error(f"Failed to VACUUM cache file {self._db_path}: {e}") logger.error(f"Failed to VACUUM cache file {self._db_path}: {e}")
def hard_reset(self) -> None:
"""Delete the on-disk cache file and reopen it from scratch.
Unlike :meth:`reset` (which drops tables but keeps the open file, so the
baked-in ``page_size``/``auto_vacuum`` cannot change), this closes every
connection, removes the file plus its WAL/SHM sidecars, and reopens with
all current pragmas applied — so layout pragmas take effect on the fresh
file. Disk mode only; in memory mode it falls back to :meth:`reset`.
Any read in flight on another thread will see its connection closed from
under it; treat this as a maintenance operation.
"""
if self._in_memory:
self.reset()
return
logger.info(f"Hard reset: closing connections and deleting {self._db_path}")
with self._lock:
for conn in self._read_conns:
try:
conn.close()
except sqlite3.Error:
pass
self._read_conns.clear()
self._read_local = threading.local() # force every thread to reopen
self._conn.close()
for suffix in ("", "-wal", "-shm"):
p = Path(str(self._db_path) + suffix)
if p.exists():
p.unlink()
# Reopen fresh — page_size/auto_vacuum apply to the new empty file.
self._conn = self._open_disk_connection(db_existed=False)
self._ensure_meta_tables()
self._states.clear()
self._errors.clear()
self._last_run.clear()
self._error_total = 0
logger.info(f"Hard reset complete — cache recreated at {self._db_path}.")
def vacuum(self, incremental: bool = True, pages: int = 10_000) -> None:
"""Run maintenance VACUUM on the on-disk cache (no-op in memory mode).
``incremental=True`` (default) reclaims up to *pages* free pages without
blocking readers or needing extra disk space — but requires the cache to
have been created with ``auto_vacuum=INCREMENTAL`` (otherwise it is a
no-op). ``incremental=False`` runs a full ``VACUUM``: it rewrites the
whole file (needs ~2× disk space, blocks readers) — use only in a
maintenance window.
"""
if self._in_memory:
logger.debug("vacuum() called in memory mode — no-op.")
return
if incremental:
with self._lock:
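# Step the pragma to completion: each step frees one page, so an
# unconsumed cursor would stop after the first page.
self._conn.execute(f"PRAGMA incremental_vacuum({pages})").fetchall()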
self._conn.commit()
logger.info(f"Incremental vacuum: reclaimed up to {pages} pages.")
else:
logger.info("Full VACUUM started — this may take several minutes.")
with self._lock:
self._conn.execute("VACUUM")
logger.info("Full VACUUM complete.")
def close(self) -> None: def close(self) -> None:
self._backup_to_disk() self._backup_to_disk()
self._closed = True self._closed = True
+18 -5
@@ -1,5 +1,5 @@
from dataclasses import dataclass, field from dataclasses import dataclass, field
from datetime import datetime from datetime import datetime, timedelta, timezone
from typing import Any from typing import Any
from loguru import logger from loguru import logger
@@ -8,8 +8,10 @@ from ._sql import quote_source
from .cache import CacheManager from .cache import CacheManager
from .stats import TableState from .stats import TableState
_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
def _bind_watermark(watermark: str) -> datetime | str:
def _bind_watermark(watermark: str | int, epoch_us: bool = False) -> datetime | str:
"""Bind the delta watermark back to the source in its native type. """Bind the delta watermark back to the source in its native type.
The cache stores the change column as an ISO ``TEXT`` string (see The cache stores the change column as an ISO ``TEXT`` string (see
@@ -22,11 +24,21 @@ def _bind_watermark(watermark: str) -> datetime | str:
driver send a typed timestamp, so the comparison happens natively with no driver send a typed timestamp, so the comparison happens natively with no
string conversion. Non-datetime change columns (e.g. an integer rowversion) string conversion. Non-datetime change columns (e.g. an integer rowversion)
don't parse and are passed through unchanged. don't parse and are passed through unchanged.
When the change column is stored as INTEGER µs-since-epoch (``datetime_columns``)
*epoch_us* is set: the watermark is a microsecond count (an ``int`` or its digit
string, since it round-trips through a TEXT column) and is reconstructed into a
UTC :class:`~datetime.datetime` so the source still receives a typed timestamp.
""" """
if epoch_us:
try:
return _EPOCH + timedelta(microseconds=int(watermark))
except (TypeError, ValueError):
return watermark if isinstance(watermark, str) else str(watermark)
try: try:
return datetime.fromisoformat(watermark) return datetime.fromisoformat(watermark) # type: ignore[arg-type]
except (TypeError, ValueError): except (TypeError, ValueError):
return watermark return watermark if isinstance(watermark, str) else str(watermark)
@dataclass(frozen=True) @dataclass(frozen=True)
@@ -92,9 +104,10 @@ class DeltaRefresher:
cursor = source_conn.execute(f"SELECT {col_list} FROM {q_table}") cursor = source_conn.execute(f"SELECT {col_list} FROM {q_table}")
else: else:
change_col = quote_source(cfg.change_column, dialect) change_col = quote_source(cfg.change_column, dialect)
epoch_us = cfg.change_column in self._cache._datetime_columns.get(table, ())
cursor = source_conn.execute( cursor = source_conn.execute(
f"SELECT {col_list} FROM {q_table} WHERE {change_col} >= ?", f"SELECT {col_list} FROM {q_table} WHERE {change_col} >= ?",
(_bind_watermark(watermark),), (_bind_watermark(watermark, epoch_us),),
) )
# Stream the delta in batches so a large catch-up never materializes at once. # Stream the delta in batches so a large catch-up never materializes at once.
+26
@@ -65,6 +65,8 @@ class CachingEngine:
refresh_interval: int | None = None, refresh_interval: int | None = None,
fetch_batch: int | None = None, fetch_batch: int | None = None,
dialect: str | None = None, dialect: str | None = None,
pragmas: dict[str, str | int] | None = None,
datetime_columns: dict[str, list[str]] | None = None,
blocking_startup_refresh: bool = False, blocking_startup_refresh: bool = False,
) -> None: ) -> None:
self._source_engine = source_engine self._source_engine = source_engine
@@ -79,6 +81,8 @@ class CachingEngine:
in_memory=use_memory, in_memory=use_memory,
dialect=self._dialect, dialect=self._dialect,
fetch_batch=fetch_batch if fetch_batch is not None else FETCH_BATCH_SIZE, fetch_batch=fetch_batch if fetch_batch is not None else FETCH_BATCH_SIZE,
pragmas=pragmas,
datetime_columns=datetime_columns,
) )
self._registry = ColumnRegistry(self._cache.connection) self._registry = ColumnRegistry(self._cache.connection)
self._stats = StatsCollector() self._stats = StatsCollector()
@@ -267,6 +271,28 @@ class CachingEngine:
        self._cache.reset()
        logger.info("Cache reset — all tables will be reloaded on next use.")

    def hard_reset(self) -> None:
        """Delete the on-disk cache file and reopen with current pragmas/page_size.

        Disk mode only (falls back to :meth:`reset` in memory mode). Use when a
        layout pragma — ``page_size`` or ``auto_vacuum`` — must change, since
        those are baked into the file at creation and :meth:`reset` keeps it.
        All tables reload on next use.
        """
        self._cache.hard_reset()
        # hard_reset swaps the cache connection — re-point the registry at it.
        self._registry.rebind(self._cache.connection)
        logger.info("Cache hard reset — file recreated; all tables reload on next use.")

    def vacuum(self, incremental: bool = True, pages: int = 10_000) -> None:
        """Run maintenance VACUUM on the on-disk cache (incremental by default).

        Incremental reclaims free pages left by delta ``INSERT OR REPLACE`` churn
        cheaply (requires ``auto_vacuum=INCREMENTAL``); a full VACUUM rewrites the
        whole file and should run only in a maintenance window.
        """
        self._cache.vacuum(incremental=incremental, pages=pages)

    def close(self) -> None:
        self._cache.close()
        logger.info("CachingEngine closed.")
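Review note: the two docstrings above rely on SQLite behaviour that is easy to check with the standard library alone. The sketch below is not the sqlmem implementation. It uses only `sqlite3`, a temporary file, and made-up table and variable names to show that `page_size` sticks to an existing file, that a freshly created file picks up the new request, and that `PRAGMA incremental_vacuum` returns free pages when the file was created with `auto_vacuum=INCREMENTAL`.

```python
import os
import sqlite3
import tempfile


def page_size(conn: sqlite3.Connection) -> int:
    return conn.execute("PRAGMA page_size").fetchone()[0]


def freelist_count(conn: sqlite3.Connection) -> int:
    return conn.execute("PRAGMA freelist_count").fetchone()[0]


path = os.path.join(tempfile.mkdtemp(), "cache.db")

# Layout pragmas have to be issued before the first table is created.
conn = sqlite3.connect(path)
conn.execute("PRAGMA page_size = 8192")
conn.execute("PRAGMA auto_vacuum = INCREMENTAL")
conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, payload BLOB)")
conn.commit()
created_size = page_size(conn)
conn.close()

# Reopening with a different request leaves the existing file's page size alone.
conn = sqlite3.connect(path)
conn.execute("PRAGMA page_size = 16384")
reopened_size = page_size(conn)

# Simulate churn: fill the table, delete everything, then reclaim the free pages.
rows = ((b"x" * 500,) for _ in range(2000))
conn.executemany("INSERT INTO t (payload) VALUES (?)", rows)
conn.commit()
conn.execute("DELETE FROM t")
conn.commit()
free_before = freelist_count(conn)
# The pragma does its work while it is stepped, so drain it with fetchall().
conn.execute("PRAGMA incremental_vacuum(10000)").fetchall()
free_after = freelist_count(conn)
conn.close()

# A new file (what a hard reset amounts to) is created with the requested size.
os.remove(path)
conn = sqlite3.connect(path)
conn.execute("PRAGMA page_size = 16384")
conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY)")
recreated_size = page_size(conn)
conn.close()
```

The `fetchall()` on the incremental vacuum is deliberate: executing the pragma without consuming its result may stop before the requested number of pages has been released.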
+10
@@ -12,6 +12,16 @@ class ColumnRegistry:
        self._lock = Lock()
        self._ensure_table()

    def rebind(self, mem_conn: sqlite3.Connection) -> None:
        """Point the registry at a new cache connection (after a hard reset).

        ``CacheManager.hard_reset`` closes and reopens the cache connection, so the
        connection object the registry captured at construction becomes invalid.
        """
        with self._lock:
            self._conn = mem_conn
            self._ensure_table()

    def _ensure_table(self) -> None:
        self._conn.execute("""
            CREATE TABLE IF NOT EXISTS _sqlmem_columns (
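Review note: a minimal sketch of why a helper that captures a connection needs something like `rebind`. The `Registry` class, its `cols` table, and the `add`/`names` methods are hypothetical stand-ins, not the real `ColumnRegistry`. The only behaviour relied on is that `sqlite3` raises `ProgrammingError` when a closed connection is used.

```python
import sqlite3
from threading import Lock


class Registry:
    """Hypothetical helper that captures a connection at construction."""

    def __init__(self, conn: sqlite3.Connection) -> None:
        self._conn = conn
        self._lock = Lock()
        self._ensure_table()

    def _ensure_table(self) -> None:
        self._conn.execute("CREATE TABLE IF NOT EXISTS cols (name TEXT PRIMARY KEY)")

    def rebind(self, conn: sqlite3.Connection) -> None:
        # Swap the connection and recreate the bookkeeping table under the lock,
        # so no other thread sees the new connection without its table.
        with self._lock:
            self._conn = conn
            self._ensure_table()

    def add(self, name: str) -> None:
        with self._lock:
            self._conn.execute("INSERT OR IGNORE INTO cols VALUES (?)", (name,))
            self._conn.commit()

    def names(self) -> list[str]:
        with self._lock:
            rows = self._conn.execute("SELECT name FROM cols ORDER BY name")
            return [row[0] for row in rows]


old_conn = sqlite3.connect(":memory:")
registry = Registry(old_conn)
registry.add("id")

# Closing the connection behind the registry's back leaves it holding a dead handle.
old_conn.close()
try:
    registry.names()
    stale_failed = False
except sqlite3.ProgrammingError:
    stale_failed = True

# Re-pointing it at a fresh connection makes it usable again (and empty).
registry.rebind(sqlite3.connect(":memory:"))
registry.add("name")
```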
+120
@@ -168,3 +168,123 @@ def test_disk_mode_reset_keeps_file(tmp_path, source_conn):
    assert db_path.exists()
    assert c.is_table_cached("users") is False
    c.close()


# ---------------------------------------------------------------------------
# Pragmas / layout tuning (1.11.0)
# ---------------------------------------------------------------------------


def test_pragmas_applied_on_fresh_disk_cache(tmp_path):
    """page_size, auto_vacuum and a generic pragma all take effect on a new file."""
    c = CacheManager(
        db_path=tmp_path / "cache.db",
        backup_interval=9999,
        in_memory=False,
        pragmas={"page_size": 8192, "auto_vacuum": "INCREMENTAL", "cache_size": -2000},
    )
    assert c.connection.execute("PRAGMA page_size").fetchone()[0] == 8192
    assert c.connection.execute("PRAGMA auto_vacuum").fetchone()[0] == 2  # INCREMENTAL
    assert c.connection.execute("PRAGMA cache_size").fetchone()[0] == -2000
    c.close()


def test_page_size_ignored_on_existing_file_warns(tmp_path):
    """A page_size that differs from the existing file is ignored, with a warning."""
    db_path = tmp_path / "cache.db"
    c1 = CacheManager(db_path=db_path, backup_interval=9999, in_memory=False)
    assert c1.connection.execute("PRAGMA page_size").fetchone()[0] == 4096  # default
    c1.close()

    c2 = CacheManager(
        db_path=db_path,
        backup_interval=9999,
        in_memory=False,
        pragmas={"page_size": 16384},
    )
    # File keeps its original page size; the request is ignored (not an error).
    assert c2.connection.execute("PRAGMA page_size").fetchone()[0] == 4096
    c2.close()


def test_unknown_pragma_does_not_crash(tmp_path):
    """SQLite ignores unknown/inapplicable pragmas — startup must not fail."""
    c = CacheManager(
        db_path=tmp_path / "cache.db",
        backup_interval=9999,
        in_memory=False,
        pragmas={"this_is_not_a_pragma": 1, "mmap_size": 1024 * 1024},
    )
    assert c.connection.execute("PRAGMA mmap_size").fetchone()[0] == 1024 * 1024
    c.close()


# ---------------------------------------------------------------------------
# hard_reset / vacuum (1.11.0)
# ---------------------------------------------------------------------------


def test_hard_reset_recreates_file_and_clears_tables(tmp_path, source_conn):
    db_path = tmp_path / "cache.db"
    c = CacheManager(db_path=db_path, backup_interval=9999, in_memory=False)
    c.load_table("users", ["name"], source_conn)
    assert c.is_table_cached("users") is True

    c.hard_reset()

    assert db_path.exists()  # reopened fresh
    assert c.is_table_cached("users") is False
    # The connection is usable again after the swap.
    c.load_table("users", ["name"], source_conn)
    assert c.is_table_cached("users") is True
    c.close()


def test_hard_reset_applies_new_page_size(tmp_path, source_conn):
    """page_size can't change via reset() but does via hard_reset() (fresh file)."""
    db_path = tmp_path / "cache.db"
    # Existing file at the default 4096; request 8192 — ignored on open.
    CacheManager(db_path=db_path, backup_interval=9999, in_memory=False).close()
    c = CacheManager(
        db_path=db_path,
        backup_interval=9999,
        in_memory=False,
        pragmas={"page_size": 8192},
    )
    assert c.connection.execute("PRAGMA page_size").fetchone()[0] == 4096

    c.hard_reset()  # deletes the file → recreated with the requested page size
    assert c.connection.execute("PRAGMA page_size").fetchone()[0] == 8192
    c.close()


def test_hard_reset_in_memory_falls_back_to_reset(tmp_path, source_conn):
    c = CacheManager(db_path=tmp_path / "cache.db", backup_interval=9999)
    c.load_table("users", ["name"], source_conn)
    c.hard_reset()  # memory mode → reset()
    assert c.is_table_cached("users") is False
    c.close()


def test_full_vacuum_runs_on_disk(tmp_path, source_conn):
    db_path = tmp_path / "cache.db"
    c = CacheManager(db_path=db_path, backup_interval=9999, in_memory=False)
    c.load_table("users", ["name"], source_conn)
    c.vacuum(incremental=False)  # must not raise
    assert c.is_table_cached("users") is True
    c.close()


def test_incremental_vacuum_runs_with_auto_vacuum(tmp_path, source_conn):
    c = CacheManager(
        db_path=tmp_path / "cache.db",
        backup_interval=9999,
        in_memory=False,
        pragmas={"auto_vacuum": "INCREMENTAL"},
    )
    c.load_table("users", ["name"], source_conn)
    c.vacuum(incremental=True, pages=100)  # must not raise
    assert c.is_table_cached("users") is True
    c.close()


def test_vacuum_in_memory_is_noop(cache, source_conn):
    cache.load_table("users", ["name"], source_conn)
    cache.vacuum(incremental=False)  # no-op, no error
    assert cache.is_table_cached("users") is True
+75 -1
@@ -4,7 +4,7 @@ import uuid
 import pytest

-from sqlmem._coerce import coerce_params, to_sqlite
+from sqlmem._coerce import coerce_params, to_sqlite, to_sqlite_datetime
 from sqlmem.cache import CacheManager
@@ -91,6 +91,80 @@ def test_coerce_params_none():
    assert coerce_params(None) is None


# --- to_sqlite_datetime (INTEGER µs storage, 1.12.0) ------------------------


def test_datetime_to_epoch_micros():
    # 2026-06-01T10:00:00Z -> microseconds since epoch
    dt = datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=datetime.timezone.utc)
    expected = int(dt.timestamp() * 1_000_000)
    assert to_sqlite_datetime(dt) == expected


def test_datetime_naive_treated_as_utc():
    naive = datetime.datetime(2026, 6, 1, 10, 0, 0)
    aware = naive.replace(tzinfo=datetime.timezone.utc)
    assert to_sqlite_datetime(naive) == to_sqlite_datetime(aware)


def test_datetime_micros_are_exact():
    dt = datetime.datetime(2026, 6, 5, 14, 54, 24, 823000, tzinfo=datetime.timezone.utc)
    us = to_sqlite_datetime(dt)
    # round-trips back to the same instant with no rounding loss
    back = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc) + datetime.timedelta(
        microseconds=us
    )
    assert back == dt


def test_datetime_none_passes_through():
    assert to_sqlite_datetime(None) is None


def test_datetime_iso_string_parsed():
    assert to_sqlite_datetime("2026-06-01T10:00:00+00:00") == to_sqlite_datetime(
        datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=datetime.timezone.utc)
    )


def test_datetime_unparseable_is_none():
    assert to_sqlite_datetime("not a date") is None


# --- integration: datetime_columns are stored as INTEGER --------------------


def test_datetime_column_stored_as_integer(tmp_path):
    c = CacheManager(
        db_path=tmp_path / "cache.db",
        backup_interval=9999,
        datetime_columns={"t": ["changed"]},
    )
    dt = datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=datetime.timezone.utc)
    c.load_table("t", ["id", "changed"], FakeSource([("1", dt)]))
    # Column declared INTEGER, value stored as µs-since-epoch.
    coltype = c.connection.execute("PRAGMA table_info(t)").fetchall()
    types = {row[1]: row[2] for row in coltype}
    assert types["changed"] == "INTEGER"
    assert types["id"] == "TEXT"
    _, out = c.execute_in_memory("SELECT changed FROM t")
    assert out == [(to_sqlite_datetime(dt),)]
    c.close()


def test_non_datetime_columns_unaffected_by_datetime_columns(tmp_path):
    c = CacheManager(
        db_path=tmp_path / "cache.db",
        backup_interval=9999,
        datetime_columns={"t": ["changed"]},
    )
    c.load_table("t", ["id", "price"], FakeSource([("1", decimal.Decimal("9.99"))]))
    _, out = c.execute_in_memory("SELECT id, price FROM t")
    assert out == [("1", "9.99")]  # still TEXT/ISO coercion
    c.close()


# --- integration: values reach the cache through coercion -------------------
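Review note: the unit tests above pin down a conversion contract (naive values read as UTC, ISO strings parsed, unparseable text mapped to `None`, no rounding loss). The sketch below is one way to satisfy that contract. `to_epoch_micros` is a hypothetical helper written for illustration, not the body of `to_sqlite_datetime`. It uses `timedelta` floor division so the result stays in integer arithmetic and avoids multiplying a float timestamp.

```python
from __future__ import annotations

import datetime as dt

EPOCH = dt.datetime(1970, 1, 1, tzinfo=dt.timezone.utc)
ONE_MICROSECOND = dt.timedelta(microseconds=1)


def to_epoch_micros(value: dt.datetime | str | None) -> int | None:
    """Hypothetical helper: datetime or ISO string -> integer µs since the Unix epoch."""
    if value is None:
        return None
    if isinstance(value, str):
        try:
            value = dt.datetime.fromisoformat(value)
        except ValueError:
            return None  # unparseable text becomes NULL
    if value.tzinfo is None:
        # Assumption mirrored from the tests: naive datetimes are read as UTC.
        value = value.replace(tzinfo=dt.timezone.utc)
    # timedelta // timedelta yields an int, so no float rounding is involved.
    return (value - EPOCH) // ONE_MICROSECOND


stamp = dt.datetime(2026, 6, 5, 14, 54, 24, 823000, tzinfo=dt.timezone.utc)
micros = to_epoch_micros(stamp)
restored = EPOCH + dt.timedelta(microseconds=micros)
```

Consumers that read such a column can apply the last line in reverse to get a timezone-aware `datetime` back.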
+53 -1
@@ -1,6 +1,6 @@
 import sqlite3
 import threading
-from datetime import datetime
+from datetime import datetime, timezone
 from types import SimpleNamespace

 import pytest
@@ -140,6 +140,18 @@ def test_bind_watermark_passes_through_non_datetime():
    assert _bind_watermark("12345") == "12345"


# --- INTEGER µs watermark binding (datetime_columns, 1.12.0) ----------------


def test_bind_watermark_epoch_us_reconstructs_datetime():
    dt = datetime(2026, 6, 5, 14, 54, 24, 823000, tzinfo=timezone.utc)
    us = int(dt.timestamp() * 1_000_000)
    # Whether the watermark is an int or its digit string (it round-trips through
    # the TEXT last_synced_at column), it binds back to the same UTC datetime.
    assert _bind_watermark(us, epoch_us=True) == dt
    assert _bind_watermark(str(us), epoch_us=True) == dt


class _SpyCursor:
    def __init__(self, rows):
        self._rows = list(rows)
@@ -174,6 +186,46 @@ def test_refresh_binds_watermark_as_datetime(env):
    assert params == (datetime(2026, 6, 5, 14, 54, 24, 823000),)


class _RowSource:
    """Returns fixed rows for any query (for loading datetime-typed source data)."""

    def __init__(self, rows):
        self._rows = rows

    def execute(self, sql, params=()):
        return _SpyCursor(self._rows)


def test_datetime_column_watermark_stored_as_int_and_bound_back(tmp_path):
    """A change column declared in datetime_columns is stored as INTEGER µs; the
    watermark is bound back to a real datetime for the source query."""
    cache = CacheManager(
        db_path=tmp_path / "c.db",
        backup_interval=9999,
        datetime_columns={"products": ["changed"]},
    )
    dt1 = datetime(2026, 6, 1, 10, 0, 0, tzinfo=timezone.utc)
    dt2 = datetime(2026, 6, 1, 10, 5, 0, tzinfo=timezone.utc)
    cache.load_table("products", ["id", "changed"], _RowSource([("1", dt1), ("2", dt2)]))
    cache.create_unique_index("products", ["id"])
    cache.set_last_synced_at("products", cache.max_value("products", "changed"))

    # Watermark persisted as the max INTEGER µs (digit string out of the TEXT col).
    wm = cache.get_last_synced_at("products")
    assert wm == str(int(dt2.timestamp() * 1_000_000))

    refresher = DeltaRefresher(
        cache, {"products": ResolvedDelta("changed", ["id"])}
    )
    spy = _SpySource(rows=[])  # no new rows — just capture the bound watermark
    refresher.refresh(spy)

    assert spy.bound, "source query was never issued"
    _, params = spy.bound[-1]
    assert params == (dt2,)  # bound back as datetime, not an int/string
    cache.close()


# ---------------------------------------------------------------------------
# Refresh failures are recorded (4.3) so a stuck delta is visible in stats
# ---------------------------------------------------------------------------
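Review note: the watermark tests above depend on the reverse conversion, where the stored value may come back either as an `int` or as its digit string after passing through a `TEXT` column. A minimal sketch of that binding step follows. `bind_epoch_micros` is a hypothetical name used for illustration and is not the signature of `_bind_watermark`.

```python
import datetime as dt

EPOCH = dt.datetime(1970, 1, 1, tzinfo=dt.timezone.utc)


def bind_epoch_micros(watermark):
    """Hypothetical helper: int or digit-string µs watermark -> aware UTC datetime."""
    # int() accepts both forms, so the TEXT round trip needs no special casing.
    return EPOCH + dt.timedelta(microseconds=int(watermark))


newest = dt.datetime(2026, 6, 1, 10, 5, 0, tzinfo=dt.timezone.utc)
micros = (newest - EPOCH) // dt.timedelta(microseconds=1)
stored = str(micros)  # what a TEXT bookkeeping column would hand back

from_int = bind_epoch_micros(micros)
from_text = bind_epoch_micros(stored)
```

Binding a real `datetime` rather than the raw integer keeps the source-side query comparing like with like, since the source column is still a datetime type.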
+36
@@ -385,3 +385,39 @@ def test_two_engines_separate_cache_files(source_engine, tmp_path):
    assert b._cache.is_table_cached("products") is False  # independent cache
    a.close()
    b.close()


# ---------------------------------------------------------------------------
# Pragmas / hard_reset / vacuum (1.11.0)
# ---------------------------------------------------------------------------


def test_engine_passes_pragmas_to_cache(source_engine, tmp_path):
    ce = CachingEngine(
        source_engine,
        cache_db_path=tmp_path / "cache.db",
        in_memory=False,
        pragmas={"page_size": 8192, "auto_vacuum": "INCREMENTAL"},
    )
    assert ce._cache.connection.execute("PRAGMA page_size").fetchone()[0] == 8192
    assert ce._cache.connection.execute("PRAGMA auto_vacuum").fetchone()[0] == 2
    ce.close()


def test_engine_hard_reset_reloads(source_engine, tmp_path):
    ce = CachingEngine(source_engine, cache_db_path=tmp_path / "cache.db", in_memory=False)
    ce.execute("SELECT id FROM products")
    assert ce._cache.is_table_cached("products") is True

    ce.hard_reset()
    assert ce._cache.is_table_cached("products") is False

    rows = ce.execute("SELECT id, name FROM products")  # reloads on next use
    assert len(rows) == 3
    ce.close()


def test_engine_vacuum_runs(source_engine, tmp_path):
    ce = CachingEngine(source_engine, cache_db_path=tmp_path / "cache.db", in_memory=False)
    ce.execute("SELECT id FROM products")
    ce.vacuum(incremental=False)  # must not raise
    assert ce._cache.is_table_cached("products") is True
    ce.close()