diff --git a/.gitignore b/.gitignore index 821cd3f..ce79316 100644 --- a/.gitignore +++ b/.gitignore @@ -48,4 +48,5 @@ cache.db-shm AGENTS.md CLAUDE.md DESIGN_DOCUMENT_MODULE.md -.claude/ \ No newline at end of file +.claude/ +handover.md \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index 4fcbb14..874ec8e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,21 @@ All notable changes to this project will be documented in this file. --- +## [1.11.0] - 2026-06-09 + +### Added +- **`pragmas=` parameter on `CachingEngine` / `CacheManager`** — pass a dict of SQLite PRAGMAs (e.g. `mmap_size`, `cache_size`, `temp_store`, `page_size`, `auto_vacuum`) applied to the cache connection at open time, so disk-backed caches can be tuned for the host's I/O profile without bypassing `CacheManager`. Unknown/inapplicable pragmas are silently ignored by SQLite (graceful degradation, no startup crash). + - **`page_size`** is a layout pragma: it is applied only on a *fresh* file (set before WAL / the first table). On an existing cache with a different page size the request is ignored and a one-time warning is logged — the new value takes effect only after `hard_reset()` or a rebuild. + - **`auto_vacuum`** is set before the database header is materialized (before switching to WAL) on a fresh file, so `INCREMENTAL`/`FULL` actually stick instead of silently reverting to `NONE`. +- **`CachingEngine.hard_reset()` / `CacheManager.hard_reset()`** — close every connection, delete the on-disk cache file (and its `-wal`/`-shm` sidecars) and reopen from scratch with all current pragmas applied. Unlike `reset()` (which drops tables but keeps the open file), this lets `page_size`/`auto_vacuum` change, since those are baked into the file at creation. Disk mode only — falls back to `reset()` in memory mode. All tables reload on next use. 
+- **`CachingEngine.vacuum(incremental=True, pages=10_000)` / `CacheManager.vacuum(...)`** — run maintenance VACUUM on the on-disk cache to reclaim free pages left by delta `INSERT OR REPLACE` churn. Incremental (default) reclaims up to `pages` pages without blocking readers or extra disk (requires `auto_vacuum=INCREMENTAL`); `incremental=False` runs a full VACUUM (rewrites the file, ~2× disk, blocks readers — maintenance window only). No-op in memory mode. + +### Changed +- `pyproject.toml` — bumped version to `1.11.0`. +- `ColumnRegistry` gained `rebind()` so it follows the cache connection swap performed by `hard_reset()` (the registry previously captured the connection for the process lifetime). + +--- + ## [1.10.0] - 2026-06-09 ### Added diff --git a/README.md b/README.md index edca0a6..9a761d0 100644 --- a/README.md +++ b/README.md @@ -263,17 +263,44 @@ engine = CachingEngine(base_engine, in_memory=False) The constructor argument wins over the env var; when `in_memory` is omitted it falls back to `SQLMEM_IN_MEMORY`. +#### Tuning the SQLite layer (`pragmas=`) + +For a large disk-backed cache, pass SQLite PRAGMAs to tune the read path and on-disk layout without bypassing SQLmem: + +```python +engine = CachingEngine( + base_engine, + in_memory=False, + pragmas={ + "mmap_size": 32 * 1024**3, # map the DB into the address space (32 GB) + "cache_size": -262144, # 256 MB page cache (negative = KiB) + "temp_store": 2, # ORDER BY / GROUP BY scratch in RAM + "page_size": 8192, # larger pages → fewer reads on range scans + "auto_vacuum": "INCREMENTAL",# reclaim free pages with vacuum() (see below) + }, +) +``` + +- Every entry is applied as `PRAGMA <key> = <value>` when the cache connection opens. **Unknown or inapplicable pragmas are silently ignored** by SQLite, so a bad value degrades gracefully instead of crashing startup. +- **`page_size` and `auto_vacuum` are layout pragmas** — they only take effect on a *fresh* file (set before the first table). 
On an existing cache, `page_size` is ignored with a one-time warning; use [`hard_reset()`](#manual-cache-control) to rebuild the file with the new value. + ## Manual cache control ```python engine.invalidate("orders") # drop one table from cache; next query re-fetches it from DB engine.reset() # wipe the whole cache (RAM + cache.db) — full clean slate +engine.hard_reset() # disk mode: delete the file and reopen with current pragmas/page_size +engine.vacuum() # disk mode: incremental VACUUM (reclaim free pages from delta churn) engine.refresh() # pull deltas for all delta-tracked tables now engine.close() # flush to disk and shut down background thread ``` Use `reset()` after a **structural change** in the source (columns added/removed, values cleared in bulk without bumping the change timestamp) so the cache rebuilds from scratch. `invalidate(table)` is the targeted version for a single table. +`hard_reset()` goes further than `reset()` in disk mode: it closes every connection, deletes `cache.db` (and its `-wal`/`-shm` sidecars) and reopens from scratch — the only way to change a baked-in `page_size`/`auto_vacuum`. In memory mode it falls back to `reset()`. + +`vacuum()` reclaims free pages left behind by delta `INSERT OR REPLACE` churn. Incremental (the default) is cheap and non-blocking but needs `auto_vacuum=INCREMENTAL`; `vacuum(incremental=False)` runs a full VACUUM that rewrites the file (~2× disk, blocks readers) — schedule it in a maintenance window. Both are no-ops in memory mode. + ## Runtime statistics ```python @@ -346,6 +373,7 @@ engine = CachingEngine( refresh_interval=300, # SQLMEM_REFRESH_INTERVAL fetch_batch=10000, # SQLMEM_FETCH_BATCH dialect="tsql", # SQLMEM_SQL_DIALECT + pragmas={"mmap_size": 32 * 1024**3, "page_size": 8192}, # disk-mode SQLite tuning blocking_startup_refresh=False, # block startup until caught up? 
(default: no) ) ``` diff --git a/project.md b/project.md index 35e56ce..ab26657 100644 --- a/project.md +++ b/project.md @@ -215,10 +215,13 @@ SQLMEM_DEBUG=true # DEBUG level — podrobný výpis každého dotazu, cache o - [x] **Quoting identifikátorů**: názvy tabulek/sloupců se kvótují (SQLite `"x"` pro cache, dialekt zdroje — T-SQL `[x]` — pro source), takže rezervovaná slova i mezery fungují. - [x] **Lazy source connection**: `execute()` neotevírá spojení ke zdroji při cache hitu (neobsazuje pool slot). - [x] **Idempotentní `add_sink`**: opakované volání pro stejný sink je no-op (žádné duplicitní logy). +- [x] **Ladění SQLite vrstvy (`pragmas=`)**: `CachingEngine(..., pragmas={...})` aplikuje libovolné PRAGMA na cache spojení (`mmap_size`, `cache_size`, `temp_store`, `page_size`, `auto_vacuum`). `page_size` a `auto_vacuum` jsou layout-pragmata — platí jen na čerstvém souboru (page_size na existující cache se ignoruje s warningem). Neznámá pragmata SQLite tiše ignoruje. +- [x] **`hard_reset()`**: smaže on-disk soubor (+ WAL/SHM) a otevře nový s aktuálními pragmaty — na rozdíl od `reset()` umožní změnit `page_size`/`auto_vacuum`. Jen disk mód (v memory módu fallback na `reset()`). +- [x] **`vacuum(incremental=, pages=)`**: údržbový VACUUM cache souboru — inkrementální (uvolní volné stránky po delta `INSERT OR REPLACE`, vyžaduje `auto_vacuum=INCREMENTAL`) nebo plný (přepíše soubor, jen v maintenance okně). V memory módu no-op. ## TODO — budoucí funkce -- _(zatím žádné otevřené položky)_ +- [ ] **Nativní INTEGER ukládání datetime sloupců (`datetime_columns=`)** — `CHANGE_DATE` apod. jako µs-od-epochy INTEGER místo 28 B ISO TEXT (úspora místa + rychlejší porovnání indexu). Breaking (`SCHEMA_VERSION` 3→4, wipe cache). Plán pro 1.12.0. 
--- diff --git a/pyproject.toml b/pyproject.toml index 1a9504b..43abcff 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "sqlmem" -version = "1.10.0" +version = "1.11.0" description = "" authors = [ {name = "jan.doubravsky@gmail.com"} diff --git a/src/sqlmem/cache.py b/src/sqlmem/cache.py index 71544aa..b56156c 100644 --- a/src/sqlmem/cache.py +++ b/src/sqlmem/cache.py @@ -40,12 +40,14 @@ class CacheManager: in_memory: bool = True, dialect: str = SQL_DIALECT, fetch_batch: int = FETCH_BATCH_SIZE, + pragmas: dict[str, str | int] | None = None, ) -> None: self._db_path = db_path self._backup_interval = backup_interval self._in_memory = in_memory self._dialect = dialect # source-DB dialect, for identifier quoting self._fetch_batch = fetch_batch # rows fetched per source batch + self._pragmas = dict(pragmas or {}) # extra read/layout PRAGMAs (disk mode) self._lock = threading.Lock() # serializes connection access self._load_lock = threading.Lock() # serializes full table loads self._states: dict[str, str] = {} # table → live processing state @@ -59,12 +61,12 @@ class CacheManager: if in_memory: self._conn = sqlite3.connect(":memory:", check_same_thread=False) + self._apply_pragmas(self._conn) else: # Disk-backed: query the on-disk file directly — no RAM copy, every # write persists immediately, and the cache can exceed available RAM. - self._conn = sqlite3.connect(str(db_path), check_same_thread=False) - self._conn.execute("PRAGMA journal_mode=WAL") - self._conn.execute("PRAGMA synchronous=NORMAL") + db_existed = db_path.exists() and db_path.stat().st_size > 0 + self._conn = self._open_disk_connection(db_existed) self._discard_if_schema_mismatch() self._ensure_meta_tables() @@ -83,6 +85,54 @@ class CacheManager: def connection(self) -> sqlite3.Connection: return self._conn + def _open_disk_connection(self, db_existed: bool) -> sqlite3.Connection: + """Open the on-disk cache connection with WAL + the configured pragmas. 
+ + ``page_size`` and ``auto_vacuum`` are layout pragmas that only take + effect on a *fresh* file (before the first table exists), so they are + applied conditionally on ``db_existed``; everything else is applied + unconditionally. Used by both ``__init__`` and :meth:`hard_reset`. + """ + conn = sqlite3.connect(str(self._db_path), check_same_thread=False) + # page_size must be set before WAL/the first table on a brand-new file; + # on an existing file it is silently ignored until the next VACUUM. + if "page_size" in self._pragmas: + wanted = int(self._pragmas["page_size"]) + if db_existed: + actual = conn.execute("PRAGMA page_size").fetchone()[0] + if actual != wanted: + logger.warning( + f"page_size={wanted} requested but the cache file already " + f"exists with page_size={actual}; the new value takes " + "effect only after the cache is wiped (hard_reset()) or " + "rebuilt from scratch." + ) + else: + conn.execute(f"PRAGMA page_size = {wanted}") + # auto_vacuum must be set before the database header is materialized, + # i.e. before switching to WAL (which writes the header) — otherwise the + # value silently reverts to 0/NONE and only a full VACUUM could apply it. + if not db_existed and "auto_vacuum" in self._pragmas: + conn.execute(f"PRAGMA auto_vacuum = {self._pragmas['auto_vacuum']}") + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA synchronous=NORMAL") + self._apply_pragmas(conn, exclude={"page_size", "auto_vacuum"}) + return conn + + def _apply_pragmas( + self, conn: sqlite3.Connection, exclude: set[str] | None = None + ) -> None: + """Apply the user-supplied PRAGMAs to *conn*, skipping *exclude*. + + SQLite silently ignores unknown or inapplicable pragmas, so a bad value + degrades gracefully (e.g. mmap unsupported) rather than crashing startup. 
+ """ + skip = exclude or set() + for key, value in self._pragmas.items(): + if key in skip: + continue + conn.execute(f"PRAGMA {key} = {value}") + def _ensure_meta_tables(self) -> None: self._conn.executescript(""" CREATE TABLE IF NOT EXISTS _sqlmem_meta ( @@ -536,6 +586,72 @@ class CacheManager: except sqlite3.Error as e: logger.error(f"Failed to VACUUM cache file {self._db_path}: {e}") + def hard_reset(self) -> None: + """Delete the on-disk cache file and reopen it from scratch. + + Unlike :meth:`reset` (which drops tables but keeps the open file, so the + baked-in ``page_size``/``auto_vacuum`` cannot change), this closes every + connection, removes the file plus its WAL/SHM sidecars, and reopens with + all current pragmas applied — so layout pragmas take effect on the fresh + file. Disk mode only; in memory mode it falls back to :meth:`reset`. + + Any read in flight on another thread will see its connection closed from + under it; treat this as a maintenance operation. + """ + if self._in_memory: + self.reset() + return + + logger.info(f"Hard reset: closing connections and deleting {self._db_path}") + with self._lock: + for conn in self._read_conns: + try: + conn.close() + except sqlite3.Error: + pass + self._read_conns.clear() + self._read_local = threading.local() # force every thread to reopen + self._conn.close() + + for suffix in ("", "-wal", "-shm"): + p = Path(str(self._db_path) + suffix) + if p.exists(): + p.unlink() + + # Reopen fresh — page_size/auto_vacuum apply to the new empty file. + self._conn = self._open_disk_connection(db_existed=False) + self._ensure_meta_tables() + + self._states.clear() + self._errors.clear() + self._last_run.clear() + self._error_total = 0 + logger.info(f"Hard reset complete — cache recreated at {self._db_path}.") + + def vacuum(self, incremental: bool = True, pages: int = 10_000) -> None: + """Run maintenance VACUUM on the on-disk cache (no-op in memory mode). 
+ + ``incremental=True`` (default) reclaims up to *pages* free pages without + blocking readers or needing extra disk space — but requires the cache to + have been created with ``auto_vacuum=INCREMENTAL`` (otherwise it is a + no-op). ``incremental=False`` runs a full ``VACUUM``: it rewrites the + whole file (needs ~2× disk space, blocks readers) — use only in a + maintenance window. + """ + if self._in_memory: + logger.debug("vacuum() called in memory mode — no-op.") + return + if incremental: + with self._lock: + self._conn.execute(f"PRAGMA incremental_vacuum({pages})") + self._conn.commit() + logger.info(f"Incremental vacuum: reclaimed up to {pages} pages.") + else: + logger.info("Full VACUUM started — this may take several minutes.") + with self._lock: + self._conn.execute("VACUUM") + logger.info("Full VACUUM complete.") + def close(self) -> None: self._backup_to_disk() self._closed = True diff --git a/src/sqlmem/engine.py b/src/sqlmem/engine.py index fe5499a..0599143 100644 --- a/src/sqlmem/engine.py +++ b/src/sqlmem/engine.py @@ -65,6 +65,7 @@ class CachingEngine: refresh_interval: int | None = None, fetch_batch: int | None = None, dialect: str | None = None, + pragmas: dict[str, str | int] | None = None, blocking_startup_refresh: bool = False, ) -> None: self._source_engine = source_engine @@ -79,6 +80,7 @@ class CachingEngine: in_memory=use_memory, dialect=self._dialect, fetch_batch=fetch_batch if fetch_batch is not None else FETCH_BATCH_SIZE, + pragmas=pragmas, ) self._registry = ColumnRegistry(self._cache.connection) self._stats = StatsCollector() @@ -267,6 +269,28 @@ class CachingEngine: self._cache.reset() logger.info("Cache reset — all tables will be reloaded on next use.") + def hard_reset(self) -> None: + """Delete the on-disk cache file and reopen with current pragmas/page_size. + + Disk mode only (falls back to :meth:`reset` in memory mode). 
Use when a + layout pragma — ``page_size`` or ``auto_vacuum`` — must change, since + those are baked into the file at creation and :meth:`reset` keeps it. + All tables reload on next use. + """ + self._cache.hard_reset() + # hard_reset swaps the cache connection — re-point the registry at it. + self._registry.rebind(self._cache.connection) + logger.info("Cache hard reset — file recreated; all tables reload on next use.") + + def vacuum(self, incremental: bool = True, pages: int = 10_000) -> None: + """Run maintenance VACUUM on the on-disk cache (incremental by default). + + Incremental reclaims free pages left by delta ``INSERT OR REPLACE`` churn + cheaply (requires ``auto_vacuum=INCREMENTAL``); a full VACUUM rewrites the + whole file and should run only in a maintenance window. + """ + self._cache.vacuum(incremental=incremental, pages=pages) + def close(self) -> None: self._cache.close() logger.info("CachingEngine closed.") diff --git a/src/sqlmem/registry.py b/src/sqlmem/registry.py index ac89032..4c6f67a 100644 --- a/src/sqlmem/registry.py +++ b/src/sqlmem/registry.py @@ -12,6 +12,16 @@ class ColumnRegistry: self._lock = Lock() self._ensure_table() + def rebind(self, mem_conn: sqlite3.Connection) -> None: + """Point the registry at a new cache connection (after a hard reset). + + ``CacheManager.hard_reset`` closes and reopens the cache connection, so the + connection object the registry captured at construction becomes invalid. 
+ """ + with self._lock: + self._conn = mem_conn + self._ensure_table() + def _ensure_table(self) -> None: self._conn.execute(""" CREATE TABLE IF NOT EXISTS _sqlmem_columns ( diff --git a/tests/test_cache.py b/tests/test_cache.py index 3070548..40ce442 100644 --- a/tests/test_cache.py +++ b/tests/test_cache.py @@ -168,3 +168,123 @@ def test_disk_mode_reset_keeps_file(tmp_path, source_conn): assert db_path.exists() assert c.is_table_cached("users") is False c.close() + + +# --------------------------------------------------------------------------- +# Pragmas / layout tuning (1.11.0) +# --------------------------------------------------------------------------- + +def test_pragmas_applied_on_fresh_disk_cache(tmp_path): + """page_size, auto_vacuum and a generic pragma all take effect on a new file.""" + c = CacheManager( + db_path=tmp_path / "cache.db", + backup_interval=9999, + in_memory=False, + pragmas={"page_size": 8192, "auto_vacuum": "INCREMENTAL", "cache_size": -2000}, + ) + assert c.connection.execute("PRAGMA page_size").fetchone()[0] == 8192 + assert c.connection.execute("PRAGMA auto_vacuum").fetchone()[0] == 2 # INCREMENTAL + assert c.connection.execute("PRAGMA cache_size").fetchone()[0] == -2000 + c.close() + + +def test_page_size_ignored_on_existing_file_warns(tmp_path): + """A page_size that differs from the existing file is ignored, with a warning.""" + db_path = tmp_path / "cache.db" + c1 = CacheManager(db_path=db_path, backup_interval=9999, in_memory=False) + assert c1.connection.execute("PRAGMA page_size").fetchone()[0] == 4096 # default + c1.close() + + c2 = CacheManager( + db_path=db_path, + backup_interval=9999, + in_memory=False, + pragmas={"page_size": 16384}, + ) + # File keeps its original page size; the request is ignored (not an error). 
+ assert c2.connection.execute("PRAGMA page_size").fetchone()[0] == 4096 + c2.close() + + +def test_unknown_pragma_does_not_crash(tmp_path): + """SQLite ignores unknown/inapplicable pragmas — startup must not fail.""" + c = CacheManager( + db_path=tmp_path / "cache.db", + backup_interval=9999, + in_memory=False, + pragmas={"this_is_not_a_pragma": 1, "mmap_size": 1024 * 1024}, + ) + assert c.connection.execute("PRAGMA mmap_size").fetchone()[0] == 1024 * 1024 + c.close() + + +# --------------------------------------------------------------------------- +# hard_reset / vacuum (1.11.0) +# --------------------------------------------------------------------------- + +def test_hard_reset_recreates_file_and_clears_tables(tmp_path, source_conn): + db_path = tmp_path / "cache.db" + c = CacheManager(db_path=db_path, backup_interval=9999, in_memory=False) + c.load_table("users", ["name"], source_conn) + assert c.is_table_cached("users") is True + + c.hard_reset() + assert db_path.exists() # reopened fresh + assert c.is_table_cached("users") is False + # The connection is usable again after the swap. + c.load_table("users", ["name"], source_conn) + assert c.is_table_cached("users") is True + c.close() + + +def test_hard_reset_applies_new_page_size(tmp_path, source_conn): + """page_size can't change via reset() but does via hard_reset() (fresh file).""" + db_path = tmp_path / "cache.db" + # Existing file at the default 4096; request 8192 — ignored on open. 
+ CacheManager(db_path=db_path, backup_interval=9999, in_memory=False).close() + c = CacheManager( + db_path=db_path, + backup_interval=9999, + in_memory=False, + pragmas={"page_size": 8192}, + ) + assert c.connection.execute("PRAGMA page_size").fetchone()[0] == 4096 + c.hard_reset() # deletes the file → recreated with the requested page size + assert c.connection.execute("PRAGMA page_size").fetchone()[0] == 8192 + c.close() + + +def test_hard_reset_in_memory_falls_back_to_reset(tmp_path, source_conn): + c = CacheManager(db_path=tmp_path / "cache.db", backup_interval=9999) + c.load_table("users", ["name"], source_conn) + c.hard_reset() # memory mode → reset() + assert c.is_table_cached("users") is False + c.close() + + +def test_full_vacuum_runs_on_disk(tmp_path, source_conn): + db_path = tmp_path / "cache.db" + c = CacheManager(db_path=db_path, backup_interval=9999, in_memory=False) + c.load_table("users", ["name"], source_conn) + c.vacuum(incremental=False) # must not raise + assert c.is_table_cached("users") is True + c.close() + + +def test_incremental_vacuum_runs_with_auto_vacuum(tmp_path, source_conn): + c = CacheManager( + db_path=tmp_path / "cache.db", + backup_interval=9999, + in_memory=False, + pragmas={"auto_vacuum": "INCREMENTAL"}, + ) + c.load_table("users", ["name"], source_conn) + c.vacuum(incremental=True, pages=100) # must not raise + assert c.is_table_cached("users") is True + c.close() + + +def test_vacuum_in_memory_is_noop(cache, source_conn): + cache.load_table("users", ["name"], source_conn) + cache.vacuum(incremental=False) # no-op, no error + assert cache.is_table_cached("users") is True diff --git a/tests/test_engine.py b/tests/test_engine.py index 6463707..a2df15b 100644 --- a/tests/test_engine.py +++ b/tests/test_engine.py @@ -385,3 +385,39 @@ def test_two_engines_separate_cache_files(source_engine, tmp_path): assert b._cache.is_table_cached("products") is False # independent cache a.close() b.close() + + +# 
--------------------------------------------------------------------------- +# Pragmas / hard_reset / vacuum (1.11.0) +# --------------------------------------------------------------------------- + +def test_engine_passes_pragmas_to_cache(source_engine, tmp_path): + ce = CachingEngine( + source_engine, + cache_db_path=tmp_path / "cache.db", + in_memory=False, + pragmas={"page_size": 8192, "auto_vacuum": "INCREMENTAL"}, + ) + assert ce._cache.connection.execute("PRAGMA page_size").fetchone()[0] == 8192 + assert ce._cache.connection.execute("PRAGMA auto_vacuum").fetchone()[0] == 2 + ce.close() + + +def test_engine_hard_reset_reloads(source_engine, tmp_path): + ce = CachingEngine(source_engine, cache_db_path=tmp_path / "cache.db", in_memory=False) + ce.execute("SELECT id FROM products") + assert ce._cache.is_table_cached("products") is True + + ce.hard_reset() + assert ce._cache.is_table_cached("products") is False + rows = ce.execute("SELECT id, name FROM products") # reloads on next use + assert len(rows) == 3 + ce.close() + + +def test_engine_vacuum_runs(source_engine, tmp_path): + ce = CachingEngine(source_engine, cache_db_path=tmp_path / "cache.db", in_memory=False) + ce.execute("SELECT id FROM products") + ce.vacuum(incremental=False) # must not raise + assert ce._cache.is_table_cached("products") is True + ce.close()