Auto-fill ČSFD links on import, rename in pool, multi-country tags, Filmotéka layout

This commit is contained in:
2026-06-15 17:31:52 +02:00
parent 86c689b9f1
commit b3a61f9e86
18 changed files with 1407 additions and 168 deletions
+87
View File
@@ -0,0 +1,87 @@
#!/usr/bin/env python3
"""Minimal PySide6 GUI for filtering magnet lists from ``rargb_magnets.py``.
Just a text box on top and a list below — type to filter live (same syntax as
the CLI: space-separated AND terms, ``-term`` to exclude). Double-click or press
Enter on a row to copy its magnet link to the clipboard.
python tools/filter_magnets_gui.py [files/glob/dir ...]
With no arguments it loads ``magnets_*.txt`` from the current directory. The
loading/filtering logic is reused from ``filter_magnets.py`` in this folder.
"""
from __future__ import annotations
import sys
from pathlib import Path
# Reuse the CLI tool's parsing/filtering (same folder).
sys.path.insert(0, str(Path(__file__).resolve().parent))
from filter_magnets import Entry, load_entries, apply_filter, resolve_inputs # noqa: E402
from PySide6.QtCore import Qt # noqa: E402
from PySide6.QtWidgets import ( # noqa: E402
QApplication, QWidget, QVBoxLayout, QLineEdit, QListWidget, QListWidgetItem,
)
class MagnetFilter(QWidget):
    """Single window with a search box above a live-filtered list of magnets.

    Every change of the search text re-runs ``apply_filter`` over the loaded
    entries and rebuilds the list. Activating a row (Enter / double-click)
    copies its shortened magnet — the part before the first ``&`` — to the
    clipboard.
    """

    def __init__(self, entries: list[Entry]) -> None:
        """Create the widgets, connect the signals and show the unfiltered list."""
        super().__init__()
        self.entries = entries

        # Search box: each edit triggers a re-filter.
        self.search = QLineEdit()
        self.search.setPlaceholderText(
            "filtr… (např. 1080p 2022 -hindi) — ↵/dvojklik = kopírovat magnet"
        )
        self.search.setClearButtonEnabled(True)
        self.search.textChanged.connect(self._refilter)

        # Result list: activation (Enter / double-click) copies the magnet.
        self.list = QListWidget()
        self.list.itemActivated.connect(self._copy)

        layout = QVBoxLayout(self)
        layout.setContentsMargins(6, 6, 6, 6)
        layout.addWidget(self.search)
        layout.addWidget(self.list)

        self.resize(820, 600)
        self._refilter("")
        self.search.setFocus()

    def _refilter(self, text: str) -> None:
        """Rebuild the list so it shows only the entries matching ``text``."""
        self.list.clear()
        for match in apply_filter(self.entries, text):
            # Keep only the part of the magnet before the first "&".
            short, _, _ = match.magnet.partition("&")
            row = QListWidgetItem(f"{match.name}\n{short}")
            row.setData(Qt.UserRole, short)
            row.setToolTip(short)
            self.list.addItem(row)
        self._update_title()

    def _copy(self, item: QListWidgetItem) -> None:
        """Put the magnet stored on ``item`` on the clipboard and flag it in the title."""
        magnet = item.data(Qt.UserRole)
        QApplication.clipboard().setText(magnet)
        self._update_title(copied=item.text())

    def _update_title(self, copied: str | None = None) -> None:
        """Show "visible / total" counts, plus a copied marker when ``copied`` is truthy."""
        title = f"Magnet filtr — {self.list.count()} / {len(self.entries)}"
        if copied:
            title = f"{title} ✓ zkopírováno"
        self.setWindowTitle(title)
def main() -> None:
    """Load magnet entries from the command-line inputs and run the filter window.

    Exits with status 1 (message on stderr) when no existing input file is
    found or when the inputs contain no entries; otherwise exits with the Qt
    event loop's return code.
    """

    def abort(message: str) -> None:
        # Report on stderr and stop with a failing exit status.
        print(message, file=sys.stderr)
        sys.exit(1)

    inputs = [path for path in resolve_inputs(sys.argv[1:]) if path.exists()]
    if not inputs:
        abort("Žádné vstupní soubory (magnets_*.txt) nenalezeny.")

    entries = load_entries(inputs)
    if not entries:
        abort("Vstupní soubory neobsahují žádné magnet odkazy.")

    app = QApplication(sys.argv)
    window = MagnetFilter(entries)
    window.show()
    exit_code = app.exec()
    sys.exit(exit_code)


