0

0

FastAPI 多种认证方式(任选其一)实现指南

碧海醫心

碧海醫心

发布时间:2025-11-25 14:08:53

|

569人浏览过

|

来源于php中文网

原创

FastAPI 多种认证方式(任选其一)实现指南

本教程详细阐述了如何在 fastapi 中实现多种认证机制(如 basic auth 和 jwt auth),并允许客户端任选其一进行认证。核心方法是修改各个认证依赖项,使其在认证失败时返回 `none` 而非立即抛出异常,从而使一个组合认证依赖能够基于“或”逻辑判断任一认证是否成功,最终实现灵活的多重认证支持。

在构建现代 Web API 时,支持多种认证方式以满足不同客户端或场景的需求是常见的实践。例如,您可能希望同时支持传统的 HTTP Basic 认证和基于令牌的 JWT 认证。然而,在 FastAPI 中直接将多个认证依赖项组合使用时,默认行为可能会导致问题:FastAPI 会强制所有依赖项都通过验证,如果其中一个依赖项在解析阶段抛出 HTTPException,那么后续的依赖项将不会被执行,从而无法实现“任选其一”的逻辑。

理解 FastAPI 依赖注入的默认行为

FastAPI 的依赖注入系统在处理多个 Depends 依赖时,遵循严格的顺序和错误处理机制。当一个依赖项(例如,一个认证函数)内部抛出 HTTPException 时,FastAPI 会立即捕获该异常并返回相应的 HTTP 响应,而不会继续执行后续的依赖项或路由处理函数。这意味着,如果您尝试通过一个自定义的组合依赖函数来包装多个认证依赖,并期望它们能像逻辑“或”一样工作,那么默认情况下是行不通的。

例如,以下尝试实现“或”逻辑的组合认证依赖:

# 假设 jwt_logged_user 和 basic_logged_user 在认证失败时会抛出 HTTPException
def auth_user(jwt_auth: Any = Depends(jwt_logged_user),
              basic_auth: Any = Depends(basic_logged_user)):
    if jwt_auth or basic_auth:
        return jwt_auth or basic_auth
    raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Invalid Credentials')

如果 jwt_logged_user 认证失败并抛出异常,那么 basic_auth 依赖将永远不会被解析,auth_user 函数体也永远不会执行,从而无法检查 Basic Auth 是否有效。

核心解决方案:优雅地处理认证失败

要实现“任选其一”的认证逻辑,关键在于修改各个独立的认证依赖项,使其在认证失败时不再立即抛出 HTTPException,而是返回一个指示失败的值(例如 None)。这样,组合认证依赖就可以接收到所有独立认证的结果,并根据这些结果进行逻辑判断。

1. 修改 HTTP Basic 认证依赖

对于 HTTP Basic 认证,FastAPI 提供了 HTTPBasic 类。通过设置 auto_error=False,可以禁用其在认证失败时自动抛出 HTTPException 的行为。

首先,更新 HTTPBasic 实例:

from fastapi.security import HTTPBasic, HTTPBasicCredentials
from typing import Annotated, Optional
import secrets

# ... 其他导入 ...

# 将 auto_error 设置为 False
security = HTTPBasic(auto_error=False)

def basic_logged_user(credentials: Annotated[Optional[HTTPBasicCredentials], Depends(security)]):
    """
    处理 HTTP Basic 认证。
    如果认证失败,返回 None 而非抛出异常。
    """
    if credentials is None:
        # 没有提供 Basic 认证凭据,或者凭据格式不正确 (由 auto_error=False 处理)
        return None

    current_username_bytes = credentials.username.encode("utf8")
    correct_username_bytes = settings.SESSION_LOGIN_USER.encode("utf8")
    is_correct_username = secrets.compare_digest(
        current_username_bytes, correct_username_bytes
    )
    current_password_bytes = credentials.password.encode("utf8")
    correct_password_bytes = settings.SESSION_LOGIN_PASS.encode("utf8")
    is_correct_password = secrets.compare_digest(
        current_password_bytes, correct_password_bytes
    )

    if not (is_correct_username and is_correct_password):
        # 凭据不匹配,返回 None
        return None

    # 认证成功,返回用户名
    return credentials.username

关键点:

百度智能云·曦灵
百度智能云·曦灵

百度旗下的AI数字人平台

