Add per-table TTL refresh for tables without a change column
This commit is contained in:
@@ -143,6 +143,16 @@ class CacheManager:
|
||||
).fetchone()
|
||||
return bool(row and row[0])
|
||||
|
||||
def seconds_since_refresh(self, table: str) -> float | None:
|
||||
"""Age of a cached table in seconds, or None if it is not cached."""
|
||||
row = self._mem_conn.execute(
|
||||
"SELECT last_refresh_at FROM _sqlmem_tables WHERE table_name = ?", (table,)
|
||||
).fetchone()
|
||||
if not row or not row[0]:
|
||||
return None
|
||||
last = datetime.fromisoformat(row[0])
|
||||
return (datetime.now(timezone.utc) - last).total_seconds()
|
||||
|
||||
def discover_columns(self, table: str, source_conn: sqlite3.Connection) -> list[str]:
|
||||
"""Return all column names of *table* from the source DB without fetching rows."""
|
||||
logger.debug(f"Discovering columns of {table!r} from source DB")
|
||||
|
||||
+30
-3
@@ -22,6 +22,7 @@ class CachingEngine:
|
||||
self,
|
||||
source_engine: Engine,
|
||||
delta: dict[str, DeltaConfig] | None = None,
|
||||
ttl: dict[str, int] | None = None,
|
||||
) -> None:
|
||||
self._source_engine = source_engine
|
||||
self._cache = CacheManager(CACHE_DB_PATH, BACKUP_INTERVAL_SECONDS)
|
||||
@@ -29,9 +30,18 @@ class CachingEngine:
|
||||
self._stats = StatsCollector()
|
||||
self._refresh_interval = REFRESH_INTERVAL_SECONDS
|
||||
self._delta = self._resolve_delta(delta or {})
|
||||
self._ttl = dict(ttl or {})
|
||||
self._refresher = DeltaRefresher(self._cache, self._delta)
|
||||
|
||||
if self._delta:
|
||||
overlap = set(self._delta) & set(self._ttl)
|
||||
if overlap:
|
||||
raise ValueError(
|
||||
f"Tables {sorted(overlap)} are in both delta and ttl — a table is "
|
||||
"either delta-refreshed (has a change column) or TTL-refreshed (full "
|
||||
"reload), not both."
|
||||
)
|
||||
|
||||
if self._delta or self._ttl:
|
||||
self._run_refresh() # catch up tables restored from disk
|
||||
self._start_refresh_thread()
|
||||
|
||||
@@ -66,7 +76,7 @@ class CachingEngine:
|
||||
with self._source_engine.connect() as sa_conn:
|
||||
raw_conn = cast(sqlite3.Connection, sa_conn.connection.dbapi_connection)
|
||||
executor = QueryExecutor(
|
||||
self._cache, self._registry, raw_conn, self._stats, self._delta
|
||||
self._cache, self._registry, raw_conn, self._stats, self._delta, self._ttl
|
||||
)
|
||||
return executor.execute(parsed)
|
||||
|
||||
@@ -79,8 +89,25 @@ class CachingEngine:
|
||||
with self._source_engine.connect() as sa_conn:
|
||||
raw_conn = cast(sqlite3.Connection, sa_conn.connection.dbapi_connection)
|
||||
self._refresher.refresh(raw_conn)
|
||||
self._refresh_ttl(raw_conn)
|
||||
except Exception as e:
|
||||
logger.error(f"Delta refresh cycle failed: {e}")
|
||||
logger.error(f"Refresh cycle failed: {e}")
|
||||
|
||||
def _refresh_ttl(self, source_conn: sqlite3.Connection) -> None:
    """Proactively full-reload TTL-tracked tables whose cache has expired.

    For every ``table -> ttl`` entry in ``self._ttl``, the table is skipped
    when it is not cached, when its age is unknown, or when its age does not
    exceed the TTL. Otherwise it is reloaded from *source_conn* with the
    columns it is currently cached with, keeping its existing "full" flag.

    Any exception raised while probing or reloading one table is logged and
    does not stop the remaining tables from being processed.
    """
    for table, ttl in self._ttl.items():
        try:
            # The freshness probes live inside the guard as well: a failure
            # while inspecting one table must not abort the others.
            if not self._cache.is_table_cached(table):
                continue
            age = self._cache.seconds_since_refresh(table)
            if age is None or age <= ttl:
                continue

            columns = self._cache.get_table_columns(table)
            full = self._cache.is_table_full(table)
            self._cache.load_table(table, columns, source_conn, full=full)
            logger.info(f"TTL refresh {table!r}: reloaded (age {age:.0f}s > {ttl}s)")
        except Exception as e:
            logger.error(f"TTL refresh failed for {table!r}: {e}")