if __name__ == "__main__":
    main()
+196
View File
@@ -0,0 +1,196 @@
#!/usr/bin/env python3
"""Standalone scraper: collect magnet links from a rargb.to search.
Given a search query it walks every results page
(``https://rargb.to/search/?search=<query>`` and ``/search/<N>/?search=<query>``),
opens each torrent's detail page and saves its magnet link.
This is a self-contained tool — it only needs ``requests`` and
``beautifulsoup4`` and does not import anything from the Curator project.
Examples:
python scripts/rargb_magnets.py "ubuntu 24.04"
python scripts/rargb_magnets.py test --output test_magnets.txt --max-pages 3
python scripts/rargb_magnets.py test --tsv # also write name<TAB>magnet
Be considerate: a polite delay is inserted between requests by default. Use the
results responsibly and respect the target site's terms and your local law.
"""
from __future__ import annotations
import re
import sys
import time
import argparse
from pathlib import Path
from urllib.parse import quote, urljoin
import requests
from bs4 import BeautifulSoup
# Site root: search URLs are built from it and relative result links are
# resolved against it.
BASE_URL = "https://rargb.to"
# Request headers passed to every GET issued by ``fetch``.
HEADERS = {
"User-Agent": (
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
),
"Accept-Language": "en-US,en;q=0.9",
}
# Matches ``magnet:?`` followed by everything up to the next quote,
# whitespace or angle bracket.
MAGNET_RE = re.compile(r"magnet:\?[^\"'\s<>]+")
def search_page_url(query: str, page: int) -> str:
    """Build the URL of a results page for ``query``.

    The first page (any ``page`` of 1 or less) carries no page number in its
    path; later pages are addressed as ``/search/<page>/``. The query is
    percent-encoded with ``urllib.parse.quote``.
    """
    page_segment = f"{page}/" if page > 1 else ""
    return f"{BASE_URL}/search/{page_segment}?search={quote(query)}"
def fetch(session: requests.Session, url: str, timeout: float, retries: int) -> str | None:
    """GET ``url`` and return the response text, or None once every attempt failed.

    Args:
        session: Session used to issue the request.
        url: Address to download.
        timeout: Per-request timeout in seconds, passed to ``session.get``.
        retries: Total number of attempts; values below 1 are treated as 1 so
            that at least one request is always made.

    Returns:
        The response body as text, or None when all attempts raised a
        ``requests.RequestException`` (connection errors, timeouts and HTTP
        error statuses via ``raise_for_status``).
    """
    attempts = max(1, retries)
    for attempt in range(1, attempts + 1):
        try:
            resp = session.get(url, headers=HEADERS, timeout=timeout)
            resp.raise_for_status()
            return resp.text
        except requests.RequestException as exc:
            if attempt == attempts:
                # Nothing is left to retry, so backing off here would only
                # delay the caller before it gets its None.
                print(f" ! chyba ({attempt}/{attempts}) u {url}: {exc}",
                      file=sys.stderr)
                break
            # Linear back-off: 2s, 4s, 6s, ... between attempts.
            wait = attempt * 2
            print(f" ! chyba ({attempt}/{attempts}) u {url}: {exc} — čekám {wait}s",
                  file=sys.stderr)
            time.sleep(wait)
    return None
def parse_result_links(html: str) -> list[tuple[str, str]]:
    """Extract ``(name, detail_url)`` pairs from a search results page.

    Only ``tr.lista2`` rows are inspected, and within each row the first link
    whose href starts with ``/torrent/``. A given href is reported once, in
    first-seen order. The name is the link's ``title`` attribute, falling back
    to its text and finally to the href itself; detail URLs are made absolute
    against ``BASE_URL``.
    """
    torrent_href = re.compile(r"^/torrent/")
    soup = BeautifulSoup(html, "html.parser")

    # Insertion-ordered mapping doubles as the "already seen" set.
    by_href: dict[str, tuple[str, str]] = {}
    for row in soup.select("tr.lista2"):
        anchor = row.find("a", href=torrent_href)
        href = anchor.get("href") if anchor else None
        if not href or href in by_href:
            continue
        label = anchor.get("title") or anchor.get_text(strip=True) or href
        by_href[href] = (label.strip(), urljoin(BASE_URL, href))
    return list(by_href.values())
