159 lines
5.1 KiB
Python
159 lines
5.1 KiB
Python
import sqlite3
|
|
import threading
|
|
|
|
import pytest
|
|
from sqlalchemy import create_engine
|
|
|
|
import sqlmem.engine as eng_mod
|
|
from sqlmem import CachingEngine, DeltaConfig
|
|
from sqlmem.cache import CacheManager
|
|
from sqlmem.stats import StatsCollector
|
|
|
|
|
|
@pytest.fixture
def source_engine(tmp_path):
    """Yield a SQLAlchemy engine over a freshly seeded SQLite database.

    The database file ``source.db`` is created under ``tmp_path`` and holds a
    single ``products`` table (``id``, ``name``, ``changed``) containing one
    row. The engine is disposed once the consuming test has finished.
    """
    db_path = tmp_path / "source.db"
    conn = sqlite3.connect(db_path)
    try:
        conn.executescript(
            """
            CREATE TABLE products (id TEXT PRIMARY KEY, name TEXT, changed TEXT);
            INSERT INTO products VALUES ('1', 'Widget', '2026-06-01 10:00:00');
            """
        )
        conn.commit()
    finally:
        # Release the seeding connection even if the script fails, so a broken
        # setup does not leave an open handle on the temporary database file.
        conn.close()
    engine = create_engine(f"sqlite:///{db_path}")
    yield engine
    engine.dispose()
|
|
|
|
|
|
@pytest.fixture
def patched_cache(tmp_path, monkeypatch):
    """Override the engine module's cache settings for the duration of a test.

    ``CACHE_DB_PATH`` is pointed at ``cache.db`` under ``tmp_path`` and
    ``BACKUP_INTERVAL_SECONDS`` is set to 9999 (presumably large enough that
    no periodic backup runs during a test -- confirm against the engine).
    ``monkeypatch`` restores both attributes afterwards.
    """
    overrides = {
        "CACHE_DB_PATH": tmp_path / "cache.db",
        "BACKUP_INTERVAL_SECONDS": 9999,
    }
    for attr_name, value in overrides.items():
        monkeypatch.setattr(eng_mod, attr_name, value)
|
|
|
|
|
|
def test_static_table_state_and_tracking(source_engine, patched_cache):
    """A table cached without delta/ttl config reports ready, static, one row."""
    engine = CachingEngine(source_engine)
    try:
        engine.execute("SELECT id, name FROM products")
        s = engine.stats.tables["products"]
        assert s.state == "ready"
        assert s.tracking == "static"
        assert s.rows == 1
    finally:
        # Close even when an assertion fails so the engine is not left open
        # for the rest of the test session.
        engine.close()
|
|
|
|
|
|
def test_delta_table_tracking(source_engine, patched_cache):
    """A table configured with a ``DeltaConfig`` reports delta tracking, ready."""
    engine = CachingEngine(
        source_engine, delta={"products": DeltaConfig("changed", ["id"])}
    )
    try:
        engine.execute("SELECT id, name FROM products")
        s = engine.stats.tables["products"]
        assert s.tracking == "delta"
        assert s.state == "ready"
    finally:
        # Close even when an assertion fails so the engine is not left open
        # for the rest of the test session.
        engine.close()
|
|
|
|
|
|
def test_ttl_table_reports_stale(source_engine, patched_cache):
    """A table given a TTL of 0 reports ttl tracking and a stale state."""
    engine = CachingEngine(source_engine, ttl={"products": 0})
    try:
        engine.execute("SELECT id, name FROM products")
        s = engine.stats.tables["products"]
        assert s.tracking == "ttl"
        assert s.state == "stale"  # ttl=0 → already past its max age
    finally:
        # Close even when an assertion fails so the engine is not left open
        # for the rest of the test session.
        engine.close()
|
|
|
|
|
|
def test_counters_still_reported(source_engine, patched_cache):
    """Running the same query twice yields one miss and one hit in the stats."""
    engine = CachingEngine(source_engine)
    try:
        engine.execute("SELECT id, name FROM products")
        engine.execute("SELECT id, name FROM products")
        stats = engine.stats
        assert stats.misses == 1
        assert stats.hits == 1
    finally:
        # Close even when an assertion fails so the engine is not left open
        # for the rest of the test session.
        engine.close()
