FastAPI+JWT认证:实现简单的用户登录验证

1. 环境准备与依赖安装

要实现FastAPI+JWT的用户登录验证,我们需要先安装必要的Python库。打开终端,执行以下命令:

pip install fastapi uvicorn python-jose[cryptography] passlib[bcrypt] python-multipart
  • FastAPI:高性能Web框架,用于构建API
  • uvicorn:ASGI服务器,用于运行FastAPI应用
  • python-jose:用于生成和验证JWT令牌
  • passlib:用于密码哈希处理(安全存储密码)
  • bcrypt:passlib的依赖,用于密码加密算法
  • python-multipart:处理表单数据(登录接口常用)

2. 核心概念理解

  • JWT(JSON Web Token):一种无状态的身份验证方式,服务器通过用户信息生成加密令牌,客户端每次请求携带令牌即可证明身份。
  • 依赖项(Dependency):FastAPI中用于复用逻辑(如验证Token)的机制,可自动处理请求并返回结果。
  • 密码哈希:登录时存储的密码不直接保存明文,而是通过算法(如bcrypt)转为不可逆的哈希值,验证时通过比对哈希值判断密码是否正确。

3. 代码实现步骤

3.1 导入必要库
from datetime import datetime, timedelta
from typing import Optional
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
3.2 配置JWT参数
# 配置JWT相关参数(生产环境需改为安全存储,如环境变量)
SECRET_KEY = "your-secret-key-keep-it-safe-and-long-enough"  # 建议16位以上随机字符串
ALGORITHM = "HS256"  # 加密算法(HS256=HMAC-SHA256)
ACCESS_TOKEN_EXPIRE_MINUTES = 30  # Token有效期30分钟
3.3 模拟用户数据库

用列表模拟数据库存储用户信息(生产环境建议使用真实数据库如SQLite/PostgreSQL):

# 模拟用户数据(实际需替换为数据库查询)
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "hashed_password": "hashed_secret_password",  # 后续用bcrypt生成
        "disabled": False,
    }
}
3.4 密码哈希工具(passlib)

使用bcrypt算法对密码进行哈希处理:

# 初始化密码哈希上下文
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def get_password_hash(password: str) -> str:
    """将密码转为哈希值(存储)"""
    return pwd_context.hash(password)

def verify_password(plain_password: str, hashed_password: str) -> bool:
    """验证密码是否匹配哈希值"""
    return pwd_context.verify(plain_password, hashed_password)
3.5 JWT工具函数(生成与验证Token)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """生成JWT Token"""
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

def verify_token(token: str, credentials_exception) -> dict:
    """验证Token有效性"""
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")  # 从Token中提取用户名(sub=subject)
        if username is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception
    return {"username": username}
3.6 OAuth2依赖项(提取Token)

FastAPI提供OAuth2PasswordBearer工具自动从请求头获取Token:

# OAuth2方案:自动从Header获取Bearer Token
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
3.7 定义用户模型

用Pydantic定义请求/响应数据结构(保证数据格式正确):

class User(BaseModel):
    username: str
    disabled: Optional[bool] = None

class UserInDB(User):
    hashed_password: str
3.8 登录接口(获取Token)

接收用户名/密码,验证后返回Token:

# 登录接口:路径为/token,接收表单参数(username/password)
@app.post("/token")
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db=fake_users_db):
    # 1. 从数据库查用户(实际需替换为查询逻辑)
    user = db.get(form_data.username)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    # 2. 验证密码
    if not verify_password(form_data.password, user["hashed_password"]):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    # 3. 生成Token
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user["username"]}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}
3.9 受保护接口(验证Token)

用依赖项验证Token后返回用户信息:

# 依赖项:验证Token并返回用户
async def get_current_user(
    token: str = Depends(oauth2_scheme),
    credentials_exception=HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid authentication credentials",
        headers={"WWW-Authenticate": "Bearer"},
    ),
):
    user = verify_token(token, credentials_exception)
    return User(username=user["username"])

# 受保护路由示例
@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_user)):
    return {"user": current_user}
3.10 主程序入口
app = FastAPI()

# 初始化哈希密码(生产环境中用户注册时调用,此处为演示先手动哈希)
if __name__ == "__main__":
    # 手动生成测试用户的哈希密码(实际应在注册接口中调用)
    test_password = "testpassword"
    fake_users_db["johndoe"]["hashed_password"] = get_password_hash(test_password)

4. 运行与测试

  1. 启动服务:在代码文件(如main.py)所在目录执行:
   uvicorn main:app --reload
  1. 访问Swagger UI:打开浏览器访问 http://localhost:8000/docs,即可在界面中测试登录接口。
  2. 测试流程
    - 调用POST /token接口,输入username=johndoepassword=testpassword,获取access_token
    - 用Token调用GET /users/me接口,在请求头中添加Authorization: Bearer <token>,即可返回用户信息。

5. 关键知识点总结

  • 依赖项(Dependency):复用Token验证逻辑,自动返回用户信息或拦截无效请求。
  • JWT安全性:需保护SECRET_KEY(生产环境用环境变量存储),避免Token过期或泄露。
  • 密码存储:用passlib哈希算法存储密码,禁止明文存储,防止数据泄露风险。

通过以上步骤,你已实现一个基于FastAPI和JWT的基础用户登录验证系统。后续可扩展功能如刷新Token、多用户角色权限等。

小夜