Add secondary indexes to accelerate cache lookups
This commit is contained in:
@@ -2,6 +2,7 @@ import atexit
|
||||
import signal
|
||||
import sqlite3
|
||||
import threading
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
@@ -15,6 +16,12 @@ from .stats import TableState
|
||||
SCHEMA_VERSION = 3
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class _Index:
|
||||
name: str
|
||||
columns: tuple[str, ...]
|
||||
|
||||
|
||||
class CacheManager:
|
||||
def __init__(self, db_path: Path, backup_interval: int) -> None:
|
||||
self._db_path = db_path
|
||||
@@ -23,6 +30,7 @@ class CacheManager:
|
||||
self._lock = threading.Lock() # serializes connection access
|
||||
self._load_lock = threading.Lock() # serializes full table loads
|
||||
self._states: dict[str, str] = {} # table → live processing state
|
||||
self._index_defs: dict[str, list[_Index]] = {} # table → secondary indexes
|
||||
self._closed = False
|
||||
|
||||
self._ensure_meta_tables()
|
||||
@@ -190,6 +198,30 @@ class CacheManager:
|
||||
def clear_state(self, table: str) -> None:
|
||||
self._states.pop(table, None)
|
||||
|
||||
def add_index(self, table: str, columns: list[str]) -> None:
|
||||
"""Register a secondary index to (re)create on *columns* after each load."""
|
||||
name = "sqlmem_idx_" + "_".join([table, *columns])
|
||||
defs = self._index_defs.setdefault(table, [])
|
||||
if all(d.name != name for d in defs):
|
||||
defs.append(_Index(name=name, columns=tuple(columns)))
|
||||
|
||||
def _create_indexes(self, table: str, available: list[str]) -> None:
|
||||
"""Create the registered secondary indexes whose columns are all cached."""
|
||||
available_set = set(available)
|
||||
for idx in self._index_defs.get(table, []):
|
||||
if not set(idx.columns) <= available_set:
|
||||
logger.warning(
|
||||
f"Skipping index {idx.name!r}: columns {idx.columns} not all cached."
|
||||
)
|
||||
continue
|
||||
cols = ", ".join(idx.columns)
|
||||
with self._lock:
|
||||
self._mem_conn.execute(
|
||||
f"CREATE INDEX IF NOT EXISTS {idx.name} ON {table} ({cols})"
|
||||
)
|
||||
self._mem_conn.commit()
|
||||
logger.debug(f"Index {idx.name!r} ready on {table} ({cols})")
|
||||
|
||||
def load_table(
|
||||
self,
|
||||
table: str,
|
||||
@@ -243,6 +275,7 @@ class CacheManager:
|
||||
self.set_state(table, TableState.ERROR)
|
||||
raise
|
||||
|
||||
self._create_indexes(table, columns)
|
||||
self.mark_table_refreshed(table, total, full)
|
||||
self.set_state(table, TableState.READY)
|
||||
logger.info(f"Table {table!r} cached ({total} rows, columns: {columns})")
|
||||
|
||||
+25
-1
@@ -24,6 +24,7 @@ class CachingEngine:
|
||||
source_engine: Engine,
|
||||
delta: dict[str, DeltaConfig] | None = None,
|
||||
ttl: dict[str, int] | None = None,
|
||||
indexes: dict[str, list[str | list[str]]] | None = None,
|
||||
) -> None:
|
||||
self._source_engine = source_engine
|
||||
self._cache = CacheManager(CACHE_DB_PATH, BACKUP_INTERVAL_SECONDS)
|
||||
@@ -32,6 +33,7 @@ class CachingEngine:
|
||||
self._refresh_interval = REFRESH_INTERVAL_SECONDS
|
||||
self._delta = self._resolve_delta(delta or {})
|
||||
self._ttl = dict(ttl or {})
|
||||
self._index_columns = self._register_indexes(indexes or {})
|
||||
self._refresher = DeltaRefresher(self._cache, self._delta)
|
||||
|
||||
overlap = set(self._delta) & set(self._ttl)
|
||||
@@ -48,6 +50,22 @@ class CachingEngine:
|
||||
|
||||
logger.info("CachingEngine initialized.")
|
||||
|
||||
def _register_indexes(
|
||||
self, indexes: dict[str, list[str | list[str]]]
|
||||
) -> dict[str, list[str]]:
|
||||
"""Register secondary indexes on the cache; return columns to load per table."""
|
||||
index_columns: dict[str, list[str]] = {}
|
||||
for table, specs in indexes.items():
|
||||
wanted: list[str] = []
|
||||
for spec in specs:
|
||||
columns = [spec] if isinstance(spec, str) else list(spec)
|
||||
self._cache.add_index(table, columns)
|
||||
for col in columns:
|
||||
if col not in wanted:
|
||||
wanted.append(col)
|
||||
index_columns[table] = wanted
|
||||
return index_columns
|
||||
|
||||
def _resolve_delta(self, delta: dict[str, DeltaConfig]) -> dict[str, ResolvedDelta]:
|
||||
"""Resolve each DeltaConfig, auto-discovering the primary key when omitted."""
|
||||
resolved: dict[str, ResolvedDelta] = {}
|
||||
@@ -95,7 +113,13 @@ class CachingEngine:
|
||||
with self._source_engine.connect() as sa_conn:
|
||||
raw_conn = cast(sqlite3.Connection, sa_conn.connection.dbapi_connection)
|
||||
executor = QueryExecutor(
|
||||
self._cache, self._registry, raw_conn, self._stats, self._delta, self._ttl
|
||||
self._cache,
|
||||
self._registry,
|
||||
raw_conn,
|
||||
self._stats,
|
||||
self._delta,
|
||||
self._ttl,
|
||||
self._index_columns,
|
||||
)
|
||||
return executor.execute(parsed)
|
||||
|
||||
|
||||
@@ -18,6 +18,7 @@ class QueryExecutor:
|
||||
stats: StatsCollector,
|
||||
delta: dict[str, ResolvedDelta] | None = None,
|
||||
ttl: dict[str, int] | None = None,
|
||||
index_columns: dict[str, list[str]] | None = None,
|
||||
) -> None:
|
||||
self._cache = cache
|
||||
self._registry = registry
|
||||
@@ -25,6 +26,7 @@ class QueryExecutor:
|
||||
self._stats = stats
|
||||
self._delta = delta or {}
|
||||
self._ttl = ttl or {}
|
||||
self._index_columns = index_columns or {}
|
||||
|
||||
def _ttl_expired(self, table: str) -> bool:
|
||||
"""True if *table* has a TTL and its cached copy is older than that TTL."""
|
||||
@@ -96,12 +98,15 @@ class QueryExecutor:
|
||||
self._load(table, all_columns, full=full)
|
||||
|
||||
def _load(self, table: str, columns: list[str], full: bool) -> None:
|
||||
"""Fetch *table* into cache, adding delta key/timestamp columns when tracked."""
|
||||
"""Fetch *table* into cache, adding delta key/timestamp and index columns."""
|
||||
cfg = self._delta.get(table)
|
||||
extra = list(self._index_columns.get(table, []))
|
||||
if cfg:
|
||||
# The cache must always hold the key (to upsert) and the change column
|
||||
# (to compute the watermark), even if no query referenced them.
|
||||
columns = list(dict.fromkeys([*columns, *cfg.key_columns, cfg.change_column]))
|
||||
extra += [*cfg.key_columns, cfg.change_column]
|
||||
if extra:
|
||||
columns = list(dict.fromkeys([*columns, *extra]))
|
||||
|
||||
self._cache.load_table(table, columns, self._source_conn, full=full)
|
||||
self._registry.update(table, columns)
|
||||
|
||||
Reference in New Issue
Block a user