def parse_last_page(html: str) -> int:
    """Return the highest page number linked from the pager, or 1 if there is none.

    Page numbers are read from every ``/search/<N>/?search=`` occurrence in the
    raw markup, so the result is only as complete as the pager shown on the page.
    """
    page_numbers = (
        int(digits) for digits in re.findall(r"/search/(\d+)/\?search=", html)
    )
    return max(page_numbers, default=1)
def extract_magnet(html: str) -> str | None:
    """Return the first magnet link found on a torrent detail page, or None.

    The link is cut out of raw markup, where the ``&`` separating magnet
    parameters may be written as a character reference (``&amp;``). Character
    references are therefore decoded so the returned URI is the one a browser
    would hand to a torrent client.
    """
    # Imported locally because the ``html`` parameter shadows the module name.
    from html import unescape

    match = MAGNET_RE.search(html)
    return unescape(match.group(0)) if match else None
def scrape(query: str, max_pages: int | None, delay: float,
           timeout: float, retries: int) -> list[tuple[str, str]]:
    """Walk the result pages for ``query`` and return de-duplicated ``(name, magnet)`` pairs.

    Args:
        query: Search text.
        max_pages: Stop after this many result pages; None means keep going
            until a page yields no new results.
        delay: Seconds to sleep before each detail-page request and between
            result pages.
        timeout: Per-request timeout in seconds, forwarded to ``fetch``.
        retries: Number of attempts per request, forwarded to ``fetch``.

    Returns:
        ``(name, magnet)`` tuples in discovery order; each detail URL is
        visited once and each magnet is reported once. Empty when the first
        results page cannot be loaded.
    """
    collected: list[tuple[str, str]] = []
    seen_magnets: set[str] = set()
    seen_details: set[str] = set()

    # The context manager closes the session on every exit path.
    with requests.Session() as session:
        first_html = fetch(session, search_page_url(query, 1), timeout, retries)
        if first_html is None:
            print("Nepodařilo se načíst první stránku výsledků.", file=sys.stderr)
            return collected

        # The pager may be windowed, so this count is only an estimate for the
        # progress message; the loop below ends on its own conditions.
        last_page = parse_last_page(first_html)
        if max_pages is not None:
            last_page = min(last_page, max_pages)
        print(f"Dotaz: {query!r} — stránek k projití: ~{last_page}")

        page = 1
        while True:
            html = first_html if page == 1 else fetch(
                session, search_page_url(query, page), timeout, retries)
            if html is None:
                break

            rows = parse_result_links(html)
            new_rows = [(n, u) for n, u in rows if u not in seen_details]
            if not new_rows:
                # No fresh results → past the last real page; stop.
                break

            print(f"[strana {page}] nalezeno položek: {len(new_rows)}")
            for name, detail_url in new_rows:
                seen_details.add(detail_url)
                time.sleep(delay)
                detail_html = fetch(session, detail_url, timeout, retries)
                if detail_html is None:
                    print(f" - {name}: detail se nenačetl", file=sys.stderr)
                    continue
                magnet = extract_magnet(detail_html)
                if not magnet:
                    print(f" - {name}: magnet nenalezen", file=sys.stderr)
                    continue
                if magnet in seen_magnets:
                    continue
                seen_magnets.add(magnet)
                collected.append((name, magnet))
                print(f" + {name}")

            if max_pages is not None and page >= max_pages:
                break
            page += 1
            time.sleep(delay)

    return collected
