162 lines
4.6 KiB
Python
162 lines
4.6 KiB
Python
from datetime import datetime, timedelta
|
|
|
|
import secrets as secrets_mod
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Request, status
|
|
from sqlalchemy.orm import Session
|
|
|
|
from auth import (
|
|
create_access_token,
|
|
get_current_user,
|
|
hash_password,
|
|
require_admin,
|
|
verify_password,
|
|
)
|
|
from database import get_db
|
|
from models import Invitation, User
|
|
from ratelimit import change_password_limiter, login_limiter, rate_limit, register_limiter
|
|
from schemas import (
|
|
InvitationCreate,
|
|
InvitationResponse,
|
|
PasswordChange,
|
|
TokenResponse,
|
|
UserCreate,
|
|
UserLogin,
|
|
UserResponse,
|
|
)
|
|
|
|
router = APIRouter(prefix="/api/auth", tags=["auth"])
|
|
|
|
INVITATION_EXPIRY_DAYS = 7
|
|
|
|
|
|
@router.post("/register", response_model=TokenResponse)
|
|
@rate_limit(register_limiter)
|
|
def register(request: Request, data: UserCreate, db: Session = Depends(get_db)):
|
|
invite = (
|
|
db.query(Invitation)
|
|
.filter(
|
|
Invitation.token == data.invite_token,
|
|
Invitation.used_by.is_(None),
|
|
)
|
|
.first()
|
|
)
|
|
if not invite:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Invitation invalide ou déjà utilisée",
|
|
)
|
|
|
|
if invite.expires_at and invite.expires_at < datetime.utcnow():
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Invitation expirée",
|
|
)
|
|
|
|
if db.query(User).filter(
|
|
(User.username == data.username) | (User.email == data.email)
|
|
).first():
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Un compte avec ces informations existe déjà",
|
|
)
|
|
|
|
user = User(
|
|
username=data.username,
|
|
email=data.email,
|
|
hashed_password=hash_password(data.password),
|
|
)
|
|
db.add(user)
|
|
db.flush()
|
|
|
|
invite.used_by = user.id
|
|
invite.used_at = datetime.utcnow()
|
|
db.flush()
|
|
|
|
db.commit()
|
|
db.refresh(user)
|
|
|
|
token = create_access_token({"sub": str(user.id), "token_version": user.token_version})
|
|
return TokenResponse(
|
|
access_token=token,
|
|
user=UserResponse.model_validate(user),
|
|
)
|
|
|
|
|
|
@router.post("/login", response_model=TokenResponse)
|
|
@rate_limit(login_limiter)
|
|
def login(request: Request, data: UserLogin, db: Session = Depends(get_db)):
|
|
user = db.query(User).filter(User.username == data.username).first()
|
|
if not user or not verify_password(data.password, user.hashed_password):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Identifiants incorrects",
|
|
)
|
|
|
|
token = create_access_token({"sub": str(user.id), "token_version": user.token_version})
|
|
return TokenResponse(
|
|
access_token=token,
|
|
user=UserResponse.model_validate(user),
|
|
)
|
|
|
|
|
|
@router.get("/me", response_model=UserResponse)
|
|
def me(user: User = Depends(get_current_user)):
|
|
return UserResponse.model_validate(user)
|
|
|
|
|
|
@router.post("/change-password")
|
|
@rate_limit(change_password_limiter)
|
|
def change_password(
|
|
request: Request,
|
|
data: PasswordChange,
|
|
user: User = Depends(get_current_user),
|
|
db: Session = Depends(get_db),
|
|
):
|
|
if not verify_password(data.current_password, user.hashed_password):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Mot de passe actuel incorrect",
|
|
)
|
|
user.hashed_password = hash_password(data.new_password)
|
|
user.token_version += 1
|
|
db.commit()
|
|
return {"detail": "Mot de passe modifié"}
|
|
|
|
|
|
@router.post("/logout")
|
|
def logout(user: User = Depends(get_current_user), db: Session = Depends(get_db)):
|
|
user.token_version += 1
|
|
db.commit()
|
|
return {"detail": "Déconnexion réussie"}
|
|
|
|
|
|
@router.post("/invitations", response_model=InvitationResponse)
|
|
def create_invitation(
|
|
admin: User = Depends(require_admin),
|
|
db: Session = Depends(get_db),
|
|
):
|
|
invitation = Invitation(
|
|
token=secrets_mod.token_urlsafe(32),
|
|
created_by=admin.id,
|
|
expires_at=datetime.utcnow() + timedelta(days=INVITATION_EXPIRY_DAYS),
|
|
)
|
|
db.add(invitation)
|
|
db.commit()
|
|
db.refresh(invitation)
|
|
return InvitationResponse.model_validate(invitation)
|
|
|
|
|
|
@router.get("/invitations", response_model=list[InvitationResponse])
|
|
def list_invitations(
|
|
admin: User = Depends(require_admin),
|
|
db: Session = Depends(get_db),
|
|
):
|
|
invitations = (
|
|
db.query(Invitation)
|
|
.filter(Invitation.created_by == admin.id)
|
|
.order_by(Invitation.created_at.desc())
|
|
.all()
|
|
)
|
|
return [InvitationResponse.model_validate(i) for i in invitations]
|