Rework Tagger fork into Curator movie-library manager (PySide6 GUI, pool index, ČSFD import)

This commit is contained in:
Jan Doubravský
2026-06-12 16:01:54 +02:00
parent dd9c7d9ec5
commit 22a14b1e41
52 changed files with 8095 additions and 0 deletions
View File
+2
View File
@@ -0,0 +1,2 @@
"""Auto-generated — do not edit manually."""
__version__ = "0.1.0"
+66
View File
@@ -0,0 +1,66 @@
"""
Application-level constants for Curator (build/runtime metadata).
This module derives the app version and debug flag. The version is loaded from
`pyproject.toml` (preferred) with a generated `src/_version.py` fallback for
frozen/PyInstaller builds. Debug mode is controlled via `.env` (`ENV_DEBUG`).
Note: per-feature constants (window size, tag colors, …) live in
`src/core/constants.py`; this module is only the version/debug surface used by
the build tooling and frozen builds.
"""
import os
import tomllib
from pathlib import Path
from dotenv import load_dotenv
load_dotenv()
_PYPROJECT_PATH = Path(__file__).parent.parent / "pyproject.toml"
_VERSION_FILE = Path(__file__).parent / "_version.py"
APP_NAME: str = "Curator"
def get_version() -> str:
"""Return the project version (pyproject → _version.py → unknown)."""
# 1. pyproject.toml (preferred)
try:
with open(_PYPROJECT_PATH, "rb") as f:
version = tomllib.load(f)["project"]["version"]
# Cache a fallback for frozen/PyInstaller builds (best-effort).
try:
_VERSION_FILE.write_text(
f'"""Auto-generated — do not edit manually."""\n__version__ = "{version}"\n',
encoding="utf-8",
)
except OSError:
pass
return version
except (FileNotFoundError, KeyError):
pass
# 2. generated _version.py
try:
from src._version import __version__ # type: ignore[import]
return __version__
except ImportError:
pass
# 3. last resort
return "0.0.0-unknown"
def get_debug_mode() -> bool:
"""Return True when ENV_DEBUG is set to a truthy value in the environment."""
return os.getenv("ENV_DEBUG", "false").lower() in ("true", "1", "yes")
APP_VERSION: str = get_version()
ENV_DEBUG: bool = get_debug_mode()
APP_TITLE: str = f"{APP_NAME} v{APP_VERSION}" + ("-DEV" if ENV_DEBUG else "")
# Backwards-compatible alias used by prebuild.py
VERSION: str = APP_VERSION
View File
+115
View File
@@ -0,0 +1,115 @@
"""
Configuration management for Curator
Three levels of configuration:
1. Global config (.Curator.!gtag next to Curator.py) - app-wide settings
2. Folder config (.Curator.!ftag in project root) - folder-specific settings
3. File tags (.{filename}.!tag) - per-file metadata (handled in file.py)
"""
import json
from pathlib import Path
# Global config file (next to the main script)
GLOBAL_CONFIG_FILE = Path(__file__).parent.parent.parent / ".Curator.!gtag"
# Folder config filename
FOLDER_CONFIG_NAME = ".Curator.!ftag"
# =============================================================================
# GLOBAL CONFIG - Application settings
# =============================================================================
DEFAULT_GLOBAL_CONFIG = {
"window_geometry": "1200x800",
"window_maximized": False,
"last_folder": None,
"sidebar_width": 250,
"recent_folders": [],
"pool_dir": None, # managed pool root (single source of truth)
"filmoteka_dir": None, # generated Filmotéka output (hardlink tree)
"copyasis_folders": ["Seriály"], # pool subfolders mirrored 1:1 (copy-as-is)
}
def load_global_config() -> dict:
"""Load global application config"""
if GLOBAL_CONFIG_FILE.exists():
try:
with open(GLOBAL_CONFIG_FILE, "r", encoding="utf-8") as f:
config = json.load(f)
# Merge with defaults for any missing keys
for key, value in DEFAULT_GLOBAL_CONFIG.items():
if key not in config:
config[key] = value
return config
except Exception:
return DEFAULT_GLOBAL_CONFIG.copy()
return DEFAULT_GLOBAL_CONFIG.copy()
def save_global_config(cfg: dict):
"""Save global application config"""
with open(GLOBAL_CONFIG_FILE, "w", encoding="utf-8") as f:
json.dump(cfg, f, indent=2, ensure_ascii=False)
# =============================================================================
# FOLDER CONFIG - Per-folder settings
# =============================================================================
DEFAULT_FOLDER_CONFIG = {
"ignore_patterns": [],
"custom_tags": {}, # Additional tags specific to this folder
"recursive": True, # Whether to scan subfolders
"hardlink_output_dir": None, # Output directory for hardlink structure
"hardlink_categories": None, # Categories to include in hardlink (None = all)
}
def get_folder_config_path(folder: Path) -> Path:
"""Get path to folder config file"""
return folder / FOLDER_CONFIG_NAME
def load_folder_config(folder: Path) -> dict:
"""Load folder-specific config"""
config_path = get_folder_config_path(folder)
if config_path.exists():
try:
with open(config_path, "r", encoding="utf-8") as f:
config = json.load(f)
# Merge with defaults for any missing keys
for key, value in DEFAULT_FOLDER_CONFIG.items():
if key not in config:
config[key] = value
return config
except Exception:
return DEFAULT_FOLDER_CONFIG.copy()
return DEFAULT_FOLDER_CONFIG.copy()
def save_folder_config(folder: Path, cfg: dict):
"""Save folder-specific config"""
config_path = get_folder_config_path(folder)
with open(config_path, "w", encoding="utf-8") as f:
json.dump(cfg, f, indent=2, ensure_ascii=False)
def folder_has_config(folder: Path) -> bool:
"""Check if folder has a tagger config"""
return get_folder_config_path(folder).exists()
# =============================================================================
# BACKWARDS COMPATIBILITY
# =============================================================================
def load_config():
"""Legacy function - returns global config"""
return load_global_config()
def save_config(cfg: dict):
"""Legacy function - saves global config"""
save_global_config(cfg)
+4
View File
@@ -0,0 +1,4 @@
# src/core/constants.py
VERSION = "v1.0.3"
APP_NAME = "Curator"
APP_VIEWPORT = "1000x700"
+429
View File
@@ -0,0 +1,429 @@
"""
CSFD.cz scraper module for fetching movie information.
This module provides functionality to fetch movie data from CSFD.cz (Czech-Slovak Film Database).
"""
from __future__ import annotations
import re
import json
from dataclasses import dataclass, field
from typing import Optional, TYPE_CHECKING
from urllib.parse import urljoin
try:
import requests
from bs4 import BeautifulSoup
HAS_DEPENDENCIES = True
except ImportError:
HAS_DEPENDENCIES = False
requests = None # type: ignore
BeautifulSoup = None # type: ignore
if TYPE_CHECKING:
from bs4 import BeautifulSoup
CSFD_BASE_URL = "https://www.csfd.cz"
CSFD_SEARCH_URL = "https://www.csfd.cz/hledat/"
# User agent to avoid being blocked
HEADERS = {
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept-Language": "cs,en;q=0.9",
}
@dataclass
class CSFDMovie:
"""Represents movie data from CSFD.cz"""
title: str
url: str
year: Optional[int] = None
genres: list[str] = field(default_factory=list)
directors: list[str] = field(default_factory=list)
actors: list[str] = field(default_factory=list)
rating: Optional[int] = None # Percentage 0-100
rating_count: Optional[int] = None
duration: Optional[int] = None # Minutes
country: Optional[str] = None
poster_url: Optional[str] = None
plot: Optional[str] = None
csfd_id: Optional[int] = None
def to_dict(self) -> dict:
"""Serialize to a plain dict for storage in .!tag cache."""
return {
"title": self.title,
"url": self.url,
"year": self.year,
"genres": self.genres,
"directors": self.directors,
"actors": self.actors,
"rating": self.rating,
"rating_count": self.rating_count,
"duration": self.duration,
"country": self.country,
"poster_url": self.poster_url,
"plot": self.plot,
"csfd_id": self.csfd_id,
}
@classmethod
def from_dict(cls, data: dict) -> "CSFDMovie":
"""Deserialize from a plain dict (e.g. loaded from .!tag cache)."""
return cls(
title=data.get("title", ""),
url=data.get("url", ""),
year=data.get("year"),
genres=data.get("genres", []),
directors=data.get("directors", []),
actors=data.get("actors", []),
rating=data.get("rating"),
rating_count=data.get("rating_count"),
duration=data.get("duration"),
country=data.get("country"),
poster_url=data.get("poster_url"),
plot=data.get("plot"),
csfd_id=data.get("csfd_id"),
)
def __str__(self) -> str:
parts = [self.title]
if self.year:
parts[0] += f" ({self.year})"
if self.rating is not None:
parts.append(f"Hodnocení: {self.rating}%")
if self.genres:
parts.append(f"Žánr: {', '.join(self.genres)}")
if self.directors:
parts.append(f"Režie: {', '.join(self.directors)}")
return " | ".join(parts)
def _check_dependencies():
"""Check if required dependencies are installed."""
if not HAS_DEPENDENCIES:
raise ImportError(
"CSFD module requires 'requests' and 'beautifulsoup4' packages. "
"Install them with: pip install requests beautifulsoup4"
)
def _extract_csfd_id(url: str) -> Optional[int]:
"""Extract CSFD movie ID from URL."""
match = re.search(r"/film/(\d+)", url)
return int(match.group(1)) if match else None
def _parse_duration(duration_str: str) -> Optional[int]:
"""Parse ISO 8601 duration (PT97M) to minutes."""
match = re.search(r"PT(\d+)M", duration_str)
return int(match.group(1)) if match else None
def fetch_movie(url: str) -> CSFDMovie:
"""
Fetch movie information from CSFD.cz URL.
Args:
url: Full CSFD.cz movie URL (e.g., https://www.csfd.cz/film/9423-pane-vy-jste-vdova/)
Returns:
CSFDMovie object with extracted data
Raises:
ImportError: If required dependencies are not installed
requests.RequestException: If network request fails
ValueError: If URL is invalid or page cannot be parsed
"""
_check_dependencies()
response = requests.get(url, headers=HEADERS, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
# Try to extract JSON-LD structured data first (most reliable)
movie_data = _extract_json_ld(soup)
# Extract additional data from HTML
movie_data["url"] = url
movie_data["csfd_id"] = _extract_csfd_id(url)
# Get rating from HTML if not in JSON-LD
if movie_data.get("rating") is None:
movie_data["rating"] = _extract_rating(soup)
# Get poster URL
if movie_data.get("poster_url") is None:
movie_data["poster_url"] = _extract_poster(soup)
# Get plot summary
if movie_data.get("plot") is None:
movie_data["plot"] = _extract_plot(soup)
# Get country and year from origin info
origin_info = _extract_origin_info(soup)
if origin_info:
if movie_data.get("country") is None:
movie_data["country"] = origin_info.get("country")
if movie_data.get("year") is None:
movie_data["year"] = origin_info.get("year")
if movie_data.get("duration") is None:
movie_data["duration"] = origin_info.get("duration")
# Get genres from HTML if not in JSON-LD
if not movie_data.get("genres"):
movie_data["genres"] = _extract_genres(soup)
return CSFDMovie(**movie_data)
def _extract_json_ld(soup: BeautifulSoup) -> dict:
"""Extract movie data from JSON-LD structured data."""
data = {
"title": "",
"year": None,
"genres": [],
"directors": [],
"actors": [],
"rating": None,
"rating_count": None,
"duration": None,
"country": None,
"poster_url": None,
"plot": None,
}
# Find JSON-LD script
script_tags = soup.find_all("script", type="application/ld+json")
for script in script_tags:
try:
json_data = json.loads(script.string)
# Handle both single object and array
if isinstance(json_data, list):
for item in json_data:
if item.get("@type") == "Movie":
json_data = item
break
else:
continue
if json_data.get("@type") != "Movie":
continue
# Title
data["title"] = json_data.get("name", "")
# Genres
genre = json_data.get("genre", [])
if isinstance(genre, str):
data["genres"] = [genre]
else:
data["genres"] = list(genre)
# Directors
directors = json_data.get("director", [])
if isinstance(directors, dict):
directors = [directors]
data["directors"] = [d.get("name", "") for d in directors if d.get("name")]
# Actors
actors = json_data.get("actor", [])
if isinstance(actors, dict):
actors = [actors]
data["actors"] = [a.get("name", "") for a in actors if a.get("name")]
# Rating
agg_rating = json_data.get("aggregateRating", {})
if agg_rating:
rating_value = agg_rating.get("ratingValue")
if rating_value is not None:
data["rating"] = round(float(rating_value))
data["rating_count"] = agg_rating.get("ratingCount")
# Duration
duration_str = json_data.get("duration", "")
if duration_str:
data["duration"] = _parse_duration(duration_str)
# Year (CSFD exposes it via dateCreated / datePublished)
for date_key in ("dateCreated", "datePublished"):
date_val = json_data.get(date_key)
if date_val:
year_match = re.search(r"(19\d{2}|20\d{2})", str(date_val))
if year_match:
data["year"] = int(year_match.group(1))
break
# Poster
image = json_data.get("image")
if image:
if isinstance(image, str):
data["poster_url"] = image
elif isinstance(image, dict):
data["poster_url"] = image.get("url")
# Description
data["plot"] = json_data.get("description")
break # Found movie data
except (json.JSONDecodeError, KeyError, TypeError):
continue
return data
def _extract_rating(soup: BeautifulSoup) -> Optional[int]:
"""Extract rating percentage from HTML."""
# Look for rating box
rating_elem = soup.select_one(".film-rating-average")
if rating_elem:
text = rating_elem.get_text(strip=True)
match = re.search(r"(\d+)%", text)
if match:
return int(match.group(1))
return None
def _extract_poster(soup: BeautifulSoup) -> Optional[str]:
"""Extract poster image URL from HTML."""
# Look for poster image
poster = soup.select_one(".film-poster img")
if poster:
src = poster.get("src") or poster.get("data-src")
if src:
if src.startswith("//"):
return "https:" + src
return src
return None
def _extract_plot(soup: BeautifulSoup) -> Optional[str]:
"""Extract plot summary from HTML."""
# Look for plot/description section
plot_elem = soup.select_one(".plot-full p")
if plot_elem:
return plot_elem.get_text(strip=True)
# Alternative: shorter plot
plot_elem = soup.select_one(".plot-preview p")
if plot_elem:
return plot_elem.get_text(strip=True)
return None
def _extract_genres(soup: BeautifulSoup) -> list[str]:
"""Extract genres from HTML."""
genres = []
genre_links = soup.select(".genres a")
for link in genre_links:
genre = link.get_text(strip=True)
if genre:
genres.append(genre)
return genres
def _extract_origin_info(soup: BeautifulSoup) -> dict:
"""Extract country, year, duration from the origin info line.
CSFD separates the values with inline bullet ``<span>`` elements (no commas),
so ``get_text(strip=True)`` would glue them together (e.g. "USA1999136 min").
We tokenize on those inline boundaries (and on commas, for the older format)
before extracting each field.
"""
info: dict = {}
origin_elem = soup.select_one(".origin")
if not origin_elem:
return info
# Split on inline element boundaries, then also on commas (older format).
raw = origin_elem.get_text(separator="|", strip=True)
tokens = [t.strip() for part in raw.split("|") for t in part.split(",")]
tokens = [t for t in tokens if t]
for token in tokens:
if "year" not in info and re.fullmatch(r"(19\d{2}|20\d{2})", token):
info["year"] = int(token)
continue
if "duration" not in info:
duration_match = re.search(r"(\d+)\s*min", token)
if duration_match:
info["duration"] = int(duration_match.group(1))
continue
# Country: first alphabetic token that doesn't start with a digit.
if "country" not in info and not token[0].isdigit() and re.search(r"[^\W\d_]", token):
info["country"] = token
return info
def search_movies(query: str, limit: int = 10) -> list[CSFDMovie]:
"""
Search for movies on CSFD.cz.
Args:
query: Search query string
limit: Maximum number of results to return
Returns:
List of CSFDMovie objects with basic info (title, url, year)
"""
_check_dependencies()
search_url = f"{CSFD_SEARCH_URL}?q={requests.utils.quote(query)}"
response = requests.get(search_url, headers=HEADERS, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
results = []
# Find movie results
movie_items = soup.select(".film-title-name, .search-result-item a[href*='/film/']")
for item in movie_items[:limit]:
href = item.get("href", "")
if "/film/" not in href:
continue
title = item.get_text(strip=True)
url = urljoin(CSFD_BASE_URL, href)
# Try to get year from sibling/parent
year = None
parent = item.find_parent(class_="article-content")
if parent:
year_elem = parent.select_one(".info")
if year_elem:
year_match = re.search(r"\((\d{4})\)", year_elem.get_text())
if year_match:
year = int(year_match.group(1))
results.append(CSFDMovie(
title=title,
url=url,
year=year,
csfd_id=_extract_csfd_id(url)
))
return results
def fetch_movie_by_id(csfd_id: int) -> CSFDMovie:
"""
Fetch movie by CSFD ID.
Args:
csfd_id: CSFD movie ID number
Returns:
CSFDMovie object with full data
"""
url = f"{CSFD_BASE_URL}/film/{csfd_id}/"
return fetch_movie(url)
+216
View File
@@ -0,0 +1,216 @@
from pathlib import Path
import json
from .tag import Tag
# Bump this when the csfd_cache schema changes to force re-fetch on next open.
CSFD_CACHE_VERSION = 1
class File:
def __init__(self, file_path: Path, tagmanager=None, index=None) -> None:
self.file_path = file_path
self.filename = file_path.name
self.metadata_filename = self.file_path.parent / f".{self.filename}.!tag"
# Optional unified pool index; when set, metadata lives there instead of
# in the sidecar file (see PoolIndex).
self.index = index
self.new = True
self.ignored = False
self.tags: list[Tag] = []
self.tagmanager = tagmanager
# new: optional date string "YYYY-MM-DD" (assigned manually)
self.date: str | None = None
# movie-library fields
self.title: str | None = None
self.csfd_link: str | None = None
# Cached CSFD data — avoids re-fetching on every open
self.csfd_cache: dict | None = None
self.get_metadata()
def get_metadata(self) -> None:
if self.index is not None:
record = self.index.get(self.file_path)
if record is None:
self._init_new_metadata()
self.save_metadata()
else:
self._apply_record(record)
return
if not self.metadata_filename.exists():
self._init_new_metadata()
self.save_metadata()
else:
self.load_metadata()
def _init_new_metadata(self) -> None:
self.new = True
self.ignored = False
self.tags = []
self.date = None
self.title = None
self.csfd_link = None
self.csfd_cache = None
if self.tagmanager:
tag = self.tagmanager.add_tag("Stav", "Nové")
self.tags.append(tag)
def _build_record(self) -> dict:
data = {
"new": self.new,
"ignored": self.ignored,
# ukládáme full_path tagů
"tags": [tag.full_path if isinstance(tag, Tag) else tag for tag in self.tags],
# date může být None
"date": self.date,
"title": self.title,
"csfd_link": self.csfd_link,
}
if self.csfd_cache is not None:
data["csfd_cache"] = {"version": CSFD_CACHE_VERSION, **self.csfd_cache}
return data
def _apply_record(self, data: dict) -> None:
self.new = data.get("new", True)
self.ignored = data.get("ignored", False)
self.tags = []
self.date = data.get("date", None)
self.title = data.get("title", None)
self.csfd_link = data.get("csfd_link", None)
raw_cache = data.get("csfd_cache")
if raw_cache and raw_cache.get("version") == CSFD_CACHE_VERSION:
self.csfd_cache = {k: v for k, v in raw_cache.items() if k != "version"}
else:
self.csfd_cache = None
if not self.tagmanager:
return
for tag_str in data.get("tags", []):
if "/" in tag_str:
category, name = tag_str.split("/", 1)
tag = self.tagmanager.add_tag(category, name)
self.tags.append(tag)
def save_metadata(self):
data = self._build_record()
if self.index is not None:
self.index.set(self.file_path, data)
return
with open(self.metadata_filename, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
def load_metadata(self) -> None:
with open(self.metadata_filename, "r", encoding="utf-8") as f:
data = json.load(f)
self._apply_record(data)
def delete_metadata(self) -> None:
"""Remove this file's metadata (from the index, or its sidecar file)."""
if self.index is not None:
self.index.delete(self.file_path)
elif self.metadata_filename.exists():
self.metadata_filename.unlink()
def set_date(self, date_str: str | None):
"""Nastaví datum (např. '2025-09-25') nebo None pro smazání."""
if date_str is None or date_str == "":
self.date = None
else:
# neprvádíme složitou validaci zde; očekáváme 'YYYY-MM-DD'
self.date = date_str
self.save_metadata()
def set_csfd_link(self, url: str | None) -> None:
"""Nastaví CSFD odkaz nebo None. Invaliduje cache při změně odkazu."""
new_url = url if url else None
if new_url != self.csfd_link:
self.csfd_cache = None # odkaz se změnil — stará cache je neplatná
self.csfd_link = new_url
self.save_metadata()
def get_cached_movie(self):
"""Vrátí CSFDMovie z cache, nebo None. Nevyžaduje síť."""
if self.csfd_cache is None:
return None
try:
from .csfd import CSFDMovie
return CSFDMovie.from_dict(self.csfd_cache)
except Exception:
return None
def apply_csfd_tags(
self, add_genres: bool = True, add_year: bool = True, add_country: bool = True
) -> dict:
"""Načte informace z CSFD a přiřadí tagy (Žánr, Rok, Země); cachuje data.
Returns:
dict s klíči 'success', 'movie'/'error', 'tags_added'
"""
if not self.csfd_link:
return {"success": False, "error": "CSFD odkaz není nastaven", "tags_added": []}
try:
from .csfd import fetch_movie
movie = fetch_movie(self.csfd_link)
self.csfd_cache = movie.to_dict()
except ImportError as e:
return {"success": False, "error": f"Chybí závislosti: {e}", "tags_added": []}
except Exception as e:
return {"success": False, "error": f"Chyba při načítání CSFD: {e}", "tags_added": []}
tags_added: list[str] = []
def _add(category: str, name: str) -> None:
tag_obj = self.tagmanager.add_tag(category, name) if self.tagmanager else Tag(category, name)
if tag_obj not in self.tags:
self.tags.append(tag_obj)
tags_added.append(f"{category}/{name}")
if add_genres and movie.genres:
for genre in movie.genres:
_add("Žánr", genre)
if add_year and movie.year:
_add("Rok", str(movie.year))
if add_country and movie.country:
_add("Země", movie.country)
# Use the CSFD title if we don't have one yet
if movie.title and not self.title:
self.title = movie.title
self.save_metadata()
return {"success": True, "movie": movie, "tags_added": tags_added}
def add_tag(self, tag):
# tag může být Tag nebo string
from .tag import Tag as TagClass
if isinstance(tag, str):
if "/" in tag and self.tagmanager:
category, name = tag.split("/", 1)
tag_obj = self.tagmanager.add_tag(category, name)
else:
tag_obj = TagClass("default", tag)
elif isinstance(tag, TagClass):
tag_obj = tag
else:
return
if tag_obj not in self.tags:
self.tags.append(tag_obj)
self.save_metadata()
def remove_tag(self, tag):
# tag může být Tag nebo string (full_path)
if isinstance(tag, str):
if "/" in tag:
category, name = tag.split("/", 1)
tag_obj = Tag(category, name)
else:
tag_obj = Tag("default", tag)
elif isinstance(tag, Tag):
tag_obj = tag
else:
return
if tag_obj in self.tags:
self.tags.remove(tag_obj)
self.save_metadata()
+272
View File
@@ -0,0 +1,272 @@
from pathlib import Path
import shutil
from .file import File
from .tag_manager import TagManager
from .pool_index import PoolIndex
from .utils import list_files
from typing import Iterable
import fnmatch
from src.core.config import (
load_global_config, save_global_config,
load_folder_config, save_folder_config
)
# Top-level folders inside the managed pool
POOL_MOVIES = "Filmy"
POOL_SERIES = "Seriály"
# Curator metadata files that must never be treated as content
METADATA_SUFFIXES = (".!tag", ".!ftag", ".!gtag", ".!index")
class FileManager:
def __init__(self, tagmanager: TagManager):
self.filelist: list[File] = []
self.folders: list[Path] = []
self.tagmanager = tagmanager
self.on_files_changed = None # callback do GUI
self.global_config = load_global_config()
self.folder_configs: dict[Path, dict] = {} # folder -> config
self.current_folder: Path | None = None
self.index: PoolIndex | None = None # unified pool metadata index
# ------------------------------------------------------------------
# Pool (single source of truth) and Filmotéka (generated output)
# ------------------------------------------------------------------
@property
def pool_dir(self) -> Path | None:
value = self.global_config.get("pool_dir")
return Path(value) if value else None
@property
def movies_dir(self) -> Path | None:
pool = self.pool_dir
return pool / POOL_MOVIES if pool else None
@property
def series_dir(self) -> Path | None:
pool = self.pool_dir
return pool / POOL_SERIES if pool else None
@property
def copyasis_folders(self) -> list[str]:
"""Names of pool subfolders mirrored 1:1 (copy-as-is) into the output."""
return self.global_config.get("copyasis_folders", [POOL_SERIES])
def set_copyasis_folders(self, names: list[str]) -> None:
"""Set the copy-as-is subfolder list and persist it."""
cleaned = [n.strip() for n in names if n.strip()]
self.global_config["copyasis_folders"] = cleaned
save_global_config(self.global_config)
@property
def filmoteka_dir(self) -> Path | None:
value = self.global_config.get("filmoteka_dir")
return Path(value) if value else None
def set_pool_dir(self, pool_dir: Path) -> None:
"""Set the managed pool root and create its top-level folders."""
pool_dir = Path(pool_dir)
(pool_dir / POOL_MOVIES).mkdir(parents=True, exist_ok=True)
(pool_dir / POOL_SERIES).mkdir(parents=True, exist_ok=True)
self.global_config["pool_dir"] = str(pool_dir)
save_global_config(self.global_config)
def set_filmoteka_dir(self, filmoteka_dir: Path) -> None:
"""Set the Filmotéka output folder (generated hardlink tree)."""
self.global_config["filmoteka_dir"] = str(Path(filmoteka_dir))
save_global_config(self.global_config)
def load_pool_movies(self) -> None:
"""Reload the movie list from pool/Filmy using the unified pool index."""
self.filelist = []
movies = self.movies_dir
pool = self.pool_dir
if not (movies and movies.is_dir() and pool):
return
self.index = PoolIndex(pool)
for each in list_files(movies):
if each.name.endswith(METADATA_SUFFIXES):
continue
file_obj = File(each, self.tagmanager, index=self.index)
self.filelist.append(file_obj)
def import_movie(self, source: Path, title: str, csfd_link: str | None = None) -> File:
"""Copy a video file into pool/Filmy as 'Title.ext', index its metadata.
The original file is left in place (non-destructive copy).
"""
movies = self.movies_dir
pool = self.pool_dir
if movies is None or pool is None:
raise RuntimeError("Pool není nastaven.")
movies.mkdir(parents=True, exist_ok=True)
if self.index is None:
self.index = PoolIndex(pool)
source = Path(source)
safe_title = title.strip() or source.stem
target = movies / f"{safe_title}{source.suffix}"
# Avoid clobbering an existing movie of the same name
counter = 1
while target.exists():
target = movies / f"{safe_title}_{counter}{source.suffix}"
counter += 1
shutil.copy2(source, target)
file_obj = File(target, self.tagmanager, index=self.index)
file_obj.title = safe_title
file_obj.csfd_link = csfd_link or None
file_obj.save_metadata()
self.filelist.append(file_obj)
if self.on_files_changed:
self.on_files_changed(self.filelist)
return file_obj
def append(self, folder: Path) -> None:
"""Add a folder to scan for files"""
self.folders.append(folder)
self.current_folder = folder
# Update global config with last folder
self.global_config["last_folder"] = str(folder)
# Update recent folders list
recent = self.global_config.get("recent_folders", [])
folder_str = str(folder)
if folder_str in recent:
recent.remove(folder_str)
recent.insert(0, folder_str)
self.global_config["recent_folders"] = recent[:10] # Keep max 10
save_global_config(self.global_config)
# Load folder-specific config
folder_config = load_folder_config(folder)
self.folder_configs[folder] = folder_config
# Get ignore patterns from folder config
ignore_patterns = folder_config.get("ignore_patterns", [])
for each in list_files(folder):
# Skip all Curator metadata files (.!tag / .!ftag / .!gtag / .!index)
if each.name.endswith(METADATA_SUFFIXES):
continue
full_path = each.as_posix()
# Check against ignore patterns
if any(
fnmatch.fnmatch(each.name, pat) or fnmatch.fnmatch(full_path, pat)
for pat in ignore_patterns
):
continue
file_obj = File(each, self.tagmanager)
self.filelist.append(file_obj)
def get_folder_config(self, folder: Path = None) -> dict:
"""Get config for a folder (or current folder if not specified)"""
if folder is None:
folder = self.current_folder
if folder is None:
return {}
if folder not in self.folder_configs:
self.folder_configs[folder] = load_folder_config(folder)
return self.folder_configs[folder]
def save_folder_config(self, folder: Path = None, config: dict = None):
"""Save config for a folder"""
if folder is None:
folder = self.current_folder
if folder is None:
return
if config is None:
config = self.folder_configs.get(folder, {})
self.folder_configs[folder] = config
save_folder_config(folder, config)
def set_ignore_patterns(self, patterns: list[str], folder: Path = None):
"""Set ignore patterns for a folder"""
config = self.get_folder_config(folder)
config["ignore_patterns"] = patterns
self.save_folder_config(folder, config)
def get_ignore_patterns(self, folder: Path = None) -> list[str]:
"""Get ignore patterns for a folder"""
config = self.get_folder_config(folder)
return config.get("ignore_patterns", [])
def assign_tag_to_file_objects(self, files_objs: list[File], tag):
"""Přiřadí tag (Tag nebo 'category/name' string) ke každému souboru v seznamu."""
for f in files_objs:
if isinstance(tag, str):
if "/" in tag:
category, name = tag.split("/", 1)
tag_obj = self.tagmanager.add_tag(category, name)
else:
tag_obj = self.tagmanager.add_tag("default", tag)
else:
tag_obj = tag
if tag_obj not in f.tags:
f.tags.append(tag_obj)
f.save_metadata()
if self.on_files_changed:
self.on_files_changed(self.filelist)
def remove_tag_from_file_objects(self, files_objs: list[File], tag):
"""Odebere tag (Tag nebo 'category/name') ze všech uvedených souborů."""
for f in files_objs:
if isinstance(tag, str):
if "/" in tag:
category, name = tag.split("/", 1)
from .tag import Tag as TagClass
tag_obj = TagClass(category, name)
else:
from .tag import Tag as TagClass
tag_obj = TagClass("default", tag)
else:
tag_obj = tag
if tag_obj in f.tags:
f.tags.remove(tag_obj)
f.save_metadata()
if self.on_files_changed:
self.on_files_changed(self.filelist)
def filter_files_by_tags(self, tags: Iterable):
"""
Vrátí jen soubory, které obsahují všechny zadané tagy.
'tags' může být iterace Tag objektů nebo stringů 'category/name'.
"""
tags_list = list(tags) if tags is not None else []
if not tags_list:
return self.filelist
target_full_paths = set()
from .tag import Tag as TagClass
for t in tags_list:
if isinstance(t, TagClass):
target_full_paths.add(t.full_path)
elif isinstance(t, str):
target_full_paths.add(t)
else:
continue
filtered = []
for f in self.filelist:
file_tags = {t.full_path for t in f.tags}
if all(tag in file_tags for tag in target_full_paths):
filtered.append(f)
return filtered
# Legacy property for backwards compatibility
@property
def config(self):
"""Legacy: returns global config"""
return self.global_config
+403
View File
@@ -0,0 +1,403 @@
"""
Hardlink Manager for Curator
Creates directory structure based on file tags and creates hardlinks
to organize files without duplicating them on disk.
Example:
A file with tags "žánr/Komedie", "žánr/Akční", "rok/1988" will create:
output/
├── žánr/
│ ├── Komedie/
│ │ └── film.mkv (hardlink)
│ └── Akční/
│ └── film.mkv (hardlink)
└── rok/
└── 1988/
└── film.mkv (hardlink)
"""
import os
from pathlib import Path
from typing import List, Tuple, Optional
from .file import File
class HardlinkManager:
"""Manager for creating hardlink-based directory structures from tagged files."""
def __init__(self, output_dir: Path):
"""
Initialize HardlinkManager.
Args:
output_dir: Base directory where the tag-based structure will be created
"""
self.output_dir = Path(output_dir)
self.created_links: List[Path] = []
self.errors: List[Tuple[Path, str]] = []
def create_structure_for_files(
self,
files: List[File],
categories: Optional[List[str]] = None,
dry_run: bool = False
) -> Tuple[int, int]:
"""
Create hardlink structure for given files based on their tags.
Args:
files: List of File objects to process
categories: Optional list of categories to include (None = all)
dry_run: If True, only simulate without creating actual links
Returns:
Tuple of (successful_links, failed_links)
"""
self.created_links = []
self.errors = []
success_count = 0
fail_count = 0
for file_obj in files:
if not file_obj.tags:
continue
for tag in file_obj.tags:
# Skip if category filter is set and this category is not included
if categories is not None and tag.category not in categories:
continue
# Create target directory path: output/category/tag_name/
target_dir = self.output_dir / tag.category / tag.name
target_file = target_dir / file_obj.filename
try:
if not dry_run:
# Create directory structure
target_dir.mkdir(parents=True, exist_ok=True)
# Skip if link already exists
if target_file.exists():
# Check if it's already a hardlink to the same file
if self._is_same_file(file_obj.file_path, target_file):
continue
else:
# Different file exists, add suffix
target_file = self._get_unique_name(target_file)
# Create hardlink
os.link(file_obj.file_path, target_file)
self.created_links.append(target_file)
success_count += 1
except OSError as e:
self.errors.append((file_obj.file_path, str(e)))
fail_count += 1
return success_count, fail_count
def mirror_as_is(
self,
source_dir: Path,
subfolder: str | None = None,
dry_run: bool = False
) -> Tuple[int, int]:
"""Mirror a "copy-as-is" folder 1:1 into the output as a hardlinked clone.
Recreates the exact directory hierarchy of ``source_dir`` under
``output_dir/subfolder`` (or directly under ``output_dir`` when
``subfolder`` is None) and hardlinks every file. Curator metadata files
(``.!tag`` / ``.!ftag`` / ``.!gtag`` / ``.!index``) are skipped.
Used for Seriály: the pool structure is the source of truth and is cloned
verbatim instead of being rebuilt from tags.
Returns:
Tuple of (successful_links, failed_links)
"""
source_dir = Path(source_dir)
if not source_dir.is_dir():
return 0, 0
base = self.output_dir / subfolder if subfolder else self.output_dir
success_count = 0
fail_count = 0
for src_file in source_dir.rglob("*"):
if not src_file.is_file():
continue
if src_file.name.endswith((".!tag", ".!ftag", ".!gtag", ".!index")):
continue
target_file = base / src_file.relative_to(source_dir)
try:
if not dry_run:
target_file.parent.mkdir(parents=True, exist_ok=True)
if target_file.exists():
if self._is_same_file(src_file, target_file):
success_count += 1
continue
target_file.unlink()
os.link(src_file, target_file)
self.created_links.append(target_file)
success_count += 1
except OSError as e:
self.errors.append((src_file, str(e)))
fail_count += 1
return success_count, fail_count
def _is_same_file(self, path1: Path, path2: Path) -> bool:
"""Check if two paths point to the same file (same inode)."""
try:
return path1.stat().st_ino == path2.stat().st_ino
except OSError:
return False
def _get_unique_name(self, path: Path) -> Path:
"""Get a unique filename by adding a numeric suffix."""
stem = path.stem
suffix = path.suffix
parent = path.parent
counter = 1
while True:
new_name = f"{stem}_{counter}{suffix}"
new_path = parent / new_name
if not new_path.exists():
return new_path
counter += 1
def remove_created_links(self) -> int:
"""
Remove all hardlinks created by the last operation.
Returns:
Number of links removed
"""
removed = 0
for link_path in self.created_links:
try:
if link_path.exists() and link_path.is_file():
link_path.unlink()
removed += 1
# Try to remove empty parent directories
self._remove_empty_parents(link_path.parent)
except OSError:
pass
self.created_links = []
return removed
def _remove_empty_parents(self, path: Path) -> None:
"""Remove empty parent directories up to output_dir."""
try:
while path != self.output_dir and path.is_dir():
if any(path.iterdir()):
break # Directory not empty
path.rmdir()
path = path.parent
except OSError:
pass
def get_preview(self, files: List[File], categories: Optional[List[str]] = None) -> List[Tuple[Path, Path]]:
"""
Get a preview of what links would be created.
Args:
files: List of File objects
categories: Optional list of categories to include
Returns:
List of tuples (source_path, target_path)
"""
preview = []
for file_obj in files:
if not file_obj.tags:
continue
for tag in file_obj.tags:
if categories is not None and tag.category not in categories:
continue
target_dir = self.output_dir / tag.category / tag.name
target_file = target_dir / file_obj.filename
preview.append((file_obj.file_path, target_file))
return preview
def find_obsolete_links(
self,
files: List[File],
categories: Optional[List[str]] = None
) -> List[Tuple[Path, Path]]:
"""
Find hardlinks in the output directory that no longer match file tags.
Scans the output directory for hardlinks that point to source files,
but whose category/tag path no longer matches the file's current tags.
Args:
files: List of File objects (source files)
categories: Optional list of categories to check (None = all)
Returns:
List of tuples (link_path, source_path) for obsolete links
"""
obsolete = []
if not self.output_dir.exists():
return obsolete
# Build a map of source file inodes to File objects
inode_to_file: dict[int, File] = {}
for file_obj in files:
try:
inode = file_obj.file_path.stat().st_ino
inode_to_file[inode] = file_obj
except OSError:
continue
# Build expected paths for each file based on current tags
expected_paths: dict[int, set[Path]] = {}
for file_obj in files:
try:
inode = file_obj.file_path.stat().st_ino
expected_paths[inode] = set()
for tag in file_obj.tags:
if categories is not None and tag.category not in categories:
continue
target = self.output_dir / tag.category / tag.name / file_obj.filename
expected_paths[inode].add(target)
except OSError:
continue
# Scan output directory for existing hardlinks
for category_dir in self.output_dir.iterdir():
if not category_dir.is_dir():
continue
# Filter by categories if specified
if categories is not None and category_dir.name not in categories:
continue
for tag_dir in category_dir.iterdir():
if not tag_dir.is_dir():
continue
for link_file in tag_dir.iterdir():
if not link_file.is_file():
continue
try:
link_inode = link_file.stat().st_ino
# Check if this inode belongs to one of our source files
if link_inode in inode_to_file:
source_file = inode_to_file[link_inode]
# Check if this link path is expected
if link_inode in expected_paths:
if link_file not in expected_paths[link_inode]:
# This link exists but tag was removed
obsolete.append((link_file, source_file.file_path))
except OSError:
continue
return obsolete
def remove_obsolete_links(
self,
files: List[File],
categories: Optional[List[str]] = None,
dry_run: bool = False
) -> Tuple[int, List[Path]]:
"""
Remove hardlinks that no longer match file tags.
Args:
files: List of File objects
categories: Optional list of categories to check
dry_run: If True, only return what would be removed
Returns:
Tuple of (removed_count, list_of_removed_paths)
"""
obsolete = self.find_obsolete_links(files, categories)
removed_paths = []
if dry_run:
return len(obsolete), [link for link, _ in obsolete]
for link_path, _ in obsolete:
try:
link_path.unlink()
removed_paths.append(link_path)
# Try to remove empty parent directories
self._remove_empty_parents(link_path.parent)
except OSError:
pass
return len(removed_paths), removed_paths
def sync_structure(
self,
files: List[File],
categories: Optional[List[str]] = None,
dry_run: bool = False
) -> Tuple[int, int, int, int]:
"""
Synchronize hardlink structure with current file tags.
This will:
1. Remove hardlinks for removed tags
2. Create new hardlinks for new tags
Args:
files: List of File objects
categories: Optional list of categories to sync
dry_run: If True, only simulate
Returns:
Tuple of (created, create_failed, removed, remove_failed)
"""
# First find how many obsolete links there are
obsolete_count = len(self.find_obsolete_links(files, categories))
# Remove obsolete links
removed, removed_paths = self.remove_obsolete_links(files, categories, dry_run)
remove_failed = obsolete_count - removed if not dry_run else 0
# Then create new links
created, create_failed = self.create_structure_for_files(files, categories, dry_run)
return created, create_failed, removed, remove_failed
def create_hardlink_structure(
files: List[File],
output_dir: Path,
categories: Optional[List[str]] = None
) -> Tuple[int, int, List[Tuple[Path, str]]]:
"""
Convenience function to create hardlink structure.
Args:
files: List of File objects to process
output_dir: Base directory for output
categories: Optional list of categories to include
Returns:
Tuple of (successful_count, failed_count, errors_list)
"""
manager = HardlinkManager(output_dir)
success, fail = manager.create_structure_for_files(files, categories)
return success, fail, manager.errors
+20
View File
@@ -0,0 +1,20 @@
from typing import List
from .file import File
class ListManager:
def __init__(self):
# 'name' nebo 'date'
self.sort_mode = "name"
def set_sort(self, mode: str):
if mode in ("name", "date"):
self.sort_mode = mode
def sort_files(self, files: List[File]) -> List[File]:
if self.sort_mode == "name":
return sorted(files, key=lambda f: f.filename.lower())
else:
# sort by date (None last) — nejnovější nahoře? Zde dávám None jako ""
def date_key(f):
return (f.date is None, f.date or "")
return sorted(files, key=date_key)
+43
View File
@@ -0,0 +1,43 @@
# Module header
import sys
from .file import File
from .tag_manager import TagManager
if __name__ == "__main__":
sys.exit("This module is not intended to be executed as the main program.")
# Imports
import subprocess
from PIL import Image, ImageTk
# Functions
def load_icon(path) -> ImageTk.PhotoImage:
img = Image.open(path)
img = img.resize((16, 16), Image.Resampling.LANCZOS)
return ImageTk.PhotoImage(img)
def add_video_resolution_tag(file_obj: File, tagmanager: TagManager):
"""
Zjistí vertikální rozlišení videa a přiřadí tag Rozlišení/{výška}p.
Vyžaduje ffprobe (FFmpeg).
"""
path = str(file_obj.file_path)
try:
# ffprobe vrátí width a height ve formátu JSON
result = subprocess.run(
["ffprobe", "-v", "error", "-select_streams", "v:0",
"-show_entries", "stream=width,height", "-of", "csv=p=0:s=x", path],
capture_output=True,
text=True,
check=True
)
res = result.stdout.strip() # např. "1920x1080"
if "x" not in res:
return
width, height = map(int, res.split("x"))
tag_name = f"Rozlišení/{height}p"
tag_obj = tagmanager.add_tag("Rozlišení", f"{height}p")
file_obj.add_tag(tag_obj)
print(f"Přiřazen tag {tag_name} k {file_obj.filename}")
except Exception as e:
print(f"Chyba při získávání rozlišení videa {file_obj.filename}: {e}")
+64
View File
@@ -0,0 +1,64 @@
"""
Unified metadata index for the Curator pool.
Instead of one sidecar file per movie, the whole pool keeps a single JSON index
at ``<pool>/.Curator.!index``. Curator owns the pool (it inserts/removes files
itself), so files never move behind its back and a central index is safe.
Records are keyed by the file's path relative to the pool root (POSIX form),
e.g. ``"Filmy/Matrix.mkv"`` — stable and portable across machines.
"""
import json
from pathlib import Path
INDEX_FILENAME = ".Curator.!index"
class PoolIndex:
"""Single-file JSON metadata store for all files in a pool."""
def __init__(self, pool_dir: Path) -> None:
self.pool_dir = Path(pool_dir)
self.index_path = self.pool_dir / INDEX_FILENAME
self.records: dict[str, dict] = {}
self.load()
def _key(self, file_path: Path) -> str:
"""Pool-relative POSIX key for a file path."""
path = Path(file_path)
try:
return path.relative_to(self.pool_dir).as_posix()
except ValueError:
return path.as_posix()
def load(self) -> None:
"""Load the index from disk (missing/corrupt index = empty)."""
if not self.index_path.exists():
self.records = {}
return
try:
with open(self.index_path, "r", encoding="utf-8") as f:
data = json.load(f)
self.records = data.get("movies", {})
except (json.JSONDecodeError, OSError):
self.records = {}
def save(self) -> None:
"""Persist the index to disk."""
self.pool_dir.mkdir(parents=True, exist_ok=True)
with open(self.index_path, "w", encoding="utf-8") as f:
json.dump({"movies": self.records}, f, indent=2, ensure_ascii=False)
def get(self, file_path: Path) -> dict | None:
"""Return the record for a file, or None if it is not indexed."""
return self.records.get(self._key(file_path))
def set(self, file_path: Path, data: dict) -> None:
"""Upsert a record and persist the index."""
self.records[self._key(file_path)] = data
self.save()
def delete(self, file_path: Path) -> None:
"""Remove a record (if present) and persist the index."""
if self.records.pop(self._key(file_path), None) is not None:
self.save()
+22
View File
@@ -0,0 +1,22 @@
class Tag:
def __init__(self, category: str, name: str):
self.category = category
self.name = name
@property
def full_path(self):
return f"{self.category}/{self.name}"
def __str__(self):
return self.full_path
def __repr__(self):
return f"Tag({self.full_path})"
def __eq__(self, other):
if isinstance(other, Tag):
return self.category == other.category and self.name == other.name
return False
def __hash__(self):
return hash((self.category, self.name))
+67
View File
@@ -0,0 +1,67 @@
from .tag import Tag
# Default tags that are always available (order in list = display order)
DEFAULT_TAGS = {
"Hodnocení": ["", "⭐⭐", "⭐⭐⭐", "⭐⭐⭐⭐", "⭐⭐⭐⭐⭐"],
"Barva": ["🔴 Červená", "🟠 Oranžová", "🟡 Žlutá", "🟢 Zelená", "🔵 Modrá", "🟣 Fialová"],
}
# Tag sort order for default categories (preserves display order)
DEFAULT_TAG_ORDER = {
"Hodnocení": {name: i for i, name in enumerate(DEFAULT_TAGS["Hodnocení"])},
"Barva": {name: i for i, name in enumerate(DEFAULT_TAGS["Barva"])},
}
class TagManager:
def __init__(self):
self.tags_by_category = {} # {category: set(Tag)}
self._init_default_tags()
def _init_default_tags(self):
"""Initialize default tags (ratings and colors)"""
for category, tags in DEFAULT_TAGS.items():
for tag_name in tags:
self.add_tag(category, tag_name)
def add_category(self, category: str):
if category not in self.tags_by_category:
self.tags_by_category[category] = set()
def remove_category(self, category: str):
if category in self.tags_by_category:
del self.tags_by_category[category]
def add_tag(self, category: str, name: str) -> Tag:
self.add_category(category)
tag = Tag(category, name)
self.tags_by_category[category].add(tag)
return tag
def remove_tag(self, category: str, name: str):
if category in self.tags_by_category:
tag = Tag(category, name)
self.tags_by_category[category].discard(tag)
if not self.tags_by_category[category]:
self.remove_category(category)
def get_all_tags(self):
"""Vrací list všech tagů full_path"""
return [tag.full_path for tags in self.tags_by_category.values() for tag in tags]
def get_categories(self):
return list(self.tags_by_category.keys())
def get_tags_in_category(self, category: str) -> list[Tag]:
"""Get tags in category, sorted by predefined order for default categories"""
tags = list(self.tags_by_category.get(category, []))
# Use predefined order for default categories
if category in DEFAULT_TAG_ORDER:
order = DEFAULT_TAG_ORDER[category]
tags.sort(key=lambda t: order.get(t.name, 999))
else:
# Sort alphabetically for custom categories
tags.sort(key=lambda t: t.name)
return tags
+7
View File
@@ -0,0 +1,7 @@
from pathlib import Path
def list_files(folder_path: str | Path) -> list[Path]:
folder = Path(folder_path)
if not folder.is_dir():
raise NotADirectoryError(f"{folder} není platná složka.")
return [file_path for file_path in folder.rglob("*") if file_path.is_file()]
Binary file not shown.

After

Width:  |  Height:  |  Size: 596 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 892 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 618 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 961 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 710 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 716 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.9 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 15 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 11 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 16 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 13 KiB

View File
+1296
View File
File diff suppressed because it is too large Load Diff
+590
View File
@@ -0,0 +1,590 @@
"""
PySide6 GUI for Curator — reframed around the Filmotéka (movie-library) workflow.
Layout:
- Toolbar / menu: configure the Pool and the Filmotéka output, import a movie,
generate the Filmotéka hardlink tree.
- Left sidebar: tag tree used as an AND-filter over the movie list.
- Center: the movie table (the contents of pool/Filmy).
- Status bar: counts and the current pool/output paths.
"""
import os
import sys
import subprocess
from pathlib import Path
from typing import List
from PySide6.QtCore import Qt
from PySide6.QtGui import QAction, QKeySequence
from PySide6.QtWidgets import (
QApplication, QMainWindow, QWidget, QSplitter, QTreeWidget, QTreeWidgetItem,
QTableWidget, QTableWidgetItem, QVBoxLayout, QHBoxLayout, QLabel, QLineEdit,
QPushButton, QFileDialog, QMessageBox, QInputDialog, QDialog, QDialogButtonBox,
QFormLayout, QHeaderView, QMenu, QAbstractItemView,
)
from src.core.file_manager import FileManager
from src.core.tag_manager import TagManager
from src.core.file import File
from src.core.tag import Tag
from src.core.constants import APP_NAME, VERSION
from src.core.hardlink_manager import HardlinkManager
# Categories that drive the generated Filmotéka tree (see PROJECT.md)
FILMOTEKA_CATEGORIES = ["Rok", "Žánr", "Hodnocení"]
class ImportMovieDialog(QDialog):
"""Collect the Title and ČSFD link for a movie being imported into the pool."""
def __init__(self, parent: QWidget, default_title: str) -> None:
super().__init__(parent)
self.setWindowTitle("Importovat film do poolu")
self.setMinimumWidth(420)
layout = QVBoxLayout(self)
form = QFormLayout()
self.title_edit = QLineEdit(default_title)
self.csfd_edit = QLineEdit()
self.csfd_edit.setPlaceholderText("https://www.csfd.cz/film/...")
form.addRow("Název:", self.title_edit)
form.addRow("ČSFD odkaz:", self.csfd_edit)
layout.addLayout(form)
buttons = QDialogButtonBox(QDialogButtonBox.Ok | QDialogButtonBox.Cancel)
buttons.accepted.connect(self.accept)
buttons.rejected.connect(self.reject)
layout.addWidget(buttons)
@property
def title(self) -> str:
return self.title_edit.text().strip()
@property
def csfd_link(self) -> str:
return self.csfd_edit.text().strip()
class AssignTagsDialog(QDialog):
"""Tri-state bulk tag assignment over the selected movies.
Result maps full_path -> 1 (assign), 0 (remove), 2 (leave mixed/unchanged).
"""
def __init__(self, parent: QWidget, tagmanager: TagManager, files: List[File]) -> None:
super().__init__(parent)
self.setWindowTitle("Přiřadit štítky")
self.setMinimumSize(380, 480)
self.result_map: dict[str, int] = {}
file_tag_sets = [{t.full_path for t in f.tags} for f in files]
layout = QVBoxLayout(self)
layout.addWidget(QLabel(f"Vybráno filmů: {len(files)}"))
self.tree = QTreeWidget()
self.tree.setHeaderHidden(True)
layout.addWidget(self.tree)
self._items: list[tuple[str, QTreeWidgetItem]] = []
for category in self.tagmanager_categories(tagmanager):
cat_item = QTreeWidgetItem([category])
cat_item.setFlags(Qt.ItemIsEnabled)
self.tree.addTopLevelItem(cat_item)
cat_item.setExpanded(True)
for tag in tagmanager.get_tags_in_category(category):
have = sum(1 for s in file_tag_sets if tag.full_path in s)
if have == 0:
state = Qt.Unchecked
elif have == len(files):
state = Qt.Checked
else:
state = Qt.PartiallyChecked
item = QTreeWidgetItem([tag.name])
item.setFlags(Qt.ItemIsUserCheckable | Qt.ItemIsEnabled | Qt.ItemIsTristate)
item.setCheckState(0, state)
cat_item.addChild(item)
self._items.append((tag.full_path, item))
buttons = QDialogButtonBox(QDialogButtonBox.Ok | QDialogButtonBox.Cancel)
buttons.accepted.connect(self.accept)
buttons.rejected.connect(self.reject)
layout.addWidget(buttons)
@staticmethod
def tagmanager_categories(tagmanager: TagManager) -> List[str]:
return sorted(tagmanager.get_categories())
def accept(self) -> None:
mapping = {Qt.Checked: 1, Qt.Unchecked: 0, Qt.PartiallyChecked: 2}
self.result_map = {fp: mapping[item.checkState(0)] for fp, item in self._items}
super().accept()
class QtApp(QMainWindow):
def __init__(self, filehandler: FileManager, tagmanager: TagManager) -> None:
super().__init__()
self.filehandler = filehandler
self.tagmanager = tagmanager
self.file_rows: dict[int, File] = {} # table row -> File
self.filehandler.on_files_changed = lambda _=None: self.refresh_table()
self.setWindowTitle(f"{APP_NAME} {VERSION} — Filmotéka")
geometry = self.filehandler.global_config.get("window_geometry", "1200x800")
try:
w, h = (int(x) for x in geometry.lower().split("x"))
self.resize(w, h)
except Exception:
self.resize(1200, 800)
self._build_menu()
self._build_central()
self._build_statusbar()
# Load the pool if one is configured
if self.filehandler.movies_dir:
self.filehandler.load_pool_movies()
self.refresh_sidebar()
self.refresh_table()
# ------------------------------------------------------------------
# UI construction
# ------------------------------------------------------------------
def _build_menu(self) -> None:
bar = self.menuBar()
pool_menu = bar.addMenu("&Pool")
self._add_action(pool_menu, "Nastavit pool…", self.set_pool, "Ctrl+P")
self._add_action(pool_menu, "Nastavit Filmotéku (výstup)…", self.set_filmoteka)
pool_menu.addSeparator()
self._add_action(pool_menu, "Znovu načíst pool", self.reload_pool, "F5")
pool_menu.addSeparator()
self._add_action(pool_menu, "Konec", self.close, "Ctrl+Q")
movie_menu = bar.addMenu("&Filmy")
self._add_action(movie_menu, "Importovat film…", self.import_movie, "Ctrl+I")
self._add_action(movie_menu, "Přiřadit štítky…", self.assign_tags, "Ctrl+T")
self._add_action(movie_menu, "Nastavit datum…", self.set_date, "Ctrl+D")
self._add_action(movie_menu, "Upravit ČSFD odkaz…", self.edit_csfd)
self._add_action(movie_menu, "Načíst tagy z ČSFD", self.apply_csfd_tags_for_selected)
movie_menu.addSeparator()
self._add_action(movie_menu, "Odebrat z poolu…", self.remove_movies, "Del")
film_menu = bar.addMenu("Filmo&téka")
self._add_action(film_menu, "Vygenerovat Filmotéku", self.generate_filmoteka, "Ctrl+G")
self._add_action(film_menu, "Copy-as-is složky…", self.edit_copyasis_folders)
def _add_action(self, menu: QMenu, text: str, slot, shortcut: str | None = None) -> QAction:
action = QAction(text, self)
if shortcut:
action.setShortcut(QKeySequence(shortcut))
action.triggered.connect(slot)
menu.addAction(action)
return action
def _build_central(self) -> None:
splitter = QSplitter(Qt.Horizontal)
# Sidebar — tag filter
sidebar = QWidget()
side_layout = QVBoxLayout(sidebar)
side_layout.setContentsMargins(4, 4, 4, 4)
side_layout.addWidget(QLabel("📂 Štítky (filtr)"))
self.tag_tree = QTreeWidget()
self.tag_tree.setHeaderHidden(True)
self.tag_tree.itemChanged.connect(self._on_tag_filter_changed)
side_layout.addWidget(self.tag_tree)
splitter.addWidget(sidebar)
# Main — search + movie table
main = QWidget()
main_layout = QVBoxLayout(main)
main_layout.setContentsMargins(4, 4, 4, 4)
search_row = QHBoxLayout()
search_row.addWidget(QLabel("🔍"))
self.search_edit = QLineEdit()
self.search_edit.setPlaceholderText("Hledat film…")
self.search_edit.textChanged.connect(self.refresh_table)
search_row.addWidget(self.search_edit)
import_btn = QPushButton(" Importovat film")
import_btn.clicked.connect(self.import_movie)
search_row.addWidget(import_btn)
main_layout.addLayout(search_row)
self.table = QTableWidget(0, 5)
self.table.setHorizontalHeaderLabels(["Název", "Datum", "Štítky", "Velikost", "ČSFD"])
self.table.setSelectionBehavior(QAbstractItemView.SelectRows)
self.table.setSelectionMode(QAbstractItemView.ExtendedSelection)
self.table.setEditTriggers(QAbstractItemView.NoEditTriggers)
self.table.setContextMenuPolicy(Qt.CustomContextMenu)
self.table.customContextMenuRequested.connect(self._show_table_menu)
self.table.doubleClicked.connect(lambda _: self.open_movies())
self.table.itemSelectionChanged.connect(self._update_selection_status)
header = self.table.horizontalHeader()
header.setSectionResizeMode(0, QHeaderView.Stretch)
header.setSectionResizeMode(2, QHeaderView.Stretch)
main_layout.addWidget(self.table)
splitter.addWidget(main)
splitter.setStretchFactor(0, 0)
splitter.setStretchFactor(1, 1)
splitter.setSizes([260, 940])
self.setCentralWidget(splitter)
def _build_statusbar(self) -> None:
self.status = self.statusBar()
self._update_path_status()
# ------------------------------------------------------------------
# Sidebar (tag filter)
# ------------------------------------------------------------------
def refresh_sidebar(self) -> None:
self.tag_tree.blockSignals(True)
self.tag_tree.clear()
counts: dict[str, int] = {}
for f in self.filehandler.filelist:
for t in f.tags:
counts[t.full_path] = counts.get(t.full_path, 0) + 1
for category in self.tagmanager.get_categories():
cat_item = QTreeWidgetItem([category])
cat_item.setFlags(Qt.ItemIsEnabled)
self.tag_tree.addTopLevelItem(cat_item)
cat_item.setExpanded(True)
for tag in self.tagmanager.get_tags_in_category(category):
count = counts.get(tag.full_path, 0)
label = f"{tag.name} ({count})" if count else tag.name
item = QTreeWidgetItem([label])
item.setFlags(Qt.ItemIsUserCheckable | Qt.ItemIsEnabled)
item.setCheckState(0, Qt.Unchecked)
item.setData(0, Qt.UserRole, tag.full_path)
cat_item.addChild(item)
self.tag_tree.blockSignals(False)
def _on_tag_filter_changed(self, _item, _col) -> None:
self.refresh_table()
def _checked_filter_tags(self) -> List[Tag]:
tags: List[Tag] = []
for i in range(self.tag_tree.topLevelItemCount()):
cat = self.tag_tree.topLevelItem(i)
for j in range(cat.childCount()):
child = cat.child(j)
if child.checkState(0) == Qt.Checked:
full_path = child.data(0, Qt.UserRole)
category, name = full_path.split("/", 1)
tags.append(Tag(category, name))
return tags
# ------------------------------------------------------------------
# Movie table
# ------------------------------------------------------------------
def refresh_table(self, *_args) -> None:
filtered = self.filehandler.filter_files_by_tags(self._checked_filter_tags())
search = self.search_edit.text().lower() if hasattr(self, "search_edit") else ""
if search:
filtered = [f for f in filtered if search in (f.title or f.filename).lower()]
filtered.sort(key=lambda f: (f.title or f.filename).lower())
self.table.setRowCount(len(filtered))
self.file_rows.clear()
for row, f in enumerate(filtered):
self.file_rows[row] = f
name = f.title or f.filename
tags = ", ".join(t.name for t in f.tags)
try:
size = self._format_size(f.file_path.stat().st_size)
except OSError:
size = "?"
csfd = "🔗" if f.csfd_link else ""
for col, value in enumerate([name, f.date or "", tags, size, csfd]):
self.table.setItem(row, col, QTableWidgetItem(value))
self.refresh_sidebar()
self._update_selection_status()
self.status.showMessage(f"Zobrazeno {len(filtered)} filmů", 4000)
@staticmethod
def _format_size(size_bytes: float) -> str:
for unit in ["B", "KB", "MB", "GB"]:
if size_bytes < 1024.0:
return f"{size_bytes:.1f} {unit}"
size_bytes /= 1024.0
return f"{size_bytes:.1f} TB"
def _selected_movies(self) -> List[File]:
rows = {idx.row() for idx in self.table.selectionModel().selectedRows()}
return [self.file_rows[r] for r in sorted(rows) if r in self.file_rows]
def _show_table_menu(self, pos) -> None:
menu = QMenu(self)
menu.addAction("Otevřít", self.open_movies)
menu.addAction("Přiřadit štítky…", self.assign_tags)
menu.addAction("Nastavit datum…", self.set_date)
menu.addAction("Upravit ČSFD odkaz…", self.edit_csfd)
menu.addAction("Načíst tagy z ČSFD", self.apply_csfd_tags_for_selected)
menu.addSeparator()
menu.addAction("Odebrat z poolu…", self.remove_movies)
menu.exec(self.table.viewport().mapToGlobal(pos))
def _update_selection_status(self) -> None:
files = self._selected_movies()
if not files:
self._update_path_status()
return
total = 0
for f in files:
try:
total += f.file_path.stat().st_size
except OSError:
pass
self.status.showMessage(f"{len(files)} vybráno — {self._format_size(total)}")
def _update_path_status(self) -> None:
pool = self.filehandler.pool_dir
out = self.filehandler.filmoteka_dir
self.status.showMessage(
f"Pool: {pool or ''} | Filmotéka: {out or ''} | "
f"{len(self.filehandler.filelist)} filmů"
)
# ------------------------------------------------------------------
# Actions
# ------------------------------------------------------------------
def set_pool(self) -> None:
folder = QFileDialog.getExistingDirectory(self, "Vyber složku poolu")
if not folder:
return
self.filehandler.set_pool_dir(Path(folder))
self.filehandler.load_pool_movies()
self.refresh_table()
self._update_path_status()
def set_filmoteka(self) -> None:
folder = QFileDialog.getExistingDirectory(self, "Vyber výstupní složku Filmotéky")
if not folder:
return
self.filehandler.set_filmoteka_dir(Path(folder))
self._update_path_status()
def reload_pool(self) -> None:
if not self.filehandler.movies_dir:
QMessageBox.information(self, "Pool", "Nejprve nastavte pool.")
return
self.filehandler.load_pool_movies()
self.refresh_table()
def import_movie(self) -> None:
if not self.filehandler.movies_dir:
QMessageBox.warning(self, "Pool", "Nejprve nastavte pool (menu Pool → Nastavit pool).")
return
path, _ = QFileDialog.getOpenFileName(self, "Vyber video soubor")
if not path:
return
source = Path(path)
dialog = ImportMovieDialog(self, default_title=source.stem)
if dialog.exec() != QDialog.Accepted:
return
try:
movie = self.filehandler.import_movie(source, dialog.title, dialog.csfd_link)
except Exception as exc: # noqa: BLE001 — surface any import failure to the user
QMessageBox.critical(self, "Chyba importu", str(exc))
return
# If a ČSFD link was given, enrich the movie with tags right away
if movie.csfd_link:
self.status.showMessage("Načítám z ČSFD…")
QApplication.setOverrideCursor(Qt.WaitCursor)
try:
_, tags_total, errors = self._fetch_csfd_for([movie])
finally:
QApplication.restoreOverrideCursor()
if errors:
QMessageBox.warning(self, "ČSFD", "Tagy se nepodařilo načíst:\n" + errors[0])
else:
self.status.showMessage(
f"Importováno: {movie.title} (+{tags_total} tagů z ČSFD)", 5000
)
self.refresh_table()
self.status.showMessage(f"Importováno: {dialog.title}", 5000)
def open_movies(self) -> None:
for f in self._selected_movies():
self._open_path(f.file_path)
def _open_path(self, path: Path) -> None:
try:
if sys.platform.startswith("win"):
os.startfile(path) # type: ignore[attr-defined]
elif sys.platform.startswith("darwin"):
subprocess.call(["open", str(path)])
else:
subprocess.call(["xdg-open", str(path)])
except Exception as exc: # noqa: BLE001
QMessageBox.critical(self, "Chyba", f"Nelze otevřít {path}: {exc}")
def assign_tags(self) -> None:
files = self._selected_movies()
if not files:
QMessageBox.information(self, "Štítky", "Nejprve vyberte filmy.")
return
dialog = AssignTagsDialog(self, self.tagmanager, files)
if dialog.exec() != QDialog.Accepted:
return
for full_path, state in dialog.result_map.items():
category, name = full_path.split("/", 1)
if state == 1:
self.filehandler.assign_tag_to_file_objects(files, self.tagmanager.add_tag(category, name))
elif state == 0:
self.filehandler.remove_tag_from_file_objects(files, Tag(category, name))
self.refresh_table()
def set_date(self) -> None:
files = self._selected_movies()
if not files:
QMessageBox.information(self, "Datum", "Nejprve vyberte filmy.")
return
text, ok = QInputDialog.getText(self, "Nastavit datum", "Datum (YYYY-MM-DD, prázdné = smazat):")
if not ok:
return
for f in files:
f.set_date(text.strip() or None)
self.refresh_table()
def edit_csfd(self) -> None:
files = self._selected_movies()
if len(files) != 1:
QMessageBox.information(self, "ČSFD", "Vyberte právě jeden film.")
return
f = files[0]
text, ok = QInputDialog.getText(self, "ČSFD odkaz", "URL:", text=f.csfd_link or "")
if not ok:
return
f.set_csfd_link(text.strip() or None)
self.refresh_table()
def apply_csfd_tags_for_selected(self) -> None:
files = [f for f in self._selected_movies() if f.csfd_link]
if not files:
QMessageBox.information(
self, "ČSFD", "Vyberte filmy, které mají nastavený ČSFD odkaz."
)
return
self.status.showMessage(f"Načítám z ČSFD ({len(files)})…")
QApplication.setOverrideCursor(Qt.WaitCursor)
try:
ok_count, tags_total, errors = self._fetch_csfd_for(files)
finally:
QApplication.restoreOverrideCursor()
self.refresh_table()
msg = f"Načteno: {ok_count}/{len(files)} filmů, přidáno {tags_total} tagů."
if errors:
msg += "\n\nChyby:\n" + "\n".join(errors[:5])
QMessageBox.warning(self, "ČSFD dokončeno s chybami", msg)
else:
QMessageBox.information(self, "ČSFD", msg)
def _fetch_csfd_for(self, files) -> tuple[int, int, list[str]]:
"""Apply CSFD tags to each file; return (ok_count, tags_added, errors)."""
ok_count = 0
tags_total = 0
errors: list[str] = []
for f in files:
result = f.apply_csfd_tags()
if result["success"]:
ok_count += 1
tags_total += len(result["tags_added"])
else:
errors.append(f"{f.title or f.filename}: {result['error']}")
return ok_count, tags_total, errors
def remove_movies(self) -> None:
files = self._selected_movies()
if not files:
return
answer = QMessageBox.question(
self, "Odebrat z poolu",
f"Smazat {len(files)} film(ů) z poolu včetně souboru a metadat?",
)
if answer != QMessageBox.Yes:
return
for f in files:
try:
f.delete_metadata()
if f.file_path.exists():
f.file_path.unlink()
except OSError as exc:
QMessageBox.warning(self, "Chyba", f"Nelze smazat {f.filename}: {exc}")
if f in self.filehandler.filelist:
self.filehandler.filelist.remove(f)
self.refresh_table()
def generate_filmoteka(self) -> None:
out = self.filehandler.filmoteka_dir
if not out:
QMessageBox.warning(self, "Filmotéka", "Nejprve nastavte výstupní složku Filmotéky.")
return
files = self.filehandler.filelist
if not files:
QMessageBox.information(self, "Filmotéka", "Pool je prázdný.")
return
manager = HardlinkManager(out)
created, create_fail, removed, remove_fail = manager.sync_structure(files, FILMOTEKA_CATEGORIES)
# Copy-as-is folders (e.g. Seriály): mirror each 1:1 (hardlinked)
pool = self.filehandler.pool_dir
mirrored = 0
mirror_fail = 0
for name in self.filehandler.copyasis_folders:
src = pool / name if pool else None
if src and src.is_dir() and any(src.iterdir()):
m_created, m_failed = manager.mirror_as_is(src, name)
mirrored += m_created
mirror_fail += m_failed
msg = (
f"Filmy — vytvořeno: {created}, odebráno zastaralých: {removed}\n"
f"Copy-as-is — zrcadleno: {mirrored}"
)
if create_fail or remove_fail or mirror_fail:
msg += f"\nSelhalo: {create_fail + remove_fail + mirror_fail}"
QMessageBox.warning(self, "Filmotéka dokončena s chybami", msg)
else:
QMessageBox.information(self, "Filmotéka vygenerována", msg)
self.status.showMessage(
f"Filmotéka: filmy +{created}/-{removed}, copy-as-is +{mirrored}", 5000
)
def edit_copyasis_folders(self) -> None:
current = ", ".join(self.filehandler.copyasis_folders)
text, ok = QInputDialog.getText(
self, "Copy-as-is složky",
"Názvy pool podsložek zrcadlených 1:1 (oddělené čárkou):", text=current
)
if not ok:
return
self.filehandler.set_copyasis_folders(text.split(","))
self.status.showMessage(
"Copy-as-is složky: " + ", ".join(self.filehandler.copyasis_folders), 5000
)
def closeEvent(self, event) -> None: # noqa: N802 — Qt override
self.filehandler.global_config["window_geometry"] = f"{self.width()}x{self.height()}"
from src.core.config import save_global_config
save_global_config(self.filehandler.global_config)
super().closeEvent(event)
def run(filehandler: FileManager, tagmanager: TagManager) -> None:
app = QApplication.instance() or QApplication(sys.argv)
window = QtApp(filehandler, tagmanager)
window.show()
app.exec()