Add ČSFD Anubis bypass, drop legacy preset tags, rename Země → Země původu

This commit is contained in:
2026-06-12 20:30:14 +02:00
parent 22a14b1e41
commit 86c689b9f1
14 changed files with 349 additions and 146 deletions
+116 -5
View File
@@ -8,10 +8,14 @@ from __future__ import annotations
import re
import json
import time
import hashlib
from dataclasses import dataclass, field
from typing import Optional, TYPE_CHECKING
from urllib.parse import urljoin
from loguru import logger
try:
import requests
from bs4 import BeautifulSoup
@@ -34,6 +38,16 @@ HEADERS = {
"Accept-Language": "cs,en;q=0.9",
}
# Anubis is the proof-of-work anti-bot wall ČSFD now puts in front of every page.
# A plain request gets a 200 with a JS challenge page (title "Ujišťujeme se, že
# nejste robot!") instead of the movie, so JSON-LD/genres/year all parse empty.
# We detect that page, solve the PoW the way the bundled worker JS does, and
# replay the request through the same session to obtain the auth cookie.
ANUBIS_CHALLENGE_MARKER = 'id="anubis_challenge"'
ANUBIS_PASS_PATH = "/.within.website/x/cmd/anubis/api/pass-challenge"
# Safety cap so a difficulty bump can never spin forever (difficulty 1 needs ~16).
ANUBIS_MAX_NONCE = 50_000_000
@dataclass
class CSFDMovie:
@@ -123,12 +137,103 @@ def _parse_duration(duration_str: str) -> Optional[int]:
return int(match.group(1)) if match else None
def fetch_movie(url: str) -> CSFDMovie:
def _extract_json_blob(html: str, element_id: str):
    """Pull the JSON payload out of an inline ``<script>`` element.

    Looks for ``<script id="<element_id>" type="application/json">...</script>``
    in ``html`` and decodes its contents.

    Args:
        html: Raw HTML of the page to search.
        element_id: Value of the ``id`` attribute of the script element.

    Returns:
        The decoded JSON value, or ``None`` when the element is absent or its
        contents are not valid JSON.
    """
    pattern = rf'<script id="{re.escape(element_id)}" type="application/json">(.*?)</script>'
    # re.S lets the lazy group span line breaks inside the script body.
    found = re.search(pattern, html, re.S)
    if found is None:
        return None
    raw_json = found.group(1)
    try:
        parsed = json.loads(raw_json)
    except json.JSONDecodeError:
        return None
    return parsed
def _solve_anubis_pow(
    random_data: str, difficulty: int, max_nonce: Optional[int] = None
) -> tuple[str, int, int]:
    """Brute-force the Anubis proof-of-work.

    Finds the smallest ``nonce`` such that
    ``sha256(random_data + str(nonce))`` has ``difficulty`` leading zero
    nibbles, i.e. its hex digest starts with ``difficulty`` ``"0"`` characters.

    Args:
        random_data: Challenge string the nonce is appended to.
        difficulty: Required number of leading zero hex digits (0-64, since a
            SHA-256 digest has 64 hex digits).
        max_nonce: Upper bound (exclusive) on the nonces tried. Defaults to
            the module-level ``ANUBIS_MAX_NONCE`` safety cap.

    Returns:
        ``(hash_hex, nonce, elapsed_ms)`` for the first matching nonce.

    Raises:
        ValueError: If ``difficulty`` is outside 0-64, or no nonce below the
            cap satisfies it.
    """
    # Reject impossible difficulties up front instead of failing later with
    # an IndexError (> 64) or silently checking the wrong byte (negative).
    if not 0 <= difficulty <= 64:
        raise ValueError(
            f"Anubis PoW difficulty {difficulty} is outside the supported range 0-64"
        )
    limit = ANUBIS_MAX_NONCE if max_nonce is None else max_nonce
    target_prefix = "0" * difficulty
    # Hash the constant prefix once; each attempt only feeds the nonce digits
    # into a copy of that state, which yields the same digest as hashing the
    # concatenated string.
    seeded = hashlib.sha256(random_data.encode())
    start = time.monotonic()
    for nonce in range(limit):
        candidate = seeded.copy()
        candidate.update(str(nonce).encode())
        hash_hex = candidate.hexdigest()
        if hash_hex.startswith(target_prefix):
            elapsed_ms = int((time.monotonic() - start) * 1000)
            return hash_hex, nonce, elapsed_ms
    raise ValueError(
        f"Anubis PoW unsolved within {limit} attempts (difficulty {difficulty})"
    )
