
本文详解如何使用 asyncio 和 telnetlib3 构建真正非阻塞的 tcp 客户端,避免 `time.sleep()` 阻塞事件循环,并通过异步任务分离数据接收与主逻辑,实现在持续监听服务端广播的同时安全执行其他操作。
在 Python 3.12(及通用 asyncio 环境)中构建非阻塞 TCP 客户端的关键,是彻底摒弃同步阻塞调用(如 time.sleep()),并正确组织协程生命周期:让网络读取在后台异步运行,而主流程可自由执行其他逻辑,二者通过 asyncio.create_task() 协同,而非轮询或错误地“并发启动但不等待”。
下面是一个结构清晰、生产就绪的实现方案:
✅ 正确范式:任务解耦 + 异步生成器
import asyncio
from telnetlib3 import open_connection
TCP_SERVER_ADDRESS = "127.0.0.7"
TCP_SERVER_PORT = 8000
async def read_line_by_line(reader):
"""逐行读取响应,支持 '\r' 或 '\n' 结尾,返回解码后的字符串"""
buffer = ""
while True:
byte = await reader.read(1)
if not byte: # 连接关闭
break
try:
char = byte.decode("utf-8")
except UnicodeDecodeError:
continue # 跳过非法字节(常见于二进制协议)
buffer += char
if char in ("\r", "\n"):
if buffer.strip(): # 忽略空行
yield buffer.rstrip("\r\n")
buffer = ""
async def handle_incoming(reader, max_lines=10):
"""后台任务:接收指定数量的行并打印"""
count = 0
async for line in read_line_by_line(reader):
print(f"✅ reply: {line}")
count += 1
if count >= max_lines:
print(f"⏹️ 已接收 {max_lines} 行,自动停止监听")
break
async def main():
# 1️⃣ 建立异步连接
reader, writer = await open_connection(TCP_SERVER_ADDRESS, TCP_SERVER_PORT)
# 2️⃣ 发送初始化指令(注意:telnet 协议需确保写入后 flush)
writer.write("$SYS,INFO\r\n") # 显式添加换行符更可靠
await writer.drain() # 确保数据已发出
# 3️⃣ 启动后台监听任务(非阻塞!)
listen_task = asyncio.create_task(handle_incoming(reader, max_lines=10))
# 4️⃣ 主循环:执行其他业务逻辑(如状态检查、UI 更新、定时任务等)
while not listen_task.done():
print("⚙️ 正在执行其他任务(例如日志记录、传感器采样...)")
await asyncio.sleep(0.8) # ✅ 使用 asyncio.sleep —— 不阻塞事件循环
# 5️⃣ 清理:等待任务完成并关闭连接
await listen_task
writer.close()
await writer.wait_closed()
print("? 主程序退出,连接已关闭")
if __name__ == "__main__":
asyncio.run(main())⚠️ 关键注意事项
- 永远不要在协程中使用 time.sleep():它会冻结整个事件循环,导致 reader.read() 永远无法被调度,造成“卡死”假象。
- writer.write() 后务必调用 await writer.drain():否则数据可能滞留在缓冲区未真正发送,尤其对无响应握手的广播服务至关重要。
- 字符编码需显式处理:reader.read(1) 返回 bytes,必须 .decode();建议捕获 UnicodeDecodeError 以增强鲁棒性(如服务端发送非 UTF-8 数据)。
- 资源清理不可省略:writer.close() + await writer.wait_closed() 是优雅关闭连接的标准流程。
- 取消任务要谨慎:直接 task.cancel() 可能引发 CancelledError,若需中断,应在 handle_incoming 内部设计退出条件(如本例的 max_lines),更安全可控。
? 扩展建议:支持动态控制与重连
若需长期运行并应对网络波动,可进一步封装为类,集成心跳检测、自动重连和回调注册机制:
class NonBlockingTCPClient:
def __init__(self, host, port, on_message=None):
self.host = host
self.port = port
self.on_message = on_message or (lambda msg: print("?", msg))
self._task = None
self._reader = None
self._writer = None
async def start(self):
while True:
try:
self._reader, self._writer = await open_connection(self.host, self.port)
self._writer.write("$SYS,INFO\r\n")
await self._writer.drain()
self._task = asyncio.create_task(self._listen_loop())
await self._task
except (ConnectionRefusedError, OSError) as e:
print(f"⚠️ 连接失败: {e},5秒后重试...")
await asyncio.sleep(5)
finally:
if self._writer:
self._writer.close()
await self._writer.wait_closed()
async def _listen_loop(self):
async for line in read_line_by_line(self._reader):
self.on_message(line)这种设计使客户端具备工业级健壮性,同时保持主逻辑完全解耦——这才是 asyncio “非阻塞”的本质:让 I/O 等待不占用 CPU,让协作式并发真正服务于业务复杂度,而非制造新问题。
立即学习“Python免费学习笔记(深入)”;










