Files

222 lines
6.8 KiB
Python

import sqlite3
import time
from datetime import datetime, timezone
import pytest
from sqlalchemy import create_engine
from sqlmem import (
TTL,
CachingEngine,
Delta,
DeltaConfig,
TableSpec,
UndeclaredError,
)
@pytest.fixture
def spec_source(tmp_path):
db = tmp_path / "src.db"
conn = sqlite3.connect(db)
conn.execute("CREATE TABLE products (id TEXT, name TEXT, price TEXT, changed TEXT)")
conn.executemany(
"INSERT INTO products VALUES (?, ?, ?, ?)",
[
("1", "Widget", "9.99", "2026-06-01T10:00:00"),
("2", "Gadget", "19.99", "2026-06-02T10:00:00"),
],
)
conn.execute("CREATE TABLE orders (order_id TEXT, qty TEXT)")
conn.executemany("INSERT INTO orders VALUES (?, ?)", [("101", "2")])
conn.commit()
conn.close()
se = create_engine(f"sqlite:///{db}")
yield se
se.dispose()
# --- back-compat / validation -----------------------------------------------
def test_tables_and_legacy_kwargs_are_mutually_exclusive(spec_source):
with pytest.raises(ValueError):
CachingEngine(
spec_source,
tables=[TableSpec("products", ["id"])],
delta={"products": DeltaConfig("changed", ["id"])},
)
def test_duplicate_tablespec_raises(spec_source):
with pytest.raises(ValueError):
CachingEngine(
spec_source,
tables=[TableSpec("products", ["id"]), TableSpec("products", ["name"])],
)
def test_delta_alias_is_deltaconfig():
assert Delta is DeltaConfig
# --- preload ----------------------------------------------------------------
def test_preload_blocking_caches_before_first_query(spec_source, tmp_path):
ce = CachingEngine(
spec_source,
cache_db_path=tmp_path / "c.db",
tables=[TableSpec("products", ["id", "name"], preload=True)],
blocking_startup_refresh=True,
)
# Cached at construction time — no execute() needed.
assert ce._cache.is_table_cached("products") is True
rows = ce.execute("SELECT id, name FROM products")
assert {r["id"] for r in rows} == {"1", "2"}
ce.close()
def test_preload_non_blocking_eventually_caches(spec_source, tmp_path):
ce = CachingEngine(
spec_source,
cache_db_path=tmp_path / "c.db",
tables=[TableSpec("products", ["id", "name"], preload=True)],
)
deadline = time.time() + 5
while not ce._cache.is_table_cached("products") and time.time() < deadline:
time.sleep(0.05)
assert ce._cache.is_table_cached("products") is True
ce.close()
def test_non_preload_table_is_not_loaded_at_startup(spec_source, tmp_path):
ce = CachingEngine(
spec_source,
cache_db_path=tmp_path / "c.db",
tables=[TableSpec("products", ["id", "name"], preload=False)],
blocking_startup_refresh=True,
)
assert ce._cache.is_table_cached("products") is False # loads lazily on first query
ce.execute("SELECT id, name FROM products")
assert ce._cache.is_table_cached("products") is True
ce.close()
# --- fail-fast on undeclared tables / columns -------------------------------
def test_fail_fast_undeclared_table(spec_source):
ce = CachingEngine(spec_source, tables=[TableSpec("products", ["id", "name"])])
with pytest.raises(UndeclaredError):
ce.execute("SELECT order_id FROM orders")
ce.close()
def test_fail_fast_undeclared_column(spec_source):
ce = CachingEngine(spec_source, tables=[TableSpec("products", ["id", "name"])])
with pytest.raises(UndeclaredError):
ce.execute("SELECT price FROM products")
ce.close()
def test_declared_columns_query_succeeds(spec_source):
ce = CachingEngine(spec_source, tables=[TableSpec("products", ["id", "name"])])
rows = ce.execute("SELECT id, name FROM products")
assert len(rows) == 2
ce.close()
def test_columns_none_allows_wildcard_and_any_column(spec_source):
ce = CachingEngine(spec_source, tables=[TableSpec("products", columns=None)])
assert len(ce.execute("SELECT * FROM products")) == 2
assert len(ce.execute("SELECT price FROM products")) == 2 # any column allowed
ce.close()
def test_wildcard_on_column_restricted_table_fails(spec_source):
ce = CachingEngine(spec_source, tables=[TableSpec("products", ["id", "name"])])
with pytest.raises(UndeclaredError):
ce.execute("SELECT * FROM products")
ce.close()
def test_lazy_mode_has_no_fail_fast(spec_source):
"""Without tables=, undeclared columns/tables load lazily as before."""
ce = CachingEngine(spec_source)
rows = ce.execute("SELECT order_id FROM orders")
assert len(rows) == 1
ce.close()
# --- refresh strategies via TableSpec ---------------------------------------
def test_tablespec_delta_tracking(spec_source, tmp_path):
ce = CachingEngine(
spec_source,
cache_db_path=tmp_path / "c.db",
tables=[
TableSpec(
"products",
["id", "name", "changed"],
refresh=Delta(change_column="changed", key_columns=["id"]),
preload=True,
)
],
blocking_startup_refresh=True,
)
assert ce.stats.tables["products"].tracking == "delta"
ce.close()
def test_tablespec_ttl_tracking(spec_source, tmp_path):
ce = CachingEngine(
spec_source,
cache_db_path=tmp_path / "c.db",
tables=[TableSpec("products", ["id", "name"], refresh=TTL(seconds=1800), preload=True)],
blocking_startup_refresh=True,
)
assert ce.stats.tables["products"].tracking == "ttl"
ce.close()
def test_tablespec_datetime_columns_roundtrip(spec_source, tmp_path):
ce = CachingEngine(
spec_source,
cache_db_path=tmp_path / "c.db",
tables=[
TableSpec("products", ["id", "changed"], datetime_columns=["changed"], preload=True)
],
blocking_startup_refresh=True,
)
rows = ce.execute("SELECT id, changed FROM products WHERE changed > ?", ("2026-06-01T12:00:00",))
assert [r["id"] for r in rows] == ["2"] # param coercion via datetime_columns
assert rows[0]["changed"] == datetime(2026, 6, 2, 10, 0, 0, tzinfo=timezone.utc)
ce.close()
# --- warm restart: preload skips a copy already fresh on disk ---------------
def test_warm_restart_preload_skips_reload(spec_source, tmp_path):
path = tmp_path / "c.db"
def make() -> CachingEngine:
return CachingEngine(
spec_source,
cache_db_path=path,
in_memory=False,
tables=[TableSpec("products", ["id", "name"], preload=True)],
blocking_startup_refresh=True,
)
ce1 = make()
assert ce1.stats.misses >= 1 # cold preload had to load from source
ce1.close()
ce2 = make()
assert ce2._cache.is_table_cached("products") is True
assert ce2.stats.misses == 0 # warm: preload was a cache hit, no redundant reload
ce2.close()