"""Bezheslova autentifikacia cez TOTP (Google Authenticator a pod.). Domenova logika nad `db/` -- tenke socket handlery v api/__init__.py ju volaju. Hodnoty sa overuju cez pyotp; replay sa bloku pomocou Player.totp_last_step. """ import secrets import time import pyotp from sqlalchemy import select from db.db import async_session from db.models import Player ISSUER = "Bridžik" TOTP_PERIOD = 30 # sekund -- default pyotp class AuthError(Exception): """Chyba prihlasenia/registracie (slovenska sprava pre klienta).""" def _new_token() -> str: return secrets.token_urlsafe(48) def _current_step() -> int: return int(time.time()) // TOTP_PERIOD def _verify_code(player: Player, code: str) -> None: """Overi TOTP kod a posunie totp_last_step. Pri neuspechu vyhodi AuthError. Akceptuje +-1 casovy krok (tolerancia hodin) a odmietne uz pouzity krok. """ totp = pyotp.TOTP(player.totp_secret) current = _current_step() for step in (current - 1, current, current + 1): if step <= player.totp_last_step: continue if totp.verify(code, for_time=step * TOTP_PERIOD): player.totp_last_step = step return raise AuthError("Nesprávny alebo už použitý kód.") async def register_account(username: str) -> dict: """Zaregistruje meno a vygeneruje TOTP secret. Vrati otpauth URI pre QR.""" username = (username or "").strip() if not username: raise AuthError("Zadajte meno.") secret = pyotp.random_base32() async with async_session() as session: existing = await session.scalar( select(Player).where(Player.username == username) ) if existing is not None: raise AuthError("Toto meno je už obsadené.") session.add(Player(username=username, totp_secret=secret)) await session.commit() otpauth_uri = pyotp.TOTP(secret).provisioning_uri(name=username, issuer_name=ISSUER) return {"username": username, "secret": secret, "otpauth_uri": otpauth_uri} async def confirm_account(username: str, code: str) -> dict: """Potvrdi registraciu prvym kodom z aplikacie a vrati session token.""" return await _verify_and_issue_token(username, code) async def login(username: str, code: str) -> dict: """Prihlasi existujuci ucet a vrati session token.""" return await _verify_and_issue_token(username, code) async def _verify_and_issue_token(username: str, code: str) -> dict: async with async_session() as session: player = await session.scalar( select(Player).where(Player.username == (username or "").strip()) ) if player is None: raise AuthError("Účet neexistuje.") _verify_code(player, (code or "").strip()) token = _new_token() player.auth_token = token await session.commit() return {"player_id": player.id, "username": player.username, "token": token} async def player_by_token(token: str) -> dict | None: """Overi session token (z Socket.IO `auth`). Vrati identitu alebo None.""" if not token: return None async with async_session() as session: player = await session.scalar( select(Player).where(Player.auth_token == token) ) if player is None: return None return {"player_id": player.id, "username": player.username}