Split last_upsert (persisted write) and last_refresh (run liveness) in stats

This commit is contained in:
Jan Doubravský
2026-06-09 08:48:29 +02:00
parent 6dc85e4f3c
commit 8744f458cc
10 changed files with 108 additions and 11 deletions
+18 -1
View File
@@ -51,6 +51,7 @@ class CacheManager:
self._states: dict[str, str] = {} # table → live processing state
self._errors: dict[str, TableError] = {} # table → last load/refresh failure
self._error_total = 0 # process-wide failure counter
self._last_run: dict[str, str] = {} # table → last refresh-cycle run (this process)
self._index_defs: dict[str, list[_Index]] = {} # table → secondary indexes
self._read_local = threading.local() # per-thread read conn (disk mode)
self._read_conns: list[sqlite3.Connection] = [] # read conns, for cleanup
@@ -219,6 +220,7 @@ class CacheManager:
self._backup_to_disk()
def mark_table_refreshed(self, table: str, row_count: int, full: bool = False) -> None:
ts = _now()
with self._lock:
self._conn.execute(
"""
@@ -229,9 +231,22 @@ class CacheManager:
row_count = excluded.row_count,
is_full = excluded.is_full
""",
(table, _now(), row_count, int(full)),
(table, ts, row_count, int(full)),
)
self._conn.commit()
self._last_run[table] = ts # a write is also a refresh-cycle run
def mark_refresh_ran(self, table: str) -> None:
    """Stamp *table* with the current time as its latest refresh-cycle run.

    Use this when a cycle ran but wrote no rows. Only the in-memory
    last-run map is updated: nothing is persisted and the schema is not
    touched. The stored value is the liveness signal reported as
    ``TableStats.last_refresh``; it is separate from the persisted
    last-write time (``last_upsert``).
    """
    ran_at = _now()
    self._last_run[table] = ran_at
def get_last_runs(self) -> dict[str, str]:
    """Return a shallow copy of the table → last refresh-run timestamp map.

    A copy is handed out so callers cannot mutate the internal dict.
    """
    snapshot = dict(self._last_run)
    return snapshot
def is_table_cached(self, table: str) -> bool:
row = self._conn.execute(
@@ -275,6 +290,7 @@ class CacheManager:
def clear_state(self, table: str) -> None:
    """Forget every in-memory entry kept for *table*.

    Removes the table's live state, its last recorded error and its last
    refresh-run timestamp. Entries that are already absent are ignored.
    """
    for per_table in (self._states, self._errors, self._last_run):
        per_table.pop(table, None)
def record_error(self, table: str, message: str) -> None:
"""Record a load/refresh failure for *table* (increments its failure streak)."""
@@ -504,6 +520,7 @@ class CacheManager:
self._conn.execute("DELETE FROM _sqlmem_columns")
self._conn.commit()
self._states.clear()
self._last_run.clear()
if self._in_memory:
try:
if self._db_path.exists():
+3
View File
@@ -111,6 +111,9 @@ class DeltaRefresher:
self._cache.set_state(table, TableState.READY)
if total == 0:
# The cycle ran but wrote nothing — record liveness (last_refresh) without
# touching the persisted last-write time (last_upsert).
self._cache.mark_refresh_ran(table)
logger.debug(f"Delta refresh {table!r}: no changes since {watermark!r}")
return
+12 -4
View File
@@ -145,17 +145,23 @@ class CachingEngine:
def stats(self) -> Stats:
states = self._cache.get_states()
errors = self._cache.get_errors()
last_runs = self._cache.get_last_runs()
with self._cache._lock:
base = self._stats.snapshot(self._cache.connection, states)
base = replace(base, errors=self._cache.error_total)
return replace(
base, tables={n: self._enrich(n, t, errors) for n, t in base.tables.items()}
base,
tables={n: self._enrich(n, t, errors, last_runs) for n, t in base.tables.items()},
)
def _enrich(
self, name: str, table_stats: TableStats, errors: dict[str, TableError]
self,
name: str,
table_stats: TableStats,
errors: dict[str, TableError],
last_runs: dict[str, str],
) -> TableStats:
"""Annotate a TableStats with refresh tracking, TTL staleness and errors."""
"""Annotate a TableStats with refresh tracking, TTL staleness, errors and run time."""
if name in self._delta:
tracking = "delta"
elif name in self._ttl:
@@ -169,17 +175,19 @@ class CachingEngine:
if age is not None and age > self._ttl[name]:
state = TableState.STALE
last_refresh = last_runs.get(name)
err = errors.get(name)
if err is not None:
return replace(
table_stats,
tracking=tracking,
state=state,
last_refresh=last_refresh,
last_error=err.message,
last_error_at=err.at,
consecutive_failures=err.consecutive,
)
return replace(table_stats, tracking=tracking, state=state)
return replace(table_stats, tracking=tracking, state=state, last_refresh=last_refresh)
def execute(self, sql: str, params: Params = None) -> list[dict]:
parsed = parse(sql, params, dialect=self._dialect)
+12 -4
View File
@@ -17,7 +17,13 @@ class TableState:
class TableStats:
rows: int
columns: list[str]
last_refresh: str
# Persisted wall-clock of the last actual data write (full load / delta with rows).
# Survives restarts. Answers "when did the data last change?".
last_upsert: str | None
# In-memory (this process) wall-clock of the last time a refresh cycle ran for the
# table — bumped even when the cycle wrote nothing. Liveness signal; ``None`` until
# the first cycle runs after start. Answers "is the refresh loop alive?".
last_refresh: str | None = None
state: str = TableState.READY
tracking: str = "static" # "delta" | "ttl" | "static"
# Most recent load/refresh failure for this table, if any. ``consecutive_failures``
@@ -64,7 +70,7 @@ class StatsCollector:
tables: dict[str, TableStats] = {}
cached: set[str] = set()
for table_name, row_count, last_refresh in conn.execute(
for table_name, row_count, last_upsert in conn.execute(
"SELECT table_name, row_count, last_refresh_at FROM _sqlmem_tables"
).fetchall():
cached.add(table_name)
@@ -75,16 +81,18 @@ class StatsCollector:
(table_name,),
).fetchall()
]
# last_refresh (run/liveness) is filled in by the engine from the
# in-memory last-run map; only the persisted write time is read here.
tables[table_name] = TableStats(
rows=row_count or 0,
columns=columns,
last_refresh=last_refresh,
last_upsert=last_upsert,
state=states.get(table_name, TableState.READY),
)
# Surface tables that are mid-first-load (not yet in _sqlmem_tables) or failed.
for name, state in states.items():
if name not in cached and state in (TableState.LOADING, TableState.ERROR):
tables[name] = TableStats(rows=0, columns=[], last_refresh="", state=state)
tables[name] = TableStats(rows=0, columns=[], last_upsert=None, state=state)
return Stats(hits=hits, misses=misses, refetches=refetches, tables=tables)