30c32b7714
- db/ package: async SQLAlchemy engine + Player/Game/Guess models - api/auth.py: passwordless TOTP login (pyotp), session token via socket auth - api/history.py: record guesses/points, DB-backed standings, restore unfinished games on startup, host-only end_game - api/__init__.py: auth-gated handlers, accounts map, rejoin via account - frontend: Auth (QR + code) and History pages, resume/end-game in lobby/table - docker-compose: real PostgreSQL service wired via DATABASE_URL - tests_history.py for the persistence/auth layer; refresh CLAUDE.md Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
101 lines
3.3 KiB
Python
101 lines
3.3 KiB
Python
"""Bezheslova autentifikacia cez TOTP (Google Authenticator a pod.).
|
|
|
|
Domenova logika nad `db/` -- tenke socket handlery v api/__init__.py ju volaju.
|
|
Hodnoty sa overuju cez pyotp; replay sa bloku pomocou Player.totp_last_step.
|
|
"""
|
|
|
|
import secrets
|
|
import time
|
|
|
|
import pyotp
|
|
from sqlalchemy import select
|
|
|
|
from db.db import async_session
|
|
from db.models import Player
|
|
|
|
ISSUER = "Bridžik"
|
|
TOTP_PERIOD = 30 # sekund -- default pyotp
|
|
|
|
|
|
class AuthError(Exception):
|
|
"""Chyba prihlasenia/registracie (slovenska sprava pre klienta)."""
|
|
|
|
|
|
def _new_token() -> str:
|
|
return secrets.token_urlsafe(48)
|
|
|
|
|
|
def _current_step() -> int:
|
|
return int(time.time()) // TOTP_PERIOD
|
|
|
|
|
|
def _verify_code(player: Player, code: str) -> None:
|
|
"""Overi TOTP kod a posunie totp_last_step. Pri neuspechu vyhodi AuthError.
|
|
|
|
Akceptuje +-1 casovy krok (tolerancia hodin) a odmietne uz pouzity krok.
|
|
"""
|
|
totp = pyotp.TOTP(player.totp_secret)
|
|
current = _current_step()
|
|
for step in (current - 1, current, current + 1):
|
|
if step <= player.totp_last_step:
|
|
continue
|
|
if totp.verify(code, for_time=step * TOTP_PERIOD):
|
|
player.totp_last_step = step
|
|
return
|
|
raise AuthError("Nesprávny alebo už použitý kód.")
|
|
|
|
|
|
async def register_account(username: str) -> dict:
|
|
"""Zaregistruje meno a vygeneruje TOTP secret. Vrati otpauth URI pre QR."""
|
|
username = (username or "").strip()
|
|
if not username:
|
|
raise AuthError("Zadajte meno.")
|
|
secret = pyotp.random_base32()
|
|
async with async_session() as session:
|
|
existing = await session.scalar(
|
|
select(Player).where(Player.username == username)
|
|
)
|
|
if existing is not None:
|
|
raise AuthError("Toto meno je už obsadené.")
|
|
session.add(Player(username=username, totp_secret=secret))
|
|
await session.commit()
|
|
otpauth_uri = pyotp.TOTP(secret).provisioning_uri(name=username, issuer_name=ISSUER)
|
|
return {"username": username, "secret": secret, "otpauth_uri": otpauth_uri}
|
|
|
|
|
|
async def confirm_account(username: str, code: str) -> dict:
|
|
"""Potvrdi registraciu prvym kodom z aplikacie a vrati session token."""
|
|
return await _verify_and_issue_token(username, code)
|
|
|
|
|
|
async def login(username: str, code: str) -> dict:
|
|
"""Prihlasi existujuci ucet a vrati session token."""
|
|
return await _verify_and_issue_token(username, code)
|
|
|
|
|
|
async def _verify_and_issue_token(username: str, code: str) -> dict:
|
|
async with async_session() as session:
|
|
player = await session.scalar(
|
|
select(Player).where(Player.username == (username or "").strip())
|
|
)
|
|
if player is None:
|
|
raise AuthError("Účet neexistuje.")
|
|
_verify_code(player, (code or "").strip())
|
|
token = _new_token()
|
|
player.auth_token = token
|
|
await session.commit()
|
|
return {"player_id": player.id, "username": player.username, "token": token}
|
|
|
|
|
|
async def player_by_token(token: str) -> dict | None:
|
|
"""Overi session token (z Socket.IO `auth`). Vrati identitu alebo None."""
|
|
if not token:
|
|
return None
|
|
async with async_session() as session:
|
|
player = await session.scalar(
|
|
select(Player).where(Player.auth_token == token)
|
|
)
|
|
if player is None:
|
|
return None
|
|
return {"player_id": player.id, "username": player.username}
|