import os import secrets import sys from datetime import datetime, timedelta import bcrypt from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer from jose import JWTError, jwt from sqlalchemy.orm import Session from database import get_db from models import User PLACEHOLDER_KEY = "change-me-to-a-random-64-char-string" _raw_secret = os.environ.get("SECRET_KEY", "") if not _raw_secret or _raw_secret == PLACEHOLDER_KEY: print("╔══════════════════════════════════════════════════╗") print("║ ERREUR: SECRET_KEY non défini ou valeur par ║") print("║ défaut. Définissez une clé secrète dans .env ║") print("║ ou dans les variables d'environnement. ║") print("╚══════════════════════════════════════════════════╝") sys.exit(1) SECRET_KEY = _raw_secret ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 # 24 hours oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login") def hash_password(password: str) -> str: return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode() def verify_password(plain: str, hashed: str) -> bool: return bcrypt.checkpw(plain.encode(), hashed.encode()) def create_access_token(data: dict) -> str: to_encode = data.copy() expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) to_encode.update({"exp": expire}) return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) def get_current_user( token: str = Depends(oauth2_scheme), db: Session = Depends(get_db), ) -> User: credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Token invalide", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) sub = payload.get("sub") if sub is None: raise credentials_exception user_id = int(sub) token_version = payload.get("token_version", 0) except (JWTError, ValueError): raise credentials_exception user = db.query(User).filter(User.id == user_id).first() if user is None: raise credentials_exception if user.token_version != token_version: raise credentials_exception return user def get_user_from_token(token: str, db: Session) -> User: credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Token invalide", ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) sub = payload.get("sub") if sub is None: raise credentials_exception user_id = int(sub) token_version = payload.get("token_version", 0) except (JWTError, ValueError): raise credentials_exception user = db.query(User).filter(User.id == user_id).first() if user is None: raise credentials_exception if user.token_version != token_version: raise credentials_exception return user def require_admin(user: User = Depends(get_current_user)) -> User: if not user.is_admin: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Accès réservé à l'administrateur", ) return user