Add pragmas, hard_reset, and vacuum for tuning disk-backed caches
This commit is contained in:
+119
-3
@@ -40,12 +40,14 @@ class CacheManager:
|
||||
in_memory: bool = True,
|
||||
dialect: str = SQL_DIALECT,
|
||||
fetch_batch: int = FETCH_BATCH_SIZE,
|
||||
pragmas: dict[str, str | int] | None = None,
|
||||
) -> None:
|
||||
self._db_path = db_path
|
||||
self._backup_interval = backup_interval
|
||||
self._in_memory = in_memory
|
||||
self._dialect = dialect # source-DB dialect, for identifier quoting
|
||||
self._fetch_batch = fetch_batch # rows fetched per source batch
|
||||
self._pragmas = dict(pragmas or {}) # extra read/layout PRAGMAs (disk mode)
|
||||
self._lock = threading.Lock() # serializes connection access
|
||||
self._load_lock = threading.Lock() # serializes full table loads
|
||||
self._states: dict[str, str] = {} # table → live processing state
|
||||
@@ -59,12 +61,12 @@ class CacheManager:
|
||||
|
||||
if in_memory:
|
||||
self._conn = sqlite3.connect(":memory:", check_same_thread=False)
|
||||
self._apply_pragmas(self._conn)
|
||||
else:
|
||||
# Disk-backed: query the on-disk file directly — no RAM copy, every
|
||||
# write persists immediately, and the cache can exceed available RAM.
|
||||
self._conn = sqlite3.connect(str(db_path), check_same_thread=False)
|
||||
self._conn.execute("PRAGMA journal_mode=WAL")
|
||||
self._conn.execute("PRAGMA synchronous=NORMAL")
|
||||
db_existed = db_path.exists() and db_path.stat().st_size > 0
|
||||
self._conn = self._open_disk_connection(db_existed)
|
||||
self._discard_if_schema_mismatch()
|
||||
|
||||
self._ensure_meta_tables()
|
||||
@@ -83,6 +85,54 @@ class CacheManager:
|
||||
def connection(self) -> sqlite3.Connection:
|
||||
return self._conn
|
||||
|
||||
def _open_disk_connection(self, db_existed: bool) -> sqlite3.Connection:
    """Open the on-disk cache connection with WAL + the configured pragmas.

    ``page_size`` and ``auto_vacuum`` are layout pragmas that only take
    effect on a *fresh* file (before the first table exists), so they are
    applied conditionally on ``db_existed``; everything else is applied
    unconditionally through :meth:`_apply_pragmas`. Used by both
    ``__init__`` and :meth:`hard_reset`.

    Args:
        db_existed: ``True`` when the cache file was already present (and
            non-empty) before this call, i.e. its layout is already fixed.

    Returns:
        The configured connection, opened with ``check_same_thread=False``.

    Raises:
        Any exception raised while configuring the connection (for example
        ``sqlite3.Error`` from a PRAGMA, or ``ValueError`` from a
        non-numeric ``page_size``). The half-configured connection is
        closed before the exception propagates, so it is never leaked.
    """
    conn = sqlite3.connect(str(self._db_path), check_same_thread=False)
    try:
        # page_size must be set before WAL/the first table on a brand-new
        # file; on an existing file it is silently ignored until the next
        # VACUUM, so there we only report a mismatch.
        if "page_size" in self._pragmas:
            wanted = int(self._pragmas["page_size"])
            if db_existed:
                actual = conn.execute("PRAGMA page_size").fetchone()[0]
                if actual != wanted:
                    logger.warning(
                        f"page_size={wanted} requested but the cache file already "
                        f"exists with page_size={actual}; the new value takes "
                        "effect only after the cache is wiped (hard_reset()) or "
                        "rebuilt from scratch."
                    )
            else:
                conn.execute(f"PRAGMA page_size = {wanted}")

        # auto_vacuum must be set before the database header is
        # materialized, i.e. before switching to WAL (which writes the
        # header) — otherwise the value silently reverts to 0/NONE and only
        # a full VACUUM could apply it.
        if not db_existed and "auto_vacuum" in self._pragmas:
            conn.execute(f"PRAGMA auto_vacuum = {self._pragmas['auto_vacuum']}")

        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA synchronous=NORMAL")
        # The two layout pragmas were handled above; do not re-apply them.
        self._apply_pragmas(conn, exclude={"page_size", "auto_vacuum"})
    except Exception:
        # Do not leak the file handle when configuration fails part-way.
        conn.close()
        raise
    return conn
