Fix frozen delta watermark and add error stats, lazy source, concurrent disk reads, and per-engine config

This commit is contained in:
Jan Doubravský
2026-06-08 19:35:33 +02:00
parent 209ae667ab
commit 6dc85e4f3c
17 changed files with 668 additions and 71 deletions
+128 -33
View File
@@ -10,7 +10,8 @@ from loguru import logger
import sqlmem._meta as _meta
from ._coerce import coerce_params, coerce_row
from .config import FETCH_BATCH_SIZE
from ._sql import quote, quote_list, quote_source
from .config import FETCH_BATCH_SIZE, SQL_DIALECT
from .stats import TableState
SCHEMA_VERSION = 3
@@ -22,17 +23,37 @@ class _Index:
columns: tuple[str, ...]
@dataclass(frozen=True)
class TableError:
"""Most recent load/refresh failure for a table (see ``CacheManager.get_errors``)."""
message: str
at: str
consecutive: int
class CacheManager:
def __init__(
self, db_path: Path, backup_interval: int, in_memory: bool = True
self,
db_path: Path,
backup_interval: int,
in_memory: bool = True,
dialect: str = SQL_DIALECT,
fetch_batch: int = FETCH_BATCH_SIZE,
) -> None:
self._db_path = db_path
self._backup_interval = backup_interval
self._in_memory = in_memory
self._dialect = dialect # source-DB dialect, for identifier quoting
self._fetch_batch = fetch_batch # rows fetched per source batch
self._lock = threading.Lock() # serializes connection access
self._load_lock = threading.Lock() # serializes full table loads
self._states: dict[str, str] = {} # table → live processing state
self._errors: dict[str, TableError] = {} # table → last load/refresh failure
self._error_total = 0 # process-wide failure counter
self._index_defs: dict[str, list[_Index]] = {} # table → secondary indexes
self._read_local = threading.local() # per-thread read conn (disk mode)
self._read_conns: list[sqlite3.Connection] = [] # read conns, for cleanup
self._closed = False
if in_memory:
@@ -124,7 +145,7 @@ class CacheManager:
).fetchall()
]
for name in names:
self._conn.execute(f"DROP TABLE IF EXISTS {name}")
self._conn.execute(f"DROP TABLE IF EXISTS {quote(name)}")
self._conn.commit()
def _load_from_disk(self) -> None:
@@ -161,7 +182,7 @@ class CacheManager:
]
for name in orphans:
logger.warning(f"Dropping orphan staging table {name!r} from a previous interrupted load.")
self._conn.execute(f"DROP TABLE IF EXISTS {name}")
self._conn.execute(f"DROP TABLE IF EXISTS {quote(name)}")
if orphans:
self._conn.commit()
@@ -238,7 +259,9 @@ class CacheManager:
def discover_columns(self, table: str, source_conn: sqlite3.Connection) -> list[str]:
"""Return all column names of *table* from the source DB without fetching rows."""
logger.debug(f"Discovering columns of {table!r} from source DB")
cursor = source_conn.execute(f"SELECT * FROM {table} WHERE 1 = 0")
cursor = source_conn.execute(
f"SELECT * FROM {quote_source(table, self._dialect)} WHERE 1 = 0"
)
columns = [desc[0] for desc in cursor.description]
logger.debug(f"{table!r} has columns: {columns}")
return columns
@@ -251,6 +274,28 @@ class CacheManager:
def clear_state(self, table: str) -> None:
self._states.pop(table, None)
self._errors.pop(table, None)
def record_error(self, table: str, message: str) -> None:
"""Record a load/refresh failure for *table* (increments its failure streak)."""
prev = self._errors.get(table)
streak = (prev.consecutive if prev else 0) + 1
self._errors[table] = TableError(message=message, at=_now(), consecutive=streak)
self._error_total += 1
logger.debug(f"Recorded error for {table!r} (streak {streak}): {message}")
def record_success(self, table: str) -> None:
"""Reset *table*'s failure streak to 0 after a successful load/refresh."""
prev = self._errors.get(table)
if prev and prev.consecutive:
self._errors[table] = TableError(prev.message, prev.at, 0)
def get_errors(self) -> dict[str, TableError]:
return dict(self._errors)
@property
def error_total(self) -> int:
return self._error_total
def add_index(self, table: str, columns: list[str]) -> None:
"""Register a secondary index to (re)create on *columns* after each load."""
@@ -268,10 +313,10 @@ class CacheManager:
f"Skipping index {idx.name!r}: columns {idx.columns} not all cached."
)
continue
cols = ", ".join(idx.columns)
cols = quote_list(idx.columns)
with self._lock:
self._conn.execute(
f"CREATE INDEX IF NOT EXISTS {idx.name} ON {table} ({cols})"
f"CREATE INDEX IF NOT EXISTS {quote(idx.name)} ON {quote(table)} ({cols})"
)
self._conn.commit()
logger.debug(f"Index {idx.name!r} ready on {table} ({cols})")
@@ -291,25 +336,29 @@ class CacheManager:
until the swap. Concurrent loads are serialized by ``_load_lock``; the
connection lock is only held for the brief per-batch inserts and the swap.
"""
cols = ", ".join(columns)
col_defs = ", ".join(f"{c} TEXT" for c in columns)
src_cols = ", ".join(quote_source(c, self._dialect) for c in columns)
col_defs = ", ".join(f"{quote(c)} TEXT" for c in columns)
placeholders = ", ".join("?" * len(columns))
staging = f"{table}__sqlmem_load"
q_staging = quote(staging)
q_table = quote(table)
with self._load_lock:
self.set_state(table, TableState.LOADING)
logger.info(f"Fetching {table!r} columns [{cols}] from source DB (batch={FETCH_BATCH_SIZE})")
logger.info(f"Fetching {table!r} columns {columns} from source DB (batch={self._fetch_batch})")
try:
cursor = source_conn.execute(f"SELECT {cols} FROM {table}")
cursor = source_conn.execute(
f"SELECT {src_cols} FROM {quote_source(table, self._dialect)}"
)
with self._lock:
self._conn.execute(f"DROP TABLE IF EXISTS {staging}")
self._conn.execute(f"CREATE TABLE {staging} ({col_defs})")
self._conn.execute(f"DROP TABLE IF EXISTS {q_staging}")
self._conn.execute(f"CREATE TABLE {q_staging} ({col_defs})")
self._conn.commit()
total = 0
insert_sql = f"INSERT INTO {staging} VALUES ({placeholders})"
insert_sql = f"INSERT INTO {q_staging} VALUES ({placeholders})"
while True:
batch = cursor.fetchmany(FETCH_BATCH_SIZE) # network outside _lock
batch = cursor.fetchmany(self._fetch_batch) # network outside _lock
if not batch:
break
clean = [coerce_row(row) for row in batch]
@@ -319,46 +368,83 @@ class CacheManager:
total += len(batch)
with self._lock: # atomic swap — readers see old or new, never partial
self._conn.execute(f"DROP TABLE IF EXISTS {table}")
self._conn.execute(f"ALTER TABLE {staging} RENAME TO {table}")
self._conn.execute(f"DROP TABLE IF EXISTS {q_table}")
self._conn.execute(f"ALTER TABLE {q_staging} RENAME TO {q_table}")
self._conn.commit()
except BaseException:
except BaseException as exc:
with self._lock:
self._conn.execute(f"DROP TABLE IF EXISTS {staging}")
self._conn.execute(f"DROP TABLE IF EXISTS {q_staging}")
self._conn.commit()
self.set_state(table, TableState.ERROR)
self.record_error(table, f"{type(exc).__name__}: {exc}")
raise
self._create_indexes(table, columns)
self.mark_table_refreshed(table, total, full)
self.set_state(table, TableState.READY)
self.record_success(table)
logger.info(f"Table {table!r} cached ({total} rows, columns: {columns})")
def _read_conn(self) -> sqlite3.Connection:
"""A per-thread, read-only connection used for cache reads in disk mode.
Disk mode runs in WAL, which allows many concurrent readers alongside one
writer. Giving each thread its own read connection (rather than sharing the
single write connection under ``_lock``) means a slow ``SELECT`` no longer
blocks writers (loads/upserts) or other readers. In-memory mode can't do
this — each ``:memory:`` connection is a separate database — so it keeps
using the single locked connection.
"""
conn = getattr(self._read_local, "conn", None)
if conn is None:
conn = sqlite3.connect(str(self._db_path), check_same_thread=False)
conn.execute("PRAGMA query_only=ON") # read-only guard
self._read_local.conn = conn
with self._lock:
self._read_conns.append(conn)
return conn
def execute_in_memory(
self, sql: str, params: tuple | list | dict | None = None
) -> tuple[list[str], list[tuple]]:
"""Run a read query against the in-memory cache, serialized with writers."""
"""Run a read query against the cache.
In-memory mode serializes with writers on the single connection. Disk mode
reads from a per-thread WAL connection, so reads run concurrently with
writers and each other (see :meth:`_read_conn`).
"""
bound = coerce_params(params)
with self._lock:
cursor = self._conn.execute(sql) if bound is None else self._conn.execute(sql, bound)
col_names = [desc[0] for desc in cursor.description]
rows = cursor.fetchall()
if self._in_memory:
with self._lock:
cursor = (
self._conn.execute(sql)
if bound is None
else self._conn.execute(sql, bound)
)
col_names = [desc[0] for desc in cursor.description]
rows = cursor.fetchall()
return col_names, rows
conn = self._read_conn()
cursor = conn.execute(sql) if bound is None else conn.execute(sql, bound)
col_names = [desc[0] for desc in cursor.description]
rows = cursor.fetchall()
return col_names, rows
# --- delta refresh support ---------------------------------------------
def get_table_columns(self, table: str) -> list[str]:
"""Authoritative ordered column list of a cached table (via PRAGMA)."""
rows = self._conn.execute(f"PRAGMA table_info({table})").fetchall()
rows = self._conn.execute(f"PRAGMA table_info({quote(table)})").fetchall()
return [r[1] for r in rows]
def create_unique_index(self, table: str, key_columns: list[str]) -> None:
"""Create the unique index on *key_columns* that makes upsert-by-key work."""
cols = ", ".join(key_columns)
index = f"idx_{table}_pk"
cols = quote_list(key_columns)
index = quote(f"idx_{table}_pk")
with self._lock:
self._conn.execute(
f"CREATE UNIQUE INDEX IF NOT EXISTS {index} ON {table} ({cols})"
f"CREATE UNIQUE INDEX IF NOT EXISTS {index} ON {quote(table)} ({cols})"
)
self._conn.commit()
@@ -378,23 +464,25 @@ class CacheManager:
def max_value(self, table: str, column: str) -> str | None:
"""Maximum value of *column* across cached rows (the delta watermark)."""
row = self._conn.execute(f"SELECT MAX({column}) FROM {table}").fetchone()
row = self._conn.execute(
f"SELECT MAX({quote(column)}) FROM {quote(table)}"
).fetchone()
return row[0] if row else None
def upsert_rows(self, table: str, columns: list[str], rows: list[tuple]) -> None:
"""Insert-or-replace one batch of *rows* by the table's unique key."""
col_list = ", ".join(columns)
col_list = quote_list(columns)
placeholders = ", ".join("?" * len(columns))
clean_rows = [coerce_row(row) for row in rows]
with self._lock:
self._conn.executemany(
f"INSERT OR REPLACE INTO {table} ({col_list}) VALUES ({placeholders})",
f"INSERT OR REPLACE INTO {quote(table)} ({col_list}) VALUES ({placeholders})",
clean_rows,
)
self._conn.commit()
def count_rows(self, table: str) -> int:
row = self._conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()
row = self._conn.execute(f"SELECT COUNT(*) FROM {quote(table)}").fetchone()
return int(row[0]) if row else 0
def reset(self) -> None:
@@ -411,7 +499,7 @@ class CacheManager:
).fetchall()
]
for name in user_tables:
self._conn.execute(f"DROP TABLE IF EXISTS {name}")
self._conn.execute(f"DROP TABLE IF EXISTS {quote(name)}")
self._conn.execute("DELETE FROM _sqlmem_tables")
self._conn.execute("DELETE FROM _sqlmem_columns")
self._conn.commit()
@@ -434,6 +522,13 @@ class CacheManager:
def close(self) -> None:
self._backup_to_disk()
self._closed = True
with self._lock:
for conn in self._read_conns:
try:
conn.close()
except sqlite3.Error:
pass
self._read_conns.clear()
self._conn.close()