From 757a8f4ebab42cb859222bdb0889b2fd7b46d438 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Doubravsk=C3=BD?= Date: Fri, 5 Jun 2026 18:17:55 +0200 Subject: [PATCH] Add secondary indexes to accelerate cache lookups --- CHANGELOG.md | 10 ++++ README.md | 20 +++++++ project.md | 1 + pyproject.toml | 2 +- src/sqlmem/cache.py | 33 ++++++++++++ src/sqlmem/engine.py | 26 ++++++++- src/sqlmem/executor.py | 9 +++- tests/test_indexes.py | 116 +++++++++++++++++++++++++++++++++++++++++ 8 files changed, 213 insertions(+), 4 deletions(-) create mode 100644 tests/test_indexes.py diff --git a/CHANGELOG.md b/CHANGELOG.md index d1dfb71..eb25d0e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,16 @@ All notable changes to this project will be documented in this file. --- +## [1.6.0] - 2026-06-05 + +### Added +- **Secondary indexes** — `CachingEngine(engine, indexes={"VW_X": ["col", ["a", "b"]]})` creates indexes on the in-memory cache to accelerate `WHERE`/`JOIN` lookups. Index columns are auto-loaded so the index exists from the first load, and indexes are recreated after every (re)load and persist in `cache.db`. Combines freely with `delta` and `ttl`. 
+ +### Changed +- `pyproject.toml` — bumped version to `1.6.0` + +--- + ## [1.5.0] - 2026-06-05 ### Added diff --git a/README.md b/README.md index 9408ad3..b625885 100644 --- a/README.md +++ b/README.md @@ -218,6 +218,26 @@ engine = CachingEngine( engine.refresh() # also reloads any expired TTL tables on demand ``` +## Secondary indexes + +To accelerate lookups, you can declare **secondary indexes** per table — they are created on the in-memory SQLite copy so `WHERE`/`JOIN` filters on those columns run as indexed searches instead of full scans: + +```python +engine = CachingEngine( + base_engine, + indexes={ + "VW_P_PRATVALUES": ["PRODUCT_PRODUCTNR"], # single-column index + "VW_ELEMENTS": [["ELEMENT_ID", "ELEMENTVARIANT_ID"], "ELEMENTVARIANT_NAME"], + }, +) +``` + +Each value is a list of index definitions: a string is a single-column index, a nested list is a composite index. + +- **Index columns are pulled into the cache automatically** (like delta key columns), so the index exists from the first load even if your queries don't select those columns. +- Indexes are **recreated after every (re)load** — full loads, TTL reloads, and `invalidate()` + re-fetch all rebuild them — so they're always present, and they persist in `cache.db` across restarts. +- Delta-tracked tables already get a unique index on their key columns; secondary indexes are independent and can be combined with `delta` or `ttl`. + ## Persistence The in-memory cache is persisted to `cache.db` on disk: diff --git a/project.md b/project.md index 1a0b53d..9eca4ce 100644 --- a/project.md +++ b/project.md @@ -193,6 +193,7 @@ SQLMEM_DEBUG=true # DEBUG level — podrobný výpis každého dotazu, cache o - [x] **Třídílné názvy tabulek**: `[catalog].[schema].[table]` se cachuje pod base name, in-memory dotaz prefix stripuje. 
- [x] **Inkrementální (delta) refresh**: per-tabulku `DeltaConfig(change_column, key_columns)` — sync jen změněných řádků přes datový watermark `max(change_column)` (`>=` + idempotentní upsert podle klíče), catch-up na startu + background thread (`SQLMEM_REFRESH_INTERVAL`, default 300 s). PK se auto-zjistí ze zdrojové DB, pro views nutno zadat ručně. - [x] **`engine.reset()`**: smaže celou cache (RAM + `cache.db`) pro čistý rebuild po strukturální změně. +- [x] **Sekundární indexy**: `indexes={"VW_X": ["col", ["a","b"]]}` — indexy na in-memory cache pro zrychlení `WHERE`/`JOIN`; index-sloupce se auto-dotáhnou, indexy se obnoví po každém (re)loadu. - [x] **TTL na úrovni tabulky**: `ttl={"VW_X": 300}` — pro tabulky bez timestamp sloupce. Garantuje, že cache není starší než interval (full reload při čtení po expiraci + proaktivně na pozadí). ## TODO — budoucí funkce diff --git a/pyproject.toml b/pyproject.toml index c1d5b12..9ef6049 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "sqlmem" -version = "1.5.0" +version = "1.6.0" description = "" authors = [ {name = "jan.doubravsky@gmail.com"} diff --git a/src/sqlmem/cache.py b/src/sqlmem/cache.py index c94772a..1fa329b 100644 --- a/src/sqlmem/cache.py +++ b/src/sqlmem/cache.py @@ -2,6 +2,7 @@ import atexit import signal import sqlite3 import threading +from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path @@ -15,6 +16,12 @@ from .stats import TableState SCHEMA_VERSION = 3 +@dataclass(frozen=True) +class _Index: + name: str + columns: tuple[str, ...] 
def add_index(self, table: str, columns: list[str]) -> None:
    """Register a secondary index to (re)create on *columns* after each load.

    Method of ``CacheManager``. Registration is idempotent: asking twice for
    the same column tuple on the same table keeps a single definition.

    The index name is ``sqlmem_idx_<table>_<col>[_<col>...]``. Because the
    parts are joined with ``_``, different definitions can map to the same
    name (``["a_b"]`` vs ``["a", "b"]``, or table ``t_a``/``b`` vs table
    ``t``/``a, b``). SQLite index names are unique per database, and
    ``CREATE INDEX IF NOT EXISTS`` silently does nothing when the name is
    taken, so a clashing name gets a numeric suffix instead of being dropped.

    Args:
        table: Name of the cached table the index belongs to.
        columns: Ordered column names; one entry is a single-column index,
            several entries form a composite index.

    Raises:
        ValueError: If *columns* is empty.
    """
    cols = tuple(columns)
    if not cols:
        raise ValueError(f"Index on {table!r} needs at least one column.")

    defs = self._index_defs.setdefault(table, [])
    # Deduplicate on the column tuple, not on the generated name, so that
    # two distinct definitions with a colliding name are both kept.
    if any(d.columns == cols for d in defs):
        return

    # Names registered for *any* table: the namespace is database-wide.
    taken = {d.name for table_defs in self._index_defs.values() for d in table_defs}
    base = "sqlmem_idx_" + "_".join([table, *cols])
    name = base
    suffix = 2
    while name in taken:
        name = f"{base}_{suffix}"
        suffix += 1
    defs.append(_Index(name=name, columns=cols))


