"""Tests for sqlmem delta refresh: the DeltaRefresher, watermark binding, refresh
error bookkeeping, and CachingEngine integration (PK auto-discovery, reset,
startup catch-up, end-to-end refresh)."""

import sqlite3
import threading
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace

import pytest
from sqlalchemy import create_engine

import sqlmem.engine as eng_mod
from sqlmem import CachingEngine, DeltaConfig
from sqlmem.cache import CacheManager
from sqlmem.delta import DeltaRefresher, ResolvedDelta, _bind_watermark
from sqlmem.executor import QueryExecutor
from sqlmem.parser import parse
from sqlmem.registry import ColumnRegistry
from sqlmem.stats import StatsCollector

_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def cached_rows(cache, sql):
    """Run *sql* against the in-memory cache and return rows as column->value dicts."""
    cols, rows = cache.execute_in_memory(sql)
    return [dict(zip(cols, row)) for row in rows]


def _epoch_us(dt):
    """Return integer microseconds since the Unix epoch for a UTC-aware *dt*.

    Uses exact timedelta arithmetic. ``int(dt.timestamp() * 1_000_000)`` goes
    through a float and truncates, so it can land one microsecond below the
    true value and make round-trip assertions flaky.
    """
    return (dt - _EPOCH) // timedelta(microseconds=1)


# ---------------------------------------------------------------------------
# Refresher unit tests (in-memory source connection)
# ---------------------------------------------------------------------------


@pytest.fixture
def source_conn():
    """In-memory SQLite source with two products at 10:00 and 10:05."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE products (id TEXT PRIMARY KEY, name TEXT, price TEXT, changed TEXT);
        INSERT INTO products VALUES
            ('1', 'Widget', '9.99', '2026-06-01 10:00:00'),
            ('2', 'Gadget', '19.99', '2026-06-01 10:05:00');
        """
    )
    conn.commit()
    yield conn
    conn.close()


@pytest.fixture
def env(tmp_path, source_conn):
    """A cache primed from ``source_conn`` plus a DeltaRefresher for ``products``."""
    cache = CacheManager(db_path=tmp_path / "cache.db", backup_interval=9999)
    registry = ColumnRegistry(cache.connection)
    stats = StatsCollector()
    delta = {"products": ResolvedDelta(change_column="changed", key_columns=["id"])}
    executor = QueryExecutor(cache, registry, source_conn, stats, delta)
    refresher = DeltaRefresher(cache, delta)
    # Initial load — caches id, name, price (+ augmented key/change columns).
    executor.execute(parse("SELECT id, name, price FROM products"))
    yield SimpleNamespace(cache=cache, source=source_conn, refresher=refresher)
    cache.close()


def test_load_augments_key_and_change_columns(env):
    cols = env.cache.get_table_columns("products")
    assert {"id", "name", "price", "changed"}.issubset(set(cols))


def test_initial_watermark_is_max_change(env):
    assert env.cache.get_last_synced_at("products") == "2026-06-01 10:05:00"


def test_refresh_applies_updates(env):
    env.source.execute(
        "UPDATE products SET price = '7.77', changed = '2026-06-01 10:10:00' WHERE id = '1'"
    )
    env.source.commit()
    env.refresher.refresh(env.source)
    rows = {r["id"]: r for r in cached_rows(env.cache, "SELECT id, price FROM products")}
    assert rows["1"]["price"] == "7.77"
    assert env.cache.get_last_synced_at("products") == "2026-06-01 10:10:00"


def test_refresh_inserts_new_rows(env):
    env.source.execute(
        "INSERT INTO products VALUES ('3', 'Sprocket', '5.00', '2026-06-01 10:20:00')"
    )
    env.source.commit()
    env.refresher.refresh(env.source)
    ids = {r["id"] for r in cached_rows(env.cache, "SELECT id FROM products")}
    assert ids == {"1", "2", "3"}


