
本教程详细阐述了如何在 fastapi 中实现多种认证机制(如 basic auth 和 jwt auth),并允许客户端任选其一进行认证。核心方法是修改各个认证依赖项,使其在认证失败时返回 `none` 而非立即抛出异常,从而使一个组合认证依赖能够基于“或”逻辑判断任一认证是否成功,最终实现灵活的多重认证支持。
在构建现代 Web API 时,支持多种认证方式以满足不同客户端或场景的需求是常见的实践。例如,您可能希望同时支持传统的 HTTP Basic 认证和基于令牌的 JWT 认证。然而,在 FastAPI 中直接将多个认证依赖项组合使用时,默认行为可能会导致问题:FastAPI 会强制所有依赖项都通过验证,如果其中一个依赖项在解析阶段抛出 HTTPException,那么后续的依赖项将不会被执行,从而无法实现“任选其一”的逻辑。
FastAPI 的依赖注入系统在处理多个 Depends 依赖时,遵循严格的顺序和错误处理机制。当一个依赖项(例如,一个认证函数)内部抛出 HTTPException 时,FastAPI 会立即捕获该异常并返回相应的 HTTP 响应,而不会继续执行后续的依赖项或路由处理函数。这意味着,如果您尝试通过一个自定义的组合依赖函数来包装多个认证依赖,并期望它们能像逻辑“或”一样工作,那么默认情况下是行不通的。
例如,以下尝试实现“或”逻辑的组合认证依赖:
# 假设 jwt_logged_user 和 basic_logged_user 在认证失败时会抛出 HTTPException
def auth_user(jwt_auth: Any = Depends(jwt_logged_user),
basic_auth: Any = Depends(basic_logged_user)):
if jwt_auth or basic_auth:
return jwt_auth or basic_auth
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Invalid Credentials')如果 jwt_logged_user 认证失败并抛出异常,那么 basic_auth 依赖将永远不会被解析,auth_user 函数体也永远不会执行,从而无法检查 Basic Auth 是否有效。
要实现“任选其一”的认证逻辑,关键在于修改各个独立的认证依赖项,使其在认证失败时不再立即抛出 HTTPException,而是返回一个指示失败的值(例如 None)。这样,组合认证依赖就可以接收到所有独立认证的结果,并根据这些结果进行逻辑判断。
对于 HTTP Basic 认证,FastAPI 提供了 HTTPBasic 类。通过设置 auto_error=False,可以禁用其在认证失败时自动抛出 HTTPException 的行为。
首先,更新 HTTPBasic 实例:
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from typing import Annotated, Optional
import secrets
# ... 其他导入 ...
# 将 auto_error 设置为 False
security = HTTPBasic(auto_error=False)
def basic_logged_user(credentials: Annotated[Optional[HTTPBasicCredentials], Depends(security)]):
"""
处理 HTTP Basic 认证。
如果认证失败,返回 None 而非抛出异常。
"""
if credentials is None:
# 没有提供 Basic 认证凭据,或者凭据格式不正确 (由 auto_error=False 处理)
return None
current_username_bytes = credentials.username.encode("utf8")
correct_username_bytes = settings.SESSION_LOGIN_USER.encode("utf8")
is_correct_username = secrets.compare_digest(
current_username_bytes, correct_username_bytes
)
current_password_bytes = credentials.password.encode("utf8")
correct_password_bytes = settings.SESSION_LOGIN_PASS.encode("utf8")
is_correct_password = secrets.compare_digest(
current_password_bytes, correct_password_bytes
)
if not (is_correct_username and is_correct_password):
# 凭据不匹配,返回 None
return None
# 认证成功,返回用户名
return credentials.username关键点:
对于 JWT 认证,通常会使用 OAuth2PasswordBearer。同样,通过设置 auto_error=False 来禁用其自动错误处理。此外,您的令牌验证函数 (utils.verify_token) 也应该被修改,以在验证失败时返回 None 或被 try-except 块包裹。
假设 utils.OAuth2_scheme 是 OAuth2PasswordBearer 的实例,并且 utils.verify_token 函数负责验证 JWT。
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from typing import Annotated, Optional
from jose import JWTError, jwt # 假设您使用 jose 库
from pydantic import BaseModel
# ... 其他导入 ...
# 假设 utils 模块包含 OAuth2_scheme 和 verify_token
# utils.OAuth2_scheme 应该被初始化为 auto_error=False
# 例如:
# class TokenData(BaseModel):
# username: Optional[str] = None
# class Utils:
# OAuth2_scheme = OAuth2PasswordBearer(tokenUrl="token", auto_error=False)
# SECRET_KEY = "your-secret-key" # 替换为实际的密钥
# ALGORITHM = "HS256"
# def verify_token(self, token: str) -> dict:
# try:
# payload = jwt.decode(token, self.SECRET_KEY, algorithms=[self.ALGORITHM])
# username: str = payload.get("sub")
# if username is None:
# raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token payload")
# return {"username": username}
# except JWTError:
# raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials")
# utils = Utils()
def jwt_logged_user(token: Annotated[Optional[str], Depends(utils.OAuth2_scheme)],
db: Session = Depends(db_session)):
"""
处理 JWT 认证。
如果认证失败,返回 None 而非抛出异常。
"""
if token is None:
# 没有提供 JWT 令牌,或者令牌格式不正确 (由 auto_error=False 处理)
return None
try:
# 尝试验证令牌,并从 payload 中获取用户名
# 假设 utils.verify_token 在验证失败时会抛出 HTTPException 或 JWTError
payload = utils.verify_token(token) # 假设返回一个包含用户信息的字典
username = payload.get("username") # 根据实际 payload 结构调整
if username is None:
return None # 令牌有效但没有找到用户名信息
# 查询数据库以验证用户是否存在
user = db.query(User).filter(User.username == username).first()
if user is None:
return None # 用户不存在
# 认证成功,返回用户名
return user.username
except HTTPException:
# 捕获 verify_token 内部抛出的 HTTPException
return None
except JWTError: # 捕获 jose 库的 JWT 错误
return None
except Exception:
# 捕获其他未知错误
return None关键点:
现在,当 basic_logged_user 和 jwt_logged_user 认证失败时都返回 None,我们可以创建一个新的组合依赖项来检查任一认证是否成功。
from fastapi import HTTPException, status, Depends
from typing import Annotated, Optional, Union
# ... 其他导入 ...
def auth_user(jwt_username: Annotated[Optional[str], Depends(jwt_logged_user)],
basic_username: Annotated[Optional[str], Depends(basic_logged_user)]) -> str:
"""
组合认证依赖,允许通过 JWT 或 Basic Auth 中的任一种进行认证。
如果任一认证成功,返回对应的用户名。
如果两种认证都失败,则抛出 401 异常。
"""
if jwt_username:
# JWT 认证成功
return jwt_username
if basic_username:
# Basic Auth 认证成功
return basic_username
# 如果两种认证都失败,则抛出未授权异常
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail='Invalid Credentials',
headers={"WWW-Authenticate": "Basic, Bearer"}, # 提示支持的认证方式
)
# 路由使用组合认证依赖
@router.get("/users/")
async def get_users(db: Session = Depends(db_session),
logged_username: Annotated[str, Depends(auth_user)]):
"""
获取用户列表,需要通过 JWT 或 Basic Auth 认证。
"""
# 假设您需要根据用户名进行某些操作,例如权限检查
print(f"Authenticated user: {logged_username}")
query_users = db.query(User).all()
return query_users关键点:
为了更好地理解,以下是一个整合了上述修改的 FastAPI 应用片段:
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import HTTPBasic, HTTPBasicCredentials, OAuth2PasswordBearer
from sqlalchemy.orm import Session
from typing import Annotated, Optional, Union
import secrets
from jose import JWTError, jwt # 假设使用 jose 库
# 假设这些是您的配置和模型
class Settings:
SESSION_LOGIN_USER = "admin"
SESSION_LOGIN_PASS = "securepassword"
JWT_SECRET_KEY = "your-super-secret-jwt-key" # 替换为实际的密钥
JWT_ALGORITHM = "HS256"
settings = Settings()
# 假设 db_session 是您的数据库会话依赖
# 假设 User 是您的 SQLAlchemy 用户模型
class User: # 简化示例
def __init__(self, username):
self.username = username
def __repr__(self):
return f"<User {self.username}>"
class MockDBSession: # 模拟数据库会话
def query(self, model):
return self
def filter(self, *args, **kwargs):
return self
def first(self):
return User("testuser") # 模拟返回一个用户
def all(self):
return [User("user1"), User("user2")]
def get_db():
db = MockDBSession()
try:
yield db
finally:
pass # 实际应用中会关闭会话
db_session = Depends(get_db)
# JWT 相关的工具类
class Utils:
OAuth2_scheme = OAuth2PasswordBearer(tokenUrl="/token", auto_error=False)
SECRET_KEY = settings.JWT_SECRET_KEY
ALGORITHM = settings.JWT_ALGORITHM
def verify_token(self, token: str) -> dict:
try:
payload = jwt.decode(token, self.SECRET_KEY, algorithms=[self.ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise JWTError("Invalid token payload: no subject")
return {"username": username}
except JWTError as e:
# 在这里可以记录错误,但不再抛出 HTTPException
raise e # 重新抛出 JWTError,由 jwt_logged_user 捕获
except Exception as e:
raise e
utils = Utils()
router = APIRouter()
# --- 独立认证依赖项 ---
# Basic Auth
security = HTTPBasic(auto_error=False)
def basic_logged_user(credentials: Annotated[Optional[HTTPBasicCredentials], Depends(security)]) -> Optional[str]:
if credentials is None:
return None
current_username_bytes = credentials.username.encode("utf8")
correct_username_bytes = settings.SESSION_LOGIN_USER.encode("utf8")
is_correct_username = secrets.compare_digest(
current_username_bytes, correct_username_bytes
)
current_password_bytes = credentials.password.encode("utf8")
correct_password_bytes = settings.SESSION_LOGIN_PASS.encode("utf8")
is_correct_password = secrets.compare_digest(
current_password_bytes, correct_password_bytes
)
if not (is_correct_username and is_correct_password):
return None
return credentials.username
# JWT Auth
def jwt_logged_user(token: Annotated[Optional[str], Depends(utils.OAuth2_scheme)],
db: Session = Depends(db_session)) -> Optional[str]:
if token is None:
return None
try:
payload = utils.verify_token(token)
username = payload.get("username")
if username is None:
return None
user = db.query(User).filter(User.username == username).first()
if user is None:
return None
return user.username
except JWTError:
return None
except Exception:
return None
# --- 组合认证依赖项 ---
def auth_user(jwt_username: Annotated[Optional[str], Depends(jwt_logged_user)],
basic_username: Annotated[Optional[str], Depends(basic_logged_user)]) -> str:
if jwt_username:
return jwt_username
if basic_username:
return basic_username
raise HTTPException(
status_code=status.以上就是FastAPI 多种认证方式(任选其一)实现指南的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号