def _create_indexes(self, table: str, available: list[str]) -> None:
    """Create the registered secondary indexes whose columns are all cached.

    Method of ``CacheManager``. Definitions that reference a column missing
    from *available* are skipped with a warning instead of failing the load.

    Args:
        table: Name of the cached table that was just (re)loaded.
        available: Column names currently present in the cached table.
    """

    def quote(identifier: str) -> str:
        # Standard SQL identifier quoting: wrap in double quotes and double
        # any embedded quote. Without it, names that are keywords or contain
        # spaces/dashes make the CREATE INDEX statement invalid.
        return '"' + identifier.replace('"', '""') + '"'

    available_set = set(available)
    for idx in self._index_defs.get(table, []):
        if not set(idx.columns) <= available_set:
            logger.warning(
                f"Skipping index {idx.name!r}: columns {idx.columns} not all cached."
            )
            continue
        cols = ", ".join(idx.columns)  # unquoted form, used for logging only
        quoted_cols = ", ".join(quote(col) for col in idx.columns)
        # self._lock serializes access to the shared in-memory connection.
        with self._lock:
            self._mem_conn.execute(
                f"CREATE INDEX IF NOT EXISTS {quote(idx.name)} "
                f"ON {quote(table)} ({quoted_cols})"
            )
            self._mem_conn.commit()
        logger.debug(f"Index {idx.name!r} ready on {table} ({cols})")