def main() -> None:
    """Parse the command line, run the scrape and write the magnet list.

    The magnets are written one per line to ``--output`` (default:
    ``magnets_<sanitised query>.txt``). With ``--tsv`` a second file holding
    ``name<TAB>magnet`` lines is written next to it. Exits with status 1 when
    nothing was collected.
    """
    parser = argparse.ArgumentParser(
        description="Vyparsuje magnet odkazy z vyhledávání na rargb.to.")
    parser.add_argument("query", help="Vyhledávací dotaz (např. \"ubuntu 24.04\")")
    parser.add_argument("-o", "--output", type=Path,
                        help="Výstupní soubor (výchozí: magnets_<dotaz>.txt)")
    parser.add_argument("--max-pages", type=int, default=None,
                        help="Maximální počet stránek (výchozí: všechny)")
    parser.add_argument("--delay", type=float, default=1.0,
                        help="Prodleva mezi requesty v sekundách (výchozí: 1.0)")
    parser.add_argument("--timeout", type=float, default=20.0,
                        help="Timeout requestu v sekundách (výchozí: 20)")
    parser.add_argument("--retries", type=int, default=3,
                        help="Počet pokusů při chybě (výchozí: 3)")
    parser.add_argument("--tsv", action="store_true",
                        help="Uložit i <název>\\t<magnet> vedle čistých magnetů")
    args = parser.parse_args()

    # Default file name: the query with every run of unsafe characters
    # collapsed to a single underscore.
    output = args.output or Path(
        f"magnets_{re.sub(r'[^A-Za-z0-9._-]+', '_', args.query).strip('_')}.txt")

    results = scrape(args.query, args.max_pages, args.delay, args.timeout, args.retries)
    if not results:
        # Failure goes to stderr, like the other error messages of this tool.
        print("Nenalezeny žádné magnet odkazy.", file=sys.stderr)
        sys.exit(1)

    output.write_text("".join(f"{magnet}\n" for _, magnet in results), encoding="utf-8")
    print(f"\nUloženo {len(results)} magnet odkazů do: {output}")

    if args.tsv:
        tsv_path = output.with_suffix(".tsv")
        if tsv_path == output:
            # ``--output something.tsv`` would otherwise make the TSV overwrite
            # the magnet list that was just written.
            tsv_path = output.with_name(f"{output.name}.tsv")
        tsv_path.write_text(
            "".join(f"{name}\t{magnet}\n" for name, magnet in results), encoding="utf-8")
        print(f"Uloženo také název+magnet do: {tsv_path}")


if __name__ == "__main__":
    main()
+120
View File
@@ -0,0 +1,120 @@
"""One-off migration: split combined country tags in a pool's metadata index.
Before multi-country support, a co-production fetched from ČSFD was stored as a
single ``"Země původu/USA / Velká Británie"`` tag. This rewrites each such tag
into one tag per country (``"Země původu/USA"`` + ``"Země původu/Velká
Británie"``), de-duplicating within each record. A timestamped backup of the
index is written before saving.
Usage:
poetry run python scripts/split_country_tags.py [<pool_dir>] [--category "Země původu"]
If ``<pool_dir>`` is omitted, the pool from the global config is used.
"""
from __future__ import annotations
import sys
import json
import shutil
import argparse
from pathlib import Path
from datetime import datetime
from loguru import logger
# Allow running as a plain script (``python scripts/...``) by exposing the repo root.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from src.core.config import load_global_config # noqa: E402
from src.core.pool_index import INDEX_FILENAME # noqa: E402
def _split_record_tags(tags: list[str], category: str) -> tuple[list[str], int]:
"""Return (rewritten tags, number of combined tags split) for one record.
Order is preserved; duplicates produced by the split are dropped.
"""
prefix = f"{category}/"
result: list[str] = []
seen: set[str] = set()
split_count = 0
def _add(tag: str) -> None:
if tag not in seen:
seen.add(tag)
result.append(tag)
for tag in tags:
if isinstance(tag, str) and tag.startswith(prefix) and "/" in tag[len(prefix):]:
value = tag[len(prefix):]
countries = [c.strip() for c in value.split("/") if c.strip()]
for country in countries:
_add(f"{prefix}{country}")
split_count += 1
else:
_add(tag)
return result, split_count
def migrate(index_path: Path, category: str) -> int:
    """Split combined ``category`` tags in the index file at ``index_path``.

    The index is read as JSON and every record under its ``"movies"`` key has
    its ``"tags"`` list rewritten by ``_split_record_tags``. When at least one
    tag was split, a timestamped copy of the index is written next to it and
    the index is then saved in place; otherwise the file is left untouched.

    Returns:
        The number of combined tags that were split (0 when nothing changed).
    """
    data = json.loads(index_path.read_text(encoding="utf-8"))

    split_total = 0
    touched_records = 0
    for key, record in data.get("movies", {}).items():
        rewritten, n_split = _split_record_tags(record.get("tags", []), category)
        if not n_split:
            continue
        record["tags"] = rewritten
        split_total += n_split
        touched_records += 1
        logger.debug(f"{key}: {n_split} combined tag(s) split")

    if not split_total:
        logger.info(f"No combined '{category}/…' tags found — nothing to migrate")
        return 0

    # Back up the untouched index before overwriting it.
    stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    backup = index_path.with_suffix(f"{index_path.suffix}.bak-{stamp}")
    shutil.copy2(index_path, backup)
    logger.info(f"Backup written: {backup}")

    index_path.write_text(
        json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8"
    )
    logger.info(
        f"Split {split_total} combined '{category}' tag(s) across {touched_records} record(s)"
    )
    return split_total
