222 lines
6.8 KiB
Python
222 lines
6.8 KiB
Python
import sqlite3
|
|
import time
|
|
from datetime import datetime, timezone
|
|
|
|
import pytest
|
|
from sqlalchemy import create_engine
|
|
|
|
from sqlmem import (
|
|
TTL,
|
|
CachingEngine,
|
|
Delta,
|
|
DeltaConfig,
|
|
TableSpec,
|
|
UndeclaredError,
|
|
)
|
|
|
|
|
|
@pytest.fixture
|
|
def spec_source(tmp_path):
|
|
db = tmp_path / "src.db"
|
|
conn = sqlite3.connect(db)
|
|
conn.execute("CREATE TABLE products (id TEXT, name TEXT, price TEXT, changed TEXT)")
|
|
conn.executemany(
|
|
"INSERT INTO products VALUES (?, ?, ?, ?)",
|
|
[
|
|
("1", "Widget", "9.99", "2026-06-01T10:00:00"),
|
|
("2", "Gadget", "19.99", "2026-06-02T10:00:00"),
|
|
],
|
|
)
|
|
conn.execute("CREATE TABLE orders (order_id TEXT, qty TEXT)")
|
|
conn.executemany("INSERT INTO orders VALUES (?, ?)", [("101", "2")])
|
|
conn.commit()
|
|
conn.close()
|
|
se = create_engine(f"sqlite:///{db}")
|
|
yield se
|
|
se.dispose()
|
|
|
|
|
|
# --- back-compat / validation -----------------------------------------------
|
|
|
|
|
|
def test_tables_and_legacy_kwargs_are_mutually_exclusive(spec_source):
|
|
with pytest.raises(ValueError):
|
|
CachingEngine(
|
|
spec_source,
|
|
tables=[TableSpec("products", ["id"])],
|
|
delta={"products": DeltaConfig("changed", ["id"])},
|
|
)
|
|
|
|
|
|
def test_duplicate_tablespec_raises(spec_source):
|
|
with pytest.raises(ValueError):
|
|
CachingEngine(
|
|
spec_source,
|
|
tables=[TableSpec("products", ["id"]), TableSpec("products", ["name"])],
|
|
)
|
|
|
|
|
|
def test_delta_alias_is_deltaconfig():
|
|
assert Delta is DeltaConfig
|
|
|
|
|
|
# --- preload ----------------------------------------------------------------
|
|
|
|
|
|
def test_preload_blocking_caches_before_first_query(spec_source, tmp_path):
|
|
ce = CachingEngine(
|
|
spec_source,
|
|
cache_db_path=tmp_path / "c.db",
|
|
tables=[TableSpec("products", ["id", "name"], preload=True)],
|
|
blocking_startup_refresh=True,
|
|
)
|
|
# Cached at construction time — no execute() needed.
|
|
assert ce._cache.is_table_cached("products") is True
|
|
rows = ce.execute("SELECT id, name FROM products")
|
|
assert {r["id"] for r in rows} == {"1", "2"}
|
|
ce.close()
|
|
|
|
|
|
def test_preload_non_blocking_eventually_caches(spec_source, tmp_path):
|
|
ce = CachingEngine(
|
|
spec_source,
|
|
cache_db_path=tmp_path / "c.db",
|
|
tables=[TableSpec("products", ["id", "name"], preload=True)],
|
|
)
|
|
deadline = time.time() + 5
|
|
while not ce._cache.is_table_cached("products") and time.time() < deadline:
|
|
time.sleep(0.05)
|
|
assert ce._cache.is_table_cached("products") is True
|
|
ce.close()
|
|
|
|
|
|
def test_non_preload_table_is_not_loaded_at_startup(spec_source, tmp_path):
|
|
ce = CachingEngine(
|
|
spec_source,
|
|
cache_db_path=tmp_path / "c.db",
|
|
tables=[TableSpec("products", ["id", "name"], preload=False)],
|
|
blocking_startup_refresh=True,
|
|
)
|
|
assert ce._cache.is_table_cached("products") is False # loads lazily on first query
|
|
ce.execute("SELECT id, name FROM products")
|
|
assert ce._cache.is_table_cached("products") is True
|
|
ce.close()
|
|
|
|
|
|
# --- fail-fast on undeclared tables / columns -------------------------------
|
|
|
|
|
|
def test_fail_fast_undeclared_table(spec_source):
|
|
ce = CachingEngine(spec_source, tables=[TableSpec("products", ["id", "name"])])
|
|
with pytest.raises(UndeclaredError):
|
|
ce.execute("SELECT order_id FROM orders")
|
|
ce.close()
|
|
|
|
|
|
def test_fail_fast_undeclared_column(spec_source):
|
|
ce = CachingEngine(spec_source, tables=[TableSpec("products", ["id", "name"])])
|
|
with pytest.raises(UndeclaredError):
|
|
ce.execute("SELECT price FROM products")
|
|
ce.close()
|
|
|
|
|
|
def test_declared_columns_query_succeeds(spec_source):
|
|
ce = CachingEngine(spec_source, tables=[TableSpec("products", ["id", "name"])])
|
|
rows = ce.execute("SELECT id, name FROM products")
|
|
assert len(rows) == 2
|
|
ce.close()
|
|
|
|
|
|
def test_columns_none_allows_wildcard_and_any_column(spec_source):
|
|
ce = CachingEngine(spec_source, tables=[TableSpec("products", columns=None)])
|
|
assert len(ce.execute("SELECT * FROM products")) == 2
|
|
assert len(ce.execute("SELECT price FROM products")) == 2 # any column allowed
|
|
ce.close()
|
|
|
|
|
|
def test_wildcard_on_column_restricted_table_fails(spec_source):
|
|
ce = CachingEngine(spec_source, tables=[TableSpec("products", ["id", "name"])])
|
|
with pytest.raises(UndeclaredError):
|
|
ce.execute("SELECT * FROM products")
|
|
ce.close()
|
|
|
|
|
|
def test_lazy_mode_has_no_fail_fast(spec_source):
|
|
"""Without tables=, undeclared columns/tables load lazily as before."""
|
|
ce = CachingEngine(spec_source)
|
|
rows = ce.execute("SELECT order_id FROM orders")
|
|
assert len(rows) == 1
|
|
ce.close()
|
|
|
|
|
|
# --- refresh strategies via TableSpec ---------------------------------------
|
|
|
|
|
|
def test_tablespec_delta_tracking(spec_source, tmp_path):
|
|
ce = CachingEngine(
|
|
spec_source,
|
|
cache_db_path=tmp_path / "c.db",
|
|
tables=[
|
|
TableSpec(
|
|
"products",
|
|
["id", "name", "changed"],
|
|
refresh=Delta(change_column="changed", key_columns=["id"]),
|
|
preload=True,
|
|
)
|
|
],
|
|
blocking_startup_refresh=True,
|
|
)
|
|
assert ce.stats.tables["products"].tracking == "delta"
|
|
ce.close()
|
|
|
|
|
|
def test_tablespec_ttl_tracking(spec_source, tmp_path):
|
|
ce = CachingEngine(
|
|
spec_source,
|
|
cache_db_path=tmp_path / "c.db",
|
|
tables=[TableSpec("products", ["id", "name"], refresh=TTL(seconds=1800), preload=True)],
|
|
blocking_startup_refresh=True,
|
|
)
|
|
assert ce.stats.tables["products"].tracking == "ttl"
|
|
ce.close()
|
|
|
|
|
|
def test_tablespec_datetime_columns_roundtrip(spec_source, tmp_path):
|
|
ce = CachingEngine(
|
|
spec_source,
|
|
cache_db_path=tmp_path / "c.db",
|
|
tables=[
|
|
TableSpec("products", ["id", "changed"], datetime_columns=["changed"], preload=True)
|
|
],
|
|
blocking_startup_refresh=True,
|
|
)
|
|
rows = ce.execute("SELECT id, changed FROM products WHERE changed > ?", ("2026-06-01T12:00:00",))
|
|
assert [r["id"] for r in rows] == ["2"] # param coercion via datetime_columns
|
|
assert rows[0]["changed"] == datetime(2026, 6, 2, 10, 0, 0, tzinfo=timezone.utc)
|
|
ce.close()
|
|
|
|
|
|
# --- warm restart: preload skips a copy already fresh on disk ---------------
|
|
|
|
|
|
def test_warm_restart_preload_skips_reload(spec_source, tmp_path):
|
|
path = tmp_path / "c.db"
|
|
|
|
def make() -> CachingEngine:
|
|
return CachingEngine(
|
|
spec_source,
|
|
cache_db_path=path,
|
|
in_memory=False,
|
|
tables=[TableSpec("products", ["id", "name"], preload=True)],
|
|
blocking_startup_refresh=True,
|
|
)
|
|
|
|
ce1 = make()
|
|
assert ce1.stats.misses >= 1 # cold preload had to load from source
|
|
ce1.close()
|
|
|
|
ce2 = make()
|
|
assert ce2._cache.is_table_cached("products") is True
|
|
assert ce2.stats.misses == 0 # warm: preload was a cache hit, no redundant reload
|
|
ce2.close()
|