diff --git a/CHANGELOG.md b/CHANGELOG.md
index 98aaac9..eaa6e35 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,25 @@ All notable changes to this project will be documented in this file.
 
 ---
 
+## [1.14.0] - 2026-06-10
+
+Follow-up to 1.12.0 from running `datetime_columns` in production: the feature was only half-wired (writes were coerced, reads and query params were not).
+
+### Fixed
+- **`WHERE` on an INTEGER-µs `datetime_columns` column silently returned 0 rows** — `execute_in_memory()` coerced query params with `to_sqlite()`, which leaves an ISO string a string. SQLite sorts every `INTEGER` value below every `TEXT` value, so the stored `INTEGER` was never greater than the `TEXT` param and `WHERE CHANGE_DATE > '2026-05-01T…'` matched nothing. Params for a query that touches a `datetime_columns` table are now coerced to epoch µs (datetime objects and ISO-datetime strings alike), so the comparison matches the stored integer. Scoped to the query's tables, so non-datetime queries are unaffected.
+
+### Added
+- **Read-time coercion — `datetime_columns` come back as `datetime`** — `execute()` now returns those columns as real `datetime` objects (UTC) instead of the raw INTEGER µs, restoring the transparent-proxy contract (you get the same type a direct source query would give). Opt out with `CachingEngine(..., return_datetime=False)` to get the raw integers.
+- **`Stats.db_size_bytes`** — on-disk size of the cache file (0 in memory mode), so `engine.stats` exposes cache growth for monitoring without an external file check.
+- **Public `datetime_to_epoch_us` helper** — `from sqlmem import datetime_to_epoch_us` exposes the same datetime→epoch-µs conversion used internally, so callers building `WHERE change_col > ?` params don't have to re-implement it.
+
+### Changed
+- `pyproject.toml` — bumped version to `1.14.0`.
+- **`vacuum(incremental=True)` now warns instead of silently doing nothing** when the cache was not created with `auto_vacuum=INCREMENTAL` (the only mode in which incremental vacuum can reclaim pages); it logs how to fix it (`hard_reset()` with the pragma, or a full `vacuum(incremental=False)`) and returns.
+- `CacheManager.execute_in_memory()` gained an optional `tables` argument (the query's tables) used to scope datetime param/result coercion; `CacheManager`/`CachingEngine` gained a `return_datetime` flag.
+
+---
+
 ## [1.12.0] - 2026-06-09
 
 ### ⚠️ Breaking
diff --git a/README.md b/README.md
index 6e8f094..b236ac2 100644
--- a/README.md
+++ b/README.md
@@ -297,10 +297,17 @@ engine = CachingEngine(
 ```
 
 - **Opt-in per column.** Only the columns you name change; everything else keeps the default lossless `TEXT` storage.
-- ⚠️ **It changes the output contract for those columns** — `execute()` returns them as `int` (µs since epoch), not ISO strings, and a `WHERE` on them must compare against integer µs. Don't list a column your callers read as a string.
+- **Transparent in and out.** A `WHERE` on such a column accepts a `datetime` or an ISO string — the param is coerced to integer µs so the comparison matches — and `execute()` returns the column as a real `datetime` (UTC), the same type a direct source query would give. Pass `return_datetime=False` to get the raw integers instead.
 - The delta watermark is handled transparently: it is persisted as the integer and bound back to a real `datetime` for the source query, so incremental refresh keeps working.
 - ⚠️ This is a **breaking on-disk change** (`SCHEMA_VERSION` 4): an existing cache is wiped and reloaded on first start after enabling it — schedule a maintenance window for a large reload.
 
+To build a `WHERE` param yourself (e.g. an HTTP `?since=` filter) without re-implementing the conversion, use the exported helper:
+
+```python
+from sqlmem import datetime_to_epoch_us
+rows = engine.execute("SELECT * FROM events WHERE changed > ?", (datetime_to_epoch_us(since),))
+```
+
 ## Manual cache control
 
 ```python
@@ -316,20 +323,20 @@ Use `reset()` after a **structural change** in the source (columns added/removed
 
 `hard_reset()` goes further than `reset()` in disk mode: it closes every connection, deletes `cache.db` (and its `-wal`/`-shm` sidecars) and reopens from scratch — the only way to change a baked-in `page_size`/`auto_vacuum`. In memory mode it falls back to `reset()`.
 
-`vacuum()` reclaims free pages left behind by delta `INSERT OR REPLACE` churn. Incremental (the default) is cheap and non-blocking but needs `auto_vacuum=INCREMENTAL`; `vacuum(incremental=False)` runs a full VACUUM that rewrites the file (~2× disk, blocks readers) — schedule it in a maintenance window. Both are no-ops in memory mode.
+`vacuum()` reclaims free pages left behind by delta `INSERT OR REPLACE` churn. Incremental (the default) is cheap and non-blocking but needs `auto_vacuum=INCREMENTAL` (set it via `pragmas=` on a fresh cache); if the cache wasn't created that way, `vacuum(incremental=True)` logs a warning and returns without reclaiming pages, rather than silently doing nothing. `vacuum(incremental=False)` runs a full VACUUM that rewrites the file (~2× disk, blocks readers) — schedule it in a maintenance window. Both are no-ops in memory mode.
 
 ## Runtime statistics
 
 ```python
 stats = engine.stats  # Stats snapshot
-print(stats.hits, stats.misses, stats.refetches, stats.errors)
+print(stats.hits, stats.misses, stats.refetches, stats.errors, stats.db_size_bytes)
 for name, t in stats.tables.items():
     print(name, t.rows, t.state, t.tracking, t.last_upsert, t.last_refresh)
     if t.consecutive_failures:
         print(f" {name} failing ×{t.consecutive_failures}: {t.last_error} ({t.last_error_at})")
 ```
 
-`Stats.errors` is the total number of load/refresh failures since start. Each `TableStats` also carries `last_error`, `last_error_at` and `consecutive_failures` (reset to 0 on the next success) — so a delta that fails *before* streaming (which otherwise leaves `state` looking `ready`) is still visible, and the table is marked `error`.
+`Stats.db_size_bytes` is the on-disk cache file size (0 in memory mode) — handy for monitoring cache growth. `Stats.errors` is the total number of load/refresh failures since start. Each `TableStats` also carries `last_error`, `last_error_at` and `consecutive_failures` (reset to 0 on the next success) — so a delta that fails *before* streaming (which otherwise leaves `state` looking `ready`) is still visible, and the table is marked `error`.
 
 Two timestamps distinguish *data freshness* from *liveness*:
 
@@ -392,6 +399,7 @@ engine = CachingEngine(
     dialect="tsql",  # SQLMEM_SQL_DIALECT
     pragmas={"mmap_size": 32 * 1024**3, "page_size": 8192},  # disk-mode SQLite tuning
     datetime_columns={"orders": ["created_at"]},  # store these as INTEGER µs (opt-in)
+    return_datetime=True,  # return datetime_columns as datetime (vs raw µs int)
     blocking_startup_refresh=False,  # block startup until caught up? (default: no)
 )
 ```
diff --git a/project.md b/project.md
index 561bb13..7f98a4b 100644
--- a/project.md
+++ b/project.md
@@ -218,7 +218,12 @@ SQLMEM_DEBUG=true # DEBUG level: detailed log of every query, cache o
 - [x] **SQLite layer tuning (`pragmas=`)**: `CachingEngine(..., pragmas={...})` applies arbitrary PRAGMAs to the cache connection (`mmap_size`, `cache_size`, `temp_store`, `page_size`, `auto_vacuum`). `page_size` and `auto_vacuum` are layout pragmas: they take effect only on a fresh file (`page_size` on an existing cache is ignored with a warning). SQLite silently ignores unknown pragmas.
 - [x] **`hard_reset()`**: deletes the on-disk file (+ WAL/SHM) and opens a new one with the current pragmas; unlike `reset()`, this allows changing `page_size`/`auto_vacuum`. Disk mode only (in memory mode it falls back to `reset()`).
 - [x] **`vacuum(incremental=, pages=)`**: maintenance VACUUM of the cache file: incremental (frees pages left free by delta `INSERT OR REPLACE`, requires `auto_vacuum=INCREMENTAL`) or full (rewrites the file; maintenance window only). No-op in memory mode.
-- [x] **Native INTEGER storage of datetime columns (`datetime_columns=`)**: `datetime_columns={"VW_X": ["CHANGE_DATE"]}`: the listed datetime columns are stored as INTEGER µs since epoch instead of ~28 B ISO TEXT (saves space + native integer index comparison). Opt-in per column → changes the output contract only for the chosen columns (returns int, not an ISO string). Breaking: `SCHEMA_VERSION` 3→4, the cache is wiped and reloaded on upgrade. The watermark is persisted as an int and `_bind_watermark(epoch_us=True)` reconstructs it back into a `datetime` for the source.
+- [x] **Native INTEGER storage of datetime columns (`datetime_columns=`)**: `datetime_columns={"VW_X": ["CHANGE_DATE"]}`: the listed datetime columns are stored as INTEGER µs since epoch instead of ~28 B ISO TEXT (saves space + native integer index comparison). Opt-in per column. Breaking: `SCHEMA_VERSION` 3→4, the cache is wiped and reloaded on upgrade. The watermark is persisted as an int and `_bind_watermark(epoch_us=True)` reconstructs it back into a `datetime` for the source.
+  - **Param coercion**: an ISO/`datetime` param in `WHERE col > ?` is coerced to epoch µs (scoped to the query's tables), so the comparison against the INTEGER column matches (it used to return 0 rows).
+  - **Read-time coercion**: reads return a `datetime` object instead of the raw int (transparent proxy); opt out with `CachingEngine(..., return_datetime=False)`.
+  - Public helper `from sqlmem import datetime_to_epoch_us` for building params without duplicating the logic.
+- [x] **`vacuum(incremental=True)` warns without `auto_vacuum=INCREMENTAL`**: previously a silent no-op; it now logs a warning (and how to fix it) and returns.
+- [x] **`Stats.db_size_bytes`**: on-disk size of the cache file (0 in memory mode), exposed in `stats` for monitoring.
 
 ## TODO: future features
 
diff --git a/pyproject.toml b/pyproject.toml
index 65cbb3f..f2d007b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [project]
 name = "sqlmem"
-version = "1.12.0"
+version = "1.14.0"
 description = ""
 authors = [
     {name = "jan.doubravsky@gmail.com"}
diff --git a/src/sqlmem/__init__.py b/src/sqlmem/__init__.py
index 419b762..54e8dd8 100644
--- a/src/sqlmem/__init__.py
+++ b/src/sqlmem/__init__.py
@@ -3,6 +3,7 @@ from typing import Any
 
 from loguru import logger
 
+from ._coerce import to_sqlite_datetime as datetime_to_epoch_us
 from .config import DEBUG
 from .delta import DeltaConfig
 from .engine import CachingEngine
@@ -63,4 +64,5 @@ __all__ = [
     "Stats",
     "TableStats",
     "add_sink",
+    "datetime_to_epoch_us",
 ]
diff --git a/src/sqlmem/_coerce.py b/src/sqlmem/_coerce.py
index 926323e..5b5b96e 100644
--- a/src/sqlmem/_coerce.py
+++ b/src/sqlmem/_coerce.py
@@ -10,6 +10,7 @@ left untouched.
 
 import datetime
 import decimal
+import re
 import uuid
 from typing import Any
 
@@ -17,6 +18,10 @@ Params = tuple | list | dict | None
 
 _EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
 
+# A string that *starts* with an ISO date+time (``2026-05-01T00:00:00`` or
+# space-separated). Used to spot a datetime passed as a string in a query param.
+_ISO_DATETIME_RE = re.compile(r"^\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}")
+
 
 def to_sqlite(value: Any) -> Any:
     if isinstance(value, decimal.Decimal):
@@ -53,13 +58,62 @@ def to_sqlite_datetime(value: Any) -> int | None:
     return None
 
 
+def from_sqlite_datetime(value: Any) -> Any:
+    """Inverse of :func:`to_sqlite_datetime`: INTEGER µs-since-epoch → UTC datetime.
+
+    Non-integers (a ``NULL`` value, or a column that isn't datetime-typed) pass
+    through unchanged.
+    """
+    if isinstance(value, bool) or not isinstance(value, int):
+        return value
+    return _EPOCH + datetime.timedelta(microseconds=value)
+
+
 def coerce_row(row: tuple) -> tuple:
     return tuple(to_sqlite(v) for v in row)
 
 
-def coerce_params(params: Params) -> tuple | dict | None:
+def _coerce_param(value: Any, dt_table: bool) -> Any:
+    """Coerce a single query parameter.
+
+    When the query touches a table that stores datetime columns as INTEGER µs
+    (*dt_table*), a datetime object or an ISO-datetime string is converted to
+    epoch µs so a ``WHERE`` comparison matches the stored INTEGER instead of
+    comparing INTEGER against TEXT (every INTEGER sorts below TEXT in SQLite).
+    Otherwise the default stringifying coercion applies, unchanged.
+    """
+    if dt_table and (
+        isinstance(value, datetime.datetime)
+        or (isinstance(value, str) and _ISO_DATETIME_RE.match(value))
+    ):
+        result = to_sqlite_datetime(value)
+        if result is not None:
+            return result
+    return to_sqlite(value)
+
+
+def coerce_params(params: Params, dt_table: bool = False) -> tuple | dict | None:
     if params is None:
         return None
     if isinstance(params, dict):
-        return {key: to_sqlite(val) for key, val in params.items()}
-    return tuple(to_sqlite(val) for val in params)
+        return {key: _coerce_param(val, dt_table) for key, val in params.items()}
+    return tuple(_coerce_param(val, dt_table) for val in params)
+
+
+def reverse_coerce_rows(
+    rows: list[tuple], col_names: list[str], dt_cols: set[str]
+) -> list[tuple]:
+    """Turn INTEGER µs back into ``datetime`` for result columns in *dt_cols*.
+
+    A no-op when no result column is a datetime column, so non-datetime queries
+    pay nothing.
+    """
+    if not dt_cols:
+        return rows
+    dt_idx = {i for i, c in enumerate(col_names) if c in dt_cols}
+    if not dt_idx:
+        return rows
+    return [
+        tuple(from_sqlite_datetime(v) if i in dt_idx else v for i, v in enumerate(row))
+        for row in rows
+    ]
diff --git a/src/sqlmem/cache.py b/src/sqlmem/cache.py
index a5e2c6f..366cdf0 100644
--- a/src/sqlmem/cache.py
+++ b/src/sqlmem/cache.py
@@ -9,7 +9,13 @@ from pathlib import Path
 from loguru import logger
 
 import sqlmem._meta as _meta
-from ._coerce import coerce_params, coerce_row, to_sqlite, to_sqlite_datetime
+from ._coerce import (
+    coerce_params,
+    coerce_row,
+    reverse_coerce_rows,
+    to_sqlite,
+    to_sqlite_datetime,
+)
 from ._sql import quote, quote_list, quote_source
 from .config import FETCH_BATCH_SIZE, SQL_DIALECT
 from .stats import TableState
@@ -42,6 +48,7 @@ class CacheManager:
         fetch_batch: int = FETCH_BATCH_SIZE,
         pragmas: dict[str, str | int] | None = None,
         datetime_columns: dict[str, list[str]] | None = None,
+        return_datetime: bool = True,
     ) -> None:
         self._db_path = db_path
         self._backup_interval = backup_interval
@@ -51,6 +58,7 @@ class CacheManager:
         self._pragmas = dict(pragmas or {})  # extra read/layout PRAGMAs (disk mode)
         # table → columns stored as INTEGER µs-since-epoch instead of ISO TEXT
         self._datetime_columns = {t: list(c) for t, c in (datetime_columns or {}).items()}
+        self._return_datetime = return_datetime  # reverse-coerce reads back to datetime
         self._lock = threading.Lock()  # serializes connection access
         self._load_lock = threading.Lock()  # serializes full table loads
         self._states: dict[str, str] = {}  # table → live processing state
@@ -498,16 +506,38 @@ class CacheManager:
             self._read_conns.append(conn)
         return conn
 
+    def _query_datetime_cols(self, tables: list[str] | None) -> set[str]:
+        """Datetime columns (stored as INTEGER µs) belonging to *tables*.
+
+        Empty when no table is known/configured, so a query that touches no
+        datetime column pays nothing and behaves exactly as before.
+        """
+        if not self._datetime_columns or not tables:
+            return set()
+        cols: set[str] = set()
+        for table in tables:
+            cols.update(self._datetime_columns.get(table, ()))
+        return cols
+
     def execute_in_memory(
-        self, sql: str, params: tuple | list | dict | None = None
+        self,
+        sql: str,
+        params: tuple | list | dict | None = None,
+        tables: list[str] | None = None,
     ) -> tuple[list[str], list[tuple]]:
         """Run a read query against the cache.
 
         In-memory mode serializes with writers on the single connection. Disk
         mode reads from a per-thread WAL connection, so reads run concurrently
         with writers and each other (see :meth:`_read_conn`).
+
+        When *tables* names a table with ``datetime_columns``, ISO/datetime query
+        params are coerced to epoch µs so a ``WHERE`` matches the stored INTEGER,
+        and (unless ``return_datetime=False``) those columns are returned as real
+        :class:`~datetime.datetime` objects rather than raw integers.
         """
-        bound = coerce_params(params)
+        dt_cols = self._query_datetime_cols(tables)
+        bound = coerce_params(params, dt_table=bool(dt_cols))
         if self._in_memory:
             with self._lock:
                 cursor = (
@@ -517,12 +547,14 @@ class CacheManager:
                 )
                 col_names = [desc[0] for desc in cursor.description]
                 rows = cursor.fetchall()
-            return col_names, rows
+        else:
+            conn = self._read_conn()
+            cursor = conn.execute(sql) if bound is None else conn.execute(sql, bound)
+            col_names = [desc[0] for desc in cursor.description]
+            rows = cursor.fetchall()
 
-        conn = self._read_conn()
-        cursor = conn.execute(sql) if bound is None else conn.execute(sql, bound)
-        col_names = [desc[0] for desc in cursor.description]
-        rows = cursor.fetchall()
+        if self._return_datetime and dt_cols:
+            rows = reverse_coerce_rows(rows, col_names, dt_cols)
         return col_names, rows
 
     # --- delta refresh support ---------------------------------------------
@@ -585,6 +617,15 @@ class CacheManager:
             row = self._conn.execute(f"SELECT COUNT(*) FROM {quote(table)}").fetchone()
         return int(row[0]) if row else 0
 
+    def db_size_bytes(self) -> int:
+        """On-disk size of the cache file in bytes (0 in memory mode / if absent)."""
+        if self._in_memory:
+            return 0
+        try:
+            return self._db_path.stat().st_size
+        except OSError:
+            return 0
+
     def reset(self) -> None:
         """Wipe the entire cache — every cached table plus the on-disk data
         (the file is deleted in memory mode, VACUUMed in place in disk mode)."""
@@ -676,6 +717,15 @@ class CacheManager:
             logger.debug("vacuum() called in memory mode — no-op.")
             return
         if incremental:
+            av = self._conn.execute("PRAGMA auto_vacuum").fetchone()[0]
+            if av != 2:  # 0 = NONE, 1 = FULL, 2 = INCREMENTAL
+                logger.warning(
+                    f"vacuum(incremental=True) called but auto_vacuum={av} (not "
+                    "INCREMENTAL) — no pages will be reclaimed. Rebuild the cache "
+                    "with pragmas={'auto_vacuum': 'INCREMENTAL'} via hard_reset(), "
+                    "or run vacuum(incremental=False) for a full VACUUM."
+                )
+                return
             with self._lock:
                 self._conn.execute(f"PRAGMA incremental_vacuum({pages})")
                 self._conn.commit()
diff --git a/src/sqlmem/engine.py b/src/sqlmem/engine.py
index 436d21c..5ded193 100644
--- a/src/sqlmem/engine.py
+++ b/src/sqlmem/engine.py
@@ -67,6 +67,7 @@ class CachingEngine:
         dialect: str | None = None,
         pragmas: dict[str, str | int] | None = None,
         datetime_columns: dict[str, list[str]] | None = None,
+        return_datetime: bool = True,
         blocking_startup_refresh: bool = False,
     ) -> None:
         self._source_engine = source_engine
@@ -83,6 +84,7 @@ class CachingEngine:
             fetch_batch=fetch_batch if fetch_batch is not None else FETCH_BATCH_SIZE,
             pragmas=pragmas,
             datetime_columns=datetime_columns,
+            return_datetime=return_datetime,
         )
         self._registry = ColumnRegistry(self._cache.connection)
         self._stats = StatsCollector()
@@ -152,7 +154,11 @@ class CachingEngine:
         last_runs = self._cache.get_last_runs()
         with self._cache._lock:
             base = self._stats.snapshot(self._cache.connection, states)
-        base = replace(base, errors=self._cache.error_total)
+        base = replace(
+            base,
+            errors=self._cache.error_total,
+            db_size_bytes=self._cache.db_size_bytes(),
+        )
         return replace(
             base,
             tables={n: self._enrich(n, t, errors, last_runs) for n, t in base.tables.items()},
diff --git a/src/sqlmem/executor.py b/src/sqlmem/executor.py
index f8ee84d..2bde0d7 100644
--- a/src/sqlmem/executor.py
+++ b/src/sqlmem/executor.py
@@ -118,5 +118,7 @@ class QueryExecutor:
 
     def _run_in_memory(self, parsed: ParsedQuery) -> list[dict]:
         logger.debug(f"Executing in SQLite RAM: {parsed.sqlite_sql!r} params={parsed.params!r}")
-        col_names, rows = self._cache.execute_in_memory(parsed.sqlite_sql, parsed.params)
+        col_names, rows = self._cache.execute_in_memory(
+            parsed.sqlite_sql, parsed.params, parsed.tables
+        )
         return [dict(zip(col_names, row)) for row in rows]
diff --git a/src/sqlmem/stats.py b/src/sqlmem/stats.py
index 121f3a4..b45a413 100644
--- a/src/sqlmem/stats.py
+++ b/src/sqlmem/stats.py
@@ -40,6 +40,7 @@ class Stats:
     refetches: int
     tables: dict[str, TableStats]
     errors: int = 0  # total load/refresh failures since start
+    db_size_bytes: int = 0  # on-disk cache file size (0 in memory mode)
 
 
 class StatsCollector:
diff --git a/tests/test_cache.py b/tests/test_cache.py
index 40ce442..0101650 100644
--- a/tests/test_cache.py
+++ b/tests/test_cache.py
@@ -288,3 +288,21 @@ def test_vacuum_in_memory_is_noop(cache, source_conn):
     cache.load_table("users", ["name"], source_conn)
     cache.vacuum(incremental=False)  # no-op, no error
     assert cache.is_table_cached("users") is True
+
+
+def test_incremental_vacuum_warns_without_incremental_auto_vacuum(tmp_path, source_conn):
+    """Incremental vacuum on a DB that isn't auto_vacuum=INCREMENTAL warns and skips."""
+    from loguru import logger
+
+    messages: list[str] = []
+    sink_id = logger.add(messages.append, level="WARNING", filter="sqlmem")
+    logger.enable("sqlmem")
+    try:
+        c = CacheManager(db_path=tmp_path / "c.db", backup_interval=9999, in_memory=False)
+        c.load_table("users", ["name"], source_conn)
+        c.vacuum(incremental=True)  # auto_vacuum defaults to NONE → no-op + warning
+        c.close()
+    finally:
+        logger.remove(sink_id)
+        logger.disable("sqlmem")
+    assert any("auto_vacuum" in m for m in messages)
diff --git a/tests/test_coerce.py b/tests/test_coerce.py
index 0923076..9203863 100644
--- a/tests/test_coerce.py
+++ b/tests/test_coerce.py
@@ -4,9 +4,17 @@ import uuid
 
 import pytest
 
-from sqlmem._coerce import coerce_params, to_sqlite, to_sqlite_datetime
+from sqlmem._coerce import (
+    coerce_params,
+    from_sqlite_datetime,
+    reverse_coerce_rows,
+    to_sqlite,
+    to_sqlite_datetime,
+)
 from sqlmem.cache import CacheManager
 
+_UTC = datetime.timezone.utc
+
 
 class _FakeCursor:
     def __init__(self, rows):
@@ -165,6 +173,128 @@ def test_non_datetime_columns_unaffected_by_datetime_columns(tmp_path):
     c.close()
 
 
+# --- param coercion for datetime_columns (A) --------------------------------
+
+
+def test_coerce_params_dt_table_iso_string_to_epoch():
+    p = coerce_params(("2026-06-01T10:00:00",), dt_table=True)
+    assert p == (to_sqlite_datetime("2026-06-01T10:00:00"),)
+
+
+def test_coerce_params_dt_table_datetime_to_epoch():
+    dt = datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=_UTC)
+    assert coerce_params((dt,), dt_table=True) == (to_sqlite_datetime(dt),)
+
+
+def test_coerce_params_dt_table_false_keeps_iso_string():
+    # No datetime table in the query → behaviour unchanged (string stays a string).
+    assert coerce_params(("2026-06-01T10:00:00",), dt_table=False) == (
+        "2026-06-01T10:00:00",
+    )
+
+
+def test_coerce_params_dt_table_leaves_non_datetime_values():
+    assert coerce_params(("hello", 5, None), dt_table=True) == ("hello", 5, None)
+
+
+def test_where_on_datetime_column_matches_with_iso_param(tmp_path):
+    """The critical fix: a WHERE on an INTEGER-µs column with an ISO string param
+    must match instead of comparing INTEGER against TEXT (0 rows for ``>``)."""
+    c = CacheManager(
+        db_path=tmp_path / "cache.db",
+        backup_interval=9999,
+        datetime_columns={"t": ["changed"]},
+        return_datetime=False,
+    )
+    rows = [
+        ("1", datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=_UTC)),
+        ("2", datetime.datetime(2026, 6, 3, 10, 0, 0, tzinfo=_UTC)),
+    ]
+    c.load_table("t", ["id", "changed"], FakeSource(rows))
+    _, out = c.execute_in_memory(
+        "SELECT id FROM t WHERE changed > ?", ("2026-06-02T00:00:00",), ["t"]
+    )
+    assert [r[0] for r in out] == ["2"]
+    c.close()
+
+
+def test_where_on_datetime_column_without_table_scope_is_unchanged(tmp_path):
+    """Without table scope the param isn't coerced — proves the fix is scoped."""
+    c = CacheManager(
+        db_path=tmp_path / "cache.db",
+        backup_interval=9999,
+        datetime_columns={"t": ["changed"]},
+        return_datetime=False,
+    )
+    c.load_table(
+        "t",
+        ["id", "changed"],
+        FakeSource([("1", datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=_UTC))]),
+    )
+    # No `tables` arg → INTEGER vs TEXT comparison → no match (legacy behaviour).
+    _, out = c.execute_in_memory("SELECT id FROM t WHERE changed > ?", ("2026-01-01T00:00:00",))
+    assert out == []
+    c.close()
+
+
+# --- reverse coercion: read back as datetime (B) ----------------------------
+
+
+def test_from_sqlite_datetime_roundtrip():
+    dt = datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=_UTC)
+    assert from_sqlite_datetime(to_sqlite_datetime(dt)) == dt
+
+
+def test_from_sqlite_datetime_passes_non_int():
+    assert from_sqlite_datetime("x") == "x"
+    assert from_sqlite_datetime(None) is None
+    assert from_sqlite_datetime(True) is True  # bool is not treated as µs
+
+
+def test_reverse_coerce_rows_only_named_columns():
+    us = to_sqlite_datetime(datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=_UTC))
+    out = reverse_coerce_rows([("1", us)], ["id", "changed"], {"changed"})
+    assert out[0][0] == "1"
+    assert out[0][1] == datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=_UTC)
+
+
+def test_read_returns_datetime_by_default(tmp_path):
+    c = CacheManager(
+        db_path=tmp_path / "cache.db",
+        backup_interval=9999,
+        datetime_columns={"t": ["changed"]},
+    )
+    dt = datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=_UTC)
+    c.load_table("t", ["id", "changed"], FakeSource([("1", dt)]))
+    _, out = c.execute_in_memory("SELECT id, changed FROM t", None, ["t"])
+    assert out == [("1", dt)]  # returned as a datetime, not the raw int
+    c.close()
+
+
+def test_return_datetime_false_keeps_raw_int(tmp_path):
+    c = CacheManager(
+        db_path=tmp_path / "cache.db",
+        backup_interval=9999,
+        datetime_columns={"t": ["changed"]},
+        return_datetime=False,
+    )
+    dt = datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=_UTC)
+    c.load_table("t", ["id", "changed"], FakeSource([("1", dt)]))
+    _, out = c.execute_in_memory("SELECT changed FROM t", None, ["t"])
+    assert out == [(to_sqlite_datetime(dt),)]  # raw INTEGER µs
+    c.close()
+
+
+# --- public export (F) ------------------------------------------------------
+
+
+def test_datetime_to_epoch_us_is_public():
+    from sqlmem import datetime_to_epoch_us
+
+    dt = datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=_UTC)
+    assert datetime_to_epoch_us(dt) == to_sqlite_datetime(dt)
+
+
 # --- integration: values reach the cache through coercion -------------------
 
 
diff --git a/tests/test_engine.py b/tests/test_engine.py
index a2df15b..5f46199 100644
--- a/tests/test_engine.py
+++ b/tests/test_engine.py
@@ -421,3 +421,71 @@ def test_engine_vacuum_runs(source_engine, tmp_path):
     ce.vacuum(incremental=False)  # must not raise
     assert ce._cache.is_table_cached("products") is True
     ce.close()
+
+
+# ---------------------------------------------------------------------------
+# datetime_columns end-to-end: param coercion (A) + read-back datetime (B)
+# ---------------------------------------------------------------------------
+
+@pytest.fixture
+def events_engine(tmp_path):
+    src = tmp_path / "events.db"
+    conn = sqlite3.connect(src)
+    conn.execute("CREATE TABLE events (id TEXT, changed TEXT)")
+    conn.executemany(
+        "INSERT INTO events VALUES (?, ?)",
+        [("1", "2026-06-01T10:00:00"), ("2", "2026-06-03T10:00:00")],
+    )
+    conn.commit()
+    conn.close()
+    se = create_engine(f"sqlite:///{src}")
+    yield se
+    se.dispose()
+
+
+def test_datetime_column_where_and_readback(events_engine, tmp_path):
+    from datetime import datetime, timezone
+
+    ce = CachingEngine(
+        events_engine,
+        cache_db_path=tmp_path / "cache.db",
+        in_memory=False,
+        datetime_columns={"events": ["changed"]},
+    )
+    # A: WHERE on the INTEGER-µs column with an ISO string param returns the right row.
+    rows = ce.execute(
+        "SELECT id, changed FROM events WHERE changed > ?", ("2026-06-02T00:00:00",)
+    )
+    assert [r["id"] for r in rows] == ["2"]
+    # B: the column comes back as a datetime, not a raw integer.
+    assert rows[0]["changed"] == datetime(2026, 6, 3, 10, 0, 0, tzinfo=timezone.utc)
+    ce.close()
+
+
+def test_datetime_column_return_datetime_false(events_engine, tmp_path):
+    ce = CachingEngine(
+        events_engine,
+        cache_db_path=tmp_path / "cache.db",
+        in_memory=False,
+        datetime_columns={"events": ["changed"]},
+        return_datetime=False,
+    )
+    rows = ce.execute("SELECT id, changed FROM events")
+    assert all(isinstance(r["changed"], int) for r in rows)  # opt-out → raw µs
+    ce.close()
+
+
+# ---------------------------------------------------------------------------
+# db_size_bytes in stats (D)
+# ---------------------------------------------------------------------------
+
+def test_stats_reports_db_size_in_disk_mode(source_engine, tmp_path):
+    ce = CachingEngine(source_engine, cache_db_path=tmp_path / "cache.db", in_memory=False)
+    ce.execute("SELECT id FROM products")
+    assert ce.stats.db_size_bytes > 0
+    ce.close()
+
+
+def test_stats_db_size_zero_in_memory(engine):
+    engine.execute("SELECT id, name FROM products")
+    assert engine.stats.db_size_bytes == 0