138 lines
4.5 KiB
Python
138 lines
4.5 KiB
Python
import sqlite3
|
|
|
|
import pytest
|
|
from sqlalchemy import create_engine
|
|
|
|
import sqlmem.engine as eng_mod
|
|
from sqlmem import CachingEngine, DeltaConfig
|
|
from sqlmem.cache import CacheManager
|
|
from sqlmem.executor import QueryExecutor
|
|
from sqlmem.parser import parse
|
|
from sqlmem.registry import ColumnRegistry
|
|
from sqlmem.stats import StatsCollector
|
|
|
|
|
|
@pytest.fixture
def source_conn():
    """Yield an in-memory SQLite connection seeded with a ``products`` table.

    The table has three TEXT columns (``id``, ``name``, ``price``) and two
    rows: Widget (9.99) and Gadget (19.99).  The connection is closed on
    teardown, and also if seeding the schema fails before the test runs.
    """
    conn = sqlite3.connect(":memory:")
    try:
        conn.executescript(
            """
            CREATE TABLE products (id TEXT, name TEXT, price TEXT);
            INSERT INTO products VALUES ('1', 'Widget', '9.99'), ('2', 'Gadget', '19.99');
            """
        )
        conn.commit()
        yield conn
    finally:
        # Runs on normal teardown and on a setup failure, so the handle
        # is never leaked.
        conn.close()
|
|
|
|
|
|
def make_executor(tmp_path, source_conn, ttl):
    """Build a ``QueryExecutor`` backed by a cache database under *tmp_path*.

    Args:
        tmp_path: Directory in which ``cache.db`` is created.
        source_conn: Connection to the source database being cached.
        ttl: TTL configuration, passed to the executor unchanged.

    Returns:
        The constructed ``QueryExecutor``.
    """
    cache_manager = CacheManager(
        db_path=tmp_path / "cache.db",
        backup_interval=9999,
    )
    return QueryExecutor(
        cache_manager,
        ColumnRegistry(cache_manager.connection),
        source_conn,
        StatsCollector(),
        None,  # NOTE(review): presumably the delta configuration slot; confirm against QueryExecutor
        ttl,
    )
|
|
|
|
|
|
def run(executor, sql, params=None):
    """Parse *sql* (with optional *params*) and execute it on *executor*.

    Returns whatever ``executor.execute`` returns for the parsed query.
    """
    execute = executor.execute
    parsed_query = parse(sql, params)
    return execute(parsed_query)
|
|
|
|
|
|
# --- lazy (read-time) guarantee --------------------------------------------
|
|
|
|
|
|
def test_ttl_zero_reloads_every_access(tmp_path, source_conn):
    """A TTL of 0 makes the second read see a change made in the source."""
    executor = make_executor(tmp_path, source_conn, ttl={"products": 0})

    # First access: miss → load.
    run(executor, "SELECT id, price FROM products")

    # Change the source behind the cache's back.
    source_conn.execute("UPDATE products SET price = '1.11' WHERE id = '1'")
    source_conn.commit()

    by_id = {}
    for row in run(executor, "SELECT id, price FROM products"):
        by_id[row["id"]] = row

    # Stale → reloaded, so the new value is visible.
    assert by_id["1"]["price"] == "1.11"

    stats = executor._stats
    assert stats.refetches == 1
    assert stats.misses == 1
|
|
|
|
|
|
def test_ttl_fresh_is_cache_hit(tmp_path, source_conn):
    """With a long TTL the second read returns the old cached value."""
    executor = make_executor(tmp_path, source_conn, ttl={"products": 9999})

    # Initial read populates the cache.
    run(executor, "SELECT id, price FROM products")

    # Change the source; the test expects this change to stay invisible.
    source_conn.execute("UPDATE products SET price = '1.11' WHERE id = '1'")
    source_conn.commit()

    by_id = {}
    for row in run(executor, "SELECT id, price FROM products"):
        by_id[row["id"]] = row

    # Still fresh → the old cached value is served.
    assert by_id["1"]["price"] == "9.99"

    stats = executor._stats
    assert stats.hits == 1
    assert stats.refetches == 0
|
|
|
|
|
|
def test_ttl_preserves_full_status(tmp_path, source_conn):
    """After a stale reload, ``products`` is still reported as fully cached."""
    executor = make_executor(tmp_path, source_conn, ttl={"products": 0})

    # Pass 1: full load.  Pass 2: stale (TTL 0) → full reload.
    for _ in range(2):
        run(executor, "SELECT * FROM products")

    table_is_full = executor._cache.is_table_full("products")
    assert table_is_full is True