下载
  • security = HTTPBasic(auto_error=False):禁用 HTTPBasic 在凭据缺失或格式错误时自动抛出 401 异常。
  • credentials: Annotated[Optional[HTTPBasicCredentials], Depends(security)]:声明 credentials 为 Optional 类型,因为 security 在 auto_error=False 时可能返回 None。
  • if credentials is None::检查是否提供了凭据。
  • if not (is_correct_username and is_correct_password): return None:在自定义逻辑中,如果凭据无效,也返回 None。

2. 修改 JWT 认证依赖

对于 JWT 认证,通常会使用 OAuth2PasswordBearer。同样,通过设置 auto_error=False 来禁用其自动错误处理。此外,您的令牌验证函数 (utils.verify_token) 也应该被修改,以在验证失败时返回 None 或被 try-except 块包裹。

假设 utils.OAuth2_scheme 是 OAuth2PasswordBearer 的实例,并且 utils.verify_token 函数负责验证 JWT。

from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from typing import Annotated, Optional
from jose import JWTError, jwt # 假设您使用 jose 库
from pydantic import BaseModel

# ... 其他导入 ...

# 假设 utils 模块包含 OAuth2_scheme 和 verify_token
# utils.OAuth2_scheme 应该被初始化为 auto_error=False
# 例如:
# class TokenData(BaseModel):
#     username: Optional[str] = None
# class Utils:
#     OAuth2_scheme = OAuth2PasswordBearer(tokenUrl="token", auto_error=False)
#     SECRET_KEY = "your-secret-key" # 替换为实际的密钥
#     ALGORITHM = "HS256"
#     def verify_token(self, token: str) -> dict:
#         try:
#             payload = jwt.decode(token, self.SECRET_KEY, algorithms=[self.ALGORITHM])
#             username: str = payload.get("sub")
#             if username is None:
#                 raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token payload")
#             return {"username": username}
#         except JWTError:
#             raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials")
# utils = Utils()


def jwt_logged_user(token: Annotated[Optional[str], Depends(utils.OAuth2_scheme)],
                    db: Session = Depends(db_session)):
    """
    处理 JWT 认证。
    如果认证失败,返回 None 而非抛出异常。
    """
    if token is None:
        # 没有提供 JWT 令牌,或者令牌格式不正确 (由 auto_error=False 处理)
        return None

    try:
        # 尝试验证令牌,并从 payload 中获取用户名
        # 假设 utils.verify_token 在验证失败时会抛出 HTTPException 或 JWTError
        payload = utils.verify_token(token) # 假设返回一个包含用户信息的字典
        username = payload.get("username") # 根据实际 payload 结构调整

        if username is None:
            return None # 令牌有效但没有找到用户名信息

        # 查询数据库以验证用户是否存在
        user = db.query(User).filter(User.username == username).first()
        if user is None:
            return None # 用户不存在

        # 认证成功,返回用户名
        return user.username
    except HTTPException:
        # 捕获 verify_token 内部抛出的 HTTPException
        return None
    except JWTError: # 捕获 jose 库的 JWT 错误
        return None
    except Exception:
        # 捕获其他未知错误
        return None

关键点:

  • utils.OAuth2_scheme = OAuth2PasswordBearer(tokenUrl="token", auto_error=False):确保 OAuth2PasswordBearer 实例设置了 auto_error=False。
  • token: Annotated[Optional[str], Depends(utils.OAuth2_scheme)]:声明 token 为 Optional 类型。
  • if token is None::检查是否提供了令牌。
  • try...except 块:包裹 utils.verify_token 调用,捕获可能抛出的异常,并在捕获到异常时返回 None。
  • return user.username:认证成功后返回用户名,与 Basic 认证的返回类型保持一致。

3. 实现组合认证逻辑

现在,当 basic_logged_user 和 jwt_logged_user 认证失败时都返回 None,我们可以创建一个新的组合依赖项来检查任一认证是否成功。

from fastapi import HTTPException, status, Depends
from typing import Annotated, Optional, Union

# ... 其他导入 ...

def auth_user(jwt_username: Annotated[Optional[str], Depends(jwt_logged_user)],
              basic_username: Annotated[Optional[str], Depends(basic_logged_user)]) -> str:
    """
    组合认证依赖,允许通过 JWT 或 Basic Auth 中的任一种进行认证。
    如果任一认证成功,返回对应的用户名。
    如果两种认证都失败,则抛出 401 异常。
    """
    if jwt_username:
        # JWT 认证成功
        return jwt_username
    if basic_username:
        # Basic Auth 认证成功
        return basic_username

    # 如果两种认证都失败,则抛出未授权异常
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail='Invalid Credentials',
        headers={"WWW-Authenticate": "Basic, Bearer"}, # 提示支持的认证方式
    )

