From b3a61f9e869bda6f434bf9573edcfdf34dba91a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Doubravsk=C3=BD?= Date: Mon, 15 Jun 2026 17:31:52 +0200 Subject: [PATCH] =?UTF-8?q?Auto-fill=20=C4=8CSFD=20links=20on=20import,=20?= =?UTF-8?q?rename=20in=20pool,=20multi-country=20tags,=20Filmot=C3=A9ka=20?= =?UTF-8?q?layout?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 1 + CHANGELOG.md | 51 +++++- PROJECT.md | 39 +++-- pyproject.toml | 2 +- scripts/filter_magnets_gui.py | 87 ++++++++++ scripts/rargb_magnets.py | 196 ++++++++++++++++++++++ scripts/split_country_tags.py | 120 ++++++++++++++ scripts/strip_tag_categories.py | 101 +++++++++++ src/_version.py | 2 +- src/core/csfd.py | 130 +++++++++++++-- src/core/file.py | 40 ++++- src/core/file_manager.py | 48 +++++- src/core/hardlink_manager.py | 179 ++++++++++++++------ src/ui/qt_app.py | 285 ++++++++++++++++++++++++-------- tests/test_csfd.py | 124 +++++++++++++- tests/test_file.py | 69 ++++++++ tests/test_file_manager.py | 67 ++++++++ tests/test_hardlink_manager.py | 34 ++++ 18 files changed, 1407 insertions(+), 168 deletions(-) create mode 100644 scripts/filter_magnets_gui.py create mode 100644 scripts/rargb_magnets.py create mode 100644 scripts/split_country_tags.py create mode 100644 scripts/strip_tag_categories.py diff --git a/.gitignore b/.gitignore index 7018732..773eefe 100644 --- a/.gitignore +++ b/.gitignore @@ -35,3 +35,4 @@ AGENTS.md CLAUDE.md DESIGN_DOCUMENT.md .claude/ + diff --git a/CHANGELOG.md b/CHANGELOG.md index 3a6556b..0b7291a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,8 +25,17 @@ Each version entry uses these sections (include only those that apply): - Fork of the former **Tagger** project as **Curator**, a movie-library manager. - **Pool** concept (single source of truth) with `Filmy` / `Seriály` folders and a configurable **Filmotéka** output folder, stored in the global config. 
-- "Import movie" flow: pick a video, enter Title + ČSFD link, the file is copied - into `pool/Filmy` as `Title.ext` (non-destructive) and indexed. +- **Multi-file "Import movies" flow**: pick several videos at once and give each + its own Title + ČSFD link (one row per file, more addable in the dialog); a + copy/move toggle chooses whether sources are copied (default, non-destructive) + or moved into `pool/Filmy` as `Title.ext`. Imported movies are indexed and, if + a ČSFD link is set, enriched with tags right away. +- **Auto-find ČSFD links** in the import dialog ("🔎 Najít ČSFD odkazy"): for + every row without a link it cleans the filename into a query + (`clean_filename_to_query` strips resolution/codec/source/group, keeps the + year) and fills in the first ČSFD search hit (`find_csfd_url` → + `search_movies`, reusing one Anubis session). Existing links are never + overwritten; results are a suggestion the user can review before importing. - `File` now stores `title` and `csfd_link`. - New **PySide6** GUI reframed around the Filmotéka workflow (pool setup, import, tag filter sidebar, movie table, one-click Filmotéka generation), replacing the @@ -44,9 +53,17 @@ Each version entry uses these sections (include only those that apply): - Project `README.md` (overview, concepts, workflow, run/build instructions). - **ČSFD scraping** (`csfd.py`, ported from the Tagger devel branch): fetches movie data from a ČSFD link (JSON-LD + HTML parsing). `File.apply_csfd_tags` - assigns Žánr / Rok / Země původu tags and caches the fetched data in the metadata. - The GUI auto-fetches on import when a link is given and offers "Načíst tagy - z ČSFD" for selected movies. + assigns **Žánr / Rok / Země původu / Hodnocení** tags and caches the fetched + data (incl. directors and the first 10 actors) in the metadata. The rating is + bucketed into ten-point bands (`rating_band`, e.g. `80–89 %`, `90–100 %`). 
+ **Directors and actors are collected but intentionally not turned into tags or + Filmotéka folders** — there would be far too many. The GUI auto-fetches on + import when a link is given and offers "Načíst tagy z ČSFD" for selected movies. +- **Rename a pooled movie** from the app ("Přejmenovat…" in the Movie menu / + context menu, F2): `FileManager.rename_movie` renames the physical file in + pool/Filmy to `<new title>.<ext>` (extension preserved), moves its metadata to + the new index key, and syncs `title`/`filename`. Refuses empty names, names + with path separators, and collisions with an existing pooled file. - App startup injects `truststore` so HTTPS uses the OS certificate store — ČSFD fetching works behind corporate SSL inspection (where certifi's bundle lacks the proxy root CA). @@ -64,14 +81,32 @@ Each version entry uses these sections (include only those that apply): fetch pays the PoW cost). Žánr / Rok / Země původu tags load again. - "Assign tags" dialog crashed on PySide6/Qt6 — `Qt.ItemIsTristate` was renamed to `Qt.ItemIsAutoTristate`. +- Sidebar tag-filter checkboxes never appeared checked: every toggle triggered a + table refresh that rebuilt the tree from scratch (all unchecked), wiping the + click. The active filter is now kept in a separate model (`_active_filter`) and + restored on rebuild. The count after each tag is also now filter-aware — it + shows how many of the currently filtered movies carry that tag (i.e. how many + would remain if it were checked), instead of always the pool-wide total. The + refresh is deferred via `QTimer.singleShot` so the tree is not rebuilt inside + its own `itemChanged` signal (which deleted the item Qt was still processing + and crashed the app with SIGSEGV on a real click). ### Changed - ČSFD country tag category renamed **Země → Země původu**. Added `scripts/migrate_tag_category.py` to rewrite the category in an existing pool index (backs up `.Curator.!index` first); run against the live pool. 
-- Filmotéka tree now also builds the **Země původu** branch — it was missing - from `FILMOTEKA_CATEGORIES`, so the country level was never generated. Tree - categories are now Rok / Žánr / Země původu / Hodnocení. +- Filmotéka tree **relaid out**: genre folders now sit **directly at the output + root** (next to the copy-as-is Seriály mirror), with year tags grouped under a + **`Dle roku`** folder and country tags under **`Dle země původu`**. + `HardlinkManager` gained a category → root-folder map (`category_roots`, + empty root = tag folders at the output root) and now restricts obsolete-link + cleanup to the tag-tree's own top-level folders, so copy-as-is mirrors are + never touched. The tree also groups the ČSFD rating under `Dle hodnocení`. +- ČSFD origin is now parsed as **multiple countries**: a co-production like + "USA / Velká Británie" becomes a separate **Země původu** tag per country + (so the film is filed under each), instead of one combined tag. `CSFDMovie` + gained `countries: list[str]` (replacing the single `country`); the csfd cache + schema bumped to v2 (legacy single-country caches are split on read). - Movie table trimmed to **Název / Štítky / Velikost** — the Datum and ČSFD columns were dropped (a ČSFD link is a prerequisite, so its indicator was always the same). diff --git a/PROJECT.md b/PROJECT.md index c19adda..0ebc9b7 100644 --- a/PROJECT.md +++ b/PROJECT.md @@ -65,11 +65,21 @@ movie table, and one-click Filmotéka generation. - **Metadata storage:** one **unified metadata file** for the whole pool (a central index), not per-file sidecars. Justified because Curator owns the pool and files are never moved manually, so it is not exposed to path drift. -- **Import dialog:** collects only **Title** + **ČSFD link**. The file is renamed - to `Title.ext`. When a ČSFD link is given, Curator fetches the movie and assigns - Žánr / Rok / Země původu tags automatically; further tags can be added via the UI. 
-- **Genres:** a movie can have **multiple genres**, so it appears under each of - its genre branches in the Filmotéka (multiple hardlinks). +- **Import dialog:** **multi-file** — pick several videos at once and give each + its own **Title** + **ČSFD link** (one row per file, more can be added from the + dialog), or auto-filled with **"Najít ČSFD odkazy"** (cleans each filename into + a query and fills the first ČSFD search hit; existing links are kept). A single + **copy/move** toggle decides whether the sources are copied (default) or moved + into the pool. Each file is renamed to `Title.ext`. When a + ČSFD link is given, Curator fetches the movie and assigns Žánr / Rok / Země + původu / Hodnocení (ten-point band) tags automatically; further tags can be + added via the UI. Directors and the first 10 actors are fetched and cached too, + but **deliberately not turned into tags/folders** (there would be too many). +- **Genres / countries:** a movie can have **multiple genres** and, for a + co-production, **multiple countries of origin** (ČSFD writes them + slash-separated, e.g. "USA / Velká Británie"). Each becomes its own tag, so the + film appears under every matching genre and country branch in the Filmotéka + (multiple hardlinks). - **Pool layout:** two top-level folders — **Filmy** and **Seriály**. Movies are the first target; the Seriály branch follows the "copy-as-is" rule below. - **Copy-as-is folders (Seriály):** a subfolder inside the pool can be marked as @@ -80,11 +90,18 @@ movie table, and one-click Filmotéka generation. hardlinked files). This is how Seriály work. - **File naming:** imported movies are renamed to **`Title.ext`** (no year in the filename; year lives in metadata/tags). -- **Import is non-destructive:** the original file is **copied** into the pool, - the source is left in place. -- **Filmotéka tree:** **one level per category** — `output/Category/Tag/film` - (hardlink), same shape as the current hardlink manager. 
For now the tree is - built from these categories: **Rok**, **Žánr**, **Země původu**, **Hodnocení**. +- **Import copy vs move:** by default the original file is **copied** into the + pool (non-destructive); the import dialog also offers a **move** option that + relocates the source into the pool instead. +- **Filmotéka tree layout:** driven by a category → root-folder map + (`FILMOTEKA_CATEGORY_ROOTS`). At the output root sit the **genre folders + directly** (`output/Akční/film`, …), next to the copy-as-is mirrors + (**Seriály**), plus two grouping folders: **`Dle roku`** (`output/Dle + roku/<year>/film`) and **`Dle země původu`** (`output/Dle země + původu/<country>/film`), plus `Dle hodnocení`. Each is a hardlink. + `HardlinkManager` supports an empty root (tag folders placed directly at the + output root) and restricts obsolete cleanup to the tag-tree's own top-level + folders so mirrors are never touched. ## Tasks @@ -95,6 +112,8 @@ movie table, and one-click Filmotéka generation. - Pool-root and Filmotéka-output folder settings in the global config - Filmy / Seriály top-level folder handling in the pool - "Import movie" dialog (Title + ČSFD link), copy into pool/Filmy as Title.ext +- Rename a pooled movie from the app (`FileManager.rename_movie`): renames the + file in pool/Filmy and moves its metadata to the new index key - Remove-from-pool (delete file + its metadata) - Generate the Filmotéka hardlink tree from the pool (Rok / Žánr / Země původu / Hodnocení) diff --git a/pyproject.toml b/pyproject.toml index 4ca16aa..e6423f2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "curator" -version = "0.1.0" +version = "1.0.0" description = "" authors = [ {name = "jan.doubravsky@gmail.com"} diff --git a/scripts/filter_magnets_gui.py b/scripts/filter_magnets_gui.py new file mode 100644 index 0000000..9604757 --- /dev/null +++ b/scripts/filter_magnets_gui.py @@ -0,0 +1,87 @@ +#!/usr/bin/env python3 +"""Minimal PySide6 GUI for filtering magnet lists 
from ``rargb_magnets.py``. + +Just a text box on top and a list below — type to filter live (same syntax as +the CLI: space-separated AND terms, ``-term`` to exclude). Double-click or press +Enter on a row to copy its magnet link to the clipboard. + + python tools/filter_magnets_gui.py [files/glob/dir ...] + +With no arguments it loads ``magnets_*.txt`` from the current directory. The +loading/filtering logic is reused from ``filter_magnets.py`` in this folder. +""" + +from __future__ import annotations + +import sys +from pathlib import Path + +# Reuse the CLI tool's parsing/filtering (same folder). +sys.path.insert(0, str(Path(__file__).resolve().parent)) +from filter_magnets import Entry, load_entries, apply_filter, resolve_inputs # noqa: E402 + +from PySide6.QtCore import Qt # noqa: E402 +from PySide6.QtWidgets import ( # noqa: E402 + QApplication, QWidget, QVBoxLayout, QLineEdit, QListWidget, QListWidgetItem, +) + + +class MagnetFilter(QWidget): + def __init__(self, entries: list[Entry]) -> None: + super().__init__() + self.entries = entries + + layout = QVBoxLayout(self) + layout.setContentsMargins(6, 6, 6, 6) + + self.search = QLineEdit() + self.search.setPlaceholderText("filtr… (např. 
1080p 2022 -hindi) — ↵/dvojklik = kopírovat magnet") + self.search.setClearButtonEnabled(True) + self.search.textChanged.connect(self._refilter) + layout.addWidget(self.search) + + self.list = QListWidget() + self.list.itemActivated.connect(self._copy) # Enter / double-click + layout.addWidget(self.list) + + self.resize(820, 600) + self._refilter("") + self.search.setFocus() + + def _refilter(self, text: str) -> None: + self.list.clear() + for entry in apply_filter(self.entries, text): + short = entry.magnet.split("&", 1)[0] # only the part before the first & + item = QListWidgetItem(f"{entry.name}\n{short}") + item.setData(Qt.UserRole, short) + item.setToolTip(short) + self.list.addItem(item) + self._update_title() + + def _copy(self, item: QListWidgetItem) -> None: + QApplication.clipboard().setText(item.data(Qt.UserRole)) + self._update_title(copied=item.text()) + + def _update_title(self, copied: str | None = None) -> None: + base = f"Magnet filtr — {self.list.count()} / {len(self.entries)}" + self.setWindowTitle(f"{base} ✓ zkopírováno" if copied else base) + + +def main() -> None: + paths = [p for p in resolve_inputs(sys.argv[1:]) if p.exists()] + if not paths: + print("Žádné vstupní soubory (magnets_*.txt) nenalezeny.", file=sys.stderr) + sys.exit(1) + entries = load_entries(paths) + if not entries: + print("Vstupní soubory neobsahují žádné magnet odkazy.", file=sys.stderr) + sys.exit(1) + + app = QApplication(sys.argv) + window = MagnetFilter(entries) + window.show() + sys.exit(app.exec()) + + +if __name__ == "__main__": + main() diff --git a/scripts/rargb_magnets.py b/scripts/rargb_magnets.py new file mode 100644 index 0000000..f96e853 --- /dev/null +++ b/scripts/rargb_magnets.py @@ -0,0 +1,196 @@ +#!/usr/bin/env python3 +"""Standalone scraper: collect magnet links from a rargb.to search. 
+ +Given a search query it walks every results page +(``https://rargb.to/search/?search=`` and ``/search//?search=``), +opens each torrent's detail page and saves its magnet link. + +This is a self-contained tool — it only needs ``requests`` and +``beautifulsoup4`` and does not import anything from the Curator project. + +Examples: + python scripts/rargb_magnets.py "ubuntu 24.04" + python scripts/rargb_magnets.py test --output test_magnets.txt --max-pages 3 + python scripts/rargb_magnets.py test --tsv # also write namemagnet + +Be considerate: a polite delay is inserted between requests by default. Use the +results responsibly and respect the target site's terms and your local law. +""" + +from __future__ import annotations + +import re +import sys +import time +import argparse +from pathlib import Path +from urllib.parse import quote, urljoin + +import requests +from bs4 import BeautifulSoup + +BASE_URL = "https://rargb.to" +HEADERS = { + "User-Agent": ( + "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 " + "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" + ), + "Accept-Language": "en-US,en;q=0.9", +} +MAGNET_RE = re.compile(r"magnet:\?[^\"'\s<>]+") + + +def search_page_url(query: str, page: int) -> str: + """URL of the N-th results page for a query (page 1 has no number).""" + q = quote(query) + if page <= 1: + return f"{BASE_URL}/search/?search={q}" + return f"{BASE_URL}/search/{page}/?search={q}" + + +def fetch(session: requests.Session, url: str, timeout: float, retries: int) -> str | None: + """GET ``url`` and return the HTML, or None after exhausting retries.""" + for attempt in range(1, retries + 1): + try: + resp = session.get(url, headers=HEADERS, timeout=timeout) + resp.raise_for_status() + return resp.text + except requests.RequestException as exc: + wait = attempt * 2 + print(f" ! 
chyba ({attempt}/{retries}) u {url}: {exc} — čekám {wait}s", + file=sys.stderr) + time.sleep(wait) + return None + + +def parse_result_links(html: str) -> list[tuple[str, str]]: + """Return (name, detail_url) for each result row on a search page.""" + soup = BeautifulSoup(html, "html.parser") + results: list[tuple[str, str]] = [] + seen: set[str] = set() + for row in soup.select("tr.lista2"): + link = row.find("a", href=re.compile(r"^/torrent/")) + if not link: + continue + href = link.get("href") + if not href or href in seen: + continue + seen.add(href) + name = link.get("title") or link.get_text(strip=True) or href + results.append((name.strip(), urljoin(BASE_URL, href))) + return results + + +def parse_last_page(html: str) -> int: + """Best-effort highest page number from the pager (1 if none found).""" + pages = [int(n) for n in re.findall(r"/search/(\d+)/\?search=", html)] + return max(pages) if pages else 1 + + +def extract_magnet(html: str) -> str | None: + """First magnet link found on a torrent detail page, or None.""" + match = MAGNET_RE.search(html) + return match.group(0) if match else None + + +def scrape(query: str, max_pages: int | None, delay: float, + timeout: float, retries: int) -> list[tuple[str, str]]: + """Walk all result pages and return a de-duplicated [(name, magnet)] list.""" + session = requests.Session() + collected: list[tuple[str, str]] = [] + seen_magnets: set[str] = set() + seen_details: set[str] = set() + + first_html = fetch(session, search_page_url(query, 1), timeout, retries) + if first_html is None: + print("Nepodařilo se načíst první stránku výsledků.", file=sys.stderr) + return collected + + last_page = parse_last_page(first_html) + if max_pages is not None: + last_page = min(last_page, max_pages) + print(f"Dotaz: {query!r} — stránek k projití: ~{last_page}") + + page = 1 + while True: + html = first_html if page == 1 else fetch( + session, search_page_url(query, page), timeout, retries) + if html is None: + break + + rows = 
parse_result_links(html) + new_rows = [(n, u) for n, u in rows if u not in seen_details] + if not new_rows: + # No fresh results → past the last real page; stop. + break + + print(f"[strana {page}] nalezeno položek: {len(new_rows)}") + for name, detail_url in new_rows: + seen_details.add(detail_url) + time.sleep(delay) + detail_html = fetch(session, detail_url, timeout, retries) + if detail_html is None: + print(f" - {name}: detail se nenačetl", file=sys.stderr) + continue + magnet = extract_magnet(detail_html) + if not magnet: + print(f" - {name}: magnet nenalezen", file=sys.stderr) + continue + if magnet in seen_magnets: + continue + seen_magnets.add(magnet) + collected.append((name, magnet)) + print(f" + {name}") + + if max_pages is not None and page >= max_pages: + break + page += 1 + if page > last_page: + # Probe one page past the detected last page in case the pager was + # windowed; the empty-results check above will stop us if it's truly + # the end. + last_page = page + time.sleep(delay) + + return collected + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Vyparsuje magnet odkazy z vyhledávání na rargb.to.") + parser.add_argument("query", help="Vyhledávací dotaz (např. 
\"ubuntu 24.04\")") + parser.add_argument("-o", "--output", type=Path, + help="Výstupní soubor (výchozí: magnets_.txt)") + parser.add_argument("--max-pages", type=int, default=None, + help="Maximální počet stránek (výchozí: všechny)") + parser.add_argument("--delay", type=float, default=1.0, + help="Prodleva mezi requesty v sekundách (výchozí: 1.0)") + parser.add_argument("--timeout", type=float, default=20.0, + help="Timeout requestu v sekundách (výchozí: 20)") + parser.add_argument("--retries", type=int, default=3, + help="Počet pokusů při chybě (výchozí: 3)") + parser.add_argument("--tsv", action="store_true", + help="Uložit i \\t vedle čistých magnetů") + args = parser.parse_args() + + output = args.output or Path( + f"magnets_{re.sub(r'[^A-Za-z0-9._-]+', '_', args.query).strip('_')}.txt") + + results = scrape(args.query, args.max_pages, args.delay, args.timeout, args.retries) + + if not results: + print("Nenalezeny žádné magnet odkazy.") + sys.exit(1) + + output.write_text("".join(f"{magnet}\n" for _, magnet in results), encoding="utf-8") + print(f"\nUloženo {len(results)} magnet odkazů do: {output}") + + if args.tsv: + tsv_path = output.with_suffix(".tsv") + tsv_path.write_text( + "".join(f"{name}\t{magnet}\n" for name, magnet in results), encoding="utf-8") + print(f"Uloženo také název+magnet do: {tsv_path}") + + +if __name__ == "__main__": + main() diff --git a/scripts/split_country_tags.py b/scripts/split_country_tags.py new file mode 100644 index 0000000..493f1bf --- /dev/null +++ b/scripts/split_country_tags.py @@ -0,0 +1,120 @@ +"""One-off migration: split combined country tags in a pool's metadata index. + +Before multi-country support, a co-production fetched from ČSFD was stored as a +single ``"Země původu/USA / Velká Británie"`` tag. This rewrites each such tag +into one tag per country (``"Země původu/USA"`` + ``"Země původu/Velká +Británie"``), de-duplicating within each record. A timestamped backup of the +index is written before saving. 
+ +Usage: + poetry run python scripts/split_country_tags.py [] [--category "Země původu"] + +If ```` is omitted, the pool from the global config is used. +""" + +from __future__ import annotations + +import sys +import json +import shutil +import argparse +from pathlib import Path +from datetime import datetime + +from loguru import logger + +# Allow running as a plain script (``python scripts/...``) by exposing the repo root. +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) + +from src.core.config import load_global_config # noqa: E402 +from src.core.pool_index import INDEX_FILENAME # noqa: E402 + + +def _split_record_tags(tags: list[str], category: str) -> tuple[list[str], int]: + """Return (rewritten tags, number of combined tags split) for one record. + + Order is preserved; duplicates produced by the split are dropped. + """ + prefix = f"{category}/" + result: list[str] = [] + seen: set[str] = set() + split_count = 0 + + def _add(tag: str) -> None: + if tag not in seen: + seen.add(tag) + result.append(tag) + + for tag in tags: + if isinstance(tag, str) and tag.startswith(prefix) and "/" in tag[len(prefix):]: + value = tag[len(prefix):] + countries = [c.strip() for c in value.split("/") if c.strip()] + for country in countries: + _add(f"{prefix}{country}") + split_count += 1 + else: + _add(tag) + return result, split_count + + +def migrate(index_path: Path, category: str) -> int: + """Split combined ``category`` tags in place; return number of tags split.""" + with open(index_path, "r", encoding="utf-8") as f: + data = json.load(f) + + movies: dict[str, dict] = data.get("movies", {}) + total_split = 0 + affected = 0 + for key, record in movies.items(): + tags = record.get("tags", []) + new_tags, split_count = _split_record_tags(tags, category) + if split_count: + record["tags"] = new_tags + total_split += split_count + affected += 1 + logger.debug(f"{key}: {split_count} combined tag(s) split") + + if total_split == 0: + logger.info(f"No combined 
'{category}/…' tags found — nothing to migrate") + return 0 + + backup = index_path.with_suffix( + index_path.suffix + f".bak-{datetime.now():%Y%m%d-%H%M%S}" + ) + shutil.copy2(index_path, backup) + logger.info(f"Backup written: {backup}") + + with open(index_path, "w", encoding="utf-8") as f: + json.dump(data, f, indent=2, ensure_ascii=False) + + logger.info( + f"Split {total_split} combined '{category}' tag(s) across {affected} record(s)" + ) + return total_split + + +def main() -> None: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument( + "pool_dir", + nargs="?", + help="Pool root (default: pool_dir from the global config)", + ) + parser.add_argument( + "--category", default="Země původu", help="Tag category to split" + ) + args = parser.parse_args() + + pool_dir = args.pool_dir or load_global_config().get("pool_dir") + if not pool_dir: + parser.error("No pool_dir given and none configured in the global config") + + index_path = Path(pool_dir) / INDEX_FILENAME + if not index_path.exists(): + parser.error(f"No index found at {index_path}") + + migrate(index_path, args.category) + + +if __name__ == "__main__": + main() diff --git a/scripts/strip_tag_categories.py b/scripts/strip_tag_categories.py new file mode 100644 index 0000000..bc26431 --- /dev/null +++ b/scripts/strip_tag_categories.py @@ -0,0 +1,101 @@ +"""One-off migration: drop all tags of given categories from a pool's index. + +Used to remove tag categories that turned out to be a bad idea (e.g. Režie / +Herec produced far too many folders). Cached ČSFD data is left intact — only the +``tags`` lists are pruned. A timestamped backup of the index is written first. 
+ +Usage: + poetry run python scripts/strip_tag_categories.py [] \ + --categories "Režie" "Herec" +""" + +from __future__ import annotations + +import sys +import json +import shutil +import argparse +from pathlib import Path +from datetime import datetime + +from loguru import logger + +# Allow running as a plain script (``python scripts/...``) by exposing the repo root. +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) + +from src.core.config import load_global_config # noqa: E402 +from src.core.pool_index import INDEX_FILENAME # noqa: E402 + + +def _strip(tags: list[str], prefixes: tuple[str, ...]) -> tuple[list[str], int]: + """Return (kept tags, number removed) dropping tags under any prefix.""" + kept = [t for t in tags if not (isinstance(t, str) and t.startswith(prefixes))] + return kept, len(tags) - len(kept) + + +def migrate(index_path: Path, categories: list[str]) -> int: + """Remove all tags of ``categories`` in place; return number of tags removed.""" + prefixes = tuple(f"{c}/" for c in categories) + + with open(index_path, "r", encoding="utf-8") as f: + data = json.load(f) + + movies: dict[str, dict] = data.get("movies", {}) + total_removed = 0 + affected = 0 + for key, record in movies.items(): + tags = record.get("tags", []) + kept, removed = _strip(tags, prefixes) + if removed: + record["tags"] = kept + total_removed += removed + affected += 1 + logger.debug(f"{key}: removed {removed} tag(s)") + + if total_removed == 0: + logger.info(f"No tags in {categories} found — nothing to migrate") + return 0 + + backup = index_path.with_suffix( + index_path.suffix + f".bak-{datetime.now():%Y%m%d-%H%M%S}" + ) + shutil.copy2(index_path, backup) + logger.info(f"Backup written: {backup}") + + with open(index_path, "w", encoding="utf-8") as f: + json.dump(data, f, indent=2, ensure_ascii=False) + + logger.info( + f"Removed {total_removed} tag(s) of {categories} across {affected} record(s)" + ) + return total_removed + + +def main() -> None: + parser 
= argparse.ArgumentParser(description=__doc__) + parser.add_argument( + "pool_dir", + nargs="?", + help="Pool root (default: pool_dir from the global config)", + ) + parser.add_argument( + "--categories", + nargs="+", + default=["Režie", "Herec"], + help="Tag categories to strip", + ) + args = parser.parse_args() + + pool_dir = args.pool_dir or load_global_config().get("pool_dir") + if not pool_dir: + parser.error("No pool_dir given and none configured in the global config") + + index_path = Path(pool_dir) / INDEX_FILENAME + if not index_path.exists(): + parser.error(f"No index found at {index_path}") + + migrate(index_path, args.categories) + + +if __name__ == "__main__": + main() diff --git a/src/_version.py b/src/_version.py index 941897d..e922c7e 100644 --- a/src/_version.py +++ b/src/_version.py @@ -1,2 +1,2 @@ """Auto-generated — do not edit manually.""" -__version__ = "0.1.0" +__version__ = "1.0.0" diff --git a/src/core/csfd.py b/src/core/csfd.py index 63527f7..6a19827 100644 --- a/src/core/csfd.py +++ b/src/core/csfd.py @@ -48,6 +48,9 @@ ANUBIS_PASS_PATH = "/.within.website/x/cmd/anubis/api/pass-challenge" # Safety cap so a difficulty bump can never spin forever (difficulty 1 needs ~16). ANUBIS_MAX_NONCE = 50_000_000 +# Keep only the top-billed cast from a movie's actor list. +MAX_ACTORS = 10 + @dataclass class CSFDMovie: @@ -61,7 +64,9 @@ class CSFDMovie: rating: Optional[int] = None # Percentage 0-100 rating_count: Optional[int] = None duration: Optional[int] = None # Minutes - country: Optional[str] = None + # A movie can be a co-production, so the origin is a list of countries + # (ČSFD writes them slash-separated, e.g. "Japonsko / USA"). 
+ countries: list[str] = field(default_factory=list) poster_url: Optional[str] = None plot: Optional[str] = None csfd_id: Optional[int] = None @@ -78,7 +83,7 @@ class CSFDMovie: "rating": self.rating, "rating_count": self.rating_count, "duration": self.duration, - "country": self.country, + "countries": self.countries, "poster_url": self.poster_url, "plot": self.plot, "csfd_id": self.csfd_id, @@ -87,6 +92,10 @@ class CSFDMovie: @classmethod def from_dict(cls, data: dict) -> "CSFDMovie": """Deserialize from a plain dict (e.g. loaded from .!tag cache).""" + countries = data.get("countries") + if countries is None: + # Legacy cache stored a single "country" string (possibly slash-joined) + countries = _split_countries(data.get("country")) return cls( title=data.get("title", ""), url=data.get("url", ""), @@ -97,7 +106,7 @@ class CSFDMovie: rating=data.get("rating"), rating_count=data.get("rating_count"), duration=data.get("duration"), - country=data.get("country"), + countries=countries, poster_url=data.get("poster_url"), plot=data.get("plot"), csfd_id=data.get("csfd_id"), @@ -111,11 +120,34 @@ class CSFDMovie: parts.append(f"Hodnocení: {self.rating}%") if self.genres: parts.append(f"Žánr: {', '.join(self.genres)}") + if self.countries: + parts.append(f"Země původu: {', '.join(self.countries)}") if self.directors: parts.append(f"Režie: {', '.join(self.directors)}") return " | ".join(parts) +def rating_band(rating: int) -> str: + """Bucket a 0–100 ČSFD rating into a ten-point band label (e.g. "80–89 %"). + + The top bucket spans 90–100 % so a perfect 100 still lands in a band. + """ + low = min((rating // 10) * 10, 90) + high = 100 if low == 90 else low + 9 + return f"{low}–{high} %" + + +def _split_countries(text: Optional[str]) -> list[str]: + """Split a ČSFD origin country string into individual countries. + + ČSFD writes co-productions slash-separated, e.g. ``"Japonsko / USA"`` → + ``["Japonsko", "USA"]``. ``None``/empty yields an empty list. 
+ """ + if not text: + return [] + return [part.strip() for part in text.split("/") if part.strip()] + + def _check_dependencies(): """Check if required dependencies are installed.""" if not HAS_DEPENDENCIES: @@ -275,11 +307,11 @@ def fetch_movie(url: str, session=None) -> CSFDMovie: if movie_data.get("plot") is None: movie_data["plot"] = _extract_plot(soup) - # Get country and year from origin info + # Get countries and year from origin info origin_info = _extract_origin_info(soup) if origin_info: - if movie_data.get("country") is None: - movie_data["country"] = origin_info.get("country") + if not movie_data.get("countries"): + movie_data["countries"] = origin_info.get("countries", []) if movie_data.get("year") is None: movie_data["year"] = origin_info.get("year") if movie_data.get("duration") is None: @@ -289,6 +321,9 @@ def fetch_movie(url: str, session=None) -> CSFDMovie: if not movie_data.get("genres"): movie_data["genres"] = _extract_genres(soup) + # Keep only the leading cast (ČSFD lists them in billing order) + movie_data["actors"] = movie_data.get("actors", [])[:MAX_ACTORS] + return CSFDMovie(**movie_data) @@ -303,7 +338,7 @@ def _extract_json_ld(soup: BeautifulSoup) -> dict: "rating": None, "rating_count": None, "duration": None, - "country": None, + "countries": [], "poster_url": None, "plot": None, } @@ -441,12 +476,13 @@ def _extract_genres(soup: BeautifulSoup) -> list[str]: def _extract_origin_info(soup: BeautifulSoup) -> dict: - """Extract country, year, duration from the origin info line. + """Extract countries, year, duration from the origin info line. CSFD separates the values with inline bullet ```` elements (no commas), so ``get_text(strip=True)`` would glue them together (e.g. "USA1999136 min"). We tokenize on those inline boundaries (and on commas, for the older format) - before extracting each field. + before extracting each field. The country segment of a co-production is + slash-separated (e.g. 
"USA / Velká Británie") and is split into a list. """ info: dict = {} @@ -468,20 +504,23 @@ def _extract_origin_info(soup: BeautifulSoup) -> dict: if duration_match: info["duration"] = int(duration_match.group(1)) continue - # Country: first alphabetic token that doesn't start with a digit. - if "country" not in info and not token[0].isdigit() and re.search(r"[^\W\d_]", token): - info["country"] = token + # Countries: first alphabetic token that doesn't start with a digit; + # may list several slash-separated countries for a co-production. + if "countries" not in info and not token[0].isdigit() and re.search(r"[^\W\d_]", token): + info["countries"] = _split_countries(token) return info -def search_movies(query: str, limit: int = 10) -> list[CSFDMovie]: +def search_movies(query: str, limit: int = 10, session=None) -> list[CSFDMovie]: """ Search for movies on CSFD.cz. Args: query: Search query string limit: Maximum number of results to return + session: Optional ``requests.Session`` to reuse (keeps the Anubis auth + cookie across calls so only the first lookup pays the PoW cost). Returns: List of CSFDMovie objects with basic info (title, url, year) @@ -489,8 +528,14 @@ def search_movies(query: str, limit: int = 10) -> list[CSFDMovie]: _check_dependencies() search_url = f"{CSFD_SEARCH_URL}?q={requests.utils.quote(query)}" - with requests.Session() as session: + own_session = session is None + if own_session: + session = requests.Session() + try: response = _get_page(session, search_url) + finally: + if own_session: + session.close() soup = BeautifulSoup(response.text, "html.parser") results = [] @@ -538,3 +583,60 @@ def fetch_movie_by_id(csfd_id: int) -> CSFDMovie: """ url = f"{CSFD_BASE_URL}/film/{csfd_id}/" return fetch_movie(url) + + +# Release-name tokens that mark the end of the actual title in a filename. 
_RELEASE_MARKERS = {
    "bluray", "blu-ray", "brrip", "bdrip", "bdremux", "remux", "webrip", "web",
    "web-dl", "webdl", "hdtv", "dvdrip", "dvd", "dvd5", "dvd9", "hdrip", "cam",
    "ts", "tc", "x264", "x265", "h264", "h265", "hevc", "avc", "xvid", "divx",
    "aac", "ac3", "eac3", "dts", "dd5", "ddp5", "truehd", "atmos", "flac",
    "10bit", "8bit", "hdr", "hdr10", "dolby", "sdr", "proper", "repack",
    "extended", "unrated", "remastered", "imax", "multi", "dual", "complete",
    "internal", "limited", "uncut",
}
# A bare four-digit token from 1900–2099 is treated as a release year.
_YEAR_RE = re.compile(r"^(19|20)\d{2}$")
# "720p", "1080p", "2160p", "2k", "4k" (case-insensitive).
_RESOLUTION_RE = re.compile(r"^\d{3,4}p$|^[24]k$", re.IGNORECASE)


def clean_filename_to_query(filename: str) -> str:
    """Turn a (possibly release-named) filename into a ČSFD search query.

    Strips the path/extension, splits on dots, whitespace and underscores and
    keeps the words before the first release marker (year, resolution, codec,
    source, …). The detected year is appended back as a disambiguator.
    Example::

        "Matrix.1999.1080p.BluRay.x264-GROUP.mkv" -> "Matrix 1999"

    A year-like token in the *first* position is kept as part of the title
    ("1917.2019.1080p.mkv" -> "1917 2019"); only a later one is taken as the
    release year.

    Args:
        filename: File name or path, including its extension.

    Returns:
        The cleaned title, followed by the release year when one was found.
    """
    from pathlib import Path

    stem = Path(filename).stem
    tokens = [t for t in re.split(r"[.\s_]+", stem) if t]

    title_words: list[str] = []
    year: Optional[str] = None
    for token in tokens:
        bare = token.strip("()[]{}")
        if _YEAR_RE.match(bare):
            if title_words:
                year = bare
                break
            # Nothing collected yet: the number is the title itself
            # ("2012", "1917", "2001 A Space Odyssey"), not the release year.
            title_words.append(token)
            continue
        lowered = bare.lower()
        # Also test the part before the first dash, so a marker glued to a
        # release group ("x264-GROUP", "1080p-GRP") ends the title as well.
        head = lowered.partition("-")[0]
        if any(
            _RESOLUTION_RE.match(candidate) or candidate in _RELEASE_MARKERS
            for candidate in (lowered, head)
        ):
            break
        title_words.append(token)

    # If nothing survived (title started with a marker), fall back to the stem.
    title = " ".join(title_words).strip() or re.sub(r"[.\s_]+", " ", stem).strip()
    return f"{title} {year}".strip() if year else title
def relocate(self, new_path: Path) -> None:
    """Rebind this File to ``new_path`` and carry its metadata location along.

    The caller must already have moved/renamed the physical file; this method
    only updates the bookkeeping. With an index attached, the entry stored
    under the old path is deleted. Without one, an existing sidecar file is
    renamed to match the new file name. Call ``save_metadata()`` afterwards to
    write the metadata back under the new path.

    Args:
        new_path: The file's new location.
    """
    previous_sidecar = self.metadata_filename
    indexed = self.index is not None

    # Drop the index entry while ``file_path`` still holds the old key.
    if indexed:
        self.index.delete(self.file_path)

    self.file_path = Path(new_path)
    self.filename = self.file_path.name
    self.metadata_filename = self.file_path.parent / f".{self.filename}.!tag"

    if indexed:
        return
    # Sidecar mode: move the old metadata file next to the relocated file.
    if previous_sidecar.exists():
        previous_sidecar.rename(self.metadata_filename)
``80–89 %``). Režie a herci se z ČSFD **stahují a cachují** + (``csfd_cache``), ale záměrně se z nich netvoří tagy ani složky — bylo by + jich příliš mnoho. Returns: dict s klíči 'success', 'movie'/'error', 'tags_added' @@ -169,8 +195,12 @@ class File: _add("Žánr", genre) if add_year and movie.year: _add("Rok", str(movie.year)) - if add_country and movie.country: - _add("Země původu", movie.country) + if add_country: + for country in movie.countries: + _add("Země původu", country) + if add_rating and movie.rating is not None: + from .csfd import rating_band + _add("Hodnocení", rating_band(movie.rating)) # Use the CSFD title if we don't have one yet if movie.title and not self.title: diff --git a/src/core/file_manager.py b/src/core/file_manager.py index 151c7c7..fa31a58 100644 --- a/src/core/file_manager.py +++ b/src/core/file_manager.py @@ -93,10 +93,13 @@ class FileManager: file_obj = File(each, self.tagmanager, index=self.index) self.filelist.append(file_obj) - def import_movie(self, source: Path, title: str, csfd_link: str | None = None) -> File: - """Copy a video file into pool/Filmy as 'Title.ext', index its metadata. + def import_movie( + self, source: Path, title: str, csfd_link: str | None = None, move: bool = False + ) -> File: + """Bring a video file into pool/Filmy as 'Title.ext' and index its metadata. - The original file is left in place (non-destructive copy). + By default the original is **copied** (non-destructive). With ``move=True`` + the source file is moved into the pool instead, leaving nothing behind. 
def rename_movie(self, file_obj: File, new_title: str) -> File:
    """Rename a pooled movie's file to ``<new_title><ext>`` and reindex it.

    The physical file is renamed in place (same folder, same extension), the
    File object is re-pointed at the new path via ``relocate()``, its
    ``title`` is updated and the metadata saved. ``on_files_changed`` is
    notified only when a rename actually happened.

    Args:
        file_obj: The movie to rename.
        new_title: Bare new name without extension; surrounding whitespace
            is ignored.

    Returns:
        The same ``file_obj`` (unchanged when the name is already in use by
        this very file).

    Raises:
        ValueError: empty name or a name containing a path separator.
        FileExistsError: another file already uses the target name.
    """
    cleaned = new_title.strip()
    if cleaned == "":
        raise ValueError("Název nesmí být prázdný.")
    if any(separator in cleaned for separator in ("/", "\\")):
        raise ValueError("Název nesmí obsahovat lomítka.")

    source = file_obj.file_path
    destination = source.with_name(f"{cleaned}{source.suffix}")

    if destination != source:
        if destination.exists():
            raise FileExistsError(f"Soubor „{destination.name}“ už v poolu existuje.")

        # Move the file first, then let the File object follow it.
        source.rename(destination)
        file_obj.relocate(destination)
        file_obj.title = cleaned
        file_obj.save_metadata()

        if self.on_files_changed:
            self.on_files_changed(self.filelist)

    return file_obj
