From 74772cee4ab4d27f5cb52f4e03c813a1d3bd0f72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Doubravsk=C3=BD?= Date: Mon, 1 Jun 2026 16:44:25 +0200 Subject: [PATCH] Add initial SQLmem package structure with SQL parser, cache manager, column registry, and tests --- .env | 1 + .gitignore | 41 ++++++++ CHANGELOG.md | 23 +++++ project.md | 202 +++++++++++++++++++++++++++++++++++++++ pyproject.toml | 27 ++++++ src/sqlmem/__init__.py | 4 + src/sqlmem/_meta.py | 1 + src/sqlmem/cache.py | 158 ++++++++++++++++++++++++++++++ src/sqlmem/config.py | 19 ++++ src/sqlmem/engine.py | 43 +++++++++ src/sqlmem/exceptions.py | 6 ++ src/sqlmem/executor.py | 42 ++++++++ src/sqlmem/parser.py | 71 ++++++++++++++ src/sqlmem/registry.py | 48 ++++++++++ tests/__init__.py | 0 tests/test_cache.py | 61 ++++++++++++ tests/test_parser.py | 43 +++++++++ tests/test_registry.py | 45 +++++++++ 18 files changed, 835 insertions(+) create mode 100644 .env create mode 100644 .gitignore create mode 100644 CHANGELOG.md create mode 100644 project.md create mode 100644 pyproject.toml create mode 100644 src/sqlmem/__init__.py create mode 100644 src/sqlmem/_meta.py create mode 100644 src/sqlmem/cache.py create mode 100644 src/sqlmem/config.py create mode 100644 src/sqlmem/engine.py create mode 100644 src/sqlmem/exceptions.py create mode 100644 src/sqlmem/executor.py create mode 100644 src/sqlmem/parser.py create mode 100644 src/sqlmem/registry.py create mode 100644 tests/__init__.py create mode 100644 tests/test_cache.py create mode 100644 tests/test_parser.py create mode 100644 tests/test_registry.py diff --git a/.env b/.env new file mode 100644 index 0000000..124cdd9 --- /dev/null +++ b/.env @@ -0,0 +1 @@ +ENV_DEBUG = true \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9490572 --- /dev/null +++ b/.gitignore @@ -0,0 +1,41 @@ +# Python +__pycache__/ +*.py[cod] +*.pyo +*.pyd +*.egg +*.egg-info/ +dist/ +build/ +wheels/ + +# Virtual environments +.venv/ +venv/ +env/ + +# Poetry +poetry.lock + +# pytest +.pytest_cache/ +htmlcov/ +.coverage +coverage.xml + +# mypy / type checkers +.mypy_cache/ +.ruff_cache/ + +# IDE +.vscode/ +.idea/ + +# OS +.DS_Store +Thumbs.db + +# Agents +AGENTS.md +CLAUDE.md +DESIGN_DOCUMENT_MODULE.md \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..cbb8879 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,23 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +## [Unreleased] + +### Added +- Project specification in `project.md` — architecture, API design, cache backend, metadata schema, logging strategy, and TODO for future features (JOIN, SELECT * support) +- `.gitignore` for Python/Poetry project +- `pyproject.toml` dependencies: `sqlglot`, `sqlalchemy`, `loguru`, `python-dotenv`; dev dependencies: `pytest`, `ruff`, `mypy` +- `src/sqlmem/` package structure with src layout +- `src/sqlmem/exceptions.py` — `ReadOnlyError` (blocks INSERT/UPDATE/DELETE), `UnsupportedQueryError` (blocks JOIN and SELECT *) +- `src/sqlmem/config.py` — loads `.env`, configures `loguru` with DEBUG/INFO level based on `SQLMEM_DEBUG` +- `src/sqlmem/_meta.py` — package version constant +- `src/sqlmem/parser.py` — SQL Parser using `sqlglot`; extracts table and columns from SELECT, raises on writes/JOIN/wildcard +- `src/sqlmem/registry.py` — Column Registry; accumulates requested columns per table, detects missing columns requiring re-fetch +- `src/sqlmem/cache.py` — Cache Manager; SQLite in-memory storage, load from `cache.db` on startup (with schema version check), hourly backup thread, `atexit`/SIGTERM flush, metadata tables (`_sqlmem_meta`, `_sqlmem_tables`, `_sqlmem_columns`) +- `src/sqlmem/executor.py` — Query Executor; cache hit/miss logic, re-fetch on new columns with WARNING log +- `src/sqlmem/engine.py` — `CachingEngine` wrapper; public API compatible with SQLAlchemy, `invalidate(table)` for manual cache clearing +- `src/sqlmem/__init__.py` — public exports: `CachingEngine`, `ReadOnlyError`, `UnsupportedQueryError` +- `tests/test_parser.py` — parser tests: SELECT parsing, ReadOnlyError, UnsupportedQueryError +- `tests/test_cache.py` — cache tests: load, data correctness, metadata, disk backup/reload +- `tests/test_registry.py` — registry tests: accumulation, needs_refetch, table isolation diff --git a/project.md b/project.md new file mode 100644 index 0000000..1120387 --- /dev/null +++ b/project.md @@ -0,0 +1,202 @@ +# SQLmem — Project Specification + +## Cíl + +Python modul fungující jako **transparentní cache vrstva mezi SQLAlchemy a databází**. Aplikace volá SQLAlchemy stejně jako dosud — SQLmem sedí mezi nimi, zachytává SELECT dotazy a vrací výsledky z in-memory SQLite cache. Zápisy (INSERT/UPDATE/DELETE) jsou přepouštěny přímo do databáze bez zásahu. + +--- + +## Architektura + +``` +Aplikace (SQLAlchemy) + │ + ▼ + [ SQLmem Proxy ] + ┌───────────────────────────────┐ + │ SQL Parser │ → rozezná SELECT vs. zápis + │ Cache Manager (SQLite RAM) │ → drží data v paměti + │ Query Executor │ → cache hit / miss logika + └───────────────────────────────┘ + │ + ▼ + Databáze (přes původní SQLAlchemy engine) +``` + +--- + +## API + +Modul implementuje **SQLAlchemy event hooks / custom engine wrapper** — aplikace nemusí měnit způsob volání ani formát vrácených dat. + +```python +from sqlmem import CachingEngine +from sqlalchemy import create_engine + +base_engine = create_engine("postgresql://...") +engine = CachingEngine(base_engine) + +# Dál se pracuje stejně jako s běžným SQLAlchemy enginem: +with engine.connect() as conn: + result = conn.execute(text("SELECT * FROM users WHERE status = 'active'")) +``` + +`CachingEngine` vrací objekty kompatibilní s `CursorResult` — `fetchall()`, `fetchone()`, `keys()` atd. fungují stejně. + +--- + +## Cache backend + +- **SQLite in-memory** jako primární úložiště — veškeré dotazy běží v RAM. +- **Persistence na disk** (`cache.db`) ve třech situacích: + - **Při startu**: pokud soubor existuje, načte se do paměti (`ATTACH` + kopie). + - **Periodicky každou hodinu**: snapshot in-memory SQLite se zapíše na disk (backup API). + - **Při vypnutí**: finální zápis na disk před ukončením (signal handler + context manager). +- Celé tabulky se při cache miss načtou z databáze a drží v paměti. + +--- + +## Komponenty + +### 1. SQL Parser +- Detekuje typ dotazu (SELECT / zápis). +- Extrahuje názvy tabulek z FROM a JOIN klauzulí. +- Extrahuje seznam požadovaných sloupců. +- Detekuje `SELECT *` (wildcard) a JOIN — vyhodí `UnsupportedQueryError`. +- Rozhoduje, zda je dotaz obsloužitelný z cache. + +### 2. Column Registry + +Modul se **za běhu učí**, jaké sloupce z každé tabulky aplikace potřebuje — nevyžaduje žádnou předem danou konfiguraci. + +**Logika při každém příchozím dotazu:** + +1. Parser detekuje `SELECT *` nebo JOIN → vyhodí `UnsupportedQueryError` (není implementováno). +2. Parser extrahuje `(tabulka, sloupce)` z dotazu. +3. Registry provede **union** nově požadovaných sloupců s již známými. +4. Cache Manager zkontroluje, zda cache pro danou tabulku obsahuje všechny potřebné sloupce: + - **Ano** → dotaz jde přímo do SQLite RAM (cache hit). + - **Ne** → re-fetch tabulky z DB s rozšířenou sadou sloupců → přepíše cache → dotaz do SQLite RAM. + +**Příklad akumulace sloupců:** + +``` +Dotaz 1: SELECT A, B FROM T3 → Registry: T3 = {A, B} → fetch T3(A,B) z DB +Dotaz 2: SELECT A, D FROM T3 → Registry: T3 = {A, B, D} → re-fetch T3(A,B,D) z DB +Dotaz 3: SELECT B FROM T3 → cache hit, žádný DB dotaz +Dotaz 4: SELECT * FROM T3 → UnsupportedQueryError (wildcard není podporován) +Dotaz 5: SELECT A FROM T3 JOIN T4 ... → UnsupportedQueryError (JOIN není podporován) +``` + +**Metadata tabulka `_sqlmem_columns`** (uložena v SQLite): + +```sql +CREATE TABLE _sqlmem_columns ( + table_name TEXT NOT NULL, + column_name TEXT NOT NULL, + PRIMARY KEY (table_name, column_name) +); +``` + +- Při startu se načte z `cache.db` — Registry ví, co bylo kešováno v minulé session. +- Při každém rozšíření sady sloupců se záznam aktualizuje. + +--- + +### 3. Cache Manager +- Drží in-memory SQLite instanci. +- Sleduje, které tabulky jsou již načteny. +- Podporuje TTL (Time-to-Live) pro automatické vypršení cache. +- Umožňuje manuální invalidaci konkrétní tabulky. +- **Persistence**: + - Při inicializaci načte `cache.db` ze disku (pokud existuje) do paměti. + - Spustí background vlákno, které každou hodinu provede `sqlite3` backup API (`conn.backup(disk_conn)`). + - Zaregistruje `atexit` handler a `SIGTERM` handler pro finální zápis při vypnutí. +- **Metadata tabulky** (`_sqlmem_meta`) uložená přímo v SQLite cache: + +```sql +CREATE TABLE _sqlmem_meta ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL +); +-- Záznamy: +-- app_version verze sqlmem balíčku (např. "0.1.0") +-- schema_version verze schématu cache.db (integer, pro migrace) +-- created_at ISO timestamp prvního vytvoření cache.db + +CREATE TABLE _sqlmem_tables ( + table_name TEXT PRIMARY KEY, + last_refresh_at TEXT NOT NULL, -- ISO 8601 UTC timestamp + row_count INTEGER +); +``` + + - `last_refresh_at` se aktualizuje pokaždé, když se tabulka znovu načte z databáze. + - Při načtení `cache.db` ze disku se zkontroluje `schema_version` — pokud nesedí, cache se zahodí a načte znovu z DB. + +### 3. Query Executor +- **Cache hit**: Spustí dotaz přímo v in-memory SQLite, vrátí výsledek. +- **Cache miss**: Načte potřebné tabulky z databáze → uloží do cache → spustí dotaz. +- **Zápisy (INSERT / UPDATE / DELETE)**: Vyhodí výjimku `ReadOnlyError` a dotaz zablokuje. Modul je striktně read-only. + +--- + +## Rozsah MVP + +- [ ] `CachingEngine` wrapper kompatibilní se SQLAlchemy +- [ ] Načtení `cache.db` při startu do in-memory SQLite +- [ ] Periodický zápis na disk každou hodinu (background vlákno) +- [ ] Zápis na disk při vypnutí (`atexit` + `SIGTERM`) +- [ ] Parser pro detekci tabulek, sloupců a typu dotazu +- [ ] Column Registry s akumulační logikou (union sloupců, wildcard detekce) +- [ ] Re-fetch cache při rozšíření sady sloupců +- [ ] Cache Manager s SQLite in-memory backendem +- [ ] Cache hit/miss logika v Query Executoru +- [ ] TTL podpora na úrovni tabulky +- [ ] Manuální invalidace cache (`engine.cache.invalidate("tabulka")`) +- [ ] Testy pokrývající cache hit, cache miss a blokování zápisů (`ReadOnlyError`) + +--- + +## Co modul NEŘEŠÍ (mimo scope) + +- INSERT/UPDATE/DELETE — tyto operace jsou **zakázány** a vyhodí `ReadOnlyError` +- Redis nebo jiný distribuovaný backend +- Transakční konzistence + +--- + +## Logování + +Řízeno přes `loguru`. Úroveň se nastavuje v `.env`: + +```env +SQLMEM_DEBUG=true # DEBUG level — podrobný výpis každého dotazu, cache operace, backup + # false (výchozí) — INFO a výše +``` + +| Level | Kdy | +|---|---| +| `DEBUG` | Každý příchozí dotaz, extrahované tabulky/sloupce, cache hit/miss/re-fetch, backup start/konec | +| `INFO` | Start/stop modulu, načtení cache.db, periodický backup, refresh tabulky | +| `WARNING` | Re-fetch tabulky kvůli novým sloupcům, blížící se TTL expirace | +| `ERROR` | `ReadOnlyError`, `UnsupportedQueryError`, selhání připojení k DB, selhání zápisu cache.db | + +--- + +## TODO — budoucí funkce + +- **Podpora `SELECT *` (wildcard)**: Načte celou tabulku do cache, označí ji jako `full` — další dotazy na libovolný sloupec jsou vždy cache hit bez re-fetch. +- **Podpora JOIN**: Parser extrahuje sloupce z každé joinované tabulky zvlášť, Column Registry je sleduje nezávisle. Cache Manager zajistí, že všechny potřebné tabulky jsou v paměti před spuštěním dotazu. + +--- + +## Technologie + +| Vrstva | Knihovna | +|---|---| +| SQL parsing | `sqlglot` | +| Cache úložiště | `sqlite3` (stdlib) | +| Integrace | SQLAlchemy events / engine wrapper | +| Logování | `loguru`, `python-dotenv` | +| Testování | `pytest` | diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..9ce3ea4 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,27 @@ +[project] +name = "sqlmem" +version = "0.1.0" +description = "" +authors = [ + {name = "jan.doubravsky@gmail.com"} +] +readme = "README.md" +requires-python = ">=3.14,<3.15" +dependencies = [ + "sqlglot (>=30.8.0,<31.0.0)", + "sqlalchemy (>=2.0.50,<3.0.0)", + "loguru (>=0.7.3,<0.8.0)", + "python-dotenv (>=1.2.2,<2.0.0)" +] + + +[build-system] +requires = ["poetry-core>=2.0.0,<3.0.0"] +build-backend = "poetry.core.masonry.api" + +[dependency-groups] +dev = [ + "pytest (>=9.0.3,<10.0.0)", + "ruff (>=0.15.15,<0.16.0)", + "mypy (>=2.1.0,<3.0.0)" +] diff --git a/src/sqlmem/__init__.py b/src/sqlmem/__init__.py new file mode 100644 index 0000000..3e8a707 --- /dev/null +++ b/src/sqlmem/__init__.py @@ -0,0 +1,4 @@ +from .engine import CachingEngine +from .exceptions import ReadOnlyError, UnsupportedQueryError + +__all__ = ["CachingEngine", "ReadOnlyError", "UnsupportedQueryError"] diff --git a/src/sqlmem/_meta.py b/src/sqlmem/_meta.py new file mode 100644 index 0000000..3dc1f76 --- /dev/null +++ b/src/sqlmem/_meta.py @@ -0,0 +1 @@ +__version__ = "0.1.0" diff --git a/src/sqlmem/cache.py b/src/sqlmem/cache.py new file mode 100644 index 0000000..00ba0c3 --- /dev/null +++ b/src/sqlmem/cache.py @@ -0,0 +1,158 @@ +import atexit +import signal +import sqlite3 +import threading +from datetime import datetime, timezone +from pathlib import Path + +from loguru import logger + +import sqlmem._meta as _meta + +SCHEMA_VERSION = 1 + + +class CacheManager: + def __init__(self, db_path: Path, backup_interval: int) -> None: + self._db_path = db_path + self._backup_interval = backup_interval + self._mem_conn = sqlite3.connect(":memory:", check_same_thread=False) + self._lock = threading.Lock() + self._closed = False + + self._ensure_meta_tables() + self._load_from_disk() + self._start_backup_thread() + + atexit.register(self._backup_to_disk) + signal.signal(signal.SIGTERM, self._on_sigterm) + + @property + def connection(self) -> sqlite3.Connection: + return self._mem_conn + + def _ensure_meta_tables(self) -> None: + self._mem_conn.executescript(""" + CREATE TABLE IF NOT EXISTS _sqlmem_meta ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL + ); + CREATE TABLE IF NOT EXISTS _sqlmem_tables ( + table_name TEXT PRIMARY KEY, + last_refresh_at TEXT NOT NULL, + row_count INTEGER + ); + CREATE TABLE IF NOT EXISTS _sqlmem_columns ( + table_name TEXT NOT NULL, + column_name TEXT NOT NULL, + PRIMARY KEY (table_name, column_name) + ); + """) + self._mem_conn.execute( + "INSERT OR IGNORE INTO _sqlmem_meta (key, value) VALUES (?, ?)", + ("app_version", _meta.__version__), + ) + self._mem_conn.execute( + "INSERT OR IGNORE INTO _sqlmem_meta (key, value) VALUES (?, ?)", + ("schema_version", str(SCHEMA_VERSION)), + ) + self._mem_conn.execute( + "INSERT OR IGNORE INTO _sqlmem_meta (key, value) VALUES (?, ?)", + ("created_at", _now()), + ) + self._mem_conn.commit() + + def _load_from_disk(self) -> None: + if not self._db_path.exists(): + logger.info(f"No cache file found at {self._db_path}, starting fresh.") + return + + logger.info(f"Loading cache from {self._db_path}") + disk_conn = sqlite3.connect(self._db_path) + try: + schema_version = disk_conn.execute( + "SELECT value FROM _sqlmem_meta WHERE key = 'schema_version'" + ).fetchone() + if schema_version is None or int(schema_version[0]) != SCHEMA_VERSION: + logger.warning("Cache schema version mismatch — discarding cache file, starting fresh.") + disk_conn.close() + return + + disk_conn.backup(self._mem_conn) + logger.info("Cache loaded from disk successfully.") + except Exception as e: + logger.error(f"Failed to load cache from disk: {e} — starting fresh.") + finally: + disk_conn.close() + + def _backup_to_disk(self) -> None: + if self._closed: + return + logger.info(f"Backing up cache to {self._db_path}") + try: + with self._lock: + disk_conn = sqlite3.connect(self._db_path) + self._mem_conn.backup(disk_conn) + disk_conn.close() + logger.info("Cache backup complete.") + except Exception as e: + logger.error(f"Cache backup failed: {e}") + + def _start_backup_thread(self) -> None: + def loop() -> None: + event = threading.Event() + while not event.wait(self._backup_interval): + self._backup_to_disk() + + t = threading.Thread(target=loop, daemon=True, name="sqlmem-backup") + t.start() + logger.debug(f"Backup thread started (interval={self._backup_interval}s)") + + def _on_sigterm(self, signum, frame) -> None: + logger.info("SIGTERM received — flushing cache to disk.") + self._backup_to_disk() + + def mark_table_refreshed(self, table: str, row_count: int) -> None: + with self._lock: + self._mem_conn.execute( + """ + INSERT INTO _sqlmem_tables (table_name, last_refresh_at, row_count) + VALUES (?, ?, ?) + ON CONFLICT(table_name) DO UPDATE SET + last_refresh_at = excluded.last_refresh_at, + row_count = excluded.row_count + """, + (table, _now(), row_count), + ) + self._mem_conn.commit() + + def is_table_cached(self, table: str) -> bool: + row = self._mem_conn.execute( + "SELECT 1 FROM _sqlmem_tables WHERE table_name = ?", (table,) + ).fetchone() + return row is not None + + def load_table(self, table: str, columns: list[str], source_conn: sqlite3.Connection) -> None: + cols = ", ".join(columns) + logger.info(f"Fetching {table!r} columns [{cols}] from source DB") + rows = source_conn.execute(f"SELECT {cols} FROM {table}").fetchall() + + with self._lock: + self._mem_conn.execute(f"DROP TABLE IF EXISTS {table}") + col_defs = ", ".join(f"{c} TEXT" for c in columns) + self._mem_conn.execute(f"CREATE TABLE {table} ({col_defs})") + placeholders = ", ".join("?" * len(columns)) + self._mem_conn.executemany(f"INSERT INTO {table} VALUES ({placeholders})", rows) + self._mem_conn.commit() + + self.mark_table_refreshed(table, len(rows)) + logger.info(f"Table {table!r} cached ({len(rows)} rows, columns: {columns})") + + def close(self) -> None: + self._backup_to_disk() + self._closed = True + self._mem_conn.close() + + +def _now() -> str: + return datetime.now(timezone.utc).isoformat() diff --git a/src/sqlmem/config.py b/src/sqlmem/config.py new file mode 100644 index 0000000..f8b7771 --- /dev/null +++ b/src/sqlmem/config.py @@ -0,0 +1,19 @@ +import os +from pathlib import Path + +from dotenv import load_dotenv +from loguru import logger + +load_dotenv() + +DEBUG = os.getenv("SQLMEM_DEBUG", "false").lower() == "true" +CACHE_DB_PATH = Path(os.getenv("SQLMEM_CACHE_DB", "cache.db")) +BACKUP_INTERVAL_SECONDS = int(os.getenv("SQLMEM_BACKUP_INTERVAL", "3600")) + +logger.remove() +logger.add( + sink=lambda msg: print(msg, end=""), + level="DEBUG" if DEBUG else "INFO", + format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{line} - {message}", + colorize=True, +) diff --git a/src/sqlmem/engine.py b/src/sqlmem/engine.py new file mode 100644 index 0000000..0325756 --- /dev/null +++ b/src/sqlmem/engine.py @@ -0,0 +1,43 @@ +import sqlite3 + +from loguru import logger +from sqlalchemy.engine import Engine + +from .cache import CacheManager +from .config import BACKUP_INTERVAL_SECONDS, CACHE_DB_PATH +from .executor import QueryExecutor +from .parser import parse +from .registry import ColumnRegistry + + +class CachingEngine: + """Transparent SQLAlchemy-compatible cache layer.""" + + def __init__(self, source_engine: Engine) -> None: + self._source_engine = source_engine + self._cache = CacheManager(CACHE_DB_PATH, BACKUP_INTERVAL_SECONDS) + self._registry = ColumnRegistry(self._cache.connection) + logger.info("CachingEngine initialized.") + + def execute(self, sql: str) -> list[dict]: + parsed = parse(sql) + with self._source_engine.connect() as sa_conn: + raw_conn: sqlite3.Connection = sa_conn.connection.dbapi_connection + executor = QueryExecutor(self._cache, self._registry, raw_conn) + return executor.execute(parsed) + + def invalidate(self, table: str) -> None: + logger.info(f"Manually invalidating cache for table {table!r}") + with self._cache._lock: + self._cache.connection.execute(f"DROP TABLE IF EXISTS {table}") + self._cache.connection.execute( + "DELETE FROM _sqlmem_tables WHERE table_name = ?", (table,) + ) + self._cache.connection.execute( + "DELETE FROM _sqlmem_columns WHERE table_name = ?", (table,) + ) + self._cache.connection.commit() + + def close(self) -> None: + self._cache.close() + logger.info("CachingEngine closed.") diff --git a/src/sqlmem/exceptions.py b/src/sqlmem/exceptions.py new file mode 100644 index 0000000..8a8bba4 --- /dev/null +++ b/src/sqlmem/exceptions.py @@ -0,0 +1,6 @@ +class ReadOnlyError(Exception): + """Raised when a write operation (INSERT/UPDATE/DELETE) is attempted.""" + + +class UnsupportedQueryError(Exception): + """Raised when a query uses unsupported features (JOIN, SELECT *).""" diff --git a/src/sqlmem/executor.py b/src/sqlmem/executor.py new file mode 100644 index 0000000..23c7ea4 --- /dev/null +++ b/src/sqlmem/executor.py @@ -0,0 +1,42 @@ +import sqlite3 + +from loguru import logger + +from .cache import CacheManager +from .parser import ParsedQuery +from .registry import ColumnRegistry + + +class QueryExecutor: + def __init__(self, cache: CacheManager, registry: ColumnRegistry, source_conn: sqlite3.Connection) -> None: + self._cache = cache + self._registry = registry + self._source_conn = source_conn + + def execute(self, parsed: ParsedQuery) -> list[dict]: + table = parsed.table + columns = parsed.columns + + missing = self._registry.needs_refetch(table, columns) + table_cached = self._cache.is_table_cached(table) + + if missing or not table_cached: + if table_cached and missing: + logger.warning( + f"Re-fetching {table!r} — new columns requested: {missing}. " + f"Expanding cache from {self._registry.get_columns(table)} + {missing}" + ) + all_columns = list(self._registry.get_columns(table)) + missing + self._cache.load_table(table, all_columns, self._source_conn) + self._registry.update(table, all_columns) + else: + logger.debug(f"Cache hit: {table!r} columns={columns}") + + return self._run_in_memory(parsed) + + def _run_in_memory(self, parsed: ParsedQuery) -> list[dict]: + logger.debug(f"Executing in SQLite RAM: {parsed.original_sql!r}") + cursor = self._cache.connection.execute(parsed.original_sql) + col_names = [desc[0] for desc in cursor.description] + rows = cursor.fetchall() + return [dict(zip(col_names, row)) for row in rows] diff --git a/src/sqlmem/parser.py b/src/sqlmem/parser.py new file mode 100644 index 0000000..2316654 --- /dev/null +++ b/src/sqlmem/parser.py @@ -0,0 +1,71 @@ +from dataclasses import dataclass + +import sqlglot +import sqlglot.expressions as exp +from loguru import logger + +from .exceptions import ReadOnlyError, UnsupportedQueryError + +WRITE_TYPES = (exp.Insert, exp.Update, exp.Delete) + + +@dataclass +class ParsedQuery: + table: str + columns: list[str] + original_sql: str + + +def parse(sql: str) -> ParsedQuery: + logger.debug(f"Parsing SQL: {sql!r}") + + statement = sqlglot.parse_one(sql) + + if isinstance(statement, WRITE_TYPES): + raise ReadOnlyError( + f"Write operations are not allowed. Attempted: {type(statement).__name__.upper()}" + ) + + if not isinstance(statement, exp.Select): + raise UnsupportedQueryError(f"Only SELECT statements are supported, got: {sql!r}") + + _check_joins(statement) + _check_wildcard(statement) + + table = _extract_table(statement) + columns = _extract_columns(statement) + + logger.debug(f"Parsed → table={table!r}, columns={columns}") + return ParsedQuery(table=table, columns=columns, original_sql=sql) + + +def _check_joins(statement: exp.Select) -> None: + if statement.find(exp.Join): + raise UnsupportedQueryError("JOIN is not supported yet. Use simple single-table SELECT.") + + +def _check_wildcard(statement: exp.Select) -> None: + for col in statement.find_all(exp.Column): + if isinstance(col.this, exp.Star): + raise UnsupportedQueryError("SELECT * is not supported yet. Specify columns explicitly.") + if statement.find(exp.Star): + raise UnsupportedQueryError("SELECT * is not supported yet. Specify columns explicitly.") + + +def _extract_table(statement: exp.Select) -> str: + from_clause = statement.find(exp.From) + if not from_clause: + raise UnsupportedQueryError("SELECT without FROM is not supported.") + table = from_clause.find(exp.Table) + if not table: + raise UnsupportedQueryError("Could not extract table name from query.") + return table.name + + +def _extract_columns(statement: exp.Select) -> list[str]: + columns = [] + for col in statement.find_all(exp.Column): + columns.append(col.name) + if not columns: + raise UnsupportedQueryError("Could not extract column names from query.") + return columns diff --git a/src/sqlmem/registry.py b/src/sqlmem/registry.py new file mode 100644 index 0000000..ac89032 --- /dev/null +++ b/src/sqlmem/registry.py @@ -0,0 +1,48 @@ +import sqlite3 +from threading import Lock + +from loguru import logger + + +class ColumnRegistry: + """Tracks which columns per table have been requested and are held in cache.""" + + def __init__(self, mem_conn: sqlite3.Connection) -> None: + self._conn = mem_conn + self._lock = Lock() + self._ensure_table() + + def _ensure_table(self) -> None: + self._conn.execute(""" + CREATE TABLE IF NOT EXISTS _sqlmem_columns ( + table_name TEXT NOT NULL, + column_name TEXT NOT NULL, + PRIMARY KEY (table_name, column_name) + ) + """) + self._conn.commit() + + def get_columns(self, table: str) -> set[str]: + rows = self._conn.execute( + "SELECT column_name FROM _sqlmem_columns WHERE table_name = ?", (table,) + ).fetchall() + return {row[0] for row in rows} + + def needs_refetch(self, table: str, requested: list[str]) -> list[str]: + """Returns columns that are requested but not yet in registry (missing columns).""" + known = self.get_columns(table) + missing = [c for c in requested if c not in known] + return missing + + def update(self, table: str, columns: list[str]) -> None: + with self._lock: + existing = self.get_columns(table) + new_columns = [c for c in columns if c not in existing] + if not new_columns: + return + self._conn.executemany( + "INSERT OR IGNORE INTO _sqlmem_columns (table_name, column_name) VALUES (?, ?)", + [(table, col) for col in new_columns], + ) + self._conn.commit() + logger.info(f"Registry updated: {table!r} now tracks columns {self.get_columns(table)}") diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_cache.py b/tests/test_cache.py new file mode 100644 index 0000000..dba7043 --- /dev/null +++ b/tests/test_cache.py @@ -0,0 +1,61 @@ +import sqlite3 +from pathlib import Path + +import pytest + +from sqlmem.cache import CacheManager + + +@pytest.fixture +def cache(tmp_path): + c = CacheManager(db_path=tmp_path / "test_cache.db", backup_interval=9999) + yield c + c.close() + + +@pytest.fixture +def source_conn(): + conn = sqlite3.connect(":memory:") + conn.execute("CREATE TABLE users (name TEXT, email TEXT, status TEXT)") + conn.executemany( + "INSERT INTO users VALUES (?, ?, ?)", + [("alice", "alice@example.com", "active"), ("bob", "bob@example.com", "inactive")], + ) + conn.commit() + yield conn + conn.close() + + +def test_table_not_cached_initially(cache): + assert cache.is_table_cached("users") is False + + +def test_load_table(cache, source_conn): + cache.load_table("users", ["name", "email"], source_conn) + assert cache.is_table_cached("users") is True + + +def test_loaded_data_correct(cache, source_conn): + cache.load_table("users", ["name", "email"], source_conn) + rows = cache.connection.execute("SELECT name, email FROM users").fetchall() + assert len(rows) == 2 + assert ("alice", "alice@example.com") in rows + + +def test_mark_table_refreshed(cache, source_conn): + cache.load_table("users", ["name"], source_conn) + row = cache.connection.execute( + "SELECT row_count FROM _sqlmem_tables WHERE table_name = 'users'" + ).fetchone() + assert row[0] == 2 + + +def test_backup_and_reload(tmp_path, source_conn): + db_path = tmp_path / "cache.db" + c1 = CacheManager(db_path=db_path, backup_interval=9999) + c1.load_table("users", ["name"], source_conn) + c1.close() + + c2 = CacheManager(db_path=db_path, backup_interval=9999) + assert c2.is_table_cached("users") is True + c2.close() diff --git a/tests/test_parser.py b/tests/test_parser.py new file mode 100644 index 0000000..3770fc0 --- /dev/null +++ b/tests/test_parser.py @@ -0,0 +1,43 @@ +import pytest + +from sqlmem.exceptions import ReadOnlyError, UnsupportedQueryError +from sqlmem.parser import parse + + +def test_simple_select(): + result = parse("SELECT name, email FROM users WHERE status = 'active'") + assert result.table == "users" + # WHERE columns are also extracted — needed for in-memory SQLite filtering + assert {"name", "email"}.issubset(set(result.columns)) + assert "status" in result.columns + + +def test_multiple_columns(): + result = parse("SELECT a, b, c FROM orders") + assert result.table == "orders" + assert set(result.columns) == {"a", "b", "c"} + + +def test_insert_raises_readonly(): + with pytest.raises(ReadOnlyError): + parse("INSERT INTO users (name) VALUES ('alice')") + + +def test_update_raises_readonly(): + with pytest.raises(ReadOnlyError): + parse("UPDATE users SET name = 'bob' WHERE id = 1") + + +def test_delete_raises_readonly(): + with pytest.raises(ReadOnlyError): + parse("DELETE FROM users WHERE id = 1") + + +def test_wildcard_raises_unsupported(): + with pytest.raises(UnsupportedQueryError): + parse("SELECT * FROM users") + + +def test_join_raises_unsupported(): + with pytest.raises(UnsupportedQueryError): + parse("SELECT a.name, b.title FROM users a JOIN orders b ON a.id = b.user_id") diff --git a/tests/test_registry.py b/tests/test_registry.py new file mode 100644 index 0000000..22a0a0a --- /dev/null +++ b/tests/test_registry.py @@ -0,0 +1,45 @@ +import sqlite3 + +import pytest + +from sqlmem.registry import ColumnRegistry + + +@pytest.fixture +def registry(): + conn = sqlite3.connect(":memory:") + return ColumnRegistry(conn) + + +def test_empty_registry(registry): + assert registry.get_columns("users") == set() + + +def test_update_and_get(registry): + registry.update("users", ["name", "email"]) + assert registry.get_columns("users") == {"name", "email"} + + +def test_update_accumulates(registry): + registry.update("users", ["name", "email"]) + registry.update("users", ["email", "status"]) + assert registry.get_columns("users") == {"name", "email", "status"} + + +def test_needs_refetch_missing(registry): + registry.update("users", ["name"]) + missing = registry.needs_refetch("users", ["name", "email"]) + assert missing == ["email"] + + +def test_needs_refetch_none_missing(registry): + registry.update("users", ["name", "email"]) + missing = registry.needs_refetch("users", ["name"]) + assert missing == [] + + +def test_independent_tables(registry): + registry.update("users", ["name"]) + registry.update("orders", ["total"]) + assert registry.get_columns("users") == {"name"} + assert registry.get_columns("orders") == {"total"}