Initial release — PySide6 app for automatic GOG offline installer management
This commit is contained in:
129
src/auth.py
Normal file
129
src/auth.py
Normal file
@@ -0,0 +1,129 @@
|
||||
"""GOG OAuth2 authentication manager."""
|
||||
|
||||
import json
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
import requests
|
||||
from loguru import logger
|
||||
|
||||
GOG_AUTH_URL = "https://auth.gog.com"
|
||||
CLIENT_ID = "46899977096215655"
|
||||
CLIENT_SECRET = "9d85c43b1482497dbbce61f6e4aa173a433796eeae2ca8c5f6129f2dc4de46d9"
|
||||
REDIRECT_URI = "https://embed.gog.com/on_login_success?origin=client"
|
||||
|
||||
LOGIN_URL = (
|
||||
f"https://login.gog.com/auth?client_id={CLIENT_ID}"
|
||||
f"&redirect_uri={REDIRECT_URI}"
|
||||
f"&response_type=code&layout=popup"
|
||||
)
|
||||
|
||||
|
||||
class AuthManager:
|
||||
"""Manages GOG OAuth2 tokens — exchange, refresh, persistence."""
|
||||
|
||||
def __init__(self, config_dir: Path) -> None:
|
||||
self.config_dir = config_dir
|
||||
self.auth_file = config_dir / "auth.json"
|
||||
self.credentials: dict = {}
|
||||
self.session = requests.Session()
|
||||
self.session.headers.update({"User-Agent": "GOGUpdater/0.1"})
|
||||
self._load()
|
||||
|
||||
def _load(self) -> None:
|
||||
if self.auth_file.exists():
|
||||
try:
|
||||
self.credentials = json.loads(self.auth_file.read_text(encoding="utf-8"))
|
||||
except (json.JSONDecodeError, OSError):
|
||||
logger.warning("Failed to read auth.json, starting fresh")
|
||||
self.credentials = {}
|
||||
|
||||
def _save(self) -> None:
|
||||
self.config_dir.mkdir(parents=True, exist_ok=True)
|
||||
self.auth_file.write_text(json.dumps(self.credentials, indent=2), encoding="utf-8")
|
||||
|
||||
@property
|
||||
def is_logged_in(self) -> bool:
|
||||
return bool(self.credentials.get("access_token"))
|
||||
|
||||
@property
|
||||
def is_expired(self) -> bool:
|
||||
if not self.is_logged_in:
|
||||
return True
|
||||
login_time = self.credentials.get("login_time", 0)
|
||||
expires_in = self.credentials.get("expires_in", 0)
|
||||
return time.time() >= login_time + expires_in
|
||||
|
||||
@property
|
||||
def access_token(self) -> str | None:
|
||||
if not self.is_logged_in:
|
||||
return None
|
||||
if self.is_expired:
|
||||
if not self.refresh():
|
||||
return None
|
||||
return self.credentials.get("access_token")
|
||||
|
||||
def exchange_code(self, code: str) -> bool:
|
||||
"""Exchange authorization code for tokens."""
|
||||
url = (
|
||||
f"{GOG_AUTH_URL}/token"
|
||||
f"?client_id={CLIENT_ID}"
|
||||
f"&client_secret={CLIENT_SECRET}"
|
||||
f"&grant_type=authorization_code"
|
||||
f"&redirect_uri={REDIRECT_URI}"
|
||||
f"&code={code}"
|
||||
)
|
||||
try:
|
||||
response = self.session.get(url, timeout=15)
|
||||
except (requests.ConnectionError, requests.Timeout):
|
||||
logger.error("Failed to exchange authorization code — network error")
|
||||
return False
|
||||
|
||||
if not response.ok:
|
||||
logger.error(f"Failed to exchange authorization code — HTTP {response.status_code}")
|
||||
return False
|
||||
|
||||
data = response.json()
|
||||
data["login_time"] = time.time()
|
||||
self.credentials = data
|
||||
self._save()
|
||||
logger.info("Successfully authenticated with GOG")
|
||||
return True
|
||||
|
||||
def refresh(self) -> bool:
|
||||
"""Refresh access token using refresh_token."""
|
||||
refresh_token = self.credentials.get("refresh_token")
|
||||
if not refresh_token:
|
||||
logger.error("No refresh token available")
|
||||
return False
|
||||
|
||||
url = (
|
||||
f"{GOG_AUTH_URL}/token"
|
||||
f"?client_id={CLIENT_ID}"
|
||||
f"&client_secret={CLIENT_SECRET}"
|
||||
f"&grant_type=refresh_token"
|
||||
f"&refresh_token={refresh_token}"
|
||||
)
|
||||
try:
|
||||
response = self.session.get(url, timeout=15)
|
||||
except (requests.ConnectionError, requests.Timeout):
|
||||
logger.error("Failed to refresh token — network error")
|
||||
return False
|
||||
|
||||
if not response.ok:
|
||||
logger.error(f"Failed to refresh token — HTTP {response.status_code}")
|
||||
return False
|
||||
|
||||
data = response.json()
|
||||
data["login_time"] = time.time()
|
||||
self.credentials = data
|
||||
self._save()
|
||||
logger.info("Token refreshed successfully")
|
||||
return True
|
||||
|
||||
def logout(self) -> None:
|
||||
"""Clear stored credentials."""
|
||||
self.credentials = {}
|
||||
if self.auth_file.exists():
|
||||
self.auth_file.unlink()
|
||||
logger.info("Logged out")
|
||||
Reference in New Issue
Block a user