"""Authentication routes: login, account update and current-user lookup.

Tokens are HS256-signed JWTs carrying the username (``sub``) and a per-user
token version (``ver``). Login attempts are rate limited in process memory,
per username and per client IP.
"""
import json
import logging
import os
import re
import secrets
import time
import threading
from datetime import datetime, timedelta, timezone

from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
from sqlalchemy.orm import Session

from database import get_db
from models import User

router = APIRouter()

_SECRET_KEY_FILE = "data/secret_key.txt"

_audit = logging.getLogger("audit")


def _log_audit(event: str, **kw) -> None:
    """Emit one JSON-encoded audit record on the ``audit`` logger.

    The record contains ``event``, a UTC ISO-8601 timestamp under ``ts``,
    and every extra keyword argument passed by the caller.
    """
    record = {"event": event, "ts": datetime.now(timezone.utc).isoformat(), **kw}
    _audit.info(json.dumps(record))


def _load_secret_key() -> str:
    """Return the JWT signing key, creating and persisting one if needed.

    Lookup order:
      1. the ``SECRET_KEY`` environment variable, when non-empty;
      2. the contents of ``_SECRET_KEY_FILE``, when the file holds a
         non-empty key;
      3. a freshly generated 32-byte hex key, written to
         ``_SECRET_KEY_FILE`` with mode 0600.

    Raises:
        OSError: if the key file cannot be read, created or written.
    """
    env = os.environ.get("SECRET_KEY")
    if env:
        return env

    if os.path.exists(_SECRET_KEY_FILE):
        # Context manager so the handle is closed deterministically.
        with open(_SECRET_KEY_FILE) as f:
            stored = f.read().strip()
        if stored:
            return stored
        # An empty file (e.g. left behind by an interrupted first write) must
        # not be used as a signing secret; fall through and regenerate it.

    key = secrets.token_hex(32)
    os.makedirs(os.path.dirname(_SECRET_KEY_FILE), exist_ok=True)
    # Create with owner-only permissions (0600) to prevent other users from reading the key
    fd = os.open(_SECRET_KEY_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    try:
        f = os.fdopen(fd, "w")
    except Exception:
        # Wrapping the descriptor failed, so we still own the raw fd.
        os.close(fd)
        raise
    # From here on the file object owns the descriptor: the ``with`` closes it
    # exactly once, and a failed write propagates without a second close.
    with f:
        f.write(key)
    return key


SECRET_KEY = _load_secret_key()
ALGORITHM = "HS256"
TOKEN_EXPIRE_HOURS = 24

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login")

# Header attached to every 401 raised while authenticating a bearer token.
_BEARER_HEADERS = {"WWW-Authenticate": "Bearer"}

# --- Rate limiting ---
_login_attempts: dict[str, list[float]] = {}  # username → timestamps
_ip_attempts: dict[str, list[float]] = {}  # ip → timestamps
_rate_lock = threading.Lock()
_USERNAME_WINDOW = 900  # 15 min
_USERNAME_MAX = 10
_IP_WINDOW = 60  # 1 min
_IP_MAX = 20
# Once a store tracks more keys than this, fully expired keys are swept so
# that attacker-chosen usernames/IPs cannot grow the dicts without bound.
_MAX_TRACKED_KEYS = 10_000


def _prune_expired(store: dict[str, list[float]], window: float, now: float) -> None:
    """Drop keys whose every timestamp is outside ``window``.

    Such keys behave exactly like absent keys for rate-limit purposes, so
    removing them does not change any limit decision. The caller must hold
    ``_rate_lock``.
    """
    expired = [
        key for key, stamps in store.items()
        if all(now - t >= window for t in stamps)
    ]
    for key in expired:
        del store[key]


def _register_attempt(store: dict[str, list[float]], key: str, window: float, limit: int) -> None:
    """Record an attempt for ``key`` or raise 429 when the limit is reached.

    Only timestamps newer than ``window`` seconds count towards ``limit``.
    A rejected attempt is not recorded.

    Raises:
        HTTPException: 429 when ``limit`` attempts already lie in the window.
    """
    now = time.time()
    with _rate_lock:
        if len(store) > _MAX_TRACKED_KEYS:
            _prune_expired(store, window, now)
        recent = [t for t in store.get(key, []) if now - t < window]
        if len(recent) >= limit:
            raise HTTPException(status_code=429, detail="Too many attempts, try again later")
        recent.append(now)
        store[key] = recent


def _check_username_rate_limit(username: str) -> None:
    """Count a login attempt for ``username``; raise 429 when over the limit."""
    _register_attempt(_login_attempts, username, _USERNAME_WINDOW, _USERNAME_MAX)


def _check_ip_rate_limit(ip: str) -> None:
    """Count a login attempt from ``ip``; raise 429 when over the limit."""
    _register_attempt(_ip_attempts, ip, _IP_WINDOW, _IP_MAX)


def _clear_login_attempts(username: str) -> None:
    """Forget all recorded login attempts for ``username``."""
    with _rate_lock:
        _login_attempts.pop(username, None)


def create_token(username: str, version: int) -> str:
    """Return a signed JWT for ``username`` valid for ``TOKEN_EXPIRE_HOURS``.

    ``version`` is stored in the ``ver`` claim and compared against the
    user's ``token_version`` by ``get_current_user``.
    """
    expire = datetime.now(timezone.utc) + timedelta(hours=TOKEN_EXPIRE_HOURS)
    return jwt.encode(
        {"sub": username, "ver": version, "exp": expire},
        SECRET_KEY,
        algorithm=ALGORITHM,
    )


def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)) -> User:
    """Resolve the bearer token to a ``User`` row.

    Raises:
        HTTPException: 401 when the token cannot be decoded, has no ``sub``
            claim, names an unknown user, or carries a ``ver`` claim that
            differs from the user's current ``token_version``.
    """
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        # Tokens without a ``ver`` claim are treated as version 1.
        token_ver: int = payload.get("ver", 1)
        if not username:
            raise HTTPException(status_code=401, detail="Invalid token", headers=_BEARER_HEADERS)
    except JWTError:
        raise HTTPException(status_code=401, detail="Invalid token", headers=_BEARER_HEADERS)

    user = db.query(User).filter(User.username == username).first()
    if not user:
        _log_audit("auth.token_rejected", username=username, reason="user_not_found")
        raise HTTPException(status_code=401, detail="User not found", headers=_BEARER_HEADERS)
    if (user.token_version or 1) != token_ver:
        _log_audit("auth.token_rejected", username=username, reason="version_mismatch")
        raise HTTPException(
            status_code=401,
            detail="Session expired, please log in again",
            headers=_BEARER_HEADERS,
        )
    return user