|
|
|
|
|
|
def test_untracked_table_never_expires(tmp_path, source_conn):
    """A TTL set only for another table leaves ``products`` served from cache."""
    # The TTL entry is for "other", not for the table queried below.
    executor = make_executor(tmp_path, source_conn, ttl={"other": 0})

    run(executor, "SELECT id, name FROM products")

    source_conn.execute("UPDATE products SET name = 'X' WHERE id = '1'")
    source_conn.commit()

    by_id = {}
    for row in run(executor, "SELECT id, name FROM products"):
        by_id[row["id"]] = row

    # No TTL on this table → cache hit, the old name is still returned.
    assert by_id["1"]["name"] == "Widget"
    assert executor._stats.hits == 1
|
|
|
|
|
|
# --- engine-level: background refresh + config validation -------------------
|
|
|
|
|
|
@pytest.fixture
def source_db(tmp_path):
    """Create an on-disk SQLite source database and return its path.

    The file ``source.db`` is created under *tmp_path* with a ``products``
    table (``id`` TEXT PRIMARY KEY, ``name``, ``changed``) holding one row.
    The seeding connection is always closed before returning, even when
    creating the schema fails.
    """
    db_path = tmp_path / "source.db"
    conn = sqlite3.connect(db_path)
    try:
        conn.executescript(
            """
            CREATE TABLE products (id TEXT PRIMARY KEY, name TEXT, changed TEXT);
            INSERT INTO products VALUES ('1', 'Widget', '2026-06-01 10:00:00');
            """
        )
        conn.commit()
    finally:
        # Release the file handle on both the success and the failure path.
        conn.close()
    return db_path
|
|
|
|
|
|
@pytest.fixture
def source_engine(source_db):
    """Yield a SQLAlchemy engine for the ``source_db`` file; dispose it on teardown."""
    url = f"sqlite:///{source_db}"
    sa_engine = create_engine(url)
    yield sa_engine
    sa_engine.dispose()
|
|
|
|
|
|
@pytest.fixture
def patched_cache(tmp_path, monkeypatch):
    """Override the engine module's cache path and backup interval for one test.

    ``CACHE_DB_PATH`` is pointed at ``cache.db`` under *tmp_path* and
    ``BACKUP_INTERVAL_SECONDS`` is set to 9999; ``monkeypatch`` undoes both
    when the test finishes.
    """
    overrides = {
        "CACHE_DB_PATH": tmp_path / "cache.db",
        "BACKUP_INTERVAL_SECONDS": 9999,
    }
    for attr_name, value in overrides.items():
        monkeypatch.setattr(eng_mod, attr_name, value)
|
|
|
|
|
|
def test_background_ttl_refresh(source_engine, source_db, patched_cache):
    """``engine.refresh()`` makes a source-side update visible to later reads.

    ``patched_cache`` is requested only for its side effect of redirecting the
    engine module's cache settings.  The engine and the out-of-band SQLite
    connection are released in ``finally`` blocks so that a failing call or
    assertion does not leak them.
    """
    engine = CachingEngine(source_engine, ttl={"products": 0})
    try:
        engine.execute("SELECT id, name FROM products")

        # Update the source through a separate connection, outside the engine.
        conn = sqlite3.connect(source_db)
        try:
            conn.execute("UPDATE products SET name = 'Widget2' WHERE id = '1'")
            conn.commit()
        finally:
            conn.close()

        engine.refresh()  # background-style full reload of the expired table
        rows = engine.execute("SELECT id, name FROM products")
        assert rows[0]["name"] == "Widget2"
    finally:
        engine.close()
|
|
|
|
|
|
def test_delta_and_ttl_overlap_raises(source_engine, patched_cache):
    """Giving the same table both a delta config and a TTL raises ``ValueError``."""
    with pytest.raises(ValueError):
        # Arguments are built inside the context so exactly the same
        # statements are covered by the expectation.
        delta_cfg = {
            "products": DeltaConfig(change_column="changed", key_columns=["id"]),
        }
        ttl_cfg = {"products": 300}
        CachingEngine(source_engine, delta=delta_cfg, ttl=ttl_cfg)
|