def test_boundary_timestamp_not_missed_and_idempotent(env):
    # New row sharing the exact watermark timestamp must still be picked up (>=),
    # and the row already at that timestamp must not be duplicated.
    env.source.execute(
        "INSERT INTO products VALUES ('3', 'Sprocket', '5.00', '2026-06-01 10:05:00')"
    )
    env.source.commit()
    env.refresher.refresh(env.source)
    env.refresher.refresh(env.source)  # idempotent — running twice changes nothing
    rows = cached_rows(env.cache, "SELECT id FROM products")
    assert sorted(r["id"] for r in rows) == ["1", "2", "3"]


def test_delete_by_nulling(env):
    env.source.execute(
        "UPDATE products SET name = NULL, changed = '2026-06-01 10:30:00' WHERE id = '1'"
    )
    env.source.commit()
    env.refresher.refresh(env.source)
    rows = {r["id"]: r for r in cached_rows(env.cache, "SELECT id, name FROM products")}
    assert rows["1"]["name"] is None


def test_refresh_without_changes_is_noop(env):
    before = cached_rows(env.cache, "SELECT id, name, price FROM products")
    env.refresher.refresh(env.source)
    after = cached_rows(env.cache, "SELECT id, name, price FROM products")
    assert before == after


# ---------------------------------------------------------------------------
# Watermark binding — regression for the datetime-as-string delta bug
# (SQL Server error 241: 'T'-separated 6-digit-microsecond ISO string can't be
# implicitly converted varchar->datetime, freezing the delta watermark).
# ---------------------------------------------------------------------------


def test_bind_watermark_parses_iso_datetime():
    assert _bind_watermark("2026-06-05T14:54:24.823000") == datetime(
        2026, 6, 5, 14, 54, 24, 823000
    )


def test_bind_watermark_parses_space_separated():
    assert _bind_watermark("2026-06-01 10:05:00") == datetime(2026, 6, 1, 10, 5, 0)


def test_bind_watermark_passes_through_non_datetime():
    # Integer rowversion / non-datetime change column — left untouched.
    assert _bind_watermark("12345") == "12345"


# --- INTEGER µs watermark binding (datetime_columns, 1.12.0) ----------------


def test_bind_watermark_epoch_us_reconstructs_datetime():
    dt = datetime(2026, 6, 5, 14, 54, 24, 823000, tzinfo=timezone.utc)
    us = _epoch_us(dt)
    # Whether the watermark is an int or its digit string (it round-trips through
    # the TEXT last_synced_at column), it binds back to the same UTC datetime.
    assert _bind_watermark(us, epoch_us=True) == dt
    assert _bind_watermark(str(us), epoch_us=True) == dt


class _SpyCursor:
    """Minimal cursor that serves a fixed row list through ``fetchmany``."""

    def __init__(self, rows):
        self._rows = list(rows)

    def fetchmany(self, n):
        batch, self._rows = self._rows[:n], self._rows[n:]
        return batch


class _SpySource:
    """Records the parameters bound to each query (stands in for the pyodbc source)."""

    def __init__(self, rows):
        self._rows = rows
        self.bound = []

    def execute(self, sql, params=()):
        self.bound.append((sql, params))
        return _SpyCursor(self._rows)


def test_refresh_binds_watermark_as_datetime(env):
    """The watermark must reach the source as a real datetime, not a raw ISO
    string — otherwise SQL Server raises error 241 and the delta freezes."""
    env.cache.set_last_synced_at("products", "2026-06-05T14:54:24.823000")
    spy = _SpySource(rows=[("1", "Widget", "9.99", "2026-06-05T14:54:24.823000")])
    env.refresher.refresh(spy)
    assert spy.bound, "source query was never issued"
    _, params = spy.bound[-1]
    assert params == (datetime(2026, 6, 5, 14, 54, 24, 823000),)


class _RowSource:
    """Returns fixed rows for any query (for loading datetime-typed source data)."""

    def __init__(self, rows):
        self._rows = rows

    def execute(self, sql, params=()):
        return _SpyCursor(self._rows)


