249 lines
7.9 KiB
Python
249 lines
7.9 KiB
Python
import sqlite3
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
from sqlalchemy import create_engine
|
|
|
|
import sqlmem.engine as eng_mod
|
|
from sqlmem import CachingEngine, ReadOnlyError, UnsupportedQueryError
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fixtures
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.fixture
|
|
def source_db(tmp_path):
|
|
"""File-based SQLite source with two pre-populated tables."""
|
|
db_path = tmp_path / "source.db"
|
|
conn = sqlite3.connect(db_path)
|
|
conn.execute("CREATE TABLE products (id TEXT, name TEXT, price TEXT)")
|
|
conn.executemany(
|
|
"INSERT INTO products VALUES (?, ?, ?)",
|
|
[("1", "Widget", "9.99"), ("2", "Gadget", "19.99"), ("3", "Doohickey", "4.99")],
|
|
)
|
|
conn.execute("CREATE TABLE orders (order_id TEXT, product_id TEXT, qty TEXT)")
|
|
conn.executemany(
|
|
"INSERT INTO orders VALUES (?, ?, ?)",
|
|
[("101", "1", "2"), ("102", "2", "1")],
|
|
)
|
|
conn.commit()
|
|
conn.close()
|
|
return db_path
|
|
|
|
|
|
@pytest.fixture
|
|
def source_engine(source_db):
|
|
engine = create_engine(f"sqlite:///{source_db}")
|
|
yield engine
|
|
engine.dispose()
|
|
|
|
|
|
@pytest.fixture
|
|
def cache_path(tmp_path):
|
|
return tmp_path / "cache.db"
|
|
|
|
|
|
@pytest.fixture
|
|
def engine(source_engine, cache_path, monkeypatch):
|
|
"""CachingEngine pointed at a temp cache DB."""
|
|
monkeypatch.setattr(eng_mod, "CACHE_DB_PATH", cache_path)
|
|
monkeypatch.setattr(eng_mod, "BACKUP_INTERVAL_SECONDS", 9999)
|
|
ce = CachingEngine(source_engine)
|
|
yield ce
|
|
ce.close()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Basic SELECT execution (in-memory)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_select_returns_list_of_dicts(engine):
|
|
rows = engine.execute("SELECT id, name FROM products")
|
|
assert isinstance(rows, list)
|
|
assert all(isinstance(r, dict) for r in rows)
|
|
|
|
|
|
def test_select_correct_row_count(engine):
|
|
assert len(engine.execute("SELECT id, name FROM products")) == 3
|
|
|
|
|
|
def test_select_correct_values(engine):
|
|
rows = engine.execute("SELECT id, name FROM products")
|
|
assert {r["name"] for r in rows} == {"Widget", "Gadget", "Doohickey"}
|
|
|
|
|
|
def test_select_with_where_clause(engine):
|
|
rows = engine.execute("SELECT id, price FROM products WHERE id = '1'")
|
|
assert len(rows) == 1
|
|
assert rows[0]["price"] == "9.99"
|
|
|
|
|
|
def test_select_with_order_and_limit(engine):
|
|
rows = engine.execute("SELECT id, name FROM products ORDER BY id LIMIT 2")
|
|
assert len(rows) == 2
|
|
assert rows[0]["id"] == "1"
|
|
|
|
|
|
def test_select_different_table(engine):
|
|
rows = engine.execute("SELECT order_id, qty FROM orders")
|
|
assert len(rows) == 2
|
|
|
|
|
|
def test_where_on_non_selected_column(engine):
|
|
"""WHERE references a column not in SELECT — parser must extract it for the cache."""
|
|
rows = engine.execute("SELECT name FROM products WHERE price = '9.99'")
|
|
assert len(rows) == 1
|
|
assert rows[0]["name"] == "Widget"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# In-memory caching behaviour
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_cache_hit_survives_source_deletion(engine, source_db):
|
|
engine.execute("SELECT id, name FROM products")
|
|
# Wipe source — cache must still answer
|
|
conn = sqlite3.connect(source_db)
|
|
conn.execute("DELETE FROM products")
|
|
conn.commit()
|
|
conn.close()
|
|
rows = engine.execute("SELECT id, name FROM products")
|
|
assert len(rows) == 3
|
|
|
|
|
|
def test_new_column_triggers_refetch(engine):
|
|
engine.execute("SELECT id FROM products")
|
|
rows = engine.execute("SELECT id, name FROM products")
|
|
assert "Widget" in {r["name"] for r in rows}
|
|
|
|
|
|
def test_second_query_same_columns_is_cache_hit(engine):
|
|
engine.execute("SELECT id, name FROM products")
|
|
assert engine._cache.is_table_cached("products") is True
|
|
rows = engine.execute("SELECT id, name FROM products")
|
|
assert len(rows) == 3
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# SQL file creation — backup to disk
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_close_creates_sql_file(engine, cache_path):
|
|
engine.execute("SELECT id, name FROM products")
|
|
engine.close()
|
|
assert cache_path.exists()
|
|
|
|
|
|
def test_sql_file_is_valid_sqlite(engine, cache_path):
|
|
engine.execute("SELECT id, name FROM products")
|
|
engine.close()
|
|
conn = sqlite3.connect(cache_path)
|
|
tables = {t[0] for t in conn.execute(
|
|
"SELECT name FROM sqlite_master WHERE type='table'"
|
|
).fetchall()}
|
|
conn.close()
|
|
assert "_sqlmem_tables" in tables
|
|
assert "products" in tables
|
|
|
|
|
|
def test_sql_file_contains_cached_rows(engine, cache_path):
|
|
engine.execute("SELECT id, name FROM products")
|
|
engine.close()
|
|
conn = sqlite3.connect(cache_path)
|
|
rows = conn.execute("SELECT id, name FROM products").fetchall()
|
|
conn.close()
|
|
assert len(rows) == 3
|
|
|
|
|
|
def test_sql_file_meta_table_present(engine, cache_path):
|
|
engine.execute("SELECT id FROM products")
|
|
engine.close()
|
|
conn = sqlite3.connect(cache_path)
|
|
row = conn.execute(
|
|
"SELECT value FROM _sqlmem_meta WHERE key = 'schema_version'"
|
|
).fetchone()
|
|
conn.close()
|
|
assert row is not None
|
|
assert int(row[0]) >= 1
|
|
|
|
|
|
def test_reload_from_disk_file(source_engine, cache_path, monkeypatch):
|
|
"""New CachingEngine picks up table cached by a previous instance."""
|
|
monkeypatch.setattr(eng_mod, "CACHE_DB_PATH", cache_path)
|
|
monkeypatch.setattr(eng_mod, "BACKUP_INTERVAL_SECONDS", 9999)
|
|
|
|
ce1 = CachingEngine(source_engine)
|
|
ce1.execute("SELECT id, name FROM products")
|
|
ce1.close()
|
|
|
|
ce2 = CachingEngine(source_engine)
|
|
assert ce2._cache.is_table_cached("products") is True
|
|
ce2.close()
|
|
|
|
|
|
def test_reload_data_intact_after_restart(source_engine, cache_path, monkeypatch):
|
|
monkeypatch.setattr(eng_mod, "CACHE_DB_PATH", cache_path)
|
|
monkeypatch.setattr(eng_mod, "BACKUP_INTERVAL_SECONDS", 9999)
|
|
|
|
ce1 = CachingEngine(source_engine)
|
|
ce1.execute("SELECT id, name FROM products")
|
|
ce1.close()
|
|
|
|
ce2 = CachingEngine(source_engine)
|
|
rows = ce2.execute("SELECT id, name FROM products")
|
|
ce2.close()
|
|
assert {r["name"] for r in rows} == {"Widget", "Gadget", "Doohickey"}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Error handling
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_insert_raises_readonly(engine):
|
|
with pytest.raises(ReadOnlyError):
|
|
engine.execute("INSERT INTO products VALUES ('4', 'New', '1.00')")
|
|
|
|
|
|
def test_update_raises_readonly(engine):
|
|
with pytest.raises(ReadOnlyError):
|
|
engine.execute("UPDATE products SET price = '0' WHERE id = '1'")
|
|
|
|
|
|
def test_delete_raises_readonly(engine):
|
|
with pytest.raises(ReadOnlyError):
|
|
engine.execute("DELETE FROM products WHERE id = '1'")
|
|
|
|
|
|
def test_join_raises_unsupported(engine):
|
|
with pytest.raises(UnsupportedQueryError):
|
|
engine.execute(
|
|
"SELECT p.name, o.qty FROM products p JOIN orders o ON p.id = o.product_id"
|
|
)
|
|
|
|
|
|
def test_select_star_raises_unsupported(engine):
|
|
with pytest.raises(UnsupportedQueryError):
|
|
engine.execute("SELECT * FROM products")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Cache invalidation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_invalidate_marks_table_absent(engine):
|
|
engine.execute("SELECT id, name FROM products")
|
|
engine.invalidate("products")
|
|
assert engine._cache.is_table_cached("products") is False
|
|
|
|
|
|
def test_invalidate_then_refetch_works(engine):
|
|
engine.execute("SELECT id, name FROM products")
|
|
engine.invalidate("products")
|
|
rows = engine.execute("SELECT id, name FROM products")
|
|
assert len(rows) == 3
|
|
|
|
|
|
def test_invalidate_unknown_table_is_noop(engine):
|
|
engine.invalidate("nonexistent_table") # must not raise
|