Add initial SQLmem package structure with SQL parser, cache manager, column registry, and tests

This commit is contained in:
Jan Doubravský
2026-06-01 16:44:25 +02:00
parent 54879ef9d0
commit 74772cee4a
18 changed files with 835 additions and 0 deletions
View File
+61
View File
@@ -0,0 +1,61 @@
import sqlite3
from pathlib import Path
import pytest
from sqlmem.cache import CacheManager
@pytest.fixture
def cache(tmp_path):
c = CacheManager(db_path=tmp_path / "test_cache.db", backup_interval=9999)
yield c
c.close()
@pytest.fixture
def source_conn():
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT, email TEXT, status TEXT)")
conn.executemany(
"INSERT INTO users VALUES (?, ?, ?)",
[("alice", "alice@example.com", "active"), ("bob", "bob@example.com", "inactive")],
)
conn.commit()
yield conn
conn.close()
def test_table_not_cached_initially(cache):
assert cache.is_table_cached("users") is False
def test_load_table(cache, source_conn):
cache.load_table("users", ["name", "email"], source_conn)
assert cache.is_table_cached("users") is True
def test_loaded_data_correct(cache, source_conn):
cache.load_table("users", ["name", "email"], source_conn)
rows = cache.connection.execute("SELECT name, email FROM users").fetchall()
assert len(rows) == 2
assert ("alice", "alice@example.com") in rows
def test_mark_table_refreshed(cache, source_conn):
cache.load_table("users", ["name"], source_conn)
row = cache.connection.execute(
"SELECT row_count FROM _sqlmem_tables WHERE table_name = 'users'"
).fetchone()
assert row[0] == 2
def test_backup_and_reload(tmp_path, source_conn):
db_path = tmp_path / "cache.db"
c1 = CacheManager(db_path=db_path, backup_interval=9999)
c1.load_table("users", ["name"], source_conn)
c1.close()
c2 = CacheManager(db_path=db_path, backup_interval=9999)
assert c2.is_table_cached("users") is True
c2.close()
+43
View File
@@ -0,0 +1,43 @@
import pytest
from sqlmem.exceptions import ReadOnlyError, UnsupportedQueryError
from sqlmem.parser import parse
def test_simple_select():
result = parse("SELECT name, email FROM users WHERE status = 'active'")
assert result.table == "users"
# WHERE columns are also extracted — needed for in-memory SQLite filtering
assert {"name", "email"}.issubset(set(result.columns))
assert "status" in result.columns
def test_multiple_columns():
result = parse("SELECT a, b, c FROM orders")
assert result.table == "orders"
assert set(result.columns) == {"a", "b", "c"}
def test_insert_raises_readonly():
with pytest.raises(ReadOnlyError):
parse("INSERT INTO users (name) VALUES ('alice')")
def test_update_raises_readonly():
with pytest.raises(ReadOnlyError):
parse("UPDATE users SET name = 'bob' WHERE id = 1")
def test_delete_raises_readonly():
with pytest.raises(ReadOnlyError):
parse("DELETE FROM users WHERE id = 1")
def test_wildcard_raises_unsupported():
with pytest.raises(UnsupportedQueryError):
parse("SELECT * FROM users")
def test_join_raises_unsupported():
with pytest.raises(UnsupportedQueryError):
parse("SELECT a.name, b.title FROM users a JOIN orders b ON a.id = b.user_id")
+45
View File
@@ -0,0 +1,45 @@
import sqlite3
import pytest
from sqlmem.registry import ColumnRegistry
@pytest.fixture
def registry():
conn = sqlite3.connect(":memory:")
return ColumnRegistry(conn)
def test_empty_registry(registry):
assert registry.get_columns("users") == set()
def test_update_and_get(registry):
registry.update("users", ["name", "email"])
assert registry.get_columns("users") == {"name", "email"}
def test_update_accumulates(registry):
registry.update("users", ["name", "email"])
registry.update("users", ["email", "status"])
assert registry.get_columns("users") == {"name", "email", "status"}
def test_needs_refetch_missing(registry):
registry.update("users", ["name"])
missing = registry.needs_refetch("users", ["name", "email"])
assert missing == ["email"]
def test_needs_refetch_none_missing(registry):
registry.update("users", ["name", "email"])
missing = registry.needs_refetch("users", ["name"])
assert missing == []
def test_independent_tables(registry):
registry.update("users", ["name"])
registry.update("orders", ["total"])
assert registry.get_columns("users") == {"name"}
assert registry.get_columns("orders") == {"total"}