diff --git a/CHANGELOG.md b/CHANGELOG.md index eaa6e35..d4c3d0b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,17 @@ All notable changes to this project will be documented in this file. --- +## [1.15.0] - 2026-06-11 + +### Fixed +- **Cache stampede (thundering herd) on cold loads** — the decision to load a table was made *before* the load lock was taken, and `load_table` never re-checked after acquiring it. During a slow cold load of a large table (observed: 212M rows, ~2 h), a second query for the same table passed the pre-lock "not cached" check, queued on the load lock, and then ran a **redundant second full reload** instead of seeing the first had finished — doubling a multi-hour load. `load_table` now does **double-checked locking**: after acquiring the load lock it re-evaluates a caller-supplied predicate (table cached, all needed columns present, not TTL-expired) and skips the load when it is already satisfied. Invisible on small tables; on large ones it removes hours of redundant indexing under concurrent cold-start traffic. + +### Changed +- `pyproject.toml` — bumped version to `1.15.0`. +- `CacheManager.load_table` gained an optional `recheck` callback (the double-check predicate); `QueryExecutor` supplies it for both column and `SELECT *` loads. + +--- + ## [1.14.0] - 2026-06-10 Follow-up to 1.12.0 from running `datetime_columns` in production: the feature was only half-wired (writes were coerced, reads and query params were not). diff --git a/README.md b/README.md index b236ac2..0319fe9 100644 --- a/README.md +++ b/README.md @@ -370,6 +370,7 @@ By default the cache is **in-memory SQLite**, so a cached table lives in RAM — - **Use [disk-backed mode](#disk-backed-cache-no-ram-copy)** (`in_memory=False`) when the working set simply doesn't fit in RAM — queries then run against `cache.db` on disk instead of a memory copy. - **Loads are streamed in batches** (`SQLMEM_FETCH_BATCH` rows at a time, default 10 000) into a staging table and swapped in atomically. A multi-million-row table never gets fully materialized in Python at once, so the load doesn't spike memory or crash the process, and readers keep seeing the previous copy until the swap completes. - Use **[delta refresh](#incremental-delta-refresh)** for large tables that have a change column — after the first load only changed rows are pulled, so restarts and refreshes don't re-read the whole table. +- **Concurrent queries during a cold load are deduplicated** — while one query is loading a large table, others for the same table wait and then read the freshly loaded cache rather than kicking off their own redundant reload (double-checked locking), so a slow cold start isn't multiplied by concurrent traffic. - A **single query that returns a huge result set** (e.g. `SELECT *` over a multi-million-row cached table) still materializes that result as a list of dicts; bound it with a `WHERE`/`LIMIT` rather than selecting everything. ## Configuration diff --git a/project.md b/project.md index 7f98a4b..a7d5fc2 100644 --- a/project.md +++ b/project.md @@ -224,10 +224,11 @@ SQLMEM_DEBUG=true # DEBUG level — podrobný výpis každého dotazu, cache o - Veřejný helper `from sqlmem import datetime_to_epoch_us` pro konstrukci parametrů bez duplicitní logiky. - [x] **`vacuum(incremental=True)` varuje bez `auto_vacuum=INCREMENTAL`**: dřív tichý no-op; teď zaloguje warning (a jak to opravit) a vrátí se. - [x] **`Stats.db_size_bytes`**: velikost cache souboru na disku (0 v memory módu) ve `stats` pro monitoring. +- [x] **Ochrana proti cache stampede**: `load_table` dělá double-checked locking — po získání `_load_lock` znovu ověří, zda tabulku mezitím nenahrál souběžný loader (cached + sloupce + ne-stale), a redundantní reload přeskočí. Bez toho druhý dotaz během studeného loadu velké tabulky spustil druhý plný reload (212M řádků = +2 h). ## TODO — budoucí funkce -- _(zatím žádné otevřené položky)_ +- [ ] **Deklarativní inicializace (`tables=[TableSpec(...)]`)** — předem deklarovat sloupce/indexy/strategii každé tabulky, background preload, fail-fast na nedeklarované sloupce. Řeší líznou expanzi sloupců (druhý reload) a dělá z `execute()` čistě read-only. Plán pro 1.16.0. --- diff --git a/pyproject.toml b/pyproject.toml index f2d007b..598a056 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "sqlmem" -version = "1.14.0" +version = "1.15.0" description = "" authors = [ {name = "jan.doubravsky@gmail.com"} diff --git a/src/sqlmem/cache.py b/src/sqlmem/cache.py index 366cdf0..d2b26c8 100644 --- a/src/sqlmem/cache.py +++ b/src/sqlmem/cache.py @@ -2,6 +2,7 @@ import atexit import signal import sqlite3 import threading +from collections.abc import Callable from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path @@ -425,6 +426,7 @@ class CacheManager: columns: list[str], source_conn: sqlite3.Connection, full: bool = False, + recheck: Callable[[], bool] | None = None, ) -> None: """Stream the source table into the cache in batches. @@ -433,6 +435,13 @@ class CacheManager: ``fetchall`` of a huge table) and readers keep seeing the previous copy until the swap. Concurrent loads are serialized by ``_load_lock``; the connection lock is only held for the brief per-batch inserts and the swap. + + *recheck* implements double-checked locking against a cache stampede: the + decision to load is made by the caller *before* ``_load_lock`` is held, so + on a slow cold load a second request for the same table can queue behind + the lock and then redundantly reload it. If given, ``recheck()`` is + re-evaluated *after* the lock is acquired; when it returns ``True`` the + table is already loaded and fresh, so the load is skipped. """ src_cols = ", ".join(quote_source(c, self._dialect) for c in columns) dt_cols = set(self._datetime_columns.get(table, ())) @@ -446,6 +455,12 @@ class CacheManager: q_table = quote(table) with self._load_lock: + if recheck is not None and recheck(): + logger.info( + f"Skipping load of {table!r}: a concurrent loader already " + "satisfied it (double-checked lock)." + ) + return self.set_state(table, TableState.LOADING) logger.info(f"Fetching {table!r} columns {columns} from source DB (batch={self._fetch_batch})") try: diff --git a/src/sqlmem/executor.py b/src/sqlmem/executor.py index 2bde0d7..872455c 100644 --- a/src/sqlmem/executor.py +++ b/src/sqlmem/executor.py @@ -1,3 +1,4 @@ +from collections.abc import Callable from typing import Any from loguru import logger @@ -47,6 +48,20 @@ class QueryExecutor: else: self._ensure_columns(table, parsed.columns_by_table[table]) + def _full_satisfied(self, table: str) -> bool: + """True if *table* is cached in full and not TTL-expired (a SELECT * hit).""" + return ( + self._cache.is_table_cached(table) + and self._cache.is_table_full(table) + and not self._ttl_expired(table) + ) + + def _columns_satisfied(self, table: str, columns: list[str]) -> bool: + """True if *table* is cached with all *columns* present and not TTL-expired.""" + if not self._cache.is_table_cached(table) or self._ttl_expired(table): + return False + return set(columns).issubset(self._cache.get_table_columns(table)) + def _ensure_full(self, table: str) -> None: """Load every column of *table* (SELECT * / t.*), refetching unless already full.""" cached = self._cache.is_table_cached(table) @@ -67,7 +82,7 @@ class QueryExecutor: self._stats.record_miss() columns = self._cache.discover_columns(table, self._source_conn) - self._load(table, columns, full=True) + self._load(table, columns, full=True, satisfied=lambda cols: self._full_satisfied(table)) def _ensure_columns(self, table: str, columns: list[str]) -> None: """Load *table* with at least *columns*, refetching on new columns or TTL expiry.""" @@ -95,10 +110,27 @@ class QueryExecutor: all_columns = list(self._registry.get_columns(table)) + missing # Preserve a fully-cached table's status across a TTL reload. full = table_cached and self._cache.is_table_full(table) - self._load(table, all_columns, full=full) + self._load( + table, + all_columns, + full=full, + satisfied=lambda cols: self._columns_satisfied(table, cols), + ) - def _load(self, table: str, columns: list[str], full: bool) -> None: - """Fetch *table* into cache, adding delta key/timestamp and index columns.""" + def _load( + self, + table: str, + columns: list[str], + full: bool, + satisfied: Callable[[list[str]], bool] | None = None, + ) -> None: + """Fetch *table* into cache, adding delta key/timestamp and index columns. + + *satisfied* is the double-checked-locking predicate evaluated under the + load lock (see :meth:`CacheManager.load_table`); it is given the final, + augmented column list so a concurrent loader that already produced an + equivalent (or wider) cache is detected and the redundant reload skipped. + """ cfg = self._delta.get(table) extra = list(self._index_columns.get(table, [])) if cfg: @@ -108,7 +140,11 @@ class QueryExecutor: if extra: columns = list(dict.fromkeys([*columns, *extra])) - self._cache.load_table(table, columns, self._source_conn, full=full) + recheck: Callable[[], bool] | None = None + if satisfied is not None: + final_columns = columns + recheck = lambda: satisfied(final_columns) # noqa: E731 + self._cache.load_table(table, columns, self._source_conn, full=full, recheck=recheck) self._registry.update(table, columns) if cfg: diff --git a/tests/test_cache.py b/tests/test_cache.py index 0101650..97bee0f 100644 --- a/tests/test_cache.py +++ b/tests/test_cache.py @@ -290,6 +290,74 @@ def test_vacuum_in_memory_is_noop(cache, source_conn): assert cache.is_table_cached("users") is True +# --------------------------------------------------------------------------- +# Double-checked locking against cache stampede (1.15.0) +# --------------------------------------------------------------------------- + +class _ExplodingSource: + def execute(self, *args): + raise AssertionError("source must not be queried when recheck() is True") + + +def test_load_table_recheck_true_skips_load(cache, source_conn): + """A recheck that reports the table already satisfied skips the reload.""" + cache.load_table("users", ["name"], source_conn) + # Second load with recheck() → True must not touch the source at all. + cache.load_table("users", ["name"], _ExplodingSource(), recheck=lambda: True) + assert cache.is_table_cached("users") is True + + +def test_concurrent_loads_dedup_via_double_checked_lock(tmp_path): + """A second loader queued behind a slow cold load must not reload the table.""" + import time + + c = CacheManager(db_path=tmp_path / "c.db", backup_interval=9999) + started = threading.Event() + release = threading.Event() + loads: list[str] = [] + + class _GatedCursor: + def __init__(self, rows): + self._rows = list(rows) + self._done = False + + def fetchmany(self, n): + if self._done: + return [] + self._done = True + return self._rows + + class _GatedSource: + def execute(self, sql): + loads.append(sql) # one entry per *actual* source load + started.set() + release.wait(5) # hold the load open (and _load_lock) until released + return _GatedCursor([("alice",), ("bob",)]) + + def recheck() -> bool: + return c.is_table_cached("users") and "name" in c.get_table_columns("users") + + def load() -> None: + c.load_table("users", ["name"], _GatedSource(), recheck=recheck) + + a = threading.Thread(target=load) + b = threading.Thread(target=load) + a.start() + assert started.wait(5), "first load never started" # A holds _load_lock, mid-fetch + b.start() + time.sleep(0.2) # give B time to queue on _load_lock + release.set() # let A finish; B then re-checks and skips + a.join(5) + b.join(5) + assert not a.is_alive() and not b.is_alive() + + assert len(loads) == 1 # the redundant second load was skipped + assert c.is_table_cached("users") is True + _, rows = c.execute_in_memory("SELECT name FROM users ORDER BY name") + assert [r[0] for r in rows] == ["alice", "bob"] + c.close() + + def test_incremental_vacuum_warns_without_incremental_auto_vacuum(tmp_path, source_conn): """Incremental vacuum on a DB that isn't auto_vacuum=INCREMENTAL warns and skips.""" from loguru import logger