diff --git a/.gitignore b/.gitignore index ce79316..5fbd3f3 100644 --- a/.gitignore +++ b/.gitignore @@ -1,8 +1,6 @@ # Python __pycache__/ *.py[cod] -*.pyo -*.pyd *.egg *.egg-info/ dist/ @@ -40,9 +38,7 @@ Thumbs.db .env.* # sqlmem cache (incl. WAL sidecars from disk-backed mode) -cache.db -cache.db-wal -cache.db-shm +cache.db* # Agents AGENTS.md diff --git a/CHANGELOG.md b/CHANGELOG.md index 874ec8e..98aaac9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,24 @@ All notable changes to this project will be documented in this file. --- +## [1.12.0] - 2026-06-09 + +### ⚠️ Breaking +- **`SCHEMA_VERSION` bumped `3` → `4`** — on upgrade the existing cache is wiped automatically (disk mode wipes the file in place, in-memory discards the backup) and reloaded from the source on next use. For a large cache (e.g. a multi-hundred-million-row table) the full reload can take a while; deploy in a maintenance window. +- **`datetime_columns` change the public output contract for the chosen columns** — a column listed in `datetime_columns` is stored and returned as an **INTEGER (microseconds since the Unix epoch, UTC)**, not an ISO `TEXT` string. This is opt-in per column, so no table is affected unless you name its columns; consumers that read or filter such a column must adapt (compare against integer µs, or convert on read). + +### Added +- **`datetime_columns=` parameter on `CachingEngine` / `CacheManager`** — `datetime_columns={"VW_X": ["CHANGE_DATE"]}` stores the named datetime columns as INTEGER µs-since-epoch instead of ~28-byte ISO `TEXT`. Saves ~20 bytes per row and makes index comparisons on the column operate on native integers instead of string collation — worthwhile for a pure datetime column on a very large table (e.g. a delta change column that is also range-scanned). + - `_coerce.to_sqlite_datetime()` converts datetimes (and ISO/`date` values) to exact integer microseconds via integer arithmetic (no float rounding); a naive datetime is treated as UTC, `None` passes through. + - `load_table` declares those columns `INTEGER` and `upsert_rows` coerces them the same way, so full loads and delta upserts agree on the on-disk representation. + - The delta high-watermark for such a column is the stored integer; `delta._bind_watermark(..., epoch_us=True)` reconstructs a real UTC `datetime` before binding, so the source still receives a typed timestamp (and the watermark fix from 1.8.0 keeps holding). + +### Changed +- `pyproject.toml` — bumped version to `1.12.0`. +- `CacheManager.max_value` / `set_last_synced_at` now accept/return `int` watermarks alongside `str` (the INTEGER-µs watermark round-trips through the `last_synced_at` TEXT column as its digit string). + +--- + ## [1.11.0] - 2026-06-09 ### Added diff --git a/README.md b/README.md index 9a761d0..6e8f094 100644 --- a/README.md +++ b/README.md @@ -284,6 +284,23 @@ engine = CachingEngine( - Every entry is applied as `PRAGMA = ` when the cache connection opens. **Unknown or inapplicable pragmas are silently ignored** by SQLite, so a bad value degrades gracefully instead of crashing startup. - **`page_size` and `auto_vacuum` are layout pragmas** — they only take effect on a *fresh* file (set before the first table). On an existing cache, `page_size` is ignored with a one-time warning; use [`hard_reset()`](#manual-cache-control) to rebuild the file with the new value. +#### INTEGER datetime columns (`datetime_columns=`) + +A pure datetime column stored as an ISO `TEXT` string costs ~28 bytes per row and compares by string collation. For a large table you can store named datetime columns as **INTEGER microseconds since the Unix epoch** instead — 8 bytes, native integer comparison: + +```python +engine = CachingEngine( + base_engine, + delta={"VW_P_PRATVALUES": DeltaConfig("CHANGE_DATE", ["PRATVALUE_ID"])}, + datetime_columns={"VW_P_PRATVALUES": ["CHANGE_DATE"]}, +) +``` + +- **Opt-in per column.** Only the columns you name change; everything else keeps the default lossless `TEXT` storage. +- ⚠️ **It changes the output contract for those columns** — `execute()` returns them as `int` (µs since epoch), not ISO strings, and a `WHERE` on them must compare against integer µs. Don't list a column your callers read as a string. +- The delta watermark is handled transparently: it is persisted as the integer and bound back to a real `datetime` for the source query, so incremental refresh keeps working. +- ⚠️ This is a **breaking on-disk change** (`SCHEMA_VERSION` 4): an existing cache is wiped and reloaded on first start after enabling it — schedule a maintenance window for a large reload. + ## Manual cache control ```python @@ -374,6 +391,7 @@ engine = CachingEngine( fetch_batch=10000, # SQLMEM_FETCH_BATCH dialect="tsql", # SQLMEM_SQL_DIALECT pragmas={"mmap_size": 32 * 1024**3, "page_size": 8192}, # disk-mode SQLite tuning + datetime_columns={"orders": ["created_at"]}, # store these as INTEGER µs (opt-in) blocking_startup_refresh=False, # block startup until caught up? (default: no) ) ``` diff --git a/project.md b/project.md index ab26657..561bb13 100644 --- a/project.md +++ b/project.md @@ -218,10 +218,11 @@ SQLMEM_DEBUG=true # DEBUG level — podrobný výpis každého dotazu, cache o - [x] **Ladění SQLite vrstvy (`pragmas=`)**: `CachingEngine(..., pragmas={...})` aplikuje libovolné PRAGMA na cache spojení (`mmap_size`, `cache_size`, `temp_store`, `page_size`, `auto_vacuum`). `page_size` a `auto_vacuum` jsou layout-pragmata — platí jen na čerstvém souboru (page_size na existující cache se ignoruje s warningem). Neznámá pragmata SQLite tiše ignoruje. - [x] **`hard_reset()`**: smaže on-disk soubor (+ WAL/SHM) a otevře nový s aktuálními pragmaty — na rozdíl od `reset()` umožní změnit `page_size`/`auto_vacuum`. Jen disk mód (v memory módu fallback na `reset()`). - [x] **`vacuum(incremental=, pages=)`**: údržbový VACUUM cache souboru — inkrementální (uvolní volné stránky po delta `INSERT OR REPLACE`, vyžaduje `auto_vacuum=INCREMENTAL`) nebo plný (přepíše soubor, jen v maintenance okně). V memory módu no-op. +- [x] **Nativní INTEGER ukládání datetime sloupců (`datetime_columns=`)**: `datetime_columns={"VW_X": ["CHANGE_DATE"]}` — vyjmenované datetime sloupce se ukládají jako INTEGER µs-od-epochy místo ~28 B ISO TEXT (úspora místa + nativní celočíselné porovnání indexu). Opt-in per sloupec → mění výstupní kontrakt jen u zvolených sloupců (vrací int, ne ISO string). Breaking: `SCHEMA_VERSION` 3→4, cache se při upgrade smaže a načte znovu. Watermark se persistuje jako int a `_bind_watermark(epoch_us=True)` ho rekonstruuje zpět na `datetime` pro zdroj. ## TODO — budoucí funkce -- [ ] **Nativní INTEGER ukládání datetime sloupců (`datetime_columns=`)** — `CHANGE_DATE` apod. jako µs-od-epochy INTEGER místo 28 B ISO TEXT (úspora místa + rychlejší porovnání indexu). Breaking (`SCHEMA_VERSION` 3→4, wipe cache). Plán pro 1.12.0. +- _(zatím žádné otevřené položky)_ --- diff --git a/pyproject.toml b/pyproject.toml index 43abcff..65cbb3f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "sqlmem" -version = "1.11.0" +version = "1.12.0" description = "" authors = [ {name = "jan.doubravsky@gmail.com"} diff --git a/src/sqlmem/_coerce.py b/src/sqlmem/_coerce.py index 349136a..926323e 100644 --- a/src/sqlmem/_coerce.py +++ b/src/sqlmem/_coerce.py @@ -15,6 +15,8 @@ from typing import Any Params = tuple | list | dict | None +_EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc) + def to_sqlite(value: Any) -> Any: if isinstance(value, decimal.Decimal): @@ -28,6 +30,29 @@ def to_sqlite(value: Any) -> Any: return value +def to_sqlite_datetime(value: Any) -> int | None: + """Store a datetime as INTEGER microseconds since the Unix epoch (UTC). + + Used for columns the caller marks via ``datetime_columns``: 8 bytes as an + INTEGER instead of a ~28-byte ISO ``TEXT`` string, and integer comparison on + the change column instead of string collation. ``None`` passes through; a + naive datetime is treated as UTC. A non-datetime value is parsed from its ISO + string form (so ``date``/ISO-``str`` inputs work too); anything unparseable + becomes ``None``. + """ + if value is None: + return None + if isinstance(value, datetime.datetime): + if value.tzinfo is None: + value = value.replace(tzinfo=datetime.timezone.utc) + delta = value - _EPOCH # exact integer arithmetic (no float rounding) + return delta.days * 86_400_000_000 + delta.seconds * 1_000_000 + delta.microseconds + try: + return to_sqlite_datetime(datetime.datetime.fromisoformat(str(value))) + except (TypeError, ValueError): + return None + + def coerce_row(row: tuple) -> tuple: return tuple(to_sqlite(v) for v in row) diff --git a/src/sqlmem/cache.py b/src/sqlmem/cache.py index b56156c..a5e2c6f 100644 --- a/src/sqlmem/cache.py +++ b/src/sqlmem/cache.py @@ -9,12 +9,12 @@ from pathlib import Path from loguru import logger import sqlmem._meta as _meta -from ._coerce import coerce_params, coerce_row +from ._coerce import coerce_params, coerce_row, to_sqlite, to_sqlite_datetime from ._sql import quote, quote_list, quote_source from .config import FETCH_BATCH_SIZE, SQL_DIALECT from .stats import TableState -SCHEMA_VERSION = 3 +SCHEMA_VERSION = 4 @dataclass(frozen=True) @@ -41,6 +41,7 @@ class CacheManager: dialect: str = SQL_DIALECT, fetch_batch: int = FETCH_BATCH_SIZE, pragmas: dict[str, str | int] | None = None, + datetime_columns: dict[str, list[str]] | None = None, ) -> None: self._db_path = db_path self._backup_interval = backup_interval @@ -48,6 +49,8 @@ class CacheManager: self._dialect = dialect # source-DB dialect, for identifier quoting self._fetch_batch = fetch_batch # rows fetched per source batch self._pragmas = dict(pragmas or {}) # extra read/layout PRAGMAs (disk mode) + # table → columns stored as INTEGER µs-since-epoch instead of ISO TEXT + self._datetime_columns = {t: list(c) for t, c in (datetime_columns or {}).items()} self._lock = threading.Lock() # serializes connection access self._load_lock = threading.Lock() # serializes full table loads self._states: dict[str, str] = {} # table → live processing state @@ -387,6 +390,27 @@ class CacheManager: self._conn.commit() logger.debug(f"Index {idx.name!r} ready on {table} ({cols})") + def _row_coercer(self, table: str, columns: list[str]): + """Return a per-row coercer for *columns* in source order. + + Columns registered in ``datetime_columns`` for *table* are coerced to + INTEGER µs-since-epoch (``to_sqlite_datetime``); everything else keeps the + default stringifying coercion (``to_sqlite``). With no datetime columns it + is the plain :func:`coerce_row`, so the common path is unchanged. + """ + dt_cols = set(self._datetime_columns.get(table, ())) + dt_idx = {i for i, c in enumerate(columns) if c in dt_cols} + if not dt_idx: + return coerce_row + + def coerce(row: tuple) -> tuple: + return tuple( + to_sqlite_datetime(v) if i in dt_idx else to_sqlite(v) + for i, v in enumerate(row) + ) + + return coerce + def load_table( self, table: str, @@ -403,7 +427,11 @@ class CacheManager: connection lock is only held for the brief per-batch inserts and the swap. """ src_cols = ", ".join(quote_source(c, self._dialect) for c in columns) - col_defs = ", ".join(f"{quote(c)} TEXT" for c in columns) + dt_cols = set(self._datetime_columns.get(table, ())) + col_defs = ", ".join( + f"{quote(c)} {'INTEGER' if c in dt_cols else 'TEXT'}" for c in columns + ) + coerce = self._row_coercer(table, columns) placeholders = ", ".join("?" * len(columns)) staging = f"{table}__sqlmem_load" q_staging = quote(staging) @@ -427,7 +455,7 @@ class CacheManager: batch = cursor.fetchmany(self._fetch_batch) # network outside _lock if not batch: break - clean = [coerce_row(row) for row in batch] + clean = [coerce(row) for row in batch] with self._lock: self._conn.executemany(insert_sql, clean) self._conn.commit() @@ -518,9 +546,11 @@ class CacheManager: row = self._conn.execute( "SELECT last_synced_at FROM _sqlmem_tables WHERE table_name = ?", (table,) ).fetchone() + # Stored in a TEXT column: an INTEGER-µs watermark (datetime_columns) comes + # back as its digit string; delta._bind_watermark reconstructs the datetime. return row[0] if row else None - def set_last_synced_at(self, table: str, value: str | None) -> None: + def set_last_synced_at(self, table: str, value: str | int | None) -> None: with self._lock: self._conn.execute( "UPDATE _sqlmem_tables SET last_synced_at = ? WHERE table_name = ?", @@ -528,8 +558,11 @@ class CacheManager: ) self._conn.commit() - def max_value(self, table: str, column: str) -> str | None: - """Maximum value of *column* across cached rows (the delta watermark).""" + def max_value(self, table: str, column: str) -> str | int | None: + """Maximum value of *column* across cached rows (the delta watermark). + + Returns an ``int`` for a datetime column stored as INTEGER µs, else the + ISO ``TEXT`` string.""" row = self._conn.execute( f"SELECT MAX({quote(column)}) FROM {quote(table)}" ).fetchone() @@ -539,7 +572,8 @@ class CacheManager: """Insert-or-replace one batch of *rows* by the table's unique key.""" col_list = quote_list(columns) placeholders = ", ".join("?" * len(columns)) - clean_rows = [coerce_row(row) for row in rows] + coerce = self._row_coercer(table, columns) + clean_rows = [coerce(row) for row in rows] with self._lock: self._conn.executemany( f"INSERT OR REPLACE INTO {quote(table)} ({col_list}) VALUES ({placeholders})", diff --git a/src/sqlmem/delta.py b/src/sqlmem/delta.py index 5bf139e..4c3001d 100644 --- a/src/sqlmem/delta.py +++ b/src/sqlmem/delta.py @@ -1,5 +1,5 @@ from dataclasses import dataclass, field -from datetime import datetime +from datetime import datetime, timedelta, timezone from typing import Any from loguru import logger @@ -8,8 +8,10 @@ from ._sql import quote_source from .cache import CacheManager from .stats import TableState +_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc) -def _bind_watermark(watermark: str) -> datetime | str: + +def _bind_watermark(watermark: str | int, epoch_us: bool = False) -> datetime | str: """Bind the delta watermark back to the source in its native type. The cache stores the change column as an ISO ``TEXT`` string (see @@ -22,11 +24,21 @@ def _bind_watermark(watermark: str) -> datetime | str: driver send a typed timestamp, so the comparison happens natively with no string conversion. Non-datetime change columns (e.g. an integer rowversion) don't parse and are passed through unchanged. + + When the change column is stored as INTEGER µs-since-epoch (``datetime_columns``) + *epoch_us* is set: the watermark is a microsecond count (an ``int`` or its digit + string, since it round-trips through a TEXT column) and is reconstructed into a + UTC :class:`~datetime.datetime` so the source still receives a typed timestamp. """ + if epoch_us: + try: + return _EPOCH + timedelta(microseconds=int(watermark)) + except (TypeError, ValueError): + return watermark if isinstance(watermark, str) else str(watermark) try: - return datetime.fromisoformat(watermark) + return datetime.fromisoformat(watermark) # type: ignore[arg-type] except (TypeError, ValueError): - return watermark + return watermark if isinstance(watermark, str) else str(watermark) @dataclass(frozen=True) @@ -92,9 +104,10 @@ class DeltaRefresher: cursor = source_conn.execute(f"SELECT {col_list} FROM {q_table}") else: change_col = quote_source(cfg.change_column, dialect) + epoch_us = cfg.change_column in self._cache._datetime_columns.get(table, ()) cursor = source_conn.execute( f"SELECT {col_list} FROM {q_table} WHERE {change_col} >= ?", - (_bind_watermark(watermark),), + (_bind_watermark(watermark, epoch_us),), ) # Stream the delta in batches so a large catch-up never materializes at once. diff --git a/src/sqlmem/engine.py b/src/sqlmem/engine.py index 0599143..436d21c 100644 --- a/src/sqlmem/engine.py +++ b/src/sqlmem/engine.py @@ -66,6 +66,7 @@ class CachingEngine: fetch_batch: int | None = None, dialect: str | None = None, pragmas: dict[str, str | int] | None = None, + datetime_columns: dict[str, list[str]] | None = None, blocking_startup_refresh: bool = False, ) -> None: self._source_engine = source_engine @@ -81,6 +82,7 @@ class CachingEngine: dialect=self._dialect, fetch_batch=fetch_batch if fetch_batch is not None else FETCH_BATCH_SIZE, pragmas=pragmas, + datetime_columns=datetime_columns, ) self._registry = ColumnRegistry(self._cache.connection) self._stats = StatsCollector() diff --git a/tests/test_coerce.py b/tests/test_coerce.py index 3914e60..0923076 100644 --- a/tests/test_coerce.py +++ b/tests/test_coerce.py @@ -4,7 +4,7 @@ import uuid import pytest -from sqlmem._coerce import coerce_params, to_sqlite +from sqlmem._coerce import coerce_params, to_sqlite, to_sqlite_datetime from sqlmem.cache import CacheManager @@ -91,6 +91,80 @@ def test_coerce_params_none(): assert coerce_params(None) is None +# --- to_sqlite_datetime (INTEGER µs storage, 1.12.0) ------------------------ + + +def test_datetime_to_epoch_micros(): + # 2026-06-01T10:00:00Z -> microseconds since epoch + dt = datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=datetime.timezone.utc) + expected = int(dt.timestamp() * 1_000_000) + assert to_sqlite_datetime(dt) == expected + + +def test_datetime_naive_treated_as_utc(): + naive = datetime.datetime(2026, 6, 1, 10, 0, 0) + aware = naive.replace(tzinfo=datetime.timezone.utc) + assert to_sqlite_datetime(naive) == to_sqlite_datetime(aware) + + +def test_datetime_micros_are_exact(): + dt = datetime.datetime(2026, 6, 5, 14, 54, 24, 823000, tzinfo=datetime.timezone.utc) + us = to_sqlite_datetime(dt) + # round-trips back to the same instant with no rounding loss + back = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc) + datetime.timedelta( + microseconds=us + ) + assert back == dt + + +def test_datetime_none_passes_through(): + assert to_sqlite_datetime(None) is None + + +def test_datetime_iso_string_parsed(): + assert to_sqlite_datetime("2026-06-01T10:00:00+00:00") == to_sqlite_datetime( + datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=datetime.timezone.utc) + ) + + +def test_datetime_unparseable_is_none(): + assert to_sqlite_datetime("not a date") is None + + +# --- integration: datetime_columns are stored as INTEGER -------------------- + + +def test_datetime_column_stored_as_integer(tmp_path): + c = CacheManager( + db_path=tmp_path / "cache.db", + backup_interval=9999, + datetime_columns={"t": ["changed"]}, + ) + dt = datetime.datetime(2026, 6, 1, 10, 0, 0, tzinfo=datetime.timezone.utc) + c.load_table("t", ["id", "changed"], FakeSource([("1", dt)])) + + # Column declared INTEGER, value stored as µs-since-epoch. + coltype = c.connection.execute("PRAGMA table_info(t)").fetchall() + types = {row[1]: row[2] for row in coltype} + assert types["changed"] == "INTEGER" + assert types["id"] == "TEXT" + _, out = c.execute_in_memory("SELECT changed FROM t") + assert out == [(to_sqlite_datetime(dt),)] + c.close() + + +def test_non_datetime_columns_unaffected_by_datetime_columns(tmp_path): + c = CacheManager( + db_path=tmp_path / "cache.db", + backup_interval=9999, + datetime_columns={"t": ["changed"]}, + ) + c.load_table("t", ["id", "price"], FakeSource([("1", decimal.Decimal("9.99"))])) + _, out = c.execute_in_memory("SELECT id, price FROM t") + assert out == [("1", "9.99")] # still TEXT/ISO coercion + c.close() + + # --- integration: values reach the cache through coercion ------------------- diff --git a/tests/test_delta.py b/tests/test_delta.py index f8fcf6d..600269d 100644 --- a/tests/test_delta.py +++ b/tests/test_delta.py @@ -1,6 +1,6 @@ import sqlite3 import threading -from datetime import datetime +from datetime import datetime, timezone from types import SimpleNamespace import pytest @@ -140,6 +140,18 @@ def test_bind_watermark_passes_through_non_datetime(): assert _bind_watermark("12345") == "12345" +# --- INTEGER µs watermark binding (datetime_columns, 1.12.0) ---------------- + + +def test_bind_watermark_epoch_us_reconstructs_datetime(): + dt = datetime(2026, 6, 5, 14, 54, 24, 823000, tzinfo=timezone.utc) + us = int(dt.timestamp() * 1_000_000) + # Whether the watermark is an int or its digit string (it round-trips through + # the TEXT last_synced_at column), it binds back to the same UTC datetime. + assert _bind_watermark(us, epoch_us=True) == dt + assert _bind_watermark(str(us), epoch_us=True) == dt + + class _SpyCursor: def __init__(self, rows): self._rows = list(rows) @@ -174,6 +186,46 @@ def test_refresh_binds_watermark_as_datetime(env): assert params == (datetime(2026, 6, 5, 14, 54, 24, 823000),) +class _RowSource: + """Returns fixed rows for any query (for loading datetime-typed source data).""" + + def __init__(self, rows): + self._rows = rows + + def execute(self, sql, params=()): + return _SpyCursor(self._rows) + + +def test_datetime_column_watermark_stored_as_int_and_bound_back(tmp_path): + """A change column declared in datetime_columns is stored as INTEGER µs; the + watermark is bound back to a real datetime for the source query.""" + cache = CacheManager( + db_path=tmp_path / "c.db", + backup_interval=9999, + datetime_columns={"products": ["changed"]}, + ) + dt1 = datetime(2026, 6, 1, 10, 0, 0, tzinfo=timezone.utc) + dt2 = datetime(2026, 6, 1, 10, 5, 0, tzinfo=timezone.utc) + cache.load_table("products", ["id", "changed"], _RowSource([("1", dt1), ("2", dt2)])) + cache.create_unique_index("products", ["id"]) + cache.set_last_synced_at("products", cache.max_value("products", "changed")) + + # Watermark persisted as the max INTEGER µs (digit string out of the TEXT col). + wm = cache.get_last_synced_at("products") + assert wm == str(int(dt2.timestamp() * 1_000_000)) + + refresher = DeltaRefresher( + cache, {"products": ResolvedDelta("changed", ["id"])} + ) + spy = _SpySource(rows=[]) # no new rows — just capture the bound watermark + refresher.refresh(spy) + + assert spy.bound, "source query was never issued" + _, params = spy.bound[-1] + assert params == (dt2,) # bound back as datetime, not an int/string + cache.close() + + # --------------------------------------------------------------------------- # Refresh failures are recorded (4.3) so a stuck delta is visible in stats # ---------------------------------------------------------------------------