def require_password_changed(current_user: User = Depends(get_current_user)) -> User:
    """Dependency that rejects users still flagged ``must_change_password``.

    Raises:
        HTTPException: 403 when the flag is set.
    """
    if current_user.must_change_password:
        raise HTTPException(status_code=403, detail="Password change required")
    return current_user


# --- Validation helpers ---
_USERNAME_RE = re.compile(r"^[a-zA-Z0-9._-]{1,64}$")


def _validate_new_password(password: str) -> None:
    """Require at least 8 characters with one ASCII letter and one digit.

    Raises:
        HTTPException: 400 ``password_too_short`` or ``password_too_weak``.
    """
    if len(password) < 8:
        raise HTTPException(status_code=400, detail="password_too_short")
    if not re.search(r"[a-zA-Z]", password) or not re.search(r"[0-9]", password):
        raise HTTPException(status_code=400, detail="password_too_weak")


def _validate_new_username(username: str) -> None:
    """Require 1-64 characters drawn from letters, digits, ``.``, ``_``, ``-``.

    Raises:
        HTTPException: 400 ``username_invalid``.
    """
    # fullmatch, not match: with match() the ``$`` anchor also succeeds just
    # before a trailing newline, so "name\n" would be accepted.
    if not _USERNAME_RE.fullmatch(username):
        raise HTTPException(status_code=400, detail="username_invalid")