# 路由使用组合认证依赖
@router.get("/users/")
async def get_users(db: Session = Depends(db_session), 
                    logged_username: Annotated[str, Depends(auth_user)]):
    """
    获取用户列表,需要通过 JWT 或 Basic Auth 认证。
    """
    # 假设您需要根据用户名进行某些操作,例如权限检查
    print(f"Authenticated user: {logged_username}") 
    query_users = db.query(User).all()
    return query_users

关键点:

  • jwt_username: Annotated[Optional[str], Depends(jwt_logged_user)] 和 basic_username: Annotated[Optional[str], Depends(basic_logged_user)]:接收两个子认证依赖的结果,它们现在可以是 None。
  • if jwt_username: return jwt_username:如果 JWT 认证成功(返回了用户名),则直接返回该用户名。
  • if basic_username: return basic_username:如果 JWT 认证失败但 Basic Auth 成功,则返回 Basic Auth 的用户名。
  • raise HTTPException(...):只有当所有子认证都失败时,才由 auth_user 统一抛出 401 异常。这样确保了只有在没有任何有效凭据的情况下才拒绝访问。
  • headers={"WWW-Authenticate": "Basic, Bearer"}:在 401 响应头中明确告知客户端支持的认证方式。

完整示例代码结构

为了更好地理解,以下是一个整合了上述修改的 FastAPI 应用片段:

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import HTTPBasic, HTTPBasicCredentials, OAuth2PasswordBearer
from sqlalchemy.orm import Session
from typing import Annotated, Optional, Union
import secrets
from jose import JWTError, jwt # 假设使用 jose 库

# 假设这些是您的配置和模型
class Settings:
    SESSION_LOGIN_USER = "admin"
    SESSION_LOGIN_PASS = "securepassword"
    JWT_SECRET_KEY = "your-super-secret-jwt-key" # 替换为实际的密钥
    JWT_ALGORITHM = "HS256"

settings = Settings()

# 假设 db_session 是您的数据库会话依赖
# 假设 User 是您的 SQLAlchemy 用户模型
class User: # 简化示例
    def __init__(self, username):
        self.username = username
    def __repr__(self):
        return f""

class MockDBSession: # 模拟数据库会话
    def query(self, model):
        return self
    def filter(self, *args, **kwargs):
        return self
    def first(self):
        return User("testuser") # 模拟返回一个用户
    def all(self):
        return [User("user1"), User("user2")]

def get_db():
    db = MockDBSession()
    try:
        yield db
    finally:
        pass # 实际应用中会关闭会话

db_session = Depends(get_db)

# JWT 相关的工具类
class Utils:
    OAuth2_scheme = OAuth2PasswordBearer(tokenUrl="/token", auto_error=False)
    SECRET_KEY = settings.JWT_SECRET_KEY
    ALGORITHM = settings.JWT_ALGORITHM

    def verify_token(self, token: str) -> dict:
        try:
            payload = jwt.decode(token, self.SECRET_KEY, algorithms=[self.ALGORITHM])
            username: str = payload.get("sub")
            if username is None:
                raise JWTError("Invalid token payload: no subject")
            return {"username": username}
        except JWTError as e:
            # 在这里可以记录错误,但不再抛出 HTTPException
            raise e # 重新抛出 JWTError,由 jwt_logged_user 捕获
        except Exception as e:
            raise e

utils = Utils()

router = APIRouter()

# --- 独立认证依赖项 ---

# Basic Auth
security = HTTPBasic(auto_error=False)

def basic_logged_user(credentials: Annotated[Optional[HTTPBasicCredentials], Depends(security)]) -> Optional[str]:
    if credentials is None:
        return None

    current_username_bytes = credentials.username.encode("utf8")
    correct_username_bytes = settings.SESSION_LOGIN_USER.encode("utf8")
    is_correct_username = secrets.compare_digest(
        current_username_bytes, correct_username_bytes
    )
    current_password_bytes = credentials.password.encode("utf8")
    correct_password_bytes = settings.SESSION_LOGIN_PASS.encode("utf8")
    is_correct_password = secrets.compare_digest(
        current_password_bytes, correct_password_bytes
    )

    if not (is_correct_username and is_correct_password):
        return None

    return credentials.username