def test_datetime_column_watermark_stored_as_int_and_bound_back(tmp_path):
    """A change column declared in datetime_columns is stored as INTEGER µs; the
    watermark is bound back to a real datetime for the source query."""
    cache = CacheManager(
        db_path=tmp_path / "c.db",
        backup_interval=9999,
        datetime_columns={"products": ["changed"]},
    )
    # try/finally so a failing assertion still releases the cache DB.
    try:
        dt1 = datetime(2026, 6, 1, 10, 0, 0, tzinfo=timezone.utc)
        dt2 = datetime(2026, 6, 1, 10, 5, 0, tzinfo=timezone.utc)
        cache.load_table("products", ["id", "changed"], _RowSource([("1", dt1), ("2", dt2)]))
        cache.create_unique_index("products", ["id"])
        cache.set_last_synced_at("products", cache.max_value("products", "changed"))

        # Watermark persisted as the max INTEGER µs (digit string out of the TEXT col).
        wm = cache.get_last_synced_at("products")
        assert wm == str(_epoch_us(dt2))

        refresher = DeltaRefresher(cache, {"products": ResolvedDelta("changed", ["id"])})
        spy = _SpySource(rows=[])  # no new rows — just capture the bound watermark
        refresher.refresh(spy)
        assert spy.bound, "source query was never issued"
        _, params = spy.bound[-1]
        assert params == (dt2,)  # bound back as datetime, not an int/string
    finally:
        cache.close()


# ---------------------------------------------------------------------------
# Refresh failures are recorded (4.3) so a stuck delta is visible in stats
# ---------------------------------------------------------------------------


class _RaisingSource:
    """Source whose every query fails, to exercise refresh error bookkeeping."""

    def execute(self, sql, params=()):
        raise RuntimeError("boom 241")


def test_failed_delta_refresh_records_error(env):
    env.refresher.refresh(_RaisingSource())
    err = env.cache.get_errors()["products"]
    assert err.consecutive == 1
    assert "boom 241" in err.message
    assert env.cache.error_total == 1
    # State is marked error even though the cache still holds the last-good data.
    assert env.cache.get_states()["products"] == "error"


def test_delta_success_resets_failure_streak(env):
    env.refresher.refresh(_RaisingSource())
    assert env.cache.get_errors()["products"].consecutive == 1
    env.refresher.refresh(env.source)  # real source — succeeds
    assert env.cache.get_errors()["products"].consecutive == 0


# ---------------------------------------------------------------------------
# last_upsert (persisted write) vs last_refresh (in-memory run/liveness)
# ---------------------------------------------------------------------------


def _persisted_last_upsert(cache, table):
    """Read the persisted write time for *table* from ``_sqlmem_tables`` (None if absent)."""
    row = cache.connection.execute(
        "SELECT last_refresh_at FROM _sqlmem_tables WHERE table_name = ?", (table,)
    ).fetchone()
    return row[0] if row else None


def test_empty_delta_records_run_but_not_write(env):
    """An empty delta cycle bumps last_refresh (liveness) but not the persisted
    last_upsert (write time)."""
    before = _persisted_last_upsert(env.cache, "products")
    # The comparison below needs a baseline; fail clearly rather than with a
    # TypeError on `>= None` if the initial load did not persist a write time.
    assert before is not None, "initial load did not persist a write time"
    # Push the watermark past every source row so the next cycle returns 0 rows.
    env.cache.set_last_synced_at("products", "2099-01-01 00:00:00")
    env.refresher.refresh(env.source)
    # No rows written → persisted write time unchanged.
    assert _persisted_last_upsert(env.cache, "products") == before
    # But the cycle ran → in-memory run time recorded (and at/after the last write).
    runs = env.cache.get_last_runs()
    assert runs["products"] is not None
    assert runs["products"] >= before


# ---------------------------------------------------------------------------
# Engine-level: PK auto-discovery, reset, end-to-end refresh
# ---------------------------------------------------------------------------


@pytest.fixture
def source_db(tmp_path):
    """On-disk SQLite source with a keyed ``products`` table and a keyless view."""
    db_path = tmp_path / "source.db"
    conn = sqlite3.connect(db_path)
    conn.executescript(
        """
        CREATE TABLE products (id TEXT PRIMARY KEY, name TEXT, changed TEXT);
        INSERT INTO products VALUES ('1', 'Widget', '2026-06-01 10:00:00');
        CREATE VIEW vw_products AS SELECT id, name FROM products;
        """
    )
    conn.commit()
    conn.close()
    return db_path


