Run library, checks and downloads in background threads with parallel fetching
This commit is contained in:
@@ -0,0 +1,68 @@
|
||||
"""Tests for background worker threading helpers."""
|
||||
|
||||
import os
|
||||
import time
|
||||
|
||||
os.environ.setdefault("QT_QPA_PLATFORM", "offscreen")
|
||||
|
||||
from PySide6.QtCore import QObject # noqa: E402
|
||||
from PySide6.QtWidgets import QApplication # noqa: E402
|
||||
|
||||
from src.ui.workers import FetchWorker, run_on_thread # noqa: E402
|
||||
|
||||
|
||||
def _app() -> QApplication:
|
||||
return QApplication.instance() or QApplication([])
|
||||
|
||||
|
||||
def _spin_until(app: QApplication, predicate, timeout: float = 5.0) -> None:
|
||||
deadline = time.time() + timeout
|
||||
while not predicate() and time.time() < deadline:
|
||||
app.processEvents()
|
||||
time.sleep(0.01)
|
||||
|
||||
|
||||
def test_worker_runs_when_only_local_reference() -> None:
|
||||
"""Regression: a worker created in a local scope must still run.
|
||||
|
||||
run_on_thread must keep the worker alive; otherwise it is garbage-collected
|
||||
and its `run` slot is never invoked (the 'refresh does nothing' bug).
|
||||
"""
|
||||
app = _app()
|
||||
owner = QObject()
|
||||
captured: dict[str, object] = {}
|
||||
|
||||
def start() -> None:
|
||||
worker = FetchWorker(lambda progress: 42)
|
||||
worker.result.connect(lambda r: captured.update(result=r))
|
||||
run_on_thread(owner, worker)
|
||||
# `worker` goes out of scope here
|
||||
|
||||
start()
|
||||
_spin_until(app, lambda: "result" in captured)
|
||||
assert captured.get("result") == 42
|
||||
|
||||
|
||||
def test_progress_and_cleanup() -> None:
|
||||
app = _app()
|
||||
owner = QObject()
|
||||
progress: list[tuple[int, int]] = []
|
||||
captured: dict[str, object] = {}
|
||||
|
||||
def fn(report) -> str:
|
||||
report(1, 2)
|
||||
report(2, 2)
|
||||
return "ok"
|
||||
|
||||
worker = FetchWorker(fn)
|
||||
worker.progress.connect(lambda d, t: progress.append((d, t)))
|
||||
worker.result.connect(lambda r: captured.update(result=r))
|
||||
run_on_thread(owner, worker)
|
||||
|
||||
_spin_until(app, lambda: "result" in captured)
|
||||
# let the thread.finished cleanup run
|
||||
_spin_until(app, lambda: not owner._active_threads)
|
||||
|
||||
assert captured.get("result") == "ok"
|
||||
assert (2, 2) in progress
|
||||
assert owner._active_threads == []
|
||||
Reference in New Issue
Block a user