# JWT Auth
def jwt_logged_user(token: Annotated[Optional[str], Depends(utils.OAuth2_scheme)],
                    db: Session = Depends(db_session)) -> Optional[str]:
    if token is None:
        return None

    try:
        payload = utils.verify_token(token)
        username = payload.get("username")

        if username is None:
            return None

        user = db.query(User).filter(User.username == username).first()
        if user is None:
            return None

        return user.username
    except JWTError:
        return None
    except Exception:
        return None

# --- 组合认证依赖项 ---

def auth_user(jwt_username: Annotated[Optional[str], Depends(jwt_logged_user)],
              basic_username: Annotated[Optional[str], Depends(basic_logged_user)]) -> str:
    if jwt_username:
        return jwt_username
    if basic_username:
        return basic_username

    raise HTTPException(
        status_code=status.

相关专题

更多
Python FastAPI异步API开发_Python怎么用FastAPI构建异步API
Python FastAPI异步API开发_Python怎么用FastAPI构建异步API

Python FastAPI 异步开发利用 async/await 关键字,通过定义异步视图函数、使用异步数据库库 (如 databases)、异步 HTTP 客户端 (如 httpx),并结合后台任务队列(如 Celery)和异步依赖项,实现高效的 I/O 密集型 API,显著提升吞吐量和响应速度,尤其适用于处理数据库查询、网络请求等耗时操作,无需阻塞主线程。

27

2025.12.22

if什么意思
if什么意思

if的意思是“如果”的条件。它是一个用于引导条件语句的关键词,用于根据特定条件的真假情况来执行不同的代码块。本专题提供if什么意思的相关文章,供大家免费阅读。

737

2023.08.22

登录token无效
登录token无效

登录token无效解决方法:1、检查token的有效期限,如果token已经过期,需要重新获取一个新的token;2、检查token的签名,如果签名不正确,需要重新获取一个新的token;3、检查密钥的正确性,如果密钥不正确,需要重新获取一个新的token;4、使用HTTPS协议传输token,建议使用HTTPS协议进行传输 ;5、使用双因素认证,双因素认证可以提高账户的安全性。

6084

2023.09.14

登录token无效怎么办
登录token无效怎么办

登录token无效的解决办法有检查Token是否过期、检查Token是否正确、检查Token是否被篡改、检查Token是否与用户匹配、清除缓存或Cookie、检查网络连接和服务器状态、重新登录或请求新的Token、联系技术支持或开发人员等。本专题为大家提供token相关的文章、下载、课程内容,供大家免费下载体验。

803

2023.09.14

token怎么获取
token怎么获取

获取token值的方法:1、小程序调用“wx.login()”获取 临时登录凭证code,并回传到开发者服务器;2、开发者服务器以code换取,用户唯一标识openid和会话密钥“session_key”。想了解更详细的内容,可以阅读本专题下面的文章。

1059

2023.12.21

token什么意思
token什么意思

token是一种用于表示用户权限、记录交易信息、支付虚拟货币的数字货币。可以用来在特定的网络上进行交易,用来购买或出售特定的虚拟货币,也可以用来支付特定的服务费用。想了解更多token什么意思的相关内容可以访问本专题下面的文章。

1220

2024.03.01

http500解决方法
http500解决方法

http500解决方法有检查服务器日志、检查代码错误、检查服务器配置、检查文件和目录权限、检查资源不足、更新软件版本、重启服务器或寻求专业帮助等。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

336

2023.11.09

http请求415错误怎么解决
http请求415错误怎么解决

解决方法:1、检查请求头中的Content-Type;2、检查请求体中的数据格式;3、使用适当的编码格式;4、使用适当的请求方法;5、检查服务器端的支持情况。更多http请求415错误怎么解决的相关内容,可以阅读下面的文章。

406

2023.11.14

Java 桌面应用开发(JavaFX 实战)
Java 桌面应用开发(JavaFX 实战)

本专题系统讲解 Java 在桌面应用开发领域的实战应用,重点围绕 JavaFX 框架,涵盖界面布局、控件使用、事件处理、FXML、样式美化(CSS)、多线程与UI响应优化,以及桌面应用的打包与发布。通过完整示例项目,帮助学习者掌握 使用 Java 构建现代化、跨平台桌面应用程序的核心能力。

36

2026.01.14

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Go 教程
Go 教程

共32课时 | 3.7万人学习

Go语言实战之 GraphQL
Go语言实战之 GraphQL

共10课时 | 0.8万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号