_resolve_delta(self, delta: dict[str, DeltaConfig]) -> dict[str, ResolvedDelta]: """Resolve each DeltaConfig, auto-discovering the primary key when omitted.""" resolved: dict[str, ResolvedDelta] = {} @@ -95,7 +113,13 @@ class CachingEngine: with self._source_engine.connect() as sa_conn: raw_conn = cast(sqlite3.Connection, sa_conn.connection.dbapi_connection) executor = QueryExecutor( - self._cache, self._registry, raw_conn, self._stats, self._delta, self._ttl + self._cache, + self._registry, + raw_conn, + self._stats, + self._delta, + self._ttl, + self._index_columns, ) return executor.execute(parsed) diff --git a/src/sqlmem/executor.py b/src/sqlmem/executor.py index 2c3b46f..94997d5 100644 --- a/src/sqlmem/executor.py +++ b/src/sqlmem/executor.py @@ -18,6 +18,7 @@ class QueryExecutor: stats: StatsCollector, delta: dict[str, ResolvedDelta] | None = None, ttl: dict[str, int] | None = None, + index_columns: dict[str, list[str]] | None = None, ) -> None: self._cache = cache self._registry = registry @@ -25,6 +26,7 @@ class QueryExecutor: self._stats = stats self._delta = delta or {} self._ttl = ttl or {} + self._index_columns = index_columns or {} def _ttl_expired(self, table: str) -> bool: """True if *table* has a TTL and its cached copy is older than that TTL.""" @@ -96,12 +98,15 @@ class QueryExecutor: self._load(table, all_columns, full=full) def _load(self, table: str, columns: list[str], full: bool) -> None: - """Fetch *table* into cache, adding delta key/timestamp columns when tracked.""" + """Fetch *table* into cache, adding delta key/timestamp and index columns.""" cfg = self._delta.get(table) + extra = list(self._index_columns.get(table, [])) if cfg: # The cache must always hold the key (to upsert) and the change column # (to compute the watermark), even if no query referenced them. 
def index_names(conn, table=None):
    """Return the set of index names defined in the SQLite database of *conn*.

    Args:
        conn: An open ``sqlite3`` connection.
        table: Optional table name. When given, only indexes that belong to
            that table are returned; when ``None`` every index is returned.

    Returns:
        A set of index names (empty when nothing matches).
    """
    sql = "SELECT name FROM sqlite_master WHERE type = 'index'"
    params = ()
    if table is not None:
        # The parameter used to be accepted but ignored; honour it by
        # filtering on the owning table recorded in sqlite_master.
        sql += " AND tbl_name = ?"
        params = (table,)
    return {row[0] for row in conn.execute(sql, params)}
must not raise + assert "sqlmem_idx_big_missing_col" not in index_names(cache.connection) + + +def test_index_recreated_on_reload(cache, source_conn): + cache.add_index("big", ["val"]) + cache.load_table("big", ["id", "val"], source_conn) + cache.load_table("big", ["id", "val"], source_conn) # reload (staging swap) + assert "sqlmem_idx_big_val" in index_names(cache.connection) + + +# --- engine level ----------------------------------------------------------- + + +@pytest.fixture +def source_engine(tmp_path): + db_path = tmp_path / "source.db" + conn = sqlite3.connect(db_path) + conn.execute("CREATE TABLE products (id TEXT, name TEXT, price TEXT)") + conn.executemany( + "INSERT INTO products VALUES (?, ?, ?)", + [(str(i), f"n{i}", f"{i}.00") for i in range(20)], + ) + conn.commit() + conn.close() + engine = create_engine(f"sqlite:///{db_path}") + yield engine + engine.dispose() + + +@pytest.fixture +def patched_cache(tmp_path, monkeypatch): + monkeypatch.setattr(eng_mod, "CACHE_DB_PATH", tmp_path / "cache.db") + monkeypatch.setattr(eng_mod, "BACKUP_INTERVAL_SECONDS", 9999) + + +def test_index_column_auto_loaded_even_if_not_selected(source_engine, patched_cache): + engine = CachingEngine(source_engine, indexes={"products": ["name"]}) + engine.execute("SELECT id FROM products") # does not select 'name' + cols = { + r[1] + for r in engine._cache.connection.execute("PRAGMA table_info(products)").fetchall() + } + assert "name" in cols # pulled in so the index can be built + assert "sqlmem_idx_products_name" in index_names(engine._cache.connection) + engine.close() + + +def test_composite_index(source_engine, patched_cache): + engine = CachingEngine(source_engine, indexes={"products": [["name", "price"]]}) + engine.execute("SELECT id FROM products") + assert "sqlmem_idx_products_name_price" in index_names(engine._cache.connection) + engine.close() + + +def test_index_survives_invalidate_and_reload(source_engine, patched_cache): + engine = CachingEngine(source_engine, 
indexes={"products": ["name"]}) + engine.execute("SELECT id, name FROM products") + engine.invalidate("products") + engine.execute("SELECT id, name FROM products") + assert "sqlmem_idx_products_name" in index_names(engine._cache.connection) + engine.close()