typing import List, Tuple, Optional, Dict, Set from .file import File class HardlinkManager: - """Manager for creating hardlink-based directory structures from tagged files.""" + """Manager for creating hardlink-based directory structures from tagged files. + + The output layout is driven by a *category → root folder* mapping + (``category_roots``). Each tag is placed at + ``output///``; an empty root means the tag's own + folders sit directly at the output root (e.g. genre folders next to the + "Dle roku" / "Dle země původu" folders). The legacy ``categories`` list + (folder == category name) is still accepted and treated as the identity + mapping ``{cat: cat}``. + """ def __init__(self, output_dir: Path): """ @@ -37,11 +46,61 @@ class HardlinkManager: self.created_links: List[Path] = [] self.errors: List[Tuple[Path, str]] = [] + def _resolve_roots( + self, + categories: Optional[List[str]], + category_roots: Optional[Dict[str, str]], + ) -> Optional[Dict[str, str]]: + """Normalize the two filter styles into a category → root-folder map. + + ``None`` means "all categories", folder == category name. + """ + if category_roots is not None: + return dict(category_roots) + if categories is not None: + return {cat: cat for cat in categories} + return None + + def _target_dir(self, tag, roots: Optional[Dict[str, str]]) -> Optional[Path]: + """Output directory for a tag, or None if its category is excluded.""" + if roots is None: + folder = tag.category + elif tag.category in roots: + folder = roots[tag.category] + else: + return None + base = self.output_dir / folder if folder else self.output_dir + return base / tag.name + + def _managed_top_dirs( + self, files: List[File], roots: Optional[Dict[str, str]] + ) -> Optional[Set[str]]: + """Top-level output folders owned by the tag tree (None = all of them). + + For a category with a non-empty root the root folder is owned; for a + category placed at the output root (empty root, e.g. 
genres) each of its + tag names is its own top-level folder. This lets cleanup skip unrelated + root entries such as the copy-as-is mirror (Seriály). + """ + if roots is None: + return None + tops: Set[str] = set() + for cat, folder in roots.items(): + if folder: + tops.add(folder) + else: + for file_obj in files: + for tag in file_obj.tags: + if tag.category == cat: + tops.add(tag.name) + return tops + def create_structure_for_files( self, files: List[File], categories: Optional[List[str]] = None, - dry_run: bool = False + dry_run: bool = False, + category_roots: Optional[Dict[str, str]] = None, ) -> Tuple[int, int]: """ Create hardlink structure for given files based on their tags. @@ -50,6 +109,8 @@ class HardlinkManager: files: List of File objects to process categories: Optional list of categories to include (None = all) dry_run: If True, only simulate without creating actual links + category_roots: Optional category → root-folder map (see class doc); + overrides ``categories`` when given. 
Returns: Tuple of (successful_links, failed_links) @@ -57,6 +118,7 @@ class HardlinkManager: self.created_links = [] self.errors = [] + roots = self._resolve_roots(categories, category_roots) success_count = 0 fail_count = 0 @@ -65,12 +127,10 @@ class HardlinkManager: continue for tag in file_obj.tags: - # Skip if category filter is set and this category is not included - if categories is not None and tag.category not in categories: + # Resolve the target dir; None means this category is excluded + target_dir = self._target_dir(tag, roots) + if target_dir is None: continue - - # Create target directory path: output/category/tag_name/ - target_dir = self.output_dir / tag.category / tag.name target_file = target_dir / file_obj.filename try: @@ -204,17 +264,25 @@ class HardlinkManager: except OSError: pass - def get_preview(self, files: List[File], categories: Optional[List[str]] = None) -> List[Tuple[Path, Path]]: + def get_preview( + self, + files: List[File], + categories: Optional[List[str]] = None, + category_roots: Optional[Dict[str, str]] = None, + ) -> List[Tuple[Path, Path]]: """ Get a preview of what links would be created. Args: files: List of File objects categories: Optional list of categories to include + category_roots: Optional category → root-folder map (overrides + ``categories`` when given). 
Returns: List of tuples (source_path, target_path) """ + roots = self._resolve_roots(categories, category_roots) preview = [] for file_obj in files: @@ -222,10 +290,9 @@ class HardlinkManager: continue for tag in file_obj.tags: - if categories is not None and tag.category not in categories: + target_dir = self._target_dir(tag, roots) + if target_dir is None: continue - - target_dir = self.output_dir / tag.category / tag.name target_file = target_dir / file_obj.filename preview.append((file_obj.file_path, target_file)) @@ -235,26 +302,33 @@ class HardlinkManager: def find_obsolete_links( self, files: List[File], - categories: Optional[List[str]] = None + categories: Optional[List[str]] = None, + category_roots: Optional[Dict[str, str]] = None, ) -> List[Tuple[Path, Path]]: """ Find hardlinks in the output directory that no longer match file tags. - Scans the output directory for hardlinks that point to source files, - but whose category/tag path no longer matches the file's current tags. + Scans the managed parts of the output directory for hardlinks that point + to source files but whose path no longer matches the file's current tags. + Only the tag-tree's own top-level folders are scanned, so copy-as-is + mirrors (e.g. Seriály) are left untouched. Args: files: List of File objects (source files) categories: Optional list of categories to check (None = all) + category_roots: Optional category → root-folder map (overrides + ``categories`` when given). 
Returns: List of tuples (link_path, source_path) for obsolete links """ - obsolete = [] + obsolete: List[Tuple[Path, Path]] = [] if not self.output_dir.exists(): return obsolete + roots = self._resolve_roots(categories, category_roots) + # Build a map of source file inodes to File objects inode_to_file: dict[int, File] = {} for file_obj in files: @@ -272,44 +346,33 @@ class HardlinkManager: expected_paths[inode] = set() for tag in file_obj.tags: - if categories is not None and tag.category not in categories: + target_dir = self._target_dir(tag, roots) + if target_dir is None: continue - target = self.output_dir / tag.category / tag.name / file_obj.filename - expected_paths[inode].add(target) + expected_paths[inode].add(target_dir / file_obj.filename) except OSError: continue - # Scan output directory for existing hardlinks - for category_dir in self.output_dir.iterdir(): - if not category_dir.is_dir(): + # Scan only the tag-tree's own top-level folders (skip copy-as-is mirrors) + top_dirs = self._managed_top_dirs(files, roots) + for top in self.output_dir.iterdir(): + if not top.is_dir(): + continue + if top_dirs is not None and top.name not in top_dirs: continue - # Filter by categories if specified - if categories is not None and category_dir.name not in categories: - continue - - for tag_dir in category_dir.iterdir(): - if not tag_dir.is_dir(): + # Depth-agnostic: genres sit one level deep, "Dle roku"/"Dle země + # původu" two levels deep — walk all files under the managed folder. 
+ for link_file in top.rglob("*"): + if not link_file.is_file(): + continue + try: + link_inode = link_file.stat().st_ino + if link_inode in expected_paths: + if link_file not in expected_paths[link_inode]: + obsolete.append((link_file, inode_to_file[link_inode].file_path)) + except OSError: continue - - for link_file in tag_dir.iterdir(): - if not link_file.is_file(): - continue - - try: - link_inode = link_file.stat().st_ino - - # Check if this inode belongs to one of our source files - if link_inode in inode_to_file: - source_file = inode_to_file[link_inode] - - # Check if this link path is expected - if link_inode in expected_paths: - if link_file not in expected_paths[link_inode]: - # This link exists but tag was removed - obsolete.append((link_file, source_file.file_path)) - except OSError: - continue return obsolete @@ -317,7 +380,8 @@ class HardlinkManager: self, files: List[File], categories: Optional[List[str]] = None, - dry_run: bool = False + dry_run: bool = False, + category_roots: Optional[Dict[str, str]] = None, ) -> Tuple[int, List[Path]]: """ Remove hardlinks that no longer match file tags. @@ -326,11 +390,13 @@ class HardlinkManager: files: List of File objects categories: Optional list of categories to check dry_run: If True, only return what would be removed + category_roots: Optional category → root-folder map (overrides + ``categories`` when given). Returns: Tuple of (removed_count, list_of_removed_paths) """ - obsolete = self.find_obsolete_links(files, categories) + obsolete = self.find_obsolete_links(files, categories, category_roots) removed_paths = [] if dry_run: @@ -352,7 +418,8 @@ class HardlinkManager: self, files: List[File], categories: Optional[List[str]] = None, - dry_run: bool = False + dry_run: bool = False, + category_roots: Optional[Dict[str, str]] = None, ) -> Tuple[int, int, int, int]: """ Synchronize hardlink structure with current file tags. 
@@ -365,19 +432,25 @@ class HardlinkManager: files: List of File objects categories: Optional list of categories to sync dry_run: If True, only simulate + category_roots: Optional category → root-folder map (overrides + ``categories`` when given). Returns: Tuple of (created, create_failed, removed, remove_failed) """ # First find how many obsolete links there are - obsolete_count = len(self.find_obsolete_links(files, categories)) + obsolete_count = len(self.find_obsolete_links(files, categories, category_roots)) # Remove obsolete links - removed, removed_paths = self.remove_obsolete_links(files, categories, dry_run) + removed, removed_paths = self.remove_obsolete_links( + files, categories, dry_run, category_roots + ) remove_failed = obsolete_count - removed if not dry_run else 0 # Then create new links - created, create_failed = self.create_structure_for_files(files, categories, dry_run) + created, create_failed = self.create_structure_for_files( + files, categories, dry_run, category_roots + ) return created, create_failed, removed, remove_failed diff --git a/src/ui/qt_app.py b/src/ui/qt_app.py index e76e07c..b8d085f 100644 --- a/src/ui/qt_app.py +++ b/src/ui/qt_app.py @@ -12,15 +12,15 @@ import os import sys import subprocess from pathlib import Path -from typing import List +from typing import List, Optional -from PySide6.QtCore import Qt +from PySide6.QtCore import Qt, QTimer from PySide6.QtGui import QAction, QKeySequence from PySide6.QtWidgets import ( QApplication, QMainWindow, QWidget, QSplitter, QTreeWidget, QTreeWidgetItem, QTableWidget, QTableWidgetItem, QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QPushButton, QFileDialog, QMessageBox, QInputDialog, QDialog, QDialogButtonBox, - QFormLayout, QHeaderView, QMenu, QAbstractItemView, + QHeaderView, QMenu, QAbstractItemView, QCheckBox, ) from src.core.file_manager import FileManager @@ -30,39 +30,125 @@ from src.core.tag import Tag from src.core.constants import APP_NAME, VERSION from 
src.core.hardlink_manager import HardlinkManager -# Categories that drive the generated Filmotéka tree (see PROJECT.md) -FILMOTEKA_CATEGORIES = ["Rok", "Žánr", "Země původu", "Hodnocení"] +# Layout of the generated Filmotéka tree: category → root folder under the +# output (see PROJECT.md). Genres sit directly at the output root (next to the +# copy-as-is Seriály mirror); Rok and Země původu get their own grouping folder. +FILMOTEKA_CATEGORY_ROOTS = { + "Žánr": "", + "Rok": "Dle roku", + "Země původu": "Dle země původu", + "Hodnocení": "Dle hodnocení", +} -class ImportMovieDialog(QDialog): - """Collect the Title and ČSFD link for a movie being imported into the pool.""" +class ImportMoviesDialog(QDialog): + """Collect a Title + ČSFD link per file for a batch import into the pool. - def __init__(self, parent: QWidget, default_title: str) -> None: + One row per source file (filename shown, Title and ČSFD link editable). More + files can be added from inside the dialog. A single toggle decides whether + the files are copied (default, non-destructive) or moved into the pool. 
+ """ + + def __init__(self, parent: QWidget, sources: List[Path]) -> None: super().__init__(parent) - self.setWindowTitle("Importovat film do poolu") - self.setMinimumWidth(420) + self.setWindowTitle("Importovat filmy do poolu") + self.setMinimumSize(680, 360) + + # (source path, title field, ČSFD field) per row + self._rows: list[tuple[Path, QLineEdit, QLineEdit]] = [] layout = QVBoxLayout(self) - form = QFormLayout() - self.title_edit = QLineEdit(default_title) - self.csfd_edit = QLineEdit() - self.csfd_edit.setPlaceholderText("https://www.csfd.cz/film/...") - form.addRow("Název:", self.title_edit) - form.addRow("ČSFD odkaz:", self.csfd_edit) - layout.addLayout(form) + + self.table = QTableWidget(0, 3) + self.table.setHorizontalHeaderLabels(["Soubor", "Název", "ČSFD odkaz"]) + self.table.setEditTriggers(QAbstractItemView.NoEditTriggers) + header = self.table.horizontalHeader() + header.setSectionResizeMode(0, QHeaderView.ResizeToContents) + header.setSectionResizeMode(1, QHeaderView.Stretch) + header.setSectionResizeMode(2, QHeaderView.Stretch) + layout.addWidget(self.table) + + add_row = QHBoxLayout() + add_btn = QPushButton("➕ Přidat soubory…") + add_btn.clicked.connect(self._add_files) + add_row.addWidget(add_btn) + find_btn = QPushButton("🔎 Najít ČSFD odkazy") + find_btn.setToolTip("Vyhledá na ČSFD podle názvu a vyplní prázdné odkazy") + find_btn.clicked.connect(self._autofill_csfd) + add_row.addWidget(find_btn) + add_row.addStretch(1) + layout.addLayout(add_row) + + self.move_check = QCheckBox("Přesunout soubory do poolu (jinak zkopírovat)") + layout.addWidget(self.move_check) buttons = QDialogButtonBox(QDialogButtonBox.Ok | QDialogButtonBox.Cancel) buttons.accepted.connect(self.accept) buttons.rejected.connect(self.reject) layout.addWidget(buttons) - @property - def title(self) -> str: - return self.title_edit.text().strip() + for source in sources: + self._append_row(source) + + def _append_row(self, source: Path) -> None: + row = self.table.rowCount() + 
def _autofill_csfd(self) -> None:
    """Fill empty ČSFD fields by searching ČSFD for each row's cleaned title.

    Only rows whose ČSFD field is empty are looked up; existing links are
    never overwritten. The query is built from the row's title field (falling
    back to the source file's stem when the field is blank, like
    ``entries()`` does). One ``requests.Session`` is shared by all lookups.
    A failure for one row is swallowed on purpose so the remaining rows are
    still processed. A summary message box is shown at the end.
    """
    import requests
    from src.core import csfd

    targets = [
        (source, title_edit, csfd_edit)
        for source, title_edit, csfd_edit in self._rows
        if not csfd_edit.text().strip()
    ]
    if not targets:
        QMessageBox.information(self, "ČSFD", "Všechny řádky už mají odkaz.")
        return

    found = 0
    QApplication.setOverrideCursor(Qt.WaitCursor)
    try:
        with requests.Session() as session:
            for source, title_edit, csfd_edit in targets:
                title = title_edit.text().strip() or source.stem
                # The title field holds a name *without* extension, while
                # clean_filename_to_query() expects a filename and drops the
                # last dotted part via Path.stem. Passing the bare title would
                # cut off real title words ("The.Godfather" -> "The") or the
                # year ("Matrix.1999" -> "Matrix"), so give it a throwaway
                # extension to strip instead.
                query = csfd.clean_filename_to_query(f"{title}.ext")
                try:
                    url = csfd.find_csfd_url(query, session=session)
                except Exception:  # noqa: BLE001 — network/parse failure for one row
                    url = None
                if url:
                    csfd_edit.setText(url)
                    found += 1
    finally:
        # Always restore the cursor, even if a lookup raised unexpectedly.
        QApplication.restoreOverrideCursor()

    QMessageBox.information(
        self, "ČSFD", f"Vyplněno {found} z {len(targets)} hledaných odkazů."
    )
@@ class QtApp(QMainWindow): # Sidebar (tag filter) # ------------------------------------------------------------------ - def refresh_sidebar(self) -> None: - self.tag_tree.blockSignals(True) - self.tag_tree.clear() + def refresh_sidebar(self, filtered: Optional[List[File]] = None) -> None: + """Rebuild the filter tree, preserving the active filter and updating counts. + + The count after each tag is how many of ``filtered`` (the movies matching + the current filter; all movies when nothing is checked) also carry that + tag — i.e. how many would remain if that tag were checked. Check state is + restored from ``self._active_filter`` so it survives the rebuild. + """ + if filtered is None: + filtered = self.filehandler.filter_files_by_tags(self._active_filter_tags()) + counts: dict[str, int] = {} - for f in self.filehandler.filelist: + for f in filtered: for t in f.tags: counts[t.full_path] = counts.get(t.full_path, 0) + 1 + self.tag_tree.blockSignals(True) + self.tag_tree.clear() for category in self.tagmanager.get_categories(): cat_item = QTreeWidgetItem([category]) cat_item.setFlags(Qt.ItemIsEnabled) @@ -256,27 +356,32 @@ class QtApp(QMainWindow): cat_item.setExpanded(True) for tag in self.tagmanager.get_tags_in_category(category): count = counts.get(tag.full_path, 0) - label = f"{tag.name} ({count})" if count else tag.name - item = QTreeWidgetItem([label]) + item = QTreeWidgetItem([f"{tag.name} ({count})"]) item.setFlags(Qt.ItemIsUserCheckable | Qt.ItemIsEnabled) - item.setCheckState(0, Qt.Unchecked) + checked = tag.full_path in self._active_filter + item.setCheckState(0, Qt.Checked if checked else Qt.Unchecked) item.setData(0, Qt.UserRole, tag.full_path) cat_item.addChild(item) self.tag_tree.blockSignals(False) - def _on_tag_filter_changed(self, _item, _col) -> None: - self.refresh_table() + def _on_tag_filter_changed(self, item, _col) -> None: + full_path = item.data(0, Qt.UserRole) + if full_path is None: + return # category header row, not a tag + if 
item.checkState(0) == Qt.Checked: + self._active_filter.add(full_path) + else: + self._active_filter.discard(full_path) + # Defer the refresh: rebuilding the tree (clear()) *inside* its own + # itemChanged signal deletes the item Qt is still processing → SIGSEGV. + # Running it on the next event-loop tick lets Qt finish first. + QTimer.singleShot(0, self.refresh_table) - def _checked_filter_tags(self) -> List[Tag]: + def _active_filter_tags(self) -> List[Tag]: tags: List[Tag] = [] - for i in range(self.tag_tree.topLevelItemCount()): - cat = self.tag_tree.topLevelItem(i) - for j in range(cat.childCount()): - child = cat.child(j) - if child.checkState(0) == Qt.Checked: - full_path = child.data(0, Qt.UserRole) - category, name = full_path.split("/", 1) - tags.append(Tag(category, name)) + for full_path in self._active_filter: + category, name = full_path.split("/", 1) + tags.append(Tag(category, name)) return tags # ------------------------------------------------------------------ @@ -284,15 +389,18 @@ class QtApp(QMainWindow): # ------------------------------------------------------------------ def refresh_table(self, *_args) -> None: - filtered = self.filehandler.filter_files_by_tags(self._checked_filter_tags()) + # Tag filter (AND) drives both the table and the sidebar counts; the + # search box further narrows only the table. 
+ tag_filtered = self.filehandler.filter_files_by_tags(self._active_filter_tags()) + shown = tag_filtered search = self.search_edit.text().lower() if hasattr(self, "search_edit") else "" if search: - filtered = [f for f in filtered if search in (f.title or f.filename).lower()] - filtered.sort(key=lambda f: (f.title or f.filename).lower()) + shown = [f for f in shown if search in (f.title or f.filename).lower()] + shown = sorted(shown, key=lambda f: (f.title or f.filename).lower()) - self.table.setRowCount(len(filtered)) + self.table.setRowCount(len(shown)) self.file_rows.clear() - for row, f in enumerate(filtered): + for row, f in enumerate(shown): self.file_rows[row] = f name = f.title or f.filename tags = ", ".join(t.name for t in f.tags) @@ -303,9 +411,9 @@ class QtApp(QMainWindow): for col, value in enumerate([name, tags, size]): self.table.setItem(row, col, QTableWidgetItem(value)) - self.refresh_sidebar() + self.refresh_sidebar(tag_filtered) self._update_selection_status() - self.status.showMessage(f"Zobrazeno {len(filtered)} filmů", 4000) + self.status.showMessage(f"Zobrazeno {len(shown)} filmů", 4000) @staticmethod def _format_size(size_bytes: float) -> str: @@ -322,6 +430,7 @@ class QtApp(QMainWindow): def _show_table_menu(self, pos) -> None: menu = QMenu(self) menu.addAction("Otevřít", self.open_movies) + menu.addAction("Přejmenovat…", self.rename_movie) menu.addAction("Přiřadit štítky…", self.assign_tags) menu.addAction("Nastavit datum…", self.set_date) menu.addAction("Upravit ČSFD odkaz…", self.edit_csfd) @@ -382,36 +491,51 @@ class QtApp(QMainWindow): if not self.filehandler.movies_dir: QMessageBox.warning(self, "Pool", "Nejprve nastavte pool (menu Pool → Nastavit pool).") return - path, _ = QFileDialog.getOpenFileName(self, "Vyber video soubor") - if not path: + paths, _ = QFileDialog.getOpenFileNames(self, "Vyber video soubory") + if not paths: return - source = Path(path) - dialog = ImportMovieDialog(self, default_title=source.stem) + sources = 
[Path(p) for p in paths] + dialog = ImportMoviesDialog(self, sources) if dialog.exec() != QDialog.Accepted: return - try: - movie = self.filehandler.import_movie(source, dialog.title, dialog.csfd_link) - except Exception as exc: # noqa: BLE001 — surface any import failure to the user - QMessageBox.critical(self, "Chyba importu", str(exc)) + entries = dialog.entries() + if not entries: return + move = dialog.move_files - # If a ČSFD link was given, enrich the movie with tags right away - if movie.csfd_link: - self.status.showMessage("Načítám z ČSFD…") + imported: list[File] = [] + errors: list[str] = [] + for source, title, csfd_link in entries: + try: + movie = self.filehandler.import_movie(source, title, csfd_link or None, move=move) + imported.append(movie) + except Exception as exc: # noqa: BLE001 — surface per-file import failures + errors.append(f"{source.name}: {exc}") + + # Enrich the freshly imported movies that carry a ČSFD link + with_links = [m for m in imported if m.csfd_link] + tags_total = 0 + if with_links: + self.status.showMessage(f"Načítám z ČSFD ({len(with_links)})…") QApplication.setOverrideCursor(Qt.WaitCursor) try: - _, tags_total, errors = self._fetch_csfd_for([movie]) + _, tags_total, csfd_errors = self._fetch_csfd_for(with_links) finally: QApplication.restoreOverrideCursor() - if errors: - QMessageBox.warning(self, "ČSFD", "Tagy se nepodařilo načíst:\n" + errors[0]) - else: - self.status.showMessage( - f"Importováno: {movie.title} (+{tags_total} tagů z ČSFD)", 5000 - ) + errors.extend(csfd_errors) self.refresh_table() - self.status.showMessage(f"Importováno: {dialog.title}", 5000) + + verb = "Přesunuto" if move else "Zkopírováno" + summary = f"{verb} {len(imported)}/{len(entries)} filmů (+{tags_total} tagů z ČSFD)." 
+ if errors: + QMessageBox.warning( + self, "Import dokončen s chybami", + summary + "\n\nChyby:\n" + "\n".join(errors[:5]), + ) + else: + QMessageBox.information(self, "Import", summary) + self.status.showMessage(summary, 5000) def open_movies(self) -> None: for f in self._selected_movies(): @@ -456,6 +580,27 @@ class QtApp(QMainWindow): f.set_date(text.strip() or None) self.refresh_table() + def rename_movie(self) -> None: + files = self._selected_movies() + if len(files) != 1: + QMessageBox.information(self, "Přejmenovat", "Vyberte právě jeden film.") + return + f = files[0] + current = f.file_path.stem # name without extension + text, ok = QInputDialog.getText( + self, "Přejmenovat film", + f"Nový název (bez přípony {f.file_path.suffix}):", text=current, + ) + if not ok: + return + try: + self.filehandler.rename_movie(f, text) + except (ValueError, FileExistsError, OSError) as exc: + QMessageBox.warning(self, "Přejmenování selhalo", str(exc)) + return + self.refresh_table() + self.status.showMessage(f"Přejmenováno na: {f.filename}", 5000) + def edit_csfd(self) -> None: files = self._selected_movies() if len(files) != 1: @@ -536,7 +681,9 @@ class QtApp(QMainWindow): QMessageBox.information(self, "Filmotéka", "Pool je prázdný.") return manager = HardlinkManager(out) - created, create_fail, removed, remove_fail = manager.sync_structure(files, FILMOTEKA_CATEGORIES) + created, create_fail, removed, remove_fail = manager.sync_structure( + files, category_roots=FILMOTEKA_CATEGORY_ROOTS + ) # Copy-as-is folders (e.g. 
Seriály): mirror each 1:1 (hardlinked) pool = self.filehandler.pool_dir diff --git a/tests/test_csfd.py b/tests/test_csfd.py index a848a1b..b516fa3 100644 --- a/tests/test_csfd.py +++ b/tests/test_csfd.py @@ -17,6 +17,10 @@ from src.core.csfd import ( _extract_origin_info, _check_dependencies, _solve_anubis_pow, + _split_countries, + rating_band, + clean_filename_to_query, + find_csfd_url, ) @@ -87,7 +91,7 @@ class TestCSFDMovie: rating=85, rating_count=1000, duration=120, - country="Česko", + countries=["Česko"], poster_url="https://image.example.com/poster.jpg", plot="A test movie.", csfd_id=123 @@ -96,6 +100,7 @@ class TestCSFDMovie: assert movie.genres == ["Drama", "Thriller"] assert movie.rating == 85 assert movie.duration == 120 + assert movie.countries == ["Česko"] assert movie.csfd_id == 123 def test_csfd_movie_str(self): @@ -145,6 +150,38 @@ class TestHelperFunctions: """Test parsing invalid duration.""" assert _parse_duration("") is None assert _parse_duration("invalid") is None + + def test_split_countries_single(self): + """A single country yields a one-item list.""" + assert _split_countries("USA") == ["USA"] + + def test_split_countries_multiple(self): + """Slash-separated co-production countries are split and trimmed.""" + assert _split_countries("USA / Velká Británie") == ["USA", "Velká Británie"] + assert _split_countries("Japonsko/USA") == ["Japonsko", "USA"] + + def test_split_countries_empty(self): + """None/empty yields an empty list.""" + assert _split_countries(None) == [] + assert _split_countries("") == [] + + def test_from_dict_migrates_legacy_country(self): + """Legacy cache with a single 'country' string maps to countries list.""" + movie = CSFDMovie.from_dict({"title": "X", "country": "USA / Kanada"}) + assert movie.countries == ["USA", "Kanada"] + + def test_from_dict_uses_countries_when_present(self): + """New cache with 'countries' is used verbatim.""" + movie = CSFDMovie.from_dict({"title": "X", "countries": ["Japonsko", "USA"]}) + 
assert movie.countries == ["Japonsko", "USA"] + + def test_rating_band_buckets(self): + """Rating is bucketed into ten-point bands, top band spans 90–100 %.""" + assert rating_band(0) == "0–9 %" + assert rating_band(86) == "80–89 %" + assert rating_band(90) == "90–100 %" + assert rating_band(95) == "90–100 %" + assert rating_band(100) == "90–100 %" assert _parse_duration("PT") is None @@ -191,7 +228,7 @@ class TestHTMLExtraction: def test_extract_origin_info(self, soup): """Test extracting origin info (comma-separated legacy format).""" info = _extract_origin_info(soup) - assert info["country"] == "Česko" + assert info["countries"] == ["Česko"] assert info["year"] == 2020 assert info["duration"] == 120 @@ -204,10 +241,23 @@ class TestHTMLExtraction: '136 min (Alternativní 131 min)' ) info = _extract_origin_info(BeautifulSoup(html, "html.parser")) - assert info["country"] == "USA" + assert info["countries"] == ["USA"] assert info["year"] == 1999 assert info["duration"] == 136 + def test_extract_origin_info_multiple_countries(self): + """A co-production lists several slash-separated countries.""" + from bs4 import BeautifulSoup + html = ( + '
USA / Velká Británie ' + '2009 ' + ' 166 min
' + ) + info = _extract_origin_info(BeautifulSoup(html, "html.parser")) + assert info["countries"] == ["USA", "Velká Británie"] + assert info["year"] == 2009 + assert info["duration"] == 166 + def test_extract_json_ld_year_from_date_created(self): """Year is taken from JSON-LD dateCreated when present.""" from bs4 import BeautifulSoup @@ -220,6 +270,49 @@ class TestHTMLExtraction: assert data["year"] == 1999 +class TestCleanFilenameToQuery: + """Tests for turning a filename into a ČSFD search query.""" + + def test_strips_release_tags_and_keeps_year(self): + assert clean_filename_to_query( + "Matrix.1999.1080p.BluRay.x264-GROUP.mkv") == "Matrix 1999" + + def test_handles_spaces_and_parens_year(self): + assert clean_filename_to_query( + "Forrest Gump (1994) 2160p HDR.mkv") == "Forrest Gump 1994" + + def test_no_year_no_markers(self): + assert clean_filename_to_query("Amelie.mkv") == "Amelie" + + def test_underscores_and_resolution(self): + assert clean_filename_to_query("Sam_doma_720p.mkv") == "Sam doma" + + def test_falls_back_to_stem_when_starting_with_marker(self): + # No real title words before the marker → fall back to the cleaned stem + assert clean_filename_to_query("1080p.mkv") == "1080p" + + +class TestFindCsfdUrl: + """Tests for find_csfd_url (search is mocked).""" + + def test_returns_first_result_url(self): + from unittest.mock import patch + movies = [ + CSFDMovie(title="Matrix", url="https://www.csfd.cz/film/9499-matrix/"), + CSFDMovie(title="Matrix Reloaded", url="https://www.csfd.cz/film/9497-x/"), + ] + with patch("src.core.csfd.search_movies", return_value=movies): + assert find_csfd_url("Matrix 1999") == "https://www.csfd.cz/film/9499-matrix/" + + def test_returns_none_for_empty_query(self): + assert find_csfd_url(" ") is None + + def test_returns_none_when_no_results(self): + from unittest.mock import patch + with patch("src.core.csfd.search_movies", return_value=[]): + assert find_csfd_url("nonexistent film") is None + + class TestFetchMovie: 
"""Tests for fetch_movie function.""" @@ -240,6 +333,31 @@ class TestFetchMovie: assert "Drama" in movie.genres session.get.assert_called_once() + @patch("src.core.csfd.requests") + def test_fetch_movie_caps_actors_at_ten(self, mock_requests): + """Only the first MAX_ACTORS (10) of a long cast are kept.""" + import json as _json + actors = [{"@type": "Person", "name": f"Actor {i}"} for i in range(25)] + json_ld = _json.dumps({ + "@type": "Movie", "name": "Crowded", "actor": actors, + "director": [{"@type": "Person", "name": "Dir"}], + "aggregateRating": {"ratingValue": 70, "ratingCount": 5}, + }) + html = f'' + mock_response = MagicMock() + mock_response.text = html + mock_response.raise_for_status = MagicMock() + session = _mock_session(mock_requests) + session.get.return_value = mock_response + + movie = fetch_movie("https://www.csfd.cz/film/1-crowded/") + + assert movie.directors == ["Dir"] + assert movie.rating == 70 + assert len(movie.actors) == 10 + assert movie.actors[0] == "Actor 0" + assert movie.actors[-1] == "Actor 9" + @patch("src.core.csfd.requests") def test_fetch_movie_network_error(self, mock_requests): """Test network error handling.""" diff --git a/tests/test_file.py b/tests/test_file.py index 63ad156..7280559 100644 --- a/tests/test_file.py +++ b/tests/test_file.py @@ -261,3 +261,72 @@ class TestFile: tag_paths2 = {tag.full_path for tag in file_obj2.tags} assert tag_paths == tag_paths2 assert file_obj2.date == "2025-01-01" + + +class TestApplyCsfdTags: + """Tests for File.apply_csfd_tags tag assignment (CSFD fetch is mocked).""" + + @pytest.fixture + def tag_manager(self): + return TagManager() + + @pytest.fixture + def movie_file(self, tmp_path, tag_manager): + path = tmp_path / "Matrix.mkv" + path.write_text("x") + f = File(path, tag_manager) + f.set_csfd_link("https://www.csfd.cz/film/9499-matrix/") + return f + + def test_apply_csfd_tags_assigns_expected_categories(self, movie_file): + from unittest.mock import patch + from src.core.csfd 
import CSFDMovie + + movie = CSFDMovie( + title="Matrix", url="u", year=1999, genres=["Akční", "Sci-Fi"], + directors=["Lana Wachowski", "Lilly Wachowski"], + actors=["Keanu Reeves", "Laurence Fishburne"], + rating=90, countries=["USA"], + ) + with patch("src.core.csfd.fetch_movie", return_value=movie): + result = movie_file.apply_csfd_tags() + + assert result["success"] + paths = {t.full_path for t in movie_file.tags} + assert "Žánr/Akční" in paths + assert "Žánr/Sci-Fi" in paths + assert "Rok/1999" in paths + assert "Země původu/USA" in paths + assert "Hodnocení/90–100 %" in paths + + def test_apply_csfd_tags_does_not_tag_directors_or_actors(self, movie_file): + """Režie/herci se jen cachují, netvoří se z nich tagy (bylo by jich moc).""" + from unittest.mock import patch + from src.core.csfd import CSFDMovie + + movie = CSFDMovie( + title="Matrix", url="u", directors=["Lana Wachowski"], + actors=["Keanu Reeves", "Laurence Fishburne"], genres=["Drama"], + ) + with patch("src.core.csfd.fetch_movie", return_value=movie): + movie_file.apply_csfd_tags() + + paths = {t.full_path for t in movie_file.tags} + assert not any(p.startswith("Režie/") for p in paths) + assert not any(p.startswith("Herec/") for p in paths) + # …but the data is kept in the cache + cached = movie_file.get_cached_movie() + assert cached.directors == ["Lana Wachowski"] + assert cached.actors == ["Keanu Reeves", "Laurence Fishburne"] + + def test_apply_csfd_tags_can_skip_rating(self, movie_file): + from unittest.mock import patch + from src.core.csfd import CSFDMovie + + movie = CSFDMovie(title="Matrix", url="u", rating=90, genres=["Drama"]) + with patch("src.core.csfd.fetch_movie", return_value=movie): + movie_file.apply_csfd_tags(add_rating=False) + + paths = {t.full_path for t in movie_file.tags} + assert "Žánr/Drama" in paths + assert not any(p.startswith("Hodnocení/") for p in paths) diff --git a/tests/test_file_manager.py b/tests/test_file_manager.py index febf74b..8572ef8 100644 --- 
a/tests/test_file_manager.py +++ b/tests/test_file_manager.py @@ -592,6 +592,73 @@ class TestPoolManagement: assert movie.csfd_link == "https://csfd.cz/film/1" assert file_manager.index.get(movie.file_path) is not None + def test_import_movie_move_removes_source(self, file_manager, tmp_path): + file_manager.set_pool_dir(tmp_path / "pool") + source = tmp_path / "raw.mkv" + source.write_bytes(b"x" * 10) + + movie = file_manager.import_movie(source, "Matrix", move=True) + + assert movie.file_path == tmp_path / "pool" / "Filmy" / "Matrix.mkv" + assert movie.file_path.exists() + assert not source.exists() # moved, not copied + + def test_rename_movie_renames_file_and_reindexes(self, file_manager, tmp_path): + file_manager.set_pool_dir(tmp_path / "pool") + source = tmp_path / "raw.mkv" + source.write_bytes(b"x" * 10) + movie = file_manager.import_movie(source, "Matrix") + movie.add_tag("Žánr/Sci-Fi") + old_path = movie.file_path + + file_manager.rename_movie(movie, "Matrix Reloaded") + + new_path = tmp_path / "pool" / "Filmy" / "Matrix Reloaded.mkv" + assert movie.file_path == new_path + assert new_path.exists() + assert not old_path.exists() + assert movie.title == "Matrix Reloaded" + # metadata moved to the new key, old key gone, tags preserved + assert file_manager.index.get(new_path) is not None + assert file_manager.index.get(old_path) is None + # a fresh manager reading the index sees the renamed file with its tags + reloaded = FileManager(TagManager()) + reloaded.set_pool_dir(tmp_path / "pool") + reloaded.load_pool_movies() + assert [f.filename for f in reloaded.filelist] == ["Matrix Reloaded.mkv"] + assert "Žánr/Sci-Fi" in {t.full_path for t in reloaded.filelist[0].tags} + + def test_rename_movie_preserves_extension(self, file_manager, tmp_path): + file_manager.set_pool_dir(tmp_path / "pool") + source = tmp_path / "raw.mp4" + source.write_bytes(b"x") + movie = file_manager.import_movie(source, "Film") + + file_manager.rename_movie(movie, "Jiný název") + + assert 
movie.file_path.name == "Jiný název.mp4" + + def test_rename_movie_rejects_existing_name(self, file_manager, tmp_path): + file_manager.set_pool_dir(tmp_path / "pool") + (tmp_path / "a.mkv").write_bytes(b"a") + (tmp_path / "b.mkv").write_bytes(b"b") + first = file_manager.import_movie(tmp_path / "a.mkv", "Already") + second = file_manager.import_movie(tmp_path / "b.mkv", "Other") + + with pytest.raises(FileExistsError): + file_manager.rename_movie(second, "Already") + # second movie is left untouched + assert second.file_path.name == "Other.mkv" + assert first.file_path.exists() + + def test_rename_movie_rejects_empty_name(self, file_manager, tmp_path): + file_manager.set_pool_dir(tmp_path / "pool") + (tmp_path / "a.mkv").write_bytes(b"a") + movie = file_manager.import_movie(tmp_path / "a.mkv", "Name") + + with pytest.raises(ValueError): + file_manager.rename_movie(movie, " ") + def test_load_pool_movies_reads_from_index(self, file_manager, tmp_path): file_manager.set_pool_dir(tmp_path / "pool") source = tmp_path / "raw.mkv" diff --git a/tests/test_hardlink_manager.py b/tests/test_hardlink_manager.py index bf2c66f..f8bb7ed 100644 --- a/tests/test_hardlink_manager.py +++ b/tests/test_hardlink_manager.py @@ -107,6 +107,40 @@ class TestHardlinkManager: assert (temp_output_dir / "žánr" / "Komedie" / "file1.txt").exists() assert not (temp_output_dir / "rok").exists() + def test_create_structure_with_category_roots(self, files_with_tags, temp_output_dir): + """category_roots: genres sit at the output root, rok under 'Dle roku'.""" + manager = HardlinkManager(temp_output_dir) + roots = {"žánr": "", "rok": "Dle roku"} + manager.create_structure_for_files(files_with_tags, category_roots=roots) + + # Genres directly at the output root (no "žánr" wrapper folder) + assert (temp_output_dir / "Komedie" / "file1.txt").exists() + assert (temp_output_dir / "Akční" / "file1.txt").exists() + assert (temp_output_dir / "Drama" / "file2.txt").exists() + assert not (temp_output_dir / 
"žánr").exists() + + # Rok grouped under its own "Dle roku" folder + assert (temp_output_dir / "Dle roku" / "1988" / "file1.txt").exists() + + def test_sync_with_roots_leaves_unmanaged_mirror_untouched( + self, files_with_tags, temp_source_dir, temp_output_dir + ): + """Cleanup must not delete links in a copy-as-is mirror (e.g. Seriály).""" + manager = HardlinkManager(temp_output_dir) + roots = {"žánr": "", "rok": "Dle roku"} + manager.create_structure_for_files(files_with_tags, category_roots=roots) + + # Simulate a copy-as-is mirror holding a hardlink to a source file + mirror = temp_output_dir / "Seriály" + mirror.mkdir() + mirror_link = mirror / "file1.txt" + os.link(temp_source_dir / "file1.txt", mirror_link) + + manager.sync_structure(files_with_tags, category_roots=roots) + + # The mirror (not a managed tag folder) is left alone + assert mirror_link.exists() + def test_dry_run(self, files_with_tags, temp_output_dir): """Test dry run (bez skutečného vytváření)""" manager = HardlinkManager(temp_output_dir)