diff --git a/CHANGELOG.md b/CHANGELOG.md index 3af6aaa..a8d9061 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,28 @@ All notable changes to this project will be documented in this file. --- +## [1.8.0] - 2026-06-08 + +### Fixed +- **Frozen delta watermark on `datetime` change columns** — the delta high-watermark is read back from the cache as an ISO `TEXT` string (e.g. `'2026-06-05T14:54:24.823000'`) and was bound straight back to the source. SQL Server then had to implicitly convert that `nvarchar` to `datetime` and **failed** (`T`-separated ISO with 6 fractional digits exceeds `datetime`'s 3 — error 241 / SQLSTATE 22007), so every delta refresh and the startup catch-up died before streaming and the watermark never advanced (the cache silently froze at the last full load). The watermark is now parsed back to a real `datetime` (`delta._bind_watermark`) so the driver sends a typed timestamp and the comparison runs natively; non-datetime change columns (e.g. integer rowversions) pass through unchanged. Regression tests added. + +### Added +- **Refresh/load failures are now visible in `stats`** — `TableStats` gained `last_error`, `last_error_at` and `consecutive_failures`, and `Stats` gained a total `errors` counter. A delta that fails *before* streaming (e.g. the watermark bug above) previously left `state = ready`, hiding the problem; it now also marks the table `error` and records the message. `consecutive_failures` resets to 0 on the next success. +- **Per-engine configuration** — `CachingEngine` accepts `cache_db_path`, `backup_interval`, `refresh_interval`, `fetch_batch` and `dialect` (each defaults to its env var / config global when omitted), so two engines with independent cache files can run in one process and config is testable without env vars. +- **`blocking_startup_refresh` flag** (default `False`) — the startup catch-up (deltas/TTL reloads for tables restored from disk) now runs on the background thread by default, so it never blocks application startup. Pass `blocking_startup_refresh=True` to catch up synchronously before serving. + +### Changed +- **SQL identifiers are quoted** — table/column names are now quoted everywhere they are interpolated into statements (SQLite double-quote for the cache, the configured dialect — e.g. T-SQL `[brackets]` — for the source), so reserved words or names with spaces work and the f-string interpolation is hardened. +- **Source connection opened lazily** — `execute()` no longer opens a source connection on every call; a pure cache hit never touches the source (and never occupies a pool slot). The misleading `cast(sqlite3.Connection, …)` on the source handle was removed (it is a pyodbc connection in production). +- **Concurrent reads in disk mode** — disk-backed reads now use a per-thread read-only WAL connection instead of sharing the single write connection under a lock, so a slow `SELECT` no longer blocks writers (loads/upserts) or other readers. In-memory mode is unchanged (a `:memory:` database can't be shared across connections). +- **`add_sink` is idempotent** — calling it again for the same sink is a no-op, so a double import no longer duplicates every log line. +- `pyproject.toml` — bumped version to `1.8.0`; added a scoped pytest `filterwarnings` for the SQLite test source's legacy datetime-adapter deprecation. + +### Note +- Cache type fidelity (returning real `datetime`/`Decimal`/numeric types from `execute()` instead of `TEXT` strings, and giving numeric columns proper affinity) was evaluated but **deferred** — it changes the public output contract that consumers currently rely on (and that `test_coerce.py` pins). Decimal/datetime stay stored as exact, lossless `TEXT`. + +--- + ## [1.7.0] - 2026-06-08 ### Added diff --git a/README.md b/README.md index 4a827fa..5cd85fa 100644 --- a/README.md +++ b/README.md @@ -258,6 +258,7 @@ engine = CachingEngine(base_engine, in_memory=False) - The cache can **exceed available memory** — nothing is held in RAM beyond SQLite's page cache. - Every write **persists immediately** (WAL + `synchronous=NORMAL`), so there is no hourly backup thread, no load-into-memory step on startup, and no shutdown flush to lose. +- **Reads run concurrently** — each thread reads through its own read-only WAL connection, so a slow `SELECT` doesn't block writers (loads/upserts) or other readers. - On open, a cache file with a mismatched schema version is wiped in place and rebuilt; `engine.reset()` drops the cached tables and `VACUUM`s the file (it does not delete the open file). The constructor argument wins over the env var; when `in_memory` is omitted it falls back to `SQLMEM_IN_MEMORY`. @@ -277,11 +278,15 @@ Use `reset()` after a **structural change** in the source (columns added/removed ```python stats = engine.stats # Stats snapshot -print(stats.hits, stats.misses, stats.refetches) +print(stats.hits, stats.misses, stats.refetches, stats.errors) for name, t in stats.tables.items(): print(name, t.rows, t.state, t.tracking, t.last_refresh) + if t.consecutive_failures: + print(f" {name} failing ×{t.consecutive_failures}: {t.last_error} ({t.last_error_at})") ``` +`Stats.errors` is the total number of load/refresh failures since start. Each `TableStats` also carries `last_error`, `last_error_at` and `consecutive_failures` (reset to 0 on the next success) — so a delta that fails *before* streaming (which otherwise leaves `state` looking `ready`) is still visible, and the table is marked `error`. + Each `TableStats` reports a live processing **state** and how the table is kept fresh (**tracking**): | `state` | Meaning | @@ -321,6 +326,23 @@ Set via environment variables or a `.env` file: | `SQLMEM_REFRESH_INTERVAL` | `300` | background refresh tick (seconds) — delta pulls and proactive TTL reloads | | `SQLMEM_FETCH_BATCH` | `10000` | rows fetched per batch when loading a table — caps peak memory for huge tables | +Most of these can also be passed **per engine** to the constructor, overriding the env default — handy for running two engines (with separate cache files) in one process, and for tests: + +```python +engine = CachingEngine( + base_engine, + cache_db_path="orders_cache.db", # SQLMEM_CACHE_DB + in_memory=False, # SQLMEM_IN_MEMORY + backup_interval=3600, # SQLMEM_BACKUP_INTERVAL + refresh_interval=300, # SQLMEM_REFRESH_INTERVAL + fetch_batch=10000, # SQLMEM_FETCH_BATCH + dialect="tsql", # SQLMEM_SQL_DIALECT + blocking_startup_refresh=False, # block startup until caught up? (default: no) +) +``` + +By default the **startup catch-up** (delta pulls and TTL reloads for tables restored from disk) runs on the background thread so it never blocks application startup; the cache may serve slightly stale data until the first refresh completes. Set `blocking_startup_refresh=True` to catch up synchronously before the engine starts serving. + ## Exceptions | Exception | When raised | diff --git a/project.md b/project.md index 89e7b15..deb3611 100644 --- a/project.md +++ b/project.md @@ -207,6 +207,13 @@ SQLMEM_DEBUG=true # DEBUG level — podrobný výpis každého dotazu, cache o - [x] **Sekundární indexy**: `indexes={"VW_X": ["col", ["a","b"]]}` — indexy na in-memory cache pro zrychlení `WHERE`/`JOIN`; index-sloupce se auto-dotáhnou, indexy se obnoví po každém (re)loadu. - [x] **TTL na úrovni tabulky**: `ttl={"VW_X": 300}` — pro tabulky bez timestamp sloupce. Garantuje, že cache není starší než interval (full reload při čtení po expiraci + proaktivně na pozadí). - [x] **Disk-backed cache**: `in_memory=False` (nebo `SQLMEM_IN_MEMORY=false`) — dotazy běží přímo nad on-disk `cache.db` (WAL), bez kopie v RAM; cache může přesáhnout paměť, zápisy se rovnou persistují. + - V disk módu čtení běží přes **per-thread read-only WAL connection** → souběžné čtení neblokuje zápisy ani ostatní čtenáře. +- [x] **Chyby refresh/load ve `stats`**: `TableStats.last_error` / `last_error_at` / `consecutive_failures` + `Stats.errors`. Delta, který selže před streamem, označí tabulku jako `error` (dřív zůstával `ready`). +- [x] **Per-engine konfigurace**: `CachingEngine(..., cache_db_path=, backup_interval=, refresh_interval=, fetch_batch=, dialect=)` — každý parametr defaultuje na env/config; dva enginy s vlastními cache soubory v jednom procesu. +- [x] **Neblokující startup catch-up**: výchozí chování — startup catch-up (delta/TTL po restartu) běží na pozadí, neblokuje start aplikace. `blocking_startup_refresh=True` pro synchronní dohnání před servírováním. +- [x] **Quoting identifikátorů**: názvy tabulek/sloupců se kvótují (SQLite `"x"` pro cache, dialekt zdroje — T-SQL `[x]` — pro source), takže rezervovaná slova i mezery fungují. +- [x] **Lazy source connection**: `execute()` neotevírá spojení ke zdroji při cache hitu (neobsazuje pool slot). +- [x] **Idempotentní `add_sink`**: opakované volání pro stejný sink je no-op (žádné duplicitní logy). ## TODO — budoucí funkce diff --git a/pyproject.toml b/pyproject.toml index 4504770..36ce77b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "sqlmem" -version = "1.7.0" +version = "1.8.0" description = "" authors = [ {name = "jan.doubravsky@gmail.com"} @@ -25,3 +25,11 @@ dev = [ "ruff (>=0.15.15,<0.16.0)", "mypy (>=2.1.0,<3.0.0)" ] + +[tool.pytest.ini_options] +filterwarnings = [ + # The SQLite test source binds the delta watermark as a real datetime via + # sqlite3's legacy adapter (deprecated in 3.12). Production sources are + # pyodbc, which binds datetimes natively, so this only affects the tests. + "ignore:The default datetime adapter is deprecated:DeprecationWarning", +] diff --git a/src/sqlmem/__init__.py b/src/sqlmem/__init__.py index bb52442..419b762 100644 --- a/src/sqlmem/__init__.py +++ b/src/sqlmem/__init__.py @@ -1,3 +1,4 @@ +from pathlib import Path from typing import Any from loguru import logger @@ -15,13 +16,25 @@ _DEFAULT_FORMAT = ( "{message}" ) +# Sinks already registered, keyed by a stable identity, so a repeated call (e.g. +# a double import) doesn't add a second handler and duplicate every log line. +_added_sinks: dict[object, int] = {} + + +def _sink_key(sink: Any) -> object: + """A stable identity for *sink* so the same destination isn't added twice.""" + if isinstance(sink, (str, Path)): + return ("path", str(Path(sink).resolve())) + return ("obj", id(sink)) + def add_sink(sink: Any, *, level: str | None = None, **kwargs: Any) -> None: - """Route sqlmem log records to *sink*. + """Route sqlmem log records to *sink* (idempotent). Accepts any sink supported by loguru (file path, stream, callable, …). *level* defaults to ``DEBUG`` when ``SQLMEM_DEBUG=true``, otherwise ``INFO``. - Extra keyword arguments are forwarded to :func:`loguru.logger.add`. + Extra keyword arguments are forwarded to :func:`loguru.logger.add`. Calling it + again for the same sink is a no-op, so a double import won't duplicate logs. Example:: @@ -31,9 +44,15 @@ def add_sink(sink: Any, *, level: str | None = None, **kwargs: Any) -> None: add_sink("sqlmem.log", rotation="10 MB") """ logger.enable("sqlmem") + key = _sink_key(sink) + if key in _added_sinks: + return kwargs.setdefault("format", _DEFAULT_FORMAT) kwargs.setdefault("colorize", True) - logger.add(sink, level=level or ("DEBUG" if DEBUG else "INFO"), filter="sqlmem", **kwargs) + handler_id = logger.add( + sink, level=level or ("DEBUG" if DEBUG else "INFO"), filter="sqlmem", **kwargs + ) + _added_sinks[key] = handler_id __all__ = [ diff --git a/src/sqlmem/_sql.py b/src/sqlmem/_sql.py new file mode 100644 index 0000000..791ba83 --- /dev/null +++ b/src/sqlmem/_sql.py @@ -0,0 +1,27 @@ +"""SQL identifier quoting. + +Table and column names are interpolated into statements as raw strings, so a +name with a space, a reserved word, or an embedded quote would break the query +(and is a latent injection vector). These helpers quote identifiers safely. The +in-memory cache is SQLite, so it uses double-quote style; the source DB is quoted +in its configured dialect (e.g. T-SQL ``[brackets]``). +""" + +from collections.abc import Iterable + +from sqlglot import exp + + +def quote(name: str) -> str: + """Quote an identifier for the in-memory SQLite cache.""" + return '"' + name.replace('"', '""') + '"' + + +def quote_list(names: Iterable[str]) -> str: + """Comma-join SQLite-quoted identifiers.""" + return ", ".join(quote(n) for n in names) + + +def quote_source(name: str, dialect: str) -> str: + """Quote an identifier for the source DB in its dialect (e.g. T-SQL ``[x]``).""" + return exp.to_identifier(name, quoted=True).sql(dialect=dialect) diff --git a/src/sqlmem/cache.py b/src/sqlmem/cache.py index e59d498..c392828 100644 --- a/src/sqlmem/cache.py +++ b/src/sqlmem/cache.py @@ -10,7 +10,8 @@ from loguru import logger import sqlmem._meta as _meta from ._coerce import coerce_params, coerce_row -from .config import FETCH_BATCH_SIZE +from ._sql import quote, quote_list, quote_source +from .config import FETCH_BATCH_SIZE, SQL_DIALECT from .stats import TableState SCHEMA_VERSION = 3 @@ -22,17 +23,37 @@ class _Index: columns: tuple[str, ...] +@dataclass(frozen=True) +class TableError: + """Most recent load/refresh failure for a table (see ``CacheManager.get_errors``).""" + + message: str + at: str + consecutive: int + + class CacheManager: def __init__( - self, db_path: Path, backup_interval: int, in_memory: bool = True + self, + db_path: Path, + backup_interval: int, + in_memory: bool = True, + dialect: str = SQL_DIALECT, + fetch_batch: int = FETCH_BATCH_SIZE, ) -> None: self._db_path = db_path self._backup_interval = backup_interval self._in_memory = in_memory + self._dialect = dialect # source-DB dialect, for identifier quoting + self._fetch_batch = fetch_batch # rows fetched per source batch self._lock = threading.Lock() # serializes connection access self._load_lock = threading.Lock() # serializes full table loads self._states: dict[str, str] = {} # table → live processing state + self._errors: dict[str, TableError] = {} # table → last load/refresh failure + self._error_total = 0 # process-wide failure counter self._index_defs: dict[str, list[_Index]] = {} # table → secondary indexes + self._read_local = threading.local() # per-thread read conn (disk mode) + self._read_conns: list[sqlite3.Connection] = [] # read conns, for cleanup self._closed = False if in_memory: @@ -124,7 +145,7 @@ class CacheManager: ).fetchall() ] for name in names: - self._conn.execute(f"DROP TABLE IF EXISTS {name}") + self._conn.execute(f"DROP TABLE IF EXISTS {quote(name)}") self._conn.commit() def _load_from_disk(self) -> None: @@ -161,7 +182,7 @@ class CacheManager: ] for name in orphans: logger.warning(f"Dropping orphan staging table {name!r} from a previous interrupted load.") - self._conn.execute(f"DROP TABLE IF EXISTS {name}") + self._conn.execute(f"DROP TABLE IF EXISTS {quote(name)}") if orphans: self._conn.commit() @@ -238,7 +259,9 @@ class CacheManager: def discover_columns(self, table: str, source_conn: sqlite3.Connection) -> list[str]: """Return all column names of *table* from the source DB without fetching rows.""" logger.debug(f"Discovering columns of {table!r} from source DB") - cursor = source_conn.execute(f"SELECT * FROM {table} WHERE 1 = 0") + cursor = source_conn.execute( + f"SELECT * FROM {quote_source(table, self._dialect)} WHERE 1 = 0" + ) columns = [desc[0] for desc in cursor.description] logger.debug(f"{table!r} has columns: {columns}") return columns @@ -251,6 +274,28 @@ class CacheManager: def clear_state(self, table: str) -> None: self._states.pop(table, None) + self._errors.pop(table, None) + + def record_error(self, table: str, message: str) -> None: + """Record a load/refresh failure for *table* (increments its failure streak).""" + prev = self._errors.get(table) + streak = (prev.consecutive if prev else 0) + 1 + self._errors[table] = TableError(message=message, at=_now(), consecutive=streak) + self._error_total += 1 + logger.debug(f"Recorded error for {table!r} (streak {streak}): {message}") + + def record_success(self, table: str) -> None: + """Reset *table*'s failure streak to 0 after a successful load/refresh.""" + prev = self._errors.get(table) + if prev and prev.consecutive: + self._errors[table] = TableError(prev.message, prev.at, 0) + + def get_errors(self) -> dict[str, TableError]: + return dict(self._errors) + + @property + def error_total(self) -> int: + return self._error_total def add_index(self, table: str, columns: list[str]) -> None: """Register a secondary index to (re)create on *columns* after each load.""" @@ -268,10 +313,10 @@ class CacheManager: f"Skipping index {idx.name!r}: columns {idx.columns} not all cached." ) continue - cols = ", ".join(idx.columns) + cols = quote_list(idx.columns) with self._lock: self._conn.execute( - f"CREATE INDEX IF NOT EXISTS {idx.name} ON {table} ({cols})" + f"CREATE INDEX IF NOT EXISTS {quote(idx.name)} ON {quote(table)} ({cols})" ) self._conn.commit() logger.debug(f"Index {idx.name!r} ready on {table} ({cols})") @@ -291,25 +336,29 @@ class CacheManager: until the swap. Concurrent loads are serialized by ``_load_lock``; the connection lock is only held for the brief per-batch inserts and the swap. """ - cols = ", ".join(columns) - col_defs = ", ".join(f"{c} TEXT" for c in columns) + src_cols = ", ".join(quote_source(c, self._dialect) for c in columns) + col_defs = ", ".join(f"{quote(c)} TEXT" for c in columns) placeholders = ", ".join("?" * len(columns)) staging = f"{table}__sqlmem_load" + q_staging = quote(staging) + q_table = quote(table) with self._load_lock: self.set_state(table, TableState.LOADING) - logger.info(f"Fetching {table!r} columns [{cols}] from source DB (batch={FETCH_BATCH_SIZE})") + logger.info(f"Fetching {table!r} columns {columns} from source DB (batch={self._fetch_batch})") try: - cursor = source_conn.execute(f"SELECT {cols} FROM {table}") + cursor = source_conn.execute( + f"SELECT {src_cols} FROM {quote_source(table, self._dialect)}" + ) with self._lock: - self._conn.execute(f"DROP TABLE IF EXISTS {staging}") - self._conn.execute(f"CREATE TABLE {staging} ({col_defs})") + self._conn.execute(f"DROP TABLE IF EXISTS {q_staging}") + self._conn.execute(f"CREATE TABLE {q_staging} ({col_defs})") self._conn.commit() total = 0 - insert_sql = f"INSERT INTO {staging} VALUES ({placeholders})" + insert_sql = f"INSERT INTO {q_staging} VALUES ({placeholders})" while True: - batch = cursor.fetchmany(FETCH_BATCH_SIZE) # network outside _lock + batch = cursor.fetchmany(self._fetch_batch) # network outside _lock if not batch: break clean = [coerce_row(row) for row in batch] @@ -319,46 +368,83 @@ class CacheManager: total += len(batch) with self._lock: # atomic swap — readers see old or new, never partial - self._conn.execute(f"DROP TABLE IF EXISTS {table}") - self._conn.execute(f"ALTER TABLE {staging} RENAME TO {table}") + self._conn.execute(f"DROP TABLE IF EXISTS {q_table}") + self._conn.execute(f"ALTER TABLE {q_staging} RENAME TO {q_table}") self._conn.commit() - except BaseException: + except BaseException as exc: with self._lock: - self._conn.execute(f"DROP TABLE IF EXISTS {staging}") + self._conn.execute(f"DROP TABLE IF EXISTS {q_staging}") self._conn.commit() self.set_state(table, TableState.ERROR) + self.record_error(table, f"{type(exc).__name__}: {exc}") raise self._create_indexes(table, columns) self.mark_table_refreshed(table, total, full) self.set_state(table, TableState.READY) + self.record_success(table) logger.info(f"Table {table!r} cached ({total} rows, columns: {columns})") + def _read_conn(self) -> sqlite3.Connection: + """A per-thread, read-only connection used for cache reads in disk mode. + + Disk mode runs in WAL, which allows many concurrent readers alongside one + writer. Giving each thread its own read connection (rather than sharing the + single write connection under ``_lock``) means a slow ``SELECT`` no longer + blocks writers (loads/upserts) or other readers. In-memory mode can't do + this — each ``:memory:`` connection is a separate database — so it keeps + using the single locked connection. + """ + conn = getattr(self._read_local, "conn", None) + if conn is None: + conn = sqlite3.connect(str(self._db_path), check_same_thread=False) + conn.execute("PRAGMA query_only=ON") # read-only guard + self._read_local.conn = conn + with self._lock: + self._read_conns.append(conn) + return conn + def execute_in_memory( self, sql: str, params: tuple | list | dict | None = None ) -> tuple[list[str], list[tuple]]: - """Run a read query against the in-memory cache, serialized with writers.""" + """Run a read query against the cache. + + In-memory mode serializes with writers on the single connection. Disk mode + reads from a per-thread WAL connection, so reads run concurrently with + writers and each other (see :meth:`_read_conn`). + """ bound = coerce_params(params) - with self._lock: - cursor = self._conn.execute(sql) if bound is None else self._conn.execute(sql, bound) - col_names = [desc[0] for desc in cursor.description] - rows = cursor.fetchall() + if self._in_memory: + with self._lock: + cursor = ( + self._conn.execute(sql) + if bound is None + else self._conn.execute(sql, bound) + ) + col_names = [desc[0] for desc in cursor.description] + rows = cursor.fetchall() + return col_names, rows + + conn = self._read_conn() + cursor = conn.execute(sql) if bound is None else conn.execute(sql, bound) + col_names = [desc[0] for desc in cursor.description] + rows = cursor.fetchall() return col_names, rows # --- delta refresh support --------------------------------------------- def get_table_columns(self, table: str) -> list[str]: """Authoritative ordered column list of a cached table (via PRAGMA).""" - rows = self._conn.execute(f"PRAGMA table_info({table})").fetchall() + rows = self._conn.execute(f"PRAGMA table_info({quote(table)})").fetchall() return [r[1] for r in rows] def create_unique_index(self, table: str, key_columns: list[str]) -> None: """Create the unique index on *key_columns* that makes upsert-by-key work.""" - cols = ", ".join(key_columns) - index = f"idx_{table}_pk" + cols = quote_list(key_columns) + index = quote(f"idx_{table}_pk") with self._lock: self._conn.execute( - f"CREATE UNIQUE INDEX IF NOT EXISTS {index} ON {table} ({cols})" + f"CREATE UNIQUE INDEX IF NOT EXISTS {index} ON {quote(table)} ({cols})" ) self._conn.commit() @@ -378,23 +464,25 @@ class CacheManager: def max_value(self, table: str, column: str) -> str | None: """Maximum value of *column* across cached rows (the delta watermark).""" - row = self._conn.execute(f"SELECT MAX({column}) FROM {table}").fetchone() + row = self._conn.execute( + f"SELECT MAX({quote(column)}) FROM {quote(table)}" + ).fetchone() return row[0] if row else None def upsert_rows(self, table: str, columns: list[str], rows: list[tuple]) -> None: """Insert-or-replace one batch of *rows* by the table's unique key.""" - col_list = ", ".join(columns) + col_list = quote_list(columns) placeholders = ", ".join("?" * len(columns)) clean_rows = [coerce_row(row) for row in rows] with self._lock: self._conn.executemany( - f"INSERT OR REPLACE INTO {table} ({col_list}) VALUES ({placeholders})", + f"INSERT OR REPLACE INTO {quote(table)} ({col_list}) VALUES ({placeholders})", clean_rows, ) self._conn.commit() def count_rows(self, table: str) -> int: - row = self._conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone() + row = self._conn.execute(f"SELECT COUNT(*) FROM {quote(table)}").fetchone() return int(row[0]) if row else 0 def reset(self) -> None: @@ -411,7 +499,7 @@ class CacheManager: ).fetchall() ] for name in user_tables: - self._conn.execute(f"DROP TABLE IF EXISTS {name}") + self._conn.execute(f"DROP TABLE IF EXISTS {quote(name)}") self._conn.execute("DELETE FROM _sqlmem_tables") self._conn.execute("DELETE FROM _sqlmem_columns") self._conn.commit() @@ -434,6 +522,13 @@ class CacheManager: def close(self) -> None: self._backup_to_disk() self._closed = True + with self._lock: + for conn in self._read_conns: + try: + conn.close() + except sqlite3.Error: + pass + self._read_conns.clear() self._conn.close() diff --git a/src/sqlmem/delta.py b/src/sqlmem/delta.py index 539ac03..62cd3b7 100644 --- a/src/sqlmem/delta.py +++ b/src/sqlmem/delta.py @@ -1,13 +1,34 @@ -import sqlite3 from dataclasses import dataclass, field +from datetime import datetime +from typing import Any from loguru import logger +from ._sql import quote_source from .cache import CacheManager -from .config import FETCH_BATCH_SIZE from .stats import TableState +def _bind_watermark(watermark: str) -> datetime | str: + """Bind the delta watermark back to the source in its native type. + + The cache stores the change column as an ISO ``TEXT`` string (see + ``_coerce.to_sqlite``), so ``max(change_column)`` comes back as a string such + as ``'2026-06-05T14:54:24.823000'``. Sending that straight back to the source + as an ``nvarchar`` makes SQL Server do an implicit ``varchar -> datetime`` + conversion, which **fails** on the ``T``-separated, 6-digit-microsecond ISO + form (error 241 / SQLSTATE 22007 — ``datetime`` accepts at most 3 fractional + digits). Parsing it back to a real :class:`~datetime.datetime` makes the + driver send a typed timestamp, so the comparison happens natively with no + string conversion. Non-datetime change columns (e.g. an integer rowversion) + don't parse and are passed through unchanged. + """ + try: + return datetime.fromisoformat(watermark) + except (TypeError, ValueError): + return watermark + + @dataclass(frozen=True) class DeltaConfig: """Per-table configuration for incremental (delta) refresh. @@ -43,28 +64,37 @@ class DeltaRefresher: self._cache = cache self._delta = delta - def refresh(self, source_conn: sqlite3.Connection) -> None: + def refresh(self, source_conn: Any) -> None: for table, cfg in self._delta.items(): if not self._cache.is_table_cached(table): continue try: self._refresh_table(table, cfg, source_conn) + self._cache.record_success(table) except Exception as e: # one bad table must not stop the others logger.error(f"Delta refresh failed for {table!r}: {e}") + # A delta can fail before streaming starts (e.g. a watermark the + # source rejects), leaving state misleadingly READY — mark it and + # record the error so stats reveal the stuck table. + self._cache.set_state(table, TableState.ERROR) + self._cache.record_error(table, f"{type(e).__name__}: {e}") def _refresh_table( - self, table: str, cfg: ResolvedDelta, source_conn: sqlite3.Connection + self, table: str, cfg: ResolvedDelta, source_conn: Any ) -> None: columns = self._cache.get_table_columns(table) watermark = self._cache.get_last_synced_at(table) - col_list = ", ".join(columns) + dialect = self._cache._dialect + col_list = ", ".join(quote_source(c, dialect) for c in columns) + q_table = quote_source(table, dialect) if watermark is None: - cursor = source_conn.execute(f"SELECT {col_list} FROM {table}") + cursor = source_conn.execute(f"SELECT {col_list} FROM {q_table}") else: + change_col = quote_source(cfg.change_column, dialect) cursor = source_conn.execute( - f"SELECT {col_list} FROM {table} WHERE {cfg.change_column} >= ?", - (watermark,), + f"SELECT {col_list} FROM {q_table} WHERE {change_col} >= ?", + (_bind_watermark(watermark),), ) # Stream the delta in batches so a large catch-up never materializes at once. @@ -72,7 +102,7 @@ class DeltaRefresher: self._cache.set_state(table, TableState.REFRESHING) try: while True: - batch = cursor.fetchmany(FETCH_BATCH_SIZE) + batch = cursor.fetchmany(self._cache._fetch_batch) if not batch: break self._cache.upsert_rows(table, columns, batch) diff --git a/src/sqlmem/engine.py b/src/sqlmem/engine.py index 06899c9..68abe4d 100644 --- a/src/sqlmem/engine.py +++ b/src/sqlmem/engine.py @@ -1,18 +1,21 @@ -import sqlite3 import threading from dataclasses import replace -from typing import cast +from pathlib import Path +from typing import Any from loguru import logger from sqlalchemy import inspect -from sqlalchemy.engine import Engine +from sqlalchemy.engine import Connection, Engine -from .cache import CacheManager +from ._sql import quote +from .cache import CacheManager, TableError from .config import ( BACKUP_INTERVAL_SECONDS, CACHE_DB_PATH, + FETCH_BATCH_SIZE, IN_MEMORY, REFRESH_INTERVAL_SECONDS, + SQL_DIALECT, ) from .delta import DeltaConfig, DeltaRefresher, ResolvedDelta from .executor import QueryExecutor @@ -21,6 +24,32 @@ from .registry import ColumnRegistry from .stats import Stats, StatsCollector, TableState, TableStats +class _LazySource: + """A source connection opened on first ``execute`` and shared across one query. + + Most queries are cache hits that never touch the source, so opening it (and + occupying a connection-pool slot) eagerly is wasteful. This proxy forwards + ``execute`` to a real connection opened on demand, then released by ``close``. + """ + + def __init__(self, source_engine: Engine) -> None: + self._source_engine = source_engine + self._sa_conn: Connection | None = None + self._raw: Any = None + + def execute(self, *args: Any, **kwargs: Any) -> Any: + if self._raw is None: + self._sa_conn = self._source_engine.connect() + self._raw = self._sa_conn.connection.dbapi_connection + return self._raw.execute(*args, **kwargs) + + def close(self) -> None: + if self._sa_conn is not None: + self._sa_conn.close() + self._sa_conn = None + self._raw = None + + class CachingEngine: """Transparent SQLAlchemy-compatible cache layer.""" @@ -31,15 +60,28 @@ class CachingEngine: ttl: dict[str, int] | None = None, indexes: dict[str, list[str | list[str]]] | None = None, in_memory: bool | None = None, + cache_db_path: str | Path | None = None, + backup_interval: int | None = None, + refresh_interval: int | None = None, + fetch_batch: int | None = None, + dialect: str | None = None, + blocking_startup_refresh: bool = False, ) -> None: self._source_engine = source_engine use_memory = IN_MEMORY if in_memory is None else in_memory + self._dialect = dialect if dialect is not None else SQL_DIALECT + self._refresh_interval = ( + refresh_interval if refresh_interval is not None else REFRESH_INTERVAL_SECONDS + ) self._cache = CacheManager( - CACHE_DB_PATH, BACKUP_INTERVAL_SECONDS, in_memory=use_memory + Path(cache_db_path) if cache_db_path is not None else CACHE_DB_PATH, + backup_interval if backup_interval is not None else BACKUP_INTERVAL_SECONDS, + in_memory=use_memory, + dialect=self._dialect, + fetch_batch=fetch_batch if fetch_batch is not None else FETCH_BATCH_SIZE, ) self._registry = ColumnRegistry(self._cache.connection) self._stats = StatsCollector() - self._refresh_interval = REFRESH_INTERVAL_SECONDS self._delta = self._resolve_delta(delta or {}) self._ttl = dict(ttl or {}) self._index_columns = self._register_indexes(indexes or {}) @@ -54,8 +96,13 @@ class CachingEngine: ) if self._delta or self._ttl: - self._run_refresh() # catch up tables restored from disk - self._start_refresh_thread() + # The startup catch-up (deltas/TTL reloads for tables restored from + # disk) can take a while on a cold start. By default it runs on the + # background thread so it never blocks application startup; callers + # who need the cache fully fresh before serving can opt back in. + if blocking_startup_refresh: + self._run_refresh() + self._start_refresh_thread(initial_catch_up=not blocking_startup_refresh) logger.info("CachingEngine initialized.") @@ -97,12 +144,18 @@ class CachingEngine: @property def stats(self) -> Stats: states = self._cache.get_states() + errors = self._cache.get_errors() with self._cache._lock: base = self._stats.snapshot(self._cache.connection, states) - return replace(base, tables={n: self._enrich(n, t) for n, t in base.tables.items()}) + base = replace(base, errors=self._cache.error_total) + return replace( + base, tables={n: self._enrich(n, t, errors) for n, t in base.tables.items()} + ) - def _enrich(self, name: str, table_stats: TableStats) -> TableStats: - """Annotate a TableStats with how it is refreshed and TTL staleness.""" + def _enrich( + self, name: str, table_stats: TableStats, errors: dict[str, TableError] + ) -> TableStats: + """Annotate a TableStats with refresh tracking, TTL staleness and errors.""" if name in self._delta: tracking = "delta" elif name in self._ttl: @@ -115,22 +168,37 @@ class CachingEngine: age = self._cache.seconds_since_refresh(name) if age is not None and age > self._ttl[name]: state = TableState.STALE + + err = errors.get(name) + if err is not None: + return replace( + table_stats, + tracking=tracking, + state=state, + last_error=err.message, + last_error_at=err.at, + consecutive_failures=err.consecutive, + ) return replace(table_stats, tracking=tracking, state=state) def execute(self, sql: str, params: Params = None) -> list[dict]: - parsed = parse(sql, params) - with self._source_engine.connect() as sa_conn: - raw_conn = cast(sqlite3.Connection, sa_conn.connection.dbapi_connection) + parsed = parse(sql, params, dialect=self._dialect) + # The source connection is opened lazily — a pure cache hit never touches + # the source and never occupies a pool slot. + source = _LazySource(self._source_engine) + try: executor = QueryExecutor( self._cache, self._registry, - raw_conn, + source, self._stats, self._delta, self._ttl, self._index_columns, ) return executor.execute(parsed) + finally: + source.close() def refresh(self) -> None: """Pull deltas for all delta-tracked tables now (also runs on a timer).""" @@ -139,13 +207,13 @@ class CachingEngine: def _run_refresh(self) -> None: try: with self._source_engine.connect() as sa_conn: - raw_conn = cast(sqlite3.Connection, sa_conn.connection.dbapi_connection) + raw_conn = sa_conn.connection.dbapi_connection self._refresher.refresh(raw_conn) self._refresh_ttl(raw_conn) except Exception as e: logger.error(f"Refresh cycle failed: {e}") - def _refresh_ttl(self, source_conn: sqlite3.Connection) -> None: + def _refresh_ttl(self, source_conn: Any) -> None: """Proactively full-reload TTL-tracked tables whose cache has expired.""" for table, ttl in self._ttl.items(): if not self._cache.is_table_cached(table): @@ -161,8 +229,10 @@ class CachingEngine: except Exception as e: logger.error(f"TTL refresh failed for {table!r}: {e}") - def _start_refresh_thread(self) -> None: + def _start_refresh_thread(self, initial_catch_up: bool = True) -> None: def loop() -> None: + if initial_catch_up: + self._run_refresh() # off-main-thread startup catch-up event = threading.Event() while not event.wait(self._refresh_interval): self._run_refresh() @@ -174,7 +244,7 @@ class CachingEngine: def invalidate(self, table: str) -> None: logger.info(f"Manually invalidating cache for table {table!r}") with self._cache._lock: - self._cache.connection.execute(f"DROP TABLE IF EXISTS {table}") + self._cache.connection.execute(f"DROP TABLE IF EXISTS {quote(table)}") self._cache.connection.execute( "DELETE FROM _sqlmem_tables WHERE table_name = ?", (table,) ) diff --git a/src/sqlmem/executor.py b/src/sqlmem/executor.py index 94997d5..f8ee84d 100644 --- a/src/sqlmem/executor.py +++ b/src/sqlmem/executor.py @@ -1,4 +1,4 @@ -import sqlite3 +from typing import Any from loguru import logger @@ -14,7 +14,7 @@ class QueryExecutor: self, cache: CacheManager, registry: ColumnRegistry, - source_conn: sqlite3.Connection, + source_conn: Any, # raw DBAPI connection (pyodbc/sqlite3/…) — only .execute() is used stats: StatsCollector, delta: dict[str, ResolvedDelta] | None = None, ttl: dict[str, int] | None = None, diff --git a/src/sqlmem/parser.py b/src/sqlmem/parser.py index 475d826..f1d59e5 100644 --- a/src/sqlmem/parser.py +++ b/src/sqlmem/parser.py @@ -25,10 +25,10 @@ class ParsedQuery: wildcard_tables: set[str] = field(default_factory=set) -def parse(sql: str, params: Params = None) -> ParsedQuery: +def parse(sql: str, params: Params = None, dialect: str = SQL_DIALECT) -> ParsedQuery: logger.debug(f"Parsing SQL: {sql!r}") - statement = sqlglot.parse_one(sql, dialect=SQL_DIALECT) + statement = sqlglot.parse_one(sql, dialect=dialect) if isinstance(statement, WRITE_TYPES): raise ReadOnlyError( diff --git a/src/sqlmem/stats.py b/src/sqlmem/stats.py index 5fc81d6..87e9164 100644 --- a/src/sqlmem/stats.py +++ b/src/sqlmem/stats.py @@ -20,6 +20,11 @@ class TableStats: last_refresh: str state: str = TableState.READY tracking: str = "static" # "delta" | "ttl" | "static" + # Most recent load/refresh failure for this table, if any. ``consecutive_failures`` + # resets to 0 on the next success, so > 0 means the table is currently failing. + last_error: str | None = None + last_error_at: str | None = None + consecutive_failures: int = 0 @dataclass(frozen=True) @@ -28,6 +33,7 @@ class Stats: misses: int refetches: int tables: dict[str, TableStats] + errors: int = 0 # total load/refresh failures since start class StatsCollector: diff --git a/tests/test_cache.py b/tests/test_cache.py index 321f462..3070548 100644 --- a/tests/test_cache.py +++ b/tests/test_cache.py @@ -1,4 +1,5 @@ import sqlite3 +import threading import pytest @@ -96,6 +97,68 @@ def test_disk_mode_reload_in_new_instance(tmp_path, source_conn): c2.close() +def test_quoted_reserved_and_spaced_identifiers(tmp_path): + """Table/column names that are reserved words or contain spaces must work.""" + src = sqlite3.connect(":memory:") + src.execute('CREATE TABLE "weird tbl" ("order" TEXT, "group by" TEXT)') + src.executemany('INSERT INTO "weird tbl" VALUES (?, ?)', [("1", "a"), ("2", "b")]) + src.commit() + + c = CacheManager(db_path=tmp_path / "c.db", backup_interval=9999) + c.load_table("weird tbl", ["order", "group by"], src) + assert c.is_table_cached("weird tbl") is True + _, rows = c.execute_in_memory('SELECT "order", "group by" FROM "weird tbl"') + assert ("1", "a") in rows + c.close() + src.close() + + +def test_disk_mode_uses_separate_read_connection(tmp_path, source_conn): + """Disk-mode reads go through a per-thread read connection, not the writer.""" + c = CacheManager(db_path=tmp_path / "c.db", backup_interval=9999, in_memory=False) + c.load_table("users", ["name", "email"], source_conn) + + _, rows = c.execute_in_memory("SELECT name FROM users ORDER BY name") + assert [r[0] for r in rows] == ["alice", "bob"] + assert len(c._read_conns) == 1 + assert c._read_conns[0] is not c.connection # dedicated read conn + c.close() + + +def test_disk_mode_concurrent_reads(tmp_path, source_conn): + """Several reader threads each get their own connection and correct results.""" + c = CacheManager(db_path=tmp_path / "c.db", backup_interval=9999, in_memory=False) + c.load_table("users", ["name"], source_conn) + + results: list[int] = [] + errors: list[Exception] = [] + + def reader() -> None: + try: + _, rows = c.execute_in_memory("SELECT name FROM users") + results.append(len(rows)) + except Exception as e: # noqa: BLE001 + errors.append(e) + + threads = [threading.Thread(target=reader) for _ in range(5)] + for t in threads: + t.start() + for t in threads: + t.join(5) + + assert not errors + assert results == [2] * 5 + assert len(c._read_conns) == 5 # one read connection per reader thread + c.close() + + +def test_memory_mode_uses_shared_connection(cache, source_conn): + """In-memory mode can't share :memory: across connections — no read conns.""" + cache.load_table("users", ["name"], source_conn) + cache.execute_in_memory("SELECT name FROM users") + assert cache._read_conns == [] + + def test_disk_mode_reset_keeps_file(tmp_path, source_conn): db_path = tmp_path / "cache.db" c = CacheManager(db_path=db_path, backup_interval=9999, in_memory=False) diff --git a/tests/test_delta.py b/tests/test_delta.py index 15f3ea8..1c764c1 100644 --- a/tests/test_delta.py +++ b/tests/test_delta.py @@ -1,4 +1,6 @@ import sqlite3 +import threading +from datetime import datetime from types import SimpleNamespace import pytest @@ -7,7 +9,7 @@ from sqlalchemy import create_engine import sqlmem.engine as eng_mod from sqlmem import CachingEngine, DeltaConfig from sqlmem.cache import CacheManager -from sqlmem.delta import DeltaRefresher, ResolvedDelta +from sqlmem.delta import DeltaRefresher, ResolvedDelta, _bind_watermark from sqlmem.executor import QueryExecutor from sqlmem.parser import parse from sqlmem.registry import ColumnRegistry @@ -117,6 +119,89 @@ def test_refresh_without_changes_is_noop(env): assert before == after +# --------------------------------------------------------------------------- +# Watermark binding — regression for the datetime-as-string delta bug +# (SQL Server error 241: 'T'-separated 6-digit-microsecond ISO string can't be +# implicitly converted varchar->datetime, freezing the delta watermark). +# --------------------------------------------------------------------------- + +def test_bind_watermark_parses_iso_datetime(): + assert _bind_watermark("2026-06-05T14:54:24.823000") == datetime( + 2026, 6, 5, 14, 54, 24, 823000 + ) + + +def test_bind_watermark_parses_space_separated(): + assert _bind_watermark("2026-06-01 10:05:00") == datetime(2026, 6, 1, 10, 5, 0) + + +def test_bind_watermark_passes_through_non_datetime(): + # Integer rowversion / non-datetime change column — left untouched. + assert _bind_watermark("12345") == "12345" + + +class _SpyCursor: + def __init__(self, rows): + self._rows = list(rows) + + def fetchmany(self, n): + batch, self._rows = self._rows[:n], self._rows[n:] + return batch + + +class _SpySource: + """Records the parameters bound to each query (stands in for the pyodbc source).""" + + def __init__(self, rows): + self._rows = rows + self.bound = [] + + def execute(self, sql, params=()): + self.bound.append((sql, params)) + return _SpyCursor(self._rows) + + +def test_refresh_binds_watermark_as_datetime(env): + """The watermark must reach the source as a real datetime, not a raw ISO + string — otherwise SQL Server raises error 241 and the delta freezes.""" + env.cache.set_last_synced_at("products", "2026-06-05T14:54:24.823000") + spy = _SpySource(rows=[("1", "Widget", "9.99", "2026-06-05T14:54:24.823000")]) + + env.refresher.refresh(spy) + + assert spy.bound, "source query was never issued" + _, params = spy.bound[-1] + assert params == (datetime(2026, 6, 5, 14, 54, 24, 823000),) + + +# --------------------------------------------------------------------------- +# Refresh failures are recorded (4.3) so a stuck delta is visible in stats +# --------------------------------------------------------------------------- + +class _RaisingSource: + def execute(self, sql, params=()): + raise RuntimeError("boom 241") + + +def test_failed_delta_refresh_records_error(env): + env.refresher.refresh(_RaisingSource()) + + err = env.cache.get_errors()["products"] + assert err.consecutive == 1 + assert "boom 241" in err.message + assert env.cache.error_total == 1 + # State is marked error even though the cache still holds the last-good data. + assert env.cache.get_states()["products"] == "error" + + +def test_delta_success_resets_failure_streak(env): + env.refresher.refresh(_RaisingSource()) + assert env.cache.get_errors()["products"].consecutive == 1 + + env.refresher.refresh(env.source) # real source — succeeds + assert env.cache.get_errors()["products"].consecutive == 0 + + # --------------------------------------------------------------------------- # Engine-level: PK auto-discovery, reset, end-to-end refresh # --------------------------------------------------------------------------- @@ -170,6 +255,48 @@ def test_engine_reset(source_engine, patched_cache): engine.close() +def test_startup_catch_up_is_non_blocking_by_default(source_engine, patched_cache, monkeypatch): + """By default the startup catch-up runs on the background thread, not the + main thread, so it never blocks application startup.""" + threads: list[str] = [] + started = threading.Event() + real = eng_mod.CachingEngine._run_refresh + + def spy(self): + threads.append(threading.current_thread().name) + started.set() + return real(self) + + monkeypatch.setattr(eng_mod.CachingEngine, "_run_refresh", spy) + engine = CachingEngine( + source_engine, delta={"products": DeltaConfig("changed", ["id"])} + ) + # __init__ has returned; the main thread must not have run the catch-up. + assert "MainThread" not in threads + assert started.wait(2), "background catch-up never ran" + assert threads == ["sqlmem-delta"] + engine.close() + + +def test_blocking_startup_refresh_runs_synchronously(source_engine, patched_cache, monkeypatch): + threads: list[str] = [] + real = eng_mod.CachingEngine._run_refresh + + def spy(self): + threads.append(threading.current_thread().name) + return real(self) + + monkeypatch.setattr(eng_mod.CachingEngine, "_run_refresh", spy) + engine = CachingEngine( + source_engine, + delta={"products": DeltaConfig("changed", ["id"])}, + blocking_startup_refresh=True, + ) + # Opt-in: the catch-up ran on the main thread before __init__ returned. + assert "MainThread" in threads + engine.close() + + def test_engine_delta_refresh_end_to_end(source_engine, source_db, patched_cache): engine = CachingEngine( source_engine, delta={"products": DeltaConfig(change_column="changed", key_columns=["id"])} diff --git a/tests/test_engine.py b/tests/test_engine.py index ccee4cd..6463707 100644 --- a/tests/test_engine.py +++ b/tests/test_engine.py @@ -124,6 +124,22 @@ def test_second_query_same_columns_is_cache_hit(engine): assert len(rows) == 3 +def test_cache_hit_does_not_open_source(engine, source_engine, monkeypatch): + """A pure cache hit must not open a source connection (lazy source).""" + engine.execute("SELECT id, name FROM products") # miss → caches + + calls = {"n": 0} + original_connect = source_engine.connect + + def counting_connect(*args, **kwargs): + calls["n"] += 1 + return original_connect(*args, **kwargs) + + monkeypatch.setattr(source_engine, "connect", counting_connect) + engine.execute("SELECT id, name FROM products") # hit → no source access + assert calls["n"] == 0 + + # --------------------------------------------------------------------------- # SQL file creation — backup to disk # --------------------------------------------------------------------------- @@ -331,3 +347,41 @@ def test_in_memory_override_respects_config(source_engine, cache_path, monkeypat ce = CachingEngine(source_engine) # no explicit in_memory assert ce._cache._in_memory is False ce.close() + + +# --------------------------------------------------------------------------- +# Per-engine configuration (constructor overrides env defaults) +# --------------------------------------------------------------------------- + +def test_constructor_config_overrides(source_engine, tmp_path): + p = tmp_path / "explicit_cache.db" + ce = CachingEngine( + source_engine, + cache_db_path=p, + fetch_batch=3, + dialect="sqlite", + backup_interval=12345, + refresh_interval=42, + in_memory=False, + ) + ce.execute("SELECT id, name FROM products") + assert p.exists() + assert ce._cache._fetch_batch == 3 + assert ce._cache._dialect == "sqlite" + assert ce._dialect == "sqlite" + assert ce._cache._backup_interval == 12345 + assert ce._refresh_interval == 42 + ce.close() + + +def test_two_engines_separate_cache_files(source_engine, tmp_path): + """Two engines in one process can target different cache files.""" + a = CachingEngine(source_engine, cache_db_path=tmp_path / "a.db", in_memory=False) + b = CachingEngine(source_engine, cache_db_path=tmp_path / "b.db", in_memory=False) + a.execute("SELECT id FROM products") + + assert (tmp_path / "a.db").exists() + assert a._cache.is_table_cached("products") is True + assert b._cache.is_table_cached("products") is False # independent cache + a.close() + b.close() diff --git a/tests/test_logging.py b/tests/test_logging.py new file mode 100644 index 0000000..888ab5a --- /dev/null +++ b/tests/test_logging.py @@ -0,0 +1,24 @@ +from loguru import logger + +import sqlmem + + +def test_add_sink_idempotent_no_duplicate_lines(): + """Calling add_sink twice for the same sink must not duplicate log lines.""" + sqlmem._added_sinks.clear() + msgs: list[str] = [] + sink = lambda message: msgs.append(str(message)) # noqa: E731 + + try: + sqlmem.add_sink(sink, level="DEBUG", colorize=False) + sqlmem.add_sink(sink, level="DEBUG", colorize=False) # second call: no-op + assert len(sqlmem._added_sinks) == 1 + + # Emit one record that passes the "sqlmem" name filter. + logger.patch(lambda r: r.update(name="sqlmem")).info("hello sqlmem") + assert sum("hello sqlmem" in m for m in msgs) == 1 + finally: + for handler_id in sqlmem._added_sinks.values(): + logger.remove(handler_id) + sqlmem._added_sinks.clear() + logger.disable("sqlmem") # restore the default-silent state for other tests diff --git a/tests/test_stats.py b/tests/test_stats.py index fce74a0..b289a7f 100644 --- a/tests/test_stats.py +++ b/tests/test_stats.py @@ -73,6 +73,29 @@ def test_counters_still_reported(source_engine, patched_cache): engine.close() +def test_stats_exposes_table_error(source_engine, patched_cache): + engine = CachingEngine(source_engine) + engine.execute("SELECT id, name FROM products") + engine._cache.record_error("products", "ValueError: boom") + + s = engine.stats + assert s.errors == 1 + assert s.tables["products"].consecutive_failures == 1 + assert s.tables["products"].last_error == "ValueError: boom" + assert s.tables["products"].last_error_at is not None + engine.close() + + +def test_stats_no_error_by_default(source_engine, patched_cache): + engine = CachingEngine(source_engine) + engine.execute("SELECT id, name FROM products") + s = engine.stats + assert s.errors == 0 + assert s.tables["products"].consecutive_failures == 0 + assert s.tables["products"].last_error is None + engine.close() + + # --- a table being loaded for the first time shows up as "loading" ----------