def _solve_anubis_challenge(session, html: str, url: str):
    """Solve the Anubis challenge in ``html`` and return the follow-up response.

    Reads the challenge JSON embedded in ``html``, computes the proof-of-work
    and submits it to the pass-challenge endpoint through ``session``.

    Args:
        session: Object with a ``requests.Session``-style ``get`` method; the
            same session must be reused so any cookie set by the endpoint is
            kept (presumably the Anubis auth cookie — confirm against server).
        html: Body of the challenge page.
        url: Originally requested URL, sent as the ``redir`` parameter.

    Returns:
        The response returned by ``session.get`` for the pass-challenge
        request (after whatever redirects the session follows).

    Raises:
        ValueError: If the challenge JSON is missing or malformed, the
            proof-of-work cannot be solved, or the response is still a
            challenge page.
        Exception: Whatever ``response.raise_for_status()`` raises for an
            HTTP error status.
    """
    payload = _extract_json_blob(html, "anubis_challenge")
    # A non-dict payload (e.g. a JSON list) is just as unusable as a missing one.
    if not payload or not isinstance(payload, dict):
        raise ValueError("ČSFD anti-bot stránka bez čitelné Anubis challenge")

    # ``"rules": null`` / ``"challenge": null`` would make ``.get(key, {})``
    # return None, so check the types explicitly instead of relying on defaults.
    rules = payload.get("rules")
    challenge = payload.get("challenge")
    if not isinstance(challenge, dict) or not challenge.get("randomData"):
        raise ValueError("Anubis challenge neobsahuje randomData")
    random_data = challenge["randomData"]

    raw_difficulty = rules.get("difficulty", 1) if isinstance(rules, dict) else 1
    try:
        difficulty = int(raw_difficulty)
    except (TypeError, ValueError) as err:
        # Keep every malformed-challenge failure a ValueError for callers.
        raise ValueError(
            f"Anubis challenge má neplatnou obtížnost: {raw_difficulty!r}"
        ) from err

    base_prefix = _extract_json_blob(html, "anubis_base_prefix") or ""
    if not isinstance(base_prefix, str):
        # Only a string can be a path prefix; ignore anything else.
        base_prefix = ""

    logger.debug(f"Solving Anubis challenge (difficulty {difficulty}) for {url}")
    hash_hex, nonce, elapsed_ms = _solve_anubis_pow(random_data, difficulty)
    logger.debug(f"Anubis solved: nonce={nonce}, elapsed={elapsed_ms}ms")

    # NOTE(review): the endpoint is resolved against CSFD_BASE_URL rather than
    # against ``url``; presumably both share a host — confirm.
    pass_url = urljoin(CSFD_BASE_URL, f"{base_prefix}{ANUBIS_PASS_PATH}")
    response = session.get(
        pass_url,
        params={
            "id": challenge.get("id"),
            "response": hash_hex,
            "nonce": nonce,
            "redir": url,
            "elapsedTime": elapsed_ms,
        },
        headers=HEADERS,
        timeout=10,
    )
    response.raise_for_status()
    # Getting the challenge marker again means the solution was not accepted.
    if ANUBIS_CHALLENGE_MARKER in response.text:
        raise ValueError("ČSFD Anubis challenge se nepodařilo vyřešit (odmítnuto)")
    return response
def _get_page(session, url: str):
    """Fetch ``url`` via ``session``, solving an Anubis challenge if one appears.

    Args:
        session: Object with a ``requests.Session``-style ``get`` method.
        url: Address of the page to download.

    Returns:
        The response for ``url``; when the first reply contains the Anubis
        challenge marker, the response produced by solving that challenge.

    Raises:
        Exception: Whatever ``raise_for_status()`` raises for an HTTP error
            status, plus anything raised while solving the challenge.
    """
    first_reply = session.get(url, headers=HEADERS, timeout=10)
    first_reply.raise_for_status()
    # Common case: no anti-bot wall, hand the page straight back.
    if ANUBIS_CHALLENGE_MARKER not in first_reply.text:
        return first_reply
    return _solve_anubis_challenge(session, first_reply.text, url)