|
|
|
|
|
|
def test_stats_exposes_table_error(source_engine, patched_cache):
    """An error recorded on the cache shows up in global and per-table stats.

    After ``record_error`` is called for ``products``, the stats report one
    error overall, and the table entry carries the failure count, the error
    message and a non-null error timestamp.
    """
    engine = CachingEngine(source_engine)
    try:
        engine.execute("SELECT id, name FROM products")
        # Inject the failure directly through the engine's cache rather than
        # provoking a real one.
        engine._cache.record_error("products", "ValueError: boom")

        s = engine.stats
        assert s.errors == 1
        assert s.tables["products"].consecutive_failures == 1
        assert s.tables["products"].last_error == "ValueError: boom"
        assert s.tables["products"].last_error_at is not None
    finally:
        # Close even when an assertion fails so the engine is not left open
        # for the rest of the test session.
        engine.close()
|
|
|
|
|
|
def test_stats_no_error_by_default(source_engine, patched_cache):
    """With no error recorded, error counters are zero and last_error is None."""
    engine = CachingEngine(source_engine)
    try:
        engine.execute("SELECT id, name FROM products")
        s = engine.stats
        assert s.errors == 0
        assert s.tables["products"].consecutive_failures == 0
        assert s.tables["products"].last_error is None
    finally:
        # Close even when an assertion fails so the engine is not left open
        # for the rest of the test session.
        engine.close()
|
|
|
|
|
|
def test_stats_exposes_last_upsert_and_last_refresh(source_engine, patched_cache):
    """After the first query, the table stats carry both timestamps."""
    engine = CachingEngine(source_engine)
    try:
        engine.execute("SELECT id, name FROM products")
        s = engine.stats.tables["products"]
        assert s.last_upsert is not None  # the load wrote rows (persisted)
        assert s.last_refresh is not None  # the load also counts as a refresh-cycle run
    finally:
        # Close even when an assertion fails so the engine is not left open
        # for the rest of the test session.
        engine.close()
|
|
|
|
|
|
# --- a table being loaded for the first time shows up as "loading" ----------
|
|
|
|
|
|
def test_snapshot_surfaces_a_loading_table(tmp_path):
    """A table passed to ``snapshot`` as "loading" appears in it with zero rows.

    The states mapping handed to ``StatsCollector.snapshot`` names a table
    ``pending`` that this test never loads into the cache.
    """
    cache = CacheManager(db_path=tmp_path / "cache.db", backup_interval=9999)
    try:
        snap = StatsCollector().snapshot(cache.connection, {"pending": "loading"})
        assert "pending" in snap.tables
        assert snap.tables["pending"].state == "loading"
        assert snap.tables["pending"].rows == 0
    finally:
        # Close even when an assertion fails so the cache manager is not left
        # open for the rest of the test session.
        cache.close()
|
|
|
|
|
|
def test_loading_state_visible_from_another_thread_during_load(tmp_path):
    """A first load in progress is observable as 'loading' from another thread.

    A background thread runs ``cache.load_table`` against a fake source whose
    cursor blocks inside ``fetchmany``. While it is blocked, the main thread
    checks that both ``cache.get_states()`` and a stats snapshot report the
    table as "loading"; after the cursor is released and the thread has
    finished, the state must be "ready".
    """
    cache = CacheManager(db_path=tmp_path / "cache.db", backup_interval=9999)
    started = threading.Event()  # set by the cursor once the fetch has begun
    release = threading.Event()  # set by the test to let the fetch finish

    class BlockingCursor:
        """Cursor stand-in whose first ``fetchmany`` blocks until released."""

        def __init__(self, rows):
            self._rows = list(rows)
            self._done = False

        def fetchmany(self, size):
            # Second and later calls: report exhaustion with an empty batch.
            if self._done:
                return []
            started.set()
            release.wait(5)  # hold the load open until the test releases it
            self._done = True
            # All rows are returned in one batch; ``size`` is ignored.
            return self._rows

    class BlockingSource:
        """Source stand-in: ``execute`` ignores the SQL and returns one row."""

        def execute(self, sql):
            return BlockingCursor([("1", "alice")])

    # NOTE(review): presumably load_table calls source.execute(...) and then
    # drains the cursor via fetchmany -- confirm against CacheManager.
    loader = threading.Thread(
        target=cache.load_table, args=("users", ["id", "name"], BlockingSource())
    )
    loader.start()
    try:
        assert started.wait(5), "load did not start"
        # mid-load: not yet in _sqlmem_tables, but surfaced as loading
        assert cache.get_states()["users"] == "loading"
        snap = StatsCollector().snapshot(cache.connection, cache.get_states())
        assert snap.tables["users"].state == "loading"
    finally:
        # Always unblock the cursor and wait for the loader, even when a
        # mid-load assertion failed, so the thread is not left blocked.
        release.set()
        loader.join(5)
    assert not loader.is_alive()
    assert cache.get_states()["users"] == "ready"
    cache.close()
|