Skip to content

FastAPI + JWT

Давайте разберем реализацию аутентификации и авторизации в FastAPI с JWT и Pydantic.

1. Установите зависимости:

bash
pip install fastapi uvicorn python-jose[cryptography] passlib python-multipart

2. Базовая структура проекта:

.
├── main.py
└── auth
    ├── __init__.py
    ├── models.py
    ├── schemas.py
    └── security.py

3. Реализация компонентов:

auth/models.py (Модели данных):

python
from pydantic import BaseModel

class Token(BaseModel):
    access_token: str
    token_type: str

class TokenData(BaseModel):
    username: str | None = None

class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None

class UserInDB(User):
    hashed_password: str

auth/security.py (Логика безопасности):

python
from datetime import datetime, timedelta
from typing import Annotated
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer

# Конфигурация
SECRET_KEY = "your-secret-key-keep-it-secret"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/token")

def verify_password(plain_password: str, hashed_password: str):
    return pwd_context.verify(plain_password, hashed_password)

def get_password_hash(password: str):
    return pwd_context.hash(password)

def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    
    # Здесь должна быть логика получения пользователя из БД
    user = fake_users_db.get(token_data.username)
    if user is None:
        raise credentials_exception
    return user

main.py (Основное приложение):

python
from fastapi import FastAPI, Depends, HTTPException, status
from auth.models import User, Token, UserInDB
from auth.security import (
    get_current_user,
    create_access_token,
    get_password_hash,
    verify_password,
    oauth2_scheme,
    ACCESS_TOKEN_EXPIRE_MINUTES
)
from datetime import timedelta

app = FastAPI()

# Заглушка базы данных
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": get_password_hash("secret"),
        "disabled": False,
    }
}

@app.post("/auth/token", response_model=Token)
async def login_for_access_token(
    username: str,
    password: str
):
    user = fake_users_db.get(username)
    if not user or not verify_password(password, user["hashed_password"]):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user["username"]}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}

@app.post("/register")
async def register_user(username: str, password: str):
    if username in fake_users_db:
        raise HTTPException(status_code=400, detail="Username already registered")
    hashed_password = get_password_hash(password)
    fake_users_db[username] = {
        "username": username,
        "hashed_password": hashed_password,
        "disabled": False
    }
    return {"message": "User registered successfully"}

@app.get("/users/me", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_user)):
    return current_user

@app.get("/protected")
async def protected_route(current_user: User = Depends(get_current_user)):
    return {
        "message": "This is a protected route",
        "user": current_user
    }

4. Как использовать:

  1. Запустите сервер:
bash
uvicorn main:app --reload
  1. Зарегистрируйте пользователя:
bash
curl -X POST "http://localhost:8000/register?username=johndoe&password=secret"
  1. Получите токен:
bash
curl -X POST "http://localhost:8000/auth/token?username=johndoe&password=secret"
  1. Доступ к защищенным роутам:
bash
curl -H "Authorization: Bearer YOUR_TOKEN" http://localhost:8000/protected

Важные замечания:

  1. Замените SECRET_KEY на случайную сложную строку
  2. Для продакшена используйте настоящую базу данных
  3. Добавьте обработку ошибок и валидацию
  4. Реализуйте механизм обновления токенов
  5. Добавьте ролевую модель для авторизации
  6. Используйте HTTPS в продакшене
  7. Реализуйте ограничение попыток входа
  8. Добавьте логаут (хранение невалидных токенов)

Для расширения функционала можно добавить:

  • Подтверждение email
  • Восстановление пароля
  • Двухфакторную аутентификацию
  • Логирование действий пользователя
  • RBAC (Role-Based Access Control)