|
||||
|
||||
def _start_refresh_thread(self) -> None:
|
||||
def loop() -> None:
|
||||
|
||||
+28
-6
@@ -17,12 +17,22 @@ class QueryExecutor:
|
||||
source_conn: sqlite3.Connection,
|
||||
stats: StatsCollector,
|
||||
delta: dict[str, ResolvedDelta] | None = None,
|
||||
ttl: dict[str, int] | None = None,
|
||||
) -> None:
|
||||
self._cache = cache
|
||||
self._registry = registry
|
||||
self._source_conn = source_conn
|
||||
self._stats = stats
|
||||
self._delta = delta or {}
|
||||
self._ttl = ttl or {}
|
||||
|
||||
def _ttl_expired(self, table: str) -> bool:
    """Return True if *table* has a TTL and its cached copy is older than that TTL.

    Returns False when *table* has no entry in ``self._ttl``, when the cache
    reports no age for it, or when its age does not exceed the TTL (an age
    exactly equal to the TTL is not expired).
    """
    ttl = self._ttl.get(table)
    if ttl is None:
        # Not TTL-tracked: never treated as expired by this check.
        return False
    age = self._cache.seconds_since_refresh(table)
    return age is not None and age > ttl
|
||||
|
||||
def execute(self, parsed: ParsedQuery) -> list[dict]:
|
||||
for table in parsed.tables:
|
||||
@@ -37,12 +47,18 @@ class QueryExecutor:
|
||||
|
||||
def _ensure_full(self, table: str) -> None:
|
||||
"""Load every column of *table* (SELECT * / t.*), refetching unless already full."""
|
||||
if self._cache.is_table_cached(table) and self._cache.is_table_full(table):
|
||||
cached = self._cache.is_table_cached(table)
|
||||
stale = cached and self._ttl_expired(table)
|
||||
|
||||
if cached and self._cache.is_table_full(table) and not stale:
|
||||
logger.debug(f"Cache hit (full): {table!r}")
|
||||
self._stats.record_hit()
|
||||
return
|
||||
|
||||
if self._cache.is_table_cached(table):
|
||||
if cached and stale:
|
||||
logger.info(f"Cache expired (ttl) — reloading {table!r} in full.")
|
||||
self._stats.record_refetch()
|
||||
elif cached:
|
||||
logger.warning(f"Re-fetching {table!r} in full — SELECT * requested.")
|
||||
self._stats.record_refetch()
|
||||
else:
|
||||
@@ -52,16 +68,20 @@ class QueryExecutor:
|
||||
self._load(table, columns, full=True)
|
||||
|
||||
def _ensure_columns(self, table: str, columns: list[str]) -> None:
|
||||
"""Load *table* with at least *columns*, refetching only when columns are missing."""
|
||||
"""Load *table* with at least *columns*, refetching on new columns or TTL expiry."""
|
||||
missing = self._registry.needs_refetch(table, columns)
|
||||
table_cached = self._cache.is_table_cached(table)
|
||||
stale = table_cached and self._ttl_expired(table)
|
||||
|
||||
if not missing and table_cached:
|
||||
if table_cached and not missing and not stale:
|
||||
logger.debug(f"Cache hit: {table!r} columns={columns}")
|
||||
self._stats.record_hit()
|
||||
return
|
||||
|
||||
if table_cached and missing:
|
||||
if stale:
|
||||
logger.info(f"Cache expired (ttl) — reloading {table!r}.")
|
||||
self._stats.record_refetch()
|
||||
elif table_cached and missing:
|
||||
logger.warning(
|
||||
f"Re-fetching {table!r} — new columns requested: {missing}. "
|
||||
f"Expanding cache from {self._registry.get_columns(table)} + {missing}"
|
||||
@@ -71,7 +91,9 @@ class QueryExecutor:
|
||||
self._stats.record_miss()
|
||||
|
||||
all_columns = list(self._registry.get_columns(table)) + missing
|
||||
self._load(table, all_columns, full=False)
|
||||
# Preserve a fully-cached table's status across a TTL reload.
|
||||
full = table_cached and self._cache.is_table_full(table)
|
||||
self._load(table, all_columns, full=full)
|
||||
|
||||
def _load(self, table: str, columns: list[str], full: bool) -> None:
|
||||
"""Fetch *table* into cache, adding delta key/timestamp columns when tracked."""
|
||||
|
||||
Reference in New Issue
Block a user