@pytest.fixture
def source_engine(source_db):
    """SQLAlchemy engine over ``source_db``; disposed after the test."""
    engine = create_engine(f"sqlite:///{source_db}")
    yield engine
    engine.dispose()


@pytest.fixture
def patched_cache(tmp_path, monkeypatch):
    """Point the engine module's cache DB at tmp_path and disable periodic backups."""
    monkeypatch.setattr(eng_mod, "CACHE_DB_PATH", tmp_path / "cache.db")
    monkeypatch.setattr(eng_mod, "BACKUP_INTERVAL_SECONDS", 9999)


def test_pk_auto_discovery(source_engine, patched_cache):
    engine = CachingEngine(source_engine, delta={"products": DeltaConfig(change_column="changed")})
    try:
        assert engine._delta["products"].key_columns == ["id"]
    finally:
        engine.close()


def test_view_without_key_raises(source_engine, patched_cache):
    with pytest.raises(ValueError):
        engine = CachingEngine(
            source_engine, delta={"vw_products": DeltaConfig(change_column="name")}
        )
        # Only reached if the constructor wrongly succeeds: release the engine
        # before pytest reports "DID NOT RAISE".
        engine.close()


def test_engine_reset(source_engine, patched_cache):
    engine = CachingEngine(source_engine)
    try:
        engine.execute("SELECT id, name FROM products")
        assert engine._cache.is_table_cached("products") is True
        engine.reset()
        assert engine._cache.is_table_cached("products") is False
    finally:
        engine.close()


def test_startup_catch_up_is_non_blocking_by_default(source_engine, patched_cache, monkeypatch):
    """By default the startup catch-up runs on the background thread, not the
    main thread, so it never blocks application startup."""
    threads: list[str] = []
    started = threading.Event()
    real = eng_mod.CachingEngine._run_refresh

    def spy(self):
        threads.append(threading.current_thread().name)
        started.set()
        return real(self)

    monkeypatch.setattr(eng_mod.CachingEngine, "_run_refresh", spy)
    engine = CachingEngine(source_engine, delta={"products": DeltaConfig("changed", ["id"])})
    # try/finally so a failing assertion still stops the background thread
    # instead of leaking it (with the patched _run_refresh) into later tests.
    try:
        # __init__ has returned; the main thread must not have run the catch-up.
        assert "MainThread" not in threads
        assert started.wait(2), "background catch-up never ran"
        assert threads == ["sqlmem-delta"]
    finally:
        engine.close()


def test_blocking_startup_refresh_runs_synchronously(source_engine, patched_cache, monkeypatch):
    threads: list[str] = []
    real = eng_mod.CachingEngine._run_refresh

    def spy(self):
        threads.append(threading.current_thread().name)
        return real(self)

    monkeypatch.setattr(eng_mod.CachingEngine, "_run_refresh", spy)
    engine = CachingEngine(
        source_engine,
        delta={"products": DeltaConfig("changed", ["id"])},
        blocking_startup_refresh=True,
    )
    try:
        # Opt-in: the catch-up ran on the main thread before __init__ returned.
        assert "MainThread" in threads
    finally:
        engine.close()


def test_engine_delta_refresh_end_to_end(source_engine, source_db, patched_cache):
    engine = CachingEngine(
        source_engine,
        delta={"products": DeltaConfig(change_column="changed", key_columns=["id"])},
    )
    try:
        engine.execute("SELECT id, name FROM products")  # caches, watermark = 10:00

        conn = sqlite3.connect(source_db)
        try:
            conn.execute(
                "UPDATE products SET name = 'Widget2', changed = '2026-06-01 10:06:00' WHERE id = '1'"
            )
            conn.execute("INSERT INTO products VALUES ('2', 'Gadget', '2026-06-01 10:05:00')")
            conn.commit()
        finally:
            conn.close()

        engine.refresh()
        rows = {r["id"]: r for r in engine.execute("SELECT id, name FROM products")}
        assert rows["1"]["name"] == "Widget2"
        assert rows["2"]["name"] == "Gadget"
    finally:
        engine.close()