
本文详细阐述了如何在fastapi应用中,利用其`lifespan`事件管理器,高效且优雅地集成多个异步tcp服务器。通过正确使用`asyncio.create_task`在应用启动时启动后台服务,并在应用关闭时实现这些服务的平滑终止,确保fastapi与自定义tcp服务在同一个事件循环中协同工作,实现数据从tcp到websocket的无缝转发。
在构建现代异步应用时,我们常常需要将Web服务(如基于FastAPI)与自定义的后台服务(如TCP服务器)结合起来。FastAPI以其高性能和异步特性而闻名,而Python的asyncio库则为构建并发网络应用提供了强大的支持。本文将探讨如何在同一个Python进程和事件循环中,无缝地运行一个FastAPI应用和多个异步TCP服务器,并实现数据在它们之间的流转,例如将TCP接收到的数据通过WebSocket广播给客户端。
FastAPI提供了lifespan事件管理器,这是一个基于contextlib.asynccontextmanager的强大工具,用于在应用程序启动和关闭时执行异步操作。其核心在于yield关键字:
常见误区:将需要持续运行的后台任务的启动逻辑放置在yield之后。这样做会导致任务仅在应用程序关闭时才尝试启动,而非在应用程序运行期间。
为了让TCP服务器与FastAPI应用同时运行,并共享同一个事件循环,我们需要遵循以下策略:
以下是根据上述策略修改后的代码,包括server.py, globals.py, websocket_manager.py 和 main.py。
# websocket_manager.py
from fastapi import WebSocket
from typing import List
class WebSocketManager:
    """
    管理活跃的WebSocket连接,并提供广播功能。
    """
    def __init__(self):
        self.active_connections: List[WebSocket] = []
    async def connect(self, websocket: WebSocket):
        """建立WebSocket连接并将其添加到活跃连接列表。"""
        await websocket.accept()
        self.active_connections.append(websocket)
    def disconnect(self, websocket: WebSocket):
        """从活跃连接列表中移除断开的WebSocket连接。"""
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)
    async def broadcast(self, data: str):
        """向所有活跃的WebSocket连接广播数据。"""
        # 遍历时创建一个副本以避免在迭代过程中修改列表
        for connection in list(self.active_connections):
            try:
                await connection.send_text(data)
            except Exception as e:
                print(f"Error broadcasting to WebSocket: {e}. Disconnecting...")
                self.disconnect(connection) # 广播失败则断开连接# globals.py
import threading
from websocket_manager import WebSocketManager
# 示例:全局数据存储和锁(当前示例中未使用,但保留结构)
data_storage = {}
data_lock = threading.Lock() # 注意:在asyncio环境中,通常应使用asyncio.Lock
# WebSocket管理器实例,供其他模块访问
websocket_manager = WebSocketManager()# server.py
import asyncio
import globals
async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    """
    处理单个TCP客户端连接。
    从客户端读取数据,并通过WebSocketManager广播。
    """
    peername = writer.get_extra_info('peername')
    print(f"TCP client connected from {peername}")
    try:
        while True:
            data = await reader.read(1024) # 读取最多1024字节
            if not data:
                print(f"TCP client {peername} disconnected.")
                break
            # 将接收到的原始数据解码为UTF-8字符串并广播
            message = data.decode('utf-8', errors='ignore')
            print(f"Received from TCP {peername}: {message}")
            await globals.websocket_manager.broadcast(message)
    except asyncio.CancelledError:
        print(f"TCP client handler for {peername} cancelled.")
    except Exception as e:
        print(f"Error handling TCP client {peername}: {e}")
    finally:
        writer.close()
        await writer.wait_closed()
        print(f"TCP client writer for {peername} closed.")
async def run_tcp_server_task(port: int):
    """
    启动一个TCP服务器,并在事件循环中运行。
    此函数设计为可取消的后台任务。
    """
    server = None
    try:
        print(f"Starting TCP server on 0.0.0.0:{port}...")
        server = await asyncio.start_server(handle_client, '0.0.0.0', port)
        async with server:
            await server.serve_forever() # 阻塞直到任务被取消
    except asyncio.CancelledError:
        print(f"TCP server on port {port} task cancelled.")
    except Exception as e:
        print(f"Error in TCP server on port {port}: {e}")
    finally:
        if server:
            server.close() # 关闭服务器套接字
            await server.wait_closed() # 等待服务器完全关闭
            print(f"TCP server on port {port} closed.")# main.py
from fastapi import FastAPI, WebSocket
import asyncio
from contextlib import asynccontextmanager
import globals # 导入全局变量
from server import run_tcp_server_task # 导入TCP服务器启动函数
@asynccontextmanager
async def startup_event(app: FastAPI):
    """
    FastAPI应用的生命周期事件管理器。
    在应用启动时启动TCP服务器,在应用关闭时停止它们。
    """
    print("Application startup: Initializing and starting background tasks...")
    # 定义需要启动的TCP服务器端口
    ports = [8001, 8002, 8003]
    # 为每个TCP服务器创建一个后台任务
    # 这些任务会在当前事件循环中并发运行
    tcp_server_tasks = [asyncio.create_task(run_tcp_server_task(port)) for port in ports]
    # `yield` 标志着应用启动完成,可以开始处理请求
    yield
    # `yield` 之后的部分在应用关闭时执行
    print("Application shutdown: Stopping background tasks...")
    # 取消所有TCP服务器任务
    for task in tcp_server_tasks:
        task.cancel()
    # 等待所有任务完成取消和清理工作
    # `return_exceptions=True` 确保即使有任务取消失败也不会阻塞其他任务
    await asyncio.gather(*tcp_server_tasks, return_exceptions=True)
    print("All background tasks stopped gracefully.")
# 使用lifespan事件管理器创建FastAPI应用
app = FastAPI(lifespan=startup_event)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """
    FastAPI的WebSocket端点。
    管理WebSocket连接,并在连接断开时进行清理。
    """
    print("WebSocket connection established.")
    await globals.websocket_manager.connect(websocket)
    try:
        # 保持WebSocket连接活跃,或处理来自客户端的WebSocket消息
        # 在此示例中,我们只接收消息以保持连接开放,实际应用中可能需要处理消息
        while True:
            await websocket.receive_text()
    except Exception as e:
        print(f"WebSocket Error: {e}")
    finally:
        globals.websocket_manager.disconnect(websocket)
        print("WebSocket connection closed.")
# 运行应用:uvicorn main:app --reloadlifespan的正确使用:
TCP服务器的优雅关闭:
WebSocketManager:
全局变量 (globals.py):
运行应用:
通过本文的指导,我们学习了如何利用FastAPI的lifespan事件管理器,在同一个事件循环中有效地运行FastAPI应用和多个异步TCP服务器。关键在于理解yield的语义,并使用asyncio.create_task来调度后台任务,同时实现任务的优雅启动和关闭。这种模式对于构建需要集成多种网络服务类型的复杂异步应用至关重要,它确保了资源的有效利用和应用的健壮性。
以上就是运行异步TCP服务器与FastAPI:统一事件循环下的应用集成的详细内容,更多请关注php中文网其它相关文章!
 
                        
                        每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
 
                Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号