|
||||
|
||||
def _apply_pragmas(
|
||||
self, conn: sqlite3.Connection, exclude: set[str] | None = None
|
||||
) -> None:
|
||||
"""Apply the user-supplied PRAGMAs to *conn*, skipping *exclude*.
|
||||
|
||||
SQLite silently ignores unknown or inapplicable pragmas, so a bad value
|
||||
degrades gracefully (e.g. mmap unsupported) rather than crashing startup.
|
||||
"""
|
||||
skip = exclude or set()
|
||||
for key, value in self._pragmas.items():
|
||||
if key in skip:
|
||||
continue
|
||||
conn.execute(f"PRAGMA {key} = {value}")
|
||||
|
||||
def _ensure_meta_tables(self) -> None:
|
||||
self._conn.executescript("""
|
||||
CREATE TABLE IF NOT EXISTS _sqlmem_meta (
|
||||
@@ -536,6 +586,72 @@ class CacheManager:
|
||||
except sqlite3.Error as e:
|
||||
logger.error(f"Failed to VACUUM cache file {self._db_path}: {e}")
|
||||
|
||||
def hard_reset(self) -> None:
    """Delete the on-disk cache file and reopen it from scratch.

    Unlike :meth:`reset` (which drops tables but keeps the open file, so the
    baked-in ``page_size``/``auto_vacuum`` cannot change), this closes every
    connection, removes the file plus its WAL/SHM sidecars, and reopens with
    all current pragmas applied — so layout pragmas take effect on the fresh
    file. Disk mode only; in memory mode it falls back to :meth:`reset`.

    Any read in flight on another thread will see its connection closed from
    under it; treat this as a maintenance operation.

    Raises:
        OSError: if the cache file or a sidecar cannot be removed.
        sqlite3.Error: if the fresh cache file cannot be opened/configured.
    """
    if self._in_memory:
        self.reset()
        return

    logger.info(f"Hard reset: closing connections and deleting {self._db_path}")
    # Close, delete and reopen under the connection lock so that code which
    # takes ``_lock`` never observes the closed ``self._conn``.
    # NOTE(review): assumes _ensure_meta_tables() does not itself acquire
    # ``_lock`` (it is a non-reentrant threading.Lock) — confirm.
    with self._lock:
        for conn in self._read_conns:
            try:
                conn.close()
            except sqlite3.Error as exc:
                # Best-effort: a reader that fails to close must not block
                # the reset, but leave a trace for diagnosis.
                logger.debug(f"Ignoring error while closing read connection: {exc}")
        self._read_conns.clear()
        self._read_local = threading.local()  # force every thread to reopen
        self._conn.close()

        # missing_ok avoids the exists()/unlink() race and tolerates
        # sidecars that SQLite already removed on close.
        for suffix in ("", "-wal", "-shm"):
            Path(str(self._db_path) + suffix).unlink(missing_ok=True)

        # Reopen fresh — page_size/auto_vacuum apply to the new empty file.
        self._conn = self._open_disk_connection(db_existed=False)
        self._ensure_meta_tables()

    # The cache is empty again: drop all per-table bookkeeping.
    self._states.clear()
    self._errors.clear()
    self._last_run.clear()
    self._error_total = 0
    logger.info(f"Hard reset complete — cache recreated at {self._db_path}.")
|
||||
|
||||
def vacuum(self, incremental: bool = True, pages: int = 10_000) -> None:
    """Run maintenance VACUUM on the on-disk cache (no-op in memory mode).

    ``incremental=True`` (default) reclaims up to *pages* free pages — but
    requires the cache to have been created with
    ``auto_vacuum=INCREMENTAL`` (otherwise it is a no-op).
    ``incremental=False`` runs a full ``VACUUM``: it rewrites the whole file
    (needs extra disk space, blocks readers) — use only in a maintenance
    window.

    Args:
        incremental: Choose incremental reclamation over a full rewrite.
        pages: Upper bound on the number of free pages to reclaim in
            incremental mode; ignored for a full VACUUM.

    Raises:
        sqlite3.Error: if SQLite rejects the maintenance statement.
    """
    if self._in_memory:
        logger.debug("vacuum() called in memory mode — no-op.")
        return
    if incremental:
        page_budget = int(pages)  # interpolated into SQL: force an integer
        with self._lock:
            # incremental_vacuum frees one page per statement step, and
            # execute() only performs the first step; draining the cursor
            # is what actually reclaims the requested number of pages.
            self._conn.execute(
                f"PRAGMA incremental_vacuum({page_budget})"
            ).fetchall()
            self._conn.commit()
        logger.info(f"Incremental vacuum: reclaimed up to {pages} pages.")
    else:
        logger.info("Full VACUUM started — this may take several minutes.")
        with self._lock:
            # VACUUM refuses to run inside a transaction; flush any implicit
            # transaction the sqlite3 module may have left open.
            self._conn.commit()
            self._conn.execute("VACUUM")
        logger.info("Full VACUUM complete.")
|
||||
|
||||
def close(self) -> None:
|
||||
self._backup_to_disk()
|
||||
self._closed = True
|
||||
|
||||
@@ -65,6 +65,7 @@ class CachingEngine:
|
||||
refresh_interval: int | None = None,
|
||||
fetch_batch: int | None = None,
|
||||
dialect: str | None = None,
|
||||
pragmas: dict[str, str | int] | None = None,
|
||||
blocking_startup_refresh: bool = False,
|
||||
) -> None:
|
||||
self._source_engine = source_engine
|
||||
@@ -79,6 +80,7 @@ class CachingEngine:
|
||||
in_memory=use_memory,
|
||||
dialect=self._dialect,
|
||||
fetch_batch=fetch_batch if fetch_batch is not None else FETCH_BATCH_SIZE,
|
||||
pragmas=pragmas,
|
||||
)
|
||||
self._registry = ColumnRegistry(self._cache.connection)
|
||||
self._stats = StatsCollector()
|
||||
@@ -267,6 +269,28 @@ class CachingEngine:
|
||||
self._cache.reset()
|
||||
logger.info("Cache reset — all tables will be reloaded on next use.")
|
||||
|
||||
def hard_reset(self) -> None:
    """Recreate the on-disk cache file so layout pragmas can change.

    Delegates to the cache manager's ``hard_reset`` and then re-points the
    column registry at the cache's current connection, because the hard
    reset replaces the connection object the registry was holding. Use it
    when ``page_size`` or ``auto_vacuum`` must change: those are fixed when
    the file is created and :meth:`reset` keeps the existing file. In memory
    mode the cache manager falls back to a plain reset. All tables reload on
    next use.
    """
    cache = self._cache
    cache.hard_reset()
    # Read the connection only after the reset: it is a new object now.
    fresh_conn = cache.connection
    self._registry.rebind(fresh_conn)
    logger.info("Cache hard reset — file recreated; all tables reload on next use.")
|
||||
|
||||
def vacuum(self, incremental: bool = True, pages: int = 10_000) -> None:
    """Forward a maintenance VACUUM request to the cache manager.

    Args:
        incremental: ``True`` (default) asks for an incremental vacuum, which
            cheaply reclaims free pages left by delta ``INSERT OR REPLACE``
            churn and requires ``auto_vacuum=INCREMENTAL``. ``False`` asks
            for a full VACUUM, which rewrites the whole file and should run
            only in a maintenance window.
        pages: Page budget passed through for the incremental variant.
    """
    options = {"incremental": incremental, "pages": pages}
    self._cache.vacuum(**options)
|
||||
|
||||
def close(self) -> None:
|
||||
self._cache.close()
|
||||
logger.info("CachingEngine closed.")
|
||||
|
||||
@@ -12,6 +12,16 @@ class ColumnRegistry:
|
||||
self._lock = Lock()
|
||||
self._ensure_table()
|
||||
|
||||
def rebind(self, mem_conn: sqlite3.Connection) -> None:
    """Swap in a replacement cache connection and recreate the registry table.

    Needed after ``CacheManager.hard_reset``: that call closes and reopens
    the cache connection, which invalidates the connection object captured
    when the registry was constructed.

    Args:
        mem_conn: The cache's current connection.
    """
    # Swap and re-create under the registry lock so the two steps are not
    # interleaved with other lock holders.
    with self._lock:
        self._conn = mem_conn
        self._ensure_table()
|
||||
|
||||
def _ensure_table(self) -> None:
|
||||
self._conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS _sqlmem_columns (
|
||||
|
||||
Reference in New Issue
Block a user