diff --git a/CHANGELOG.md b/CHANGELOG.md index 9999268..1b8ab3b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,27 @@ All notable changes to this project will be documented in this file. --- +## [1.3.1] - 2026-06-05 + +### Fixed +- **`decimal.Decimal` (and `datetime`) binding error** — `NUMERIC`/`DECIMAL`/`MONEY` columns from SQL Server (pyodbc) arrive as `decimal.Decimal`, which `sqlite3` cannot bind, crashing the cache load with `type 'decimal.Decimal' is not supported`. Values are now coerced to sqlite-bindable types (`Decimal`→`str`, `datetime`/`date`/`time`→ISO, `uuid.UUID`→`str`, `bytearray`→`bytes`) at the cache boundary — on full load, on delta upsert, and for WHERE parameters. Coercion is local (no global `sqlite3.register_adapter`), so the host application's `sqlite3` behaviour is untouched. Cache columns are `TEXT`, so the conversion is lossless and exact (no rounding). + +### Added +- **Incremental (delta) refresh** — `CachingEngine(engine, delta={...})` with `DeltaConfig(change_column, key_columns)`. Delta-tracked tables are kept in sync by pulling only changed rows (`WHERE change_column >= watermark`) and upserting them by key, instead of full reloads. + - Data-driven high-watermark = `max(change_column)` cached, persisted in `cache.db`; `>=` overlap + idempotent upsert so no row is missed and boundary rows are harmlessly re-read. + - Catch-up on startup (since last shutdown) and a background thread refreshing every `SQLMEM_REFRESH_INTERVAL` seconds (default 300); `engine.refresh()` triggers a pull on demand. + - Primary key is auto-discovered from the source DB (`inspect(engine).get_pk_constraint`) when `key_columns` is omitted; required explicitly for views (raises `ValueError`). +- `DeltaConfig` exported from the public API. +- `engine.reset()` — wipes the whole cache (RAM + `cache.db`) for a clean rebuild after structural source changes. +- `SQLMEM_REFRESH_INTERVAL` env var (default `300`). 
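The delta-refresh entry above (`>=` watermark plus idempotent upsert by key) can be sketched with nothing but the standard-library `sqlite3` module. This is a minimal illustration, not SQLmem's code: the `items` table, its columns, and `delta_pull` are hypothetical names, and two in-memory SQLite databases stand in for the source DB and the cache.

```python
import sqlite3

# Hypothetical stand-ins: two in-memory SQLite databases play source DB and cache.
source = sqlite3.connect(":memory:")
cache = sqlite3.connect(":memory:")

source.execute("CREATE TABLE items (id TEXT PRIMARY KEY, name TEXT, changed_at TEXT)")
source.executemany(
    "INSERT INTO items VALUES (?, ?, ?)",
    [("1", "a", "2026-06-01T10:00:00"), ("2", "b", "2026-06-01T11:00:00")],
)

# Cache columns are TEXT; the unique index is what lets INSERT OR REPLACE upsert by key.
cache.execute("CREATE TABLE items (id TEXT, name TEXT, changed_at TEXT)")
cache.execute("CREATE UNIQUE INDEX idx_items_pk ON items (id)")


def delta_pull(watermark):
    """Pull rows at or after the watermark, upsert them, return (new watermark, rows pulled)."""
    if watermark is None:
        # First use: no watermark yet, so do a full load.
        rows = source.execute("SELECT id, name, changed_at FROM items").fetchall()
    else:
        # ">=" rather than ">" so rows sharing the boundary timestamp are re-read.
        rows = source.execute(
            "SELECT id, name, changed_at FROM items WHERE changed_at >= ?",
            (watermark,),
        ).fetchall()
    cache.executemany(
        "INSERT OR REPLACE INTO items (id, name, changed_at) VALUES (?, ?, ?)", rows
    )
    cache.commit()
    # Data-driven watermark: the maximum change value actually present in the cache.
    new_watermark = cache.execute("SELECT MAX(changed_at) FROM items").fetchone()[0]
    return new_watermark, len(rows)


watermark, _ = delta_pull(None)

# The source changes: row 1 is updated and row 3 is inserted.
source.execute("UPDATE items SET name = 'a2', changed_at = '2026-06-01T12:00:00' WHERE id = '1'")
source.execute("INSERT INTO items VALUES ('3', 'c', '2026-06-01T12:00:00')")

watermark, pulled = delta_pull(watermark)
```

In this sketch the second pull re-reads the unchanged boundary row (`id = '2'`) alongside the two real changes; the upsert overwrites it with identical values, which is why the overlap is harmless.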
+ +### Changed +- `pyproject.toml` — bumped version to `1.3.1` +- `cache.py` — schema version bumped to `3`; `_sqlmem_tables` gained a `last_synced_at` watermark column. New methods: `execute_in_memory` (lock-serialized read), `get_table_columns`, `create_unique_index`, `get/set_last_synced_at`, `max_value`, `upsert_rows`, `reset`. Existing on-disk caches are discarded and rebuilt on load. +- `executor.py` — loading a delta-tracked table augments the column set with its key and change columns, creates the unique key index, and records the initial watermark; in-memory reads now go through the cache lock. + +--- + ## [1.2.0] - 2026-06-04 ### Added diff --git a/README.md b/README.md index 46e9e32..e696784 100644 --- a/README.md +++ b/README.md @@ -1,28 +1,70 @@ # SQLmem -Transparent in-memory cache layer between SQLAlchemy and your database. Drop it in front of any SQLAlchemy engine — SELECT queries are served from a fast in-memory SQLite cache, writes pass through unchanged. +Transparent in-memory cache layer between SQLAlchemy and your database. Drop it in front of any SQLAlchemy engine — `SELECT` queries are served from a fast in-memory SQLite cache, writes are rejected (read-only cache). + +## Goals + +SQLmem sits **between your application and the database** and behaves like a normal SQLAlchemy connection. It transparently: + +1. **Intercepts every query** that passes through it and learns, from the SQL itself, **which tables and which columns** the application actually uses. +2. **Holds exactly those tables/columns locally in SQLite** — primarily in **RAM**, secondarily persisted to **disk** (`cache.db`) at regular intervals and on shutdown. +3. **Serves repeated queries from RAM** with no database round-trip. +4. 
**Stays in sync incrementally** (see [Incremental refresh](#incremental-delta-refresh)): for large tables you declare a *change-timestamp* column, and SQLmem only re-fetches rows that changed in the last few minutes (or since the last shutdown) instead of reloading tens of millions of rows on every start. + +The application keeps calling SQL as usual — the cache is an implementation detail behind the interface. ## How it works -``` -Application (SQLAlchemy) - │ - ▼ - [ SQLmem Proxy ] - ┌──────────────────────────────┐ - │ SQL Parser │ → detects SELECT vs. write - │ Column Registry │ → tracks which columns are cached per table - │ Cache Manager (SQLite RAM) │ → stores data in memory - │ Query Executor │ → cache hit / miss logic - └──────────────────────────────┘ - │ - ▼ - Database (via original SQLAlchemy engine) +```mermaid +flowchart TB + App["Application (SQLAlchemy code)"] + DB[("Source database")] + + subgraph SM["SQLmem - transparent cache layer"] + direction TB + P["SQL Parser (sqlglot)
detect SELECT vs write
extract tables + columns"] + R["Column Registry
tracks tables + columns in cache"] + QE["Query Executor
cache hit / miss / refetch"] + MEM[("In-memory SQLite - PRIMARY")] + DISK[("cache.db on disk - SECONDARY")] + P --> R --> QE --> MEM + MEM -->|"backup every N s + on shutdown"| DISK + DISK -->|"load on startup"| MEM + end + + App -->|"execute(sql, params)"| P + QE -->|"cache miss / delta refresh only"| DB + DB -->|"rows"| MEM + MEM -->|"list of dicts"| App ``` -On the first SELECT for a table, SQLmem fetches the required rows from the database and stores them in an in-memory SQLite instance. Subsequent queries for the same columns hit the in-memory cache with no database round-trip. When a query requests a column not yet in cache, SQLmem re-fetches the table with the expanded column set. +On the first `SELECT` touching a table, SQLmem fetches the required rows from the database and stores them in the in-memory SQLite. Subsequent queries for the same columns hit RAM with no database round-trip. When a query requests a column not yet cached, SQLmem re-fetches the table with the expanded column set. Parametrized queries, JOINs and `SELECT *` are all supported; each table in a JOIN is cached independently and the JOIN runs inside the in-memory SQLite. -Parametrized queries, JOINs and `SELECT *` are all supported. Each table referenced in a JOIN is cached independently; the JOIN itself runs in the in-memory SQLite. Query parameters are applied during in-memory filtering, so cache loads always fetch the full table regardless of the `WHERE` values. 
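The hit / miss / refetch behaviour described above can be illustrated with a toy model. This is a sketch under stated assumptions, not the project's executor: `ensure_columns`, `execute`, `cached_columns`, and the `orders` table are hypothetical, the SQL-parsing step is skipped (the table and column names are passed in by hand), and an in-memory SQLite database stands in for the source DB.

```python
import sqlite3

source = sqlite3.connect(":memory:")  # hypothetical stand-in for the source database
source.execute("CREATE TABLE orders (id TEXT, status TEXT, total TEXT)")
source.executemany(
    "INSERT INTO orders VALUES (?, ?, ?)",
    [("1", "open", "10.00"), ("2", "paid", "25.50")],
)

mem = sqlite3.connect(":memory:")  # the in-memory cache
cached_columns = {}  # toy column registry: table name -> cached column list
stats = {"source_fetches": 0}


def ensure_columns(table, needed):
    """Load the table when it, or one of the needed columns, is not cached yet."""
    have = cached_columns.get(table, [])
    missing = [c for c in needed if c not in have]
    if have and not missing:
        return  # cache hit: no source round-trip
    columns = have + missing
    col_list = ", ".join(columns)
    # The source fetch has no WHERE clause: the whole table is pulled.
    rows = source.execute(f"SELECT {col_list} FROM {table}").fetchall()
    stats["source_fetches"] += 1
    col_defs = ", ".join(f"{c} TEXT" for c in columns)
    placeholders = ", ".join("?" * len(columns))
    mem.execute(f"DROP TABLE IF EXISTS {table}")
    mem.execute(f"CREATE TABLE {table} ({col_defs})")
    mem.executemany(f"INSERT INTO {table} VALUES ({placeholders})", rows)
    cached_columns[table] = columns


def execute(sql, table, needed, params=()):
    """Make sure the columns are cached, then run the query in memory with its params."""
    ensure_columns(table, needed)
    cursor = mem.execute(sql, params)
    names = [d[0] for d in cursor.description]
    return [dict(zip(names, row)) for row in cursor.fetchall()]


first = execute("SELECT status FROM orders WHERE id = ?", "orders", ["status", "id"], ("2",))
second = execute("SELECT status FROM orders WHERE id = ?", "orders", ["status", "id"], ("1",))
third = execute("SELECT total FROM orders WHERE id = ?", "orders", ["total", "id"], ("1",))
```

Here the first call is a miss, the second is a hit even though the parameter value differs, and the third triggers a refetch with the expanded column set, so the toy source is queried twice in total.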
+### Query lifecycle + +```mermaid +sequenceDiagram + participant App + participant SQLmem + participant Mem as In-memory SQLite + participant DB as Source DB + + App->>SQLmem: execute(SELECT a, b FROM t WHERE id = ?, params) + SQLmem->>SQLmem: parse -> table = t, columns = {a, b, id} + alt columns already cached + SQLmem->>Mem: run query in RAM (with params) + Mem-->>SQLmem: rows + else cache miss or new column + SQLmem->>DB: SELECT a, b, id FROM t (whole table, no WHERE) + DB-->>SQLmem: rows + SQLmem->>Mem: store / expand table + SQLmem->>Mem: run query in RAM (with params) + Mem-->>SQLmem: rows + end + SQLmem-->>App: list[dict] +``` + +Note: query **parameters are applied only to the in-memory query**, never to the source fetch — a cache load always pulls the full table so the cache can answer any later `WHERE` on those columns. ## Installation @@ -38,7 +80,7 @@ Requires Python 3.14. ```python from sqlmem import CachingEngine -from sqlalchemy import create_engine, text +from sqlalchemy import create_engine base_engine = create_engine("postgresql://user:pass@host/db") engine = CachingEngine(base_engine) @@ -65,7 +107,7 @@ engine.execute( engine.execute("SELECT * FROM users") ``` -`execute()` returns a list of dicts. Parameters are passed straight through to SQLite, so positional (`?`) and named (`:name`) styles both work. Results are compatible with standard iteration patterns. +`execute()` returns a list of dicts. Parameters are passed straight through to SQLite, so positional (`?`) and named (`:name`) styles both work. ## Cache behaviour @@ -83,23 +125,106 @@ Query 5: SELECT a FROM orders → cache hit (table already full) **Writes are blocked** — INSERT, UPDATE, and DELETE raise `ReadOnlyError`. SQLmem is a read-only cache. -## Persistence +## Incremental (delta) refresh -The in-memory cache is optionally persisted to `cache.db` on disk: - -- **On startup**: if `cache.db` exists, it is loaded into memory. 
-- **Hourly**: a background thread writes a snapshot to disk. -- **On shutdown**: a final flush via `atexit` and SIGTERM handler. - -Schema version is checked on load — if it does not match, the stale file is discarded and the cache is rebuilt from the database. - -## Manual cache invalidation +Reloading a table with tens of millions of rows on every startup is unacceptable. To avoid it, SQLmem keeps the cache in sync by pulling **only changed rows**. For each delta-tracked table you declare its **last-change timestamp** column and the **key column(s)** that identify a row: ```python -engine.invalidate("orders") # drops the table from cache; next query re-fetches from DB +from sqlmem import CachingEngine, DeltaConfig + +engine = CachingEngine( + base_engine, + delta={ + "VW_P_PRATVALUES": DeltaConfig( + change_column="LAST_CHANGE_DATE", # required — the row's change timestamp + key_columns=["PRODUCT_PRODUCTNR"], # optional for base tables (auto-discovered) + ), + }, +) +``` + +**What you must configure, and what is automatic:** + +| Item | Source | +|---|---| +| which **tables / columns** to cache | **automatic** — learned from the queries that pass through | +| `change_column` (timestamp) | **manual, always** — its meaning can't be inferred from the column type* | +| `key_columns` (primary key) | **auto-discovered** for real tables (`inspect(engine).get_pk_constraint`); **manual** for views, which carry no key in the DB catalog | + +* The one exception is a true MSSQL `rowversion`/`timestamp`-typed column, which is unique per table and auto-maintained — that could be detected automatically. A plain `DATETIME` like `LAST_CHANGE_DATE` cannot. + +If `key_columns` is omitted, SQLmem tries to read the primary key from the source DB on startup and raises a clear error if it can't (e.g. for a view) so you can supply it explicitly. + +### How sync works + +The boundary of "what changed since last time" is a **data-driven watermark**, not a wall-clock window. 
SQLmem persists, per delta-tracked table, `last_synced_at` = the **maximum `change_column` value** actually present in the cache after the previous sync (stored in `cache.db`, so it survives restarts). The next sync pulls `WHERE change_column >= last_synced_at`. + +Why a watermark and not `now − 5 min`: + +- **No clock dependency** — it compares DB values to DB values, so app-server vs database clock skew is irrelevant. +- **Survives downtime for free** — after hours offline, `>= watermark` pulls *everything* since then; "catch up since last shutdown" needs no special case. +- **Never misses late commits** — a wall-clock window can drop a row whose timestamp falls outside the window by the time it commits. + +The filter is `>=` (not `>`) so rows sharing the exact boundary timestamp are re-read; combined with **idempotent upsert by `key_columns`**, re-reading a handful of boundary rows each tick is harmless (they overwrite themselves), and no row is ever skipped. The 5-minute interval is only the **polling cadence**, never the filter boundary. + +```mermaid +sequenceDiagram + participant Trigger as Startup / every 5 min + participant SQLmem + participant Mem as In-memory SQLite + participant DB as Source DB + + Trigger->>SQLmem: refresh delta-tracked tables + SQLmem->>Mem: read last_synced_at for table + SQLmem->>DB: SELECT * FROM t WHERE LAST_CHANGE_DATE >= last_synced_at + DB-->>SQLmem: only rows changed since the watermark + SQLmem->>Mem: upsert rows by key_columns (INSERT OR REPLACE) + SQLmem->>Mem: last_synced_at = max(LAST_CHANGE_DATE) +``` + +- **First use** of a delta table → full load; the watermark is set to the table's current `max(change_column)`. +- **On startup** → for each delta table restored from disk, a single catch-up query pulls everything changed **since the last shutdown** and upserts it, bringing the cache back in sync without a full reload. 
+- **While running** → a background thread repeats the delta pull every `SQLMEM_REFRESH_INTERVAL` seconds (default 5 minutes), so the cache trails the source DB by at most that interval. +- Tables **without** a `DeltaConfig` keep the current behaviour: full load on miss, never auto-refreshed. + +### Requirements and limits of delta sync + +- The `change_column` must be **set by the source DB on every insert/update** and be non-decreasing (e.g. a `DATETIME`/`rowversion`/`timestamp` maintained by a trigger or the application). +- `key_columns` must uniquely identify a row — they are used to upsert changed rows in place. +- **Updates, including "deletes by nulling"** (a row that keeps its identity but has values cleared), are handled automatically: the change timestamp bumps, the row is re-pulled and overwritten in place. +- **Structural changes are not covered by delta sync** — adding/removing attributes, or clearing values *without* bumping `change_column`, won't be picked up. For those, force a clean reload with [`engine.reset()`](#manual-cache-control) (or `invalidate()` for a single table). +- Hard `DELETE`s of whole rows are not detected by a change-timestamp; this workload doesn't delete rows, but if yours does, use a soft-delete flag column or `reset()`. + +## Persistence + +The in-memory cache is persisted to `cache.db` on disk: + +- **On startup**: if `cache.db` exists, it is loaded into memory. +- **Periodically**: a background thread writes a snapshot to disk every `SQLMEM_BACKUP_INTERVAL` seconds. +- **On shutdown**: a final flush via `atexit` and SIGTERM handler. + +The schema version is checked on load — if it does not match, the stale file is discarded and the cache is rebuilt from the database. 
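The snapshot-and-restore cycle with a schema-version check could look roughly like the sketch below. It assumes the standard-library `sqlite3.Connection.backup` API as the copy mechanism, which the text above does not confirm; the `meta` table and the helper names `save_snapshot` and `load_snapshot` are hypothetical.

```python
import sqlite3
import tempfile
from pathlib import Path

SCHEMA_VERSION = 3  # illustrative value; the "meta" table below is a hypothetical layout


def save_snapshot(mem, path):
    """Copy the whole in-memory database into a file on disk."""
    disk = sqlite3.connect(str(path))
    try:
        mem.backup(disk)
    finally:
        disk.close()


def load_snapshot(path):
    """Return an in-memory database, restored from disk when the snapshot is current."""
    mem = sqlite3.connect(":memory:")
    if not path.exists():
        return mem
    disk = sqlite3.connect(str(path))
    try:
        row = disk.execute("SELECT version FROM meta").fetchone()
        fresh = row is not None and row[0] == SCHEMA_VERSION
        if fresh:
            disk.backup(mem)
    except sqlite3.Error:
        fresh = False  # unreadable file or missing meta table: treat as stale
    finally:
        disk.close()
    if not fresh:
        path.unlink()  # discard the stale file; the cache is rebuilt lazily
    return mem


mem = sqlite3.connect(":memory:")
mem.execute("CREATE TABLE meta (version INTEGER)")
mem.execute("INSERT INTO meta VALUES (?)", (SCHEMA_VERSION,))
mem.execute("CREATE TABLE t (a TEXT)")
mem.execute("INSERT INTO t VALUES ('x')")
mem.commit()

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "cache.db"
    save_snapshot(mem, path)
    restored = load_snapshot(path)
    rows = restored.execute("SELECT a FROM t").fetchall()
```

Whatever the copy mechanism, the version gate keeps a file written under an older schema from being loaded into a process that expects different metadata columns.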
+ +## Manual cache control + +```python +engine.invalidate("orders") # drop one table from cache; next query re-fetches it from DB +engine.reset() # wipe the whole cache (RAM + cache.db) — full clean slate +engine.refresh() # pull deltas for all delta-tracked tables now engine.close() # flush to disk and shut down background thread ``` +Use `reset()` after a **structural change** in the source (columns added/removed, values cleared in bulk without bumping the change timestamp) so the cache rebuilds from scratch. `invalidate(table)` is the targeted version for a single table. + +## Runtime statistics + +```python +stats = engine.stats # Stats snapshot +print(stats.hits, stats.misses, stats.refetches) +for name, t in stats.tables.items(): + print(name, t.rows, t.columns, t.last_refresh) +``` + ## Configuration Set via environment variables or a `.env` file: @@ -108,8 +233,9 @@ Set via environment variables or a `.env` file: |---|---|---| | `SQLMEM_DEBUG` | `false` | `true` enables DEBUG-level logging | | `SQLMEM_CACHE_DB` | `cache.db` | Path to the on-disk persistence file | -| `SQLMEM_BACKUP_INTERVAL` | `3600` | Backup interval in seconds | +| `SQLMEM_BACKUP_INTERVAL` | `3600` | Disk backup interval in seconds | | `SQLMEM_SQL_DIALECT` | `tsql` | sqlglot dialect used to parse incoming SQL (e.g. `tsql`, `postgres`, `mysql`) | +| `SQLMEM_REFRESH_INTERVAL` | `300` | delta-refresh interval in seconds for delta-tracked tables | ## Exceptions @@ -132,7 +258,7 @@ from sqlmem import add_sink add_sink(sys.stderr) # INFO by default add_sink(sys.stderr, level="DEBUG") # verbose: every query, cache hit/miss, backup -add_sink("sqlmem.log", rotation="10 MB") # to a file +add_sink("sqlmem.log", rotation="10 MB") # to a file ``` Set `SQLMEM_DEBUG=true` in `.env` to make the default level DEBUG when no explicit `level` is passed to `add_sink()`. 
@@ -142,9 +268,16 @@ Set `SQLMEM_DEBUG=true` in `.env` to make the default level DEBUG when no explic - In a multi-table (JOIN) query, every column must be qualified with its table or alias; unqualified columns raise `UnsupportedQueryError`. - Tables are keyed by their base name — two tables with the same name in different schemas share one cache entry. - No distributed cache backend (Redis etc.). -- No transactional consistency guarantees. +- No transactional consistency guarantees; the cache trails the source DB. - Write operations (INSERT/UPDATE/DELETE) are always blocked. +## Roadmap + +- [x] **Incremental (delta) refresh** via per-table change-timestamp + key columns (see above) — the key feature for large tables. +- [x] **Primary-key auto-discovery** from the source DB (`inspect(engine).get_pk_constraint`) so `key_columns` is only needed for views. +- [x] **`engine.reset()`** — wipe RAM + `cache.db` for a clean rebuild after structural changes. +- [ ] Per-table TTL (time-to-live) expiry. + ## Dependencies | Layer | Library | diff --git a/project.md b/project.md index eee965c..6a98149 100644 --- a/project.md +++ b/project.md @@ -191,6 +191,8 @@ SQLMEM_DEBUG=true # DEBUG level — detailed log of every query, cache o - [x] **`SELECT *` (wildcard) support**: Loads the whole table into the cache and marks it as `is_full`; later queries on any column are always a cache hit with no re-fetch. - [x] **JOIN support**: The parser extracts the columns of each joined table separately, and the Column Registry tracks them independently. The Cache Manager ensures all required tables are in memory before the query runs. - [x] **Three-part table names**: `[catalog].[schema].[table]` is cached under its base name; the in-memory query strips the prefix. 
+- [x] **Incremental (delta) refresh**: per-table `DeltaConfig(change_column, key_columns)`; syncs only changed rows via the data-driven watermark `max(change_column)` (`>=` + idempotent upsert by key), with catch-up on startup + a background thread (`SQLMEM_REFRESH_INTERVAL`, default 300 s). The PK is auto-discovered from the source DB; for views it must be given manually. +- [x] **`engine.reset()`**: wipes the whole cache (RAM + `cache.db`) for a clean rebuild after a structural change. ## TODO — future features diff --git a/pyproject.toml b/pyproject.toml index f8fc535..b4b690d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "sqlmem" -version = "1.2.0" +version = "1.3.1" description = "" authors = [ {name = "jan.doubravsky@gmail.com"} diff --git a/src/sqlmem/__init__.py b/src/sqlmem/__init__.py index a621848..bb52442 100644 --- a/src/sqlmem/__init__.py +++ b/src/sqlmem/__init__.py @@ -3,6 +3,7 @@ from typing import Any from loguru import logger from .config import DEBUG +from .delta import DeltaConfig from .engine import CachingEngine from .exceptions import ReadOnlyError, UnsupportedQueryError from .stats import Stats, TableStats @@ -35,4 +36,12 @@ def add_sink(sink: Any, *, level: str | None = None, **kwargs: Any) -> None: logger.add(sink, level=level or ("DEBUG" if DEBUG else "INFO"), filter="sqlmem", **kwargs) -__all__ = ["CachingEngine", "ReadOnlyError", "UnsupportedQueryError", "Stats", "TableStats", "add_sink"] +__all__ = [ + "CachingEngine", + "DeltaConfig", + "ReadOnlyError", + "UnsupportedQueryError", + "Stats", + "TableStats", + "add_sink", +] diff --git a/src/sqlmem/_coerce.py b/src/sqlmem/_coerce.py new file mode 100644 index 0000000..349136a --- /dev/null +++ b/src/sqlmem/_coerce.py @@ -0,0 +1,40 @@ +"""Coerce source-DB values into types ``sqlite3`` can bind. + +pyodbc returns ``NUMERIC``/``DECIMAL``/``MONEY`` as :class:`decimal.Decimal` and +date/time columns as :mod:`datetime` objects, none of which ``sqlite3`` binds +natively. 
Cache columns are ``TEXT``, so stringifying is lossless and consistent +with how the data is stored. This is done **locally** — never via a global +``sqlite3.register_adapter`` — so the host application's ``sqlite3`` behaviour is +left untouched. +""" + +import datetime +import decimal +import uuid +from typing import Any + +Params = tuple | list | dict | None + + +def to_sqlite(value: Any) -> Any: + if isinstance(value, decimal.Decimal): + return str(value) + if isinstance(value, (datetime.datetime, datetime.date, datetime.time)): + return value.isoformat() + if isinstance(value, uuid.UUID): + return str(value) + if isinstance(value, bytearray): + return bytes(value) + return value + + +def coerce_row(row: tuple) -> tuple: + return tuple(to_sqlite(v) for v in row) + + +def coerce_params(params: Params) -> tuple | dict | None: + if params is None: + return None + if isinstance(params, dict): + return {key: to_sqlite(val) for key, val in params.items()} + return tuple(to_sqlite(val) for val in params) diff --git a/src/sqlmem/cache.py b/src/sqlmem/cache.py index 7b13a7a..2ab3e9d 100644 --- a/src/sqlmem/cache.py +++ b/src/sqlmem/cache.py @@ -8,8 +8,9 @@ from pathlib import Path from loguru import logger import sqlmem._meta as _meta +from ._coerce import coerce_params, coerce_row -SCHEMA_VERSION = 2 +SCHEMA_VERSION = 3 class CacheManager: @@ -41,7 +42,8 @@ class CacheManager: table_name TEXT PRIMARY KEY, last_refresh_at TEXT NOT NULL, row_count INTEGER, - is_full INTEGER NOT NULL DEFAULT 0 + is_full INTEGER NOT NULL DEFAULT 0, + last_synced_at TEXT ); CREATE TABLE IF NOT EXISTS _sqlmem_columns ( table_name TEXT NOT NULL, @@ -159,18 +161,103 @@ class CacheManager: cols = ", ".join(columns) logger.info(f"Fetching {table!r} columns [{cols}] from source DB") rows = source_conn.execute(f"SELECT {cols} FROM {table}").fetchall() + clean_rows = [coerce_row(row) for row in rows] with self._lock: self._mem_conn.execute(f"DROP TABLE IF EXISTS {table}") col_defs = ", ".join(f"{c} 
TEXT" for c in columns) self._mem_conn.execute(f"CREATE TABLE {table} ({col_defs})") placeholders = ", ".join("?" * len(columns)) - self._mem_conn.executemany(f"INSERT INTO {table} VALUES ({placeholders})", rows) + self._mem_conn.executemany(f"INSERT INTO {table} VALUES ({placeholders})", clean_rows) self._mem_conn.commit() self.mark_table_refreshed(table, len(rows), full) logger.info(f"Table {table!r} cached ({len(rows)} rows, columns: {columns})") + def execute_in_memory( + self, sql: str, params: tuple | list | dict | None = None + ) -> tuple[list[str], list[tuple]]: + """Run a read query against the in-memory cache, serialized with writers.""" + bound = coerce_params(params) + with self._lock: + cursor = self._mem_conn.execute(sql) if bound is None else self._mem_conn.execute(sql, bound) + col_names = [desc[0] for desc in cursor.description] + rows = cursor.fetchall() + return col_names, rows + + # --- delta refresh support --------------------------------------------- + + def get_table_columns(self, table: str) -> list[str]: + """Authoritative ordered column list of a cached table (via PRAGMA).""" + rows = self._mem_conn.execute(f"PRAGMA table_info({table})").fetchall() + return [r[1] for r in rows] + + def create_unique_index(self, table: str, key_columns: list[str]) -> None: + """Create the unique index on *key_columns* that makes upsert-by-key work.""" + cols = ", ".join(key_columns) + index = f"idx_{table}_pk" + with self._lock: + self._mem_conn.execute( + f"CREATE UNIQUE INDEX IF NOT EXISTS {index} ON {table} ({cols})" + ) + self._mem_conn.commit() + + def get_last_synced_at(self, table: str) -> str | None: + row = self._mem_conn.execute( + "SELECT last_synced_at FROM _sqlmem_tables WHERE table_name = ?", (table,) + ).fetchone() + return row[0] if row else None + + def set_last_synced_at(self, table: str, value: str | None) -> None: + with self._lock: + self._mem_conn.execute( + "UPDATE _sqlmem_tables SET last_synced_at = ? 
WHERE table_name = ?", + (value, table), + ) + self._mem_conn.commit() + + def max_value(self, table: str, column: str) -> str | None: + """Maximum value of *column* across cached rows (the delta watermark).""" + row = self._mem_conn.execute(f"SELECT MAX({column}) FROM {table}").fetchone() + return row[0] if row else None + + def upsert_rows(self, table: str, columns: list[str], rows: list[tuple]) -> None: + """Insert-or-replace *rows* by the table's unique key, then refresh row_count.""" + col_list = ", ".join(columns) + placeholders = ", ".join("?" * len(columns)) + clean_rows = [coerce_row(row) for row in rows] + with self._lock: + self._mem_conn.executemany( + f"INSERT OR REPLACE INTO {table} ({col_list}) VALUES ({placeholders})", + clean_rows, + ) + self._mem_conn.commit() + count = self._mem_conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0] + self.mark_table_refreshed(table, count, self.is_table_full(table)) + + def reset(self) -> None: + """Wipe the entire cache — every cached table plus the on-disk file.""" + logger.info("Resetting cache — dropping all cached tables.") + with self._lock: + user_tables = [ + r[0] + for r in self._mem_conn.execute( + "SELECT name FROM sqlite_master " + r"WHERE type = 'table' AND name NOT LIKE 'sqlite\_%' ESCAPE '\' " + r"AND name NOT LIKE '\_sqlmem\_%' ESCAPE '\'" + ).fetchall() + ] + for name in user_tables: + self._mem_conn.execute(f"DROP TABLE IF EXISTS {name}") + self._mem_conn.execute("DELETE FROM _sqlmem_tables") + self._mem_conn.execute("DELETE FROM _sqlmem_columns") + self._mem_conn.commit() + try: + if self._db_path.exists(): + self._db_path.unlink() + except OSError as e: + logger.error(f"Failed to delete cache file {self._db_path}: {e}") + def close(self) -> None: self._backup_to_disk() self._closed = True diff --git a/src/sqlmem/config.py b/src/sqlmem/config.py index 625bde4..482dd60 100644 --- a/src/sqlmem/config.py +++ b/src/sqlmem/config.py @@ -9,6 +9,8 @@ load_dotenv() DEBUG = 
os.getenv("SQLMEM_DEBUG", "false").lower() == "true" CACHE_DB_PATH = Path(os.getenv("SQLMEM_CACHE_DB", "cache.db")) BACKUP_INTERVAL_SECONDS = int(os.getenv("SQLMEM_BACKUP_INTERVAL", "3600")) +# How often (seconds) the background thread pulls deltas for delta-tracked tables. +REFRESH_INTERVAL_SECONDS = int(os.getenv("SQLMEM_REFRESH_INTERVAL", "300")) # Dialect used by sqlglot to parse incoming SQL. Defaults to T-SQL (SQL Server), # which also accepts ANSI SQL. In-memory queries are always rendered to SQLite. SQL_DIALECT = os.getenv("SQLMEM_SQL_DIALECT", "tsql") diff --git a/src/sqlmem/delta.py b/src/sqlmem/delta.py new file mode 100644 index 0000000..0c6a7bd --- /dev/null +++ b/src/sqlmem/delta.py @@ -0,0 +1,78 @@ +import sqlite3 +from dataclasses import dataclass, field + +from loguru import logger + +from .cache import CacheManager + + +@dataclass(frozen=True) +class DeltaConfig: + """Per-table configuration for incremental (delta) refresh. + + *change_column* is the column the source DB updates on every insert/update + (a non-decreasing timestamp / rowversion). *key_columns* uniquely identify a + row and are used to upsert changed rows in place; leave them empty to let the + engine auto-discover the primary key from the source DB (works for real + tables, not views). + """ + + change_column: str + key_columns: list[str] = field(default_factory=list) + + +@dataclass(frozen=True) +class ResolvedDelta: + """A :class:`DeltaConfig` with ``key_columns`` resolved to concrete columns.""" + + change_column: str + key_columns: list[str] + + +class DeltaRefresher: + """Pulls only changed rows for delta-tracked tables and upserts them. + + Uses a data-driven high-watermark (``max(change_column)`` actually cached) + with a ``>=`` overlap and idempotent upsert by key, so no row is ever missed + and boundary rows are harmlessly re-read. 
+ """ + + def __init__(self, cache: CacheManager, delta: dict[str, ResolvedDelta]) -> None: + self._cache = cache + self._delta = delta + + def refresh(self, source_conn: sqlite3.Connection) -> None: + for table, cfg in self._delta.items(): + if not self._cache.is_table_cached(table): + continue + try: + self._refresh_table(table, cfg, source_conn) + except Exception as e: # one bad table must not stop the others + logger.error(f"Delta refresh failed for {table!r}: {e}") + + def _refresh_table( + self, table: str, cfg: ResolvedDelta, source_conn: sqlite3.Connection + ) -> None: + columns = self._cache.get_table_columns(table) + watermark = self._cache.get_last_synced_at(table) + col_list = ", ".join(columns) + + if watermark is None: + rows = source_conn.execute(f"SELECT {col_list} FROM {table}").fetchall() + else: + rows = source_conn.execute( + f"SELECT {col_list} FROM {table} WHERE {cfg.change_column} >= ?", + (watermark,), + ).fetchall() + + if not rows: + logger.debug(f"Delta refresh {table!r}: no changes since {watermark!r}") + return + + self._cache.upsert_rows(table, columns, rows) + new_watermark = self._cache.max_value(table, cfg.change_column) + self._cache.set_last_synced_at(table, new_watermark) + logger.info( + f"Delta refresh {table!r}: {len(rows)} row(s) upserted, " + f"watermark {watermark!r} → {new_watermark!r}" + ) diff --git a/src/sqlmem/engine.py b/src/sqlmem/engine.py index daacfbc..b08fb7b 100644 --- a/src/sqlmem/engine.py +++ b/src/sqlmem/engine.py @@ -1,11 +1,14 @@ import sqlite3 +import threading from typing import cast from loguru import logger +from sqlalchemy import inspect from sqlalchemy.engine import Engine from .cache import CacheManager -from .config import BACKUP_INTERVAL_SECONDS, CACHE_DB_PATH +from .config import BACKUP_INTERVAL_SECONDS, CACHE_DB_PATH, REFRESH_INTERVAL_SECONDS +from .delta import DeltaConfig, DeltaRefresher, ResolvedDelta from .executor import QueryExecutor from .parser import Params, parse from .registry import 
ColumnRegistry @@ -15,24 +18,80 @@ from .stats import Stats, StatsCollector class CachingEngine: """Transparent SQLAlchemy-compatible cache layer.""" - def __init__(self, source_engine: Engine) -> None: + def __init__( + self, + source_engine: Engine, + delta: dict[str, DeltaConfig] | None = None, + ) -> None: self._source_engine = source_engine self._cache = CacheManager(CACHE_DB_PATH, BACKUP_INTERVAL_SECONDS) self._registry = ColumnRegistry(self._cache.connection) self._stats = StatsCollector() + self._refresh_interval = REFRESH_INTERVAL_SECONDS + self._delta = self._resolve_delta(delta or {}) + self._refresher = DeltaRefresher(self._cache, self._delta) + + if self._delta: + self._run_refresh() # catch up tables restored from disk + self._start_refresh_thread() + logger.info("CachingEngine initialized.") + def _resolve_delta(self, delta: dict[str, DeltaConfig]) -> dict[str, ResolvedDelta]: + """Resolve each DeltaConfig, auto-discovering the primary key when omitted.""" + resolved: dict[str, ResolvedDelta] = {} + inspector = None + for table, cfg in delta.items(): + keys = list(cfg.key_columns) + if not keys: + inspector = inspector or inspect(self._source_engine) + pk = inspector.get_pk_constraint(table) + keys = list(pk.get("constrained_columns") or []) + if not keys: + raise ValueError( + f"No primary key found for {table!r} in the source DB " + "(views have none) — set key_columns in its DeltaConfig." 
+ ) + logger.info(f"Delta {table!r}: auto-discovered key columns {keys}") + resolved[table] = ResolvedDelta(change_column=cfg.change_column, key_columns=keys) + return resolved + @property def stats(self) -> Stats: - return self._stats.snapshot(self._cache.connection) + with self._cache._lock: + return self._stats.snapshot(self._cache.connection) def execute(self, sql: str, params: Params = None) -> list[dict]: parsed = parse(sql, params) with self._source_engine.connect() as sa_conn: raw_conn = cast(sqlite3.Connection, sa_conn.connection.dbapi_connection) - executor = QueryExecutor(self._cache, self._registry, raw_conn, self._stats) + executor = QueryExecutor( + self._cache, self._registry, raw_conn, self._stats, self._delta + ) return executor.execute(parsed) + def refresh(self) -> None: + """Pull deltas for all delta-tracked tables now (also runs on a timer).""" + self._run_refresh() + + def _run_refresh(self) -> None: + try: + with self._source_engine.connect() as sa_conn: + raw_conn = cast(sqlite3.Connection, sa_conn.connection.dbapi_connection) + self._refresher.refresh(raw_conn) + except Exception as e: + logger.error(f"Delta refresh cycle failed: {e}") + + def _start_refresh_thread(self) -> None: + def loop() -> None: + event = threading.Event() + while not event.wait(self._refresh_interval): + self._run_refresh() + + t = threading.Thread(target=loop, daemon=True, name="sqlmem-delta") + t.start() + logger.debug(f"Delta refresh thread started (interval={self._refresh_interval}s)") + def invalidate(self, table: str) -> None: logger.info(f"Manually invalidating cache for table {table!r}") with self._cache._lock: @@ -45,6 +104,11 @@ class CachingEngine: ) self._cache.connection.commit() + def reset(self) -> None: + """Wipe the whole cache (RAM + cache.db). 
Use after structural source changes.""" + self._cache.reset() + logger.info("Cache reset — all tables will be reloaded on next use.") + def close(self) -> None: self._cache.close() logger.info("CachingEngine closed.") diff --git a/src/sqlmem/executor.py b/src/sqlmem/executor.py index b0e03a5..629a1fa 100644 --- a/src/sqlmem/executor.py +++ b/src/sqlmem/executor.py @@ -3,6 +3,7 @@ import sqlite3 from loguru import logger from .cache import CacheManager +from .delta import ResolvedDelta from .parser import ParsedQuery from .registry import ColumnRegistry from .stats import StatsCollector @@ -15,11 +16,13 @@ class QueryExecutor: registry: ColumnRegistry, source_conn: sqlite3.Connection, stats: StatsCollector, + delta: dict[str, ResolvedDelta] | None = None, ) -> None: self._cache = cache self._registry = registry self._source_conn = source_conn self._stats = stats + self._delta = delta or {} def execute(self, parsed: ParsedQuery) -> list[dict]: for table in parsed.tables: @@ -46,8 +49,7 @@ class QueryExecutor: self._stats.record_miss() columns = self._cache.discover_columns(table, self._source_conn) - self._cache.load_table(table, columns, self._source_conn, full=True) - self._registry.update(table, columns) + self._load(table, columns, full=True) def _ensure_columns(self, table: str, columns: list[str]) -> None: """Load *table* with at least *columns*, refetching only when columns are missing.""" @@ -69,16 +71,25 @@ class QueryExecutor: self._stats.record_miss() all_columns = list(self._registry.get_columns(table)) + missing - self._cache.load_table(table, all_columns, self._source_conn) - self._registry.update(table, all_columns) + self._load(table, all_columns, full=False) + + def _load(self, table: str, columns: list[str], full: bool) -> None: + """Fetch *table* into cache, adding delta key/timestamp columns when tracked.""" + cfg = self._delta.get(table) + if cfg: + # The cache must always hold the key (to upsert) and the change column + # (to compute the 
watermark), even if no query referenced them. + columns = list(dict.fromkeys([*columns, *cfg.key_columns, cfg.change_column])) + + self._cache.load_table(table, columns, self._source_conn, full=full) + self._registry.update(table, columns) + + if cfg: + self._cache.create_unique_index(table, cfg.key_columns) + watermark = self._cache.max_value(table, cfg.change_column) + self._cache.set_last_synced_at(table, watermark) def _run_in_memory(self, parsed: ParsedQuery) -> list[dict]: logger.debug(f"Executing in SQLite RAM: {parsed.sqlite_sql!r} params={parsed.params!r}") - conn = self._cache.connection - if parsed.params is None: - cursor = conn.execute(parsed.sqlite_sql) - else: - cursor = conn.execute(parsed.sqlite_sql, parsed.params) - col_names = [desc[0] for desc in cursor.description] - rows = cursor.fetchall() + col_names, rows = self._cache.execute_in_memory(parsed.sqlite_sql, parsed.params) return [dict(zip(col_names, row)) for row in rows] diff --git a/tests/test_coerce.py b/tests/test_coerce.py new file mode 100644 index 0000000..459a708 --- /dev/null +++ b/tests/test_coerce.py @@ -0,0 +1,109 @@ +import datetime +import decimal +import uuid + +import pytest + +from sqlmem._coerce import coerce_params, to_sqlite +from sqlmem.cache import CacheManager + + +class _FakeCursor: + def __init__(self, rows): + self._rows = rows + self.description = None + + def fetchall(self): + return self._rows + + +class FakeSource: + """Stand-in for a pyodbc connection that returns non-sqlite-native types.""" + + def __init__(self, rows): + self._rows = rows + + def execute(self, sql, *args): + return _FakeCursor(self._rows) + + +@pytest.fixture +def cache(tmp_path): + c = CacheManager(db_path=tmp_path / "cache.db", backup_interval=9999) + yield c + c.close() + + +# --- to_sqlite / coerce_params unit tests ----------------------------------- + + +def test_decimal_to_str(): + assert to_sqlite(decimal.Decimal("9.99")) == "9.99" + + +def test_decimal_keeps_precision(): + assert 
to_sqlite(decimal.Decimal("123456789.123456789")) == "123456789.123456789" + + +def test_datetime_to_iso(): + assert to_sqlite(datetime.datetime(2026, 6, 1, 10, 0, 0)) == "2026-06-01T10:00:00" + + +def test_date_to_iso(): + assert to_sqlite(datetime.date(2026, 6, 1)) == "2026-06-01" + + +def test_time_to_iso(): + assert to_sqlite(datetime.time(10, 30, 0)) == "10:30:00" + + +def test_uuid_to_str(): + u = uuid.uuid4() + assert to_sqlite(u) == str(u) + + +def test_bytearray_to_bytes(): + assert to_sqlite(bytearray(b"abc")) == b"abc" + + +@pytest.mark.parametrize("value", [1, 1.5, "text", None, b"blob", True]) +def test_native_values_pass_through(value): + assert to_sqlite(value) == value + + +def test_coerce_params_tuple(): + assert coerce_params((decimal.Decimal("1.5"), "x")) == ("1.5", "x") + + +def test_coerce_params_dict(): + assert coerce_params({"p": decimal.Decimal("2")}) == {"p": "2"} + + +def test_coerce_params_none(): + assert coerce_params(None) is None + + +# --- integration: values reach the cache through coercion ------------------- + + +def test_load_table_coerces_decimal_and_datetime(cache): + rows = [("1", decimal.Decimal("9.99"), datetime.datetime(2026, 6, 1, 10, 0, 0))] + cache.load_table("t", ["id", "price", "changed"], FakeSource(rows)) + _, out = cache.execute_in_memory("SELECT id, price, changed FROM t") + assert out == [("1", "9.99", "2026-06-01T10:00:00")] + + +def test_decimal_where_param_matches_text_value(cache): + cache.load_table("t", ["price"], FakeSource([("9.99",)])) + _, out = cache.execute_in_memory( + "SELECT price FROM t WHERE price = ?", (decimal.Decimal("9.99"),) + ) + assert out == [("9.99",)] + + +def test_upsert_rows_coerces_decimal(cache): + cache.load_table("t", ["id", "price"], FakeSource([("1", "0")])) + cache.create_unique_index("t", ["id"]) + cache.upsert_rows("t", ["id", "price"], [("1", decimal.Decimal("12.50"))]) + _, out = cache.execute_in_memory("SELECT price FROM t WHERE id = '1'") + assert out == [("12.50",)] diff 
--git a/tests/test_delta.py b/tests/test_delta.py new file mode 100644 index 0000000..15f3ea8 --- /dev/null +++ b/tests/test_delta.py @@ -0,0 +1,189 @@ +import sqlite3 +from types import SimpleNamespace + +import pytest +from sqlalchemy import create_engine + +import sqlmem.engine as eng_mod +from sqlmem import CachingEngine, DeltaConfig +from sqlmem.cache import CacheManager +from sqlmem.delta import DeltaRefresher, ResolvedDelta +from sqlmem.executor import QueryExecutor +from sqlmem.parser import parse +from sqlmem.registry import ColumnRegistry +from sqlmem.stats import StatsCollector + + +def cached_rows(cache, sql): + cols, rows = cache.execute_in_memory(sql) + return [dict(zip(cols, row)) for row in rows] + + +# --------------------------------------------------------------------------- +# Refresher unit tests (in-memory source connection) +# --------------------------------------------------------------------------- + +@pytest.fixture +def source_conn(): + conn = sqlite3.connect(":memory:") + conn.executescript( + """ + CREATE TABLE products (id TEXT PRIMARY KEY, name TEXT, price TEXT, changed TEXT); + INSERT INTO products VALUES + ('1', 'Widget', '9.99', '2026-06-01 10:00:00'), + ('2', 'Gadget', '19.99', '2026-06-01 10:05:00'); + """ + ) + conn.commit() + yield conn + conn.close() + + +@pytest.fixture +def env(tmp_path, source_conn): + cache = CacheManager(db_path=tmp_path / "cache.db", backup_interval=9999) + registry = ColumnRegistry(cache.connection) + stats = StatsCollector() + delta = {"products": ResolvedDelta(change_column="changed", key_columns=["id"])} + executor = QueryExecutor(cache, registry, source_conn, stats, delta) + refresher = DeltaRefresher(cache, delta) + # Initial load — caches id, name, price (+ augmented key/change columns). 
+ executor.execute(parse("SELECT id, name, price FROM products")) + yield SimpleNamespace(cache=cache, source=source_conn, refresher=refresher) + cache.close() + + +def test_load_augments_key_and_change_columns(env): + cols = env.cache.get_table_columns("products") + assert {"id", "name", "price", "changed"}.issubset(set(cols)) + + +def test_initial_watermark_is_max_change(env): + assert env.cache.get_last_synced_at("products") == "2026-06-01 10:05:00" + + +def test_refresh_applies_updates(env): + env.source.execute( + "UPDATE products SET price = '7.77', changed = '2026-06-01 10:10:00' WHERE id = '1'" + ) + env.source.commit() + env.refresher.refresh(env.source) + + rows = {r["id"]: r for r in cached_rows(env.cache, "SELECT id, price FROM products")} + assert rows["1"]["price"] == "7.77" + assert env.cache.get_last_synced_at("products") == "2026-06-01 10:10:00" + + +def test_refresh_inserts_new_rows(env): + env.source.execute( + "INSERT INTO products VALUES ('3', 'Sprocket', '5.00', '2026-06-01 10:20:00')" + ) + env.source.commit() + env.refresher.refresh(env.source) + + ids = {r["id"] for r in cached_rows(env.cache, "SELECT id FROM products")} + assert ids == {"1", "2", "3"} + + +def test_boundary_timestamp_not_missed_and_idempotent(env): + # New row sharing the exact watermark timestamp must still be picked up (>=), + # and the row already at that timestamp must not be duplicated. 
+ env.source.execute( + "INSERT INTO products VALUES ('3', 'Sprocket', '5.00', '2026-06-01 10:05:00')" + ) + env.source.commit() + env.refresher.refresh(env.source) + env.refresher.refresh(env.source) # idempotent — running twice changes nothing + + rows = cached_rows(env.cache, "SELECT id FROM products") + assert sorted(r["id"] for r in rows) == ["1", "2", "3"] + + +def test_delete_by_nulling(env): + env.source.execute( + "UPDATE products SET name = NULL, changed = '2026-06-01 10:30:00' WHERE id = '1'" + ) + env.source.commit() + env.refresher.refresh(env.source) + + rows = {r["id"]: r for r in cached_rows(env.cache, "SELECT id, name FROM products")} + assert rows["1"]["name"] is None + + +def test_refresh_without_changes_is_noop(env): + before = cached_rows(env.cache, "SELECT id, name, price FROM products") + env.refresher.refresh(env.source) + after = cached_rows(env.cache, "SELECT id, name, price FROM products") + assert before == after + + +# --------------------------------------------------------------------------- +# Engine-level: PK auto-discovery, reset, end-to-end refresh +# --------------------------------------------------------------------------- + +@pytest.fixture +def source_db(tmp_path): + db_path = tmp_path / "source.db" + conn = sqlite3.connect(db_path) + conn.executescript( + """ + CREATE TABLE products (id TEXT PRIMARY KEY, name TEXT, changed TEXT); + INSERT INTO products VALUES ('1', 'Widget', '2026-06-01 10:00:00'); + CREATE VIEW vw_products AS SELECT id, name FROM products; + """ + ) + conn.commit() + conn.close() + return db_path + + +@pytest.fixture +def source_engine(source_db): + engine = create_engine(f"sqlite:///{source_db}") + yield engine + engine.dispose() + + +@pytest.fixture +def patched_cache(tmp_path, monkeypatch): + monkeypatch.setattr(eng_mod, "CACHE_DB_PATH", tmp_path / "cache.db") + monkeypatch.setattr(eng_mod, "BACKUP_INTERVAL_SECONDS", 9999) + + +def test_pk_auto_discovery(source_engine, patched_cache): + engine = 
CachingEngine(source_engine, delta={"products": DeltaConfig(change_column="changed")}) + assert engine._delta["products"].key_columns == ["id"] + engine.close() + + +def test_view_without_key_raises(source_engine, patched_cache): + with pytest.raises(ValueError): + CachingEngine(source_engine, delta={"vw_products": DeltaConfig(change_column="name")}) + + +def test_engine_reset(source_engine, patched_cache): + engine = CachingEngine(source_engine) + engine.execute("SELECT id, name FROM products") + assert engine._cache.is_table_cached("products") is True + engine.reset() + assert engine._cache.is_table_cached("products") is False + engine.close() + + +def test_engine_delta_refresh_end_to_end(source_engine, source_db, patched_cache): + engine = CachingEngine( + source_engine, delta={"products": DeltaConfig(change_column="changed", key_columns=["id"])} + ) + engine.execute("SELECT id, name FROM products") # caches, watermark = 10:00 + + conn = sqlite3.connect(source_db) + conn.execute("UPDATE products SET name = 'Widget2', changed = '2026-06-01 10:06:00' WHERE id = '1'") + conn.execute("INSERT INTO products VALUES ('2', 'Gadget', '2026-06-01 10:05:00')") + conn.commit() + conn.close() + + engine.refresh() + rows = {r["id"]: r for r in engine.execute("SELECT id, name FROM products")} + assert rows["1"]["name"] == "Widget2" + assert rows["2"]["name"] == "Gadget" + engine.close()
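
The watermark-plus-upsert behaviour these tests exercise can be sketched independently of SQLmem using only the standard-library `sqlite3` module. This is a minimal illustration, not the project's implementation: `pull_delta`, `COLUMNS`, and the index name `products_key` are hypothetical, and `INSERT OR REPLACE` against a unique index stands in for whatever `upsert_rows` actually does, which the diff does not show.

```python
import sqlite3


def pull_delta(source, cache, table, change_column, columns, watermark):
    """Copy rows changed at or after *watermark* from source to cache.

    Hypothetical helper for illustration; returns the new watermark.
    """
    col_list = ", ".join(columns)
    if watermark is None:
        # No watermark yet: treat this as the initial full load.
        rows = source.execute(f"SELECT {col_list} FROM {table}").fetchall()
    else:
        # ">=" (not ">") re-reads rows sharing the boundary timestamp,
        # so a row written at exactly the watermark is never missed.
        rows = source.execute(
            f"SELECT {col_list} FROM {table} WHERE {change_column} >= ?",
            (watermark,),
        ).fetchall()

    placeholders = ", ".join("?" for _ in columns)
    # The unique index on the key makes this an idempotent upsert:
    # a re-read row replaces its existing copy instead of duplicating it.
    cache.executemany(
        f"INSERT OR REPLACE INTO {table} ({col_list}) VALUES ({placeholders})",
        rows,
    )
    cache.commit()
    # Data-driven watermark: the largest change value now held in the cache.
    return cache.execute(f"SELECT MAX({change_column}) FROM {table}").fetchone()[0]


source = sqlite3.connect(":memory:")
source.execute("CREATE TABLE products (id TEXT PRIMARY KEY, name TEXT, changed TEXT)")
source.executemany(
    "INSERT INTO products VALUES (?, ?, ?)",
    [("1", "Widget", "2026-06-01 10:00:00"), ("2", "Gadget", "2026-06-01 10:05:00")],
)
source.commit()

cache = sqlite3.connect(":memory:")
cache.execute("CREATE TABLE products (id TEXT, name TEXT, changed TEXT)")
cache.execute("CREATE UNIQUE INDEX products_key ON products (id)")

COLUMNS = ["id", "name", "changed"]
watermark = pull_delta(source, cache, "products", "changed", COLUMNS, None)

# One update past the watermark, one insert exactly at the old watermark.
source.execute(
    "UPDATE products SET name = 'Widget2', changed = '2026-06-01 10:10:00' WHERE id = '1'"
)
source.execute("INSERT INTO products VALUES ('3', 'Sprocket', '2026-06-01 10:05:00')")
source.commit()

watermark = pull_delta(source, cache, "products", "changed", COLUMNS, watermark)
watermark = pull_delta(source, cache, "products", "changed", COLUMNS, watermark)  # repeat is harmless

result = cache.execute("SELECT id, name FROM products ORDER BY id").fetchall()
source.close()
cache.close()
```

The sketch compares timestamps as text, which only orders correctly when every value uses the same fixed-width format, as the test fixtures do. Like the approach in the diff, it cannot observe rows deleted from the source, because a deleted row never appears in the `>=` query.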