def main() -> None:
    """Command-line entry point: locate the pool index and run the migration.

    The pool directory comes from the positional argument, falling back to
    ``pool_dir`` in the global config. Argument errors (no pool directory, no
    index file in it) are reported through ``parser.error``.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "pool_dir", nargs="?",
        help="Pool root (default: pool_dir from the global config)",
    )
    parser.add_argument(
        "--category", default="Země původu", help="Tag category to split",
    )
    args = parser.parse_args()

    pool_dir = args.pool_dir
    if not pool_dir:
        # Consult the global config only when no directory was given.
        pool_dir = load_global_config().get("pool_dir")
    if not pool_dir:
        parser.error("No pool_dir given and none configured in the global config")

    index_path = Path(pool_dir, INDEX_FILENAME)
    if not index_path.exists():
        parser.error(f"No index found at {index_path}")

    migrate(index_path, args.category)


if __name__ == "__main__":
    main()
+101
View File
@@ -0,0 +1,101 @@
"""One-off migration: drop all tags of given categories from a pool's index.
Used to remove tag categories that turned out to be a bad idea (e.g. Režie /
Herec produced far too many folders). Cached ČSFD data is left intact — only the
``tags`` lists are pruned. A timestamped backup of the index is written first.
Usage:
poetry run python scripts/strip_tag_categories.py [<pool_dir>] \
--categories "Režie" "Herec"
"""
from __future__ import annotations
import sys
import json
import shutil
import argparse
from pathlib import Path
from datetime import datetime
from loguru import logger
# Allow running as a plain script (``python scripts/...``) by exposing the repo root.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from src.core.config import load_global_config # noqa: E402
from src.core.pool_index import INDEX_FILENAME # noqa: E402
def _strip(tags: list[str], prefixes: tuple[str, ...]) -> tuple[list[str], int]:
"""Return (kept tags, number removed) dropping tags under any prefix."""
kept = [t for t in tags if not (isinstance(t, str) and t.startswith(prefixes))]
return kept, len(tags) - len(kept)
def migrate(index_path: Path, categories: list[str]) -> int:
    """Remove all tags belonging to ``categories`` from the index at ``index_path``.

    The index is read as JSON and every record under its ``"movies"`` key has
    the tags starting with ``"<category>/"`` dropped from its ``"tags"`` list.
    When at least one tag was removed, a timestamped copy of the index is
    written next to it and the index is then saved in place; otherwise the
    file is left untouched.

    Returns:
        The number of tags removed (0 when nothing changed).
    """
    prefixes = tuple(f"{category}/" for category in categories)
    data = json.loads(index_path.read_text(encoding="utf-8"))

    removed_total = 0
    touched_records = 0
    for key, record in data.get("movies", {}).items():
        kept, removed = _strip(record.get("tags", []), prefixes)
        if not removed:
            continue
        record["tags"] = kept
        removed_total += removed
        touched_records += 1
        logger.debug(f"{key}: removed {removed} tag(s)")

    if not removed_total:
        logger.info(f"No tags in {categories} found — nothing to migrate")
        return 0

    # Back up the untouched index before overwriting it.
    stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    backup = index_path.with_suffix(f"{index_path.suffix}.bak-{stamp}")
    shutil.copy2(index_path, backup)
    logger.info(f"Backup written: {backup}")

    index_path.write_text(
        json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8"
    )
    logger.info(
        f"Removed {removed_total} tag(s) of {categories} across {touched_records} record(s)"
    )
    return removed_total
def main() -> None:
    """Command-line entry point: locate the pool index and strip the categories.

    The pool directory comes from the positional argument, falling back to
    ``pool_dir`` in the global config. Argument errors (no pool directory, no
    index file in it) are reported through ``parser.error``.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "pool_dir", nargs="?",
        help="Pool root (default: pool_dir from the global config)",
    )
    parser.add_argument(
        "--categories", nargs="+", default=["Režie", "Herec"],
        help="Tag categories to strip",
    )
    args = parser.parse_args()

    pool_dir = args.pool_dir
    if not pool_dir:
        # Consult the global config only when no directory was given.
        pool_dir = load_global_config().get("pool_dir")
    if not pool_dir:
        parser.error("No pool_dir given and none configured in the global config")

    index_path = Path(pool_dir, INDEX_FILENAME)
    if not index_path.exists():
        parser.error(f"No index found at {index_path}")

    migrate(index_path, args.categories)


if __name__ == "__main__":
    main()