def fetch_movie(url: str, session=None) -> CSFDMovie:
"""
Fetch movie information from CSFD.cz URL.
Args:
url: Full CSFD.cz movie URL (e.g., https://www.csfd.cz/film/9423-pane-vy-jste-vdova/)
session: Optional ``requests.Session`` to reuse (keeps the Anubis auth
cookie across calls so only the first fetch pays the PoW cost).
Returns:
CSFDMovie object with extracted data
@@ -140,8 +245,14 @@ def fetch_movie(url: str) -> CSFDMovie:
"""
_check_dependencies()
response = requests.get(url, headers=HEADERS, timeout=10)
response.raise_for_status()
own_session = session is None
if own_session:
session = requests.Session()
try:
response = _get_page(session, url)
finally:
if own_session:
session.close()
soup = BeautifulSoup(response.text, "html.parser")
@@ -378,8 +489,8 @@ def search_movies(query: str, limit: int = 10) -> list[CSFDMovie]:
_check_dependencies()
search_url = f"{CSFD_SEARCH_URL}?q={requests.utils.quote(query)}"
response = requests.get(search_url, headers=HEADERS, timeout=10)
response.raise_for_status()
with requests.Session() as session:
response = _get_page(session, search_url)
soup = BeautifulSoup(response.text, "html.parser")
results = []
+2 -5
View File
@@ -51,9 +51,6 @@ class File:
self.title = None
self.csfd_link = None
self.csfd_cache = None
if self.tagmanager:
tag = self.tagmanager.add_tag("Stav", "Nové")
self.tags.append(tag)
def _build_record(self) -> dict:
data = {
@@ -142,7 +139,7 @@ class File:
def apply_csfd_tags(
self, add_genres: bool = True, add_year: bool = True, add_country: bool = True
) -> dict:
"""Načte informace z CSFD a přiřadí tagy (Žánr, Rok, Země); cachuje data.
"""Načte informace z CSFD a přiřadí tagy (Žánr, Rok, Země původu); cachuje data.
Returns:
dict s klíči 'success', 'movie'/'error', 'tags_added'
@@ -173,7 +170,7 @@ class File:
if add_year and movie.year:
_add("Rok", str(movie.year))
if add_country and movie.country:
_add("Země", movie.country)
_add("Země původu", movie.country)
# Use the CSFD title if we don't have one yet
if movie.title and not self.title:
+8 -8
View File
@@ -1,15 +1,15 @@
from .tag import Tag
# Default tags that are always available (order in list = display order)
DEFAULT_TAGS = {
"Hodnocení": ["", "⭐⭐", "⭐⭐⭐", "⭐⭐⭐⭐", "⭐⭐⭐⭐⭐"],
"Barva": ["🔴 Červená", "🟠 Oranžová", "🟡 Žlutá", "🟢 Zelená", "🔵 Modrá", "🟣 Fialová"],
}
# Default tags that are always available (order in list = display order).
# The legacy Tagger presets (Hodnocení / Barva) were removed for Curator; the
# pool is driven by ČSFD-derived tags (Žánr / Rok / Země původu). Add entries here to
# reintroduce always-available predefined tags.
DEFAULT_TAGS: dict[str, list[str]] = {}
# Tag sort order for default categories (preserves display order)
DEFAULT_TAG_ORDER = {
"Hodnocení": {name: i for i, name in enumerate(DEFAULT_TAGS["Hodnocení"])},
"Barva": {name: i for i, name in enumerate(DEFAULT_TAGS["Barva"])},
DEFAULT_TAG_ORDER: dict[str, dict[str, int]] = {
category: {name: i for i, name in enumerate(names)}
for category, names in DEFAULT_TAGS.items()
}
+7 -8
View File
@@ -31,7 +31,7 @@ from src.core.constants import APP_NAME, VERSION
from src.core.hardlink_manager import HardlinkManager
# Categories that drive the generated Filmotéka tree (see PROJECT.md)
FILMOTEKA_CATEGORIES = ["Rok", "Žánr", "Hodnocení"]
FILMOTEKA_CATEGORIES = ["Rok", "Žánr", "Země původu", "Hodnocení"]
class ImportMovieDialog(QDialog):
@@ -101,7 +101,7 @@ class AssignTagsDialog(QDialog):
else:
state = Qt.PartiallyChecked
item = QTreeWidgetItem([tag.name])
item.setFlags(Qt.ItemIsUserCheckable | Qt.ItemIsEnabled | Qt.ItemIsTristate)
item.setFlags(Qt.ItemIsUserCheckable | Qt.ItemIsEnabled | Qt.ItemIsAutoTristate)
item.setCheckState(0, state)
cat_item.addChild(item)
self._items.append((tag.full_path, item))
@@ -213,8 +213,8 @@ class QtApp(QMainWindow):
search_row.addWidget(import_btn)
main_layout.addLayout(search_row)
self.table = QTableWidget(0, 5)
self.table.setHorizontalHeaderLabels(["Název", "Datum", "Štítky", "Velikost", "ČSFD"])
self.table = QTableWidget(0, 3)
self.table.setHorizontalHeaderLabels(["Název", "Štítky", "Velikost"])
self.table.setSelectionBehavior(QAbstractItemView.SelectRows)
self.table.setSelectionMode(QAbstractItemView.ExtendedSelection)
self.table.setEditTriggers(QAbstractItemView.NoEditTriggers)
@@ -223,8 +223,8 @@ class QtApp(QMainWindow):
self.table.doubleClicked.connect(lambda _: self.open_movies())
self.table.itemSelectionChanged.connect(self._update_selection_status)
header = self.table.horizontalHeader()
header.setSectionResizeMode(0, QHeaderView.Stretch)
header.setSectionResizeMode(2, QHeaderView.Stretch)
header.setSectionResizeMode(0, QHeaderView.Stretch) # Název
header.setSectionResizeMode(1, QHeaderView.Stretch) # Štítky
main_layout.addWidget(self.table)
splitter.addWidget(main)
@@ -300,8 +300,7 @@ class QtApp(QMainWindow):
size = self._format_size(f.file_path.stat().st_size)
except OSError:
size = "?"
csfd = "🔗" if f.csfd_link else ""
for col, value in enumerate([name, f.date or "", tags, size, csfd]):
for col, value in enumerate([name, tags, size]):
self.table.setItem(row, col, QTableWidgetItem(value))
self.refresh_sidebar()