From 8744f458cc5d4dc1eac9b39eb6cc706c5f448004 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Doubravsk=C3=BD?= Date: Tue, 9 Jun 2026 08:48:29 +0200 Subject: [PATCH] Split last_upsert (persisted write) and last_refresh (run liveness) in stats --- CHANGELOG.md | 14 ++++++++++++++ README.md | 11 ++++++++++- project.md | 1 + pyproject.toml | 2 +- src/sqlmem/cache.py | 19 ++++++++++++++++++- src/sqlmem/delta.py | 3 +++ src/sqlmem/engine.py | 16 ++++++++++++---- src/sqlmem/stats.py | 16 ++++++++++++---- tests/test_delta.py | 28 ++++++++++++++++++++++++++++ tests/test_stats.py | 9 +++++++++ 10 files changed, 108 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a8d9061..4fcbb14 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,20 @@ All notable changes to this project will be documented in this file. --- +## [1.10.0] - 2026-06-09 + +### Added +- **`last_upsert` (persisted write) vs `last_refresh` (run/liveness) in `stats`** — `TableStats.last_refresh` previously came from the persisted `last_refresh_at` column, which is only written when rows are actually written (a delta cycle with `total == 0` early-returns and leaves it unchanged). A healthy delta that keeps finding no new rows therefore *looked* frozen. The single value is now split: + - `last_upsert` — wall-clock (UTC) of the last actual data write (full load / delta with rows). Persisted, survives restarts (this is the existing `last_refresh_at` column, surfaced under a clearer name). + - `last_refresh` — wall-clock (UTC) of the last time a refresh cycle *ran* for the table, **even when it wrote nothing**. In-memory per process (`None` until the first cycle after start), tracked like `_states`/`_errors` — so **no schema change and no cache wipe**. + - `CacheManager` gained `mark_refresh_ran()` / `get_last_runs()`; an empty delta cycle now records a run. TTL staleness still uses the last *write* (`seconds_since_refresh` reads `last_refresh_at`), so behaviour is unchanged. + +### Changed +- `pyproject.toml` — bumped version to `1.10.0`. +- **`TableStats.last_refresh` is now `str | None`** (was `str`) and a new required `last_upsert: str | None` field is added. Consumers reading `last_refresh` for "when did data change?" should switch to `last_upsert`. + +--- + ## [1.8.0] - 2026-06-08 ### Fixed diff --git a/README.md b/README.md index 5cd85fa..edca0a6 100644 --- a/README.md +++ b/README.md @@ -280,13 +280,22 @@ Use `reset()` after a **structural change** in the source (columns added/removed stats = engine.stats # Stats snapshot print(stats.hits, stats.misses, stats.refetches, stats.errors) for name, t in stats.tables.items(): - print(name, t.rows, t.state, t.tracking, t.last_refresh) + print(name, t.rows, t.state, t.tracking, t.last_upsert, t.last_refresh) if t.consecutive_failures: print(f" {name} failing ×{t.consecutive_failures}: {t.last_error} ({t.last_error_at})") ``` `Stats.errors` is the total number of load/refresh failures since start. Each `TableStats` also carries `last_error`, `last_error_at` and `consecutive_failures` (reset to 0 on the next success) — so a delta that fails *before* streaming (which otherwise leaves `state` looking `ready`) is still visible, and the table is marked `error`. +Two timestamps distinguish *data freshness* from *liveness*: + +| field | meaning | +|---|---| +| `last_upsert` | wall-clock (UTC) of the last actual **data write** — full load or a delta cycle that wrote rows. Persisted, survives restarts. Answers *"when did the data last change?"* | +| `last_refresh` | wall-clock (UTC) of the last time a **refresh cycle ran** for the table — bumped **even when it wrote nothing**. In-memory per process (`None` until the first cycle runs after start). Answers *"is the refresh loop alive?"* | + +A delta table that runs every cycle but finds no new rows keeps `last_refresh` ticking while `last_upsert` stays put — that's healthy, not stuck. (Both are UTC ISO strings; the default log timestamps are local time, so expect an offset.) + Each `TableStats` reports a live processing **state** and how the table is kept fresh (**tracking**): | `state` | Meaning | diff --git a/project.md b/project.md index deb3611..35e56ce 100644 --- a/project.md +++ b/project.md @@ -209,6 +209,7 @@ SQLMEM_DEBUG=true # DEBUG level — podrobný výpis každého dotazu, cache o - [x] **Disk-backed cache**: `in_memory=False` (nebo `SQLMEM_IN_MEMORY=false`) — dotazy běží přímo nad on-disk `cache.db` (WAL), bez kopie v RAM; cache může přesáhnout paměť, zápisy se rovnou persistují. - V disk módu čtení běží přes **per-thread read-only WAL connection** → souběžné čtení neblokuje zápisy ani ostatní čtenáře. - [x] **Chyby refresh/load ve `stats`**: `TableStats.last_error` / `last_error_at` / `consecutive_failures` + `Stats.errors`. Delta, který selže před streamem, označí tabulku jako `error` (dřív zůstával `ready`). +- [x] **`last_upsert` vs `last_refresh`**: `last_upsert` = perzistovaný čas posledního zápisu dat (přežije restart); `last_refresh` = in-memory čas posledního běhu refresh cyklu (liveness — tiká i když cyklus nic nezapsal, `None` do prvního běhu). Prázdný delta cyklus posune `last_refresh`, ne `last_upsert`. - [x] **Per-engine konfigurace**: `CachingEngine(..., cache_db_path=, backup_interval=, refresh_interval=, fetch_batch=, dialect=)` — každý parametr defaultuje na env/config; dva enginy s vlastními cache soubory v jednom procesu. - [x] **Neblokující startup catch-up**: výchozí chování — startup catch-up (delta/TTL po restartu) běží na pozadí, neblokuje start aplikace. `blocking_startup_refresh=True` pro synchronní dohnání před servírováním. - [x] **Quoting identifikátorů**: názvy tabulek/sloupců se kvótují (SQLite `"x"` pro cache, dialekt zdroje — T-SQL `[x]` — pro source), takže rezervovaná slova i mezery fungují. diff --git a/pyproject.toml b/pyproject.toml index 36ce77b..1a9504b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "sqlmem" -version = "1.8.0" +version = "1.10.0" description = "" authors = [ {name = "jan.doubravsky@gmail.com"} diff --git a/src/sqlmem/cache.py b/src/sqlmem/cache.py index c392828..71544aa 100644 --- a/src/sqlmem/cache.py +++ b/src/sqlmem/cache.py @@ -51,6 +51,7 @@ class CacheManager: self._states: dict[str, str] = {} # table → live processing state self._errors: dict[str, TableError] = {} # table → last load/refresh failure self._error_total = 0 # process-wide failure counter + self._last_run: dict[str, str] = {} # table → last refresh-cycle run (this process) self._index_defs: dict[str, list[_Index]] = {} # table → secondary indexes self._read_local = threading.local() # per-thread read conn (disk mode) self._read_conns: list[sqlite3.Connection] = [] # read conns, for cleanup @@ -219,6 +220,7 @@ class CacheManager: self._backup_to_disk() def mark_table_refreshed(self, table: str, row_count: int, full: bool = False) -> None: + ts = _now() with self._lock: self._conn.execute( """ @@ -229,9 +231,22 @@ class CacheManager: row_count = excluded.row_count, is_full = excluded.is_full """, - (table, _now(), row_count, int(full)), + (table, ts, row_count, int(full)), ) self._conn.commit() + self._last_run[table] = ts # a write is also a refresh-cycle run + + def mark_refresh_ran(self, table: str) -> None: + """Record that a refresh cycle ran for *table* now, even if it wrote nothing. + + In-memory only (like states/errors) — never persisted, never touches the + schema. This is the liveness signal surfaced as ``TableStats.last_refresh``, + distinct from the persisted last *write* time (``last_upsert``). + """ + self._last_run[table] = _now() + + def get_last_runs(self) -> dict[str, str]: + return dict(self._last_run) def is_table_cached(self, table: str) -> bool: row = self._conn.execute( @@ -275,6 +290,7 @@ class CacheManager: def clear_state(self, table: str) -> None: self._states.pop(table, None) self._errors.pop(table, None) + self._last_run.pop(table, None) def record_error(self, table: str, message: str) -> None: """Record a load/refresh failure for *table* (increments its failure streak).""" @@ -504,6 +520,7 @@ class CacheManager: self._conn.execute("DELETE FROM _sqlmem_columns") self._conn.commit() self._states.clear() + self._last_run.clear() if self._in_memory: try: if self._db_path.exists(): diff --git a/src/sqlmem/delta.py b/src/sqlmem/delta.py index 62cd3b7..5bf139e 100644 --- a/src/sqlmem/delta.py +++ b/src/sqlmem/delta.py @@ -111,6 +111,9 @@ class DeltaRefresher: self._cache.set_state(table, TableState.READY) if total == 0: + # The cycle ran but wrote nothing — record liveness (last_refresh) without + # touching the persisted last-write time (last_upsert). + self._cache.mark_refresh_ran(table) logger.debug(f"Delta refresh {table!r}: no changes since {watermark!r}") return diff --git a/src/sqlmem/engine.py b/src/sqlmem/engine.py index 68abe4d..fe5499a 100644 --- a/src/sqlmem/engine.py +++ b/src/sqlmem/engine.py @@ -145,17 +145,23 @@ class CachingEngine: def stats(self) -> Stats: states = self._cache.get_states() errors = self._cache.get_errors() + last_runs = self._cache.get_last_runs() with self._cache._lock: base = self._stats.snapshot(self._cache.connection, states) base = replace(base, errors=self._cache.error_total) return replace( - base, tables={n: self._enrich(n, t, errors) for n, t in base.tables.items()} + base, + tables={n: self._enrich(n, t, errors, last_runs) for n, t in base.tables.items()}, ) def _enrich( - self, name: str, table_stats: TableStats, errors: dict[str, TableError] + self, + name: str, + table_stats: TableStats, + errors: dict[str, TableError], + last_runs: dict[str, str], ) -> TableStats: - """Annotate a TableStats with refresh tracking, TTL staleness and errors.""" + """Annotate a TableStats with refresh tracking, TTL staleness, errors and run time.""" if name in self._delta: tracking = "delta" elif name in self._ttl: @@ -169,17 +175,19 @@ class CachingEngine: if age is not None and age > self._ttl[name]: state = TableState.STALE + last_refresh = last_runs.get(name) err = errors.get(name) if err is not None: return replace( table_stats, tracking=tracking, state=state, + last_refresh=last_refresh, last_error=err.message, last_error_at=err.at, consecutive_failures=err.consecutive, ) - return replace(table_stats, tracking=tracking, state=state) + return replace(table_stats, tracking=tracking, state=state, last_refresh=last_refresh) def execute(self, sql: str, params: Params = None) -> list[dict]: parsed = parse(sql, params, dialect=self._dialect) diff --git a/src/sqlmem/stats.py b/src/sqlmem/stats.py index 87e9164..121f3a4 100644 --- a/src/sqlmem/stats.py +++ b/src/sqlmem/stats.py @@ -17,7 +17,13 @@ class TableState: class TableStats: rows: int columns: list[str] - last_refresh: str + # Persisted wall-clock of the last actual data write (full load / delta with rows). + # Survives restarts. Answers "when did the data last change?". + last_upsert: str | None + # In-memory (this process) wall-clock of the last time a refresh cycle ran for the + # table — bumped even when the cycle wrote nothing. Liveness signal; ``None`` until + # the first cycle runs after start. Answers "is the refresh loop alive?". + last_refresh: str | None = None state: str = TableState.READY tracking: str = "static" # "delta" | "ttl" | "static" # Most recent load/refresh failure for this table, if any. ``consecutive_failures`` @@ -64,7 +70,7 @@ class StatsCollector: tables: dict[str, TableStats] = {} cached: set[str] = set() - for table_name, row_count, last_refresh in conn.execute( + for table_name, row_count, last_upsert in conn.execute( "SELECT table_name, row_count, last_refresh_at FROM _sqlmem_tables" ).fetchall(): cached.add(table_name) @@ -75,16 +81,18 @@ class StatsCollector: (table_name,), ).fetchall() ] + # last_refresh (run/liveness) is filled in by the engine from the + # in-memory last-run map; only the persisted write time is read here. tables[table_name] = TableStats( rows=row_count or 0, columns=columns, - last_refresh=last_refresh, + last_upsert=last_upsert, state=states.get(table_name, TableState.READY), ) # Surface tables that are mid-first-load (not yet in _sqlmem_tables) or failed. for name, state in states.items(): if name not in cached and state in (TableState.LOADING, TableState.ERROR): - tables[name] = TableStats(rows=0, columns=[], last_refresh="", state=state) + tables[name] = TableStats(rows=0, columns=[], last_upsert=None, state=state) return Stats(hits=hits, misses=misses, refetches=refetches, tables=tables) diff --git a/tests/test_delta.py b/tests/test_delta.py index 1c764c1..f8fcf6d 100644 --- a/tests/test_delta.py +++ b/tests/test_delta.py @@ -202,6 +202,34 @@ def test_delta_success_resets_failure_streak(env): assert env.cache.get_errors()["products"].consecutive == 0 +# --------------------------------------------------------------------------- +# last_upsert (persisted write) vs last_refresh (in-memory run/liveness) +# --------------------------------------------------------------------------- + +def _persisted_last_upsert(cache, table): + row = cache.connection.execute( + "SELECT last_refresh_at FROM _sqlmem_tables WHERE table_name = ?", (table,) + ).fetchone() + return row[0] if row else None + + +def test_empty_delta_records_run_but_not_write(env): + """An empty delta cycle bumps last_refresh (liveness) but not the persisted + last_upsert (write time).""" + before = _persisted_last_upsert(env.cache, "products") + # Push the watermark past every source row so the next cycle returns 0 rows. + env.cache.set_last_synced_at("products", "2099-01-01 00:00:00") + + env.refresher.refresh(env.source) + + # No rows written → persisted write time unchanged. + assert _persisted_last_upsert(env.cache, "products") == before + # But the cycle ran → in-memory run time recorded (and at/after the last write). + runs = env.cache.get_last_runs() + assert runs["products"] is not None + assert runs["products"] >= before + + # --------------------------------------------------------------------------- # Engine-level: PK auto-discovery, reset, end-to-end refresh # --------------------------------------------------------------------------- diff --git a/tests/test_stats.py b/tests/test_stats.py index b289a7f..31ebe76 100644 --- a/tests/test_stats.py +++ b/tests/test_stats.py @@ -96,6 +96,15 @@ def test_stats_no_error_by_default(source_engine, patched_cache): engine.close() +def test_stats_exposes_last_upsert_and_last_refresh(source_engine, patched_cache): + engine = CachingEngine(source_engine) + engine.execute("SELECT id, name FROM products") + s = engine.stats.tables["products"] + assert s.last_upsert is not None # the load wrote rows (persisted) + assert s.last_refresh is not None # the load also counts as a refresh-cycle run + engine.close() + + # --- a table being loaded for the first time shows up as "loading" ----------