class TokenOut(BaseModel):
    """Response body returned by the login and account-update endpoints."""

    access_token: str
    token_type: str
    username: str
    must_change_password: bool = False


class AccountUpdate(BaseModel):
    """Request body for the account-update endpoint."""

    current_password: str
    new_username: str | None = None
    new_password: str | None = None


@router.post("/login", response_model=TokenOut)
def login(request: Request, form: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
    """Authenticate with username/password and return a bearer token.

    The per-IP limit is checked first, then the per-username limit; both
    count the attempt. A successful login clears the username's attempts.

    Raises:
        HTTPException: 429 when a rate limit is hit, 401 on bad credentials.
    """
    client_ip = request.client.host if request.client else "unknown"
    try:
        _check_ip_rate_limit(client_ip)
    except HTTPException:
        _log_audit("auth.login.rate_limited", ip=client_ip, reason="ip")
        raise
    try:
        _check_username_rate_limit(form.username)
    except HTTPException:
        _log_audit("auth.login.rate_limited", ip=client_ip, username=form.username, reason="username")
        raise

    user = db.query(User).filter(User.username == form.username).first()
    if not user or not pwd_context.verify(form.password, user.hashed_password):
        _log_audit("auth.login.failure", username=form.username, ip=client_ip)
        raise HTTPException(status_code=401, detail="Incorrect username or password")

    _clear_login_attempts(form.username)
    _log_audit("auth.login.success", username=user.username, ip=client_ip)
    return {
        "access_token": create_token(user.username, user.token_version or 1),
        "token_type": "bearer",
        "username": user.username,
        "must_change_password": bool(user.must_change_password),
    }


@router.put("/account", response_model=TokenOut)
def update_account(
    data: AccountUpdate,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Change the caller's username and/or password and return a new token.

    All input is validated before the user row is touched, and audit events
    are written only after the commit succeeds, so the audit log never
    records a change that was rejected or failed to persist.

    Raises:
        HTTPException: 400 when the current password is wrong, the new
            username is invalid or taken, or the new password is too
            short/weak.
    """
    if not pwd_context.verify(data.current_password, current_user.hashed_password):
        _log_audit("auth.account.bad_password", username=current_user.username)
        raise HTTPException(status_code=400, detail="Current password is incorrect")

    # --- Validate everything first (same error precedence as before) ---
    rename_to = None
    if data.new_username and data.new_username != current_user.username:
        _validate_new_username(data.new_username)
        if db.query(User).filter(User.username == data.new_username).first():
            raise HTTPException(status_code=400, detail="Username already taken")
        rename_to = data.new_username

    change_password = bool(data.new_password)
    if change_password:
        _validate_new_password(data.new_password)

    # --- Apply the changes ---
    old_username = current_user.username
    if rename_to is not None:
        current_user.username = rename_to
    if change_password:
        current_user.hashed_password = pwd_context.hash(data.new_password)
        current_user.must_change_password = False
        # Invalidate all previously issued tokens by bumping the version
        current_user.token_version = (current_user.token_version or 1) + 1

    db.commit()

    # --- Audit only what was actually persisted ---
    if rename_to is not None:
        _log_audit("auth.account.username_changed", old_username=old_username, new_username=rename_to)
    if change_password:
        _log_audit("auth.account.password_changed", username=current_user.username)

    return {
        "access_token": create_token(current_user.username, current_user.token_version or 1),
        "token_type": "bearer",
        "username": current_user.username,
        "must_change_password": bool(current_user.must_change_password),
    }


@router.get("/me")
def get_me(current_user: User = Depends(get_current_user)):
    """Return the caller's username and ``must_change_password`` flag."""
    return {
        "username": current_user.username,
        "must_change_password": bool(current_user.must_change_password),
    }