Push V1 app

This commit is contained in:
jlacoste
2026-06-26 11:54:29 +02:00
parent 8b7caa1a5a
commit 9d1990523f
3881 changed files with 1291493 additions and 1 deletions
+104
View File
@@ -0,0 +1,104 @@
import os
import secrets
import sys
from datetime import datetime, timedelta
import bcrypt
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from sqlalchemy.orm import Session
from database import get_db
from models import User
PLACEHOLDER_KEY = "change-me-to-a-random-64-char-string"
_raw_secret = os.environ.get("SECRET_KEY", "")
if not _raw_secret or _raw_secret == PLACEHOLDER_KEY:
print("╔══════════════════════════════════════════════════╗")
print("║ ERREUR: SECRET_KEY non défini ou valeur par ║")
print("║ défaut. Définissez une clé secrète dans .env ║")
print("║ ou dans les variables d'environnement. ║")
print("╚══════════════════════════════════════════════════╝")
sys.exit(1)
SECRET_KEY = _raw_secret
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 # 24 hours
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login")
def hash_password(password: str) -> str:
return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
def verify_password(plain: str, hashed: str) -> bool:
return bcrypt.checkpw(plain.encode(), hashed.encode())
def create_access_token(data: dict) -> str:
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def get_current_user(
token: str = Depends(oauth2_scheme),
db: Session = Depends(get_db),
) -> User:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token invalide",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
sub = payload.get("sub")
if sub is None:
raise credentials_exception
user_id = int(sub)
token_version = payload.get("token_version", 0)
except (JWTError, ValueError):
raise credentials_exception
user = db.query(User).filter(User.id == user_id).first()
if user is None:
raise credentials_exception
if user.token_version != token_version:
raise credentials_exception
return user
def get_user_from_token(token: str, db: Session) -> User:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token invalide",
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
sub = payload.get("sub")
if sub is None:
raise credentials_exception
user_id = int(sub)
token_version = payload.get("token_version", 0)
except (JWTError, ValueError):
raise credentials_exception
user = db.query(User).filter(User.id == user_id).first()
if user is None:
raise credentials_exception
if user.token_version != token_version:
raise credentials_exception
return user
def require_admin(user: User = Depends(get_current_user)) -> User:
if not user.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Accès réservé à l'administrateur",
)
return user