Wire datetime_columns through query params and reads; add db_size and vacuum guard

This commit is contained in:
Jan Doubravský
2026-06-10 13:58:29 +02:00
parent 8e46ee3547
commit a68b8994e3
13 changed files with 383 additions and 20 deletions
+68
View File
@@ -421,3 +421,71 @@ def test_engine_vacuum_runs(source_engine, tmp_path):
ce.vacuum(incremental=False) # must not raise
assert ce._cache.is_table_cached("products") is True
ce.close()
# ---------------------------------------------------------------------------
# datetime_columns end-to-end: param coercion (A) + read-back datetime (B)
# ---------------------------------------------------------------------------
@pytest.fixture
def events_engine(tmp_path):
src = tmp_path / "events.db"
conn = sqlite3.connect(src)
conn.execute("CREATE TABLE events (id TEXT, changed TEXT)")
conn.executemany(
"INSERT INTO events VALUES (?, ?)",
[("1", "2026-06-01T10:00:00"), ("2", "2026-06-03T10:00:00")],
)
conn.commit()
conn.close()
se = create_engine(f"sqlite:///{src}")
yield se
se.dispose()
def test_datetime_column_where_and_readback(events_engine, tmp_path):
from datetime import datetime, timezone
ce = CachingEngine(
events_engine,
cache_db_path=tmp_path / "cache.db",
in_memory=False,
datetime_columns={"events": ["changed"]},
)
# A: WHERE on the INTEGER-µs column with an ISO string param returns the right row.
rows = ce.execute(
"SELECT id, changed FROM events WHERE changed > ?", ("2026-06-02T00:00:00",)
)
assert [r["id"] for r in rows] == ["2"]
# B: the column comes back as a datetime, not a raw integer.
assert rows[0]["changed"] == datetime(2026, 6, 3, 10, 0, 0, tzinfo=timezone.utc)
ce.close()
def test_datetime_column_return_datetime_false(events_engine, tmp_path):
ce = CachingEngine(
events_engine,
cache_db_path=tmp_path / "cache.db",
in_memory=False,
datetime_columns={"events": ["changed"]},
return_datetime=False,
)
rows = ce.execute("SELECT id, changed FROM events")
assert all(isinstance(r["changed"], int) for r in rows) # opt-out → raw µs
ce.close()
# ---------------------------------------------------------------------------
# db_size_bytes in stats (D)
# ---------------------------------------------------------------------------
def test_stats_reports_db_size_in_disk_mode(source_engine, tmp_path):
ce = CachingEngine(source_engine, cache_db_path=tmp_path / "cache.db", in_memory=False)
ce.execute("SELECT id FROM products")
assert ce.stats.db_size_bytes > 0
ce.close()
def test_stats_db_size_zero_in_memory(engine):
engine.execute("SELECT id, name FROM products")
assert engine.stats.db_size_bytes == 0